diff --git a/core/cdag/factory/config.py b/core/cdag/factory/config.py index 63f37f0bf995812b4af4ab09dcf23b65cc57f568..971f5cabebb825ec611a642ab60073dda693c6c3 100644 --- a/core/cdag/factory/config.py +++ b/core/cdag/factory/config.py @@ -66,6 +66,9 @@ class ConfigCDAGFactory(BaseCDAGFactory): return True return match(self.ctx, expr) + def clean_node_config(self, node_id: str, config: Optional[Dict[str, Any]]) -> Any: + return config + def construct(self) -> None: for item in self.config.nodes: # Check match @@ -75,11 +78,12 @@ class ConfigCDAGFactory(BaseCDAGFactory): if not self.requirements_met(item.inputs): continue # Create node + node_id = self.get_node_id(item.name) node = self.graph.add_node( - self.get_node_id(item.name), + node_id, node_type=item.type, description=item.description, - config=item.config, + config=self.clean_node_config(node_id, item.config), ctx=self.ctx, sticky=item.sticky, ) diff --git a/core/cdag/factory/scope.py b/core/cdag/factory/scope.py index a9b711b12a5ad05dca59e921a3cb8c7e387e8849..5017c399320fdba66999b3aed823eede3df55e5f 100644 --- a/core/cdag/factory/scope.py +++ b/core/cdag/factory/scope.py @@ -45,7 +45,7 @@ class MetricScopeCDAGFactory(BaseCDAGFactory): "probe", description=f"Input collector for {name} metric", config={ - "unit": mt.units.code if mt.units else "1", + "unit": (mt.units.code or "1") if mt.units else "1", "scale": mt.scale.code if mt.scale else "1", }, sticky=self.sticky, diff --git a/core/cdag/node/subgraph.py b/core/cdag/node/subgraph.py new file mode 100644 index 0000000000000000000000000000000000000000..f344be370b712e0ed59828c9c89607b06e5e312c --- /dev/null +++ b/core/cdag/node/subgraph.py @@ -0,0 +1,135 @@ +# ---------------------------------------------------------------------- +# subgraph node +# ---------------------------------------------------------------------- +# Copyright (C) 2007-2021 The NOC Project +# See LICENSE for details +# ---------------------------------------------------------------------- + +# Python modules +from typing import Any, Optional, List, Iterable, Dict, Tuple, DefaultDict +from collections import defaultdict + +# Third-party modules +from pydantic import BaseModel +import yaml + +# NOC modules +from ..graph import CDAG +from ..factory.config import ConfigCDAGFactory, GraphConfig +from .base import BaseCDAGNode, ValueType, Category + + +class InputMapping(BaseModel): + # Outer input name + public_name: str + # Local node + node: str + # Local node input name + name: str + + +class ConfigMapping(BaseModel): + # Outer config param name + name: str + # Local node + node: str + # Local node param name + param: str + + +class SubgraphConfig(BaseModel): + # Yaml-serialized subgraph + cdag: str + inputs: Optional[List[InputMapping]] + output: Optional[str] + config: Optional[List[ConfigMapping]] + + +class SubgraphState(BaseModel): + state: Optional[Dict[str, Any]] + + +class SubgraphCDAGFactory(ConfigCDAGFactory): + def __init__(self, *args, **kwargs): + # node -> [(param, value)] + self.node_cfg: DefaultDict[str, Dict[str, Any]] = defaultdict(dict) + super().__init__(*args, **kwargs) + + def set_node_config(self, node: str, param: str, value: Any) -> None: + """ + Set additional node config + :param node: + :param param: + :param value: + :return: + """ + self.node_cfg[node][param] = value + + def clean_node_config(self, node_id: str, config: Optional[Dict[str, Any]]) -> Any: + override = self.node_cfg[node_id] + if override: + config = config or {} + config.update(**override) + return config + + +class SubgraphNode(BaseCDAGNode): + """ + Execute nested subgraph + """ + + name = "subgraph" + config_cls = SubgraphConfig + state_cls = SubgraphState + categories = [Category.UTIL] + + def __init__( + self, + node_id: str, + state: Optional[Dict[str, Any]] = None, + description: str = None, + config: Optional[Dict[str, Any]] = None, + sticky: bool = False, + ): + # Clean up config + cfg: SubgraphConfig = self.clean_config(config) + # Get graph config + graph_cfg = GraphConfig(**yaml.safe_load(cfg.cdag)) + # Build graph + self.state = self.clean_state(state) + self.cdag = CDAG("inner", state=self.state.state) + factory = SubgraphCDAGFactory(self.cdag, config=graph_cfg) + if cfg.config: + for m in cfg.config: + factory.set_node_config(m.node, m.param, config.get(m.name)) + factory.construct() + # Build input mappings + self.input_mappings: Dict[str, Tuple[BaseCDAGNode, str]] = {} + if cfg.inputs: + for m in cfg.inputs: + self.input_mappings[m.public_name] = (self.cdag.get_node(m.node), m.name) + # Inject measure node when necessary + self.measure_node = None + if cfg.output: + self.measure_node = self.cdag.add_node(f"__measure_{id(self)}", "none") + self.cdag.get_node(cfg.output).subscribe(self.measure_node, "x") + # Construct all the rest + super().__init__( + node_id=node_id, state=state, description=description, config=config, sticky=sticky + ) + + def iter_inputs(self) -> Iterable[str]: + yield from self.input_mappings + + def get_value(self, *args, **kwargs) -> Optional[ValueType]: + # Start sub-transaction + tx = self.cdag.begin() + for p, v in kwargs.items(): + node, param = self.input_mappings[p] + node.activate(tx, param, v) + changed_state = tx.get_changed_state() + if self.state.state is None and changed_state: + self.state.state = {} + self.state.state.update(changed_state) + out = tx.get_inputs(self.measure_node) + return out["x"] # None node has `x` input diff --git a/tests/cdag/node/test_categories.py b/tests/cdag/node/test_categories.py index 6d8ad09a352bb0b6a0f902214da20d3f9f0508d9..6b29fe48f0f864c64cc8f93d6d23a1b1bdc60226 100644 --- a/tests/cdag/node/test_categories.py +++ b/tests/cdag/node/test_categories.py @@ -33,7 +33,7 @@ CATEGORIES = { Category.ACTIVATION: {"indicator", "logistic", "relu", "softplus"}, Category.COMPARE: {"eq", "ne"}, Category.DEBUG: {"state"}, - Category.UTIL: {"key", "metrics", "none", "one", "probe", "value"}, + Category.UTIL: {"key", "metrics", "none", "one", "probe", "subgraph", "value"}, Category.STATISTICS: {"mean", "std"}, Category.ML: {"gauss"}, Category.WINDOW: {"expdecay", "nth", "percentile", "sumstep"}, diff --git a/tests/cdag/node/test_subgraph.py b/tests/cdag/node/test_subgraph.py new file mode 100644 index 0000000000000000000000000000000000000000..01e24dc6a68efc00c59b53f5f81c5164bc443adb --- /dev/null +++ b/tests/cdag/node/test_subgraph.py @@ -0,0 +1,86 @@ +# ---------------------------------------------------------------------- +# subgraph node test +# ---------------------------------------------------------------------- +# Copyright (C) 2007-2021 The NOC Project +# See LICENSE for details +# ---------------------------------------------------------------------- + +# Third-party modules +import pytest + +# NOC modules +from .util import NodeCDAG + +# n * x + y +SG_CONFIG = """ +nodes: +# Config is mapped outside +- name: n + description: N, mapped outside + type: value +# n * x +# x is external +- name: nx + type: mul + description: n * x + inputs: + - name: y + node: n +# (n*x) + (y) +# y is external +- name: plus + type: add + description: add nx and y + inputs: + - name: x + node: nx +- name: state + type: state + inputs: + - name: x + node: plus +""" + +CONFIG = { + "cdag": SG_CONFIG, + "inputs": [ + {"public_name": "x", "node": "nx", "name": "x"}, + {"public_name": "y", "node": "plus", "name": "y"}, + ], + "output": "state", + "config": [{"name": "n", "node": "n", "param": "value"}], + "n": 2, +} + + +def test_inputs(): + node = NodeCDAG("subgraph", config=CONFIG).get_node() + inputs = set(node.iter_inputs()) + assert inputs == {"x", "y"} + + +def test_config(): + node = NodeCDAG("subgraph", config=CONFIG).get_node() + vnode = node.cdag.get_node("n") + assert vnode.config.value == 2 + + +@pytest.mark.parametrize( + "x,y,expected", + [(0, 0, 0), (1, 0, 2), (0, 1, 1), (2, 1, 5)], +) +def test_subgrapg_node(x, y, expected): + cdag = NodeCDAG("subgraph", config=CONFIG) + assert cdag.is_activated() is False + cdag.activate("x", x) + assert cdag.is_activated() is False + cdag.activate("y", y) + x_act = expected is not None + assert cdag.is_activated() is x_act + value = cdag.get_value() + if expected is None: + assert value is None + else: + assert value == expected + state = cdag.get_changed_state()["node"] + assert state["state"]["state"] == {"value": expected}