"""Asyncio TCP receiver for SIA-DCS / ADM-CID alarm frames.

Each frame is ``<LF><CRC:4 hex><LEN:4 hex><payload><CR>``.  Valid frames are
parsed, stored in the ``signals`` table and acknowledged; anything else is
answered with a NACK frame.
"""

import asyncio
import configparser
import json
import logging
import re
import sys
from datetime import datetime
from typing import Optional

import pymysql
from crccheck.crc import CrcArc

CONFIG_PATH = "/app/config.ini"

config = configparser.ConfigParser()
config.read(CONFIG_PATH)

# Fall back to INFO instead of crashing at import time when the config file,
# the [log] section or the ``level`` option is missing.
loglevel_str = config.get("log", "level", fallback="INFO").upper()
loglevel = getattr(logging, loglevel_str, logging.INFO)
if not isinstance(loglevel, int):
    # e.g. "BASICCONFIG" resolves to a function, not a level.
    loglevel = logging.INFO
logging.basicConfig(level=loglevel)

# Exactly four hex digits; int(x, 16) alone would also accept "+1F", " 1F ",
# "0x1F" or "1_F", none of which are valid frame fields.
_HEX4_RE = re.compile(r'[0-9A-Fa-f]{4}')

# "<id>"<seq>[R<rcvr>]L<line>#<account>.  The receiver field is optional.
# NOTE(review): line and account are matched as decimal digits only, as the
# original code did; confirm whether hex digits must be accepted as well.
_HEADER_RE = re.compile(r'"([^"]+)"(\d{4})(?:R[0-9A-Fa-f]*)?L(\d+)#(\d+)')
_SIA_EVENT_RE = re.compile(r'\[\#?(\d+)\|([^\]]+)\]')
_SIA_NOSLASH_RE = re.compile(r'(Nri\d+)([A-Z]{2}\d+.*)')
_SIA_CODE_RE = re.compile(r'([A-Z]{2})(\d+)')
_SIA_PARTITION_RE = re.compile(r'Nri(\d+)')
_SIA_TEXT_RE = re.compile(r'\^([^^]+)\^')
_CID_EVENT_RE = re.compile(r'\[#?(\d+)\|([A-Z0-9]{3,4}) (\d{2}) (\d{3})\]')
_V_RE = re.compile(r'\[V([^\]]*)\]')
_X_RE = re.compile(r'\[X([^\]]+)\]')
_Y_RE = re.compile(r'\[Y([^\]]+)\]')
_TIME_RE = re.compile(r'_(\d{2}:\d{2}:\d{2}),(\d{2}-\d{2}-\d{4})')

_INSERT_SIGNAL_SQL = """
    INSERT INTO signals (
        protocol, raw_message, account, sequence, line_number,
        event_code, `partition`, zone, signal_time, source_ip,
        signal_text, x, y, v
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""


def verify_message(message: bytes) -> tuple[bool, Optional[str]]:
    """
    Verify the message format and CRC.

    Layout: byte 0 is LF, bytes 1-4 the CRC (hex), bytes 5-8 the payload
    length (hex), then the payload, then a trailing CR.

    Returns (True, payload_str) if valid, else (False, None).
    """
    if len(message) < 10:
        logging.info("Message too short")
        return False, None

    if message[0] != 10 or message[-1] != 13:
        logging.info("Message must start with LF (\\n) and end with CR (\\r)")
        return False, None

    crc_bytes = message[1:5]      # 4 hex chars
    length_bytes = message[5:9]   # 4 hex chars
    data_bytes = message[9:-1]

    try:
        crc_str = crc_bytes.decode('ascii')
        length_str = length_bytes.decode('ascii')
        payload_str = data_bytes.decode('ascii')
    except UnicodeDecodeError:
        logging.info("Non-ASCII characters in message")
        return False, None

    if not (_HEX4_RE.fullmatch(crc_str) and _HEX4_RE.fullmatch(length_str)):
        logging.info("CRC or length not hex")
        return False, None
    crc_recv = int(crc_str, 16)
    length_recv = int(length_str, 16)

    if length_recv != len(data_bytes):
        logging.info(f"Length mismatch: declared {length_recv}, actual {len(data_bytes)}")
        return False, None

    crc_calc = CrcArc.calc(data_bytes)
    if crc_calc != crc_recv:
        logging.info(f"CRC mismatch: received {crc_str} calculated {crc_calc:04X}")
        return False, None

    logging.info(f"Valid message received: {payload_str}")
    return True, payload_str


def _parse_sia_dcs_event(payload: str, result: dict) -> None:
    """Fill *result* from the SIA-DCS event block ``[account|receiver/event]``."""
    event_match = _SIA_EVENT_RE.search(payload)
    if not event_match:
        return

    result["account_verify"] = event_match.group(1)
    receiver_event = event_match.group(2)

    # The block may come with or without a slash between the two parts.
    if "/" in receiver_event:
        receiver, event = receiver_event.split("/", 1)
    else:
        split_match = _SIA_NOSLASH_RE.match(receiver_event)
        if split_match:
            receiver, event = split_match.groups()
        else:
            receiver, event = "", receiver_event  # fallback

    result["receiver"] = receiver

    # Code and zone from the event (e.g. OP01, BA05).
    code_match = _SIA_CODE_RE.match(event)
    if code_match:
        result["code"] = code_match.group(1)
        result["zone"] = code_match.group(2)

    # Partition from the receiver part (e.g. Nri1 → 1).
    partition_match = _SIA_PARTITION_RE.match(receiver)
    if partition_match:
        result["partition"] = partition_match.group(1)

    # Optional signal text (e.g. ^Fire alarm triggered^).
    text_match = _SIA_TEXT_RE.search(event)
    if text_match:
        result["signal_text"] = text_match.group(1)


def _parse_adm_cid_event(payload: str, result: dict) -> None:
    """Fill *result* from the ADM-CID event block ``[account|code partition zone]``."""
    event_match = _CID_EVENT_RE.search(payload)
    if not event_match:
        return
    result["account_verify"] = event_match.group(1)
    result["code"] = event_match.group(2)
    result["partition"] = event_match.group(3)
    result["zone"] = event_match.group(4)


def parse_sia_payload(payload: str) -> dict:
    """
    Extract fields from a SIA-DCS or ADM-CID payload.

    Examples:
        "SIA-DCS"0005L0#1234[1234|Nri1/RP00][Vimage.jpg][X010E57.5698][Y59N12.8358]
        "ADM-CID"0000RF3L56789#1234[#1234|1131 01 015]

    The returned dict always has the keys format, seq, line, account,
    receiver, zone, partition, code, x, y, v, raw and signal_text; a value is
    None when it was not found.  ``account_verify``, ``timestamp`` (ADM-CID
    only) and ``signal_time`` (ISO 8601) are added only when present in the
    payload.  A timestamp that is not a real date/time is logged and ignored
    rather than raised.
    """
    result = {
        "format": None,
        "seq": None,
        "line": None,
        "account": None,
        "receiver": None,
        "zone": None,
        "partition": None,
        "code": None,
        "x": None,
        "y": None,
        "v": None,
        "raw": payload,
        "signal_text": None,
    }

    header_match = _HEADER_RE.match(payload)
    if header_match:
        (result["format"], result["seq"],
         result["line"], result["account"]) = header_match.groups()

    format_type = result["format"]
    time_match = _TIME_RE.search(payload)

    if format_type == "SIA-DCS":
        _parse_sia_dcs_event(payload, result)
    elif format_type == "ADM-CID":
        _parse_adm_cid_event(payload, result)
        if time_match:
            # Raw "hh:mm:ss,mm-dd-yyyy" text without the leading underscore.
            result["timestamp"] = time_match.group(0)[1:]

    if format_type in ("SIA-DCS", "ADM-CID"):
        # Optional V block (file name) and X/Y position blocks.
        v_match = _V_RE.search(payload)
        if v_match:
            result["v"] = v_match.group(1)
        x_match = _X_RE.search(payload)
        if x_match:
            result["x"] = x_match.group(1)
        y_match = _Y_RE.search(payload)
        if y_match:
            result["y"] = y_match.group(1)

    # Optional timestamp: _hh:mm:ss,mm-dd-yyyy
    if time_match:
        time_str = f"{time_match.group(2)} {time_match.group(1)}"
        try:
            result["signal_time"] = datetime.strptime(
                time_str, "%m-%d-%Y %H:%M:%S"
            ).isoformat()
        except ValueError:
            # Digits in the right shape but not a real date/time (e.g. 25:61:00).
            logging.info("Ignoring invalid timestamp in payload: %s", time_str)

    return result


def _build_frame(token: str, seq: str, account: str, line: str = '0') -> bytes:
    """Return ``<LF><CRC><LEN>"<token>"<seq>L<line>#<account>[]<CR>`` as bytes.

    CRC and length are both written as four upper-case hex digits, the same
    encoding ``verify_message`` expects on incoming frames.
    """
    body = f'"{token}"{seq}L{line}#{account}[]'.encode()
    crc = CrcArc.calc(body)
    return b'\n' + f"{crc:04X}{len(body):04X}".encode() + body + b'\r'


def build_nack(seq: str, account: str, receiver: str = '0') -> bytes:
    """Build a NACK frame for the given sequence number and account.

    ``receiver`` is written after the ``L`` marker.  A missing (None or
    empty) ``seq`` or ``account`` is sent as ``0000``.

    NOTE(review): the token sent is "NACK"; confirm against the peer's
    protocol documentation whether "NAK" is expected instead.
    """
    return _build_frame("NACK", seq or "0000", account or "0000", receiver)


def _store_signal(parsed: dict) -> None:
    """Insert one parsed signal into the ``signals`` table (blocking)."""
    db = config["database"]
    conn = pymysql.connect(
        host=db["host"],
        user=db["user"],
        password=db["password"],
        database=db["database"],
    )
    try:
        values = (
            parsed["format"],
            parsed["raw"],
            parsed.get("account"),
            parsed.get("seq"),
            parsed.get("line"),
            parsed.get("code"),
            parsed.get("partition"),
            parsed.get("zone"),
            parsed.get("signal_time"),
            parsed.get("source_ip"),
            parsed.get("signal_text"),
            parsed.get("x"),
            parsed.get("y"),
            parsed.get("v"),
        )
        with conn.cursor() as cursor:
            cursor.execute(_INSERT_SIGNAL_SQL, values)
        conn.commit()
    finally:
        # Always release the connection, also when execute/commit fails.
        conn.close()
    logging.info("Inserted signal into database.")


async def _process_frame(data: bytes, source_ip: Optional[str]) -> bytes:
    """Validate, parse and store one frame; return the reply frame to send."""
    is_valid, payload = verify_message(data)
    if not (is_valid and payload):
        logging.info("Invalid CRC or malformed message — sending fallback NACK.")
        return build_nack(seq="0000", account="0000")

    parsed = parse_sia_payload(payload)
    parsed["source_ip"] = source_ip
    logging.info("Parsed payload:")
    logging.info(json.dumps(parsed, indent=2))

    # The keys always exist (possibly as None), so a .get() default would
    # never apply; use ``or`` to substitute the placeholder.
    seq = parsed.get("seq") or "0000"
    account = parsed.get("account") or "0000"
    message_format = parsed.get("format")

    if message_format in ("SIA-DCS", "ADM-CID"):
        if any(parsed.get(key) is None for key in ("account", "code", "zone")):
            logging.info("Missing required SIA fields — sending NACK.")
            return build_nack(seq=seq, account=account)

        logging.info("Valid SIA-DCS signal.")
        try:
            # pymysql is blocking; keep it off the event loop.
            await asyncio.to_thread(_store_signal, parsed)
        except (pymysql.MySQLError, KeyError):
            # Not stored: answer NACK instead of dropping the connection.
            logging.exception("Failed to store signal — sending NACK.")
            return build_nack(seq=seq, account=account)
    elif message_format == "NULL":
        logging.info("Received NULL signal, skipping field validation.")
    else:
        logging.info("Unsupported message format %r — sending NACK.", message_format)
        return build_nack(seq=seq, account=account)

    response = _build_frame("ACK", seq, account)
    logging.info(f"Sending response: {response}")
    return response


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Serve one TCP connection: read CR-terminated frames and reply to each.

    The loop ends when the peer closes the connection.  Any unexpected error
    is logged with its traceback and the connection is closed.
    """
    addr = writer.get_extra_info('peername')
    source_ip = str(addr[0]) if addr else None
    logging.info(f"Connection from {addr}")

    try:
        while True:
            try:
                data = await reader.readuntil(b'\r')
            except asyncio.IncompleteReadError:
                logging.info("Client closed connection")
                break

            logging.info(f"Raw received: {data}")
            response = await _process_frame(data, source_ip)
            writer.write(response)
            await writer.drain()
    except Exception:
        # Top-level boundary for this connection: log and fall through to close.
        logging.exception("Unexpected error")
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # The peer may already have reset the connection.
            logging.debug("Error while closing connection with %s", addr, exc_info=True)
        logging.info(f"Connection with {addr} closed")


async def main(port):
    """Listen on all interfaces on *port* and serve clients until cancelled."""
    server = await asyncio.start_server(handle_client, host='0.0.0.0', port=port)
    logging.info(f"Listening on TCP port {port} for SIA-DCS messages")
    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: tcp_sia_server.bin <port>")
        sys.exit(1)

    try:
        port = int(sys.argv[1])
        asyncio.run(main(port))
    except Exception as e:
        logging.error(f"[SIA] Failed to start server: {e}")
        # Report the failure to the caller instead of exiting with status 0.
        sys.exit(1)