commit dfa07a73319e3650d685f0a8c03dcd2a1ed72033 Author: Anders Knutsen Date: Wed Nov 26 14:35:06 2025 +0100 V1 diff --git a/status.py b/status.py new file mode 100644 index 0000000..eb591f8 --- /dev/null +++ b/status.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +import time +import re +import os +import sys +import socket +import pexpect + +# =========================================== +# CONFIG +# =========================================== +SWITCH_IP = "10.183.100.11" +TELNET_PORT = 23 +TELNET_TIMEOUT = 5 + +USERNAME = "" +PASSWORD = "Wprs000qq!" +ENABLE_PASSWORD = "Wprs000qq!" + +# -------------------------- +# HELPERS +# -------------------------- + +def clear_screen(): + """Best-effort clear that works in terminals & IDEs.""" + try: + sys.stdout.write("\033c") + sys.stdout.flush() + return + except Exception: + pass + + try: + os.system('cls' if os.name == 'nt' else 'clear') + return + except Exception: + pass + + print("\n" * 100) + + +def cisco_to_portnum(ifname: str) -> str: + """Convert Gi1/0/4 -> Port 4.""" + m = re.search(r"/(\d+)$", ifname) + return f"Port {m.group(1)}" if m else ifname + + +def speed_to_label(speed_field: str) -> str: + """Convert Cisco speeds to ! .""" + if not speed_field: + return "! unknown" + + s = speed_field.strip().lower() + mapping = { + "10": "10 Mbps", + "a-10": "10 Mbps", + "100": "100 Mbps", + "a-100": "100 Mbps", + "1000": "1 Gbps", + "a-1000": "1 Gbps", + "10000": "10 Gbps", + "a-10000": "10 Gbps", + } + if s in mapping: + return f"! {mapping[s]}" + if "auto" in s: + return "! auto" + return f"! {speed_field}" + + +# -------------------------- +# "TELNET" FUNCTIONS (NOW PEXPECT) +# -------------------------- + +def telnet_login(ip, port, username, password, enable_password): + """ + Attempt a Telnet login using pexpect — raise exception on failure. + Returns a pexpect.spawn object. + """ + # telnet ip port + child = pexpect.spawn(f"telnet {ip} {port}", timeout=TELNET_TIMEOUT, encoding="utf-8") + + # Handle different possible prompts + # We expect one of: Username:, Password:, >, # + idx = child.expect([r"[Uu]sername:", r"[Pp]assword:", r">", r"#", pexpect.TIMEOUT, pexpect.EOF]) + + if idx == 0: + # Got Username: + child.sendline(username) + child.expect(r"[Pp]assword:") + child.sendline(password) + # After this we should land at ">" or "#" + child.expect([r">", r"#"]) + elif idx == 1: + # Got Password: (no username) + child.sendline(password) + child.expect([r">", r"#"]) + elif idx in (2, 3): + # Already at > or # (no login) + pass + else: + # TIMEOUT or EOF + raise ConnectionError("Telnet login failed (no prompt / timeout)") + + # If we're at ">" we need to enter enable + if child.after.strip().endswith(">"): + child.sendline("enable") + child.expect(r"[Pp]assword:") + child.sendline(enable_password) + child.expect(r"#") + + # Disable paging + child.sendline("terminal length 0") + child.expect(r"#") + + return child + + +def send_command(child: pexpect.spawn, cmd: str) -> str: + """ + Send a command and return the body of the output + (without echoed command and trailing prompt). + """ + child.sendline(cmd) + child.expect(r"#") # wait for prompt + # child.before contains everything printed *before* the prompt + output = child.before + + # child.before also includes the command we sent on the first line + lines = output.splitlines() + if len(lines) >= 1: + # drop the first line (echoed command) + lines = lines[1:] + return "\n".join(lines) + + +# -------------------------- +# PARSERS +# -------------------------- + +def parse_show_interfaces_status(output: str): + interfaces = {} + lines = output.splitlines() + + # find header + header_idx = None # noqa + for i, line in enumerate(lines): + if line.strip().startswith("Port"): + header_idx = i + break + if header_idx is None: + return interfaces + + for line in lines[header_idx + 1:]: + if not line.strip(): + continue + parts = re.split(r"\s+", line) + if len(parts) < 7: + m = re.match(r"^(?P\S+)\s+(?P.+)$", line) + if not m: + continue + port = m.group("port") + rest_parts = re.split(r"\s+", m.group("rest")) + if len(rest_parts) < 5: + continue + type_ = rest_parts[-1] + speed = rest_parts[-2] + duplex = rest_parts[-3] + vlan = rest_parts[-4] + status = rest_parts[-5] + else: + port = parts[0] + status = parts[2] + vlan = parts[3] + duplex = parts[4] + speed = parts[5] + type_ = " ".join(parts[6:]) + + interfaces[port] = { + "status": status, + "vlan": vlan, + "duplex": duplex, + "speed": speed, + "type": type_, + } + return interfaces + + +def parse_show_power_inline(output: str): + poe = {} + lines = output.splitlines() + + header_idx = None + for i, line in enumerate(lines): + if line.strip().startswith("Interface"): + header_idx = i + break + if header_idx is None: + return poe + + for line in lines[header_idx + 1:]: + if not line.strip(): + continue + + parts = re.split(r"\s+", line) + if len(parts) < 4: + continue + + iface = parts[0] + admin = parts[1] + oper = parts[2] + power = parts[3] + + poe[iface] = {"admin": admin, "oper": oper, "power": power} + + return poe + + +# -------------------------- +# MAIN LOOP +# -------------------------- + +def main(): + while True: + try: + child = telnet_login(SWITCH_IP, TELNET_PORT, USERNAME, PASSWORD, ENABLE_PASSWORD) + except (ConnectionError, pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError, socket.error): + print("❌ Telnet/pexpect login failed — retrying in 10 seconds...") + time.sleep(10) + continue + + # Connected successfully + clear_screen() + print("✔ Telnet OK — gathering data...") + + try: + show_int_status = send_command(child, "show interfaces status") + show_poe = send_command(child, "show power inline") + except (pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError): + print("❌ Error while running commands — will retry in 10 seconds...") + try: + child.close(force=True) + except Exception: + pass + time.sleep(10) + continue + + # Try to exit cleanly + try: + child.sendline("exit") + child.close() + except Exception: + pass + + # parse + int_info = parse_show_interfaces_status(show_int_status) + poe_info = parse_show_power_inline(show_poe) + + # find connected ports + active_ports = [] + for port, info in int_info.items(): + if info["status"].lower() != "connected": + continue + + poe = poe_info.get(port, {}) + active_ports.append({ + "port": cisco_to_portnum(port), + "speed": speed_to_label(info["speed"]), + "duplex": info["duplex"], + "vlan": info["vlan"], + "poe_admin": poe.get("admin", "n/a"), + "poe_oper": poe.get("oper", "n/a"), + "poe_power_w": poe.get("power", "0.0"), + }) + + clear_screen() + print("Active ports with link + PoE info:\n") + for p in active_ports: + print( + f"{p['port']:>8} | VLAN {p['vlan']:>4} | " + f"{p['speed']:<12} | " + f"{p['duplex']:>7} | " + f"PoE enabled: {p['poe_admin']:<5} | " + f"PoE: {p['poe_oper']:<5} | " + f"Power: {p['poe_power_w']} W" + ) + + if not active_ports: + print("No active (connected) ports.") + + print("\n⏳ Waiting 10 seconds...\n") + time.sleep(10) + + +if __name__ == "__main__": + main()