# ----------------------------------------------------------------------
# noc.core.datastream test
# ----------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------

Dmitry Volodin's avatar
Dmitry Volodin committed
8
9
10
# Python modules
import time
import threading
Dmitry Volodin's avatar
Dmitry Volodin committed
11

Dmitry Volodin's avatar
Dmitry Volodin committed
12
13
# Third-party modules
import pytest
Dmitry Volodin's avatar
Dmitry Volodin committed
14
import orjson
15
import bson
Dmitry Volodin's avatar
Dmitry Volodin committed
16

Dmitry Volodin's avatar
Dmitry Volodin committed
17
18
19
# NOC modules
from noc.core.datastream.base import DataStream
from noc.core.perf import metrics
20
from noc.core.datastream.loader import loader
Dmitry Volodin's avatar
Dmitry Volodin committed
21
22
23
24


class ExampleDataStream(DataStream):
    """Minimal datastream for the tests: every id resolves to an object."""

    name = "example"
    # Reuse the integer id cleaner provided by the base class
    clean_id = DataStream.clean_id_int

    @classmethod
    def get_object(cls, id):
        """Build the payload for `id`: the id itself plus a name derived from it."""
        label = "Item #%s" % id
        return {"id": id, "name": label}
Dmitry Volodin's avatar
Dmitry Volodin committed
30
31
32
33


class NoEvenDatastream(DataStream):
    """Datastream for the tests in which even ids have no backing object."""

    name = "noeven"
    # Reuse the integer id cleaner provided by the base class
    clean_id = DataStream.clean_id_int

    @classmethod
    def get_object(cls, id):
        """Return the payload for an odd `id`; raise KeyError for an even one."""
        has_object = id % 2 != 0
        if has_object:
            return {"id": id, "name": "Item #%s" % id}
        raise KeyError

    @classmethod
    def get_meta(cls, data):
        """Return metadata for `data`: key "n" holds its "id" divided by two."""
        half = data["id"] / 2
        return {"n": half}
Dmitry Volodin's avatar
Dmitry Volodin committed
45

Dmitry Volodin's avatar
Dmitry Volodin committed
46
47
48
49
50
51
52
53
54
55
56

def test_datastream_base():
    """The base DataStream class must not implement get_object()."""
    expect_unimplemented = pytest.raises(NotImplementedError)
    with expect_unimplemented:
        DataStream.get_object(1)


@pytest.mark.dependency(name="datastream_collection_name")
def test_datastream_collection_name():
    """ExampleDataStream must report `ds_example` as its collection name."""
    collection_name = ExampleDataStream.get_collection_name()
    assert collection_name == "ds_example"


Dmitry Volodin's avatar
Dmitry Volodin committed
57
@pytest.mark.dependency(name="datastream_collection", depends=["datastream_collection_name"])
def test_datastream_collection():
    """ensure_collection() must create `ds_example` with a unique change_id index."""
    ExampleDataStream.ensure_collection()
    collection = ExampleDataStream.get_collection()
    # The collection must be listed in its database
    existing = collection.database.list_collection_names()
    assert "ds_example" in existing
    # The change_id index must be present and flagged unique
    indexes = collection.index_information()
    assert "change_id_1" in indexes
    change_id_index = indexes["change_id_1"]
    assert change_id_index.get("unique")


@pytest.mark.dependency(name="datastream_hash")
def test_datastream_hash():
    """get_hash() must return the known digest for a fixed payload."""
    expected = "5757d197ae2f024e"
    payload = {"id": 1, "name": "test"}
    assert ExampleDataStream.get_hash(payload) == expected


@pytest.fixture(params=list(range(1, 11)))
def ds_index(request):
    """Parametrized fixture supplying the object ids 1 through 10."""
    index = request.param
    return index


@pytest.mark.dependency(
    name="datastream_update", depends=["datastream_hash", "datastream_collection"]
)
def test_datastream_update(ds_index):
    """First update_object() stores a record; repeating it leaves the record unchanged."""
    collection = ExampleDataStream.get_collection()
    # First update: both the "updated" and the "changed" counters advance by one
    updated_before = metrics["ds_example_updated"].value
    changed_before = metrics["ds_example_changed"].value
    ExampleDataStream.update_object(ds_index)
    assert metrics["ds_example_updated"].value == updated_before + 1
    assert metrics["ds_example_changed"].value == changed_before + 1
    # The record must now exist and carry all service fields
    record = collection.find_one({"_id": ds_index})
    assert record is not None
    for field in ("hash", "data", "change_id"):
        assert field in record
    # The serialized payload must mirror the object and the record's change_id
    payload = orjson.loads(record["data"])
    for field in ("id", "name"):
        assert field in payload
    assert payload["name"] == "Item #%s" % ds_index
    assert "change_id" in payload
    assert payload["change_id"] == str(record["change_id"])
    first_change_id = record["change_id"]
    # Second update with the same data: "updated" advances, "changed" does not
    updated_before = metrics["ds_example_updated"].value
    changed_before = metrics["ds_example_changed"].value
    ExampleDataStream.update_object(ds_index)
    assert metrics["ds_example_updated"].value == updated_before + 1
    assert metrics["ds_example_changed"].value == changed_before
    # The record must still exist and keep its original change_id
    record = collection.find_one({"_id": ds_index})
    assert record is not None
    assert record["change_id"] == first_change_id


@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_total():
    """get_total() must count the ten records written by the update test."""
    total = ExampleDataStream.get_total()
    assert total == 10


@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_limit():
    """iter_data(limit=3) must yield ids 1, 2 and 3, each exactly once."""
    visited = set()
    for item_id, _change_id, _data in ExampleDataStream.iter_data(limit=3):
        assert item_id not in visited
        visited.add(item_id)
    assert visited == {1, 2, 3}


@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data():
    """Unbounded iter_data() yields every id once, with strictly growing change ids."""
    visited = set()
    previous = None
    for item_id, current, _data in ExampleDataStream.iter_data():
        assert item_id not in visited
        visited.add(item_id)
        # Nothing to compare against on the first item
        if previous:
            assert previous < current
        previous = current
    assert visited == set(range(1, 11))


@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_id1():
    """The filter `id(3)` must restrict iter_data() to id 3 only."""
    visited = set()
    matching = ExampleDataStream.iter_data(filters=["id(3)"])
    for item_id, _change_id, _data in matching:
        assert item_id not in visited
        visited.add(item_id)
    assert visited == {3}


@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_id2():
    """The filter `id(3,7)` must restrict iter_data() to ids 3 and 7."""
    visited = set()
    matching = ExampleDataStream.iter_data(filters=["id(3,7)"])
    for item_id, _change_id, _data in matching:
        assert item_id not in visited
        visited.add(item_id)
    assert visited == {3, 7}


@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_id_type_check():
    """iter_data() must raise ValueError for filters=3, change_id=3 and change_id="3"."""
    rejected = ({"filters": 3}, {"change_id": 3}, {"change_id": "3"})
    for bad_kwargs in rejected:
        with pytest.raises(ValueError):
            # next() forces the iterator to start, which is where validation happens
            next(ExampleDataStream.iter_data(**bad_kwargs))


@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_iter_data_chunked():
    """Page through the stream three items at a time, resuming from the last change id."""
    visited = set()
    cursor = None
    # First page is requested without a starting change id
    for item_id, change_id, _data in ExampleDataStream.iter_data(limit=3):
        cursor = change_id
        assert item_id not in visited
        visited.add(item_id)
    assert visited == {1, 2, 3}
    # Following pages resume from the cursor: ids 4-6, 7-9, then the lone id 10
    for highest_id in (6, 9, 10):
        page = ExampleDataStream.iter_data(change_id=cursor, limit=3)
        for item_id, change_id, _data in page:
            cursor = change_id
            assert item_id not in visited
            visited.add(item_id)
        assert visited == set(range(1, highest_id + 1))
    # Past the last record nothing more is returned
    leftover = list(ExampleDataStream.iter_data(change_id=cursor, limit=3))
    assert leftover == []


@pytest.mark.dependency(depends=["datastream_update"])
def test_datastream_key_error(ds_index):
    """Ids whose get_object() raises KeyError are stored as deleted; others carry meta."""
    collection = NoEvenDatastream.get_collection()
    NoEvenDatastream.update_object(ds_index)
    record = collection.find_one({"_id": ds_index})
    assert record
    assert "data" in record
    is_even = ds_index % 2 == 0
    if not is_even:
        # Odd ids have a real object, so the record is live and has metadata
        assert "$deleted" not in record["data"]
        assert "meta" in record
        assert record["meta"]["n"][0] == ds_index / 2
    else:
        # Even ids raise KeyError in get_object() and end up marked as deleted
        assert "$deleted" in record["data"]
Dmitry Volodin's avatar
Dmitry Volodin committed
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235


def test_datastream_delete_object():
    """delete_object() must keep the record but mark its data with "$deleted"."""

    class DeleteDataStream(ExampleDataStream):
        name = "delete"

    def stored_data():
        # Fetch record 1, run the basic presence checks and hand back its data
        record = collection.find_one({"_id": 1})
        assert record
        assert "data" in record
        return record["data"]

    collection = DeleteDataStream.get_collection()
    DeleteDataStream.ensure_collection()
    DeleteDataStream.update_object(1)
    assert "$deleted" not in stored_data()
    # After deletion the record remains, flagged as deleted
    DeleteDataStream.delete_object(1)
    assert "$deleted" in stored_data()
Dmitry Volodin's avatar
Dmitry Volodin committed
236
237
238
239


def test_datastream_clean_id():
    """ExampleDataStream.clean_id() must map both 1 and "1" to the integer 1."""
    for raw in (1, "1"):
        assert ExampleDataStream.clean_id(raw) == 1
Dmitry Volodin's avatar
Dmitry Volodin committed
241
242
243
244
245
246
247
248
249
250


def test_datastream_clean_id_int():
    """clean_id_int must accept 1 and "1" and raise ValueError for "z"."""

    class DS(DataStream):
        clean_id = DataStream.clean_id_int

    for raw in (1, "1"):
        assert DS.clean_id(raw) == 1
    with pytest.raises(ValueError):
        DS.clean_id("z")
251
252


Dmitry Volodin's avatar
Dmitry Volodin committed
253
@pytest.fixture(params=list(loader))
def datastream_name(request):
    """Parametrized fixture supplying each name produced by iterating the loader."""
    name = request.param
    return name


def test_loader(datastream_name):
    """Each listed name must load a DataStream subclass that carries the same name."""
    datastream = loader[datastream_name]
    assert datastream is not None
    assert issubclass(datastream, DataStream)
    assert datastream.name == datastream_name


def test_loader_invalid_name():
    """Looking up the malformed name "aaa..bbbb" must give None."""
    assert loader["aaa..bbbb"] is None


def test_loader_error():
    """Looking up the name "invalid" must give None."""
    assert loader["invalid"] is None
Dmitry Volodin's avatar
Dmitry Volodin committed
273
274


Dmitry Volodin's avatar
Dmitry Volodin committed
275
276
def test_loader_contains(datastream_name):
    """Every name produced by iterating the loader must pass its membership test."""
    is_known = datastream_name in loader
    assert is_known
277
278


Dmitry Volodin's avatar
Dmitry Volodin committed
279
280
281
282
283
284
def test_wait():
    """
    DataStream.wait() must block until another thread writes a new object.

    A writer thread is released by an event, sleeps for TIMEOUT and then
    adds a second object. The main thread measures how long wait() blocks
    and checks that it covered the writer's delay.
    """

    class WaitDS(DataStream):
        name = "wait"

        @classmethod
        def get_object(cls, id):
            return {"id": id}

    TIMEOUT = 0.1

    def writer():
        # Wait until main thread is ready
        event.wait()
        time.sleep(TIMEOUT)
        WaitDS.update_object(2)

    # Insert first object
    WaitDS.ensure_collection()
    WaitDS.update_object(1)
    # Start writer thread.
    # daemon=True replaces Thread.setDaemon(), deprecated since Python 3.10
    event = threading.Event()
    thread = threading.Thread(target=writer, name="test_wait-writer", daemon=True)
    thread.start()
    # Ensure WaitDS contains only one item
    assert WaitDS.get_total() == 1
    # Start the clock *before* releasing the writer, so the whole of its
    # sleep falls inside the measured interval. perf_counter() is not
    # affected by wall-clock adjustments, unlike time.time().
    t0 = time.perf_counter()
    # Allow writer thread to start
    event.set()
    # Hang until writer thread inserts new object
    WaitDS.wait()
    dt = time.perf_counter() - t0
    # Cleanup writer thread
    thread.join()
    # We should be waiting more, than TIMEOUT
    assert dt > TIMEOUT
    # We must have second object
    assert WaitDS.get_total() == 2
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333


def test_clean_changeid():
    """clean_change_id() handles ObjectIds, their string form, dates and datetimes."""
    source_id = bson.ObjectId()
    # An ObjectId is returned as an equal value
    assert DataStream.clean_change_id(source_id) == source_id
    # Its string form converts back to an equal ObjectId
    assert DataStream.clean_change_id(str(source_id)) == source_id
    # Date and datetime strings give ids whose generation time matches the input
    timestamps = (
        ("2018-07-04", "%Y-%m-%d"),
        ("2018-07-04T09:25:51", "%Y-%m-%dT%H:%M:%S"),
    )
    for text, fmt in timestamps:
        converted = DataStream.clean_change_id(text)
        assert converted.generation_time.strftime(fmt) == text
    # An impossible date is rejected
    with pytest.raises(ValueError):
        DataStream.clean_change_id("9999-99-99")
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351


def test_parse_filter():
    """_parse_filter() splits `name(args)` into a list and rejects malformed input."""
    well_formed = (
        ("id(1)", ["id", "1"]),
        ("id(1,2)", ["id", "1", "2"]),
        ("id(1,2,  3   )", ["id", "1", "2", "3"]),
    )
    for expression, expected in well_formed:
        assert DataStream._parse_filter(expression) == expected
    # A non-string, a bare name and an unclosed call are all invalid
    for malformed in (1, "id", "id("):
        with pytest.raises(ValueError):
            DataStream._parse_filter(malformed)


def test_filter_id():
    """filter_id() matches one id directly and several ids through `$in`."""
    single = DataStream.filter_id(10)
    assert single == {"_id": 10}
    several = DataStream.filter_id(10, 11)
    assert several == {"_id": {"$in": [10, 11]}}

Dmitry Volodin's avatar
PEP8    
Dmitry Volodin committed
352

353
354
355
356
def test_filter_shard():
    """filter_shard() builds a `$mod` filter and accepts its numbers as strings too."""
    expected = {"_id": {"$mod": [3, 1]}}
    for first, second in ((1, 3), ("1", "3")):
        assert DataStream.filter_shard(first, second) == expected

357
358
359
360
361
362
363
364
365
366

def test_compile_filters():
    """
    compile_filters() turns a list of filter expressions into a query and
    raises ValueError for a non-list argument, a non-string item, or an
    unknown filter name.
    """
    assert DataStream.compile_filters(["id(1)"]) == {"_id": "1"}
    assert DataStream.compile_filters(["id(1,2)"]) == {"_id": {"$in": ["1", "2"]}}
    # The calls below are expected to raise, so there is no return value to
    # assert on; bare calls keep all three cases uniform and unambiguous.
    with pytest.raises(ValueError):
        DataStream.compile_filters("id(1)")
    with pytest.raises(ValueError):
        DataStream.compile_filters(["id(1)", 1])
    with pytest.raises(ValueError):
        DataStream.compile_filters(["unknown(1)"])