Commit d76aab3a authored by Dmitry Volodin
Browse files

datastream: filter expressions

parent 586b59c7
......@@ -180,24 +180,17 @@ class DataStream(object):
raise ValueError(str(e))
@classmethod
def iter_data(cls, change_id=None, limit=None, filter=None):
def iter_data(cls, change_id=None, limit=None, filters=None):
"""
Iterate over data items beginning from change id
:param change_id: Starting change id
:param limit: Records limit
:param filter: List of ids to filter
:param filters: List of strings with filter expression
:return: (id, change_id, data)
"""
q = {}
if filter:
if not isinstance(filter, (list, tuple)):
raise ValueError("filter must be list or tuple")
if len(filter) == 1:
q[cls.F_ID] = filter[0]
else:
q[cls.F_ID] = {
"$in": filter
}
if filters:
q.update(cls.compile_filters(filters))
if change_id:
q[cls.F_CHANGEID] = {
"$gt": cls.clean_change_id(change_id)
......@@ -243,3 +236,59 @@ class DataStream(object):
with coll.watch() as stream:
next(stream)
return
@classmethod
def _parse_filter(cls, expr):
    """
    Split a textual filter call into its name and arguments.

    The expression must look like ``name(arg1, .., argN)``: everything
    before the first opening bracket is the name, everything between that
    bracket and the final character is a comma-separated argument list.
    Arguments are stripped of surrounding whitespace; the name is not.

    :param expr: filter expression in form name(arg1, .., argN)
    :return: list [name, arg1, .., argN]
    :raises ValueError: if expr is not a string, has no opening bracket
        or does not end with a closing bracket
    """
    if not isinstance(expr, six.string_types):
        raise ValueError("Expression must be string")
    name, bracket, tail = expr.partition("(")
    if not bracket:
        raise ValueError("Missed opening bracket")
    if not expr.endswith(")"):
        raise ValueError("Missed closing bracket")
    # tail is everything after the first "(", so dropping its last
    # character removes the closing bracket
    result = [name]
    result.extend(arg.strip() for arg in tail[:-1].split(","))
    return result
@classmethod
def compile_filters(cls, exprs):
    """
    Compile list of filter expressions to MongoDB query.

    Each expression is parsed with ``_parse_filter`` and dispatched to the
    class attribute named ``filter_<name>``, which is called with the
    parsed arguments and must return a dict with a query fragment.

    Fragments touching different fields are merged into one flat dict.
    When two fragments touch the same field, a flat merge would silently
    drop the earlier condition, so in that case all fragments are combined
    explicitly under ``$and``.

    :param exprs: List (or tuple) of strings with expressions
    :return: dict with query
    :raises ValueError: if exprs is not a list or tuple, an expression is
        malformed, or it refers to an unknown filter
    """
    if not isinstance(exprs, (list, tuple)):
        raise ValueError("expressions must be list of string")
    clauses = []
    for fx in exprs:
        pv = cls._parse_filter(fx)
        h = getattr(cls, "filter_%s" % pv[0], None)
        # A non-callable attribute is not a usable filter handler
        if not callable(h):
            raise ValueError("Invalid filter %s" % pv[0])
        clauses.append(h(*pv[1:]))
    q = {}
    for clause in clauses:
        if any(key in q for key in clause):
            # Same field constrained twice: keep every condition
            return {"$and": clauses}
        q.update(clause)
    return q
@classmethod
def filter_id(cls, id1, *args):
    """
    Build a query fragment restricting the id field. Usage:
    id(id1, .., idN)

    Every given id is passed through ``cls.clean_id``. A single id yields
    an exact match on ``cls.F_ID``; several ids yield an ``$in`` match.

    :param id1: first (mandatory) id
    :param args: any further ids
    :return: dict with query fragment keyed by ``cls.F_ID``
    """
    cleaned = [cls.clean_id(raw) for raw in (id1,) + args]
    if args:
        return {cls.F_ID: {"$in": cleaned}}
    return {cls.F_ID: cleaned[0]}
......@@ -34,13 +34,11 @@ class DataStreamRequestHandler(APIAccessRequestHandler):
if not limit:
limit = self.datastream.DEFAULT_LIMIT
limit = min(limit, self.datastream.DEFAULT_LIMIT)
# Restrict to id
filter = self.get_arguments("id") or None
if filter:
# Convert id to proper type
filter = [self.datastream.clean_id(x) for x in filter]
else:
filter = None
# Collect filters
filters = self.get_arguments("filter") or []
ids = self.get_arguments("id") or None
if ids:
filters += ["id(%s)" % ",".join(ids)]
# Start from change
change_id = self.get_arguments("from")
if change_id:
......@@ -54,7 +52,7 @@ class DataStreamRequestHandler(APIAccessRequestHandler):
last_change = None
while True:
r = ["["]
for item_id, change_id, data in self.datastream.iter_data(limit=limit, filter=filter,
for item_id, change_id, data in self.datastream.iter_data(limit=limit, filters=filters,
change_id=change_id):
if not first_change:
first_change = change_id
......
......@@ -21,6 +21,7 @@ from noc.core.datastream.loader import loader
class ExampleDataStream(DataStream):
name = "example"
clean_id = DataStream.clean_id_int
@classmethod
def get_object(cls, id):
......@@ -153,7 +154,7 @@ def test_datastream_iter_data():
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_id1():
seen = set()
for id, change_id, data in ExampleDataStream.iter_data(filter=[3]):
for id, change_id, data in ExampleDataStream.iter_data(filters=["id(3)"]):
assert id not in seen
seen.add(id)
assert seen == {3}
......@@ -162,7 +163,7 @@ def test_datastream_iter_data_id1():
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_id2():
seen = set()
for id, change_id, data in ExampleDataStream.iter_data(filter=[3, 7]):
for id, change_id, data in ExampleDataStream.iter_data(filters=["id(3,7)"]):
assert id not in seen
seen.add(id)
assert seen == {3, 7}
......@@ -171,7 +172,7 @@ def test_datastream_iter_data_id2():
@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_id_type_check():
with pytest.raises(ValueError):
next(ExampleDataStream.iter_data(filter=3))
next(ExampleDataStream.iter_data(filters=3))
with pytest.raises(ValueError):
next(ExampleDataStream.iter_data(change_id=3))
with pytest.raises(ValueError):
......@@ -244,7 +245,7 @@ def test_datastream_delete_object():
def test_datastream_clean_id():
assert ExampleDataStream.clean_id(1) == 1
assert ExampleDataStream.clean_id("1") == "1"
assert ExampleDataStream.clean_id("1") == 1
def test_datastream_clean_id_int():
......@@ -336,3 +337,31 @@ def test_clean_changeid():
#
with pytest.raises(ValueError):
DataStream.clean_change_id("9999-99-99")
def test_parse_filter():
    """
    Check DataStream._parse_filter on well-formed and malformed input.
    """
    well_formed = [
        ("id(1)", ["id", "1"]),
        ("id(1,2)", ["id", "1", "2"]),
        ("id(1,2, 3 )", ["id", "1", "2", "3"]),
    ]
    for expr, expected in well_formed:
        assert DataStream._parse_filter(expr) == expected
    # Non-string, missing opening bracket, missing closing bracket
    for malformed in (1, "id", "id("):
        with pytest.raises(ValueError):
            DataStream._parse_filter(malformed)
def test_filter_id():
    """
    Check the query fragments DataStream.filter_id builds for one and
    for several ids.
    """
    single = DataStream.filter_id(10)
    assert single == {"_id": 10}
    several = DataStream.filter_id(10, 11)
    assert several == {"_id": {"$in": [10, 11]}}
def test_compile_filters():
    """
    Check DataStream.compile_filters on valid expression lists and on
    inputs that must be rejected with ValueError.
    """
    assert DataStream.compile_filters(["id(1)"]) == {"_id": "1"}
    assert DataStream.compile_filters(["id(1,2)"]) == {"_id": {"$in": ["1", "2"]}}
    # A bare string is not a list of expressions
    with pytest.raises(ValueError):
        DataStream.compile_filters("id(1)")
    # Every item of the list must be a string
    with pytest.raises(ValueError):
        DataStream.compile_filters(["id(1)", 1])
    # Unknown filter name
    with pytest.raises(ValueError):
        DataStream.compile_filters(["unknown(1)"])
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment