Commit b7da751b authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

Merge branch 'noc-1445' into 'master'

noc/noc#1445 Add Label header to MX messages.

See merge request noc/noc!6394
parents cfc348ba 85b32c7f
...@@ -52,6 +52,8 @@ class DataStream(object): ...@@ -52,6 +52,8 @@ class DataStream(object):
F_HASH = "hash" F_HASH = "hash"
F_DATA = "data" F_DATA = "data"
F_META = "meta" F_META = "meta"
F_LABELS_META = "$meta_labels"
F_ADM_DOMAIN_META = "$meta_adm_domain"
HASH_LEN = 16 HASH_LEN = 16
DEFAULT_LIMIT = 1000 DEFAULT_LIMIT = 1000
...@@ -244,9 +246,9 @@ class DataStream(object): ...@@ -244,9 +246,9 @@ class DataStream(object):
obj_id = cls.clean_id(data["id"]) obj_id = cls.clean_id(data["id"])
if meta is None and "$meta" in data: if meta is None and "$meta" in data:
meta = data.pop("$meta") meta = data.pop("$meta")
m_name = "%s_%s" % (cls.name, fmt) if fmt else cls.name m_name = f"{cls.name}_{fmt}" if fmt else cls.name
l_name = "%s|%s|%s" % (cls.name, obj_id, fmt) if fmt else "%s|%s" % (cls.name, obj_id) l_name = f"{cls.name}|{obj_id}|{fmt}" if fmt else f"{cls.name}|{obj_id}"
metrics["ds_%s_updated" % m_name] += 1 metrics[f"ds_{m_name}_updated"] += 1
# Calculate hash # Calculate hash
hash = cls.get_hash(data) hash = cls.get_hash(data)
# Get existing object state # Get existing object state
...@@ -264,7 +266,7 @@ class DataStream(object): ...@@ -264,7 +266,7 @@ class DataStream(object):
if not is_changed(doc, hash): if not is_changed(doc, hash):
logger.info("[%s] Object hasn't been changed after altering", l_name) logger.info("[%s] Object hasn't been changed after altering", l_name)
return False # Not changed after altering return False # Not changed after altering
metrics["ds_%s_changed" % m_name] += 1 metrics[f"ds_{m_name}_changed"] += 1
change_id = bson.ObjectId() change_id = bson.ObjectId()
data["change_id"] = str(change_id) data["change_id"] = str(change_id)
op = { op = {
...@@ -300,9 +302,10 @@ class DataStream(object): ...@@ -300,9 +302,10 @@ class DataStream(object):
try: try:
data = cls.get_object(obj_id) data = cls.get_object(obj_id)
meta = cls.get_meta(data) meta = cls.get_meta(data)
return data, meta return cls.clean_meta_fields(data), meta
except KeyError: except KeyError:
return cls.get_deleted_object(obj_id), None data, meta = cls.get_deleted_object(obj_id), None
return cls.clean_meta_fields(data), meta
@classmethod @classmethod
def update_object(cls, id, delete=False) -> bool: def update_object(cls, id, delete=False) -> bool:
...@@ -688,3 +691,11 @@ class DataStream(object): ...@@ -688,3 +691,11 @@ class DataStream(object):
) )
# Cleanup # Cleanup
del data["$changeid"] del data["$changeid"]
@classmethod
def clean_meta_fields(cls, data: Dict[str, Any]):
if cls.F_LABELS_META in data:
del data[cls.F_LABELS_META]
if cls.F_ADM_DOMAIN_META in data:
del data[cls.F_ADM_DOMAIN_META]
return data
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# mx utilities # mx utilities
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project # Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details # See LICENSE for details
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
...@@ -27,16 +27,28 @@ MX_TO = "To" ...@@ -27,16 +27,28 @@ MX_TO = "To"
MX_CHANGE_ID = "Change-Id" MX_CHANGE_ID = "Change-Id"
MX_ADMINISTRATIVE_DOMAIN_ID = "Administrative-Domain-Id" MX_ADMINISTRATIVE_DOMAIN_ID = "Administrative-Domain-Id"
MX_PROFILE_ID = "Profile-Id" MX_PROFILE_ID = "Profile-Id"
MX_LABELS = "Labels"
MX_H_VALUE_SPLITTER = ";"
# Available message types # Available message types
MESSAGE_TYPES = {"alarm", "managedobject", "reboot", "metrics", "notification"} MESSAGE_TYPES = {"alarm", "managedobject", "reboot", "metrics", "notification"}
MESSAGE_HEADERS = {MX_SHARDING_KEY, MX_TO, MX_CHANGE_ID, MX_ADMINISTRATIVE_DOMAIN_ID, MX_PROFILE_ID} MESSAGE_HEADERS = {
MX_SHARDING_KEY,
MX_TO,
MX_CHANGE_ID,
MX_ADMINISTRATIVE_DOMAIN_ID,
MX_PROFILE_ID,
MX_LABELS,
}
_mx_partitions: Optional[int] = None _mx_partitions: Optional[int] = None
_mx_lock = Lock() _mx_lock = Lock()
def send_message( def send_message(
data: Any, message_type: str, headers: Optional[Dict[str, bytes]], sharding_key: int = 0 data: Any,
message_type: str,
headers: Optional[Dict[str, bytes]],
sharding_key: int = 0,
): ):
""" """
Build message and schedule to send to mx service Build message and schedule to send to mx service
......
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# alarm datastream # alarm datastream
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project # Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details # See LICENSE for details
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
...@@ -18,7 +18,7 @@ from noc.fm.models.alarmclass import AlarmClass ...@@ -18,7 +18,7 @@ from noc.fm.models.alarmclass import AlarmClass
from noc.fm.models.utils import get_alarm from noc.fm.models.utils import get_alarm
from noc.main.models.label import Label from noc.main.models.label import Label
from noc.core.comp import smart_bytes from noc.core.comp import smart_bytes
from noc.core.mx import MX_PROFILE_ID from noc.core.mx import MX_LABELS, MX_PROFILE_ID, MX_ADMINISTRATIVE_DOMAIN_ID, MX_H_VALUE_SPLITTER
class AlarmDataStream(DataStream): class AlarmDataStream(DataStream):
...@@ -35,6 +35,9 @@ class AlarmDataStream(DataStream): ...@@ -35,6 +35,9 @@ class AlarmDataStream(DataStream):
raise KeyError() raise KeyError()
r = { r = {
"id": str(alarm.id), "id": str(alarm.id),
"$version": 1,
cls.F_LABELS_META: alarm.managed_object.effective_labels,
cls.F_ADM_DOMAIN_META: alarm.managed_object.administrative_domain.id,
"timestamp": cls.qs(alarm.timestamp), "timestamp": cls.qs(alarm.timestamp),
"severity": alarm.severity, "severity": alarm.severity,
"reopens": alarm.reopens, "reopens": alarm.reopens,
...@@ -164,4 +167,6 @@ class AlarmDataStream(DataStream): ...@@ -164,4 +167,6 @@ class AlarmDataStream(DataStream):
def get_msg_headers(cls, data: Dict[str, Any]) -> Optional[Dict[str, bytes]]: def get_msg_headers(cls, data: Dict[str, Any]) -> Optional[Dict[str, bytes]]:
return { return {
MX_PROFILE_ID: smart_bytes(data["managed_object"]["object_profile"]["id"]), MX_PROFILE_ID: smart_bytes(data["managed_object"]["object_profile"]["id"]),
MX_ADMINISTRATIVE_DOMAIN_ID: smart_bytes(data[cls.F_ADM_DOMAIN_META]),
MX_LABELS: smart_bytes(MX_H_VALUE_SPLITTER.join(data[cls.F_LABELS_META])),
} }
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# managedobject datastream # managedobject datastream
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project # Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details # See LICENSE for details
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
...@@ -30,7 +30,7 @@ from noc.inv.models.object import Object ...@@ -30,7 +30,7 @@ from noc.inv.models.object import Object
from noc.sa.models.service import Service from noc.sa.models.service import Service
from noc.core.text import alnum_key from noc.core.text import alnum_key
from noc.core.comp import smart_text, smart_bytes from noc.core.comp import smart_text, smart_bytes
from noc.core.mx import MX_ADMINISTRATIVE_DOMAIN_ID, MX_PROFILE_ID from noc.core.mx import MX_ADMINISTRATIVE_DOMAIN_ID, MX_LABELS, MX_PROFILE_ID, MX_H_VALUE_SPLITTER
def qs(s): def qs(s):
...@@ -55,6 +55,8 @@ class ManagedObjectDataStream(DataStream): ...@@ -55,6 +55,8 @@ class ManagedObjectDataStream(DataStream):
r = { r = {
"id": str(id), "id": str(id),
"$version": 1, "$version": 1,
cls.F_LABELS_META: mo.effective_labels,
cls.F_ADM_DOMAIN_META: mo.administrative_domain.id,
"bi_id": mo.bi_id, "bi_id": mo.bi_id,
"name": qs(mo.name), "name": qs(mo.name),
"profile": qs(mo.profile.name), "profile": qs(mo.profile.name),
...@@ -484,6 +486,7 @@ class ManagedObjectDataStream(DataStream): ...@@ -484,6 +486,7 @@ class ManagedObjectDataStream(DataStream):
@classmethod @classmethod
def get_msg_headers(cls, data: Dict[str, Any]) -> Optional[Dict[str, bytes]]: def get_msg_headers(cls, data: Dict[str, Any]) -> Optional[Dict[str, bytes]]:
return { return {
MX_ADMINISTRATIVE_DOMAIN_ID: smart_bytes(data["administrative_domain"]["id"]), MX_ADMINISTRATIVE_DOMAIN_ID: smart_bytes(data[cls.F_ADM_DOMAIN_META]),
MX_LABELS: smart_bytes(MX_H_VALUE_SPLITTER.join(data[cls.F_LABELS_META])),
MX_PROFILE_ID: smart_bytes(data["object_profile"]["id"]), MX_PROFILE_ID: smart_bytes(data["object_profile"]["id"]),
} }
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
# Uptime check # Uptime check
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
# Copyright (C) 2007-2015 The NOC Project # Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details # See LICENSE for details
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
...@@ -11,7 +11,13 @@ import datetime ...@@ -11,7 +11,13 @@ import datetime
# NOC modules # NOC modules
from noc.services.discovery.jobs.base import DiscoveryCheck from noc.services.discovery.jobs.base import DiscoveryCheck
from noc.fm.models.uptime import Uptime from noc.fm.models.uptime import Uptime
from noc.core.mx import send_message, MX_PROFILE_ID from noc.core.mx import (
send_message,
MX_LABELS,
MX_PROFILE_ID,
MX_ADMINISTRATIVE_DOMAIN_ID,
MX_H_VALUE_SPLITTER,
)
from noc.config import config from noc.config import config
from noc.core.hash import hash_int from noc.core.hash import hash_int
from noc.core.comp import smart_bytes from noc.core.comp import smart_bytes
...@@ -87,6 +93,8 @@ class UptimeCheck(DiscoveryCheck): ...@@ -87,6 +93,8 @@ class UptimeCheck(DiscoveryCheck):
data, data,
message_type="reboot", message_type="reboot",
headers={ headers={
MX_LABELS: smart_bytes(MX_H_VALUE_SPLITTER.join(mo.effective_labels)),
MX_ADMINISTRATIVE_DOMAIN_ID: smart_bytes(mo.administrative_domain.id),
MX_PROFILE_ID: smart_bytes(mo.object_profile.id), MX_PROFILE_ID: smart_bytes(mo.object_profile.id),
}, },
sharding_key=hash_int(mo.id) & 0xFFFFFFFF, sharding_key=hash_int(mo.id) & 0xFFFFFFFF,
......
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# Route # Route
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project # Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details # See LICENSE for details
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
...@@ -11,8 +11,9 @@ from typing import Tuple, Dict, List, DefaultDict, Iterator ...@@ -11,8 +11,9 @@ from typing import Tuple, Dict, List, DefaultDict, Iterator
# NOC modules # NOC modules
from noc.core.liftbridge.message import Message from noc.core.liftbridge.message import Message
from noc.main.models.messageroute import MessageRoute
from noc.core.comp import smart_bytes from noc.core.comp import smart_bytes
from noc.core.mx import MX_LABELS, MX_H_VALUE_SPLITTER
from noc.main.models.messageroute import MessageRoute
from .action import Action from .action import Action
from .transmute import Transmutation from .transmute import Transmutation
...@@ -31,12 +32,16 @@ class Route(object): ...@@ -31,12 +32,16 @@ class Route(object):
:param msg: :param msg:
:return: :return:
""" """
return eval(self.match_co, {"headers": msg.headers}) headers = msg.headers
if MX_LABELS in headers:
headers[MX_LABELS] = headers[MX_LABELS].split(smart_bytes(MX_H_VALUE_SPLITTER))
return eval(self.match_co, {"headers": headers})
def iter_transmute(self, headers: Dict[str, bytes], data: bytes) -> Iterator[bytes]: def iter_transmute(self, headers: Dict[str, bytes], data: bytes) -> Iterator[bytes]:
""" """
Transmute message body and apply all the transformations Transmute message body and apply all the transformations
:param msg: :param headers:
:param data:
:return: :return:
""" """
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment