diff --git a/collections/pm.metricaction/Average_CPU_Usage.json b/collections/pm.metricactions/Average_CPU_Usage.json similarity index 91% rename from collections/pm.metricaction/Average_CPU_Usage.json rename to collections/pm.metricactions/Average_CPU_Usage.json index da835f98a7be769152c2f053d42b405710cdb4fd..1988887747ed70456b1747905b1ab815887921a4 100644 --- a/collections/pm.metricaction/Average_CPU_Usage.json +++ b/collections/pm.metricactions/Average_CPU_Usage.json @@ -8,7 +8,7 @@ "name": "activation.min_window", "description": null, "default": "5", - "max_value": 3.0, + "max_value": 5.0, "min_value": 3.0, "type": "int" }, @@ -16,7 +16,7 @@ "name": "activation.max_window", "description": null, "default": "3", - "max_value": 3.0, + "max_value": 5.0, "min_value": 3.0, "type": "int" }, @@ -24,7 +24,7 @@ "name": "alarm.activation_level", "description": null, "default": "95", - "max_value": 20.0, + "max_value": 99.0, "min_value": 20.0, "type": "int" }, @@ -32,7 +32,7 @@ "name": "alarm.deactivation_level", "description": null, "default": "60", - "max_value": 20.0, + "max_value": 99.0, "min_value": 20.0, "type": "int" } diff --git a/collections/pm.metricaction/Average_Memory_Usage.json b/collections/pm.metricactions/Average_Memory_Usage.json similarity index 91% rename from collections/pm.metricaction/Average_Memory_Usage.json rename to collections/pm.metricactions/Average_Memory_Usage.json index 44adc046b99ab73a794145379d1443cabe24d498..c4cc1465c6e51a9222f39618260733af5f049dea 100644 --- a/collections/pm.metricaction/Average_Memory_Usage.json +++ b/collections/pm.metricactions/Average_Memory_Usage.json @@ -8,7 +8,7 @@ "name": "alarm.activation_level", "description": null, "default": "95", - "max_value": 20.0, + "max_value": 99.0, "min_value": 20.0, "type": "int" }, @@ -16,7 +16,7 @@ "name": "alarm.deactivation_level", "description": null, "default": "65", - "max_value": 20.0, + "max_value": 99.0, "min_value": 20.0, "type": "int" }, @@ -24,7 +24,7 @@ "name": "activation.min_window", "description": null, "default": "3", - "max_value": 3.0, + "max_value": 10.0, "min_value": 3.0, "type": "int" }, @@ -32,7 +32,7 @@ "name": "activation.max_window", "description": null, "default": "5", - "max_value": 3.0, + "max_value": 10.0, "min_value": 3.0, "type": "int" } diff --git a/collections/pm.metricaction/Interface_Bandwidth_more_than_N.json b/collections/pm.metricactions/Interface_Bandwidth_more_than_N.json similarity index 100% rename from collections/pm.metricaction/Interface_Bandwidth_more_than_N.json rename to collections/pm.metricactions/Interface_Bandwidth_more_than_N.json diff --git a/collections/pm.metricaction/Interface_Errors_In.json b/collections/pm.metricactions/Interface_Errors_In.json similarity index 91% rename from collections/pm.metricaction/Interface_Errors_In.json rename to collections/pm.metricactions/Interface_Errors_In.json index 6e4460c70b9d905251abba581702c43ee68fce15..247e21ae185dd7058375b4f1b49a0d0c426e9609 100644 --- a/collections/pm.metricaction/Interface_Errors_In.json +++ b/collections/pm.metricactions/Interface_Errors_In.json @@ -8,7 +8,7 @@ "name": "alarm.activation_level", "description": null, "default": "1", - "max_value": 1.0, + "max_value": 1000.0, "min_value": 1.0, "type": "float" }, @@ -16,7 +16,7 @@ "name": "alarm.deactivation_level", "description": null, "default": "1", - "max_value": 1.0, + "max_value": 1000.0, "min_value": 1.0, "type": "float" }, @@ -24,7 +24,7 @@ "name": "activation.min_window", "description": null, "default": "900", - "max_value": 300.0, + "max_value": 3600.0, "min_value": 300.0, "type": "int" }, @@ -32,7 +32,7 @@ "name": "activation.max_window", "description": null, "default": "1200", - "max_value": 300.0, + "max_value": 3600.0, "min_value": 300.0, "type": "int" } diff --git a/commands/cdag.py b/commands/cdag.py index 10c9d67cd80f32c2f0049eefd33cf06ac0bcdf41..a0cc6b8da39dc3af18d50e419b1611c0b7a40c32 100644 --- a/commands/cdag.py +++ b/commands/cdag.py @@ -91,7 +91,7 @@ class Command(BaseCommand): from noc.core.clickhouse.connect import connection now = datetime.datetime.now() - now = now - datetime.timedelta(hours=2) + now = now - datetime.timedelta(hours=4) q_args = [] if source.startswith("iface://"): source, iface = source[8:].split("::") @@ -109,11 +109,12 @@ class Command(BaseCommand): source = source[6:] source = self.get_source(source) query = SQL % ( - "cpu_usage", + "usage", "cpu", - source.managed_object.bi_id, + source.bi_id, now.date().isoformat(), now.replace(microsecond=0).isoformat(sep=" "), + "", ) else: self.die(f"Unknown source {source}") @@ -135,7 +136,7 @@ class Command(BaseCommand): def iter_metrics( self, f_input: Optional[str], metrics: Optional[List[str]] = None ) -> Iterable[Dict[str, Union[float, str]]]: - if f_input.startswith("mo://") or f_input.startswith("iface://"): + if f_input.startswith("cpu://") or f_input.startswith("iface://"): yield from self.input_from_device(f_input, metrics) else: yield from self.input_from_file(f_input) diff --git a/config.py b/config.py index 258dce08d046fb23c75db87e9b1c55e5cb5f88a3..f14d9463450e26d949e0f1d3816862c143ae9641 100644 --- a/config.py +++ b/config.py @@ -872,9 +872,10 @@ class Config(BaseConfig): enable_postgres_quantiles = BooleanParameter(default=False) class metrics(ConfigSection): - compact_on_start = BooleanParameter(default=True) + # Disable On Start, that compact procedure lost Consul session + compact_on_start = BooleanParameter(default=False) compact_on_stop = BooleanParameter(default=False) - flush_interval = SecondsParameter(default="1") + flush_interval = SecondsParameter(default="1m") compact_interval = SecondsParameter(default="5m") # Metrics disable_spool = BooleanParameter(default=False, help="Disable send metrics to Clickhouse") diff --git a/core/cdag/node/alarm.py b/core/cdag/node/alarm.py index d2ec14412cc8e96d88a7b1d193a528c9e56d03a0..3bf73778494674a4c520251fd9eba12d48c201a6 100644 --- a/core/cdag/node/alarm.py +++ b/core/cdag/node/alarm.py @@ -57,12 +57,12 @@ class AlarmNodeConfig(BaseModel): if config.reference: template = config.reference elif config.labels: - template = "th:{{object}}:{{alarm_class}}:{{';'.join(config.labels)}}" + template = "th:{{object or ''}}:{{alarm_class}}:{{';'.join(config.labels)}}" return Template(template).render( **{ "object": config.managed_object, "alarm_class": config.alarm_class, - "labels": config.labels, + "labels": config.labels or [], "vars": {v.name: v.value for v in config.vars or []}, } ) @@ -117,7 +117,8 @@ class AlarmNode(BaseCDAGNode): "managed_object": self.config.managed_object, "alarm_class": self.config.alarm_class, "labels": self.config.labels if self.config.labels else [], - "vars": {"ovalue": x, "tvalue": self.config.activation_level}, + # x is numpy.float64 type, ? + "vars": {"ovalue": round(float(x), 3), "tvalue": self.config.activation_level}, } # Render vars if self.config.vars: diff --git a/core/collection/base.py b/core/collection/base.py index 72664345d0ba4fd61f2fdaacc9b52c5ebaab1a20..d981875864ae781b0b617708feeb3b7b7bc066e0 100644 --- a/core/collection/base.py +++ b/core/collection/base.py @@ -240,6 +240,12 @@ class Collection(object): except ValueError as e: v = [] self.partial_errors[d["uuid"]] = str(e) + elif isinstance(field, EmbeddedDocumentField): + try: + v = field.document_type(**v) + except ValueError as e: + v = None + self.partial_errors[d["uuid"]] = str(e) # Dereference binary field if isinstance(field, BinaryField): v = b85decode(v) @@ -313,7 +319,7 @@ class Collection(object): o.save() # Try again return self.update_item(data) - self.stdout.write("Not find object by query: %s\n" % qs) + self.stdout.write("Not find object by query: %s\n" % qs) raise def delete_item(self, uuid): diff --git a/core/service/base.py b/core/service/base.py index edc26c6b53b0cdc45d32cffd4a3585b853ddc1cb..472c87fff9a0ca03de479580f68f41a9b2b5d32c 100644 --- a/core/service/base.py +++ b/core/service/base.py @@ -15,6 +15,8 @@ import argparse import threading from time import perf_counter import asyncio +import cachetools +from functools import partial from typing import ( Optional, Dict, @@ -881,3 +883,17 @@ class BaseService(object): # Cluster election in progress or cluster is misconfigured self.logger.info("Stream '%s' has no active partitions. Waiting" % stream) await asyncio.sleep(1) + + @staticmethod + @cachetools.cached(cachetools.TTLCache(maxsize=128, ttl=60)) + def get_slot_limits(slot_name): + """ + Get slot count + :param slot_name: + :return: + """ + from noc.core.dcs.loader import get_dcs + from noc.core.ioloop.util import run_sync + + dcs = get_dcs() + return run_sync(partial(dcs.get_slot_limit, slot_name)) diff --git a/inv/models/sensor.py b/inv/models/sensor.py index 3368989d7334bbfd811b31ecb6154d292162850f..afa730c3fbe25c760f390684ae52bcb0ed54096f 100644 --- a/inv/models/sensor.py +++ b/inv/models/sensor.py @@ -63,7 +63,14 @@ class Sensor(Document): "collection": "sensors", "strict": False, "auto_create_index": False, - "indexes": ["agent", "managed_object", "object", "labels", "effective_labels"], + "indexes": [ + "agent", + "managed_object", + "object", + "labels", + "effective_labels", + ("managed_object", "object"), + ], } profile: "SensorProfile" = PlainReferenceField( @@ -241,6 +248,8 @@ class Sensor(Document): :param sensor: :return: """ + if not sensor.state.is_productive: + return {} labels = [] for ll in sensor.effective_labels: l_c = Label.get_by_name(ll) diff --git a/models.py b/models.py index 80044c801667832bdd5fbf39996583d047d25236..67d0add8c30f768c9632633a902f4ef05d57c7b6 100644 --- a/models.py +++ b/models.py @@ -249,7 +249,7 @@ _MODELS = { "pm.AgentProfile": "noc.pm.models.agent.AgentProfile", "pm.Scale": "noc.pm.models.scale.Scale", "pm.MeasurementUnits": "noc.pm.models.measurementunits.MeasurementUnits", - "pm.MetricAction": "noc.pm.models.metricrule.MetricAction", + "pm.MetricAction": "noc.pm.models.metricaction.MetricAction", "pm.MetricRule": "noc.pm.models.metricrule.MetricRule", "pm.MetricScope": "noc.pm.models.metricscope.MetricScope", "pm.MetricType": "noc.pm.models.metrictype.MetricType", diff --git a/pm/models/metricaction.py b/pm/models/metricaction.py index 45bdc73df6522bcc30a5d7119dfe4346dba1e34e..c63488fd26ee687b838360335cd34b8755c01522 100644 --- a/pm/models/metricaction.py +++ b/pm/models/metricaction.py @@ -72,7 +72,7 @@ class MetricActionParam(EmbeddedDocument): "type": self.type, "description": self.description, "min_value": self.min_value, - "max_value": self.min_value, + "max_value": self.max_value, "default": self.default, } diff --git a/sa/models/managedobject.py b/sa/models/managedobject.py index 2f1ecc2f801c4c8239755e22c735934249022112..910f3a24aa442a092ca2c9e67696f338342a3891 100644 --- a/sa/models/managedobject.py +++ b/sa/models/managedobject.py @@ -2889,7 +2889,7 @@ class ManagedObject(NOCModel): metrics=tuple(metrics), labels=(f"noc::interface::{i['name']}",), hints=[f"ifindex::{ifindex}"] if ifindex else None, - service=i.get("service"), + # service=i.get("service"), ) if not i_profile.allow_subinterface_metrics: continue @@ -2917,31 +2917,35 @@ class ManagedObject(NOCModel): :return: """ from noc.inv.models.interface import Interface + from noc.inv.models.interfaceprofile import InterfaceProfile if not mo.is_managed: return {} + icoll = Interface._get_collection() s_metrics = mo.object_profile.get_object_profile_metrics(mo.object_profile.id) labels = [] for ll in mo.effective_labels: l_c = Label.get_by_name(ll) labels.append({"label": ll, "expose_metric": l_c.expose_metric if l_c else False}) items = [] - for iface in Interface.objects.filter(managed_object=mo.id): + for iface in icoll.find({"managed_object": mo.id}, {"name", "effective_labels", "profile"}): + ip = InterfaceProfile.get_by_id(iface["profile"]) metrics = [ { "name": mc.metric_type.field_name, "is_stored": mc.is_stored, "is_composed": bool(mc.metric_type.compose_expression), } - for mc in iface.profile.metrics + for mc in ip.metrics ] if not metrics: continue items.append( { - "key_labels": [f"noc::interface::{iface.name}"], + "key_labels": [f"noc::interface::{iface['name']}"], "labels": [ - {"label": ll, "expose_metric": False} for ll in iface.effective_labels + {"label": ll, "expose_metric": False} + for ll in iface.get("effective_labels", []) ], "metrics": metrics, } @@ -2973,10 +2977,12 @@ class ManagedObject(NOCModel): from noc.inv.models.object import Object from mongoengine.queryset import Q as m_Q + if not self.is_managed: + return False sla_probe = SLAProbe.objects.filter(managed_object=self.id).first() - config = self.get_metric_config(self) o = Object.get_managed(self) sensor = Sensor.objects.filter(m_Q(managed_object=self.id) | m_Q(object__in=o)).first() + config = self.get_metric_config(self) return bool(sla_probe or sensor or config.get("metrics") or config.get("items")) diff --git a/services/discovery/jobs/periodic/metrics.py b/services/discovery/jobs/periodic/metrics.py index 8594e59d663abdeac2efabb3d5f9e63308240d74..e6ca1a9ff405e530426dd8a4517a3e97eccc788e 100644 --- a/services/discovery/jobs/periodic/metrics.py +++ b/services/discovery/jobs/periodic/metrics.py @@ -110,8 +110,8 @@ class MetricsCheck(DiscoveryCheck): collected=metrics, streaming={ "stream": "metrics", - "partition": 0, - "utc_offset": config.timezone._utcoffset.seconds, + "partition": self.object.id % self.service.get_slot_limits("metrics"), + "utc_offset": config.tz_utc_offset, "data": s_data, }, ) diff --git a/services/metrics/changelog.py b/services/metrics/changelog.py index 3b0632595a29273486dab89f6725ccaca8aadb39..b14430322974957dbd235b66a0026f2361881f0a 100644 --- a/services/metrics/changelog.py +++ b/services/metrics/changelog.py @@ -9,7 +9,6 @@ # Python modules from typing import Any, Dict, List, Iterable import pickle -import lzma import logging import datetime import asyncio @@ -19,6 +18,13 @@ from pymongo import InsertOne, DeleteMany, ASCENDING, DESCENDING from pymongo.collection import Collection from bson import ObjectId +try: + # Only for Python 3.3+ + import lzma as compressor # noqa +except ImportError: + # logger.debug("lzma module is not available") + import bz2 as compressor # noqa + # NOC modules from noc.core.mongo.connection_async import get_db @@ -82,14 +88,14 @@ class ChangeLog(object): Encode state to bytes """ r = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL) - return lzma.compress(r) + return compressor.compress(r) @staticmethod def decode(data: bytes) -> Dict[str, Dict[str, Any]]: """ Decode bytes to state """ - r = lzma.decompress(data) + r = compressor.decompress(data) return pickle.loads(r) async def feed(self, state: Dict[str, Dict[str, Any]]) -> None: diff --git a/services/metrics/service.py b/services/metrics/service.py index b6eec392466b6ed42b92f95b503738e7f91e4388..78049d425df535df639cc36f6663428f9f52f9d6 100755 --- a/services/metrics/service.py +++ b/services/metrics/service.py @@ -18,6 +18,8 @@ import random # Third-party modules import orjson +import cachetools +from pymongo import DESCENDING # NOC modules from noc.core.service.fastapi import FastAPIService @@ -36,6 +38,7 @@ from noc.core.cdag.factory.scope import MetricScopeCDAGFactory from noc.core.cdag.factory.config import ConfigCDAGFactory, GraphConfig from noc.services.metrics.changelog import ChangeLog from noc.services.metrics.datastream import MetricsDataStreamClient, MetricRulesDataStreamClient +from noc.services.datastream.streams.cfgmetricsources import CfgMetricSourcesDataStream from noc.config import config as global_config # MetricKey - scope, key ctx: (managed_object, ), Key Labels @@ -288,7 +291,7 @@ class MetricsService(FastAPIService): self.logger.info("Waiting for mappings") await self.mappings_ready_event.wait() self.logger.info("Mappings are ready") - await self.subscribe_stream("metrics", self.slot_number, self.on_metrics) + await self.subscribe_stream("metrics", self.slot_number, self.on_metrics, async_cursor=True) async def on_deactivate(self): if self.change_log: @@ -318,11 +321,14 @@ class MetricsService(FastAPIService): """ # Register RPC aliases client = MetricsDataStreamClient("cfgmetricsources", service=self) + coll = CfgMetricSourcesDataStream.get_collection() + r = next(coll.find({}).sort([("change_id", DESCENDING)]), None) # Track stream changes while True: self.logger.info("Starting to track object mappings") try: await client.query( + change_id=str(r["change_id"]) if "change_id" in r else None, limit=global_config.metrics.ds_limit, block=True, filter_policy="delete", @@ -353,6 +359,7 @@ class MetricsService(FastAPIService): async def on_metrics(self, msg: Message) -> None: data = orjson.loads(msg.value) state = {} + metrics["messages"] += 1 for item in data: scope = item.get("scope") if not scope: @@ -439,11 +446,12 @@ class MetricsService(FastAPIService): ) @staticmethod + @cachetools.cached(cachetools.TTLCache(maxsize=128, ttl=60)) def get_key_hash(k: MetricKey) -> str: """ Calculate persistent hash for metric key """ - d = hashlib.sha512(str(k).encode("utf-8")).digest() + d = hashlib.blake2b(str(k).encode("utf-8")).digest() return codecs.encode(d, "base_64")[:7].decode("utf-8") @staticmethod @@ -597,12 +605,12 @@ class MetricsService(FastAPIService): if is_composed: probe_cls = ComposeProbeNode prefix = self.get_key_hash(k) - state_id = f"{self.get_key_hash(k)}::{metric_field}" + state_id = f"{prefix}::{metric_field}" # Create Probe p = probe_cls.construct( metric_field, prefix=prefix, - state=self.start_state.get(state_id), + state=self.start_state.pop(state_id, None), config=self.metric_configs.get(metric_field), sticky=True, ) @@ -617,6 +625,13 @@ class MetricsService(FastAPIService): metrics["cdag_nodes", ("type", p.name)] += 1 return p + @cachetools.cached(cachetools.TTLCache(maxsize=128, ttl=60)) + def get_source(self, s_id): + coll = CfgMetricSourcesDataStream.get_collection() + data = coll.find_one({"_id": str(s_id)}) + sc = self.get_source_config(orjson.loads(data["data"])) + return sc + def get_source_info(self, k: MetricKey) -> Optional[SourceInfo]: """ Get source Info by Metric Key. Sources: @@ -630,13 +645,17 @@ class MetricsService(FastAPIService): key_ctx, source = dict(k[1]), None sensor, sla_probe = None, None if "sensor" in key_ctx: - sensor = self.sources_config.get(key_ctx["sensor"]) + source = self.get_source(key_ctx["sensor"]) + # sensor = self.sources_config.get(key_ctx["sensor"]) elif "sla_probe" in key_ctx: - sla_probe = self.sources_config.get(key_ctx["sla_probe"]) + source = self.get_source(key_ctx["sla_probe"]) + # sla_probe = self.sources_config.get(key_ctx["sla_probe"]) if "agent" in key_ctx: - source = self.sources_config.get(key_ctx["agent"]) + source = self.get_source(key_ctx["agent"]) + # source = self.sources_config.get(key_ctx["agent"]) elif "managed_object" in key_ctx: - source = self.sources_config.get(key_ctx["managed_object"]) + source = self.get_source(key_ctx["managed_object"]) + # source = self.sources_config.get(key_ctx["managed_object"]) if not source: return composed_metrics = [] @@ -668,7 +687,8 @@ class MetricsService(FastAPIService): # Getting Context source = self.get_source_info(k) if not source: - self.logger.info("[%s] Unknown metric source. Skipping apply rules", k) + self.logger.debug("[%s] Unknown metric source. Skipping apply rules", k) + metrics["unknown_metric_source"] += 1 return s_labels = set(self.merge_labels(source.labels, labels)) # Appy matched rules @@ -781,15 +801,12 @@ class MetricsService(FastAPIService): sender.activate(tx, "labels", data.get("labels") or []) return tx.get_changed_state() - async def update_source_config(self, data: Dict[str, Any]) -> None: - """ - Update source config. - """ - sc_id = int(data["id"]) + @staticmethod + def get_source_config(data): sc = SourceConfig( type=data["type"], bi_id=data["bi_id"], - fm_pool=sys.intern(data["fm_pool"]) if data["fm_pool"] else None, + fm_pool=data["fm_pool"] if data["fm_pool"] else None, labels=tuple(sys.intern(ll["label"]) for ll in data["labels"]), metrics=tuple( sys.intern(m["name"]) for m in data["metrics"] if not m.get("is_composed") @@ -809,12 +826,27 @@ class MetricsService(FastAPIService): ), ) ) - if sc_id not in self.sources_config: - self.sources_config[sc_id] = sc + return sc + + async def update_source_config(self, data: Dict[str, Any]) -> None: + """ + Update source config. + """ + if not self.cards: + # Initial config return - diff = self.sources_config[sc_id].is_differ(sc) - if "condition" in diff: - self.invalidate_card_config(sc) + sc_id = int(data["id"]) + if "type" not in data: + self.logger.info("[%s] Bad Source data", sc_id) + return + sc = self.get_source_config(data) + self.invalidate_card_config(sc) + # if sc_id not in self.sources_config: + # self.sources_config[sc_id] = sc + # return + # diff = self.sources_config[sc_id].is_differ(sc) + # if "condition" in diff: + # self.invalidate_card_config(sc) async def delete_source_config(self, c_id: int) -> None: """ @@ -961,6 +993,7 @@ class MetricsService(FastAPIService): elif diff.intersection({"conditions", "graph"}): # Invalidate Cards self.logger.info("[%s] %s Changed. Invalidate cards for rules", r.id, diff) + self.rules[r_id] = r invalidate_rules.add(r_id) if invalidate_rules: await self.invalidate_card_rules(invalidate_rules)