Commit 4f7b0b89 authored by Andrey Vertiprahov

Merge branch 'noc-ietl' into 'master'

noc/noc#1780 Incremental ETL

See merge request noc/noc!6264
parents 1badded9 94c19cbc
# ----------------------------------------------------------------------
# Extract/Transfer/Load commands
# ----------------------------------------------------------------------
-# Copyright (C) 2007-2020 The NOC Project
+# Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
@@ -65,10 +65,13 @@ class Command(BaseCommand):
         db_diff.add_argument("system", help="Remote system name")
         # extract command
         extract_parser = subparsers.add_parser("extract")
-        extract_parser.add_argument("system", help="Remote system name")
         extract_parser.add_argument(
-            "quiet", action="store_true", default=True, help="Remote system name"
+            "--quiet", action="store_true", default=True, help="Remote system name"
         )
+        extract_parser.add_argument(
+            "--incremental", action="store_true", default=False, help="Incremental extracting"
+        )
+        extract_parser.add_argument("system", help="Remote system name")
         extract_parser.add_argument(
             "extractors", nargs=argparse.REMAINDER, help="List of extractor names"
         )
@@ -105,7 +108,11 @@ class Command(BaseCommand):
         remote_system = RemoteSystem.get_by_name(options["system"])
         if not remote_system:
             self.die("Invalid remote system: %s" % options["system"])
-        remote_system.extract(options.get("extractors", []), quiet=options.get("quiet", False))
+        remote_system.extract(
+            options.get("extractors", []),
+            quiet=options.get("quiet", False),
+            incremental=options.get("incremental", False),
+        )
         if not remote_system.extract_error:
             return 0
         return 1
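As a sanity check, here is a minimal standalone sketch of how the reworked `extract` subcommand parses after this change. The system and extractor names are invented for illustration:

```python
# Standalone reproduction of the parser wiring from the hunks above.
import argparse

parser = argparse.ArgumentParser(prog="noc etl")
subparsers = parser.add_subparsers(dest="cmd")
extract = subparsers.add_parser("extract")
extract.add_argument("--quiet", action="store_true", default=True)
extract.add_argument("--incremental", action="store_true", default=False)
extract.add_argument("system")
extract.add_argument("extractors", nargs=argparse.REMAINDER)

# "ZABBIX" and "managedobject" are hypothetical names.
args = parser.parse_args(["extract", "--incremental", "ZABBIX", "managedobject"])
assert args.incremental is True
assert args.system == "ZABBIX" and args.extractors == ["managedobject"]
```

Note that `--quiet` keeps `default=True`, so with `action="store_true"` the option is effectively always on, and `options.get("quiet", False)` in the handler will always see `True`.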
......
# ----------------------------------------------------------------------
# Data Extractor
# ----------------------------------------------------------------------
-# Copyright (C) 2007-2020 The NOC Project
+# Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
@@ -13,9 +13,10 @@ import itertools
 import io
 from time import perf_counter
 import contextlib
-from typing import Any, List, Iterable, Type, Union, Tuple
+from typing import Any, List, Iterable, Type, Union, Tuple, Set, Optional
 import dataclasses
 import operator
+import re

 # NOC modules
 from noc.core.log import PrefixLoggerAdapter
@@ -36,13 +37,18 @@ class Problem(object):
     row: List[Any]


+@dataclasses.dataclass
+class RemovedItem(object):
+    id: str
+
+
 class BaseExtractor(object):
     """
     Data extractor interface. Subclasses must provide
     *iter_data* method
     """

-    name = None
+    name: str
     PREFIX = config.path.etl_import
     REPORT_INTERVAL = 1000
     # Type of model
@@ -52,6 +58,10 @@ class BaseExtractor(object):
     # Suppress deduplication message
     suppress_deduplication_log: bool = False

+    rx_archive = re.compile(
+        r"^import-\d{4}(?:-\d{2}){5}.jsonl%s$" % compressor.ext.replace(".", r"\.")
+    )
+
     def __init__(self, system):
         self.system = system
         self.config = system.config
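The `rx_archive` pattern anchors archive names to an `import-YYYY-MM-DD-HH-MM-SS` timestamp plus the compressor's suffix. A self-contained check of the pattern, assuming a `.zst` suffix in place of `compressor.ext`:

```python
import re

ext = ".zst"  # assumption; the real suffix comes from compressor.ext
rx_archive = re.compile(r"^import-\d{4}(?:-\d{2}){5}.jsonl%s$" % ext.replace(".", r"\."))

assert rx_archive.match("import-2022-03-02-09-30-00.jsonl.zst")  # five -NN groups
assert not rx_archive.match("import.jsonl.zst")  # current state file, not an archive
assert not rx_archive.match("import-2022-03-02-09-30-00.jsonl")  # wrong suffix
```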
@@ -82,7 +92,7 @@ class BaseExtractor(object):
             self.logger.info("Creating directory %s", self.import_dir)
             os.makedirs(self.import_dir)

-    def get_new_state(self) -> io.TextIOWrapper:
+    def get_new_state(self) -> io.TextIOBase:
         self.ensure_import_dir()
         path = compressor.get_path(os.path.join(self.import_dir, "import.jsonl"))
         self.logger.info("Writing to %s", path)
@@ -104,7 +114,7 @@ class BaseExtractor(object):
         finally:
             f.close()

-    def get_problem_file(self) -> io.TextIOWrapper:
+    def get_problem_file(self) -> io.TextIOBase:
         self.ensure_import_dir()
         path = compressor.get_path(os.path.join(self.import_dir, "import.csv.rej"))
         self.logger.info("Writing to %s", path)
@@ -126,38 +136,172 @@ class BaseExtractor(object):
         finally:
             f.close()

-    def iter_data(self) -> Iterable[Union[BaseModel, Tuple[Any, ...]]]:
+    def iter_data(
+        self, *, checkpoint: Optional[str] = None, **kwargs
+    ) -> Iterable[Union[BaseModel, RemovedItem, Tuple[Any, ...]]]:
+        """
+        Iterator to extract data.
+
+        Args:
+            checkpoint: Incremental extraction from checkpoint, if set.
+            kwargs: Parameters for future use. Unknown parameters
+                must be ignored.
+
+        Returns:
+            Iterable of the extracted items.
+        """
         yield from self.data

-    def filter(self, row):
+    def filter(self, row) -> bool:
         return True

     def clean(self, row):
         return row

-    def extract(self):
-        def q(s):
+    def read_current_state(self) -> Optional[List[BaseModel]]:
+        """
+        Read current state.
+
+        Returns:
+            List of items or None
+        """
+        # Check if import_dir exists
+        if not os.path.isdir(self.import_dir):
+            return None  # No state
+        # Check if the archive dir exists
+        archive_dir = os.path.join(self.import_dir, "archive")
+        if not os.path.isdir(archive_dir):
+            return None  # No archive
+        # Read the list of archive files
+        fn = list(
+            sorted((f for f in os.listdir(archive_dir) if self.rx_archive.match(f)), reverse=True)
+        )
+        if not fn:
+            return None  # No files
+        # Read the newest file
+        path = os.path.join(self.import_dir, "archive", fn[0])
+        self.logger.info("Reading current state from %s", path)
+        data = []
+        with compressor(path, "r") as f:
+            for line in f:
+                line = line.strip()
+                if not line:
+                    continue
+                data.append(self.model.parse_raw(line))
+        return data
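`read_current_state()` picks the newest archive purely by name: zero-padded timestamps sort lexicographically in chronological order, so `reverse=True` puts the latest first without any mtime checks. With invented file names:

```python
# Lexicographic order of zero-padded timestamps equals chronological order.
files = [
    "import-2022-01-01-00-00-00.jsonl.zst",
    "import-2022-03-02-09-30-00.jsonl.zst",
    "import-2021-12-31-23-59-59.jsonl.zst",
]
assert sorted(files, reverse=True)[0] == "import-2022-03-02-09-30-00.jsonl.zst"
```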
+    @staticmethod
+    def get_checkpoint(data: List[BaseModel]) -> Optional[str]:
+        """
+        Get latest checkpoint from the state.
+
+        Args:
+            data: List of last state's records.
+
+        Returns:
+            Latest checkpoint, if any. None otherwise.
+        """
+        cp = None
+        for item in data:
+            if item.checkpoint and (not cp or item.checkpoint > cp):
+                cp = item.checkpoint
+        return cp
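Checkpoints are compared as plain strings, so any sortable format works; zero-padded ISO-8601 timestamps are one natural choice, but the format is up to the extractor. A small illustration with hypothetical records:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Rec:  # stand-in for a BaseModel row; fields are illustrative
    id: str
    checkpoint: Optional[str] = None

state = [
    Rec(id="1", checkpoint="2022-03-01T10:00:00"),
    Rec(id="2"),  # records without a checkpoint are skipped
    Rec(id="3", checkpoint="2022-03-02T09:30:00"),
]
# The same scan as get_checkpoint(): keep the greatest non-empty value.
cp = None
for item in state:
    if item.checkpoint and (not cp or item.checkpoint > cp):
        cp = item.checkpoint
assert cp == "2022-03-02T09:30:00"
```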
+    def iter_merge_data(
+        self, current: Optional[List[BaseModel]], delta: Optional[List[BaseModel]]
+    ) -> Iterable[BaseModel]:
+        """
+        Merge current state with delta.
+
+        Args:
+            current: Current state.
+            delta: Incremental changes.
+
+        Returns:
+            Resulting list
+        """
+        if not delta:
+            return  # No changes
+        if not current:
+            yield from delta  # No current state
+            return
+        iter_c = iter(sorted(current, key=operator.attrgetter("id")))
+        iter_d = iter(sorted(delta, key=operator.attrgetter("id")))
+        c = next(iter_c, None)
+        d = next(iter_d, None)
+        while c or d:
+            if c and not d:
+                # Delta is over, stream current
+                yield c  # Already fetched
+                yield from iter_c  # Left
+                return
+            if not c and d:
+                # Current is over, stream delta
+                yield d  # Already fetched
+                yield from iter_d  # Left
+                return
+            if c.id < d.id:
+                # Less than next delta, yield current
+                yield c
+                c = next(iter_c, None)
+            elif c.id == d.id:
+                # Exact match, stream delta
+                yield d
+                c = next(iter_c, None)
+                d = next(iter_d, None)
+            else:
+                # Delta less than current, yield delta
+                yield d
+                d = next(iter_d, None)
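The merge is a classic two-pointer pass over two id-sorted streams: ids unique to either side pass through, and on a collision the delta record supersedes the current one. A condensed, runnable re-implementation over plain dicts (not the method itself) that shows the expected outcome:

```python
import operator

def merge(current, delta, key=operator.itemgetter("id")):
    """Two-pointer merge of id-sorted streams; delta wins on equal ids."""
    iter_c, iter_d = iter(sorted(current, key=key)), iter(sorted(delta, key=key))
    c, d = next(iter_c, None), next(iter_d, None)
    while c is not None or d is not None:
        if d is None or (c is not None and key(c) < key(d)):
            yield c
            c = next(iter_c, None)
        elif c is not None and key(c) == key(d):
            yield d  # exact match: the delta record replaces the current one
            c, d = next(iter_c, None), next(iter_d, None)
        else:
            yield d
            d = next(iter_d, None)

current = [{"id": "1"}, {"id": "2", "v": "old"}, {"id": "4"}]
delta = [{"id": "2", "v": "new"}, {"id": "3"}]
merged = list(merge(current, delta))
assert [r["id"] for r in merged] == ["1", "2", "3", "4"]
assert merged[1]["v"] == "new"
```

One caveat: the method's loop condition is `while c or d`, which would stop early on a falsy record; pydantic models are always truthy, so this is harmless in practice.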
+    def extract(self, incremental: bool = False) -> None:
+        def q(s: Any) -> str:
             if s == "" or s is None:
                 return ""
-            elif isinstance(s, str):
-                return smart_text(s)
-            else:
-                return str(s)

-        def get_model(raw) -> BaseModel:
+        def get_model(raw: Union[BaseModel, Tuple[Any, ...]]) -> BaseModel:
+            if isinstance(s, str):
+                return s
+            return str(s)

             if isinstance(raw, BaseModel):
                 return raw
             return self.model.from_iter(q(x) for x in row)

         # Fetch data
-        self.logger.info("Extracting %s from %s", self.name, self.system.name)
+        self.logger.info(
+            "Extracting %s from %s (%s)",
+            self.name,
+            self.system.name,
+            "Incremental" if incremental else "Full",
+        )
+        # Prepare iterator
+        current: Optional[List[BaseModel]] = None
+        checkpoint: Optional[str] = None
+        if incremental:
+            # Incremental extract
+            current = self.read_current_state()
+            if current:
+                # Has state
+                checkpoint = self.get_checkpoint(current)
+                if checkpoint:
+                    self.logger.info("Resuming from checkpoint %s", checkpoint)
+                else:
+                    self.logger.info("Checkpoint not found. Falling back to full extract")
+            else:
+                # No current state
+                self.logger.info("No current state. Falling back to full extract")
+        # Extract
         t0 = perf_counter()
         data: List[BaseModel] = []
         n = 0
-        seen = set()
-        for row in self.iter_data():
+        seen: Set[str] = set()
+        removed: Set[str] = set()
+        for row in self.iter_data(checkpoint=checkpoint):
             if not self.filter(row):
                 continue
+            if isinstance(row, RemovedItem):
+                removed.add(row.id)
+                continue
             row = self.clean(row)
             # Do not use get_model(self.clean(row)), to zip_longest broken row
             row = get_model(row)
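Deletions are invisible to a pure incremental pull, so extractors report them explicitly: an incremental `iter_data()` yields `RemovedItem(id=...)` for rows deleted since the checkpoint, and the loop above collects those ids into `removed` for pruning before the merge. A self-contained sketch of such an implementation over an invented change feed:

```python
from dataclasses import dataclass

@dataclass
class RemovedItem:  # mirrors the dataclass added in this MR
    id: str

# A change feed a remote system might return for "changes since checkpoint";
# field names and values are invented.
changes = [
    {"id": "10", "deleted": False, "name": "sw-01"},
    {"id": "11", "deleted": True},
]

def iter_data(checkpoint=None, **kwargs):
    for row in changes:
        if row.get("deleted"):
            yield RemovedItem(id=row["id"])  # prune this id from the current state
        else:
            yield row  # a real extractor would yield a model instance

items = list(iter_data(checkpoint="2022-03-01T00:00:00"))
assert isinstance(items[1], RemovedItem) and items[1].id == "11"
```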
@@ -173,12 +317,20 @@ class BaseExtractor(object):
         dt = perf_counter() - t0
         speed = n / dt
         self.logger.info("%d records extracted in %.2fs (%d records/s)", n, dt, speed)
+        # Merge incremental data
+        if incremental:
+            # Prune removed items
+            if removed and current:
+                current = [x for x in current if x.id not in removed]
+            # Merge
+            data = list(self.iter_merge_data(current, data))
+        # Write
         with self.with_new_state() as f:
             for n, item in enumerate(sorted(data, key=operator.attrgetter("id"))):
                 if n:
                     f.write("\n")
                 f.write(item.json(exclude_defaults=True, exclude_unset=True))
         # Report fatal problems
         if self.fatal_problems or self.quality_problems:
             self.logger.warning(
                 "Detect problems on extracting, fatal: %d, quality: %d",
@@ -82,6 +82,8 @@ class BaseLoader(object):
     workflow_event_model = False
     workflow_add_event = "seen"
     workflow_delete_event = "missed"
+    # Incremental
+    checkpoint_field = "checkpoint"

     REPORT_INTERVAL = 1000
@@ -240,7 +242,7 @@ class BaseLoader(object):
             else:
                 if n.id == o.id:
                     # Changed
-                    if n.dict(include=include_fields) != o.dict(include=include_fields):
+                    if n.dict(include=include_fields, exclude={self.checkpoint_field}) != o.dict(include=include_fields, exclude={self.checkpoint_field}):
                         yield o, n
                     n = next(new, None)
                     o = next(old, None)
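Excluding `checkpoint_field` from both sides keeps the loader from flagging records as changed when only the checkpoint moved between extractions, which would otherwise dirty every row of an incremental run. A pydantic v1 illustration with a trimmed stand-in model:

```python
from typing import Optional
from pydantic import BaseModel  # v1 API, matching the .dict()/.parse_raw() calls in the diff

class Rec(BaseModel):
    id: str
    name: str
    checkpoint: Optional[str] = None

old = Rec(id="1", name="sw-01", checkpoint="2022-03-01T00:00:00")
new = Rec(id="1", name="sw-01", checkpoint="2022-03-02T00:00:00")
# Same payload, different checkpoint: not a change as far as the loader cares.
assert old.dict(exclude={"checkpoint"}) == new.dict(exclude={"checkpoint"})
assert old.dict() != new.dict()
```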
@@ -378,6 +380,8 @@ class BaseLoader(object):
             self.logger.error("Cannot change %s:%s: Does not exists", self.name, object_id)
             return None
         for k, nv in v.items():
+            if k == self.checkpoint_field:
+                continue
             if inc_changes and k in inc_changes:
                 ov = getattr(o, k, [])
                 nv = list(set(ov).union(set(inc_changes[k]["add"])) - set(inc_changes[k]["remove"]))
@@ -393,6 +397,8 @@ class BaseLoader(object):
         v = self.clean(item)
         if "id" in v:
             del v["id"]
+        if self.checkpoint_field in v:
+            del v[self.checkpoint_field]
         for fn in set(v).intersection(self.workflow_fields):
             del v[fn]
         o = self.find_object(v)
......
# ----------------------------------------------------------------------
# ManagedObjectModel
# ----------------------------------------------------------------------
-# Copyright (C) 2007-2020 The NOC Project
+# Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
@@ -63,6 +63,7 @@ class ManagedObject(BaseModel):
     tt_queue: Optional[str]
     tt_system_id: Optional[str]
     project: Optional[Reference["Project"]]
+    checkpoint: Optional[str]

     @validator("address")
     def address_must_ipaddress(cls, v):  # pylint: disable=no-self-argument
......
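Because the new `checkpoint` field is optional and the extractor writes state with `exclude_defaults=True, exclude_unset=True`, full extracts that never set it add nothing to the JSONL lines; only incremental records carry the value. A quick check against a trimmed stand-in model (pydantic v1):

```python
from typing import Optional
from pydantic import BaseModel

class ManagedObject(BaseModel):  # trimmed stand-in, not the real model
    id: str
    name: str
    checkpoint: Optional[str] = None

mo = ManagedObject(id="1", name="sw-core-01")
# Unset optional fields are dropped from the serialized state line.
assert "checkpoint" not in mo.json(exclude_defaults=True, exclude_unset=True)
```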
@@ -55,7 +55,7 @@ class BaseRemoteSystem(object):
             chain.get_loader(ld)
         return chain

-    def extract(self, extractors=None):
+    def extract(self, extractors=None, incremental: bool = False):
         extractors = extractors or []
         for en in reversed(self.extractors_order):
             if extractors and en not in extractors:
@@ -66,7 +66,7 @@ class BaseRemoteSystem(object):
                 continue
             # @todo: Config
             xc = self.extractors[en](self)
-            xc.extract()
+            xc.extract(incremental=incremental)

     def load(self, loaders=None):
         loaders = loaders or []
......
@@ -160,11 +160,11 @@ class RemoteSystem(Document):
                 extractors += [k[7:]]
         return extractors

-    def extract(self, extractors=None, quiet=False):
+    def extract(self, extractors=None, quiet=False, incremental=False):
         extractors = extractors or self.get_extractors()
         error = None
         try:
-            self.get_handler().extract(extractors)
+            self.get_handler().extract(extractors, incremental=incremental)
         except Exception as e:
             if not quiet:
                 raise e
......
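Putting the pieces together: with `quiet=True` extraction failures are recorded instead of raised, and the command maps `extract_error` to the process exit code. A condensed, self-contained model of that contract (not the real classes):

```python
class FakeRemoteSystem:  # hypothetical stand-in for RemoteSystem
    def __init__(self):
        self.extract_error = None

    def extract(self, extractors=None, quiet=False, incremental=False):
        try:
            raise RuntimeError("remote API unreachable")  # simulated failure
        except Exception:
            if not quiet:
                raise
            self.extract_error = True  # swallowed, but remembered

rs = FakeRemoteSystem()
rs.extract(quiet=True, incremental=True)
exit_code = 1 if rs.extract_error else 0  # mirrors the command's "return 0 / return 1"
assert exit_code == 1
```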