diff --git a/config.py b/config.py index 9d8dbac81c976df8c0f3759218aa8c0c826df6b1..6080717dd8b97076c03c3ce7d055f10713a26655 100644 --- a/config.py +++ b/config.py @@ -498,6 +498,10 @@ class Config(BaseConfig): # enable_diagnostic_change = BooleanParameter(default=False) # + embedded_router = BooleanParameter( + default=True, help="Use embedded process router for sending message" + ) + # ds_limit = IntParameter(default=1000) class mongo(ConfigSection): diff --git a/core/mx.py b/core/mx.py index fb34caeb69dbab41224aaef82dcb97de09e2ec66..f5497047146dfe1ab9d6fc047bdaae38c8eaef0b 100644 --- a/core/mx.py +++ b/core/mx.py @@ -8,9 +8,8 @@ # Python modules from typing import Any, Optional, Dict from threading import Lock - -# Third-party modules -import orjson +from dataclasses import dataclass +from functools import partial # NOC services from noc.core.service.loader import get_service @@ -18,8 +17,19 @@ from noc.core.comp import DEFAULT_ENCODING from noc.core.liftbridge.base import LiftBridgeClient from noc.core.ioloop.util import run_sync + +@dataclass +class Message(object): + value: bytes + headers: Dict[str, bytes] + timestamp: int + key: int + + # MX stream name MX_STREAM = "message" +MX_METRICS_TYPE = "metrics" +MX_METRICS_SCOPE = "Metric-Scope" # Headers MX_MESSAGE_TYPE = "Message-Type" MX_SHARDING_KEY = "Sharding-Key" @@ -86,13 +96,14 @@ def send_message( if headers: msg_headers.update(headers) svc = get_service() - n_partitions = get_mx_partitions() - svc.publish( - value=orjson.dumps(data), - stream=MX_STREAM, - partition=sharding_key % n_partitions, - headers=msg_headers, - ) + run_sync(partial(svc.send_message, data, message_type, headers, sharding_key)) + # n_partitions = get_mx_partitions() + # svc.publish( + # value=orjson.dumps(data), + # stream=MX_STREAM, + # partition=sharding_key % n_partitions, + # headers=msg_headers, + # ) def get_mx_partitions() -> int: diff --git a/services/mx/router/__init__.py b/core/router/__init__.py similarity index 100% rename from services/mx/router/__init__.py rename to core/router/__init__.py diff --git a/services/mx/router/action.py b/core/router/action.py similarity index 74% rename from services/mx/router/action.py rename to core/router/action.py index 6016671077283677868630c3373bfa5ba3417b96..408dd0b1353b7df5e57dc4dea7232b19905fe5b4 100644 --- a/services/mx/router/action.py +++ b/core/router/action.py @@ -14,11 +14,9 @@ import orjson # NOC modules from noc.core.liftbridge.message import Message -from noc.main.models.messageroute import MessageRoute from noc.core.comp import DEFAULT_ENCODING -from noc.core.mx import MX_MESSAGE_TYPE, NOTIFICATION_METHODS -from noc.main.models.notificationgroup import NotificationGroup -from noc.main.models.template import Template +from noc.core.mx import MX_MESSAGE_TYPE, NOTIFICATION_METHODS, MX_METRICS_SCOPE +from noc.config import config DROP = "" @@ -57,19 +55,9 @@ class Action(object, metaclass=ActionBase): def __init__(self, cfg: ActionCfg): self.headers: Dict[str, bytes] = { - h.header: h.value.encode(encoding=DEFAULT_ENCODING) for h in cfg.headers + h.header: h.value.encode(encoding=DEFAULT_ENCODING) for h in cfg.headers or [] } - @classmethod - def from_action(cls, mroute: MessageRoute) -> "Action": - global ACTION_TYPES - - return ACTION_TYPES[mroute.type]( - ActionCfg( - type=mroute.type, stream=mroute.stream, notification_group=mroute.notification_group - ) - ) - @classmethod def from_data(cls, data): global ACTION_TYPES @@ -117,6 +105,9 @@ class NotificationAction(Action): name = "notification" def __init__(self, cfg: ActionCfg): + from noc.main.models.notificationgroup import NotificationGroup + from noc.main.models.template import Template + super().__init__(cfg) self.ng: NotificationGroup = NotificationGroup.get_by_id(cfg.notification_group) self.rt: Template = Template.get_by_id(cfg.render_template) @@ -128,3 +119,28 @@ class NotificationAction(Action): yield NOTIFICATION_METHODS[method], header, self.ng.render_message( mt, orjson.loads(msg.value), render_template ) if render_template else body + + +class MetricAction(Action): + name = "metrics" + + def __init__(self, cfg: ActionCfg): + super().__init__(cfg) + self.stream: str = cfg.stream + self.mx_metrics_scopes = {} + self.load_handlers() + + def load_handlers(self): + from noc.main.models.metricstream import MetricStream + + for mss in MetricStream.objects.filter(): + if mss.is_active and mss.scope.table_name in set(config.message.enable_metric_scopes): + self.mx_metrics_scopes[mss.scope.table_name.encode(DEFAULT_ENCODING)] = mss.to_mx + + def iter_action(self, msg: Message) -> Iterator[Tuple[str, Dict[str, bytes], bytes]]: + table = msg.headers.get(MX_METRICS_SCOPE) + if table not in self.mx_metrics_scopes: + return + yield self.stream, self.headers, [ + self.mx_metrics_scopes[table](orjson.loads(v)) for v in msg.value.split(b"\n") + ] diff --git a/core/router/base.py b/core/router/base.py new file mode 100644 index 0000000000000000000000000000000000000000..08c7bc52a398cbae6662b084f7de36eab57d383c --- /dev/null +++ b/core/router/base.py @@ -0,0 +1,267 @@ +# ---------------------------------------------------------------------- +# Router +# ---------------------------------------------------------------------- +# Copyright (C) 2007-2022 The NOC Project +# See LICENSE for details +# ---------------------------------------------------------------------- + +# Python modules +import logging +import operator +from time import time_ns +from collections import defaultdict +from typing import List, DefaultDict, Iterator, Dict, Iterable, Optional, Any +from functools import partial + +# Third-party modules +import orjson + +# NOC modules +from noc.core.mx import MX_MESSAGE_TYPE, MX_SHARDING_KEY, Message +from noc.core.service.loader import get_service +from noc.core.comp import DEFAULT_ENCODING +from noc.core.perf import metrics +from noc.core.ioloop.util import run_sync +from .route import Route, DefaultNotificationRoute +from .action import DROP, DUMP + +logger = logging.getLogger(__name__) + + +class Router(object): + def __init__(self): + self.chains: DefaultDict[bytes, List[Route]] = defaultdict(list) + self.routes: Dict[str, Route] = {} + self.default_route: Optional[DefaultNotificationRoute] = DefaultNotificationRoute() + self.stream_partitions: Dict[str, int] = {} + self.svc = get_service() + # self.out_queue: Optional[QBuffer] = None + + def load(self): + """ + Load up all the rules and populate the chains + :return: + """ + from noc.main.models.messageroute import MessageRoute + + num = 0 + for num, route in enumerate( + MessageRoute.objects.filter(is_active=True).order_by("order"), start=1 + ): + self.chains[route.type.encode(encoding=DEFAULT_ENCODING)] += [ + Route.from_data(route.get_route_config()) + ] + logger.info("Loading %s route", num) + + def has_route(self, route_id: str) -> bool: + """ + Check Route already exists in chains + :param route_id: + :return: + """ + return route_id in self.routes + + def change_route(self, data): + """ + Update route in chain + If change Chain - + * change type = delete + insert + * change order = reorder + * change data = update + :param data: + :return: + """ + r = Route.from_data(data) + route_id = data["id"] + to_rebuild = set() + if not self.has_route(route_id): + self.routes[data["id"]] = r + to_rebuild.add(r.type) + logger.info("[%s|%s] Insert route", route_id, data["name"]) + self.rebuild_chains(to_rebuild) + return + if self.routes[route_id].type != r.type: + # rebuild + logger.info( + "[%s|%s] Change route chain: %s -> %s", + route_id, + data["name"], + self.routes[route_id].type, + r.type, + ) + to_rebuild.add([r.type, self.routes[route_id].type]) + self.routes[route_id].set_type(r.type) + if self.routes[route_id].order != r.order: + logger.info( + "[%s|%s] Change route order: %s -> %s", + route_id, + data["name"], + self.routes[route_id].order, + r.order, + ) + self.routes[route_id].set_order(r.order) + to_rebuild.add(r.type) + if self.routes[route_id].is_differ(data): + logger.info("[%s|%s] Update route", route_id, data["name"]) + self.routes[route_id].update(data) + if to_rebuild: + self.rebuild_chains(to_rebuild) + + def delete_route(self, route_id: str): + """ + Delete Route from Chains + :param route_id: + :return: + """ + r_type = None + if route_id in self.routes: + logger.info("[%s|%s] Delete route", route_id, self.routes[route_id].name) + r_type = self.routes[route_id].type + del self.routes[route_id] + if r_type: + self.rebuild_chains([r_type]) + + def rebuild_chains(self, r_types: Optional[Iterable[str]] = None): + """ + Rebuild Router Chains + Need lock ? + :param r_types: List types for rebuild chains + :return: + """ + chains = defaultdict(list) + for r in self.routes.values(): + if r_types and r.type not in r_types: + continue + chains[r.type].append(r) + for chain in chains: + logger.info("[%s] Rebuild chain", chain) + self.chains[chain.encode(encoding=DEFAULT_ENCODING)] = list( + sorted( + [r for r in chains[chain]], + key=operator.attrgetter("order"), + ) + ) + + def iter_route(self, msg: Message) -> Iterator[Route]: + mt = msg.headers.get(MX_MESSAGE_TYPE) + if not mt: + return + # Check default Route + if self.default_route and self.default_route.is_match(msg): + yield self.default_route + # Iterate over routes + for route in self.chains[mt]: + if route.is_match(msg): + yield route + + async def publish( + self, + value: bytes, + stream: str, + partition: Optional[int] = None, + key: Optional[bytes] = None, + headers: Optional[Dict[str, bytes]] = None, + ): + # if self.out_queue: + # self.out_queue.put(stream, partition, data=value) + # else: + self.svc.publish(value=value, stream=stream, partition=partition, headers=headers) + + def route_sync(self, msg: Message): + """ + Synchronize method + :param msg: + :return: + """ + run_sync(partial(self.route_message, msg)) + + @staticmethod + def get_message( + data: Any, + message_type: str, + headers: Optional[Dict[str, bytes]] = None, + sharding_key: int = 0, + ) -> Message: + """ + Build message + + :param data: Data for transmit + :param message_type: Message type + :param headers: additional message headers + :param sharding_key: Key for sharding + :return: + """ + msg_headers = { + MX_MESSAGE_TYPE: message_type.encode(DEFAULT_ENCODING), + MX_SHARDING_KEY: str(sharding_key).encode(DEFAULT_ENCODING), + } + if headers: + msg_headers.update(headers) + if not isinstance(data, bytes): + data = orjson.dumps(data) + return Message( + value=data, + headers=msg_headers, + timestamp=time_ns(), + key=sharding_key, + ) + + async def route_message(self, msg: Message, msg_id: Optional[str] = None): + """ + Route message by rule + :param msg: + :param msg_id: + :return: + """ + # Apply routes + for route in self.iter_route(msg): + metrics["route_hits", ("type", route.type)] += 1 + logger.debug("[%d] Applying route %s", msg_id, route.name) + # Apply actions + routed: bool = False + for stream, action_headers, body in route.iter_action(msg): + metrics["action_hits", ("stream", stream)] += 1 + # Fameless drop + if stream == DROP: + metrics["action_drops", ("stream", stream)] += 1 + logger.debug("[%s] Dropped. Stopping processing", msg_id) + return + elif stream == DUMP: + logger.info( + "[%s] Dump. Message headers: %s;\n-----\n Body: %s \n----\n ", + msg_id, + msg.headers, + msg.value, + ) + continue + # Build resulting headers + headers = {} + headers.update(msg.headers) + if action_headers: + headers.update(action_headers) + # Determine sharding channel + sharding_key = int(headers.get(MX_SHARDING_KEY, b"0")) + partitions = self.stream_partitions.get(stream) + if not partitions: + # Request amount of partitions + partitions = await self.svc.get_stream_partitions(stream) + self.stream_partitions[stream] = partitions + partition = sharding_key % partitions + # Single message may be transmuted in zero or more messages + body = route.transmute(headers, body) + # for body in route.iter_transmute(headers, msg.value): + if not isinstance(body, bytes): + # Transmute converts message to an arbitrary structure, + # so convert back to the json + body = orjson.dumps(body) + metrics[("forwards", ("stream", stream))] += 1 + logger.debug("[%s] Routing to %s:%s", msg_id, stream, partition) + await self.publish(value=body, stream=stream, partition=partition, headers=headers) + routed = True + if not routed: + logger.debug("[%d] Not routed", msg_id) + metrics[ + "route_misses", + ("message_type", msg.headers.get(MX_MESSAGE_TYPE).decode(DEFAULT_ENCODING)), + ] += 1 + # logger.debug("[%s] Finish processing", msg_id) diff --git a/services/mx/datastream.py b/core/router/datastream.py similarity index 89% rename from services/mx/datastream.py rename to core/router/datastream.py index aa8512ce3aaf9c45d559554ec9bf7a4b0221c48c..de84b2cac99969a3ecde66bc928414f90c4780dd 100644 --- a/services/mx/datastream.py +++ b/core/router/datastream.py @@ -15,6 +15,3 @@ class RouteDataStreamClient(DataStreamClient): async def on_delete(self, data): await self.service.delete_route(data["id"]) - - async def on_ready(self): - await self.service.on_ready() diff --git a/services/mx/router/route.py b/core/router/route.py similarity index 79% rename from services/mx/router/route.py rename to core/router/route.py index e52cc2009bf293d910b8f25c0a1ae8cb067b8e2c..b833c99384b163c0c68e3ee12a8851aff635a231 100644 --- a/services/mx/router/route.py +++ b/core/router/route.py @@ -26,9 +26,6 @@ from noc.core.mx import ( MX_MESSAGE_TYPE, NOTIFICATION_METHODS, ) -from noc.main.models.messageroute import MessageRoute -from noc.main.models.template import Template -from noc.main.models.handler import Handler from .action import Action T_BODY = Union[bytes, Any] @@ -57,7 +54,7 @@ class TransmuteTemplate(object): @dataclass -class HeaderItem(object): +class HeaderMatchItem(object): header: str op: Literal["==", "!=", "regex"] value: str @@ -83,7 +80,7 @@ class MatchItem(object): labels: Optional[List[str]] = None exclude_labels: Optional[List[str]] = None administrative_domain: Optional[int] = None - headers: Optional[List[HeaderItem]] = None + headers: Optional[List[HeaderMatchItem]] = None @classmethod def from_data(cls, data: List[Dict[str, Any]]) -> List["MatchItem"]: @@ -95,32 +92,13 @@ class MatchItem(object): exclude_labels=match["exclude_labels"], administrative_domain=match.get("administrative_domain"), headers=[ - HeaderItem(header=h["header"], op=h["op"], value=h["value"]) + HeaderMatchItem(header=h["header"], op=h["op"], value=h["value"]) for h in match["headers"] ], ) ] return r - @classmethod - def from_route(cls, route: MessageRoute) -> List["MatchItem"]: - r = [] - for match in route.match: - r += [ - MatchItem( - labels=match.labels, - exclude_labels=match.exclude_labels, - administrative_domain=match.administrative_domain.id - if match.administrative_domain - else None, - headers=[ - HeaderItem(header=h.header, op=h.op, value=h.value) - for h in match.headers_match - ], - ) - ] - return r - class Route(object): """ @@ -136,7 +114,6 @@ class Route(object): self.actions: List[Action] = [] self.transmute_handler: Optional[Callable[[Dict[str, bytes], T_BODY], T_BODY]] = None self.transmute_template: Optional[TransmuteTemplate] = None - self.render_template: Optional[Template] = None def is_match(self, msg: Message) -> bool: """ @@ -192,6 +169,9 @@ class Route(object): return True def update(self, data): + from noc.main.models.template import Template + from noc.main.models.handler import Handler + self.match_co = self.compile_match(MatchItem.from_data(data["match"])) # Compile transmute part # r.transmutations = [Transmutation.from_transmute(t) for t in route.transmute] @@ -200,8 +180,6 @@ class Route(object): if "transmute_template" in data: template = Template.objects.get(id=data["transmute_template"]) self.transmute_template = TransmuteTemplate(JTemplate(template.body)) - if "render_template" in data: - self.render_template = data["render_template"] # Compile action part self.actions = [Action.from_data(data)] @@ -236,15 +214,17 @@ class Route(object): for header in match_eq: if len(match_eq[header]) == 1: # == - expr += [f"headers[{header!r}] == {match_eq[header][0]!r}"] + expr += [ + f"{header!r} in headers and headers[{header!r}] == {match_eq[header][0]!r}" + ] else: # in expr += [ - f'headers[{header!r}] in ({", ".join("%r" % x for x in match_eq[header])!r})' + f'{header!r} in headers and headers[{header!r}] in ({", ".join("%r" % x for x in match_eq[header])!r})' ] # Expression for != for header, value in match_ne: - expr += [f"headers[{header!r}] != {value!r}"] + expr += [f"{header!r} in headers and headers[{header!r}] != {value!r}"] # Expression for regex # @todo # Compile matching code @@ -265,27 +245,6 @@ class Route(object): r.update(data) return r - @classmethod - def from_route(cls, route: MessageRoute) -> "Route": - """ - Build Route from database config - :param route: - :return: - """ - r = Route(route.name, route.type, route.order) - r.match_co = cls.compile_match(MatchItem.from_route(route)) - # Compile transmute part - # r.transmutations = [Transmutation.from_transmute(t) for t in route.transmute] - r.transmute_handler = route.transmute_handler - if route.transmute_template: - r.render_template = RenderTemplate( - subject_template=route.transmute_template.subject, - body_template=route.transmute_template.body, - ) - # Compile action part - r.actions = [Action.from_action(route)] - return r - class DefaultNotificationRoute(Route): """ diff --git a/core/service/base.py b/core/service/base.py index 2fe240bc4f5e27c9bf2df51e63d65222a306cca9..3023f57b2b1c9f06b48e88924e7fc529c1176a58 100644 --- a/core/service/base.py +++ b/core/service/base.py @@ -27,6 +27,7 @@ from typing import ( TypeVar, NoReturn, Awaitable, + Set, ) # Third-party modules @@ -48,11 +49,14 @@ from noc.core.liftbridge.error import LiftbridgeError from noc.core.liftbridge.queue import LiftBridgeQueue from noc.core.liftbridge.queuebuffer import QBuffer from noc.core.liftbridge.message import Message +from noc.core.router.base import Router from noc.core.ioloop.util import setup_asyncio from noc.core.ioloop.timers import PeriodicCallback -from noc.core.mx import MX_STREAM, get_mx_partitions, MX_MESSAGE_TYPE +from noc.core.error import NOCError +from noc.core.mx import MX_METRICS_SCOPE, MX_METRICS_TYPE, MX_STREAM from .rpc import RPCProxy from .loader import set_service +from ..router.datastream import RouteDataStreamClient T = TypeVar("T") @@ -94,6 +98,8 @@ class BaseService(object): # Usually means resolution error to required services # temporary leads service to unhealthy state require_dcs_health = True + # Use embedded router for messages + use_router = False LOG_FORMAT = config.log_format @@ -122,6 +128,8 @@ class BaseService(object): self.startup_ts = None self.telemetry_callback = None self.dcs = None + # Message routed + self.router: Optional[Router] = None # Effective address and port self.address = None self.port = None @@ -134,8 +142,7 @@ class BaseService(object): # Metrics publisher buffer self.metrics_queue: Optional[QBuffer] = None # MX metrics publisher buffer - self.mx_metrics_queue: Optional[QBuffer] = None - self.mx_metrics_scopes: Dict[str, Callable] = {} + self.mx_metrics_scopes: Set[str] = set(config.message.enable_metric_scopes) self.mx_partitions: int = 0 # self.active_subscribers = 0 @@ -390,7 +397,11 @@ class BaseService(object): await self.init_api() # - if config.message.enable_metrics: + if config.message.embedded_router and self.use_router: + self.router = Router() + self.router.load() + asyncio.get_running_loop().create_task(self.get_mx_routes_config()) + if not config.message.embedded_router and config.message.enable_metrics: self.mx_partitions = await self.get_stream_partitions("message") # if self.use_telemetry: @@ -643,20 +654,6 @@ class BaseService(object): self.metrics_queue = QBuffer(max_size=config.liftbridge.max_message_size) self.loop.create_task(self.publisher()) self.loop.create_task(self.publish_metrics(self.metrics_queue)) - if config.message.enable_metrics and self.use_mongo: - from noc.main.models.metricstream import MetricStream - - for mss in MetricStream.objects.filter(): - if mss.is_active and mss.scope.table_name in set( - config.message.enable_metric_scopes - ): - self.mx_metrics_scopes[mss.scope.table_name] = mss.to_mx - self.mx_metrics_queue = QBuffer(max_size=config.liftbridge.max_message_size) - self.loop.create_task( - self.publish_metrics( - self.mx_metrics_queue, headers={MX_MESSAGE_TYPE: b"metrics"} - ) - ) def publish( self, @@ -759,6 +756,13 @@ class BaseService(object): t0 = perf_counter() for stream, partititon, chunk in queue.iter_slice(): self.publish(chunk, stream=stream, partition=partititon, headers=headers) + table_name = stream.split(".")[1] + if config.message.enable_metrics and table_name in self.mx_metrics_scopes: + await self.send_message( + data=chunk, + message_type=MX_METRICS_TYPE, + headers={MX_METRICS_SCOPE: table_name.encode(encoding="utf-8")}, + ) if not self.publish_queue.to_shutdown: to_sleep = config.liftbridge.metrics_send_delay - (perf_counter() - t0) if to_sleep > 0: @@ -784,27 +788,6 @@ class BaseService(object): self.metrics_queue.put( stream=f"ch.{table}", partition=key % self.n_metrics_partitions, data=metrics ) - # Mirror to MX - if ( - config.message.enable_metrics - and self.mx_metrics_scopes - and table in self.mx_metrics_scopes - ): - n_partitions = get_mx_partitions() - self.mx_metrics_queue.put( - stream=MX_STREAM, - partition=key % n_partitions, - data=[self.mx_metrics_scopes[table](m) for m in metrics], - ) - # self.publish( - # value=orjson.dumps([self.mx_metrics_scopes[table](m) for m in metrics]), - # stream=MX_STREAM, - # partition=key % n_partitions, - # headers={ - # MX_MESSAGE_TYPE: b"metrics", - # MX_SHARDING_KEY: smart_bytes(key), - # }, - # ) def start_telemetry_callback(self) -> None: """ @@ -824,6 +807,54 @@ class BaseService(object): if spans: self.register_metrics("span", [span_to_dict(s) for s in spans]) + async def get_mx_routes_config(self): + """ + Subscribe and track datastream changes + """ + client = RouteDataStreamClient("cfgmxroute", service=self) + # Track stream changes + while True: + self.logger.info("Starting to track MX route settings") + try: + await client.query(limit=config.message.ds_limit, block=True) + except NOCError as e: + self.logger.info("Failed to get MX route settings: %s", e) + await asyncio.sleep(1) + + async def update_route(self, data: Dict[str, Any]) -> None: + self.router.change_route(data) + + async def delete_route(self, r_id: str) -> None: + self.router.delete_route(r_id) + + async def send_message( + self, + data: Any, + message_type: str, + headers: Optional[Dict[str, bytes]] = None, + sharding_key: int = 0, + ): + """ + Build message and schedule to send to mx service + + :param data: Data for transmit + :param message_type: Message type + :param headers: additional message headers + :param sharding_key: Key for sharding over MX services + :return: + """ + msg = self.router.get_message(data, message_type, headers, sharding_key) + self.logger.debug("Send message: %s", msg) + if self.router and config.message.embedded_router: + await self.router.route_message(msg) + else: + self.publish( + value=msg.value, + stream=MX_STREAM, + partition=sharding_key % self.mx_partitions, + headers=msg.headers, + ) + def get_leader_lock_name(self): if self.leader_lock_name: return self.leader_lock_name % {"pool": config.pool} diff --git a/main/models/messageroute.py b/main/models/messageroute.py index 27ad88b15e935910e18c010505d5fb45198f7f5b..80f368d0533783b49a4d1e0d369df5552ace4ce3 100644 --- a/main/models/messageroute.py +++ b/main/models/messageroute.py @@ -88,7 +88,7 @@ class MessageRoute(Document): transmute_handler = PlainReferenceField(Handler) transmute_template = ForeignKeyField(Template) # Message actions - action = StringField(choices=["drop", "stream", "notification"], default="notification") + action = StringField(choices=["drop", "dump", "stream", "notification"], default="notification") stream = StringField() notification_group = ForeignKeyField(NotificationGroup) render_template = ForeignKeyField(Template) @@ -109,6 +109,8 @@ class MessageRoute(Document): return self.name def clean(self): + if self.type == "metrics" and self.action == "notification": + raise ValidationError({"action": "For type 'metric' Notification is not allowed"}) if self.action == "stream" and not self.stream: raise ValidationError({"stream": "For 'stream' action Stream must be set"}) elif self.action == "notification" and not self.notification_group: @@ -116,3 +118,45 @@ class MessageRoute(Document): {"notification_group": "For 'notification' action NotificationGroup must be set"} ) super().clean() + + def get_route_config(self): + """ + Return data for configured Router + :return: + """ + r = { + "name": self.name, + "type": self.type, + "order": self.order, + "action": self.action, + "match": [], + } + if self.stream: + r["stream"] = self.stream + if self.type == "metrics" and self.action == "stream": + r["action"] = "metrics" + if self.headers: + r["headers"] = [{"header": m.header, "value": m.value} for m in self.headers] + if self.notification_group: + r["notification_group"] = str(self.notification_group.id) + if self.render_template: + r["render_template"] = str(self.render_template.id) + if self.transmute_template: + r["transmute_template"] = str(self.transmute_template.id) + if self.transmute_handler: + r["transmute_handler"] = str(self.transmute_handler.id) + for match in self.match: + r["match"] += [ + { + "labels": match.labels, + "exclude_labels": match.exclude_labels, + "administrative_domain": str(match.administrative_domain.id) + if match.administrative_domain + else None, + "headers": [ + {"header": m.header, "op": m.op, "value": m.value} + for m in match.headers_match + ], + } + ] + return r diff --git a/services/datastream/streams/cfgmxroute.py b/services/datastream/streams/cfgmxroute.py index 1399c4a1dfa01ec0db221efc02ea2a972e527e5c..c99ab55304892f4031244f73228deb829f8c513a 100644 --- a/services/datastream/streams/cfgmxroute.py +++ b/services/datastream/streams/cfgmxroute.py @@ -21,38 +21,6 @@ class CfgMetricsCollectorDataStream(DataStream): route: "MessageRoute" = MessageRoute.objects.get(id=oid) if not route or not route.is_active: raise KeyError() - r = { - "id": str(route.id), - "name": route.name, - "type": route.type, - "order": route.order, - "action": route.action, - "match": [], - } - if route.stream: - r["stream"] = route.stream - if route.headers: - r["headers"] = [{"header": m.header, "value": m.value} for m in route.headers] - if route.notification_group: - r["notification_group"] = str(route.notification_group.id) - if route.render_template: - r["render_template"] = str(route.render_template.id) - if route.transmute_template: - r["transmute_template"] = str(route.transmute_template.id) - if route.transmute_handler: - r["transmute_handler"] = str(route.transmute_handler.id) - for match in route.match: - r["match"] += [ - { - "labels": match.labels, - "exclude_labels": match.exclude_labels, - "administrative_domain": str(match.administrative_domain.id) - if match.administrative_domain - else None, - "headers": [ - {"header": m.header, "op": m.op, "value": m.value} - for m in match.headers_match - ], - } - ] + r = route.get_route_config() + r["id"] = str(route.id) return r diff --git a/services/discovery/service.py b/services/discovery/service.py index d9bcf6729eb28420bb49d91ee4e89dda52623b05..207d72acd1320320c3d4a56da776e73a89a283cc 100755 --- a/services/discovery/service.py +++ b/services/discovery/service.py @@ -16,6 +16,7 @@ class DiscoveryService(FastAPIService): name = "discovery" pooled = True use_mongo = True + use_router = True process_name = "noc-%(name).10s-%(pool).5s" def __init__(self): diff --git a/services/escalator/service.py b/services/escalator/service.py index cd08ab42e84aff3de570a8ddde7ec1df02944018..65b3064148a954e63597d729c4258360a4967ec4 100755 --- a/services/escalator/service.py +++ b/services/escalator/service.py @@ -23,6 +23,7 @@ class EscalatorService(FastAPIService): leader_lock_name = "escalator" use_telemetry = True use_mongo = True + use_router = True def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/services/metrics/service.py b/services/metrics/service.py index ee1909391cbfe6f705862ed47aa75e6283b1ebd9..9c8cd9e6bc4d50dbc705ecbedafaf76f06535934 100755 --- a/services/metrics/service.py +++ b/services/metrics/service.py @@ -249,6 +249,7 @@ class ManagedObjectInfo(object): class MetricsService(FastAPIService): name = "metrics" use_mongo = True + use_router = True def __init__(self): super().__init__() diff --git a/services/mx/router/router.py b/services/mx/router/router.py deleted file mode 100644 index a626ffbec8098088da3e2abcf88068c64d7f3eb9..0000000000000000000000000000000000000000 --- a/services/mx/router/router.py +++ /dev/null @@ -1,141 +0,0 @@ -# ---------------------------------------------------------------------- -# Router -# ---------------------------------------------------------------------- -# Copyright (C) 2007-2020 The NOC Project -# See LICENSE for details -# ---------------------------------------------------------------------- - -# Python modules -import logging -import operator -from collections import defaultdict -from typing import List, DefaultDict, Iterator, Dict, Iterable, Optional - -# NOC modules -from noc.core.liftbridge.message import Message -from noc.core.mx import MX_MESSAGE_TYPE -from noc.main.models.messageroute import MessageRoute -from noc.core.comp import DEFAULT_ENCODING -from .route import Route, DefaultNotificationRoute - -logger = logging.getLogger(__name__) - - -class Router(object): - def __init__(self): - self.chains: DefaultDict[bytes, List[Route]] = defaultdict(list) - self.routes: Dict[str, Route] = {} - self.default_route: Optional[DefaultNotificationRoute] = DefaultNotificationRoute() - - def load(self): - """ - Load up all the rules and populate the chains - :return: - """ - num = 0 - for num, route in enumerate( - MessageRoute.objects.filter(is_active=True).order_by("order"), start=1 - ): - self.chains[route.type.encode(encoding=DEFAULT_ENCODING)] += [Route.from_route(route)] - logger.info("Loading %s route", num) - - def has_route(self, route_id: str) -> bool: - """ - Check Route already exists in chains - :param route_id: - :return: - """ - return route_id in self.routes - - def change_route(self, data): - """ - Update route in chain - If change Chain - - * change type = delete + insert - * change order = reorder - * change data = update - :param data: - :return: - """ - r = Route.from_data(data) - route_id = data["id"] - to_rebuild = set() - if not self.has_route(route_id): - self.routes[data["id"]] = r - to_rebuild.add(r.type) - logger.info("[%s|%s] Insert route", route_id, data["name"]) - self.rebuild_chains(to_rebuild) - return - if self.routes[route_id].type != r.type: - # rebuild - logger.info( - "[%s|%s] Change route chain: %s -> %s", - route_id, - data["name"], - self.routes[route_id].type, - r.type, - ) - to_rebuild.add([r.type, self.routes[route_id].type]) - self.routes[route_id].set_type(r.type) - if self.routes[route_id].order != r.order: - logger.info( - "[%s|%s] Change route order: %s -> %s", - route_id, - data["name"], - self.routes[route_id].order, - r.order, - ) - self.routes[route_id].set_order(r.order) - to_rebuild.add(r.type) - if self.routes[route_id].is_differ(data): - logger.info("[%s|%s] Update route", route_id, data["name"]) - self.routes[route_id].update(data) - if to_rebuild: - self.rebuild_chains(to_rebuild) - - def delete_route(self, route_id: str): - """ - Delete Route from Chains - :param route_id: - :return: - """ - r_type = None - if route_id in self.routes: - logger.info("[%s|%s] Delete route", route_id, self.routes[route_id].name) - r_type = self.routes[route_id].type - del self.routes[route_id] - if r_type: - self.rebuild_chains([r_type]) - - def rebuild_chains(self, r_types: Optional[Iterable[str]] = None): - """ - Rebuild Router Chains - Need lock ? - :param r_types: List types for rebuild chains - :return: - """ - chains = defaultdict(list) - for r in self.routes.values(): - if r_types and r.type not in r_types: - continue - chains[r.type].append(r) - for chain in chains: - logger.info("[%s] Rebuild chain", chain) - self.chains[chain.encode(encoding=DEFAULT_ENCODING)] = list( - sorted( - [r for r in chains[chain]], - key=operator.attrgetter("order"), - ) - ) - - def iter_route(self, msg: Message) -> Iterator[Route]: - mt = msg.headers.get(MX_MESSAGE_TYPE) - if not mt: - return - # Check default Route - if self.default_route and self.default_route.is_match(msg): - yield self.default_route - # Iterate over routes - for route in self.chains[mt]: - if route.is_match(msg): - yield route diff --git a/services/mx/service.py b/services/mx/service.py index ef36bd823a2341dafaf3acc93abdc345d6c53069..980ec937693787216eb4f5d47f49f6075eb5279f 100755 --- a/services/mx/service.py +++ b/services/mx/service.py @@ -2,33 +2,25 @@ # ---------------------------------------------------------------------- # mx service # ---------------------------------------------------------------------- -# Copyright (C) 2007-2021 The NOC Project +# Copyright (C) 2007-2022 The NOC Project # See LICENSE for details # ---------------------------------------------------------------------- # Python modules import asyncio -from typing import Dict, Any - -# Third-party modules -import orjson # NOC modules from noc.core.service.fastapi import FastAPIService -from noc.core.error import NOCError from noc.core.mx import MX_STREAM from noc.config import config from noc.core.liftbridge.message import Message -from noc.core.mx import MX_SHARDING_KEY -from noc.services.mx.router.router import Router -from noc.services.mx.router.action import DROP, DUMP from noc.core.perf import metrics -from noc.services.mx.datastream import RouteDataStreamClient class MXService(FastAPIService): name = "mx" use_mongo = True + use_router = True if config.features.traefik: traefik_backend = "mx" @@ -38,32 +30,15 @@ class MXService(FastAPIService): super().__init__() self.slot_number = 0 self.total_slots = 0 - self.router = Router() - self.stream_partitions: Dict[str, int] = {} async def init_api(self): # Postpone initialization process until config datastream is fully processed self.ready_event = asyncio.Event() - asyncio.get_running_loop().create_task(self.get_mx_routes_config()) # Set by datastream.on_ready await self.ready_event.wait() # Process as usual await super().init_api() - async def get_mx_routes_config(self): - """ - Subscribe and track datastream changes - """ - client = RouteDataStreamClient("cfgmxroute", service=self) - # Track stream changes - while True: - self.logger.info("Starting to track MX route settings") - try: - await client.query(limit=config.message.ds_limit, block=True) - except NOCError as e: - self.logger.info("Failed to get MX route settings: %s", e) - await asyncio.sleep(1) - async def on_ready(self) -> None: # Pass further initialization self.ready_event.set() @@ -74,63 +49,11 @@ class MXService(FastAPIService): self.slot_number, self.total_slots = await self.acquire_slot() await self.subscribe_stream(MX_STREAM, self.slot_number, self.on_message, async_cursor=True) - async def update_route(self, data: Dict[str, Any]) -> None: - self.router.change_route(data) - - async def delete_route(self, r_id: str) -> None: - self.router.delete_route(r_id) - async def on_message(self, msg: Message) -> None: metrics["messages"] += 1 # Apply routes self.logger.debug("[%d] Receiving message %s", msg.offset, msg.headers) - for route in self.router.iter_route(msg): - metrics["route_hits"] += 1 - self.logger.debug("[%d] Applying route %s", msg.offset, route.name) - # Apply actions - routed: bool = False - for stream, action_headers, body in route.iter_action(msg): - metrics["action_hits"] += 1 - # Fameless drop - if stream == DROP: - metrics["action_drops"] += 1 - self.logger.debug("[%s] Dropped. Stopping processing", msg.offset) - return - elif stream == DUMP: - self.logger.info( - "[%s] Dump. Message headers: %s;\n-----\n Body: %s \n----\n ", - msg.offset, - msg.headers, - msg.value, - ) - continue - # Build resulting headers - headers = {} - headers.update(msg.headers) - if action_headers: - headers.update(action_headers) - # Determine sharding channel - sharding_key = int(headers.get(MX_SHARDING_KEY, b"0")) - partitions = self.stream_partitions.get(stream) - if not partitions: - # Request amount of partitions - partitions = await self.get_stream_partitions(stream) - self.stream_partitions[stream] = partitions - partition = sharding_key % partitions - # Single message may be transmuted in zero or more messages - body = route.transmute(headers, body) - # for body in route.iter_transmute(headers, msg.value): - if not isinstance(body, bytes): - # Transmute converts message to an arbitrary structure, - # so convert back to the json - body = orjson.dumps(body) - metrics[("forwards", f"{stream}:{partition}")] += 1 - self.logger.debug("[%s] Routing to %s:%s", msg.offset, stream, partition) - self.publish(value=body, stream=stream, partition=partition, headers=headers) - routed = True - if not routed: - self.logger.debug("[%d] Not routed", msg.offset) - metrics["route_misses"] += 1 + await self.router.route_message(msg, msg_id=msg.offset) self.logger.debug("[%s] Finish processing", msg.offset) diff --git a/services/nbi/service.py b/services/nbi/service.py index 9c1666086de058c9e37425e638887378de149f9b..5bfc522472fb0603976a3f17417e265ecda55aec 100755 --- a/services/nbi/service.py +++ b/services/nbi/service.py @@ -20,6 +20,8 @@ PREFIX_NBI = "/api/nbi/" class NBIService(FastAPIService): name = "nbi" use_mongo = True + use_router = True + if config.features.traefik: traefik_backend = "nbi" traefik_frontend_rule = "PathPrefix:/api/nbi" diff --git a/services/scheduler/service.py b/services/scheduler/service.py index ea31fb7607d48c7273c9bbf71eb215a5697ae616..076e3d62dc7b3101e7a1e02e685a1a842e94b12d 100755 --- a/services/scheduler/service.py +++ b/services/scheduler/service.py @@ -16,6 +16,7 @@ class SchedulerService(FastAPIService): name = "scheduler" leader_lock_name = "scheduler" use_mongo = True + use_router = True async def on_activate(self): self.scheduler = Scheduler( diff --git a/services/web/service.py b/services/web/service.py index ac4333c0f319e743543ac49f17874b244d188812..d7a9b452257909b5bffbe836f9273f73a35d4f83 100755 --- a/services/web/service.py +++ b/services/web/service.py @@ -25,6 +25,8 @@ class WebService(FastAPIService): api = [] use_translation = True use_mongo = True + use_router = True + if config.features.traefik: traefik_backend = "web" traefik_frontend_rule = "PathPrefix:/" diff --git a/services/worker/service.py b/services/worker/service.py index 1d3ef630745b2a6eae029599fb85a71ea31b5b43..13967e159cc6d066f6fbb55dc2c9ba8084b0277a 100755 --- a/services/worker/service.py +++ b/services/worker/service.py @@ -20,6 +20,7 @@ from noc.core.perf import metrics class WorkerService(FastAPIService): name = "worker" use_mongo = True + use_router = True def __init__(self): super().__init__()