Commit 3efc55ef authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Merge branch 'noc-alarm-groups' into 'master'

correlator: Dynamic alarm groups

See merge request noc/noc!5873
parents 4bbcc3be 4d29ac91
......@@ -32,7 +32,7 @@ class Migration(BaseMigration):
"vars": [
{"name": "name", "description": "Alarm Group name"},
],
"subject_template": 'Group Alarm {{alarm.vars["name"]}}',
"subject_template": "Group Alarm {{alarm.vars.name}}",
"body_template": "Group Alarm",
"recommended_actions": "Ignore this",
"flap_condition": "none",
......@@ -56,11 +56,13 @@ class Migration(BaseMigration):
"is_active": True,
"description": "Default Alarm Group",
"match": [],
"groups": [{
"reference_template": "",
"alarm_class": ObjectId("6172add005d0668829888eae"),
"title_template": "",
}],
"groups": [
{
"reference_template": "",
"alarm_class": ObjectId("6172add005d0668829888eae"),
"title_template": "",
}
],
"actions": [],
"bi_id": 8422126749852039790,
}
......
......@@ -73,6 +73,7 @@ class ActiveAlarm(Document):
("alarm_class", "rca_neighbors"),
"labels",
"effective_labels",
"groups",
],
}
status = "A"
......@@ -105,6 +106,8 @@ class ActiveAlarm(Document):
# RCA
# Reference to root cause (Active Alarm or Archived Alarm instance)
root = ObjectIdField(required=False)
# Group alarm references
groups = ListField(BinaryField())
# Escalated TT ID in form
# <external system name>:<external tt id>
escalation_ts = DateTimeField(required=False)
......@@ -171,9 +174,7 @@ class ActiveAlarm(Document):
self.rca_neighbors = data.rca_neighbors
self.dlm_windows = data.dlm_windows
if not self.id:
self.effective_labels = [
label for label in self.iter_effective_labels() if self.can_set_label(label)
]
self.effective_labels = self.iter_effective_labels(self)
def safe_save(self, **kwargs):
"""
......@@ -291,6 +292,7 @@ class ActiveAlarm(Document):
ack_ts=self.ack_ts,
ack_user=self.ack_user,
root=self.root,
groups=self.groups,
escalation_ts=self.escalation_ts,
escalation_tt=self.escalation_tt,
escalation_error=self.escalation_error,
......@@ -851,14 +853,10 @@ class ActiveAlarm(Document):
if a.escalation_tt:
yield a
def iter_effective_labels(self):
@classmethod
def iter_effective_labels(cls, instance: "ActiveAlarm"):
return [
ll
for ll in set(self.managed_object.labels or [])
| set(self.managed_object.object_profile.labels or [])
if Label.get_effective_setting(ll, "expose_alarm")
]
return [ll for ll in instance.managed_object.effective_labels if cls.can_set_label(ll)]
@classmethod
def can_set_label(cls, label):
......
# ---------------------------------------------------------------------
# AlarmGroup model
# AlarmRule model
# ---------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
# Python modules
import operator
from threading import Lock
from typing import Optional
# Third-party modules
import cachetools
......@@ -36,10 +37,10 @@ id_lock = Lock()
class Match(EmbeddedDocument):
labels = ListField(StringField())
alarm_class = ReferenceField(AlarmClass)
reference_re = StringField()
reference_rx = StringField()
def __str__(self):
return f'{", ".join(self.labels)}, {self.alarm_class or ""}/{self.reference_re}'
return f'{", ".join(self.labels)}, {self.alarm_class or ""}/{self.reference_rx}'
def get_labels(self):
return list(Label.objects.filter(name__in=self.labels))
......@@ -51,7 +52,7 @@ class Group(EmbeddedDocument):
# Group Alarm Class (Group by default)
alarm_class = PlainReferenceField(AlarmClass)
# Group Title template
title_template = StringField()
title_template = StringField(default="")
def __str__(self):
return f'{self.alarm_class or ""}/{self.title_template or ""}: {self.reference_template}'
......@@ -113,15 +114,15 @@ class AlarmRule(Document):
@classmethod
@cachetools.cachedmethod(operator.attrgetter("_id_cache"), lock=lambda _: id_lock)
def get_by_id(cls, id) -> "AlarmRule":
def get_by_id(cls, id) -> Optional["AlarmRule"]:
return AlarmRule.objects.filter(id=id).first()
@classmethod
@cachetools.cachedmethod(operator.attrgetter("_name_cache"), lock=lambda _: id_lock)
def get_by_name(cls, name) -> "AlarmRule":
def get_by_name(cls, name: str) -> Optional["AlarmRule"]:
return AlarmRule.objects.filter(name=name).first()
@classmethod
@cachetools.cachedmethod(operator.attrgetter("_bi_id_cache"), lock=lambda _: id_lock)
def get_by_bi_id(cls, id) -> "AlarmRule":
def get_by_bi_id(cls, id) -> Optional["AlarmRule"]:
return AlarmRule.objects.filter(bi_id=id).first()
......@@ -79,6 +79,8 @@ class ArchivedAlarm(Document):
# RCA
# Reference to root cause (Active Alarm or Archived Alarm instance)
root = ObjectIdField(required=False)
# Group alarm references
groups = ListField(BinaryField())
# Escalated TT ID in form
# <external system name>:<external tt id>
escalation_ts = DateTimeField(required=False)
......@@ -201,6 +203,7 @@ class ArchivedAlarm(Document):
vars=self.vars,
log=log,
root=self.root,
groups=self.groups,
escalation_ts=self.escalation_ts,
escalation_tt=self.escalation_tt,
escalation_error=self.escalation_error,
......
# ---------------------------------------------------------------------
# Alarm Rule
# ---------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
# Python modules
import re
from collections import defaultdict
from typing import Optional, DefaultDict, Set, List, Iterable, Pattern, Tuple
from dataclasses import dataclass
# Third-party modules
from jinja2 import Template
# NOC modules
from noc.fm.models.alarmrule import AlarmRule as CfgAlarmRule
from noc.fm.models.alarmclass import AlarmClass
from noc.fm.models.activealarm import ActiveAlarm
DEFAULT_GROUP_CLASS = "Group"
@dataclass
class Match:
    """
    Compiled form of a single match clause of an alarm rule.
    """

    # Labels listed by the clause
    labels: Set[str]
    # Alarm class restriction, None when the clause does not set one
    alarm_class: Optional[AlarmClass]
    # Compiled reference pattern, None when the clause does not set one
    reference_rx: Optional[Pattern]
@dataclass
class Group:
    """
    Group definition of an alarm rule, holding templates which are
    rendered per alarm.
    """

    # Template of the group alarm reference
    reference_template: Template
    # Alarm class of the group alarm
    alarm_class: AlarmClass
    # Template of the group title
    title_template: Template
@dataclass
class GroupItem:
    """
    Group resolved for a particular alarm, with the templates
    already rendered to plain strings.
    """

    # Rendered group alarm reference
    reference: str
    # Alarm class of the group alarm
    alarm_class: AlarmClass
    # Rendered group title
    title: str
class AlarmRule(object):
    """
    Compiled alarm rule: a list of match clauses and a list of
    group definitions to render for every matching alarm.
    """

    # Memoized alarm class used for groups without an explicit class
    _default_alarm_class: Optional[AlarmClass] = None

    def __init__(self):
        self.match: List[Match] = []
        self.groups: List[Group] = []

    @classmethod
    def try_from(cls, rule_cfg: CfgAlarmRule) -> Optional["AlarmRule"]:
        """
        Generate rule from config

        :param rule_cfg: Configured alarm rule
        :return: Compiled rule
        """
        rule = AlarmRule()
        # Add matches
        for match in rule_cfg.match:
            rule.match.append(
                Match(
                    labels=set(match.labels),
                    alarm_class=match.alarm_class,
                    reference_rx=re.compile(match.reference_rx) if match.reference_rx else None,
                )
            )
        # Add groups
        for group in rule_cfg.groups:
            rule.groups.append(
                Group(
                    # Fall back to an empty source when the template is not set,
                    # so the group renders to an empty string rather than
                    # to a stringified None
                    reference_template=Template(group.reference_template or ""),
                    alarm_class=group.alarm_class
                    if group.alarm_class
                    else cls.get_default_alarm_class(),
                    title_template=Template(group.title_template or ""),
                )
            )
        return rule

    def is_match(self, alarm: ActiveAlarm) -> bool:
        """
        Check if alarm matches the rule

        Rule without match clauses matches any alarm. Otherwise
        at least one clause must be satisfied.

        :param alarm: Alarm to check
        :return: True if alarm matches the rule
        """
        if not self.match:
            return True
        lset = set(alarm.effective_labels)
        for match in self.match:
            # Match against labels
            if not match.labels.issubset(lset):
                continue
            # Match against alarm class
            if match.alarm_class and match.alarm_class != alarm.alarm_class:
                continue
            # Match against reference regular expression.
            # The pattern is enforced only when the alarm
            # carries a non-empty `raw_reference`
            if (
                getattr(alarm, "raw_reference", None)
                and match.reference_rx
                and not match.reference_rx.search(alarm.raw_reference)
            ):
                continue
            return True
        return False

    @classmethod
    def get_default_alarm_class(cls) -> AlarmClass:
        """
        Get the alarm class named by DEFAULT_GROUP_CLASS.
        Result of the lookup is memoized on the class.
        """
        if not cls._default_alarm_class:
            cls._default_alarm_class = AlarmClass.get_by_name(DEFAULT_GROUP_CLASS)
        assert cls._default_alarm_class
        return cls._default_alarm_class

    def iter_groups(self, alarm: ActiveAlarm) -> Iterable[GroupItem]:
        """
        Render group templates

        :param alarm: Alarm, exposed to the templates as `alarm`
        :return: Yields one GroupItem per group of the rule
        """
        ctx = {"alarm": alarm}
        for group in self.groups:
            yield GroupItem(
                reference=group.reference_template.render(**ctx),
                alarm_class=group.alarm_class,
                title=group.title_template.render(**ctx),
            )
class AlarmRuleSet(object):
    """
    Collection of compiled alarm rules, indexed by the label sets
    of their match clauses.
    """

    def __init__(self):
        # Accumulator filled by add(): sorted label tuple -> rules
        self._label_rules: DefaultDict[Tuple[str, ...], List[AlarmRule]] = defaultdict(list)
        # Lookup table produced by compile(): (label set, rules)
        self.label_rules: List[Tuple[Set[str], List[AlarmRule]]] = []

    def add(self, rule: CfgAlarmRule):
        """
        Compile configured rule and register it in the set.
        Inactive rules and rules which fail to compile are skipped.

        :param rule: Configured alarm rule
        """
        if not rule.is_active:
            return
        compiled = AlarmRule.try_from(rule)
        if not compiled:
            return
        if rule.match:
            keys = [tuple(sorted(clause.labels)) for clause in rule.match]
        else:
            # Rule without match clauses is registered under the empty label set
            keys = [tuple()]
        for key in keys:
            self._label_rules[key].append(compiled)

    def compile(self):
        """
        Build the lookup table from the accumulated rules
        and reset the accumulator.
        """
        table = []
        for labels, rules in self._label_rules.items():
            table.append((set(labels), rules))
        self.label_rules = table
        self._label_rules = defaultdict(list)

    def iter_candidates(self, alarm: ActiveAlarm) -> Iterable[AlarmRule]:
        """
        Yield rules registered under label sets contained in the
        alarm's effective labels. Every rule is yielded at most once.

        :param alarm: Alarm to find candidates for
        """
        alarm_labels = set(alarm.effective_labels)
        seen: Set[AlarmRule] = set()
        for required, rules in self.label_rules:
            if required <= alarm_labels:
                for rule in rules:
                    if rule not in seen:
                        yield rule
                        seen.add(rule)

    def iter_rules(self, alarm: ActiveAlarm) -> Iterable[AlarmRule]:
        """
        Yield candidate rules which fully match the alarm.

        :param alarm: Alarm to find rules for
        """
        yield from filter(lambda rule: rule.is_match(alarm), self.iter_candidates(alarm))
......@@ -11,7 +11,8 @@ import sys
import datetime
import re
from collections import defaultdict
from typing import Any, Optional, Dict, List
import threading
from typing import Union, Any, Iterable, Optional, Dict, List
import operator
from hashlib import sha512
......@@ -20,6 +21,7 @@ import orjson
from bson import ObjectId
from dateutil.parser import parse as parse_date
from pydantic import parse_obj_as, ValidationError
import cachetools
# NOC modules
from noc.config import config
......@@ -27,6 +29,7 @@ from noc.core.service.tornado import TornadoService
from noc.core.scheduler.scheduler import Scheduler
from noc.core.mongo.connection import connect
from noc.sa.models.managedobject import ManagedObject
from noc.services.correlator.alarmrule import AlarmRuleSet
from noc.services.correlator.rule import Rule
from noc.services.correlator.rcacondition import RCACondition
from noc.services.correlator.trigger import Trigger
......@@ -43,6 +46,7 @@ from noc.fm.models.alarmtrigger import AlarmTrigger
from noc.fm.models.archivedalarm import ArchivedAlarm
from noc.fm.models.alarmescalation import AlarmEscalation
from noc.fm.models.alarmdiagnosticconfig import AlarmDiagnosticConfig
from noc.fm.models.alarmrule import AlarmRule
from noc.main.models.remotesystem import RemoteSystem
from noc.sa.models.servicesummary import ServiceSummary, SummaryItem, ObjectSummaryItem
from noc.core.version import version
......@@ -52,6 +56,9 @@ from noc.core.perf import metrics
from noc.core.fm.enum import RCA_RULE, RCA_TOPOLOGY, RCA_DOWNLINK_MERGE
from noc.core.liftbridge.message import Message
from noc.services.correlator.rcalock import RCALock
from noc.services.correlator.alarmrule import GroupItem
ref_lock = threading.Lock()
class CorrelatorService(TornadoService):
......@@ -60,6 +67,8 @@ class CorrelatorService(TornadoService):
use_mongo = True
process_name = "noc-%(name).10s-%(pool).5s"
_reference_cache = cachetools.TTLCache(100, ttl=60)
def __init__(self):
super().__init__()
self.version = version.version
......@@ -68,6 +77,7 @@ class CorrelatorService(TornadoService):
self.triggers: Dict[ObjectId, List[Trigger]] = {}
self.rca_forward = {} # alarm_class -> [RCA condition, ..., RCA condition]
self.rca_reverse = defaultdict(set) # alarm_class -> set([alarm_class])
self.alarm_rule_set = AlarmRuleSet()
#
self.slot_number = 0
self.total_slots = 0
......@@ -120,6 +130,7 @@ class CorrelatorService(TornadoService):
self.load_rules()
self.load_triggers()
self.load_rca_rules()
self.load_alarm_rules()
def load_rules(self):
"""
......@@ -145,7 +156,7 @@ class CorrelatorService(TornadoService):
self.back_rules[cc.id] = [dr]
nbr += 1
self.rules[c.id] = r
self.logger.debug("%d rules are loaded. %d combos" % (nr, nbr))
self.logger.debug("%d rules are loaded. %d combos", nr, nbr)
def load_triggers(self):
self.logger.info("Loading triggers")
......@@ -164,7 +175,7 @@ class CorrelatorService(TornadoService):
cn += 1
self.logger.debug(" %s" % c_name)
n += 1
self.logger.info("%d triggers has been loaded to %d classes" % (n, cn))
self.logger.info("%d triggers has been loaded to %d classes", n, cn)
def load_rca_rules(self):
"""
......@@ -185,7 +196,19 @@ class CorrelatorService(TornadoService):
self.rca_reverse[rc.root.id] = []
self.rca_reverse[rc.root.id] += [rc]
n += 1
self.logger.info("%d RCA Rules have been loaded" % n)
self.logger.info("%d RCA Rules have been loaded", n)
def load_alarm_rules(self):
    """
    Load Alarm Rules

    Adds every active alarm rule to `self.alarm_rule_set`
    and compiles the set afterwards.
    """
    self.logger.info("Loading alarm rules")
    n = 0
    for rule in AlarmRule.objects.filter(is_active=True):
        self.alarm_rule_set.add(rule)
        n += 1
    self.alarm_rule_set.compile()
    self.logger.info("%d Alarm Rules have been loaded", n)
def mark_as_failed(self, event: "ActiveEvent"):
"""
......@@ -432,6 +455,18 @@ class CorrelatorService(TornadoService):
remote_system=remote_system,
remote_id=remote_id,
)
a.effective_labels = a.iter_effective_labels(a)
a.raw_reference = reference
# @todo: Static groups
# Apply rules
groups: Dict[str, GroupItem] = {}
for rule in self.alarm_rule_set.iter_rules(a):
for gi in rule.iter_groups(a):
if gi.reference and gi.reference not in groups:
groups[gi.reference] = gi
all_groups = await self.get_groups(a, groups.values())
a.groups = [g.reference for g in all_groups]
# Save
a.save()
if event:
event.contribute_to_alarm(a)
......@@ -465,6 +500,7 @@ class CorrelatorService(TornadoService):
# Watch for escalations, when necessary
if config.correlator.auto_escalation and not a.root:
AlarmEscalation.watch_escalations(a)
return a
async def raise_alarm_from_rule(self, rule: Rule, event: ActiveEvent):
"""
......@@ -532,7 +568,7 @@ class CorrelatorService(TornadoService):
metrics["alarm_drop"] += 1
return
def clear_alarm_from_rule(self, rule: "Rule", event: "ActiveEvent"):
async def clear_alarm_from_rule(self, rule: "Rule", event: "ActiveEvent"):
managed_object = self.eval_expression(rule.managed_object, event=event)
if not managed_object:
self.logger.info(
......@@ -562,8 +598,10 @@ class CorrelatorService(TornadoService):
event.contribute_to_alarm(alarm)
alarm.closing_event = event.id
alarm.last_update = max(alarm.last_update, event.timestamp)
groups = alarm.groups
alarm.clear_alarm("Cleared by disposition rule '%s'" % rule.u_name, ts=event.timestamp)
metrics["alarm_clear"] += 1
await self.clear_groups(groups, ts=event.timestamp)
def get_delayed_event(self, rule: Rule, event: ActiveEvent):
"""
......@@ -577,7 +615,7 @@ class CorrelatorService(TornadoService):
reference = self.get_default_reference(
managed_object=event.managed_object, alarm_class=rule.alarm_class, vars=vars
)
ref_hash = self.get_reference(reference)
ref_hash = self.get_reference_hash(reference)
ws = event.timestamp - datetime.timedelta(seconds=rule.combo_window)
de = ActiveEvent.objects.filter(
managed_object=event.managed_object_id,
......@@ -707,11 +745,24 @@ class CorrelatorService(TornadoService):
"""
# Fetch timestamp
ts = parse_date(req.timestamp) if req.timestamp else datetime.datetime.now()
await self.clear_by_reference(req.reference, ts)
async def clear_by_reference(
self, reference: Union[str, bytes], ts: Optional[datetime.datetime] = None
) -> None:
"""
Clear alarm by reference
"""
ts = ts or datetime.datetime.now()
# Normalize reference
if isinstance(reference, str):
ref_hash = self.get_reference_hash(reference)
else:
ref_hash = reference
# Get alarm
ref_hash = self.get_reference_hash(req.reference)
alarm = ActiveAlarm.objects.filter(reference=ref_hash).first()
if not alarm:
self.logger.info("Alarm '%s' is not found. Skipping", req.reference)
self.logger.info("Alarm '%s' is not found. Skipping", reference)
return
# Clear alarm
self.logger.info(
......@@ -720,11 +771,13 @@ class CorrelatorService(TornadoService):
alarm.managed_object.address,
alarm.alarm_class.name,
alarm.id,
req.reference,
reference,
)
alarm.last_update = max(alarm.last_update, ts)
groups = alarm.groups
alarm.clear_alarm("Cleared by reference")
metrics["alarm_clear"] += 1
await self.clear_groups(groups, ts=ts)
async def dispose_event(self, e: ActiveEvent):
"""
......@@ -753,7 +806,7 @@ class CorrelatorService(TornadoService):
elif rule.action == "raise" and rule.combo_condition == "none":
await self.raise_alarm_from_rule(rule, e)
elif rule.action == "clear" and rule.combo_condition == "none":
self.clear_alarm_from_rule(rule, e)
await self.clear_alarm_from_rule(rule, e)
if rule.action in ("raise", "clear"):
# Write reference if can trigger delayed event
if rule.unique and rule.event_class.id in self.back_rules:
......@@ -771,7 +824,7 @@ class CorrelatorService(TornadoService):
if br.action == "raise":
await self.raise_alarm_from_rule(br, de)
elif br.action == "clear":
self.clear_alarm_from_rule(br, de)
await self.clear_alarm_from_rule(br, de)
if rule.stop_disposition:
break
self.logger.info("[%s] Disposition complete", event_id)
......@@ -909,6 +962,67 @@ class CorrelatorService(TornadoService):
correlate_uplinks(a)
self.logger.debug("[%s] Correlation completed", alarm.id)
async def get_groups(
    self, alarm: ActiveAlarm, groups: Iterable[GroupItem]
) -> List[ActiveAlarm]:
    """
    Resolve group alarms for the alarm, raising the missing ones.

    :param alarm: Alarm being placed into groups
    :param groups: Rendered group items
    :return: List of resolved group alarms
    """
    resolved: List[ActiveAlarm] = []
    for item in groups:
        ref = item.reference
        if ref == alarm.raw_reference:
            # Reference cycle: the group points to the alarm itself
            continue
        group_alarm = self.get_by_reference(ref)
        if group_alarm:
            resolved.append(group_alarm)
            continue
        # Group alarm is not found, raise it
        group_alarm = await self.raise_alarm(
            managed_object=alarm.managed_object,
            timestamp=alarm.timestamp,
            alarm_class=item.alarm_class,
            vars={"name": item.title},
            reference=ref,
        )
        if group_alarm:
            # NOTE(review): cache is written with the raw reference as a key
            # and without taking `ref_lock` -- confirm this agrees with the
            # key and locking used by `get_by_reference`
            self._reference_cache[ref] = group_alarm
            resolved.append(group_alarm)
    return resolved
async def clear_groups(self, groups: List[bytes], ts: Optional[datetime.datetime]) -> None:
    """
    Clear group alarms from list when necessary

    A group alarm is cleared when no active alarm refers to it
    in its `groups` field.

    @todo: Possible race when called from different processes

    :param groups: List of group reference hashes
    :param ts: Clear timestamp
    """
    if not groups:
        # Alarm belongs to no groups, nothing to count or clear.
        # Avoids a database round-trip on every clear of an ungrouped alarm
        return
    # Get groups summary
    r: Dict[bytes, int] = {}
    for doc in ActiveAlarm._get_collection().aggregate(
        [
            # Filter all active alarms in the selected groups
            {"$match": {"groups": {"$in": groups}}},
            # Leave only `groups` field
            {"$project": {"groups": 1}},
            # Unwind `groups` array to separate documents
            {"$unwind": "$groups"},
            # Group by each group reference
            {"$group": {"_id": "$groups", "n": {"$sum": 1}}},
        ]
    ):
        r[doc["_id"]] = doc["n"]
    # Groups absent from the summary have no active members left
    for ref in groups:
        if r.get(ref, 0) == 0:
            self.logger.info("Clear empty group %r", ref)
            await self.clear_by_reference(ref, ts=ts)
@classmethod