diff --git a/core/service/base.py b/core/service/base.py index 82a3969126fca1ad7d1671e6c8d4e5a2cb9e94e8..f1bb0429b2b888a126d69290228cef52915b4a1c 100644 --- a/core/service/base.py +++ b/core/service/base.py @@ -53,7 +53,8 @@ from noc.core.liftbridge.queuebuffer import QBuffer from noc.core.liftbridge.message import Message from noc.core.ioloop.util import setup_asyncio from noc.core.ioloop.timers import PeriodicCallback -from noc.core.mx import send_message +from noc.core.mx import MX_STREAM, get_mx_partitions, MX_MESSAGE_TYPE, MX_SHARDING_KEY +from noc.core.comp import smart_bytes from .rpc import RPCProxy from .loader import set_service @@ -928,14 +929,20 @@ class BaseService(object): stream=f"ch.{table}", partition=key % self.n_metrics_partitions, data=metrics ) # Mirror to MX - if config.message.enable_metrics and ( - not self.mx_metrics_scopes or table in self.mx_metrics_scopes + if ( + config.message.enable_metrics + and self.mx_metrics_scopes + and table in self.mx_metrics_scopes ): - send_message( - data=[self.mx_metrics_scopes[table](m) for m in metrics], - message_type="metrics", - headers={}, - sharding_key=key % self.mx_partitions, + n_partitions = get_mx_partitions() + self.publish( + value=orjson.dumps([self.mx_metrics_scopes[table](m) for m in metrics]), + stream=MX_STREAM, + partition=key % n_partitions, + headers={ + MX_MESSAGE_TYPE: b"metrics", + MX_SHARDING_KEY: smart_bytes(key), + }, ) def start_telemetry_callback(self) -> None: