import mysql.connector from influxdb_client import InfluxDBClient, Point, WritePrecision from datetime import datetime import pytz import time import logging import yaml # Load YAML config with open("config.yaml", "r") as f: config = yaml.safe_load(f) # Extract logging config log_config = config.get("logging", {}) log_tofile = log_config.get("tofile", "true") log_file = log_config.get("filename", "safey.log") log_level = getattr(logging, log_config.get("level", "INFO").upper(), logging.INFO) log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s") general_config = config.get("general", {}) influx_config = config.get("influx", {}) mariadb_config = config.get("mariadb", {}) # Apply logging config if log_config.get("tofile"): logging.basicConfig( filename=log_file, level=log_level, format=log_format, ) else: logging.basicConfig( level=log_level, format=log_format, ) logging.info(f"Logging config: {log_config}") # PBXact MariaDB settings db_config = { 'user': mariadb_config.get("user"), 'password': mariadb_config.get("password"), 'host': mariadb_config.get("host"), 'database': mariadb_config.get("database"), } # InfluxDB settings influx_url = influx_config.get("url") influx_token = influx_config.get("secret") influx_org = influx_config.get("org") influx_bucket = influx_config.get("bucket") # Fetch last todays calls from MariaDB def fetch_cdrs(): conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) today = datetime.today().strftime('%Y-%m-%d') query = f""" SELECT calldate, src, dst, disposition, duration, did, channel, outbound_cnum, lastapp FROM cdr WHERE calldate >= '{today} 00:00:00' ORDER BY calldate DESC """ cursor.execute(query) results = cursor.fetchall() cursor.close() conn.close() return results # Write to InfluxDB def write_to_influx(calls): client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org, verify_ssl=False) write_api = client.write_api() cet = pytz.timezone('Europe/Oslo') counter = 0 for call in calls: try: dt_cest = cet.localize(datetime.strptime(str(call['calldate']), "%Y-%m-%d %H:%M:%S")) dt_utc = dt_cest.astimezone(pytz.utc) ts_ns = int(dt_utc.timestamp() * 1_000_000_000) counter = counter + 1 logging.debug(f""" ### Result: {counter}, Caller: {call['src']}, Dest: {call['dst']}, Duration: {call['duration']}, App: {call['lastapp']}, Channel: {call['channel']}, time: {call['calldate']} CEST / {dt_utc} UTC / {ts_ns} UNIX ###""") point = ( Point("pbx_call") .tag("src", call["src"]) .tag("dst", call["dst"]) .tag("disposition", call["disposition"]) .tag("did", call["did"]) .tag("channel", call["channel"]) .tag("outbound_cnum", call["outbound_cnum"]) .tag("lastapp", call["lastapp"]) .field("duration", int(call["duration"])) .time(ts_ns) ) write_api.write(bucket=influx_bucket, org=influx_org, record=point) except Exception as e: logging.info("Error with call:", call, "Error:", e) logging.info(f"""Result: {counter}""") client.close() write_api.close() # Run if __name__ == "__main__": while True: logging.info("Updating call history.") calls = fetch_cdrs() write_to_influx(calls) logging.info(f"Sleeping for {general_config.get("interval")} seconds...") time.sleep(general_config.get("interval"))