import msal import requests import time import re import traceback import logging import yaml # Load YAML config with open("config.yaml", "r") as f: config = yaml.safe_load(f) # Extract logging config log_config = config.get("logging", {}) log_tofile = log_config.get("tofile", "true") log_file = log_config.get("filename", "safey.log") log_level = getattr(logging, log_config.get("level", "INFO").upper(), logging.INFO) log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s") # Extract config office365 = config.get("office365") # Apply logging config if log_config.get("tofile"): logging.basicConfig( filename=log_file, level=log_level, format=log_format, ) else: logging.basicConfig( level=log_level, format=log_format, ) logging.info(f"Logging config: {log_config}") logging.info(f"Office365 config: {office365}") past_timestamp = 0 def login_ms365(): global headers global POST_URL # Azure AD credentials CLIENT_ID = office365.get("clientid") TENANT_ID = office365.get("tenantid") USERNAME = office365.get("username") PASSWORD = office365.get("password") # Graph scopes and auth settings AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}" SCOPES = ["https://graph.microsoft.com/.default"] # Destination for HTTP POST POST_URL = "https://api.alarm24.no:7443/SIA" # Acquire token app = msal.PublicClientApplication(CLIENT_ID, authority=AUTHORITY) token_response = app.acquire_token_by_username_password( username=USERNAME, password=PASSWORD, scopes=SCOPES ) if "access_token" not in token_response: logging.error("❌ Failed to acquire token") logging.error(token_response.get("error_description")) exit(1) access_token = token_response["access_token"] headers = { 'Authorization': f'Bearer {access_token}', 'Accept': 'application/json' } logging.info("OAuth2 token refreshed!") return headers def remove_norwegian(text): return (text.replace("æ", "ae") .replace("ø", "o") .replace("å", "a") .replace("Æ", "Ae") .replace("Ø", "O") .replace("Å", "A")) def normalize_string(input_string: str) -> str: return re.sub(r'\s+', ' ', input_string).strip() def parse_email(): # Fetch top 10 recent emails response = requests.get( "https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?$top=10&$orderby=receivedDateTime asc", headers=headers ) if response.status_code != 200: logging.error("❌ Failed to fetch emails:", response.text) exit(1) emails = response.json().get("value", []) logging.info(f"Found {len(emails)} emails") # Process matching emails for email in emails: subject = email.get("subject", "") subject_match = re.search(r"#PROM:(\d+)\.?:\s*(.*)", subject) message_id = email["id"] # Get full message to read body msg_detail = requests.get( f"https://graph.microsoft.com/v1.0/me/messages/{message_id}", headers=headers ) if msg_detail.status_code != 200: logging.error(f"❌ Failed to get email body: {msg_detail.text}") continue body_html = msg_detail.json().get("body", {}).get("content", "") if subject == "Alarm Notification": match = re.search(r"#PROM:(\d+)\.", body_html) match2 = re.search(r'