Commit a9a46fac authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Merge branch 'noc-1825' into 'master'

noc/noc#1825 Better variable configuration on Grafana JSON Datasource.

See merge request noc/noc!6282
parents 0d74ff8e 7df0585d
......@@ -7,9 +7,10 @@
# Python modules
import datetime
import operator
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional, Iterable, Union, Set, Any
from collections import defaultdict
import operator
# Third-party modules
import orjson
......@@ -43,14 +44,14 @@ SQL = """
%s
FROM
(
SELECT (intDiv(toUInt32(ts), 100) * 100) * 1000 as t,
SELECT %s as t,
%s as target,
%s
FROM
%s
WHERE
%s
GROUP BY labels, t ORDER BY t ASC
GROUP BY %s, t ORDER BY t ASC
)
GROUP BY target FORMAT JSON
"""
......@@ -58,43 +59,85 @@ SQL = """
router = APIRouter()
@dataclass
class QueryConfig(object):
metric_type: str
query_expression: str
alias: Optional[str] = None
aggregate_function: str = "avg"
if_combinator_condition: str = ""
description: str = ""
class JsonDSAPI(object):
"""
Backend for SimpodJson Grafana plugin
"""
QUERY_CONFIGS: List["QueryConfig"] = None
openapi_tags = ["api", "grafanads"]
api_name: str = None
query_payload = None
query_response_model = List[TargetResponseItem]
variable_payload = None
allow_interval_limit: bool = True
def __init__(self, router: APIRouter):
self.service = get_service()
self.logger = self.service.logger
self.router = router
self.query_config: Dict[str, "QueryConfig"] = self.load_query_config()
self.setup_routes()
@classmethod
def load_query_config(cls):
"""
Load Additional QueryConfig on API
:return:
"""
r = {}
for qc in cls.QUERY_CONFIGS or []:
r[qc.alias or qc.metric_type] = qc
return r
async def api_grafanads_search(
self, req: Dict[str, str], user: User = Depends(get_current_user)
):
"""
Method for /search endpoint on datasource
:param req:
:param user:
:return:
"""
self.logger.info("Search Request: %s", req)
return self.get_metrics()
async def api_grafanads_variable(
self, req: VariableRequest, user: User = Depends(get_current_user)
):
"""
Method for /variable endpoint on datasource
:param req:
:param user:
:return:
"""
self.logger.info("Variable Request: %s", req)
if not self.variable_payload:
return []
payload = parse_obj_as(self.variable_payload, req.payload)
h = getattr(self, f"var_{payload.target or 'default'}", None)
h = getattr(payload, "get_variables", None)
if not h:
return []
return h(payload, user)
return h(user)
async def api_grafanads_annotations(
self, req: AnnotationRequest, user: User = Depends(get_current_user)
):
"""
Method for /annotations endpoint on datasource
:param req:
:param user:
:return:
"""
self.logger.debug("Annotation Request: %s", req)
start, end = self.convert_ts_range(req)
return list(
......@@ -110,10 +153,10 @@ class JsonDSAPI(object):
) -> Iterable["Annotation"]:
...
@staticmethod
def get_metrics() -> List[Dict[str, str]]:
@classmethod
def get_metrics(cls) -> List[Dict[str, str]]:
"""
Return Available Metrics
Return Available Metrics for datasource
:return:
"""
r = []
......@@ -124,31 +167,29 @@ class JsonDSAPI(object):
"value": str(mt.id),
}
)
# Append Query Configs
for qc in cls.QUERY_CONFIGS or []:
if qc.alias:
r += [{"text": qc.description or qc.alias, "value": qc.alias}]
return r
@staticmethod
def clean_query_func(field_name, function) -> Optional[str]:
def clean_func_expr(field_name, function: Optional[str] = None) -> str:
"""
Return function expression for field
:param field_name:
:param function:
:return:
"""
if not function:
return field_name
if function.lower() in {"argmax", "argmin"}:
return f"{function}({field_name}, t)"
return f"{function}({field_name})"
async def api_grafanads_query(self, req: QueryRequest, user: User = Depends(get_current_user)):
"""
SELECT
target,
%s
FROM
(
SELECT (intDiv(toUInt32(ts), 100) * 100) * 1000 as t,
name as target,
%s
FROM
%s
WHERE
%s
GROUP BY name, t ORDER BY t ASC
)
GROUP BY name FORMAT JSON
Method for /query endpoint on datasource
:param req:
:param user:
......@@ -157,33 +198,34 @@ class JsonDSAPI(object):
self.logger.info("Query Request: %s", req)
connect = connection()
r = []
# TS Filter
ts_filter = self.get_ts_filter(req)
targets: Dict[Tuple[str, str], List[MetricType]] = defaultdict(list)
targets: Dict[Tuple[str, str], List["QueryConfig"]] = defaultdict(list)
# Merge targets to Metric Scope and Filter
for target in req.targets:
metric_type = MetricType.get_by_id(target.target)
if target.target in self.query_config:
query_config = self.query_config[target.target]
metric_type = MetricType.get_by_name(query_config.metric_type)
else:
metric_type = MetricType.get_by_id(target.target)
query_config = QueryConfig(
metric_type=metric_type.name, query_expression=metric_type.field_name
)
if not metric_type:
self.logger.error("[%s] Unknown MetricType: %s", target.target, query_config)
raise HTTPException(status_code=500, detail="Unknown MetricType in QueryConfig")
# Target Filter
# {"managed_object": "3780187837837487731"}
mt_filter = self.get_metric_type_filter(target.payload, metric_type, user=user)
query_field = f"avg({metric_type.field_name})"
query_mt_condition = self.get_query_metric_type_condition(
target.payload, metric_type, user=user
)
if target.payload and "metric_function" in target.payload:
# Alternative - target with function suffix, percentile ?
query_field = self.clean_query_func(
metric_type.field_name, target.payload["agg_func"]
)
targets[(metric_type.scope.table_name, mt_filter)] += [(metric_type, query_field)]
query_config.aggregate_function = target.payload["agg_func"]
targets[(metric_type.scope.table_name, query_mt_condition)] += [query_config]
# Query
for (table_name, mt_filter), metric_types in targets.items():
# avg(usage) as `CPUUsage`
query = SQL % (
", ".join(f"groupArray((`{mt.name}`, t)) AS `{mt.name}`" for mt, _ in metric_types),
self.get_target_format(table_name),
", ".join(f"{query_field} AS `{mt.name}`" for mt, query_field in metric_types),
table_name,
ts_filter + (f" AND {mt_filter}" if mt_filter else ""),
)
self.logger.debug("Do query: %s", query)
for (table_name, query_condition), query_configs in targets.items():
# Format query
query = self.get_query(req, table_name, query_condition, query_configs)
self.logger.info("Do query: %s", query)
try:
result = connect.execute(query, return_raw=True)
except ClickhouseError as e:
......@@ -192,10 +234,72 @@ class JsonDSAPI(object):
r += self.format_result(
orjson.loads(result),
result_type=req.result_type,
request_metrics={mt.name for mt, _ in metric_types},
request_metrics={qc.alias or qc.metric_type for qc in query_configs},
)
return r
def get_query(
self,
req: QueryRequest,
table_name: str,
query_condition: str,
query_configs: List["QueryConfig"],
) -> str:
"""
Return Query Expression for Clickhouse
SELECT
target,
%s
FROM
(
SELECT (intDiv(toUInt32(ts), 100) * 100) * 1000 as t,
name as target,
%s
FROM
%s
WHERE
%s
GROUP BY target, t ORDER BY t ASC
)
GROUP BY target FORMAT JSON
:param req:
:param table_name:
:param query_condition:
:param query_configs:
:return:
"""
# TS Filter
timestamp_condition: str = self.get_ts_condition(req)
s_fields = []
for qc in query_configs:
if qc.if_combinator_condition:
# groupArrayIf((t, li), traffic_class = '') AS lii,
s_fields += [
f"groupArrayIf((`{qc.metric_type}`, t), {qc.if_combinator_condition}) AS `{qc.alias or qc.metric_type}`"
]
else:
s_fields += [
f"groupArray((`{qc.metric_type}`, t)) AS `{qc.alias or qc.metric_type}`"
]
target_expr, group_by_expr = self.get_target_expression(table_name)
timestamp_expr = "(intDiv(toUInt32(ts), 100) * 100) * 1000"
if self.allow_interval_limit and req.interval.endswith("m"):
timestamp_expr = f"(intDiv(toUInt32(toStartOfInterval(ts, toIntervalMinute({req.interval[:-1]}))), 100) * 100) * 1000"
return SQL % (
", ".join(s_fields),
timestamp_expr,
target_expr,
", ".join(
f"{self.clean_func_expr(qc.query_expression, qc.aggregate_function)} AS `{qc.metric_type}`"
for qc in query_configs
),
table_name,
timestamp_condition + (f" AND {query_condition}" if query_condition else ""),
group_by_expr,
)
@classmethod
def format_result(
cls, result, result_type: str = "timeseries", request_metrics: Set["str"] = None
......@@ -221,13 +325,13 @@ class JsonDSAPI(object):
return r
@staticmethod
def get_target_format(table_name: str = None) -> str:
def get_target_expression(table_name: str = None) -> Tuple[str, Optional[str]]:
"""
Getting Target name format for table
:param table_name:
:return:
"""
return "arrayStringConcat(labels,'/')"
return "arrayStringConcat(labels,'/')", "target"
@staticmethod
def convert_ts_range(req) -> Tuple[datetime.datetime, datetime.datetime]:
......@@ -247,7 +351,7 @@ class JsonDSAPI(object):
return start, end
@classmethod
def get_ts_filter(cls, req: QueryRequest) -> str:
def get_ts_condition(cls, req: QueryRequest) -> str:
"""
Convert Range params to where expression
......@@ -301,7 +405,7 @@ class JsonDSAPI(object):
required_columns.add(field)
return key_fields, required_columns, columns
def get_metric_type_filter(
def get_query_metric_type_condition(
self,
payload: Dict[str, Union[str, List[str]]],
metric_type: Optional["MetricType"] = None,
......@@ -327,8 +431,8 @@ class JsonDSAPI(object):
if kf_name not in payload:
continue
values = payload[kf_name]
if isinstance(values, str):
values = [values]
if isinstance(values, (int, str)):
values = [str(values)]
q_values = []
for value in values:
if not value.isdigit():
......@@ -396,26 +500,26 @@ class JsonDSAPI(object):
response_model=List[SearchResponseItem],
tags=self.openapi_tags,
name=f"{self.api_name}_search",
description=f"Getting available metrics ",
description="Getting available metrics",
)
self.router.add_api_route(
path=f"/api/grafanads/{self.api_name}/query",
endpoint=self.api_grafanads_query,
methods=["POST"],
response_model=List[TargetResponseItem],
response_model=self.query_response_model,
tags=self.openapi_tags,
name=f"{self.api_name}_query",
description=f"Getting target datapoints",
description="Getting target datapoints",
)
# Backward compatible
self.router.add_api_route(
path=f"/api/grafanads/annotations",
path="/api/grafanads/annotations",
endpoint=self.api_grafanads_annotations,
methods=["POST"],
response_model=List[Annotation],
tags=self.openapi_tags,
name=f"{self.api_name}_annotations_back",
description=f"Getting target annotations (Backward compatible)",
description="Getting target annotations (Backward compatible)",
)
self.router.add_api_route(
path=f"/api/grafanads/{self.api_name}/annotations",
......@@ -424,7 +528,7 @@ class JsonDSAPI(object):
response_model=List[Annotation],
tags=self.openapi_tags,
name=f"{self.api_name}_annotations",
description=f"Getting target annotations",
description="Getting target annotations",
)
self.router.add_api_route(
path=f"/api/grafanads/{self.api_name}/variable",
......@@ -433,5 +537,5 @@ class JsonDSAPI(object):
response_model=List[Union[Dict[str, str], str]],
tags=self.openapi_tags,
name=f"{self.api_name}_variable",
description=f"Getting target variable",
description="Getting target variable",
)
......@@ -7,7 +7,7 @@
# Python modules
import datetime
from typing import List, Optional, Dict, Any, Union
from typing import List, Optional, Dict, Any, Union, Tuple
# Third-party modules
from pydantic import BaseModel, Field
......@@ -73,12 +73,12 @@ class QueryRequest(BaseModel):
max_datapoints: int = Field(500, alias="maxDataPoints")
targets: List[TargetItem]
adhoc_filters: Optional[List[AdhocFilterItem]] = Field(None, alias="adhocFilters")
result_type: str = Field("timeseries", alias="format") # matrix
result_type: str = Field("time_series", alias="format") # matrix
class TargetResponseItem(BaseModel):
target: str
datapoints: List[List[float]]
datapoints: List[Tuple[float, int]]
class SearchResponseItem(BaseModel):
......
......@@ -6,7 +6,7 @@
# ----------------------------------------------------------------------
# Python modules
from typing import Optional, Union, Dict, List, Literal, Any
from typing import Optional, Union, List, Literal
# Third-party modules
from pydantic import BaseModel
......@@ -18,7 +18,6 @@ from noc.main.models.label import Label
from noc.sa.models.managedobject import ManagedObject
from noc.sa.models.useraccess import UserAccess
from noc.inv.models.interface import Interface
from ..models.jsonds import RangeSection
MAX_MANAGED_OBJECT_RESPONSE = 2000
......@@ -34,18 +33,16 @@ class QueryPayloadItem(BaseModel):
# Variable
class VariablePayloadItem(BaseModel):
target: Optional[str]
managed_object: Optional[Union[str, int]]
labels: Optional[List[str]] = None
interface_profile: Optional[str] = None
administrative_domain: Optional[str] = None
class LabelTarget(BaseModel):
target: Literal["", "labels"]
@property
def mo(self):
if not self.managed_object:
return None
return ManagedObject.get_by_bi_id(self.managed_object)
@classmethod
def get_variables(cls, user: "User" = None):
# Labels
return [
{"__text": ll, "__value": ll}
for ll in Label.objects.filter(enable_managedobject=True).values_list("name")
]
class ManagedObjectTarget(BaseModel):
......@@ -53,6 +50,20 @@ class ManagedObjectTarget(BaseModel):
labels: Optional[List[str]] = None
administrative_domain: Optional[str] = None
@classmethod
def get_variables(cls, user: "User" = None):
mos = ManagedObject.objects.filter(is_managed=True)
if cls.labels:
mos = mos.filter(effective_labels__overlap=cls.labels)
if not user.is_superuser:
mos = mos.filter(administrative_domain__in=UserAccess.get_domains(user))
return [
{"__text": f"{name} ({address})", "__value": bi_id}
for bi_id, address, name in mos.values_list("bi_id", "address", "name")[
:MAX_MANAGED_OBJECT_RESPONSE
]
]
class InterfaceTarget(BaseModel):
target: Literal["interface"]
......@@ -66,44 +77,10 @@ class InterfaceTarget(BaseModel):
return None
return ManagedObject.get_by_bi_id(self.managed_object)
class InterfaceProfileTarget(BaseModel):
target: Literal["interface_profile"]
VariableRequestItem = Union[ManagedObjectTarget, InterfaceTarget, InterfaceProfileTarget]
class VariableRequest(BaseModel):
payload: Dict[str, Any]
range: RangeSection = None
@staticmethod
def var_default():
# Labels
return [
{"__text": ll, "__value": ll}
for ll in Label.objects.filter(enable_managedobject=True).values_list("name")
]
@staticmethod
def var_managed_object(payload: "ManagedObjectTarget", user: "User" = None):
mos = ManagedObject.objects.filter(is_managed=True)
if payload.labels:
mos = mos.filter(effective_labels__overlap=payload.labels)
if not user.is_superuser:
mos = mos.filter(administrative_domain__in=UserAccess.get_domains(user))
return [
{"__text": f"{name} ({address})", "__value": bi_id}
for bi_id, address, name in mos.values_list("bi_id", "address", "name")[
:MAX_MANAGED_OBJECT_RESPONSE
]
]
@staticmethod
def var_interface(payload: "InterfaceTarget", user: "User" = None):
ifaces = Interface.objects.filter(managed_object=payload.mo, type="physical")
profiles = payload.interface_profile
@classmethod
def get_variables(cls, user: "User" = None):
ifaces = Interface.objects.filter(managed_object=cls.mo, type="physical")
profiles = cls.interface_profile
if isinstance(profiles, str):
profiles = [profiles]
if profiles:
......@@ -116,30 +93,40 @@ class VariableRequest(BaseModel):
for iface in ifaces
]
@staticmethod
def var_interface_profile(payload: "InterfaceProfileTarget", user: "User" = None):
if not user.is_superuser and payload.mo not in UserAccess.get_domains(user):
class TestTarget(BaseModel):
target: Literal["test"]
@classmethod
def get_variables(cls, user: "User" = None):
return [
{"__text": "Device1#59565", "__value": "2083341664757472739"},
{"__text": "Device2#59609", "__value": "272411249935345586"},
{"__text": "Device3#8328", "__value": "825392260101847512"},
{"__text": "Device4", "__value": "3780187837837487731"},
]
class InterfaceProfileTarget(InterfaceTarget):
target: Literal["interface_profile"]
@classmethod
def get_variables(cls, user: "User" = None):
if not user.is_superuser and cls.mo not in UserAccess.get_domains(user):
raise HTTPException(
status_code=404, detail=f"User has no access to ManagedObject: {payload.mo}"
status_code=404, detail=f"User has no access to ManagedObject: {cls.mo}"
)
return [
{"__text": ip.name, "__value": str(ip.id)}
for ip in set(
ip
for ip in Interface.objects.filter(
managed_object=payload.mo, type=payload.type
managed_object=cls.mo, type=cls.type
).values_list("profile")
)
]
@staticmethod
def var_test_variables(*args, **kwargs):
return [
{"__text": "Device1#59565", "__value": "2083341664757472739"},
{"__text": "Device2#59609", "__value": "272411249935345586"},
{"__text": "Device3#8328", "__value": "825392260101847512"},
{"__text": "Device4", "__value": "3780187837837487731"},
]
def get_variable_keys(self):
return [{"type": "managed_object", "text": "ManagedObject"}, {"type": "", "text": "Labels"}]
VariablePayloadItem = Union[
LabelTarget, ManagedObjectTarget, InterfaceTarget, InterfaceProfileTarget, TestTarget
]
......@@ -23,7 +23,7 @@ from noc.fm.models.archivedalarm import ArchivedAlarm
from noc.fm.models.alarmclass import AlarmClass
from noc.models import get_model
from ..models.jsonds import AnnotationSection
from ..models.managedobject import VariableRequest, QueryPayloadItem
from ..models.managedobject import VariablePayloadItem, QueryPayloadItem
from ..jsonds import JsonDSAPI
......@@ -33,7 +33,7 @@ router = APIRouter()
class ManagedObjectJsonDS(JsonDSAPI):
api_name = "managedobject"
query_payload = QueryPayloadItem
variable_payload = VariableRequest
variable_payload = VariablePayloadItem
@staticmethod
def resolve_object_query(model_id, value, user: User = None) -> Optional[int]:
......