Commit 3d8b04fb authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Add ensure_labels to ETL loader.

parent 1ae1b6cb
Pipeline #36361 passed with stages
in 17 minutes and 37 seconds
......@@ -82,6 +82,8 @@ class BaseLoader(object):
workflow_event_model = False
workflow_add_event = "seen"
workflow_delete_event = "missed"
# Label
label_enable_setting = None
# Incremental
checkpoint_field = "checkpoint"
......@@ -100,6 +102,7 @@ class BaseLoader(object):
self.mappings_path = os.path.join(self.import_dir, "mappings.csv")
self.mappings = {}
self.wf_state_mappings = {}
self.ensured_labels = set()
self.new_state_path = None
self.c_add = 0
self.c_change = 0
......@@ -138,6 +141,8 @@ class BaseLoader(object):
self.has_remote_system: bool = hasattr(self.model, "remote_system")
if self.workflow_state_sync:
self.load_wf_state_mappings()
if self.label_enable_setting:
self.load_ensured_labels()
@property
def is_document(self):
......@@ -171,6 +176,15 @@ class BaseLoader(object):
for ws in State.objects.filter():
self.wf_state_mappings[(str(ws.workflow.id), ws.name)] = ws
def load_ensured_labels(self):
from noc.main.models.label import Label
self.logger.info("Loading Labels: %s", self.label_enable_setting)
for ll in Label.objects.filter(**{self.label_enable_setting: True}):
if ll.is_wildcard or ll.is_matched:
continue
self.ensured_labels.add(ll.name)
def get_new_state(self) -> Optional[TextIOWrapper]:
"""
Returns file object of new state, or None when not present
......@@ -242,7 +256,9 @@ class BaseLoader(object):
else:
if n.id == o.id:
# Changed
if n.dict(include=include_fields, exclude={self.checkpoint_field}) != o.dict(include=include_fields, exclude={self.checkpoint_field}):
if n.dict(include=include_fields, exclude={self.checkpoint_field}) != o.dict(
include=include_fields, exclude={self.checkpoint_field}
):
yield o, n
n = next(new, None)
o = next(old, None)
......@@ -349,6 +365,8 @@ class BaseLoader(object):
data structures
"""
self.logger.debug("Create object")
if "labels" in v:
self.ensure_labels(v["labels"])
o = self.model(**v)
try:
o.save()
......@@ -382,6 +400,8 @@ class BaseLoader(object):
for k, nv in v.items():
if k == self.checkpoint_field:
continue
if k == "labels":
self.ensure_labels(nv)
if inc_changes and k in inc_changes:
ov = getattr(o, k, [])
nv = list(set(ov).union(set(inc_changes[k]["add"])) - set(inc_changes[k]["remove"]))
......@@ -468,6 +488,12 @@ class BaseLoader(object):
self.logger.debug("Change workflow state: %s -> %s", o.state, state)
o.set_state(state, changed_date)
def ensure_labels(self, labels: List[str]):
from noc.main.models.label import Label
for ll in set(labels) - self.ensured_labels:
Label.ensure_label(**{"name": ll, self.label_enable_setting: True})
def purge(self):
"""
Perform pending deletes
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment