First working signal logger.

This commit is contained in:
Anders Knutsen 2025-07-25 10:14:42 +02:00
commit 6c889380ec
8 changed files with 390 additions and 0 deletions

18
Dockerfile Normal file
View File

@ -0,0 +1,18 @@
# Use an official Python runtime as a parent image
FROM python:3.11-slim
# Set working directory inside container
WORKDIR /app
# Copy the Python script into the container
COPY tcp_sia_server.py /app/
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Expose the port your server listens on
EXPOSE 9000
# Run the script
CMD ["python", "tcp_sia_server.py"]

15
README.md Normal file
View File

@ -0,0 +1,15 @@
First working TCP SIA-DC09 logger. Logs to mariadb.
No frontend, just a backend with mariadb and a TCP server that parses SIA-DC09 protocol data.
Current folder structure:
├── docker-compose.yml
├── Dockerfile
├── frontend
│ └── index.html
├── nginx.conf
├── README.md
├── requirements.txt
├── tcp_sia_server.py
└── tester.py

46
docker-compose.yml Normal file
View File

@ -0,0 +1,46 @@
version: "3.8"
services:
mariadb:
image: mariadb:10.9
environment:
MYSQL_ROOT_PASSWORD: rootpassword
MYSQL_DATABASE: superarc
MYSQL_USER: admin
MYSQL_PASSWORD: yourpassword
volumes:
- mariadb_data:/var/lib/mysql
ports:
- "3306:3306"
restart: always
nginx:
image: nginx:stable
volumes:
- ./frontend:/usr/share/nginx/html:ro
- ./nginx.conf:/etc/nginx/nginx.conf:ro
ports:
- "80:80"
depends_on:
- mariadb
- php-fpm
php-fpm:
image: php:8.2-fpm
volumes:
- ./frontend:/usr/share/nginx/html
tcp_sia_server:
build: .
ports:
- "9000:9000"
volumes:
- ./:/app
working_dir: /app
command: python -u tcp_sia_server.py
depends_on:
- mariadb
volumes:
mariadb_data:

1
frontend/index.html Normal file
View File

@ -0,0 +1 @@
test

20
nginx.conf Normal file
View File

@ -0,0 +1,20 @@
events { }
http {
server {
listen 80;
root /usr/share/nginx/html;
index index.php index.html index.htm;
location / {
try_files $uri $uri/ =404;
}
location ~ \.php$ {
include fastcgi_params;
fastcgi_pass php-fpm:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
}
}
}

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
crccheck
mysql-connector-python

213
tcp_sia_server.py Normal file
View File

@ -0,0 +1,213 @@
import asyncio
from crccheck.crc import CrcArc
import re
import json
import mysql.connector
from datetime import datetime
def verify_message(message: bytes) -> tuple[bool, str]:
"""
Verify the message format and CRC.
Returns (True, payload_str) if valid, else (False, None)
"""
if len(message) < 10:
print("Message too short")
return False, None
if message[0] != 10 or message[-1] != 13:
print("Message must start with LF (\\n) and end with CR (\\r)")
return False, None
crc_bytes = message[1:5] # 4 hex chars
length_bytes = message[5:9] # 4 hex chars
data_bytes = message[9:-1]
try:
crc_str = crc_bytes.decode('ascii')
length_str = length_bytes.decode('ascii')
payload_str = data_bytes.decode('ascii')
except UnicodeDecodeError:
print("Non-ASCII characters in message")
return False, None
try:
crc_recv = int(crc_str, 16)
length_recv = int(length_str, 16)
except ValueError:
print("CRC or length not hex")
return False, None
if length_recv != len(data_bytes):
print(f"Length mismatch: declared {length_recv}, actual {len(data_bytes)}")
return False, None
crc_calc = CrcArc.calc(data_bytes)
if crc_calc != crc_recv:
print(f"CRC mismatch: received {crc_str} calculated {crc_calc:04X}")
return False, None
print(f"✅ Valid message received: {payload_str}")
return True, payload_str
def parse_sia_payload(payload: str) -> dict:
"""
Extracts fields from SIA-DC09 message according to the full protocol spec.
Example:
"SIA-DCS"0005L0#1234[1234|Nri1/RP00][Vimage.jpg][X010E57.5698][Y59N12.8358]
"""
result = {
"format": None,
"seq": None,
"line": None,
"account": None,
"receiver": None,
"event": None,
"zone": None,
"partition": None,
"code": None,
"x": None,
"y": None,
"v": None,
"raw": payload,
}
# Extract format and header
header_match = re.match(r'"([^"]+)"(\d{4})L(\d)#(\d+)', payload)
if header_match:
result["format"] = header_match.group(1)
result["seq"] = header_match.group(2)
result["line"] = header_match.group(3)
result["account"] = header_match.group(4)
# Main event block: [account|receiver/eventcode]
event_match = re.search(r'\[\#?(\d+)\|([^\]]+)\]', payload)
if event_match:
result["account_verify"] = event_match.group(1)
receiver_event = event_match.group(2)
# Split by slash
if "/" in receiver_event:
receiver, event = receiver_event.split("/", 1)
result["receiver"] = receiver
result["event"] = event
# Optional: split event into type (2 chars), zone (23 digits)
event_code_match = re.match(r'([A-Z]{2})(\d+)', event)
if event_code_match:
result["code"] = event_code_match.group(1)
result["zone"] = event_code_match.group(2)
# Extract partition from receiver (e.g., "Nri0" -> "0")
partition_match = re.match(r'Nri(\d+)', receiver)
if partition_match:
result["partition"] = partition_match.group(1)
# Optional V block (usually image URL)
v_match = re.search(r'\[V([^\]]*)\]', payload)
if v_match:
result["v"] = v_match.group(1)
# Optional GPS data
x_match = re.search(r'\[X([^\]]+)\]', payload)
y_match = re.search(r'\[Y([^\]]+)\]', payload)
if x_match:
result["x"] = x_match.group(1)
if y_match:
result["y"] = y_match.group(1)
time_match = re.search(r'_(\d{2}:\d{2}:\d{2}),(\d{2}-\d{2}-\d{4})', payload)
if time_match:
time_str = f"{time_match.group(2)} {time_match.group(1)}" # e.g., 07-25-2025 07:33:01
result["signal_time"] = datetime.strptime(time_str, "%m-%d-%Y %H:%M:%S").isoformat()
return result
import asyncio
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
addr = writer.get_extra_info('peername')
print(f"📡 Connection from {addr}")
try:
data = await reader.readuntil(b'\r')
print(f"📥 Raw received: {data}")
is_valid, payload = verify_message(data)
response = b"\n0004NACK\r" # fallback
if is_valid and payload:
parsed = parse_sia_payload(payload)
parsed["source_ip"] = str(addr[0])
print("📦 Parsed payload:")
print(json.dumps(parsed, indent=2))
sia_data = parsed
# Connect to MariaDB
conn = mysql.connector.connect(
host="192.168.10.57",
user="admin",
password="yourpassword",
database="superarc"
)
cursor = conn.cursor()
# Insert data
query = """
INSERT INTO signals (
protocol, raw_message, account, sequence,
line_number, event_code, `partition`, zone, signal_time, source_ip
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (
sia_data["format"],
sia_data["raw"],
sia_data.get("account"),
sia_data.get("seq"),
sia_data.get("line"),
sia_data.get("code"),
sia_data.get("partition"),
sia_data.get("zone"),
sia_data.get("signal_time"),
sia_data.get("source_ip")
)
cursor.execute(query, values)
conn.commit()
print("✅ Inserted signal into database.")
cursor.close()
conn.close()
seq = parsed.get("seq", "0000")
account = parsed.get("account", "0000")
response_payload = f'"ACK"{seq}L0#{account}[]'
crc = CrcArc.calc(response_payload.encode())
length = str(hex(len(response_payload))).split('x')
response = f"\n{crc:04X}00{length[1].upper()}{response_payload}\r".encode()
print(f"✅ Sending response: {response}")
else:
print("❌ Invalid message. Sending fallback NACK.")
writer.write(response)
await writer.drain()
except asyncio.IncompleteReadError:
print("⚠️ Connection closed before end of message")
except Exception as e:
print(f"🔥 Error: {e}")
finally:
writer.close()
await writer.wait_closed()
print(f"🔌 Connection with {addr} closed")
async def main():
server = await asyncio.start_server(handle_client, host='0.0.0.0', port=9000)
print("🚀 Listening on TCP port 9000 for SIA-DCS messages")
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())

75
tester.py Normal file
View File

@ -0,0 +1,75 @@
import socket
import os
from crccheck.crc import CrcArc
def clear_screen():
os.system("cls" if os.name == "nt" else "clear")
def send_tcp_message(host, port, message: bytes):
try:
with socket.create_connection((host, port), timeout=10) as sock:
print(f"📤 Sending: {message}")
sock.sendall(message)
response = sock.recv(1024)
print(f"📥 Received: {response}")
except Exception as e:
print(f"❌ Error sending message: {e}")
def build_sia_dcs_message():
client_id = input("Enter Client ID (e.g. 1234): ").zfill(4)
code = input("Enter Event Code (e.g. OP, RP, BA): ").upper()
zone = input("Enter Zone (e.g. 01): ").zfill(2)
image_url = input("Optional Image URL (or leave blank): ")
body = f'"SIA-DCS"0001L0#{client_id}[{client_id}|Nri1/{code}{zone}]'
if image_url:
body += f'[V{image_url}]'
data_bytes = body.encode("ascii")
crc = CrcArc.calc(data_bytes)
length = len(data_bytes)
full_message = f"\n{crc:04X}{length:04X}{body}\r".encode("ascii")
return full_message
def build_adm_cid_message():
line = input("Enter phone line number (e.g. 01): ").zfill(2)
account = input("Enter account number (e.g. 1234): ").zfill(4)
code = input("Enter event code (e.g. 401): ").zfill(3)
qualifier = input("Enter qualifier (1=New, 3=Restore): ").zfill(1)
area = input("Enter partition/area (e.g. 1): ").zfill(2)
zone = input("Enter zone/user (e.g. 005): ").zfill(3)
# ADM-CID format: \r\nLLL:18QCCC[AAAA|EEMMZZ]\r
cid_data = f'18{qualifier}{code}[{account}|{area}{area}{zone}]'
msg = f"\r\n{line}{cid_data}\r".encode("ascii")
return msg
def main():
clear_screen()
print("🚨 Alarm Test Sender")
host = input("Enter receiver IP (e.g. 127.0.0.1): ")
port = int(input("Enter port (e.g. 9000): "))
while True:
print("\nChoose protocol:")
print("1. Send SIA-DCS message")
print("2. Send ADM-CID message")
print("3. Exit")
choice = input("Select (1/2/3): ")
if choice == "1":
msg = build_sia_dcs_message()
send_tcp_message(host, port, msg)
elif choice == "2":
msg = build_adm_cid_message()
send_tcp_message(host, port, msg)
elif choice == "3":
print("👋 Bye!")
break
else:
print("Invalid choice")
if __name__ == "__main__":
main()