A24_Patriot_API/xml_combine_for_import.py

434 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
import xml.etree.ElementTree as ET
from pathlib import Path
import logging
import os
import copy
import time
import socket
from datetime import datetime, timedelta
from io import BytesIO
from smb.SMBConnection import SMBConnection # pip install pysmb
import json
# Persistent JSON file tracking per-client import status; written by
# mark_clients_pending_import() and read back on subsequent runs.
CLIENT_STATE_FILE = Path("/opt/patriot_api/client_state.json")
os.makedirs(CLIENT_STATE_FILE.parent, exist_ok=True)  # ensure /opt/patriot_api exists before first write
def load_client_state() -> dict:
    """Load the persisted per-client import state.

    Returns an empty dict when the state file does not exist or cannot be
    read/parsed; read failures are logged, never raised.
    """
    if not CLIENT_STATE_FILE.is_file():
        return {}
    try:
        return json.loads(CLIENT_STATE_FILE.read_text(encoding="utf-8"))
    except Exception as exc:
        logging.error("Failed to read client state file %s: %s", CLIENT_STATE_FILE, exc)
        return {}
def save_client_state(state: dict):
    """Atomically persist *state* as pretty-printed JSON.

    Writes to a ``.tmp`` sibling first, then renames over CLIENT_STATE_FILE
    so readers never observe a half-written file. Failures are logged only.
    """
    try:
        payload = json.dumps(state, indent=2, ensure_ascii=False)
        scratch = CLIENT_STATE_FILE.with_suffix(".tmp")
        scratch.write_text(payload, encoding="utf-8")
        os.replace(scratch, CLIENT_STATE_FILE)
    except Exception as exc:
        logging.error("Failed to write client state file %s: %s", CLIENT_STATE_FILE, exc)
def mark_clients_pending_import(client_ids: list[str]):
    """Record every id in *client_ids* as status ``pending_import``.

    Each entry also gets the current batch timestamp under ``last_batch``;
    the merged state is persisted via save_client_state(). No-op for an
    empty id list.
    """
    if not client_ids:
        return
    stamp = datetime.now().isoformat(timespec="seconds")
    state = load_client_state()
    state.update(
        (cid, {"status": "pending_import", "last_batch": stamp})
        for cid in client_ids
    )
    save_client_state(state)
# --------- CONFIG ---------
XML_ROOT_PATH = Path("/opt/patriot_api/out/clients") # per-client XMLs (from main_v2)
READY_DIR = Path("/opt/patriot_api/ready_for_import") # combined XML output
COMBINED_FILENAME = "clients.xml"
LOG_FILE = "/opt/patriot_api/xml_combine.log"
ERROR_LOG_FILE = "/opt/patriot_api/import_errors.log"
# Scheduling is handled by sleep_until_next_run(), which fires 1 minute
# before every full hour (hh:59), so no fixed RUN_INTERVAL is needed.
RUN_INTERVAL = 3600 # still used as a safety fallback if needed
MAX_CLIENTS_PER_RUN = 300  # cap on per-client XMLs merged in a single run
# SMB / Windows share config (no kernel mount needed)
SMB_ENABLED = True
SMB_SERVER_IP = "10.181.149.83" # Windows server IP
SMB_SERVER_NAME = "PATRIOT" # NetBIOS/hostname (can be anything if IP is used, but fill something)
SMB_SHARE_NAME = "api_import" # Share name (from //IP/share)
# NOTE(review): earlier comments described guest access (empty user/password),
# but real credentials are configured below.
# SECURITY: plaintext credentials committed in source — move these to an
# environment variable or a secrets file outside version control.
SMB_USERNAME = "administrator"
SMB_PASSWORD = "wprs100qq!"
SMB_DOMAIN = "WORKGROUP" # often empty for guest
# Remote path inside the share where clients.xml will be stored
# e.g. "clients/clients.xml" or just "clients.xml"
SMB_REMOTE_PATH = "clients.xml"
SMB_RESULTS_FILENAME = "clients_Import_Results.txt"
# --------------------------
# Make sure output and log directories exist before any handler opens them.
os.makedirs(READY_DIR, exist_ok=True)
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
os.makedirs(os.path.dirname(ERROR_LOG_FILE), exist_ok=True)
# Log simultaneously to LOG_FILE and to the console (StreamHandler -> stderr).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
def log_missing_ids_to_error_log(missing_ids: list[str]):
    """Append one timestamped line per missing/failed client id to ERROR_LOG_FILE.

    No-op for an empty list; write failures are logged, never raised.
    """
    if not missing_ids:
        return
    try:
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        lines = [
            f"{stamp} Missing or failed import for client __id={cid}\n"
            for cid in missing_ids
        ]
        with open(ERROR_LOG_FILE, "a", encoding="utf-8") as fh:
            fh.writelines(lines)
    except Exception as exc:
        logging.error("Failed to write to error log %s: %s", ERROR_LOG_FILE, exc)
def load_combined_root(path: Path) -> ET.Element:
    """Return the <Clients> root of an existing combined XML file.

    A fresh empty <Clients> element is returned when the file is missing,
    cannot be parsed, or has an unexpected root tag; each case is logged.
    """
    if not path.is_file():
        logging.info("Combined XML %s does not exist yet, creating new.", path)
        return ET.Element("Clients")
    try:
        parsed = ET.parse(path).getroot()
    except Exception as exc:
        logging.error("Failed to parse existing combined XML %s: %s; recreating.", path, exc)
        return ET.Element("Clients")
    if parsed.tag != "Clients":
        logging.warning(
            "Combined XML %s has root <%s> instead of <Clients>; recreating.",
            path,
            parsed.tag,
        )
        return ET.Element("Clients")
    return parsed
def rows_by_id(root: ET.Element) -> dict[str, ET.Element]:
    """Index direct <Row> children of *root* by their trimmed <__id> text.

    Rows without a non-empty <__id> are skipped; when the same id appears
    twice the later row wins.
    """
    index: dict[str, ET.Element] = {}
    for entry in root.findall("Row"):
        marker = entry.find("__id")
        text = (marker.text or "").strip() if marker is not None else ""
        if text:
            index[text] = entry
    return index
def _smb_remote_paths():
    """Derive remote path pieces from SMB_REMOTE_PATH.

    Returns (remote_dir, remote_name, remote_clients_path, remote_results_path)
    where the last two are share-relative paths of the existing clients.xml
    and of the import-results text file.
    """
    remote_dir, remote_name = os.path.split(SMB_REMOTE_PATH)
    if not remote_dir:
        remote_dir = "/"
    if remote_dir in ("", "/"):
        remote_clients_path = remote_name
        remote_results_path = SMB_RESULTS_FILENAME
    else:
        rd = remote_dir.rstrip("/")
        remote_clients_path = f"{rd}/{remote_name}"
        remote_results_path = f"{rd}/{SMB_RESULTS_FILENAME}"
    return remote_dir, remote_name, remote_clients_path, remote_results_path


def _pre_upload_results_check(conn, remote_clients_path, remote_results_path):
    """Cross-check the previous batch on the share before it is overwritten.

    Downloads the existing clients.xml and results file (skipping the check
    if either is missing) and verifies every __id from that XML appears on a
    results line containing "Completed processing client". IDs without such
    a line are appended to ERROR_LOG_FILE. Never raises; failures are logged.
    """
    try:
        xml_buf = BytesIO()
        res_buf = BytesIO()
        # Try to retrieve both files; if either is missing, skip the check
        try:
            conn.retrieveFile(SMB_SHARE_NAME, remote_clients_path, xml_buf)
            conn.retrieveFile(SMB_SHARE_NAME, remote_results_path, res_buf)
            xml_buf.seek(0)
            res_buf.seek(0)
        except Exception as e:
            logging.info(
                "SMB pre-upload check: could not retrieve existing clients.xml or "
                "results file (may not exist yet): %s", e
            )
            return
        # Parse existing clients.xml to gather IDs
        try:
            root_remote = ET.parse(xml_buf).getroot()
            remote_ids = set()
            for row in root_remote.findall(".//Row"):
                id_el = row.find("__id")
                if id_el is not None and id_el.text and id_el.text.strip():
                    remote_ids.add(id_el.text.strip())
        except Exception as e:
            logging.error("SMB pre-upload check: failed to parse remote clients.xml: %s", e)
            remote_ids = set()
        # Read results txt lines; decode leniently since the file's
        # encoding is controlled by the Windows import side.
        results_lines = res_buf.getvalue().decode("utf-8", errors="ignore").splitlines()
        missing_ids = [
            cid
            for cid in remote_ids
            if not any(
                "Completed processing client" in line and cid in line
                for line in results_lines
            )
        ]
        if missing_ids:
            log_missing_ids_to_error_log(missing_ids)
            logging.warning(
                "SMB pre-upload check: %d client(s) from existing clients.xml "
                "do not have 'Completed processing client' result lines. Logged to %s.",
                len(missing_ids),
                ERROR_LOG_FILE,
            )
        elif remote_ids:
            logging.info(
                "SMB pre-upload check: all %d client IDs in existing clients.xml "
                "appear as 'Completed processing client' in results file.",
                len(remote_ids),
            )
    except Exception as e:
        logging.error("SMB: unexpected error during pre-upload check: %s", e)


def _rotate_existing_remote(conn, remote_dir, remote_name):
    """Rename an existing remote clients.xml to clients_<timestamp>.xml.

    Keeps the previous batch around on the share. Rename failures are
    logged, not raised, so the subsequent upload still proceeds.
    """
    try:
        files = conn.listPath(SMB_SHARE_NAME, remote_dir, pattern=remote_name)
        exists = any(f.filename == remote_name for f in files)
    except Exception:
        exists = False
    if not exists:
        return
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    base, ext = os.path.splitext(remote_name)
    backup_name = f"{base}_{ts}{ext}"
    if remote_dir in ("", "/"):
        old_path = remote_name
        new_path = backup_name
    else:
        old_path = f"{remote_dir.rstrip('/')}/{remote_name}"
        new_path = f"{remote_dir.rstrip('/')}/{backup_name}"
    try:
        conn.rename(SMB_SHARE_NAME, old_path, new_path)
        logging.info("SMB: existing %s renamed to %s", old_path, new_path)
    except Exception as e:
        logging.error("SMB: failed to rename existing %s: %s", old_path, e)


def upload_to_smb(local_path: Path) -> bool:
    """
    Upload the combined XML to the Windows share using SMB.
    Before uploading:
    - If an existing clients.xml and clients_Import_Results.txt are present
      on the SMB share, verify that each __id in that clients.xml has a line
      containing that id and the phrase "Completed processing client" in the
      results file.
    - Any IDs not matching are logged to ERROR_LOG_FILE.
    - An existing remote clients.xml is then renamed with a timestamp suffix
      so the previous batch is preserved.
    Returns:
    True -> upload considered successful (or SMB disabled)
    False -> upload failed, caller should NOT delete source XMLs
    """
    if not SMB_ENABLED:
        logging.info("SMB upload disabled in config; skipping upload but treating as success.")
        return True
    if not local_path.is_file():
        logging.error("SMB upload: local file %s does not exist", local_path)
        return False
    conn = None
    try:
        my_name = socket.gethostname() or "patriot-api"
        conn = SMBConnection(
            SMB_USERNAME,
            SMB_PASSWORD,
            my_name,
            SMB_SERVER_NAME,
            domain=SMB_DOMAIN,
            use_ntlm_v2=False,  # set True if you move to proper user+password auth
            is_direct_tcp=True,  # connect directly to port 445
        )
        logging.info("SMB: connecting to %s (%s)...", SMB_SERVER_NAME, SMB_SERVER_IP)
        if not conn.connect(SMB_SERVER_IP, 445, timeout=10):
            logging.error("SMB: failed to connect to %s", SMB_SERVER_IP)
            return False
        remote_dir, remote_name, remote_clients_path, remote_results_path = _smb_remote_paths()
        # -------- PRE-UPLOAD CHECK vs clients_Import_Results.txt --------
        _pre_upload_results_check(conn, remote_clients_path, remote_results_path)
        # -------- HANDLE EXISTING clients.xml (rotate/rename) --------
        _rotate_existing_remote(conn, remote_dir, remote_name)
        # -------- UPLOAD NEW clients.xml --------
        with open(local_path, "rb") as f:
            logging.info(
                "SMB: uploading %s to //%s/%s/%s",
                local_path,
                SMB_SERVER_IP,
                SMB_SHARE_NAME,
                SMB_REMOTE_PATH,
            )
            conn.storeFile(SMB_SHARE_NAME, SMB_REMOTE_PATH, f)
        logging.info("SMB: upload completed successfully.")
        return True
    except Exception as e:
        logging.error("SMB: error during upload of %s: %s", local_path, e)
        return False
    finally:
        # BUGFIX: the connection was previously closed only on the full
        # success path, leaking the SMB session whenever any step after
        # connect() raised or failed. Always close it here.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def combine_xml_once():
    """One merge cycle: collect per-client XMLs into a combined clients.xml,
    upload it via SMB, mark included clients pending_import, and delete the
    processed source files only after a successful upload.
    """
    combined_path = READY_DIR / COMBINED_FILENAME
    # Scan all per-client XMLs
    xml_files = sorted(XML_ROOT_PATH.rglob("*.xml")) # sorted for deterministic order
    # Track files to delete after upload
    processed_files = []
    combined_rows: dict[str, ET.Element] = {}
    if xml_files:
        logging.info("Found %d new per-client XML file(s) to merge.", len(xml_files))
        # Limit how many XMLs (clients) we handle per run
        limited_files = xml_files[:MAX_CLIENTS_PER_RUN]
        if len(xml_files) > MAX_CLIENTS_PER_RUN:
            logging.info(
                "Limiting this run to %d clients; %d will be processed in later runs.",
                MAX_CLIENTS_PER_RUN,
                len(xml_files) - MAX_CLIENTS_PER_RUN,
            )
        for path in limited_files:
            logging.info(" Reading %s", path)
            try:
                tree = ET.parse(path)
                root = tree.getroot()
            except Exception as e:
                logging.error(" Failed to parse %s: %s", path, e)
                # Keep previous behavior: delete bad file
                processed_files.append(path)
                continue
            processed_files.append(path)
            # Extract rows: accept a <Clients> wrapper, a bare <Row> root,
            # or any other root containing nested <Row> elements.
            if root.tag == "Clients":
                rows = root.findall("Row")
            elif root.tag == "Row":
                rows = [root]
            else:
                rows = root.findall(".//Row")
            for row in rows:
                id_el = row.find("__id")
                if id_el is None or not (id_el.text or "").strip():
                    logging.warning(" Skipping row without __id in %s", path)
                    continue
                cid = id_el.text.strip()
                logging.info(" Including client __id=%s", cid)
                # deepcopy so the row stays valid after its source tree is
                # dropped; a later file with the same __id overwrites earlier.
                combined_rows[cid] = copy.deepcopy(row)
    else:
        logging.info("No NEW client XMLs found locally — will upload an empty Clients file.")
    # Build XML root (may be empty)
    new_root = ET.Element("Clients")
    for cid, row in combined_rows.items():
        new_root.append(row)
    # Always write combined XML (new or empty)
    tmp_path = combined_path.with_suffix(".tmp")
    try:
        ET.ElementTree(new_root).write(tmp_path, encoding="utf-8", xml_declaration=True)
        os.replace(tmp_path, combined_path)  # atomic swap into place
        logging.info("Wrote combined (or empty) clients.xml to %s", combined_path)
    except Exception as e:
        logging.error("Failed writing combined XML: %s", e)
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        return
    # Rotate & upload to SMB
    upload_ok = upload_to_smb(combined_path)
    if not upload_ok:
        logging.warning(
            "SMB upload failed; keeping per-client XMLs so this batch can be retried next run."
        )
        return
    # Mark all combined client IDs as pending_import
    try:
        client_ids_in_batch = list(combined_rows.keys()) # these are the __id values
        mark_clients_pending_import(client_ids_in_batch)
        logging.info(
            "Marked %d client(s) as pending_import in %s",
            len(client_ids_in_batch),
            CLIENT_STATE_FILE,
        )
    except Exception as e:
        logging.error("Failed to update client pending_import state: %s", e)
    # Delete ONLY the processed new XMLs if upload succeeded
    for src in processed_files:
        try:
            src.unlink()
            logging.info("Deleted processed source XML: %s", src)
        except Exception as e:
            logging.error("Failed to delete %s: %s", src, e)
def sleep_until_next_run():
    """Block until 1 minute before the next full hour (hh:59)."""
    now = datetime.now()
    # hh:59 of the current window = top of the next hour minus one minute.
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    run_time = next_hour - timedelta(minutes=1)
    if run_time <= now:
        # Already inside the final minute (e.g. started 10:59:30):
        # push the target one full hour out, still at hh:59.
        run_time = next_hour + timedelta(hours=1) - timedelta(minutes=1)
    delta = (run_time - now).total_seconds()
    logging.info("Next combine run scheduled at %s (in %.0f seconds)", run_time, delta)
    time.sleep(delta)
def main():
    """Run the combine/upload cycle forever, once per hour at hh:59."""
    while True:
        sleep_until_next_run()
        logging.info("==== XML combine run ====")
        combine_xml_once()
        # Safety fallback: if anything goes wrong with time calc, we still avoid tight loop
        time.sleep(1)


if __name__ == "__main__":
    main()