Commit c999d2f9 authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Add event to mx message.

parent 820ace7e
......@@ -462,6 +462,7 @@ class Config(BaseConfig):
class message(ConfigSection):
enable_alarm = BooleanParameter(default=False)
enable_events = BooleanParameter(default=False)
enable_managedobject = BooleanParameter(default=False)
enable_reboot = BooleanParameter(default=False)
enable_metrics = BooleanParameter(default=False)
......
......@@ -15,6 +15,7 @@ import operator
import re
from time import perf_counter
from typing import Optional, Dict
from dataclasses import asdict
# Third-party modules
import cachetools
......@@ -46,8 +47,16 @@ from noc.services.classifier.abdetector import AbductDetector
from noc.core.perf import metrics
from noc.core.handler import get_handler
from noc.core.ioloop.timers import PeriodicCallback
from noc.core.comp import smart_text
from noc.core.comp import smart_text, DEFAULT_ENCODING
from noc.core.liftbridge.message import Message
from noc.core.mx import (
MX_STREAM,
get_mx_partitions,
MX_MESSAGE_TYPE,
MX_SHARDING_KEY,
MX_LABELS,
MX_H_VALUE_SPLITTER,
)
# Patterns
rx_oid = re.compile(r"^(\d+\.){6,}$")
......@@ -265,6 +274,41 @@ class ClassifierService(FastAPIService):
e.mark_as_new("Reclassification has been requested by noc-classifer")
self.logger.debug("Failed event %s has been recovered", e.id)
def register_mx_message(self, event: "ActiveEvent"):
metrics["events_message"] += 1
n_partitions = get_mx_partitions()
msg = {
"timestamp": time,
"uuid": event.raw_vars.get("uuid"),
"collector_type": event.source,
"collector": event.raw_vars.get("collector"),
"address": event.raw_vars.get("source_address"),
"managed_object": asdict(cfg.managed_object),
"event_class": event.event_class,
"event_vars": event.vars,
}
if event.source == E_SRC_SYSLOG:
msg["syslog_vars"] = {
"facility": event.raw_vars.get("facility", ""),
"severity": event.raw_vars.get("severity", ""),
"message": event.raw_vars.get("message", ""),
}
if event.source == E_SRC_SNMP_TRAP:
msg["snmp_vars"] = [{"oid": "", "named_oid": "", "value": "", "raw_value": ""}]
# Register MX message
self.publish(
value=orjson.dumps(msg),
stream=MX_STREAM,
partition=int(event.managed_object.id) % n_partitions,
headers={
MX_MESSAGE_TYPE: b"events",
MX_LABELS: MX_H_VALUE_SPLITTER.join(event.managed_object.effective_labels).encode(
DEFAULT_ENCODING
),
MX_SHARDING_KEY: str(event.managed_object.id).encode(DEFAULT_ENCODING),
},
)
@classmethod
@cachetools.cachedmethod(operator.attrgetter("interface_cache"))
def get_interface(cls, managed_object_id, name):
......@@ -359,7 +403,7 @@ class ClassifierService(FastAPIService):
event.event_class = rule.event_class
# Calculate rule variables
event.vars = self.ruleset.eval_vars(event, event.event_class, vars)
message = "Classified as '%s' by rule '%s'" % (event.event_class.name, rule.name)
message = f"Classified as '{event.event_class.name}' by rule '{rule.name}'"
event.log += [
EventLog(
timestamp=datetime.datetime.now(),
......@@ -391,6 +435,8 @@ class ClassifierService(FastAPIService):
self.dedup_filter.register(event)
# Fill suppress filter
self.suppress_filter.register(event)
if config.message.enable_events:
self.register_mx_message(event)
# Call handlers
if self.call_event_handlers(event):
return
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment