#!/usr/bin/env python3

import copy
import json
import logging
import os
import socket
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path

from smb.SMBConnection import SMBConnection  # pip install pysmb

CLIENT_STATE_FILE = Path("/opt/patriot_api/client_state.json")
os.makedirs(CLIENT_STATE_FILE.parent, exist_ok=True)


def load_client_state() -> dict:
    """Load the per-client import state from CLIENT_STATE_FILE (empty dict on failure)."""
    if CLIENT_STATE_FILE.is_file():
        try:
            return json.loads(CLIENT_STATE_FILE.read_text(encoding="utf-8"))
        except Exception as e:
            logging.error("Failed to read client state file %s: %s", CLIENT_STATE_FILE, e)
            return {}
    return {}


def save_client_state(state: dict):
    """Atomically write the state dict to CLIENT_STATE_FILE via a temp file."""
    try:
        tmp = CLIENT_STATE_FILE.with_suffix(".tmp")
        tmp.write_text(json.dumps(state, indent=2, ensure_ascii=False), encoding="utf-8")
        os.replace(tmp, CLIENT_STATE_FILE)
    except Exception as e:
        logging.error("Failed to write client state file %s: %s", CLIENT_STATE_FILE, e)


def mark_clients_pending_import(client_ids: list[str]):
    """Record every uploaded client ID as pending_import with a batch timestamp."""
    if not client_ids:
        return
    state = load_client_state()
    now_ts = datetime.now().isoformat(timespec="seconds")
    for cid in client_ids:
        state[cid] = {
            "status": "pending_import",
            "last_batch": now_ts,
        }
    save_client_state(state)
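

# For reference, after mark_clients_pending_import(["1001", "1002"]) the state
# file would contain roughly the following (IDs and timestamp illustrative):
#
#   {
#     "1001": {"status": "pending_import", "last_batch": "2025-01-01T10:59:00"},
#     "1002": {"status": "pending_import", "last_batch": "2025-01-01T10:59:00"}
#   }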


# --------- CONFIG ---------

XML_ROOT_PATH = Path("/opt/patriot_api/out/clients")   # per-client XMLs (from main_v2)
READY_DIR = Path("/opt/patriot_api/ready_for_import")  # combined XML output
COMBINED_FILENAME = "clients.xml"
LOG_FILE = "/opt/patriot_api/xml_combine.log"
ERROR_LOG_FILE = "/opt/patriot_api/import_errors.log"

# The script schedules itself 1 minute before every full hour (hh:59, see
# sleep_until_next_run), so no fixed RUN_INTERVAL is needed.
RUN_INTERVAL = 3600  # still used as a safety fallback if needed
MAX_CLIENTS_PER_RUN = 300

# SMB / Windows share config (no kernel mount needed)
SMB_ENABLED = True

SMB_SERVER_IP = "10.181.149.83"  # Windows server IP
SMB_SERVER_NAME = "PATRIOT"      # NetBIOS/hostname (can be anything when connecting by IP, but fill something in)
SMB_SHARE_NAME = "api_import"    # share name (from //IP/share)

# For guest access leave username/password empty (and keep use_ntlm_v2=False);
# real credentials are configured below. Consider loading the password from an
# environment variable instead of hardcoding it here.
SMB_USERNAME = "administrator"  # empty = guest
SMB_PASSWORD = "wprs100qq!"     # empty = guest
SMB_DOMAIN = "WORKGROUP"        # often empty for guest

# Remote path inside the share where clients.xml will be stored,
# e.g. "clients/clients.xml" or just "clients.xml"
SMB_REMOTE_PATH = "clients.xml"
SMB_RESULTS_FILENAME = "clients_Import_Results.txt"

# --------------------------
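
# With the values above, the combined file is written to
# \\10.181.149.83\api_import\clients.xml and the import results are read back
# from \\10.181.149.83\api_import\clients_Import_Results.txt.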

os.makedirs(READY_DIR, exist_ok=True)
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
os.makedirs(os.path.dirname(ERROR_LOG_FILE), exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)


def log_missing_ids_to_error_log(missing_ids: list[str]):
    """Append one timestamped line per missing/failed client ID to ERROR_LOG_FILE."""
    if not missing_ids:
        return
    try:
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(ERROR_LOG_FILE, "a", encoding="utf-8") as f:
            for cid in missing_ids:
                f.write(f"{ts} Missing or failed import for client __id={cid}\n")
    except Exception as e:
        logging.error("Failed to write to error log %s: %s", ERROR_LOG_FILE, e)


def load_combined_root(path: Path) -> ET.Element:
    """Return the <Clients> root of an existing combined XML, or a fresh element.

    Note: combine_xml_once currently builds a new root from scratch, so this
    helper is unused; it is kept for merging into an existing combined file.
    """
    if not path.is_file():
        logging.info("Combined XML %s does not exist yet, creating new.", path)
        return ET.Element("Clients")

    try:
        tree = ET.parse(path)
        root = tree.getroot()
        if root.tag != "Clients":
            logging.warning(
                "Combined XML %s has root <%s> instead of <Clients>; recreating.",
                path,
                root.tag,
            )
            return ET.Element("Clients")
        return root
    except Exception as e:
        logging.error("Failed to parse existing combined XML %s: %s; recreating.", path, e)
        return ET.Element("Clients")


def rows_by_id(root: ET.Element) -> dict[str, ET.Element]:
    """Map each <Row>'s __id text to its element; rows without an __id are skipped.

    Note: like load_combined_root, currently unused; kept for merge workflows.
    """
    mapping = {}
    for row in root.findall("Row"):
        id_el = row.find("__id")
        if id_el is None or not (id_el.text or "").strip():
            continue
        cid = id_el.text.strip()
        mapping[cid] = row
    return mapping
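

# For reference, a per-client XML as consumed here and by combine_xml_once looks
# roughly like this (fields other than __id are illustrative):
#
#   <Clients>
#     <Row>
#       <__id>1001</__id>
#       <Name>Example Client</Name>
#     </Row>
#   </Clients>
#
# rows_by_id() on that document's root would return {"1001": <the Row element>}.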


def upload_to_smb(local_path: Path) -> bool:
    """
    Upload the combined XML to the Windows share over SMB.

    Before uploading:
      - If an existing clients.xml and clients_Import_Results.txt are present
        on the SMB share, verify that each __id in that clients.xml appears in
        a results line containing "Completed processing client".
      - Any IDs without such a line are logged to ERROR_LOG_FILE.

    Returns:
        True  -> upload considered successful (or SMB disabled)
        False -> upload failed; the caller should NOT delete the source XMLs
    """
    if not SMB_ENABLED:
        logging.info("SMB upload disabled in config; skipping upload but treating as success.")
        return True

    if not local_path.is_file():
        logging.error("SMB upload: local file %s does not exist", local_path)
        return False

    try:
        my_name = socket.gethostname() or "patriot-api"
        conn = SMBConnection(
            SMB_USERNAME,
            SMB_PASSWORD,
            my_name,
            SMB_SERVER_NAME,
            domain=SMB_DOMAIN,
            use_ntlm_v2=False,   # set True when moving to proper user+password auth
            is_direct_tcp=True,  # connect directly to port 445
        )

        logging.info("SMB: connecting to %s (%s)...", SMB_SERVER_NAME, SMB_SERVER_IP)
        if not conn.connect(SMB_SERVER_IP, 445, timeout=10):
            logging.error("SMB: failed to connect to %s", SMB_SERVER_IP)
            return False

        # Split the remote path into directory and filename
        remote_dir, remote_name = os.path.split(SMB_REMOTE_PATH)
        if not remote_dir:
            remote_dir = "/"

        # Build full paths for the existing clients.xml and the results file
        if remote_dir in ("", "/"):
            remote_clients_path = remote_name
            remote_results_path = SMB_RESULTS_FILENAME
        else:
            rd = remote_dir.rstrip("/")
            remote_clients_path = f"{rd}/{remote_name}"
            remote_results_path = f"{rd}/{SMB_RESULTS_FILENAME}"

        # -------- PRE-UPLOAD CHECK vs clients_Import_Results.txt --------
        try:
            xml_buf = BytesIO()
            res_buf = BytesIO()

            # Try to retrieve both files; if either is missing, skip the check
            try:
                conn.retrieveFile(SMB_SHARE_NAME, remote_clients_path, xml_buf)
                conn.retrieveFile(SMB_SHARE_NAME, remote_results_path, res_buf)
                xml_buf.seek(0)
                res_buf.seek(0)
            except Exception as e:
                logging.info(
                    "SMB pre-upload check: could not retrieve existing clients.xml or "
                    "results file (may not exist yet): %s", e
                )
            else:
                # Parse the existing clients.xml to gather IDs
                try:
                    tree_remote = ET.parse(xml_buf)
                    root_remote = tree_remote.getroot()

                    remote_ids = set()
                    for row in root_remote.findall(".//Row"):
                        id_el = row.find("__id")
                        if id_el is not None and id_el.text and id_el.text.strip():
                            remote_ids.add(id_el.text.strip())
                except Exception as e:
                    logging.error("SMB pre-upload check: failed to parse remote clients.xml: %s", e)
                    remote_ids = set()

                # Read the results file line by line
                results_lines = res_buf.getvalue().decode("utf-8", errors="ignore").splitlines()

                missing_ids = []
                for cid in remote_ids:
                    found_success = False
                    for line in results_lines:
                        # Deliberately generic check: the line must contain both the
                        # id and the phrase "Completed processing client"
                        if "Completed processing client" in line and cid in line:
                            found_success = True
                            break
                    if not found_success:
                        missing_ids.append(cid)

                if missing_ids:
                    log_missing_ids_to_error_log(missing_ids)
                    logging.warning(
                        "SMB pre-upload check: %d client(s) from existing clients.xml "
                        "do not have 'Completed processing client' result lines. Logged to %s.",
                        len(missing_ids),
                        ERROR_LOG_FILE,
                    )
                elif remote_ids:
                    logging.info(
                        "SMB pre-upload check: all %d client IDs in existing clients.xml "
                        "appear as 'Completed processing client' in results file.",
                        len(remote_ids),
                    )
        except Exception as e:
            logging.error("SMB: unexpected error during pre-upload check: %s", e)

        # -------- HANDLE EXISTING clients.xml (rotate/rename) --------
        try:
            files = conn.listPath(SMB_SHARE_NAME, remote_dir, pattern=remote_name)
            exists = any(f.filename == remote_name for f in files)
        except Exception:
            exists = False

        if exists:
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            base, ext = os.path.splitext(remote_name)
            backup_name = f"{base}_{ts}{ext}"
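            # e.g. clients.xml -> clients_20250101_105900.xml (timestamp illustrative)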

            if remote_dir in ("", "/"):
                old_path = remote_name
                new_path = backup_name
            else:
                old_path = f"{remote_dir.rstrip('/')}/{remote_name}"
                new_path = f"{remote_dir.rstrip('/')}/{backup_name}"

            try:
                conn.rename(SMB_SHARE_NAME, old_path, new_path)
                logging.info("SMB: existing %s renamed to %s", old_path, new_path)
            except Exception as e:
                logging.error("SMB: failed to rename existing %s: %s", old_path, e)

        # -------- UPLOAD NEW clients.xml --------
        with open(local_path, "rb") as f:
            logging.info(
                "SMB: uploading %s to //%s/%s/%s",
                local_path,
                SMB_SERVER_IP,
                SMB_SHARE_NAME,
                SMB_REMOTE_PATH,
            )
            conn.storeFile(SMB_SHARE_NAME, SMB_REMOTE_PATH, f)

        logging.info("SMB: upload completed successfully.")
        conn.close()
        return True

    except Exception as e:
        logging.error("SMB: error during upload of %s: %s", local_path, e)
        return False


def combine_xml_once():
    """Merge per-client XMLs into one clients.xml, upload it, then clean up."""
    combined_path = READY_DIR / COMBINED_FILENAME

    # Scan all per-client XMLs (sorted for deterministic order)
    xml_files = sorted(XML_ROOT_PATH.rglob("*.xml"))

    # Files to delete after a successful upload
    processed_files = []

    combined_rows: dict[str, ET.Element] = {}

    if xml_files:
        logging.info("Found %d new per-client XML file(s) to merge.", len(xml_files))

        # Limit how many XMLs (clients) we handle per run
        limited_files = xml_files[:MAX_CLIENTS_PER_RUN]
        if len(xml_files) > MAX_CLIENTS_PER_RUN:
            logging.info(
                "Limiting this run to %d clients; %d will be processed in later runs.",
                MAX_CLIENTS_PER_RUN,
                len(xml_files) - MAX_CLIENTS_PER_RUN,
            )

        for path in limited_files:
            logging.info(" Reading %s", path)
            try:
                tree = ET.parse(path)
                root = tree.getroot()
            except Exception as e:
                logging.error(" Failed to parse %s: %s", path, e)
                # Keep previous behavior: delete the bad file
                processed_files.append(path)
                continue

            processed_files.append(path)

            # Extract rows, whatever shape the file has
            if root.tag == "Clients":
                rows = root.findall("Row")
            elif root.tag == "Row":
                rows = [root]
            else:
                rows = root.findall(".//Row")

            for row in rows:
                id_el = row.find("__id")
                if id_el is None or not (id_el.text or "").strip():
                    logging.warning(" Skipping row without __id in %s", path)
                    continue
                cid = id_el.text.strip()
                logging.info(" Including client __id=%s", cid)
                combined_rows[cid] = copy.deepcopy(row)

    else:
        logging.info("No NEW client XMLs found locally; will upload an empty Clients file.")

    # Build the XML root (may be empty)
    new_root = ET.Element("Clients")
    for row in combined_rows.values():
        new_root.append(row)

    # Always write the combined XML (new or empty)
    tmp_path = combined_path.with_suffix(".tmp")
    try:
        ET.ElementTree(new_root).write(tmp_path, encoding="utf-8", xml_declaration=True)
        os.replace(tmp_path, combined_path)
        logging.info("Wrote combined (or empty) clients.xml to %s", combined_path)
    except Exception as e:
        logging.error("Failed writing combined XML: %s", e)
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        return

    # Rotate the remote copy & upload via SMB
    upload_ok = upload_to_smb(combined_path)

    if not upload_ok:
        logging.warning(
            "SMB upload failed; keeping per-client XMLs so this batch can be retried next run."
        )
        return

    # Mark all combined client IDs (the __id values) as pending_import
    try:
        client_ids_in_batch = list(combined_rows.keys())
        mark_clients_pending_import(client_ids_in_batch)
        logging.info(
            "Marked %d client(s) as pending_import in %s",
            len(client_ids_in_batch),
            CLIENT_STATE_FILE,
        )
    except Exception as e:
        logging.error("Failed to update client pending_import state: %s", e)

    # Delete ONLY the processed XMLs, and only because the upload succeeded
    for src in processed_files:
        try:
            src.unlink()
            logging.info("Deleted processed source XML: %s", src)
        except Exception as e:
            logging.error("Failed to delete %s: %s", src, e)


def sleep_until_next_run():
    """Sleep until 1 minute before the next full hour (hh:59)."""
    now = datetime.now()

    # Top of the next hour
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    run_time = next_hour - timedelta(minutes=1)  # hh:59

    # If we are already past run_time (e.g., started at 10:59:30), move to the next hour
    if run_time <= now:
        next_hour = next_hour + timedelta(hours=1)
        run_time = next_hour - timedelta(minutes=1)  # still hh:59, just an hour later

    delta = (run_time - now).total_seconds()
    logging.info("Next combine run scheduled at %s (in %.0f seconds)", run_time, delta)
    time.sleep(delta)
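

# Worked example: starting at 10:30:00, run_time is 10:59:00 and the sleep is
# 1740 seconds; starting at 10:59:30, run_time 10:59 has already passed, so the
# next run is scheduled for 11:59:00.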


def main():
    while True:
        sleep_until_next_run()
        logging.info("==== XML combine run ====")
        combine_xml_once()
        # Safety fallback: if the time calculation ever misbehaves, avoid a tight loop
        time.sleep(1)


if __name__ == "__main__":
    main()
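

# Deployment note: the script schedules itself, so an always-restart service is
# enough. A minimal systemd unit sketch, assuming the script is saved as
# /opt/patriot_api/xml_combine.py (hypothetical path):
#
#   [Unit]
#   Description=Patriot API XML combine/upload
#   After=network-online.target
#
#   [Service]
#   ExecStart=/usr/bin/python3 /opt/patriot_api/xml_combine.py
#   Restart=always
#   RestartSec=10
#
#   [Install]
#   WantedBy=multi-user.target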