From a798461448fdf0b29b6d916106bb1a40fd170916 Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Thu, 17 Jun 2021 19:24:50 +0500 Subject: [PATCH] Mirror to MX metrics use service pub. --- core/service/base.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/service/base.py b/core/service/base.py index 82a3969126..f1bb0429b2 100644 --- a/core/service/base.py +++ b/core/service/base.py @@ -53,7 +53,8 @@ from noc.core.liftbridge.queuebuffer import QBuffer from noc.core.liftbridge.message import Message from noc.core.ioloop.util import setup_asyncio from noc.core.ioloop.timers import PeriodicCallback -from noc.core.mx import send_message +from noc.core.mx import MX_STREAM, get_mx_partitions, MX_MESSAGE_TYPE, MX_SHARDING_KEY +from noc.core.comp import smart_bytes from .rpc import RPCProxy from .loader import set_service @@ -928,14 +929,20 @@ class BaseService(object): stream=f"ch.{table}", partition=key % self.n_metrics_partitions, data=metrics ) # Mirror to MX - if config.message.enable_metrics and ( - not self.mx_metrics_scopes or table in self.mx_metrics_scopes + if ( + config.message.enable_metrics + and self.mx_metrics_scopes + and table in self.mx_metrics_scopes ): - send_message( - data=[self.mx_metrics_scopes[table](m) for m in metrics], - message_type="metrics", - headers={}, - sharding_key=key % self.mx_partitions, + n_partitions = get_mx_partitions() + self.publish( + value=orjson.dumps([self.mx_metrics_scopes[table](m) for m in metrics]), + stream=MX_STREAM, + partition=key % n_partitions, + headers={ + MX_MESSAGE_TYPE: b"metrics", + MX_SHARDING_KEY: smart_bytes(key), + }, ) def start_telemetry_callback(self) -> None: -- GitLab