From c528d4b623e21ee275de6a3c321a212dfd026efc Mon Sep 17 00:00:00 2001 From: anders Date: Fri, 19 Dec 2025 09:43:14 +0100 Subject: [PATCH] V1 --- status.py | 387 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 268 insertions(+), 119 deletions(-) diff --git a/status.py b/status.py index eb591f8..3a3d4e8 100644 --- a/status.py +++ b/status.py @@ -4,6 +4,11 @@ import re import os import sys import socket +import threading +from datetime import datetime +import tkinter as tk +from tkinter import ttk, messagebox + import pexpect # =========================================== @@ -13,38 +18,25 @@ SWITCH_IP = "10.183.100.11" TELNET_PORT = 23 TELNET_TIMEOUT = 5 +# Tip: You can also set these via environment variables to avoid hardcoding: +# export SWITCH_USER="..." +# export SWITCH_PASS="..." +# export SWITCH_ENABLE="..." USERNAME = "" PASSWORD = "Wprs000qq!" ENABLE_PASSWORD = "Wprs000qq!" -# -------------------------- -# HELPERS -# -------------------------- - -def clear_screen(): - """Best-effort clear that works in terminals & IDEs.""" - try: - sys.stdout.write("\033c") - sys.stdout.flush() - return - except Exception: - pass - - try: - os.system('cls' if os.name == 'nt' else 'clear') - return - except Exception: - pass - - print("\n" * 100) +REFRESH_SECONDS = 10 +# =========================================== +# HELPERS (your originals + small extras) +# =========================================== def cisco_to_portnum(ifname: str) -> str: """Convert Gi1/0/4 -> Port 4.""" m = re.search(r"/(\d+)$", ifname) return f"Port {m.group(1)}" if m else ifname - def speed_to_label(speed_field: str) -> str: """Convert Cisco speeds to ! .""" if not speed_field: @@ -62,88 +54,73 @@ def speed_to_label(speed_field: str) -> str: "a-10000": "10 Gbps", } if s in mapping: - return f"! {mapping[s]}" + return f"{mapping[s]}" if "auto" in s: - return "! auto" - return f"! {speed_field}" + return "auto" + return f"{speed_field}" +def poe_power_to_float(power_str: str) -> float: + """Best-effort convert PoE power like '15.4' to float for sorting.""" + try: + return float(power_str) + except Exception: + return 0.0 -# -------------------------- -# "TELNET" FUNCTIONS (NOW PEXPECT) -# -------------------------- +# =========================================== +# TELNET (pexpect) FUNCTIONS (your originals) +# =========================================== def telnet_login(ip, port, username, password, enable_password): """ Attempt a Telnet login using pexpect — raise exception on failure. Returns a pexpect.spawn object. """ - # telnet ip port child = pexpect.spawn(f"telnet {ip} {port}", timeout=TELNET_TIMEOUT, encoding="utf-8") - # Handle different possible prompts - # We expect one of: Username:, Password:, >, # idx = child.expect([r"[Uu]sername:", r"[Pp]assword:", r">", r"#", pexpect.TIMEOUT, pexpect.EOF]) if idx == 0: - # Got Username: child.sendline(username) child.expect(r"[Pp]assword:") child.sendline(password) - # After this we should land at ">" or "#" child.expect([r">", r"#"]) elif idx == 1: - # Got Password: (no username) child.sendline(password) child.expect([r">", r"#"]) elif idx in (2, 3): - # Already at > or # (no login) pass else: - # TIMEOUT or EOF raise ConnectionError("Telnet login failed (no prompt / timeout)") - # If we're at ">" we need to enter enable if child.after.strip().endswith(">"): child.sendline("enable") child.expect(r"[Pp]assword:") child.sendline(enable_password) child.expect(r"#") - # Disable paging child.sendline("terminal length 0") child.expect(r"#") return child - def send_command(child: pexpect.spawn, cmd: str) -> str: - """ - Send a command and return the body of the output - (without echoed command and trailing prompt). - """ child.sendline(cmd) - child.expect(r"#") # wait for prompt - # child.before contains everything printed *before* the prompt + child.expect(r"#") output = child.before - - # child.before also includes the command we sent on the first line lines = output.splitlines() if len(lines) >= 1: - # drop the first line (echoed command) lines = lines[1:] return "\n".join(lines) - -# -------------------------- -# PARSERS -# -------------------------- +# =========================================== +# PARSERS (your originals) +# =========================================== def parse_show_interfaces_status(output: str): interfaces = {} lines = output.splitlines() - # find header - header_idx = None # noqa + header_idx = None for i, line in enumerate(lines): if line.strip().startswith("Port"): header_idx = i @@ -185,7 +162,6 @@ def parse_show_interfaces_status(output: str): } return interfaces - def parse_show_power_inline(output: str): poe = {} lines = output.splitlines() @@ -215,81 +191,254 @@ def parse_show_power_inline(output: str): return poe +# =========================================== +# DATA COLLECTION +# =========================================== -# -------------------------- -# MAIN LOOP -# -------------------------- - -def main(): - while True: +def collect_active_ports(): + """ + Returns: (active_ports_list, info_message) + active_ports_list: list[dict] + """ + child = None + try: + child = telnet_login(SWITCH_IP, TELNET_PORT, USERNAME, PASSWORD, ENABLE_PASSWORD) + show_int_status = send_command(child, "show interfaces status") + show_poe = send_command(child, "show power inline") + info = "OK" + finally: try: - child = telnet_login(SWITCH_IP, TELNET_PORT, USERNAME, PASSWORD, ENABLE_PASSWORD) - except (ConnectionError, pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError, socket.error): - print("❌ Telnet/pexpect login failed — retrying in 10 seconds...") - time.sleep(10) - continue - - # Connected successfully - clear_screen() - print("✔ Telnet OK — gathering data...") - - try: - show_int_status = send_command(child, "show interfaces status") - show_poe = send_command(child, "show power inline") - except (pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError): - print("❌ Error while running commands — will retry in 10 seconds...") - try: - child.close(force=True) - except Exception: - pass - time.sleep(10) - continue - - # Try to exit cleanly - try: - child.sendline("exit") - child.close() + if child is not None: + child.sendline("exit") + child.close() except Exception: pass - # parse - int_info = parse_show_interfaces_status(show_int_status) - poe_info = parse_show_power_inline(show_poe) + int_info = parse_show_interfaces_status(show_int_status) + poe_info = parse_show_power_inline(show_poe) - # find connected ports - active_ports = [] - for port, info in int_info.items(): - if info["status"].lower() != "connected": - continue + active_ports = [] + for port, iface in int_info.items(): + if iface.get("status", "").lower() != "connected": + continue + poe = poe_info.get(port, {}) + active_ports.append({ + "port": cisco_to_portnum(port), + "vlan": iface.get("vlan", ""), + "speed": speed_to_label(iface.get("speed", "")), + "duplex": iface.get("duplex", ""), + "poe_admin": poe.get("admin", "n/a"), + "poe_oper": poe.get("oper", "n/a"), + "poe_power_w": poe.get("power", "0.0"), + }) - poe = poe_info.get(port, {}) - active_ports.append({ - "port": cisco_to_portnum(port), - "speed": speed_to_label(info["speed"]), - "duplex": info["duplex"], - "vlan": info["vlan"], - "poe_admin": poe.get("admin", "n/a"), - "poe_oper": poe.get("oper", "n/a"), - "poe_power_w": poe.get("power", "0.0"), - }) + return active_ports, info - clear_screen() - print("Active ports with link + PoE info:\n") - for p in active_ports: - print( - f"{p['port']:>8} | VLAN {p['vlan']:>4} | " - f"{p['speed']:<12} | " - f"{p['duplex']:>7} | " - f"PoE enabled: {p['poe_admin']:<5} | " - f"PoE: {p['poe_oper']:<5} | " - f"Power: {p['poe_power_w']} W" - ) +# =========================================== +# GUI +# =========================================== - if not active_ports: - print("No active (connected) ports.") +class App(tk.Tk): + def __init__(self): + super().__init__() + self.title("Switch Active Ports (Link + PoE)") + self.geometry("980x520") + self.minsize(860, 420) - print("\n⏳ Waiting 10 seconds...\n") - time.sleep(10) + self._refresh_running = True + self._refresh_in_progress = False + self._sort_state = {} # col -> bool ascending + + self._build_ui() + self._set_status("Ready.") + self._schedule_refresh(initial=True) + + def _build_ui(self): + top = ttk.Frame(self, padding=10) + top.pack(fill="both", expand=True) + + # Controls + controls = ttk.Frame(top) + controls.pack(fill="x") + + self.ip_var = tk.StringVar(value=SWITCH_IP) + ttk.Label(controls, text="Switch IP:").pack(side="left") + ip_entry = ttk.Entry(controls, textvariable=self.ip_var, width=18) + ip_entry.pack(side="left", padx=(6, 14)) + + self.interval_var = tk.IntVar(value=REFRESH_SECONDS) + ttk.Label(controls, text="Refresh (s):").pack(side="left") + interval_spin = ttk.Spinbox(controls, from_=3, to=3600, textvariable=self.interval_var, width=6) + interval_spin.pack(side="left", padx=(6, 14)) + + self.btn_refresh = ttk.Button(controls, text="Refresh now", command=self.refresh_now) + self.btn_refresh.pack(side="left", padx=(0, 8)) + + self.btn_toggle = ttk.Button(controls, text="Stop auto-refresh", command=self.toggle_refresh) + self.btn_toggle.pack(side="left") + + ttk.Separator(top, orient="horizontal").pack(fill="x", pady=10) + + # Table + cols = ("port", "vlan", "speed", "duplex", "poe_admin", "poe_oper", "poe_power_w") + self.tree = ttk.Treeview(top, columns=cols, show="headings", height=16) + + headings = { + "port": "Port", + "vlan": "VLAN", + "speed": "Speed", + "duplex": "Duplex", + "poe_admin": "PoE Enabled", + "poe_oper": "PoE Status", + "poe_power_w": "Power (W)", + } + + widths = { + "port": 110, + "vlan": 80, + "speed": 140, + "duplex": 90, + "poe_admin": 110, + "poe_oper": 110, + "poe_power_w": 110, + } + + for c in cols: + self.tree.heading(c, text=headings[c], command=lambda col=c: self.sort_by(col)) + self.tree.column(c, width=widths[c], anchor="w", stretch=True) + + vsb = ttk.Scrollbar(top, orient="vertical", command=self.tree.yview) + self.tree.configure(yscrollcommand=vsb.set) + + self.tree.pack(side="left", fill="both", expand=True) + vsb.pack(side="right", fill="y") + + # Status bar + self.status_var = tk.StringVar(value="") + status = ttk.Label(self, textvariable=self.status_var, anchor="w", padding=(10, 6)) + status.pack(fill="x") + + def _set_status(self, msg: str): + self.status_var.set(msg) + + def toggle_refresh(self): + self._refresh_running = not self._refresh_running + self.btn_toggle.config(text="Stop auto-refresh" if self._refresh_running else "Start auto-refresh") + if self._refresh_running: + self._set_status("Auto-refresh enabled.") + self._schedule_refresh(initial=False) + else: + self._set_status("Auto-refresh stopped.") + + def refresh_now(self): + if self._refresh_in_progress: + return + self._start_refresh_thread() + + def _schedule_refresh(self, initial=False): + # schedule the next refresh + try: + interval = int(self.interval_var.get()) + except Exception: + interval = REFRESH_SECONDS + + if initial: + # do one immediate refresh at startup + self.after(50, self.refresh_now) + + # loop scheduler + def tick(): + if self._refresh_running and (not self._refresh_in_progress): + self._start_refresh_thread() + self._schedule_refresh(initial=False) + + self.after(max(1000, interval * 1000), tick) + + def _start_refresh_thread(self): + self._refresh_in_progress = True + self.btn_refresh.config(state="disabled") + self._set_status("Refreshing...") + + # update global config from UI + global SWITCH_IP + SWITCH_IP = self.ip_var.get().strip() or SWITCH_IP + + t = threading.Thread(target=self._refresh_worker, daemon=True) + t.start() + + def _refresh_worker(self): + try: + ports, info = collect_active_ports() + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self.after(0, lambda: self._update_table(ports, now, info)) + except (ConnectionError, pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError, socket.error) as e: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self.after(0, lambda: self._show_error(f"Telnet/pexpect error: {e}", now)) + except Exception as e: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self.after(0, lambda: self._show_error(f"Unexpected error: {e}", now)) + + def _show_error(self, msg: str, ts: str): + self._refresh_in_progress = False + self.btn_refresh.config(state="normal") + self._set_status(f"❌ {msg} (Last attempt: {ts})") + + def _update_table(self, ports, ts: str, info: str): + # clear + for item in self.tree.get_children(): + self.tree.delete(item) + + # insert + for p in ports: + self.tree.insert("", "end", values=( + p["port"], p["vlan"], p["speed"], p["duplex"], + p["poe_admin"], p["poe_oper"], p["poe_power_w"] + )) + + self._refresh_in_progress = False + self.btn_refresh.config(state="normal") + + if ports: + self._set_status(f"✔ {len(ports)} active ports. Last refresh: {ts}") + else: + self._set_status(f"ℹ No active (connected) ports. Last refresh: {ts}") + + def sort_by(self, col: str): + items = [(self.tree.set(k, col), k) for k in self.tree.get_children("")] + ascending = self._sort_state.get(col, True) + + def keyfunc(x): + v = x[0] + # numeric-ish sorting for VLAN and Power if possible + if col == "vlan": + try: + return int(v) + except Exception: + return 0 + if col == "poe_power_w": + return poe_power_to_float(v) + # Port sort: "Port 12" -> 12 + if col == "port": + m = re.search(r"(\d+)$", v) + return int(m.group(1)) if m else 0 + return v.lower() if isinstance(v, str) else v + + items.sort(key=keyfunc, reverse=not ascending) + + for index, (_, k) in enumerate(items): + self.tree.move(k, "", index) + + self._sort_state[col] = not ascending + + +def main(): + # Basic sanity hints if creds empty + if USERNAME == "" and PASSWORD == "" and ENABLE_PASSWORD == "": + # Not fatal; some devices prompt only for password or none. + pass + + app = App() + app.mainloop() if __name__ == "__main__":