# Patriot_Safey/safey.py
#
# 222 lines
# 7.5 KiB
# Python

import msal
import requests
import time
import re
import traceback
import logging
import yaml
# Load the YAML configuration. An empty file parses to None, so fall back to
# an empty mapping instead of crashing on the first .get() below.
with open("config.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f) or {}

# Logging settings, each with a default. A present-but-null "logging:" key is
# treated like a missing one.
log_config = config.get("logging") or {}
# NOTE(review): log_tofile defaults to "true" but is not consulted below; the
# file/console decision reads the raw "tofile" key, so a missing key means
# console logging. Confirm which default is intended.
log_tofile = log_config.get("tofile", "true")
log_file = log_config.get("filename", "safey.log")
# Unknown level names fall back to INFO.
log_level = getattr(logging, log_config.get("level", "INFO").upper(), logging.INFO)
log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s")

# Office 365 settings (credentials, token_ttl, interval) read by the rest of
# the script. A missing or null section becomes an empty mapping.
office365 = config.get("office365") or {}

# Apply the logging configuration: to a file when "tofile" is truthy,
# otherwise to the default stream.
if log_config.get("tofile"):
    logging.basicConfig(
        filename=log_file,
        level=log_level,
        format=log_format,
    )
else:
    logging.basicConfig(
        level=log_level,
        format=log_format,
    )

logging.info(f"Logging config: {log_config}")
# Never write the account password to the log; every other key is logged as-is.
_office365_for_log = {
    key: ("***" if key == "password" else value)
    for key, value in office365.items()
}
logging.info(f"Office365 config: {_office365_for_log}")

# Time of the last successful token refresh (epoch seconds). Zero forces a
# login on the first pass of the main loop.
past_timestamp = 0
def login_ms365():
    """Acquire a Microsoft Graph access token with username and password.

    Reads ``clientid``, ``tenantid``, ``username`` and ``password`` from the
    module-level ``office365`` mapping and requests a token through MSAL's
    username/password flow for the Graph ``.default`` scope.

    Side effects:
        Sets the module globals ``headers`` (Bearer authorization headers)
        and ``POST_URL`` (the endpoint alarm payloads are posted to).

    Returns:
        The same header dict that is stored in the ``headers`` global.

    Raises:
        SystemExit: with status 1 when the token response carries no
            ``access_token``.
    """
    global headers
    global POST_URL

    # Azure AD credentials
    CLIENT_ID = office365.get("clientid")
    TENANT_ID = office365.get("tenantid")
    USERNAME = office365.get("username")
    PASSWORD = office365.get("password")

    # Graph scopes and auth settings
    AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
    SCOPES = ["https://graph.microsoft.com/.default"]

    # Destination for HTTP POST
    POST_URL = "https://api.alarm24.no:7443/SIA"

    # Acquire token
    app = msal.PublicClientApplication(CLIENT_ID, authority=AUTHORITY)
    token_response = app.acquire_token_by_username_password(
        username=USERNAME,
        password=PASSWORD,
        scopes=SCOPES,
    )

    if "access_token" not in token_response:
        logging.error("❌ Failed to acquire token")
        logging.error(token_response.get("error_description"))
        # Raise SystemExit directly: the exit() builtin is injected by the
        # site module and is missing under "python -S" or in frozen builds.
        raise SystemExit(1)

    access_token = token_response["access_token"]
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Accept': 'application/json',
    }
    logging.info("OAuth2 token refreshed!")
    return headers
def remove_norwegian(text):
    """Return *text* with the Norwegian letters swapped for ASCII stand-ins.

    Mapping: æ -> ae, ø -> o, å -> a, Æ -> Ae, Ø -> O, Å -> A.
    Every other character is returned unchanged.
    """
    ascii_stand_ins = str.maketrans({
        "æ": "ae",
        "ø": "o",
        "å": "a",
        "Æ": "Ae",
        "Ø": "O",
        "Å": "A",
    })
    return text.translate(ascii_stand_ins)
def normalize_string(input_string: str) -> str:
    """Collapse each run of whitespace to one space and trim both ends."""
    single_spaced = re.sub(r'\s+', ' ', input_string)
    trimmed = single_spaced.strip()
    return trimmed
# Seconds to wait for any single HTTP response before requests raises.
_HTTP_TIMEOUT = 30

# Subject phrases and the signal code each maps to. Checked in order; the
# first phrase contained in the normalized, upper-cased subject text wins.
_SIGNAL_BY_PHRASE = (
    ("BRANNALARM UTLØST", 110),
    ("BRANNSIREN STOPPET", 1110),
    ("FIRE ALARM TRIGGERED", 110),
    ("SIKKERHETSALARM UTLØST", 130),
    ("SIKKERHETSALARM AVSLUTTET", 1130),
    ("ENHETEN ER IKKE TILGJENGELIG", 147),
    ("ENHET TILGJENGELIG", 1147),
)

# Signal used when a "#PROM:" subject contains none of the phrases above.
_FALLBACK_SIGNAL = 147


def _extract_device(body_html):
    """Return the transliterated text following "utløst av" in the body, or ""."""
    # The first pattern allows one period inside the captured text; the second
    # is the plain form, tried only when the first finds nothing.
    for pattern in (r"utløst av(.*?\..*?)[\.\n<]", r"utløst av(.*?)[\.\n<]"):
        found = re.search(pattern, body_html, re.IGNORECASE)
        if found:
            zone_text = found.group(1).strip()
            logging.info(f"Sone tekst: {zone_text}")
            return remove_norwegian(zone_text)
    return ""


def _build_payload(subject, body_html):
    """Build the JSON payload for one message from its subject and HTML body."""
    if subject == "Alarm Notification":
        id_match = re.search(r"#PROM:(\d+)\.", body_html)
        text_match = re.search(
            r'<div style="font-size:13px"><div>(.*?)<br>.*?</div></div>',
            body_html,
            re.DOTALL,
        )
        if id_match and text_match:
            return {
                "clientid": id_match.group(1),
                "signal": 140,
                "zone": 999,
                "cameraLink": "TEXT" + remove_norwegian(
                    text_match.group(1).strip().replace(" ", "_")
                ),
            }
        # A body without the expected markers used to raise AttributeError on
        # every poll and block the queue; forward it as a generic alarm instead.
        logging.warning(
            "Alarm Notification body did not match the expected layout; "
            "sending generic payload"
        )

    subject_match = re.search(r"#PROM:(\d+)\.?:\s*(.*)", subject)
    if subject_match:
        # int() round-trip drops leading zeros from the client id.
        client_id = int(subject_match.group(1))
        signal_text = normalize_string(subject_match.group(2).strip().upper())
        device = _extract_device(body_html)

        signal = _FALLBACK_SIGNAL
        for phrase, code in _SIGNAL_BY_PHRASE:
            if phrase in signal_text:
                signal = code
                break

        label = remove_norwegian(signal_text) + "_" + device
        return {
            "clientid": str(client_id),
            "signal": signal,
            "zone": 999,
            "cameraLink": "TEXT" + label.replace(" ", "_"),
        }

    # Anything else is forwarded under the catch-all client id.
    return {
        "clientid": 9000,
        "signal": 140,
        "zone": 999,
        "cameraLink": "TEXT" + remove_norwegian(subject).replace(" ", "_"),
    }


def parse_email():
    """Forward inbox messages to the alarm API and move them to Deleted Items.

    Fetches up to 10 inbox messages, oldest first, using the module-level
    ``headers``. For each message the full body is fetched, a payload is
    built from the subject/body, posted as JSON to ``POST_URL``, and the
    message is then moved to the ``deleteditems`` folder. A message whose
    body cannot be fetched is skipped and left in the inbox.

    Raises:
        SystemExit: with status 1 when the message list request does not
            return HTTP 200.
        requests.RequestException: when an HTTP call fails or times out.
    """
    # Fetch the 10 oldest inbox messages (ascending receivedDateTime).
    response = requests.get(
        "https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=10&$orderby=receivedDateTime asc",
        headers=headers,
        timeout=_HTTP_TIMEOUT,
    )
    if response.status_code != 200:
        # The response text must be part of the message itself: passing it as
        # an extra positional argument without a placeholder breaks formatting.
        logging.error(f"❌ Failed to fetch emails: {response.text}")
        raise SystemExit(1)

    emails = response.json().get("value", [])
    logging.info(f"Found {len(emails)} emails")

    for email in emails:
        # A JSON null subject would otherwise reach re.search as None.
        subject = email.get("subject") or ""
        message_id = email["id"]

        # Get full message to read body
        msg_detail = requests.get(
            f"https://graph.microsoft.com/v1.0/me/messages/{message_id}",
            headers=headers,
            timeout=_HTTP_TIMEOUT,
        )
        if msg_detail.status_code != 200:
            logging.error(f"❌ Failed to get email body: {msg_detail.text}")
            continue
        body_html = (msg_detail.json().get("body") or {}).get("content") or ""

        payload = _build_payload(subject, body_html)

        logging.info(f"Subject: {subject}")
        logging.info(f"Body preview: {body_html[:100]}...")

        # NOTE(review): the API status is only logged; the message is moved
        # to Deleted Items even when the POST is rejected. Confirm intended.
        post_resp = requests.post(POST_URL, json=payload, timeout=_HTTP_TIMEOUT)
        logging.info(f"📤 Sent to API: {payload}")
        logging.info(f"📤 Response from API: {post_resp.status_code} - {post_resp.text}")

        # Move the email to Deleted Items folder
        move_resp = requests.post(
            f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/move",
            headers=headers,
            json={"destinationId": "deleteditems"},
            timeout=_HTTP_TIMEOUT,
        )
        if move_resp.status_code == 201:
            logging.info("🗑️ Email moved to Deleted Items.")
        else:
            logging.error(f"Failed to move email: {move_resp.status_code} - {move_resp.text}")
# Main polling loop: refresh the token when it is older than token_ttl,
# process the inbox, check in with the watchdog, then sleep for the
# configured interval. Ordinary exceptions are logged and reported so the
# loop keeps running; SystemExit raised by the helpers still ends the process.
while True:
    try:
        if time.time() - past_timestamp > office365.get("token_ttl", 3600):
            logging.info("OAuth2 token is expired or is about to expire, refreshing token...")
            time.sleep(0.5)
            login_ms365()
            past_timestamp = time.time()

        parse_email()

        # Best-effort heartbeat; a failed check-in must not count as an error.
        # NOTE(review): the private key is hard-coded and sent over plain
        # HTTP — consider moving it to config.yaml and using HTTPS.
        try:
            requests.get(
                "http://watchdog.sikkerhetsservice.no/checkin.php?private_key=YLPRR4uykjZoeQtCtfmjqTPephLf9ZMe",
                timeout=10,
            )
        except requests.RequestException as checkin_error:
            logging.debug(f"Watchdog check-in failed: {checkin_error}")
    except Exception as e:
        # logging.exception records the traceback alongside the message.
        logging.exception(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Error: {e}")
        # Reporting must never raise out of this handler: when the network is
        # the reason for the failure, this request fails too, and an unguarded
        # call here would terminate the whole loop.
        try:
            requests.get(
                "https://watchdog.sikkerhetsservice.no/warn.php",
                # params= URL-encodes the message, so "&", "#" or spaces in
                # the exception text cannot corrupt the query string.
                params={
                    "sys_name": "A24_Safey",
                    "warn_type": "sys",
                    "label": "Error",
                    "warn_message": str(e),
                },
                timeout=10,
            )
        except requests.RequestException as warn_error:
            logging.warning(f"Could not deliver warning to watchdog: {warn_error}")

    time.sleep(office365.get("interval", 30))