Commit 299c7532 authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Add migrate materialized view schema.

parent 046dfae7
......@@ -97,7 +97,7 @@ class ClickhouseClient(object):
:param to_table:
:return:
"""
self.execute(post=f"RENAME TABLE {from_table} TO {to_table};")
self.execute(post=f"RENAME TABLE `{from_table}` TO `{to_table}`;")
def connection(host=None, port=None, read_only=True):
......
......@@ -25,6 +25,7 @@ def ensure_bi_models(connect=None):
if not model:
continue
logger.info("Ensure table %s" % model._meta.db_table)
changed |= model.ensure_schema(connect=connect)
changed |= model.ensure_table(connect=connect)
changed |= model.ensure_views(connect=connect, changed=changed)
return changed
......
......@@ -260,7 +260,11 @@ class Model(object, metaclass=ModelBase):
return c
@classmethod
def ensure_schema(cls, connect: "ClickhouseClient", table_name: str) -> bool:
def ensure_schema(cls, connect=None) -> bool:
return False
@classmethod
def check_old_schema(cls, connect: "ClickhouseClient", table_name: str) -> bool:
"""
Ensure create table Syntax. False for old Syntax, True for New
:param connect:
......@@ -303,7 +307,7 @@ class Model(object, metaclass=ModelBase):
ch.rename_table(table, raw_table)
changed = True
# Old schema
if ch.has_table(raw_table) and not cls.ensure_schema(ch, raw_table):
if ch.has_table(raw_table) and not cls.check_old_schema(ch, raw_table):
# Old schema, data table will be move to old_noc db for save data.
print(f"[{table}] Old Schema Move Data to {OLD_PM_SCHEMA_TABLE}.{raw_table}")
ch.ensure_db(OLD_PM_SCHEMA_TABLE)
......@@ -764,6 +768,22 @@ class ViewModel(Model, metaclass=ModelBase):
def is_aggregate(cls) -> bool:
return isinstance(cls._meta.engine, AggregatingMergeTree)
@classmethod
def ensure_schema(cls, connect=None) -> bool:
if not cls.is_aggregate():
return False
ch = connect or connection()
old_table_name = f".inner.{cls._get_raw_db_table()}"
if not ch.has_table(old_table_name):
return False
print(f"[{old_table_name}] Migrate old materialized view. For use TO statement")
# Drop view
ch.execute(post=f"DROP VIEW IF EXISTS {cls._get_db_table()}")
cls.drop_view(connect=ch)
# Rename table
ch.rename_table(old_table_name, cls._get_raw_db_table())
return True
@classmethod
def get_create_view_sql(cls):
r = []
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment