import asyncio from crccheck.crc import CrcArc import re import json import mysql.connector from datetime import datetime def verify_message(message: bytes) -> tuple[bool, str]: """ Verify the message format and CRC. Returns (True, payload_str) if valid, else (False, None) """ if len(message) < 10: print("Message too short") return False, None if message[0] != 10 or message[-1] != 13: print("Message must start with LF (\\n) and end with CR (\\r)") return False, None crc_bytes = message[1:5] # 4 hex chars length_bytes = message[5:9] # 4 hex chars data_bytes = message[9:-1] try: crc_str = crc_bytes.decode('ascii') length_str = length_bytes.decode('ascii') payload_str = data_bytes.decode('ascii') except UnicodeDecodeError: print("Non-ASCII characters in message") return False, None try: crc_recv = int(crc_str, 16) length_recv = int(length_str, 16) except ValueError: print("CRC or length not hex") return False, None if length_recv != len(data_bytes): print(f"Length mismatch: declared {length_recv}, actual {len(data_bytes)}") return False, None crc_calc = CrcArc.calc(data_bytes) if crc_calc != crc_recv: print(f"CRC mismatch: received {crc_str} calculated {crc_calc:04X}") return False, None print(f"Valid message received: {payload_str}") return True, payload_str def parse_sia_payload(payload: str) -> dict: """ Extracts fields from SIA-DC09 message according to the full protocol spec. Example: "SIA-DCS"0005L0#1234[1234|Nri1/RP00][Vimage.jpg][X010E57.5698][Y59N12.8358] """ result = { "format": None, "seq": None, "line": None, "account": None, "receiver": None, "event": None, "zone": None, "partition": None, "code": None, "x": None, "y": None, "v": None, "raw": payload, } # Extract format and header header_match = re.match(r'"([^"]+)"(\d{4})L(\d)#(\d+)', payload) if header_match: result["format"] = header_match.group(1) result["seq"] = header_match.group(2) result["line"] = header_match.group(3) result["account"] = header_match.group(4) # Main event block: [account|receiver/eventcode] event_match = re.search(r'\[\#?(\d+)\|([^\]]+)\]', payload) if event_match: result["account_verify"] = event_match.group(1) receiver_event = event_match.group(2) # Split by slash if "/" in receiver_event: receiver, event = receiver_event.split("/", 1) result["receiver"] = receiver result["event"] = event # Optional: split event into type (2 chars), zone (2–3 digits) event_code_match = re.match(r'([A-Z]{2})(\d+)', event) if event_code_match: result["code"] = event_code_match.group(1) result["zone"] = event_code_match.group(2) # Extract partition from receiver (e.g., "Nri0" -> "0") partition_match = re.match(r'Nri(\d+)', receiver) if partition_match: result["partition"] = partition_match.group(1) # Optional V block (usually image URL) v_match = re.search(r'\[V([^\]]*)\]', payload) if v_match: result["v"] = v_match.group(1) # Optional GPS data x_match = re.search(r'\[X([^\]]+)\]', payload) y_match = re.search(r'\[Y([^\]]+)\]', payload) if x_match: result["x"] = x_match.group(1) if y_match: result["y"] = y_match.group(1) time_match = re.search(r'_(\d{2}:\d{2}:\d{2}),(\d{2}-\d{2}-\d{4})', payload) if time_match: time_str = f"{time_match.group(2)} {time_match.group(1)}" # e.g., 07-25-2025 07:33:01 result["signal_time"] = datetime.strptime(time_str, "%m-%d-%Y %H:%M:%S").isoformat() return result import asyncio async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): addr = writer.get_extra_info('peername') print(f"Connection from {addr}") try: data = await reader.readuntil(b'\r') print(f"Raw received: {data}") is_valid, payload = verify_message(data) response = b"\n0004NACK\r" # fallback if is_valid and payload: parsed = parse_sia_payload(payload) parsed["source_ip"] = str(addr[0]) print("Parsed payload:") print(json.dumps(parsed, indent=2)) sia_data = parsed # Connect to MariaDB conn = mysql.connector.connect( host="192.168.10.57", user="admin", password="yourpassword", database="superarc" ) cursor = conn.cursor() # Insert data query = """ INSERT INTO signals ( protocol, raw_message, account, sequence, line_number, event_code, `partition`, zone, signal_time, source_ip ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ values = ( sia_data["format"], sia_data["raw"], sia_data.get("account"), sia_data.get("seq"), sia_data.get("line"), sia_data.get("code"), sia_data.get("partition"), sia_data.get("zone"), sia_data.get("signal_time"), sia_data.get("source_ip") ) cursor.execute(query, values) conn.commit() print("Inserted signal into database.") cursor.close() conn.close() seq = parsed.get("seq", "0000") account = parsed.get("account", "0000") response_payload = f'"ACK"{seq}L0#{account}[]' crc = CrcArc.calc(response_payload.encode()) length = str(hex(len(response_payload))).split('x') response = f"\n{crc:04X}00{length[1].upper()}{response_payload}\r".encode() print(f"Sending response: {response}") else: print("Invalid message. Sending fallback NACK.") writer.write(response) await writer.drain() except asyncio.IncompleteReadError: print("Connection closed before end of message") except Exception as e: print(f"Error: {e}") finally: writer.close() await writer.wait_closed() print(f"Connection with {addr} closed") async def main(): server = await asyncio.start_server(handle_client, host='0.0.0.0', port=9000) print("Listening on TCP port 9000 for SIA-DCS messages") async with server: await server.serve_forever() if __name__ == '__main__': asyncio.run(main())