This commit is contained in:
anders 2025-12-19 09:43:14 +01:00
parent dfa07a7331
commit c528d4b623
1 changed files with 268 additions and 119 deletions

387
status.py
View File

@ -4,6 +4,11 @@ import re
import os import os
import sys import sys
import socket import socket
import threading
from datetime import datetime
import tkinter as tk
from tkinter import ttk, messagebox
import pexpect import pexpect
# =========================================== # ===========================================
@ -13,38 +18,25 @@ SWITCH_IP = "10.183.100.11"
TELNET_PORT = 23 TELNET_PORT = 23
TELNET_TIMEOUT = 5 TELNET_TIMEOUT = 5
# Tip: You can also set these via environment variables to avoid hardcoding:
# export SWITCH_USER="..."
# export SWITCH_PASS="..."
# export SWITCH_ENABLE="..."
USERNAME = "" USERNAME = ""
PASSWORD = "Wprs000qq!" PASSWORD = "Wprs000qq!"
ENABLE_PASSWORD = "Wprs000qq!" ENABLE_PASSWORD = "Wprs000qq!"
# -------------------------- REFRESH_SECONDS = 10
# HELPERS
# --------------------------
def clear_screen():
"""Best-effort clear that works in terminals & IDEs."""
try:
sys.stdout.write("\033c")
sys.stdout.flush()
return
except Exception:
pass
try:
os.system('cls' if os.name == 'nt' else 'clear')
return
except Exception:
pass
print("\n" * 100)
# ===========================================
# HELPERS (your originals + small extras)
# ===========================================
def cisco_to_portnum(ifname: str) -> str: def cisco_to_portnum(ifname: str) -> str:
"""Convert Gi1/0/4 -> Port 4.""" """Convert Gi1/0/4 -> Port 4."""
m = re.search(r"/(\d+)$", ifname) m = re.search(r"/(\d+)$", ifname)
return f"Port {m.group(1)}" if m else ifname return f"Port {m.group(1)}" if m else ifname
def speed_to_label(speed_field: str) -> str: def speed_to_label(speed_field: str) -> str:
"""Convert Cisco speeds to ! <human text>.""" """Convert Cisco speeds to ! <human text>."""
if not speed_field: if not speed_field:
@ -62,88 +54,73 @@ def speed_to_label(speed_field: str) -> str:
"a-10000": "10 Gbps", "a-10000": "10 Gbps",
} }
if s in mapping: if s in mapping:
return f"! {mapping[s]}" return f"{mapping[s]}"
if "auto" in s: if "auto" in s:
return "! auto" return "auto"
return f"! {speed_field}" return f"{speed_field}"
def poe_power_to_float(power_str: str) -> float:
"""Best-effort convert PoE power like '15.4' to float for sorting."""
try:
return float(power_str)
except Exception:
return 0.0
# -------------------------- # ===========================================
# "TELNET" FUNCTIONS (NOW PEXPECT) # TELNET (pexpect) FUNCTIONS (your originals)
# -------------------------- # ===========================================
def telnet_login(ip, port, username, password, enable_password): def telnet_login(ip, port, username, password, enable_password):
""" """
Attempt a Telnet login using pexpect raise exception on failure. Attempt a Telnet login using pexpect raise exception on failure.
Returns a pexpect.spawn object. Returns a pexpect.spawn object.
""" """
# telnet ip port
child = pexpect.spawn(f"telnet {ip} {port}", timeout=TELNET_TIMEOUT, encoding="utf-8") child = pexpect.spawn(f"telnet {ip} {port}", timeout=TELNET_TIMEOUT, encoding="utf-8")
# Handle different possible prompts
# We expect one of: Username:, Password:, >, #
idx = child.expect([r"[Uu]sername:", r"[Pp]assword:", r">", r"#", pexpect.TIMEOUT, pexpect.EOF]) idx = child.expect([r"[Uu]sername:", r"[Pp]assword:", r">", r"#", pexpect.TIMEOUT, pexpect.EOF])
if idx == 0: if idx == 0:
# Got Username:
child.sendline(username) child.sendline(username)
child.expect(r"[Pp]assword:") child.expect(r"[Pp]assword:")
child.sendline(password) child.sendline(password)
# After this we should land at ">" or "#"
child.expect([r">", r"#"]) child.expect([r">", r"#"])
elif idx == 1: elif idx == 1:
# Got Password: (no username)
child.sendline(password) child.sendline(password)
child.expect([r">", r"#"]) child.expect([r">", r"#"])
elif idx in (2, 3): elif idx in (2, 3):
# Already at > or # (no login)
pass pass
else: else:
# TIMEOUT or EOF
raise ConnectionError("Telnet login failed (no prompt / timeout)") raise ConnectionError("Telnet login failed (no prompt / timeout)")
# If we're at ">" we need to enter enable
if child.after.strip().endswith(">"): if child.after.strip().endswith(">"):
child.sendline("enable") child.sendline("enable")
child.expect(r"[Pp]assword:") child.expect(r"[Pp]assword:")
child.sendline(enable_password) child.sendline(enable_password)
child.expect(r"#") child.expect(r"#")
# Disable paging
child.sendline("terminal length 0") child.sendline("terminal length 0")
child.expect(r"#") child.expect(r"#")
return child return child
def send_command(child: pexpect.spawn, cmd: str) -> str: def send_command(child: pexpect.spawn, cmd: str) -> str:
"""
Send a command and return the body of the output
(without echoed command and trailing prompt).
"""
child.sendline(cmd) child.sendline(cmd)
child.expect(r"#") # wait for prompt child.expect(r"#")
# child.before contains everything printed *before* the prompt
output = child.before output = child.before
# child.before also includes the command we sent on the first line
lines = output.splitlines() lines = output.splitlines()
if len(lines) >= 1: if len(lines) >= 1:
# drop the first line (echoed command)
lines = lines[1:] lines = lines[1:]
return "\n".join(lines) return "\n".join(lines)
# ===========================================
# -------------------------- # PARSERS (your originals)
# PARSERS # ===========================================
# --------------------------
def parse_show_interfaces_status(output: str): def parse_show_interfaces_status(output: str):
interfaces = {} interfaces = {}
lines = output.splitlines() lines = output.splitlines()
# find header header_idx = None
header_idx = None # noqa
for i, line in enumerate(lines): for i, line in enumerate(lines):
if line.strip().startswith("Port"): if line.strip().startswith("Port"):
header_idx = i header_idx = i
@ -185,7 +162,6 @@ def parse_show_interfaces_status(output: str):
} }
return interfaces return interfaces
def parse_show_power_inline(output: str): def parse_show_power_inline(output: str):
poe = {} poe = {}
lines = output.splitlines() lines = output.splitlines()
@ -215,81 +191,254 @@ def parse_show_power_inline(output: str):
return poe return poe
# ===========================================
# DATA COLLECTION
# ===========================================
# -------------------------- def collect_active_ports():
# MAIN LOOP """
# -------------------------- Returns: (active_ports_list, info_message)
active_ports_list: list[dict]
def main(): """
while True: child = None
try:
child = telnet_login(SWITCH_IP, TELNET_PORT, USERNAME, PASSWORD, ENABLE_PASSWORD)
show_int_status = send_command(child, "show interfaces status")
show_poe = send_command(child, "show power inline")
info = "OK"
finally:
try: try:
child = telnet_login(SWITCH_IP, TELNET_PORT, USERNAME, PASSWORD, ENABLE_PASSWORD) if child is not None:
except (ConnectionError, pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError, socket.error): child.sendline("exit")
print("❌ Telnet/pexpect login failed — retrying in 10 seconds...") child.close()
time.sleep(10)
continue
# Connected successfully
clear_screen()
print("✔ Telnet OK — gathering data...")
try:
show_int_status = send_command(child, "show interfaces status")
show_poe = send_command(child, "show power inline")
except (pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError):
print("❌ Error while running commands — will retry in 10 seconds...")
try:
child.close(force=True)
except Exception:
pass
time.sleep(10)
continue
# Try to exit cleanly
try:
child.sendline("exit")
child.close()
except Exception: except Exception:
pass pass
# parse int_info = parse_show_interfaces_status(show_int_status)
int_info = parse_show_interfaces_status(show_int_status) poe_info = parse_show_power_inline(show_poe)
poe_info = parse_show_power_inline(show_poe)
# find connected ports active_ports = []
active_ports = [] for port, iface in int_info.items():
for port, info in int_info.items(): if iface.get("status", "").lower() != "connected":
if info["status"].lower() != "connected": continue
continue poe = poe_info.get(port, {})
active_ports.append({
"port": cisco_to_portnum(port),
"vlan": iface.get("vlan", ""),
"speed": speed_to_label(iface.get("speed", "")),
"duplex": iface.get("duplex", ""),
"poe_admin": poe.get("admin", "n/a"),
"poe_oper": poe.get("oper", "n/a"),
"poe_power_w": poe.get("power", "0.0"),
})
poe = poe_info.get(port, {}) return active_ports, info
active_ports.append({
"port": cisco_to_portnum(port),
"speed": speed_to_label(info["speed"]),
"duplex": info["duplex"],
"vlan": info["vlan"],
"poe_admin": poe.get("admin", "n/a"),
"poe_oper": poe.get("oper", "n/a"),
"poe_power_w": poe.get("power", "0.0"),
})
clear_screen() # ===========================================
print("Active ports with link + PoE info:\n") # GUI
for p in active_ports: # ===========================================
print(
f"{p['port']:>8} | VLAN {p['vlan']:>4} | "
f"{p['speed']:<12} | "
f"{p['duplex']:>7} | "
f"PoE enabled: {p['poe_admin']:<5} | "
f"PoE: {p['poe_oper']:<5} | "
f"Power: {p['poe_power_w']} W"
)
if not active_ports: class App(tk.Tk):
print("No active (connected) ports.") def __init__(self):
super().__init__()
self.title("Switch Active Ports (Link + PoE)")
self.geometry("980x520")
self.minsize(860, 420)
print("\n⏳ Waiting 10 seconds...\n") self._refresh_running = True
time.sleep(10) self._refresh_in_progress = False
self._sort_state = {} # col -> bool ascending
self._build_ui()
self._set_status("Ready.")
self._schedule_refresh(initial=True)
def _build_ui(self):
top = ttk.Frame(self, padding=10)
top.pack(fill="both", expand=True)
# Controls
controls = ttk.Frame(top)
controls.pack(fill="x")
self.ip_var = tk.StringVar(value=SWITCH_IP)
ttk.Label(controls, text="Switch IP:").pack(side="left")
ip_entry = ttk.Entry(controls, textvariable=self.ip_var, width=18)
ip_entry.pack(side="left", padx=(6, 14))
self.interval_var = tk.IntVar(value=REFRESH_SECONDS)
ttk.Label(controls, text="Refresh (s):").pack(side="left")
interval_spin = ttk.Spinbox(controls, from_=3, to=3600, textvariable=self.interval_var, width=6)
interval_spin.pack(side="left", padx=(6, 14))
self.btn_refresh = ttk.Button(controls, text="Refresh now", command=self.refresh_now)
self.btn_refresh.pack(side="left", padx=(0, 8))
self.btn_toggle = ttk.Button(controls, text="Stop auto-refresh", command=self.toggle_refresh)
self.btn_toggle.pack(side="left")
ttk.Separator(top, orient="horizontal").pack(fill="x", pady=10)
# Table
cols = ("port", "vlan", "speed", "duplex", "poe_admin", "poe_oper", "poe_power_w")
self.tree = ttk.Treeview(top, columns=cols, show="headings", height=16)
headings = {
"port": "Port",
"vlan": "VLAN",
"speed": "Speed",
"duplex": "Duplex",
"poe_admin": "PoE Enabled",
"poe_oper": "PoE Status",
"poe_power_w": "Power (W)",
}
widths = {
"port": 110,
"vlan": 80,
"speed": 140,
"duplex": 90,
"poe_admin": 110,
"poe_oper": 110,
"poe_power_w": 110,
}
for c in cols:
self.tree.heading(c, text=headings[c], command=lambda col=c: self.sort_by(col))
self.tree.column(c, width=widths[c], anchor="w", stretch=True)
vsb = ttk.Scrollbar(top, orient="vertical", command=self.tree.yview)
self.tree.configure(yscrollcommand=vsb.set)
self.tree.pack(side="left", fill="both", expand=True)
vsb.pack(side="right", fill="y")
# Status bar
self.status_var = tk.StringVar(value="")
status = ttk.Label(self, textvariable=self.status_var, anchor="w", padding=(10, 6))
status.pack(fill="x")
def _set_status(self, msg: str):
self.status_var.set(msg)
def toggle_refresh(self):
self._refresh_running = not self._refresh_running
self.btn_toggle.config(text="Stop auto-refresh" if self._refresh_running else "Start auto-refresh")
if self._refresh_running:
self._set_status("Auto-refresh enabled.")
self._schedule_refresh(initial=False)
else:
self._set_status("Auto-refresh stopped.")
def refresh_now(self):
if self._refresh_in_progress:
return
self._start_refresh_thread()
def _schedule_refresh(self, initial=False):
# schedule the next refresh
try:
interval = int(self.interval_var.get())
except Exception:
interval = REFRESH_SECONDS
if initial:
# do one immediate refresh at startup
self.after(50, self.refresh_now)
# loop scheduler
def tick():
if self._refresh_running and (not self._refresh_in_progress):
self._start_refresh_thread()
self._schedule_refresh(initial=False)
self.after(max(1000, interval * 1000), tick)
def _start_refresh_thread(self):
self._refresh_in_progress = True
self.btn_refresh.config(state="disabled")
self._set_status("Refreshing...")
# update global config from UI
global SWITCH_IP
SWITCH_IP = self.ip_var.get().strip() or SWITCH_IP
t = threading.Thread(target=self._refresh_worker, daemon=True)
t.start()
def _refresh_worker(self):
try:
ports, info = collect_active_ports()
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.after(0, lambda: self._update_table(ports, now, info))
except (ConnectionError, pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError, socket.error) as e:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.after(0, lambda: self._show_error(f"Telnet/pexpect error: {e}", now))
except Exception as e:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.after(0, lambda: self._show_error(f"Unexpected error: {e}", now))
def _show_error(self, msg: str, ts: str):
self._refresh_in_progress = False
self.btn_refresh.config(state="normal")
self._set_status(f"{msg} (Last attempt: {ts})")
def _update_table(self, ports, ts: str, info: str):
# clear
for item in self.tree.get_children():
self.tree.delete(item)
# insert
for p in ports:
self.tree.insert("", "end", values=(
p["port"], p["vlan"], p["speed"], p["duplex"],
p["poe_admin"], p["poe_oper"], p["poe_power_w"]
))
self._refresh_in_progress = False
self.btn_refresh.config(state="normal")
if ports:
self._set_status(f"{len(ports)} active ports. Last refresh: {ts}")
else:
self._set_status(f" No active (connected) ports. Last refresh: {ts}")
def sort_by(self, col: str):
items = [(self.tree.set(k, col), k) for k in self.tree.get_children("")]
ascending = self._sort_state.get(col, True)
def keyfunc(x):
v = x[0]
# numeric-ish sorting for VLAN and Power if possible
if col == "vlan":
try:
return int(v)
except Exception:
return 0
if col == "poe_power_w":
return poe_power_to_float(v)
# Port sort: "Port 12" -> 12
if col == "port":
m = re.search(r"(\d+)$", v)
return int(m.group(1)) if m else 0
return v.lower() if isinstance(v, str) else v
items.sort(key=keyfunc, reverse=not ascending)
for index, (_, k) in enumerate(items):
self.tree.move(k, "", index)
self._sort_state[col] = not ascending
def main():
# Basic sanity hints if creds empty
if USERNAME == "" and PASSWORD == "" and ENABLE_PASSWORD == "":
# Not fatal; some devices prompt only for password or none.
pass
app = App()
app.mainloop()
if __name__ == "__main__": if __name__ == "__main__":