Commit 8ccb8360 authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

DataStream base class

parent e222c009
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# DataStream
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import hashlib
# Third-party modules
import ujson
import bson
import bson.errors
import pymongo
# NOC modules
from noc.core.perf import metrics
from noc.lib.nosql import get_db
class DataStream(object):
"""
Datastream stored in collection named ds_<name>.
Fields:
_id: Object id
changeid: Change ID
data: stream data (serialized JSON)
hash: Object hash
"""
name = None
F_ID = "_id"
F_CHANGEID = "change_id"
F_HASH = "hash"
F_DATA = "data"
HASH_LEN = 16
DEFAULT_LIMIT = 1000
def __init__(self):
pass
@classmethod
def get_collection_name(cls):
return "ds_%s" % cls.name
@classmethod
def get_collection(cls):
"""
Get pymongo Collection object
:return:
"""
coll = getattr(cls, "_collection", None)
if not coll:
coll = get_db()[cls.get_collection_name()]
cls._collection = coll
return coll
@classmethod
def ensure_collection(cls):
"""
Ensure collection is created and properly indexed
:return:
"""
coll = cls.get_collection()
coll.create_index(cls.F_CHANGEID, unique=True)
@classmethod
def get_object(cls, id):
"""
Generate datastream object for given id.
Raise KeyError if object is not found
Must be overriden
:param id: Object id
:return: dict containing object data
"""
raise NotImplementedError()
@classmethod
def get_deleted_object(cls, id):
"""
Generate item for deleted object
:param id:
:return:
"""
return {
"id": id,
"$deleted": True
}
@staticmethod
def get_hash(data):
return hashlib.sha256(ujson.dumps(data)).hexdigest()[:DataStream.HASH_LEN]
@classmethod
def update_object(cls, id, delete=False):
"""
Generate and update object in stream
:param id: Object id
:param delete: Object must be marked as deleted
:return:
"""
metrics["ds_%s_updated" % cls.name] += 1
coll = cls.get_collection()
# Build object for stream
if delete:
data = cls.get_deleted_object(id)
else:
try:
data = cls.get_object(id)
except KeyError:
data = cls.get_deleted_object(id)
# Calculate hash
hash = cls.get_hash(data)
# Get existing object
doc = coll.find_one({cls.F_ID: id}, {cls.F_ID: 0, cls.F_HASH: 1})
if doc and doc.get(cls.F_HASH) == hash:
return # Not changed
metrics["ds_%s_changed" % cls.name] += 1
changeid = bson.ObjectId()
data["change_id"] = str(changeid)
coll.update_one({
cls.F_ID: id
}, {
"$set": {
cls.F_CHANGEID: changeid,
cls.F_HASH: hash,
cls.F_DATA: ujson.dumps(data)
}
}, upsert=True)
@classmethod
def delete_object(cls, id):
"""
Mark object as deleted
:param id:
:return:
"""
cls.update_object(id, delete=True)
@classmethod
def get_total(cls):
"""
Return total amount of items in datastream
:return:
"""
return cls.get_collection().count()
@classmethod
def iter_data(cls, change_id=None, limit=None, filter=None):
"""
Iterate over data items beginning from change id
:param change_id: Staring change id
:param limit: Records limit
:param filter: List of ids to filter
:return: (id, changeid, data)
"""
q = {}
if filter:
if not isinstance(filter, (list, tuple)):
raise ValueError("filter must be list or tuple")
if len(filter) == 1:
q[cls.F_ID] = filter[0]
else:
q[cls.F_ID] = {
"$in": filter
}
if change_id:
if not isinstance(change_id, bson.ObjectId):
try:
change_id = bson.ObjectId(change_id)
except (bson.errors.InvalidId, TypeError) as e:
raise ValueError(str(e))
q[cls.F_CHANGEID] = {
"$gt": change_id
}
coll = cls.get_collection()
for doc in coll.find(q, {
cls.F_ID: 1,
cls.F_CHANGEID: 1,
cls.F_DATA: 1
}).sort([
(cls.F_CHANGEID, pymongo.ASCENDING)
]).limit(
limit=limit or cls.DEFAULT_LIMIT
):
yield doc[cls.F_ID], doc[cls.F_CHANGEID], doc[cls.F_DATA]
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# noc.core.datastream test
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Third-party modules
import pytest
import ujson
# NOC modules
from noc.core.datastream.base import DataStream
from noc.core.perf import metrics
class ExampleDataStream(DataStream):
name = "example"
@classmethod
def get_object(cls, id):
return {
"id": id,
"name": "Item #%s" % id
}
class NoEvenDatastream(DataStream):
name = "noeven"
@classmethod
def get_object(cls, id):
if id % 2 == 0:
raise KeyError
return {
"id": id,
"name": "Item #%s" % id
}
def test_datastream_base():
with pytest.raises(NotImplementedError):
DataStream.get_object(1)
@pytest.mark.dependency(name="datastream_collection_name")
def test_datastream_collection_name():
assert ExampleDataStream.get_collection_name() == "ds_example"
@pytest.mark.dependency(
name="datastream_collection",
depends=["datastream_collection_name"]
)
def test_datastream_collection():
ExampleDataStream.ensure_collection()
# Test collection exists
coll = ExampleDataStream.get_collection()
assert "ds_example" in coll.database.collection_names()
# Test collection indexes
ii = coll.index_information()
assert "change_id_1" in ii
assert ii["change_id_1"].get("unique")
@pytest.mark.dependency(name="datastream_hash")
def test_datastream_hash():
data = {
"id": 1,
"name": "test"
}
assert ExampleDataStream.get_hash(data) == "5757d197ae2f024e"
@pytest.fixture(params=list(range(1, 11)))
def ds_index(request):
return request.param
@pytest.mark.dependency(
name="datastream_update",
depends=[
"datastream_hash",
"datastream_collection"
]
)
def test_datastream_update(ds_index):
coll = ExampleDataStream.get_collection()
# Generate update
m_u = metrics["ds_example_updated"].value
m_c = metrics["ds_example_changed"].value
ExampleDataStream.update_object(ds_index)
assert metrics["ds_example_updated"].value == m_u + 1
assert metrics["ds_example_changed"].value == m_c + 1
# Check document is exists in collection
doc = coll.find_one({"_id": ds_index})
assert doc is not None
assert "hash" in doc
assert "data" in doc
assert "change_id" in doc
data = ujson.loads(doc["data"])
assert "id" in data
assert "name" in data
assert data["name"] == "Item #%s" % ds_index
assert "change_id" in data
assert data["change_id"] == str(doc["change_id"])
change_id = doc["change_id"]
# Try to update once mode
m_u = metrics["ds_example_updated"].value
m_c = metrics["ds_example_changed"].value
ExampleDataStream.update_object(ds_index)
assert metrics["ds_example_updated"].value == m_u + 1
assert metrics["ds_example_changed"].value == m_c
# Check document is still exists
doc = coll.find_one({"_id": ds_index})
assert doc is not None
# Check document is not changed
assert doc["change_id"] == change_id
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_total():
assert ExampleDataStream.get_total() == 10
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_limit():
seen = set()
for id, change_id, data in ExampleDataStream.iter_data(limit=3):
assert id not in seen
seen.add(id)
assert seen == {1, 2, 3}
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data():
seen = set()
last_change = None
for id, change_id, data in ExampleDataStream.iter_data():
assert id not in seen
seen.add(id)
if last_change:
assert last_change < change_id
last_change = change_id
assert seen == {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_id1():
seen = set()
for id, change_id, data in ExampleDataStream.iter_data(filter=[3]):
assert id not in seen
seen.add(id)
assert seen == {3}
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_id2():
seen = set()
for id, change_id, data in ExampleDataStream.iter_data(filter=[3, 7]):
assert id not in seen
seen.add(id)
assert seen == {3, 7}
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_id_type_check():
with pytest.raises(ValueError):
next(ExampleDataStream.iter_data(filter=3))
with pytest.raises(ValueError):
next(ExampleDataStream.iter_data(change_id=3))
with pytest.raises(ValueError):
next(ExampleDataStream.iter_data(change_id="3"))
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_chunked():
seen = set()
last_change = None
# First 3
for id, change_id, data in ExampleDataStream.iter_data(limit=3):
last_change = change_id
assert id not in seen
seen.add(id)
assert seen == {1, 2, 3}
# Next 3
for id, change_id, data in ExampleDataStream.iter_data(change_id=last_change, limit=3):
last_change = change_id
assert id not in seen
seen.add(id)
assert seen == {1, 2, 3, 4, 5, 6}
# Next 3
for id, change_id, data in ExampleDataStream.iter_data(change_id=last_change, limit=3):
last_change = change_id
assert id not in seen
seen.add(id)
assert seen == {1, 2, 3, 4, 5, 6, 7, 8, 9}
# Last one
for id, change_id, data in ExampleDataStream.iter_data(change_id=last_change, limit=3):
last_change = change_id
assert id not in seen
seen.add(id)
assert seen == {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
# Nothing more
assert list(ExampleDataStream.iter_data(change_id=last_change, limit=3)) == []
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_key_error(ds_index):
coll = NoEvenDatastream.get_collection()
NoEvenDatastream.update_object(ds_index)
doc = coll.find_one({"_id": ds_index})
assert doc
assert "data" in doc
if ds_index % 2 == 0:
assert "$deleted" in doc["data"]
else:
assert "$deleted" not in doc["data"]
def test_datastream_delete_object():
class DeleteDataStream(ExampleDataStream):
name = "delete"
coll = DeleteDataStream.get_collection()
DeleteDataStream.ensure_collection()
DeleteDataStream.update_object(1)
doc = coll.find_one({"_id": 1})
assert doc
assert "data" in doc
assert "$deleted" not in doc["data"]
# Delete
DeleteDataStream.delete_object(1)
doc = coll.find_one({"_id": 1})
assert doc
assert "data" in doc
assert "$deleted" in doc["data"]
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment