Commit 10dfe52b authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Merge branch 'fix-avs-reportdatasource-filter' into 'master'

Fix AggregatingMergeTree engine to new format.

See merge request !5446
parents 07177c35 5750ca79
Pipeline #32144 passed with stages
in 96 minutes and 1 second
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
# Python modules # Python modules
import datetime import datetime
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List, Optional, Dict, Iterable, Any from typing import List, Optional, Dict, Iterable, Any, Tuple
import time import time
import re import re
import heapq import heapq
...@@ -344,20 +344,23 @@ class ReportDataSource(object): ...@@ -344,20 +344,23 @@ class ReportDataSource(object):
end: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None,
interval: Optional[str] = None, interval: Optional[str] = None,
max_intervals: Optional[int] = None, max_intervals: Optional[int] = None,
filters: Optional[List[str]] = None, filters: Optional[List[Dict[str, str]]] = None,
rows: Optional[int] = None, rows: Optional[int] = None,
groups: Optional[List[str]] = None,
): ):
self.fields = fields self.query_fields: List[str] = fields
self.fields: Dict[str, ReportField] = self.get_fields(fields) # OrderedDict
self.objectids = objectids self.objectids = objectids
self.allobjectids = allobjectids self.allobjectids: bool = allobjectids
self.filters = filters or [] self.filters: List[Dict[str, str]] = filters or []
self.interval = interval self.interval: str = interval
self.max_intervals = max_intervals self.max_intervals: int = max_intervals
self.rows = rows self.rows: int = rows
self.groups: List[str] = groups or []
if self.TIMEBASED and not start: if self.TIMEBASED and not start:
raise ValueError("Timebased Report required start param") raise ValueError("Timebased Report required start param")
self.end = end or datetime.datetime.now() self.end: datetime.datetime = end or datetime.datetime.now()
self.start = start or self.end - datetime.timedelta(days=1) self.start: datetime.datetime = start or self.end - datetime.timedelta(days=1)
@classmethod @classmethod
def get_config(cls) -> ReportConfig: def get_config(cls) -> ReportConfig:
...@@ -377,6 +380,15 @@ class ReportDataSource(object): ...@@ -377,6 +380,15 @@ class ReportDataSource(object):
dataretentiondays=1, dataretentiondays=1,
) )
@classmethod
def get_fields(cls, fields):
r = {}
for query_f in fields:
for f in cls.FIELDS:
if f.name == query_f:
r[query_f] = f
return r
def iter_data(self): def iter_data(self):
pass pass
...@@ -413,47 +425,69 @@ class CHTableReportDataSource(ReportDataSource): ...@@ -413,47 +425,69 @@ class CHTableReportDataSource(ReportDataSource):
"MONTH": "toMonth(toDateTime(ts))", "MONTH": "toMonth(toDateTime(ts))",
} }
def get_group_interval(self) -> str: def get_group(self) -> Tuple[List[str], List[str]]:
""" """
If set max_intervals - use variants interval If set max_intervals - use variants interval
:return: :return:
""" """
select, group = [], []
if self.max_intervals: if self.max_intervals:
minutes = ((self.end - self.start).total_seconds() / 60) / self.max_intervals minutes = ((self.end - self.start).total_seconds() / 60) / self.max_intervals
return f"toStartOfInterval(ts, INTERVAL {minutes} minute)" select += [f"toStartOfInterval(ts, INTERVAL {minutes} minute) AS ts"]
elif self.interval not in self.group_intervals: group += ["ts"]
elif self.interval and self.interval not in self.group_intervals:
raise NotImplementedError("Not supported interval") raise NotImplementedError("Not supported interval")
return self.group_intervals[self.interval] elif self.interval in self.group_intervals:
select += [f"{self.group_intervals[self.interval]} as ts"]
group += ["ts"]
return select, group
def get_custom_conditions(self) -> Dict[str, List[str]]: def get_custom_conditions(self) -> Dict[str, List[str]]:
if not self.filters: if not self.filters:
return {} return {}
where, having = [], []
for ff in self.filters:
f_name = ff["name"]
for s in self.FIELDS:
if s.name == f_name:
f_name = s.metric_name
break
op = ff.get("op", "IN")
if op == "IN":
f_value = f'{tuple(ff["value"])}'
else:
f_value = ff["value"][0]
q = f"{f_name} {op} {f_value}"
if ff["name"] in self.fields and self.fields[ff["name"]].group:
where += [q]
else:
having += [q]
return { return {
"q_where": [ "q_where": where,
f'{f} IN ({", ".join([str(c) for c in self.filters[f]])})' for f in self.filters "q_having": having,
]
} }
def get_query_ch(self, from_date: datetime.datetime, to_date: datetime.datetime) -> str: def get_query_ch(self, from_date: datetime.datetime, to_date: datetime.datetime) -> str:
ts_from_date = time.mktime(from_date.timetuple()) ts_from_date = time.mktime(from_date.timetuple())
ts_to_date = time.mktime(to_date.timetuple()) ts_to_date = time.mktime(to_date.timetuple())
select, group = self.get_group()
query_map = { query_map = {
"q_select": [], "q_select": select or [],
"q_group": group or [],
"q_where": [ "q_where": [
f"(date >= toDate({ts_from_date})) AND (ts >= toDateTime({ts_from_date}) AND ts <= toDateTime({ts_to_date})) %s", # objectids_filter f"(date >= toDate({ts_from_date})) AND (ts >= toDateTime({ts_from_date}) AND ts <= toDateTime({ts_to_date})) %s", # objectids_filter
], ],
} }
ts = self.get_group_interval()
query_map["q_select"] += [f"{ts} AS ts"] for ff in self.fields:
query_map["q_group"] = ["ts"] fc = self.fields[ff]
# if self.interval == "HOUR": if fc.group and fc.name in self.groups:
# query_map["q_select"] += ["toStartOfHour(toDateTime(ts)) AS ts"] # query_map["q_select"] += [f"{f.metric_name} as {f.name}"]
# query_map["q_group"] = ["ts"] query_map["q_group"] += [f"{fc.metric_name}"]
for f in self.FIELDS: query_map["q_select"] += [f"{fc.metric_name} as {fc.name}"]
if f.name not in self.fields: if self.interval:
continue query_map["q_order_by"] = ["ts"]
query_map["q_select"] += [f"{f.metric_name} as {f.name}"]
query_map["q_order_by"] = ["ts"]
custom_conditions = self.get_custom_conditions() custom_conditions = self.get_custom_conditions()
if "q_where" in custom_conditions: if "q_where" in custom_conditions:
query_map["q_where"] += custom_conditions["q_where"] query_map["q_where"] += custom_conditions["q_where"]
...@@ -464,9 +498,9 @@ class CHTableReportDataSource(ReportDataSource): ...@@ -464,9 +498,9 @@ class CHTableReportDataSource(ReportDataSource):
f"FROM {self.get_table()}", f"FROM {self.get_table()}",
f'WHERE {" AND ".join(query_map["q_where"])}', f'WHERE {" AND ".join(query_map["q_where"])}',
] ]
if "q_group" in query_map: if "q_group" in query_map and query_map["q_group"]:
query += [f'GROUP BY {",".join(query_map["q_group"])}'] query += [f'GROUP BY {",".join(query_map["q_group"])}']
if "q_having" in query_map: if "q_having" in query_map and query_map["q_having"]:
query += [f'HAVING {" AND ".join(query_map["q_having"])}'] query += [f'HAVING {" AND ".join(query_map["q_having"])}']
if "q_order_by" in query_map: if "q_order_by" in query_map:
query += [f'ORDER BY {",".join(query_map["q_order_by"])}'] query += [f'ORDER BY {",".join(query_map["q_order_by"])}']
...@@ -494,6 +528,6 @@ class CHTableReportDataSource(ReportDataSource): ...@@ -494,6 +528,6 @@ class CHTableReportDataSource(ReportDataSource):
fields = [] fields = []
if self.interval: if self.interval:
fields += ["ts"] fields += ["ts"]
fields += [f.name for f in self.FIELDS if f.name in self.fields] fields += list(self.fields.keys())
for row in self.do_query(): for row in self.do_query():
yield dict(zip(fields, row)) yield dict(zip(fields, row))
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
# See LICENSE for details # See LICENSE for details
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# NOC Modules
from .base import CHTableReportDataSource, ReportField from .base import CHTableReportDataSource, ReportField
...@@ -15,18 +16,26 @@ class ReportAvailability(CHTableReportDataSource): ...@@ -15,18 +16,26 @@ class ReportAvailability(CHTableReportDataSource):
TABLE_NAME = "noc.ping" TABLE_NAME = "noc.ping"
FIELDS = [ FIELDS = [
ReportField( ReportField(
name="ping_rtt_avg", name="managed_object",
label="ping_rtt", label="Managed Object BIID",
description="",
unit="INT",
metric_name="managed_object",
group=True,
),
ReportField(
name="ping_rtt",
label="Ping RTT (avg)",
description="", description="",
unit="MILLISECONDS", unit="MILLISECONDS",
metric_name="avg(rtt)", metric_name="avg(rtt)",
), ),
ReportField( ReportField(
name="ping_attemtps_max", name="ping_attempts",
label="ping_rtt", label="Ping Attempts",
description="", description="",
unit="COUNT", unit="COUNT",
metric_name="max(rtt)", metric_name="max(attempts)",
), ),
] ]
TIMEBASED = True TIMEBASED = True
# ----------------------------------------------------------------------
# ReportInterfaceMetrics datasource
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# NOC Modules
from .base import CHTableReportDataSource, ReportField
class ReportInterfaceMetrics(CHTableReportDataSource):
name = "reportinterfacemetrics"
description = "Query Metrics from Interface table"
TABLE_NAME = "noc.interface"
FIELDS = [
ReportField(
name="managed_object",
label="Managed Object BIID",
description="",
unit="INT",
metric_name="managed_object",
group=True,
),
ReportField(
name="iface_name",
label="Interface Name",
description="",
unit="STRING",
metric_name="interface",
group=True,
),
ReportField(
name="interface_profile",
label="Interface Profile Name",
description="",
unit="STRING",
metric_name="dictGetString('interfaceattributes', 'profile', (managed_object, interface))",
group=True,
),
ReportField(
name="iface_description",
label="Interface Description",
description="",
unit="STRING",
metric_name="dictGetString('interfaceattributes','description' , (managed_object, interface))",
group=True,
),
ReportField(
name="iface_speed",
label="Interface Speed",
description="",
unit="BIT/S",
metric_name="if(max(speed) = 0, dictGetUInt64('interfaceattributes', 'in_speed', (managed_object, interface)), max(speed))",
),
ReportField(
name="load_in_perc",
label="Load In (90% percentile)",
description="",
unit="BIT/S",
metric_name="round(quantile(0.90)(load_in), 0)",
),
ReportField(
name="load_in_p",
label="Load In (% Bandwith)",
description="",
unit="BIT/S",
metric_name="replaceOne(toString(round(quantile(0.90)(load_in) / if(max(speed) = 0, dictGetUInt64('interfaceattributes', 'in_speed', (managed_object, arrayStringConcat(path))), max(speed)), 4) * 100), '.', ',')",
),
ReportField(
name="load_out_perc",
label="Load Out (90% percentile)",
description="",
unit="BIT/S",
metric_name="round(quantile(0.90)(load_out), 0)",
),
ReportField(
name="load_out_p",
label="Load Out (% Bandwith)",
description="",
unit="BYTE",
metric_name="replaceOne(toString(round(quantile(0.90)(load_out) / if(max(speed) = 0, dictGetUInt64('interfaceattributes', 'in_speed', (managed_object, arrayStringConcat(path))), max(speed)), 4) * 100), '.', ',')",
),
ReportField(
name="octets_in_sum",
label="Traffic In (Sum by period in MB)",
description="",
unit="MBYTE",
metric_name="round((sum(load_in * time_delta) / 8) / 1048576)",
),
ReportField(
name="octets_out_sum",
label="Traffic Out (Sum by period in MB)",
description="",
unit="MBYTE",
metric_name="round((sum(load_out * time_delta) / 8) / 1048576)",
),
ReportField(
name="errors_in",
label="Errors In (packets/s)",
description="",
unit="PKT/S",
metric_name="quantile(0.90)(errors_in)",
),
ReportField(
name="errors_in_sum",
label="Errors In (Summary)",
description="",
unit="PKT",
metric_name="sum(errors_in_delta)",
),
ReportField(
name="errors_out",
label="Errors Out (packets/s)",
description="",
unit="PKT/S",
metric_name="quantile(0.90)(errors_out)",
),
ReportField(
name="errors_out_sum",
label="Errors Out (Summary)",
description="",
unit="PKT",
metric_name="sum(errors_out_delta)",
),
ReportField(
name="discards_in",
label="Discards In (packets/s)",
description="",
unit="PKT/S",
metric_name="quantile(0.90)(discards_in)",
),
ReportField(
name="discards_in_sum",
label="Discards In (Summary)",
description="",
unit="PKT",
metric_name="sum(discards_in_delta)",
),
ReportField(
name="discards_out",
label="Discards Out (packets/s)",
description="",
unit="PKT/S",
metric_name="quantile(0.90)(discards_out)",
),
ReportField(
name="discards_out_sum",
label="Discards Out (Summary)",
description="",
unit="PKT",
metric_name="sum(discards_out_delta)",
),
ReportField(
name="interface_flap",
label="Interface Flap count",
description="",
unit="COUNT",
metric_name="countEqual(arrayMap((a,p) -> a + p, arrayPushFront(groupArray(status_oper), groupArray(status_oper)[1]), arrayPushBack(groupArray(status_oper), groupArray(status_oper)[-1])), 1)",
),
ReportField(
name="status_oper",
label="Operational status",
description="",
unit="ENUM",
metric_name="anyLast(status_oper)",
),
ReportField(
name="lastchange",
label="Interface Last Change (days)",
description="",
unit="DAY",
metric_name="anyLast(lastchange)",
),
]
TIMEBASED = True
# ----------------------------------------------------------------------
# ReportObjectMetrics datasource
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from collections import defaultdict
# NOC Modules
from .base import CHTableReportDataSource, ReportField
class ReportObjectMetrics(CHTableReportDataSource):
name = "reportobjectmetrics"
description = "1"
TABLE_NAME = "noc.cpu"
FIELDS = [
ReportField(
name="managed_object",
label="Managed Object BIID",
description="",
unit="INT",
metric_name="managed_object",
group=True,
),
ReportField(
name="labels",
label="Labels",
description="",
unit="INT",
metric_name="labels",
group=True,
),
ReportField(
name="cpu_usage",
label="CPU Usage",
description="",
unit="%",
metric_name="avg(usage)",
),
ReportField(
name="memory_usage",
label="Memory Usage",
description="",
unit="%",
metric_name="max(usage)",
),
]
TIMEBASED = True
def get_table(self):
if "cpu_usage" in self.fields:
return "noc.cpu"
elif "memory_usage" in self.fields:
return "noc.memory"
def do_query(self):
"""
Run every query as own, and merge results - iter self.fields
:return:
"""
f_date, to_date = self.start, self.end
result = defaultdict(list)
client = self.get_client()
key_fields = [field for field in self.fields if self.fields[field].group]
for field in list(self.fields):
if field in key_fields:
continue
self.fields = self.get_fields(key_fields + [field])
query = self.get_query_ch(f_date, to_date)
# print("Query: %s", query)
if self.allobjectids or not self.objectids:
for row in client.execute(query % ""):
if row[0] not in result:
result[row[0]] = row
else:
result[row[0]] += row[1:]
else:
# chunked query
ids = self.objectids
while ids:
chunk, ids = ids[: self.CHUNK_SIZE], ids[self.CHUNK_SIZE :]
for row in client.execute(query % f" AND {self.get_object_filter(chunk)}"):
if row[0] not in result:
result[row[0]] = row
else:
result[row[0]] += row[1:]
self.fields = self.get_fields(self.query_fields)
for v in result.values():
yield v
...@@ -22,7 +22,8 @@ Ext.define("NOC.main.metricstream.Application", { ...@@ -22,7 +22,8 @@ Ext.define("NOC.main.metricstream.Application", {
{ {
text: __("Scope"), text: __("Scope"),
dataIndex: "scope", dataIndex: "scope",
width: 200 width: 200,
renderer: NOC.render.Lookup("scope")
}, },
{ {
text: __("Active"), text: __("Active"),
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment