Commit f79d8a0b authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

DataStream: Reduce amount of spawned jobs

parent 1d38ad91
......@@ -18,18 +18,17 @@ tls = threading.local()
logger = logging.getLogger(__name__)
def register_change(ds_name, object_id):
def register_changes(data):
"""
Register single change
:param ds_name: DataSource name
:param object_id: Object id
:param data: List of (datasource name, object id)
:return:
"""
if hasattr(tls, "_datastream_changes"):
# Within bulk_datastream_changes context
tls._datastream_changes.add((ds_name, object_id))
tls._datastream_changes.update(data)
else:
apply_change(ds_name, object_id)
apply_changes(data)
@contextlib.contextmanager
......@@ -51,8 +50,7 @@ def bulk_datastream_changes():
# Perform decorated computations
yield
# Apply changes
for ds_name, object_id in tls._datastream_changes:
apply_change(ds_name, object_id)
apply_changes(list(tls._datastream_changes))
# Restore previous context
if last_changes is not None:
tls._datastream_changes = last_changes
......@@ -60,14 +58,12 @@ def bulk_datastream_changes():
del tls._datastream_changes
def apply_change(ds_name, object_id):
def apply_changes(changes):
"""
:param ds_name:
:param object_id:
:param changes: List of (datastream name, object id)
:return:
"""
call_later("noc.core.datastream.change.update_object", ds_name=ds_name, object_id=object_id)
call_later("noc.core.datastream.change.do_changes", changes=changes)
def update_object(ds_name, object_id):
......@@ -85,3 +81,13 @@ def update_object(ds_name, object_id):
logger.info("[%s|%s] Object has been changed", ds_name, object_id)
else:
logger.info("[%s|%s] Object hasn't been changed", ds_name, object_id)
def do_changes(changes):
"""
Change calculation worker
:param changes: List of datastream name, object id
:return:
"""
for ds_name, object_id in changes:
update_object(ds_name, object_id)
......@@ -13,7 +13,7 @@ from django.db.models import signals as django_signals
from mongoengine import signals as mongo_signals
# NOC modules
from noc.core.model.decorator import is_document
from .change import register_change
from .change import register_changes
def datastream(cls):
......@@ -41,10 +41,14 @@ def datastream(cls):
def _on_model_change(sender, instance, *args, **kwargs):
for ds_name, object_id in instance.iter_changed_datastream():
register_change(ds_name, object_id)
_on_change(instance)
def _on_document_change(sender, document, *args, **kwargs):
for ds_name, object_id in document.iter_changed_datastream():
register_change(ds_name, object_id)
_on_change(document)
def _on_change(obj):
r = list(obj.iter_changed_datastream())
if r:
register_changes(r)
......@@ -368,6 +368,10 @@ def test_filter_id():
assert DataStream.filter_id(10) == {"_id": 10}
assert DataStream.filter_id(10, 11) == {"_id": {"$in": [10, 11]}}
def test_filter_shard():
assert DataStream.filter_shard(1, 3) == {"_id": {"$mod": [3, 1]}}
assert DataStream.filter_shard("1", "3") == {"_id": {"$mod": [3, 1]}}
def test_compile_filters():
assert DataStream.compile_filters(["id(1)"]) == {"_id": "1"}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment