from __future__ import annotations import json, os, threading, re, logging from datetime import datetime, timezone from typing import Dict, Optional, List from time import time from fastapi import FastAPI, HTTPException, Depends, Header, Request, Response, status from pydantic import BaseModel, Field, EmailStr, field_validator SAFE_NAME = re.compile(r"[^A-Za-z0-9._-]") def _safe_folder(name: str) -> str: cleaned = SAFE_NAME.sub("_", name.strip()) return cleaned or "unknown" # ------------ Config ------------ DATA_FILE = os.getenv("DATA_FILE", "data.json") KEY_FILE = os.getenv("KEY_FILE", "keys.json") XML_DIR = os.getenv("XML_DIR", "out/clients") # ------------ Models used in keys.json ------------ class ClientGroupingConfig(BaseModel): description: str grouping_type_description: str grouping_allow_multiple: bool = True # ------------ Auth (hot-reloaded key store) ------------ class KeyRecord(BaseModel): key_name: str key: str enabled: bool = True valid_to: str # ISO-8601 # Used for __id in XML: <__id>{client_id}{port} port: str = "" # Per-key forced XML fields installer_name: str = "" # , user 199 name, grouping #2 installer_email: str = "" # user 199 email use_glob_callouts: bool = True # show_on_callouts: bool = False # glob_callouts: str = "" # use_glob_callouts2: bool = True # show_on_callouts2: bool = False # glob_callouts2: str = "" # alt_lookup: bool = True # alt_alarm_no: str = "CID4BASE01" # convert_type: str = "None" # siginterpret: str = "SIADecimal" # # NEW: per-key client groupings (rendered to ) client_groupings: List[ClientGroupingConfig] = Field(default_factory=list) class KeyStore: def __init__(self, path: str): self.path = path self._mtime = 0.0 self._keys: Dict[str, KeyRecord] = {} self._lock = threading.Lock() def _parse_time(self, s: str) -> datetime: try: if s.endswith("Z"): s = s[:-1] + "+00:00" dt = datetime.fromisoformat(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) except Exception: return datetime.min.replace(tzinfo=timezone.utc) def _load_if_changed(self): try: mtime = os.path.getmtime(self.path) except FileNotFoundError: mtime = 0.0 if mtime != self._mtime: with self._lock: try: mtime2 = os.path.getmtime(self.path) except FileNotFoundError: self._keys = {} self._mtime = 0.0 return if mtime2 == self._mtime: return with open(self.path, "r", encoding="utf-8") as f: raw = json.load(f) new_map: Dict[str, KeyRecord] = {} for item in raw: rec = KeyRecord(**item) new_map[rec.key] = rec self._keys = new_map self._mtime = mtime2 def validate_bearer(self, bearer: str) -> KeyRecord: self._load_if_changed() rec = self._keys.get(bearer) if not rec: raise HTTPException(status_code=401, detail="Invalid token") if not rec.enabled: raise HTTPException(status_code=401, detail="Token disabled") now = datetime.now(timezone.utc) valid_to = self._parse_time(rec.valid_to) if now > valid_to: raise HTTPException(status_code=401, detail="Token expired") return rec _key_store = KeyStore(KEY_FILE) def require_api_key(authorization: Optional[str] = Header(None)) -> KeyRecord: if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Missing bearer token") token = authorization.split(" ", 1)[1].strip() return _key_store.validate_bearer(token) # ------------ Models (client payloads) ------------ class ClientInfo(BaseModel): # From API / client section Name: str = Field(..., min_length=1, max_length=200) Alias: Optional[str] = Field(None, max_length=200) Location: str = Field(..., min_length=1, max_length=200) # split fields used by the API → XML loc2 = "{area_code} {area}" area_code: str = Field(..., min_length=1, max_length=20) area: str = Field(..., min_length=1, max_length=200) BusPhone: Optional[str] = Field(None, max_length=50) Email: Optional[EmailStr] = None OKPassword: Optional[str] = Field(None, max_length=100) SpecRequest: Optional[str] = Field(None, max_length=1000) # No-signal monitoring NoSigsMon: str = "ActiveAny" SinceDays: int = Field(1, ge=0) SinceHrs: int = Field(0, ge=0) SinceMins: int = Field(30, ge=0) ResetNosigsIgnored: bool = True ResetNosigsDays: int = Field(7, ge=0) ResetNosigsHrs: int = Field(0, ge=0) ResetNosigsMins: int = Field(0, ge=0) InstallDateTime: Optional[str] = Field( None, description="Installation date/time, e.g. '2023-02-20'" ) PanelName: str = "Panel Type" PanelSite: str = "Panel location" KeypadLocation: Optional[str] = Field(None, max_length=200) SPPage: Optional[str] = Field(None, max_length=2000) class User(BaseModel): User_Name: str = Field(..., min_length=1, max_length=120) MobileNo: str = Field(..., min_length=1, max_length=40) MobileNoOrder: int = Field(..., ge=1, le=999) Email: EmailStr Type: str = Field("U", min_length=1, max_length=2) UserNo: int = Field(..., ge=1, le=999) Instructions: str | None = Field(None, max_length=500) CallOrder: int = Field(0, ge=0, le=999) @field_validator("MobileNo") def phone_has_digit(cls, v: str): if not any(ch.isdigit() for ch in v): raise ValueError("MobileNo must contain digits") return v.strip() class ClientCreate(BaseModel): client_id: int = Field(..., ge=1) info: ClientInfo class ClientInfoPatch(BaseModel): # all optional, for PATCH Name: Optional[str] = Field(None, min_length=1, max_length=200) Alias: Optional[str] = Field(None, max_length=200) Location: Optional[str] = Field(None, min_length=1, max_length=200) area_code: Optional[str] = Field(None, min_length=1, max_length=20) area: Optional[str] = Field(None, min_length=1, max_length=200) BusPhone: Optional[str] = Field(None, max_length=50) Email: Optional[EmailStr] = None OKPassword: Optional[str] = Field(None, max_length=100) SpecRequest: Optional[str] = Field(None, max_length=1000) NoSigsMon: Optional[str] = None SinceDays: Optional[int] = Field(None, ge=0) SinceHrs: Optional[int] = Field(None, ge=0) SinceMins: Optional[int] = Field(None, ge=0) ResetNosigsIgnored: Optional[bool] = None ResetNosigsDays: Optional[int] = Field(None, ge=0) ResetNosigsHrs: Optional[int] = Field(None, ge=0) ResetNosigsMins: Optional[int] = Field(None, ge=0) InstallDateTime: Optional[str] = None PanelName: Optional[str] = Field(None, min_length=1, max_length=200) PanelSite: Optional[str] = Field(None, min_length=1, max_length=200) KeypadLocation: Optional[str] = Field(None, max_length=200) SPPage: Optional[str] = Field(None, max_length=2000) class ClientPatch(BaseModel): client_id: int = Field(..., ge=1) info: Optional[ClientInfoPatch] = None class ClientID(BaseModel): client_id: int = Field(..., ge=1) class ZoneUpdate(BaseModel): client_id: int = Field(..., ge=1) zone_id: int = Field(..., ge=1, le=999) Zone_area: str = Field(..., min_length=1, max_length=200) ModuleNo: int = Field(0, ge=0) class ZoneDelete(BaseModel): client_id: int = Field(..., ge=1) zone_id: int = Field(..., ge=1, le=999) class UserWithClient(BaseModel): client_id: int = Field(..., ge=1) user: User class UserKey(BaseModel): client_id: int = Field(..., ge=1) user_no: int = Field(..., ge=1, le=999) # ------------ In-memory store + persistence ------------ # Store is namespaced per API key: "key_name:client_id" _store_lock = threading.Lock() _store: Dict = {"clients": {}} def _load_store(): global _store if os.path.exists(DATA_FILE): with open(DATA_FILE, "r", encoding="utf-8") as f: _store = json.load(f) else: _flush_store() def _flush_store(): tmp = DATA_FILE + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(_store, f, ensure_ascii=False, indent=2) os.replace(tmp, DATA_FILE) def _store_key(client_id: int, keyrec: KeyRecord) -> str: return f"{keyrec.key_name}:{client_id}" def _require_client(client_id: int, keyrec: KeyRecord) -> Dict: skey = _store_key(client_id, keyrec) cli = _store["clients"].get(skey) if not cli: raise HTTPException(status_code=404, detail="Client not found") return cli # ------------ XML writer ------------ def _ensure_dir(path: str): os.makedirs(path, exist_ok=True) def _xml_path_for(client_id: int, keyrec: KeyRecord) -> str: client_base = os.path.join(XML_DIR, _safe_folder(keyrec.key_name)) _ensure_dir(client_base) return os.path.join(client_base, f"{client_id}.xml") def _bool_text(val: bool) -> str: return "True" if val else "False" def write_client_xml(client_id: int, keyrec: KeyRecord): from xml.etree.ElementTree import Element, SubElement, ElementTree skey = _store_key(client_id, keyrec) cli = _store["clients"].get(skey) if not cli: return info = ClientInfo(**cli.get("info", {})) root = Element("Clients") row = SubElement(root, "Row") # __id = client_id + port, e.g. "9998BASE11" combined_id = f"{client_id}{keyrec.port}" if keyrec.port else str(client_id) SubElement(row, "__id").text = combined_id # Client fields from API SubElement(row, "Name").text = info.Name SubElement(row, "Alias").text = info.Alias or "" SubElement(row, "Location").text = info.Location SubElement(row, "loc2").text = f"{info.area_code} {info.area}".strip() SubElement(row, "BusPhone").text = info.BusPhone or "" SubElement(row, "Email").text = info.Email or "" SubElement(row, "OKPassword").text = info.OKPassword or "" SubElement(row, "SpecRequest").text = info.SpecRequest or "" SubElement(row, "NoSigsMon").text = info.NoSigsMon SubElement(row, "SinceDays").text = str(info.SinceDays) SubElement(row, "SinceHrs").text = str(info.SinceHrs) SubElement(row, "SinceMins").text = str(info.SinceMins) SubElement(row, "ResetNosigsIgnored").text = _bool_text(info.ResetNosigsIgnored) SubElement(row, "ResetNosigsDays").text = str(info.ResetNosigsDays) SubElement(row, "ResetNosigsHrs").text = str(info.ResetNosigsHrs) SubElement(row, "ResetNosigsMins").text = str(info.ResetNosigsMins) SubElement(row, "InstallDateTime").text = info.InstallDateTime or "" SubElement(row, "PanelName").text = info.PanelName SubElement(row, "PanelSite").text = info.PanelSite SubElement(row, "KeypadLocation").text = info.KeypadLocation or "" SubElement(row, "SPPage").text = info.SPPage or "" # Per-key forced fields (from keys.json) SubElement(row, "Installer").text = keyrec.installer_name or "" SubElement(row, "UseGlobCallOuts").text = _bool_text(keyrec.use_glob_callouts) SubElement(row, "ShowOnCallOuts").text = _bool_text(keyrec.show_on_callouts) SubElement(row, "GlobCallOuts").text = keyrec.glob_callouts or "" SubElement(row, "UseGlobCallOuts2").text = _bool_text(keyrec.use_glob_callouts2) SubElement(row, "ShowOnCallOuts2").text = _bool_text(keyrec.show_on_callouts2) SubElement(row, "GlobCallOuts2").text = keyrec.glob_callouts2 or "" SubElement(row, "AltLookup").text = _bool_text(keyrec.alt_lookup) SubElement(row, "AltAlarmNo").text = keyrec.alt_alarm_no or "" SubElement(row, "ConvertType").text = keyrec.convert_type or "None" SubElement(row, "SIGINTERPRET").text = keyrec.siginterpret or "SIADecimal" # ClientGroupings from keys.json cgs_el = SubElement(row, "ClientGroupings") if keyrec.client_groupings: for cg in keyrec.client_groupings: cg_el = SubElement(cgs_el, "ClientGrouping") SubElement(cg_el, "Description").text = cg.description SubElement(cg_el, "GroupingTypeDescription").text = cg.grouping_type_description SubElement(cg_el, "GroupingAllowMultiple").text = _bool_text( cg.grouping_allow_multiple ) else: # Fallback default if nothing configured (optional) cg_el = SubElement(cgs_el, "ClientGrouping") SubElement(cg_el, "Description").text = "Alarm24" SubElement(cg_el, "GroupingTypeDescription").text = "Alarm24 Tilgang" SubElement(cg_el, "GroupingAllowMultiple").text = "True" # Zones zones_el = SubElement(row, "Zones") zones = cli.get("zones") or {} for zid_str in sorted(zones.keys(), key=lambda x: int(x)): z = zones[zid_str] or {} z_el = SubElement(zones_el, "Zone") SubElement(z_el, "Zone_area").text = z.get("Zone_area", "") SubElement(z_el, "Zone_No").text = zid_str SubElement(z_el, "ModuleNo").text = str(z.get("ModuleNo", 0)) # Users (from API) users_el = SubElement(row, "Users") users = cli.get("users") or {} def _sort_user(u: dict): return (u.get("CallOrder", 0), u.get("MobileNoOrder", 0), u.get("UserNo", 0)) for _, u in sorted(users.items(), key=lambda kv: _sort_user(kv[1])): u_el = SubElement(users_el, "User") SubElement(u_el, "User_Name").text = u.get("User_Name", "") SubElement(u_el, "MobileNo").text = u.get("MobileNo", "") SubElement(u_el, "MobileNoOrder").text = str(u.get("MobileNoOrder", 0)) SubElement(u_el, "Email").text = u.get("Email", "") SubElement(u_el, "Type").text = u.get("Type", "U") SubElement(u_el, "UserNo").text = str(u.get("UserNo", "")) SubElement(u_el, "Instructions").text = u.get("Instructions", "") or "" SubElement(u_el, "CallOrder").text = str(u.get("CallOrder", 0)) # Extra forced installer user (type "N", UserNo 199) inst_user_el = SubElement(users_el, "User") SubElement(inst_user_el, "User_Name").text = keyrec.installer_name or "" SubElement(inst_user_el, "Email").text = keyrec.installer_email or "" SubElement(inst_user_el, "UserNo").text = "199" SubElement(inst_user_el, "CallOrder").text = "0" SubElement(inst_user_el, "Type").text = "N" path = _xml_path_for(client_id, keyrec) try: import xml.dom.minidom as minidom tmp_path = path + ".tmp" ElementTree(root).write(tmp_path, encoding="utf-8", xml_declaration=True) with open(tmp_path, "rb") as rf: dom = minidom.parseString(rf.read()) pretty = dom.toprettyxml(indent=" ", encoding="utf-8") with open(tmp_path, "wb") as wf: wf.write(pretty) os.replace(tmp_path, path) except Exception: ElementTree(root).write(path, encoding="utf-8", xml_declaration=True) # ------------ App & logging middleware ------------ _load_store() app = FastAPI(title="Client Registry API", version="3.3.0") logging.basicConfig( filename="api_requests.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) REDACT_KEYS = {"authorization", "password", "secret", "key"} MAX_BODY_CHARS = 4000 def _redact(obj): try: if isinstance(obj, dict): return {k: ("***" if k.lower() in REDACT_KEYS else _redact(v)) for k, v in obj.items()} if isinstance(obj, list): return [_redact(v) for v in obj] except Exception: pass return obj @app.middleware("http") async def log_requests(request: Request, call_next): start = time() method = request.method.upper() body_text = "" if method in {"POST", "PUT", "PATCH", "DELETE"}: try: raw = await request.body() async def receive(): return {"type": "http.request", "body": raw, "more_body": False} request._receive = receive try: parsed = json.loads(raw.decode("utf-8") if raw else "{}") body_text = json.dumps(_redact(parsed))[:MAX_BODY_CHARS] except Exception: body_text = (raw.decode("utf-8", "replace") if raw else "")[:MAX_BODY_CHARS] except Exception as e: body_text = f"<>" response = await call_next(request) duration = time() - start logging.info( "%s %s - %s - %.3fs - body=%s", method, request.url.path, response.status_code, duration, body_text, ) return response # --------- Routes (unchanged logic, but keyrec now has client_groupings) --------- # UPSERT CLIENT (create if missing, update if exists) @app.put("/clients", status_code=200) def upsert_client( payload: ClientCreate, keyrec: KeyRecord = Depends(require_api_key), response: Response = None, ): client_id = payload.client_id logging.info( "AUDIT upsert_client key=%s client_id=%s payload=%s", keyrec.key_name, client_id, payload.model_dump(), ) info = payload.info.model_dump() skey = _store_key(client_id, keyrec) with _store_lock: existed = skey in _store["clients"] if existed: cli = _store["clients"][skey] cli["info"] = info else: cli = {"info": info, "zones": {}, "users": {}} _store["clients"][skey] = cli if response is not None: response.status_code = status.HTTP_200_OK if existed else status.HTTP_201_CREATED _flush_store() write_client_xml(client_id, keyrec) return {"client_id": client_id} # PATCH CLIENT INFO (partial replace of info) @app.patch("/clients", status_code=200) def patch_client( payload: ClientPatch, keyrec: KeyRecord = Depends(require_api_key), ): client_id = payload.client_id logging.info( "AUDIT patch_client key=%s client_id=%s payload=%s", keyrec.key_name, client_id, payload.model_dump(), ) with _store_lock: cli = _require_client(client_id, keyrec) if payload.info is not None: current_info = cli.get("info", {}) base = ClientInfo(**current_info).model_dump() updates = payload.info.model_dump(exclude_unset=True) merged = {**base, **updates} merged_valid = ClientInfo(**merged).model_dump() cli["info"] = merged_valid _flush_store() write_client_xml(client_id, keyrec) return {"client_id": client_id, **cli} # GET CLIENT (with header X-Client-Id, auto-regens XML if missing) @app.get("/clients") def get_client( x_client_id: int = Header(..., alias="X-Client-Id"), keyrec: KeyRecord = Depends(require_api_key), ): client_id = x_client_id logging.info("AUDIT get_client key=%s client_id=%s", keyrec.key_name, client_id) cli = _require_client(client_id, keyrec) xml_path = _xml_path_for(client_id, keyrec) if not os.path.exists(xml_path): logging.info( "XML missing for client_id=%s (key=%s). Regenerating...", client_id, keyrec.key_name, ) write_client_xml(client_id, keyrec) return {"client_id": client_id, **cli} @app.post("/clients/get", status_code=200) def get_client_body(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id logging.info("AUDIT get_client key=%s client_id=%s", keyrec.key_name, client_id) cli = _require_client(client_id, keyrec) xml_path = _xml_path_for(client_id, keyrec) if not os.path.exists(xml_path): logging.info( "XML missing for client_id=%s (key=%s). Regenerating...", client_id, keyrec.key_name, ) write_client_xml(client_id, keyrec) return {"client_id": client_id, **cli} # REGEN XML explicitly @app.post("/clients/regen-xml", status_code=200) def regen_xml(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id logging.info("AUDIT regen_xml key=%s client_id=%s", keyrec.key_name, client_id) _require_client(client_id, keyrec) write_client_xml(client_id, keyrec) return {"client_id": client_id, "xml": "regenerated"} # DELETE ENTIRE CLIENT (for this key only) @app.delete("/clients", status_code=204) def delete_client(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id logging.info("AUDIT delete_client key=%s client_id=%s", keyrec.key_name, client_id) skey = _store_key(client_id, keyrec) with _store_lock: if skey not in _store["clients"]: raise HTTPException(status_code=404, detail="Client not found") del _store["clients"][skey] _flush_store() try: xml_path = _xml_path_for(client_id, keyrec) if os.path.exists(xml_path): os.remove(xml_path) except Exception as e: logging.warning( "Failed to remove XML file for client %s (key=%s): %s", client_id, keyrec.key_name, e, ) return # ---- Zones ---- @app.get("/clients/zones") def list_zones( x_client_id: int = Header(..., alias="X-Client-Id"), keyrec: KeyRecord = Depends(require_api_key), ): client_id = x_client_id logging.info("AUDIT list_zones key=%s client_id=%s", keyrec.key_name, client_id) cli = _require_client(client_id, keyrec) zones_obj = cli.get("zones", {}) zones = [ {"Zone_No": int(zid), "Zone_area": data.get("Zone_area"), "ModuleNo": data.get("ModuleNo", 0)} for zid, data in zones_obj.items() ] zones.sort(key=lambda z: z["Zone_No"]) return {"client_id": client_id, "zones": zones} @app.post("/clients/zones/list", status_code=200) def list_zones_body(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id logging.info("AUDIT list_zones key=%s client_id=%s", keyrec.key_name, client_id) cli = _require_client(client_id, keyrec) zones_obj = cli.get("zones", {}) zones = [ {"Zone_No": int(zid), "Zone_area": data.get("Zone_area"), "ModuleNo": data.get("ModuleNo", 0)} for zid, data in zones_obj.items() ] zones.sort(key=lambda z: z["Zone_No"]) return {"client_id": client_id, "zones": zones} @app.put("/clients/zones", status_code=201) def upsert_zone(payload: ZoneUpdate, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id zone_id = payload.zone_id logging.info( "AUDIT upsert_zone key=%s client_id=%s zone_id=%s body=%s", keyrec.key_name, client_id, zone_id, payload.model_dump(), ) if not (1 <= zone_id <= 999): raise HTTPException(status_code=422, detail="zone_id must be 1..999") with _store_lock: cli = _require_client(client_id, keyrec) cli.setdefault("zones", {}) cli["zones"][str(zone_id)] = { "Zone_area": payload.Zone_area, "ModuleNo": payload.ModuleNo, } _flush_store() write_client_xml(client_id, keyrec) return { "client_id": client_id, "zone": {"Zone_No": zone_id, "Zone_area": payload.Zone_area, "ModuleNo": payload.ModuleNo}, } @app.delete("/clients/zones", status_code=204) def delete_zone(payload: ZoneDelete, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id zone_id = payload.zone_id logging.info( "AUDIT delete_zone key=%s client_id=%s zone_id=%s", keyrec.key_name, client_id, zone_id, ) with _store_lock: cli = _require_client(client_id, keyrec) zones = cli.get("zones", {}) if str(zone_id) not in zones: raise HTTPException(status_code=404, detail="Zone not found") del zones[str(zone_id)] _flush_store() write_client_xml(client_id, keyrec) return # ---- Users ---- @app.get("/clients/users") def list_users( x_client_id: int = Header(..., alias="X-Client-Id"), keyrec: KeyRecord = Depends(require_api_key), ): client_id = x_client_id logging.info("AUDIT list_users key=%s client_id=%s", keyrec.key_name, client_id) cli = _require_client(client_id, keyrec) users = list(cli.get("users", {}).values()) users.sort(key=lambda u: (u.get("CallOrder", 0), u.get("MobileNoOrder", 0), u.get("UserNo", 0))) return {"client_id": client_id, "users": users} @app.get("/clients/users/single") def get_user_header( x_client_id: int = Header(..., alias="X-Client-Id"), x_user_no: int = Header(..., alias="X-User-No"), keyrec: KeyRecord = Depends(require_api_key), ): client_id = x_client_id user_no = x_user_no logging.info( "AUDIT get_user key=%s client_id=%s user_no=%s", keyrec.key_name, client_id, user_no, ) cli = _require_client(client_id, keyrec) u = cli.get("users", {}).get(str(user_no)) if not u: raise HTTPException(status_code=404, detail="User not found") return {"client_id": client_id, "user": u} @app.post("/clients/users/list", status_code=200) def list_users_body(payload: ClientID, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id logging.info("AUDIT list_users key=%s client_id=%s", keyrec.key_name, client_id) cli = _require_client(client_id, keyrec) users = list(cli.get("users", {}).values()) users.sort(key=lambda u: (u.get("CallOrder", 0), u.get("MobileNoOrder", 0), u.get("UserNo", 0))) return {"client_id": client_id, "users": users} @app.post("/clients/users", status_code=201) def add_user(payload: UserWithClient, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id user = payload.user logging.info( "AUDIT add_user key=%s client_id=%s UserNo=%s user=%s", keyrec.key_name, client_id, user.UserNo, user.model_dump(), ) with _store_lock: cli = _require_client(client_id, keyrec) cli.setdefault("users", {}) if str(user.UserNo) in cli["users"]: raise HTTPException(status_code=409, detail="UserNo already exists") cli["users"][str(user.UserNo)] = user.model_dump() _flush_store() write_client_xml(client_id, keyrec) return {"client_id": client_id, "user": user} @app.post("/clients/users/get", status_code=200) def get_user_body(payload: UserKey, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id user_no = payload.user_no logging.info( "AUDIT get_user key=%s client_id=%s user_no=%s", keyrec.key_name, client_id, user_no, ) cli = _require_client(client_id, keyrec) u = cli.get("users", {}).get(str(user_no)) if not u: raise HTTPException(status_code=404, detail="User not found") return {"client_id": client_id, "user": u} @app.put("/clients/users", status_code=200) def replace_user(payload: UserWithClient, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id user = payload.user user_no = user.UserNo logging.info( "AUDIT replace_user key=%s client_id=%s user_no=%s user=%s", keyrec.key_name, client_id, user_no, user.model_dump(), ) with _store_lock: cli = _require_client(client_id, keyrec) cli.setdefault("users", {}) cli["users"][str(user_no)] = user.model_dump() _flush_store() write_client_xml(client_id, keyrec) return {"client_id": client_id, "user": user} @app.delete("/clients/users", status_code=204) def delete_user(payload: UserKey, keyrec: KeyRecord = Depends(require_api_key)): client_id = payload.client_id user_no = payload.user_no logging.info( "AUDIT delete_user key=%s client_id=%s user_no=%s", keyrec.key_name, client_id, user_no, ) with _store_lock: cli = _require_client(client_id, keyrec) users = cli.get("users", {}) if str(user_no) not in users: raise HTTPException(status_code=404, detail="User not found") del users[str(user_no)] _flush_store() write_client_xml(client_id, keyrec) return