This commit is contained in:
commit
0e04d721f6
|
|
@ -0,0 +1,163 @@
|
|||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import os
|
||||
import sys
|
||||
import socket
|
||||
import datetime
|
||||
import struct
|
||||
from crccheck.crc import Crc16, CrcArc
|
||||
import binascii
|
||||
|
||||
class StringMaker:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def send_alarm(self, alarm_command, user_id, output_coordinates, key, host, port):
|
||||
key = bytes.fromhex(key)
|
||||
now = datetime.datetime.now()
|
||||
date_part = now.strftime("%m-%d-%Y")
|
||||
time_part = now.strftime("%H:%M:%S")
|
||||
input_string = f'"*SIA-DCS"0005L0#{user_id}[#{user_id}|{alarm_command}]{output_coordinates}_{time_part},{date_part}'
|
||||
|
||||
print(f"Input String: {input_string}")
|
||||
|
||||
part_before, part_after = input_string.split('[', 1)
|
||||
part_before = f"{part_before}["
|
||||
part_after = "|" + part_after
|
||||
|
||||
encryption_part = self.add_padding(part_after)
|
||||
print(f"Padded Input: {encryption_part}")
|
||||
|
||||
encrypted_hex = self.encrypt(encryption_part, key)
|
||||
print(f"Encrypted Hex: {encrypted_hex}")
|
||||
|
||||
before_and_after = f"{part_before}{encrypted_hex}"
|
||||
|
||||
crc_hash = self.calculate_crc(before_and_after.encode())
|
||||
with_hash = f"{crc_hash}{before_and_after}"
|
||||
|
||||
with_line_feed_and_return = f"\u000A{with_hash}\u000D"
|
||||
message_bytes = with_line_feed_and_return.encode()
|
||||
|
||||
tcp_response = self.send_bytes_with_tcp(host, port, message_bytes)
|
||||
return tcp_response.decode() if tcp_response is not None else "No data from Patriot"
|
||||
|
||||
def to_ascii_hex(self, byte_array):
|
||||
return byte_array.hex()
|
||||
|
||||
|
||||
def add_padding(self, input_string):
|
||||
# Convert the input string to bytes
|
||||
input_bytes = input_string.encode()
|
||||
# Calculate the pad length
|
||||
pad_length = 16 - len(input_bytes) % 16
|
||||
pad_length = pad_length if pad_length != 16 else 0
|
||||
# Generate padding with ASCII characters
|
||||
padding_ascii = bytes([i % 26 + 65 for i in range(pad_length)])
|
||||
# Concatenate padding and input bytes
|
||||
padded_bytes = padding_ascii + input_bytes
|
||||
|
||||
return padded_bytes
|
||||
|
||||
def get_string_after_first_occurrence(self, input_string, character):
|
||||
index = input_string.find(character)
|
||||
return input_string[index + 1:] if index != -1 else ""
|
||||
|
||||
def encrypt(self, input_bytes, key):
|
||||
key_bytes = key
|
||||
data_bytes = input_bytes
|
||||
|
||||
# Use the same initialization vector (IV) as in your decryption function
|
||||
iv = b'\x00' * 16
|
||||
|
||||
cipher = Cipher(algorithms.AES(key_bytes), modes.CBC(iv), backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
|
||||
encrypted_data = encryptor.update(data_bytes) + encryptor.finalize()
|
||||
|
||||
return binascii.hexlify(encrypted_data).decode('ascii')
|
||||
|
||||
def decrypt(self, input_bytes, key):
|
||||
cipher = Cipher(algorithms.AES(key), modes.CFB(os.urandom(16)), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted = decryptor.update(input_bytes) + decryptor.finalize()
|
||||
return decrypted.decode()
|
||||
|
||||
def send_bytes_with_tcp(self, ip_address, port, data):
|
||||
try:
|
||||
with socket.create_connection((ip_address, port), timeout=5) as client:
|
||||
client.sendall(data)
|
||||
response = client.recv(1024)
|
||||
return response
|
||||
except Exception as ex:
|
||||
print(f"Error: {ex}")
|
||||
return None
|
||||
|
||||
def convert_to_dms_format(self, dd, direction):
|
||||
degrees = int(abs(dd))
|
||||
minutes = (abs(dd) - degrees) * 60
|
||||
seconds = (minutes - int(minutes)) * 60
|
||||
sign = 'W' if dd < 0 and direction == "X" else 'S' if dd < 0 and direction == "Y" else 'E' if direction == "X" else 'N'
|
||||
return f"{direction}{degrees:03d}{sign}{int(minutes):02d}.{int(round(seconds * 10000000)):07d}"
|
||||
|
||||
def calculate_crc(self, input_bytes):
|
||||
crc_arc = CrcArc()
|
||||
crc_arc.process(input_bytes)
|
||||
crc_value_hex = crc_arc.finalhex()
|
||||
return f"{crc_value_hex.upper()}{len(input_bytes):04X}"
|
||||
|
||||
arg1 = sys.argv[1] if len(sys.argv) > 1 else None
|
||||
arg2 = sys.argv[2] if len(sys.argv) > 2 else None
|
||||
arg3 = sys.argv[3] if len(sys.argv) > 3 else None
|
||||
host = sys.argv[4] if len(sys.argv) > 4 else None
|
||||
arg5 = sys.argv[5] if len(sys.argv) > 5 else None
|
||||
signalType = sys.argv[6] if len(sys.argv) > 6 else "PA"
|
||||
zone = sys.argv[7] if len(sys.argv) > 7 else "00"
|
||||
arg8 = sys.argv[8] if len(sys.argv) > 8 else "mysmallkey123456"
|
||||
port = int(arg5)
|
||||
|
||||
def main():
|
||||
global host
|
||||
global port
|
||||
global arg1
|
||||
global arg2
|
||||
global arg3
|
||||
global signalType
|
||||
global zone
|
||||
string_maker = StringMaker()
|
||||
key = arg8.encode().hex().upper()
|
||||
print(key)
|
||||
ClientID = arg1
|
||||
SignalType = "Nri/" + signalType
|
||||
Zone = zone
|
||||
SignalAndZone = SignalType + Zone
|
||||
PosistionLat = arg3
|
||||
PositionLong = arg2
|
||||
|
||||
def convert_coordinates(arg2, arg3):
|
||||
# Extract numerical values for longitude and latitude
|
||||
lon_value = float(arg3)
|
||||
lat_value = float(arg2)
|
||||
|
||||
# Convert longitude to the desired format
|
||||
lon_degrees = int(lon_value)
|
||||
lon_minutes = (lon_value - lon_degrees) * 60
|
||||
lon_format = f"[X{lon_degrees:03d}E{lon_minutes:.8f}]"
|
||||
|
||||
# Convert latitude to the desired format
|
||||
lat_degrees = int(lat_value)
|
||||
lat_minutes = (lat_value - lat_degrees) * 60
|
||||
lat_format = f"[Y{lat_degrees:02d}N{lat_minutes:.8f}]"
|
||||
|
||||
# Combine the formatted coordinates
|
||||
result = lon_format + lat_format
|
||||
|
||||
return result
|
||||
|
||||
output_coordinates = convert_coordinates(arg2, arg3)
|
||||
result = string_maker.send_alarm(SignalAndZone, int(ClientID), output_coordinates, key, host, port)
|
||||
print(result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,145 @@
|
|||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import os
|
||||
import socket
|
||||
import datetime
|
||||
import struct
|
||||
from crccheck.crc import Crc16, CrcArc
|
||||
import binascii
|
||||
|
||||
class StringMaker:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def send_alarm(self, alarm_command, user_id, longitude, latitude, key, host, port):
|
||||
key = bytes.fromhex(key)
|
||||
now = datetime.datetime.now()
|
||||
date_part = now.strftime("%m-%d-%Y")
|
||||
time_part = now.strftime("%H:%M:%S")
|
||||
output_coordinates = self.convert_coordinates(latitude, longitude)
|
||||
input_string = f'"*SIA-DCS"0005L0#{user_id}[{user_id}|{alarm_command}]{output_coordinates}_{time_part},{date_part}'
|
||||
|
||||
|
||||
|
||||
part_before, part_after = input_string.split('[', 1)
|
||||
part_before = f"{part_before}["
|
||||
#part_before = f'"*SIA-DCS"0021L0#9999['
|
||||
part_after = "|" + part_after
|
||||
#part_after = '|9999|Nri1/BA01][Vwww.google.no]_12:10:07,12-13-2024'
|
||||
|
||||
encryption_part = self.add_padding(part_after)
|
||||
print(f"Padded Input: {encryption_part}")
|
||||
|
||||
encrypted_hex = self.encrypt(encryption_part, key)
|
||||
print(f"Encrypted Hex: {encrypted_hex}")
|
||||
|
||||
before_and_after = f"{part_before}{encrypted_hex}"
|
||||
|
||||
crc_hash = self.calculate_crc(before_and_after.encode())
|
||||
with_hash = f"{crc_hash}{before_and_after}"
|
||||
|
||||
with_line_feed_and_return = f"\r\n{with_hash}\r"
|
||||
message_bytes = with_line_feed_and_return.encode()
|
||||
|
||||
print(message_bytes)
|
||||
|
||||
tcp_response = self.send_bytes_with_tcp(host, port, message_bytes)
|
||||
return tcp_response.decode() if tcp_response is not None else "No data from Patriot"
|
||||
|
||||
def to_ascii_hex(self, byte_array):
|
||||
return byte_array.hex()
|
||||
|
||||
|
||||
def add_padding(self, input_string):
|
||||
# Convert the input string to bytes
|
||||
input_bytes = input_string.encode()
|
||||
# Calculate the pad length
|
||||
pad_length = 16 - len(input_bytes) % 16
|
||||
pad_length = pad_length if pad_length != 16 else 0
|
||||
# Generate padding with ASCII characters
|
||||
padding_ascii = bytes([i % 26 + 65 for i in range(pad_length)])
|
||||
# Concatenate padding and input bytes
|
||||
padded_bytes = padding_ascii + input_bytes
|
||||
|
||||
return padded_bytes
|
||||
|
||||
def get_string_after_first_occurrence(self, input_string, character):
|
||||
index = input_string.find(character)
|
||||
return input_string[index + 1:] if index != -1 else ""
|
||||
|
||||
def encrypt(self, input_bytes, key):
|
||||
key_bytes = key
|
||||
data_bytes = input_bytes
|
||||
|
||||
# Use the same initialization vector (IV) as in your decryption function
|
||||
iv = b'\x00' * 16
|
||||
|
||||
cipher = Cipher(algorithms.AES(key_bytes), modes.CBC(iv), backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
|
||||
encrypted_data = encryptor.update(data_bytes) + encryptor.finalize()
|
||||
|
||||
return binascii.hexlify(encrypted_data).decode('ascii')
|
||||
|
||||
def decrypt(self, input_bytes, key):
|
||||
cipher = Cipher(algorithms.AES(key), modes.CFB(os.urandom(16)), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted = decryptor.update(input_bytes) + decryptor.finalize()
|
||||
return decrypted.decode()
|
||||
|
||||
def send_bytes_with_tcp(self, ip_address, port, data):
|
||||
try:
|
||||
with socket.create_connection((ip_address, port), timeout=5) as client:
|
||||
client.sendall(data)
|
||||
response = client.recv(1024)
|
||||
return response
|
||||
except Exception as ex:
|
||||
print(f"Error: {ex}")
|
||||
return None
|
||||
|
||||
def convert_coordinates(self, latitude, longitude):
|
||||
# Extract numerical values for longitude and latitude
|
||||
lon_value = float(longitude)
|
||||
lat_value = float(latitude)
|
||||
|
||||
# Convert longitude to the desired format
|
||||
lon_degrees = int(lon_value)
|
||||
lon_minutes = (lon_value - lon_degrees) * 60
|
||||
lon_format = f"[X{lon_degrees:03d}E{lon_minutes:.8f}]"
|
||||
|
||||
# Convert latitude to the desired format
|
||||
lat_degrees = int(lat_value)
|
||||
lat_minutes = (lat_value - lat_degrees) * 60
|
||||
lat_format = f"[Y{lat_degrees:02d}N{lat_minutes:.8f}]"
|
||||
|
||||
# Combine the formatted coordinates
|
||||
result = lon_format + lat_format
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def calculate_crc(self, input_bytes):
|
||||
crc_arc = CrcArc()
|
||||
crc_arc.process(input_bytes)
|
||||
crc_value_hex = crc_arc.finalhex()
|
||||
return f"{crc_value_hex.upper()}{len(input_bytes):04X}"
|
||||
|
||||
|
||||
def main():
|
||||
string_maker = StringMaker()
|
||||
key = "594162417237323352466D3964673233"
|
||||
host = "213.167.121.142"
|
||||
port = 20180
|
||||
ClientID = input("ClientID: ")
|
||||
SignalType = "Nri/" + input("SignalType: ")
|
||||
Zone = input("Zone: ")
|
||||
SignalAndZone = SignalType + Zone
|
||||
PosistionLat = input("Latitude: ")
|
||||
PositionLong = input("Longitude: ")
|
||||
|
||||
result = string_maker.send_alarm(SignalAndZone, int(ClientID), float(PosistionLat), float(PositionLong), key, host, port)
|
||||
print(result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
import os
|
||||
import sys
|
||||
import socket
|
||||
from time import sleep
|
||||
from crccheck.crc import Crc16, CrcArc
|
||||
|
||||
# Signal Types
|
||||
# Send with image: "SIA-DCS"0001L0#9998[9998|Nri1/BA01^Dette er sone 1^][Vhttps://granitebeltchristmasfarm.com.au/wp-content/uploads/2023/08/Grinch-1.jpg]
|
||||
# PANIC: "SIA-DCS"0001L0#1234[1234|Nri1/PA001][X010E57.5698080][Y59N12.8358840]
|
||||
# PANIC RESTORE: "SIA-DCS"0001L0#1234[1234|Nri1/PR001][X010E57.5698080][Y59N12.8358840]
|
||||
# Set: "SIA-DCS"0001L0#1234[1234|Nri1/CL001]
|
||||
# Unset: "SIA-DCS"0001L0#1234[1234|Nri1/OP001]
|
||||
# Test Report: "SIA-DCS"0001L0#1234[1234|Nri1/RP0]
|
||||
|
||||
def sendSia(type, host, port):
|
||||
#type = '"NULL"0000L0#107504[]'
|
||||
data = type.encode()
|
||||
crc = CrcArc.calc(data)
|
||||
crcstr = str(hex(crc)).split('x')
|
||||
crcstr = str(crcstr[1].upper())
|
||||
if len(crcstr) == 2:
|
||||
crcstr = '00' + crcstr
|
||||
if len(crcstr) == 3:
|
||||
crcstr = '0' + crcstr
|
||||
length = str(hex(len(data))).split('x')
|
||||
client_socket = socket.socket()
|
||||
client_socket.connect((host, port))
|
||||
client_socket.settimeout(10)
|
||||
text = '\n' + crcstr + '00' + length[1].upper() + type + '\r'
|
||||
print('Sent signal: ' + f'{text}')
|
||||
message = bytes(text, 'ASCII')
|
||||
client_socket.send(message)
|
||||
data = client_socket.recv(64).decode()
|
||||
print("Received from server: " + data)
|
||||
client_socket.close()
|
||||
|
||||
def clear_screen():
|
||||
os.system('cls' if os.name == 'nt' else 'clear')
|
||||
|
||||
host = ""
|
||||
port = ""
|
||||
def program():
|
||||
print('### SIA-DCS TESTER v1.0 ###\n')
|
||||
global host
|
||||
global port
|
||||
if not host:
|
||||
host = input('Enter reciever IP: ')
|
||||
port = int(input('Enter reciever port: '))
|
||||
print('1. SIA-DCS(Auto Generated) | 2. SIA-DCS (Full String)')
|
||||
inputType = input('Select signal type: ')
|
||||
if inputType == '1':
|
||||
id = input('Enter Client ID: ')
|
||||
signalType = input('Enter signal type: ')
|
||||
zone = input('Enter zone number: ')
|
||||
if signalType == "BA":
|
||||
image = input('Enter image URL: ')
|
||||
else:
|
||||
image = ""
|
||||
type = '"SIA-DCS"0005L0#' + id + '[' + id + '|Nri1/' + signalType + zone + '][V' + image + ']'
|
||||
if inputType == '2':
|
||||
print('Example string: "SIA-DCS"0005L0#1234[1234|Nri1/RP0]')
|
||||
type = input('Enter full SIA string: ')
|
||||
while inputType not in ['1', '2']:
|
||||
program()
|
||||
|
||||
sendSia(type, host, port)
|
||||
|
||||
check = input('\nSend new signal? y/n: ')
|
||||
if check == 'y':
|
||||
clear_screen()
|
||||
program()
|
||||
else:
|
||||
clear_screen()
|
||||
pass
|
||||
|
||||
program()
|
||||
Binary file not shown.
|
|
@ -0,0 +1,49 @@
|
|||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import binascii
|
||||
|
||||
def aes_decrypt(encrypted_hex, key):
|
||||
key_bytes = bytes.fromhex(key)
|
||||
encrypted_data = binascii.unhexlify(encrypted_hex)
|
||||
|
||||
cipher = Cipher(algorithms.AES(key_bytes), modes.CBC(b'\x00' * 16), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
|
||||
decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()
|
||||
|
||||
return decrypted_data.decode('utf-8')
|
||||
|
||||
# Your encrypted hex string and key
|
||||
#encrypted_hex = "07132d209c8b798a970ee5022d720090f3a7f1f538955d9936d6b42fe48a199c35f98ebcf781791d3ecaa1f6bc4434818af942ff610459fbd4bf698c2086d30c259f84cd3bed2cfcbebd375e4ef852b4"
|
||||
encrypted_hex = "f8cac9f2d70056463ce528cf2ae836f4a411ae3453cf77ba6cea7060d59dc601f27ef9c59092a18257ccf3e6d1832c101b2999071ac183a1dd52ceff95f4e650"
|
||||
key = "594162417237323352466D3964673233"
|
||||
|
||||
# Decrypt the data
|
||||
decrypted_data = aes_decrypt(encrypted_hex, key)
|
||||
|
||||
print("Decrypted Data:", decrypted_data)
|
||||
|
||||
|
||||
|
||||
def aes_encrypt(data, key):
|
||||
key_bytes = bytes.fromhex(key)
|
||||
data_bytes = data.encode('ascii')
|
||||
|
||||
# Use the same initialization vector (IV) as in your decryption function
|
||||
iv = b'\x00' * 16
|
||||
|
||||
cipher = Cipher(algorithms.AES(key_bytes), modes.CBC(iv), backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
|
||||
encrypted_data = encryptor.update(data_bytes) + encryptor.finalize()
|
||||
|
||||
return binascii.hexlify(encrypted_data).decode('ascii')
|
||||
|
||||
# Your original decrypted data and key
|
||||
key = "535A6D6766584B4C75314238444E3237"
|
||||
theData = "ABCDEFGHIJKL|9999|Nri1/BA01]_17:04:17,12-11-2024"
|
||||
|
||||
# Encrypt the data
|
||||
encrypted_data = aes_encrypt(theData, key)
|
||||
|
||||
print("Encrypted Data:", encrypted_data)
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
Div. script som sender TCP pakker med SIA-DC09 formatert data. Støtter også ADM-CID over SIA-DC09.
|
||||
|
||||
Testet mot Patriot og ESI.
|
||||
|
||||
MERK! Kryptert er utviklet spesielt mot Patriot, ikke testet mot andre mottaks program.
|
||||
|
|
@ -0,0 +1,149 @@
|
|||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import os
|
||||
import sys
|
||||
import socket
|
||||
import datetime
|
||||
import struct
|
||||
from crccheck.crc import Crc16, CrcArc
|
||||
import binascii
|
||||
|
||||
class StringMaker:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def send_alarm(self, alarm_command, user_id, key, host, port, V):
|
||||
key = bytes.fromhex(key)
|
||||
now = datetime.datetime.now()
|
||||
date_part = now.strftime("%m-%d-%Y")
|
||||
time_part = now.strftime("%H:%M:%S")
|
||||
input_string = f'"*ADM-CID"0000RA2L0000#{user_id}[#{user_id}|{alarm_command}][V{V}]_{time_part},{date_part}'
|
||||
|
||||
print(f"Input String: {input_string}")
|
||||
|
||||
part_before, part_after = input_string.split('[', 1)
|
||||
part_before = f"{part_before}["
|
||||
part_after = "|" + part_after
|
||||
|
||||
encryption_part = self.add_padding(part_after)
|
||||
print(f"Padded Input: {encryption_part}")
|
||||
|
||||
encrypted_hex = self.encrypt(encryption_part, key)
|
||||
print(f"Encrypted Hex: {encrypted_hex}")
|
||||
|
||||
before_and_after = f"{part_before}{encrypted_hex}"
|
||||
|
||||
crc_hash = self.calculate_crc(before_and_after.encode())
|
||||
with_hash = f"{crc_hash}{before_and_after}"
|
||||
|
||||
with_line_feed_and_return = f"\u000A{with_hash}\u000D"
|
||||
message_bytes = with_line_feed_and_return.encode()
|
||||
|
||||
tcp_response = self.send_bytes_with_tcp(host, port, message_bytes)
|
||||
return tcp_response.decode() if tcp_response is not None else "No data from Patriot"
|
||||
|
||||
def to_ascii_hex(self, byte_array):
|
||||
return byte_array.hex()
|
||||
|
||||
|
||||
def add_padding(self, input_string):
|
||||
# Convert the input string to bytes
|
||||
input_bytes = input_string.encode()
|
||||
# Calculate the pad length
|
||||
pad_length = 16 - len(input_bytes) % 16
|
||||
pad_length = pad_length if pad_length != 16 else 0
|
||||
# Generate padding with ASCII characters
|
||||
padding_ascii = bytes([i % 26 + 65 for i in range(pad_length)])
|
||||
# Concatenate padding and input bytes
|
||||
padded_bytes = padding_ascii + input_bytes
|
||||
|
||||
return padded_bytes
|
||||
|
||||
def get_string_after_first_occurrence(self, input_string, character):
|
||||
index = input_string.find(character)
|
||||
return input_string[index + 1:] if index != -1 else ""
|
||||
|
||||
def encrypt(self, input_bytes, key):
|
||||
key_bytes = key
|
||||
data_bytes = input_bytes
|
||||
|
||||
# Use the same initialization vector (IV) as in your decryption function
|
||||
iv = b'\x00' * 16
|
||||
|
||||
cipher = Cipher(algorithms.AES(key_bytes), modes.CBC(iv), backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
|
||||
encrypted_data = encryptor.update(data_bytes) + encryptor.finalize()
|
||||
|
||||
return binascii.hexlify(encrypted_data).decode('ascii')
|
||||
|
||||
def decrypt(self, input_bytes, key):
|
||||
cipher = Cipher(algorithms.AES(key), modes.CFB(os.urandom(16)), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted = decryptor.update(input_bytes) + decryptor.finalize()
|
||||
return decrypted.decode()
|
||||
|
||||
def send_bytes_with_tcp(self, ip_address, port, data):
|
||||
try:
|
||||
with socket.create_connection((ip_address, port), timeout=5) as client:
|
||||
client.sendall(data)
|
||||
response = client.recv(1024)
|
||||
return response
|
||||
except Exception as ex:
|
||||
print(f"Error: {ex}")
|
||||
return None
|
||||
|
||||
def convert_to_dms_format(self, dd, direction):
|
||||
degrees = int(abs(dd))
|
||||
minutes = (abs(dd) - degrees) * 60
|
||||
seconds = (minutes - int(minutes)) * 60
|
||||
sign = 'W' if dd < 0 and direction == "X" else 'S' if dd < 0 and direction == "Y" else 'E' if direction == "X" else 'N'
|
||||
return f"{direction}{degrees:03d}{sign}{int(minutes):02d}.{int(round(seconds * 10000000)):07d}"
|
||||
|
||||
def calculate_crc(self, input_bytes):
|
||||
crc_arc = CrcArc()
|
||||
crc_arc.process(input_bytes)
|
||||
crc_value_hex = crc_arc.finalhex()
|
||||
return f"{crc_value_hex.upper()}{len(input_bytes):04X}"
|
||||
|
||||
arg1 = sys.argv[1] if len(sys.argv) > 1 else None
|
||||
arg2 = sys.argv[2] if len(sys.argv) > 2 else None
|
||||
arg3 = sys.argv[3] if len(sys.argv) > 3 else None
|
||||
host = sys.argv[4] if len(sys.argv) > 4 else None
|
||||
arg5 = sys.argv[5] if len(sys.argv) > 5 else None
|
||||
signalType = sys.argv[6] if len(sys.argv) > 6 else "PA"
|
||||
zone = sys.argv[7] if len(sys.argv) > 7 else "00"
|
||||
arg8 = sys.argv[8] if len(sys.argv) > 8 else "0"
|
||||
arg9 = sys.argv[9] if len(sys.argv) > 9 else "0"
|
||||
arg10 = sys.argv[10] if len(sys.argv) > 10 else "0"
|
||||
V = sys.argv[11] if len(sys.argv) > 11 else None
|
||||
port = int(arg5)
|
||||
|
||||
def main():
|
||||
global host
|
||||
global port
|
||||
global arg1
|
||||
global arg2
|
||||
global arg3
|
||||
global signalType
|
||||
global zone
|
||||
global arg8
|
||||
global arg9
|
||||
global V
|
||||
string_maker = StringMaker()
|
||||
key = arg10.encode().hex().upper()
|
||||
ClientID = arg1
|
||||
SignalType = signalType
|
||||
Zone = zone
|
||||
if len(Zone) == 1:
|
||||
Zone = "00" + Zone
|
||||
if len(Zone) == 2:
|
||||
Zone = "0" + Zone
|
||||
SignalAndZone = SignalType + " 00 " + Zone
|
||||
|
||||
result = string_maker.send_alarm(SignalAndZone, int(ClientID), key, host, port, V)
|
||||
print(result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,70 @@
|
|||
import os
|
||||
import sys
|
||||
import socket
|
||||
import time
|
||||
from time import sleep
|
||||
from crccheck.crc import CrcArc
|
||||
from datetime import datetime
|
||||
|
||||
# Signal Types
|
||||
# Send with image: "SIA-DCS"0001L0#999
|
||||
|
||||
def sendSia(type, host, port):
|
||||
data = type.encode()
|
||||
crc = CrcArc.calc(data)
|
||||
crcstr = str(hex(crc)).split('x')
|
||||
crcstr = str(crcstr[1].upper())
|
||||
if len(crcstr) == 2:
|
||||
crcstr = '00' + crcstr
|
||||
if len(crcstr) == 3:
|
||||
crcstr = '0' + crcstr
|
||||
length = str(hex(len(data))).split('x')
|
||||
client_socket = socket.socket()
|
||||
client_socket.connect((host, port))
|
||||
client_socket.settimeout(10)
|
||||
text = '\u000A' + crcstr + '00' + length[1].upper() + type + '\u000D'
|
||||
print('Sent signal: ' + text)
|
||||
message = bytes(text, 'ASCII')
|
||||
client_socket.send(message)
|
||||
data = client_socket.recv(64).decode()
|
||||
print("Received from server: " + data)
|
||||
client_socket.close()
|
||||
|
||||
arg1 = sys.argv[1] if len(sys.argv) > 1 else None
|
||||
arg2 = sys.argv[2] if len(sys.argv) > 2 else None
|
||||
arg3 = sys.argv[3] if len(sys.argv) > 3 else None
|
||||
host = sys.argv[4] if len(sys.argv) > 4 else None
|
||||
arg5 = sys.argv[5] if len(sys.argv) > 5 else None
|
||||
signalType = sys.argv[6] if len(sys.argv) > 6 else "PA"
|
||||
zone = sys.argv[7] if len(sys.argv) > 7 else "00"
|
||||
arg8 = sys.argv[8] if len(sys.argv) > 8 else ""
|
||||
arg9 = sys.argv[9] if len(sys.argv) > 9 else "0"
|
||||
arg10 = sys.argv[10] if len(sys.argv) > 10 else None
|
||||
port = int(arg5)
|
||||
|
||||
def program():
|
||||
global host
|
||||
global port
|
||||
global arg1
|
||||
global arg2
|
||||
global arg3
|
||||
global signalType
|
||||
global zone
|
||||
global arg8
|
||||
global arg9
|
||||
global arg10
|
||||
current_time_milliseconds = int(time.time() * 1000)
|
||||
|
||||
Zone = zone
|
||||
if len(Zone) == 1:
|
||||
Zone = "00" + Zone
|
||||
if len(Zone) == 2:
|
||||
Zone = "0" + Zone
|
||||
|
||||
# convert timestamp to string in dd-mm-yyyy HH:MM:SS
|
||||
date_time = datetime.now()
|
||||
str_date_time = date_time.strftime("_%H:%M:%S,%d-%m-%Y")
|
||||
type = f'"ADM-CID"0000R00L0000#{arg1}[#{arg1}|{signalType} 00 {Zone}][V{arg10}?time={current_time_milliseconds}]{str_date_time}'
|
||||
sendSia(type, host, port)
|
||||
|
||||
program()
|
||||
Loading…
Reference in New Issue