Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Vladimir
noc
Commits
61a8584e
Verified
Commit
61a8584e
authored
May 05, 2022
by
Andrey Vertiprahov
Browse files
Fix ETL extractors for incremental methods.
parent
5becca72
Changes
6
Hide whitespace changes
Inline
Side-by-side
core/etl/extractor/base.py
View file @
61a8584e
...
...
@@ -254,7 +254,7 @@ class BaseExtractor(object):
yield
d
d
=
next
(
iter_d
,
None
)
def
extract
(
self
,
incremental
:
bool
=
False
)
->
None
:
def
extract
(
self
,
incremental
:
bool
=
False
,
**
kwargs
)
->
None
:
def
q
(
s
:
Any
)
->
str
:
if
s
==
""
or
s
is
None
:
return
""
...
...
core/etl/extractor/fias.py
View file @
61a8584e
...
...
@@ -39,7 +39,7 @@ class FiasRemoteSystem(BaseRemoteSystem):
*FIAS_REGION* - region code FIAS
"""
def
extract
(
self
,
extractors
=
None
):
def
extract
(
self
,
extractors
=
None
,
incremental
:
bool
=
False
):
extractors
=
extractors
or
[]
for
en
in
self
.
extractors_order
:
if
extractors
and
en
not
in
extractors
:
...
...
@@ -108,9 +108,8 @@ class AdmDivExtractor(BaseExtractor):
for
chunk
in
r
.
iter_content
(
1024
):
f
.
write
(
chunk
)
def
extract
(
self
):
super
(
AdmDivExtractor
,
self
).
extract
()
return
def
extract
(
self
,
incremental
:
bool
=
False
,
**
kwargs
)
->
None
:
super
().
extract
(
incremental
=
incremental
)
def
check_twice_code
(
self
,
ter
,
kod1
,
kod2
,
kod3
):
"""
...
...
@@ -153,7 +152,7 @@ class AdmDivExtractor(BaseExtractor):
else
:
return
""
def
iter_data
(
self
):
def
iter_data
(
self
,
checkpoint
=
None
,
**
kwargs
):
self
.
download
()
with
open
(
self
.
csv_path
,
encoding
=
"cp1251"
)
as
f
:
reader
=
csv
.
reader
(
f
,
delimiter
=
";"
,
quotechar
=
'"'
)
...
...
@@ -220,9 +219,8 @@ class StreetExtractor(BaseExtractor):
else
:
raise
Exception
(
"zipfile not found!"
)
def
extract
(
self
):
super
().
extract
()
return
def
extract
(
self
,
incremental
:
bool
=
False
,
**
kwargs
)
->
None
:
super
().
extract
(
incremental
=
incremental
)
def
parent_admdiv_data
(
self
):
parent
=
set
()
...
...
@@ -288,7 +286,7 @@ class StreetExtractor(BaseExtractor):
return
r
return
None
def
iter_data
(
self
):
def
iter_data
(
self
,
checkpoint
=
None
,
**
kwargs
):
self
.
download
()
cities
,
streets
=
self
.
get_tables
()
for
r
in
streets
:
...
...
@@ -345,9 +343,8 @@ class AddressExtractor(BaseExtractor):
else
:
raise
Exception
(
"zipfile not found!"
)
def
extract
(
self
):
super
().
extract
()
return
def
extract
(
self
,
incremental
:
bool
=
False
,
**
kwargs
)
->
None
:
super
().
extract
(
incremental
=
incremental
)
def
street_data
(
self
):
street
=
set
()
...
...
@@ -382,7 +379,7 @@ class AddressExtractor(BaseExtractor):
else
:
return
None
,
None
def
iter_data
(
self
):
def
iter_data
(
self
,
checkpoint
=
None
,
**
kwargs
):
self
.
download
()
with
dbf
.
Table
(
filename
=
self
.
dbf_path
,
codepage
=
"cp866"
)
as
table
:
for
r
in
table
:
...
...
@@ -456,9 +453,8 @@ class BuildingExtractor(BaseExtractor):
else
:
raise
Exception
(
"zipfile not found!"
)
def
extract
(
self
):
super
().
extract
()
return
def
extract
(
self
,
incremental
:
bool
=
False
,
**
kwargs
)
->
None
:
super
().
extract
(
incremental
=
incremental
)
def
get_oktmo_data
(
self
):
oktmo_data
=
{}
...
...
@@ -470,7 +466,7 @@ class BuildingExtractor(BaseExtractor):
return
oktmo_data
def
iter_data
(
self
):
def
iter_data
(
self
,
checkpoint
=
None
,
**
kwargs
):
self
.
download
()
oktmo_data
=
self
.
get_oktmo_data
()
with
dbf
.
Table
(
filename
=
self
.
dbf_path_house
,
codepage
=
"cp866"
)
as
table
:
...
...
core/etl/extractor/mysql.py
View file @
61a8584e
...
...
@@ -56,7 +56,7 @@ class MySQLExtractor(SQLExtractor):
cursor
=
self
.
connect
.
cursor
()
return
cursor
def
iter_data
(
self
):
def
iter_data
(
self
,
checkpoint
=
None
,
**
kwargs
):
cursor
=
self
.
get_cursor
()
# Fetch data
self
.
logger
.
info
(
"Fetching data"
)
...
...
core/etl/extractor/oracle.py
View file @
61a8584e
...
...
@@ -103,7 +103,7 @@ class ORACLEExtractor(SQLExtractor):
yield
from
seq
left
-=
1
def
iter_data
(
self
):
def
iter_data
(
self
,
checkpoint
=
None
,
**
kwargs
):
concurrency
=
int
(
self
.
config
.
get
(
"ORACLE_ARRAYSIZE"
,
1
))
if
concurrency
==
1
:
yield
from
self
.
iter_data_single
()
...
...
core/etl/models/object.py
View file @
61a8584e
...
...
@@ -25,3 +25,4 @@ class Object(BaseModel):
model
:
str
data
:
List
[
ObjectData
]
=
[]
container
:
Optional
[
Reference
[
"Object"
]]
checkpoint
:
Optional
[
str
]
core/etl/models/service.py
View file @
61a8584e
...
...
@@ -44,6 +44,7 @@ class Service(BaseModel):
cpe_group
:
Optional
[
str
]
labels
:
Optional
[
List
[
str
]]
description
:
Optional
[
str
]
=
None
checkpoint
:
Optional
[
str
]
class
Config
:
fields
=
{
"state_changed"
:
"logical_status_start"
,
"state"
:
"logical_status"
}
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment