Verified Commit 94b2c3fa authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Add 'NAG | SNR | CLI | FoxGate' capabilities.

parent ec30d781
Pipeline #36894 passed with stages
in 15 minutes and 20 seconds
{
"name": "NAG | SNR | CLI | FoxGate",
"$collection": "inv.capabilities",
"uuid": "96cb024c-3062-415a-a2a7-d3503d7628c0",
"description": "Supported FoxGate CLI Syntax on SNR Profile",
"type": "bool",
"card_template": null
}
......@@ -7,10 +7,9 @@
# Python modules
import re
from builtins import str
# NOC modules
from noc.core.script.base import BaseScript
from noc.sa.profiles.Generic.get_arp import Script as BaseScript
from noc.sa.interfaces.igetarp import IGetARP
......@@ -24,26 +23,11 @@ class Script(BaseScript):
r"^(?P<ip>\d+\S+)\s+(?P<mac>\S+)\s+\d+\s+(?P<interface>\S+\d+)", re.MULTILINE
)
def execute_snmp(self):
r = []
for v in self.snmp.get_tables(
["1.3.6.1.2.1.4.22.1.1", "1.3.6.1.2.1.4.22.1.2", "1.3.6.1.2.1.4.22.1.3"], bulk=True
):
iface = self.snmp.get("1.3.6.1.2.1.31.1.1.1.1." + str(v[1]), cached=True) # IF-MIB
mac = ":".join(["%02x" % ord(c) for c in v[2]])
# ip = ["%02x" % ord(c) for c in v[3]]
# ip = ".".join(str(int(c, 16)) for c in ip)
r.append({"ip": v[3], "mac": mac, "interface": iface})
return r
def execute_cli(self):
r = []
try:
v = self.cli("show arp")
for match in self.rx_arp.finditer(v):
r += [match.groupdict()]
except self.CLISyntaxError:
if self.is_foxgate_cli:
v = self.cli("show arp all")
for match in self.rx_arp2.finditer(v):
r += [match.groupdict()]
r = [match.groupdict() for match in self.rx_arp2.finditer(v)]
else:
v = self.cli("show arp")
r = [match.groupdict() for match in self.rx_arp.finditer(v)]
return r
......@@ -10,7 +10,6 @@ import re
# NOC modules
from noc.sa.profiles.Generic.get_capabilities import Script as BaseScript
from noc.sa.profiles.Generic.get_capabilities import false_on_cli_error
class Script(BaseScript):
......@@ -19,26 +18,36 @@ class Script(BaseScript):
rx_lldp_en = re.compile(r"LLDP has been enabled globally?")
rx_stack = re.compile(r"-+member :(?P<id>\d+)-+")
@false_on_cli_error
def has_lldp_cli(self):
def get_lldp_cli(self):
"""
Check box has lldp enabled on SNR
"""
cmd = self.cli("show lldp", ignore_errors=True)
if "% Incomplete command" in cmd:
cmd = self.cli("show lldp interface", ignore_errors=True)
return "System LLDP: enable" in cmd
return self.rx_lldp_en.search(cmd) is not None
try:
cmd = self.cli("show lldp")
except self.CLISyntaxError:
# FoxGate CLI
cmd = self.cli("show lldp interface")
return "System LLDP: enable" in cmd, True
return self.rx_lldp_en.search(cmd) is not None, False
def execute_platform_cli(self, caps):
def get_stack_members(self):
s = []
try:
s = []
cmd = self.cli("show slot")
for match in self.rx_stack.finditer(cmd):
i = match.group("id")
s += [i]
if s:
caps["Stack | Members"] = len(s) if len(s) != 1 else 0
caps["Stack | Member Ids"] = " | ".join(s)
except Exception:
pass
except self.CLISyntaxError:
return
for match in self.rx_stack.finditer(cmd):
i = match.group("id")
s += [i]
return s
def execute_platform_cli(self, caps):
members = self.get_stack_members()
if members:
caps["Stack | Members"] = len(members) if len(members) != 1 else 0
caps["Stack | Member Ids"] = " | ".join(members)
has_lldp, foxgate_cli = self.get_lldp_cli()
if has_lldp:
caps["Network | LLDP"] = True
if foxgate_cli:
caps["NAG | SNR | CLI | FoxGate"] = True
......@@ -25,9 +25,10 @@ class Script(BaseScript):
SNMP_GET_OIDS = {"SNMP": [mib["IF-MIB::ifPhysAddress", 1]]}
def execute_cli(self):
macs = sorted(self.rx_mac.findall(self.cli("show version", cached=True)))
if not macs:
if self.is_foxgate_cli:
macs = sorted(self.rx_mac2.findall(self.cli("show ip", cached=True)))
else:
macs = sorted(self.rx_mac.findall(self.cli("show version", cached=True)))
return [
{"first_chassis_mac": f, "last_chassis_mac": t} for f, t in self.macs_to_ranges(macs)
]
......@@ -7,6 +7,8 @@
# Python modules
import re
from typing import Set, Dict, Any
from collections import defaultdict
# NOC modules
from noc.core.script.base import BaseScript
......@@ -78,142 +80,164 @@ class Script(BaseScript):
)
rx_lag_port = re.compile(r"\s*\S+ is LAG member port, LAG port:(?P<lag_port>\S+)\n")
def execute_cli(self):
interfaces = []
# Get LLDP interfaces
lldp = []
def get_switchport_cli(self) -> Dict[str, Dict[str, Any]]:
if self.is_foxgate_cli:
return {}
# New CLI syntax
result = defaultdict(lambda: {"untagged": None, "tagged": []})
v = self.cli("show switchport interface")
for match in self.rx_vlan.finditer(v):
ifname = match.group("ifname")
result[ifname]["untagged"] = match.group("untagged_vlan")
if match.group("tagged_vlans"):
tagged_vlans = match.group("tagged_vlans").replace(";", ",")
result[ifname]["tagged"] = self.expand_rangelist(tagged_vlans)
return result
def get_interface_lldp(self) -> Set[str]:
lldp = set()
if self.is_foxgate_cli:
return lldp
c = self.cli("show lldp", ignore_errors=True)
if self.rx_lldp_en.search(c):
ll = self.rx_lldp.search(c)
if ll:
lldp = ll.group("local_if").split()
lldp = set(ll.group("local_if").split())
return lldp
def get_interfaces_foxgatecli(self):
"""
For FoxGate Like CLI syntax
:return:
"""
v = self.cli("show interface", cached=True)
interfaces = {}
for match in self.rx_sh_int_old.finditer(v):
ifname = match.group("interface")
sub = {
"name": match.group("interface"),
"admin_status": match.group("admin_status") == "enabled",
"oper_status": match.group("oper_status") == "up",
"mac": match.group("mac"),
"enabled_afi": ["BRIDGE"],
}
if match.group("mode") == "access":
sub["untagged_vlan"] = match.group("untagged")
else:
sub["untagged_vlan"] = match.group("pvid")
sub["tagged_vlans"] = self.expand_rangelist(match.group("tagged"))
interfaces[ifname] = {
"name": ifname,
"type": "physical",
"admin_status": match.group("admin_status") == "enabled",
"oper_status": match.group("oper_status") == "up",
"mac": match.group("mac"),
"subinterfaces": [sub],
}
v = self.cli("show ip", cached=True)
match = self.rx_mgmt.search(v)
ip_address = f'{match.group("ip")}/{IPv4.netmask_to_len(match.group("mask"))}'
interfaces["system"] = {
"name": "system",
"type": "SVI",
"admin_status": True,
"oper_status": True,
"mac": match.group("mac"),
"subinterfaces": [
{
"name": "system",
"admin_status": True,
"oper_status": True,
"mac": match.group("mac"),
"enabled_afi": ["IPv4"],
"ipv4_addresses": [ip_address],
"vlan_ids": match.group("vlan_id"),
}
],
}
return [{"interfaces": list(interfaces.values())}]
def execute_cli(self, **kwargs):
interfaces = {}
if self.is_foxgate_cli:
return self.get_interfaces_foxgatecli()
# Get LLDP enabled interfaces
lldp = self.get_interface_lldp()
# Get switchports and fill tagged/untagged lists if they are not empty
switchports = self.get_switchport_cli()
v = self.cli("show interface", cached=True)
for match in self.rx_sh_int.finditer(v):
name = match.group("interface")
ifname = match.group("interface")
a_stat = match.group("admin_status").lower() == "up"
o_stat = match.group("oper_status").lower() == "up"
sub = {
"name": ifname,
"admin_status": a_stat,
"oper_status": o_stat,
"enabled_protocols": [],
"enabled_afi": [],
}
# Switchport
if ifname in switchports:
# Bridge
sub["enabled_afi"] += ["BRIDGE"]
u, t = switchports[ifname]["untagged"], switchports[ifname].get("tagged")
if u:
sub["untagged_vlan"] = u
if t:
sub["tagged_vlans"] = t
# Other
other = match.group("other")
match1 = self.rx_hw.search(other)
# MTU
match1 = self.rx_mtu.search(other)
if match1:
sub["mtu"] = match1.group("mtu")
# PVID
match1 = self.rx_pvid.search(other)
if match1:
sub["untagged_vlan"] = match1.group("pvid")
if ifname.startswith("Vlan"):
sub["vlan_ids"] = [int(ifname[4:])]
# IP Address
match1 = self.rx_ip.search(other)
if match1 and "NULL" not in match1.group("ip"):
ip_address = f'{match1.group("ip")}/{IPv4.netmask_to_len(match1.group("mask"))}'
sub["ipv4_addresses"] = [ip_address]
sub["enabled_afi"] = ["IPv4"]
iface = {
"type": self.profile.get_interface_type(name),
"name": name,
"name": ifname,
"admin_status": a_stat,
"oper_status": o_stat,
"type": self.profile.get_interface_type(ifname),
"enabled_protocols": [],
"subinterfaces": [sub],
}
sub = {"name": name, "admin_status": a_stat, "oper_status": o_stat}
# LLDP protocol
if name in lldp:
iface["enabled_protocols"] = ["LLDP"]
if iface["type"] == "physical":
sub["enabled_afi"] = ["BRIDGE"]
# MAC
match1 = self.rx_hw.search(other)
if match1.group("mac"):
iface["mac"] = match1.group("mac")
sub["mac"] = match1.group("mac")
# Description
match1 = self.rx_alias.search(other)
if match1 and match1.group("alias") != "(null),":
iface["description"] = match1.group("alias")
sub["description"] = match1.group("alias")
# Ifindex
match1 = self.rx_index.search(other)
if match1:
iface["snmp_ifindex"] = match1.group("ifindex")
sub["snmp_ifindex"] = match1.group("ifindex")
else:
if match.group("snmp_ifindex"):
iface["snmp_ifindex"] = match.group("snmp_ifindex")
sub["snmp_ifindex"] = match.group("snmp_ifindex")
# Correct alias and index on some device
match1 = self.rx_alias_and_index.search(other)
if match1:
if match1.group("alias") != "(null)":
iface["description"] = match1.group("alias")
sub["description"] = match1.group("alias")
iface["snmp_ifindex"] = match1.group("ifindex")
sub["snmp_ifindex"] = match1.group("ifindex")
match1 = self.rx_mtu.search(other)
if match1:
sub["mtu"] = match1.group("mtu")
match1 = self.rx_pvid.search(other)
if match1:
sub["untagged_vlan"] = match1.group("pvid")
if name.startswith("Vlan"):
sub["vlan_ids"] = [int(name[4:])]
elif match.group("snmp_ifindex"):
iface["snmp_ifindex"] = match.group("snmp_ifindex")
sub["snmp_ifindex"] = match.group("snmp_ifindex")
# LAG
match1 = self.rx_lag_port.search(other)
if match1:
iface["aggregated_interface"] = match1.group("lag_port")
match1 = self.rx_ip.search(other)
if match1:
if "NULL" in match1.group("ip"):
continue
ip_address = "%s/%s" % (
match1.group("ip"),
IPv4.netmask_to_len(match1.group("mask")),
)
sub["ipv4_addresses"] = [ip_address]
sub["enabled_afi"] = ["IPv4"]
iface["subinterfaces"] = [sub]
interfaces += [iface]
if interfaces:
# New CLI syntax
v = self.cli("show switchport interface")
for match in self.rx_vlan.finditer(v):
ifname = match.group("ifname")
untagged_vlan = match.group("untagged_vlan")
for i in interfaces:
if ifname == i["name"]:
i["subinterfaces"][0]["untagged_vlan"] = untagged_vlan
if match.group("tagged_vlans"):
tagged_vlans = match.group("tagged_vlans").replace(";", ",")
i["subinterfaces"][0]["tagged_vlans"] = self.expand_rangelist(
tagged_vlans
)
break
else:
# Old CLI syntax. V6.5.1.21 and older
for match in self.rx_sh_int_old.finditer(v):
iface = {
"name": match.group("interface"),
"type": "physical",
"admin_status": match.group("admin_status") == "enabled",
"oper_status": match.group("oper_status") == "up",
"mac": match.group("mac"),
}
sub = {
"name": match.group("interface"),
"admin_status": match.group("admin_status") == "enabled",
"oper_status": match.group("oper_status") == "up",
"mac": match.group("mac"),
"enabled_afi": ["BRIDGE"],
}
if match.group("mode") == "access":
sub["untagged_vlan"] = match.group("untagged")
else:
sub["untagged_vlan"] = match.group("pvid")
sub["tagged_vlans"] = self.expand_rangelist(match.group("tagged"))
iface["subinterfaces"] = [sub]
interfaces += [iface]
v = self.cli("show ip", cached=True)
match = self.rx_mgmt.search(v)
ip_address = "%s/%s" % (
match.group("ip"),
IPv4.netmask_to_len(match.group("mask")),
)
iface = {
"name": "system",
"type": "SVI",
"admin_status": True,
"oper_status": True,
"mac": match.group("mac"),
"subinterfaces": [
{
"name": "system",
"admin_status": True,
"oper_status": True,
"mac": match.group("mac"),
"enabled_afi": ["IPv4"],
"ipv4_addresses": [ip_address],
"vlan_ids": match.group("vlan_id"),
}
],
}
interfaces += [iface]
return [{"interfaces": interfaces}]
# LLDP protocol
if ifname in lldp:
iface["enabled_protocols"] = ["LLDP"]
interfaces[ifname] = iface
return [{"interfaces": list(interfaces.values())}]
......@@ -50,35 +50,35 @@ class Script(BaseScript):
"Dec": "12",
}
def execute(self):
def execute_cli(self, **kwargs):
r = []
s = self.scripts.get_version()
revision = s["attributes"]["HW version"]
serial = s["attributes"]["Serial Number"]
part_no = s["platform"]
vendor = s["vendor"]
if self.is_foxgate_cli:
r += [
{
"type": "CHASSIS",
"vendor": vendor,
"part_no": part_no,
"revision": revision,
"serial": serial,
"description": "",
}
]
r += self.get_transceivers(1)
return r
try:
slot = self.cli("show slot")
except self.CLISyntaxError:
slot = "Invalid"
slot_id = 0
if "Invalid" in slot:
p = {
"type": "CHASSIS",
"vendor": vendor,
"part_no": part_no,
"revision": revision,
"serial": serial,
"description": "",
}
r += [p]
r += self.get_transceivers(1)
else:
for match in self.rx_stack.finditer(slot):
mfg_date = match.group("mfg_date")
date = mfg_date.replace("/", "-")
slot_id += 1
sl = {
for slot_id, match in enumerate(self.rx_stack.finditer(slot), start=1):
mfg_date = match.group("mfg_date")
date = mfg_date.replace("/", "-")
r += [
{
"type": "CHASSIS",
"number": slot_id,
"vendor": vendor,
......@@ -88,100 +88,97 @@ class Script(BaseScript):
"serial": match.group("serial"),
"description": "",
}
r += [sl]
r += self.get_transceivers(slot_id)
]
r += self.get_transceivers(slot_id)
return r
def get_transceivers(self, slot_id):
out = []
try:
c = self.cli("show transceiver detail", cached=True)
for match in self.rx_media_type.finditer(c):
description = match.group("part_no")
mbd = int(match.group("mbd"))
nm = match.group("nm")
ch_id = int(match.group("ch_id"))
if ch_id == slot_id:
if not nm:
nm = 0
except self.CLISyntaxError:
return out
for match in self.rx_media_type.finditer(c):
description = match.group("part_no")
mbd = int(match.group("mbd"))
nm = match.group("nm")
ch_id = int(match.group("ch_id"))
if ch_id == slot_id:
nm = nm or 0
nm = int(nm)
vendor = "NONAME"
part_no = "NoName | Transceiver | "
if mbd == 1300 and nm == 1310:
part_no = part_no + "1G | SFP BXU"
elif mbd == 1300 and nm == 1550:
part_no = part_no + "1G | SFP BXD"
elif mbd == 1200 and nm == 1310:
part_no = part_no + "1G | SFP BX10U"
elif mbd >= 1200 and nm == 1490:
part_no = part_no + "1G | SFP BX10D"
elif mbd == 10300 and nm == 0 and description.startswith("unkn"):
part_no = part_no + "10G | SFP+ Twinax"
elif mbd == 10300 and description.startswith("10G"):
if description.endswith(tuple([" ER", "-ER"])):
part_no = part_no + "10G | SFP+ ER"
elif description.endswith(tuple([" LR", "-LR"])):
part_no = part_no + "10G | SFP+ LR"
elif description.endswith(tuple([" SR", "-SR"])):
part_no = part_no + "10G | SFP+ SR"
elif description.endswith(tuple([" ZR", "-ZR"])):
part_no = part_no + "10G | SFP+ ZR"
else:
nm = int(nm)
vendor = "NONAME"
part_no = "NoName | Transceiver | "
if mbd == 1300 and nm == 1310:
part_no = part_no + "1G | SFP BXU"
elif mbd == 1300 and nm == 1550:
part_no = part_no + "1G | SFP BXD"
elif mbd == 1200 and nm == 1310:
part_no = part_no + "1G | SFP BX10U"
elif mbd >= 1200 and nm == 1490:
part_no = part_no + "1G | SFP BX10D"
elif mbd == 10300 and nm == 0 and description.startswith("unkn"):
part_no = part_no + "10G | SFP+ Twinax"
elif mbd == 10300 and description.startswith("10G"):
if description.endswith(tuple([" ER", "-ER"])):
part_no = part_no + "10G | SFP+ ER"
elif description.endswith(tuple([" LR", "-LR"])):
part_no = part_no + "10G | SFP+ LR"
elif description.endswith(tuple([" SR", "-SR"])):
part_no = part_no + "10G | SFP+ SR"
elif description.endswith(tuple([" ZR", "-ZR"])):
part_no = part_no + "10G | SFP+ ZR"
else:
part_no = part_no + "10G | SFP+"
elif mbd == 12000 and nm == 0:
part_no = part_no + "10G | SFP+ Twinax"
elif mbd >= 1000 and mbd <= 1300 and nm == 0:
if description.endswith(tuple([" EX", "-EX"])):
part_no = part_no + "1G | SFP EX"
elif description.endswith(tuple([" LH", "-LH"])):
part_no = part_no + "1G | SFP LH"
elif description.endswith(tuple([" LX", "-LX"])):
part_no = part_no + "1G | SFP LX"
elif description.endswith(tuple([" SX", "-SX"])):
part_no = part_no + "1G | SFP SX"
elif description.endswith(tuple([" T", "-T"])):
part_no = part_no + "1G | SFP T"
elif description.endswith(tuple([" TX", "-TX"])):
part_no = part_no + "1G | SFP TX"
elif description.endswith(tuple([" ZX", "-ZX"])):
part_no = part_no + "1G | SFP ZX"
else:
part_no = part_no + "1G | SFP"
elif mbd == 100 and nm == 1310:
part_no = part_no + "100M | SFP BX100U"
elif mbd == 100 and nm == 1550:
part_no = part_no + "100M | SFP BX100D"
elif description.startswith("40G"):
part_no = part_no + "40G | QSFP+"
part_no = part_no + "10G | SFP+"
elif mbd == 12000 and nm == 0:
part_no = part_no + "10G | SFP+ Twinax"
elif mbd >= 1000 and mbd <= 1300 and nm == 0:
if description.endswith(tuple([" EX", "-EX"])):
part_no = part_no + "1G | SFP EX"
elif description.endswith(tuple([" LH", "-LH"])):
part_no = part_no + "1G | SFP LH"
elif description.endswith(tuple([" LX", "-LX"])):