diff --git a/core/topology/base.py b/core/topology/base.py index 42cd3e8613ddefb5e4aab9d4e551083559d3d15e..43435ae71ab66c45db066f18fa9b1195b3fe79e3 100644 --- a/core/topology/base.py +++ b/core/topology/base.py @@ -7,7 +7,7 @@ # Python modules import operator -from typing import Optional, List, Set +from typing import Optional, List, Set, Dict, Any from dataclasses import asdict # Third-Party modules @@ -64,14 +64,14 @@ class BaseTopology(object): Load objects and links """ - def get_role(self, mo): + def get_role(self, mo: ManagedObject) -> Optional[str]: """ Returns managed object's role. None if no role """ return None - def add_object(self, mo, attrs=None): + def add_object(self, mo: ManagedObject, attrs: Optional[Dict[str, Any]] = None): """ Add managed object to topology """ @@ -138,7 +138,7 @@ class BaseTopology(object): ) self.G.add_node(link_id, **attrs) - def add_link(self, o1, o2, attrs=None): + def add_link(self, o1: str, o2: str, attrs: Optional[Dict[str, Any]] = None): """ Add link between interfaces to topology """ diff --git a/core/topology/segment.py b/core/topology/segment.py index 5760ae44f8d5b19ecdeefdd249d8d70cd5ee4bda..1a88cdec4326c920e4eb64b1f69b3c8d356a1e6a 100644 --- a/core/topology/segment.py +++ b/core/topology/segment.py @@ -10,9 +10,11 @@ import operator import logging import itertools from collections import defaultdict +from typing import Dict, List, Set # Third-party modules import cachetools +from bson import ObjectId # NOC modules from noc.sa.models.managedobject import ManagedObject @@ -41,7 +43,7 @@ class SegmentTopology(BaseTopology): self.ancestor_segments = set() super().__init__(node_hints, link_hints, force_spring) - def get_role(self, mo): + def get_role(self, mo: ManagedObject) -> str: if mo.segment in self.segment_siblings: return "segment" elif self.parent_segment and mo.segment.id in self.ancestor_segments: @@ -50,12 +52,12 @@ class SegmentTopology(BaseTopology): return "downlink" @cachetools.cachedmethod(operator.attrgetter("_uplinks_cache")) - def get_uplinks(self): + def get_uplinks(self) -> List[str]: self.logger.info("Searching for uplinks") if not self.G: return [] for policy in self.segment.profile.iter_uplink_policy(): - uplinks = getattr(self, "get_uplinks_%s" % policy)() + uplinks = getattr(self, f"get_uplinks_{policy}")() if uplinks: self.logger.info( "[%s] %d uplinks found: %s", @@ -68,7 +70,7 @@ class SegmentTopology(BaseTopology): self.logger.info("Failed to find uplinks") return [] - def get_uplinks_seghier(self): + def get_uplinks_seghier(self) -> List[str]: """ Find uplinks basing on segment hierarchy. Any object with parent segment is uplink @@ -76,7 +78,7 @@ class SegmentTopology(BaseTopology): """ return [i for i in self.G.nodes if self.G.nodes[i].get("role") == "uplink"] - def get_uplinks_molevel(self): + def get_uplinks_molevel(self) -> List[str]: """ Find uplinks basing on Managed Object's level. Top-leveled objects are returned. :return: @@ -93,14 +95,14 @@ class SegmentTopology(BaseTopology): and self.G.nodes[i].get("level") == max_level ] - def get_uplinks_seg(self): + def get_uplinks_seg(self) -> List[str]: """ All segment objects are uplinks :return: """ return [i for i in self.G.nodes if self.G.nodes[i].get("role") == "segment"] - def get_uplinks_minaddr(self): + def get_uplinks_minaddr(self) -> List[str]: """ Segment's Object with lesser address is uplink :return: @@ -116,7 +118,7 @@ class SegmentTopology(BaseTopology): ) return [s[1]] - def get_uplinks_maxaddr(self): + def get_uplinks_maxaddr(self) -> List[str]: """ Segment's Object with greater address is uplink :return: @@ -164,11 +166,15 @@ class SegmentTopology(BaseTopology): return 0 # Get all links, belonging to segment - links = list(Link.objects.filter(linked_segments__in=[s.id for s in self.segment_siblings])) + links: List[Link] = list( + Link.objects.filter(linked_segments__in=[s.id for s in self.segment_siblings]) + ) # All linked interfaces from map - all_ifaces = list(itertools.chain.from_iterable(link.interface_ids for link in links)) + all_ifaces: List["ObjectId"] = list( + itertools.chain.from_iterable(link.interface_ids for link in links) + ) # Bulk fetch all interfaces data - ifs = { + ifs: Dict["ObjectId", "Interface"] = { i["_id"]: i for i in Interface._get_collection().find( {"_id": {"$in": all_ifaces}}, @@ -183,12 +189,14 @@ class SegmentTopology(BaseTopology): ) } # Bulk fetch all managed objects - segment_mos = set(self.segment.managed_objects.values_list("id", flat=True)) - all_mos = list( + segment_mos: Set[int] = set(self.segment.managed_objects.values_list("id", flat=True)) + all_mos: List[int] = list( set(i["managed_object"] for i in ifs.values() if "managed_object" in i) | segment_mos ) - mos = {mo.id: mo for mo in ManagedObject.objects.filter(id__in=all_mos)} - self.segment_objects = set( + mos: Dict[int, "ManagedObject"] = { + mo.id: mo for mo in ManagedObject.objects.filter(id__in=all_mos) + } + self.segment_objects: Set[int] = set( mo_id for mo_id in all_mos if mos[mo_id].segment.id == self.segment.id ) for mo in mos.values(): @@ -265,7 +273,7 @@ class SegmentTopology(BaseTopology): :returns: ObjectUplinks items """ - def get_node_uplinks(node): + def get_node_uplinks(node: str) -> List[str]: role = self.G.nodes[node].get("role", "cloud") if role == "uplink": # Only downlinks matter @@ -286,24 +294,26 @@ class SegmentTopology(BaseTopology): # Shortest path first return sorted(ups, key=lambda x: ups[x]) - from noc.sa.models.objectdata import ObjectUplinks + from noc.sa.models.managedobject import ObjectUplinks uplinks = self.get_uplinks() # @todo: Workaround for empty uplinks # Get uplinks for cloud nodes - cloud_uplinks = { + cloud_uplinks: Dict[str, List[int]] = { o: [int(u) for u in get_node_uplinks(o)] for o in self.G.nodes if self.G.nodes[o]["type"] == "cloud" } # All objects including neighbors - all_objects = set(o for o in self.G.nodes if self.G.nodes[o]["type"] == "managedobject") + all_objects: Set[str] = set( + o for o in self.G.nodes if self.G.nodes[o]["type"] == "managedobject" + ) # Get objects uplinks - obj_uplinks = {} - obj_downlinks = defaultdict(set) + obj_uplinks: Dict[int, List[int]] = {} + obj_downlinks: Dict[int, Set[int]] = defaultdict(set) for o in all_objects: mo = int(o) - ups = [] + ups: List[int] = [] for u in get_node_uplinks(o): cu = cloud_uplinks.get(u) if cu is not None: @@ -315,7 +325,7 @@ class SegmentTopology(BaseTopology): for u in ups: obj_downlinks[u].add(mo) # Check uplinks with DownlinkMerge settings - dlm_settings = set( + dlm_settings: Set[int] = set( ManagedObject.objects.filter( id__in=obj_uplinks, object_profile__enable_rca_downlink_merge=True ).values_list("id", flat=True) @@ -343,7 +353,11 @@ class SegmentTopology(BaseTopology): # rca_neighbors = list(sorted(neighbors)) # Recalculated result - yield ObjectUplinks(object_id=mo, uplinks=obj_uplinks[mo], rca_neighbors=rca_neighbors) + yield ObjectUplinks( + object_id=mo, + uplinks=obj_uplinks[mo], + rca_neighbors=rca_neighbors, + ) def update_uplinks(segment_id):