Commit a238be71 authored by Dmitry Volodin's avatar Dmitry Volodin Committed by Andrey Vertiprahov
Browse files

#1287: Multi-Format DataStream

parent 316fce34
# ----------------------------------------------------------------------
# noc datastream command
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import argparse
import itertools
import logging
# Third-party modules
import ujson
......@@ -105,6 +107,14 @@ class Command(BaseCommand):
ds.update_object(obj_id)
return obj_id
def grouper(iterable, n):
    """
    Split *iterable* into chunks of at most *n* items.

    Unlike the plain zip_longest recipe, trailing fill values are
    stripped, so the final chunk is simply shorter instead of being
    padded with None (padding would otherwise be passed down to
    ds.bulk_update as bogus object ids).

    :param iterable: Source iterable of object ids
    :param n: Maximum chunk size
    :return: Iterator of lists of up to n items
    """
    fill = object()  # unique sentinel, so legitimate None items survive
    args = [iter(iterable)] * n
    for chunk in itertools.zip_longest(*args, fillvalue=fill):
        yield [x for x in chunk if x is not fill]
def do_update(bulk):
    """
    Apply one batch of ids via DataStream.bulk_update and yield one
    item per processed object so a progress bar can count them.

    NOTE(review): appears unused in the visible code path; the
    progress generator calls ds.bulk_update directly.
    """
    ds.bulk_update(bulk)
    for seq in range(len(bulk)):
        yield seq
if not datastream:
self.die("--datastream is not set. Set one from list: %s" % ", ".join(self.MODELS))
model = self.get_model(datastream)
......@@ -114,6 +124,7 @@ class Command(BaseCommand):
self.die("Cannot initialize datastream")
total = self.get_total(model)
STEP = 100
BATCH = 100
n = 1
report_interval = max(total // STEP, 1)
next_report = report_interval
......@@ -121,11 +132,17 @@ class Command(BaseCommand):
from multiprocessing.pool import ThreadPool
pool = ThreadPool(jobs)
iter = pool.imap_unordered(update_object, self.iter_id(model))
iterable = pool.imap_unordered(update_object, self.iter_id(model))
else:
iter = (update_object(obj_id) for obj_id in self.iter_id(model))
iterable = (ds.bulk_update(bulk) for bulk in grouper(self.iter_id(model), BATCH))
if not self.no_progressbar:
# Disable logging
from noc.core.datastream.base import logger
logger.setLevel(logging.CRITICAL)
for _ in self.progress(iter, max_value=total):
for _ in self.progress(iterable, max_value=total / BATCH):
if self.no_progressbar and n == next_report:
self.print("[%02d%%]" % ((n * 100) // total))
next_report += report_interval
......
......@@ -8,6 +8,9 @@
# Python modules
import hashlib
import datetime
import re
import logging
from collections import defaultdict
# Third-party modules
import ujson
......@@ -15,12 +18,15 @@ import bson
import bson.errors
import pymongo
import dateutil.parser
import re
from typing import Optional, Dict, Any, List, Union, Iterable, Tuple, Callable
# NOC modules
from noc.core.perf import metrics
from noc.core.mongo.connection import get_db
from noc.core.comp import smart_bytes
from noc.models import get_model
logger = logging.getLogger(__name__)
class DataStream(object):
......@@ -45,20 +51,27 @@ class DataStream(object):
DEFAULT_LIMIT = 1000
rx_ts = re.compile(r"^\d{4}-\d{2}-\d{2}")
BULK_SIZE = 500
_collections: Dict[str, pymongo.collection.Collection] = {}
@classmethod
def get_collection_name(cls, format: Optional[str] = None) -> str:
    """
    Return the MongoDB collection name for the datastream.

    :param format: Optional format name; when given, the per-format
        collection name ``ds_<name>_<format>`` is returned.
        NOTE(review): the parameter shadows the ``format`` builtin;
        kept as-is for interface compatibility with existing callers.
    :return: Collection name
    """
    if format:
        return "ds_%s_%s" % (cls.name, format)
    return "ds_%s" % cls.name
@classmethod
def get_collection(cls, fmt: Optional[str] = None) -> pymongo.collection.Collection:
    """
    Get pymongo Collection object for the datastream, creating and
    caching it on first use.

    :param fmt: Optional format name selecting the per-format collection
    :return: pymongo Collection
    """
    c_name = cls.get_collection_name(fmt)
    coll = cls._collections.get(c_name)
    if not coll:
        coll = get_db()[c_name]
        # Cache per collection name, shared between formats
        cls._collections[c_name] = coll
    return coll
@classmethod
......@@ -86,7 +99,7 @@ class DataStream(object):
raise NotImplementedError()
@classmethod
def get_meta(cls, data):
def get_meta(cls, data: Dict[str, Any]) -> Optional[Dict]:
"""
Extract additional metadata from .get_object() result for additional indexing
:param data: .get_object() result
......@@ -104,48 +117,126 @@ class DataStream(object):
return {"id": str(id), "$deleted": True}
@staticmethod
def get_hash(data) -> str:
    """
    Calculate a stable content hash of a datastream record.

    :param data: JSON-serializable record
    :return: First HASH_LEN hex digits of the SHA-256 of the
        serialized record
    """
    return hashlib.sha256(smart_bytes(ujson.dumps(data))).hexdigest()[: DataStream.HASH_LEN]
@classmethod
def bulk_update(cls, objects: List[Union[int, str, bson.ObjectId]]) -> None:
    """
    Generate and update stream records for a list of objects at once.

    Objects are processed in chunks of BULK_SIZE: current hashes for
    the whole chunk are pre-fetched in a single query, updates are
    collected into pymongo bulk operations, and all configured
    additional formats are produced and written alongside the
    default format.

    :param objects: List of object ids (int, str or ObjectId)
    :return: None
    """
    coll = cls.get_collection()
    # Get possible formats
    fmt_coll: Dict[str, pymongo.collection.Collection] = {}
    fmt_handler: Dict[str, Callable] = {}
    for fmt, handler in cls.iter_formats():
        fmt_coll[fmt] = cls.get_collection(fmt)
        fmt_handler[fmt] = handler
    # Process objects
    while objects:
        chunk, objects = objects[: cls.BULK_SIZE], objects[cls.BULK_SIZE :]
        # Pre-fetch current hashes for the whole chunk in one query
        current_state = {
            doc[cls.F_ID]: doc
            for doc in coll.find({cls.F_ID: {"$in": chunk}}, {cls.F_ID: 1, cls.F_HASH: 1})
        }
        bulk = []
        fmt_data = defaultdict(list)
        fmt_bulk = {}
        # Apply default format
        for obj_id in chunk:
            data, meta = cls._get_current_data(obj_id)
            cls._update_object(data=data, meta=meta, state=current_state, bulk=bulk)
            # Process formats: each handler yields format-specific records
            for fmt in fmt_handler:
                fmt_data[fmt] += list(fmt_handler[fmt](data))
        # Apply formats
        for fmt in fmt_data:
            fmt_ids = [data["id"] for data in fmt_data[fmt]]
            current_state = {
                doc[cls.F_ID]: doc
                for doc in fmt_coll[fmt].find(
                    {cls.F_ID: {"$in": fmt_ids}}, {cls.F_ID: 1, cls.F_HASH: 1}
                )
            }
            fmt_bulk[fmt] = []
            for data in fmt_data[fmt]:
                cls._update_object(data=data, fmt=fmt, state=current_state, bulk=fmt_bulk[fmt])
        # Save pending operations
        if bulk:
            logger.info("[%s] Sending %d bulk operations", cls.name, len(bulk))
            coll.bulk_write(bulk, ordered=True)
        for fmt in fmt_bulk:
            if fmt_bulk[fmt]:
                logger.info(
                    "[%s|%s] Sending %d bulk operations", cls.name, fmt, len(fmt_bulk[fmt])
                )
                fmt_coll[fmt].bulk_write(fmt_bulk[fmt], ordered=True)
@classmethod
def _update_object(cls, data, meta=None, fmt=None, state=None, bulk=None) -> bool:
    """
    Compare a generated record against the stored one and write it
    out when changed.

    :param data: Record as produced by get_object() or a format handler
    :param meta: Pre-extracted meta; when None, taken from data["$meta"]
    :param fmt: Format name; None means the default format
    :param state: Optional map of object id -> {_id, hash} pre-fetched
        by bulk_update; when None the stored hash is fetched individually
    :param bulk: Optional list collecting pymongo UpdateOne operations;
        when None the update is applied immediately
    :return: True if the object has been changed
    """

    def is_changed(d, h):
        # Changed when there is no stored doc or the stored hash differs
        return not d or d.get(cls.F_HASH) != h

    obj_id = cls.clean_id(data["id"])
    if meta is None and "$meta" in data:
        meta = data.pop("$meta")
    m_name = "%s_%s" % (cls.name, fmt) if fmt else cls.name
    l_name = "%s|%s|%s" % (cls.name, obj_id, fmt) if fmt else "%s|%s" % (cls.name, obj_id)
    metrics["ds_%s_updated" % m_name] += 1
    # Calculate hash
    data_hash = cls.get_hash(data)
    # Get existing object state
    if state:
        doc = state.get(obj_id)
    else:
        doc = cls.get_collection(fmt).find_one({cls.F_ID: obj_id}, {cls.F_ID: 0, cls.F_HASH: 1})
    if not is_changed(doc, data_hash):
        logger.info("[%s] Object hasn't been changed", l_name)
        return False  # Not changed
    # on_change may alter data; it applies to the default format only.
    # NOTE: previously this tested `not format` (the builtin, always
    # truthy), which silently disabled on_change -- fixed to use `fmt`.
    if not fmt and cls.on_change(data):
        data_hash = cls.get_hash(data)
        if not is_changed(doc, data_hash):
            logger.info("[%s] Object hasn't been changed", l_name)
            return False  # Not changed after altering
    metrics["ds_%s_changed" % m_name] += 1
    change_id = bson.ObjectId()
    data["change_id"] = str(change_id)
    op = {"$set": {cls.F_CHANGEID: change_id, cls.F_HASH: data_hash, cls.F_DATA: ujson.dumps(data)}}
    if meta:
        op["$set"][cls.F_META] = meta
    elif "$deleted" not in data:
        # Plain record without meta: drop any stale meta field
        op["$unset"] = {cls.F_META: ""}
    if bulk is None:
        cls.get_collection(fmt).update_one({cls.F_ID: obj_id}, op, upsert=True)
    else:
        bulk += [pymongo.UpdateOne({cls.F_ID: obj_id}, op, upsert=True)]
    logger.info("[%s] Object has been changed", l_name)
    return True
@classmethod
def _get_current_data(
    cls, obj_id, delete=False
) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]]]:
    """
    Build the current record and its meta for an object.

    :param obj_id: Object id
    :param delete: When True, build a deleted-object record
    :return: (data, meta); meta is None for deleted or missing objects
    """
    if not delete:
        try:
            data = cls.get_object(obj_id)
            return data, cls.get_meta(data)
        except KeyError:
            # Object is gone -- fall through to the deleted record
            pass
    return cls.get_deleted_object(obj_id), None
@classmethod
def update_object(cls, id, delete=False) -> bool:
    """
    Generate and update object in stream, including all configured
    additional formats.

    :param id: Object id
    :param delete: Object must be marked as deleted
    :return: True if object has been updated
    """
    data, meta = cls._get_current_data(id, delete=delete)
    changed = cls._update_object(data=data, meta=meta)
    for fmt, handler in cls.iter_formats():
        for fmt_record in handler(data):
            # Call first, then combine: every per-format record must be
            # processed even when a change was already reported
            changed = cls._update_object(data=fmt_record, fmt=fmt) or changed
    return changed
@classmethod
def delete_object(cls, id):
"""
......@@ -156,12 +247,26 @@ class DataStream(object):
cls.update_object(id, delete=True)
@classmethod
def iter_formats(
    cls,
) -> Iterable[Tuple[str, Callable[[Dict[str, Any]], Iterable[Dict[str, Any]]]]]:
    """
    Iterate over (format name, handler) for every active format
    configured for this datastream.

    :return: Yields (name, handler) tuples; yields nothing when no
        DataStreamConfig exists for this datastream
    """
    # Resolve the model lazily and cache it on the class:
    # do not load it in the datastream service at import time
    DataStreamConfig = getattr(cls, "_DataStreamConfig", None)
    if not DataStreamConfig:
        cls._DataStreamConfig = get_model("main.DataStreamConfig")
        DataStreamConfig = cls._DataStreamConfig
    cfg = DataStreamConfig.get_by_name(cls.name)
    if cfg:
        yield from cfg.iter_formats()
@classmethod
def get_total(cls, fmt=None):
    """
    Return total amount of items in datastream

    :param fmt: Optional format name; counts the per-format collection
    :return: Estimated document count of the selected collection
    """
    return cls.get_collection(fmt).estimated_document_count()
@classmethod
def clean_change_id(cls, change_id):
......@@ -195,7 +300,7 @@ class DataStream(object):
raise ValueError(str(e))
@classmethod
def iter_data(cls, change_id=None, limit=None, filters=None):
def iter_data(cls, change_id=None, limit=None, filters=None, fmt=None):
"""
Iterate over data items beginning from change id
......@@ -203,6 +308,7 @@ class DataStream(object):
:param change_id: Staring change id
:param limit: Records limit
:param filters: List of strings with filter expression
:param fmt: Format
:return: (id, change_id, data)
"""
q = {}
......@@ -210,7 +316,7 @@ class DataStream(object):
q.update(cls.compile_filters(filters))
if change_id:
q[cls.F_CHANGEID] = {"$gt": cls.clean_change_id(change_id)}
coll = cls.get_collection()
coll = cls.get_collection(fmt)
for doc in (
coll.find(q, {cls.F_ID: 1, cls.F_CHANGEID: 1, cls.F_DATA: 1})
.sort([(cls.F_CHANGEID, pymongo.ASCENDING)])
......@@ -348,3 +454,17 @@ class DataStream(object):
if instance >= n_instances:
raise ValueError("Invalid instance")
return {"_id": {"$mod": [n_instances, instance]}}
@classmethod
def get_format_role(cls, fmt: str) -> Optional[str]:
    """
    Returns format role, if any.

    Declared as a classmethod: the first parameter is ``cls`` and the
    service handler invokes it on the datastream class itself
    (``ds.get_format_role(fmt)``), which misbinds without the decorator.

    :param fmt: Format name
    :return: Role name or None when unset/empty
    """
    doc = get_db()["datastreamconfigs"].find_one({"name": cls.name})
    if not doc:
        return None
    for f in doc.get("formats", []):
        if f.get("name") == fmt:
            return f.get("role") or None
    return None
# ----------------------------------------------------------------------
# DataStream change notification
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import logging
import threading
import contextlib
from collections import defaultdict
# NOC modules
from noc.core.defer import call_later
from noc.core.datastream.loader import loader
tls = threading.local()
logger = logging.getLogger(__name__)
def register_changes(data):
......@@ -67,28 +66,19 @@ def apply_changes(changes):
call_later("noc.core.datastream.change.do_changes", changes=changes)
def update_object(ds_name, object_id):
    """
    Really apply DataStream updates for a single object.

    NOTE(review): superseded by the batched path (DataStream.bulk_update)
    used by do_changes; kept here for single-object updates.

    :param ds_name: Datastream name, resolved through the loader
    :param object_id: Id of the object to rebuild in the stream
    :return: None
    """
    ds = loader[ds_name]
    if not ds:
        # Unknown datastream name: nothing to update
        return
    r = ds.update_object(object_id)
    if r:
        logger.info("[%s|%s] Object has been changed", ds_name, object_id)
    else:
        logger.info("[%s|%s] Object hasn't been changed", ds_name, object_id)
def do_changes(changes):
    """
    Change calculation worker.

    :param changes: List of (datastream name, object id) pairs
    :return: None
    """
    # Compact and organize datastreams: deduplicate object ids per stream
    datastreams = defaultdict(set)
    for ds_name, object_id in changes:
        datastreams[ds_name].add(object_id)
    # Apply batches
    for ds_name in datastreams:
        ds = loader[ds_name]
        if not ds:
            # Unknown datastream name, skip
            continue
        # Sorted for deterministic processing order
        ds.bulk_update(sorted(datastreams[ds_name]))
# ----------------------------------------------------------------------
# DataStreamConfig Model
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from threading import Lock
import operator
# Third-party modules
from mongoengine.document import Document, EmbeddedDocument
from mongoengine.fields import StringField, BooleanField, ListField, EmbeddedDocumentField
from typing import Union, Optional, Iterable, Dict, Any, Callable, Tuple
from bson import ObjectId
import cachetools
# NOC modules
from noc.main.models.handler import Handler
from noc.core.mongo.fields import PlainReferenceField
id_lock = Lock()
class DSFormat(EmbeddedDocument):
    # One additional output format of a datastream.
    # Format name, as referenced by the datastream "format" query argument
    name = StringField()
    # Inactive formats are skipped by DataStreamConfig.iter_formats
    is_active = BooleanField()
    # Handler converting a default-format record into format-specific records
    handler = PlainReferenceField(Handler)
    # Optional API access role required to read this format
    role = StringField()

    def __str__(self):
        return self.name
class DataStreamConfig(Document):
    """
    Per-datastream configuration holding the list of additional
    output formats (DSFormat embedded documents).
    """

    meta = {"collection": "datastreamconfigs", "strict": False, "auto_create_index": False}

    # Datastream name (matches DataStream.name)
    name = StringField(unique=True)
    formats = ListField(EmbeddedDocumentField(DSFormat))

    # TTL caches backing the cachedmethod lookups below
    _id_cache = cachetools.TTLCache(maxsize=100, ttl=60)
    _name_cache = cachetools.TTLCache(maxsize=100, ttl=60)

    def __str__(self) -> str:
        return self.name

    @classmethod
    @cachetools.cachedmethod(operator.attrgetter("_id_cache"), lock=lambda _: id_lock)
    def get_by_id(cls, id: Union[str, ObjectId]) -> Optional["DataStreamConfig"]:
        # Cached lookup by document id; None when not found
        return DataStreamConfig.objects.filter(id=id).first()

    @classmethod
    @cachetools.cachedmethod(operator.attrgetter("_name_cache"), lock=lambda _: id_lock)
    def get_by_name(cls, name: str) -> Optional["DataStreamConfig"]:
        # Cached lookup by datastream name; None when not found
        return DataStreamConfig.objects.filter(name=name).first()

    def iter_formats(
        self,
    ) -> Iterable[Tuple[str, Callable[[Dict[str, Any]], Iterable[Dict[str, Any]]]]]:
        # Yield (format name, handler callable) for active formats only;
        # formats whose handler cannot be resolved are skipped
        for fmt in self.formats:
            if fmt.is_active:
                handler = fmt.handler.get_handler()
                if handler:
                    yield fmt.name, handler
......@@ -43,6 +43,7 @@ class Handler(Document):
allow_housekeeping = BooleanField()
allow_resolver = BooleanField()
allow_threshold = BooleanField()
allow_ds_filter = BooleanField()
_id_cache = cachetools.TTLCache(maxsize=1000, ttl=60)
......
......@@ -95,6 +95,7 @@ _MODELS = {
"main.CustomFieldEnumGroup": "noc.main.models.customfieldenumgroup.CustomFieldEnumGroup",
"main.CustomFieldEnumValue": "noc.main.models.customfieldenumvalue.CustomFieldEnumValue",
"main.DatabaseStorage": "noc.main.models.databasestorage.DatabaseStorage",
"main.DataStreamConfig": "noc.main.models.datastreamconfig.DataStreamConfig",
"main.DocCategory": "noc.main.models.doccategory.DocCategory",
"main.ExtStorage": "noc.main.models.extstorage.ExtStorage",
"main.Favorites": "noc.main.models.favorites.Favorites",
......
......@@ -7,18 +7,30 @@
# Third-party modules
import tornado.web
import cachetools
# NOC modules
from noc.core.service.apiaccess import APIAccessRequestHandler, authenticated
# cachetools.cached requires a cache instance; the bare decorator form
# raises TypeError on call. TTL keeps role changes visible within a minute.
@cachetools.cached(cachetools.TTLCache(maxsize=100, ttl=60))
def get_format_role(ds, fmt):
    """
    Cached lookup of the access role configured for a datastream format.

    :param ds: DataStream class
    :param fmt: Format name
    :return: Role name or None
    """
    return ds.get_format_role(fmt)
class DataStreamRequestHandler(APIAccessRequestHandler):
def initialize(self, service, datastream):
    # Tornado RequestHandler hook: stash the owning service and the
    # DataStream class this handler serves
    self.service = service
    self.datastream = datastream
def get_access_tokens_set(self):
    """
    Build the set of API access tokens granting access to this request.

    Besides the generic datastream tokens, a role-specific token is
    added when the requested format has an access role configured.

    :return: Set of token strings
    """
    tokens = {"datastream:*", "datastream:%s" % self.datastream.name}
    fmt = self.get_arguments("format")
    if fmt:
        role = get_format_role(self.datastream, fmt[0])
        if role:
            tokens.add("datastream:%s" % role)
    return tokens
@authenticated
async def get(self, *args, **kwargs):
......@@ -42,6 +54,12 @@ class DataStreamRequestHandler(APIAccessRequestHandler):
change_id = change_id[0]
else:
change_id = None
# Format
fmt = self.get_arguments("format")
if fmt:
fmt = fmt[0]
else:
fmt = None
# block argument
p_block = self.get_arguments("block")
to_block = bool(p_block) and bool(int(p_block[0]))
......@@ -51,7 +69,7 @@ class DataStreamRequestHandler(APIAccessRequestHandler):
r = []
try:
for item_id, change_id, data in self.datastream.iter_data(
limit=limit, filters=filters, change_id=change_id
limit=limit, filters=filters, change_id=change_id, fmt=fmt
):
if not first_change:
first_change = change_id
......
# ----------------------------------------------------------------------
# main.datastreamconfig application
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# NOC modules
from noc.lib.app.extdocapplication import ExtDocApplication
from noc.main.models.datastreamconfig import DataStreamConfig
from noc.core.translation import ugettext as _
class DataStreamConfigApplication(ExtDocApplication):
    """
    DataStreamConfig application
    """

    # Application title shown in the web UI
    title = "DataStream Config"
    # Menu path: Setup -> DataStream Config
    menu = [_("Setup"), _("DataStream Config")]
    # Backing mongoengine model
    model = DataStreamConfig
......@@ -31,6 +31,7 @@ class ExampleDataStream(DataStream):
class NoEvenDatastream(DataStream):
name = "noeven"
clean_id = DataStream.clean_id_int
@classmethod
def get_object(cls, id):
......
//---------------------------------------------------------------------
// main.datastreamconfig application
//---------------------------------------------------------------------
// Copyright (C) 2007-2020 The NOC Project
// See LICENSE for details
//---------------------------------------------------------------------
console.debug("Defining NOC.main.datastreamconfig.Application");
Ext.define("NOC.main.datastreamconfig.Application", {
extend: "NOC.core.ModelApplication",
requires: [
"NOC.main.datastreamconfig.Model",
"NOC.main.handler.LookupField"
],
model: "NOC.main.datastreamconfig.Model",
search: true,