ARC_Starter/backend/tcp_sia_server.py

320 lines
12 KiB
Python

import asyncio
import sys
from crccheck.crc import CrcArc
import re
import json
import pymysql
from datetime import datetime
import logging
import configparser
CONFIG_PATH = "/app/config.ini"
config = configparser.ConfigParser()
config.read(CONFIG_PATH)
loglevel_str = config["log"]["level"].upper()
loglevel = getattr(logging, loglevel_str, logging.INFO)
logging.basicConfig(level=loglevel)
def verify_message(message: bytes) -> tuple[bool, str]:
"""
Verify the message format and CRC.
Returns (True, payload_str) if valid, else (False, None)
"""
if len(message) < 10:
logging.info("Message too short")
return False, None
if message[0] != 10 or message[-1] != 13:
logging.info("Message must start with LF (\\n) and end with CR (\\r)")
return False, None
crc_bytes = message[1:5] # 4 hex chars
length_bytes = message[5:9] # 4 hex chars
data_bytes = message[9:-1]
try:
crc_str = crc_bytes.decode('ascii')
length_str = length_bytes.decode('ascii')
payload_str = data_bytes.decode('ascii')
except UnicodeDecodeError:
logging.info("Non-ASCII characters in message")
return False, None
try:
crc_recv = int(crc_str, 16)
length_recv = int(length_str, 16)
except ValueError:
logging.info("CRC or length not hex")
return False, None
if length_recv != len(data_bytes):
logging.info(f"Length mismatch: declared {length_recv}, actual {len(data_bytes)}")
return False, None
crc_calc = CrcArc.calc(data_bytes)
if crc_calc != crc_recv:
logging.info(f"CRC mismatch: received {crc_str} calculated {crc_calc:04X}")
return False, None
logging.info(f"Valid message received: {payload_str}")
return True, payload_str
def parse_sia_payload(payload: str) -> dict:
"""
Extracts fields from SIA-DCS or ADM-CID message according to the protocol spec.
Examples:
"SIA-DCS"0005L0#1234[1234|Nri1/RP00][Vimage.jpg][X010E57.5698][Y59N12.8358]
"ADM-CID"0000RF3L56789#1234[#1234|1131 01 015]
"""
result = {
"format": None,
"seq": None,
"line": None,
"account": None,
"receiver": None,
"zone": None,
"partition": None,
"code": None,
"x": None,
"y": None,
"v": None,
"raw": payload,
"signal_text": None,
}
# Extract format and header
header_match = re.match(r'"([^"]+)"(\d{4})L(\d)#(\d+)', payload)
if header_match:
result["format"] = header_match.group(1)
result["seq"] = header_match.group(2)
result["line"] = header_match.group(3)
result["account"] = header_match.group(4)
# Parse based on message format
format_type = result["format"]
if format_type == "SIA-DCS":
# Main event block: [account|receiver/eventcode]
event_match = re.search(r'\[\#?(\d+)\|([^\]]+)\]', payload)
if event_match:
result["account_verify"] = event_match.group(1)
receiver_event = event_match.group(2)
# Handle both with and without slash
if "/" in receiver_event:
receiver, event = receiver_event.split("/", 1)
else:
match = re.match(r'(Nri\d+)([A-Z]{2}\d+.*)', receiver_event)
if match:
receiver = match.group(1)
event = match.group(2)
else:
receiver = ""
event = receiver_event # fallback
result["receiver"] = receiver
# Extract code and zone from event (e.g., OP01, BA05)
event_code_match = re.match(r'([A-Z]{2})(\d+)', event)
if event_code_match:
result["code"] = event_code_match.group(1)
result["zone"] = event_code_match.group(2)
# Extract partition from receiver (e.g., Nri1 → 1)
partition_match = re.match(r'Nri(\d+)', receiver)
if partition_match:
result["partition"] = partition_match.group(1)
# Optional signal text (e.g., ^Fire alarm triggered^)
text_match = re.search(r'\^([^^]+)\^', event)
if text_match:
result["signal_text"] = text_match.group(1)
# Optional V block (usually image or file name)
v_match = re.search(r'\[V([^\]]*)\]', payload)
if v_match:
result["v"] = v_match.group(1)
# Optional GPS data
x_match = re.search(r'\[X([^\]]+)\]', payload)
y_match = re.search(r'\[Y([^\]]+)\]', payload)
if x_match:
result["x"] = x_match.group(1)
if y_match:
result["y"] = y_match.group(1)
elif format_type == "ADM-CID":
# Extract format
format_match = re.search(r'"(ADM-CID)"(\d{4})[A-Z]*L(\d)#(\d+)', payload)
if format_match:
result["format"] = format_match.group(1)
result["seq"] = format_match.group(2)
result["line"] = format_match.group(3)
result["account"] = format_match.group(4)
# Parse event block: [account|event_code partition zone]
event_match = re.search(r'\[#?(\d+)\|([A-Z0-9]{3,4}) (\d{2}) (\d{3})\]', payload)
if event_match:
result["account_verify"] = event_match.group(1)
result["code"] = event_match.group(2)
result["partition"] = event_match.group(3)
result["zone"] = event_match.group(4)
# Optional fields
timestamp_match = re.search(r'_(\d{2}:\d{2}:\d{2},\d{2}-\d{2}-\d{4})', payload)
if timestamp_match:
result["timestamp"] = timestamp_match.group(1)
v_match = re.search(r'\[V([^\]]*)\]', payload)
if v_match:
result["v"] = v_match.group(1)
x_match = re.search(r'\[X([^\]]+)\]', payload)
y_match = re.search(r'\[Y([^\]]+)\]', payload)
if x_match:
result["x"] = x_match.group(1)
if y_match:
result["y"] = y_match.group(1)
# Optional timestamp: _hh:mm:ss,mm-dd-yyyy
time_match = re.search(r'_(\d{2}:\d{2}:\d{2}),(\d{2}-\d{2}-\d{4})', payload)
if time_match:
time_str = f"{time_match.group(2)} {time_match.group(1)}"
result["signal_time"] = datetime.strptime(time_str, "%m-%d-%Y %H:%M:%S").isoformat()
return result
def build_nack(seq: str, account: str, receiver: str = '0') -> bytes:
lf = chr(0x0A)
cr = chr(0x0D)
payload = f'"NACK"{seq}L{receiver}#{account}[]'
crc = CrcArc.calc(payload.encode())
length = str(len(payload)).zfill(4)
return f"{lf}{crc:04X}{length}{payload}{cr}".encode()
import asyncio
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
db_config = {
'host': config["database"]["host"],
'user': config["database"]["user"],
'password': config["database"]["password"],
'database': config["database"]["database"]
}
addr = writer.get_extra_info('peername')
lf = chr(0x0A) # LF
cr = chr(0x0D) # CR
logging.info(f"Connection from {addr}")
try:
while True:
try:
data = await reader.readuntil(b'\r')
except asyncio.IncompleteReadError:
logging.info("Client closed connection")
break
logging.info(f"Raw received: {data}")
is_valid, payload = verify_message(data)
response = b'\n"NACK"0000L0#0000[]\r' # Default response
if is_valid and payload:
parsed = parse_sia_payload(payload)
parsed["source_ip"] = str(addr[0])
logging.info("Parsed payload:")
logging.info(json.dumps(parsed, indent=2))
seq = parsed.get("seq", "0000")
account = parsed.get("account", "0000")
response = build_nack(seq=seq, account=account)
# Validate required fields
required_format = parsed.get("format")
required_fields = [parsed.get("account"), parsed.get("code"), parsed.get("zone")]
if required_format in ["SIA-DCS","ADM-CID"]:
if any(f is None for f in required_fields):
logging.info("Missing required SIA fields — sending NACK.")
# build and send NACK
else:
logging.info("Valid SIA-DCS signal.")
# Connect to MariaDB
conn = pymysql.connect(**db_config)
cursor = conn.cursor()
query = """
INSERT INTO signals (
protocol, raw_message, account, sequence,
line_number, event_code, `partition`, zone, signal_time, source_ip, signal_text, x, y, v
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (
parsed["format"],
parsed["raw"],
parsed.get("account"),
parsed.get("seq"),
parsed.get("line"),
parsed.get("code"),
parsed.get("partition"),
parsed.get("zone"),
parsed.get("signal_time"),
parsed.get("source_ip"),
parsed.get("signal_text"),
parsed.get("x"),
parsed.get("y"),
parsed.get("v")
)
cursor.execute(query, values)
conn.commit()
logging.info("Inserted signal into database.")
cursor.close()
conn.close()
# Build ACK response
response_payload = f'"ACK"{seq}L0#{account}[]'
crc = CrcArc.calc(response_payload.encode())
length = str(len(response_payload)).zfill(4)
response = f"{lf}{crc:04X}{length}{response_payload}{cr}".encode()
logging.info(f"Sending response: {response}")
elif required_format == "NULL":
logging.info("Received NULL signal, skipping field validation.")
# Build ACK response
response_payload = f'"ACK"{seq}L0#{account}[]'
crc = CrcArc.calc(response_payload.encode())
length = str(len(response_payload)).zfill(4)
response = f"{lf}{crc:04X}{length}{response_payload}{cr}".encode()
logging.info(f"Sending response: {response}")
# allow
else:
logging.info("Invalid CRC or malformed message — sending fallback NACK.")
writer.write(response)
await writer.drain()
except Exception as e:
logging.info(f"Unexpected error: {e}")
finally:
writer.close()
await writer.wait_closed()
logging.info(f"Connection with {addr} closed")
async def main(port):
server = await asyncio.start_server(handle_client, host='0.0.0.0', port=port)
logging.info(f"Listening on TCP port {port} for SIA-DCS messages")
async with server:
await server.serve_forever()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: tcp_sia_server.bin <port>")
sys.exit(1)
try:
port = int(sys.argv[1])
asyncio.run(main(port))
except Exception as e:
logging.error(f"[SIA] Failed to start server: {e}")