Commit cba4e3c9 authored by Dmitry Volodin
Browse files

DataStream accepts ISO 8601-formatted date as change_id

parent 5006e4bb
......@@ -13,6 +13,9 @@ import ujson
import bson
import bson.errors
import pymongo
import six
import dateutil.parser
import re
# NOC modules
from noc.core.perf import metrics
from noc.lib.nosql import get_db
......@@ -36,6 +39,7 @@ class DataStream(object):
HASH_LEN = 16
DEFAULT_LIMIT = 1000
rx_ts = re.compile(r"^\d{4}-\d{2}-\d{2}")
@classmethod
def get_collection_name(cls):
......@@ -144,6 +148,37 @@ class DataStream(object):
"""
return cls.get_collection().count()
@classmethod
def clean_change_id(cls, change_id):
    """
    Convert change_id to ObjectId. Following formats are possible:

    * ObjectId
    * string containing ObjectId
    * ISO 8601 timestamp either in form
      * YYYY-MM-DD
      * YYYY-MM-DDThh:mm:ss

    :param change_id: Raw change id in any of the formats listed above
    :return: bson.ObjectId. For timestamp input the id is built
        with ObjectId.from_datetime
    :raises ValueError: change_id is neither ObjectId nor string,
        or the string cannot be converted to ObjectId
    """
    # Already an ObjectId, nothing to convert
    if isinstance(change_id, bson.ObjectId):
        return change_id
    # Only strings are accepted beyond this point
    if not isinstance(change_id, six.string_types):
        raise ValueError("Invalid change_id")
    # Strings starting with YYYY-MM-DD are treated as timestamps
    if cls.rx_ts.search(change_id):
        # Local import: needed only to name struct.error below
        import struct

        try:
            ts = dateutil.parser.parse(change_id)
            return bson.ObjectId.from_datetime(ts)
        except (ValueError, OverflowError, struct.error) as e:
            # dateutil may raise OverflowError for oversized values,
            # and ObjectId keeps the time in 4 bytes, so dates out of
            # that range (i.e. before 1970) fail with struct.error.
            # Report all of them uniformly as ValueError
            raise ValueError(str(e))
    # Everything else must be a string representation of ObjectId
    try:
        return bson.ObjectId(change_id)
    except (bson.errors.InvalidId, TypeError) as e:
        raise ValueError(str(e))
@classmethod
def iter_data(cls, change_id=None, limit=None, filter=None):
"""
......@@ -151,7 +186,7 @@ class DataStream(object):
:param change_id: Starting change id
:param limit: Records limit
:param filter: List of ids to filter
:return: (id, changeid, data)
:return: (id, change_id, data)
"""
q = {}
if filter:
......@@ -164,13 +199,8 @@ class DataStream(object):
"$in": filter
}
if change_id:
if not isinstance(change_id, bson.ObjectId):
try:
change_id = bson.ObjectId(change_id)
except (bson.errors.InvalidId, TypeError) as e:
raise ValueError(str(e))
q[cls.F_CHANGEID] = {
"$gt": change_id
"$gt": cls.clean_change_id(change_id)
}
coll = cls.get_collection()
for doc in coll.find(q, {
......
......@@ -8,10 +8,12 @@
# Python modules
import time
import datetime
import threading
# Third-party modules
import pytest
import ujson
import bson
# NOC modules
from noc.core.datastream.base import DataStream
from noc.core.perf import metrics
......@@ -318,3 +320,20 @@ def test_wait():
assert dt > TIMEOUT
# We must have second object
assert WaitDS.get_total() == 2
def test_clean_changeid():
    """
    Check DataStream.clean_change_id against ObjectId, ObjectId string,
    ISO 8601 date, ISO 8601 datetime and a malformed date
    """
    clean = DataStream.clean_change_id
    source_oid = bson.ObjectId()
    # An ObjectId instance is passed through as is
    assert clean(source_oid) == source_oid
    # Its string form is converted back to the same ObjectId
    source_text = str(source_oid)
    assert clean(source_text) == source_oid
    # Date only
    date_oid = clean("2018-07-04")
    date_text = date_oid.generation_time.strftime("%Y-%m-%d")
    assert date_text == "2018-07-04"
    # Date and time
    datetime_oid = clean("2018-07-04T09:25:51")
    datetime_text = datetime_oid.generation_time.strftime("%Y-%m-%dT%H:%M:%S")
    assert datetime_text == "2018-07-04T09:25:51"
    # Date-like string with impossible month and day must be rejected
    with pytest.raises(ValueError):
        clean("9999-99-99")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment