Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Vladimir
noc
Commits
6da70853
Commit
6da70853
authored
Nov 19, 2020
by
Andrey Vertiprahov
Browse files
Merge branch 'cherry-pick-
8b671348
' into 'release-20.4'
release-20.4:Backport!4634 See merge request
noc/noc!4696
parents
79f537b0
55ebfd15
Changes
9
Hide whitespace changes
Inline
Side-by-side
sa/profiles/HP/Comware/get_chassis_id.py
View file @
6da70853
...
...
@@ -5,10 +5,12 @@
# See LICENSE for details
# ---------------------------------------------------------------------
# Python modules
import
re
from
noc.core.script.base
import
BaseScript
# NOC modules
from
noc.sa.profiles.Generic.get_chassis_id
import
Script
as
BaseScript
from
noc.sa.interfaces.igetchassisid
import
IGetChassisID
import
re
class
Script
(
BaseScript
):
...
...
@@ -18,7 +20,7 @@ class Script(BaseScript):
rx_id
=
re
.
compile
(
r
"^\s*MAC_ADDRESS\s+:\s+(?P<id>\S+)"
,
re
.
IGNORECASE
|
re
.
MULTILINE
)
def
execute
(
self
):
def
execute
_cli
(
self
,
**
kwargs
):
match
=
self
.
re_search
(
self
.
rx_id
,
self
.
cli
(
"display device manuinfo"
,
cached
=
True
))
mac
=
match
.
group
(
"id"
)
return
{
"first_chassis_mac"
:
mac
,
"last_chassis_mac"
:
mac
}
sa/profiles/HP/Comware/get_config.py
View file @
6da70853
...
...
@@ -15,7 +15,8 @@ class Script(BaseScript):
name
=
"HP.Comware.get_config"
interface
=
IGetConfig
def
execute_cli
(
self
,
**
kwargs
):
def
execute_cli
(
self
,
policy
=
"r"
):
assert
policy
in
(
"r"
,
"s"
)
self
.
cli
(
"undo terminal monitor"
)
config
=
self
.
cli
(
"display current-configuration"
)
config
=
self
.
profile
.
clean_spaces
(
config
)
...
...
sa/profiles/HP/Comware/get_fqdn.py
0 → 100644
View file @
6da70853
# ---------------------------------------------------------------------
# HP.Comware.get_fqdn
# ---------------------------------------------------------------------
# Copyright (C) 2007-2020 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
# NOC modules
from
noc.sa.profiles.Generic.get_fqdn
import
Script
as
BaseScript
from
noc.sa.interfaces.igetfqdn
import
IGetFQDN
class
Script
(
BaseScript
):
name
=
"HP.Comware.get_fqdn"
interface
=
IGetFQDN
always_prefer
=
"S"
sa/profiles/HP/Comware/get_interfaces.py
View file @
6da70853
...
...
@@ -11,19 +11,13 @@ import re
# NOC modules
from
noc.core.script.base
import
BaseScript
from
noc.sa.interfaces.igetinterfaces
import
IGetInterfaces
from
noc.core.text
import
parse_kv
class
Script
(
BaseScript
):
name
=
"HP.Comware.get_interfaces"
interface
=
IGetInterfaces
rx_sh_int
=
re
.
compile
(
r
"^\s*(?P<interface>\S+) current state\s*:\s*(?P<oper_status>UP|DOWN|DOWN \( Administratively \))\s*\n"
r
"(^\s*Line protocol current state\s*:\s*(?P<line_status>UP|UP \(spoofing\)|DOWN|DOWN \( Administratively \))\s*\n)?"
r
"(^\s*IP (?:Packet Frame Type:|Sending Frames\' Format is) PKTFMT_ETHNT_2, Hardware Address(?: is|:) (?P<mac>\S+)\s*\n)?"
r
"^\s*Description\s*:(?P<descr>[^\n]*)\n"
,
re
.
MULTILINE
|
re
.
IGNORECASE
|
re
.
DOTALL
,
)
rx_mtu
=
re
.
compile
(
r
"The Maximum Frame Length is (?P<mtu>\d+)"
)
rx_port_type
=
re
.
compile
(
r
"Port link-type: (?P<port_type>hybrid|access|trunk)"
)
rx_port_other
=
re
.
compile
(
...
...
@@ -39,194 +33,158 @@ class Script(BaseScript):
rx_ip
=
re
.
compile
(
r
"Internet Address is (?P<ip>\S+) Primary"
)
rx_ips
=
re
.
compile
(
r
"Internet Address is (?P<ip>\S+) Sub"
)
rx_mac
=
re
.
compile
(
r
"IP (?:Packet Frame Type:|Sending Frames' Format is) PKTFMT_ETHNT_2, Hardware Address(?: is|:) (?P<mac>\S+)"
)
rx_sh_vlan
=
re
.
compile
(
r
"^(?P<interface>\S+) current state\s*:\s*(?P<oper_status>UP|DOWN|DOWN \( Administratively \))\s*\n"
r
"^Line protocol current state\s*:\s*(?P<line_status>UP|UP \(spoofing\)|DOWN|DOWN \( Administratively \))\s*\n"
r
"(^IP (?:Packet Frame Type:|Sending Frames\' Format is) PKTFMT_ETHNT_2, Hardware address(?: is|:) (?P<mac>\S+)\s*\n)?"
r
"(^Internet Address is (?P<ip>\S+) Primary\s*\n)?"
r
"(^Internet protocol processing\s*:\s*\S+\s*\n)?"
r
"^Description\s*:(?P<descr>.*?)\n"
r
"^The Maximum Transmit Unit is (?P<mtu>\d+)"
,
re
.
MULTILINE
,
r
"IP (?:Packet Frame Type:|Sending Frames' Format is)\s*PKTFMT_ETHNT_2, Hardware Address(?: is|:) (?P<mac>\S+)"
)
rx_name
=
re
.
compile
(
r
"^Vlan-interface(?P<vlan>\d+)?"
)
rx_vlan_name
=
re
.
compile
(
r
"^Vlan-interface(?P<vlan>\d+)?"
)
rx_isis
=
re
.
compile
(
r
"Interface:\s+(?P<iface>\S+)"
)
rx_sub_default_vlan
=
re
.
compile
(
r
"\(default vlan\),?"
)
rx_parse_interface_vlan
=
re
.
compile
(
r
"(\d+)(?:\(.+\))?"
)
rx_iface_block_splitter
=
re
.
compile
(
r
"^\s*\S+\d+(?:/\d+)+"
,
re
.
MULTILINE
)
def
execute
(
self
):
isis
=
[]
def
get_isis_interfaces
(
self
):
r
=
[]
try
:
v
=
self
.
cli
(
"display isis interface"
)
for
match
in
self
.
rx_isis
.
finditer
(
v
):
isis
+=
[
match
.
group
(
"iface"
)]
r
+=
[
match
.
group
(
"iface"
)]
except
self
.
CLISyntaxError
:
pass
return
r
interface_map
=
{
"current state"
:
"oper_status"
,
"line protocol state"
:
"line_status"
,
"description"
:
"description"
,
"maximum transmit unit"
:
"mtu"
,
"port link-type"
:
"port_type"
,
"pvid"
:
"pvid"
,
"untagged vlan id"
:
"untagged_vlan"
,
"untagged vlan"
:
"untagged_vlan"
,
"tagged vlan id"
:
"tagged_vlans"
,
"vlan passing"
:
"vlan_passing"
,
"vlan permitted"
:
"vlan_permitted"
,
}
def
parse_interface_block
(
self
,
block
):
r
=
parse_kv
(
self
.
interface_map
,
block
)
if
"mtu"
not
in
r
and
self
.
rx_mtu
.
search
(
block
):
r
[
"mtu"
]
=
self
.
rx_mtu
.
search
(
block
).
group
(
1
)
if
self
.
rx_mac
.
search
(
block
):
r
[
"mac"
]
=
self
.
rx_mac
.
search
(
block
).
group
(
1
)
ip_match
=
self
.
rx_ip
.
search
(
block
)
if
ip_match
:
r
[
"ip"
]
=
ip_match
.
group
(
1
)
if
"tagged_vlans"
in
r
and
self
.
rx_sub_default_vlan
.
search
(
r
[
"tagged_vlans"
]):
r
[
"tagged_vlans"
]
=
r
[
"tagged_vlans"
].
replace
(
"(default vlan)"
,
""
)
if
"vlan_passing"
in
r
and
self
.
rx_sub_default_vlan
.
search
(
r
[
"vlan_passing"
]):
r
[
"vlan_passing"
]
=
r
[
"vlan_passing"
].
replace
(
"(default vlan)"
,
""
)
if
"vlan_permitted"
in
r
and
self
.
rx_sub_default_vlan
.
search
(
r
[
"vlan_permitted"
]):
r
[
"vlan_permitted"
]
=
r
[
"vlan_permitted"
].
replace
(
"(default vlan)"
,
""
)
if
"untagged_vlan"
not
in
r
and
"pvid"
in
r
:
r
[
"untagged_vlan"
]
=
r
[
"pvid"
]
if
"untagged_vlan"
in
r
:
r
[
"untagged_vlan"
]
=
self
.
rx_parse_interface_vlan
.
match
(
r
[
"untagged_vlan"
]).
group
(
1
)
return
r
def
iter_block
(
self
,
v
):
for
b
in
v
.
split
(
"
\n\n
"
):
start
=
0
for
match
in
self
.
rx_iface_block_splitter
.
finditer
(
b
):
"""
Fixed
0 aborts, 0 deferred, 0 collisions, 0 late collisions
- lost carrier, - no carrier
GigabitEthernet1/3
Current state: UP
"""
if
match
.
start
()
>
0
:
start
=
match
.
start
()
yield
b
[:
start
]
yield
b
[
start
:]
if
start
:
continue
else
:
yield
b
def
execute_cli
(
self
,
**
kwargs
):
isis
=
self
.
get_isis_interfaces
()
# Get portchannels
portchannel_members
=
{}
for
pc
in
self
.
scripts
.
get_portchannel
():
i
=
pc
[
"interface"
]
t
=
pc
[
"type"
]
==
"L"
for
m
in
pc
[
"members"
]:
portchannel_members
[
m
]
=
(
i
,
t
)
interfaces
=
[]
v
=
self
.
cli
(
"display interface"
).
split
(
"
\n\n
"
)
for
i
in
v
:
match
=
self
.
rx_sh_int
.
search
(
i
)
if
not
match
:
interfaces
=
{}
v
=
self
.
cli
(
"display interface"
)
# "display interface Vlan-interface"
# "display interface NULL"
for
block
in
self
.
iter_block
(
v
):
if
not
block
:
continue
ifname
=
match
.
group
(
"interface"
)
if
ifname
.
startswith
(
"Bridge-Aggregation"
)
or
ifname
.
startswith
(
"Route-Aggregation"
):
iftype
=
"aggregated"
elif
ifname
.
startswith
(
"LoopBack"
):
iftype
=
"loopback"
elif
ifname
.
startswith
(
"Vlan-interface"
):
ifname
,
block
=
block
.
split
(
None
,
1
)
# print(ifname, "\n\n", block, "\n\n")
if
ifname
in
interfaces
:
continue
elif
ifname
.
startswith
(
"NULL"
):
r
=
self
.
parse_interface_block
(
block
)
if
not
r
:
continue
iftype
=
self
.
profile
.
get_interface_type
(
ifname
)
self
.
logger
.
info
(
"Process interface: %s"
,
ifname
)
o_status
=
r
.
get
(
"oper_status"
,
""
).
lower
()
==
"up"
a_status
=
False
if
"Administratively"
in
r
.
get
(
"oper_status"
,
""
)
else
True
name
=
ifname
vlan_ids
=
0
if
"."
in
ifname
:
ifname
,
vlan_ids
=
ifname
.
split
(
"."
,
1
)
else
:
iftype
=
"physical"
o_stat
=
match
.
group
(
"oper_status"
).
lower
()
==
"up"
if
match
.
group
(
"oper_status"
)
==
r
"DOWN ( Administratively\)"
:
a_stat
=
False
else
:
a_stat
=
True
iface
=
{
"name"
:
ifname
,
"type"
:
iftype
,
"admin_status"
:
a_stat
,
"oper_status"
:
o_stat
,
"enabled_protocols"
:
[],
"subinterfaces"
:
[],
}
interfaces
[
ifname
]
=
{
"name"
:
ifname
,
"type"
:
iftype
,
"admin_status"
:
a_status
,
"oper_status"
:
o_status
,
"enabled_protocols"
:
[],
"subinterfaces"
:
[],
}
if
"description"
in
r
:
interfaces
[
ifname
][
"description"
]
=
r
[
"description"
]
if
"mac"
in
r
:
interfaces
[
ifname
][
"mac"
]
=
r
[
"mac"
]
if
ifname
in
portchannel_members
:
ai
,
is_lacp
=
portchannel_members
[
ifname
]
interfaces
[
ifname
][
"aggregated_interface"
]
=
ai
interfaces
[
ifname
][
"enabled_protocols"
]
+=
[
"LACP"
]
if
self
.
rx_vlan_name
.
match
(
ifname
):
vlan_ids
=
self
.
rx_vlan_name
.
match
(
name
).
group
(
1
)
sub
=
{
"name"
:
ifname
,
"admin_status"
:
a_stat
,
"oper_status"
:
o_stat
,
"enabled_afi"
:
[
"BRIDGE"
],
"name"
:
name
,
"admin_status"
:
a_status
,
"oper_status"
:
o_status
,
"enabled_protocols"
:
[],
"enabled_afi"
:
[],
}
if
ifname
in
isis
:
sub
[
"enabled_protocols"
]
+=
[
"ISIS"
]
if
match
.
group
(
"mac"
):
iface
[
"mac"
]
=
match
.
group
(
"mac"
)
sub
[
"mac"
]
=
match
.
group
(
"mac"
)
if
match
.
group
(
"descr"
):
iface
[
"description"
]
=
match
.
group
(
"descr"
).
strip
()
sub
[
"description"
]
=
match
.
group
(
"descr"
).
strip
()
match1
=
self
.
rx_mtu
.
search
(
i
)
if
match1
:
sub
[
"mtu"
]
=
int
(
match1
.
group
(
"mtu"
))
match1
=
self
.
rx_ip
.
search
(
i
)
if
match1
:
if
"mac"
in
r
:
sub
[
"mac"
]
=
r
[
"mac"
]
if
"ip"
in
r
:
sub
[
"enabled_afi"
]
+=
[
"IPv4"
]
sub
[
"ipv4_addresses"
]
=
[
match1
.
group
(
"ip"
)]
match1
=
self
.
rx_port_type
.
search
(
i
)
if
match1
:
port_type
=
match1
.
group
(
"port_type"
)
if
port_type
in
[
"access"
,
"hybrid"
]:
match2
=
self
.
rx_port_other
.
search
(
i
)
if
match2
.
group
(
"untagged"
)
and
match2
.
group
(
"untagged"
)
!=
"none"
:
sub
[
"untagged_vlan"
]
=
int
(
match2
.
group
(
"untagged"
))
if
match2
.
group
(
"tagged"
)
and
match2
.
group
(
"tagged"
)
!=
"none"
:
sub
[
"tagged_vlan"
]
=
self
.
expand_rangelist
(
match2
.
group
(
"tagged"
))
if
port_type
==
"trunk"
:
match2
=
self
.
rx_port_trunk
.
search
(
i
)
if
match2
.
group
(
"passing"
)
and
match2
.
group
(
"passing"
)
!=
"none"
:
passing
=
re
.
sub
(
self
.
rx_sub_default_vlan
,
""
,
match2
.
group
(
"passing"
))
sub
[
"tagged_vlan"
]
=
self
.
expand_rangelist
(
passing
)
iface
[
"subinterfaces"
]
+=
[
sub
]
if
ifname
in
portchannel_members
:
ai
,
is_lacp
=
portchannel_members
[
ifname
]
iface
[
"aggregated_interface"
]
=
ai
iface
[
"enabled_protocols"
]
+=
[
"LACP"
]
interfaces
+=
[
iface
]
v
=
self
.
cli
(
"display interface Vlan-interface"
).
split
(
"
\n\n
"
)
for
i
in
v
:
match
=
self
.
rx_sh_vlan
.
search
(
i
)
if
not
match
:
continue
ifname
=
match
.
group
(
"interface"
)
o_stat
=
match
.
group
(
"oper_status"
).
lower
()
==
"up"
if
match
.
group
(
"oper_status"
)
==
r
"DOWN ( Administratively\)"
:
a_stat
=
False
else
:
a_stat
=
True
iface
=
{
"name"
:
ifname
,
"type"
:
"SVI"
,
"admin_status"
:
a_stat
,
"oper_status"
:
o_stat
,
"enabled_protocols"
:
[],
"subinterfaces"
:
[],
}
sub
=
{
"name"
:
ifname
,
"admin_status"
:
a_stat
,
"oper_status"
:
o_stat
,
"enabled_afi"
:
[],
}
if
match
.
group
(
"descr"
):
iface
[
"description"
]
=
match
.
group
(
"descr"
).
strip
()
sub
[
"description"
]
=
match
.
group
(
"descr"
).
strip
()
if
match
.
group
(
"mtu"
):
sub
[
"mtu"
]
=
int
(
match
.
group
(
"mtu"
))
match1
=
self
.
rx_mac
.
search
(
i
)
if
match1
:
iface
[
"mac"
]
=
match1
.
group
(
"mac"
)
sub
[
"mac"
]
=
match1
.
group
(
"mac"
)
match1
=
self
.
rx_ip
.
search
(
i
)
if
match1
:
sub
[
"enabled_afi"
]
+=
[
"IPv4"
]
sub
[
"ipv4_addresses"
]
=
[
match1
.
group
(
"ip"
)]
match1
=
self
.
rx_ips
.
search
(
i
)
if
match1
:
sub
[
"ipv4_addresses"
]
+=
[
match1
.
group
(
"ip"
)]
vlan
=
self
.
rx_name
.
search
(
ifname
).
group
(
"vlan"
)
sub
[
"vlan_ids"
]
=
[
vlan
]
iface
[
"subinterfaces"
]
+=
[
sub
]
interfaces
+=
[
iface
]
v
=
self
.
cli
(
"display interface NULL"
).
split
(
"
\n\n
"
)
for
i
in
v
:
match
=
self
.
rx_sh_vlan
.
search
(
i
)
if
not
match
:
continue
ifname
=
match
.
group
(
"interface"
)
o_stat
=
match
.
group
(
"oper_status"
).
lower
()
==
"up (spoofing)"
if
match
.
group
(
"oper_status"
)
==
r
"DOWN ( Administratively\)"
:
a_stat
=
False
else
:
a_stat
=
True
iface
=
{
"name"
:
ifname
,
"type"
:
"null"
,
"admin_status"
:
a_stat
,
"oper_status"
:
o_stat
,
"enabled_protocols"
:
[],
"subinterfaces"
:
[],
}
sub
=
{
"name"
:
ifname
,
"admin_status"
:
a_stat
,
"oper_status"
:
o_stat
,
"enabled_afi"
:
[],
}
if
match
.
group
(
"descr"
):
iface
[
"description"
]
=
match
.
group
(
"descr"
).
strip
()
sub
[
"description"
]
=
match
.
group
(
"descr"
).
strip
()
if
match
.
group
(
"mtu"
):
sub
[
"mtu"
]
=
int
(
match
.
group
(
"mtu"
))
match1
=
self
.
rx_mac
.
search
(
i
)
if
match1
:
iface
[
"mac"
]
=
match1
.
group
(
"mac"
)
sub
[
"mac"
]
=
match1
.
group
(
"mac"
)
match1
=
self
.
rx_ip
.
search
(
i
)
if
match1
:
sub
[
"enabled_afi"
]
+=
[
"IPv4"
]
sub
[
"ipv4_addresses"
]
=
[
match1
.
group
(
"ip"
)]
match1
=
self
.
rx_ips
.
search
(
i
)
if
match1
:
sub
[
"ipv4_addresses"
]
+=
[
match1
.
group
(
"ip"
)]
iface
[
"subinterfaces"
]
+=
[
sub
]
interfaces
+=
[
iface
]
return
[{
"interfaces"
:
interfaces
}]
sub
[
"ipv4_addresses"
]
=
[
r
[
"ip"
]]
if
vlan_ids
:
sub
[
"vlan_ids"
]
=
[
int
(
vlan_ids
)]
if
"port_type"
in
r
:
sub
[
"enabled_afi"
]
+=
[
"BRIDGE"
]
# Bridge interface
if
r
[
"port_type"
]
in
[
"access"
,
"hybrid"
]
and
"untagged_vlan"
in
r
:
sub
[
"untagged_vlan"
]
=
int
(
r
[
"untagged_vlan"
])
if
r
[
"port_type"
]
in
[
"access"
,
"hybrid"
]
and
"tagged_vlans"
in
r
:
sub
[
"tagged_vlans"
]
=
self
.
expand_rangelist
((
r
[
"tagged_vlans"
]))
if
r
[
"port_type"
]
==
"trunk"
and
"vlan_permitted"
in
r
:
sub
[
"tagged_vlans"
]
=
self
.
expand_rangelist
(
r
[
"vlan_permitted"
])
interfaces
[
ifname
][
"subinterfaces"
]
+=
[
sub
]
return
[{
"interfaces"
:
list
(
interfaces
.
values
())}]
sa/profiles/HP/Comware/get_lldp_neighbors.py
View file @
6da70853
...
...
@@ -10,78 +10,122 @@
import
re
# NOC modules
from
noc.
core.script.base
import
BaseScript
from
noc.
sa.profiles.Generic.get_lldp_neighbors
import
Script
as
BaseScript
from
noc.sa.interfaces.igetlldpneighbors
import
IGetLLDPNeighbors
from
noc.core.lldp
import
(
LLDP_CHASSIS_SUBTYPE_CHASSIS_COMPONENT
,
LLDP_CHASSIS_SUBTYPE_PORT_COMPONENT
,
LLDP_CHASSIS_SUBTYPE_INTERFACE_ALIAS
,
LLDP_CHASSIS_SUBTYPE_INTERFACE_NAME
,
LLDP_CHASSIS_SUBTYPE_NETWORK_ADDRESS
,
LLDP_CHASSIS_SUBTYPE_LOCAL
,
LLDP_CHASSIS_SUBTYPE_MAC
,
LLDP_PORT_SUBTYPE_ALIAS
,
LLDP_PORT_SUBTYPE_COMPONENT
,
LLDP_PORT_SUBTYPE_NAME
,
LLDP_PORT_SUBTYPE_MAC
,
LLDP_PORT_SUBTYPE_NETWORK_ADDRESS
,
LLDP_PORT_SUBTYPE_AGENT_CIRCUIT_ID
,
LLDP_PORT_SUBTYPE_LOCAL
,
LLDP_CAP_NAMES
,
lldp_caps_to_bits
,
)
from
noc.core.text
import
parse_kv
class
Script
(
BaseScript
):
name
=
"HP.Comware.get_lldp_neighbors"
interface
=
IGetLLDPNeighbors
rx_item
=
re
.
compile
(
r
"^LLDP neighbor-information of port \d+\[(?P<interface>\S+)\]\:\n"
r
"^ Neighbor index : \d+\n"
r
"^ Update time : .+?\n"
r
"^ Chassis type : (?P<chassis_type>.+?)\n"
r
"^ Chassis ID : (?P<chassis_id>\S+)\n"
r
"^ Port ID type : (?P<port_id_type>.+?)\n"
r
"^ Port ID : (?P<port_id>.+?)\n"
r
"^ Port description : (?P<port_descr>.+?)\n"
r
"^ System name : (?P<system_name>.+?)\n"
r
"(^ System description : (?P<system_descr>.+?)\n)?"
r
"(^ System capabilities supported : (?P<system_caps_s>.+?)\n)?"
,
re
.
MULTILINE
,
)
lldp_kv_map
=
{
"neighbor index"
:
"neighbor_index"
,
"lldp neighbor index"
:
"neighbor_index"
,
"update time"
:
"update_time"
,
"chassis type"
:
"remote_chassis_id_subtype"
,
"chassis id"
:
"remote_chassis_id"
,
"port id type"
:
"remote_port_subtype"
,
"port id"
:
"remote_port"
,
"port description"
:
"remote_port_description"
,
"system name"
:
"remote_system_name"
,
"system description"
:
"remote_system_description"
,
"system capabilities supported"
:
"remote_capabilities"
,
"capabilities"
:
"remote_capabilities"
,
"chassisid/subtype"
:
"remote_chassis_id_type"
,
"portid/subtype"
:
"remote_port_type"
,
}
def
execute
(
self
):
r
=
[]
rx_lldp_neighbor_split
=
re
.
compile
(
r
"LLDP neighbor-information of port"
)
rx_local_port
=
re
.
compile
(
r
"^\s*\d+\[(?P<interface>\S+)\]\:"
)
chassis_type_map
=
{
"chassis component"
:
LLDP_CHASSIS_SUBTYPE_CHASSIS_COMPONENT
,
"interface alias"
:
LLDP_CHASSIS_SUBTYPE_INTERFACE_ALIAS
,
"port component"
:
LLDP_CHASSIS_SUBTYPE_PORT_COMPONENT
,
"mac address"
:
LLDP_CHASSIS_SUBTYPE_MAC
,
"network address"
:
LLDP_CHASSIS_SUBTYPE_NETWORK_ADDRESS
,
"interface name"
:
LLDP_CHASSIS_SUBTYPE_INTERFACE_NAME
,
"local"
:
LLDP_CHASSIS_SUBTYPE_LOCAL
,
}
port_type_map
=
{
"interface alias"
:
LLDP_PORT_SUBTYPE_ALIAS
,
"port component"
:
LLDP_PORT_SUBTYPE_COMPONENT
,
"mac address"
:
LLDP_PORT_SUBTYPE_MAC
,
"network address"
:
LLDP_PORT_SUBTYPE_NETWORK_ADDRESS
,
"interface name"
:
LLDP_PORT_SUBTYPE_NAME
,
"agent circuit id"
:
LLDP_PORT_SUBTYPE_AGENT_CIRCUIT_ID
,
"locally assigned"
:
LLDP_PORT_SUBTYPE_LOCAL
,
}
def
execute_cli
(
self
,
**
kwargs
):
result
=
[]
try
:
v
=
self
.
cli
(
"display lldp neighbor-information"
)
except
self
.
CLISyntaxError
:
return
[]
for
match
in
self
.
rx_item
.
finditer
(
v
):
i
=
{
"local_interface"
:
match
.
group
(
"interface"
),
"neighbors"
:
[]}
for
nei
in
self
.
rx_lldp_neighbor_split
.
split
(
v
):
if
not
nei
.
strip
():
continue
match
=
self
.
rx_local_port
.
search
(
nei
)
if
not
match
:
self
.
logger
.
info
(
"Not find local port"
)
continue
n
=
{}
n
[
"remote_chassis_id_subtype"
]
=
{
"Chassis Component"
:
1
,
"Interface Alias"
:
2
,
"Port Component"
:
3
,
"MAC Address"
:
4
,
"Network Address"
:
5
,
"Interface Name"
:
6
,
"local"
:
7
,
}.
get
(
match
.
group
(
"chassis_type"
))
n
[
"remote_chassis_id"
]
=
match
.
group
(
"chassis_id"
).
strip
()
n
[
"remote_port_subtype"
]
=
{
"Interface Alias"
:
1
,
"Port Component"
:
2
,
"MAC Address"
:
3
,
"Network Address"
:
4
,
"Interface Name"
:
5
,
"Agent Circuit ID"
:
6
,
"Locally assigned"
:
7
,
}.
get
(
match
.
group
(
"port_id_type"
))
n
[
"remote_port"
]
=
match
.
group
(
"port_id"
).
strip
()
n
[
"remote_port_description"
]
=
match
.
group
(
"port_descr"
).
strip
()
n
[
"remote_system_name"
]
=
match
.
group
(
"system_name"
).
strip
()
if
match
.
group
(
"system_descr"
):
n
[
"remote_system_description"
]
=
match
.
group
(
"system_descr"
).
strip
()
if
match
.
group
(
"system_caps_s"
):
cap
=
0
for
c
in
match
.
group
(
"system_caps_s"
).
split
(
","
):
c
=
c
.
strip
()
if
c
:
cap
|=
{
"Other"
:
1
,
"Repeater"
:
2
,
"Bridge"
:
4
,
"WLAN Access Point"
:
8
,
"Router"
:
16
,
"TTelephone"
:
32
,
"DOCSIS Cable Device"
:
64
,
"Station Only"
:
128
,
}[
c
]
n
[
"remote_capabilities"
]
=
cap
i
[
"neighbors"
]
+=
[
n
]
r
+=
[
i
]
return
r
r
=
parse_kv
(
self
.
lldp_kv_map
,
nei
)
if
"remote_chassis_id_subtype"
in
r
:
n
[
"remote_chassis_id_subtype"
]
=
r
[
"remote_chassis_id_subtype"
]
if
"remote_chassis_id_type"
in
r
:
n
[
"remote_chassis_id"
],
n
[
"remote_chassis_id_subtype"
]
=
r
[
"remote_chassis_id_type"
].
rsplit
(
"/"
,
1
)
if
"remote_chassis_id"
in
r
:
n
[
"remote_chassis_id"
]
=
r
[
"remote_chassis_id"
]
if
"remote_chassis_id"
not
in
n
:
self
.
logger
.
warning
(
"[%s] Not found remote chassis id on output"
,
match
.
group
(
"interface"
)
)
continue
n
[
"remote_chassis_id_subtype"
]
=
self
.
chassis_type_map
[
n
[
"remote_chassis_id_subtype"
].
lower
()
]
# Port fields
if
"remote_port_subtype"
in
r
:
n
[
"remote_port_subtype"
]
=
r
[
"remote_port_subtype"
]
if
"remote_port_type"
in
r
:
n
[
"remote_port"
],
n
[
"remote_port_subtype"
]
=
r
[
"remote_port_type"
].
rsplit
(
"/"
,
1
)
if
"remote_port"
in
r
:
n
[
"remote_port"
]
=
r
[
"remote_port"
]
if
"remote_port"
not
in
n
:
self
.
logger
.
warning
(
"[%s] Not found port id on output"
,
match
.
group
(
"interface"
))
continue
n
[
"remote_port_subtype"
]
=
self
.
port_type_map
[
n
[
"remote_port_subtype"
].
lower
()]
if
"remote_system_description"
in
r
:
n
[
"remote_system_description"
]
=
r
[
"remote_system_description"
].
strip
()
if
"remote_capabilities"
in
r
:
n
[
"remote_capabilities"
]
=
lldp_caps_to_bits
(
[
x
.
strip
()
for
x
in
r
[
"remote_capabilities"
].
split
(
","
)],
{
LLDP_CAP_NAMES
[
x1
]:
x1
for
x1
in
LLDP_CAP_NAMES
},
)
result
+=
[{
"local_interface"
:
match
.
group
(
"interface"
),
"neighbors"
:
[
n
]}]
return
result
sa/profiles/HP/Comware/get_portchannel.py