# ----------------------------------------------------------------------
# DataStream
# ----------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------

# Python modules
import hashlib
import datetime
import re
import logging
from collections import defaultdict

# Third-party modules
import orjson
import bson
import bson.errors
import pymongo
import dateutil.parser
from typing import Optional, Dict, Any, List, Union, Iterable, Tuple, Callable

# NOC modules
from noc.core.perf import metrics
from noc.core.mongo.connection import get_db
from noc.core.comp import smart_text, smart_bytes
from noc.models import get_model
from noc.core.hash import hash_int
from noc.core.mx import send_message, MX_CHANGE_ID

# Module-level logger, used by DataStream for change and bulk-write diagnostics
logger = logging.getLogger(__name__)


class DataStream(object):
    """
    Datastream stored in collection named ds_<name>
    (or ds_<name>_<format> for additional formats).
    Fields:
    _id: Object id
    change_id: Change ID
    data: stream data (serialized JSON)
    hash: Object hash
    meta: Additional metadata for filtering (optional)
    """

    name = None
    model = None  # PydanticModel for Response

    # Generate separate message
    enable_message = False

    F_ID = "_id"
    F_CHANGEID = "change_id"
    F_HASH = "hash"
    F_DATA = "data"
    F_META = "meta"

    HASH_LEN = 16

    DEFAULT_LIMIT = 1000
    rx_ts = re.compile(r"^\d{4}-\d{2}-\d{2}")

    BULK_SIZE = 500

    META_MAX_DEPTH = 3

    # Collection caches, keyed by collection name.
    # Annotations are quoted to avoid evaluating third-party names at class creation
    _collections: "Dict[str, pymongo.collection.Collection]" = {}
    _collections_async: "Dict[str, pymongo.collection.Collection]" = {}

    @classmethod
    def get_collection_name(cls, format: Optional[str] = None) -> str:
        """
        Get collection name for the stream or one of its formats
        :param format: Format name, None for the default stream
        :return: ds_<name> or ds_<name>_<format>
        """
        if format:
            return "ds_%s_%s" % (cls.name, format)
        return "ds_%s" % cls.name

    @classmethod
    def get_collection(cls, fmt: Optional[str] = None) -> "pymongo.collection.Collection":
        """
        Get pymongo Collection object, cached per collection name
        :param fmt: Format name, None for the default stream
        :return:
        """
        c_name = cls.get_collection_name(fmt)
        coll = cls._collections.get(c_name)
        # Compare with None: PyMongo 4 Collection objects raise on bool()
        if coll is None:
            coll = get_db()[c_name]
            cls._collections[c_name] = coll
        return coll

    @classmethod
    def get_collection_async(cls, fmt: Optional[str] = None) -> "pymongo.collection.Collection":
        """
        Get asynchronous collection object, connecting on first use
        :param fmt: Format name, None for the default stream
        :return:
        """
        c_name = cls.get_collection_name(fmt)
        if c_name not in cls._collections_async:
            from noc.core.mongo.connection_async import connect_async, get_db

            connect_async()
            coll = get_db()[c_name]
            cls._collections_async[c_name] = coll
        return cls._collections_async[c_name]

    @classmethod
    def ensure_collection(cls):
        """
        Ensure collection is created and properly indexed
        :return:
        """
        coll = cls.get_collection()
        coll.create_index(cls.F_CHANGEID, unique=True)
        meta = cls.get_meta({})
        if meta:
            for m in meta:
                coll.create_index("%s.%s" % (cls.F_META, m))

    @classmethod
    def get_object(cls, id):
        """
        Generate datastream object for given id.
        Raise KeyError if object is not found
        Must be overriden
        :param id: Object id
        :return: dict containing object data
        """
        raise NotImplementedError()

    @classmethod
    def get_meta(cls, data: Dict[str, Any]) -> Optional[Dict]:
        """
        Extract additional metadata from .get_object() result for additional indexing
        :param data: .get_object() result
        :return: dict or None
        """
        return None

    @classmethod
    def get_deleted_object(cls, id):
        """
        Generate item for deleted object
        :param id: Object id
        :return:
        """
        return {"id": str(id), "$deleted": True}

    @classmethod
    def get_moved_object(cls, id: Union[str, int]) -> Dict[str, Any]:
        """
        Generate item for object moved out of the filter scope
        :param id: Object id
        :return:
        """
        return {"id": str(id), "$moved": True}

    @staticmethod
    def get_hash(data) -> str:
        return hashlib.sha256(orjson.dumps(data)).hexdigest()[: DataStream.HASH_LEN]

    @classmethod
    def _get_bulk_state(cls, coll, ids: List[Any]) -> Dict[Any, Dict[str, Any]]:
        """
        Fetch stored hash and meta for the given cleaned ids with one query
        :param coll: Collection to query
        :param ids: Cleaned object ids
        :return: dict of id -> stored document
        """
        if not ids:
            return {}
        return {
            doc[cls.F_ID]: doc
            for doc in coll.find(
                {cls.F_ID: {"$in": ids}}, {cls.F_ID: 1, cls.F_HASH: 1, cls.F_META: 1}
            )
        }

    @classmethod
    def bulk_update(cls, objects: "List[Union[int, str, bson.ObjectId]]") -> None:
        """
        Generate and update a batch of objects in the stream and its formats.

        Objects are processed in chunks of BULK_SIZE. For every chunk the
        stored state (hash and meta) is fetched with one query per collection,
        and all changes are saved with one ordered bulk_write per collection.

        :param objects: Sequence of object ids
        :return:
        """
        coll = cls.get_collection()
        # Get possible formats
        fmt_coll: Dict[str, pymongo.collection.Collection] = {}
        fmt_handler: Dict[str, Callable] = {}
        for fmt, handler in cls.iter_formats():
            fmt_coll[fmt] = cls.get_collection(fmt)
            fmt_handler[fmt] = handler
        # Index-based chunking. Slicing off the head and keeping the tail
        # would copy the remaining list on every iteration
        for start in range(0, len(objects), cls.BULK_SIZE):
            chunk = objects[start : start + cls.BULK_SIZE]
            # Generate data first, so the stored state is looked up by exactly
            # the same cleaned ids that _update_object() uses
            items = [cls._get_current_data(obj_id) for obj_id in chunk]
            current_state = cls._get_bulk_state(
                coll, [cls.clean_id(data["id"]) for data, _ in items]
            )
            bulk = []
            fmt_data: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
            # Apply default format
            for data, meta in items:
                cls._update_object(data=data, meta=meta, state=current_state, bulk=bulk)
                # Process formats
                for fmt, handler in fmt_handler.items():
                    fmt_data[fmt] += list(handler(data))
            # Apply formats
            fmt_bulk: Dict[str, List[Any]] = {}
            for fmt, f_items in fmt_data.items():
                fmt_state = cls._get_bulk_state(
                    fmt_coll[fmt], [cls.clean_id(f_data["id"]) for f_data in f_items]
                )
                fmt_bulk[fmt] = []
                for f_data in f_items:
                    cls._update_object(data=f_data, fmt=fmt, state=fmt_state, bulk=fmt_bulk[fmt])
            # Save pending operations
            if bulk:
                logger.info("[%s] Sending %d bulk operations", cls.name, len(bulk))
                coll.bulk_write(bulk, ordered=True)
            for fmt, f_bulk in fmt_bulk.items():
                if f_bulk:
                    logger.info("[%s|%s] Sending %d bulk operations", cls.name, fmt, len(f_bulk))
                    fmt_coll[fmt].bulk_write(f_bulk, ordered=True)

    @classmethod
    def clean_meta(cls, meta: Dict[str, List[Any]], current_meta: Dict[str, Any]):
        """
        Calculate actual meta from calculated and current records.

        Every meta field is kept as a history list, newest value first,
        truncated to META_MAX_DEPTH items. `current_meta` is not modified.

        :param meta: Calculated meta
        :param current_meta: Meta value for current record
        :return: dict of meta field -> list of values
        """
        r = {}  # current meta
        for mf, mv in meta.items():
            current = current_meta.get(mf)
            if not mv and isinstance(mv, list) and not current:
                r[mf] = []
                continue
            if not current:
                # @todo Save empty list ?
                r[mf] = [mv]
                continue
            # Check old format
            if isinstance(current, str):
                history = [current]
            elif isinstance(mv, list) and not isinstance(current[0], list):
                history = [current]
            else:
                # Copy, so insert() below does not alter the caller's current_meta
                history = list(current)
            # Check changes
            if history[0] != mv:
                history.insert(0, mv)
                # Check max depth
                history = history[: cls.META_MAX_DEPTH]
            r[mf] = history
        return r

    @classmethod
    def _update_object(cls, data, meta=None, fmt=None, state=None, bulk=None) -> bool:
        """
        Save object data to the stream (or format collection) if it has changed

        :param data: Object data, must contain "id". If `meta` is None,
            the "$meta" key (when present) is removed from data and used as meta
        :param meta: Calculated meta or None
        :param fmt: Format name, None for the default stream
        :param state: Pre-fetched stored documents keyed by cleaned id.
            When None, the stored document is fetched with find_one
        :param bulk: List to append the update operation to.
            When None, the update is applied immediately
        :return: True if object has been changed
        """

        def is_changed(d, h):
            return not d or d.get(cls.F_HASH) != h

        obj_id = cls.clean_id(data["id"])
        if meta is None and "$meta" in data:
            meta = data.pop("$meta")
        m_name = "%s_%s" % (cls.name, fmt) if fmt else cls.name
        l_name = "%s|%s|%s" % (cls.name, obj_id, fmt) if fmt else "%s|%s" % (cls.name, obj_id)
        metrics["ds_%s_updated" % m_name] += 1
        # Calculate hash
        obj_hash = cls.get_hash(data)
        # Get existing object state. An empty pre-fetched state is still
        # authoritative: none of the queried objects are stored yet
        if state is not None:
            doc = state.get(obj_id)
        else:
            doc = cls.get_collection(fmt).find_one(
                {cls.F_ID: obj_id}, {cls.F_ID: 0, cls.F_HASH: 1, cls.F_META: 1}
            )
        if not is_changed(doc, obj_hash):
            logger.info("[%s] Object hasn't been changed", l_name)
            return False  # Not changed
        if not fmt and cls.on_change(data):
            obj_hash = cls.get_hash(data)
            if not is_changed(doc, obj_hash):
                logger.info("[%s] Object hasn't been changed after altering", l_name)
                return False  # Not changed after altering
        metrics["ds_%s_changed" % m_name] += 1
        change_id = bson.ObjectId()
        data["change_id"] = str(change_id)
        op = {
            "$set": {
                cls.F_CHANGEID: change_id,
                cls.F_HASH: obj_hash,
                cls.F_DATA: smart_text(orjson.dumps(data)),
            }
        }
        if meta:
            op["$set"][cls.F_META] = cls.clean_meta(
                meta, doc[cls.F_META] if doc and doc.get(cls.F_META) else {}
            )
        elif "$deleted" not in data:
            op["$unset"] = {cls.F_META: ""}
        if bulk is None:
            cls.get_collection(fmt).update_one({cls.F_ID: obj_id}, op, upsert=True)
        else:
            bulk.append(pymongo.UpdateOne({cls.F_ID: obj_id}, op, upsert=True))
        logger.info("[%s] Object has been changed", l_name)
        if cls.enable_message:
            # Build MX message
            logger.info("[%s] Sending message", l_name)
            cls.send_message(data, change_id)
        return True

    @classmethod
    def _get_current_data(
        cls, obj_id, delete=False
    ) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]]]:
        """
        Generate current data and meta for object.
        Missing objects (KeyError from get_object) are reported as deleted
        :param obj_id: Object id
        :param delete: Force deleted item
        :return: (data, meta)
        """
        if delete:
            return cls.get_deleted_object(obj_id), None
        try:
            data = cls.get_object(obj_id)
            meta = cls.get_meta(data)
            return data, meta
        except KeyError:
            return cls.get_deleted_object(obj_id), None

    @classmethod
    def update_object(cls, id, delete=False) -> bool:
        """
        Generate and update object in stream
        :param id: Object id
        :param delete: Object must be marked as deleted
        :return: True if object has been updated
        """
        data, meta = cls._get_current_data(id, delete=delete)
        r = cls._update_object(data=data, meta=meta)
        for fmt, handler in cls.iter_formats():
            for f_data in handler(data):
                r |= cls._update_object(data=f_data, fmt=fmt)
        return r

    @classmethod
    def delete_object(cls, id):
        """
        Mark object as deleted
        :param id: Object id
        :return:
        """
        cls.update_object(id, delete=True)

    @classmethod
    def iter_formats(
        cls,
    ) -> Iterable[Tuple[str, Callable[[Dict[str, Any]], Iterable[Dict[str, Any]]]]]:
        """
        Iterate over configured formats for this stream
        :return: Iterable of (format name, handler)
        """
        # Do not load in datastream service
        DataStreamConfig = getattr(cls, "_DataStreamConfig", None)
        if not DataStreamConfig:
            cls._DataStreamConfig = get_model("main.DataStreamConfig")
            DataStreamConfig = cls._DataStreamConfig

        cfg = DataStreamConfig.get_by_name(cls.name)
        if cfg:
            yield from cfg.iter_formats()

    @classmethod
    def get_total(cls, fmt=None):
        """
        Return total amount of items in datastream (estimated count)
        :param fmt: Format name, None for the default stream
        :return:
        """
        return cls.get_collection(fmt).estimated_document_count()

    @classmethod
    def clean_change_id(cls, change_id):
        """
        Convert change_id to ObjectId. Following formats are possible:
        * ObjectId
        * string containing ObjectId
        * ISO 8601 timestamp either in form
          * YYYY-MM-DD
          * YYYY-MM-DDThh:mm:ss
        Raise ValueError on invalid change_id
        :param change_id: Change id to clean
        :return: ObjectId
        """
        # ObjectId
        if isinstance(change_id, bson.ObjectId):
            return change_id
        # String with timestamp or ObjectId
        if not isinstance(change_id, str):
            raise ValueError("Invalid change_id")
        if cls.rx_ts.search(change_id):
            # Timestamp. dateutil raises OverflowError for out-of-range values,
            # callers expect ValueError only
            try:
                ts = dateutil.parser.parse(change_id)
                return bson.ObjectId.from_datetime(ts)
            except (ValueError, OverflowError) as e:
                raise ValueError(str(e)) from e
        # ObjectId
        try:
            return bson.ObjectId(change_id)
        except (bson.errors.InvalidId, TypeError) as e:
            raise ValueError(str(e)) from e

    @classmethod
    def is_moved(cls, meta: Dict[str, List[Any]], meta_filters: Dict[str, Any]) -> bool:
        """
        Check record is out of filter scope. Compares the most recent meta
        value (history item 0) against the meta filter value
        :param meta: Record meta, field -> history list
        :param meta_filters: Compiled filters, only "meta.*" keys are checked
        :return: True if record does not match meta filters anymore
        """
        for field, field_value in meta_filters.items():
            if not field.startswith("meta."):
                # Not meta query
                continue
            _, field = field.split(".", 1)
            if not meta.get(field):
                continue
            if isinstance(meta[field], str):
                # Old meta format
                return False
            # Only mapping filter values may carry an $elemMatch operator;
            # scalar values (e.g. int) are compared as is
            if isinstance(field_value, dict) and "$elemMatch" in field_value:
                field_value = field_value["$elemMatch"]["$elemMatch"]["$in"]
            current = meta[field][0]
            if isinstance(current, list) and isinstance(field_value, list):
                return not set(field_value).intersection(current)
            if current != field_value:
                return True
        return False

    @classmethod
    def _get_iter_query(
        cls, change_id: Optional[str], filters: Optional[List[str]]
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Build MongoDB query for iter_data()/iter_data_async()
        Raises ValueError on invalid filters or change_id
        :param change_id: Starting change id
        :param filters: List of strings with filter expressions
        :return: (query, meta filters)
        """
        q: Dict[str, Any] = {}
        meta_filters: Dict[str, Any] = {}
        if filters:
            cf = cls.compile_filters(filters)
            q.update(cf)
            meta_filters = {k: v for k, v in cf.items() if k.startswith("meta.")}
        if change_id:
            q[cls.F_CHANGEID] = {"$gt": cls.clean_change_id(change_id)}
        return q, meta_filters

    @classmethod
    def _find_changes(cls, coll, q: Dict[str, Any], limit: Optional[int]):
        """
        Build cursor over records matching query, ordered by change id.
        Used by both iter_data() and iter_data_async()
        :param coll: Collection (sync or async)
        :param q: Query
        :param limit: Records limit, DEFAULT_LIMIT when not set
        :return: Cursor
        """
        return (
            coll.find(q, {cls.F_ID: 1, cls.F_CHANGEID: 1, cls.F_DATA: 1, cls.F_META: 1})
            .sort([(cls.F_CHANGEID, pymongo.ASCENDING)])
            .limit(limit=limit or cls.DEFAULT_LIMIT)
        )

    @classmethod
    def _iter_doc_items(
        cls, doc: Dict[str, Any], meta_filters: Dict[str, Any], filter_policy: Optional[str]
    ) -> Iterable[Tuple[Any, Any, str]]:
        """
        Apply filter policy to a single stored record.
        Yields nothing when the record must be skipped,
        otherwise a single (id, change_id, data) tuple
        """
        data = doc[cls.F_DATA]
        # Unfiltered queries and "keep" policy skip the is_moved check
        if meta_filters and filter_policy != "keep":
            # Records without meta are treated as having empty meta
            if cls.is_moved(doc.get(cls.F_META) or {}, meta_filters):
                if not filter_policy or filter_policy == "default":
                    # Default behavior - skip record
                    return
                if filter_policy in ("delete", "move"):
                    if filter_policy == "delete":
                        h = cls.get_deleted_object
                    else:
                        h = cls.get_moved_object
                    r = {cls.F_CHANGEID: str(doc[cls.F_CHANGEID])}
                    r.update(h(doc[cls.F_ID]))
                    data = smart_text(orjson.dumps(r))
        yield doc[cls.F_ID], doc[cls.F_CHANGEID], data

    @classmethod
    async def iter_data_async(
        cls,
        change_id: Optional[str] = None,
        limit: Optional[int] = None,
        filters: Optional[List[str]] = None,
        fmt=None,
        filter_policy: Optional[str] = None,
    ):
        """
        Iterate over data items beginning from change id

        Raises ValueError if filters has incorrect input parameters
        :param change_id: Staring change id
        :param limit: Records limit
        :param filters: List of strings with filter expression
        :param fmt: Format
        :param filter_policy: Metadata changed policy. Behavior if metadata change out of filter scope
                   * default - no changes
                   * delete - return $delete message
                   * keep - ignore filter, return full record
                   * move - return $moved message
        :return: (id, change_id, data)
        """
        q, meta_filters = cls._get_iter_query(change_id, filters)
        coll = cls.get_collection_async(fmt)
        async for doc in cls._find_changes(coll, q, limit):
            for item in cls._iter_doc_items(doc, meta_filters, filter_policy):
                yield item

    @classmethod
    def iter_data(
        cls,
        change_id: Optional[str] = None,
        limit: Optional[int] = None,
        filters: Optional[List[str]] = None,
        fmt=None,
        filter_policy: Optional[str] = None,
    ):
        """
        Iterate over data items beginning from change id

        Raises ValueError if filters has incorrect input parameters
        :param change_id: Staring change id
        :param limit: Records limit
        :param filters: List of strings with filter expression
        :param fmt: Format
        :param filter_policy: Metadata changed policy. Behavior if metadata change out of filter scope
                   * default - no changes
                   * delete - return $delete message
                   * keep - ignore filter, return full record
                   * move - return $moved message
        :return: (id, change_id, data)
        """
        q, meta_filters = cls._get_iter_query(change_id, filters)
        coll = cls.get_collection(fmt)
        for doc in cls._find_changes(coll, q, limit):
            yield from cls._iter_doc_items(doc, meta_filters, filter_policy)

    @classmethod
    def on_change(cls, data):
        """
        Called when datastream changed. May alter data
        :param data:
        :return: True, if data is altered and hash must be recalculated
        """
        return False

    @classmethod
    def clean_id(cls, id):
        """
        Convert arbitrary string to id data type
        Raise ValueError if invalid type given
        :param id:
        :return:
        """
        return id

    @classmethod
    def clean_id_int(cls, id):
        """
        Convert arbitrary string id to int
        :param id:
        :return:
        """
        return int(id)

    @classmethod
    def clean_id_bson(cls, id):
        """
        Convert arbitrary string id to bson ObjectId
        :param id:
        :return:
        """
        return bson.ObjectId(id)

    @classmethod
    def wait(cls):
        """
        Block until datastream receives changes
        :return:
        """
        coll = cls.get_collection()
        with coll.watch() as stream:
            next(stream)
            return

    @staticmethod
    def qs(s):
        """
        Convert value to text. Empty values become "", datetimes are ISO-formatted
        :param s:
        :return:
        """
        if not s:
            return ""
        if isinstance(s, datetime.datetime):
            return s.isoformat()
        return smart_text(s)

    @classmethod
    def _parse_filter(cls, expr):
        """
        Convert single filter expression to a S-expression
        :param expr: filter expression in form name(arg1, .., argN)
        :return: [name, arg1, .., argN]
        """
        if not isinstance(expr, str):
            raise ValueError("Expression must be string")
        i1 = expr.find("(")
        if i1 < 0:
            raise ValueError("Missed opening bracket")
        if not expr.endswith(")"):
            raise ValueError("Missed closing bracket")
        return [expr[:i1]] + [x.strip() for x in expr[i1 + 1 : -1].split(",")]

    @classmethod
    def compile_filters(cls, exprs):
        """
        Compile list of filter expressions to MongoDB query
        :param exprs: List of strings with expressions
        :return: dict with query
        """
        if not isinstance(exprs, list):
            raise ValueError("expressions must be list of string")
        q = {}
        for fx in exprs:
            pv = cls._parse_filter(fx)
            h = getattr(cls, "filter_%s" % pv[0], None)
            if not h:
                raise ValueError("Invalid filter %s" % pv[0])
            q.update(h(*pv[1:]))
        return q

    @classmethod
    def filter_id(cls, id1, *args):
        """
        Filter by id. Usage:

        id(id1, .., idN)
        :param id1:
        :param args:
        :return:
        """
        ids = [cls.clean_id(id1)] + [cls.clean_id(x) for x in args]
        if len(ids) == 1:
            return {cls.F_ID: ids[0]}
        return {cls.F_ID: {"$in": ids}}

    @classmethod
    def filter_shard(cls, instance, n_instances):
        """
        Sharding by id
        :param instance: Instance number, 0 <= instance < n_instances
        :param n_instances: Total number of instances
        :return:
        """
        # Raise ValueError if not integer
        instance = int(instance)
        n_instances = int(n_instances)
        #
        if n_instances < 1:
            raise ValueError("Invalid number of instances")
        if instance < 0:
            raise ValueError("Invalid instance")
        if instance >= n_instances:
            raise ValueError("Invalid instance")
        return {"_id": {"$mod": [n_instances, instance]}}

    @classmethod
    def get_format_role(cls, fmt: str) -> Optional[str]:
        """
        Returns format role, if any
        :param fmt:
        :return:
        """
        doc = get_db()["datastreamconfigs"].find_one({"name": cls.name})
        if not doc:
            return None
        for f in doc.get("formats", []):
            if f.get("name") == fmt:
                return f.get("role") or None
        return None

    @classmethod
    def get_msg_headers(cls, data: Dict[str, Any]) -> Optional[Dict[str, bytes]]:
        """
        Additional MX message headers. Override in subclasses
        :param data:
        :return:
        """
        return None

    @classmethod
    def send_message(cls, data: Dict[str, Any], change_id: "bson.ObjectId") -> None:
        """
        Send MX message

        :param data: Object data. "$changeid" is set only for the duration of the call
        :param change_id:
        :return:
        """
        data["$changeid"] = str(change_id)
        try:
            # Build headers
            headers = {
                MX_CHANGE_ID: smart_bytes(change_id),
            }
            additional_headers = cls.get_msg_headers(data)
            if additional_headers:
                headers.update(additional_headers)
            # Schedule to send
            send_message(
                data,
                message_type=cls.name,
                headers=headers,
                sharding_key=hash_int(data["id"]) & 0xFFFFFFFF,
            )
        finally:
            # Cleanup: never leak the temporary key into the caller's data,
            # even when building or sending the message fails
            data.pop("$changeid", None)