channel.py 2.33 KB
Newer Older
Dmitry Volodin's avatar
Dmitry Volodin committed
1
#!./bin/python
Dmitry Lukhtionov's avatar
Dmitry Lukhtionov committed
2
3
4
# ----------------------------------------------------------------------
# Write channel service
# ----------------------------------------------------------------------
Dmitry Volodin's avatar
Dmitry Volodin committed
5
# Copyright (C) 2007-2020 The NOC Project
Dmitry Lukhtionov's avatar
Dmitry Lukhtionov committed
6
7
# See LICENSE for details
# ----------------------------------------------------------------------
Dmitry Volodin's avatar
Dmitry Volodin committed
8

Dmitry Volodin's avatar
Dmitry Volodin committed
9
# Python modules
10
from time import perf_counter
Dmitry Volodin's avatar
Dmitry Volodin committed
11
from urllib.parse import quote as urllib_quote
Dmitry Volodin's avatar
Dmitry Volodin committed
12

Dmitry Volodin's avatar
Dmitry Volodin committed
13
14
15
# Third-party modules
from typing import List

Dmitry Volodin's avatar
Dmitry Volodin committed
16
# NOC modules
17
from noc.config import config
Dmitry Volodin's avatar
Dmitry Volodin committed
18

19

Dmitry Volodin's avatar
Dmitry Volodin committed
20
class Channel(object):
Dmitry Volodin's avatar
Dmitry Volodin committed
21
    def __init__(self, table: str, address: str, db: str):
22
        """
23
24
25
26
        :param table: ClickHouse table name
        :param address: ClickHouse address
        :param db: ClickHouse database

27
        :return:
Dmitry Volodin's avatar
Dmitry Volodin committed
28
        """
29
        self.name = table
30
31
        self.address = address
        self.db = db
32
        self.sql = "INSERT INTO %s FORMAT JSONEachRow" % table
Dmitry Volodin's avatar
Dmitry Volodin committed
33
        self.encoded_sql = urllib_quote(self.sql.encode("utf8"))
Dmitry Volodin's avatar
Dmitry Volodin committed
34
        self.n = 0
Dmitry Volodin's avatar
Dmitry Volodin committed
35
        self.data: List[bytes] = []
Dmitry Volodin's avatar
Dmitry Volodin committed
36
37
        self.last_updated = perf_counter()
        self.last_flushed = perf_counter()
Dmitry Volodin's avatar
Dmitry Volodin committed
38
        self.flushing = False
39
        self.url = "http://%s/?database=%s&query=%s" % (
40
41
            address,
            db,
Dmitry Volodin's avatar
Dmitry Volodin committed
42
            self.encoded_sql,
43
        )
Dmitry Volodin's avatar
Dmitry Volodin committed
44

Dmitry Volodin's avatar
Dmitry Volodin committed
45
46
    def feed(self, data: bytes):
        n = data.count(b"\n")
47
48
        if data[-1] != b"\n":
            n += 1
Dmitry Volodin's avatar
Dmitry Volodin committed
49
50
51
52
53
        self.n += n
        self.data += [data]
        return n

    def is_expired(self):
Dmitry Volodin's avatar
Dmitry Volodin committed
54
55
        if self.n:
            return False
Dmitry Volodin's avatar
Dmitry Volodin committed
56
        t = perf_counter()
Dmitry Volodin's avatar
Dmitry Volodin committed
57
58
        if self.data or self.flushing:
            return False
59
        return t - self.last_updated > config.chwriter.channel_expire_interval
Dmitry Volodin's avatar
Dmitry Volodin committed
60
61

    def is_ready(self):
62
        if not self.data or self.flushing:
Dmitry Volodin's avatar
Dmitry Volodin committed
63
            return False
64
        if self.n >= config.chwriter.batch_size:
Dmitry Volodin's avatar
Dmitry Volodin committed
65
            return True
Dmitry Volodin's avatar
Dmitry Volodin committed
66
        t = perf_counter()
67
        return (t - self.last_flushed) * 1000 >= config.chwriter.batch_delay_ms
Dmitry Volodin's avatar
Dmitry Volodin committed
68

Dmitry Volodin's avatar
Dmitry Volodin committed
69
    def get_data(self) -> bytes:
Dmitry Volodin's avatar
Dmitry Volodin committed
70
        self.n = 0
Dmitry Volodin's avatar
Dmitry Volodin committed
71
        data = b"\n".join(self.data)
Dmitry Volodin's avatar
Dmitry Volodin committed
72
73
74
75
76
77
78
79
        self.data = []
        return data

    def start_flushing(self):
        self.flushing = True

    def stop_flushing(self):
        self.flushing = False
Dmitry Volodin's avatar
Dmitry Volodin committed
80
        self.last_flushed = perf_counter()
Dmitry Volodin's avatar
Dmitry Volodin committed
81
82
83
84
85
86

    def get_insert_sql(self):
        return self.sql

    def get_encoded_insert_sql(self):
        return self.encoded_sql