Commit cb6951f1 authored by Andrey Vertiprahov

release-22.1:Backport!6431

parent 2cd42a7a
@@ -97,7 +97,7 @@ class ClickhouseClient(object):
         :param to_table:
         :return:
         """
-        self.execute(post=f"RENAME TABLE {from_table} TO {to_table};")
+        self.execute(post=f"RENAME TABLE `{from_table}` TO `{to_table}`;")


 def connection(host=None, port=None, read_only=True):
@@ -25,7 +25,9 @@ def ensure_bi_models(connect=None):
         if not model:
             continue
         logger.info("Ensure table %s" % model._meta.db_table)
+        changed |= model.ensure_schema(connect=connect)
         changed |= model.ensure_table(connect=connect)
+        changed |= model.ensure_views(connect=connect, changed=changed)
     return changed
@@ -43,6 +45,7 @@ def ensure_dictionary_models(connect=None):
         if table_changed:
             logger.info("[%s] Drop Dictionary", name)
             model.drop_dictionary(connect=connect)
+            model.ensure_views(connect=connect)
         changed |= model.ensure_dictionary(connect=connect)
     return changed
@@ -61,7 +61,7 @@ class BaseField(object):
         :return:
         """
         if self.low_cardinality:
-            return "LowCardinality(%s)" % self.db_type
+            return f"LowCardinality({self.db_type})"
         return self.db_type

     def get_displayed_type(self):
@@ -235,7 +235,7 @@ class ArrayField(BaseField):
         return [self.field_type.to_json(v) for v in value]

     def get_db_type(self, name=None):
-        return "Array(%s)" % self.field_type.get_db_type()
+        return f"Array({self.field_type.get_db_type()})"

     def get_displayed_type(self):
         return "Array(%s)" % self.field_type.get_db_type()
@@ -252,6 +252,12 @@ class MaterializedField(BaseField):
         self.field_type = field_type
         self.expression = expression

+    def get_db_type(self, name=None):
+        return f"LowCardinality({self.field_type.get_db_type()})"
+
+    def iter_create_sql(self):
+        yield self.name, f"{self.get_db_type()} MATERIALIZED {self.expression}"
+

 class ReferenceField(BaseField):
     db_type = "UInt64"
@@ -321,36 +327,21 @@ class IPv6Field(BaseField):
 class AggregatedField(BaseField):
-    def __init__(
-        self, source_field, field_type, agg_function, params=None, description=None, f_expr=None
-    ):
+    def __init__(self, expression, field_type, agg_function, params=None, description=None):
         super().__init__(description=description)
-        self.source_field = source_field
         self.field_type = field_type
-        self.is_agg = True
         self.agg_function = agg_function
-        self.f_expr = f_expr
+        self.expression = expression
         self.params = params

     def to_json(self, value):
         return self.field_type.to_json(value)

-    @property
-    def db_type(self):
-        if isinstance(self.field_type, tuple):
-            return ",".join(x.db_type for x in self.field_type)
-        return self.field_type.db_type
+    def get_db_type(self, name=None):
+        return f"AggregateFunction({self.agg_function}, {self.agg_function.get_db_type(self.field_type)})"

-    def get_create_sql(self):
-        field_type = self.field_type
-        if isinstance(field_type, tuple):
-            field_type = ", ".join(self.field_type)
-        return f"`{self.name}` AggregateFunction({self.agg_function}, {field_type})"
-
-    def get_expr(self):
-        if self.f_expr:
-            self.f_expr(self.name)
-        return self.source_field
+    def get_expression(self, combinator: str = None):
+        return self.agg_function.get_expression(self, combinator)
         # return self.f_expr.format(p={"field": self.name, "function": function, "f_param": f_param})
         # return "{p[function]}Merge({p[field]}_{p[function]})"
@@ -360,7 +351,7 @@ class NestedField(ArrayField):
     def iter_create_sql(self):
         for nested_field in self.field_type._meta.ordered_fields:
-            yield "%s.%s" % (self.name, nested_field.name), self.get_db_type(nested_field.name)
+            yield f"{self.name}.{nested_field.name}", self.get_db_type(nested_field.name)

     def apply_json(self, row_json, value):
         arrays = defaultdict(list)
+# ----------------------------------------------------------------------
+# ClickHouse functions
+# ----------------------------------------------------------------------
+# Copyright (C) 2007-2022 The NOC Project
+# See LICENSE for details
+# ----------------------------------------------------------------------
+from typing import Optional
+
+
+class AggregateFunction(object):
+    db_name = None
+
+    def __init__(self, function: Optional[str] = None, **params):
+        self.function = self.db_name or function
+        self.params = params
+
+    def __repr__(self):
+        return self.function
+
+    # get expression
+    def get_expression(self, field, combinator: Optional[str] = None, **params):
+        expr = field.expression
+        if combinator == "Merge":
+            expr = field.name
+        return f"{self.function}{combinator or ''}({expr})"
+
+    def get_db_type(self, field):
+        return field.get_db_type()
+
+
+class ArgMax(AggregateFunction):
+    db_name = "argMax"
+
+    def get_expression(self, field, combinator: Optional[str] = None, **params):
+        if combinator == "Merge":
+            return f"{self.function}{combinator or ''}({field.name})"
+        return f"{self.function}{combinator or ''}({field.expression}, ts)"
+
+    def get_db_type(self, field):
+        return f"{field.get_db_type()}, DateTime"
@@ -13,11 +13,11 @@ from time import perf_counter
 from noc.config import config
 from noc.sa.models.useraccess import UserAccess
 from noc.sa.models.managedobject import ManagedObject
-from .fields import BaseField
-from .engines import ReplacingMergeTree
+from .fields import BaseField, MaterializedField, AggregatedField
+from .engines import ReplacingMergeTree, AggregatingMergeTree
 from .connect import ClickhouseClient, connection

-__all__ = ["Model", "NestedModel"]
+__all__ = ["Model", "NestedModel", "DictionaryModel", "ViewModel"]

 OLD_PM_SCHEMA_TABLE = "noc_old"
@@ -31,9 +31,12 @@ class ModelBase(type):
             engine=getattr(cls.Meta, "engine", None),
             db_table=getattr(cls.Meta, "db_table", None),
             description=getattr(cls.Meta, "description", None),
+            # Sample key
             sample=getattr(cls.Meta, "sample", False),
             tags=getattr(cls.Meta, "tags", None),
-            managed=getattr(cls.Meta, "managed", True),
+            # Run migrate for model
+            run_migrate=getattr(cls.Meta, "run_migrate", True),
+            # Local (non-distributed) table
             is_local=getattr(cls.Meta, "is_local", False),
             view_table_source=getattr(cls.Meta, "view_table_source", None),
         )
@@ -54,7 +57,8 @@ class ModelMeta(object):
         description=None,
         sample=False,
         tags=None,
-        managed=True,
+        # Run migrate for model
+        run_migrate=True,
         is_local=False,  # Local table, do not create distributed
         view_table_source=None,
     ):
@@ -63,7 +67,7 @@ class ModelMeta(object):
         self.description = description
         self.sample = sample
         self.tags = tags
-        self.managed = managed
+        self.run_migrate = run_migrate
         self.view_table_source = view_table_source
         self.is_local = is_local
         self.fields = {}  # name -> Field
@@ -93,7 +97,6 @@ class Model(object, metaclass=ModelBase):
     description = None
     sample = False
     tags = None
-    managed = True

     def __init__(self, **kwargs):
         self.values = kwargs
@@ -148,7 +151,7 @@ class Model(object, metaclass=ModelBase):
         :return: SQL code with field creation statements
         """
         return ",\n".join(
-            "%s %s" % (cls.quote_name(name), db_type) for name, db_type in cls.iter_create_sql()
+            f"{cls.quote_name(name)} {db_type}" for name, db_type in cls.iter_create_sql()
         )

     @classmethod
@@ -232,18 +235,18 @@ class Model(object, metaclass=ModelBase):
                 c_type = cls._meta.fields[name].get_db_type(nested_name)
                 if existing[field_name] != c_type:
                     print(
-                        "Warning! Type mismatch for column %s: %s <> %s"
-                        % (field_name, existing[field_name], c_type)
+                        f"[{table_name}|{field_name}] Warning! Type mismatch: "
+                        f"{existing[field_name]} <> {c_type}"
                     )
                     print(
-                        "Set command manually: ALTER TABLE %s MODIFY COLUMN %s %s"
-                        % (table_name, field_name, c_type)
+                        f"Set command manually: "
+                        f"ALTER TABLE {table_name} MODIFY COLUMN {field_name} {c_type}"
                     )
             else:
                 print(f"[{table_name}|{field_name}] Alter column")
                 query = (
-                    f"ALTER TABLE {table_name} ADD COLUMN {cls.quote_name(field_name)} {db_type}"
+                    f"ALTER TABLE `{table_name}` ADD COLUMN {cls.quote_name(field_name)} {db_type}"
                 )
                 if after:
                     # None if add before first field
@@ -256,7 +259,11 @@ class Model(object, metaclass=ModelBase):
         return c

     @classmethod
-    def ensure_schema(cls, connect: "ClickhouseClient", table_name: str) -> bool:
+    def ensure_schema(cls, connect=None) -> bool:
+        return False
+
+    @classmethod
+    def check_old_schema(cls, connect: "ClickhouseClient", table_name: str) -> bool:
         """
         Ensure create table Syntax. False for old Syntax, True for New
         :param connect:
@@ -277,13 +284,13 @@ class Model(object, metaclass=ModelBase):
         return not bool(r)

     @classmethod
-    def ensure_table(cls, connect=None):
+    def ensure_table(cls, connect=None) -> bool:
         """
         Check table is exists
         :param connect:
         :return: True, if table has been altered, False otherwise
         """
-        if not cls._meta.managed:
+        if not cls._meta.run_migrate:
             return False
         changed = False
         ch: "ClickhouseClient" = connect or connection()
@@ -299,7 +306,7 @@ class Model(object, metaclass=ModelBase):
             ch.rename_table(table, raw_table)
             changed = True
         # Old schema
-        if ch.has_table(raw_table) and not cls.ensure_schema(ch, raw_table):
+        if ch.has_table(raw_table) and not cls.check_old_schema(ch, raw_table):
             # Old schema, data table will be move to old_noc db for save data.
             print(f"[{table}] Old Schema Move Data to {OLD_PM_SCHEMA_TABLE}.{raw_table}")
             ch.ensure_db(OLD_PM_SCHEMA_TABLE)
@@ -322,13 +329,23 @@ class Model(object, metaclass=ModelBase):
             else:
                 ch.execute(post=cls.cget_create_distributed_sql())
                 changed = True
-        # Synchronize view
-        if changed or not ch.has_table(table, is_view=True):
+        if not ch.has_table(table, is_view=True):
             print(f"[{table}] Synchronize view")
             ch.execute(post=cls.get_create_view_sql())
             changed = True
         return changed

+    @classmethod
+    def ensure_views(cls, connect=None, changed: bool = True) -> bool:
+        # Synchronize view
+        ch: "ClickhouseClient" = connect or connection()
+        table = cls._get_db_table()
+        if changed or not ch.has_table(table, is_view=True):
+            print(f"[{table}] Synchronize view")
+            ch.execute(post=cls.get_create_view_sql())
+            return True
+        return False
+
     @classmethod
     def transform_query(cls, query, user):
         """
@@ -492,7 +509,7 @@ class DictionaryBase(ModelBase):
             primary_key=getattr(cls.Meta, "primary_key", ("bi_id",)),
             incremental_update=getattr(cls.Meta, "incremental_update", False),
             description=getattr(cls.Meta, "description", None),
-            managed=getattr(cls.Meta, "managed", True),
+            run_migrate=getattr(cls.Meta, "run_migrate", True),
             is_local=getattr(cls.Meta, "is_local", True),
             view_table_source=getattr(cls.Meta, "view_table_source", None),
             # For Dictionary table
@@ -521,7 +538,7 @@ class DictionaryMeta(object):
         incremental_update=False,
         db_table=None,
         description=None,
-        managed=True,
+        run_migrate=True,
         is_local=False,  # Local table, do not create distributed
         view_table_source=None,
         layout=None,
@@ -536,7 +553,7 @@ class DictionaryMeta(object):
         self.db_table = db_table or name
         self.description = description
         self.view_table_source = view_table_source
-        self.managed = managed
+        self.run_migrate = run_migrate
         self.is_local = is_local
         self.layout = layout
         self.lifetime_min = lifetime_min
@@ -741,3 +758,98 @@ class DictionaryModel(Model, metaclass=DictionaryBase):
     @classmethod
     def extract(cls, item):
         raise NotImplementedError
+
+
+class ViewModel(Model, metaclass=ModelBase):
+    # ViewModel
+    @classmethod
+    def is_aggregate(cls) -> bool:
+        return isinstance(cls._meta.engine, AggregatingMergeTree)
+
+    @classmethod
+    def ensure_schema(cls, connect=None) -> bool:
+        if not cls.is_aggregate():
+            return False
+        ch = connect or connection()
+        old_table_name = f".inner.{cls._get_raw_db_table()}"
+        if not ch.has_table(old_table_name):
+            return False
+        print(f"[{old_table_name}] Migrate old materialized view. For use TO statement")
+        # Drop view
+        ch.execute(post=f"DROP VIEW IF EXISTS {cls._get_db_table()}")
+        cls.drop_view(connect=ch)
+        # Rename table
+        ch.rename_table(old_table_name, cls._get_raw_db_table())
+        return True
+
+    @classmethod
+    def get_create_view_sql(cls):
+        r = []
+        group_by = []
+        for field in cls._meta.ordered_fields:
+            if isinstance(field, AggregatedField):
+                r += [f"{field.get_expression(combinator='Merge')} AS {cls.quote_name(field.name)}"]
+            else:
+                r += [f"{cls.quote_name(field.name)} "]
+                group_by += [cls.quote_name(field.name)]
+        r = [",\n".join(r)]
+        if config.clickhouse.cluster:
+            r += [f"FROM {cls._get_distributed_db_table()} "]
+        else:
+            r += [f"FROM {cls._get_raw_db_table()} "]
+        r += [f'GROUP BY {",".join(group_by)} ']
+        return f'CREATE OR REPLACE VIEW {cls._get_db_table()} AS SELECT {" ".join(r)}'
+
+    @classmethod
+    def get_create_select_sql(cls):
+        r = []
+        group_by = []
+        for field in cls._meta.ordered_fields:
+            if isinstance(field, MaterializedField):
+                continue
+            elif isinstance(field, AggregatedField):
+                r += [f"{field.get_expression(combinator='State')} AS {cls.quote_name(field.name)}"]
+            else:
+                r += [f"{cls.quote_name(field.name)} "]
+                group_by += [cls.quote_name(field.name)]
+        r = [",\n".join(r)]
+        r += [f"FROM {cls.Meta.view_table_source} "]
+        r += [f'GROUP BY {",".join(group_by)} ']
+        return "\n".join(r)
+
+    @classmethod
+    def detach_view(cls, connect=None):
+        ch = connect or connection()
+        ch.execute(post=f" DETACH VIEW IF EXISTS {cls._get_raw_db_table()}")
+        return True
+
+    @classmethod
+    def drop_view(cls, connect=None):
+        ch = connect or connection()
+        ch.execute(post=f" DROP VIEW IF EXISTS {cls._get_raw_db_table()}")
+        return True
+
+    @classmethod
+    def ensure_mv_trigger_view(cls, connect=None):
+        mv_name = f"mv_{cls._get_raw_db_table()}"
+        connect.execute(post=f" DROP VIEW IF EXISTS {mv_name}")
+        return "\n".join(
+            [
+                f"CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name}"
+                f"TO {cls._get_raw_db_table()} "
+                f"AS SELECT {cls.get_create_select_sql()}",
+            ]
+        )
+
+    @classmethod
+    def ensure_views(cls, connect=None, changed: bool = True) -> bool:
+        # Synchronize view
+        ch: "ClickhouseClient" = connect or connection()
+        table = cls._get_db_table()
+        if changed or not ch.has_table(table, is_view=True):
+            print(f"[{table}] Synchronize view")
+            cls.ensure_mv_trigger_view()
+            ch.execute(post=cls.get_create_view_sql())
+            return True
+        return False
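For orientation, a rough illustration of the SQL these ViewModel builders assemble, under stated assumptions: a hypothetical model with one plain column `interface` and one ArgMax-aggregated column `duration`, and assumed source/raw table names; none of this SQL is taken verbatim from the commit.

# Illustration only (assumed names, not from this commit):
#
# get_create_select_sql() -> SELECT fed into the trigger materialized view (State side):
#     interface,
#     argMaxState(duration, ts) AS duration
#     FROM interface_raw
#     GROUP BY interface
#
# ensure_mv_trigger_view() wraps it as:
#     CREATE MATERIALIZED VIEW IF NOT EXISTS mv_<raw_table> TO <raw_table> AS SELECT ...
#
# get_create_view_sql() -> user-facing view that merges the aggregate states (Merge side):
#     CREATE OR REPLACE VIEW <db_table> AS SELECT
#     interface,
#     argMaxMerge(duration) AS duration
#     FROM <raw_or_distributed_table>
#     GROUP BY interface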