Verified Commit 3bfa4196 authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Add type annotations to topology class.

parent 96081372
Pipeline #36327 passed with stages
in 15 minutes and 18 seconds
......@@ -7,7 +7,7 @@
# Python modules
import operator
from typing import Optional, List, Set
from typing import Optional, List, Set, Dict, Any
from dataclasses import asdict
# Third-Party modules
......@@ -64,14 +64,14 @@ class BaseTopology(object):
Load objects and links
"""
def get_role(self, mo):
def get_role(self, mo: "ManagedObject") -> Optional[str]:
"""
Returns managed object's role.
None if no role
"""
return None
def add_object(self, mo, attrs=None):
def add_object(self, mo: "ManagedObject", attrs: Optional[Dict[str, Any]] = None):
"""
Add managed object to topology
"""
......@@ -138,7 +138,7 @@ class BaseTopology(object):
)
self.G.add_node(link_id, **attrs)
def add_link(self, o1, o2, attrs=None):
def add_link(self, o1: str, o2: str, attrs: Dict[str, Any] = None):
"""
Add link between interfaces to topology
"""
......
......@@ -10,9 +10,11 @@ import operator
import logging
import itertools
from collections import defaultdict
from typing import Dict, List, Set
# Third-party modules
import cachetools
from bson import ObjectId
# NOC modules
from noc.sa.models.managedobject import ManagedObject
......@@ -41,7 +43,7 @@ class SegmentTopology(BaseTopology):
self.ancestor_segments = set()
super().__init__(node_hints, link_hints, force_spring)
def get_role(self, mo):
def get_role(self, mo: "ManagedObject") -> str:
if mo.segment in self.segment_siblings:
return "segment"
elif self.parent_segment and mo.segment.id in self.ancestor_segments:
......@@ -50,12 +52,12 @@ class SegmentTopology(BaseTopology):
return "downlink"
@cachetools.cachedmethod(operator.attrgetter("_uplinks_cache"))
def get_uplinks(self):
def get_uplinks(self) -> List[str]:
self.logger.info("Searching for uplinks")
if not self.G:
return []
for policy in self.segment.profile.iter_uplink_policy():
uplinks = getattr(self, "get_uplinks_%s" % policy)()
uplinks = getattr(self, f"get_uplinks_{policy}")()
if uplinks:
self.logger.info(
"[%s] %d uplinks found: %s",
......@@ -68,7 +70,7 @@ class SegmentTopology(BaseTopology):
self.logger.info("Failed to find uplinks")
return []
def get_uplinks_seghier(self):
def get_uplinks_seghier(self) -> List[str]:
"""
Find uplinks basing on segment hierarchy. Any object with parent segment
is uplink
......@@ -76,7 +78,7 @@ class SegmentTopology(BaseTopology):
"""
return [i for i in self.G.nodes if self.G.nodes[i].get("role") == "uplink"]
def get_uplinks_molevel(self):
def get_uplinks_molevel(self) -> List[str]:
"""
Find uplinks basing on Managed Object's level. Top-leveled objects are returned.
:return:
......@@ -93,14 +95,14 @@ class SegmentTopology(BaseTopology):
and self.G.nodes[i].get("level") == max_level
]
def get_uplinks_seg(self):
def get_uplinks_seg(self) -> List[str]:
"""
All segment objects are uplinks
:return:
"""
return [i for i in self.G.nodes if self.G.nodes[i].get("role") == "segment"]
def get_uplinks_minaddr(self):
def get_uplinks_minaddr(self) -> List[str]:
"""
Segment's Object with lesser address is uplink
:return:
......@@ -116,7 +118,7 @@ class SegmentTopology(BaseTopology):
)
return [s[1]]
def get_uplinks_maxaddr(self):
def get_uplinks_maxaddr(self) -> List[str]:
"""
Segment's Object with greater address is uplink
:return:
......@@ -164,11 +166,15 @@ class SegmentTopology(BaseTopology):
return 0
# Get all links, belonging to segment
links = list(Link.objects.filter(linked_segments__in=[s.id for s in self.segment_siblings]))
links: List["Link"] = list(
Link.objects.filter(linked_segments__in=[s.id for s in self.segment_siblings])
)
# All linked interfaces from map
all_ifaces = list(itertools.chain.from_iterable(link.interface_ids for link in links))
all_ifaces: List["ObjectId"] = list(
itertools.chain.from_iterable(link.interface_ids for link in links)
)
# Bulk fetch all interfaces data
ifs = {
ifs: Dict["ObjectId", "Interface"] = {
i["_id"]: i
for i in Interface._get_collection().find(
{"_id": {"$in": all_ifaces}},
......@@ -183,12 +189,14 @@ class SegmentTopology(BaseTopology):
)
}
# Bulk fetch all managed objects
segment_mos = set(self.segment.managed_objects.values_list("id", flat=True))
all_mos = list(
segment_mos: Set[int] = set(self.segment.managed_objects.values_list("id", flat=True))
all_mos: List[int] = list(
set(i["managed_object"] for i in ifs.values() if "managed_object" in i) | segment_mos
)
mos = {mo.id: mo for mo in ManagedObject.objects.filter(id__in=all_mos)}
self.segment_objects = set(
mos: Dict[int, "ManagedObject"] = {
mo.id: mo for mo in ManagedObject.objects.filter(id__in=all_mos)
}
self.segment_objects: Set[int] = set(
mo_id for mo_id in all_mos if mos[mo_id].segment.id == self.segment.id
)
for mo in mos.values():
......@@ -265,7 +273,7 @@ class SegmentTopology(BaseTopology):
:returns: ObjectUplinks items
"""
def get_node_uplinks(node):
def get_node_uplinks(node: str) -> List[str]:
role = self.G.nodes[node].get("role", "cloud")
if role == "uplink":
# Only downlinks matter
......@@ -286,24 +294,26 @@ class SegmentTopology(BaseTopology):
# Shortest path first
return sorted(ups, key=lambda x: ups[x])
from noc.sa.models.objectdata import ObjectUplinks
from noc.sa.models.managedobject import ObjectUplinks
uplinks = self.get_uplinks()
# @todo: Workaround for empty uplinks
# Get uplinks for cloud nodes
cloud_uplinks = {
cloud_uplinks: Dict[str, List[int]] = {
o: [int(u) for u in get_node_uplinks(o)]
for o in self.G.nodes
if self.G.nodes[o]["type"] == "cloud"
}
# All objects including neighbors
all_objects = set(o for o in self.G.nodes if self.G.nodes[o]["type"] == "managedobject")
all_objects: Set[str] = set(
o for o in self.G.nodes if self.G.nodes[o]["type"] == "managedobject"
)
# Get objects uplinks
obj_uplinks = {}
obj_downlinks = defaultdict(set)
obj_uplinks: Dict[int, List[int]] = {}
obj_downlinks: Dict[int, Set[int]] = defaultdict(set)
for o in all_objects:
mo = int(o)
ups = []
ups: List[int] = []
for u in get_node_uplinks(o):
cu = cloud_uplinks.get(u)
if cu is not None:
......@@ -315,7 +325,7 @@ class SegmentTopology(BaseTopology):
for u in ups:
obj_downlinks[u].add(mo)
# Check uplinks with DownlinkMerge settings
dlm_settings = set(
dlm_settings: Set[int] = set(
ManagedObject.objects.filter(
id__in=obj_uplinks, object_profile__enable_rca_downlink_merge=True
).values_list("id", flat=True)
......@@ -343,7 +353,11 @@ class SegmentTopology(BaseTopology):
#
rca_neighbors = list(sorted(neighbors))
# Recalculated result
yield ObjectUplinks(object_id=mo, uplinks=obj_uplinks[mo], rca_neighbors=rca_neighbors)
yield ObjectUplinks(
object_id=mo,
uplinks=obj_uplinks[mo],
rca_neighbors=rca_neighbors,
)
def update_uplinks(segment_id):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment