Commit 16e2605c authored by Dmitry Volodin, committed by Andrey Vertiprahov
Browse files

Generalized change tracking framework

parent f046e2fc
......@@ -54,7 +54,12 @@ services:
liftbridge:
image: liftbridge/liftbridge:latest
command: liftbridge --embedded-nats
entrypoint: liftbridge --data-dir=/tmp/lift --embedded-nats --raft-bootstrap-seed
volumes:
- type: tmpfs
target: /tmp
tmpfs:
size: "512M"
pytests:
build:
......
......@@ -14,3 +14,4 @@ NOC_NSQD_ADDRESSES=nsqd:4150
NOC_NSQD_HTTP_ADDRESSES=nsqd:4151
NOC_NSQLOOKUPD_ADDRESSES=nsqlookupd:4160
NOC_NSQLOOKUPD_HTTP_ADDRESSES=nsqlookupd:4161
NOC_LIFTBRIDGE_ADDRESSES=liftbridge:9292
......@@ -22,6 +22,7 @@ from noc.config import config
class Command(BaseCommand):
_slots = None
# List of single-partitioned streams
STREAMS = [
"revokedtokens",
......@@ -48,7 +49,18 @@ class Command(BaseCommand):
"jobs": "worker",
}
def handle(self, *args, **options):
def add_arguments(self, parser):
    """
    Declare command-line options of the command

    :param parser: Argument parser to extend
    :return:
    """
    # Optional integer override of the slot count
    slots_option = {
        "dest": "slots",
        "type": int,
        "required": False,
        "help": "Static slot count (used for tests)",
    }
    parser.add_argument("--slots", **slots_option)
def handle(self, slots=None, *args, **options):
if slots:
self._slots = slots
changed = False
# Get liftbridge metadata
meta = self.get_meta()
......@@ -85,6 +97,8 @@ class Command(BaseCommand):
def iter_limits(self) -> Tuple[str, int]:
async def get_slot_limits():
nonlocal slot_name
if self._slots:
return self._slots
return await dcs.get_slot_limit(slot_name)
dcs = get_dcs()
......
# ----------------------------------------------------------------------
# ./noc segment command
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
......@@ -21,7 +21,7 @@ from noc.inv.models.biosegtrial import BioSegTrial
from noc.inv.models.interface import Interface
from noc.inv.models.link import Link
from noc.core.bioseg.moderator.base import moderate_trial
from noc.core.datastream.change import change_tracker
from noc.core.change.policy import change_tracker
from noc.core.text import alnum_key
from noc.core.clickhouse.connect import connection
from noc.inv.models.discoveryid import DiscoveryID
......
# ---------------------------------------------------------------------
# ./noc wipe
# ---------------------------------------------------------------------
# Copyright (C) 2007-2019 The NOC Project
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
......@@ -14,7 +14,7 @@ from contextlib import contextmanager
from noc.core.management.base import BaseCommand, CommandError
from noc.core.mongo.connection import connect
from noc.core.validators import is_int
from noc.core.datastream.change import change_tracker
from noc.core.change.policy import change_tracker
from noc.core.comp import smart_text
......
# ----------------------------------------------------------------------
# Change handler
# ----------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from logging import getLogger
from typing import Optional, List, Set, DefaultDict, Tuple
from collections import defaultdict
# NOC modules
from noc.models import get_model
logger = getLogger(__name__)
def on_change(changes: List[Tuple[str, str, str, Optional[List[str]]]], *args, **kwargs) -> None:
    """
    Change worker

    Resolves every change to its model item, collects the datastream
    records reported by the item and applies them in one batch.
    A change which cannot be resolved is logged and skipped, the rest
    of the batch is still processed.

    :param changes: List of (op, model id, item id, changed fields list)
    :param args: Not used
    :param kwargs: Not used
    :return:
    """
    # Datastream changes: datastream name -> set of record ids
    ds_changes: DefaultDict[str, Set[str]] = defaultdict(set)
    # Iterate over changes
    for op, model_id, item_id, changed_fields in changes:
        # Resolve item
        logger.debug("[%s|%s] Processing change: %s", model_id, item_id, op)
        model_cls = get_model(model_id)
        if not model_cls:
            logger.error("[%s|%s] Invalid model. Skipping", model_id, item_id)
            # Skip only the broken change, keep processing the batch
            continue
        if op == "delete":
            item = None
        else:
            item = model_cls.get_by_id(item_id)
            if not item:
                logger.error("[%s|%s] Missed item. Skipping", model_id, item_id)
                # Skip only the broken change, keep processing the batch
                continue
        # Process datastreams
        # For "delete" the item is None, so nothing is collected here
        if hasattr(item, "iter_changed_datastream"):
            for ds_name, ds_id in item.iter_changed_datastream(
                changed_fields=set(changed_fields or [])
            ):
                ds_changes[ds_name].add(ds_id)
    # Apply datastream changes
    if ds_changes:
        apply_datastream(ds_changes)
def apply_datastream(ds_changes: DefaultDict[str, Set[str]]) -> None:
    """
    Push collected changes into their datastreams

    :param ds_changes: Mapping of datastream name to the set of changed ids
    :return:
    """
    from noc.core.datastream.loader import loader

    for name in ds_changes:
        datastream = loader[name]
        if datastream:
            # Ids are passed in sorted order
            datastream.bulk_update(sorted(ds_changes[name]))
        else:
            logger.error("Invalid datastream: %s", name)
# ----------------------------------------------------------------------
# @change decorator and worker
# ----------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from logging import getLogger
# NOC modules
from noc.models import is_document, get_model_id
from .policy import change_tracker
logger = getLogger(__name__)
def change(model):
    """
    @change decorator to enable generalized change tracking on the model.

    :param model: Model or document class, must provide .get_by_id
    :return: The same class, with change tracking signals connected
    :raises ValueError: When the class has no .get_by_id
    """
    if not hasattr(model, "get_by_id"):
        # Unlike logging calls, exceptions do not interpolate extra
        # arguments, so the message must be formatted explicitly
        raise ValueError("[%s] Missed .get_by_id" % (get_model_id(model),))
    if is_document(model):
        _track_document(model)
    else:
        _track_model(model)
    return model
def _track_document(model):
    """
    Connect change handlers to the mongoengine signals of the document

    :param model: Document class to track
    :return:
    """
    from mongoengine import signals

    logger.debug("[%s] Tracking changes", get_model_id(model))
    # Saves go to the change handler, deletions to the delete handler
    subscriptions = (
        (signals.post_save, _on_document_change),
        (signals.pre_delete, _on_document_delete),
    )
    for signal, handler in subscriptions:
        signal.connect(handler, sender=model)
def _track_model(model):
    """
    Connect change handlers to the django signals of the model

    :param model: Model class to track
    :return:
    """
    from django.db.models import signals

    logger.debug("[%s] Tracking changes", get_model_id(model))
    # Saves go to the change handler, deletions to the delete handler
    subscriptions = (
        (signals.post_save, _on_model_change),
        (signals.pre_delete, _on_model_delete),
    )
    for signal, handler in subscriptions:
        signal.connect(handler, sender=model)
def _on_document_change(sender, document, created=False, *args, **kwargs):
    """
    post_save handler: register create or update of the document

    :param sender: Not used
    :param document: Saved document
    :param created: True when the document has just been created
    :return:
    """
    model_id = get_model_id(document)
    if created:
        op = "create"
    else:
        op = "update"
    logger.debug("[%s|%s] Change detected: %s", model_id, document.id, op)
    # No changed fields are reported for a fresh document
    if created:
        changed_fields = []
    else:
        changed_fields = list(document._changed_fields)
    change_tracker.register(op=op, model=model_id, id=str(document.id), fields=changed_fields)
def _on_document_delete(sender, document, *args, **kwargs):
    """
    pre_delete handler: register deletion of the document

    :param sender: Not used
    :param document: Document being deleted
    :return:
    """
    doc_model = get_model_id(document)
    logger.debug("[%s|%s] Change detected: %s", doc_model, document.id, "delete")
    change_tracker.register(op="delete", model=doc_model, id=str(document.id), fields=None)
def _on_model_change(sender, instance, created=False, *args, **kwargs):
    """
    post_save handler: register create or update of the model instance

    :param sender: Not used
    :param instance: Saved model instance
    :param created: True when the instance has just been created
    :return:
    """

    def comparable(value):
        # Values exposing .pk are compared by primary key
        if hasattr(value, "pk"):
            return value.pk
        return value

    model_id = get_model_id(instance)
    op = "create" if created else "update"
    logger.debug("[%s|%s] Change detected: %s", model_id, instance.id, op)
    # Compare string forms of the initial and the current values
    changed_fields = []
    for name in getattr(instance, "initial_data", []):
        before = comparable(instance.initial_data[name])
        after = comparable(getattr(instance, name))
        if str(before) != str(after):
            changed_fields.append(name)
    change_tracker.register(op=op, model=model_id, id=str(instance.id), fields=changed_fields)
def _on_model_delete(sender, instance, *args, **kwargs):
    """
    pre_delete handler: register deletion of the model instance

    :param sender: Not used
    :param instance: Model instance being deleted
    :return:
    """
    instance_model = get_model_id(instance)
    logger.debug("[%s|%s] Change detected: %s", instance_model, instance.id, "delete")
    change_tracker.register(op="delete", model=instance_model, id=str(instance.id), fields=None)
# ----------------------------------------------------------------------
# DataStream change notification
# Change tracking policy
# ----------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
......@@ -9,18 +9,24 @@
import threading
import contextlib
from collections import defaultdict
from typing import Optional, Any, Tuple, Set, List
from typing import Optional, Tuple, List, Dict
from abc import ABCMeta, abstractmethod
# NOC modules
from noc.core.defer import defer
from noc.core.datastream.loader import loader
from noc.core.hash import hash_int
CHANGE_HANDLER = "noc.core.change.change.on_change"
tls = threading.local()
class ChangeTracker(object):
"""
Thread-local datastream change tracker.
Thread-local change tracker.
"""
@staticmethod
......@@ -31,13 +37,16 @@ class ChangeTracker(object):
tls._ct_policy = policy
return policy
def register(self, changes: List[Tuple[str, Any]]) -> None:
def register(self, op: str, model: str, id: str, fields: Optional[List] = None) -> None:
"""
Register datastream change
:param changes: List of (datastream, object id)
:param op: Operation, either create, update or delete
:param model: Model id
:param id: Item id
:param fields: List of changed fields
:return:
"""
self.get_policy().register(changes)
self.get_policy().register(op, model, id, fields)
@staticmethod
def push_policy(policy: "BaseChangeTrackerPolicy") -> None:
......@@ -77,7 +86,7 @@ class ChangeTracker(object):
@contextlib.contextmanager
def bulk_changes(self):
"""
Apply all datastream changes at once
Apply all changes at once
:return:
"""
# Store current effective policy
......@@ -86,7 +95,7 @@ class ChangeTracker(object):
policy = BulkChangeTrackerPolicy()
tls._ct_policy = policy
yield
policy.apply()
policy.commit()
if prev_policy:
tls._ct_policy = prev_policy
else:
......@@ -96,7 +105,7 @@ class ChangeTracker(object):
change_tracker = ChangeTracker()
class BaseChangeTrackerPolicy(object):
class BaseChangeTrackerPolicy(object, metaclass=ABCMeta):
"""
Base class for change tracker policies
"""
......@@ -104,60 +113,69 @@ class BaseChangeTrackerPolicy(object):
def __init__(self):
...
def register(self, changes: List[Tuple[str, Any]]) -> None:
@abstractmethod
def register(self, op: str, model: str, id: str, fields: Optional[List] = None) -> None:
...
def apply(self):
"""
Apply collected changes
:return:
"""
def apply_changes(self, changes: List[Tuple[str, Any]]):
if changes:
defer("noc.core.datastream.change.do_changes", changes=changes)
class DropChangeTrackerPolicy(BaseChangeTrackerPolicy):
    """
    Policy which discards every registered change
    """

    def register(self, op: str, model: str, id: str, fields: Optional[List] = None) -> None:
        # Intentionally does nothing
        return None
class SimpleChangeTrackerPolicy(BaseChangeTrackerPolicy):
"""
Simple policy, applies every registered change
"""
def register(self, changes: List[Tuple[str, Any]]) -> None:
self.apply_changes(changes)
def register(self, op: str, model: str, id: str, fields: Optional[List] = None) -> None:
    """
    Defer the change handler for a single change right away

    :param op: Operation
    :param model: Model id
    :param id: Item id
    :param fields: List of changed fields
    :return:
    """
    # The defer key is derived from the item id
    single_change = (op, model, str(id), fields)
    defer(CHANGE_HANDLER, key=hash_int(id), changes=[single_change])
class BulkChangeTrackerPolicy(BaseChangeTrackerPolicy):
def __init__(self):
super().__init__()
self.changes: Set[Tuple[str, Any]] = set()
def register(self, changes: List[Tuple[str, Any]]) -> None:
self.changes.update(changes)
def apply(self):
self.apply_changes(list(self.changes))
def do_changes(changes):
"""
Change calculation worker
:param changes: List of datastream name, object id
:return:
"""
# Compact and organize datastreams
datastreams = defaultdict(set)
for ds_name, object_id in changes:
datastreams[ds_name].add(object_id)
# Apply batches
for ds_name in datastreams:
ds = loader[ds_name]
if not ds:
continue
ds.bulk_update(sorted(datastreams[ds_name]))
self.changes: Dict[Tuple[str, str], Tuple[str, Optional[List]]] = {}
def register(self, op: str, model: str, id: str, fields: Optional[List] = None) -> None:
    """
    Collect the change, collapsing it with an earlier change of the same item

    * first change of an item is stored as is
    * delete replaces whatever was stored
    * update after create/update keeps the stored operation
      and merges the field lists
    * create after any change and update after delete are errors

    :param op: Operation
    :param model: Model id
    :param id: Item id
    :param fields: List of changed fields
    :return:
    :raises RuntimeError: On create after another change or update after delete
    """
    slot = (model, id)
    known = self.changes.get(slot)
    if known is None:
        # Nothing is stored for the item yet
        self.changes[slot] = (op, fields)
        return
    if op == "delete":
        # Deletion wins over anything stored
        self.changes[slot] = (op, None)
        return
    if op == "create":
        raise RuntimeError("create must be first update")
    # Everything else is handled as an update
    known_op, known_fields = known[0], known[1]
    if known_op == "delete":
        raise RuntimeError("Cannot update after delete")
    if known_op in ("create", "update"):
        # Keep the stored operation, union the fields
        merged = list(set(known_fields or []) | set(fields or []))
        self.changes[slot] = (known_op, merged)
def commit(self) -> None:
    """
    Defer the change handler for all collected changes

    :return:
    """
    # Split to buckets, every change currently goes to the bucket 0
    buckets = defaultdict(list)
    for key, value in self.changes.items():
        model_id, item_id = key
        op, fields = value
        bucket = 0
        buckets[bucket].append((op, model_id, item_id, fields))
    # One deferred call per bucket
    for bucket in buckets:
        defer(CHANGE_HANDLER, key=bucket, changes=buckets[bucket])
# ----------------------------------------------------------------------
# @datastream decorator
# ----------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Third-party modules
from django.db.models import signals as django_signals
from mongoengine import signals as mongo_signals
# NOC modules
from noc.core.model.decorator import is_document
from .change import change_tracker
def datastream(cls):
    """
    Class decorator which connects save/delete signals of the model
    to datastream change tracking

    Usage

    @datastream
    class MyModel(Model):
        ...
        def iter_changed_datastream(self, changed_fields=None):
            yield <datastream name>, <object id>
            ...
            yield <datastream name>, <object id>

    Classes without iter_changed_datastream are returned untouched.
    """
    if not hasattr(cls, "iter_changed_datastream"):
        return cls
    # Pick the signal set and the handler matching the storage backend
    if is_document(cls):
        signals, handler = mongo_signals, _on_document_change
    else:
        signals, handler = django_signals, _on_model_change
    signals.post_save.connect(handler, sender=cls)
    signals.pre_delete.connect(handler, sender=cls)
    return cls
def _on_model_change(sender, instance, *args, **kwargs):
    """
    Signal handler for models: collect the fields which differ from
    initial_data and pass them to _on_change

    :param sender: Not used
    :param instance: Model instance
    :return:
    """
    changed = set()
    for name in getattr(instance, "initial_data", []):
        if instance.initial_data[name] != getattr(instance, name):
            changed.add(name)
    _on_change(instance, changed_fields=changed)
def _on_document_change(sender, document, *args, **kwargs):
    """
    Signal handler for documents: pass _changed_fields of the document
    (empty dict when the attribute is absent) to _on_change

    :param sender: Not used
    :param document: Document instance
    :return:
    """
    fields = getattr(document, "_changed_fields", {})
    _on_change(document, changed_fields=fields)
def _on_change(obj, changed_fields=None):
r = list(obj.iter_changed_datastream(changed_fields=changed_fields))
if r:
change_tracker.register(r)
......@@ -14,11 +14,11 @@ from noc.core.model.base import NOCModel
from noc.core.model.decorator import on_init
from noc.config import config
from noc.core.model.fields import INETField
from noc.core.datastream.decorator import datastream
from noc.core.change.decorator import change
@on_init
@datastream
@change
class DNSServer(NOCModel):
"""
DNS Server is an database object representing real DNS server.
......@@ -41,6 +41,13 @@ class DNSServer(NOCModel):
def __str__(self):
return self.name
@classmethod
def get_by_id(cls, id):
    """
    Look up a DNSServer by id

    :param id: Server id
    :return: DNSServer instance or None when nothing is found
    """
    # At most one row is requested
    found = DNSServer.objects.filter(id=id)[:1]
    return found[0] if found else None
def iter_changed_datastream(self, changed_fields=None):
if not config.datastream.enable_dnszone:
return
......
......@@ -29,7 +29,7 @@ from noc.core.ip import IPv6
from noc.core.validators import is_ipv4, is_ipv6
from noc.core.rpsl import rpsl_format
from noc.core.gridvcs.manager import GridVCSField
from noc.core.datastream.decorator import datastream
from noc.core.change.decorator import change
from noc.core.model.decorator import on_delete_check
from noc.core.translation import ugettext as _
from .dnszoneprofile import DNSZoneProfile
......@@ -45,7 +45,7 @@ ZONE_REVERSE_IPV6 = "6"
@Label.model
@on_init
@datastream
@change
@on_delete_check(check=[("dns.DNSZoneRecord", "zone")])
class DNSZone(NOCModel):
"""
......
......@@ -18,7 +18,7 @@ from noc.config import config
from noc.core.model.base import NOCModel
from noc.core.model.decorator import on_init
from noc.main.models.notificationgroup import NotificationGroup
from noc.core.datastream.decorator import datastream
from noc.core.change.decorator import change
from noc.core.model.decorator import on_delete_check
from noc.core.translation import ugettext as _
from .dnsserver import DNSServer
......@@ -27,7 +27,7 @@ id_lock = Lock()
@on_init
@datastream
@change
@on_delete_check(check=[("dns.DNSZone", "profile")])
class DNSZoneProfile(NOCModel):
"""
......
......@@ -12,7 +12,7 @@ from django.contrib.postgres.fields import ArrayField
# NOC modules
from noc.core.model.base import NOCModel
from noc.core.model.decorator import on_init
from noc.core.datastream.decorator import datastream
from noc.core.change.decorator import change
from noc.core.translation import ugettext as _
from noc.main.models.label import Label
from .dnszone import DNSZone
......@@ -20,7 +20,7 @@ from .dnszone import DNSZone
@Label.model
@on_init
@datastream
@change
class DNSZoneRecord(NOCModel):
"""
Zone RRs
......@@ -49,6 +49,13 @@ class DNSZoneRecord(NOCModel):
" ".join([x for x in (self.name, self.type, self.content) if x]),
)