A24_Patriot_API/main.py

726 lines
26 KiB
Python

from __future__ import annotations
import json, os, threading, re, logging
from datetime import datetime, timezone
from typing import Dict, Optional
from time import time
from fastapi import FastAPI, HTTPException, Depends, Header, Request, Response, status
from pydantic import BaseModel, Field, EmailStr, field_validator
SAFE_NAME = re.compile(r"[^A-Za-z0-9._-]")
def _safe_folder(name: str) -> str:
cleaned = SAFE_NAME.sub("_", name.strip())
return cleaned or "unknown"
# ------------ Config ------------
DATA_FILE = os.getenv("DATA_FILE", "data.json")
KEY_FILE = os.getenv("KEY_FILE", "keys.json")
XML_DIR = os.getenv("XML_DIR", "out/clients")
# ------------ Auth (hot-reloaded key store) ------------
class KeyRecord(BaseModel):
key_name: str
key: str
enabled: bool = True
valid_to: str # ISO-8601
class KeyStore:
def __init__(self, path: str):
self.path = path
self._mtime = 0.0
self._keys: Dict[str, KeyRecord] = {}
self._lock = threading.Lock()
def _parse_time(self, s: str) -> datetime:
try:
if s.endswith("Z"):
s = s[:-1] + "+00:00"
dt = datetime.fromisoformat(s)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
except Exception:
return datetime.min.replace(tzinfo=timezone.utc)
def _load_if_changed(self):
try:
mtime = os.path.getmtime(self.path)
except FileNotFoundError:
mtime = 0.0
if mtime != self._mtime:
with self._lock:
try:
mtime2 = os.path.getmtime(self.path)
except FileNotFoundError:
self._keys = {}
self._mtime = 0.0
return
if mtime2 == self._mtime:
return
with open(self.path, "r", encoding="utf-8") as f:
raw = json.load(f)
new_map: Dict[str, KeyRecord] = {}
for item in raw:
rec = KeyRecord(**item)
new_map[rec.key] = rec
self._keys = new_map
self._mtime = mtime2
def validate_bearer(self, bearer: str) -> KeyRecord:
self._load_if_changed()
rec = self._keys.get(bearer)
if not rec:
raise HTTPException(status_code=401, detail="Invalid token")
if not rec.enabled:
raise HTTPException(status_code=401, detail="Token disabled")
now = datetime.now(timezone.utc)
valid_to = self._parse_time(rec.valid_to)
if now > valid_to:
raise HTTPException(status_code=401, detail="Token expired")
return rec
_key_store = KeyStore(KEY_FILE)
def require_api_key(authorization: Optional[str] = Header(None)) -> KeyRecord:
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Missing bearer token")
token = authorization.split(" ", 1)[1].strip()
return _key_store.validate_bearer(token)
# ------------ Models ------------
class ClientInfo(BaseModel):
Name: str = Field(..., min_length=1, max_length=200)
Location: str = Field(..., min_length=1, max_length=200)
# split fields used by the API
area_code: str = Field(..., min_length=1, max_length=20)
area: str = Field(..., min_length=1, max_length=200)
AltLookup: bool = True
AltAlarmNo: str | None = None
ConvertType: str = "None"
NoSigsMon: str = "None"
SinceDays: int = Field(0, ge=0)
SinceHrs: int = Field(0, ge=0)
SinceMins: int = Field(0, ge=0)
ResetNosigsIgnored: bool = True
ResetNosigsDays: int = Field(0, ge=0)
ResetNosigsHrs: int = Field(0, ge=0)
ResetNosigsMins: int = Field(0, ge=0)
PanelName: str = "Panel Type"
PanelSite: str = "Panel location"
class User(BaseModel):
User_Name: str = Field(..., min_length=1, max_length=120)
MobileNo: str = Field(..., min_length=1, max_length=40)
MobileNoOrder: int = Field(..., ge=1, le=999)
Email: EmailStr
Type: str = Field("U", min_length=1, max_length=2)
UserNo: int = Field(..., ge=1, le=999)
Instructions: str | None = Field(None, max_length=500)
CallOrder: int = Field(0, ge=0, le=999)
@field_validator("MobileNo")
def phone_has_digit(cls, v: str):
if not any(ch.isdigit() for ch in v):
raise ValueError("MobileNo must contain digits")
return v.strip()
class ClientCreate(BaseModel):
client_id: int = Field(..., ge=1)
info: ClientInfo
class ClientInfoPatch(BaseModel):
Name: Optional[str] = Field(None, min_length=1, max_length=200)
Location: Optional[str] = Field(None, min_length=1, max_length=200)
area_code: Optional[str] = Field(None, min_length=1, max_length=20)
area: Optional[str] = Field(None, min_length=1, max_length=200)
AltLookup: Optional[bool] = None
AltAlarmNo: Optional[str] = None
ConvertType: Optional[str] = None
NoSigsMon: Optional[str] = None
SinceDays: Optional[int] = Field(None, ge=0)
SinceHrs: Optional[int] = Field(None, ge=0)
SinceMins: Optional[int] = Field(None, ge=0)
ResetNosigsIgnored: Optional[bool] = None
ResetNosigsDays: Optional[int] = Field(None, ge=0)
ResetNosigsHrs: Optional[int] = Field(None, ge=0)
ResetNosigsMins: Optional[int] = Field(None, ge=0)
PanelName: Optional[str] = Field(None, min_length=1, max_length=200)
PanelSite: Optional[str] = Field(None, min_length=1, max_length=200)
class ClientPatch(BaseModel):
client_id: int = Field(..., ge=1)
info: Optional[ClientInfoPatch] = None
class ClientID(BaseModel):
client_id: int = Field(..., ge=1)
class ZoneUpdate(BaseModel):
client_id: int = Field(..., ge=1)
zone_id: int = Field(..., ge=1, le=999)
Zone_area: str = Field(..., min_length=1, max_length=200)
ModuleNo: int = Field(0, ge=0)
class ZoneDelete(BaseModel):
client_id: int = Field(..., ge=1)
zone_id: int = Field(..., ge=1, le=999)
class UserWithClient(BaseModel):
client_id: int = Field(..., ge=1)
user: User
class UserKey(BaseModel):
client_id: int = Field(..., ge=1)
user_no: int = Field(..., ge=1, le=999)
# ------------ In-memory store + persistence ------------
_store_lock = threading.Lock()
# Multi-tenant: {"keys": { key_name: { "clients": { client_id_str: {...} } } } }
_store: Dict = {"keys": {}}
def _client_key(client_id: int) -> str:
return str(client_id)
def _tenant_store(key_name: str) -> Dict:
"""Return the per-key tenant store, creating if needed."""
tenants = _store.setdefault("keys", {})
if key_name not in tenants:
tenants[key_name] = {"clients": {}}
return tenants[key_name]
def _clients_for(key_name: str) -> Dict[str, Dict]:
"""Return dict of clients for a given key_name."""
tenant = _tenant_store(key_name)
return tenant.setdefault("clients", {})
def _require_client_for(key_name: str, client_id: int) -> Dict:
"""Fetch a client for this key only; 404 if not found."""
clients = _clients_for(key_name)
cli = clients.get(_client_key(client_id))
if not cli:
raise HTTPException(status_code=404, detail="Client not found")
return cli
def _load_store():
global _store
if os.path.exists(DATA_FILE):
with open(DATA_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
# Backwards compatibility: old single-tenant shape {"clients": {...}}
if isinstance(data, dict) and "keys" in data:
_store = data
elif isinstance(data, dict) and "clients" in data:
# Legacy: put all clients under a pseudo-tenant
_store = {"keys": {"__legacy__": {"clients": data["clients"]}}}
else:
_store = {"keys": {}}
else:
_store = {"keys": {}}
_flush_store()
def _flush_store():
tmp = DATA_FILE + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump(_store, f, ensure_ascii=False, indent=2)
os.replace(tmp, DATA_FILE)
# ------------ XML writer ------------
def _ensure_dir(path: str):
os.makedirs(path, exist_ok=True)
def _xml_path_for(client_id: int, key_name: str) -> str:
client_base = os.path.join(XML_DIR, _safe_folder(key_name))
_ensure_dir(client_base)
return os.path.join(client_base, f"{client_id}.xml")
def write_client_xml(client_id: int, key_name: str):
from xml.etree.ElementTree import Element, SubElement, ElementTree
clients = _clients_for(key_name)
key = _client_key(client_id)
cli = clients.get(key)
if not cli:
return
info = ClientInfo(**cli.get("info", {}))
root = Element("Clients")
row = SubElement(root, "Row")
SubElement(row, "__id").text = str(client_id)
SubElement(row, "Name").text = info.Name
SubElement(row, "Location").text = info.Location
SubElement(row, "loc2").text = f"{info.area_code} {info.area}".strip()
SubElement(row, "AltLookup").text = "True" if info.AltLookup else "False"
SubElement(row, "AltAlarmNo").text = info.AltAlarmNo or ""
SubElement(row, "ConvertType").text = info.ConvertType
SubElement(row, "NoSigsMon").text = info.NoSigsMon
SubElement(row, "SinceDays").text = str(info.SinceDays)
SubElement(row, "SinceHrs").text = str(info.SinceHrs)
SubElement(row, "SinceMins").text = str(info.SinceMins)
SubElement(row, "ResetNosigsIgnored").text = "True" if info.ResetNosigsIgnored else "False"
SubElement(row, "ResetNosigsDays").text = str(info.ResetNosigsDays)
SubElement(row, "ResetNosigsHrs").text = str(info.ResetNosigsHrs)
SubElement(row, "ResetNosigsMins").text = str(info.ResetNosigsMins)
SubElement(row, "PanelName").text = info.PanelName
SubElement(row, "PanelSite").text = info.PanelSite
# Zones
zones_el = SubElement(row, "Zones")
zones = cli.get("zones") or {}
for zid_str in sorted(zones.keys(), key=lambda x: int(x)):
z = zones[zid_str] or {}
z_el = SubElement(zones_el, "Zone")
SubElement(z_el, "Zone_area").text = z.get("Zone_area", "")
SubElement(z_el, "Zone_No").text = zid_str
SubElement(z_el, "ModuleNo").text = str(z.get("ModuleNo", 0))
# Users
users_el = SubElement(row, "Users")
users = cli.get("users") or {}
def _sort_user(u: dict):
return (u.get("CallOrder", 0), u.get("MobileNoOrder", 0), u.get("UserNo", 0))
for _, u in sorted(users.items(), key=lambda kv: _sort_user(kv[1])):
u_el = SubElement(users_el, "User")
SubElement(u_el, "User_Name").text = u.get("User_Name", "")
SubElement(u_el, "MobileNo").text = u.get("MobileNo", "")
SubElement(u_el, "MobileNoOrder").text = str(u.get("MobileNoOrder", 0))
SubElement(u_el, "Email").text = u.get("Email", "")
SubElement(u_el, "Type").text = u.get("Type", "U")
SubElement(u_el, "UserNo").text = str(u.get("UserNo", ""))
SubElement(u_el, "Instructions").text = u.get("Instructions", "") or ""
SubElement(u_el, "CallOrder").text = str(u.get("CallOrder", 0))
path = _xml_path_for(client_id, key_name)
try:
import xml.dom.minidom as minidom
tmp_path = path + ".tmp"
ElementTree(root).write(tmp_path, encoding="utf-8", xml_declaration=True)
with open(tmp_path, "rb") as rf:
dom = minidom.parseString(rf.read())
pretty = dom.toprettyxml(indent=" ", encoding="utf-8")
with open(tmp_path, "wb") as wf:
wf.write(pretty)
os.replace(tmp_path, path)
except Exception:
ElementTree(root).write(path, encoding="utf-8", xml_declaration=True)
# ------------ App & logging middleware ------------
_load_store()
app = FastAPI(title="Client Registry API", version="3.1.0")
logging.basicConfig(
filename="api_requests.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
REDACT_KEYS = {"authorization", "password", "secret", "key"}
MAX_BODY_CHARS = 4000
def _redact(obj):
try:
if isinstance(obj, dict):
return {k: ("***" if k.lower() in REDACT_KEYS else _redact(v)) for k, v in obj.items()}
if isinstance(obj, list):
return [_redact(v) for v in obj]
except Exception:
pass
return obj
@app.middleware("http")
async def log_requests(request: Request, call_next):
start = time()
method = request.method.upper()
body_text = ""
if method in {"POST", "PUT", "PATCH", "DELETE"}:
try:
raw = await request.body()
async def receive():
return {"type": "http.request", "body": raw, "more_body": False}
request._receive = receive
try:
parsed = json.loads(raw.decode("utf-8") if raw else "{}")
body_text = json.dumps(_redact(parsed))[:MAX_BODY_CHARS]
except Exception:
body_text = (raw.decode("utf-8", "replace") if raw else "")[:MAX_BODY_CHARS]
except Exception as e:
body_text = f"<<error reading body: {e}>>"
response = await call_next(request)
duration = time() - start
logging.info(
"%s %s - %s - %.3fs - body=%s",
method, request.url.path, response.status_code, duration, body_text
)
return response
# --------- Routes ---------
# UPSERT CLIENT (create if missing, update if exists)
@app.put("/clients", status_code=200)
def upsert_client(
payload: ClientCreate,
keyrec: KeyRecord = Depends(require_api_key),
response: Response = None,
):
client_id = payload.client_id
logging.info(
"AUDIT upsert_client key=%s client_id=%s payload=%s",
keyrec.key_name, client_id, payload.model_dump()
)
info = payload.info.model_dump()
# Auto-defaults
alt = info.get("AltAlarmNo")
if alt is None or (isinstance(alt, str) and not alt.strip()):
info["AltAlarmNo"] = "SIA1000101"
pn = info.get("PanelName")
if pn is None or (isinstance(pn, str) and not pn.strip()):
info["PanelName"] = "STANDARD"
key = _client_key(client_id)
with _store_lock:
clients = _clients_for(keyrec.key_name)
existed = key in clients
if existed:
cli = clients[key]
cli["info"] = info
else:
cli = {
"info": info,
"zones": {},
"users": {}
}
clients[key] = cli
if response is not None:
response.status_code = status.HTTP_200_OK if existed else status.HTTP_201_CREATED
_flush_store()
write_client_xml(client_id, keyrec.key_name)
return {"client_id": client_id}
# PATCH CLIENT INFO (partial replace of info)
@app.patch("/clients", status_code=200)
def patch_client(
payload: ClientPatch,
keyrec: KeyRecord = Depends(require_api_key),
):
client_id = payload.client_id
logging.info(
"AUDIT patch_client key=%s client_id=%s payload=%s",
keyrec.key_name, client_id, payload.model_dump()
)
with _store_lock:
cli = _require_client_for(keyrec.key_name, client_id)
if payload.info is not None:
# Current stored info (must be valid ClientInfo shape)
current_info = cli.get("info", {})
base = ClientInfo(**current_info).model_dump()
# Only the fields actually sent in the request
updates = payload.info.model_dump(exclude_unset=True)
# Merge: updates override base
merged = {**base, **updates}
# Auto-defaults AFTER merge
alt = merged.get("AltAlarmNo")
if alt is None or (isinstance(alt, str) and not alt.strip()):
merged["AltAlarmNo"] = "SIA1000101"
pn = merged.get("PanelName")
if pn is None or (isinstance(pn, str) and not pn.strip()):
merged["PanelName"] = "STANDARD"
merged_valid = ClientInfo(**merged).model_dump()
cli["info"] = merged_valid
_flush_store()
write_client_xml(client_id, keyrec.key_name)
return {"client_id": client_id, **cli}
# GET CLIENT (header-based)
@app.get("/clients")
def get_client(
x_client_id: int = Header(..., alias="X-Client-Id"),
keyrec: KeyRecord = Depends(require_api_key),
):
client_id = x_client_id
logging.info("AUDIT get_client key=%s client_id=%s", keyrec.key_name, client_id)
cli = _require_client_for(keyrec.key_name, client_id)
xml_path = _xml_path_for(client_id, keyrec.key_name)
if not os.path.exists(xml_path):
logging.info("XML missing for client_id=%s (key=%s). Regenerating...", client_id, keyrec.key_name)
write_client_xml(client_id, keyrec.key_name)
return {"client_id": client_id, **cli}
@app.post("/clients/get", status_code=200)
def get_client_post(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
logging.info("AUDIT get_client (POST) key=%s client_id=%s", keyrec.key_name, client_id)
cli = _require_client_for(keyrec.key_name, client_id)
xml_path = _xml_path_for(client_id, keyrec.key_name)
if not os.path.exists(xml_path):
logging.info("XML missing for client_id=%s (key=%s). Regenerating...", client_id, keyrec.key_name)
write_client_xml(client_id, keyrec.key_name)
return {"client_id": client_id, **cli}
# REGEN XML explicitly (for this key only)
@app.post("/clients/regen-xml", status_code=200)
def regen_xml(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
logging.info("AUDIT regen_xml key=%s client_id=%s", keyrec.key_name, client_id)
_require_client_for(keyrec.key_name, client_id)
write_client_xml(client_id, keyrec.key_name)
return {"client_id": client_id, "xml": "regenerated"}
# DELETE ENTIRE CLIENT (for this key only)
@app.delete("/clients", status_code=204)
def delete_client(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
logging.info("AUDIT delete_client key=%s client_id=%s", keyrec.key_name, client_id)
key = _client_key(client_id)
with _store_lock:
clients = _clients_for(keyrec.key_name)
if key not in clients:
raise HTTPException(status_code=404, detail="Client not found")
del clients[key]
_flush_store()
# Remove XML only for *this* key
try:
xml_path = _xml_path_for(client_id, keyrec.key_name)
if os.path.exists(xml_path):
os.remove(xml_path)
except Exception as e:
logging.warning("Failed to remove XML for client %s key %s: %s", client_id, keyrec.key_name, e)
return
# ---- Zones ----
@app.get("/clients/zones")
def list_zones(
x_client_id: int = Header(..., alias="X-Client-Id"),
keyrec: KeyRecord = Depends(require_api_key)
):
client_id = x_client_id
logging.info("AUDIT list_zones key=%s client_id=%s", keyrec.key_name, client_id)
cli = _require_client_for(keyrec.key_name, client_id)
zones_obj = cli.get("zones", {})
zones = [
{"Zone_No": int(zid), "Zone_area": data.get("Zone_area"), "ModuleNo": data.get("ModuleNo", 0)}
for zid, data in zones_obj.items()
]
zones.sort(key=lambda z: z["Zone_No"])
return {"client_id": client_id, "zones": zones}
@app.post("/clients/zones/list", status_code=200)
def list_zones_post(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
logging.info("AUDIT list_zones (POST) key=%s client_id=%s", keyrec.key_name, client_id)
cli = _require_client_for(keyrec.key_name, client_id)
zones_obj = cli.get("zones", {})
zones = [
{"Zone_No": int(zid), "Zone_area": data.get("Zone_area"), "ModuleNo": data.get("ModuleNo", 0)}
for zid, data in zones_obj.items()
]
zones.sort(key=lambda z: z["Zone_No"])
return {"client_id": client_id, "zones": zones}
@app.put("/clients/zones", status_code=201)
def upsert_zone(payload: ZoneUpdate, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
zone_id = payload.zone_id
logging.info(
"AUDIT upsert_zone key=%s client_id=%s zone_id=%s body=%s",
keyrec.key_name, client_id, zone_id, payload.model_dump()
)
if not (1 <= zone_id <= 999):
raise HTTPException(status_code=422, detail="zone_id must be 1..999")
with _store_lock:
cli = _require_client_for(keyrec.key_name, client_id)
cli.setdefault("zones", {})
cli["zones"][str(zone_id)] = {
"Zone_area": payload.Zone_area,
"ModuleNo": payload.ModuleNo
}
_flush_store()
write_client_xml(client_id, keyrec.key_name)
return {
"client_id": client_id,
"zone": {"Zone_No": zone_id, "Zone_area": payload.Zone_area, "ModuleNo": payload.ModuleNo}
}
@app.delete("/clients/zones", status_code=204)
def delete_zone(payload: ZoneDelete, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
zone_id = payload.zone_id
logging.info("AUDIT delete_zone key=%s client_id=%s zone_id=%s",
keyrec.key_name, client_id, zone_id)
with _store_lock:
cli = _require_client_for(keyrec.key_name, client_id)
zones = cli.get("zones", {})
if str(zone_id) not in zones:
raise HTTPException(status_code=404, detail="Zone not found")
del zones[str(zone_id)]
_flush_store()
write_client_xml(client_id, keyrec.key_name)
return
# ---- Users ----
@app.get("/clients/users")
def list_users(
x_client_id: int = Header(..., alias="X-Client-Id"),
keyrec: KeyRecord = Depends(require_api_key)
):
client_id = x_client_id
logging.info("AUDIT list_users key=%s client_id=%s", keyrec.key_name, client_id)
cli = _require_client_for(keyrec.key_name, client_id)
users = list(cli.get("users", {}).values())
users.sort(key=lambda u: (u.get("CallOrder", 0), u.get("MobileNoOrder", 0), u.get("UserNo", 0)))
return {"client_id": client_id, "users": users}
@app.get("/clients/users/single")
def get_user_header(
x_client_id: int = Header(..., alias="X-Client-Id"),
x_user_no: int = Header(..., alias="X-User-No"),
keyrec: KeyRecord = Depends(require_api_key)
):
client_id = x_client_id
user_no = x_user_no
logging.info("AUDIT get_user (header) key=%s client_id=%s user_no=%s",
keyrec.key_name, client_id, user_no)
cli = _require_client_for(keyrec.key_name, client_id)
u = cli.get("users", {}).get(str(user_no))
if not u:
raise HTTPException(status_code=404, detail="User not found")
return {"client_id": client_id, "user": u}
@app.post("/clients/users/list", status_code=200)
def list_users_post(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
logging.info("AUDIT list_users (POST) key=%s client_id=%s", keyrec.key_name, client_id)
cli = _require_client_for(keyrec.key_name, client_id)
users = list(cli.get("users", {}).values())
users.sort(key=lambda u: (u.get("CallOrder", 0), u.get("MobileNoOrder", 0), u.get("UserNo", 0)))
return {"client_id": client_id, "users": users}
@app.post("/clients/users", status_code=201)
def add_user(payload: UserWithClient, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
user = payload.user
logging.info(
"AUDIT add_user key=%s client_id=%s UserNo=%s user=%s",
keyrec.key_name, client_id, user.UserNo, user.model_dump()
)
with _store_lock:
cli = _require_client_for(keyrec.key_name, client_id)
cli.setdefault("users", {})
if str(user.UserNo) in cli["users"]:
raise HTTPException(status_code=409, detail="UserNo already exists")
cli["users"][str(user.UserNo)] = user.model_dump()
_flush_store()
write_client_xml(client_id, keyrec.key_name)
return {"client_id": client_id, "user": user}
@app.post("/clients/users/get", status_code=200)
def get_user(payload: UserKey, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
user_no = payload.user_no
logging.info("AUDIT get_user (POST) key=%s client_id=%s user_no=%s",
keyrec.key_name, client_id, user_no)
cli = _require_client_for(keyrec.key_name, client_id)
u = cli.get("users", {}).get(str(user_no))
if not u:
raise HTTPException(status_code=404, detail="User not found")
return {"client_id": client_id, "user": u}
@app.put("/clients/users", status_code=200)
def replace_user(payload: UserWithClient, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
user = payload.user
user_no = user.UserNo
logging.info(
"AUDIT replace_user key=%s client_id=%s user_no=%s user=%s",
keyrec.key_name, client_id, user_no, user.model_dump()
)
with _store_lock:
cli = _require_client_for(keyrec.key_name, client_id)
cli.setdefault("users", {})
cli["users"][str(user_no)] = user.model_dump()
_flush_store()
write_client_xml(client_id, keyrec.key_name)
return {"client_id": client_id, "user": user}
@app.delete("/clients/users", status_code=204)
def delete_user(payload: UserKey, keyrec: KeyRecord = Depends(require_api_key)):
client_id = payload.client_id
user_no = payload.user_no
logging.info("AUDIT delete_user key=%s client_id=%s user_no=%s",
keyrec.key_name, client_id, user_no)
with _store_lock:
cli = _require_client_for(keyrec.key_name, client_id)
users = cli.get("users", {})
if str(user_no) not in users:
raise HTTPException(status_code=404, detail="User not found")
del users[str(user_no)]
_flush_store()
write_client_xml(client_id, keyrec.key_name)
return