This commit is contained in:
commit
dfa07a7331
|
|
@ -0,0 +1,296 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import time
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import socket
|
||||||
|
import pexpect
|
||||||
|
|
||||||
|
# ===========================================
|
||||||
|
# CONFIG
|
||||||
|
# ===========================================
|
||||||
|
SWITCH_IP = "10.183.100.11"
|
||||||
|
TELNET_PORT = 23
|
||||||
|
TELNET_TIMEOUT = 5
|
||||||
|
|
||||||
|
USERNAME = ""
|
||||||
|
PASSWORD = "Wprs000qq!"
|
||||||
|
ENABLE_PASSWORD = "Wprs000qq!"
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# HELPERS
|
||||||
|
# --------------------------
|
||||||
|
|
||||||
|
def clear_screen():
|
||||||
|
"""Best-effort clear that works in terminals & IDEs."""
|
||||||
|
try:
|
||||||
|
sys.stdout.write("\033c")
|
||||||
|
sys.stdout.flush()
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.system('cls' if os.name == 'nt' else 'clear')
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
print("\n" * 100)
|
||||||
|
|
||||||
|
|
||||||
|
def cisco_to_portnum(ifname: str) -> str:
|
||||||
|
"""Convert Gi1/0/4 -> Port 4."""
|
||||||
|
m = re.search(r"/(\d+)$", ifname)
|
||||||
|
return f"Port {m.group(1)}" if m else ifname
|
||||||
|
|
||||||
|
|
||||||
|
def speed_to_label(speed_field: str) -> str:
|
||||||
|
"""Convert Cisco speeds to ! <human text>."""
|
||||||
|
if not speed_field:
|
||||||
|
return "! unknown"
|
||||||
|
|
||||||
|
s = speed_field.strip().lower()
|
||||||
|
mapping = {
|
||||||
|
"10": "10 Mbps",
|
||||||
|
"a-10": "10 Mbps",
|
||||||
|
"100": "100 Mbps",
|
||||||
|
"a-100": "100 Mbps",
|
||||||
|
"1000": "1 Gbps",
|
||||||
|
"a-1000": "1 Gbps",
|
||||||
|
"10000": "10 Gbps",
|
||||||
|
"a-10000": "10 Gbps",
|
||||||
|
}
|
||||||
|
if s in mapping:
|
||||||
|
return f"! {mapping[s]}"
|
||||||
|
if "auto" in s:
|
||||||
|
return "! auto"
|
||||||
|
return f"! {speed_field}"
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# "TELNET" FUNCTIONS (NOW PEXPECT)
|
||||||
|
# --------------------------
|
||||||
|
|
||||||
|
def telnet_login(ip, port, username, password, enable_password):
|
||||||
|
"""
|
||||||
|
Attempt a Telnet login using pexpect — raise exception on failure.
|
||||||
|
Returns a pexpect.spawn object.
|
||||||
|
"""
|
||||||
|
# telnet ip port
|
||||||
|
child = pexpect.spawn(f"telnet {ip} {port}", timeout=TELNET_TIMEOUT, encoding="utf-8")
|
||||||
|
|
||||||
|
# Handle different possible prompts
|
||||||
|
# We expect one of: Username:, Password:, >, #
|
||||||
|
idx = child.expect([r"[Uu]sername:", r"[Pp]assword:", r">", r"#", pexpect.TIMEOUT, pexpect.EOF])
|
||||||
|
|
||||||
|
if idx == 0:
|
||||||
|
# Got Username:
|
||||||
|
child.sendline(username)
|
||||||
|
child.expect(r"[Pp]assword:")
|
||||||
|
child.sendline(password)
|
||||||
|
# After this we should land at ">" or "#"
|
||||||
|
child.expect([r">", r"#"])
|
||||||
|
elif idx == 1:
|
||||||
|
# Got Password: (no username)
|
||||||
|
child.sendline(password)
|
||||||
|
child.expect([r">", r"#"])
|
||||||
|
elif idx in (2, 3):
|
||||||
|
# Already at > or # (no login)
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# TIMEOUT or EOF
|
||||||
|
raise ConnectionError("Telnet login failed (no prompt / timeout)")
|
||||||
|
|
||||||
|
# If we're at ">" we need to enter enable
|
||||||
|
if child.after.strip().endswith(">"):
|
||||||
|
child.sendline("enable")
|
||||||
|
child.expect(r"[Pp]assword:")
|
||||||
|
child.sendline(enable_password)
|
||||||
|
child.expect(r"#")
|
||||||
|
|
||||||
|
# Disable paging
|
||||||
|
child.sendline("terminal length 0")
|
||||||
|
child.expect(r"#")
|
||||||
|
|
||||||
|
return child
|
||||||
|
|
||||||
|
|
||||||
|
def send_command(child: pexpect.spawn, cmd: str) -> str:
|
||||||
|
"""
|
||||||
|
Send a command and return the body of the output
|
||||||
|
(without echoed command and trailing prompt).
|
||||||
|
"""
|
||||||
|
child.sendline(cmd)
|
||||||
|
child.expect(r"#") # wait for prompt
|
||||||
|
# child.before contains everything printed *before* the prompt
|
||||||
|
output = child.before
|
||||||
|
|
||||||
|
# child.before also includes the command we sent on the first line
|
||||||
|
lines = output.splitlines()
|
||||||
|
if len(lines) >= 1:
|
||||||
|
# drop the first line (echoed command)
|
||||||
|
lines = lines[1:]
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# PARSERS
|
||||||
|
# --------------------------
|
||||||
|
|
||||||
|
def parse_show_interfaces_status(output: str):
|
||||||
|
interfaces = {}
|
||||||
|
lines = output.splitlines()
|
||||||
|
|
||||||
|
# find header
|
||||||
|
header_idx = None # noqa
|
||||||
|
for i, line in enumerate(lines):
|
||||||
|
if line.strip().startswith("Port"):
|
||||||
|
header_idx = i
|
||||||
|
break
|
||||||
|
if header_idx is None:
|
||||||
|
return interfaces
|
||||||
|
|
||||||
|
for line in lines[header_idx + 1:]:
|
||||||
|
if not line.strip():
|
||||||
|
continue
|
||||||
|
parts = re.split(r"\s+", line)
|
||||||
|
if len(parts) < 7:
|
||||||
|
m = re.match(r"^(?P<port>\S+)\s+(?P<rest>.+)$", line)
|
||||||
|
if not m:
|
||||||
|
continue
|
||||||
|
port = m.group("port")
|
||||||
|
rest_parts = re.split(r"\s+", m.group("rest"))
|
||||||
|
if len(rest_parts) < 5:
|
||||||
|
continue
|
||||||
|
type_ = rest_parts[-1]
|
||||||
|
speed = rest_parts[-2]
|
||||||
|
duplex = rest_parts[-3]
|
||||||
|
vlan = rest_parts[-4]
|
||||||
|
status = rest_parts[-5]
|
||||||
|
else:
|
||||||
|
port = parts[0]
|
||||||
|
status = parts[2]
|
||||||
|
vlan = parts[3]
|
||||||
|
duplex = parts[4]
|
||||||
|
speed = parts[5]
|
||||||
|
type_ = " ".join(parts[6:])
|
||||||
|
|
||||||
|
interfaces[port] = {
|
||||||
|
"status": status,
|
||||||
|
"vlan": vlan,
|
||||||
|
"duplex": duplex,
|
||||||
|
"speed": speed,
|
||||||
|
"type": type_,
|
||||||
|
}
|
||||||
|
return interfaces
|
||||||
|
|
||||||
|
|
||||||
|
def parse_show_power_inline(output: str):
|
||||||
|
poe = {}
|
||||||
|
lines = output.splitlines()
|
||||||
|
|
||||||
|
header_idx = None
|
||||||
|
for i, line in enumerate(lines):
|
||||||
|
if line.strip().startswith("Interface"):
|
||||||
|
header_idx = i
|
||||||
|
break
|
||||||
|
if header_idx is None:
|
||||||
|
return poe
|
||||||
|
|
||||||
|
for line in lines[header_idx + 1:]:
|
||||||
|
if not line.strip():
|
||||||
|
continue
|
||||||
|
|
||||||
|
parts = re.split(r"\s+", line)
|
||||||
|
if len(parts) < 4:
|
||||||
|
continue
|
||||||
|
|
||||||
|
iface = parts[0]
|
||||||
|
admin = parts[1]
|
||||||
|
oper = parts[2]
|
||||||
|
power = parts[3]
|
||||||
|
|
||||||
|
poe[iface] = {"admin": admin, "oper": oper, "power": power}
|
||||||
|
|
||||||
|
return poe
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------
|
||||||
|
# MAIN LOOP
|
||||||
|
# --------------------------
|
||||||
|
|
||||||
|
def main():
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
child = telnet_login(SWITCH_IP, TELNET_PORT, USERNAME, PASSWORD, ENABLE_PASSWORD)
|
||||||
|
except (ConnectionError, pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError, socket.error):
|
||||||
|
print("❌ Telnet/pexpect login failed — retrying in 10 seconds...")
|
||||||
|
time.sleep(10)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Connected successfully
|
||||||
|
clear_screen()
|
||||||
|
print("✔ Telnet OK — gathering data...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
show_int_status = send_command(child, "show interfaces status")
|
||||||
|
show_poe = send_command(child, "show power inline")
|
||||||
|
except (pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError):
|
||||||
|
print("❌ Error while running commands — will retry in 10 seconds...")
|
||||||
|
try:
|
||||||
|
child.close(force=True)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
time.sleep(10)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Try to exit cleanly
|
||||||
|
try:
|
||||||
|
child.sendline("exit")
|
||||||
|
child.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# parse
|
||||||
|
int_info = parse_show_interfaces_status(show_int_status)
|
||||||
|
poe_info = parse_show_power_inline(show_poe)
|
||||||
|
|
||||||
|
# find connected ports
|
||||||
|
active_ports = []
|
||||||
|
for port, info in int_info.items():
|
||||||
|
if info["status"].lower() != "connected":
|
||||||
|
continue
|
||||||
|
|
||||||
|
poe = poe_info.get(port, {})
|
||||||
|
active_ports.append({
|
||||||
|
"port": cisco_to_portnum(port),
|
||||||
|
"speed": speed_to_label(info["speed"]),
|
||||||
|
"duplex": info["duplex"],
|
||||||
|
"vlan": info["vlan"],
|
||||||
|
"poe_admin": poe.get("admin", "n/a"),
|
||||||
|
"poe_oper": poe.get("oper", "n/a"),
|
||||||
|
"poe_power_w": poe.get("power", "0.0"),
|
||||||
|
})
|
||||||
|
|
||||||
|
clear_screen()
|
||||||
|
print("Active ports with link + PoE info:\n")
|
||||||
|
for p in active_ports:
|
||||||
|
print(
|
||||||
|
f"{p['port']:>8} | VLAN {p['vlan']:>4} | "
|
||||||
|
f"{p['speed']:<12} | "
|
||||||
|
f"{p['duplex']:>7} | "
|
||||||
|
f"PoE enabled: {p['poe_admin']:<5} | "
|
||||||
|
f"PoE: {p['poe_oper']:<5} | "
|
||||||
|
f"Power: {p['poe_power_w']} W"
|
||||||
|
)
|
||||||
|
|
||||||
|
if not active_ports:
|
||||||
|
print("No active (connected) ports.")
|
||||||
|
|
||||||
|
print("\n⏳ Waiting 10 seconds...\n")
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Reference in New Issue