diff --git a/commands/cdag.py b/commands/cdag.py index 804fb810f342d7392698e3c611be42945edd3c2b..48173f284e816ab8f9cb918825ac78ddf69ac764 100644 --- a/commands/cdag.py +++ b/commands/cdag.py @@ -16,8 +16,6 @@ import orjson # NOC modules from noc.core.management.base import BaseCommand from noc.core.cdag.graph import CDAG -from noc.core.cdag.factory.yaml import YAMLCDAGFactory -from noc.core.cdag.factory.json import JSONCDAGFactory from noc.core.service.loader import get_service @@ -49,6 +47,9 @@ class Command(BaseCommand): senders = {n for n in cdag.nodes.values() if n.name == "metrics"} default_units = {n.node_id: n.config.unit for n in probes.values()} skip_fields = {"ts", "labels", "_units"} + key_fields = set() + for s in senders: + key_fields |= set(kf for kf in s.iter_unbound_inputs() if kf not in ("ts", "labels")) with open(input) as f, open(output, "wb") as fo: for line in f: line = line.strip() @@ -59,7 +60,7 @@ class Command(BaseCommand): units = data.get("_units") or {} ts = data["ts"] for n in data: - if n in skip_fields: + if n in skip_fields or n in key_fields: continue mu = units.get(n) or default_units[n] probe = probes[n] @@ -68,6 +69,10 @@ class Command(BaseCommand): probe.activate(tx, "unit", mu) # Activate senders for sender in senders: + for kf in key_fields: + k = data.get(kf) + if k is not None: + sender.activate(tx, kf, k) sender.activate(tx, "ts", ts) sender.activate(tx, "labels", data.get("labels") or []) for scope in svc._metrics: @@ -77,25 +82,60 @@ class Command(BaseCommand): # Reset metrics svc._metrics = defaultdict(list) + def from_config_paths(self, paths: List[str]) -> CDAG: + from noc.core.mongo.connection import connect + + connect() + cdags = [self.from_config_path(path) for path in paths] + cdag = cdags[0] + for n, other in enumerate(cdags[1:]): + cdag.merge(other, prefix=str(n)) + return cdag + def from_config_path(self, path: str) -> CDAG: + if path.startswith("scope://"): + return self.from_metric_scope(path[8:]) + return self.from_config_file(path) + + def from_config_file(self, path: str) -> CDAG: with open(path) as f: cfg = f.read() ext = os.path.splitext(path)[1] - cdag = CDAG("test", {}) if ext == ".json": - factory = JSONCDAGFactory(cdag, cfg) + return self.from_config_file_json(cfg) elif ext in (".yml", ".yaml"): - factory = YAMLCDAGFactory(cdag, cfg) + return self.from_config_file_yaml(cfg) else: self.die("Unknown config format") + + @staticmethod + def from_config_file_json(cfg: str) -> CDAG: + from noc.core.cdag.factory.json import JSONCDAGFactory + + cdag = CDAG("test", {}) + factory = JSONCDAGFactory(cdag, cfg) factory.construct() return cdag - def from_config_paths(self, paths: List[str]) -> CDAG: - cdags = [self.from_config_path(path) for path in paths] - cdag = cdags[0] - for n, other in enumerate(cdags[1:]): - cdag.merge(other, prefix=str(n)) + @staticmethod + def from_config_file_yaml(cfg: str) -> CDAG: + from noc.core.cdag.factory.yaml import YAMLCDAGFactory + + cdag = CDAG("test", {}) + factory = YAMLCDAGFactory(cdag, cfg) + factory.construct() + return cdag + + def from_metric_scope(self, scope_name: str) -> CDAG: + from noc.pm.models.metricscope import MetricScope + from noc.core.cdag.factory.scope import MetricScopeCDAGFactory + + ms = MetricScope.objects.filter(name=scope_name).first() + if not ms: + self.die(f"Metric scope {scope_name} is not found") + cdag = CDAG("test", {}) + factory = MetricScopeCDAGFactory(cdag, scope=ms, spool=False, sticky=True) + factory.construct() return cdag diff --git a/core/cdag/factory/scope.py b/core/cdag/factory/scope.py new file mode 100644 index 0000000000000000000000000000000000000000..1fe54b57818c8950b5d207baf8a7a901f58c7ec6 --- /dev/null +++ b/core/cdag/factory/scope.py @@ -0,0 +1,62 @@ +# ---------------------------------------------------------------------- +# MetricScopeCDAGFactory +# ---------------------------------------------------------------------- +# Copyright (C) 2007-2021 The NOC Project +# See LICENSE for details +# ---------------------------------------------------------------------- + +# Python modules +from typing import Optional + +# NOC modules +from noc.pm.models.metricscope import MetricScope +from noc.pm.models.metrictype import MetricType +from .base import BaseCDAGFactory, FactoryCtx +from ..graph import CDAG + + +class MetricScopeCDAGFactory(BaseCDAGFactory): + """ + Metric scope collection graph builder + """ + + def __init__( + self, + graph: CDAG, + scope: MetricScope, + ctx: Optional[FactoryCtx] = None, + namespace: Optional[str] = None, + spool: bool = True, + sticky: bool = False, + ): + super().__init__(graph, ctx, namespace) + self.scope = scope + self.spool = spool + self.sticky = sticky + + def construct(self) -> None: + # Construct probe nodes + probes = {} + for mt in MetricType.objects.filter(scope=self.scope.id).order_by("field_name"): + name = mt.field_name + probes[name] = self.graph.add_node( + name, + "probe", + description=f"Input collector for {name} metric", + config={"unit": mt.units.code, "scale": mt.scale.code}, + sticky=self.sticky, + ) + # Construct metric sender node + ms = self.graph.add_node( + "sender", + "metrics", + description=f"{self.scope.name} metric sender", + config={"scope": self.scope.table_name, "spool": self.spool}, + sticky=self.sticky, + ) + # Connect to the probes + for name, node in probes.items(): + node.subscribe(ms, name, dynamic=True) + # Additional key fields + for kf in self.scope.key_fields: + ms.add_input(kf.field_name)