Commit fa66d6e0 authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

DataStream.wait() method

parent 568dc56c
......@@ -112,7 +112,7 @@ class DataStream(object):
# Get existing object
doc = coll.find_one({cls.F_ID: id}, {cls.F_ID: 0, cls.F_HASH: 1})
if doc and doc.get(cls.F_HASH) == hash:
return False # Not changed
return False # Not changed
metrics["ds_%s_changed" % cls.name] += 1
changeid = bson.ObjectId()
data["change_id"] = str(changeid)
......@@ -202,3 +202,14 @@ class DataStream(object):
:return:
"""
return int(id)
@classmethod
def wait(cls):
"""
Block until datastream receives changes
:return:
"""
coll = cls.get_collection()
with coll.watch() as stream:
next(stream)
return
......@@ -6,6 +6,9 @@
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import time
import threading
# Third-party modules
import pytest
import ujson
......@@ -273,3 +276,45 @@ def test_loader_invalid_name():
def test_loader_error():
ds = loader.get_datastream("invalid")
assert ds is None
def test_wait():
class WaitDS(DataStream):
name = "wait"
@classmethod
def get_object(cls, id):
return {
"id": id
}
TIMEOUT = 0.1
def writer():
# Wait until main thread is ready
event.wait()
time.sleep(TIMEOUT)
WaitDS.update_object(2)
# Insert first object
WaitDS.ensure_collection()
WaitDS.update_object(1)
# Start writer thread
event = threading.Event()
thread = threading.Thread(target=writer, name="test_wait-writer")
thread.setDaemon(True)
thread.start()
# Ensure WaitDS contains only one item
assert WaitDS.get_total() == 1
# Allow writer thread to start
event.set()
# Hang until writer thread inserts new object
t0 = time.time()
WaitDS.wait()
dt = time.time() - t0
# Cleanup writer thread
thread.join()
# We should be waiting more, than TIMEOUT
assert dt > TIMEOUT
# We must have second object
assert WaitDS.get_total() == 2
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment