#!/usr/bin/env python3 import time import re import os import sys import socket import threading from datetime import datetime import tkinter as tk from tkinter import ttk, messagebox import pexpect # =========================================== # CONFIG # =========================================== SWITCH_IP = "10.183.100.11" TELNET_PORT = 23 TELNET_TIMEOUT = 5 # Tip: You can also set these via environment variables to avoid hardcoding: # export SWITCH_USER="..." # export SWITCH_PASS="..." # export SWITCH_ENABLE="..." USERNAME = "" PASSWORD = "Wprs000qq!" ENABLE_PASSWORD = "Wprs000qq!" REFRESH_SECONDS = 10 # =========================================== # HELPERS (your originals + small extras) # =========================================== def cisco_to_portnum(ifname: str) -> str: """Convert Gi1/0/4 -> Port 4.""" m = re.search(r"/(\d+)$", ifname) return f"Port {m.group(1)}" if m else ifname def speed_to_label(speed_field: str) -> str: """Convert Cisco speeds to ! .""" if not speed_field: return "! unknown" s = speed_field.strip().lower() mapping = { "10": "10 Mbps", "a-10": "10 Mbps", "100": "100 Mbps", "a-100": "100 Mbps", "1000": "1 Gbps", "a-1000": "1 Gbps", "10000": "10 Gbps", "a-10000": "10 Gbps", } if s in mapping: return f"{mapping[s]}" if "auto" in s: return "auto" return f"{speed_field}" def poe_power_to_float(power_str: str) -> float: """Best-effort convert PoE power like '15.4' to float for sorting.""" try: return float(power_str) except Exception: return 0.0 # =========================================== # TELNET (pexpect) FUNCTIONS (your originals) # =========================================== def telnet_login(ip, port, username, password, enable_password): """ Attempt a Telnet login using pexpect — raise exception on failure. Returns a pexpect.spawn object. """ child = pexpect.spawn(f"telnet {ip} {port}", timeout=TELNET_TIMEOUT, encoding="utf-8") idx = child.expect([r"[Uu]sername:", r"[Pp]assword:", r">", r"#", pexpect.TIMEOUT, pexpect.EOF]) if idx == 0: child.sendline(username) child.expect(r"[Pp]assword:") child.sendline(password) child.expect([r">", r"#"]) elif idx == 1: child.sendline(password) child.expect([r">", r"#"]) elif idx in (2, 3): pass else: raise ConnectionError("Telnet login failed (no prompt / timeout)") if child.after.strip().endswith(">"): child.sendline("enable") child.expect(r"[Pp]assword:") child.sendline(enable_password) child.expect(r"#") child.sendline("terminal length 0") child.expect(r"#") return child def send_command(child: pexpect.spawn, cmd: str) -> str: child.sendline(cmd) child.expect(r"#") output = child.before lines = output.splitlines() if len(lines) >= 1: lines = lines[1:] return "\n".join(lines) # =========================================== # PARSERS (your originals) # =========================================== def parse_show_interfaces_status(output: str): interfaces = {} lines = output.splitlines() header_idx = None for i, line in enumerate(lines): if line.strip().startswith("Port"): header_idx = i break if header_idx is None: return interfaces for line in lines[header_idx + 1:]: if not line.strip(): continue parts = re.split(r"\s+", line) if len(parts) < 7: m = re.match(r"^(?P\S+)\s+(?P.+)$", line) if not m: continue port = m.group("port") rest_parts = re.split(r"\s+", m.group("rest")) if len(rest_parts) < 5: continue type_ = rest_parts[-1] speed = rest_parts[-2] duplex = rest_parts[-3] vlan = rest_parts[-4] status = rest_parts[-5] else: port = parts[0] status = parts[2] vlan = parts[3] duplex = parts[4] speed = parts[5] type_ = " ".join(parts[6:]) interfaces[port] = { "status": status, "vlan": vlan, "duplex": duplex, "speed": speed, "type": type_, } return interfaces def parse_show_power_inline(output: str): poe = {} lines = output.splitlines() header_idx = None for i, line in enumerate(lines): if line.strip().startswith("Interface"): header_idx = i break if header_idx is None: return poe for line in lines[header_idx + 1:]: if not line.strip(): continue parts = re.split(r"\s+", line) if len(parts) < 4: continue iface = parts[0] admin = parts[1] oper = parts[2] power = parts[3] poe[iface] = {"admin": admin, "oper": oper, "power": power} return poe # =========================================== # DATA COLLECTION # =========================================== def collect_active_ports(): """ Returns: (active_ports_list, info_message) active_ports_list: list[dict] """ child = None try: child = telnet_login(SWITCH_IP, TELNET_PORT, USERNAME, PASSWORD, ENABLE_PASSWORD) show_int_status = send_command(child, "show interfaces status") show_poe = send_command(child, "show power inline") info = "OK" finally: try: if child is not None: child.sendline("exit") child.close() except Exception: pass int_info = parse_show_interfaces_status(show_int_status) poe_info = parse_show_power_inline(show_poe) active_ports = [] for port, iface in int_info.items(): if iface.get("status", "").lower() != "connected": continue poe = poe_info.get(port, {}) active_ports.append({ "port": cisco_to_portnum(port), "vlan": iface.get("vlan", ""), "speed": speed_to_label(iface.get("speed", "")), "duplex": iface.get("duplex", ""), "poe_admin": poe.get("admin", "n/a"), "poe_oper": poe.get("oper", "n/a"), "poe_power_w": poe.get("power", "0.0"), }) return active_ports, info # =========================================== # GUI # =========================================== class App(tk.Tk): def __init__(self): super().__init__() self.title("Switch Active Ports (Link + PoE)") self.geometry("980x520") self.minsize(860, 420) self._refresh_running = True self._refresh_in_progress = False self._sort_state = {} # col -> bool ascending self._build_ui() self._set_status("Ready.") self._schedule_refresh(initial=True) def _build_ui(self): top = ttk.Frame(self, padding=10) top.pack(fill="both", expand=True) # Controls controls = ttk.Frame(top) controls.pack(fill="x") self.ip_var = tk.StringVar(value=SWITCH_IP) ttk.Label(controls, text="Switch IP:").pack(side="left") ip_entry = ttk.Entry(controls, textvariable=self.ip_var, width=18) ip_entry.pack(side="left", padx=(6, 14)) self.interval_var = tk.IntVar(value=REFRESH_SECONDS) ttk.Label(controls, text="Refresh (s):").pack(side="left") interval_spin = ttk.Spinbox(controls, from_=3, to=3600, textvariable=self.interval_var, width=6) interval_spin.pack(side="left", padx=(6, 14)) self.btn_refresh = ttk.Button(controls, text="Refresh now", command=self.refresh_now) self.btn_refresh.pack(side="left", padx=(0, 8)) self.btn_toggle = ttk.Button(controls, text="Stop auto-refresh", command=self.toggle_refresh) self.btn_toggle.pack(side="left") ttk.Separator(top, orient="horizontal").pack(fill="x", pady=10) # Table cols = ("port", "vlan", "speed", "duplex", "poe_admin", "poe_oper", "poe_power_w") self.tree = ttk.Treeview(top, columns=cols, show="headings", height=16) headings = { "port": "Port", "vlan": "VLAN", "speed": "Speed", "duplex": "Duplex", "poe_admin": "PoE Enabled", "poe_oper": "PoE Status", "poe_power_w": "Power (W)", } widths = { "port": 110, "vlan": 80, "speed": 140, "duplex": 90, "poe_admin": 110, "poe_oper": 110, "poe_power_w": 110, } for c in cols: self.tree.heading(c, text=headings[c], command=lambda col=c: self.sort_by(col)) self.tree.column(c, width=widths[c], anchor="w", stretch=True) vsb = ttk.Scrollbar(top, orient="vertical", command=self.tree.yview) self.tree.configure(yscrollcommand=vsb.set) self.tree.pack(side="left", fill="both", expand=True) vsb.pack(side="right", fill="y") # Status bar self.status_var = tk.StringVar(value="") status = ttk.Label(self, textvariable=self.status_var, anchor="w", padding=(10, 6)) status.pack(fill="x") def _set_status(self, msg: str): self.status_var.set(msg) def toggle_refresh(self): self._refresh_running = not self._refresh_running self.btn_toggle.config(text="Stop auto-refresh" if self._refresh_running else "Start auto-refresh") if self._refresh_running: self._set_status("Auto-refresh enabled.") self._schedule_refresh(initial=False) else: self._set_status("Auto-refresh stopped.") def refresh_now(self): if self._refresh_in_progress: return self._start_refresh_thread() def _schedule_refresh(self, initial=False): # schedule the next refresh try: interval = int(self.interval_var.get()) except Exception: interval = REFRESH_SECONDS if initial: # do one immediate refresh at startup self.after(50, self.refresh_now) # loop scheduler def tick(): if self._refresh_running and (not self._refresh_in_progress): self._start_refresh_thread() self._schedule_refresh(initial=False) self.after(max(1000, interval * 1000), tick) def _start_refresh_thread(self): self._refresh_in_progress = True self.btn_refresh.config(state="disabled") self._set_status("Refreshing...") # update global config from UI global SWITCH_IP SWITCH_IP = self.ip_var.get().strip() or SWITCH_IP t = threading.Thread(target=self._refresh_worker, daemon=True) t.start() def _refresh_worker(self): try: ports, info = collect_active_ports() now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.after(0, lambda: self._update_table(ports, now, info)) except (ConnectionError, pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError, socket.error) as e: now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.after(0, lambda: self._show_error(f"Telnet/pexpect error: {e}", now)) except Exception as e: now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.after(0, lambda: self._show_error(f"Unexpected error: {e}", now)) def _show_error(self, msg: str, ts: str): self._refresh_in_progress = False self.btn_refresh.config(state="normal") self._set_status(f"❌ {msg} (Last attempt: {ts})") def _update_table(self, ports, ts: str, info: str): # clear for item in self.tree.get_children(): self.tree.delete(item) # insert for p in ports: self.tree.insert("", "end", values=( p["port"], p["vlan"], p["speed"], p["duplex"], p["poe_admin"], p["poe_oper"], p["poe_power_w"] )) self._refresh_in_progress = False self.btn_refresh.config(state="normal") if ports: self._set_status(f"✔ {len(ports)} active ports. Last refresh: {ts}") else: self._set_status(f"ℹ No active (connected) ports. Last refresh: {ts}") def sort_by(self, col: str): items = [(self.tree.set(k, col), k) for k in self.tree.get_children("")] ascending = self._sort_state.get(col, True) def keyfunc(x): v = x[0] # numeric-ish sorting for VLAN and Power if possible if col == "vlan": try: return int(v) except Exception: return 0 if col == "poe_power_w": return poe_power_to_float(v) # Port sort: "Port 12" -> 12 if col == "port": m = re.search(r"(\d+)$", v) return int(m.group(1)) if m else 0 return v.lower() if isinstance(v, str) else v items.sort(key=keyfunc, reverse=not ascending) for index, (_, k) in enumerate(items): self.tree.move(k, "", index) self._sort_state[col] = not ascending def main(): # Basic sanity hints if creds empty if USERNAME == "" and PASSWORD == "" and ENABLE_PASSWORD == "": # Not fatal; some devices prompt only for password or none. pass app = App() app.mainloop() if __name__ == "__main__": main()