Commit 1e41a597 authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Merge branch 'noc-correlator-ensure-group' into 'master'

correlator: ensure_group request

See merge request noc/noc!5886
parents adef51e7 9844a865
......@@ -63,3 +63,19 @@ in the `$op` field. Unknown message types and malformed messages are discarded.
| `$op` | String | Equals to `clear` |
| `reference` | String | Alarm reference. Should be the same as in previous [raise](#raise-message) message. |
| `timestamp` | String | Optional timestamp in ISO 8601 format |
### ensure_group message
`ensure_group` message creates and synchronizes group with given set of alarms.
| Field | Type | Descriptionn |
| -------------------------- | ---------------------------- | --------------------------------------------------------------------------------------- |
| `$op` | String | Equals to `ensure_group` |
| `reference` | String | Alarm reference. See [alarm reference format](../alarm-reference-format.md) for details |
| `name` | String | Group alarm title |
| `alarm_class` | String | Optional group alarm class |
| `alarms` | Array of Object {{ complex}} | List of active alarms |
| {{ tab }} `reference` | String | Alarm reference. See [alarm reference format](../alarm-reference-format.md) for details |
| {{ tab }} `timestamp` | String | Optional timestamp in ISO 8601 format |
| {{ tab }} `managed_object` | String | Managed Object'd ID |
| {{ tab }} `alarm_class` | String | Alarm class name |
| {{ tab }} `vars` | Object {{ complex }} | Alarm variables |
......@@ -12,5 +12,6 @@ from typing import Union
from .clearreq import ClearRequest
from .eventreq import EventRequest
from .raisereq import RaiseRequest
from .ensuregroupreq import EnsureGroupRequest
DisposeRequest = Union[ClearRequest, EventRequest, RaiseRequest]
DisposeRequest = Union[ClearRequest, EventRequest, RaiseRequest, EnsureGroupRequest]
# ---------------------------------------------------------------------
# EnsureGroup Request
# ---------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
# Python modules
from typing import Optional, Dict, Any, List
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal # py3.7 support
# Third-party modules
from pydantic import BaseModel, Field
class AlarmItem(BaseModel):
reference: str
managed_object: str
alarm_class: str
timestamp: Optional[str]
vars: Optional[Dict[str, Any]]
remote_system: Optional[str]
remote_id: Optional[str]
class EnsureGroupRequest(BaseModel):
op: Literal["ensure_group"] = Field(None, alias="$op")
reference: str
name: Optional[str]
alarm_class: Optional[str]
alarms: List[AlarmItem]
......@@ -12,7 +12,7 @@ import datetime
import re
from collections import defaultdict
import threading
from typing import Union, Any, Iterable, Optional, Dict, List
from typing import Union, Any, Iterable, Optional, Dict, List, Set
import operator
from hashlib import sha512
......@@ -37,6 +37,7 @@ from noc.services.correlator.models.disposereq import DisposeRequest
from noc.services.correlator.models.eventreq import EventRequest
from noc.services.correlator.models.clearreq import ClearRequest
from noc.services.correlator.models.raisereq import RaiseRequest
from noc.services.correlator.models.ensuregroupreq import EnsureGroupRequest
from noc.fm.models.eventclass import EventClass
from noc.fm.models.activeevent import ActiveEvent
from noc.fm.models.activealarm import ActiveAlarm
......@@ -764,6 +765,96 @@ class CorrelatorService(TornadoService):
ts = parse_date(req.timestamp) if req.timestamp else datetime.datetime.now()
await self.clear_by_reference(req.reference, ts)
async def on_msg_ensure_group(self, req: EnsureGroupRequest) -> None:
"""
Process `ensure_group` message.
"""
# Find existing group alarm
group_alarm = self.get_by_reference(req.reference)
if not group_alarm and not req.alarms:
return # Nothing to clear, nothing to create
# Check managed objects and timestamps
mos: Dict[str, ManagedObject] = {}
tses: Dict[str, datetime.datetime] = {}
alarm_classes: Dict[str, AlarmClass] = {}
for ai in req.alarms:
# Managed Object
mo = ManagedObject.get_by_id(int(ai.managed_object))
if not mo:
self.logger.error("Invalid managed object: %s", ai.managed_object)
return
mos[ai.managed_object] = mo
# Timestamp
if ai.timestamp:
tses[ai.timestamp] = parse_date(ai.timestamp)
# Alarm class
alarm_class = AlarmClass.get_by_name(ai.alarm_class)
if not alarm_class:
self.logger.error("Invalid alarm class: %s", ai.alarm_class)
return
alarm_classes[ai.alarm_class] = alarm_class
#
now = datetime.datetime.now()
if not group_alarm:
# Create group alarm
# Calculate timestamp and managed object
mo_id = req.alarms[0].managed_object
min_ts = now
for ai in req.alarms:
if not ai.timestamp:
continue
ts = tses[ai.timestamp]
if ts < min_ts:
mo_id = ai.managed_object
min_ts = ts
# Resolve managed object
mo = mos[mo_id]
# Get group alarm's alarm class
if req.alarm_class:
alarm_class = AlarmClass.get_by_name(req.alarm_class)
if not alarm_class:
self.logger.error("Invalid group alarm class: %s", req.alarm_class)
return
else:
alarm_class = CAlarmRule.get_default_alarm_class()
# Raise group alarm
group_alarm = await self.raise_alarm(
managed_object=mo,
timestamp=min_ts,
alarm_class=alarm_class,
vars={"name": req.name or "Group"},
reference=req.reference,
)
# Fetch all open alarms in group
open_alarms: Dict[bytes, ActiveAlarm] = {
alarm.reference: alarm
for alarm in ActiveAlarm.objects.filter(groups__in=[group_alarm.reference])
}
seen_refs: Set[bytes] = set()
for ai in req.alarms:
h_ref = self.get_reference_hash(ai.reference)
if h_ref in open_alarms:
seen_refs.add(h_ref)
continue # Alarm is still active, skipping
# Raise new alarm
await self.raise_alarm(
managed_object=mos[ai.managed_object],
timestamp=tses[ai.timestamp] if ai.timestamp else now,
alarm_class=alarm_classes[ai.alarm_class],
vars=ai.vars or {},
reference=ai.reference,
groups=[
GroupItem(
reference=req.reference,
alarm_class=group_alarm.alarm_class,
title=req.name or "Group",
)
],
)
# Close hanging alarms
for h_ref in set(open_alarms) - seen_refs:
await self.clear_by_reference(h_ref, ts=now)
async def clear_by_reference(
self, reference: Union[str, bytes], ts: Optional[datetime.datetime] = None
) -> None:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment