Commit aacb96a3 authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

Merge branch 'noc-1683' into 'master'

#1683 Migrate update_alarms to dispose protocol.

See merge request !5887
parents 1a3fbf20 3f930765
Pipeline #34260 passed with stages
in 46 minutes and 15 seconds
......@@ -20,11 +20,13 @@ from mongoengine.fields import (
)
from jinja2 import Template
import cachetools
from typing import Iterable
# NOC modules
from noc.core.mongo.fields import PlainReferenceField
from noc.core.model.decorator import on_delete_check
from noc.fm.models.alarmclass import AlarmClass
from noc.core.models.problem import ProblemItem
from .confdbquery import ConfDBQuery
id_lock = threading.Lock()
......@@ -67,7 +69,7 @@ class InterfaceValidationPolicy(Document):
def get_by_id(cls, id):
return InterfaceValidationPolicy.objects.filter(id=id).first()
def iter_problems(self, engine, ifname):
def iter_problems(self, engine, ifname: str) -> Iterable[ProblemItem]:
"""
Check rules against ConfDB engine
......@@ -92,12 +94,12 @@ class InterfaceValidationPolicy(Document):
path = [ifname]
if rule.error_code:
path += [rule.error_code]
problem = {
"alarm_class": rule.alarm_class.name if rule.alarm_class else None,
"path": path,
"message": tpl.render(ctx),
"code": rule.error_code or None,
}
yield problem
yield ProblemItem(
alarm_class=rule.alarm_class.name if rule.alarm_class else None,
path=path,
message=tpl.render(ctx),
code=rule.error_code or None,
vars={"interface": ifname},
)
if rule.is_fatal:
return
......@@ -20,11 +20,13 @@ from mongoengine.fields import (
)
from jinja2 import Template
import cachetools
from typing import Iterable
# NOC modules
from noc.core.mongo.fields import PlainReferenceField
from noc.core.model.decorator import on_delete_check
from noc.fm.models.alarmclass import AlarmClass
from noc.core.models.problem import ProblemItem
from .confdbquery import ConfDBQuery
id_lock = threading.Lock()
......@@ -63,7 +65,7 @@ class ObjectValidationPolicy(Document):
def get_by_id(cls, id):
return ObjectValidationPolicy.objects.filter(id=id).first()
def iter_problems(self, engine):
def iter_problems(self, engine) -> Iterable[ProblemItem]:
"""
Check rules against ConfDB engine
......@@ -87,12 +89,11 @@ class ObjectValidationPolicy(Document):
path = []
if rule.error_code:
path += [rule.error_code]
problem = {
"alarm_class": rule.alarm_class.name if rule.alarm_class else None,
"path": path,
"message": tpl.render(ctx),
"code": rule.error_code or None,
}
yield problem
yield ProblemItem(
alarm_class=rule.alarm_class.name if rule.alarm_class else None,
path=path,
message=tpl.render(ctx),
code=rule.error_code or None,
)
if rule.is_fatal:
return
......@@ -4,13 +4,15 @@
"uuid": "581b6801-6606-48ff-bcdf-fac8d988409a",
"description": "Base Alarm Class for umbrella alarms.",
"is_unique": true,
"reference": [],
"is_ephemeral": false,
"user_clearable": true,
"default_severity__name": "WARNING",
"vars": [
],
"subject_template": "Configuration Policy Violations",
"body_template": "When checking the configuration, discrepancies were found with the reference settings:\n \n{% for aa in alarm.iter_consequences %}\n==============\n {{ aa.body }}\n==============\n{% endfor %}",
"body_template": "When checking the configuration, discrepancies were found with the reference settings:\n \n{% for aa in alarm.iter_grouped %}\n==============\n {{ aa.body }}\n==============\n{% endfor %}",
"symptoms": "",
"probable_causes": "New changes or rolling back previous configuration.",
"recommended_actions": "Go to the hardware and analyze the configuration.",
......
......@@ -4,13 +4,13 @@
"uuid": "bcb92084-18ed-4229-9a1f-f85fc80c6a96",
"description": "Base Alarm Class for Box discovery alarms.",
"is_unique": true,
"reference": [],
"is_ephemeral": true,
"user_clearable": true,
"default_severity__name": "WARNING",
"vars": [
],
"vars": [],
"subject_template": "Umbrella for Box discovery alarms",
"body_template": "Alarms on Box discovery processed\n \n{% for aa in alarm.iter_consequences %}\n==============\n {{ aa.body }}\n==============\n{% endfor %}",
"body_template": "Alarms on Box discovery processed\n \n{% for aa in alarm.iter_grouped %}\n==============\n {{ aa.body }}\n==============\n{% endfor %}",
"symptoms": "",
"probable_causes": "Various errors connecting to the equipment",
"recommended_actions": "Connection error - check telnet/ssh port availability on device\nAuthentication error - check allow access to device with credentials\nSNMP error - check SNMP community settings on device",
......
# ----------------------------------------------------------------------
# Job Problem DataClass
# ----------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from dataclasses import dataclass, field
# Third-party modules
from typing import List, Dict, Optional, Any
@dataclass(frozen=True)
class ProblemItem(object):
alarm_class: Optional[str]
message: str = ""
path: List[str] = field(default_factory=list)
labels: List[str] = field(default_factory=list)
fatal: bool = False
vars: Dict[str, Any] = field(default_factory=dict)
code: Optional[str] = None
check: Optional[str] = None
......@@ -835,6 +835,14 @@ class ActiveAlarm(Document):
yield a
yield from a.iter_consequences()
def iter_grouped(self):
"""
Generator yielding all alarm in group
"""
for a in ActiveAlarm.objects.filter(groups__in=[self.reference]):
yield a
yield from a.iter_grouped()
def iter_affected(self):
"""
Generator yielding all affected managed objects
......
......@@ -90,6 +90,7 @@ from noc.core.confdb.engine.base import Engine
from noc.core.comp import smart_text, smart_bytes
from noc.main.models.glyph import Glyph
from noc.core.topology.types import ShapeOverlayPosition, ShapeOverlayForm
from noc.core.models.problem import ProblemItem
from .administrativedomain import AdministrativeDomain
from .authprofile import AuthProfile
from .managedobjectprofile import ManagedObjectProfile
......@@ -149,6 +150,7 @@ logger = logging.getLogger(__name__)
("inv.DiscoveryID", "object"),
("inv.Sensor", "managed_object"),
("sa.ObjectCapabilities", "object"),
("sa.ObjectData", "_id"),
],
clean=[("ip.Address", "managed_object"), ("sa.Service", "managed_object")],
)
......@@ -1243,7 +1245,7 @@ class ManagedObject(NOCModel):
return False
return True
def iter_validation_problems(self, changed):
def iter_validation_problems(self, changed: bool) -> Iterable[ProblemItem]:
"""
Yield validation problems
......@@ -1913,6 +1915,17 @@ class ManagedObject(NOCModel):
pool = self.get_effective_fm_pool().name
return "events.%s" % pool, 0
@property
def alarms_stream_and_partition(self) -> Tuple[str, int]:
"""
Return publish stream and partition for alarms
:return: stream name, partition
"""
# @todo: Calculate partition properly
fm_pool = self.get_effective_fm_pool().name
stream = f"dispose.{fm_pool}"
return stream, 0
@classmethod
def iter_effective_labels(cls, instance: "ManagedObject") -> Iterable[List[str]]:
yield list(instance.labels or [])
......
......@@ -28,5 +28,5 @@ class EnsureGroupRequest(BaseModel):
reference: str
name: Optional[str]
alarm_class: Optional[str]
labels: Optional[str]
labels: Optional[List[str]]
alarms: List[AlarmItem]
......@@ -14,6 +14,7 @@ from collections import defaultdict
import threading
from typing import Union, Any, Iterable, Optional, Dict, List, Set
import operator
from itertools import chain
from hashlib import sha512
# Third-party modules
......@@ -462,7 +463,7 @@ class CorrelatorService(TornadoService):
remote_system=remote_system,
remote_id=remote_id,
)
a.effective_labels = a.iter_effective_labels(a)
a.effective_labels = list(chain.from_iterable(ActiveAlarm.iter_effective_labels(a)))
a.raw_reference = reference
# Static groups
alarm_groups: Dict[str, GroupItem] = {}
......@@ -750,7 +751,7 @@ class CorrelatorService(TornadoService):
vars=req.vars,
reference=req.reference,
groups=groups,
labels=req.labels,
labels=req.labels or [],
remote_system=remote_system,
remote_id=req.remote_id if remote_system else None,
)
......@@ -830,7 +831,7 @@ class CorrelatorService(TornadoService):
alarm_class=alarm_class,
vars={"name": req.name or "Group"},
reference=req.reference,
labels=req.labels,
labels=req.labels or [],
)
# Fetch all open alarms in group
open_alarms: Dict[bytes, ActiveAlarm] = {
......
......@@ -2,7 +2,7 @@
# ---------------------------------------------------------------------
# Basic MO discovery job
# ---------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
......@@ -19,12 +19,14 @@ from time import perf_counter
# Third-party modules
import bson
import cachetools
import orjson
from pymongo import UpdateOne
from typing import List, Dict
from builtins import str, object
# NOC modules
from noc.core.scheduler.periodicjob import PeriodicJob
from noc.core.models.problem import ProblemItem
from noc.sa.models.managedobject import ManagedObject
from noc.inv.models.subinterface import SubInterface
from noc.inv.models.interfaceprofile import InterfaceProfile
......@@ -63,7 +65,7 @@ class MODiscoveryJob(PeriodicJob):
self.out_buffer = StringIO()
self.logger = PrefixLoggerAdapter(self.logger, "", target=self.out_buffer)
self.check_timings = []
self.problems = []
self.problems: List[ProblemItem] = []
self.caps = None
self.has_fatal_error = False
self.service = self.scheduler.service
......@@ -78,18 +80,23 @@ class MODiscoveryJob(PeriodicJob):
)
super().schedule_next(status)
# Update alarm statuses
self.update_alarms()
# Clean up all open alarms as they has been disabled
self.update_alarms(self.problems if self.get_umbrella_settings() else [], self.umbrella_cls)
# Write job log
key = "discovery-%s-%s" % (self.attrs[self.ATTR_CLASS], self.attrs[self.ATTR_KEY])
problems = {}
for p in list(self.problems):
if p["check"] not in problems:
problems[p["check"]] = defaultdict(str)
if p["path"]:
problems[p["check"]][p["path"]] = p["message"]
if not p.check:
# Not Discovery problem
continue
path = " | ".join(p.path)
if p.check not in problems:
problems[p.check] = defaultdict(str)
if p.path:
problems[p.check][path] = p.message
else:
# p["path"] == ""
problems[p["check"]][p["path"]] += "; %s" % p["message"]
problems[p.check][path] += "; %s" % p.message
get_db()["noc.joblog"].update(
{"_id": key},
{
......@@ -147,15 +154,17 @@ class MODiscoveryJob(PeriodicJob):
kwargs,
)
self.problems += [
{
"check": check,
"alarm_class": alarm_class,
# in MongoDB Key must be string
"path": str(path) if path else "",
"message": message,
"fatal": fatal,
"vars": kwargs,
}
ProblemItem(
**{
"check": check,
"alarm_class": alarm_class,
# in MongoDB Key must be string
"path": [str(path)] if path else [],
"message": message,
"fatal": fatal,
"vars": kwargs,
}
)
]
if fatal:
self.has_fatal_error = True
......@@ -286,43 +295,67 @@ class MODiscoveryJob(PeriodicJob):
if umbrella and umbrella_changed:
AlarmEscalation.watch_escalations(umbrella)
def update_alarms(self):
def update_alarms(
self, problems: List[ProblemItem], group_cls: str = None, group_reference: str = None
):
# @todo Save reference to job context
self.logger.info("Updating alarm statuses")
group_cls = AlarmClass.get_by_name(group_cls or "Group")
if not group_cls:
self.logger.info("No umbrella alarm class. Alarm statuses not updated")
return
details = []
now = datetime.datetime.now()
for p in problems:
if not p.alarm_class:
continue
ac = AlarmClass.get_by_name(p.alarm_class)
if not ac:
self.logger.info("Unknown alarm class %s. Skipping", p.alarm_class)
continue
d_vars = {"path": " | ".join(p.path), "message": p.message}
if p.vars:
d_vars.update(p.vars)
details += [
{
"reference": f"d:{p.alarm_class}:{self.object.id}:{' | '.join(p.path)}",
"alarm_class": p.alarm_class,
"managed_object": self.object.id,
"timestamp": now,
"labels": p.labels,
"vars": d_vars,
}
]
msg = {
"$op": "ensure_group",
"reference": group_reference or f"g:d:{self.object.id}:{group_cls.name}",
"alarm_class": group_cls.name,
"alarms": details,
}
stream, partition = self.object.alarms_stream_and_partition
self.service.publish(
orjson.dumps(msg),
stream=stream,
partition=partition,
)
self.logger.debug(
"Dispose: %s", orjson.dumps(msg, option=orjson.OPT_INDENT_2).decode("utf-8")
)
def get_umbrella_settings(self) -> bool:
"""
Check enable Alarm for Discovery
:param self:
:return:
"""
prev_status = self.context.get("umbrella_settings", False)
current_status = self.can_update_alarms()
self.context["umbrella_settings"] = current_status
if not prev_status and not current_status:
return
self.logger.info("Updating alarm statuses")
umbrella_cls = AlarmClass.get_by_name(self.umbrella_cls)
if not umbrella_cls:
self.logger.info("No umbrella alarm class. Alarm statuses not updated")
return
details = []
if current_status:
fatal_weight = self.get_fatal_alarm_weight()
weight = self.get_alarm_weight()
for p in self.problems:
if not p["alarm_class"]:
continue
ac = AlarmClass.get_by_name(p["alarm_class"])
if not ac:
self.logger.info("Unknown alarm class %s. Skipping", p["alarm_class"])
continue
details += [
{
"alarm_class": ac,
"path": p["path"],
"severity": AlarmSeverity.severity_for_weight(
fatal_weight if p["fatal"] else weight
),
"vars": {"path": p["path"], "message": p["message"]},
}
]
else:
# Clean up all open alarms as they has been disabled
details = []
self.update_umbrella(umbrella_cls, details)
return False
return True
def can_update_alarms(self):
return False
......@@ -1572,7 +1605,6 @@ class PolicyDiscoveryCheck(DiscoveryCheck):
# Avoid circular references
from noc.fm.models.alarmseverity import AlarmSeverity
from noc.fm.models.alarmclass import AlarmClass
from noc.fm.models.activealarm import ActiveAlarm
from noc.fm.models.alarmescalation import AlarmEscalation
# ----------------------------------------------------------------------
# ConfigValidation check
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Third-party modules
import cachetools
# NOC modules
from noc.services.discovery.jobs.base import DiscoveryCheck
from noc.fm.models.alarmclass import AlarmClass
class ConfigValidationCheck(DiscoveryCheck):
......@@ -20,48 +16,26 @@ class ConfigValidationCheck(DiscoveryCheck):
name = "configvalidation"
required_artefacts = ["config_acquired"]
@staticmethod
@cachetools.cached({})
def get_ac_cm_violations():
return AlarmClass.get_by_name("Config | Policy Violations")
umbrella_cls = "Config | Policy Violations"
def handler(self):
self.logger.info("Running config validation")
is_changed = self.get_artefact("config_changed") or False
# New ConfDB path, problems are passed via alarms
alarms = []
problems = []
for problem in self.object.iter_validation_problems(is_changed):
alarms += [self.get_umbrella_alarm_cfg(**problem)]
problems += [problem]
# self.set_problem(**problem)
if alarms:
self.logger.info("%d problem(s) detected", len(alarms))
if problems:
self.logger.info("%d problem(s) detected", len(problems))
else:
self.logger.info("No problems detected")
self.job.update_umbrella(self.get_ac_cm_violations(), alarms)
self.job.update_alarms(
problems,
group_cls=self.umbrella_cls,
group_reference=f"g:c:{self.object.id}:{self.umbrella_cls}",
)
def is_enabled(self):
checks = self.job.attrs.get("_checks", set())
return not checks or "config" in checks
def get_umbrella_alarm_cfg(
self, alarm_class=None, path=None, message=None, fatal=False, **kwargs
):
"""
Getting Umbrella Alarm Cfg
:param alarm_class: Alarm class instance or name
:param path: Additional path
:param message: Text message
:param fatal: True if problem is fatal and all following checks
must be disabled
:param kwargs: Dict containing optional variables
:return:
"""
alarm_cfg = {
"alarm_class": AlarmClass.get_by_name(alarm_class),
"path": " | ".join(path),
"vars": kwargs,
}
alarm_cfg["vars"]["message"] = message
alarm_cfg["vars"]["path"] = path
return alarm_cfg
......@@ -11,7 +11,7 @@ import operator
import itertools
import time
from collections import defaultdict
from typing import Any, Optional, List, Dict, Set
from typing import Any, Optional, List, Dict, Set, Tuple
from dataclasses import dataclass
# Third-party modules
......@@ -22,6 +22,7 @@ from bson import ObjectId
# NOC modules
from noc.services.discovery.jobs.base import DiscoveryCheck
from noc.core.models.problem import ProblemItem
from noc.inv.models.object import Object
from noc.sa.models.managedobjectprofile import ManagedObjectProfile
from noc.inv.models.interfaceprofile import InterfaceProfile
......@@ -34,7 +35,7 @@ from noc.pm.models.metrictype import MetricType
from noc.sla.models.slaprofile import SLAProfile
from noc.sla.models.slaprobe import SLAProbe
from noc.wf.models.state import State
from noc.pm.models.thresholdprofile import ThresholdProfile
from noc.pm.models.thresholdprofile import ThresholdProfile, ThresholdConfig
from noc.core.hash import hash_str
......@@ -137,6 +138,8 @@ class MetricsCheck(DiscoveryCheck):
SLA_CAPS = ["Cisco | IP | SLA | Probes"]
umbrella_cls = "NOC | PM | Out of Thresholds"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.id_count = itertools.count()
......@@ -218,7 +221,7 @@ class MetricsCheck(DiscoveryCheck):
r[m.metric_type.name] = cls.config_from_settings(m)
return r
def get_object_metrics(self):
def get_object_metrics(self) -> List[Dict[str, Any]]:
"""
Populate metrics list with objects metrics
:return:
......@@ -249,7 +252,7 @@ class MetricsCheck(DiscoveryCheck):
subs[si["interface"]] += [{"name": si["name"], "ifindex": si.get("ifindex")}]
return subs
def get_interface_metrics(self):
def get_interface_metrics(self) -> List[Dict[str, Any]]:
"""
Populate metrics list with interface metrics
:return:
......@@ -330,7 +333,7 @@ class MetricsCheck(DiscoveryCheck):
self.logger.info("Interface metrics are not configured. Skipping")
return metrics
def get_sla_metrics(self):
def get_sla_metrics(self) -> List[Dict[str, Any]]:
if not self.has_any_capability(self.SLA_CAPS):
self.logger.info("SLA not configured, skipping SLA metrics")
metrics = []
......@@ -381,7 +384,7 @@ class MetricsCheck(DiscoveryCheck):
self.logger.info("SLA metrics are not configured. Skipping")
return metrics
def get_sensor_metrics(self):
def get_sensor_metrics(self) -> List[Dict[str, Any]]:
metrics = []
o = Object.get_managed(self.object).values_list("id")
for s in (
......@@ -418,7 +421,9 @@ class MetricsCheck(DiscoveryCheck):
self.sensors_metrics[m_id] = int(s["bi_id"])
return metrics
def process_result(self, result: List[MData]):
def process_result(
self, result: List[MData]
) -> Tuple[int, Dict[str, Dict[str, Dict[str, Any]]], List[ProblemItem], List[Dict[str, Any]]]:
"""
Process IGetMetrics result
:param result:
......@@ -591,7 +596,11 @@ class MetricsCheck(DiscoveryCheck):