Verified Commit c36263f3 authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

DataStream: Configurable change tracker policies

parent 52201427
......@@ -21,7 +21,7 @@ from noc.inv.models.biosegtrial import BioSegTrial
from noc.inv.models.interface import Interface
from noc.inv.models.link import Link
from noc.core.bioseg.moderator.base import moderate_trial
from noc.core.datastream.change import bulk_datastream_changes
from noc.core.datastream.change import change_tracker
from noc.core.text import alnum_key
from noc.core.clickhouse.connect import connection
from noc.inv.models.discoveryid import DiscoveryID
......@@ -217,7 +217,7 @@ class Command(BaseCommand):
trials = list(BioSegTrial.objects.filter(processed=False, id__in=ids).order_by("id"))
else:
trials = list(BioSegTrial.objects.filter(processed=False).order_by("id"))
with bulk_datastream_changes():
with change_tracker.bulk_changes():
for trial in trials:
self.print("@@@ Processing trial %s" % trial.id)
moderate_trial(trial)
......
......@@ -14,7 +14,7 @@ from contextlib import contextmanager
from noc.core.management.base import BaseCommand, CommandError
from noc.core.mongo.connection import connect
from noc.core.validators import is_int
from noc.core.datastream.change import bulk_datastream_changes
from noc.core.datastream.change import change_tracker
from noc.core.comp import smart_text
......@@ -49,7 +49,7 @@ class Command(BaseCommand):
# Wipe objects
from noc.core.debug import error_report
with bulk_datastream_changes():
with change_tracker.bulk_changes():
for o in objects:
with self.log("Wiping '%s':" % smart_text(o), True):
try:
......
# ----------------------------------------------------------------------
# DataStream change notification
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
......@@ -9,6 +9,7 @@
import threading
import contextlib
from collections import defaultdict
from typing import Optional, Any, Tuple, Set, List
# NOC modules
from noc.core.defer import call_later
......@@ -17,53 +18,131 @@ from noc.core.datastream.loader import loader
tls = threading.local()
def register_changes(data):
class ChangeTracker(object):
"""
Register single change
:param data: List of (datasource name, object id)
:return:
Thread-local datastream change tracker.
"""
if hasattr(tls, "_datastream_changes"):
# Within bulk_datastream_changes context
tls._datastream_changes.update(data)
else:
apply_changes(data)
@staticmethod
def get_policy() -> "BaseChangeTrackerPolicy":
policy = getattr(tls, "_ct_policy", None)
if not policy:
policy = SimpleChangeTrackerPolicy()
tls._ct_policy = policy
return policy
def register(self, changes: List[Tuple[str, Any]]) -> None:
"""
Register datastream change
:param changes: List of (datastream, object id)
:return:
"""
self.get_policy().register(changes)
@staticmethod
def push_policy(policy: "BaseChangeTrackerPolicy") -> None:
"""
Push new effective policy for the current thread,
store current one in the stack
:param policy:
:return:
"""
# Store previous policy
prev_policy = getattr(tls, "_ct_policy", None)
if prev_policy:
# Something to store
stack = getattr(tls, "_ct_policy_stack", None) or []
stack.append(prev_policy)
tls._ct_policy_stack = stack
# Store current policy
tls._ct_policy = policy
@staticmethod
def pop_policy() -> Optional["BaseChangeTrackerPolicy"]:
"""
Pop current effective policy from stack and restore previous one
:return: Current effective policy
"""
# Get current policy
policy = getattr(tls, "_ct_policy", None)
stack = getattr(tls, "_ct_policy_stack", None)
if stack:
# Install previous policy
prev_policy = stack.pop(-1)
tls._ct_policy = prev_policy
if not stack:
del tls._ct_policy_stack
return policy
@contextlib.contextmanager
def bulk_changes(self):
"""
Apply all datastream changes at once
:return:
"""
# Store current effective policy
prev_policy = getattr(tls, "_ct_policy", None)
# Install bulk change policy as
policy = BulkChangeTrackerPolicy()
tls._ct_policy = policy
yield
policy.apply()
if prev_policy:
tls._ct_policy = prev_policy
else:
del tls._ct_policy
@contextlib.contextmanager
def bulk_datastream_changes():
change_tracker = ChangeTracker()
class BaseChangeTrackerPolicy(object):
"""
Base class for change tracker policies
"""
Buffer and deduplicate pending datastream changes
Usage:
def __init__(self):
...
with bulk_datastream_changes:
....
def register(self, changes: List[Tuple[str, Any]]) -> None:
...
:return:
def apply(self):
"""
Apply collected changes
:return:
"""
def apply_changes(self, changes: List[Tuple[str, Any]]):
if changes:
call_later("noc.core.datastream.change.do_changes", changes=changes)
class DropChangeTrackerPolicy(BaseChangeTrackerPolicy):
"""
# Save previous state
last_changes = getattr(tls, "_datastream_changes", None)
# Create new context
tls._datastream_changes = set()
# Perform decorated computations
yield
# Apply changes
apply_changes(list(set(tls._datastream_changes)))
# Restore previous context
if last_changes is not None:
tls._datastream_changes = last_changes
else:
del tls._datastream_changes
def apply_changes(changes):
Drop all changes
"""
:param changes: List of (datastream name, object id)
:return:
class SimpleChangeTrackerPolicy(BaseChangeTrackerPolicy):
"""
Simple policy, applies every registered change
"""
if changes:
call_later("noc.core.datastream.change.do_changes", changes=changes)
def register(self, changes: List[Tuple[str, Any]]) -> None:
self.apply_changes(changes)
class BulkChangeTrackerPolicy(BaseChangeTrackerPolicy):
def __init__(self):
super().__init__()
self.changes: Set[Tuple[str, Any]] = set()
def register(self, changes: List[Tuple[str, Any]]) -> None:
self.changes.update(changes)
def apply(self):
self.apply_changes(list(self.changes))
def do_changes(changes):
......
# ----------------------------------------------------------------------
# @datastream decorator
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
......@@ -11,7 +11,7 @@ from mongoengine import signals as mongo_signals
# NOC modules
from noc.core.model.decorator import is_document
from .change import register_changes
from .change import change_tracker
def datastream(cls):
......@@ -56,4 +56,4 @@ def _on_document_change(sender, document, *args, **kwargs):
def _on_change(obj, changed_fields=None):
r = list(obj.iter_changed_datastream(changed_fields=changed_fields))
if r:
register_changes(r)
change_tracker.register(r)
......@@ -76,7 +76,7 @@ from noc.core.bi.decorator import bi_sync
from noc.core.script.scheme import SCHEME_CHOICES
from noc.core.matcher import match
from noc.core.datastream.decorator import datastream
from noc.core.datastream.change import register_changes
from noc.core.datastream.change import change_tracker
from noc.core.resourcegroup.decorator import resourcegroup
from noc.core.confdb.tokenizer.loader import loader as tokenizer_loader
from noc.core.confdb.engine.base import Engine
......@@ -1087,7 +1087,7 @@ class ManagedObject(NOCModel):
self.mirror_config(data, changed)
# Rebuild datastream if necessary
if changed:
register_changes([("managedobject", self.id)])
change_tracker.register([("managedobject", self.id)])
return changed
def notify_config_changes(self, is_new, data, diff):
......
......@@ -14,7 +14,7 @@ from noc.services.discovery.jobs.base import MODiscoveryJob
from noc.services.discovery.jobs.periodic.mac import MACCheck
from noc.services.discovery.jobs.periodic.metrics import MetricsCheck
from noc.core.span import Span
from noc.core.datastream.change import bulk_datastream_changes
from noc.core.datastream.change import change_tracker
from .resolver import ResolverCheck
from .suggestsnmp import SuggestSNMPCheck
from .profile import ProfileCheck
......@@ -83,7 +83,7 @@ class BoxDiscoveryJob(MODiscoveryJob):
default_contexts = ("counters", "metric_windows", "active_thresholds")
def handler(self, **kwargs):
with Span(sample=self.object.box_telemetry_sample), bulk_datastream_changes():
with Span(sample=self.object.box_telemetry_sample), change_tracker.bulk_changes():
has_cli = "C" in self.object.get_access_preference()
ResolverCheck(self).run()
if self.object.auth_profile and self.object.auth_profile.enable_suggest:
......
......@@ -15,7 +15,7 @@ from noc.inv.models.interfaceprofile import InterfaceProfile
from noc.sa.models.service import Service
from noc.sa.models.servicesummary import ServiceSummary
from noc.inv.models.interface import Interface
from noc.core.datastream.change import register_changes
from noc.core.datastream.change import change_tracker
class NRIServiceCheck(DiscoveryCheck):
......@@ -98,4 +98,4 @@ class NRIServiceCheck(DiscoveryCheck):
self.logger.info("Sending %d updates", len(bulk))
icol.bulk_write(bulk)
ServiceSummary.refresh_object(self.object.id)
register_changes([("managedobject", self.object.id)])
change_tracker.register([("managedobject", self.object.id)])
......@@ -11,7 +11,7 @@ import random
# NOC modules
from noc.services.discovery.jobs.base import MODiscoveryJob
from noc.core.span import Span
from noc.core.datastream.change import bulk_datastream_changes
from noc.core.datastream.change import change_tracker
from ..box.resolver import ResolverCheck
from .uptime import UptimeCheck
from .interfacestatus import InterfaceStatusCheck
......@@ -32,7 +32,7 @@ class PeriodicDiscoveryJob(MODiscoveryJob):
default_contexts = ("counters", "metric_windows", "active_thresholds")
def handler(self, **kwargs):
with Span(sample=self.object.periodic_telemetry_sample), bulk_datastream_changes():
with Span(sample=self.object.periodic_telemetry_sample), change_tracker.bulk_changes():
if self.object.auth_profile and self.object.auth_profile.type == "S":
self.logger.info("Invalid credentials. Stopping")
return
......
# ----------------------------------------------------------------------
# noc.core.datastream test
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
......@@ -16,6 +16,7 @@ import bson
# NOC modules
from noc.core.datastream.base import DataStream
from noc.core.datastream.change import change_tracker, SimpleChangeTrackerPolicy
from noc.core.perf import metrics
from noc.core.datastream.loader import loader
......@@ -364,3 +365,17 @@ def test_compile_filters():
assert DataStream.compile_filters(["id(1)", 1])
with pytest.raises(ValueError):
DataStream.compile_filters(["unknown(1)"])
def test_change_tracker_stack():
p1 = SimpleChangeTrackerPolicy()
assert change_tracker.get_policy() is not p1
change_tracker.push_policy(p1)
assert change_tracker.get_policy() is p1
p2 = SimpleChangeTrackerPolicy()
assert change_tracker.get_policy() is not p2
change_tracker.push_policy(p2)
assert change_tracker.get_policy() is not p1
assert change_tracker.get_policy() is p2
assert change_tracker.pop_policy() is p2
assert change_tracker.pop_policy() is p1
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment