Commit a59212d3 authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Merge branch 'noc-cdag' into 'master'

Computational Directed Acyclic Graph

See merge request !4603
parents 8f306689 2be54aeb
Pipeline #31799 passed with stages
in 90 minutes and 19 seconds
# ----------------------------------------------------------------------
# BaseCDAGFactory
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional
# NOC modules
from ..typing import FactoryCtx
from ..graph import CDAG
class BaseCDAGFactory(object):
"""
CDAG factory is responsible for computation graph construction. Factories can be chained
together
"""
def __init__(
self, graph: CDAG, ctx: Optional[FactoryCtx] = None, namespace: Optional[str] = None
):
self.graph = graph
self.ctx = ctx
self.namespace = namespace
def construct(self) -> None: # pragma: no cover
raise NotImplementedError
def get_node_id(self, name: str) -> str:
"""
Generate prefixed node id
:param name:
:return:
"""
if self.namespace and "::" not in name:
return f"{self.namespace}::{name}"
return name
# ----------------------------------------------------------------------
# ConfigCDAGFactory
# ----------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional, List, Dict, Any
# Third-party modules
from pydantic import BaseModel
from jinja2 import Template
# NOC modules
from noc.core.matcher import match
from .base import BaseCDAGFactory, FactoryCtx
from ..graph import CDAG
class InputItem(BaseModel):
name: str
node: str
class NodeItem(BaseModel):
name: str
type: str
description: Optional[str]
config: Optional[Dict[str, Any]]
inputs: Optional[List[InputItem]]
match: Optional[Dict[str, Any]]
class ConfigCDAGFactory(BaseCDAGFactory):
"""
Build CDAG from abstract config
"""
def __init__(
self,
graph: CDAG,
config: List[NodeItem],
ctx: Optional[FactoryCtx] = None,
namespace: Optional[str] = None,
):
super().__init__(graph, ctx, namespace)
self.config = config
def requirements_met(self, inputs: Optional[List[InputItem]]):
if not inputs:
return True
for input in inputs:
if self.expand_input(input.node) not in self.graph:
return False
return True
def is_matched(self, expr: Optional[FactoryCtx]) -> bool:
if not expr:
return True
return match(self.ctx, expr)
def construct(self) -> None:
print("@ construct")
for item in self.config:
print(item)
# Check match
if not self.is_matched(item.match):
print("not matched")
continue
# Check for prerequisites
if not self.requirements_met(item.inputs):
print("not met")
continue
# Create node
node = self.graph.add_node(
self.get_node_id(item.name),
node_type=item.type,
description=item.description,
config=item.config,
ctx=self.ctx,
)
# Connect node
if item.inputs:
for input in item.inputs:
r_node = self.graph[self.expand_input(input.node)]
r_node.subscribe(node, input.name)
node.mark_as_bound(input.name)
def expand_input(self, name: str) -> str:
if "{" in name:
name = Template(name).render(**self.ctx)
return self.get_node_id(name)
# ----------------------------------------------------------------------
# JSONCDAGFactory
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional
# Third-party modules
import orjson
# NOC modules
from .base import CDAG
from .config import ConfigCDAGFactory, NodeItem, FactoryCtx
class JSONCDAGFactory(ConfigCDAGFactory):
def __init__(
self,
graph: CDAG,
config: str,
cfx: Optional[FactoryCtx] = None,
namespace: Optional[str] = None,
):
items = [NodeItem(**i) for i in orjson.loads(config)]
super().__init__(graph, items, cfx, namespace)
# ----------------------------------------------------------------------
# YAMLCDAGFactory
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional
# Third-party modules
import yaml
# NOC modules
from .base import CDAG
from .config import ConfigCDAGFactory, NodeItem, FactoryCtx
class YAMLCDAGFactory(ConfigCDAGFactory):
def __init__(
self,
graph: CDAG,
config: str,
ctx: Optional[FactoryCtx] = None,
namespace: Optional[str] = None,
):
items = [NodeItem(**i) for i in yaml.safe_load(config)]
super().__init__(graph, items, ctx, namespace)
# ----------------------------------------------------------------------
# CDAG
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional, Any, Dict
# NOC modules
from .node.base import BaseCDAGNode
from .node.loader import loader
from .tx import Transaction
class CDAG(object):
def __init__(self, graph_id: str, state: Optional[Dict[str, Any]] = None):
self.graph_id = graph_id
self.state: Dict[str, Any] = state or {}
self.nodes: Dict[str, BaseCDAGNode] = {}
def __getitem__(self, item: str) -> BaseCDAGNode:
return self.nodes[item]
def __contains__(self, item: str):
return item in self.nodes
def add_node(
self,
node_id: str,
node_type: str,
description: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
ctx: Optional[Dict[str, Any]] = None,
) -> BaseCDAGNode:
if node_id in self.nodes:
raise ValueError("Node %s is already configured" % node_id)
node_cls = loader.get_class(node_type)
if not node_cls:
raise ValueError("Invalid node type: %s" % node_type)
config = config or {}
#
return node_cls.construct(
self,
node_id,
description=description,
state=self.state.get(node_id),
config=config,
ctx=ctx,
)
def begin(self) -> Transaction:
"""
Start new transaction
:return:
"""
return Transaction(self)
def get_node(self, name: str) -> Optional[BaseCDAGNode]:
return self.nodes.get(name)
def get_state(self) -> Dict[str, Any]:
"""
Construct current state
:return:
"""
r = {}
for node_id, node in self.nodes.items():
if not hasattr(node, "state_cls"):
continue
ns = node.get_state()
if ns is None:
continue
r[node_id] = ns.dict()
return r
def get_dot(self) -> str:
"""
Build graphviz dot representation for graph
:return:
"""
n_map: Dict[str, str] = {}
r = ["digraph {", ' rankdir="LR";']
# Nodes
for n, node_id in enumerate(sorted(self.nodes)):
dot_id = "n%05d" % n
n_map[node_id] = dot_id
node = self.nodes[node_id]
n_attrs = ""
if node.config:
attrs = []
for fn in node.config.__fields__:
v = getattr(node.config, fn)
if hasattr(v, "value"):
v = v.value
attrs += ["%s: %s" % (fn, v)]
if attrs:
n_attrs = "\\n" + "\\n".join(attrs)
r += [
' %s [label="%s\\ntype: %s%s", shape="%s"];'
% (dot_id, node_id, node.name, n_attrs, node.dot_shape)
]
# Edges
for node in self.nodes.values():
for r_node, in_name in node.iter_subscribers():
r += [
' %s -> %s [label="%s"];'
% (n_map[node.node_id], n_map[r_node.node_id], in_name)
]
r += ["}"]
return "\n".join(r)
# ----------------------------------------------------------------------
# AbsNode
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional
# NOC modules
from .base import BaseCDAGNode, ValueType, Category
class AbsNode(BaseCDAGNode):
"""
Get absolute value of 'x'
"""
name = "abs"
categories = [Category.MATH]
def get_value(self, x: ValueType) -> Optional[ValueType]:
return abs(x)
# ----------------------------------------------------------------------
# ACosNode
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional
from math import acos
# NOC modules
from .base import BaseCDAGNode, ValueType, Category
class ACosNode(BaseCDAGNode):
"""
Get arccosinus of 'x'
"""
name = "acos"
categories = [Category.MATH]
def get_value(self, x: ValueType) -> Optional[ValueType]:
return acos(x)
# ----------------------------------------------------------------------
# AddNode
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional
# NOC modules
from .base import BaseCDAGNode, ValueType, Category
class AddNode(BaseCDAGNode):
"""
Add `x` to `y`
"""
name = "add"
categories = [Category.OPERATION]
def get_value(self, x: ValueType, y: ValueType) -> Optional[ValueType]:
return x + y
# ----------------------------------------------------------------------
# ASinNode
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional
from math import asin
# NOC modules
from .base import BaseCDAGNode, ValueType, Category
class ASinNode(BaseCDAGNode):
"""
Get arcsinus of 'x'
"""
name = "asin"
categories = [Category.MATH]
def get_value(self, x: ValueType) -> Optional[ValueType]:
return asin(x)
# ----------------------------------------------------------------------
# ATanNode
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Optional
from math import atan
# NOC modules
from .base import BaseCDAGNode, ValueType, Category
class ATanNode(BaseCDAGNode):
"""
Get arctangens of 'x'
"""
name = "atan"
categories = [Category.MATH]
def get_value(self, x: ValueType) -> Optional[ValueType]:
return atan(x)
# ----------------------------------------------------------------------
# BaseNode
# ----------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from typing import Any, Set, Optional, Type, Dict, List, Iterable, Tuple
from enum import Enum
import inspect
# Third-party modules
from pydantic import BaseModel
# NOC modules
from ..typing import ValueType
from ..tx import Transaction
class Category(str, Enum):
MATH = "math"
OPERATION = "operation"
LOGICAL = "logical"
ACTIVATION = "activation"
COMPARE = "compare"
DEBUG = "debug"
UTIL = "util"
STATISTICS = "statistics"
ML = "ml"
WINDOW = "window"
class BaseCDAGNodeMetaclass(type):
def __new__(mcs, name, bases, attrs):
n = type.__new__(mcs, name, bases, attrs)
sig = inspect.signature(n.get_value)
n.static_inputs = [x for x in sig.parameters if x != "self"]
return n
class BaseCDAGNode(object, metaclass=BaseCDAGNodeMetaclass):
name: str
state_cls: Type[BaseModel]
config_cls: Type[BaseModel]
static_inputs: List[str] # Filled by metaclass
dot_shape: str = "box"
categories: List[Category] = []
def __init__(
self,
node_id: str,
state: Optional[Dict[str, Any]] = None,
description: str = None,
config: Optional[Dict[str, Any]] = None,
):
self.node_id = node_id
self.description = description
self.state = self.clean_state(state)
self.config = self.clean_config(config)
self._inputs = {i for i in self.iter_inputs()}
self._bound_inputs: Set[str] = set()
self._subscribers: List[Tuple[BaseCDAGNode, str]] = []
# Pre-calculated inputs
self.const_inputs: Dict[str, ValueType] = {}
self._const_value: Optional[ValueType] = None
@classmethod
def construct(
cls,
graph,
node_id: str,
description: Optional[str],
state: Optional[BaseModel],
config: Optional[BaseModel],
ctx: Optional[Dict[str, Any]],
) -> Optional["BaseCDAGNode"]:
"""
Construct node
:return:
"""
node = cls(node_id, description=description, state=state, config=config)
graph.nodes[node_id] = node
return node
def clean_state(self, state: Optional[Dict[str, Any]]) -> Optional[BaseModel]:
if not hasattr(self, "state_cls"):
return None
state = state or {}
return self.state_cls(**state)
def clean_config(self, config: Optional[Dict[str, Any]]) -> Optional[BaseModel]:
if not hasattr(self, "config_cls"):
return None
return self.config_cls(**config)
def iter_inputs(self) -> Iterable[str]:
"""
Enumerate all configured inputs
:return:
"""
yield from self.static_inputs
def activate(self, tx: Transaction, name: str, value: ValueType) -> None:
"""
Activate named input with
:param tx: Transaction instance
:param name: Input name
:param value: Input value
:return:
"""
if name not in self._inputs:
raise KeyError(f"Invalid input: {name}")
inputs = tx.get_inputs(self)
is_active = inputs[name] is not None
inputs[name] = value
if is_active or any(True for v in inputs.values() if v is None):
return # Already activated or non-activated inputs
# Activate node, calculate value
value = self.get_value(**inputs)
if hasattr(self, "state_cls"):
tx.update_state(self)
# Notify all subscribers
for s_node, s_name in self._subscribers:
s_node.activate(tx, s_name, value)
def activate_const(self, name: str, value: ValueType) -> None:
"""
Activate const input. Called during construction time.
:param name:
:param value:
:return:
"""
if name not in self._inputs:
raise KeyError(f"Invalid const input: {name}")
self.const_inputs[name] = value
if self.is_const:
for node, name in self._subscribers:
node.activate_const(name, self._const_value)
def subscribe(self, node: "BaseCDAGNode", name: str) -> None:
"""
Subscribe to activation function
:param node: Connected node
:param name: Connected input name
:return:
"""
self._subscribers += [(node, name)]
if self.is_const:
node.activate_const(name, self._const_value)
def mark_as_bound(self, name: str) -> None:
if name in self._inputs:
self._bound_inputs.add(name)
def get_value(self, *args, **kwargs) -> Optional[ValueType]: # pragma: no cover
"""
Calculate node value. Returns None when input is malformed and should not be propagated
:return:
"""
raise NotImplementedError
def get_state(self) -> Optional[BaseModel]:
"""
Get current node state
:return:
"""