Commit 5e8b7190 authored by Dmitry Volodin's avatar Dmitry Volodin Committed by Andrey Vertiprahov
Browse files

Move to py3-style type annotations

parent a74784fe
......@@ -34,16 +34,13 @@ def smart_text(s, errors="strict", encoding=DEFAULT_ENCODING):
return str(s)
def bord(x):
# type: (int) -> int
def bord(x: int) -> int:
return x
def bchr(x):
# type: (int) -> bytes
def bchr(x: int) -> bytes:
return bytes([x])
def make_bytes(x):
# type: (List[int]) -> bytes
def make_bytes(x: List[int]) -> bytes:
return bytes(x)
......@@ -17,6 +17,5 @@ class BaseCollator(object):
def __init__(self):
pass
def collate(self, physical_path, interfaces):
# type: (str, Dict[str, Interface]) -> Optional[str]
def collate(self, physical_path: str, interfaces: Dict[str, Interface]) -> Optional[str]:
raise NotImplementedError
......@@ -2,7 +2,7 @@
# ----------------------------------------------------------------------
# Crypto-related snippets
# ----------------------------------------------------------------------
# Copyright (C) 2007-2019 The NOC Project
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
......@@ -31,8 +31,7 @@ def gen_salt(salt_len):
return make_bytes([random.choice(SALT_SYMBOLS) for _ in range(salt_len)])
def md5crypt(password, salt=None, magic=b"$1$"):
# type: (bytes, Optional[bytes], bytes) -> bytes
def md5crypt(password: bytes, salt: Optional[bytes] = None, magic: bytes = b"$1$") -> bytes:
"""
MD5 password hash
(Used for RIPE authentication)
......
......@@ -12,8 +12,7 @@ from noc.core.comp import smart_text, smart_bytes
IDNA_PREFIX = str("xn--")
def to_idna(zone):
# type: (str) -> str
def to_idna(zone: str) -> str:
"""
Convert literal zone name to IDNA encoding
:param zone:
......@@ -22,8 +21,7 @@ def to_idna(zone):
return smart_text(smart_text(zone).lower().encode("idna"))
def from_idna(zone):
# type: (str) -> str
def from_idna(zone: str) -> str:
"""
Convert IDNA zone name representation to literal name
:param self:
......@@ -35,8 +33,7 @@ def from_idna(zone):
return smart_text(smart_bytes(zone).decode("idna"))
def is_idna(zone):
# type: (str) -> bool
def is_idna(zone: str) -> bool:
"""
Check if zone name is in IDNA representation
:param zone:
......
......@@ -104,8 +104,7 @@ rx_lf_spaces = re.compile(smart_bytes(r"\r\s+\r"))
rx_ecma = re.compile(get_ecma_re())
def strip_control_sequences(s):
# type: (bytes) -> bytes
def strip_control_sequences(s: bytes) -> bytes:
"""
Normal text leaved untouched
>>> strip_control_sequences("Lorem Ipsum")
......
......@@ -2,7 +2,7 @@
# ---------------------------------------------------------------------
# Gis-related utilities
# ---------------------------------------------------------------------
# Copyright (C) 2007-2019 The NOC Project
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
......@@ -79,8 +79,7 @@ def bearing_sym(p1, p2):
return BEARINGS[int(math.floor(ob / BS))]
def get_bbox(x0, x1, y0, y1):
# type: (float, float, float, float) -> geojson.Polygon
def get_bbox(x0: float, x1: float, y0: float, y1: float) -> geojson.Polygon:
"""
Get normalized bounding box
:param x0:
......
......@@ -37,8 +37,7 @@ class GridVCS(object):
self.fs = gridfs.GridFS(get_db(), collection="noc.gridvcs.%s" % repo)
self.files = self.fs._GridFS__files
def get_delta(self, src, dst):
# type: (str, str) -> Tuple[str, bytes]
def get_delta(self, src: str, dst: str) -> Tuple[str, bytes]:
"""
Calculate strings delta
:param src: Source string
......@@ -51,8 +50,7 @@ class GridVCS(object):
return self.T_BSDIFF4, delta
@classmethod
def apply_delta(cls, type, src, delta):
# type: (str, str, bytes) -> str
def apply_delta(cls, type: str, src: str, delta: bytes) -> str:
"""
Apply delta
:param type: Delta type
......@@ -63,8 +61,7 @@ class GridVCS(object):
return getattr(cls, "apply_delta_%s" % type)(src, delta)
@staticmethod
def apply_delta_F(src, delta):
# type: (str, bytes) -> str
def apply_delta_F(src: str, delta: bytes) -> str:
"""
Raw string
:param src:
......@@ -74,8 +71,7 @@ class GridVCS(object):
return smart_text(delta)
@staticmethod
def apply_delta_b(src, delta):
# type: (str, bytes) -> str
def apply_delta_b(src: str, delta: bytes) -> str:
"""
Mercurial mdiff. Slow python implementation ported from Mercurial 0.4.
For legacy installations support only
......@@ -98,8 +94,7 @@ class GridVCS(object):
return "".join(r)
@staticmethod
def apply_delta_B(src, delta):
# type: (str, bytes) -> str
def apply_delta_B(src: str, delta: bytes) -> str:
"""
BSDIFF4 diff
:param src:
......@@ -109,31 +104,26 @@ class GridVCS(object):
return smart_text(bsdiff4.patch(src, delta))
@classmethod
def compress(cls, data, method=None):
# type: (bytes, Optional[str]) -> bytes
def compress(cls, data: bytes, method: Optional[str] = None) -> bytes:
if method:
return getattr(cls, "compress_%s" % method)(data)
return data
@classmethod
def decompress(cls, data, method=None):
# type: (bytes, Optional[str]) -> bytes
def decompress(cls, data: bytes, method: Optional[str] = None) -> bytes:
if method:
return getattr(cls, "decompress_%s" % method)(data)
return data
@staticmethod
def compress_z(data):
# type: (bytes) -> bytes
def compress_z(data: bytes) -> bytes:
return zlib.compress(smart_bytes(data))
@staticmethod
def decompress_z(data):
# type: (bytes) -> bytes
def decompress_z(data: bytes) -> bytes:
return zlib.decompress(smart_bytes(data))
def put(self, object, data, ts=None):
# type: (int, str, Optional[datetime.datetime]) -> bool
def put(self, object: int, data: str, ts: Optional[datetime.datetime] = None) -> bool:
"""
Save data
:param object: Object id
......@@ -178,8 +168,7 @@ class GridVCS(object):
)
return True
def get(self, object, revision=None):
# type: (int, Optional[Revision]) -> Optional[str]
def get(self, object: int, revision: Optional[Revision] = None) -> Optional[str]:
"""
Get data
:param object: Object id
......@@ -201,8 +190,7 @@ class GridVCS(object):
break
return data
def delete(self, object):
# type: (int) -> None
def delete(self, object: int) -> None:
"""
Delete object's data and history
:param object:
......@@ -211,8 +199,7 @@ class GridVCS(object):
for r in self.iter_revisions(object):
self.fs.delete(r.id)
def iter_revisions(self, object, reverse=False):
# type: (int, Optional[bool]) -> Iterable[Revision]
def iter_revisions(self, object: int, reverse: Optional[bool] = False) -> Iterable[Revision]:
"""
Get object's revision
:param object:
......@@ -222,8 +209,7 @@ class GridVCS(object):
for r in self.files.find({"object": object}).sort("ts", d):
yield Revision(r["_id"], r["ts"], r["ft"], r.get("c"), r["length"])
def find_last_revision(self, object):
# type: (int) -> Optional[Revision]
def find_last_revision(self, object: int) -> Optional[Revision]:
"""
Find last revision or return None
:param object:
......@@ -234,8 +220,7 @@ class GridVCS(object):
return Revision(r["_id"], r["ts"], r["ft"], r.get("c"), r["length"])
return None
def find_revision(self, object, revision):
# type: (int, str) -> Optional[Revision]
def find_revision(self, object: int, revision: str) -> Optional[Revision]:
"""
:param object:
:param revision: Revision id
......@@ -247,8 +232,7 @@ class GridVCS(object):
return None
@staticmethod
def _unified_diff(src, dst):
# type: (str, str) -> str
def _unified_diff(src: str, dst: str) -> str:
"""
Returns unified diff between src and dest
......@@ -258,8 +242,7 @@ class GridVCS(object):
"""
return "\n".join(difflib.unified_diff(src.splitlines(), dst.splitlines(), lineterm=""))
def diff(self, object, rev1, rev2):
# type: (int, str, str) -> str
def diff(self, object: int, rev1: str, rev2: str) -> str:
"""
Get unified diff between revisions
:param object:
......@@ -271,8 +254,7 @@ class GridVCS(object):
dst = self.get(object, rev2) or ""
return self._unified_diff(src, dst)
def mdiff(self, obj1, rev1, obj2, rev2):
# type: (int, str, int, str) -> str
def mdiff(self, obj1: int, rev1: str, obj2: int, rev2: str) -> str:
"""
Get unified diff between multiple object's revisions
......@@ -286,6 +268,5 @@ class GridVCS(object):
dst = self.get(obj2, rev2) or ""
return self._unified_diff(src, dst)
def ensure_collection(self):
# type: () -> None
def ensure_collection(self) -> None:
self.files.create_index([("object", 1), ("ft", 1)])
......@@ -34,7 +34,6 @@ def hash_int(value):
return hash_fmt.unpack(hash_str(value))
def dict_hash_int(d):
# type: (Dict[str, Any]) -> int
def dict_hash_int(d: Dict[str, Any]) -> int:
r = ["%s=%s" % (k, d[k]) for k in sorted(d)]
return hash_int(",".join(r))
......@@ -28,8 +28,7 @@ class UDPServer(object):
self._pending_sockets = []
self._started = False
def iter_listen(self, cfg):
# type: (str) -> Iterable[Tuple[str, int]]
def iter_listen(self, cfg: str) -> Iterable[Tuple[str, int]]:
"""
Parses listen configuration and yield (address, port) tuples.
Listen configuration is comma-separated string with items:
......@@ -47,8 +46,7 @@ class UDPServer(object):
addr, port = "", listen
yield addr, int(port)
def listen(self, port, address=""):
# type: (int, str) -> None
def listen(self, port: int, address: str = "") -> None:
"""Starts accepting connections on the given port.
This method may be called more than once to listen on multiple ports.
......
......@@ -27,14 +27,12 @@ class MIBRegistry(object):
load_lock = Lock()
def __init__(self):
self.mib = {} # type: Dict[str, str]
self.mib: Dict[str, str] = {}
self.hints = {}
self.loaded_mibs = set()
def __getitem__(self, item):
# type: (Union[str, Tuple[str, int]]) -> str
def maybe_get(k):
# type: (str) -> str
def __getitem__(self, item: Union[str, Tuple[str, int]]) -> str:
def maybe_get(k: str) -> str:
v = self.mib.get(k)
if v is not None:
return v
......@@ -61,8 +59,7 @@ class MIBRegistry(object):
return ".".join([maybe_get(item[0])] + [str(x) for x in item[1:]])
@staticmethod
def mib_to_modname(name):
# type: (str) -> str
def mib_to_modname(name: str) -> str:
"""
Convert MIB name to module name (without .py)
:param name: MIB name, like IF-MIB
......@@ -70,8 +67,7 @@ class MIBRegistry(object):
"""
return name.lower().replace("-", "_")
def load_mib(self, name):
# type: (str) -> None
def load_mib(self, name: str) -> None:
"""
Load MIB by name
......@@ -102,8 +98,7 @@ class MIBRegistry(object):
self.hints.update(m.DISPLAY_HINTS)
self.loaded_mibs.add(name)
def is_loaded(self, name):
# type: (str) -> bool
def is_loaded(self, name: str) -> bool:
"""
Check MIB is loaded
:param name:
......@@ -112,8 +107,7 @@ class MIBRegistry(object):
with self.load_lock:
return name in self.loaded_mibs
def reset(self):
# type: () -> None
def reset(self) -> None:
"""
Reset MIB cache
......@@ -124,8 +118,7 @@ class MIBRegistry(object):
self.loaded_mibs = set()
@staticmethod
def longest_match(d, k):
# type: (Dict[str, Any], str) -> Optional[Any]
def longest_match(d: Dict[str, Any], k: str) -> Optional[Any]:
"""
Returns longest match of key `k` in dict `d`
:param d:
......@@ -137,8 +130,12 @@ class MIBRegistry(object):
return d.get(prefix)
return None
def render(self, oid, value, display_hints=None):
# type: (str, bytes, Dict[str, Callable[[str, bytes], Union[str, bytes]]]) -> str
def render(
self,
oid: str,
value: bytes,
display_hints: Dict[str, Callable[[str, bytes], Union[str, bytes]]] = None,
) -> str:
"""
Apply display-hint
:return:
......
......@@ -48,8 +48,7 @@ def nsq_pub(topic, message):
raise NSQPubError("NSQ Pub error: code=%s message=%s" % (code, body))
def mpub_encode(messages):
# type: (List[Any]) -> bytes
def mpub_encode(messages: List[Any]) -> bytes:
"""
Build mpub binary message
:param messages: List of messages
......
......@@ -24,13 +24,12 @@ from noc.core.backport.time import perf_counter
class TopicQueue(object):
def __init__(self, topic, io_loop=None):
# type: (str, Optional[tornado.ioloop.IOLoop]) -> None
def __init__(self, topic: str, io_loop: Optional[tornado.ioloop.IOLoop] = None) -> None:
self.topic = topic
self.lock = Lock()
self.put_condition = tornado.locks.Condition()
self.shutdown_complete = tornado.locks.Event()
self.queue = deque() # type: deque
self.queue: deque = deque()
self.queue_size = 0
self.to_shutdown = False
self.last_get = None
......@@ -44,8 +43,9 @@ class TopicQueue(object):
self.msg_requeued_size = 0
@staticmethod
def iter_encode_chunks(message, limit=config.nsqd.mpub_size - 8):
# type: (Union[str, object], int) -> Iterable[str]
def iter_encode_chunks(
message: Union[str, object], limit: int = config.nsqd.mpub_size - 8
) -> Iterable[str]:
"""
Encode data to iterable atomic chunks of up to limit size
......@@ -75,8 +75,7 @@ class TopicQueue(object):
else:
raise ValueError("Message too big")
def put(self, message, fifo=True):
# type: (object, bool) -> None
def put(self, message: object, fifo: bool = True) -> None:
"""
Put message into queue. Block if queue is full
......@@ -108,8 +107,7 @@ class TopicQueue(object):
def _notify_all(self):
self.put_condition.notify_all()
def return_messages(self, messages):
# type: (List[str]) -> None
def return_messages(self, messages: List[str]) -> None:
"""
Return messages to the start of the queue
......@@ -126,8 +124,9 @@ class TopicQueue(object):
# Unblock waiters in main thread
self.put_condition.notify_all()
def iter_get(self, n=1, size=None, total_overhead=0, message_overhead=0):
# type: (int, int, int, int) -> Iterable[str]
def iter_get(
self, n: int = 1, size: int = None, total_overhead: int = 0, message_overhead: int = 0
) -> Iterable[str]:
"""
Get up to `n` items up to `size` size.
......@@ -223,8 +222,7 @@ class TopicQueue(object):
timeout = datetime.timedelta(seconds=timeout)
yield self.put_condition.wait(timeout)
def apply_metrics(self, data):
# type: (Dict[str, Any]) -> None
def apply_metrics(self, data: Dict[str, Any]) -> None:
data.update(
{
("nsq_msg_put", ("topic", self.topic)): self.msg_put,
......
......@@ -273,7 +273,7 @@ class BaseProfile(object, metaclass=BaseProfileMetaclass):
# Callable should return str
# It is possible to return bytes in very rare specific cases,
# when you have intention to process binary output in script directly
snmp_display_hints = {} # type: Dict[str, Optional[Callable[[str, bytes], Union[str, bytes]]]]
snmp_display_hints: Dict[str, Optional[Callable[[str, bytes], Union[str, bytes]]]] = {}
# Aggregate up to *snmp_metrics_get_chunk* oids
# to one SNMP GET request
snmp_metrics_get_chunk = 15
......@@ -501,8 +501,7 @@ class BaseProfile(object, metaclass=BaseProfileMetaclass):
#
config_volatile = None
def cleaned_input(self, input):
# type: (bytes) -> bytes
def cleaned_input(self, input: bytes) -> bytes:
"""
Preprocessor to clean up and normalize input from device.
Delete ASCII sequences by default.
......@@ -510,8 +509,7 @@ class BaseProfile(object, metaclass=BaseProfileMetaclass):
"""
return strip_control_sequences(input)
def clean_rogue_chars(self, s):
# type: (bytes) -> bytes
def clean_rogue_chars(self, s: bytes) -> bytes:
if self.rogue_chars:
for cleaner in self.rogue_char_cleaners:
s = cleaner(s)
......@@ -594,8 +592,7 @@ class BaseProfile(object, metaclass=BaseProfileMetaclass):
cls.rx_pattern_operation_error_str = compile(smart_text(cls.pattern_operation_error))
@classmethod
def get_telnet_naws(cls):
# type: () -> bytes
def get_telnet_naws(cls) -> bytes:
return cls.telnet_naws
@classmethod
......
......@@ -772,20 +772,19 @@ class BaseScript(object, metaclass=BaseScriptMetaclass):
def cli(
self,
cmd,
command_submit=None,
bulk_lines=None,
list_re=None,
cached=False,
file=None,
ignore_errors=False,
allow_empty_response=True,
nowait=False,
obj_parser=None,
cmd_next=None,
cmd_stop=None,
):
# type: (str, Optional[bytes], Any, Any, bool, Optional[str], Any, Any, Any, Any, Any, Any) -> str
cmd: str,
command_submit: Optional[bytes] = None,
bulk_lines: Any = None,
list_re: Any = None,
cached: bool = False,
file: Optional[str] = None,
ignore_errors: Any = False,
allow_empty_response: Any = True,
nowait: Any = False,
obj_parser: Any = None,
cmd_next: Any = None,
cmd_stop: Any = None,
) -> str:
"""
Execute CLI command and return result. Initiate cli session
when necessary.
......@@ -861,8 +860,7 @@ class BaseScript(object, metaclass=BaseScriptMetaclass):
self.root.cli_cache[cmd] = r
return format_result(r)
def echo_cancelation(self, r, cmd):
# type: (str, str) -> str
def echo_cancelation(self, r: str, cmd: str) -> str:
"""
Adaptive echo cancelation
......
......@@ -2,7 +2,7 @@
# ----------------------------------------------------------------------
# Beef API
# ----------------------------------------------------------------------
# Copyright (C) 2007-2018 The NOC Project
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
......@@ -16,6 +16,7 @@ import codecs
# Third-party modules
import ujson
import six
from typing import Optional
# NOC modules
from noc.core.comp import smart_text, smart_bytes
......@@ -225,8 +226,7 @@ class Beef(object):
data = cls.decompress_bz2(data)
return Beef.from_json(smart_text(data))
def iter_fsm_state_reply(self, state):
# type: (str) -> bytes
def iter_fsm_state_reply(self, state: str) -> bytes:
"""
Iterate fsm states
:param state:
......@@ -238,8 +238,7 @@ class Beef(object):
yield self._cli_decoder(reply)
break
def iter_cli_reply(self, command):
# type: (bytes) -> bytes
def iter_cli_reply(self, command: bytes) -> bytes:
"""
Iterate fsm states
:param command:
......@@ -259,8 +258,7 @@ class Beef(object):
raise KeyError(b"Command not found")
@staticmethod
def mib_decode_base64(value):
# type: (bytes) -> bytes
def mib_decode_base64(value: bytes) -> bytes:
"""
Decode base64
:param value:
......@@ -278,8 +276,7 @@ class Beef(object):
return value.decode("hex")
@staticmethod
def cli_decode_quopri(value):
# type: (bytes) -> bytes
def cli_decode_quopri(value: bytes) -> bytes:
"""
Decode quoted-printable
:param value:
......@@ -292,8 +289,7 @@ class Beef(object):
self.mib_oid_values = dict((m.oid, m.value) for m in self.mib)
return self.mib_oid_values
def get_mib_value(self, oid):
# type: (str) -> None | bytes
def get_mib_value(self, oid: str) -> Optional[bytes]:
"""
Lookup mib and return oid value
:param oid:
......
......@@ -77,7 +77,7 @@ class CLI(object):
self.command = None
self.prompt_stack = []
self.patterns = self.profile.patterns.copy()
self.buffer = b"" # type: bytes
self.buffer: bytes = b""
self.is_started = False
self.result = None
self.error = None
......@@ -174,8 +174,7 @@ class CLI(object):
s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, self.KEEP_CNT)
return self.iostream_class(s, self)