Commit 24f50f16 authored by Dmitry Lukhtionov's avatar Dmitry Lukhtionov
Browse files

Merge branch '3Com-4500' into 'microservices'

3Com 4500

See merge request !229
parents 84dfe818 52cce6b1
Pipeline #2853 passed with stage
in 1 minute and 52 seconds
# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## Vendor: 3Com
## OS: 4500
##----------------------------------------------------------------------
## Copyright (C) 2007-2013 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
## NOC modules
#from noc.sa.profiles import Profile as NOCProfile
#from noc.sa.models import ManagedObject
from noc.core.profile.base import BaseProfile
import re
class Profile(BaseProfile):
name = "3Com.4500"
##supported_schemes = [BaseProfile.TELNET, BaseProfile.SSH]
pattern_username = r"^Username:"
pattern_password = r"^(Password:|Please input password:)"
pattern_more = [
(r"^\s+---- More ----$", " "),
(r"The current configuration will be written to the device. Are you sure? [Y/N]:", "Y"),
(r"(To leave the existing filename unchanged, press the enter key):", "\n"),
(r"flash:/startup.cfg exists, overwrite? [Y/N]:", "Y"),
]
pattern_prompt = r"^[<\[]\S+[>\]]"
pattern_syntax_error = r"^\s+% (Unrecognized|Incomplete) command found at '\^' position.$"
command_save_config = "save"
#command_enter_config = "system-view"
command_leave_config = "return"
command_exit = "quit"
convert_interface_name = BaseProfile.convert_interface_name_cisco
def setup_session(self, script):
# Yuo may change password instead 512900
# script.cli("_cmdline-mode on\nY\n512900\nsystem-view\n")
script.cli("system-view")
"""
obj = ManagedObject.objects.get()
passwd = obj.super_password()
script.cli("_cmdline-mode on\nY\n" + passwd)
"""
# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## 3Com.4500.get_arp
##----------------------------------------------------------------------
## Copyright (C) 2007-2013 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
## Python modules
import re
## NOC modules
from noc.core.script.base import BaseScript
from noc.sa.interfaces.igetarp import IGetARP
class Script(BaseScript):
name = "3Com.4500.get_arp"
interface = IGetARP
cache = True
rx_line = re.compile(
r"^(?P<ip>\S+)\s+(?P<mac>\S+)\s+\d+\s+(?P<interface>\S+)\s+\d+\s+\S$",
re.MULTILINE)
def execute(self):
r = []
# Try SNMP first
# working but give vlan interface instead port name
if self.has_snmp():
try:
for v in self.snmp.get_tables(
["1.3.6.1.2.1.4.22.1.1", "1.3.6.1.2.1.4.22.1.2",
"1.3.6.1.2.1.4.22.1.3"], bulk=True):
iface = self.snmp.get("1.3.6.1.2.1.31.1.1.1.1." + v[1],
cached=True) # IF-MIB
mac = ":".join(["%02x" % ord(c) for c in v[2]])
ip = ["%02x" % ord(c) for c in v[3]]
ip = ".".join(str(int(c, 16)) for c in ip)
r.append({"ip": ip,
"mac": mac,
"interface": iface,
})
return r
except self.snmp.TimeOutError:
pass
# Fallback to CLI
for match in self.rx_line.finditer(self.cli("display arp", cached=True)):
mac = match.group("mac")
if mac.lower() == "incomplete":
r.append({
"ip": match.group("ip"),
"mac": None,
"interface": None
})
else:
iface = match.group("interface")
iface = iface.replace('GE', 'Gi ')
iface = iface.replace('BAGG', 'Po ')
r.append({
"ip": match.group("ip"),
"mac": match.group("mac"),
"interface": iface
})
return r
# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## 3Com.4500.get_chassis_id
##----------------------------------------------------------------------
## Copyright (C) 2007-2016 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
# Python modules
import re
# NOC modules
from noc.core.script.base import BaseScript
from noc.sa.interfaces.igetchassisid import IGetChassisID
from noc.core.mac import MAC
class Script(BaseScript):
name = "3Com.4500.get_chassis_id"
cache = True
interface = IGetChassisID
rx_mac = re.compile( r"^\s+First mac address\s+:\s+(?P<mac>\S+)$",
re.MULTILINE)
def execute(self):
# Try SNMP first
if self.has_snmp():
try:
macs = []
for v in self.snmp.get_tables(
["1.3.6.1.2.1.2.2.1.6"], bulk=True):
macs += [v[1]]
macs = [x for x in macs if x != '\x00\x00\x00\x00\x00\x00']
return {
"first_chassis_mac": min(macs),
"last_chassis_mac": max(macs)
}
return mac
except self.snmp.TimeOutError:
pass
# Fallback to CLI
match = self.rx_mac.search(self.cli("display device manuinfo", cached=True))
mac = match.group("mac")
return {
"first_chassis_mac": mac,
"last_chassis_mac": mac
}
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# 3Com.4500.get_config
# ----------------------------------------------------------------------
# Copyright (C) 2007-2013 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# NOC modules
from noc.core.script.base import BaseScript
from noc.sa.interfaces.igetconfig import IGetConfig
class Script(BaseScript):
name = "3Com.4500.get_config"
interface = IGetConfig
def execute(self):
config = self.cli("display current-configuration")
return self.cleaned_config(config)
# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## 3Com.4500.get_copper_tdr_diag
##----------------------------------------------------------------------
## Copyright (C) 2007-2013 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
## Python modules
from __future__ import with_statement
import re
## NOC modules
from noc.core.script.base import BaseScript
from noc.sa.interfaces.igetcoppertdrdiag import IGetCopperTDRDiag
class Script(BaseScript):
name = "3Com.4500.get_copper_tdr_diag"
interface = IGetCopperTDRDiag
TIMEOUT = 600
rx_diag = re.compile(
r"^Cable status: (?P<status>\S+), (?P<length>\d+) metres")
variance = 100
def parce_pair(self, pair, status, distance=None):
pair = int(pair)
if status == "normal":
st = 'T'
elif status == "abnormal(open)":
st = 'O'
elif status == "abnormal(short)":
st = 'S'
elif status == "abnormal(unknown)":
st = 'N'
else:
raise self.NotSupportedError()
return {"pair": pair, "status": st, "distance_cm": int(distance),
"variance_cm": self.variance}
def execute(self, interface=None):
r = []
#with self.configure():
if interface is None:
interface_status = {}
diag = ''
for iface in self.scripts.get_interface_status():
diag = self.cli("interface %s" % iface['interface'].replace('Ge ', 'GigabitEthernet '))
diag = self.cli("virtual-cable-test")
match = self.rx_diag.search(diag)
if match:
status = match.group("status")
length = int(match.group("length")) * 100
pairs = []
for i in [1, 2, 3, 4]:
pairs.append(self.parce_pair(i, status, length))
r.append({
"interface": iface['interface'],
"pairs": pairs
})
else:
diag = self.cli("interface %s" % interface.replace('Ge ', 'GigabitEthernet '))
diag = self.cli("virtual-cable-test")
match = self.rx_diag.search(diag)
if match:
status = match.group("status")
length = int(match.group("length")) * 100
pairs = []
for i in [1, 2, 3, 4]:
pairs.append(self.parce_pair(i, status, length))
r.append({
"interface": interface,
"pairs": pairs
})
return r
# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## 3Com.4500.get_fqdn
##----------------------------------------------------------------------
## Copyright (C) 2007-2013 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
## Python modules
import re
#import noc.sa.script
## NOC modules
from noc.core.script.base import BaseScript
from noc.sa.interfaces.igetfqdn import IGetFQDN
class Script(BaseScript):
name = "3Com.4500.get_fqdn"
interface = IGetFQDN
rx_hostname = re.compile(r"^\s*sysname (?P<hostname>\S+)$", re.MULTILINE)
rx_domain_name = re.compile(r"^domain (?P<domain>\S+)$", re.MULTILINE)
def execute(self):
fqdn = ''
v = self.cli("display current-configuration")
match = self.rx_hostname.search(v)
if match:
fqdn = match.group("hostname")
match = self.rx_domain_name.search(v)
if match:
fqdn = fqdn + '.' + match.group("domain")
return fqdn
# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## 3Com.4500.get_interface_status
##----------------------------------------------------------------------
## Copyright (C) 2007-2013 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
## Python modules
import re
## NOC modules
from noc.core.script.base import BaseScript
from noc.sa.interfaces.igetinterfacestatus import IGetInterfaceStatus
class Script(BaseScript):
name = "3Com.4500.get_interface_status"
interface = IGetInterfaceStatus
rx_interface_status = re.compile(
r"^(?P<interface>\S+)\s+(?P<status>UP|DOWN)\s+\S+\s+\S+\s+\S+\s+\d+\s*$",
re.MULTILINE)
def execute(self, interface=None):
r = []
# Try SNMP first
"""
if self.has_snmp():
try:
for n, s in self.snmp.join_tables("1.3.6.1.2.1.31.1.1.1.1",
"1.3.6.1.2.1.2.2.1.8", bulk=True): # IF-MIB
if 'Ethernet' in n:
if interface:
if n == interface.replace('Gi ', 'GigabitEthernet'):
r.append({
"interface": n,
"status": int(s) == 1
})
else:
r.append({
"interface": n,
"status": int(s) == 1
})
return r
except self.snmp.TimeOutError:
pass
"""
# Fallback to CLI
if interface:
cmd = "display interface %s" % interface
else:
cmd = "display interface"
for match in self.rx_interface_status.finditer(self.cli(cmd)):
r.append({
"interface": match.group("interface"),
"status": match.group("status") == "UP"
})
if not r:
if interface:
cmd = "display brief interface %s" % interface
else:
cmd = "display brief interface"
for match in self.rx_interface_status.finditer(self.cli(cmd)):
r.append({
"interface": match.group("interface"),
"status": match.group("status") == "UP"
})
return r
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# 3Com.4500.get_interfaces
# ----------------------------------------------------------------------
# Copyright (C) 2007-2016 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import re
from collections import defaultdict
# NOC modules
from noc.sa.profiles.Generic.get_interfaces import Script as BaseScript
from noc.sa.interfaces.igetinterfaces import IGetInterfaces
"""
# Python modules
import re
from collections import defaultdict
# NOC modules
from noc.core.script.base import BaseScript
from noc.sa.interfaces.igetinterfaces import IGetInterfaces
#from noc.core.ip import IPv4
#from noc.lib.validators import is_int
#from noc.sa.profiles.3com.4500.get_portchannel import Script as my_get_portchannel
"""
class Script(BaseScript):
name = "3com.4500.get_interfaces"
interface = IGetInterfaces
rx_sh_svi = re.compile(
r"^\s*(?P<interface>\S+) current state : ?(?P<admin_status>(UP|DOWN|ADMINISTRATIVELY DOWN))\s*.IP Sending Frames' Format is \S+, Hardware address is (?P<mac>\S+).Description: (?P<description>(\S+ \S+ \S+ \S+|\S+ \S+ \S+|\S+ \S+|\S+)).Line protocol current state :(?P<oper_status>\S+).Internet Address is (?P<ip>\S+)\/(?P<mask>\d+)( Primary|, acquired via DHCP).The Maximum Transmit Unit is \d+",
re.DOTALL | re.MULTILINE)
rx_iface = re.compile(
r"^\s*(?P<iface>((\S+|)Ethernet|\S+Aggregation)\S+) current state :\s+(?P<status>(UP|DOWN|ADMINISTRATIVELY DOWN))\s*$")
rx_mac = re.compile(
r"^\s*IP Sending Frames' Format is \S+, Hardware address is (?P<mac>\S+)$")
rx_description = re.compile(
r"^\s*Description:\s+(?P<description>.+)$")
rx_type = re.compile(r"^\s*Port link-type: (?P<type>\S+)$")
rx_tagged = re.compile(r"^\s*VLAN passing : (?P<tagged>.+)$")
rx_tag = re.compile(r"^\s*Tagged\s+VLAN ID :\s+(?P<tagged>.+)$")
rx_untag = re.compile(r"^\s*Untagged\s+VLAN ID :\s+(?P<untagged>.+)$")
types = {
"Et": "physical", # Ethernet
"Gi": "physical", # GigabitEthernet
"Po": "aggregated", # Aggregated
}
def get_ospfint(self):
ospfs = []
return ospfs
def get_ripint(self):
rip = []
return rip
def get_bgpint(self):
bgp = []
return bgp
def execute(self):
interfaces = []
mac = ""
# Get portchannels
portchannel_members = {}
#portchannel = self.scripts.get_portchannel()
#portchannel = self.scripts.my_get_portchannel()
#portchannel = []
for pc in self.scripts.get_portchannel():
i = pc["interface"]
t = pc["type"] == "L"
for m in pc["members"]:
portchannel_members[m] = (i, t)
# Get L3 interfaces
# TODO Get router interfaces
ospfs = self.get_ospfint()
rips = self.get_ripint()
bgps = self.get_bgpint()
ifaces = self.cli("display interface").strip(' ')
for match in self.rx_sh_svi.finditer(ifaces):
description = match.group("description")
if not description:
description = ''
ifname = match.group("interface")
ip = match.group("ip")
netmask = match.group("mask")
enabled_afi = []
if ":" in ip:
ip_interfaces = "ipv6_addresses"
ip_ver = "is_ipv6"
enabled_afi += ["IPv6"]
ip = ip + '/' + netmask
ip_list = [ip]
else:
ip_interfaces = "ipv4_addresses"
ip_ver = "is_ipv4"
enabled_afi += ["IPv4"]
ip = ip + '/' + netmask
ip_list = [ip]
vlan = ifname[14:]
a_stat = match.group("admin_status").lower() == "up"
o_stat = match.group("oper_status").lower() == "up"
mac = match.group("mac")
iface = {
"name": ifname,
"type": "SVI",
"admin_status": a_stat,
"oper_status": o_stat,
"mac": mac,
"description": description,
"subinterfaces": [{
"name": ifname,
"description": description,
"admin_status": a_stat,
"oper_status": o_stat,
ip_ver: True,
"enabled_afi": enabled_afi,
ip_interfaces: ip_list,
"mac": mac,
"vlan_ids": self.expand_rangelist(vlan),
}]
}
interfaces += [iface]
# Get L2 interfaces
ifaces = ifaces.splitlines()
for i in range(len(ifaces)):
match = self.rx_iface.search(ifaces[i])
if match:
ifname = match.group("iface")
if ifname[:18] == 'Bridge-Aggregation':
ifname = 'Po ' + ifname.split('Bridge-Aggregation')[1]
else:
ifname = ifname.replace('Ethernet', 'Et ') #ifname = ifname.replace('GigabitEthernet', 'Gi ')
o_stat = match.group("status") == 'UP'
a_stat = match.group("status") != 'ADMINISTRATIVELY DOWN'
i += 1
match = self.rx_mac.search(ifaces[i])
if match:
mac = match.group("mac")
i += 1
match = self.rx_description.search(ifaces[i])
if match:
description = match.group("description")
else:
description = ''
iface = {
"name": ifname,
"type": self.types[ifname[:2]],
"admin_status": a_stat,
"oper_status": o_stat,
"mac": mac,
"subinterfaces": [{
"name": ifname,
"admin_status": a_stat,
"oper_status": o_stat,
"mac": mac,
# "snmp_ifindex": self.scripts.get_ifindex(interface=ifname)
# "snmp_ifindex": ifname.split('/')[2]
}]
}
# Portchannel member
if ifname in portchannel_members:
ai, is_lacp = portchannel_members[ifname]
iface["aggregated_interface"] = ai
if is_lacp:
iface["enabled_protocols"] = ["LACP"]
else:
iface["description"] = description
iface["subinterfaces"][0]["description"] = description
iface["subinterfaces"][0]["is_bridge"] = True
iface["subinterfaces"][0]["enabled_afi"] = ["BRIDGE"]
while not self.rx_type.search(ifaces[i]):
i += 1
match = self.rx_type.search(ifaces[i])
vtype = match.group("type")
i += 1
if vtype == "trunk":
match = self.rx_tagged.search(ifaces[i])
tagged = match.group("tagged")
tagged = tagged.replace('(default vlan)', '')
tagged_ = []
for j in self.expand_rangelist(tagged):
tagged_.append(int(j))
iface["subinterfaces"][0]["tagged_vlans"] = tagged_
else:
match = self.rx_tag.search(ifaces[i])
tagged = match.group("tagged")
try:
if 'none' not in tagged:
iface["subinterfaces"][0]["tagged_vlans"] = self.expand_rangelist(tagged)
except:
continue
i += 1
match = self.rx_untag.search(ifaces[i])
untagged = match.group("untagged")
if untagged is not 'none':
iface["subinterfaces"][0]["untagged_vlan"] = int(untagged)
interfaces += [iface]
return [{"interfaces": interfaces}]
# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## 3Com.4500.get_lldp_neighbors
##----------------------------------------------------------------------
## Copyright (C) 2007-2013 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
# Python modules
import re
# NOC modules
from noc.core.script.base import BaseScript
from noc.sa.interfaces.igetlldpneighbors import IGetLLDPNeighbors
from noc.lib.validators import is_int, is_ipv4, is_ipv6, is_mac
from noc.sa.interfaces.base import MACAddressParameter
class Script(BaseScript):
name = "3Com.4500.get_lldp_neighbors"
interface = IGetLLDPNeighbors
rx_line = re.compile(
r"^\s+LLDP\sneighbor-information\sof\sport\s\d+\[(?P<interface>\S+)\]:\s+"