Commit 2a7e3aaf authored by MaksimSmile13's avatar MaksimSmile13 Committed by Dmitry Volodin
Browse files

TGsender added file sending if message length> 4096 / 2

parent 92081cae
......@@ -629,3 +629,24 @@ def parse_table_header(v):
header[num] = " ".join(["".join(s).strip(" -") for s in head.transpose().tolist()])
header[num] = header[num].strip()
return header
def split_text(text: str, max_chunk: int) -> Iterable[str]:
"""
Split text by splitline if len > max_chunk
:param text:
:param max_chunk:
:return: Iterable[str]
"""
size = 0
result = []
for line in text.splitlines():
if size + len(line) <= max_chunk:
result.append(line)
size = size + len(line)
else:
size = 0
yield "\n".join(result)
result = [line]
else:
yield "\n".join(result)
......@@ -11,7 +11,7 @@ import re
import orjson
import requests
import time
from urllib.parse import urlencode
from io import StringIO
# NOC modules
from noc.core.service.fastapi import FastAPIService
......@@ -20,6 +20,7 @@ from noc.core.mx import MX_TO
from noc.core.perf import metrics
from noc.config import config
from noc.core.comp import smart_text
from noc.core.text import split_text
API = "https://api.telegram.org/bot"
TGSENDER_STREAM = "tgsender"
......@@ -33,7 +34,7 @@ class TgSenderService(FastAPIService):
self.logger.info("No token defined")
self.url = None
else:
self.url = API + config.tgsender.token
self.url = f"{API}{config.tgsender.token}"
self.slot_number, self.total_slots = await self.acquire_slot()
await self.subscribe_stream(TGSENDER_STREAM, self.slot_number, self.on_message)
......@@ -46,10 +47,10 @@ class TgSenderService(FastAPIService):
:return:
"""
metrics["messages"] += 1
self.logger.debug("[%d] Receiving message %s", msg.offset, msg.headers)
self.logger.debug(f"[{msg.offset}] Receiving message {msg.headers}")
dst = msg.headers.get(MX_TO)
if not dst:
self.logger.debug("[%d] Missed '%s' header. Dropping", msg.offset, MX_TO)
self.logger.debug(f"[{msg.offset}] Missed '{MX_TO}' header. Dropping")
metrics["messages_drops"] += 1
return
metrics["messages_processed"] += 1
......@@ -62,50 +63,78 @@ class TgSenderService(FastAPIService):
return re.sub(r"([%s])" % escape_chars, r"\\\1", text)
def send_tb(self, topic: str, data: str) -> None:
sendMessage = {
body_l = 3000
file_size = 5e7 # 50Mb
t_type = "/sendMessage"
subject = self.escape_markdown(smart_text(data["subject"], errors="ignore"))
body = self.escape_markdown(smart_text(data["body"], errors="ignore"))
send = {
"chat_id": data["address"],
"text": "*"
+ self.escape_markdown(smart_text(data["subject"], errors="ignore"))
+ "*\n"
+ self.escape_markdown(smart_text(data["body"], errors="ignore")),
"text": f"\* {subject} \*\*\n\n {body}",
"parse_mode": "Markdown",
}
# Text of the message to be sent, 1-4096 characters after entities parsing
# Check, if len (body)
# If len(body) > 4096 use /sendDocument
# Bots can currently send files of any type of up to 50 MB in size
if len(body) > body_l:
caption = f"\* {subject} \*\*\n\n {body[0:500]}..."
t_type = "/sendDocument"
time.sleep(config.tgsender.retry_timeout)
if self.url:
get = self.url + "/sendMessage?" + urlencode(sendMessage)
self.logger.info("HTTP GET %s", "/sendMessage?" + urlencode(sendMessage))
url = "".join([self.url, t_type])
proxy = {}
if config.tgsender.use_proxy and config.tgsender.proxy_address:
self.logger.info("USE PROXY %s", config.tgsender.proxy_address)
self.logger.info(f"USE PROXY {config.tgsender.proxy_address}")
proxy = {"https": config.tgsender.proxy_address}
try:
response = requests.get(get, proxies=proxy)
if t_type == "/sendMessage":
self.logger.info("Send Message")
response = requests.post(url, send, proxies=proxy)
else:
self.logger.info("Send Document")
buf = StringIO()
for part, text in enumerate(split_text(body, file_size)):
part = part + 1
buf.write(text)
buf.seek(0)
buf.name = f"part_{part}.txt"
if part > 1:
caption = None
response = requests.post(
url,
{"chat_id": data["address"], "caption": caption},
proxies=proxy,
files={"document": buf},
)
buf = StringIO()
buf.close()
if proxy:
self.logger.info("Proxy Send: %s\n" % response.json())
self.logger.info(f"Proxy Send: {response.json()}\n")
metrics["telegram_proxy_proxy_ok"] += 1
else:
self.logger.info("Send: %s\n" % response.json())
self.logger.info(f"Send: {response.json()}\n")
metrics["telegram_sended_ok"] += 1
except requests.HTTPError as error:
self.logger.error("Http Error:", error)
self.logger.error(f"Http Error: {error}")
if proxy:
metrics["telegram_proxy_failed_httperror"] += 1
else:
metrics["telegram_failed_httperror"] += 1
except requests.ConnectionError as error:
self.logger.error("Error Connecting:", error)
self.logger.error(f"Error Connecting: {error}")
if proxy:
metrics["telegram_proxy_failed_connection"] += 1
else:
metrics["telegram_failed_connection"] += 1
except requests.Timeout as error:
self.logger.error("Timeout Error:", error)
self.logger.error(f"Timeout Error: {error}")
if proxy:
metrics["telegram_proxy_failed_timeout"] += 1
else:
metrics["telegram_failed_timeout"] += 1
except requests.RequestException as error:
self.logger.error("OOps: Something Else", error)
self.logger.error(f"OOps: Something Else {error}")
if proxy:
metrics["telegram_proxy_failed_else_error"] += 1
else:
......
......@@ -25,6 +25,7 @@ from noc.core.text import (
clean_number,
safe_shadow,
ch_escape,
split_text,
)
......@@ -471,3 +472,22 @@ def test_safe_shadow(config, expected):
)
def test_ch_escape(config, expected):
assert ch_escape(config) == expected
@pytest.mark.parametrize(
"config, max_chunk, expected",
[
(
"sssssssssssssssssss\naaaaaaaaaaaaaaaaaaaaa\nsdasdasdasdsadasdasdsad",
500,
["sssssssssssssssssss\naaaaaaaaaaaaaaaaaaaaa\nsdasdasdasdsadasdasdsad"],
),
(
"sssssssssssssssssss\naaaaaaaaaaaaaaaaaaaaa\nsdasdasdasdsadasdasdsad",
50,
["sssssssssssssssssss\naaaaaaaaaaaaaaaaaaaaa", "sdasdasdasdsadasdasdsad"],
),
],
)
def test_split_text(config, max_chunk, expected):
assert list(split_text(config, max_chunk=max_chunk)) == expected
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment