CiscoConnect/status.py

446 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import time
import re
import os
import sys
import socket
import threading
from datetime import datetime
import tkinter as tk
from tkinter import ttk, messagebox
import pexpect
# ===========================================
# CONFIG
# ===========================================
SWITCH_IP = "10.183.100.11"
TELNET_PORT = 23
TELNET_TIMEOUT = 5
# Tip: You can also set these via environment variables to avoid hardcoding:
# export SWITCH_USER="..."
# export SWITCH_PASS="..."
# export SWITCH_ENABLE="..."
USERNAME = ""
PASSWORD = "Wprs000qq!"
ENABLE_PASSWORD = "Wprs000qq!"
REFRESH_SECONDS = 10
# ===========================================
# HELPERS (your originals + small extras)
# ===========================================
def cisco_to_portnum(ifname: str) -> str:
"""Convert Gi1/0/4 -> Port 4."""
m = re.search(r"/(\d+)$", ifname)
return f"Port {m.group(1)}" if m else ifname
def speed_to_label(speed_field: str) -> str:
"""Convert Cisco speeds to ! <human text>."""
if not speed_field:
return "! unknown"
s = speed_field.strip().lower()
mapping = {
"10": "10 Mbps",
"a-10": "10 Mbps",
"100": "100 Mbps",
"a-100": "100 Mbps",
"1000": "1 Gbps",
"a-1000": "1 Gbps",
"10000": "10 Gbps",
"a-10000": "10 Gbps",
}
if s in mapping:
return f"{mapping[s]}"
if "auto" in s:
return "auto"
return f"{speed_field}"
def poe_power_to_float(power_str: str) -> float:
"""Best-effort convert PoE power like '15.4' to float for sorting."""
try:
return float(power_str)
except Exception:
return 0.0
# ===========================================
# TELNET (pexpect) FUNCTIONS (your originals)
# ===========================================
def telnet_login(ip, port, username, password, enable_password):
"""
Attempt a Telnet login using pexpect — raise exception on failure.
Returns a pexpect.spawn object.
"""
child = pexpect.spawn(f"telnet {ip} {port}", timeout=TELNET_TIMEOUT, encoding="utf-8")
idx = child.expect([r"[Uu]sername:", r"[Pp]assword:", r">", r"#", pexpect.TIMEOUT, pexpect.EOF])
if idx == 0:
child.sendline(username)
child.expect(r"[Pp]assword:")
child.sendline(password)
child.expect([r">", r"#"])
elif idx == 1:
child.sendline(password)
child.expect([r">", r"#"])
elif idx in (2, 3):
pass
else:
raise ConnectionError("Telnet login failed (no prompt / timeout)")
if child.after.strip().endswith(">"):
child.sendline("enable")
child.expect(r"[Pp]assword:")
child.sendline(enable_password)
child.expect(r"#")
child.sendline("terminal length 0")
child.expect(r"#")
return child
def send_command(child: pexpect.spawn, cmd: str) -> str:
child.sendline(cmd)
child.expect(r"#")
output = child.before
lines = output.splitlines()
if len(lines) >= 1:
lines = lines[1:]
return "\n".join(lines)
# ===========================================
# PARSERS (your originals)
# ===========================================
def parse_show_interfaces_status(output: str):
interfaces = {}
lines = output.splitlines()
header_idx = None
for i, line in enumerate(lines):
if line.strip().startswith("Port"):
header_idx = i
break
if header_idx is None:
return interfaces
for line in lines[header_idx + 1:]:
if not line.strip():
continue
parts = re.split(r"\s+", line)
if len(parts) < 7:
m = re.match(r"^(?P<port>\S+)\s+(?P<rest>.+)$", line)
if not m:
continue
port = m.group("port")
rest_parts = re.split(r"\s+", m.group("rest"))
if len(rest_parts) < 5:
continue
type_ = rest_parts[-1]
speed = rest_parts[-2]
duplex = rest_parts[-3]
vlan = rest_parts[-4]
status = rest_parts[-5]
else:
port = parts[0]
status = parts[2]
vlan = parts[3]
duplex = parts[4]
speed = parts[5]
type_ = " ".join(parts[6:])
interfaces[port] = {
"status": status,
"vlan": vlan,
"duplex": duplex,
"speed": speed,
"type": type_,
}
return interfaces
def parse_show_power_inline(output: str):
poe = {}
lines = output.splitlines()
header_idx = None
for i, line in enumerate(lines):
if line.strip().startswith("Interface"):
header_idx = i
break
if header_idx is None:
return poe
for line in lines[header_idx + 1:]:
if not line.strip():
continue
parts = re.split(r"\s+", line)
if len(parts) < 4:
continue
iface = parts[0]
admin = parts[1]
oper = parts[2]
power = parts[3]
poe[iface] = {"admin": admin, "oper": oper, "power": power}
return poe
# ===========================================
# DATA COLLECTION
# ===========================================
def collect_active_ports():
"""
Returns: (active_ports_list, info_message)
active_ports_list: list[dict]
"""
child = None
try:
child = telnet_login(SWITCH_IP, TELNET_PORT, USERNAME, PASSWORD, ENABLE_PASSWORD)
show_int_status = send_command(child, "show interfaces status")
show_poe = send_command(child, "show power inline")
info = "OK"
finally:
try:
if child is not None:
child.sendline("exit")
child.close()
except Exception:
pass
int_info = parse_show_interfaces_status(show_int_status)
poe_info = parse_show_power_inline(show_poe)
active_ports = []
for port, iface in int_info.items():
if iface.get("status", "").lower() != "connected":
continue
poe = poe_info.get(port, {})
active_ports.append({
"port": cisco_to_portnum(port),
"vlan": iface.get("vlan", ""),
"speed": speed_to_label(iface.get("speed", "")),
"duplex": iface.get("duplex", ""),
"poe_admin": poe.get("admin", "n/a"),
"poe_oper": poe.get("oper", "n/a"),
"poe_power_w": poe.get("power", "0.0"),
})
return active_ports, info
# ===========================================
# GUI
# ===========================================
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("Switch Active Ports (Link + PoE)")
self.geometry("980x520")
self.minsize(860, 420)
self._refresh_running = True
self._refresh_in_progress = False
self._sort_state = {} # col -> bool ascending
self._build_ui()
self._set_status("Ready.")
self._schedule_refresh(initial=True)
def _build_ui(self):
top = ttk.Frame(self, padding=10)
top.pack(fill="both", expand=True)
# Controls
controls = ttk.Frame(top)
controls.pack(fill="x")
self.ip_var = tk.StringVar(value=SWITCH_IP)
ttk.Label(controls, text="Switch IP:").pack(side="left")
ip_entry = ttk.Entry(controls, textvariable=self.ip_var, width=18)
ip_entry.pack(side="left", padx=(6, 14))
self.interval_var = tk.IntVar(value=REFRESH_SECONDS)
ttk.Label(controls, text="Refresh (s):").pack(side="left")
interval_spin = ttk.Spinbox(controls, from_=3, to=3600, textvariable=self.interval_var, width=6)
interval_spin.pack(side="left", padx=(6, 14))
self.btn_refresh = ttk.Button(controls, text="Refresh now", command=self.refresh_now)
self.btn_refresh.pack(side="left", padx=(0, 8))
self.btn_toggle = ttk.Button(controls, text="Stop auto-refresh", command=self.toggle_refresh)
self.btn_toggle.pack(side="left")
ttk.Separator(top, orient="horizontal").pack(fill="x", pady=10)
# Table
cols = ("port", "vlan", "speed", "duplex", "poe_admin", "poe_oper", "poe_power_w")
self.tree = ttk.Treeview(top, columns=cols, show="headings", height=16)
headings = {
"port": "Port",
"vlan": "VLAN",
"speed": "Speed",
"duplex": "Duplex",
"poe_admin": "PoE Enabled",
"poe_oper": "PoE Status",
"poe_power_w": "Power (W)",
}
widths = {
"port": 110,
"vlan": 80,
"speed": 140,
"duplex": 90,
"poe_admin": 110,
"poe_oper": 110,
"poe_power_w": 110,
}
for c in cols:
self.tree.heading(c, text=headings[c], command=lambda col=c: self.sort_by(col))
self.tree.column(c, width=widths[c], anchor="w", stretch=True)
vsb = ttk.Scrollbar(top, orient="vertical", command=self.tree.yview)
self.tree.configure(yscrollcommand=vsb.set)
self.tree.pack(side="left", fill="both", expand=True)
vsb.pack(side="right", fill="y")
# Status bar
self.status_var = tk.StringVar(value="")
status = ttk.Label(self, textvariable=self.status_var, anchor="w", padding=(10, 6))
status.pack(fill="x")
def _set_status(self, msg: str):
self.status_var.set(msg)
def toggle_refresh(self):
self._refresh_running = not self._refresh_running
self.btn_toggle.config(text="Stop auto-refresh" if self._refresh_running else "Start auto-refresh")
if self._refresh_running:
self._set_status("Auto-refresh enabled.")
self._schedule_refresh(initial=False)
else:
self._set_status("Auto-refresh stopped.")
def refresh_now(self):
if self._refresh_in_progress:
return
self._start_refresh_thread()
def _schedule_refresh(self, initial=False):
# schedule the next refresh
try:
interval = int(self.interval_var.get())
except Exception:
interval = REFRESH_SECONDS
if initial:
# do one immediate refresh at startup
self.after(50, self.refresh_now)
# loop scheduler
def tick():
if self._refresh_running and (not self._refresh_in_progress):
self._start_refresh_thread()
self._schedule_refresh(initial=False)
self.after(max(1000, interval * 1000), tick)
def _start_refresh_thread(self):
self._refresh_in_progress = True
self.btn_refresh.config(state="disabled")
self._set_status("Refreshing...")
# update global config from UI
global SWITCH_IP
SWITCH_IP = self.ip_var.get().strip() or SWITCH_IP
t = threading.Thread(target=self._refresh_worker, daemon=True)
t.start()
def _refresh_worker(self):
try:
ports, info = collect_active_ports()
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.after(0, lambda: self._update_table(ports, now, info))
except (ConnectionError, pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF, OSError, socket.error) as e:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.after(0, lambda: self._show_error(f"Telnet/pexpect error: {e}", now))
except Exception as e:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.after(0, lambda: self._show_error(f"Unexpected error: {e}", now))
def _show_error(self, msg: str, ts: str):
self._refresh_in_progress = False
self.btn_refresh.config(state="normal")
self._set_status(f"{msg} (Last attempt: {ts})")
def _update_table(self, ports, ts: str, info: str):
# clear
for item in self.tree.get_children():
self.tree.delete(item)
# insert
for p in ports:
self.tree.insert("", "end", values=(
p["port"], p["vlan"], p["speed"], p["duplex"],
p["poe_admin"], p["poe_oper"], p["poe_power_w"]
))
self._refresh_in_progress = False
self.btn_refresh.config(state="normal")
if ports:
self._set_status(f"{len(ports)} active ports. Last refresh: {ts}")
else:
self._set_status(f" No active (connected) ports. Last refresh: {ts}")
def sort_by(self, col: str):
items = [(self.tree.set(k, col), k) for k in self.tree.get_children("")]
ascending = self._sort_state.get(col, True)
def keyfunc(x):
v = x[0]
# numeric-ish sorting for VLAN and Power if possible
if col == "vlan":
try:
return int(v)
except Exception:
return 0
if col == "poe_power_w":
return poe_power_to_float(v)
# Port sort: "Port 12" -> 12
if col == "port":
m = re.search(r"(\d+)$", v)
return int(m.group(1)) if m else 0
return v.lower() if isinstance(v, str) else v
items.sort(key=keyfunc, reverse=not ascending)
for index, (_, k) in enumerate(items):
self.tree.move(k, "", index)
self._sort_state[col] = not ascending
def main():
# Basic sanity hints if creds empty
if USERNAME == "" and PASSWORD == "" and ENABLE_PASSWORD == "":
# Not fatal; some devices prompt only for password or none.
pass
app = App()
app.mainloop()
if __name__ == "__main__":
main()