Commit af2c47ff authored by Dmitry Volodin

Merge branch 'noc-1552' into 'master'

noc/noc#1552 Add filter_policy param to Datastream API.

See merge request noc/noc!5884
parents aacb96a3 42d25886
Pipeline #34262 failed with stages in 33 minutes and 31 seconds
@@ -59,6 +59,8 @@ class DataStream(object):
BULK_SIZE = 500
META_MAX_DEPTH = 3
_collections: Dict[str, pymongo.collection.Collection] = {}
_collections_async: Dict[str, pymongo.collection.Collection] = {}
@@ -138,6 +140,15 @@ class DataStream(object):
"""
return {"id": str(id), "$deleted": True}
@classmethod
def get_moved_object(cls, id: Union[str, int]) -> Dict[str, Any]:
"""
        Generate item for moved object
:param id:
:return:
"""
return {"id": str(id), "$moved": True}
@staticmethod
def get_hash(data) -> str:
return hashlib.sha256(orjson.dumps(data)).hexdigest()[: DataStream.HASH_LEN]
@@ -156,7 +167,9 @@ class DataStream(object):
chunk, objects = objects[: cls.BULK_SIZE], objects[cls.BULK_SIZE :]
current_state = {
doc[cls.F_ID]: doc
for doc in coll.find({cls.F_ID: {"$in": chunk}}, {cls.F_ID: 1, cls.F_HASH: 1})
for doc in coll.find(
{cls.F_ID: {"$in": chunk}}, {cls.F_ID: 1, cls.F_HASH: 1, cls.F_META: 1}
)
}
bulk = []
fmt_data = defaultdict(list)
@@ -174,7 +187,7 @@ class DataStream(object):
current_state = {
doc[cls.F_ID]: doc
for doc in fmt_coll[fmt].find(
{cls.F_ID: {"$in": fmt_ids}}, {cls.F_ID: 1, cls.F_HASH: 1}
{cls.F_ID: {"$in": fmt_ids}}, {cls.F_ID: 1, cls.F_HASH: 1, cls.F_META: 1}
)
}
fmt_bulk[fmt] = []
@@ -190,6 +203,39 @@ class DataStream(object):
logger.info("[%s|%s] Sending %d bulk operations", cls.name, fmt, len(bulk))
fmt_coll[fmt].bulk_write(bulk, ordered=True)
@classmethod
    def clean_meta(cls, meta: Dict[str, Any], current_meta: Dict[str, Any]) -> Dict[str, Any]:
        """
        Calculate actual meta from the calculated and current records
        :param meta: Calculated meta
        :param current_meta: Meta value for the current record
        :return: Meta with value history, newest value first
        """
        r = {}  # Resulting meta
# Compare meta
for mf, mv in meta.items():
if not mv and isinstance(mv, list) and not current_meta.get(mf):
r[mf] = []
continue
elif mf not in current_meta or not current_meta[mf]:
# @todo Save empty list ?
r[mf] = [mv]
continue
r[mf] = current_meta[mf]
# Check old format
if isinstance(r[mf], str):
r[mf] = [r[mf]]
elif isinstance(mv, list) and not isinstance(r[mf][0], list):
r[mf] = [r[mf]]
# Check changes
if r[mf] and r[mf][0] == mv:
continue
r[mf].insert(0, mv)
# Check max depth
r[mf] = r[mf][: cls.META_MAX_DEPTH]
return r
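The effect is easiest to see on a concrete call; the values below are copied from the clean_meta tests at the end of this diff. A scalar meta value is pushed into a history list, newest value first, capped at META_MAX_DEPTH:

    meta = {
        "client_groups": [],
        "pool": "default2",
        "service_groups": ["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"],
    }
    current_meta = {"pool": ["default"], "service_groups": [], "client_groups": []}
    assert DataStream.clean_meta(meta, current_meta) == {
        "client_groups": [],  # empty on both sides stays empty
        "pool": ["default2", "default"],  # new scalar value prepended to history
        "service_groups": [["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"]],
    }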
@classmethod
def _update_object(cls, data, meta=None, fmt=None, state=None, bulk=None) -> bool:
def is_changed(d, h):
@@ -207,7 +253,9 @@ class DataStream(object):
if state:
doc = state.get(obj_id)
else:
doc = cls.get_collection(fmt).find_one({cls.F_ID: obj_id}, {cls.F_ID: 0, cls.F_HASH: 1})
doc = cls.get_collection(fmt).find_one(
{cls.F_ID: obj_id}, {cls.F_ID: 0, cls.F_HASH: 1, cls.F_META: 1}
)
if not is_changed(doc, hash):
logger.info("[%s] Object hasn't been changed", l_name)
return False # Not changed
@@ -227,7 +275,9 @@ class DataStream(object):
}
}
if meta:
op["$set"][cls.F_META] = meta
op["$set"][cls.F_META] = cls.clean_meta(
meta, doc[cls.F_META] if doc and doc.get(cls.F_META) else {}
)
elif "$deleted" not in data:
op["$unset"] = {cls.F_META: ""}
if bulk is None:
@@ -332,22 +382,89 @@ class DataStream(object):
raise ValueError(str(e))
@classmethod
async def iter_data_async(cls, change_id, limit, filters, fmt):
q = {}
    def is_moved(cls, meta: Dict[str, Any], meta_filters: Dict[str, Any]) -> bool:
        """
        Check whether the record is out of the filter scope:
        compare the newest meta value against the filter value
        :param meta: Record meta with value history
        :param meta_filters: Compiled filters on meta fields
        :return:
        """
for field, field_value in meta_filters.items():
if not field.startswith("meta."):
# Not meta query
continue
_, field = field.split(".", 1)
if field not in meta or not meta[field]:
continue
if isinstance(meta[field], str):
# Old meta format
return False
if meta[field][0] != field_value:
return True
return False
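For illustration, assume a filter that compiled to {"meta.pool": "default"} (the compiled form is an assumption, not shown in this diff). A record whose history still contains "default" matches the Mongo query, but only the newest value decides whether it has moved:

    # Newest value "default2" no longer matches the filter -> moved
    assert DataStream.is_moved({"pool": ["default2", "default"]}, {"meta.pool": "default"})
    # Newest value still matches -> not moved
    assert not DataStream.is_moved({"pool": ["default"]}, {"meta.pool": "default"})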
@classmethod
async def iter_data_async(
cls,
        change_id: Optional[str] = None,
        limit: Optional[int] = None,
        filters: Optional[List[str]] = None,
fmt=None,
filter_policy: Optional[str] = None,
):
"""
Iterate over data items beginning from change id
        Raises ValueError if filters have incorrect input parameters
        :param change_id: Starting change id
:param limit: Records limit
:param filters: List of strings with filter expression
:param fmt: Format
        :param filter_policy: Metadata change policy. Behavior when a metadata change
            moves the record out of the filter scope:
            * default - skip the record (previous behavior)
            * delete - return a $deleted message
            * keep - ignore the filter, return the full record
            * move - return a $moved message
:return: (id, change_id, data)
"""
q, meta_filters = {}, {}
if filters:
q.update(cls.compile_filters(filters))
cf = cls.compile_filters(filters)
q.update(cf)
meta_filters = {k: v for k, v in cf.items() if k.startswith("meta.")}
if change_id:
q[cls.F_CHANGEID] = {"$gt": cls.clean_change_id(change_id)}
coll = cls.get_collection_async(fmt)
async for doc in (
coll.find(q, {cls.F_ID: 1, cls.F_CHANGEID: 1, cls.F_DATA: 1})
coll.find(q, {cls.F_ID: 1, cls.F_CHANGEID: 1, cls.F_DATA: 1, cls.F_META: 1})
.sort([(cls.F_CHANGEID, pymongo.ASCENDING)])
.limit(limit=limit or cls.DEFAULT_LIMIT)
):
yield doc[cls.F_ID], doc[cls.F_CHANGEID], doc[cls.F_DATA]
data = doc[cls.F_DATA]
if not meta_filters or filter_policy == "keep":
            # Optimization: skip the is_moved check
yield doc[cls.F_ID], doc[cls.F_CHANGEID], data
continue
            moved = cls.is_moved(doc[cls.F_META], meta_filters)
            if moved and (filter_policy == "default" or not filter_policy):
                # Default behavior: skip the record
                continue
            elif moved and filter_policy in {"delete", "move"}:
data = {cls.F_CHANGEID: str(doc[cls.F_CHANGEID])}
h = {"delete": cls.get_deleted_object, "move": cls.get_moved_object}[filter_policy]
data.update(h(doc[cls.F_ID]))
data = smart_text(orjson.dumps(data))
yield doc[cls.F_ID], doc[cls.F_CHANGEID], data
@classmethod
def iter_data(cls, change_id=None, limit=None, filters=None, fmt=None):
def iter_data(
cls,
        change_id: Optional[str] = None,
        limit: Optional[int] = None,
        filters: Optional[List[str]] = None,
fmt=None,
filter_policy: Optional[str] = None,
):
"""
Iterate over data items beginning from change id
@@ -356,20 +473,41 @@
:param limit: Records limit
:param filters: List of strings with filter expression
:param fmt: Format
        :param filter_policy: Metadata change policy. Behavior when a metadata change
            moves the record out of the filter scope:
            * default - skip the record (previous behavior)
            * delete - return a $deleted message
            * keep - ignore the filter, return the full record
            * move - return a $moved message
:return: (id, change_id, data)
"""
q = {}
q, meta_filters = {}, {}
if filters:
q.update(cls.compile_filters(filters))
cf = cls.compile_filters(filters)
q.update(cf)
meta_filters = {k: v for k, v in cf.items() if k.startswith("meta.")}
if change_id:
q[cls.F_CHANGEID] = {"$gt": cls.clean_change_id(change_id)}
coll = cls.get_collection(fmt)
for doc in (
coll.find(q, {cls.F_ID: 1, cls.F_CHANGEID: 1, cls.F_DATA: 1})
coll.find(q, {cls.F_ID: 1, cls.F_CHANGEID: 1, cls.F_DATA: 1, cls.F_META: 1})
.sort([(cls.F_CHANGEID, pymongo.ASCENDING)])
.limit(limit=limit or cls.DEFAULT_LIMIT)
):
yield doc[cls.F_ID], doc[cls.F_CHANGEID], doc[cls.F_DATA]
data = doc[cls.F_DATA]
if not meta_filters or filter_policy == "keep":
            # Optimization: skip the is_moved check
yield doc[cls.F_ID], doc[cls.F_CHANGEID], data
continue
            moved = cls.is_moved(doc[cls.F_META], meta_filters)
            if moved and (filter_policy == "default" or not filter_policy):
                # Default behavior: skip the record
                continue
            elif moved and filter_policy in {"delete", "move"}:
data = {cls.F_CHANGEID: str(doc[cls.F_CHANGEID])}
h = {"delete": cls.get_deleted_object, "move": cls.get_moved_object}[filter_policy]
data.update(h(doc[cls.F_ID]))
data = smart_text(orjson.dumps(data))
yield doc[cls.F_ID], doc[cls.F_CHANGEID], data
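A hedged usage sketch: MyDataStream stands in for a concrete DataStream subclass, the filter expression syntax is an assumption, and data items are taken to be JSON strings (the $moved stub above is serialized the same way):

    import orjson

    for obj_id, change_id, data in MyDataStream.iter_data(
        filters=["pool(default)"],  # hypothetical filter expression
        filter_policy="move",
    ):
        item = orjson.loads(data)
        if item.get("$moved"):
            # Record left the filter scope; only id and marker remain
            continue
        handle(item)  # hypothetical consumer callback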
@classmethod
def on_change(cls, data):
......
@@ -36,6 +36,13 @@ class DataStreamClient(object):
:return:
"""
async def on_move(self, data):
"""
Called on each moved item received through datastream
:param data:
:return:
"""
async def on_delete(self, data):
"""
Called on each deleted item received through datastream
@@ -55,13 +62,19 @@ class DataStreamClient(object):
filters=None,
block: bool = False,
limit: Optional[int] = None,
filter_policy: Optional[str] = None,
):
"""
Query datastream
:param change_id:
:param filters:
        :param change_id: Starting change id
:param filters: List of strings with filter expression
:param block:
:param limit:
:param limit: Records limit
        :param filter_policy: Metadata change policy. Behavior when a metadata change
            moves the record out of the filter scope:
            * default - skip the record (previous behavior)
            * delete - return a $deleted message
            * keep - ignore the filter, return the full record
            * move - return a $moved message
:return:
"""
# Basic URL and query
@@ -73,6 +86,8 @@ class DataStreamClient(object):
base_qs += ["block=1"]
if limit:
base_qs += [f"limit={limit}"]
if filter_policy:
base_qs += [f"filter_policy={filter_policy}"]
req_headers = {"X-NOC-API-Access": f"datastream:{self.name}"}
loop = asyncio.get_running_loop()
# Continue until finish
@@ -110,6 +125,8 @@ class DataStreamClient(object):
for item in data:
if "$deleted" in item:
await self.on_delete(item)
elif "$moved" in item:
await self.on_move(item)
else:
await self.on_change(item)
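A minimal client sketch, assuming only the callbacks shown in this diff (constructor and query arguments are omitted here):

    class MyClient(DataStreamClient):
        async def on_change(self, data):
            ...  # full record, still within the filter scope

        async def on_move(self, data):
            ...  # {"id": ..., "$moved": True} when filter_policy="move"

        async def on_delete(self, data):
            ...  # {"id": ..., "$deleted": True}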
#
......
@@ -186,6 +186,9 @@ class DatastreamAPI(object):
        ds_id: Optional[List[str]] = Query(None, alias="id"),
ds_format: Optional[str] = Query(None, alias="format"),
ds_from: Optional[str] = Query(None, alias="from"),
ds_filter_policy: Optional[str] = Query(
None, alias="filter_policy", regex=r"^(default|delete|keep|move)$"
),
block: Optional[int] = None,
):
# Increase limit by 1 to detect datastream has more data
@@ -215,7 +218,11 @@
r = []
try:
async for item_id, change_id, data in datastream.iter_data_async(
limit=limit, filters=filters, change_id=change_id, fmt=fmt
limit=limit,
filters=filters,
change_id=change_id,
fmt=fmt,
filter_policy=ds_filter_policy,
):
if not first_change:
first_change = change_id
......
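Putting the pieces together, a request enabling the new behavior might look like this (path prefix, stream name and filter syntax are assumptions, not shown in this diff):

    GET /api/datastream/managedobject?filter=pool(default)&filter_policy=move&limit=1000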
# ----------------------------------------------------------------------
# noc.core.datastream.base.clean_meta tests
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Third-party modules
import pytest
# NOC modules
from noc.core.datastream.base import DataStream
@pytest.mark.parametrize(
"meta,current_meta,expected",
[
(
{
"client_groups": [],
"pool": "default",
"service_groups": ["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"],
},
{"pool": "default", "service_groups": [], "client_groups": []},
{
"client_groups": [],
"pool": ["default"],
"service_groups": [["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"]],
},
),
(
{
"client_groups": [],
"pool": "default2",
"service_groups": ["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"],
},
{"pool": ["default"], "service_groups": [], "client_groups": []},
{
"client_groups": [],
"pool": ["default2", "default"],
"service_groups": [["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"]],
},
),
(
{"client_groups": [], "pool": "default2", "service_groups": []},
{
"pool": ["default"],
"service_groups": ["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"],
"client_groups": [],
},
{
"client_groups": [],
"pool": ["default2", "default"],
"service_groups": [[], ["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"]],
},
),
(
{"client_groups": [], "pool": "default2", "service_groups": []},
{
"pool": ["default"],
"service_groups": [[], ["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"]],
"client_groups": [],
},
{
"client_groups": [],
"pool": ["default2", "default"],
"service_groups": [[], ["6128a1bbda3903fa14f1b4eb", "6128a1bbda3903fa14f1b538"]],
},
),
(
{"pool": "default", "service_groups": [], "client_groups": []},
{},
{"pool": ["default"], "service_groups": [], "client_groups": []},
),
],
)
def test_clean_meta(meta, current_meta, expected):
assert DataStream.clean_meta(meta, current_meta) == expected
@@ -213,7 +213,7 @@ def test_datastream_key_error(ds_index):
else:
assert "$deleted" not in doc["data"]
assert "meta" in doc
assert doc["meta"]["n"] == ds_index / 2
assert doc["meta"]["n"][0] == ds_index / 2
def test_datastream_delete_object():
......