Commit a121cca9 authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

datastream metadata

parent 38fd88b9
......@@ -36,6 +36,7 @@ class DataStream(object):
F_CHANGEID = "change_id"
F_HASH = "hash"
F_DATA = "data"
F_META = "meta"
HASH_LEN = 16
DEFAULT_LIMIT = 1000
......@@ -77,6 +78,15 @@ class DataStream(object):
"""
raise NotImplementedError()
@classmethod
def get_meta(cls, data):
"""
Extract additional metadata from .get_object() result for additional indexing
:param data: .get_object() result
:return: dict or None
"""
return None
@classmethod
def get_deleted_object(cls, id):
"""
......@@ -104,11 +114,13 @@ class DataStream(object):
metrics["ds_%s_updated" % cls.name] += 1
coll = cls.get_collection()
# Build object for stream
meta = None
if delete:
data = cls.get_deleted_object(id)
else:
try:
data = cls.get_object(id)
meta = cls.get_meta(data)
except KeyError:
data = cls.get_deleted_object(id)
# Calculate hash
......@@ -120,15 +132,22 @@ class DataStream(object):
metrics["ds_%s_changed" % cls.name] += 1
changeid = bson.ObjectId()
data["change_id"] = str(changeid)
coll.update_one({
cls.F_ID: id
}, {
op = {
"$set": {
cls.F_CHANGEID: changeid,
cls.F_HASH: hash,
cls.F_DATA: ujson.dumps(data)
}
}, upsert=True)
}
if meta:
op["$set"][cls.F_META] = meta
elif not "$deleted" in data:
op["$unset"] = {
cls.F_META: ""
}
coll.update_one({
cls.F_ID: id
}, op, upsert=True)
return True
@classmethod
......@@ -292,3 +311,17 @@ class DataStream(object):
"$in": ids
}
}
@classmethod
def filter_shard(cls, instance, n_instances):
"""
Sharding by id
:param instance:
:param n_instances:
:return:
"""
return {
"_id": {
"$mod": [int(n_instances), int(instance)]
}
}
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# pingmap datastream
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# NOC modules
from noc.core.datastream.base import DataStream
from noc.sa.models.managedobject import ManagedObject
......@@ -43,6 +43,12 @@ class NoEvenDatastream(DataStream):
"name": "Item #%s" % id
}
@classmethod
def get_meta(cls, data):
return {
"n": data["id"] / 2
}
def test_datastream_base():
with pytest.raises(NotImplementedError):
......@@ -222,6 +228,8 @@ def test_datastream_key_error(ds_index):
assert "$deleted" in doc["data"]
else:
assert "$deleted" not in doc["data"]
assert "meta" in doc
assert doc["meta"]["n"] == ds_index / 2
def test_datastream_delete_object():
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment