Commit 8b671348 authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Merge branch 'fix-avs-hpe-comware' into 'master'

HP.Comware. Fix prompt for hpe.

See merge request noc/noc!4634
parents 7c549ca1 1f9f1810
......@@ -5,10 +5,12 @@
# See LICENSE for details
# ---------------------------------------------------------------------
# Python modules
import re
from noc.core.script.base import BaseScript
# NOC modules
from noc.sa.profiles.Generic.get_chassis_id import Script as BaseScript
from noc.sa.interfaces.igetchassisid import IGetChassisID
import re
class Script(BaseScript):
......@@ -18,7 +20,7 @@ class Script(BaseScript):
rx_id = re.compile(r"^\s*MAC_ADDRESS\s+:\s+(?P<id>\S+)", re.IGNORECASE | re.MULTILINE)
def execute(self):
def execute_cli(self, **kwargs):
match = self.re_search(self.rx_id, self.cli("display device manuinfo", cached=True))
mac = match.group("id")
return {"first_chassis_mac": mac, "last_chassis_mac": mac}
......@@ -15,7 +15,8 @@ class Script(BaseScript):
name = "HP.Comware.get_config"
interface = IGetConfig
def execute_cli(self, **kwargs):
def execute_cli(self, policy="r"):
assert policy in ("r", "s")
self.cli("undo terminal monitor")
config = self.cli("display current-configuration")
config = self.profile.clean_spaces(config)
......
# ---------------------------------------------------------------------
# HP.Comware.get_fqdn
# ---------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
# NOC modules
from noc.sa.profiles.Generic.get_fqdn import Script as BaseScript
from noc.sa.interfaces.igetfqdn import IGetFQDN
class Script(BaseScript):
name = "HP.Comware.get_fqdn"
interface = IGetFQDN
always_prefer = "S"
......@@ -11,19 +11,13 @@ import re
# NOC modules
from noc.core.script.base import BaseScript
from noc.sa.interfaces.igetinterfaces import IGetInterfaces
from noc.core.text import parse_kv
class Script(BaseScript):
name = "HP.Comware.get_interfaces"
interface = IGetInterfaces
rx_sh_int = re.compile(
r"^\s*(?P<interface>\S+) current state\s*:\s*(?P<oper_status>UP|DOWN|DOWN \( Administratively \))\s*\n"
r"(^\s*Line protocol current state\s*:\s*(?P<line_status>UP|UP \(spoofing\)|DOWN|DOWN \( Administratively \))\s*\n)?"
r"(^\s*IP (?:Packet Frame Type:|Sending Frames\' Format is) PKTFMT_ETHNT_2, Hardware Address(?: is|:) (?P<mac>\S+)\s*\n)?"
r"^\s*Description\s*:(?P<descr>[^\n]*)\n",
re.MULTILINE | re.IGNORECASE | re.DOTALL,
)
rx_mtu = re.compile(r"The Maximum Frame Length is (?P<mtu>\d+)")
rx_port_type = re.compile(r"Port link-type: (?P<port_type>hybrid|access|trunk)")
rx_port_other = re.compile(
......@@ -39,194 +33,158 @@ class Script(BaseScript):
rx_ip = re.compile(r"Internet Address is (?P<ip>\S+) Primary")
rx_ips = re.compile(r"Internet Address is (?P<ip>\S+) Sub")
rx_mac = re.compile(
r"IP (?:Packet Frame Type:|Sending Frames' Format is) PKTFMT_ETHNT_2, Hardware Address(?: is|:) (?P<mac>\S+)"
)
rx_sh_vlan = re.compile(
r"^(?P<interface>\S+) current state\s*:\s*(?P<oper_status>UP|DOWN|DOWN \( Administratively \))\s*\n"
r"^Line protocol current state\s*:\s*(?P<line_status>UP|UP \(spoofing\)|DOWN|DOWN \( Administratively \))\s*\n"
r"(^IP (?:Packet Frame Type:|Sending Frames\' Format is) PKTFMT_ETHNT_2, Hardware address(?: is|:) (?P<mac>\S+)\s*\n)?"
r"(^Internet Address is (?P<ip>\S+) Primary\s*\n)?"
r"(^Internet protocol processing\s*:\s*\S+\s*\n)?"
r"^Description\s*:(?P<descr>.*?)\n"
r"^The Maximum Transmit Unit is (?P<mtu>\d+)",
re.MULTILINE,
r"IP (?:Packet Frame Type:|Sending Frames' Format is)\s*PKTFMT_ETHNT_2, Hardware Address(?: is|:) (?P<mac>\S+)"
)
rx_name = re.compile(r"^Vlan-interface(?P<vlan>\d+)?")
rx_vlan_name = re.compile(r"^Vlan-interface(?P<vlan>\d+)?")
rx_isis = re.compile(r"Interface:\s+(?P<iface>\S+)")
rx_sub_default_vlan = re.compile(r"\(default vlan\),?")
rx_parse_interface_vlan = re.compile(r"(\d+)(?:\(.+\))?")
rx_iface_block_splitter = re.compile(r"^\s*\S+\d+(?:/\d+)+", re.MULTILINE)
def execute(self):
isis = []
def get_isis_interfaces(self):
r = []
try:
v = self.cli("display isis interface")
for match in self.rx_isis.finditer(v):
isis += [match.group("iface")]
r += [match.group("iface")]
except self.CLISyntaxError:
pass
return r
interface_map = {
"current state": "oper_status",
"line protocol state": "line_status",
"description": "description",
"maximum transmit unit": "mtu",
"port link-type": "port_type",
"pvid": "pvid",
"untagged vlan id": "untagged_vlan",
"untagged vlan": "untagged_vlan",
"tagged vlan id": "tagged_vlans",
"vlan passing": "vlan_passing",
"vlan permitted": "vlan_permitted",
}
def parse_interface_block(self, block):
r = parse_kv(self.interface_map, block)
if "mtu" not in r and self.rx_mtu.search(block):
r["mtu"] = self.rx_mtu.search(block).group(1)
if self.rx_mac.search(block):
r["mac"] = self.rx_mac.search(block).group(1)
ip_match = self.rx_ip.search(block)
if ip_match:
r["ip"] = ip_match.group(1)
if "tagged_vlans" in r and self.rx_sub_default_vlan.search(r["tagged_vlans"]):
r["tagged_vlans"] = r["tagged_vlans"].replace("(default vlan)", "")
if "vlan_passing" in r and self.rx_sub_default_vlan.search(r["vlan_passing"]):
r["vlan_passing"] = r["vlan_passing"].replace("(default vlan)", "")
if "vlan_permitted" in r and self.rx_sub_default_vlan.search(r["vlan_permitted"]):
r["vlan_permitted"] = r["vlan_permitted"].replace("(default vlan)", "")
if "untagged_vlan" not in r and "pvid" in r:
r["untagged_vlan"] = r["pvid"]
if "untagged_vlan" in r:
r["untagged_vlan"] = self.rx_parse_interface_vlan.match(r["untagged_vlan"]).group(1)
return r
def iter_block(self, v):
for b in v.split("\n\n"):
start = 0
for match in self.rx_iface_block_splitter.finditer(b):
"""
Fixed
0 aborts, 0 deferred, 0 collisions, 0 late collisions
- lost carrier, - no carrier
GigabitEthernet1/3
Current state: UP
"""
if match.start() > 0:
start = match.start()
yield b[:start]
yield b[start:]
if start:
continue
else:
yield b
def execute_cli(self, **kwargs):
isis = self.get_isis_interfaces()
# Get portchannels
portchannel_members = {}
for pc in self.scripts.get_portchannel():
i = pc["interface"]
t = pc["type"] == "L"
for m in pc["members"]:
portchannel_members[m] = (i, t)
interfaces = []
v = self.cli("display interface").split("\n\n")
for i in v:
match = self.rx_sh_int.search(i)
if not match:
interfaces = {}
v = self.cli("display interface")
# "display interface Vlan-interface"
# "display interface NULL"
for block in self.iter_block(v):
if not block:
continue
ifname = match.group("interface")
if ifname.startswith("Bridge-Aggregation") or ifname.startswith("Route-Aggregation"):
iftype = "aggregated"
elif ifname.startswith("LoopBack"):
iftype = "loopback"
elif ifname.startswith("Vlan-interface"):
ifname, block = block.split(None, 1)
# print(ifname, "\n\n", block, "\n\n")
if ifname in interfaces:
continue
elif ifname.startswith("NULL"):
r = self.parse_interface_block(block)
if not r:
continue
iftype = self.profile.get_interface_type(ifname)
self.logger.info("Process interface: %s", ifname)
o_status = r.get("oper_status", "").lower() == "up"
a_status = False if "Administratively" in r.get("oper_status", "") else True
name = ifname
vlan_ids = 0
if "." in ifname:
ifname, vlan_ids = ifname.split(".", 1)
else:
iftype = "physical"
o_stat = match.group("oper_status").lower() == "up"
if match.group("oper_status") == r"DOWN ( Administratively\)":
a_stat = False
else:
a_stat = True
iface = {
"name": ifname,
"type": iftype,
"admin_status": a_stat,
"oper_status": o_stat,
"enabled_protocols": [],
"subinterfaces": [],
}
interfaces[ifname] = {
"name": ifname,
"type": iftype,
"admin_status": a_status,
"oper_status": o_status,
"enabled_protocols": [],
"subinterfaces": [],
}
if "description" in r:
interfaces[ifname]["description"] = r["description"]
if "mac" in r:
interfaces[ifname]["mac"] = r["mac"]
if ifname in portchannel_members:
ai, is_lacp = portchannel_members[ifname]
interfaces[ifname]["aggregated_interface"] = ai
interfaces[ifname]["enabled_protocols"] += ["LACP"]
if self.rx_vlan_name.match(ifname):
vlan_ids = self.rx_vlan_name.match(name).group(1)
sub = {
"name": ifname,
"admin_status": a_stat,
"oper_status": o_stat,
"enabled_afi": ["BRIDGE"],
"name": name,
"admin_status": a_status,
"oper_status": o_status,
"enabled_protocols": [],
"enabled_afi": [],
}
if ifname in isis:
sub["enabled_protocols"] += ["ISIS"]
if match.group("mac"):
iface["mac"] = match.group("mac")
sub["mac"] = match.group("mac")
if match.group("descr"):
iface["description"] = match.group("descr").strip()
sub["description"] = match.group("descr").strip()
match1 = self.rx_mtu.search(i)
if match1:
sub["mtu"] = int(match1.group("mtu"))
match1 = self.rx_ip.search(i)
if match1:
if "mac" in r:
sub["mac"] = r["mac"]
if "ip" in r:
sub["enabled_afi"] += ["IPv4"]
sub["ipv4_addresses"] = [match1.group("ip")]
match1 = self.rx_port_type.search(i)
if match1:
port_type = match1.group("port_type")
if port_type in ["access", "hybrid"]:
match2 = self.rx_port_other.search(i)
if match2.group("untagged") and match2.group("untagged") != "none":
sub["untagged_vlan"] = int(match2.group("untagged"))
if match2.group("tagged") and match2.group("tagged") != "none":
sub["tagged_vlan"] = self.expand_rangelist(match2.group("tagged"))
if port_type == "trunk":
match2 = self.rx_port_trunk.search(i)
if match2.group("passing") and match2.group("passing") != "none":
passing = re.sub(self.rx_sub_default_vlan, "", match2.group("passing"))
sub["tagged_vlan"] = self.expand_rangelist(passing)
iface["subinterfaces"] += [sub]
if ifname in portchannel_members:
ai, is_lacp = portchannel_members[ifname]
iface["aggregated_interface"] = ai
iface["enabled_protocols"] += ["LACP"]
interfaces += [iface]
v = self.cli("display interface Vlan-interface").split("\n\n")
for i in v:
match = self.rx_sh_vlan.search(i)
if not match:
continue
ifname = match.group("interface")
o_stat = match.group("oper_status").lower() == "up"
if match.group("oper_status") == r"DOWN ( Administratively\)":
a_stat = False
else:
a_stat = True
iface = {
"name": ifname,
"type": "SVI",
"admin_status": a_stat,
"oper_status": o_stat,
"enabled_protocols": [],
"subinterfaces": [],
}
sub = {
"name": ifname,
"admin_status": a_stat,
"oper_status": o_stat,
"enabled_afi": [],
}
if match.group("descr"):
iface["description"] = match.group("descr").strip()
sub["description"] = match.group("descr").strip()
if match.group("mtu"):
sub["mtu"] = int(match.group("mtu"))
match1 = self.rx_mac.search(i)
if match1:
iface["mac"] = match1.group("mac")
sub["mac"] = match1.group("mac")
match1 = self.rx_ip.search(i)
if match1:
sub["enabled_afi"] += ["IPv4"]
sub["ipv4_addresses"] = [match1.group("ip")]
match1 = self.rx_ips.search(i)
if match1:
sub["ipv4_addresses"] += [match1.group("ip")]
vlan = self.rx_name.search(ifname).group("vlan")
sub["vlan_ids"] = [vlan]
iface["subinterfaces"] += [sub]
interfaces += [iface]
v = self.cli("display interface NULL").split("\n\n")
for i in v:
match = self.rx_sh_vlan.search(i)
if not match:
continue
ifname = match.group("interface")
o_stat = match.group("oper_status").lower() == "up (spoofing)"
if match.group("oper_status") == r"DOWN ( Administratively\)":
a_stat = False
else:
a_stat = True
iface = {
"name": ifname,
"type": "null",
"admin_status": a_stat,
"oper_status": o_stat,
"enabled_protocols": [],
"subinterfaces": [],
}
sub = {
"name": ifname,
"admin_status": a_stat,
"oper_status": o_stat,
"enabled_afi": [],
}
if match.group("descr"):
iface["description"] = match.group("descr").strip()
sub["description"] = match.group("descr").strip()
if match.group("mtu"):
sub["mtu"] = int(match.group("mtu"))
match1 = self.rx_mac.search(i)
if match1:
iface["mac"] = match1.group("mac")
sub["mac"] = match1.group("mac")
match1 = self.rx_ip.search(i)
if match1:
sub["enabled_afi"] += ["IPv4"]
sub["ipv4_addresses"] = [match1.group("ip")]
match1 = self.rx_ips.search(i)
if match1:
sub["ipv4_addresses"] += [match1.group("ip")]
iface["subinterfaces"] += [sub]
interfaces += [iface]
return [{"interfaces": interfaces}]
sub["ipv4_addresses"] = [r["ip"]]
if vlan_ids:
sub["vlan_ids"] = [int(vlan_ids)]
if "port_type" in r:
sub["enabled_afi"] += ["BRIDGE"]
# Bridge interface
if r["port_type"] in ["access", "hybrid"] and "untagged_vlan" in r:
sub["untagged_vlan"] = int(r["untagged_vlan"])
if r["port_type"] in ["access", "hybrid"] and "tagged_vlans" in r:
sub["tagged_vlans"] = self.expand_rangelist((r["tagged_vlans"]))
if r["port_type"] == "trunk" and "vlan_permitted" in r:
sub["tagged_vlans"] = self.expand_rangelist(r["vlan_permitted"])
interfaces[ifname]["subinterfaces"] += [sub]
return [{"interfaces": list(interfaces.values())}]
......@@ -10,78 +10,122 @@
import re
# NOC modules
from noc.core.script.base import BaseScript
from noc.sa.profiles.Generic.get_lldp_neighbors import Script as BaseScript
from noc.sa.interfaces.igetlldpneighbors import IGetLLDPNeighbors
from noc.core.lldp import (
LLDP_CHASSIS_SUBTYPE_CHASSIS_COMPONENT,
LLDP_CHASSIS_SUBTYPE_PORT_COMPONENT,
LLDP_CHASSIS_SUBTYPE_INTERFACE_ALIAS,
LLDP_CHASSIS_SUBTYPE_INTERFACE_NAME,
LLDP_CHASSIS_SUBTYPE_NETWORK_ADDRESS,
LLDP_CHASSIS_SUBTYPE_LOCAL,
LLDP_CHASSIS_SUBTYPE_MAC,
LLDP_PORT_SUBTYPE_ALIAS,
LLDP_PORT_SUBTYPE_COMPONENT,
LLDP_PORT_SUBTYPE_NAME,
LLDP_PORT_SUBTYPE_MAC,
LLDP_PORT_SUBTYPE_NETWORK_ADDRESS,
LLDP_PORT_SUBTYPE_AGENT_CIRCUIT_ID,
LLDP_PORT_SUBTYPE_LOCAL,
LLDP_CAP_NAMES,
lldp_caps_to_bits,
)
from noc.core.text import parse_kv
class Script(BaseScript):
name = "HP.Comware.get_lldp_neighbors"
interface = IGetLLDPNeighbors
rx_item = re.compile(
r"^LLDP neighbor-information of port \d+\[(?P<interface>\S+)\]\:\n"
r"^ Neighbor index : \d+\n"
r"^ Update time : .+?\n"
r"^ Chassis type : (?P<chassis_type>.+?)\n"
r"^ Chassis ID : (?P<chassis_id>\S+)\n"
r"^ Port ID type : (?P<port_id_type>.+?)\n"
r"^ Port ID : (?P<port_id>.+?)\n"
r"^ Port description : (?P<port_descr>.+?)\n"
r"^ System name : (?P<system_name>.+?)\n"
r"(^ System description : (?P<system_descr>.+?)\n)?"
r"(^ System capabilities supported : (?P<system_caps_s>.+?)\n)?",
re.MULTILINE,
)
lldp_kv_map = {
"neighbor index": "neighbor_index",
"lldp neighbor index": "neighbor_index",
"update time": "update_time",
"chassis type": "remote_chassis_id_subtype",
"chassis id": "remote_chassis_id",
"port id type": "remote_port_subtype",
"port id": "remote_port",
"port description": "remote_port_description",
"system name": "remote_system_name",
"system description": "remote_system_description",
"system capabilities supported": "remote_capabilities",
"capabilities": "remote_capabilities",
"chassisid/subtype": "remote_chassis_id_type",
"portid/subtype": "remote_port_type",
}
def execute(self):
r = []
rx_lldp_neighbor_split = re.compile(r"LLDP neighbor-information of port")
rx_local_port = re.compile(r"^\s*\d+\[(?P<interface>\S+)\]\:")
chassis_type_map = {
"chassis component": LLDP_CHASSIS_SUBTYPE_CHASSIS_COMPONENT,
"interface alias": LLDP_CHASSIS_SUBTYPE_INTERFACE_ALIAS,
"port component": LLDP_CHASSIS_SUBTYPE_PORT_COMPONENT,
"mac address": LLDP_CHASSIS_SUBTYPE_MAC,
"network address": LLDP_CHASSIS_SUBTYPE_NETWORK_ADDRESS,
"interface name": LLDP_CHASSIS_SUBTYPE_INTERFACE_NAME,
"local": LLDP_CHASSIS_SUBTYPE_LOCAL,
}
port_type_map = {
"interface alias": LLDP_PORT_SUBTYPE_ALIAS,
"port component": LLDP_PORT_SUBTYPE_COMPONENT,
"mac address": LLDP_PORT_SUBTYPE_MAC,
"network address": LLDP_PORT_SUBTYPE_NETWORK_ADDRESS,
"interface name": LLDP_PORT_SUBTYPE_NAME,
"agent circuit id": LLDP_PORT_SUBTYPE_AGENT_CIRCUIT_ID,
"locally assigned": LLDP_PORT_SUBTYPE_LOCAL,
}
def execute_cli(self, **kwargs):
result = []
try:
v = self.cli("display lldp neighbor-information")
except self.CLISyntaxError:
return []
for match in self.rx_item.finditer(v):
i = {"local_interface": match.group("interface"), "neighbors": []}
for nei in self.rx_lldp_neighbor_split.split(v):
if not nei.strip():
continue
match = self.rx_local_port.search(nei)
if not match:
self.logger.info("Not find local port")
continue
n = {}
n["remote_chassis_id_subtype"] = {
"Chassis Component": 1,
"Interface Alias": 2,
"Port Component": 3,
"MAC Address": 4,
"Network Address": 5,
"Interface Name": 6,
"local": 7,
}.get(match.group("chassis_type"))
n["remote_chassis_id"] = match.group("chassis_id").strip()
n["remote_port_subtype"] = {
"Interface Alias": 1,
"Port Component": 2,
"MAC Address": 3,
"Network Address": 4,
"Interface Name": 5,
"Agent Circuit ID": 6,
"Locally assigned": 7,
}.get(match.group("port_id_type"))
n["remote_port"] = match.group("port_id").strip()
n["remote_port_description"] = match.group("port_descr").strip()
n["remote_system_name"] = match.group("system_name").strip()
if match.group("system_descr"):
n["remote_system_description"] = match.group("system_descr").strip()
if match.group("system_caps_s"):
cap = 0
for c in match.group("system_caps_s").split(","):
c = c.strip()
if c:
cap |= {
"Other": 1,
"Repeater": 2,
"Bridge": 4,
"WLAN Access Point": 8,
"Router": 16,
"TTelephone": 32,
"DOCSIS Cable Device": 64,
"Station Only": 128,
}[c]
n["remote_capabilities"] = cap
i["neighbors"] += [n]
r += [i]
return r
r = parse_kv(self.lldp_kv_map, nei)
if "remote_chassis_id_subtype" in r:
n["remote_chassis_id_subtype"] = r["remote_chassis_id_subtype"]
if "remote_chassis_id_type" in r:
n["remote_chassis_id"], n["remote_chassis_id_subtype"] = r[
"remote_chassis_id_type"
].rsplit("/", 1)
if "remote_chassis_id" in r:
n["remote_chassis_id"] = r["remote_chassis_id"]
if "remote_chassis_id" not in n:
self.logger.warning(
"[%s] Not found remote chassis id on output", match.group("interface")
)
continue
n["remote_chassis_id_subtype"] = self.chassis_type_map[
n["remote_chassis_id_subtype"].lower()
]
# Port fields
if "remote_port_subtype" in r:
n["remote_port_subtype"] = r["remote_port_subtype"]
if "remote_port_type" in r:
n["remote_port"], n["remote_port_subtype"] = r["remote_port_type"].rsplit("/", 1)
if "remote_port" in r:
n["remote_port"] = r["remote_port"]
if "remote_port" not in n:
self.logger.warning("[%s] Not found port id on output", match.group("interface"))
continue
n["remote_port_subtype"] = self.port_type_map[n["remote_port_subtype"].lower()]
if "remote_system_description" in r:
n["remote_system_description"] = r["remote_system_description"].strip()
if "remote_capabilities" in r:
n["remote_capabilities"] = lldp_caps_to_bits(
[x.strip() for x in r["remote_capabilities"].split(",")],
{LLDP_CAP_NAMES[x1]: x1 for x1 in LLDP_CAP_NAMES},
)
result += [{"local_interface": match.group("interface"), "neighbors": [n]}]
return result