# ---------------------------------------------------------------------
# Metric collector
# ---------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------

# Python modules
9
from threading import Lock
10
import operator
11
12
13
import itertools
import time
from collections import defaultdict
14
15
from typing import Any, Optional, List, Dict, Set
from dataclasses import dataclass
Dmitry Volodin's avatar
Dmitry Volodin committed
16

Dmitry Lukhtionov's avatar
Dmitry Lukhtionov committed
17
# Third-party modules
Dmitry Volodin's avatar
Dmitry Volodin committed
18
import cachetools
19
from pymongo import ReadPreference
Dmitry Volodin's avatar
Dmitry Volodin committed
20
import orjson
21
from bson import ObjectId
Dmitry Volodin's avatar
Dmitry Volodin committed
22

Dmitry Lukhtionov's avatar
Dmitry Lukhtionov committed
23
# NOC modules
Dmitry Volodin's avatar
Dmitry Volodin committed
24
from noc.services.discovery.jobs.base import DiscoveryCheck
25
from noc.inv.models.object import Object
26
from noc.sa.models.managedobjectprofile import ManagedObjectProfile
Dmitry Volodin's avatar
Dmitry Volodin committed
27
from noc.inv.models.interfaceprofile import InterfaceProfile
28
from noc.inv.models.sensorprofile import SensorProfile
Dmitry Volodin's avatar
Dmitry Volodin committed
29
from noc.inv.models.interface import Interface
30
from noc.inv.models.subinterface import SubInterface
31
from noc.inv.models.sensor import Sensor
Dmitry Volodin's avatar
Fix    
Dmitry Volodin committed
32
from noc.fm.models.alarmclass import AlarmClass
Dmitry Volodin's avatar
Dmitry Volodin committed
33
from noc.pm.models.metrictype import MetricType
34
from noc.sla.models.slaprofile import SLAProfile
35
from noc.sla.models.slaprobe import SLAProbe
36
from noc.wf.models.state import State
Dmitry Volodin's avatar
Dmitry Volodin committed
37
from noc.pm.models.thresholdprofile import ThresholdProfile
38
from noc.core.hash import hash_str
Dmitry Volodin's avatar
Dmitry Volodin committed
39
40


41
42
43
# Counter wrap boundaries: 31-bit, 32-bit and 64-bit maximum values
MAX31 = (1 << 31) - 1
MAX32 = (1 << 32) - 1
MAX64 = (1 << 64) - 1

# Nanoseconds per second: MData timestamps are divided by it to get seconds
NS = 1e9

# Metric value types reported by get_metrics
MT_COUNTER = "counter"
MT_BOOL = "bool"
MT_DELTA = "delta"
# Value types whose result is computed from the previously stored sample
MT_COUNTER_DELTA = {MT_COUNTER, MT_DELTA}

# Threshold window types: fixed number of measures / time span in seconds
WT_MEASURES = "m"
WT_TIME = "t"

# Metric scope names
SCOPE_OBJECT = "object"
SCOPE_INTERFACE = "interface"
SCOPE_SLA = "sla"

# Serializes access to the per-profile metric caches of MetricsCheck
metrics_lock = Lock()

@dataclass
class MetricConfig:
    """
    Effective collection settings of a single metric type, resolved from an
    object, interface or SLA profile (or built directly for sensors).
    """

    # Metric type to collect
    metric_type: MetricType
    # Collect during box discovery
    enable_box: bool
    # Collect during periodic discovery
    enable_periodic: bool
    # Schedule collected values for storage
    is_stored: bool
    # Thresholds to evaluate collected values against, if any
    threshold_profile: Optional[ThresholdProfile]


class MData(object):
    """
    Single measurement returned by the get_metrics script.

    `id` is the metric id assigned when the get_metrics request was built,
    `ts` the measurement timestamp, `metric` the metric type name and
    `labels` an optional list of labels. `value`, `scale` and `type` describe
    the raw reading; `abs_value` is filled in during result processing.
    """

    __slots__ = (
        "id",
        "ts",
        "metric",
        "labels",
        "value",
        "scale",
        "type",
        "abs_value",
        "label",
        "_key_fmt",
    )

    def __init__(
        self,
        id,
        ts,
        metric,
        labels=None,
        value=None,
        scale=None,
        type=None,
        abs_value=None,
    ):
        self.id = id
        self.ts = ts
        self.metric = metric
        self.labels = labels
        self.value = value
        self.scale = scale
        self.type = type
        self.abs_value = abs_value
        if labels:
            suffix = "|".join(str(label) for label in sorted(labels))
            # Human-readable identifier used in log messages
            self.label = "%s|%s" % (metric, suffix)
            # Template for get_key(). Literal "%" in labels must be escaped,
            # otherwise the "%" formatting in get_key() fails or misformats
            self._key_fmt = "%%x|%s" % suffix.replace("%", "%%")
        else:
            self.label = metric
            self._key_fmt = "%x"

    def __repr__(self):
        return f"<MData #{self.id} {self.metric}>"

    def get_key(self, x: int) -> str:
        """
        Build a state key: `x` in lowercase hex, followed by the sorted labels

        :param x: integer to prefix the key with
        :return: key string
        """
        return self._key_fmt % x


class MetricsCheck(DiscoveryCheck):
    """
    Metrics discovery: collect object, interface, SLA probe and sensor metrics
    with the get_metrics script, spool the values and update threshold alarms
    """

    name = "metrics"
    required_script = "get_metrics"

    # Per-profile metric configuration caches (up to 1000 profiles, 60 seconds TTL),
    # accessed under the module-level metrics_lock
    _object_profile_metrics = cachetools.TTLCache(1000, 60)
    _interface_profile_metrics = cachetools.TTLCache(1000, 60)
    _slaprofile_metrics = cachetools.TTLCache(1000, 60)

    # Status codes
    S_OK = 0
    S_WARN = 1
    S_ERROR = 2

    # Status code -> name
    SMAP = {0: "ok", 1: "warn", 2: "error"}

    # NOTE(review): presumably maps S_WARN/S_ERROR to alarm severities;
    # not referenced in the visible part of this file
    SEV_MAP = {1: 2000, 2: 3000}

    # Capabilities checked by get_sla_metrics
    SLA_CAPS = ["Cisco | IP | SLA | Probes"]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Generator of unique integer metric ids for the get_metrics request
        self.id_count = itertools.count()
        # Metric id -> effective metric configuration
        self.id_metrics: Dict[int, MetricConfig] = {}
        # Metric id -> extra keyword arguments for the threshold value handler
        self.id_ctx: Dict[int, Dict[str, Any]] = {}
        # Metric id -> Sensor bi_id
        self.sensors_metrics: Dict[int, int] = {}
        # Metric id -> SLA probe bi_id
        self.sla_probe_metrics: Dict[int, int] = {}

    @staticmethod
    @cachetools.cached(cache={})
    def get_ac_pm_thresholds():
        """
        Return the alarm class used for threshold alarms.
        The lookup result is memoized in an unbounded dict that never expires.
        """
        ac_name = "NOC | PM | Out of Thresholds"
        return AlarmClass.get_by_name(ac_name)

    @classmethod
    @cachetools.cachedmethod(
        operator.attrgetter("_object_profile_metrics"), lock=lambda _: metrics_lock
    )
    def get_object_profile_metrics(cls, p_id: int) -> Dict[str, MetricConfig]:
        """
        Build metric configurations for a managed object profile.

        :param p_id: ManagedObjectProfile id
        :return: metric type name -> MetricConfig; empty dict when the profile
            is not found. Results are cached in _object_profile_metrics.
        """
        result: Dict[str, MetricConfig] = {}
        profile = ManagedObjectProfile.get_by_id(id=p_id)
        if not profile:
            return result
        for item in profile.metrics:
            mt_id = item.get("metric_type")
            metric_type = MetricType.get_by_id(mt_id) if mt_id else None
            if not metric_type:
                # Metric type is unset or cannot be resolved
                continue
            tp_id = item.get("threshold_profile")
            result[metric_type.name] = MetricConfig(
                metric_type=metric_type,
                enable_box=item.get("enable_box", True),
                enable_periodic=item.get("enable_periodic", True),
                is_stored=item.get("is_stored", True),
                threshold_profile=ThresholdProfile.get_by_id(tp_id) if tp_id else None,
            )
        return result

    @staticmethod
    def config_from_settings(m) -> MetricConfig:
        """
        Convert a profile `.metrics` item into MetricConfig

        :param m: metrics item exposing metric_type, enable_box,
            enable_periodic, is_stored and threshold_profile attributes
        :return: MetricConfig carrying these attributes
        """
        return MetricConfig(
            metric_type=m.metric_type,
            enable_box=m.enable_box,
            enable_periodic=m.enable_periodic,
            is_stored=m.is_stored,
            threshold_profile=m.threshold_profile,
        )

    @classmethod
    @cachetools.cachedmethod(
        operator.attrgetter("_interface_profile_metrics"), lock=lambda _: metrics_lock
    )
    def get_interface_profile_metrics(cls, p_id: ObjectId) -> Dict[str, MetricConfig]:
        """
        Build metric configurations for an interface profile.

        :param p_id: InterfaceProfile id
        :return: metric type name -> MetricConfig; empty dict when the profile
            is not found. Results are cached in _interface_profile_metrics.
        """
        profile = InterfaceProfile.get_by_id(id=p_id)
        if not profile:
            return {}
        return {
            item.metric_type.name: cls.config_from_settings(item) for item in profile.metrics
        }

    @classmethod
    @cachetools.cachedmethod(
        operator.attrgetter("_slaprofile_metrics"), lock=lambda _: metrics_lock
    )
    def get_slaprofile_metrics(cls, p_id: ObjectId) -> Dict[str, MetricConfig]:
        """
        Build metric configurations for an SLA profile.

        :param p_id: SLAProfile id
        :return: metric type name -> MetricConfig; empty dict when the profile
            is not found. Results are cached in _slaprofile_metrics.
        """
        profile = SLAProfile.get_by_id(p_id)
        if not profile:
            return {}
        return {
            item.metric_type.name: cls.config_from_settings(item) for item in profile.metrics
        }

    def get_object_metrics(self):
        """
        Build get_metrics request items for the managed object's own metrics,
        as configured in its object profile.

        Metrics disabled for the current discovery kind (box/periodic) are
        skipped. Each item's config is registered in self.id_metrics.
        :return: list of {"id": ..., "metric": ...} dicts
        """
        # @todo: Inject ManagedObject.effective_labels
        o_metrics = self.get_object_profile_metrics(self.object.object_profile.id)
        self.logger.debug("Object metrics: %s", o_metrics)
        metrics = []
        for metric_name, cfg in o_metrics.items():
            disabled = (self.is_box and not cfg.enable_box) or (
                self.is_periodic and not cfg.enable_periodic
            )
            if disabled:
                continue
            m_id = next(self.id_count)
            metrics.append({"id": m_id, "metric": metric_name})
            self.id_metrics[m_id] = cfg
        if not metrics:
            self.logger.info("Object metrics are not configured. Skipping")
        return metrics

    def get_subinterfaces(self):
        """
        Collect the managed object's subinterfaces grouped by parent interface.

        :return: defaultdict: interface id -> [{"name": ..., "ifindex": ...}, ...]
        """
        collection = SubInterface._get_collection().with_options(
            read_preference=ReadPreference.SECONDARY_PREFERRED
        )
        cursor = collection.find(
            {"managed_object": self.object.id}, {"name": 1, "interface": 1, "ifindex": 1}
        )
        subs = defaultdict(list)
        for doc in cursor:
            subs[doc["interface"]].append({"name": doc["name"], "ifindex": doc.get("ifindex")})
        return subs

    def get_interface_metrics(self):
        """
        Build get_metrics request items for physical interface metrics, plus
        subinterface metrics when the interface profile allows them.

        Interfaces whose profile has no metrics are skipped, as are metrics
        disabled for the current discovery kind (box/periodic). Each item's
        config is registered in self.id_metrics. When the metric's threshold
        profile has a value handler, the interface speed context is stored in
        self.id_ctx for the interface item and each of its subinterface items.
        :return: list of request item dicts
        """
        # @todo: Inject Interface.effective_labels
        subs = None
        metrics = []
        for i in (
            Interface._get_collection()
            .with_options(read_preference=ReadPreference.SECONDARY_PREFERRED)
            .find(
                {"managed_object": self.object.id, "type": "physical"},
                {
                    "_id": 1,
                    "name": 1,
                    "ifindex": 1,
                    "profile": 1,
                    "in_speed": 1,
                    "out_speed": 1,
                    "bandwidth": 1,
                },
            )
        ):
            ipr = self.get_interface_profile_metrics(i["profile"])
            self.logger.debug("Interface %s. ipr=%s", i["name"], ipr)
            if not ipr:
                continue  # No metrics configured
            i_profile = InterfaceProfile.get_by_id(i["profile"])
            if i_profile.allow_subinterface_metrics and subs is None:
                # Resolve subinterfaces
                subs = self.get_subinterfaces()
            ifindex = i.get("ifindex")
            # Speed context for threshold value handlers; depends only on the interface.
            # A missing direction falls back to the other one, and a missing bandwidth
            # falls back to the larger speed
            in_speed: int = i.get("in_speed") or 0
            out_speed: int = i.get("out_speed") or 0
            bandwidth: int = i.get("bandwidth") or 0
            if in_speed and not out_speed:
                out_speed = in_speed
            elif not in_speed and out_speed:
                in_speed = out_speed
            if not bandwidth:
                bandwidth = max(in_speed, out_speed)
            speed_ctx = {"in_speed": in_speed, "out_speed": out_speed, "bandwidth": bandwidth}
            for metric in ipr:
                if (self.is_box and not ipr[metric].enable_box) or (
                    self.is_periodic and not ipr[metric].enable_periodic
                ):
                    continue
                m_id = next(self.id_count)
                m = {"id": m_id, "metric": metric, "labels": [f"noc::interface::{i['name']}"]}
                if ifindex is not None:
                    m["ifindex"] = ifindex
                metrics += [m]
                self.id_metrics[m_id] = ipr[metric]
                # Ids of every request item of this metric on this interface
                ctx_ids = [m_id]
                if i_profile.allow_subinterface_metrics:
                    for si in subs[i["_id"]]:
                        if si["name"] == i["name"]:
                            # Same-named subinterface is already covered by the interface item
                            continue
                        s_id = next(self.id_count)
                        sm = {
                            "id": s_id,
                            "metric": metric,
                            "labels": [
                                f"noc::interface::{i['name']}",
                                f"noc::subinterface::{si['name']}",
                            ],
                        }
                        if si["ifindex"] is not None:
                            sm["ifindex"] = si["ifindex"]
                        metrics += [sm]
                        self.id_metrics[s_id] = ipr[metric]
                        ctx_ids += [s_id]
                threshold_profile = ipr[metric].threshold_profile
                if threshold_profile and threshold_profile.value_handler:
                    # Fill window context for the interface item, not only the last
                    # subinterface item, and for each subinterface item as well
                    for ctx_id in ctx_ids:
                        self.id_ctx[ctx_id] = dict(speed_ctx)
        if not metrics:
            self.logger.info("Interface metrics are not configured. Skipping")
        return metrics

    def get_sla_metrics(self):
        """
        Build get_metrics request items for SLA probe metrics.

        Probes without a profile, in a missing or non-productive workflow
        state, or whose SLA profile has no metrics are skipped, as are metrics
        disabled for the current discovery kind (box/periodic). Each item's
        config is registered in self.id_metrics and the probe bi_id in
        self.sla_probe_metrics.
        :return: list of request item dicts
        """
        if not self.has_any_capability(self.SLA_CAPS):
            # NOTE(review): despite the message, probes are still processed below.
            # Confirm whether an early return is intended (SLA_CAPS lists Cisco only)
            self.logger.info("SLA not configured, skipping SLA metrics")
        metrics = []
        for p in (
            SLAProbe._get_collection()
            .with_options(read_preference=ReadPreference.SECONDARY_PREFERRED)
            .find(
                {"managed_object": self.object.id},
                {"name": 1, "state": 1, "group": 1, "profile": 1, "type": 1, "bi_id": 1},
            )
        ):
            if not p.get("profile"):
                self.logger.debug("Probe %s has no profile. Skipping", p["name"])
                continue
            # A probe without a resolvable state is treated as not productive
            state = State.get_by_id(p["state"]) if p.get("state") else None
            if not state or not state.is_productive:
                self.logger.debug("[%s] SLA Probe is not productive state. Skipping", p["name"])
                continue
            pm = self.get_slaprofile_metrics(p["profile"])
            if not pm:
                self.logger.debug(
                    "Probe %s has profile '%s' with no configured metrics. Skipping",
                    p["name"],
                    p["profile"],
                )
                continue
            # Labels are identical for every metric of the probe
            probe_labels = [f"noc::sla::name::{p['name']}"]
            sla_group = p.get("group", "")
            if sla_group:
                probe_labels += [f"noc::sla::group::{sla_group}"]
            for metric in pm:
                if (self.is_box and not pm[metric].enable_box) or (
                    self.is_periodic and not pm[metric].enable_periodic
                ):
                    continue
                m_id = next(self.id_count)
                metrics += [
                    {
                        "id": m_id,
                        "metric": metric,
                        # Separate list per item, as items must not share label lists
                        "labels": list(probe_labels),
                        "sla_type": p["type"],
                    }
                ]
                self.id_metrics[m_id] = pm[metric]
                self.sla_probe_metrics[m_id] = p["bi_id"]
        if not metrics:
            self.logger.info("SLA metrics are not configured. Skipping")
        return metrics

    def get_sensor_metrics(self):
        """
        Build get_metrics request items for SNMP-pollable sensors of the
        inventory objects returned by Object.get_managed() for this object.

        Each eligible sensor yields one item per sensor metric type
        ("Sensor | Value", "Sensor | Status"), polled by its snmp_oid.
        Sensors without a profile, with an unknown profile or collection
        disabled, or in a missing or non-productive workflow state are skipped.
        Each item's config is registered in self.id_metrics and the sensor
        bi_id in self.sensors_metrics.
        :return: list of request item dicts
        """
        metrics = []
        # Resolve sensor metric types once instead of per sensor
        sensor_mtypes = []
        for mt_name in ("Sensor | Value", "Sensor | Status"):
            mt = MetricType.get_by_name(mt_name)
            if mt:
                sensor_mtypes += [mt]
            else:
                self.logger.error("Metric type '%s' is not found. Skipping", mt_name)
        if not sensor_mtypes:
            return metrics
        o = Object.get_managed(self.object).values_list("id")
        for s in (
            Sensor._get_collection()
            .with_options(read_preference=ReadPreference.SECONDARY_PREFERRED)
            .find(
                {"object": {"$in": list(o)}, "snmp_oid": {"$exists": True}},
                {"local_id": 1, "profile": 1, "state": 1, "snmp_oid": 1, "labels": 1, "bi_id": 1},
            )
        ):
            if not s.get("profile"):
                self.logger.debug("[%s] Sensor has no profile. Skipping", s["local_id"])
                continue
            pm: "SensorProfile" = SensorProfile.get_by_id(s["profile"])
            if not pm or not pm.enable_collect:
                continue
            # A sensor without a resolvable state is treated as not productive
            state = State.get_by_id(s["state"]) if s.get("state") else None
            if not state or not state.is_productive:
                self.logger.debug("[%s] Sensor is not productive state. Skipping", s["local_id"])
                continue
            for metric in sensor_mtypes:
                m_id = next(self.id_count)
                # Stored labels may be absent or null
                labels = [f'noc::sensor::{s["local_id"]}'] + (s.get("labels") or [])
                metrics += [
                    {
                        "id": m_id,
                        "metric": metric.name,
                        "labels": labels,
                        "oid": s["snmp_oid"],
                    }
                ]
                self.id_metrics[m_id] = MetricConfig(metric, False, True, True, None)
                self.sensors_metrics[m_id] = int(s["bi_id"])
        return metrics

    def process_result(self, result: List[MData]):
        """
        Process IGetMetrics result

        Converts counter/delta/bool/gauge readings into absolute values,
        groups values of stored metrics into per-table records and evaluates
        or clears thresholds. Counter state is kept in job context "counters".
        :param result: measurements returned by get_metrics
        :return: tuple of (number of values scheduled for storage,
            table name -> item hash -> record dict, alarms, events)
        """
        # Restore last counter state
        if self.has_artefact("reboot"):
            self.logger.info("Resetting counter context due to detected reboot")
            self.job.context["counters"] = {}
        counters = self.job.context["counters"]
        alarms = []
        events = []
        data = defaultdict(dict)  # table -> item hash -> {field:value, ...}
        n_metrics = 0
        mo_id = self.object.bi_id
        ts_cache = {}  # timestamp -> (date, ts)
        # Calculate time_delta
        time_delta = self.job.context.get("time_delta", None)
        if time_delta:
            del self.job.context["time_delta"]  # Remove from context
        if time_delta and time_delta > 0xFFFF:
            self.logger.info(
                "time_delta overflow (%d). time_delta measurement will be dropped", time_delta
            )
            time_delta = None
        # Process collected metrics
        seen: Set[str] = set()
        for m in result:
            cfg = self.id_metrics.get(m.id)
            if cfg is None:
                # Id was not issued by this check: no config to process it with
                self.logger.error("[%s] Unknown metric id %s. Ignoring", m.label, m.id)
                continue
            labels = m.labels
            # Filter out duplicates
            key = m.get_key(cfg.metric_type.bi_id)
            if key in seen:
                # Prevent duplicated metrics
                self.logger.error(
                    "Duplicated metric %s [%s]. Ignoring",
                    cfg.metric_type.name,
                    "|".join(str(label) for label in sorted(labels or [])),
                )
                continue
            seen.add(key)
            if m.type in MT_COUNTER_DELTA:
                # Counter type
                # Restore old value and save new
                r = counters.get(key)
                counters[key] = (m.ts, m.value)
                if r is None:
                    # No stored state
                    self.logger.debug(
                        "[%s] COUNTER value is not found. Storing and waiting for a new result",
                        m.label,
                    )
                    continue
                # Calculate counter
                self.logger.debug(
                    "[%s] Old value: %s@%s, new value: %s@%s.", m.label, r[1], r[0], m.value, m.ts
                )
                if m.type == MT_COUNTER:
                    cv = self.convert_counter(m, r)
                else:
                    cv = self.convert_delta(m, r)
                if cv is None:
                    # Counter stepback or other errors
                    # Remove broken value
                    self.logger.debug(
                        "[%s] Counter stepback from %s@%s to %s@%s: Skipping",
                        m.label,
                        r[1],
                        r[0],
                        m.value,
                        m.ts,
                    )
                    del counters[key]
                    continue
                m.value = cv
                m.abs_value = cv * m.scale
            elif m.type == MT_BOOL:
                # Convert boolean type
                m.abs_value = 1 if m.value else 0
            else:
                # Gauge
                m.abs_value = m.value * m.scale
            self.logger.debug(
                "[%s] Measured value: %s. Scale: %s. Resulting value: %s",
                m.label,
                m.value,
                m.scale,
                m.abs_value,
            )
            # Schedule to store
            if cfg.is_stored:
                tsc = ts_cache.get(m.ts)
                if not tsc:
                    lt = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(m.ts // 1000000000))
                    tsc = (lt.split(" ")[0], lt)
                    ts_cache[m.ts] = tsc
                if labels:
                    item_hash = hash_str(str((tsc[1], mo_id, labels)))
                else:
                    item_hash = hash_str(str((tsc[1], mo_id)))
                table = cfg.metric_type.scope.table_name
                record = data[table].get(item_hash)
                if not record:
                    record = {"date": tsc[0], "ts": tsc[1], "managed_object": mo_id}
                    if labels:
                        record["labels"] = labels
                    if m.id in self.sensors_metrics:
                        record["sensor"] = self.sensors_metrics[m.id]
                    if m.id in self.sla_probe_metrics:
                        record["sla_probe"] = self.sla_probe_metrics[m.id]
                    data[table][item_hash] = record
                field = cfg.metric_type.field_name
                try:
                    record[field] = cfg.metric_type.clean_value(m.abs_value)
                except ValueError as e:
                    self.logger.info("[%s] Cannot clean value %s: %s", m.label, m.abs_value, e)
                    continue
                # Attach time_delta, when required
                if time_delta and cfg.metric_type.scope.enable_timedelta:
                    record["time_delta"] = time_delta
                n_metrics += 1
            # Metrics path
            path = m.metric
            if m.labels:
                m_path = " | ".join(sorted(m.labels))
                if not path.endswith(m_path):
                    path = "%s | %s" % (path, m_path)
            if cfg.threshold_profile and m.abs_value is not None:
                alarm, event = self.process_thresholds(m, cfg, path, labels)
                alarms += alarm
                events += event
            elif self.job.context["active_thresholds"].get(path):
                alarm, event = self.clear_process_thresholds(m, cfg, path, labels)
                alarms += alarm
                events += event
            else:
                # No thresholds to check: drop the stale window state
                # (`key` is the window state key computed above)
                if self.job.context["metric_windows"].get(key):
                    del self.job.context["metric_windows"][key]
        return n_metrics, data, alarms, events

    def handler(self):
        """
        Discovery check entry point: build the get_metrics request, run the
        script, process the results, spool stored values and update the
        threshold alarms umbrella.
        """
        self.logger.info("Collecting metrics")
        # Build get_metrics input parameters
        metrics = (
            self.get_object_metrics()
            + self.get_interface_metrics()
            + self.get_sla_metrics()
            + self.get_sensor_metrics()
        )
        if not metrics:
            self.logger.info("No metrics configured. Skipping")
            return
        # Collect metrics, remembering the interval since the previous run
        ctx = self.job.context
        now = time.time()
        if "last_run" in ctx and ctx["last_run"] < now:
            ctx["time_delta"] = int(round(now - ctx["last_run"]))
        ctx["last_run"] = now
        self.logger.debug("Collecting metrics: %s", metrics)
        raw = self.object.scripts.get_metrics(metrics=metrics)
        result = [MData(**item) for item in raw]
        if not result:
            self.logger.info("No metrics found")
            return
        # Process results
        n_metrics, data, alarms, events = self.process_result(result)
        # Send metrics
        if n_metrics:
            self.logger.info("Spooling %d metrics", n_metrics)
            for table, records in data.items():
                self.service.register_metrics(table, list(records.values()), key=self.object.id)
        # Set up threshold alarms
        self.logger.info("%d alarms detected", len(alarms))
        if events:
            self.logger.info("%d events detected", len(events))
        self.job.update_umbrella(self.get_ac_pm_thresholds(), alarms)

    def convert_delta(self, m, r):
        """
        Calculate value from delta, gently handling overflows

        When the new value is lower than the old one, the counter width is
        guessed from the old value (MAX31/MAX32/MAX64). The decrease is then
        treated as a wrap or a stepback, whichever distance is shorter.

        :param m: MData with the new value
        :param r: Old state (ts, value)
        :return: non-negative difference between new and old values,
            or None on a suspected counter stepback
        """
        old_value = r[1]
        if m.value >= old_value:
            return m.value - old_value
        # Counter decreased, either due to wrap or stepback
        if old_value <= MAX31:
            mc = MAX31
        elif old_value <= MAX32:
            mc = MAX32
        else:
            mc = MAX64
        # Direct distance
        d_direct = old_value - m.value
        # Wrap distance: old -> mc, one step from mc to 0, then 0 -> new
        d_wrap = m.value + (mc - old_value) + 1
        if d_direct < d_wrap:
            # Possible counter stepback
            # Skip value
            self.logger.debug("[%s] Counter stepback: %s -> %s", m.label, r[1], m.value)
            return None
        # Counter wrap
        self.logger.debug("[%s] Counter wrap: %s -> %s", m.label, r[1], m.value)
        return d_wrap

    def convert_counter(self, m, r):
        """
        Calculate value from counter, gently handling overflows

        :param m: MData with the new value
        :param r: Old state (ts, value)
        :return: counter delta per second (timestamps are divided by NS),
            or None when it cannot be calculated: counter stepback, or the
            new timestamp is not later than the stored one
        """
        dt = (float(m.ts) - float(r[0])) / NS
        if dt <= 0:
            # Equal or decreasing timestamps: the rate is undefined
            self.logger.debug("[%s] Non-increasing timestamp: %s -> %s", m.label, r[0], m.ts)
            return None
        delta = self.convert_delta(m, r)
        if delta is None:
            return None
        return float(delta) / dt

    def get_window_function(self, m: MData, cfg: MetricConfig) -> Optional[Any]:
        """
        Push metric value into its threshold window and apply the window function

        The value (m.abs_value, optionally transformed by the threshold
        profile's value handler) is appended to the per-metric window stored
        in self.job.context["metric_windows"]. The window is trimmed according
        to the threshold profile's window type and size; once it is full,
        the profile's window function is applied to it.

        :param m: Metric data
        :param cfg: MetricConfig
        :return: Window function result, or None when the window is not
            filled yet, the window type or window function is invalid,
            or the window function raised ValueError
        """
        profile = cfg.threshold_profile
        # Build window state key
        key = m.get_key(cfg.metric_type.bi_id)
        states = self.job.context["metric_windows"]
        value = m.abs_value
        if profile.value_handler:
            if profile.value_handler.allow_threshold_value_handler:
                vh = profile.value_handler.get_handler()
                if vh:
                    ctx = self.id_ctx.get(m.id) or {}
                    try:
                        value = vh(value, **ctx)
                    except Exception as e:
                        # Best effort: keep the untransformed value
                        self.logger.error(
                            "Failed to execute value handler %s: %s",
                            profile.value_handler,
                            e,
                        )
            else:
                self.logger.warning("Value Handler is not allowed for Thresholds")
        # Timestamp in seconds (m.ts presumably in nanoseconds)
        ts = m.ts // 1000000000
        window_type = profile.window_type
        ws = profile.window
        if window_type == "m" and ws == 1:
            # Do not store single-value windows
            window = [(ts, value)]
            window_full = True
            if key in states:
                del states[key]
        elif window_type == WT_MEASURES:
            # Leave fixed amount of measures.
            # A new list is built so stored state is only replaced, never
            # mutated in place.
            window = (states.get(key, []) + [(ts, value)])[-ws:]
            window_full = len(window) == ws
            states[key] = window
        elif window_type == WT_TIME:
            # Time-based window of `ws` seconds
            window = states.get(key, []) + [(ts, value)]
            # Full when the oldest sample is at least `ws` seconds old and the
            # previous sample (the only one for a fresh window) is within `ws`
            window_full = ts - window[0][0] >= ws >= ts - window[-2:][0][0]
            # Drop leading samples older than `ws` seconds
            start = 0
            while ts - window[start][0] > ws:
                start += 1
            window = window[start:]
            states[key] = window
        else:
            # Leave stored state untouched on misconfiguration; mutating it
            # here would grow the window by one sample per call forever
            self.logger.error(
                "Cannot calculate thresholds for %s (%s): Invalid window type '%s'",
                m.metric,
                m.labels,
                window_type,
            )
            return None
        if not window_full:
            self.logger.error(
                "Cannot calculate thresholds for %s (%s): Window is not filled", m.metric, m.labels
            )
            return None
        # Process window function
        wf = profile.get_window_function()
        if not wf:
            self.logger.error(
                "Cannot calculate thresholds for %s (%s): Invalid window function %s",
                m.metric,
                m.labels,
                profile.window_function,
            )
            return None
        try:
            return wf(window, profile.window_config)
        except ValueError as e:
            self.logger.error("Cannot calculate thresholds for %s (%s): %s", m.metric, m.labels, e)
            return None

720
    def clear_process_thresholds(self, m, cfg, path, labels=None):
        """
        Close the active threshold of a metric path unconditionally

        Removes the active threshold record for `path` and the metric's
        threshold window from the job context, builds the close event (when
        the threshold has a close event class) and calls the close handler
        (when configured and allowed for thresholds).

        :param m: Metric data
        :param cfg: MetricConfig
        :param path: Metric path
        :param labels: Metric labels
        :return: Tuple of (umbrella alarm details, event configs); the alarm
            list is always empty here
        """
        alarms = []
        events = []
        active_thresholds = self.job.context["active_thresholds"]
        active = active_thresholds.get(path)
        if not active:
            # No active threshold for this path: nothing to clear
            return alarms, events
        key = m.get_key(cfg.metric_type.bi_id)
        threshold_profile = active["threshold_profile"]
        threshold = threshold_profile.find_threshold(active["threshold"])
        if not threshold:
            # NOTE(review): threshold was reconfigured or deleted; the active
            # record is left as-is here (process_thresholds handles this case)
            return alarms, events
        # Close Event
        self.logger.debug("Remove active thresholds %s from metric %s", active, path)
        del active_thresholds[path]
        self.job.context["metric_windows"].pop(key, None)
        if threshold.close_event_class:
            # Pass the same sensor/SLA probe context as process_thresholds does
            events += self.get_event_cfg(
                cfg,
                threshold_profile,
                threshold.name,
                threshold.close_event_class.name,
                path,
                m.value,
                labels=labels,
                sensor=self.sensors_metrics.get(m.id),
                sla_probe=self.sla_probe_metrics.get(m.id),
            )
        if threshold.close_handler:
            if threshold.close_handler.allow_threshold:
                handler = threshold.close_handler.get_handler()
                if handler:
                    try:
                        handler(self, cfg, threshold, m.value)
                    except Exception as e:
                        self.logger.error("Exception when calling close handler: %s", e)
            else:
                self.logger.warning("Handler is not allowed for Thresholds")
        return alarms, events

768
769
770
    def process_thresholds(
        self, m: MData, cfg: MetricConfig, path: str, labels: Optional[List[str]] = None
    ):
        """
        Check metric value against the threshold profile and update threshold state

        The window function result (or m.abs_value when the window is not
        filled but a threshold is already active) is matched against the
        profile's thresholds. The active threshold of `path` is kept in
        self.job.context["active_thresholds"]. Opening, switching and clearing
        it builds open/close events, calls open/close handlers and collects
        umbrella alarm details.

        :param m: Metric data
        :param cfg: MetricConfig
        :param path: Metric path
        :param labels: Metric labels
        :return: Tuple of (umbrella alarm details, event configs)
        """
        alarms = []
        events = []
        active_thresholds = self.job.context["active_thresholds"]
        # Get active threshold name
        active = active_thresholds.get(path)
        # Check if profile has configured thresholds
        if not cfg.threshold_profile.thresholds and not active:
            return alarms, events
        w_value = self.get_window_function(m, cfg)
        if w_value is None:
            if not active:
                return alarms, events
            # Window is not filled: check the active threshold against raw value
            w_value = m.abs_value
        sensor = self.sensors_metrics.get(m.id)
        sla_probe = self.sla_probe_metrics.get(m.id)
        if active:
            # First threshold that opens on the current value, if any
            new_threshold = next(
                (th for th in cfg.threshold_profile.thresholds if th.is_open_match(w_value)),
                None,
            )
            threshold = cfg.threshold_profile.find_threshold(active["threshold"])
            if not threshold:
                # Threshold has been reconfigured or deleted.
                # Checked first: the branches below dereference `threshold`.
                if active.get("close_event_class"):
                    events += self.get_event_cfg(
                        cfg,
                        active["threshold_profile"],
                        active["threshold"],
                        active["close_event_class"].name,
                        path,
                        w_value,
                        labels=labels,
                        sensor=sensor,
                        sla_probe=sla_probe,
                    )
                if active.get("close_handler"):
                    # NOTE(review): the handler receives the threshold *name*
                    # here, not a threshold object as in the other branches
                    self._call_threshold_handler(
                        active["close_handler"], cfg, active["threshold"], w_value, "close"
                    )
                active = None
                del active_thresholds[path]
            elif new_threshold and threshold != new_threshold:
                # Switching to another threshold: close the active one
                active = None  # Reset threshold
                del active_thresholds[path]
                if threshold.close_event_class:
                    events += self.get_event_cfg(
                        cfg,
                        cfg.threshold_profile,
                        threshold.name,
                        threshold.close_event_class.name,
                        path,
                        w_value,
                        labels=labels,
                        sensor=sensor,
                        sla_probe=sla_probe,
                    )
                if threshold.close_handler:
                    self._call_threshold_handler(
                        threshold.close_handler, cfg, threshold, w_value, "close"
                    )
                elif threshold.alarm_class:
                    # Remain umbrella alarm
                    # NOTE(review): tied to the absence of a close handler, and
                    # keeps the old threshold's alarm while the new threshold is
                    # opened below; confirm this is intended
                    alarms += self.get_umbrella_alarm_cfg(
                        cfg,
                        threshold,
                        path,
                        w_value,
                        labels=labels,
                        sensor=sensor,
                        sla_probe=sla_probe,
                    )
            else:
                if threshold.is_clear_match(w_value):
                    # Close Event
                    active = None  # Reset threshold
                    del active_thresholds[path]
                    if threshold.close_event_class:
                        events += self.get_event_cfg(
                            cfg,
                            cfg.threshold_profile,
                            threshold.name,
                            threshold.close_event_class.name,
                            path,
                            w_value,
                            labels=labels,
                            sensor=sensor,
                            sla_probe=sla_probe,
                        )
                    if threshold.close_handler:
                        self._call_threshold_handler(
                            threshold.close_handler, cfg, threshold, w_value, "close"
                        )
                if threshold.alarm_class:
                    # Remain umbrella alarm
                    # NOTE(review): also reached when the threshold was just
                    # cleared above; confirm the alarm should be kept then
                    alarms += self.get_umbrella_alarm_cfg(
                        cfg,
                        threshold,
                        path,
                        w_value,
                        labels=labels,
                        sensor=sensor,
                        sla_probe=sla_probe,
                    )
        if not active:
            # Check opening thresholds only if no active threshold remains
            for threshold in cfg.threshold_profile.thresholds:
                if not threshold.is_open_match(w_value):
                    continue
                # Set context
                active_thresholds[path] = {
                    "threshold": threshold.name,
                    "threshold_profile": cfg.threshold_profile,
                    "close_event_class": threshold.close_event_class,
                    "close_handler": threshold.close_handler,
                }
                if threshold.open_event_class:
                    # Raise Event
                    events += self.get_event_cfg(
                        cfg,
                        cfg.threshold_profile,
                        threshold.name,
                        threshold.open_event_class.name,
                        path,
                        w_value,
                        labels=labels,
                        sensor=sensor,
                        sla_probe=sla_probe,
                    )
                if threshold.open_handler:
                    self._call_threshold_handler(
                        threshold.open_handler, cfg, threshold, w_value, "open"
                    )
                if threshold.alarm_class:
                    # Raise umbrella alarm
                    alarms += self.get_umbrella_alarm_cfg(
                        cfg,
                        threshold,
                        path,
                        w_value,
                        labels=labels,
                        sensor=sensor,
                        sla_probe=sla_probe,
                    )
                break
        return alarms, events

    def _call_threshold_handler(self, handler_ref, cfg, threshold, value, kind):
        """
        Call a threshold open/close handler if it is allowed for thresholds

        Handler exceptions are logged and swallowed (best effort).

        :param handler_ref: Handler reference (open_handler/close_handler)
        :param cfg: MetricConfig
        :param threshold: Threshold passed to the handler
        :param value: Checked value passed to the handler
        :param kind: "open" or "close", used in the error message only
        """
        if not handler_ref.allow_threshold:
            self.logger.warning("Handler is not allowed for Thresholds")
            return
        handler = handler_ref.get_handler()
        if not handler:
            return
        try:
            handler(self, cfg, threshold, value)
        except Exception as e:
            self.logger.error("Exception when calling %s handler: %s", kind, e)
949

950
951
952
    def get_umbrella_alarm_cfg(
        self, metric_config, threshold, path, value, labels=None, sensor=None, sla_probe=None
    ):
953
954
955
        """
        Get configuration for umbrella alarm
        :param threshold:
956
        :param path:
957
958
        :param metric_config:
        :param value:
959
960
961
        :param labels:
        :param sensor:
        :param sla_probe:
962
963
        :return: List of dicts or empty list
        """
964
        self.logger.info("Get Umbrella Alarm CFG: %s", metric_config)
965
        alarm_cfg = {
Dmitry Volodin's avatar
Fix    
Dmitry Volodin committed
966
            "alarm_class": threshold.alarm_class,
967
968
969
970
            "path": path,
            "severity": threshold.alarm_class.default_severity.severity,
            "vars": {
                "path": path,
Andrey Vertiprahov's avatar