Commit 347c3dc6 authored by Dmitry Lukhtionov's avatar Dmitry Lukhtionov
Browse files

Merge branch 'snr_1' into 'master'

Update NAG.SNR profile

See merge request !6384
parents a7feace4 6b888f0a
{
"name": "NAG | SNR | CLI | Old",
"$collection": "inv.capabilities",
"uuid": "96cb024c-3062-415a-a2a7-d3503d7628c0",
"description": "Supported FoxGate CLI Syntax on SNR Profile",
"type": "bool",
"card_template": null
}
# ---------------------------------------------------------------------
# NAG.SNR.get_arp
# ---------------------------------------------------------------------
# Copyright (C) 2007-2019 The NOC Project
# Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
# Python modules
import re
from builtins import str
# NOC modules
from noc.core.script.base import BaseScript
from noc.sa.profiles.Generic.get_arp import Script as BaseScript
from noc.sa.interfaces.igetarp import IGetARP
......@@ -20,21 +19,15 @@ class Script(BaseScript):
cache = True
rx_arp = re.compile(r"^(?P<ip>\d+\S+)\s+(?P<mac>\S+)\s+(?P<interface>\S+\d+)", re.MULTILINE)
def execute_snmp(self):
r = []
for v in self.snmp.get_tables(
["1.3.6.1.2.1.4.22.1.1", "1.3.6.1.2.1.4.22.1.2", "1.3.6.1.2.1.4.22.1.3"], bulk=True
):
iface = self.snmp.get("1.3.6.1.2.1.31.1.1.1.1." + str(v[1]), cached=True) # IF-MIB
mac = ":".join(["%02x" % ord(c) for c in v[2]])
# ip = ["%02x" % ord(c) for c in v[3]]
# ip = ".".join(str(int(c, 16)) for c in ip)
r.append({"ip": v[3], "mac": mac, "interface": iface})
return r
rx_arp2 = re.compile(
r"^(?P<ip>\d+\S+)\s+(?P<mac>\S+)\s+\d+\s+(?P<interface>\S+\d+)", re.MULTILINE
)
def execute_cli(self):
r = []
for match in self.rx_arp.finditer(self.cli("show arp")):
r += [match.groupdict()]
if self.is_foxgate_cli:
v = self.cli("show arp all")
r = [match.groupdict() for match in self.rx_arp2.finditer(v)]
else:
v = self.cli("show arp")
r = [match.groupdict() for match in self.rx_arp.finditer(v)]
return r
# ---------------------------------------------------------------------
# NAG.SNR.get_capabilities
# ---------------------------------------------------------------------
# Copyright (C) 2007-2019 The NOC Project
# Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
......@@ -10,7 +10,6 @@ import re
# NOC modules
from noc.sa.profiles.Generic.get_capabilities import Script as BaseScript
from noc.sa.profiles.Generic.get_capabilities import false_on_cli_error
class Script(BaseScript):
......@@ -19,23 +18,36 @@ class Script(BaseScript):
rx_lldp_en = re.compile(r"LLDP has been enabled globally?")
rx_stack = re.compile(r"-+member :(?P<id>\d+)-+")
@false_on_cli_error
def has_lldp_cli(self):
def get_lldp_cli(self):
"""
Check box has lldp enabled on SNR
"""
cmd = self.cli("show lldp", ignore_errors=True)
return self.rx_lldp_en.search(cmd) is not None
try:
cmd = self.cli("show lldp")
except self.CLISyntaxError:
# FoxGate CLI
cmd = self.cli("show lldp interface")
return "System LLDP: enable" in cmd, True
return self.rx_lldp_en.search(cmd) is not None, False
def execute_platform_cli(self, caps):
def get_stack_members(self):
s = []
try:
s = []
cmd = self.cli("show slot")
for match in self.rx_stack.finditer(cmd):
i = match.group("id")
s += [i]
if s:
caps["Stack | Members"] = len(s) if len(s) != 1 else 0
caps["Stack | Member Ids"] = " | ".join(s)
except Exception:
pass
except self.CLISyntaxError:
return
for match in self.rx_stack.finditer(cmd):
i = match.group("id")
s += [i]
return s
def execute_platform_cli(self, caps):
members = self.get_stack_members()
if members:
caps["Stack | Members"] = len(members) if len(members) != 1 else 0
caps["Stack | Member Ids"] = " | ".join(members)
has_lldp, foxgate_cli = self.get_lldp_cli()
if has_lldp:
caps["Network | LLDP"] = True
if foxgate_cli:
caps["NAG | SNR | CLI | Old"] = True
# ---------------------------------------------------------------------
# NAG.SNR.get_chassis_id
# ---------------------------------------------------------------------
# Copyright (C) 2007-2019 The NOC Project
# Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
......@@ -20,11 +20,15 @@ class Script(BaseScript):
cache = True
rx_mac = re.compile(r"^\s+\S+\s+mac\s+(\S+)\s*\n", re.MULTILINE | re.IGNORECASE)
rx_mac2 = re.compile(r"^MAC address\s+: (?P<mac>\S+)", re.MULTILINE)
SNMP_GET_OIDS = {"SNMP": [mib["IF-MIB::ifPhysAddress", 1]]}
def execute_cli(self):
macs = sorted(self.rx_mac.findall(self.cli("show version", cached=True)))
if self.is_foxgate_cli:
macs = sorted(self.rx_mac2.findall(self.cli("show ip", cached=True)))
else:
macs = sorted(self.rx_mac.findall(self.cli("show version", cached=True)))
return [
{"first_chassis_mac": f, "last_chassis_mac": t} for f, t in self.macs_to_ranges(macs)
]
# ---------------------------------------------------------------------
# NAG.SNR.get_interfaces
# ---------------------------------------------------------------------
# Copyright (C) 2007-2021 The NOC Project
# Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
# Python modules
import re
from typing import Set, Dict, Any
from collections import defaultdict
# NOC modules
from noc.core.script.base import BaseScript
......@@ -29,6 +31,24 @@ class Script(BaseScript):
r"(?:^\s+Encapsulation |^\s+Output packets statistics:)",
re.MULTILINE,
)
rx_sh_int_old = re.compile(
r"^\s+(?:Fast|Gigabit) Ethernet (?P<interface>\S+) current state: (?P<admin_status>\S+), port link is (?P<oper_status>\S+)\s*\n"
r"(?:^\s+Port type status : .+\n)?"
r"(?:^\s+Time duration of linkup is .+\n)?"
r"^\s+Hardware address is (?P<mac>\S+)\s*\n"
r"^\s+SetSpeed is .+\n"
r"^\s+Current port type: .+\n"
r"(?:^\s+Transceiver is .+\n)?"
r"(?:^\s+Transceiver Compliance: .+\n)?"
r"^\s+Priority is \d+\s*\n"
r"^\s+Flow control is .+\n"
r"^\s+Broadcast storm control target rate is .+\n"
r"^\s+PVID is (?P<pvid>\d+)\s*\n"
r"^\s+Port mode: (?P<mode>trunk|access)\s*\n"
r"(?:^\s+Untagged\s+VLAN ID: (?P<untagged>\d+)\s*\n)?"
r"(?:^\s+Vlan\s+allowed: (?P<tagged>\S+)\s*\n)?",
re.MULTILINE,
)
rx_hw = re.compile(
r"^\s+Hardware is (?P<hw_type>\S+)(\(card not installed\))?(, active is \S+)?"
r"(,\s+address is (?P<mac>\S+))?\s*\n",
......@@ -50,91 +70,174 @@ class Script(BaseScript):
r"(^Trunk allowed Vlan\s*:\s*(?P<tagged_vlans>\S+)\s*\n)?",
re.MULTILINE,
)
rx_mgmt = re.compile(
r"^ip address\s+: (?P<ip>\S+)\s*\n"
r"^netmask\s+: (?P<mask>\S+)\s*\n"
r"^gateway\s+: .+\n"
r"^ManageVLAN\s+: (?P<vlan_id>\d+)\s*\n"
r"^MAC address\s+: (?P<mac>\S+)",
re.MULTILINE,
)
rx_lag_port = re.compile(r"\s*\S+ is LAG member port, LAG port:(?P<lag_port>\S+)\n")
def execute_cli(self):
interfaces = []
# Get LLDP interfaces
lldp = []
def get_switchport_cli(self) -> Dict[str, Dict[str, Any]]:
if self.is_foxgate_cli:
return {}
# New CLI syntax
result = defaultdict(lambda: {"untagged": None, "tagged": []})
v = self.cli("show switchport interface")
for match in self.rx_vlan.finditer(v):
ifname = match.group("ifname")
result[ifname]["untagged"] = match.group("untagged_vlan")
if match.group("tagged_vlans"):
tagged_vlans = match.group("tagged_vlans").replace(";", ",")
result[ifname]["tagged"] = self.expand_rangelist(tagged_vlans)
return result
def get_interface_lldp(self) -> Set[str]:
lldp = set()
if self.is_foxgate_cli:
return lldp
c = self.cli("show lldp", ignore_errors=True)
if self.rx_lldp_en.search(c):
ll = self.rx_lldp.search(c)
if ll:
lldp = ll.group("local_if").split()
lldp = set(ll.group("local_if").split())
return lldp
def get_interfaces_foxgatecli(self):
"""
For FoxGate Like CLI syntax
:return:
"""
v = self.cli("show interface", cached=True)
interfaces = {}
for match in self.rx_sh_int_old.finditer(v):
ifname = match.group("interface")
sub = {
"name": match.group("interface"),
"admin_status": match.group("admin_status") == "enabled",
"oper_status": match.group("oper_status") == "up",
"mac": match.group("mac"),
"enabled_afi": ["BRIDGE"],
}
if match.group("mode") == "access":
sub["untagged_vlan"] = match.group("untagged")
else:
sub["untagged_vlan"] = match.group("pvid")
sub["tagged_vlans"] = self.expand_rangelist(match.group("tagged"))
interfaces[ifname] = {
"name": ifname,
"type": "physical",
"admin_status": match.group("admin_status") == "enabled",
"oper_status": match.group("oper_status") == "up",
"mac": match.group("mac"),
"subinterfaces": [sub],
}
v = self.cli("show ip", cached=True)
match = self.rx_mgmt.search(v)
ip_address = f'{match.group("ip")}/{IPv4.netmask_to_len(match.group("mask"))}'
interfaces["system"] = {
"name": "system",
"type": "SVI",
"admin_status": True,
"oper_status": True,
"mac": match.group("mac"),
"subinterfaces": [
{
"name": "system",
"admin_status": True,
"oper_status": True,
"mac": match.group("mac"),
"enabled_afi": ["IPv4"],
"ipv4_addresses": [ip_address],
"vlan_ids": match.group("vlan_id"),
}
],
}
return [{"interfaces": list(interfaces.values())}]
def execute_cli(self, **kwargs):
interfaces = {}
if self.is_foxgate_cli:
return self.get_interfaces_foxgatecli()
# Get LLDP enabled interfaces
lldp = self.get_interface_lldp()
# Get switchports and fill tagged/untagged lists if they are not empty
switchports = self.get_switchport_cli()
v = self.cli("show interface", cached=True)
for match in self.rx_sh_int.finditer(v):
name = match.group("interface")
ifname = match.group("interface")
a_stat = match.group("admin_status").lower() == "up"
o_stat = match.group("oper_status").lower() == "up"
sub = {
"name": ifname,
"admin_status": a_stat,
"oper_status": o_stat,
"enabled_protocols": [],
"enabled_afi": [],
}
# Switchport
if ifname in switchports:
# Bridge
sub["enabled_afi"] += ["BRIDGE"]
u, t = switchports[ifname]["untagged"], switchports[ifname].get("tagged")
if u:
sub["untagged_vlan"] = u
if t:
sub["tagged_vlans"] = t
# Other
other = match.group("other")
match1 = self.rx_hw.search(other)
# MTU
match1 = self.rx_mtu.search(other)
if match1:
sub["mtu"] = match1.group("mtu")
# PVID
match1 = self.rx_pvid.search(other)
if match1:
sub["untagged_vlan"] = match1.group("pvid")
if ifname.startswith("Vlan"):
sub["vlan_ids"] = [int(ifname[4:])]
# IP Address
match1 = self.rx_ip.search(other)
if match1 and "NULL" not in match1.group("ip"):
ip_address = f'{match1.group("ip")}/{IPv4.netmask_to_len(match1.group("mask"))}'
sub["ipv4_addresses"] = [ip_address]
sub["enabled_afi"] = ["IPv4"]
iface = {
"type": self.profile.get_interface_type(name),
"name": name,
"name": ifname,
"admin_status": a_stat,
"oper_status": o_stat,
"type": self.profile.get_interface_type(ifname),
"enabled_protocols": [],
"subinterfaces": [sub],
}
sub = {"name": name, "admin_status": a_stat, "oper_status": o_stat}
# LLDP protocol
if name in lldp:
iface["enabled_protocols"] = ["LLDP"]
if iface["type"] == "physical":
sub["enabled_afi"] = ["BRIDGE"]
# MAC
match1 = self.rx_hw.search(other)
if match1.group("mac"):
iface["mac"] = match1.group("mac")
sub["mac"] = match1.group("mac")
# Description
match1 = self.rx_alias.search(other)
if match1 and match1.group("alias") != "(null),":
iface["description"] = match1.group("alias")
sub["description"] = match1.group("alias")
# Ifindex
match1 = self.rx_index.search(other)
if match1:
iface["snmp_ifindex"] = match1.group("ifindex")
sub["snmp_ifindex"] = match1.group("ifindex")
else:
if match.group("snmp_ifindex"):
iface["snmp_ifindex"] = match.group("snmp_ifindex")
sub["snmp_ifindex"] = match.group("snmp_ifindex")
# Correct alias and index on some device
match1 = self.rx_alias_and_index.search(other)
if match1:
if match1.group("alias") != "(null)":
iface["description"] = match1.group("alias")
sub["description"] = match1.group("alias")
iface["snmp_ifindex"] = match1.group("ifindex")
sub["snmp_ifindex"] = match1.group("ifindex")
match1 = self.rx_mtu.search(other)
if match1:
sub["mtu"] = match1.group("mtu")
match1 = self.rx_pvid.search(other)
if match1:
sub["untagged_vlan"] = match1.group("pvid")
if name.startswith("Vlan"):
sub["vlan_ids"] = [int(name[4:])]
elif match.group("snmp_ifindex"):
iface["snmp_ifindex"] = match.group("snmp_ifindex")
sub["snmp_ifindex"] = match.group("snmp_ifindex")
# LAG
match1 = self.rx_lag_port.search(other)
if match1:
iface["aggregated_interface"] = match1.group("lag_port")
match1 = self.rx_ip.search(other)
if match1:
if "NULL" in match1.group("ip"):
continue
ip_address = "%s/%s" % (
match1.group("ip"),
IPv4.netmask_to_len(match1.group("mask")),
)
sub["ipv4_addresses"] = [ip_address]
sub["enabled_afi"] = ["IPv4"]
iface["subinterfaces"] = [sub]
interfaces += [iface]
v = self.cli("show switchport interface")
for match in self.rx_vlan.finditer(v):
ifname = match.group("ifname")
untagged_vlan = match.group("untagged_vlan")
for i in interfaces:
if ifname == i["name"]:
i["subinterfaces"][0]["untagged_vlan"] = untagged_vlan
if match.group("tagged_vlans"):
tagged_vlans = match.group("tagged_vlans").replace(";", ",")
i["subinterfaces"][0]["tagged_vlans"] = self.expand_rangelist(tagged_vlans)
break
return [{"interfaces": interfaces}]
# LLDP protocol
if ifname in lldp:
iface["enabled_protocols"] = ["LLDP"]
interfaces[ifname] = iface
return [{"interfaces": list(interfaces.values())}]
# ---------------------------------------------------------------------
# NAG.SNR.get_inventory
# ---------------------------------------------------------------------
# Copyright (C) 2007-2019 The NOC Project
# Copyright (C) 2007-2022 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
......@@ -50,134 +50,135 @@ class Script(BaseScript):
"Dec": "12",
}
def execute(self):
def execute_cli(self, **kwargs):
r = []
s = self.scripts.get_version()
revision = s["attributes"]["HW version"]
serial = s["attributes"]["Serial Number"]
part_no = s["platform"]
slot = self.cli("show slot")
slot_id = 0
if "Invalid" in slot:
p = {
"type": "CHASSIS",
"vendor": "NAG",
"part_no": part_no,
"revision": revision,
"serial": serial,
"description": "",
}
r += [p]
vendor = s["vendor"]
if self.is_foxgate_cli:
r += [
{
"type": "CHASSIS",
"vendor": vendor,
"part_no": part_no,
"revision": revision,
"serial": serial,
"description": "",
}
]
r += self.get_transceivers(1)
else:
for match in self.rx_stack.finditer(slot):
mfg_date = match.group("mfg_date")
date = mfg_date.replace("/", "-")
slot_id += 1
sl = {
return r
try:
slot = self.cli("show slot")
except self.CLISyntaxError:
slot = "Invalid"
for slot_id, match in enumerate(self.rx_stack.finditer(slot), start=1):
mfg_date = match.group("mfg_date")
date = mfg_date.replace("/", "-")
r += [
{
"type": "CHASSIS",
"number": slot_id,
"vendor": "NAG",
"vendor": vendor,
"mfg_date": date,
"part_no": part_no,
"revision": match.group("hardware"),
"serial": match.group("serial"),
"description": "",
}
r += [sl]
r += self.get_transceivers(slot_id)
]
r += self.get_transceivers(slot_id)
return r
def get_transceivers(self, slot_id):
out = []
try:
c = self.cli("show transceiver detail", cached=True)
for match in self.rx_media_type.finditer(c):
description = match.group("part_no")
mbd = int(match.group("mbd"))
nm = match.group("nm")
ch_id = int(match.group("ch_id"))
if ch_id == slot_id:
if not nm:
nm = 0
except self.CLISyntaxError:
return out
for match in self.rx_media_type.finditer(c):
description = match.group("part_no")
mbd = int(match.group("mbd"))
nm = match.group("nm")
ch_id = int(match.group("ch_id"))
if ch_id == slot_id:
nm = nm or 0
nm = int(nm)
vendor = "NONAME"
part_no = "NoName | Transceiver | "
if mbd == 1300 and nm == 1310:
part_no = part_no + "1G | SFP BXU"
elif mbd == 1300 and nm == 1550:
part_no = part_no + "1G | SFP BXD"
elif mbd == 1200 and nm == 1310:
part_no = part_no + "1G | SFP BX10U"
elif mbd >= 1200 and nm == 1490:
part_no = part_no + "1G | SFP BX10D"
elif mbd == 10300 and nm == 0 and description.startswith("unkn"):
part_no = part_no + "10G | SFP+ Twinax"
elif mbd == 10300 and description.startswith("10G"):
if description.endswith(tuple([" ER", "-ER"])):
part_no = part_no + "10G | SFP+ ER"
elif description.endswith(tuple([" LR", "-LR"])):
part_no = part_no + "10G | SFP+ LR"
elif description.endswith(tuple([" SR", "-SR"])):
part_no = part_no + "10G | SFP+ SR"
elif description.endswith(tuple([" ZR", "-ZR"])):
part_no = part_no + "10G | SFP+ ZR"
else:
nm = int(nm)
vendor = "NONAME"
part_no = "NoName | Transceiver | "
if mbd == 1300 and nm == 1310:
part_no = part_no + "1G | SFP BXU"
elif mbd == 1300 and nm == 1550:
part_no = part_no + "1G | SFP BXD"
elif mbd == 1200 and nm == 1310:
part_no = part_no + "1G | SFP BX10U"
elif mbd >= 1200 and nm == 1490:
part_no = part_no + "1G | SFP BX10D"
elif mbd == 10300 and nm == 0 and description.startswith("unkn"):
part_no = part_no + "10G | SFP+ Twinax"
elif mbd == 10300 and description.startswith("10G"):
if description.endswith(tuple([" ER", "-ER"])):
part_no = part_no + "10G | SFP+ ER"
elif description.endswith(tuple([" LR", "-LR"])):
part_no = part_no + "10G | SFP+ LR"
elif description.endswith(tuple([" SR", "-SR"])):
part_no = part_no + "10G | SFP+ SR"
elif description.endswith(tuple([" ZR", "-ZR"])):
part_no = part_no + "10G | SFP+ ZR"
else:
part_no = part_no + "10G | SFP+"
elif mbd == 12000 and nm == 0:
part_no = part_no + "10G | SFP+ Twinax"
elif mbd >= 1000 and mbd <= 1300 and nm == 0:
if description.endswith(tuple([" EX", "-EX"])):
part_no = part_no + "1G | SFP EX"
elif description.endswith(tuple([" LH", "-LH"])):
part_no = part_no + "1G | SFP LH"
elif description.endswith(tuple([" LX", "-LX"])):
part_no = part_no + "1G | SFP LX"
elif description.endswith(tuple([" SX", "-SX"])):
part_no = part_no + "1G | SFP SX"
elif description.endswith(tuple([" T", "-T"])):
part_no = part_no + "1G | SFP T"
elif description.endswith(tuple([" TX", "-TX"])):
part_no = part_no + "1G | SFP TX"
elif description.endswith(tuple([" ZX", "-ZX"])):
part_no = part_no + "1G | SFP ZX"