Commit 17e82ce7 authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Add send syslog message to mx.

parent 820ace7e
......@@ -467,6 +467,7 @@ class Config(BaseConfig):
enable_metrics = BooleanParameter(default=False)
# Comma-separated list of metric scopes
enable_metric_scopes = ListParameter(item=StringParameter(), default=[])
enable_syslog = BooleanParameter(default=False)
class mongo(ConfigSection):
addresses = ServiceParameter(service="mongo", wait=True)
......
......@@ -30,7 +30,7 @@ MX_PROFILE_ID = "Profile-Id"
MX_LABELS = "Labels"
MX_H_VALUE_SPLITTER = ";"
# Available message types
MESSAGE_TYPES = {"alarm", "managedobject", "reboot", "metrics", "notification"}
MESSAGE_TYPES = {"alarm", "managedobject", "reboot", "metrics", "notification", "syslog"}
MESSAGE_HEADERS = {
MX_SHARDING_KEY,
MX_TO,
......
# syslog MX Message
`syslog` message is generated by [syslogcollector](../../../admin/reference/services/syslogcollector.md)
service when managed object Syslog is received.
## Message Headers
Message-Type
: Type of message. Always `syslog`.
Sharding-Key
: Key for consistent sharding.
Labels
: Managed Object's effective labels.
## Message Format
Message contains JSON array, containing objects of following structure
| Name | Type | Description |
| ---------- | -------- | -------------------------------------------------------------------- |
| timestamp | DateTime | ISO 8601 timestamp (i.e. `YYYY-MM-DDTHH:MM:SS`) of collected metrics |
| uuid | String | Global `UUIDv4` |
| collector_type | String | Always `syslog` |
| collector | String | Collector Pool |
| address | String | SNMP Trap Source Address |
| object | Object {{ complex }} | Managed Object |
| {{ tab }} id | String | Managed Object's ID |
| {{ tab }} bi_id | Integer | Managed Object's BI ID |
| {{ tab }} name | String | Managed Object's name |
| {{ tab }} labels | Array of String | Managed Object's labels |
| {{ tab }} administrative_domain | Object {{ complex }} | Managed Object Administrative Domain's |
| {{ tab2 }} remote_system | Object {{ complex }} | Managed Object Administrative Domain's [Remote System](../../../user/reference/concepts/remote-system/index.md) (if imported) |
| {{ tab4 }} id | String | Remote System's ID |
| {{ tab4 }} name | String | Remote System's Name |
| {{ tab2 }} id | Integer | Managed Object Administrative Domain's ID |
| {{ tab2 }} name | String | Managed Object Administrative Domain's Name |
| {{ tab2 }} remote_id | String | Managed Object Administrative Domain's ID in Remote System (if any) |
| {{ tab }} remote_system | Object {{ complex }} | Managed Object's [Remote System](../../../user/reference/concepts/remote-system/index.md) (if imported) |
| {{ tab2 }} id | String | Remote System's ID |
| {{ tab2 }} name | String | Remote System's Name |
| {{ tab }} remote_id | String | Managed Object's ID in Remote System (if any) |
| syslog | Object {{ complex }} | Syslog message body content |
| {{ tab }} facility | String | Syslog facility |
| {{ tab }} severity | String | Syslog severity |
| {{ tab }} message | String | Syslog message |
## Example
```json
{
"timestamp": "2022-06-20T10:07:15",
"uuid": "3d6a2405-447c-44e7-b885-7a2deb43a26a",
"collector_type": "snmptrap",
"collector": "syslog",
"address": "127.0.0.1",
"managed_object": {
"id": "450",
"bi_id": 7602684790455147111,
"name": "device-1",
"administrative_domain": {
"id": 11,
"name": "default",
"remote_system": {
"id": "596e715fc165cf1e082ea14c",
"name": "TEST"
},
"remote_id": "1"
},
"labels": [],
"remote_system": {
"id": "596e715fc165cf1e082ea14c",
"name": "TEST"
},
"remote_id": "22"
},
"syslog": {
"severity": 1,
"facility": 2,
"message": "Message 1",
}
}
\ No newline at end of file
......@@ -1292,6 +1292,7 @@ nav:
- managedobject: user/reference/mx/managedobject.md
- metrics: user/reference/mx/metrics.md
- reboot: user/reference/mx/reboot.md
- syslog: user/reference/mx/syslog.md
- Technologies:
- Group: user/reference/technology/group.md
- "Network | CGNAT Termination": user/reference/technology/Network/cgnat-termination.md
......
# syslog MX Message
`syslog` message is generated by [syslogcollector](../../../admin/reference/services/syslogcollector.md)
service when managed object Syslog is received.
## Message Headers
Message-Type
: Type of message. Always `syslog`.
Sharding-Key
: Key for consistent sharding.
Labels
: Managed Object's effective labels.
## Message Format
Message contains JSON array, containing objects of following structure
| Name | Type | Description |
| ---------- | -------- | -------------------------------------------------------------------- |
| timestamp | DateTime | ISO 8601 timestamp (i.e. `YYYY-MM-DDTHH:MM:SS`) of collected metrics |
| uuid | String | Global `UUIDv4` |
| collector_type | String | Always `syslog` |
| collector | String | Collector Pool |
| address | String | SNMP Trap Source Address |
| object | Object {{ complex }} | Managed Object |
| {{ tab }} id | String | Managed Object's ID |
| {{ tab }} bi_id | Integer | Managed Object's BI ID |
| {{ tab }} name | String | Managed Object's name |
| {{ tab }} labels | Array of String | Managed Object's labels |
| {{ tab }} administrative_domain | Object {{ complex }} | Managed Object Administrative Domain's |
| {{ tab2 }} remote_system | Object {{ complex }} | Managed Object Administrative Domain's [Remote System](../../../user/reference/concepts/remote-system/index.md) (if imported) |
| {{ tab4 }} id | String | Remote System's ID |
| {{ tab4 }} name | String | Remote System's Name |
| {{ tab2 }} id | Integer | Managed Object Administrative Domain's ID |
| {{ tab2 }} name | String | Managed Object Administrative Domain's Name |
| {{ tab2 }} remote_id | String | Managed Object Administrative Domain's ID in Remote System (if any) |
| {{ tab }} remote_system | Object {{ complex }} | Managed Object's [Remote System](../../../user/reference/concepts/remote-system/index.md) (if imported) |
| {{ tab2 }} id | String | Remote System's ID |
| {{ tab2 }} name | String | Remote System's Name |
| {{ tab }} remote_id | String | Managed Object's ID in Remote System (if any) |
| syslog | Object {{ complex }} | Syslog message body content |
| {{ tab }} facility | String | Syslog facility |
| {{ tab }} severity | String | Syslog severity |
| {{ tab }} message | String | Syslog message |
## Example
```json
{
"timestamp": "2022-06-20T10:07:15",
"uuid": "3d6a2405-447c-44e7-b885-7a2deb43a26a",
"collector_type": "snmptrap",
"collector": "syslog",
"address": "127.0.0.1",
"managed_object": {
"id": "450",
"bi_id": 7602684790455147111,
"name": "device-1",
"administrative_domain": {
"id": 11,
"name": "default",
"remote_system": {
"id": "596e715fc165cf1e082ea14c",
"name": "TEST"
},
"remote_id": "1"
},
"labels": [],
"remote_system": {
"id": "596e715fc165cf1e082ea14c",
"name": "TEST"
},
"remote_id": "22"
},
"syslog": {
"severity": 1,
"facility": 2,
"message": "Message 1",
}
}
\ No newline at end of file
......@@ -1303,6 +1303,7 @@ nav:
- managedobject: user/reference/mx/managedobject.md
- metrics: user/reference/mx/metrics.md
- reboot: user/reference/mx/reboot.md
- syslog: user/reference/mx/syslog.md
- Technologies:
- Group: user/reference/technology/group.md
- "Network | CGNAT Termination": user/reference/technology/Network/cgnat-termination.md
......
......@@ -8,6 +8,8 @@
# NOC modules
from noc.core.datastream.base import DataStream
from noc.main.models.pool import Pool
from noc.main.models.label import Label
from noc.main.models.remotesystem import RemoteSystem
from noc.sa.models.managedobject import ManagedObject
......@@ -19,11 +21,20 @@ class CfgSyslogDataStream(DataStream):
def get_object(cls, id):
mo = ManagedObject.objects.filter(id=id).values_list(
"id",
"name",
"bi_id",
"is_managed",
"pool",
"fm_pool",
"administrative_domain",
"administrative_domain__name",
"administrative_domain__remote_system",
"administrative_domain__remote_id",
"remote_system",
"remote_id",
"address",
"labels",
"effective_labels",
"syslog_source_ip",
"syslog_source_type",
"event_processing_policy",
......@@ -35,11 +46,20 @@ class CfgSyslogDataStream(DataStream):
raise KeyError()
(
mo_id,
name,
bi_id,
is_managed,
pool,
fm_pool,
adm_domain,
adm_domain_name,
adm_domain_remote_system,
adm_domain_remote_id,
remote_system,
remote_id,
address,
labels,
effective_labels,
syslog_source_ip,
syslog_source_type,
event_processing_policy,
......@@ -69,13 +89,38 @@ class CfgSyslogDataStream(DataStream):
pool = str(Pool.get_by_id(pool).name)
r = {
"id": str(mo_id),
"bi_id": str(bi_id),
"bi_id": bi_id,
"pool": pool,
"fm_pool": str(Pool.get_by_id(fm_pool).name) if fm_pool else pool,
"addresses": [],
"process_events": effective_epp,
"archive_events": effective_sap,
"managed_object": {
"id": str(mo_id),
"bi_id": str(bi_id),
"name": name,
"administrative_domain": {"id": adm_domain, "name": adm_domain_name},
"labels": [
cls.qs(ll)
for ll in Label.objects.filter(
name__in=labels, expose_datastream=True
).values_list("name")
],
},
"effective_labels": effective_labels,
"name": name,
}
if remote_system:
rs = RemoteSystem.get_by_id(remote_system)
r["managed_object"]["remote_system"] = {"id": str(rs.id), "name": rs.name}
r["managed_object"]["remote_id"] = remote_id
if adm_domain_remote_system:
rs = RemoteSystem.get_by_id(adm_domain_remote_system)
r["managed_object"]["administrative_domain"]["remote_system"] = {
"id": str(rs.id),
"name": rs.name,
}
r["managed_object"]["administrative_domain"]["remote_id"] = adm_domain_remote_id
if syslog_source_type == "m" and address:
# Managed Object's address
r["addresses"] += [str(address)]
......
......@@ -8,8 +8,10 @@
# Python modules
import datetime
from collections import defaultdict
import asyncio
import uuid
from collections import defaultdict
from dataclasses import asdict
from typing import Optional, Dict
# Third-party modules
......@@ -20,10 +22,19 @@ from noc.config import config
from noc.core.error import NOCError
from noc.core.service.fastapi import FastAPIService
from noc.core.perf import metrics
from noc.core.mx import (
MX_STREAM,
get_mx_partitions,
MX_MESSAGE_TYPE,
MX_SHARDING_KEY,
MX_LABELS,
MX_H_VALUE_SPLITTER,
)
from noc.core.ioloop.timers import PeriodicCallback
from noc.services.syslogcollector.syslogserver import SyslogServer
from noc.services.syslogcollector.datastream import SysologDataStreamClient
from noc.services.syslogcollector.sourceconfig import SourceConfig
from noc.core.ioloop.timers import PeriodicCallback
from noc.services.syslogcollector.sourceconfig import SourceConfig, ManagedObjectData
from noc.core.comp import DEFAULT_ENCODING
class SyslogCollectorService(FastAPIService):
......@@ -80,7 +91,13 @@ class SyslogCollectorService(FastAPIService):
return None
def register_message(
self, cfg: SourceConfig, timestamp: int, message: str, facility: int, severity: int
self,
cfg: SourceConfig,
timestamp: int,
message: str,
facility: int,
severity: int,
source_address: str = None,
) -> None:
"""
Spool message to be sent
......@@ -99,18 +116,16 @@ class SyslogCollectorService(FastAPIService):
stream=cfg.stream,
partition=cfg.partition,
)
now = datetime.datetime.now().replace(microsecond=0)
if cfg.archive_events and cfg.bi_id:
# Archive message
metrics["events_archived"] += 1
now = datetime.datetime.now()
ts = now.strftime("%Y-%m-%d %H:%M:%S")
date = ts.split(" ")[0]
self.register_metrics(
"syslog",
[
{
"date": date,
"ts": ts,
"date": now.date(),
"ts": now.isoformat(sep=" "),
"managed_object": cfg.bi_id,
"facility": facility,
"severity": severity,
......@@ -118,6 +133,36 @@ class SyslogCollectorService(FastAPIService):
}
],
)
if config.message.enable_snmptrap:
metrics["events_message"] += 1
n_partitions = get_mx_partitions()
now = datetime.datetime.now()
self.publish(
value=orjson.dumps(
{
"timestamp": now.replace(microsecond=0),
"uuid": uuid.uuid4(),
"collector_type": "syslog",
"collector": config.pool,
"address": source_address,
"managed_object": asdict(cfg.managed_object),
"syslog": {
"facility": facility,
"severity": severity,
"message": message,
},
}
),
stream=MX_STREAM,
partition=int(cfg.id) % n_partitions,
headers={
MX_MESSAGE_TYPE: b"syslog",
MX_LABELS: MX_H_VALUE_SPLITTER.join(cfg.effective_labels).encode(
DEFAULT_ENCODING
),
MX_SHARDING_KEY: str(cfg.id).encode(DEFAULT_ENCODING),
},
)
async def get_object_mappings(self):
"""
......@@ -131,7 +176,7 @@ class SyslogCollectorService(FastAPIService):
try:
await client.query(
limit=config.syslogcollector.ds_limit,
filters=["pool(%s)" % config.pool],
filters=[f"pool({config.pool})"],
block=True,
filter_policy="delete",
)
......@@ -170,9 +215,12 @@ class SyslogCollectorService(FastAPIService):
bi_id=data.get("bi_id"), # For backward compatibility
process_events=data.get("process_events", True), # For backward compatibility
archive_events=data.get("archive_events", False),
stream="events.%s" % fm_pool,
stream=f"events.{fm_pool}",
partition=int(data["id"]) % num_partitions,
effective_labels=data.get("effective_labels", []),
)
if config.message.enable_syslog and "managed_object" in data:
cfg.managed_object = ManagedObjectData(**data["managed_object"])
new_addresses = set(cfg.addresses)
# Add new addresses, update remaining
for addr in new_addresses:
......
......@@ -7,7 +7,31 @@
# Python modules
from dataclasses import dataclass
from typing import Tuple
from typing import Tuple, Optional, List
@dataclass
class RemoteSystemData(object):
id: str
name: str
@dataclass
class AdministrativeDomainData(object):
id: int
name: str
remote_system: Optional[RemoteSystemData] = None
@dataclass
class ManagedObjectData(object):
id: str
bi_id: int
name: str
administrative_domain: AdministrativeDomainData
labels: List[str] = None
remote_system: Optional[RemoteSystemData] = None
remote_id: Optional[str] = None
@dataclass
......@@ -19,3 +43,6 @@ class SourceConfig(object):
archive_events: bool
stream: str
partition: int
name: Optional[str] = None
effective_labels: List[str] = None
managed_object: Optional[ManagedObjectData] = None
......@@ -51,4 +51,6 @@ class SyslogServer(UDPServer):
# Get timestamp
ts = int(time.time())
#
self.service.register_message(cfg, ts, data, facility=priority >> 3, severity=priority & 7)
self.service.register_message(
cfg, ts, data, facility=priority >> 3, severity=priority & 7, source_address=address[0]
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment