Commit 5f650b8a authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

DataStream: Track managed object's changes

parent 8ab0b1d2
......@@ -95,7 +95,7 @@ class DataStream(object):
Generate and update object in stream
:param id: Object id
:param delete: Object must be marked as deleted
:return:
:return: True if obbject has been updated
"""
metrics["ds_%s_updated" % cls.name] += 1
coll = cls.get_collection()
......@@ -112,7 +112,7 @@ class DataStream(object):
# Get existing object
doc = coll.find_one({cls.F_ID: id}, {cls.F_ID: 0, cls.F_HASH: 1})
if doc and doc.get(cls.F_HASH) == hash:
return # Not changed
return False # Not changed
metrics["ds_%s_changed" % cls.name] += 1
changeid = bson.ObjectId()
data["change_id"] = str(changeid)
......@@ -125,6 +125,7 @@ class DataStream(object):
cls.F_DATA: ujson.dumps(data)
}
}, upsert=True)
return True
@classmethod
def delete_object(cls, id):
......
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# DataStream change notification
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import logging
import threading
import contextlib
# NOC modules
from noc.core.defer import call_later
from noc.core.datastream.loader import loader
tls = threading.local()
logger = logging.getLogger(__name__)
def register_change(ds_name, object_id):
"""
Register single change
:param ds_name: DataSource name
:param object_id: Object id
:return:
"""
if hasattr(tls, "_datastream_changes"):
# Within bulk_datastream_changes context
tls._datastream_changes.add((ds_name, object_id))
else:
apply_change(ds_name, object_id)
@contextlib.contextmanager
def bulk_datastream_changes():
"""
Buffer and deduplicate pending datastream changes
Usage:
with bulk_datastream_changes:
....
:return:
"""
# Save previous state
last_changes = getattr(tls, "_datastream_changes", None)
# Create new context
tls._datastream_changes = set()
# Perform decorated computations
yield
# Apply changes
for ds_name, object_id in tls._datastream_changes:
apply_change(ds_name, object_id)
# Restore previous context
if last_changes is not None:
tls._datastream_changes = last_changes
else:
del tls._datastream_changes
def apply_change(ds_name, object_id):
"""
:param ds_name:
:param object_id:
:return:
"""
call_later("noc.core.datastream.change.update_object", ds_name=ds_name, object_id=object_id)
def update_object(ds_name, object_id):
"""
Really apply DataStream updates
:param ds_name:
:param object_id:
:return:
"""
ds = loader.get_datastream(ds_name)
if not ds:
return
r = ds.update_object(object_id)
if r:
logger.info("[%s|%s] Object has been changed", ds_name, object_id)
else:
logger.info("[%s|%s] Object hasn't been changed", ds_name, object_id)
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# @datastream decorator
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from __future__ import absolute_import
# Third-party modules
from django.db.models import signals as django_signals
from mongoengine import signals as mongo_signals
# NOC modules
from noc.core.model.decorator import is_document
from .change import register_change
def datastream(cls):
"""
Class decorator to track model changes to datastream
Usage
@datastream
class MyModel(Model):
...
def iter_changed_datastream(self):
yield <datastream name>, <object id>
...
yield <datastream name>, <object id>
"""
if hasattr(cls, "iter_changed_datastream"):
if is_document(cls):
mongo_signals.post_save.connect(_on_document_change, sender=cls)
mongo_signals.pre_delete.connect(_on_document_change, sender=cls)
else:
django_signals.post_save.connect(_on_model_change, sender=cls)
django_signals.pre_delete.connect(_on_model_change, sender=cls)
return cls
def _on_model_change(sender, instance, *args, **kwargs):
for ds_name, object_id in instance.iter_changed_datastream():
register_change(ds_name, object_id)
def _on_document_change(sender, document, *args, **kwargs):
for ds_name, object_id in document.iter_changed_datastream():
register_change(ds_name, object_id)
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# DataStream loader
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from __future__ import absolute_import
import logging
import inspect
import threading
# NOC modules
from .base import DataStream
logger = logging.getLogger(__name__)
class DataStreamLoader(object):
def __init__(self):
self.datastreams = {} # Load datastreams
self.lock = threading.Lock()
def get_datastream(self, name):
"""
Load datastream and return DataStream instance.
Returns None when no datastream found or loading error occured
"""
with self.lock:
datastream = self.datastreams.get(name)
if not datastream:
logger.info("Loading datastream %s", name)
if not self.is_valid_name(name):
logger.error("Invalid datastream name")
return None
module_name = "noc.services.datastream.streams.%s" % name
try:
sm = __import__(module_name, {}, {}, "*")
for n in dir(sm):
o = getattr(sm, n)
if (
inspect.isclass(o) and
issubclass(o, DataStream) and
o.__module__ == sm.__name__
):
datastream = o
break
if not datastream:
logger.error("DataStream not found: %s", name)
except Exception as e:
logger.error("Failed to load DataStream %s: %s", name, e)
datastream = None
self.datastreams[name] = datastream
return datastream
def is_valid_name(self, name):
return ".." not in name
# Create singleton object
loader = DataStreamLoader()
......@@ -26,6 +26,7 @@ from noc.project.models.project import Project
from noc.vc.models.vcdomain import VCDomain
from noc.sa.models.service import Service
from noc.core.model.decorator import on_delete
from noc.core.datastream.decorator import datastream
from .interfaceprofile import InterfaceProfile
from .coverage import Coverage
......@@ -43,6 +44,7 @@ logger = logging.getLogger(__name__)
@on_delete
@datastream
class Interface(Document):
"""
Interfaces
......@@ -99,6 +101,9 @@ class Interface(Document):
def __unicode__(self):
return u"%s: %s" % (self.managed_object.name, self.name)
def iter_changed_datastream(self):
yield "managedobject", self.managed_object.id
def save(self, *args, **kwargs):
if not hasattr(self, "_changed_fields") or "name" in self._changed_fields:
self.name = self.managed_object.get_profile().convert_interface_name(self.name)
......
......@@ -14,10 +14,12 @@ from noc.lib.nosql import (Document, PlainReferenceListField,
StringField, DateTimeField, ListField,
IntField, ObjectIdField)
from noc.core.model.decorator import on_delete, on_save
from noc.core.datastream.decorator import datastream
@on_delete
@on_save
@datastream
class Link(Document):
"""
Network links.
......@@ -79,6 +81,10 @@ class Link(Document):
else:
return u"Stale link (%s)" % self.id
def iter_changed_datastream(self):
for mo_id in self.linked_objects:
yield "managedobject", mo_id
def clean(self):
self.linked_objects = sorted(set(i.managed_object.id for i in self.interfaces))
self.linked_segments = sorted(set(i.managed_object.segment.id for i in self.interfaces))
......
......@@ -16,6 +16,7 @@ from interfaceprofile import InterfaceProfile
from noc.sa.models.managedobject import ManagedObject
from noc.sa.interfaces.igetinterfaces import IGetInterfaces
from noc.project.models.project import Project
from noc.core.datastream.decorator import datastream
SUBINTERFACE_AFI = (
......@@ -39,6 +40,7 @@ TUNNEL_TYPES = (
)
@datastream
class SubInterface(Document):
meta = {
"collection": "noc.subinterfaces",
......@@ -92,6 +94,9 @@ class SubInterface(Document):
def __unicode__(self):
return "%s %s" % (self.interface.managed_object.name, self.name)
def iter_changed_datastream(self):
yield "managedobject", self.managed_object.id
@property
def effective_vc_domain(self):
return self.interface.effective_vc_domain
......
......@@ -64,6 +64,7 @@ from noc.core.cache.base import cache
from noc.core.script.caller import SessionContext
from noc.core.bi.decorator import bi_sync
from noc.core.script.scheme import SCHEME_CHOICES
from noc.core.datastream.decorator import datastream
# Increase whenever new field added
MANAGEDOBJECT_CACHE_VERSION = 8
......@@ -81,6 +82,7 @@ logger = logging.getLogger(__name__)
@on_init
@on_save
@on_delete
@datastream
@on_delete_check(check=[
# ("cm.ValidationRule.ObjectItem", ""),
("fm.ActiveAlarm", "managed_object"),
......@@ -487,6 +489,9 @@ class ManagedObject(Model):
else:
return None
def iter_changed_datastream(self):
yield "managedobject", self.id
@property
def data(self):
try:
......@@ -1557,6 +1562,7 @@ class ActionsProxy(object):
self._cache = {}
def __getattr__(self, name):
from .action import Action
if name in self._cache:
return self._cache[name]
a = Action.objects.filter(name=name).first()
......@@ -1571,7 +1577,6 @@ class ActionsProxy(object):
from .useraccess import UserAccess
from .groupaccess import GroupAccess
from .objectnotification import ObjectNotification
from .action import Action
from .selectorcache import SelectorCache
from .objectcapabilities import ObjectCapabilities
from noc.core.pm.utils import get_objects_metrics
......@@ -19,6 +19,7 @@ from .managedobject import ManagedObject
from noc.lib.nosql import ForeignKeyField
from noc.core.model.decorator import on_save
from noc.core.cache.base import cache
from noc.core.datastream.decorator import datastream
logger = logging.getLogger(__name__)
......@@ -34,6 +35,7 @@ class CapsItem(EmbeddedDocument):
@on_save
@datastream
class ObjectCapabilities(Document):
meta = {
"collection": "noc.sa.objectcapabilities"
......@@ -47,6 +49,9 @@ class ObjectCapabilities(Document):
def on_save(self):
cache.delete("cred-%s" % self.object.id)
def iter_changed_datastream(self):
yield "managedobject", self.object.id
@classmethod
def get_capabilities(cls, object):
"""
......
......@@ -30,9 +30,10 @@ class ManagedObjectDataStream(DataStream):
@classmethod
def get_object(cls, id):
mo = ManagedObject.get_by_id(id)
mo = ManagedObject.objects.filter(id=id)[:1]
if not mo:
raise KeyError()
mo = mo[0]
r = {
"id": str(id),
"$version": 1,
......@@ -43,6 +44,8 @@ class ManagedObjectDataStream(DataStream):
}
if mo.address:
r["address"] = mo.address
if mo.description:
r["description"] = mo.description
cls._apply_remote_system(mo, r)
cls._apply_pool(mo, r)
cls._apply_version(mo, r)
......
......@@ -12,6 +12,7 @@ import ujson
# NOC modules
from noc.core.datastream.base import DataStream
from noc.core.perf import metrics
from noc.core.datastream.loader import loader
class ExampleDataStream(DataStream):
......@@ -250,3 +251,25 @@ def test_datastream_clean_id_int():
assert DS.clean_id("1") == 1
with pytest.raises(ValueError):
DS.clean_id("z")
@pytest.fixture(params=["managedobject"])
def datastream_name(request):
return request.param
def test_loader(datastream_name):
ds = loader.get_datastream(datastream_name)
assert ds is not None
assert issubclass(ds, DataStream)
assert ds.name == datastream_name
def test_loader_invalid_name():
ds = loader.get_datastream("aaa..bbbb")
assert ds is None
def test_loader_error():
ds = loader.get_datastream("invalid")
assert ds is None
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment