From 78ee0edec29fb4b37a40a9c060236e4202cf3aba Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Tue, 20 Sep 2022 10:03:35 +0500 Subject: [PATCH 1/9] Move MX Router to core. --- core/mx.py | 10 +++ {services/mx => core}/router/__init__.py | 0 {services/mx => core}/router/action.py | 18 +--- .../router/router.py => core/router/base.py | 87 +++++++++++++++++-- {services/mx => core}/router/route.py | 55 ++---------- main/models/messageroute.py | 42 ++++++++- services/datastream/streams/cfgmxroute.py | 36 +------- services/mx/service.py | 58 +------------ 8 files changed, 148 insertions(+), 158 deletions(-) rename {services/mx => core}/router/__init__.py (100%) rename {services/mx => core}/router/action.py (88%) rename services/mx/router/router.py => core/router/base.py (57%) rename {services/mx => core}/router/route.py (81%) diff --git a/core/mx.py b/core/mx.py index fb34caeb69..9628cfb1d9 100644 --- a/core/mx.py +++ b/core/mx.py @@ -8,6 +8,7 @@ # Python modules from typing import Any, Optional, Dict from threading import Lock +from dataclasses import dataclass # Third-party modules import orjson @@ -18,6 +19,15 @@ from noc.core.comp import DEFAULT_ENCODING from noc.core.liftbridge.base import LiftBridgeClient from noc.core.ioloop.util import run_sync + +@dataclass +class Message(object): + value: bytes + headers: Dict[str, bytes] + timestamp: int + key: bytes + + # MX stream name MX_STREAM = "message" # Headers diff --git a/services/mx/router/__init__.py b/core/router/__init__.py similarity index 100% rename from services/mx/router/__init__.py rename to core/router/__init__.py diff --git a/services/mx/router/action.py b/core/router/action.py similarity index 88% rename from services/mx/router/action.py rename to core/router/action.py index 6016671077..53f5119879 100644 --- a/services/mx/router/action.py +++ b/core/router/action.py @@ -14,11 +14,8 @@ import orjson # NOC modules from noc.core.liftbridge.message import Message -from noc.main.models.messageroute import MessageRoute from noc.core.comp import DEFAULT_ENCODING from noc.core.mx import MX_MESSAGE_TYPE, NOTIFICATION_METHODS -from noc.main.models.notificationgroup import NotificationGroup -from noc.main.models.template import Template DROP = "" @@ -57,19 +54,9 @@ class Action(object, metaclass=ActionBase): def __init__(self, cfg: ActionCfg): self.headers: Dict[str, bytes] = { - h.header: h.value.encode(encoding=DEFAULT_ENCODING) for h in cfg.headers + h.header: h.value.encode(encoding=DEFAULT_ENCODING) for h in cfg.headers or [] } - @classmethod - def from_action(cls, mroute: MessageRoute) -> "Action": - global ACTION_TYPES - - return ACTION_TYPES[mroute.type]( - ActionCfg( - type=mroute.type, stream=mroute.stream, notification_group=mroute.notification_group - ) - ) - @classmethod def from_data(cls, data): global ACTION_TYPES @@ -117,6 +104,9 @@ class NotificationAction(Action): name = "notification" def __init__(self, cfg: ActionCfg): + from noc.main.models.notificationgroup import NotificationGroup + from noc.main.models.template import Template + super().__init__(cfg) self.ng: NotificationGroup = NotificationGroup.get_by_id(cfg.notification_group) self.rt: Template = Template.get_by_id(cfg.render_template) diff --git a/services/mx/router/router.py b/core/router/base.py similarity index 57% rename from services/mx/router/router.py rename to core/router/base.py index a626ffbec8..3d7eedb1dd 100644 --- a/services/mx/router/router.py +++ b/core/router/base.py @@ -1,7 +1,7 @@ # ---------------------------------------------------------------------- # Router # ---------------------------------------------------------------------- -# Copyright (C) 2007-2020 The NOC Project +# Copyright (C) 2007-2022 The NOC Project # See LICENSE for details # ---------------------------------------------------------------------- @@ -10,13 +10,19 @@ import logging import operator from collections import defaultdict from typing import List, DefaultDict, Iterator, Dict, Iterable, Optional +from functools import partial + +# Third-party modules +import orjson # NOC modules -from noc.core.liftbridge.message import Message -from noc.core.mx import MX_MESSAGE_TYPE -from noc.main.models.messageroute import MessageRoute +from noc.core.mx import MX_MESSAGE_TYPE, MX_SHARDING_KEY, Message +from noc.core.service.loader import get_service from noc.core.comp import DEFAULT_ENCODING +from noc.core.perf import metrics +from noc.core.ioloop.util import run_sync from .route import Route, DefaultNotificationRoute +from .action import DROP, DUMP logger = logging.getLogger(__name__) @@ -26,17 +32,23 @@ class Router(object): self.chains: DefaultDict[bytes, List[Route]] = defaultdict(list) self.routes: Dict[str, Route] = {} self.default_route: Optional[DefaultNotificationRoute] = DefaultNotificationRoute() + self.stream_partitions: Dict[str, int] = {} + self.svc = get_service() def load(self): """ Load up all the rules and populate the chains :return: """ + from noc.main.models.messageroute import MessageRoute + num = 0 for num, route in enumerate( MessageRoute.objects.filter(is_active=True).order_by("order"), start=1 ): - self.chains[route.type.encode(encoding=DEFAULT_ENCODING)] += [Route.from_route(route)] + self.chains[route.type.encode(encoding=DEFAULT_ENCODING)] += [ + Route.from_data(route.get_route_config()) + ] logger.info("Loading %s route", num) def has_route(self, route_id: str) -> bool: @@ -139,3 +151,68 @@ class Router(object): for route in self.chains[mt]: if route.is_match(msg): yield route + + def route_sync(self, msg: Message): + """ + Synchronize method + :param msg: + :return: + """ + run_sync(partial(self.route_message, msg)) + + async def route_message(self, msg: Message, msg_id: Optional[str] = None): + """ + Route message by rule + :param msg: + :param msg_id: + :return: + """ + # Apply routes + for route in self.iter_route(msg): + metrics["route_hits"] += 1 + logger.debug("[%d] Applying route %s", msg_id, route.name) + # Apply actions + routed: bool = False + for stream, action_headers, body in route.iter_action(msg): + metrics["action_hits"] += 1 + # Fameless drop + if stream == DROP: + metrics["action_drops"] += 1 + logger.debug("[%s] Dropped. Stopping processing", msg_id) + return + elif stream == DUMP: + logger.info( + "[%s] Dump. Message headers: %s;\n-----\n Body: %s \n----\n ", + msg_id, + msg.headers, + msg.value, + ) + continue + # Build resulting headers + headers = {} + headers.update(msg.headers) + if action_headers: + headers.update(action_headers) + # Determine sharding channel + sharding_key = int(headers.get(MX_SHARDING_KEY, b"0")) + partitions = self.stream_partitions.get(stream) + if not partitions: + # Request amount of partitions + partitions = await self.svc.get_stream_partitions(stream) + self.stream_partitions[stream] = partitions + partition = sharding_key % partitions + # Single message may be transmuted in zero or more messages + body = route.transmute(headers, body) + # for body in route.iter_transmute(headers, msg.value): + if not isinstance(body, bytes): + # Transmute converts message to an arbitrary structure, + # so convert back to the json + body = orjson.dumps(body) + metrics[("forwards", ("stream", f"{stream}:{partition}"))] += 1 + logger.debug("[%s] Routing to %s:%s", msg_id, stream, partition) + self.svc.publish(value=body, stream=stream, partition=partition, headers=headers) + routed = True + if not routed: + logger.debug("[%d] Not routed", msg_id) + metrics["route_misses"] += 1 + # logger.debug("[%s] Finish processing", msg_id) diff --git a/services/mx/router/route.py b/core/router/route.py similarity index 81% rename from services/mx/router/route.py rename to core/router/route.py index e52cc2009b..3dd44e3c4e 100644 --- a/services/mx/router/route.py +++ b/core/router/route.py @@ -26,9 +26,6 @@ from noc.core.mx import ( MX_MESSAGE_TYPE, NOTIFICATION_METHODS, ) -from noc.main.models.messageroute import MessageRoute -from noc.main.models.template import Template -from noc.main.models.handler import Handler from .action import Action T_BODY = Union[bytes, Any] @@ -57,7 +54,7 @@ class TransmuteTemplate(object): @dataclass -class HeaderItem(object): +class HeaderMatchItem(object): header: str op: Literal["==", "!=", "regex"] value: str @@ -83,7 +80,7 @@ class MatchItem(object): labels: Optional[List[str]] = None exclude_labels: Optional[List[str]] = None administrative_domain: Optional[int] = None - headers: Optional[List[HeaderItem]] = None + headers: Optional[List[HeaderMatchItem]] = None @classmethod def from_data(cls, data: List[Dict[str, Any]]) -> List["MatchItem"]: @@ -95,32 +92,13 @@ class MatchItem(object): exclude_labels=match["exclude_labels"], administrative_domain=match.get("administrative_domain"), headers=[ - HeaderItem(header=h["header"], op=h["op"], value=h["value"]) + HeaderMatchItem(header=h["header"], op=h["op"], value=h["value"]) for h in match["headers"] ], ) ] return r - @classmethod - def from_route(cls, route: MessageRoute) -> List["MatchItem"]: - r = [] - for match in route.match: - r += [ - MatchItem( - labels=match.labels, - exclude_labels=match.exclude_labels, - administrative_domain=match.administrative_domain.id - if match.administrative_domain - else None, - headers=[ - HeaderItem(header=h.header, op=h.op, value=h.value) - for h in match.headers_match - ], - ) - ] - return r - class Route(object): """ @@ -136,7 +114,6 @@ class Route(object): self.actions: List[Action] = [] self.transmute_handler: Optional[Callable[[Dict[str, bytes], T_BODY], T_BODY]] = None self.transmute_template: Optional[TransmuteTemplate] = None - self.render_template: Optional[Template] = None def is_match(self, msg: Message) -> bool: """ @@ -192,6 +169,9 @@ class Route(object): return True def update(self, data): + from noc.main.models.template import Template + from noc.main.models.handler import Handler + self.match_co = self.compile_match(MatchItem.from_data(data["match"])) # Compile transmute part # r.transmutations = [Transmutation.from_transmute(t) for t in route.transmute] @@ -200,8 +180,6 @@ class Route(object): if "transmute_template" in data: template = Template.objects.get(id=data["transmute_template"]) self.transmute_template = TransmuteTemplate(JTemplate(template.body)) - if "render_template" in data: - self.render_template = data["render_template"] # Compile action part self.actions = [Action.from_data(data)] @@ -265,27 +243,6 @@ class Route(object): r.update(data) return r - @classmethod - def from_route(cls, route: MessageRoute) -> "Route": - """ - Build Route from database config - :param route: - :return: - """ - r = Route(route.name, route.type, route.order) - r.match_co = cls.compile_match(MatchItem.from_route(route)) - # Compile transmute part - # r.transmutations = [Transmutation.from_transmute(t) for t in route.transmute] - r.transmute_handler = route.transmute_handler - if route.transmute_template: - r.render_template = RenderTemplate( - subject_template=route.transmute_template.subject, - body_template=route.transmute_template.body, - ) - # Compile action part - r.actions = [Action.from_action(route)] - return r - class DefaultNotificationRoute(Route): """ diff --git a/main/models/messageroute.py b/main/models/messageroute.py index 27ad88b15e..9940bde286 100644 --- a/main/models/messageroute.py +++ b/main/models/messageroute.py @@ -88,7 +88,7 @@ class MessageRoute(Document): transmute_handler = PlainReferenceField(Handler) transmute_template = ForeignKeyField(Template) # Message actions - action = StringField(choices=["drop", "stream", "notification"], default="notification") + action = StringField(choices=["drop", "dump", "stream", "notification"], default="notification") stream = StringField() notification_group = ForeignKeyField(NotificationGroup) render_template = ForeignKeyField(Template) @@ -116,3 +116,43 @@ class MessageRoute(Document): {"notification_group": "For 'notification' action NotificationGroup must be set"} ) super().clean() + + def get_route_config(self): + """ + Return data for configured Router + :return: + """ + r = { + "name": self.name, + "type": self.type, + "order": self.order, + "action": self.action, + "match": [], + } + if self.stream: + r["stream"] = self.stream + if self.headers: + r["headers"] = [{"header": m.header, "value": m.value} for m in self.headers] + if self.notification_group: + r["notification_group"] = str(self.notification_group.id) + if self.render_template: + r["render_template"] = str(self.render_template.id) + if self.transmute_template: + r["transmute_template"] = str(self.transmute_template.id) + if self.transmute_handler: + r["transmute_handler"] = str(self.transmute_handler.id) + for match in self.match: + r["match"] += [ + { + "labels": match.labels, + "exclude_labels": match.exclude_labels, + "administrative_domain": str(match.administrative_domain.id) + if match.administrative_domain + else None, + "headers": [ + {"header": m.header, "op": m.op, "value": m.value} + for m in match.headers_match + ], + } + ] + return r diff --git a/services/datastream/streams/cfgmxroute.py b/services/datastream/streams/cfgmxroute.py index 1399c4a1df..c99ab55304 100644 --- a/services/datastream/streams/cfgmxroute.py +++ b/services/datastream/streams/cfgmxroute.py @@ -21,38 +21,6 @@ class CfgMetricsCollectorDataStream(DataStream): route: "MessageRoute" = MessageRoute.objects.get(id=oid) if not route or not route.is_active: raise KeyError() - r = { - "id": str(route.id), - "name": route.name, - "type": route.type, - "order": route.order, - "action": route.action, - "match": [], - } - if route.stream: - r["stream"] = route.stream - if route.headers: - r["headers"] = [{"header": m.header, "value": m.value} for m in route.headers] - if route.notification_group: - r["notification_group"] = str(route.notification_group.id) - if route.render_template: - r["render_template"] = str(route.render_template.id) - if route.transmute_template: - r["transmute_template"] = str(route.transmute_template.id) - if route.transmute_handler: - r["transmute_handler"] = str(route.transmute_handler.id) - for match in route.match: - r["match"] += [ - { - "labels": match.labels, - "exclude_labels": match.exclude_labels, - "administrative_domain": str(match.administrative_domain.id) - if match.administrative_domain - else None, - "headers": [ - {"header": m.header, "op": m.op, "value": m.value} - for m in match.headers_match - ], - } - ] + r = route.get_route_config() + r["id"] = str(route.id) return r diff --git a/services/mx/service.py b/services/mx/service.py index ef36bd823a..e8c09d33b8 100755 --- a/services/mx/service.py +++ b/services/mx/service.py @@ -2,7 +2,7 @@ # ---------------------------------------------------------------------- # mx service # ---------------------------------------------------------------------- -# Copyright (C) 2007-2021 The NOC Project +# Copyright (C) 2007-2022 The NOC Project # See LICENSE for details # ---------------------------------------------------------------------- @@ -10,18 +10,13 @@ import asyncio from typing import Dict, Any -# Third-party modules -import orjson - # NOC modules from noc.core.service.fastapi import FastAPIService from noc.core.error import NOCError from noc.core.mx import MX_STREAM from noc.config import config from noc.core.liftbridge.message import Message -from noc.core.mx import MX_SHARDING_KEY -from noc.services.mx.router.router import Router -from noc.services.mx.router.action import DROP, DUMP +from noc.core.router.base import Router from noc.core.perf import metrics from noc.services.mx.datastream import RouteDataStreamClient @@ -39,7 +34,6 @@ class MXService(FastAPIService): self.slot_number = 0 self.total_slots = 0 self.router = Router() - self.stream_partitions: Dict[str, int] = {} async def init_api(self): # Postpone initialization process until config datastream is fully processed @@ -84,53 +78,7 @@ class MXService(FastAPIService): metrics["messages"] += 1 # Apply routes self.logger.debug("[%d] Receiving message %s", msg.offset, msg.headers) - for route in self.router.iter_route(msg): - metrics["route_hits"] += 1 - self.logger.debug("[%d] Applying route %s", msg.offset, route.name) - # Apply actions - routed: bool = False - for stream, action_headers, body in route.iter_action(msg): - metrics["action_hits"] += 1 - # Fameless drop - if stream == DROP: - metrics["action_drops"] += 1 - self.logger.debug("[%s] Dropped. Stopping processing", msg.offset) - return - elif stream == DUMP: - self.logger.info( - "[%s] Dump. Message headers: %s;\n-----\n Body: %s \n----\n ", - msg.offset, - msg.headers, - msg.value, - ) - continue - # Build resulting headers - headers = {} - headers.update(msg.headers) - if action_headers: - headers.update(action_headers) - # Determine sharding channel - sharding_key = int(headers.get(MX_SHARDING_KEY, b"0")) - partitions = self.stream_partitions.get(stream) - if not partitions: - # Request amount of partitions - partitions = await self.get_stream_partitions(stream) - self.stream_partitions[stream] = partitions - partition = sharding_key % partitions - # Single message may be transmuted in zero or more messages - body = route.transmute(headers, body) - # for body in route.iter_transmute(headers, msg.value): - if not isinstance(body, bytes): - # Transmute converts message to an arbitrary structure, - # so convert back to the json - body = orjson.dumps(body) - metrics[("forwards", f"{stream}:{partition}")] += 1 - self.logger.debug("[%s] Routing to %s:%s", msg.offset, stream, partition) - self.publish(value=body, stream=stream, partition=partition, headers=headers) - routed = True - if not routed: - self.logger.debug("[%d] Not routed", msg.offset) - metrics["route_misses"] += 1 + await self.router.route_message(msg) self.logger.debug("[%s] Finish processing", msg.offset) -- GitLab From d43926bb4f188b72d363c40c386756621d297318 Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Thu, 29 Sep 2022 19:23:14 +0500 Subject: [PATCH 2/9] Add router to Base Service. --- core/mx.py | 15 +++++++------- core/router/base.py | 49 ++++++++++++++++++++++++++++++++++++++++++-- core/service/base.py | 43 +++++++++++++++++++++++++++++++++----- 3 files changed, 93 insertions(+), 14 deletions(-) diff --git a/core/mx.py b/core/mx.py index 9628cfb1d9..f06e66804a 100644 --- a/core/mx.py +++ b/core/mx.py @@ -96,13 +96,14 @@ def send_message( if headers: msg_headers.update(headers) svc = get_service() - n_partitions = get_mx_partitions() - svc.publish( - value=orjson.dumps(data), - stream=MX_STREAM, - partition=sharding_key % n_partitions, - headers=msg_headers, - ) + svc.send_message(data, message_type, headers, sharding_key) + # n_partitions = get_mx_partitions() + # svc.publish( + # value=orjson.dumps(data), + # stream=MX_STREAM, + # partition=sharding_key % n_partitions, + # headers=msg_headers, + # ) def get_mx_partitions() -> int: diff --git a/core/router/base.py b/core/router/base.py index 3d7eedb1dd..d6989a8407 100644 --- a/core/router/base.py +++ b/core/router/base.py @@ -8,6 +8,7 @@ # Python modules import logging import operator +from time import time_ns from collections import defaultdict from typing import List, DefaultDict, Iterator, Dict, Iterable, Optional from functools import partial @@ -21,6 +22,7 @@ from noc.core.service.loader import get_service from noc.core.comp import DEFAULT_ENCODING from noc.core.perf import metrics from noc.core.ioloop.util import run_sync +from noc.core.liftbridge.queuebuffer import QBuffer from .route import Route, DefaultNotificationRoute from .action import DROP, DUMP @@ -34,6 +36,7 @@ class Router(object): self.default_route: Optional[DefaultNotificationRoute] = DefaultNotificationRoute() self.stream_partitions: Dict[str, int] = {} self.svc = get_service() + self.out_queue: Optional[QBuffer] = None def load(self): """ @@ -152,6 +155,19 @@ class Router(object): if route.is_match(msg): yield route + async def publish( + self, + value: bytes, + stream: str, + partition: Optional[int] = None, + key: Optional[bytes] = None, + headers: Optional[Dict[str, bytes]] = None, + ): + if self.out_queue: + self.out_queue.put(stream, partition, data=value) + else: + self.svc.publish(value=body, stream=stream, partition=partition, headers=headers) + def route_sync(self, msg: Message): """ Synchronize method @@ -160,6 +176,35 @@ class Router(object): """ run_sync(partial(self.route_message, msg)) + @staticmethod + def get_message( + data: Any, + message_type: str, + headers: Optional[Dict[str, bytes]] = None, + sharding_key: int = 0, + ) -> Message: + """ + Build message + + :param data: Data for transmit + :param message_type: Message type + :param headers: additional message headers + :param sharding_key: Key for sharding + :return: + """ + msg_headers = { + MX_MESSAGE_TYPE: message_type.encode(DEFAULT_ENCODING), + MX_SHARDING_KEY: str(sharding_key).encode(DEFAULT_ENCODING), + } + if headers: + msg_headers.update(headers) + return Message( + value=orjson.dumps(data), + headers=msg_headers, + timestamp=time_ns, + key=sharding_key, + ) + async def route_message(self, msg: Message, msg_id: Optional[str] = None): """ Route message by rule @@ -210,9 +255,9 @@ class Router(object): body = orjson.dumps(body) metrics[("forwards", ("stream", f"{stream}:{partition}"))] += 1 logger.debug("[%s] Routing to %s:%s", msg_id, stream, partition) - self.svc.publish(value=body, stream=stream, partition=partition, headers=headers) + await self.publish(value=body, stream=stream, partition=partition, headers=headers) routed = True if not routed: logger.debug("[%d] Not routed", msg_id) - metrics["route_misses"] += 1 + metrics["route_misses", ("message_type",)] += 1 # logger.debug("[%s] Finish processing", msg_id) diff --git a/core/service/base.py b/core/service/base.py index 2fe240bc4f..0ea4655556 100644 --- a/core/service/base.py +++ b/core/service/base.py @@ -48,6 +48,7 @@ from noc.core.liftbridge.error import LiftbridgeError from noc.core.liftbridge.queue import LiftBridgeQueue from noc.core.liftbridge.queuebuffer import QBuffer from noc.core.liftbridge.message import Message +from noc.core.router.base import Router from noc.core.ioloop.util import setup_asyncio from noc.core.ioloop.timers import PeriodicCallback from noc.core.mx import MX_STREAM, get_mx_partitions, MX_MESSAGE_TYPE @@ -122,6 +123,8 @@ class BaseService(object): self.startup_ts = None self.telemetry_callback = None self.dcs = None + # Message routed + self.router: Optional[Router] = None # Effective address and port self.address = None self.port = None @@ -790,11 +793,14 @@ class BaseService(object): and self.mx_metrics_scopes and table in self.mx_metrics_scopes ): - n_partitions = get_mx_partitions() - self.mx_metrics_queue.put( - stream=MX_STREAM, - partition=key % n_partitions, - data=[self.mx_metrics_scopes[table](m) for m in metrics], + # n_partitions = get_mx_partitions() + # self.mx_metrics_queue.put( + # stream=MX_STREAM, + # partition=key % n_partitions, + # data=[self.mx_metrics_scopes[table](m) for m in metrics], + # ) + self.send_message( + data=[self.mx_metrics_scopes[table](m) for m in metrics], message_type="metrics" ) # self.publish( # value=orjson.dumps([self.mx_metrics_scopes[table](m) for m in metrics]), @@ -824,6 +830,33 @@ class BaseService(object): if spans: self.register_metrics("span", [span_to_dict(s) for s in spans]) + def initialize_router(self) -> None: + """ + + :return: + """ + self.router = Router() + + async def send_message( + self, + data: Any, + message_type: str, + headers: Optional[Dict[str, bytes]] = None, + sharding_key: int = 0, + ): + """ + Build message and schedule to send to mx service + + :param data: Data for transmit + :param message_type: Message type + :param headers: additional message headers + :param sharding_key: Key for sharding over MX services + :return: + """ + + msg = self.router.get_message(data, message_type, headers, sharding_key) + await self.router.route_message(msg) + def get_leader_lock_name(self): if self.leader_lock_name: return self.leader_lock_name % {"pool": config.pool} -- GitLab From 5d6eee348c8ed28cd2d6213e48e08a599ae14d67 Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Fri, 7 Oct 2022 23:07:03 +0500 Subject: [PATCH 3/9] Add MetricAction. --- core/mx.py | 5 ++++- core/router/action.py | 30 +++++++++++++++++++++++++++++- core/router/base.py | 19 ++++++++++--------- main/models/messageroute.py | 4 ++++ 4 files changed, 47 insertions(+), 11 deletions(-) diff --git a/core/mx.py b/core/mx.py index f06e66804a..7cc67602bb 100644 --- a/core/mx.py +++ b/core/mx.py @@ -9,6 +9,7 @@ from typing import Any, Optional, Dict from threading import Lock from dataclasses import dataclass +from functools import partial # Third-party modules import orjson @@ -30,6 +31,8 @@ class Message(object): # MX stream name MX_STREAM = "message" +MX_METRICS_TYPE = "metrics" +MX_METRICS_SCOPE = "scope" # Headers MX_MESSAGE_TYPE = "Message-Type" MX_SHARDING_KEY = "Sharding-Key" @@ -96,7 +99,7 @@ def send_message( if headers: msg_headers.update(headers) svc = get_service() - svc.send_message(data, message_type, headers, sharding_key) + run_sync(partial(svc.send_message, data, message_type, headers, sharding_key)) # n_partitions = get_mx_partitions() # svc.publish( # value=orjson.dumps(data), diff --git a/core/router/action.py b/core/router/action.py index 53f5119879..ae49424b41 100644 --- a/core/router/action.py +++ b/core/router/action.py @@ -15,7 +15,8 @@ import orjson # NOC modules from noc.core.liftbridge.message import Message from noc.core.comp import DEFAULT_ENCODING -from noc.core.mx import MX_MESSAGE_TYPE, NOTIFICATION_METHODS +from noc.core.mx import MX_MESSAGE_TYPE, NOTIFICATION_METHODS, MX_METRICS_SCOPE +from noc.config import config DROP = "" @@ -118,3 +119,30 @@ class NotificationAction(Action): yield NOTIFICATION_METHODS[method], header, self.ng.render_message( mt, orjson.loads(msg.value), render_template ) if render_template else body + + +class MetricAction(Action): + name = "metrics" + + def __init__(self, cfg: ActionCfg): + super().__init__(cfg) + self.stream: str = cfg.stream + self.mx_metrics_scopes = {} + self.load_handlers() + + def load_handlers(self): + from noc.main.models.metricstream import MetricStream + + for mss in MetricStream.objects.filter(): + if mss.is_active and mss.scope.table_name in set( + config.message.enable_metric_scopes + ): + self.mx_metrics_scopes[mss.scope.table_name.encode(DEFAULT_ENCODING)] = mss.to_mx + + def iter_action(self, msg: Message) -> Iterator[Tuple[str, Dict[str, bytes], bytes]]: + table = msg.headers.get(MX_METRICS_SCOPE) + if table not in self.mx_metrics_scopes: + return + for value in msg.value.split(b"\n"): + value = self.mx_metrics_scopes[table](orjson.loads(value)) + yield self.stream, self.headers, value diff --git a/core/router/base.py b/core/router/base.py index d6989a8407..a8c12c297d 100644 --- a/core/router/base.py +++ b/core/router/base.py @@ -10,7 +10,7 @@ import logging import operator from time import time_ns from collections import defaultdict -from typing import List, DefaultDict, Iterator, Dict, Iterable, Optional +from typing import List, DefaultDict, Iterator, Dict, Iterable, Optional, Any from functools import partial # Third-party modules @@ -22,7 +22,6 @@ from noc.core.service.loader import get_service from noc.core.comp import DEFAULT_ENCODING from noc.core.perf import metrics from noc.core.ioloop.util import run_sync -from noc.core.liftbridge.queuebuffer import QBuffer from .route import Route, DefaultNotificationRoute from .action import DROP, DUMP @@ -36,7 +35,7 @@ class Router(object): self.default_route: Optional[DefaultNotificationRoute] = DefaultNotificationRoute() self.stream_partitions: Dict[str, int] = {} self.svc = get_service() - self.out_queue: Optional[QBuffer] = None + # self.out_queue: Optional[QBuffer] = None def load(self): """ @@ -163,10 +162,10 @@ class Router(object): key: Optional[bytes] = None, headers: Optional[Dict[str, bytes]] = None, ): - if self.out_queue: - self.out_queue.put(stream, partition, data=value) - else: - self.svc.publish(value=body, stream=stream, partition=partition, headers=headers) + # if self.out_queue: + # self.out_queue.put(stream, partition, data=value) + #else: + self.svc.publish(value=value, stream=stream, partition=partition, headers=headers) def route_sync(self, msg: Message): """ @@ -198,10 +197,12 @@ class Router(object): } if headers: msg_headers.update(headers) + if not isinstance(data, bytes): + data = orjson.dumps(data) return Message( - value=orjson.dumps(data), + value=data, headers=msg_headers, - timestamp=time_ns, + timestamp=time_ns(), key=sharding_key, ) diff --git a/main/models/messageroute.py b/main/models/messageroute.py index 9940bde286..80f368d053 100644 --- a/main/models/messageroute.py +++ b/main/models/messageroute.py @@ -109,6 +109,8 @@ class MessageRoute(Document): return self.name def clean(self): + if self.type == "metrics" and self.action == "notification": + raise ValidationError({"action": "For type 'metric' Notification is not allowed"}) if self.action == "stream" and not self.stream: raise ValidationError({"stream": "For 'stream' action Stream must be set"}) elif self.action == "notification" and not self.notification_group: @@ -131,6 +133,8 @@ class MessageRoute(Document): } if self.stream: r["stream"] = self.stream + if self.type == "metrics" and self.action == "stream": + r["action"] = "metrics" if self.headers: r["headers"] = [{"header": m.header, "value": m.value} for m in self.headers] if self.notification_group: -- GitLab From 6fb081cf3ccaa076cee8f9ebda8dd90697658d79 Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Fri, 7 Oct 2022 23:10:35 +0500 Subject: [PATCH 4/9] Fix black. --- core/router/action.py | 4 +-- core/router/base.py | 2 +- core/service/base.py | 58 +++++++++++-------------------------------- 3 files changed, 17 insertions(+), 47 deletions(-) diff --git a/core/router/action.py b/core/router/action.py index ae49424b41..ca76f011bf 100644 --- a/core/router/action.py +++ b/core/router/action.py @@ -134,9 +134,7 @@ class MetricAction(Action): from noc.main.models.metricstream import MetricStream for mss in MetricStream.objects.filter(): - if mss.is_active and mss.scope.table_name in set( - config.message.enable_metric_scopes - ): + if mss.is_active and mss.scope.table_name in set(config.message.enable_metric_scopes): self.mx_metrics_scopes[mss.scope.table_name.encode(DEFAULT_ENCODING)] = mss.to_mx def iter_action(self, msg: Message) -> Iterator[Tuple[str, Dict[str, bytes], bytes]]: diff --git a/core/router/base.py b/core/router/base.py index a8c12c297d..8ae9def438 100644 --- a/core/router/base.py +++ b/core/router/base.py @@ -164,7 +164,7 @@ class Router(object): ): # if self.out_queue: # self.out_queue.put(stream, partition, data=value) - #else: + # else: self.svc.publish(value=value, stream=stream, partition=partition, headers=headers) def route_sync(self, msg: Message): diff --git a/core/service/base.py b/core/service/base.py index 0ea4655556..b56936b6e9 100644 --- a/core/service/base.py +++ b/core/service/base.py @@ -27,6 +27,7 @@ from typing import ( TypeVar, NoReturn, Awaitable, + Set, ) # Third-party modules @@ -51,7 +52,7 @@ from noc.core.liftbridge.message import Message from noc.core.router.base import Router from noc.core.ioloop.util import setup_asyncio from noc.core.ioloop.timers import PeriodicCallback -from noc.core.mx import MX_STREAM, get_mx_partitions, MX_MESSAGE_TYPE +from noc.core.mx import MX_METRICS_SCOPE, MX_METRICS_TYPE from .rpc import RPCProxy from .loader import set_service @@ -137,8 +138,7 @@ class BaseService(object): # Metrics publisher buffer self.metrics_queue: Optional[QBuffer] = None # MX metrics publisher buffer - self.mx_metrics_queue: Optional[QBuffer] = None - self.mx_metrics_scopes: Dict[str, Callable] = {} + self.mx_metrics_scopes: Set[str] = set(config.message.enable_metric_scopes) self.mx_partitions: int = 0 # self.active_subscribers = 0 @@ -646,20 +646,6 @@ class BaseService(object): self.metrics_queue = QBuffer(max_size=config.liftbridge.max_message_size) self.loop.create_task(self.publisher()) self.loop.create_task(self.publish_metrics(self.metrics_queue)) - if config.message.enable_metrics and self.use_mongo: - from noc.main.models.metricstream import MetricStream - - for mss in MetricStream.objects.filter(): - if mss.is_active and mss.scope.table_name in set( - config.message.enable_metric_scopes - ): - self.mx_metrics_scopes[mss.scope.table_name] = mss.to_mx - self.mx_metrics_queue = QBuffer(max_size=config.liftbridge.max_message_size) - self.loop.create_task( - self.publish_metrics( - self.mx_metrics_queue, headers={MX_MESSAGE_TYPE: b"metrics"} - ) - ) def publish( self, @@ -762,6 +748,13 @@ class BaseService(object): t0 = perf_counter() for stream, partititon, chunk in queue.iter_slice(): self.publish(chunk, stream=stream, partition=partititon, headers=headers) + table_name = stream.split(".")[1] + if config.message.enable_metrics and table_name in self.mx_metrics_scopes: + await self.send_message( + data=chunk, + message_type=MX_METRICS_TYPE, + headers={MX_METRICS_SCOPE: table_name.encode(encoding="utf-8")}, + ) if not self.publish_queue.to_shutdown: to_sleep = config.liftbridge.metrics_send_delay - (perf_counter() - t0) if to_sleep > 0: @@ -787,30 +780,6 @@ class BaseService(object): self.metrics_queue.put( stream=f"ch.{table}", partition=key % self.n_metrics_partitions, data=metrics ) - # Mirror to MX - if ( - config.message.enable_metrics - and self.mx_metrics_scopes - and table in self.mx_metrics_scopes - ): - # n_partitions = get_mx_partitions() - # self.mx_metrics_queue.put( - # stream=MX_STREAM, - # partition=key % n_partitions, - # data=[self.mx_metrics_scopes[table](m) for m in metrics], - # ) - self.send_message( - data=[self.mx_metrics_scopes[table](m) for m in metrics], message_type="metrics" - ) - # self.publish( - # value=orjson.dumps([self.mx_metrics_scopes[table](m) for m in metrics]), - # stream=MX_STREAM, - # partition=key % n_partitions, - # headers={ - # MX_MESSAGE_TYPE: b"metrics", - # MX_SHARDING_KEY: smart_bytes(key), - # }, - # ) def start_telemetry_callback(self) -> None: """ @@ -830,12 +799,13 @@ class BaseService(object): if spans: self.register_metrics("span", [span_to_dict(s) for s in spans]) - def initialize_router(self) -> None: + def _initialize_router(self) -> None: """ :return: """ self.router = Router() + self.router.load() async def send_message( self, @@ -853,8 +823,10 @@ class BaseService(object): :param sharding_key: Key for sharding over MX services :return: """ - + if not self.router: + self._initialize_router() msg = self.router.get_message(data, message_type, headers, sharding_key) + self.logger.info("Send message: %s", msg) await self.router.route_message(msg) def get_leader_lock_name(self): -- GitLab From d5b1efde96b74b76fc7a20d733aded4631445438 Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Fri, 7 Oct 2022 23:17:07 +0500 Subject: [PATCH 5/9] Fix flake. --- core/mx.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/mx.py b/core/mx.py index 7cc67602bb..97229511a6 100644 --- a/core/mx.py +++ b/core/mx.py @@ -11,9 +11,6 @@ from threading import Lock from dataclasses import dataclass from functools import partial -# Third-party modules -import orjson - # NOC services from noc.core.service.loader import get_service from noc.core.comp import DEFAULT_ENCODING -- GitLab From 550c6eae354858e294ec75369445f54a0d4da5fa Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Sun, 9 Oct 2022 22:07:06 +0500 Subject: [PATCH 6/9] Fix match header in route. --- core/router/action.py | 6 +++--- core/router/route.py | 8 +++++--- core/service/base.py | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/router/action.py b/core/router/action.py index ca76f011bf..408dd0b135 100644 --- a/core/router/action.py +++ b/core/router/action.py @@ -141,6 +141,6 @@ class MetricAction(Action): table = msg.headers.get(MX_METRICS_SCOPE) if table not in self.mx_metrics_scopes: return - for value in msg.value.split(b"\n"): - value = self.mx_metrics_scopes[table](orjson.loads(value)) - yield self.stream, self.headers, value + yield self.stream, self.headers, [ + self.mx_metrics_scopes[table](orjson.loads(v)) for v in msg.value.split(b"\n") + ] diff --git a/core/router/route.py b/core/router/route.py index 3dd44e3c4e..b833c99384 100644 --- a/core/router/route.py +++ b/core/router/route.py @@ -214,15 +214,17 @@ class Route(object): for header in match_eq: if len(match_eq[header]) == 1: # == - expr += [f"headers[{header!r}] == {match_eq[header][0]!r}"] + expr += [ + f"{header!r} in headers and headers[{header!r}] == {match_eq[header][0]!r}" + ] else: # in expr += [ - f'headers[{header!r}] in ({", ".join("%r" % x for x in match_eq[header])!r})' + f'{header!r} in headers and headers[{header!r}] in ({", ".join("%r" % x for x in match_eq[header])!r})' ] # Expression for != for header, value in match_ne: - expr += [f"headers[{header!r}] != {value!r}"] + expr += [f"{header!r} in headers and headers[{header!r}] != {value!r}"] # Expression for regex # @todo # Compile matching code diff --git a/core/service/base.py b/core/service/base.py index b56936b6e9..5833af3e64 100644 --- a/core/service/base.py +++ b/core/service/base.py @@ -826,7 +826,7 @@ class BaseService(object): if not self.router: self._initialize_router() msg = self.router.get_message(data, message_type, headers, sharding_key) - self.logger.info("Send message: %s", msg) + self.logger.debug("Send message: %s", msg) await self.router.route_message(msg) def get_leader_lock_name(self): -- GitLab From 76fd7fac4ee507c69f2979ddca831484e9715eea Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Tue, 11 Oct 2022 21:32:08 +0500 Subject: [PATCH 7/9] Add embedded_router and use_router settings. --- config.py | 4 ++ core/mx.py | 2 +- {services/mx => core/router}/datastream.py | 0 core/service/base.py | 46 +++++++++++++++++----- services/discovery/service.py | 1 + services/escalator/service.py | 1 + services/mx/service.py | 29 +------------- services/nbi/service.py | 2 + services/scheduler/service.py | 1 + services/web/service.py | 2 + services/worker/service.py | 1 + 11 files changed, 51 insertions(+), 38 deletions(-) rename {services/mx => core/router}/datastream.py (100%) diff --git a/config.py b/config.py index 9d8dbac81c..6080717dd8 100644 --- a/config.py +++ b/config.py @@ -498,6 +498,10 @@ class Config(BaseConfig): # enable_diagnostic_change = BooleanParameter(default=False) # + embedded_router = BooleanParameter( + default=True, help="Use embedded process router for sending message" + ) + # ds_limit = IntParameter(default=1000) class mongo(ConfigSection): diff --git a/core/mx.py b/core/mx.py index 97229511a6..4be8d44f37 100644 --- a/core/mx.py +++ b/core/mx.py @@ -23,7 +23,7 @@ class Message(object): value: bytes headers: Dict[str, bytes] timestamp: int - key: bytes + key: int # MX stream name diff --git a/services/mx/datastream.py b/core/router/datastream.py similarity index 100% rename from services/mx/datastream.py rename to core/router/datastream.py diff --git a/core/service/base.py b/core/service/base.py index 5833af3e64..3023f57b2b 100644 --- a/core/service/base.py +++ b/core/service/base.py @@ -52,9 +52,11 @@ from noc.core.liftbridge.message import Message from noc.core.router.base import Router from noc.core.ioloop.util import setup_asyncio from noc.core.ioloop.timers import PeriodicCallback -from noc.core.mx import MX_METRICS_SCOPE, MX_METRICS_TYPE +from noc.core.error import NOCError +from noc.core.mx import MX_METRICS_SCOPE, MX_METRICS_TYPE, MX_STREAM from .rpc import RPCProxy from .loader import set_service +from ..router.datastream import RouteDataStreamClient T = TypeVar("T") @@ -96,6 +98,8 @@ class BaseService(object): # Usually means resolution error to required services # temporary leads service to unhealthy state require_dcs_health = True + # Use embedded router for messages + use_router = False LOG_FORMAT = config.log_format @@ -393,7 +397,11 @@ class BaseService(object): await self.init_api() # - if config.message.enable_metrics: + if config.message.embedded_router and self.use_router: + self.router = Router() + self.router.load() + asyncio.get_running_loop().create_task(self.get_mx_routes_config()) + if not config.message.embedded_router and config.message.enable_metrics: self.mx_partitions = await self.get_stream_partitions("message") # if self.use_telemetry: @@ -799,13 +807,25 @@ class BaseService(object): if spans: self.register_metrics("span", [span_to_dict(s) for s in spans]) - def _initialize_router(self) -> None: + async def get_mx_routes_config(self): """ - - :return: + Subscribe and track datastream changes """ - self.router = Router() - self.router.load() + client = RouteDataStreamClient("cfgmxroute", service=self) + # Track stream changes + while True: + self.logger.info("Starting to track MX route settings") + try: + await client.query(limit=config.message.ds_limit, block=True) + except NOCError as e: + self.logger.info("Failed to get MX route settings: %s", e) + await asyncio.sleep(1) + + async def update_route(self, data: Dict[str, Any]) -> None: + self.router.change_route(data) + + async def delete_route(self, r_id: str) -> None: + self.router.delete_route(r_id) async def send_message( self, @@ -823,11 +843,17 @@ class BaseService(object): :param sharding_key: Key for sharding over MX services :return: """ - if not self.router: - self._initialize_router() msg = self.router.get_message(data, message_type, headers, sharding_key) self.logger.debug("Send message: %s", msg) - await self.router.route_message(msg) + if self.router and config.message.embedded_router: + await self.router.route_message(msg) + else: + self.publish( + value=msg.value, + stream=MX_STREAM, + partition=sharding_key % self.mx_partitions, + headers=msg.headers, + ) def get_leader_lock_name(self): if self.leader_lock_name: diff --git a/services/discovery/service.py b/services/discovery/service.py index d9bcf6729e..207d72acd1 100755 --- a/services/discovery/service.py +++ b/services/discovery/service.py @@ -16,6 +16,7 @@ class DiscoveryService(FastAPIService): name = "discovery" pooled = True use_mongo = True + use_router = True process_name = "noc-%(name).10s-%(pool).5s" def __init__(self): diff --git a/services/escalator/service.py b/services/escalator/service.py index cd08ab42e8..65b3064148 100755 --- a/services/escalator/service.py +++ b/services/escalator/service.py @@ -23,6 +23,7 @@ class EscalatorService(FastAPIService): leader_lock_name = "escalator" use_telemetry = True use_mongo = True + use_router = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/services/mx/service.py b/services/mx/service.py index e8c09d33b8..980ec93769 100755 --- a/services/mx/service.py +++ b/services/mx/service.py @@ -8,22 +8,19 @@ # Python modules import asyncio -from typing import Dict, Any # NOC modules from noc.core.service.fastapi import FastAPIService -from noc.core.error import NOCError from noc.core.mx import MX_STREAM from noc.config import config from noc.core.liftbridge.message import Message -from noc.core.router.base import Router from noc.core.perf import metrics -from noc.services.mx.datastream import RouteDataStreamClient class MXService(FastAPIService): name = "mx" use_mongo = True + use_router = True if config.features.traefik: traefik_backend = "mx" @@ -33,31 +30,15 @@ class MXService(FastAPIService): super().__init__() self.slot_number = 0 self.total_slots = 0 - self.router = Router() async def init_api(self): # Postpone initialization process until config datastream is fully processed self.ready_event = asyncio.Event() - asyncio.get_running_loop().create_task(self.get_mx_routes_config()) # Set by datastream.on_ready await self.ready_event.wait() # Process as usual await super().init_api() - async def get_mx_routes_config(self): - """ - Subscribe and track datastream changes - """ - client = RouteDataStreamClient("cfgmxroute", service=self) - # Track stream changes - while True: - self.logger.info("Starting to track MX route settings") - try: - await client.query(limit=config.message.ds_limit, block=True) - except NOCError as e: - self.logger.info("Failed to get MX route settings: %s", e) - await asyncio.sleep(1) - async def on_ready(self) -> None: # Pass further initialization self.ready_event.set() @@ -68,17 +49,11 @@ class MXService(FastAPIService): self.slot_number, self.total_slots = await self.acquire_slot() await self.subscribe_stream(MX_STREAM, self.slot_number, self.on_message, async_cursor=True) - async def update_route(self, data: Dict[str, Any]) -> None: - self.router.change_route(data) - - async def delete_route(self, r_id: str) -> None: - self.router.delete_route(r_id) - async def on_message(self, msg: Message) -> None: metrics["messages"] += 1 # Apply routes self.logger.debug("[%d] Receiving message %s", msg.offset, msg.headers) - await self.router.route_message(msg) + await self.router.route_message(msg, msg_id=msg.offset) self.logger.debug("[%s] Finish processing", msg.offset) diff --git a/services/nbi/service.py b/services/nbi/service.py index 9c1666086d..5bfc522472 100755 --- a/services/nbi/service.py +++ b/services/nbi/service.py @@ -20,6 +20,8 @@ PREFIX_NBI = "/api/nbi/" class NBIService(FastAPIService): name = "nbi" use_mongo = True + use_router = True + if config.features.traefik: traefik_backend = "nbi" traefik_frontend_rule = "PathPrefix:/api/nbi" diff --git a/services/scheduler/service.py b/services/scheduler/service.py index ea31fb7607..076e3d62dc 100755 --- a/services/scheduler/service.py +++ b/services/scheduler/service.py @@ -16,6 +16,7 @@ class SchedulerService(FastAPIService): name = "scheduler" leader_lock_name = "scheduler" use_mongo = True + use_router = True async def on_activate(self): self.scheduler = Scheduler( diff --git a/services/web/service.py b/services/web/service.py index ac4333c0f3..d7a9b45225 100755 --- a/services/web/service.py +++ b/services/web/service.py @@ -25,6 +25,8 @@ class WebService(FastAPIService): api = [] use_translation = True use_mongo = True + use_router = True + if config.features.traefik: traefik_backend = "web" traefik_frontend_rule = "PathPrefix:/" diff --git a/services/worker/service.py b/services/worker/service.py index 1d3ef63074..13967e159c 100755 --- a/services/worker/service.py +++ b/services/worker/service.py @@ -20,6 +20,7 @@ from noc.core.perf import metrics class WorkerService(FastAPIService): name = "worker" use_mongo = True + use_router = True def __init__(self): super().__init__() -- GitLab From 3f45f942317d0f470f3c0640127cb47cee58a728 Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Tue, 11 Oct 2022 22:17:06 +0500 Subject: [PATCH 8/9] Fix typo. --- core/mx.py | 2 +- core/router/base.py | 12 ++++++------ core/router/datastream.py | 3 --- services/metrics/service.py | 1 + 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/mx.py b/core/mx.py index 4be8d44f37..f549704714 100644 --- a/core/mx.py +++ b/core/mx.py @@ -29,7 +29,7 @@ class Message(object): # MX stream name MX_STREAM = "message" MX_METRICS_TYPE = "metrics" -MX_METRICS_SCOPE = "scope" +MX_METRICS_SCOPE = "Metric-Scope" # Headers MX_MESSAGE_TYPE = "Message-Type" MX_SHARDING_KEY = "Sharding-Key" diff --git a/core/router/base.py b/core/router/base.py index 8ae9def438..6a4f18e575 100644 --- a/core/router/base.py +++ b/core/router/base.py @@ -17,7 +17,7 @@ from functools import partial import orjson # NOC modules -from noc.core.mx import MX_MESSAGE_TYPE, MX_SHARDING_KEY, Message +from noc.core.mx import MX_MESSAGE_TYPE, MX_SHARDING_KEY, Message, MESSAGE_TYPES from noc.core.service.loader import get_service from noc.core.comp import DEFAULT_ENCODING from noc.core.perf import metrics @@ -215,15 +215,15 @@ class Router(object): """ # Apply routes for route in self.iter_route(msg): - metrics["route_hits"] += 1 + metrics["route_hits", ("type", route.type)] += 1 logger.debug("[%d] Applying route %s", msg_id, route.name) # Apply actions routed: bool = False for stream, action_headers, body in route.iter_action(msg): - metrics["action_hits"] += 1 + metrics["action_hits", ("stream", stream)] += 1 # Fameless drop if stream == DROP: - metrics["action_drops"] += 1 + metrics["action_drops", ("stream", stream)] += 1 logger.debug("[%s] Dropped. Stopping processing", msg_id) return elif stream == DUMP: @@ -254,11 +254,11 @@ class Router(object): # Transmute converts message to an arbitrary structure, # so convert back to the json body = orjson.dumps(body) - metrics[("forwards", ("stream", f"{stream}:{partition}"))] += 1 + metrics[("forwards", ("stream", stream))] += 1 logger.debug("[%s] Routing to %s:%s", msg_id, stream, partition) await self.publish(value=body, stream=stream, partition=partition, headers=headers) routed = True if not routed: logger.debug("[%d] Not routed", msg_id) - metrics["route_misses", ("message_type",)] += 1 + metrics["route_misses", ("message_type", msg.headers.get(MX_MESSAGE_TYPE).decode(DEFAULT_ENCODING))] += 1 # logger.debug("[%s] Finish processing", msg_id) diff --git a/core/router/datastream.py b/core/router/datastream.py index aa8512ce3a..de84b2cac9 100644 --- a/core/router/datastream.py +++ b/core/router/datastream.py @@ -15,6 +15,3 @@ class RouteDataStreamClient(DataStreamClient): async def on_delete(self, data): await self.service.delete_route(data["id"]) - - async def on_ready(self): - await self.service.on_ready() diff --git a/services/metrics/service.py b/services/metrics/service.py index ee1909391c..9c8cd9e6bc 100755 --- a/services/metrics/service.py +++ b/services/metrics/service.py @@ -249,6 +249,7 @@ class ManagedObjectInfo(object): class MetricsService(FastAPIService): name = "metrics" use_mongo = True + use_router = True def __init__(self): super().__init__() -- GitLab From b070de1f2297f1f5ffb20dbeddb35cd586239bbc Mon Sep 17 00:00:00 2001 From: Andrey Vertiprahov Date: Tue, 11 Oct 2022 22:18:48 +0500 Subject: [PATCH 9/9] Fix black. --- core/router/base.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/router/base.py b/core/router/base.py index 6a4f18e575..08c7bc52a3 100644 --- a/core/router/base.py +++ b/core/router/base.py @@ -17,7 +17,7 @@ from functools import partial import orjson # NOC modules -from noc.core.mx import MX_MESSAGE_TYPE, MX_SHARDING_KEY, Message, MESSAGE_TYPES +from noc.core.mx import MX_MESSAGE_TYPE, MX_SHARDING_KEY, Message from noc.core.service.loader import get_service from noc.core.comp import DEFAULT_ENCODING from noc.core.perf import metrics @@ -260,5 +260,8 @@ class Router(object): routed = True if not routed: logger.debug("[%d] Not routed", msg_id) - metrics["route_misses", ("message_type", msg.headers.get(MX_MESSAGE_TYPE).decode(DEFAULT_ENCODING))] += 1 + metrics[ + "route_misses", + ("message_type", msg.headers.get(MX_MESSAGE_TYPE).decode(DEFAULT_ENCODING)), + ] += 1 # logger.debug("[%s] Finish processing", msg_id) -- GitLab