Commit 1496b522 authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Merge branch 'cherry-pick-820ace7e' into 'release-22.1'

release-22.1:Backport!6431

See merge request noc/noc!6432
parents 2cd42a7a cb6951f1
......@@ -97,7 +97,7 @@ class ClickhouseClient(object):
:param to_table:
:return:
"""
self.execute(post=f"RENAME TABLE {from_table} TO {to_table};")
self.execute(post=f"RENAME TABLE `{from_table}` TO `{to_table}`;")
def connection(host=None, port=None, read_only=True):
......
......@@ -25,7 +25,9 @@ def ensure_bi_models(connect=None):
if not model:
continue
logger.info("Ensure table %s" % model._meta.db_table)
changed |= model.ensure_schema(connect=connect)
changed |= model.ensure_table(connect=connect)
changed |= model.ensure_views(connect=connect, changed=changed)
return changed
......@@ -43,6 +45,7 @@ def ensure_dictionary_models(connect=None):
if table_changed:
logger.info("[%s] Drop Dictionary", name)
model.drop_dictionary(connect=connect)
model.ensure_views(connect=connect)
changed |= model.ensure_dictionary(connect=connect)
return changed
......
......@@ -61,7 +61,7 @@ class BaseField(object):
:return:
"""
if self.low_cardinality:
return "LowCardinality(%s)" % self.db_type
return f"LowCardinality({self.db_type})"
return self.db_type
def get_displayed_type(self):
......@@ -235,7 +235,7 @@ class ArrayField(BaseField):
return [self.field_type.to_json(v) for v in value]
def get_db_type(self, name=None):
return "Array(%s)" % self.field_type.get_db_type()
return f"Array({self.field_type.get_db_type()})"
def get_displayed_type(self):
return "Array(%s)" % self.field_type.get_db_type()
......@@ -252,6 +252,12 @@ class MaterializedField(BaseField):
self.field_type = field_type
self.expression = expression
def get_db_type(self, name=None):
return f"LowCardinality({self.field_type.get_db_type()})"
def iter_create_sql(self):
yield self.name, f"{self.get_db_type()} MATERIALIZED {self.expression}"
class ReferenceField(BaseField):
db_type = "UInt64"
......@@ -321,36 +327,21 @@ class IPv6Field(BaseField):
class AggregatedField(BaseField):
def __init__(
self, source_field, field_type, agg_function, params=None, description=None, f_expr=None
):
def __init__(self, expression, field_type, agg_function, params=None, description=None):
super().__init__(description=description)
self.source_field = source_field
self.field_type = field_type
self.is_agg = True
self.agg_function = agg_function
self.f_expr = f_expr
self.expression = expression
self.params = params
def to_json(self, value):
return self.field_type.to_json(value)
@property
def db_type(self):
if isinstance(self.field_type, tuple):
return ",".join(x.db_type for x in self.field_type)
return self.field_type.db_type
def get_create_sql(self):
field_type = self.field_type
if isinstance(field_type, tuple):
field_type = ", ".join(self.field_type)
return f"`{self.name}` AggregateFunction({self.agg_function}, {field_type})"
def get_expr(self):
if self.f_expr:
self.f_expr(self.name)
return self.source_field
def get_db_type(self, name=None):
return f"AggregateFunction({self.agg_function}, {self.agg_function.get_db_type(self.field_type)})"
def get_expression(self, combinator: str = None):
return self.agg_function.get_expression(self, combinator)
# return self.f_expr.format(p={"field": self.name, "function": function, "f_param": f_param})
# return "{p[function]}Merge({p[field]}_{p[function]})"
......@@ -360,7 +351,7 @@ class NestedField(ArrayField):
def iter_create_sql(self):
for nested_field in self.field_type._meta.ordered_fields:
yield "%s.%s" % (self.name, nested_field.name), self.get_db_type(nested_field.name)
yield f"{self.name}.{nested_field.name}", self.get_db_type(nested_field.name)
def apply_json(self, row_json, value):
arrays = defaultdict(list)
......
# ----------------------------------------------------------------------
# ClickHouse functions
# ----------------------------------------------------------------------
# Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
from typing import Optional
class AggregateFunction(object):
    """Base wrapper around a ClickHouse aggregate function.

    Resolves the SQL function name either from the class-level
    ``db_name`` or from the constructor argument, and renders SQL
    expressions with an optional combinator suffix ("State", "Merge").
    """

    # Fixed ClickHouse function name; subclasses override it, otherwise
    # the name comes from the constructor argument.
    db_name = None

    def __init__(self, function: Optional[str] = None, **params):
        # A class-level db_name takes precedence over the argument
        self.function = self.db_name if self.db_name else function
        self.params = params

    def __repr__(self):
        return self.function

    # get expression
    def get_expression(self, field, combinator: Optional[str] = None, **params):
        """Render ``<function><combinator>(<expr>)`` for *field*.

        With the "Merge" combinator the stored aggregation-state column
        (``field.name``) is read instead of the source expression.
        """
        source = field.name if combinator == "Merge" else field.expression
        suffix = combinator if combinator else ""
        return f"{self.function}{suffix}({source})"

    def get_db_type(self, field):
        # Argument type list for the AggregateFunction(...) column type
        return field.get_db_type()
class ArgMax(AggregateFunction):
    """``argMax(expr, ts)``: value of *expr* at the maximum ``ts``."""

    db_name = "argMax"

    def get_expression(self, field, combinator: Optional[str] = None, **params):
        suffix = combinator if combinator else ""
        if combinator == "Merge":
            # Merge finalizes the stored aggregation-state column
            return f"{self.function}{suffix}({field.name})"
        # NOTE(review): the second argument is hard-coded to a `ts`
        # column — assumes the source table always exposes `ts`
        return f"{self.function}{suffix}({field.expression}, ts)"

    def get_db_type(self, field):
        # argMax state type is (value type, type of `ts` i.e. DateTime)
        return f"{field.get_db_type()}, DateTime"
......@@ -13,11 +13,11 @@ from time import perf_counter
from noc.config import config
from noc.sa.models.useraccess import UserAccess
from noc.sa.models.managedobject import ManagedObject
from .fields import BaseField
from .engines import ReplacingMergeTree
from .fields import BaseField, MaterializedField, AggregatedField
from .engines import ReplacingMergeTree, AggregatingMergeTree
from .connect import ClickhouseClient, connection
__all__ = ["Model", "NestedModel"]
__all__ = ["Model", "NestedModel", "DictionaryModel", "ViewModel"]
OLD_PM_SCHEMA_TABLE = "noc_old"
......@@ -31,9 +31,12 @@ class ModelBase(type):
engine=getattr(cls.Meta, "engine", None),
db_table=getattr(cls.Meta, "db_table", None),
description=getattr(cls.Meta, "description", None),
# Sample key
sample=getattr(cls.Meta, "sample", False),
tags=getattr(cls.Meta, "tags", None),
managed=getattr(cls.Meta, "managed", True),
# Run migrate for model
run_migrate=getattr(cls.Meta, "run_migrate", True),
# Local (non-distributed) table
is_local=getattr(cls.Meta, "is_local", False),
view_table_source=getattr(cls.Meta, "view_table_source", None),
)
......@@ -54,7 +57,8 @@ class ModelMeta(object):
description=None,
sample=False,
tags=None,
managed=True,
# Run migrate for model
run_migrate=True,
is_local=False, # Local table, do not create distributed
view_table_source=None,
):
......@@ -63,7 +67,7 @@ class ModelMeta(object):
self.description = description
self.sample = sample
self.tags = tags
self.managed = managed
self.run_migrate = run_migrate
self.view_table_source = view_table_source
self.is_local = is_local
self.fields = {} # name -> Field
......@@ -93,7 +97,6 @@ class Model(object, metaclass=ModelBase):
description = None
sample = False
tags = None
managed = True
def __init__(self, **kwargs):
self.values = kwargs
......@@ -148,7 +151,7 @@ class Model(object, metaclass=ModelBase):
:return: SQL code with field creation statements
"""
return ",\n".join(
"%s %s" % (cls.quote_name(name), db_type) for name, db_type in cls.iter_create_sql()
f"{cls.quote_name(name)} {db_type}" for name, db_type in cls.iter_create_sql()
)
@classmethod
......@@ -232,18 +235,18 @@ class Model(object, metaclass=ModelBase):
c_type = cls._meta.fields[name].get_db_type(nested_name)
if existing[field_name] != c_type:
print(
"Warning! Type mismatch for column %s: %s <> %s"
% (field_name, existing[field_name], c_type)
f"[{table_name}|{field_name}] Warning! Type mismatch: "
f"{existing[field_name]} <> {c_type}"
)
print(
"Set command manually: ALTER TABLE %s MODIFY COLUMN %s %s"
% (table_name, field_name, c_type)
f"Set command manually: "
f"ALTER TABLE {table_name} MODIFY COLUMN {field_name} {c_type}"
)
else:
print(f"[{table_name}|{field_name}] Alter column")
query = (
f"ALTER TABLE {table_name} ADD COLUMN {cls.quote_name(field_name)} {db_type}"
f"ALTER TABLE `{table_name}` ADD COLUMN {cls.quote_name(field_name)} {db_type}"
)
if after:
# None if add before first field
......@@ -256,7 +259,11 @@ class Model(object, metaclass=ModelBase):
return c
@classmethod
def ensure_schema(cls, connect: "ClickhouseClient", table_name: str) -> bool:
def ensure_schema(cls, connect=None) -> bool:
return False
@classmethod
def check_old_schema(cls, connect: "ClickhouseClient", table_name: str) -> bool:
"""
Ensure create table Syntax. False for old Syntax, True for New
:param connect:
......@@ -277,13 +284,13 @@ class Model(object, metaclass=ModelBase):
return not bool(r)
@classmethod
def ensure_table(cls, connect=None):
def ensure_table(cls, connect=None) -> bool:
"""
Check table is exists
:param connect:
:return: True, if table has been altered, False otherwise
"""
if not cls._meta.managed:
if not cls._meta.run_migrate:
return False
changed = False
ch: "ClickhouseClient" = connect or connection()
......@@ -299,7 +306,7 @@ class Model(object, metaclass=ModelBase):
ch.rename_table(table, raw_table)
changed = True
# Old schema
if ch.has_table(raw_table) and not cls.ensure_schema(ch, raw_table):
if ch.has_table(raw_table) and not cls.check_old_schema(ch, raw_table):
# Old schema, data table will be move to old_noc db for save data.
print(f"[{table}] Old Schema Move Data to {OLD_PM_SCHEMA_TABLE}.{raw_table}")
ch.ensure_db(OLD_PM_SCHEMA_TABLE)
......@@ -322,13 +329,23 @@ class Model(object, metaclass=ModelBase):
else:
ch.execute(post=cls.cget_create_distributed_sql())
changed = True
# Synchronize view
if changed or not ch.has_table(table, is_view=True):
if not ch.has_table(table, is_view=True):
print(f"[{table}] Synchronize view")
ch.execute(post=cls.get_create_view_sql())
changed = True
return changed
@classmethod
def ensure_views(cls, connect=None, changed: bool = True) -> bool:
    """
    Recreate the SELECT view over the model's table when the schema
    changed or the view does not exist yet.

    :param connect: Optional ClickhouseClient; a fresh connection is
        opened when omitted
    :param changed: Force recreation of the view when True
    :return: True if the view was (re)created, False otherwise
    """
    ch: "ClickhouseClient" = connect or connection()
    table = cls._get_db_table()
    if not changed and ch.has_table(table, is_view=True):
        # Schema untouched and the view is already present
        return False
    print(f"[{table}] Synchronize view")
    ch.execute(post=cls.get_create_view_sql())
    return True
@classmethod
def transform_query(cls, query, user):
"""
......@@ -492,7 +509,7 @@ class DictionaryBase(ModelBase):
primary_key=getattr(cls.Meta, "primary_key", ("bi_id",)),
incremental_update=getattr(cls.Meta, "incremental_update", False),
description=getattr(cls.Meta, "description", None),
managed=getattr(cls.Meta, "managed", True),
run_migrate=getattr(cls.Meta, "run_migrate", True),
is_local=getattr(cls.Meta, "is_local", True),
view_table_source=getattr(cls.Meta, "view_table_source", None),
# For Dictionary table
......@@ -521,7 +538,7 @@ class DictionaryMeta(object):
incremental_update=False,
db_table=None,
description=None,
managed=True,
run_migrate=True,
is_local=False, # Local table, do not create distributed
view_table_source=None,
layout=None,
......@@ -536,7 +553,7 @@ class DictionaryMeta(object):
self.db_table = db_table or name
self.description = description
self.view_table_source = view_table_source
self.managed = managed
self.run_migrate = run_migrate
self.is_local = is_local
self.layout = layout
self.lifetime_min = lifetime_min
......@@ -741,3 +758,98 @@ class DictionaryModel(Model, metaclass=DictionaryBase):
@classmethod
def extract(cls, item):
raise NotImplementedError
class ViewModel(Model, metaclass=ModelBase):
    """
    Model exposed through a plain SQL view.

    For models with an AggregatingMergeTree engine the raw table stores
    aggregation states: rows are fed in through a materialized trigger
    view (`mv_<raw_table>`, built from `Meta.view_table_source` with
    `-State` combinators) and read back through a SELECT view that
    finalizes the states with `-Merge` combinators.
    """

    @classmethod
    def is_aggregate(cls) -> bool:
        # True when the storage engine keeps aggregation states
        return isinstance(cls._meta.engine, AggregatingMergeTree)

    @classmethod
    def ensure_schema(cls, connect=None) -> bool:
        """
        Migrate a legacy implicit `.inner.<table>` materialized-view
        storage table to an explicit table usable with the TO clause.

        :param connect: Optional ClickhouseClient
        :return: True if a migration was performed
        """
        if not cls.is_aggregate():
            return False
        ch = connect or connection()
        old_table_name = f".inner.{cls._get_raw_db_table()}"
        if not ch.has_table(old_table_name):
            return False
        print(f"[{old_table_name}] Migrate old materialized view. For use TO statement")
        # Drop view
        ch.execute(post=f"DROP VIEW IF EXISTS {cls._get_db_table()}")
        cls.drop_view(connect=ch)
        # Rename table
        ch.rename_table(old_table_name, cls._get_raw_db_table())
        return True

    @classmethod
    def get_create_view_sql(cls):
        """
        Build the CREATE OR REPLACE VIEW statement finalizing aggregate
        states (`-Merge`) and grouping by all non-aggregated fields.
        """
        r = []
        group_by = []
        for field in cls._meta.ordered_fields:
            if isinstance(field, AggregatedField):
                r += [f"{field.get_expression(combinator='Merge')} AS {cls.quote_name(field.name)}"]
            else:
                r += [f"{cls.quote_name(field.name)} "]
                group_by += [cls.quote_name(field.name)]
        r = [",\n".join(r)]
        if config.clickhouse.cluster:
            r += [f"FROM {cls._get_distributed_db_table()} "]
        else:
            r += [f"FROM {cls._get_raw_db_table()} "]
        r += [f'GROUP BY {",".join(group_by)} ']
        return f'CREATE OR REPLACE VIEW {cls._get_db_table()} AS SELECT {" ".join(r)}'

    @classmethod
    def get_create_select_sql(cls):
        """
        Build the SELECT body for the trigger view: aggregated fields
        become `-State` expressions, MATERIALIZED fields are skipped
        (the target table computes them itself).
        """
        r = []
        group_by = []
        for field in cls._meta.ordered_fields:
            if isinstance(field, MaterializedField):
                continue
            elif isinstance(field, AggregatedField):
                r += [f"{field.get_expression(combinator='State')} AS {cls.quote_name(field.name)}"]
            else:
                r += [f"{cls.quote_name(field.name)} "]
                group_by += [cls.quote_name(field.name)]
        r = [",\n".join(r)]
        r += [f"FROM {cls.Meta.view_table_source} "]
        r += [f'GROUP BY {",".join(group_by)} ']
        return "\n".join(r)

    @classmethod
    def detach_view(cls, connect=None):
        """Detach the raw view; returns True unconditionally."""
        ch = connect or connection()
        ch.execute(post=f" DETACH VIEW IF EXISTS {cls._get_raw_db_table()}")
        return True

    @classmethod
    def drop_view(cls, connect=None):
        """Drop the raw view; returns True unconditionally."""
        ch = connect or connection()
        ch.execute(post=f" DROP VIEW IF EXISTS {cls._get_raw_db_table()}")
        return True

    @classmethod
    def ensure_mv_trigger_view(cls, connect=None):
        """
        Drop the materialized trigger view `mv_<raw_table>` and return
        the SQL statement recreating it.

        :param connect: Optional ClickhouseClient (was previously
            required and crashed when the caller passed nothing)
        :return: CREATE MATERIALIZED VIEW ... TO ... statement
        """
        ch = connect or connection()
        mv_name = f"mv_{cls._get_raw_db_table()}"
        ch.execute(post=f" DROP VIEW IF EXISTS {mv_name}")
        return "\n".join(
            [
                # Trailing spaces keep tokens separated when the parts
                # are concatenated; original was missing the separator
                # before TO, yielding malformed SQL `mv_xTO table`.
                f"CREATE MATERIALIZED VIEW IF NOT EXISTS {mv_name} "
                f"TO {cls._get_raw_db_table()} "
                f"AS SELECT {cls.get_create_select_sql()}",
            ]
        )

    @classmethod
    def ensure_views(cls, connect=None, changed: bool = True) -> bool:
        """
        Synchronize the materialized trigger view and the SELECT view.

        :param connect: Optional ClickhouseClient
        :param changed: Force recreation when True
        :return: True if the views were (re)created
        """
        # Synchronize view
        ch: "ClickhouseClient" = connect or connection()
        table = cls._get_db_table()
        if changed or not ch.has_table(table, is_view=True):
            print(f"[{table}] Synchronize view")
            # Recreate the trigger view feeding the raw table, then the
            # finalizing SELECT view. The original discarded the CREATE
            # statement returned by ensure_mv_trigger_view and passed no
            # connection, so the trigger view was never created.
            ch.execute(post=cls.ensure_mv_trigger_view(connect=ch))
            ch.execute(post=cls.get_create_view_sql())
            return True
        return False
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment