ARC_Starter/tcp_sia_server.py

214 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from crccheck.crc import CrcArc
import re
import json
import mysql.connector
from datetime import datetime
def verify_message(message: bytes) -> tuple[bool, str]:
"""
Verify the message format and CRC.
Returns (True, payload_str) if valid, else (False, None)
"""
if len(message) < 10:
print("Message too short")
return False, None
if message[0] != 10 or message[-1] != 13:
print("Message must start with LF (\\n) and end with CR (\\r)")
return False, None
crc_bytes = message[1:5] # 4 hex chars
length_bytes = message[5:9] # 4 hex chars
data_bytes = message[9:-1]
try:
crc_str = crc_bytes.decode('ascii')
length_str = length_bytes.decode('ascii')
payload_str = data_bytes.decode('ascii')
except UnicodeDecodeError:
print("Non-ASCII characters in message")
return False, None
try:
crc_recv = int(crc_str, 16)
length_recv = int(length_str, 16)
except ValueError:
print("CRC or length not hex")
return False, None
if length_recv != len(data_bytes):
print(f"Length mismatch: declared {length_recv}, actual {len(data_bytes)}")
return False, None
crc_calc = CrcArc.calc(data_bytes)
if crc_calc != crc_recv:
print(f"CRC mismatch: received {crc_str} calculated {crc_calc:04X}")
return False, None
print(f"✅ Valid message received: {payload_str}")
return True, payload_str
def parse_sia_payload(payload: str) -> dict:
"""
Extracts fields from SIA-DC09 message according to the full protocol spec.
Example:
"SIA-DCS"0005L0#1234[1234|Nri1/RP00][Vimage.jpg][X010E57.5698][Y59N12.8358]
"""
result = {
"format": None,
"seq": None,
"line": None,
"account": None,
"receiver": None,
"event": None,
"zone": None,
"partition": None,
"code": None,
"x": None,
"y": None,
"v": None,
"raw": payload,
}
# Extract format and header
header_match = re.match(r'"([^"]+)"(\d{4})L(\d)#(\d+)', payload)
if header_match:
result["format"] = header_match.group(1)
result["seq"] = header_match.group(2)
result["line"] = header_match.group(3)
result["account"] = header_match.group(4)
# Main event block: [account|receiver/eventcode]
event_match = re.search(r'\[\#?(\d+)\|([^\]]+)\]', payload)
if event_match:
result["account_verify"] = event_match.group(1)
receiver_event = event_match.group(2)
# Split by slash
if "/" in receiver_event:
receiver, event = receiver_event.split("/", 1)
result["receiver"] = receiver
result["event"] = event
# Optional: split event into type (2 chars), zone (23 digits)
event_code_match = re.match(r'([A-Z]{2})(\d+)', event)
if event_code_match:
result["code"] = event_code_match.group(1)
result["zone"] = event_code_match.group(2)
# Extract partition from receiver (e.g., "Nri0" -> "0")
partition_match = re.match(r'Nri(\d+)', receiver)
if partition_match:
result["partition"] = partition_match.group(1)
# Optional V block (usually image URL)
v_match = re.search(r'\[V([^\]]*)\]', payload)
if v_match:
result["v"] = v_match.group(1)
# Optional GPS data
x_match = re.search(r'\[X([^\]]+)\]', payload)
y_match = re.search(r'\[Y([^\]]+)\]', payload)
if x_match:
result["x"] = x_match.group(1)
if y_match:
result["y"] = y_match.group(1)
time_match = re.search(r'_(\d{2}:\d{2}:\d{2}),(\d{2}-\d{2}-\d{4})', payload)
if time_match:
time_str = f"{time_match.group(2)} {time_match.group(1)}" # e.g., 07-25-2025 07:33:01
result["signal_time"] = datetime.strptime(time_str, "%m-%d-%Y %H:%M:%S").isoformat()
return result
import asyncio
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
addr = writer.get_extra_info('peername')
print(f"📡 Connection from {addr}")
try:
data = await reader.readuntil(b'\r')
print(f"📥 Raw received: {data}")
is_valid, payload = verify_message(data)
response = b"\n0004NACK\r" # fallback
if is_valid and payload:
parsed = parse_sia_payload(payload)
parsed["source_ip"] = str(addr[0])
print("📦 Parsed payload:")
print(json.dumps(parsed, indent=2))
sia_data = parsed
# Connect to MariaDB
conn = mysql.connector.connect(
host="192.168.10.57",
user="admin",
password="yourpassword",
database="superarc"
)
cursor = conn.cursor()
# Insert data
query = """
INSERT INTO signals (
protocol, raw_message, account, sequence,
line_number, event_code, `partition`, zone, signal_time, source_ip
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (
sia_data["format"],
sia_data["raw"],
sia_data.get("account"),
sia_data.get("seq"),
sia_data.get("line"),
sia_data.get("code"),
sia_data.get("partition"),
sia_data.get("zone"),
sia_data.get("signal_time"),
sia_data.get("source_ip")
)
cursor.execute(query, values)
conn.commit()
print("✅ Inserted signal into database.")
cursor.close()
conn.close()
seq = parsed.get("seq", "0000")
account = parsed.get("account", "0000")
response_payload = f'"ACK"{seq}L0#{account}[]'
crc = CrcArc.calc(response_payload.encode())
length = str(hex(len(response_payload))).split('x')
response = f"\n{crc:04X}00{length[1].upper()}{response_payload}\r".encode()
print(f"✅ Sending response: {response}")
else:
print("❌ Invalid message. Sending fallback NACK.")
writer.write(response)
await writer.drain()
except asyncio.IncompleteReadError:
print("⚠️ Connection closed before end of message")
except Exception as e:
print(f"🔥 Error: {e}")
finally:
writer.close()
await writer.wait_closed()
print(f"🔌 Connection with {addr} closed")
async def main():
server = await asyncio.start_server(handle_client, host='0.0.0.0', port=9000)
print("🚀 Listening on TCP port 9000 for SIA-DCS messages")
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())