Commit 839bfe50 authored by Dmitry Volodin's avatar Dmitry Volodin Committed by Andrey Vertiprahov
Browse files

mx: metrics stream

parent c73b4f23
......@@ -442,6 +442,9 @@ class Config(BaseConfig):
enable_alarm = BooleanParameter(default=False)
enable_managedobject = BooleanParameter(default=False)
enable_reboot = BooleanParameter(default=False)
enable_metrics = BooleanParameter(default=False)
# Comma-separated list of metric scopes
enable_metric_scopes = ListParameter(item=StringParameter(), default=[])
class mongo(ConfigSection):
addresses = ServiceParameter(service="mongo", wait=True)
......
......@@ -69,7 +69,7 @@ class BaseService(object):
# Service name
name = None
# Leader lock name
# Only one active instace per leader lock can be active
# Only one active instance per leader lock can be active
# at given moment
# Config variables can be expanded as %(varname)s
leader_lock_name = None
......@@ -141,7 +141,10 @@ class BaseService(object):
self.publisher_start_lock = threading.Lock()
# Metrics publisher buffer
self.metrics_queue: Optional[QBuffer] = None
self.metrics_start_lock = threading.Lock()
# MX metrics publisher buffer
self.mx_metrics_queue: Optional[QBuffer] = None
self.mx_metrics_scopes: Dict[str, Callable] = {}
self.mx_partitions: int = 0
#
self.active_subscribers = 0
self.subscriber_shutdown_waiter: Optional[asyncio.Event] = None
......@@ -391,6 +394,9 @@ class BaseService(object):
await self.init_api()
#
if config.message.enable_metrics:
self.mx_partitions = await self.get_stream_partitions("message")
#
if self.use_telemetry:
self.start_telemetry_callback()
self.loop.create_task(self.on_register())
......@@ -674,7 +680,17 @@ class BaseService(object):
self.publish_queue = LiftBridgeQueue(self.loop)
self.metrics_queue = QBuffer(max_size=config.liftbridge.max_message_size)
self.loop.create_task(self.publisher())
self.loop.create_task(self.publish_metrics())
self.loop.create_task(self.publish_metrics(self.metrics_queue))
if config.message.enable_metrics:
from noc.main.models.metricstream import MetricStream
for mss in MetricStream.objects.filter():
if mss.is_active and mss.scope.table_name in set(
config.message.enable_metric_scopes
):
self.mx_metrics_scopes[mss.scope.table_name] = mss.to_mx
self.mx_metrics_queue = QBuffer(max_size=config.liftbridge.max_message_size)
self.loop.create_task(self.publish_metrics(self.mx_metrics_queue))
def publish(
self,
......@@ -880,10 +896,10 @@ class BaseService(object):
executor = self.get_executor(name)
return executor.submit(fn, *args, **kwargs)
async def publish_metrics(self):
while not (self.publish_queue.to_shutdown and self.metrics_queue.is_empty()):
async def publish_metrics(self, queue: QBuffer) -> None:
while not (self.publish_queue.to_shutdown and queue.is_empty()):
t0 = perf_counter()
for stream, partititon, chunk in self.metrics_queue.iter_slice():
for stream, partititon, chunk in queue.iter_slice():
self.publish(chunk, stream=stream, partition=partititon)
if not self.publish_queue.to_shutdown:
to_sleep = config.liftbridge.metrics_send_delay - (perf_counter() - t0)
......@@ -910,6 +926,15 @@ class BaseService(object):
self.metrics_queue.put(
stream=f"ch.{table}", partition=key % self.n_metrics_partitions, data=metrics
)
# Mirror to MX
if config.message.enable_metrics and (
not self.mx_metrics_scopes or table in self.mx_metrics_scopes
):
self.mx_metrics_queue.put(
stream="message",
partition=key % self.mx_partitions,
data=[self.mx_metrics_scopes[table](m) for m in metrics],
)
def start_telemetry_callback(self) -> None:
"""
......
......@@ -4,6 +4,8 @@ Message service configuration
## enable_alarm
Issue [alarms](../../../user/reference/mx/alarm.md) mx messages.
| | |
| -------------- | -------------------------- |
| Default value | `False` |
......@@ -13,9 +15,33 @@ Message service configuration
## enable_managedobject
Issue [managedobject](../../../user/reference/mx/managedobject.md) mx messages.
| | |
| -------------- | ---------------------------------- |
| Default value | `False` |
| YAML Path | `message.enable_managedobject` |
| Key-Value Path | `message/enable_managedobject` |
| Environment | `NOC_MESSAGE_ENABLE_MANAGEDOBJECT` |
## enable_reboot
Issue [reboot](../../../user/reference/mx/reboot.md) mx messages.
| | |
| -------------- | --------------------------- |
| Default value | `False` |
| YAML Path | `message.enable_reboot` |
| Key-Value Path | `message/enable_reboot` |
| Environment | `NOC_MESSAGE_ENABLE_REBOOT` |
## enable_metrics
Issue [metrics](../../../user/reference/mx/metrics.md) mx messages.
| | |
| -------------- | ---------------------------- |
| Default value | `False` |
| YAML Path | `message.enable_metrics` |
| Key-Value Path | `message/enable_metrics` |
| Environment | `NOC_MESSAGE_ENABLE_METRICS` |
# metrics MX Message
`metrics` message is generated by [metrics](../../../admin/reference/discovery/periodic/metrics.md)
periodic check of [discovery](../../../admin/reference/services/discovery.md) when
managed object metrics is collected.
## Message Headers
Message-Type
: Type of message. Always `metrics`.
Sharding-Key
: Key for consistent sharding.
Profile-Id
: Managed Object's Profile Id.
## Message Format
Message contains JSON array, containing objects of following structure
| Name | Type | Description |
| ---------- | -------- | -------------------------------------------------------------------- |
| ts | DateTime | ISO 8601 timestamp (i.e. `YYYY-MM-DDTHH:MM:SS`) of collected metrics |
| bi_id | Number | Managed Object's BI ID |
| scope | String | [Metric Scope](../metrics/scopes/index.md) name |
| `<metric>` | Any | Measured `<metric>` value. Depends on metric scope |
## Example
```json
[
{
"scope": "interface",
"ts": "2021-04-17T11:18:38",
"bi_id": 8904487845945788585,
"labels": ["noc::interface::Gi 1/0/1"],
"status_oper": 1,
"time_delta": 206,
"status_admin": 1,
"discards_out": 0,
"errors_in": 0,
"errors_in_delta": 0,
"packets_in": 3917,
"packets_out": 2120,
"load_out": 10491415,
"load_in": 34310007,
"speed": 1000000000,
"status_duplex": 3,
"errors_out": 0,
"errors_out_delta": 0,
"discards_in": 0
},
{
"scope": "interface",
"ts": "2021-04-17T11:18:38",
"managed_object": 8904487845945788585,
"labels": ["noc::interface::Gi 1/0/10"],
"status_oper": 1,
"time_delta": 206,
"status_admin": 1,
"discards_out": 0,
"errors_in": 0,
"errors_in_delta": 0,
"packets_in": 12631,
"packets_out": 8230,
"load_out": 16998124,
"load_in": 67189306,
"speed": 1000000000,
"status_duplex": 3,
"errors_out": 0,
"errors_out_delta": 0,
"discards_in": 0
}
]
```
......@@ -1277,6 +1277,7 @@ nav:
- Overview: user/reference/mx/index.md
- alarm: user/reference/mx/alarm.md
- managedobject: user/reference/mx/managedobject.md
- metrics: user/reference/mx/metrics.md
- reboot: user/reference/mx/reboot.md
- Technologies:
- Group: user/reference/technology/group.md
......
......@@ -19,3 +19,21 @@ Message service configuration
| YAML Path | `message.enable_managedobject` |
| Key-Value Path | `message/enable_managedobject` |
| Environment | `NOC_MESSAGE_ENABLE_MANAGEDOBJECT` |
## enable_reboot
| | |
| -------------- | --------------------------- |
| Default value | `False` |
| YAML Path | `message.enable_reboot` |
| Key-Value Path | `message/enable_reboot` |
| Environment | `NOC_MESSAGE_ENABLE_REBOOT` |
## enable_metrics
| | |
| -------------- | ---------------------------- |
| Default value | `False` |
| YAML Path | `message.enable_metrics` |
| Key-Value Path | `message/enable_metrics` |
| Environment | `NOC_MESSAGE_ENABLE_METRICS` |
# metrics MX Message
`metrics` message is generated by [metrics](../../../admin/reference/discovery/periodic/metrics.md)
periodic check of [discovery](../../../admin/reference/services/discovery.md) when
managed object metrics is collected.
## Message Headers
Message-Type
: Type of message. Always `metrics`.
Sharding-Key
: Key for consistent sharding.
Profile-Id
: Managed Object's Profile Id.
## Message Format
Message contains JSON array, containing objects of following structure
| Name | Type | Description |
| ---------- | -------- | -------------------------------------------------------------------- |
| ts | DateTime | ISO 8601 timestamp (i.e. `YYYY-MM-DDTHH:MM:SS`) of collected metrics |
| bi_id | Number | Managed Object's BI ID |
| scope | String | [Metric Scope](../metrics/scopes/index.md) name |
| `<metric>` | Any | Measured `<metric>` value. Depends on metric scope |
## Example
```json
[
{
"scope": "interface",
"ts": "2021-04-17T11:18:38",
"bi_id": 8904487845945788585,
"labels": ["noc::interface::Gi 1/0/1"],
"status_oper": 1,
"time_delta": 206,
"status_admin": 1,
"discards_out": 0,
"errors_in": 0,
"errors_in_delta": 0,
"packets_in": 3917,
"packets_out": 2120,
"load_out": 10491415,
"load_in": 34310007,
"speed": 1000000000,
"status_duplex": 3,
"errors_out": 0,
"errors_out_delta": 0,
"discards_in": 0
},
{
"scope": "interface",
"ts": "2021-04-17T11:18:38",
"managed_object": 8904487845945788585,
"labels": ["noc::interface::Gi 1/0/10"],
"status_oper": 1,
"time_delta": 206,
"status_admin": 1,
"discards_out": 0,
"errors_in": 0,
"errors_in_delta": 0,
"packets_in": 12631,
"packets_out": 8230,
"load_out": 16998124,
"load_in": 67189306,
"speed": 1000000000,
"status_duplex": 3,
"errors_out": 0,
"errors_out_delta": 0,
"discards_in": 0
}
]
```
......@@ -1277,6 +1277,7 @@ nav:
- Overview: user/reference/mx/index.md
- alarm: user/reference/mx/alarm.md
- managedobject: user/reference/mx/managedobject.md
- metrics: user/reference/mx/metrics.md
- reboot: user/reference/mx/reboot.md
- Technologies:
- Group: user/reference/technology/group.md
......
# ----------------------------------------------------------------------
# MetricsStream model
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python Modules
from typing import Dict, Any, Callable
from threading import Lock
# Third-party modules
from mongoengine.document import Document, EmbeddedDocument
from mongoengine.fields import (
StringField,
BooleanField,
ReferenceField,
ListField,
EmbeddedDocumentField,
)
# NOC Modules
from noc.core.mongo.fields import PlainReferenceField
from noc.pm.models.metricscope import MetricScope
from noc.pm.models.metrictype import MetricType
transform_code = {}
code_lock = Lock()
class StreamField(EmbeddedDocument):
meta = {"strict": False}
metric_type = ReferenceField(MetricType, required=True)
external_alias = StringField(default=False)
expose_mx = BooleanField(default=True)
class MetricStream(Document):
meta = {
"collection": "metricstream",
"strict": False,
"auto_create_index": False,
}
scope = PlainReferenceField(MetricScope, unique=True)
is_active = BooleanField(default=True)
# Metric scope reference
fields = ListField(EmbeddedDocumentField(StreamField))
def __str__(self):
return f"{self.scope.name or ''}"
# return self.scope
def _get_transform_code(self) -> str:
r = ["def q_mx(input):"]
if not self.fields:
# No path
r += [" return {}"]
return "\n".join(r)
r += [
" if not input:",
" return {}",
" v = {",
f' "scope": "{self.scope.name}",',
' "bi_id": input["managed_object"],',
' "labels": input["labels"],',
" }",
' if "ts" in input:',
' v["ts"] = input["ts"].replace(" ", "T")',
]
for f in self.fields:
if not f.expose_mx:
continue
r += [
f' v["{f.external_alias or f.metric_type.field_name}"] = input.get("{f.metric_type.field_name}")'
]
r += [" return v"]
return "\n".join(r)
def _get_transform(self) -> Callable[[Dict[str, Any]], Dict[str, Any]]:
"""
Generate label -> path function for scope
:return:
"""
fn = transform_code.get(self.scope.name)
if not fn:
with code_lock:
fn = transform_code.get(self.scope.name)
if fn:
return fn
# Compile
code = self._get_transform_code()
eval(compile(code, "<string>", "exec"))
fn = locals()["q_mx"]
transform_code[self.scope.name] = fn
return fn
def to_mx(self, m: Dict[str, Any]) -> Dict[str, Any]:
return self._get_transform()(m)
......@@ -106,6 +106,7 @@ _MODELS = {
"main.Handler": "noc.main.models.handler.Handler",
"main.Language": "noc.main.models.language.Language",
"main.MessageRoute": "noc.main.models.messageroute.MessageRoute",
"main.MetricStream": "noc.main.models.metricstream.MetricStream",
"main.MIMEType": "noc.main.models.mimetype.MIMEType",
"main.NotificationGroup": "noc.main.models.notificationgroup.NotificationGroup",
"main.NotificationGroupOther": "noc.main.models.notificationgroup.NotificationGroupOther",
......
......@@ -101,7 +101,7 @@ class LabelItem(EmbeddedDocument):
return r
@on_delete_check(check=[("pm.MetricType", "scope")])
@on_delete_check(check=[("pm.MetricType", "scope"), ("main.MetricStream", "scope")])
@on_save
class MetricScope(Document):
meta = {
......
# ----------------------------------------------------------------------
# main.metricstream application
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# NOC modules
from noc.lib.app.extdocapplication import ExtDocApplication
from noc.main.models.metricstream import MetricStream
from noc.core.translation import ugettext as _
class MetricStreamApplication(ExtDocApplication):
"""
CHPolicy application
"""
title = "MetricStream"
menu = [_("Setup"), _("Metric Stream")]
model = MetricStream
//---------------------------------------------------------------------
// main.metricstream application
//---------------------------------------------------------------------
// Copyright (C) 2007-2021 The NOC Project
// See LICENSE for details
//---------------------------------------------------------------------
console.debug("Defining NOC.main.metricstream.Application");
Ext.define("NOC.main.metricstream.Application", {
extend: "NOC.core.ModelApplication",
requires: [
"NOC.main.metricstream.Model",
"NOC.pm.metricscope.LookupField",
"NOC.pm.metrictype.LookupField",
"Ext.ux.form.GridField"
],
model: "NOC.main.metricstream.Model",
initComponent: function() {
var me = this;
Ext.apply(me, {
columns: [
{
text: __("Scope"),
dataIndex: "scope",
width: 200
},
{
text: __("Active"),
dataIndex: "is_active",
width: 25,
renderer: NOC.render.Bool
}
],
fields: [
{
name: "scope",
xtype: "pm.metricscope.LookupField",
fieldLabel: __("Scope"),
allowBlank: false
},
{
name: "is_active",
xtype: "checkbox",
boxLabel: __("Active")
},
{
name: "fields",
xtype: "gridfield",
fieldLabel: __("Fields"),
columns: [
{
text: __("Metric Type"),
dataIndex: "metric_type",
width: 150,
editor: {
xtype: "pm.metrictype.LookupField"
},
renderer: NOC.render.Lookup("metric_type")
},
{
dataIndex: "external_alias",
text: __("External Alias"),
width: 150,
editor: "textfield"
},
{
text: __("Expose MX"),
dataIndex: "expose_mx",
width: 50,
renderer: NOC.render.Bool,
editor: "checkbox"
}
]
}
]
});
me.callParent();
},
filters: [
{
title: __("By Active"),
name: "is_active",
ftype: "boolean"
}
]
});
//---------------------------------------------------------------------
// main.metricstream Model
//---------------------------------------------------------------------
// Copyright (C) 2007-2021 The NOC Project
// See LICENSE for details
//---------------------------------------------------------------------
console.debug("Defining NOC.main.metricstream.Model");
Ext.define("NOC.main.metricstream.Model", {
extend: "Ext.data.Model",
rest_url: "/main/metricstream/",
fields: [
{
name: "id",
type: "string"
},
{
name: "scope",
type: "string"
},
{
name: "scope__label",
type: "string",
persist: false
},
{
name: "is_active",
type: "boolean",
defaultValue: true
},
{
name: "fields",
type: "auto"
},
]
});
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment