Commit 0d74ff8e authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Merge branch 'fix-avs-etl-extractor-incremental' into 'master'

Fix ETL extractors for incremental methods.

See merge request noc/noc!6281
parents 5e90c868 61a8584e
......@@ -254,7 +254,7 @@ class BaseExtractor(object):
yield d
d = next(iter_d, None)
def extract(self, incremental: bool = False) -> None:
def extract(self, incremental: bool = False, **kwargs) -> None:
def q(s: Any) -> str:
if s == "" or s is None:
return ""
......
......@@ -39,7 +39,7 @@ class FiasRemoteSystem(BaseRemoteSystem):
*FIAS_REGION* - region code FIAS
"""
def extract(self, extractors=None):
def extract(self, extractors=None, incremental: bool = False):
extractors = extractors or []
for en in self.extractors_order:
if extractors and en not in extractors:
......@@ -108,9 +108,8 @@ class AdmDivExtractor(BaseExtractor):
for chunk in r.iter_content(1024):
f.write(chunk)
def extract(self):
super(AdmDivExtractor, self).extract()
return
def extract(self, incremental: bool = False, **kwargs) -> None:
super().extract(incremental=incremental)
def check_twice_code(self, ter, kod1, kod2, kod3):
"""
......@@ -153,7 +152,7 @@ class AdmDivExtractor(BaseExtractor):
else:
return ""
def iter_data(self):
def iter_data(self, checkpoint=None, **kwargs):
self.download()
with open(self.csv_path, encoding="cp1251") as f:
reader = csv.reader(f, delimiter=";", quotechar='"')
......@@ -220,9 +219,8 @@ class StreetExtractor(BaseExtractor):
else:
raise Exception("zipfile not found!")
def extract(self):
super().extract()
return
def extract(self, incremental: bool = False, **kwargs) -> None:
super().extract(incremental=incremental)
def parent_admdiv_data(self):
parent = set()
......@@ -288,7 +286,7 @@ class StreetExtractor(BaseExtractor):
return r
return None
def iter_data(self):
def iter_data(self, checkpoint=None, **kwargs):
self.download()
cities, streets = self.get_tables()
for r in streets:
......@@ -345,9 +343,8 @@ class AddressExtractor(BaseExtractor):
else:
raise Exception("zipfile not found!")
def extract(self):
super().extract()
return
def extract(self, incremental: bool = False, **kwargs) -> None:
super().extract(incremental=incremental)
def street_data(self):
street = set()
......@@ -382,7 +379,7 @@ class AddressExtractor(BaseExtractor):
else:
return None, None
def iter_data(self):
def iter_data(self, checkpoint=None, **kwargs):
self.download()
with dbf.Table(filename=self.dbf_path, codepage="cp866") as table:
for r in table:
......@@ -456,9 +453,8 @@ class BuildingExtractor(BaseExtractor):
else:
raise Exception("zipfile not found!")
def extract(self):
super().extract()
return
def extract(self, incremental: bool = False, **kwargs) -> None:
super().extract(incremental=incremental)
def get_oktmo_data(self):
oktmo_data = {}
......@@ -470,7 +466,7 @@ class BuildingExtractor(BaseExtractor):
return oktmo_data
def iter_data(self):
def iter_data(self, checkpoint=None, **kwargs):
self.download()
oktmo_data = self.get_oktmo_data()
with dbf.Table(filename=self.dbf_path_house, codepage="cp866") as table:
......
......@@ -56,7 +56,7 @@ class MySQLExtractor(SQLExtractor):
cursor = self.connect.cursor()
return cursor
def iter_data(self):
def iter_data(self, checkpoint=None, **kwargs):
cursor = self.get_cursor()
# Fetch data
self.logger.info("Fetching data")
......
......@@ -103,7 +103,7 @@ class ORACLEExtractor(SQLExtractor):
yield from seq
left -= 1
def iter_data(self):
def iter_data(self, checkpoint=None, **kwargs):
concurrency = int(self.config.get("ORACLE_ARRAYSIZE", 1))
if concurrency == 1:
yield from self.iter_data_single()
......
......@@ -25,3 +25,4 @@ class Object(BaseModel):
model: str
data: List[ObjectData] = []
container: Optional[Reference["Object"]]
checkpoint: Optional[str]
......@@ -44,6 +44,7 @@ class Service(BaseModel):
cpe_group: Optional[str]
labels: Optional[List[str]]
description: Optional[str] = None
checkpoint: Optional[str]
class Config:
fields = {"state_changed": "logical_status_start", "state": "logical_status"}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment