import msal import requests import time import re import traceback import logging import yaml # Load YAML config with open("config.yaml", "r") as f: config = yaml.safe_load(f) # Extract logging config log_config = config.get("logging", {}) log_tofile = log_config.get("tofile", "true") log_file = log_config.get("filename", "safey.log") log_level = getattr(logging, log_config.get("level", "INFO").upper(), logging.INFO) log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s") # Extract config office365 = config.get("office365") # Apply logging config if log_config.get("tofile"): logging.basicConfig( filename=log_file, level=log_level, format=log_format, ) else: logging.basicConfig( level=log_level, format=log_format, ) logging.info(f"Logging config: {log_config}") logging.info(f"Office365 config: {office365}") past_timestamp = 0 def login_ms365(): global headers global POST_URL # Azure AD credentials CLIENT_ID = office365.get("clientid") TENANT_ID = office365.get("tenantid") USERNAME = office365.get("username") PASSWORD = office365.get("password") # Graph scopes and auth settings AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}" SCOPES = ["https://graph.microsoft.com/.default"] # Destination for HTTP POST POST_URL = "https://api.alarm24.no:7443/SIA" # Acquire token app = msal.PublicClientApplication(CLIENT_ID, authority=AUTHORITY) token_response = app.acquire_token_by_username_password( username=USERNAME, password=PASSWORD, scopes=SCOPES ) if "access_token" not in token_response: logging.error("❌ Failed to acquire token") logging.error(token_response.get("error_description")) exit(1) access_token = token_response["access_token"] headers = { 'Authorization': f'Bearer {access_token}', 'Accept': 'application/json' } logging.info("OAuth2 token refreshed!") return headers def remove_norwegian(text): return (text.replace("æ", "ae") .replace("ø", "o") .replace("å", "a") .replace("Æ", "Ae") .replace("Ø", "O") .replace("Å", "A")) def normalize_string(input_string: str) -> str: return re.sub(r'\s+', ' ', input_string).strip() def parse_email(): # Fetch top 10 recent emails response = requests.get( "https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=10&$orderby=receivedDateTime asc", headers=headers ) if response.status_code != 200: logging.error("❌ Failed to fetch emails:", response.text) exit(1) emails = response.json().get("value", []) logging.info(f"Found {len(emails)} emails") # Process matching emails for email in emails: subject = email.get("subject", "") subject_match = re.search(r"#PROM:(\d+)\.?:\s*(.*)", subject) message_id = email["id"] # Get full message to read body msg_detail = requests.get( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", headers=headers ) if msg_detail.status_code != 200: logging.error(f"❌ Failed to get email body: {msg_detail.text}") continue body_html = msg_detail.json().get("body", {}).get("content", "") if subject == "Alarm Notification": match = re.search(r"#PROM:(\d+)\.", body_html) match2 = re.search(r'
(.*?)
.*?
', body_html, re.DOTALL) # Construct JSON subject = remove_norwegian(subject) payload = { "clientid": remove_norwegian(match.group(1)), "signal": 140, "zone": 999, "cameraLink": "TEXT" + remove_norwegian(match2.group(1).strip().replace(" ","_")) } elif subject_match: client_id = int(subject_match.group(1)) signal_text = subject_match.group(2).strip().upper() signal_text = normalize_string(signal_text) match3 = re.search(r"utløst av(.*?\..*?)[\.\n<]", body_html, re.IGNORECASE) if match3: device = remove_norwegian(match3.group(1).strip()) logging.info(f"Sone tekst: {match3.group(1).strip()}") else: match3 = re.search(r"utløst av(.*?)[\.\n<]", body_html, re.IGNORECASE) if match3: device = remove_norwegian(match3.group(1).strip()) logging.info(f"Sone tekst: {match3.group(1).strip()}") else: device = "" if "BRANNALARM UTLØST" in signal_text: signal = 110 elif "BRANNSIREN STOPPET" in signal_text: signal = 1110 elif "FIRE ALARM TRIGGERED" in signal_text: signal = 110 elif "SIKKERHETSALARM UTLØST" in signal_text: signal = 130 elif "SIKKERHETSALARM AVSLUTTET" in signal_text: signal = 1130 elif "ENHETEN ER IKKE TILGJENGELIG" in signal_text: signal = 147 elif "ENHET TILGJENGELIG" in signal_text: signal = 1147 else: signal = 147 signal_text = remove_norwegian(signal_text).replace(" ", "_") device = signal_text.replace(" ","_") + "_" + device.replace(" ","_") # Construct JSON payload = { "clientid": str(client_id), "signal": signal, "zone": 999, "cameraLink": "TEXT" + remove_norwegian(device.replace(" ","_")) } else: # Construct JSON subject = remove_norwegian(subject) payload = { "clientid": 9000, "signal": 140, "zone": 999, "cameraLink": "TEXT" + subject.replace(" ", "_") } logging.info(f"Subject: {subject}") logging.info(f"Body preview: {body_html[:100]}...") post_resp = requests.post(POST_URL, json=payload) logging.info(f"📤 Sent to API: {payload}") logging.info(f"📤 Response from API: {post_resp.status_code} - {post_resp.text}") # Move the email to Deleted Items folder move_resp = requests.post( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}/move", headers=headers, json={"destinationId": "deleteditems"} ) if move_resp.status_code == 201: logging.info("🗑️ Email moved to Deleted Items.") else: logging.error(f"Failed to move email: {move_resp.status_code} - {move_resp.text}") while True: try: if time.time() - past_timestamp > office365.get("token_ttl", 3600): logging.info("OAuth2 token is expired or is about to expire, refreshing token...") time.sleep(0.5) login_ms365() past_timestamp = time.time() parse_email() try: requests.get("http://watchdog.sikkerhetsservice.no/checkin.php?private_key=YLPRR4uykjZoeQtCtfmjqTPephLf9ZMe") except: pass except Exception as e: logging.error(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Error: {e}") requests.get(f"https://watchdog.sikkerhetsservice.no/warn.php?sys_name=A24_Safey&warn_type=sys&label=Error&warn_message={e}") traceback.print_exc() time.sleep(office365.get("interval", 30))