A24_Patriot_API/main.py

2401 lines
78 KiB
Python

from __future__ import annotations
import json
import os
import threading
import re
import logging
from datetime import datetime, timezone
from typing import Dict, Optional, List
from time import time
from pathlib import Path
from fastapi import FastAPI, HTTPException, Depends, Header, Request, Response, status
from pydantic import BaseModel, Field, EmailStr, field_validator
import pyodbc
# ----------------- Helpers for filenames -----------------
SAFE_NAME = re.compile(r"[^A-Za-z0-9._-]")
def _safe_folder(name: str) -> str:
cleaned = SAFE_NAME.sub("_", name.strip())
return cleaned or "unknown"
# ----------------- Basic JSON helpers (still used for keys.json) -----------------
def _load_json(path: str, default):
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
return default
except Exception as e:
logging.error("Failed to load %s: %s", path, e)
return default
# ----------------- Config -----------------
KEY_FILE = os.getenv("KEY_FILE", "keys.json")
XML_DIR = os.getenv("XML_DIR", "out/clients")
# MSSQL / Patriot DB config
SQL_SERVER = os.getenv("SQL_SERVER", "10.181.149.83") # or PATRIOTDB2\\SQLEXPRESS
SQL_PORT = os.getenv("SQL_PORT", "1433") # empty string if using named instance only
SQL_DATABASE = os.getenv("SQL_DATABASE", "Patriot")
SQL_USERNAME = os.getenv("SQL_USERNAME", "sa")
SQL_PASSWORD = os.getenv("SQL_PASSWORD", "sa")
ODBC_DRIVER = os.getenv("ODBC_DRIVER", "ODBC Driver 18 for SQL Server")
# ----------------- Models used in keys.json -----------------
class ClientGroupingConfig(BaseModel):
description: str
grouping_type_description: str
grouping_allow_multiple: bool = True
# ----------------- Auth (hot-reloaded key store) -----------------
class KeyRecord(BaseModel):
key_name: str
key: str
enabled: bool = True
valid_to: str # ISO-8601
# Used for __id in XML and DB Client_No: <client_id>{port}</client_id>
port: str = ""
# Per-key forced XML fields
installer_name: str = "" # <Installer>,
installer_email: str = ""
use_glob_callouts: bool = False # <UseGlobCallOuts>
show_on_callouts: bool = False # <ShowOnCallOuts>
glob_callouts: str = "" # <GlobCallOuts>
use_glob_callouts2: bool = False
show_on_callouts2: bool = False
glob_callouts2: str = ""
alt_lookup: bool = False # <AltLookup>
alt_alarm_no: str = "" # <AltAlarmNo>
convert_type: str = "None" # <ConvertType>
siginterpret: str = "SIADecimal" # <SIGINTERPRET>
client_groupings: List[ClientGroupingConfig] = Field(default_factory=list)
class KeyStore:
"""
Loads keys.json on-demand and supports hot reloading.
"""
def __init__(self, path: str):
self.path = path
self._lock = threading.Lock()
self._mtime = 0.0
self._keys: Dict[str, KeyRecord] = {}
self._reload()
def _reload(self):
mtime = os.path.getmtime(self.path) if os.path.exists(self.path) else 0
if mtime <= self._mtime:
return
logging.info("Reloading key store from %s", self.path)
raw = _load_json(self.path, {"keys": []})
# Support both formats:
# 1) { "keys": [ {...}, {...} ] }
# 2) [ {...}, {...} ]
if isinstance(raw, list):
keys_raw = raw
elif isinstance(raw, dict):
keys_raw = raw.get("keys", [])
else:
logging.error("Unexpected format in %s (must be list or dict)", self.path)
keys_raw = []
keys: Dict[str, KeyRecord] = {}
for rec in keys_raw:
try:
k = KeyRecord(**rec)
keys[k.key] = k
except Exception as e:
logging.error("Invalid key record in %s: %s", self.path, e)
self._keys = keys
self._mtime = mtime
def _parse_time(self, s: str) -> datetime:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
def get(self, token: str) -> Optional[KeyRecord]:
with self._lock:
self._reload()
return self._keys.get(token)
def must_get(self, token: str) -> KeyRecord:
rec = self.get(token)
if not rec:
raise HTTPException(status_code=401, detail="Invalid token")
if not rec.enabled:
raise HTTPException(status_code=401, detail="Token disabled")
now = datetime.now(timezone.utc)
valid_to = self._parse_time(rec.valid_to)
if now > valid_to:
raise HTTPException(status_code=401, detail="Token expired")
return rec
_key_store = KeyStore(KEY_FILE)
# ----------------- MSSQL helpers -----------------
def _client_combined_id(client_id: int, keyrec: KeyRecord) -> str:
"""
Combined ID used in DB Memalarm.Client_No and XML __id: f"{client_id}{port}".
If port is empty, just client_id as string.
"""
return f"{client_id}{keyrec.port}" if keyrec.port else str(client_id)
def _get_db_connection():
"""
Open a new MSSQL connection to Patriot. Autocommit is enabled.
"""
if SQL_PORT:
server_part = f"{SQL_SERVER},{SQL_PORT}"
else:
server_part = SQL_SERVER
conn_str = (
f"DRIVER={{{{ {ODBC_DRIVER} }}}};"
f"SERVER={server_part};"
f"DATABASE={SQL_DATABASE};"
f"UID={SQL_USERNAME};"
f"PWD={SQL_PASSWORD};"
"Encrypt=no;"
"TrustServerCertificate=yes;"
)
# Note: curly braces around driver are doubled above to survive f-string formatting.
conn_str = conn_str.replace("{{ ", "{").replace(" }}", "}")
return pyodbc.connect(conn_str, autocommit=True)
def _split_loc2(loc2: str) -> tuple[str, str]:
loc2 = (loc2 or "").strip()
if not loc2:
return "", ""
parts = loc2.split(" ", 1)
if len(parts) == 1:
return parts[0], ""
return parts[0], parts[1]
# ------------------ TEMP STORAGE (pending_import state) -------------
CLIENT_STATE_FILE = Path(os.getenv("CLIENT_STATE_FILE", "/opt/patriot_api/client_state.json"))
def _load_client_state() -> dict:
if CLIENT_STATE_FILE.is_file():
try:
return json.loads(CLIENT_STATE_FILE.read_text(encoding="utf-8"))
except Exception as e:
logging.error("Failed to read client state file %s: %s", CLIENT_STATE_FILE, e)
return {}
return {}
def _save_client_state(state: dict):
try:
tmp = CLIENT_STATE_FILE.with_suffix(".tmp")
tmp.write_text(json.dumps(state, indent=2, ensure_ascii=False), encoding="utf-8")
os.replace(tmp, CLIENT_STATE_FILE)
except Exception as e:
logging.error("Failed to write client state file %s: %s", CLIENT_STATE_FILE, e)
def _clear_pending_import_by_combined_id(combined_id: str):
"""
Remove a client from pending_import state if present.
Called automatically whenever we detect the client in DB.
"""
state = _load_client_state()
if combined_id in state:
try:
del state[combined_id]
_save_client_state(state)
logging.info(
"Cleared pending_import state for Client_No=%s (detected in DB).",
combined_id,
)
except Exception as e:
logging.error(
"Failed to clear pending_import state for Client_No=%s: %s",
combined_id,
e,
)
def client_is_pending_import(client_id: int, keyrec: KeyRecord) -> bool:
"""
Return True if this client_id+port (__id / Client_No) is marked as pending_import.
Used to lock non-DB clients while their XML has been sent but not imported yet.
"""
combined_id = _client_combined_id(client_id, keyrec)
state = _load_client_state()
entry = state.get(combined_id)
return isinstance(entry, dict) and entry.get("status") == "pending_import"
def db_client_exists(client_id: int, keyrec: KeyRecord) -> bool:
"""
Check if a client exists in dbo.Memalarm.
Returns True if row exists, False otherwise (or on error).
If the client exists, we also clear any 'pending_import' state for this
client in the shared state file.
"""
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
except Exception as e:
logging.error("DB connect failed in db_client_exists for %s: %s", combined_id, e)
return False
try:
cur = conn.cursor()
cur.execute("SELECT 1 FROM dbo.Memalarm WHERE Client_No = ?", (combined_id,))
row = cur.fetchone()
exists = row is not None
# If the client is now in DB, clear any pending_import state
if exists:
_clear_pending_import_by_combined_id(combined_id)
return exists
except Exception as e:
logging.error("DB query failed in db_client_exists for Client_No=%s: %s", combined_id, e)
return False
finally:
try:
conn.close()
except Exception:
pass
# ----------------- API models -----------------
class ClientInfo(BaseModel):
# From API / client section
Name: str = Field(..., min_length=1, max_length=200)
Alias: Optional[str] = Field(None, max_length=200)
Location: str = Field(..., min_length=1, max_length=200)
# split fields used by the API → XML loc2 = "{area_code} {area}"
area_code: str = Field(..., min_length=1, max_length=20)
area: str = Field(..., min_length=1, max_length=200)
BusPhone: Optional[str] = Field(None, max_length=50)
Email: Optional[EmailStr] = None
OKPassword: Optional[str] = Field(None, max_length=100)
SpecRequest: Optional[str] = Field(None, max_length=1000)
# No-signal monitoring
NoSigsMon: str = Field("ActiveAny", max_length=50)
SinceDays: int = Field(1, ge=0)
SinceHrs: int = Field(0, ge=0)
SinceMins: int = Field(30, ge=0)
ResetNosigsIgnored: bool = True
ResetNosigsDays: int = Field(7, ge=0)
ResetNosigsHrs: int = Field(0, ge=0)
ResetNosigsMins: int = Field(0, ge=0)
InstallDateTime: Optional[str] = Field(
None, description="Installation date/time, e.g. '2023-02-20'"
)
PanelName: str = "Panel Type"
PanelSite: str = "Panel location"
KeypadLocation: Optional[str] = Field(None, max_length=200)
SPPage: Optional[str] = Field(None, max_length=2000)
NonMonitored: bool = False
DecommissionDate: Optional[str] = None
DecommissionReason: Optional[str] = None
class User(BaseModel):
User_Name: str = Field(..., min_length=1, max_length=120)
MobileNo: str = Field(..., min_length=6, max_length=40)
MobileNoOrder: int = Field(..., ge=1, le=999)
Email: Optional[EmailStr] = None
Type: str = Field("U", min_length=1, max_length=1)
UserNo: int = Field(..., ge=1, le=999)
Instructions: str | None = Field(None, max_length=500)
CallOrder: int = Field(0, ge=0, le=999)
@field_validator("MobileNo")
def phone_has_digit(cls, v: str):
if not any(ch.isdigit() for ch in v):
raise ValueError("MobileNo must contain at least one digit")
return v
class Zone(BaseModel):
ZoneNo: int = Field(..., ge=1, le=999)
ZoneText: str = Field(..., min_length=1, max_length=100)
class ClientCreate(BaseModel):
client_id: int = Field(..., ge=1)
info: ClientInfo
class ClientID(BaseModel):
client_id: int = Field(..., ge=1)
class ClientInfoPatch(BaseModel):
# all optional, for PATCH
Name: Optional[str] = Field(None, min_length=1, max_length=200)
Alias: Optional[str] = Field(None, max_length=200)
Location: Optional[str] = Field(None, min_length=1, max_length=200)
area_code: Optional[str] = Field(None, min_length=1, max_length=20)
area: Optional[str] = Field(None, min_length=1, max_length=200)
BusPhone: Optional[str] = Field(None, max_length=50)
Email: Optional[EmailStr] = None
OKPassword: Optional[str] = Field(None, max_length=100)
SpecRequest: Optional[str] = Field(None, max_length=1000)
NoSigsMon: Optional[str] = Field(None, max_length=50)
SinceDays: Optional[int] = Field(None, ge=0)
SinceHrs: Optional[int] = Field(None, ge=0)
SinceMins: Optional[int] = Field(None, ge=0)
ResetNosigsIgnored: Optional[bool] = None
ResetNosigsDays: Optional[int] = Field(None, ge=0)
ResetNosigsHrs: Optional[int] = Field(None, ge=0)
ResetNosigsMins: Optional[int] = Field(None, ge=0)
InstallDateTime: Optional[str] = Field(
None, description="Installation date/time, e.g. '2023-02-20'"
)
PanelName: Optional[str] = None
PanelSite: Optional[str] = None
KeypadLocation: Optional[str] = Field(None, max_length=200)
SPPage: Optional[str] = Field(None, max_length=2000)
class ClientPatch(BaseModel):
client_id: int = Field(..., ge=1)
info: Optional[ClientInfoPatch] = None
class ZoneCreate(BaseModel):
client_id: int = Field(..., ge=1)
zone: Zone
class ZonePatch(BaseModel):
client_id: int = Field(..., ge=1)
zone: Zone
class ZoneDelete(BaseModel):
client_id: int = Field(..., ge=1)
zone_no: int = Field(..., ge=1, le=999)
class UserCreate(BaseModel):
client_id: int = Field(..., ge=1)
user: User
class UserPatch(BaseModel):
client_id: int = Field(..., ge=1)
user: User
class UserDelete(BaseModel):
client_id: int = Field(..., ge=1)
user_no: int = Field(..., ge=1, le=999)
# ----------------- Auth dependency -----------------
def require_api_key(authorization: str = Header(..., alias="Authorization")) -> KeyRecord:
"""
Expect header:
Authorization: Bearer <token>
"""
parts = authorization.split()
if len(parts) != 2 or parts[0].lower() != "bearer":
raise HTTPException(status_code=401, detail="Invalid authorization header")
token = parts[1]
return _key_store.must_get(token)
# ----------------- In-memory data store for non-DB clients -----------------
# Only used for clients that are NOT yet in Patriot DB.
_store_lock = threading.Lock()
_store = {
"clients": {} # key -> {"info": {...}, "zones": {}, "users": {}}
}
def _store_key(client_id: int, keyrec: KeyRecord) -> str:
# per-key namespace, so same client_id can be reused between keys
return f"{keyrec.key_name}:{client_id}"
def _require_client(client_id: int, keyrec: KeyRecord) -> Dict:
"""
Return client from in-memory 'pending' store (for clients not in DB).
Does NOT touch DB. DB-aware logic happens at the endpoint level.
"""
skey = _store_key(client_id, keyrec)
cli = _store["clients"].get(skey)
if not cli:
raise HTTPException(
status_code=404,
detail="Client not found (may be a pending client, not in DB. Please try again later.)",
)
return cli
# ----------------- DB-level client helpers -----------------
def db_client_is_decommissioned(client_id: int, keyrec: KeyRecord) -> bool:
"""
Return True if the client exists in DB and is decommissioned / non-monitored.
We consider a client decommissioned if:
- NonMonitored = 1 OR
- DecommissionDate is not NULL
If the row is missing or DB error -> return False (treat as not decommissioned).
"""
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
except Exception as e:
logging.error("DB connect failed in db_client_is_decommissioned for %s: %s", combined_id, e)
return False
try:
cur = conn.cursor()
cur.execute(
"""
SELECT NonMonitored, DecommissionDate
FROM dbo.Memalarm
WHERE Client_No = ?
""",
(combined_id,),
)
row = cur.fetchone()
if not row:
return False
non_monitored = row.NonMonitored if hasattr(row, "NonMonitored") else 0
decomm_date = row.DecommissionDate if hasattr(row, "DecommissionDate") else None
if non_monitored == 1:
return True
if decomm_date is not None:
return True
return False
except Exception as e:
logging.error(
"DB query failed in db_client_is_decommissioned for Client_No=%s: %s",
combined_id,
e,
)
return False
finally:
try:
conn.close()
except Exception:
pass
def _update_client_in_db_from_data(client_id: int, keyrec: KeyRecord, data: dict) -> bool:
"""
Try to update an existing client in dbo.Memalarm from API data.
Behaviour:
* If Client_No does not exist in DB -> return False (caller can create XML instead).
* If exists:
- For each field present in 'data':
- If value is None or empty string -> we DO NOT touch the DB column.
- If value is non-empty -> we update the mapped DB column.
- Return True (even if no columns actually changed).
* On connection / SQL error -> log and return False (so caller can fall back to XML).
"""
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
except Exception as e:
logging.error("DB connect failed for client %s: %s", combined_id, e)
return False
try:
cur = conn.cursor()
cur.execute(
"""
SELECT
Client_No,
Alias,
Name,
Location,
loc2,
Bus_Phone,
Email,
OKPassword,
SpecRequest,
NOSIGS_MON,
Since_Days,
Since_Hrs,
Since_Mins,
Reset_Nosigs_Days,
Reset_Nosigs_Hrs,
Reset_Nosigs_Mins,
Reset_Nosigs_Ignored,
InstallDateTime,
Panel_Site,
KeypadLocation,
SP_Page,
PanelTypeId
FROM dbo.Memalarm
WHERE Client_No = ?
""",
(combined_id,),
)
row = cur.fetchone()
if not row:
logging.info(
'DB: Client_No "%s" not found in Memalarm (from client_id=%s); will use XML instead.',
combined_id,
client_id,
)
return False
updates: dict[str, object] = {}
def set_text(field_name: str, column: str):
"""
PATCH logic:
* field missing → ignore
* field present and null → clear (set to "")
* field present and "" → clear (set to "")
* field present and "text" → update
"""
# If field is missing in request, ignore.
if field_name not in data:
return
val = data[field_name]
# null means clear
if val is None:
updates[column] = ""
return
sval = str(val)
# empty string means clear
if sval.strip() == "":
updates[column] = ""
return
# normal update
updates[column] = sval
def set_int(field_name: str, column: str):
if field_name not in data:
return
val = data[field_name]
if val is None:
return
try:
ival = int(val)
except (ValueError, TypeError):
logging.warning("DB: field %s has non-int value %r, skipping", field_name, val)
return
updates[column] = ival
def set_bit(field_name: str, column: str):
if field_name not in data:
return
val = data[field_name]
if val is None:
return
b = 1 if bool(val) else 0
updates[column] = b
# Simple text fields
set_text("Name", "Name")
set_text("Alias", "Alias")
set_text("Location", "Location")
set_text("BusPhone", "Bus_Phone")
set_text("Email", "Email")
set_text("OKPassword", "OKPassword")
set_text("SpecRequest", "SpecRequest")
# NoSigsMon -> NOSIGS_MON as "1" or "0" (string)
if "NoSigsMon" in data:
raw = data["NoSigsMon"]
if raw is not None:
s = str(raw).strip().lower()
# Treat these as "on"
if s in ("1", "true", "yes", "y", "activeany", "active", "on"):
updates["NOSIGS_MON"] = "1"
else:
updates["NOSIGS_MON"] = "0"
# PanelName -> PanelTypeId (Ajax=58, Future=59)
if "PanelName" in data:
pname_raw = data["PanelName"]
if pname_raw is not None:
pname = str(pname_raw).strip().lower()
panel_map = {
"ajax": 58,
"futurehome": 59,
"Climax 4G": 60,
}
if pname in panel_map:
updates["PanelTypeId"] = panel_map[pname]
else:
logging.warning(
"Unknown PanelName %r for client_id=%s; not updating PanelTypeId",
pname_raw,
client_id,
)
set_text("PanelSite", "Panel_Site")
set_text("KeypadLocation", "KeypadLocation")
set_text("SPPage", "SP_Page")
# loc2 from area_code + area (merge with existing DB loc2 if only one is sent)
if "area_code" in data or "area" in data:
old_loc2 = getattr(row, "loc2", "") # may be None
old_ac, old_area = _split_loc2(old_loc2 or "")
new_ac = data.get("area_code", None)
new_area = data.get("area", None)
final_ac = str(new_ac).strip() if new_ac is not None else old_ac
final_area = str(new_area).strip() if new_area is not None else old_area
loc2_val = f"{final_ac} {final_area}".strip()
if loc2_val:
updates["loc2"] = loc2_val
# Integers
set_int("SinceDays", "Since_Days")
set_int("SinceHrs", "Since_Hrs")
set_int("SinceMins", "Since_Mins")
set_int("ResetNosigsDays", "Reset_Nosigs_Days")
set_int("ResetNosigsHrs", "Reset_Nosigs_Hrs")
set_int("ResetNosigsMins", "Reset_Nosigs_Mins")
# Boolean bit
set_bit("ResetNosigsIgnored", "Reset_Nosigs_Ignored")
# InstallDateTime (string, e.g. "2023-02-20")
if "InstallDateTime" in data:
val = data["InstallDateTime"]
if val:
updates["InstallDateTime"] = str(val).strip()
if not updates:
logging.info(
"DB: client %s (Client_No=%s) exists but no DB columns to update from API.",
client_id,
combined_id,
)
return True # exists, nothing to change
set_clause = ", ".join(f"{col} = ?" for col in updates.keys())
params = list(updates.values()) + [combined_id]
sql = f"UPDATE dbo.Memalarm SET {set_clause} WHERE Client_No = ?"
cur.execute(sql, params)
logging.info(
"DB: updated client %s (Client_No=%s) columns: %s",
client_id,
combined_id,
", ".join(updates.keys()),
)
return True
except Exception as e:
logging.error(
"DB update failed for client_id=%s (Client_No=%s): %s",
client_id,
combined_id,
e,
)
return False
finally:
try:
conn.close()
except Exception:
pass
def update_client_in_db_from_full(client_id: int, keyrec: KeyRecord, info: ClientInfo) -> bool:
"""
Wrapper for PUT /clients: full payload from ClientInfo, but still only
non-empty fields are written to DB.
Returns True if the DB client exists (even if no columns changed).
"""
data = info.model_dump()
return _update_client_in_db_from_data(client_id, keyrec, data)
def update_client_in_db_from_patch(
client_id: int,
keyrec: KeyRecord,
info_patch: ClientInfoPatch,
) -> bool:
"""
Wrapper for PATCH /clients: only fields provided in the patch payload
are considered for DB update.
Returns True if the DB client exists (even if no columns changed).
"""
data = info_patch.model_dump(exclude_unset=True)
if not data:
return False
return _update_client_in_db_from_data(client_id, keyrec, data)
def _get_client_info_from_db(client_id: int, keyrec: KeyRecord) -> Optional[ClientInfo]:
"""
Try to build a ClientInfo object from dbo.Memalarm for this client.
Returns:
- ClientInfo if DB row exists
- None if no such Client_No or DB error
"""
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
except Exception as e:
logging.error("DB connect failed in _get_client_info_from_db for %s: %s", combined_id, e)
return None
try:
cur = conn.cursor()
cur.execute(
"""
SELECT
Client_No,
Alias,
Name,
Location,
loc2,
Bus_Phone,
Email,
OKPassword,
SpecRequest,
NOSIGS_MON,
Since_Days,
Since_Hrs,
Since_Mins,
Reset_Nosigs_Days,
Reset_Nosigs_Hrs,
Reset_Nosigs_Mins,
Reset_Nosigs_Ignored,
InstallDateTime,
Panel_Site,
KeypadLocation,
SP_Page,
PanelTypeId,
NonMonitored,
DecommissionDate,
DecommissionReason
FROM dbo.Memalarm
WHERE Client_No = ?
""",
(combined_id,),
)
row = cur.fetchone()
if not row:
logging.info("DB: _get_client_info_from_db: Client_No %r not found.", combined_id)
return None
# helper to safely make strings
def s(val) -> str:
if val is None:
return ""
return str(val)
# Basic fields (safe for any type)
name = s(getattr(row, "Name", None)).strip() or "Unnamed client"
alias_val = getattr(row, "Alias", None)
alias = s(alias_val).strip() or None
location = s(getattr(row, "Location", None)).strip() or "Unknown location"
loc2_val = getattr(row, "loc2", None)
area_code, area = _split_loc2(s(loc2_val))
bus_phone_val = getattr(row, "Bus_Phone", None)
bus_phone = s(bus_phone_val).strip() or None
email_db = s(getattr(row, "Email", None)).strip()
email: Optional[str] = email_db if email_db else None
okpassword_val = getattr(row, "OKPassword", None)
okpassword = s(okpassword_val).strip() or None
specrequest_val = getattr(row, "SpecRequest", None)
specrequest = s(specrequest_val).strip() or None
# NOSIGS_MON mapping: treat as string "1" / "0" regardless of underlying type
nosigs_val = getattr(row, "NOSIGS_MON", None)
raw_nosigs = s(nosigs_val).strip()
if raw_nosigs == "1":
nosigsmon = "ActiveAny"
else:
nosigsmon = "Disabled"
# Integers with defaults
since_days = getattr(row, "Since_Days", None)
since_days = int(since_days) if since_days is not None else 1
since_hrs = getattr(row, "Since_Hrs", None)
since_hrs = int(since_hrs) if since_hrs is not None else 0
since_mins = getattr(row, "Since_Mins", None)
since_mins = int(since_mins) if since_mins is not None else 30
reset_days = getattr(row, "Reset_Nosigs_Days", None)
reset_days = int(reset_days) if reset_days is not None else 7
reset_hrs = getattr(row, "Reset_Nosigs_Hrs", None)
reset_hrs = int(reset_hrs) if reset_hrs is not None else 0
reset_mins = getattr(row, "Reset_Nosigs_Mins", None)
reset_mins = int(reset_mins) if reset_mins is not None else 0
reset_ignored_val = getattr(row, "Reset_Nosigs_Ignored", None)
if reset_ignored_val is None:
reset_ignored = True
else:
reset_ignored = bool(reset_ignored_val)
# InstallDateTime
install_raw = getattr(row, "InstallDateTime", None)
if install_raw is None:
install_str: Optional[str] = None
else:
# try datetime/date -> iso string, else just str()
try:
install_str = install_raw.isoformat()
except AttributeError:
install_str = str(install_raw)
panel_site = s(getattr(row, "Panel_Site", None)).strip() or "Panel location"
keypad_location_val = getattr(row, "KeypadLocation", None)
keypad_location = s(keypad_location_val).strip() or None
sp_page_val = getattr(row, "SP_Page", None)
sp_page = s(sp_page_val).strip() or None
# PanelTypeId -> PanelName
ptype = getattr(row, "PanelTypeId", None)
try:
ptype_int = int(ptype) if ptype is not None else None
except (ValueError, TypeError):
ptype_int = None
if ptype_int == 58:
panel_name = "Ajax"
elif ptype_int == 59:
panel_name = "Futurehome"
elif ptype_int == 60:
panel_name = "Climax 4G"
else:
panel_name = "Panel Type"
# Decommission info
non_monitored_val = getattr(row, "NonMonitored", 0)
try:
non_monitored = bool(int(non_monitored_val))
except (ValueError, TypeError):
non_monitored = False
raw_decomm_date = getattr(row, "DecommissionDate", None)
if raw_decomm_date is None:
decomm_date_str: Optional[str] = None
else:
try:
decomm_date_str = raw_decomm_date.isoformat()
except AttributeError:
decomm_date_str = str(raw_decomm_date)
raw_reason = getattr(row, "DecommissionReason", None)
if raw_reason is None:
decomm_reason: Optional[str] = None
else:
decomm_reason = s(raw_reason).strip() or None
info_data = {
"Name": name,
"Alias": alias,
"Location": location,
"area_code": area_code or "0000",
"area": area or "Unknown",
"BusPhone": bus_phone,
"Email": email,
"OKPassword": okpassword,
"SpecRequest": specrequest,
"NoSigsMon": nosigsmon,
"SinceDays": since_days,
"SinceHrs": since_hrs,
"SinceMins": since_mins,
"ResetNosigsIgnored": reset_ignored,
"ResetNosigsDays": reset_days,
"ResetNosigsHrs": reset_hrs,
"ResetNosigsMins": reset_mins,
"InstallDateTime": install_str,
"PanelName": panel_name,
"PanelSite": panel_site,
"KeypadLocation": keypad_location,
"SPPage": sp_page,
"NonMonitored": non_monitored,
"DecommissionDate": decomm_date_str,
"DecommissionReason": decomm_reason,
}
try:
info = ClientInfo(**info_data)
except Exception as e:
logging.error(
"DB row for Client_No=%s could not be mapped to ClientInfo: %s; data=%r",
combined_id,
e,
info_data,
)
return None
return info
except Exception as e:
logging.error(
"DB query failed in _get_client_info_from_db for Client_No=%s: %s",
combined_id,
e,
)
return None
finally:
try:
conn.close()
except Exception:
pass
# ----------------- XML writer (for non-DB clients only) -----------------
def _ensure_dir(path: str):
os.makedirs(path, exist_ok=True)
def _xml_path_for(client_id: int, keyrec: KeyRecord) -> str:
client_base = os.path.join(XML_DIR, _safe_folder(keyrec.key_name))
_ensure_dir(client_base)
return os.path.join(client_base, f"{client_id}.xml")
def _bool_text(val: bool) -> str:
return "True" if val else "False"
def write_client_xml(client_id: int, keyrec: KeyRecord):
"""
Serialize client + zones + users to XML for downstream import.
Only used for clients that do NOT yet exist in Patriot DB.
"""
from xml.etree.ElementTree import Element, SubElement, ElementTree
skey = _store_key(client_id, keyrec)
cli = _store["clients"].get(skey)
if not cli:
return
info = ClientInfo(**cli.get("info", {}))
root = Element("Clients")
row = SubElement(root, "Row")
# __id = client_id + port, e.g. "123456789BASE11"
combined_id = _client_combined_id(client_id, keyrec)
SubElement(row, "__id").text = combined_id
# ---- Client fields ----
SubElement(row, "Name").text = info.Name
SubElement(row, "Alias").text = info.Alias or ""
SubElement(row, "Location").text = info.Location
SubElement(row, "loc2").text = f"{info.area_code} {info.area}".strip()
SubElement(row, "BusPhone").text = info.BusPhone or ""
SubElement(row, "Email").text = info.Email or ""
SubElement(row, "OKPassword").text = info.OKPassword or ""
SubElement(row, "SpecRequest").text = info.SpecRequest or ""
SubElement(row, "NoSigsMon").text = info.NoSigsMon
SubElement(row, "SinceDays").text = str(info.SinceDays)
SubElement(row, "SinceHrs").text = str(info.SinceHrs)
SubElement(row, "SinceMins").text = str(info.SinceMins)
SubElement(row, "ResetNosigsIgnored").text = _bool_text(info.ResetNosigsIgnored)
SubElement(row, "ResetNosigsDays").text = str(info.ResetNosigsDays)
SubElement(row, "ResetNosigsHrs").text = str(info.ResetNosigsHrs)
SubElement(row, "ResetNosigsMins").text = str(info.ResetNosigsMins)
SubElement(row, "InstallDateTime").text = info.InstallDateTime or ""
SubElement(row, "PanelName").text = info.PanelName
SubElement(row, "PanelSite").text = info.PanelSite
SubElement(row, "KeypadLocation").text = info.KeypadLocation or ""
SubElement(row, "SPPage").text = info.SPPage or ""
# Per-key fixed fields
SubElement(row, "Installer").text = keyrec.installer_name or ""
SubElement(row, "UseGlobCallOuts").text = _bool_text(keyrec.use_glob_callouts)
SubElement(row, "ShowOnCallOuts").text = _bool_text(keyrec.show_on_callouts)
SubElement(row, "GlobCallOuts").text = keyrec.glob_callouts or ""
SubElement(row, "UseGlobCallOuts2").text = _bool_text(keyrec.use_glob_callouts2)
SubElement(row, "ShowOnCallOuts2").text = _bool_text(keyrec.show_on_callouts2)
SubElement(row, "GlobCallOuts2").text = keyrec.glob_callouts2 or ""
SubElement(row, "AltLookup").text = _bool_text(keyrec.alt_lookup)
SubElement(row, "AltAlarmNo").text = keyrec.alt_alarm_no or ""
SubElement(row, "ConvertType").text = keyrec.convert_type or "None"
SubElement(row, "SIGINTERPRET").text = keyrec.siginterpret or "SIADecimal"
# ---- ClientGroupings ----
cgs_el = SubElement(row, "ClientGroupings")
if keyrec.client_groupings:
for cg in keyrec.client_groupings:
cg_el = SubElement(cgs_el, "ClientGrouping")
SubElement(cg_el, "Description").text = cg.description
SubElement(cg_el, "GroupingTypeDescription").text = cg.grouping_type_description
SubElement(cg_el, "GroupingAllowMultiple").text = _bool_text(
cg.grouping_allow_multiple
)
else:
cg_el = SubElement(cgs_el, "ClientGrouping")
SubElement(cg_el, "Description").text = ""
SubElement(cg_el, "GroupingTypeDescription").text = ""
SubElement(cg_el, "GroupingAllowMultiple").text = _bool_text(True)
# ---------- ZONES ----------
zones_dict = cli.get("zones", {})
if zones_dict:
zones_el = SubElement(row, "Zones")
# zones_dict keys are "ZoneNo" as string, values like {"ZoneNo": 1, "ZoneText": "..."}
for key_zone_no, zdata in sorted(zones_dict.items(), key=lambda kv: int(kv[0])):
zrow = SubElement(zones_el, "Zone")
zone_no = zdata.get("ZoneNo", int(key_zone_no))
SubElement(zrow, "Zone_No").text = str(zone_no)
SubElement(zrow, "Zone_area").text = zdata.get("ZoneText", "")
# ---------- USERS (including default 199) ----------
raw_users = dict(cli.get("users", {})) # key: "UserNo" -> dict
# Inject default 199 from installer if not already present
if "199" not in raw_users:
u199 = {
"UserNo": 199,
"User_Name": keyrec.installer_name or "",
"MobileNo": "",
"MobileNoOrder": 1,
"Email": keyrec.installer_email or "",
"Type": "N",
"Instructions": "",
"CallOrder": 0,
}
raw_users["199"] = u199
if raw_users:
users_el = SubElement(row, "Users")
for key_user_no, udata in sorted(raw_users.items(), key=lambda kv: int(kv[0])):
urow = SubElement(users_el, "User")
user_no = udata.get("UserNo", int(key_user_no))
SubElement(urow, "UserNo").text = str(user_no)
SubElement(urow, "User_Name").text = udata.get("User_Name", "")
SubElement(urow, "MobileNo").text = udata.get("MobileNo", "")
SubElement(urow, "MobileNoOrder").text = str(udata.get("MobileNoOrder", 1))
SubElement(urow, "Email").text = udata.get("Email", "")
SubElement(urow, "Type").text = udata.get("Type", "U")
SubElement(urow, "Instructions").text = udata.get("Instructions", "")
SubElement(urow, "CallOrder").text = str(udata.get("CallOrder", 0))
# ---- write file ----
xml_path = _xml_path_for(client_id, keyrec)
_ensure_dir(os.path.dirname(xml_path))
ElementTree(root).write(xml_path, encoding="utf-8", xml_declaration=True)
logging.info("Wrote XML for client_id=%s key=%s to %s", client_id, keyrec.key_name, xml_path)
def _purge_cached_client(client_id: int, keyrec: KeyRecord):
"""
Remove stale non-DB client data from memory and delete its XML file.
After Patriot imports a client, all operations must go through DB.
"""
skey = _store_key(client_id, keyrec)
with _store_lock:
if skey in _store["clients"]:
logging.info("Purging cached XML-only client %s for key %s", client_id, keyrec.key_name)
del _store["clients"][skey]
# Remove XML file if present
try:
xml_path = _xml_path_for(client_id, keyrec)
if os.path.exists(xml_path):
os.remove(xml_path)
logging.info("Deleted stale XML file for client %s at %s", client_id, xml_path)
except Exception as e:
logging.error("Failed to remove stale XML file for client %s: %s", client_id, e)
# ----------------- FastAPI app -----------------
app = FastAPI(title="Patriot client API", version="1.0.0")
@app.middleware("http")
async def log_requests(request: Request, call_next):
start = time()
body_bytes = await request.body()
body_text = ""
if body_bytes:
try:
body_text = body_bytes.decode("utf-8")
except Exception as e:
body_text = f"<<un-decodable body: {e}>>"
response = await call_next(request)
duration = time() - start
logging.info(
"%s %s - %s - %.3fs - body=%s",
request.method,
request.url.path,
response.status_code,
duration,
body_text,
)
return response
# ----------------- Client endpoints -----------------
@app.put("/clients", status_code=200)
def upsert_client(
payload: ClientCreate,
keyrec: KeyRecord = Depends(require_api_key),
response: Response = None,
):
"""
Create or replace full client info.
Behaviour:
- If client exists in Patriot DB (Memalarm.Client_No = client_id+port):
* Update DB fields from API only (no XML, no caching).
- If client does NOT exist in DB (or DB unreachable):
* Keep info in in-memory store and write XML so Patriot can import it.
"""
client_id = payload.client_id
logging.info(
"AUDIT upsert_client key=%s client_id=%s payload=%s",
keyrec.key_name,
client_id,
payload.model_dump(),
)
# First: DB client?
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
if db_client_is_decommissioned(client_id, keyrec):
raise HTTPException(
status_code=403,
detail="Client is decommissioned and is read-only",
)
try:
update_client_in_db_from_full(client_id, keyrec, payload.info)
except Exception as e:
logging.error("DB sync error on upsert_client client_id=%s: %s", client_id, e)
raise HTTPException(status_code=500, detail="Failed to update client in DB")
if response is not None:
response.status_code = status.HTTP_200_OK
return {"client_id": client_id}
# Non-DB client but state says pending_import -> lock
if client_is_pending_import(client_id, keyrec):
raise HTTPException(
status_code=409,
detail="Client is pending import into Patriot and is currently locked",
)
# Not in DB & not pending: keep in memory (RAM only) and write XML for Patriot import
info = payload.info.model_dump()
skey = _store_key(client_id, keyrec)
with _store_lock:
existed = skey in _store["clients"]
if existed:
cli = _store["clients"][skey]
cli["info"] = info
else:
cli = {"info": info, "zones": {}, "users": {}}
_store["clients"][skey] = cli
if response is not None:
response.status_code = status.HTTP_200_OK if existed else status.HTTP_201_CREATED
write_client_xml(client_id, keyrec)
return {"client_id": client_id}
@app.patch("/clients", status_code=200)
def patch_client(
payload: ClientPatch,
keyrec: KeyRecord = Depends(require_api_key),
):
"""
Partial update of client info.
Behaviour:
- If client exists in DB:
* Only update DB columns for fields present in the patch.
* No XML, no in-memory cache.
- If client does NOT exist in DB:
* Merge into pending in-memory client and rewrite XML.
"""
client_id = payload.client_id
logging.info(
"AUDIT patch_client key=%s client_id=%s payload=%s",
keyrec.key_name,
client_id,
payload.model_dump(),
)
# DB client -> only DB
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
if db_client_is_decommissioned(client_id, keyrec):
raise HTTPException(
status_code=403,
detail="Client is decommissioned and is read-only",
)
if payload.info is not None:
try:
update_client_in_db_from_patch(client_id, keyrec, payload.info)
except Exception as e:
logging.error("DB sync error on patch_client client_id=%s: %s", client_id, e)
raise HTTPException(status_code=500, detail="Failed to update client in DB")
return {"client_id": client_id}
# Non-DB client but pending_import -> lock
if client_is_pending_import(client_id, keyrec):
raise HTTPException(
status_code=409,
detail="Client is pending import into Patriot and is currently locked",
)
# Non-DB client -> pending store + XML
if payload.info is None:
raise HTTPException(status_code=400, detail="No info to patch for non-DB client")
with _store_lock:
cli = _require_client(client_id, keyrec)
current_info = cli.get("info", {})
base = ClientInfo(**current_info).model_dump()
updates = payload.info.model_dump(exclude_unset=True)
merged = {**base, **updates}
merged_valid = ClientInfo(**merged).model_dump()
cli["info"] = merged_valid
write_client_xml(client_id, keyrec)
return {"client_id": client_id, **cli}
@app.get("/clients")
def get_client(
x_client_id: int = Header(..., alias="X-Client-Id"),
keyrec: KeyRecord = Depends(require_api_key),
):
"""
Get client info.
Behaviour:
- If client exists in DB, read from DB and return.
- Else, read from in-memory pending store (non-DB client).
"""
client_id = x_client_id
logging.info("AUDIT get_client key=%s client_id=%s", keyrec.key_name, client_id)
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
info = _get_client_info_from_db(client_id, keyrec)
if info is None:
raise HTTPException(status_code=500, detail="Failed to load client from DB")
return {"client_id": client_id, "info": info.model_dump()}
with _store_lock:
cli = _require_client(client_id, keyrec)
# Optionally expose pending status here if you want:
body = {"client_id": client_id, **cli}
if client_is_pending_import(client_id, keyrec):
body["import_status"] = "pending_import"
return body
@app.post("/clients/get", status_code=200)
def get_client_body(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
logging.info("AUDIT get_client key=%s client_id=%s", keyrec.key_name, client_id)
if db_client_exists(client_id, keyrec):
info = _get_client_info_from_db(client_id, keyrec)
if info is None:
raise HTTPException(status_code=500, detail="Failed to load client from DB")
return {"client_id": client_id, "info": info.model_dump(), "zones": {}, "users": {}}
with _store_lock:
cli = _require_client(client_id, keyrec)
body = {"client_id": client_id, **cli}
if client_is_pending_import(client_id, keyrec):
body["import_status"] = "pending_import"
return body
@app.delete("/clients", status_code=204)
def delete_client(
payload: ClientID,
keyrec: KeyRecord = Depends(require_api_key),
):
"""
Soft-delete a client.
DB clients:
- Do NOT delete row.
- Set:
NonMonitored = 1
DecommissionDate = current date/time (GETDATE())
DecommissionReason = "Decommissioned by API using key <key_name>"
Non-DB clients:
- Remove from in-memory store
- Delete XML file (if any)
"""
client_id = payload.client_id
logging.info(
"AUDIT delete_client key=%s client_id=%s",
keyrec.key_name,
client_id,
)
combined_id = _client_combined_id(client_id, keyrec)
# If client exists in DB → soft-delete via flags
if db_client_exists(client_id, keyrec):
reason = f"Decommissioned by API using key {keyrec.key_name}"
try:
conn = _get_db_connection()
cur = conn.cursor()
cur.execute(
"""
UPDATE dbo.Memalarm
SET NonMonitored = 1,
DecommissionDate = GETDATE(),
DecommissionReason = ?
WHERE Client_No = ?
""",
(reason, combined_id),
)
logging.info(
"DB: soft-decommissioned client Client_No=%s (client_id=%s) reason=%r",
combined_id,
client_id,
reason,
)
except Exception as e:
logging.error(
"DB: failed to soft-delete client_id=%s (Client_No=%s): %s",
client_id,
combined_id,
e,
)
raise HTTPException(status_code=500, detail="Failed to decommission client in DB")
finally:
try:
conn.close()
except Exception:
pass
return
# Non-DB client but pending_import -> lock deletion
if client_is_pending_import(client_id, keyrec):
raise HTTPException(
status_code=409,
detail="Client is pending import into Patriot and cannot be deleted yet",
)
# Non-DB client: remove pending client + XML
skey = _store_key(client_id, keyrec)
with _store_lock:
if skey not in _store["clients"]:
# Nothing in DB, nothing in pending store
raise HTTPException(status_code=404, detail="Client not found")
del _store["clients"][skey]
# Try to remove XML file if it exists
try:
xml_path = _xml_path_for(client_id, keyrec)
if os.path.exists(xml_path):
os.remove(xml_path)
logging.info(
"Deleted XML for non-DB client client_id=%s key=%s at %s",
client_id,
keyrec.key_name,
xml_path,
)
except Exception as e:
logging.warning(
"Failed to delete XML for non-DB client client_id=%s: %s",
client_id,
e,
)
return
# ----------------- Zones -----------------
@app.post("/zones", status_code=201)
def create_zone(
payload: ZoneCreate,
keyrec: KeyRecord = Depends(require_api_key),
):
client_id = payload.client_id
zone = payload.zone
logging.info(
"AUDIT create_zone key=%s client_id=%s zone_no=%s",
keyrec.key_name,
client_id,
zone.ZoneNo,
)
# DB client: create directly in dbo.MZone, never XML
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
if db_client_is_decommissioned(client_id, keyrec):
raise HTTPException(
status_code=403,
detail="Client is decommissioned and zones are read-only",
)
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
cur = conn.cursor()
# Check if zone already exists
cur.execute(
"SELECT 1 FROM dbo.MZone WHERE Client_No = ? AND Zone_No = ?",
(combined_id, zone.ZoneNo),
)
if cur.fetchone():
raise HTTPException(status_code=409, detail="Zone already exists")
# Insert new zone; ModuleNo set to 0 by default, Zone_area from ZoneText
cur.execute(
"""
INSERT INTO dbo.Mzone (Client_No, ModuleNo, Zone_No, Zone_area)
VALUES (?, ?, ?, ?)
""",
(combined_id, 0, zone.ZoneNo, zone.ZoneText),
)
logging.info(
"DB: created zone Zone_No=%s for Client_No=%s",
zone.ZoneNo,
combined_id,
)
except HTTPException:
raise
except Exception as e:
logging.error(
"DB: failed to create zone Zone_No=%s for client_id=%s (Client_No=%s): %s",
zone.ZoneNo,
client_id,
combined_id,
e,
)
raise HTTPException(status_code=500, detail="Failed to create zone in DB")
finally:
try:
conn.close()
except Exception:
pass
return {"client_id": client_id, "zone": zone}
# Non-DB client but pending_import -> lock
if client_is_pending_import(client_id, keyrec):
raise HTTPException(
status_code=409,
detail="Client is pending import into Patriot and is currently locked",
)
# Non-DB client: store in JSON + XML
with _store_lock:
cli = _require_client(client_id, keyrec)
zones = cli.setdefault("zones", {})
if str(zone.ZoneNo) in zones:
raise HTTPException(status_code=409, detail="Zone already exists")
zones[str(zone.ZoneNo)] = zone.model_dump()
write_client_xml(client_id, keyrec)
return {"client_id": client_id, "zone": zone}
@app.patch("/zones", status_code=200)
def patch_zone(
payload: ZonePatch,
keyrec: KeyRecord = Depends(require_api_key),
):
client_id = payload.client_id
zone = payload.zone
zone_no = zone.ZoneNo
logging.info(
"AUDIT patch_zone key=%s client_id=%s zone_no=%s",
keyrec.key_name,
client_id,
zone_no,
)
# DB client: update dbo.MZone
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
if db_client_is_decommissioned(client_id, keyrec):
raise HTTPException(
status_code=403,
detail="Client is decommissioned and zones are read-only",
)
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
cur = conn.cursor()
# Ensure zone exists
cur.execute(
"SELECT 1 FROM dbo.MZone WHERE Client_No = ? AND Zone_No = ?",
(combined_id, zone_no),
)
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Zone not found")
# Update Zone_area text
cur.execute(
"UPDATE dbo.Mzone SET Zone_area = ? WHERE Client_No = ? AND Zone_No = ?",
(zone.ZoneText, combined_id, zone_no),
)
logging.info(
"DB: updated zone Zone_No=%s for Client_No=%s",
zone_no,
combined_id,
)
except HTTPException:
raise
except Exception as e:
logging.error(
"DB: failed to update zone Zone_No=%s for client_id=%s (Client_No=%s): %s",
zone_no,
client_id,
combined_id,
e,
)
raise HTTPException(status_code=500, detail="Failed to update zone in DB")
finally:
try:
conn.close()
except Exception:
pass
return {"client_id": client_id, "zone_no": zone_no, "zone": zone}
# Non-DB client but pending_import -> lock
if client_is_pending_import(client_id, keyrec):
raise HTTPException(
status_code=409,
detail="Client is pending import into Patriot and is currently locked",
)
# Non-DB client: operate on in-memory + XML
with _store_lock:
cli = _require_client(client_id, keyrec)
zones = cli.setdefault("zones", {})
if str(zone_no) not in zones:
raise HTTPException(status_code=404, detail="Zone not found")
zones[str(zone_no)] = zone.model_dump()
write_client_xml(client_id, keyrec)
return {"client_id": client_id, "zone_no": zone_no, "zone": zone}
@app.delete("/zones", status_code=204)
def delete_zone(
payload: ZoneDelete,
keyrec: KeyRecord = Depends(require_api_key),
):
client_id = payload.client_id
zone_no = payload.zone_no
logging.info(
"AUDIT delete_zone key=%s client_id=%s zone_no=%s",
keyrec.key_name,
client_id,
zone_no,
)
# DB client: delete from dbo.MZone
if db_client_exists(client_id, keyrec):
if db_client_is_decommissioned(client_id, keyrec):
raise HTTPException(
status_code=403,
detail="Client is decommissioned and zones are read-only",
)
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
cur = conn.cursor()
cur.execute(
"SELECT 1 FROM dbo.MZone WHERE Client_No = ? AND Zone_No = ?",
(combined_id, zone_no),
)
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Zone not found")
cur.execute(
"DELETE FROM dbo.MZone WHERE Client_No = ? AND Zone_No = ?",
(combined_id, zone_no),
)
logging.info(
"DB: deleted zone Zone_No=%s for Client_No=%s",
zone_no,
combined_id,
)
except HTTPException:
raise
except Exception as e:
logging.error(
"DB: failed to delete zone Zone_No=%s for client_id=%s (Client_No=%s): %s",
zone_no,
client_id,
combined_id,
e,
)
raise HTTPException(status_code=500, detail="Failed to delete zone in DB")
finally:
try:
conn.close()
except Exception:
pass
return
# Non-DB client but pending_import -> lock
if client_is_pending_import(client_id, keyrec):
raise HTTPException(
status_code=409,
detail="Client is pending import into Patriot and is currently locked",
)
# Non-DB client: operate on in-memory + XML
with _store_lock:
cli = _require_client(client_id, keyrec)
zones = cli.setdefault("zones", {})
if str(zone_no) not in zones:
raise HTTPException(status_code=404, detail="Zone not found")
del zones[str(zone_no)]
write_client_xml(client_id, keyrec)
return
@app.get("/zones", status_code=200)
def get_zones(
x_client_id: int = Header(..., alias="X-Client-Id"),
keyrec: KeyRecord = Depends(require_api_key),
):
client_id = x_client_id
logging.info("AUDIT get_zones key=%s client_id=%s", keyrec.key_name, client_id)
# DB client → return zones from dbo.Mzone
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
cur = conn.cursor()
cur.execute(
"""
SELECT Zone_No, Zone_area
FROM dbo.Mzone
WHERE Client_No = ?
ORDER BY Zone_No
""",
(combined_id,),
)
rows = cur.fetchall()
zones = [
{"ZoneNo": r.Zone_No, "ZoneText": r.Zone_area or ""}
for r in rows
]
return {"client_id": client_id, "zones": zones}
except Exception as e:
logging.error(
"DB: failed to fetch zones for client_id=%s (Client_No=%s): %s",
client_id, combined_id, e
)
raise HTTPException(status_code=500, detail="Failed to load zones from DB")
finally:
try:
conn.close()
except Exception:
pass
# Non-DB client → return from in-memory store
with _store_lock:
cli = _require_client(client_id, keyrec)
zones = cli.get("zones", {})
zone_list = [
{"ZoneNo": int(zno), "ZoneText": zdata.get("ZoneText", "")}
for zno, zdata in zones.items()
]
return {"client_id": client_id, "zones": zone_list}
# ----------------- Users -----------------
# User types that must NEVER be modified or deleted
RESTRICTED_USER_TYPES = {"N"}
@app.post("/users", status_code=201)
def create_user(
payload: UserCreate,
keyrec: KeyRecord = Depends(require_api_key),
):
client_id = payload.client_id
user = payload.user
logging.info(
"AUDIT create_user key=%s client_id=%s user_no=%s",
keyrec.key_name,
client_id,
user.UserNo,
)
# DB client: create user in dbo.Muser + dbo.UserToClient
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
if db_client_is_decommissioned(client_id, keyrec):
raise HTTPException(
status_code=403,
detail="Client is decommissioned and users are read-only",
)
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
cur = conn.cursor()
# Check if this user number is already linked to this client
cur.execute(
"SELECT 1 FROM dbo.UserToClient WHERE ClientNo = ? AND UserNo = ?",
(combined_id, user.UserNo),
)
if cur.fetchone():
raise HTTPException(status_code=409, detail="User already exists")
# Insert into Muser
mobile = user.MobileNo
phone_type = 1 # tinyint, pick a sensible code for "mobile"
notes = user.Instructions or ""
# Use OUTPUT INSERTED.UserId to get the new UserId in one step
cur.execute(
"""
INSERT INTO dbo.Muser (User_Name, PhoneNo3, PhoneNo3Type, Email, Type, Notes)
OUTPUT INSERTED.UserId
VALUES (?, ?, ?, ?, ?, ?)
""",
(user.User_Name, mobile, phone_type, user.Email, user.Type, notes),
)
row = cur.fetchone()
if not row or row[0] is None:
raise Exception("Failed to retrieve inserted UserId")
user_id = int(row[0])
# Link user to client
cur.execute(
"""
INSERT INTO dbo.UserToClient (UserId, ClientNo, UserNo, CallOrder, Instructions)
VALUES (?, ?, ?, ?, ?)
""",
(user_id, combined_id, user.UserNo, user.CallOrder, notes),
)
logging.info(
"DB: created user UserId=%s UserNo=%s for Client_No=%s",
user_id,
user.UserNo,
combined_id,
)
except HTTPException:
raise
except Exception as e:
logging.error(
"DB: failed to create user UserNo=%s for client_id=%s (Client_No=%s): %s",
user.UserNo,
client_id,
combined_id,
e,
)
raise HTTPException(status_code=500, detail="Failed to create user in DB")
finally:
try:
conn.close()
except Exception:
pass
return {"client_id": client_id, "user": user}
# Non-DB client but pending_import -> lock
if client_is_pending_import(client_id, keyrec):
raise HTTPException(
status_code=409,
detail="Client is pending import into Patriot and is currently locked",
)
# Non-DB client: store in in-memory + XML
with _store_lock:
cli = _require_client(client_id, keyrec)
users = cli.setdefault("users", {})
if str(user.UserNo) in users:
raise HTTPException(status_code=409, detail="User already exists")
users[str(user.UserNo)] = user.model_dump()
write_client_xml(client_id, keyrec)
return {"client_id": client_id, "user": user}
@app.patch("/users", status_code=200)
def patch_user(
payload: UserPatch,
keyrec: KeyRecord = Depends(require_api_key),
):
client_id = payload.client_id
user = payload.user
user_no = user.UserNo
logging.info(
"AUDIT patch_user key=%s client_id=%s user_no=%s",
keyrec.key_name,
client_id,
user_no,
)
# DB client: update dbo.Muser + dbo.UserToClient
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
if db_client_is_decommissioned(client_id, keyrec):
raise HTTPException(
status_code=403,
detail="Client is decommissioned and users are read-only",
)
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
cur = conn.cursor()
# Look up existing relation + current Type
cur.execute(
"""
SELECT u.UserId, ut.IDUserToClient, u.Type
FROM dbo.UserToClient ut
JOIN dbo.Muser u ON ut.UserId = u.UserId
WHERE ut.ClientNo = ? AND ut.UserNo = ?
""",
(combined_id, user_no),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="User not found")
user_id = int(row[0])
link_id = int(row[1])
current_type = (row[2] or "").strip()
# If existing user type is restricted, do not allow PATCH
if current_type in RESTRICTED_USER_TYPES:
raise HTTPException(
status_code=403,
detail=f"User type '{current_type}' is restricted and cannot be modified",
)
# Also do not allow changing Type TO a restricted value
new_type = (user.Type or "").strip()
if new_type in RESTRICTED_USER_TYPES:
raise HTTPException(
status_code=403,
detail=f"Cannot change user to restricted type '{new_type}'",
)
mobile = user.MobileNo
phone_type = 1 # tinyint, default "mobile"
notes = user.Instructions or ""
# Update core user record
cur.execute(
"""
UPDATE dbo.Muser
SET User_Name = ?, PhoneNo3 = ?, PhoneNo3Type = ?, Email = ?, Type = ?
WHERE UserId = ?
""",
(user.User_Name, mobile, phone_type, user.Email, new_type, user_id),
)
# Update link info
cur.execute(
"""
UPDATE dbo.UserToClient
SET CallOrder = ?, Instructions = ?
WHERE IDUserToClient = ?
""",
(user.CallOrder, notes, link_id),
)
logging.info(
"DB: updated user UserId=%s UserNo=%s for Client_No=%s",
user_id,
user_no,
combined_id,
)
except HTTPException:
raise
except Exception as e:
logging.error(
"DB: failed to update user UserNo=%s for client_id=%s (Client_No=%s): %s",
user_no,
client_id,
combined_id,
e,
)
raise HTTPException(status_code=500, detail="Failed to update user in DB")
finally:
try:
conn.close()
except Exception:
pass
return {"client_id": client_id, "user_no": user_no, "user": user}
# Non-DB client but pending_import -> lock
if client_is_pending_import(client_id, keyrec):
raise HTTPException(
status_code=409,
detail="Client is pending import into Patriot and is currently locked",
)
# Non-DB client: operate on in-memory + XML
with _store_lock:
cli = _require_client(client_id, keyrec)
users = cli.setdefault("users", {})
existing = users.get(str(user_no))
if not existing:
raise HTTPException(status_code=404, detail="User not found")
current_type = (existing.get("Type") or "").strip()
if current_type in RESTRICTED_USER_TYPES:
raise HTTPException(
status_code=403,
detail=f"User type '{current_type}' is restricted and cannot be modified",
)
new_type = (user.Type or "").strip()
if new_type in RESTRICTED_USER_TYPES:
raise HTTPException(
status_code=403,
detail=f"Cannot change user to restricted type '{new_type}'",
)
users[str(user_no)] = user.model_dump()
write_client_xml(client_id, keyrec)
return {"client_id": client_id, "user_no": user_no, "user": user}
@app.get("/users", status_code=200)
def get_users(
x_client_id: int = Header(..., alias="X-Client-Id"),
keyrec: KeyRecord = Depends(require_api_key),
):
"""
Get all users for a client.
- If client exists in DB:
* Read users via dbo.UserToClient + dbo.Muser
- If client does NOT exist in DB:
* Read users from in-memory pending store
"""
client_id = x_client_id
logging.info("AUDIT get_users key=%s client_id=%s", keyrec.key_name, client_id)
# DB client → list from UserToClient + Muser
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
cur = conn.cursor()
cur.execute(
"""
SELECT
u.UserId,
u.User_Name AS User_Name,
u.PhoneNo3 AS PhoneNo3,
u.Email AS Email,
u.Type AS Type,
u.Notes AS Notes,
ut.UserNo AS UserNo,
ut.CallOrder AS CallOrder,
ut.Instructions AS Instructions
FROM dbo.UserToClient ut
JOIN dbo.Muser u ON ut.UserId = u.UserId
WHERE ut.ClientNo = ?
ORDER BY ut.CallOrder, ut.UserNo
""",
(combined_id,),
)
rows = cur.fetchall()
users = []
for r in rows:
instructions = (r.Instructions or r.Notes or "").strip()
call_order = r.CallOrder if r.CallOrder is not None else 0
users.append(
{
"User_Name": r.User_Name,
"MobileNo": (r.PhoneNo3 or "").strip(),
# We don't have a separate MobileNoOrder column in DB,
# so we mirror CallOrder here for ordering.
"MobileNoOrder": call_order if call_order > 0 else 1,
"Email": (r.Email or "").strip(),
"Type": (r.Type or "U").strip() or "U",
"UserNo": r.UserNo,
"Instructions": instructions,
"CallOrder": call_order,
}
)
return {"client_id": client_id, "users": users}
except Exception as e:
logging.error(
"DB: failed to fetch users for client_id=%s (Client_No=%s): %s",
client_id,
combined_id,
e,
)
raise HTTPException(status_code=500, detail="Failed to load users from DB")
finally:
try:
conn.close()
except Exception:
pass
# Non-DB client → pending in-memory users
with _store_lock:
cli = _require_client(client_id, keyrec)
users_dict = cli.get("users", {})
users = list(users_dict.values())
return {"client_id": client_id, "users": users}
@app.get("/user", status_code=200)
def get_user(
x_client_id: int = Header(..., alias="X-Client-Id"),
x_user_no: int = Header(..., alias="X-User-No"),
keyrec: KeyRecord = Depends(require_api_key),
):
"""
Get a specific user (by UserNo) for a client.
- If client exists in DB:
* Read from UserToClient + Muser
- If client does NOT exist in DB:
* Read from in-memory pending store
"""
client_id = x_client_id
user_no = x_user_no
logging.info(
"AUDIT get_user key=%s client_id=%s user_no=%s",
keyrec.key_name,
client_id,
user_no,
)
# DB client → single user
if db_client_exists(client_id, keyrec):
_purge_cached_client(client_id, keyrec)
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
cur = conn.cursor()
cur.execute(
"""
SELECT
u.UserId,
u.User_Name AS User_Name,
u.PhoneNo3 AS PhoneNo3,
u.Email AS Email,
u.Type AS Type,
u.Notes AS Notes,
ut.UserNo AS UserNo,
ut.CallOrder AS CallOrder,
ut.Instructions AS Instructions
FROM dbo.UserToClient ut
JOIN dbo.Muser u ON ut.UserId = u.UserId
WHERE ut.ClientNo = ? AND ut.UserNo = ?
""",
(combined_id, user_no),
)
r = cur.fetchone()
if not r:
raise HTTPException(status_code=404, detail="User not found")
instructions = (r.Instructions or r.Notes or "").strip()
call_order = r.CallOrder if r.CallOrder is not None else 0
user = {
"User_Name": r.User_Name,
"MobileNo": (r.PhoneNo3 or "").strip(),
"MobileNoOrder": call_order if call_order > 0 else 1,
"Email": (r.Email or "").strip(),
"Type": (r.Type or "U").strip() or "U",
"UserNo": r.UserNo,
"Instructions": instructions,
"CallOrder": call_order,
}
return {"client_id": client_id, "user": user}
except HTTPException:
raise
except Exception as e:
logging.error(
"DB: failed to fetch user UserNo=%s for client_id=%s (Client_No=%s): %s",
user_no,
client_id,
combined_id,
e,
)
raise HTTPException(status_code=500, detail="Failed to load user from DB")
finally:
try:
conn.close()
except Exception:
pass
# Non-DB client → pending in-memory users
with _store_lock:
cli = _require_client(client_id, keyrec)
users_dict = cli.get("users", {})
user = users_dict.get(str(user_no))
if not user:
raise HTTPException(status_code=404, detail="User not found")
return {"client_id": client_id, "user": user}
@app.delete("/users", status_code=204)
def delete_user(
payload: UserDelete,
keyrec: KeyRecord = Depends(require_api_key),
):
client_id = payload.client_id
user_no = payload.user_no
logging.info(
"AUDIT delete_user key=%s client_id=%s user_no=%s",
keyrec.key_name,
client_id,
user_no,
)
# DB client: delete from UserToClient AND Muser
if db_client_exists(client_id, keyrec):
if db_client_is_decommissioned(client_id, keyrec):
raise HTTPException(
status_code=403,
detail="Client is decommissioned and users are read-only",
)
combined_id = _client_combined_id(client_id, keyrec)
try:
conn = _get_db_connection()
cur = conn.cursor()
# Look up link + current Type + UserId
cur.execute(
"""
SELECT ut.IDUserToClient, u.UserId, u.Type
FROM dbo.UserToClient ut
JOIN dbo.Muser u ON ut.UserId = u.UserId
WHERE ut.ClientNo = ? AND ut.UserNo = ?
""",
(combined_id, user_no),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="User not found")
link_id = int(row[0])
user_id = int(row[1])
current_type = (row[2] or "").strip()
if current_type in RESTRICTED_USER_TYPES:
raise HTTPException(
status_code=403,
detail=f"User type '{current_type}' is restricted and cannot be deleted",
)
# First delete link
cur.execute(
"DELETE FROM dbo.UserToClient WHERE IDUserToClient = ?",
(link_id,),
)
# Then delete the user row itself
cur.execute(
"DELETE FROM dbo.Muser WHERE UserId = ?",
(user_id,),
)
logging.info(
"DB: deleted user UserId=%s UserNo=%s (IDUserToClient=%s) for Client_No=%s",
user_id,
user_no,
link_id,
combined_id,
)
except HTTPException:
raise
except Exception as e:
logging.error(
"DB: failed to delete user UserNo=%s for client_id=%s (Client_No=%s): %s",
user_no,
client_id,
combined_id,
e,
)
raise HTTPException(status_code=500, detail="Failed to delete user in DB")
finally:
try:
conn.close()
except Exception:
pass
return
# Non-DB client but pending_import -> lock
if client_is_pending_import(client_id, keyrec):
raise HTTPException(
status_code=409,
detail="Client is pending import into Patriot and is currently locked",
)
# Non-DB client: operate on in-memory + XML
with _store_lock:
cli = _require_client(client_id, keyrec)
users = cli.get("users", {})
existing = users.get(str(user_no))
if not existing:
raise HTTPException(status_code=404, detail="User not found")
current_type = (existing.get("Type") or "").strip()
if current_type in RESTRICTED_USER_TYPES:
raise HTTPException(
status_code=403,
detail=f"User type '{current_type}' is restricted and cannot be deleted",
)
del users[str(user_no)]
write_client_xml(client_id, keyrec)
return
# ----------------- Main (for standalone run) -----------------
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)