tcp_splitter/worker.py

162 lines
6.0 KiB
Python

import socket
import threading
import json
import time
import logging
from datetime import datetime
from crccheck.crc import CrcArc
# Load the runtime configuration from config.json, resolved against the
# current working directory.  JSON text is UTF-8, so the encoding is pinned
# here instead of depending on the platform's locale default.
with open('config.json', 'r', encoding='utf-8') as config_file:
    config = json.load(config_file)

# Settings taken from the configuration file.  Every key is required: a
# missing one raises KeyError while the module is being loaded.
LISTEN_HOST = config['listen_host']    # address the relay binds to
LISTEN_PORT = config['listen_port']    # port the relay listens on
TARGET_HOSTS = config['target_hosts']  # entries indexed by 'host' and 'port'
RUN_DURATION = config['run_duration']  # seconds the accept loop keeps running
LOG_LEVEL = config['log_level']        # looked up in log_level_mapping
# Accepted log_level strings and the logging constants they stand for.
log_level_mapping = {
    level_name: getattr(logging, level_name)
    for level_name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
}

# Fail fast on an unknown level before any handler is created.
if LOG_LEVEL not in log_level_mapping:
    raise ValueError(f"Invalid log level: {LOG_LEVEL}. Valid options are: {list(log_level_mapping.keys())}")

# Send every record both to relay_server.log and to the console stream.
_log_handlers = [
    logging.FileHandler('relay_server.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=log_level_mapping[LOG_LEVEL],
    handlers=_log_handlers,
)

# Module-wide logger used by every function below.
logger = logging.getLogger(__name__)
def relay_to_server(server_host, server_port, data):
    """Send *data* to one target server and return what it answers.

    A fresh TCP connection is opened for every call and closed again before
    returning.  Only a single ``recv`` of up to 4096 bytes is performed, so a
    longer reply is truncated to whatever that one read delivers.

    Args:
        server_host: Host name or address of the target server.
        server_port: TCP port of the target server.
        data: Bytes to forward unchanged.

    Returns:
        The bytes read from the server, or ``None`` when anything went wrong
        (the failure is logged at ERROR level rather than raised).
    """
    endpoint = (server_host, server_port)
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as upstream:
            upstream.connect(endpoint)
            upstream.sendall(data)
            # NOTE(review): no timeout is set, so this read blocks for as
            # long as the target keeps the connection open without replying.
            return upstream.recv(4096)
    except Exception as exc:
        logger.error(f"Failed to relay data to {server_host}:{server_port}: {exc}")
    return None
def calculate_crc(data):
    """Return the ``CrcArc`` checksum of *data* as uppercase hexadecimal text.

    Args:
        data: Text to checksum; it is encoded with ``str.encode()`` defaults
            (UTF-8) before the CRC is computed.

    Returns:
        The checksum in uppercase hex, left-padded with zeros to at least
        four characters.
    """
    checksum = CrcArc.calc(data.encode())
    # "04X": uppercase hex, zero-padded to a minimum width of four.
    return format(checksum, '04X')
def calculate_length(data):
    """Return the encoded byte length of *data* as uppercase hexadecimal text.

    Args:
        data: Text to measure; the length is taken after ``str.encode()``
            (UTF-8), so multi-byte characters count once per byte.

    Returns:
        The byte count in uppercase hex, left-padded with zeros to at least
        four characters (longer values are not truncated).
    """
    byte_count = len(data.encode())
    return f"{byte_count:04X}"
def _build_null_ack(plain_data):
    """Build the ACK frame answering a NULL message held in *plain_data*."""
    # Skip the first 15 characters of the incoming frame and echo the rest.
    # Presumably that prefix is LF (1) + CRC (4) + length (4) + the '"NULL"'
    # token (6) -- TODO confirm against the sender's framing.
    orig_text = plain_data[15:].strip()
    ack_text = '"ACK"' + orig_text
    logger.debug(f"ACK Message: {ack_text}")

    # Both header fields are fixed-width: four uppercase hex digits each.
    # The shared helpers zero-pad for every magnitude, including one-digit
    # CRC values and lengths outside 0x10-0xFF.
    crc_hex = calculate_crc(ack_text)
    length_hex = calculate_length(ack_text)
    logger.debug(f"CRC & Length: {crc_hex}, {length_hex}")

    return '\n' + crc_hex + length_hex + ack_text + '\r'


def handle_client(client_socket, client_address):
    """Serve one client connection, then close it.

    A single ``recv`` of up to 4096 bytes is read from the client and handled
    as follows:

    * Nothing received: a warning is logged.
    * The text contains ``"NULL"`` (with quotes): an ACK frame is sent back
      directly and nothing is relayed.
    * The text contains ``SIA-DCS`` or ``ADM-CID``: the raw bytes are relayed
      to every entry of ``TARGET_HOSTS`` and the first non-empty reply is
      returned to the client.
    * Anything else: the message is dropped and a warning is logged.

    Args:
        client_socket: Connected socket of the client; always closed on exit.
        client_address: Peer address, used only in log messages.

    Returns:
        None.  Exceptions are logged at ERROR level instead of propagating,
        so a failing connection cannot take down its worker thread silently.
    """
    try:
        data = client_socket.recv(4096)
        if not data:
            logger.warning(f"No data received from {client_address}")
            return
        logger.debug(f"Received data from {client_address}: {data}")

        # Undecodable bytes are discarded; the text is used for inspection
        # only, the relay itself forwards the untouched bytes.
        plain_data = data.decode("utf-8", errors='ignore')

        is_null_message = '"NULL"' in plain_data
        is_event_message = "SIA-DCS" in plain_data or "ADM-CID" in plain_data

        # Reject only messages carrying none of the recognised tokens.
        # Requiring SIA-DCS *and* ADM-CID together would discard messages
        # that carry just one of them, and would make the NULL branch below
        # unreachable for ordinary NULL messages.
        if not (is_null_message or is_event_message):
            logger.warning(f"Dropping unrecognised message from {client_address}")
            return

        if is_null_message:
            logger.debug("NULL Message detected, sending ACK without relaying signal!")
            ack_msg = _build_null_ack(plain_data)
            # ASCII encoding raises on non-ASCII text; that is reported by
            # the generic handler below and no ACK is sent.
            client_socket.sendall(bytes(ack_msg, 'ASCII'))
            logger.debug(f"Response sent to client: {ack_msg.strip()}")
            return

        # Relay the original bytes to every target and keep the replies.
        responses = []
        for target in TARGET_HOSTS:
            logger.info(f"Relaying data to {target['host']}:{target['port']}")
            response = relay_to_server(target['host'], target['port'], data)
            if response:
                responses.append(response)
                logger.debug(f"Received response from {target['host']}:{target['port']}: {response}")

        # Only the first successful reply is passed back to the client.
        if responses:
            client_socket.sendall(responses[0])
            logger.info(f"Sent response to {client_address}: {responses[0]}")
        else:
            logger.warning(f"No responses received from target hosts for {client_address}")
    except Exception as e:
        logger.error(f"Error handling client {client_address}: {e}")
    finally:
        client_socket.close()
        logger.debug(f"Closed connection with {client_address}")
def start_relay():
    """Run the relay's accept loop for ``RUN_DURATION`` seconds.

    Binds a listening TCP socket to ``LISTEN_HOST``/``LISTEN_PORT`` and hands
    every accepted connection to ``handle_client`` on its own thread.  The
    accept call uses a one-second timeout so the deadline is re-checked at
    least once per second even when no client connects.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with listener:
        listener.bind((LISTEN_HOST, LISTEN_PORT))
        listener.listen(5)
        logger.info(f"Relay server started listening on {LISTEN_HOST}:{LISTEN_PORT}")

        # Wall-clock instant after which no further connections are accepted.
        deadline = time.time() + RUN_DURATION
        logger.debug(f"Server will run for {RUN_DURATION} seconds.")

        while True:
            if time.time() >= deadline:
                break
            try:
                # Bounded wait so the deadline check above runs regularly.
                listener.settimeout(1)
                conn, peer = listener.accept()
                logger.debug(f"Accepted connection from {peer}")
                worker = threading.Thread(target=handle_client, args=(conn, peer))
                worker.start()
            except socket.timeout:
                # No client within the last second; go around again.
                pass
            except Exception as exc:
                logger.error(f"Error accepting connection: {exc}")

        logger.info("Server run duration elapsed. Shutting down.")
# Start the relay only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    start_relay()