commit 078effc3559a9ae73c72e0d3d5a75c375dba88ab Author: Anders Knutsen Date: Wed Jul 23 10:56:08 2025 +0200 V1 diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/PBXActToInflux.iml b/.idea/PBXActToInflux.iml new file mode 100644 index 0000000..f571432 --- /dev/null +++ b/.idea/PBXActToInflux.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..db8786c --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..e959b7a --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..ab72972 --- /dev/null +++ b/config.yaml @@ -0,0 +1,20 @@ +logging: + tofile: false # Use "true" to redirect output to a file. + filename: pbxsync.log # select filename of the logfile. + level: INFO # Set loglevel + format: "%(asctime)s [%(levelname)s] %(message)s" #Set log format +general: + interval: 10 #seconds +influx: + url: http://192.168.146.251:8086 + secret: + org: Alarm24 + bucket: PBXAct +mariadb: + host: 192.168.130.9 + database: asteriskcdrdb + user: influx + password: + + + diff --git a/pbxtoinfux.py b/pbxtoinfux.py new file mode 100644 index 0000000..0e1871f --- /dev/null +++ b/pbxtoinfux.py @@ -0,0 +1,119 @@ +import mysql.connector +from influxdb_client import InfluxDBClient, Point, WritePrecision +from datetime import datetime +import pytz +import time +import logging +import yaml + +# Load YAML config +with open("config.yaml", "r") as f: + config = yaml.safe_load(f) + +# Extract logging config +log_config = config.get("logging", {}) +log_tofile = log_config.get("tofile", "true") +log_file = log_config.get("filename", "safey.log") +log_level = getattr(logging, log_config.get("level", "INFO").upper(), logging.INFO) +log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s") + +general_config = config.get("general", {}) +influx_config = config.get("influx", {}) +mariadb_config = config.get("mariadb", {}) + +# Apply logging config +if log_config.get("tofile"): + logging.basicConfig( + filename=log_file, + level=log_level, + format=log_format, + ) +else: + logging.basicConfig( + level=log_level, + format=log_format, + ) + +logging.info(f"Logging config: {log_config}") + +# PBXact MariaDB settings +db_config = { + 'user': mariadb_config.get("user"), + 'password': mariadb_config.get("password"), + 'host': mariadb_config.get("host"), + 'database': mariadb_config.get("database"), +} + +# InfluxDB settings +influx_url = influx_config.get("url") +influx_token = influx_config.get("secret") +influx_org = influx_config.get("org") +influx_bucket = influx_config.get("bucket") + +# Fetch last todays calls from MariaDB +def fetch_cdrs(): + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + today = datetime.today().strftime('%Y-%m-%d') + query = f""" + SELECT calldate, src, dst, disposition, duration, did, channel, outbound_cnum, lastapp + FROM cdr + WHERE calldate >= '{today} 00:00:00' + ORDER BY calldate DESC + """ + cursor.execute(query) + results = cursor.fetchall() + cursor.close() + conn.close() + return results + +# Write to InfluxDB +def write_to_influx(calls): + client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org, verify_ssl=False) + write_api = client.write_api() + cet = pytz.timezone('Europe/Oslo') + counter = 0 + for call in calls: + try: + dt_cest = cet.localize(datetime.strptime(str(call['calldate']), "%Y-%m-%d %H:%M:%S")) + dt_utc = dt_cest.astimezone(pytz.utc) + ts_ns = int(dt_utc.timestamp() * 1_000_000_000) + counter = counter + 1 + logging.debug(f""" + ### + Result: {counter}, + Caller: {call['src']}, + Dest: {call['dst']}, + Duration: {call['duration']}, + App: {call['lastapp']}, + Channel: {call['channel']}, + time: {call['calldate']} CEST / {dt_utc} UTC / {ts_ns} UNIX + ###""") + point = ( + Point("pbx_call") + .tag("src", call["src"]) + .tag("dst", call["dst"]) + .tag("disposition", call["disposition"]) + .tag("did", call["did"]) + .tag("channel", call["channel"]) + .tag("outbound_cnum", call["outbound_cnum"]) + .tag("lastapp", call["lastapp"]) + .field("duration", int(call["duration"])) + .time(ts_ns) + ) + write_api.write(bucket=influx_bucket, org=influx_org, record=point) + except Exception as e: + logging.info("Error with call:", call, "Error:", e) + logging.info(f"""Result: {counter}""") + client.close() + write_api.close() + + +# Run +if __name__ == "__main__": + while True: + logging.info("Updating call history.") + calls = fetch_cdrs() + write_to_influx(calls) + logging.info(f"Sleeping for {general_config.get("interval")} seconds...") + time.sleep(general_config.get("interval")) diff --git a/pbxtoinfux_all.py b/pbxtoinfux_all.py new file mode 100644 index 0000000..850e1c9 --- /dev/null +++ b/pbxtoinfux_all.py @@ -0,0 +1,119 @@ +import mysql.connector +from influxdb_client import InfluxDBClient, Point, WritePrecision +from datetime import datetime +import pytz +import time +import logging +import yaml + +# Load YAML config +with open("config.yaml", "r") as f: + config = yaml.safe_load(f) + +# Extract logging config +log_config = config.get("logging", {}) +log_tofile = log_config.get("tofile", "true") +log_file = log_config.get("filename", "safey.log") +log_level = getattr(logging, log_config.get("level", "INFO").upper(), logging.INFO) +log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s") + +general_config = config.get("general", {}) +influx_config = config.get("influx", {}) +mariadb_config = config.get("mariadb", {}) + +# Apply logging config +if log_config.get("tofile"): + logging.basicConfig( + filename=log_file, + level=log_level, + format=log_format, + ) +else: + logging.basicConfig( + level=log_level, + format=log_format, + ) + +logging.info(f"Logging config: {log_config}") + +# PBXact MariaDB settings +db_config = { + 'user': mariadb_config.get("user"), + 'password': mariadb_config.get("password"), + 'host': mariadb_config.get("host"), + 'database': mariadb_config.get("database"), +} + +# InfluxDB settings +influx_url = influx_config.get("url") +influx_token = influx_config.get("secret") +influx_org = influx_config.get("org") +influx_bucket = influx_config.get("bucket") + +# Fetch last todays calls from MariaDB +def fetch_cdrs(): + conn = mysql.connector.connect(**db_config) + cursor = conn.cursor(dictionary=True) + today = datetime.today().strftime('%Y-%m-%d') + query = f""" + SELECT calldate, src, dst, disposition, duration, did, channel, outbound_cnum, lastapp + FROM cdr + #WHERE calldate >= '{today} 00:00:00' + ORDER BY calldate DESC + """ + cursor.execute(query) + results = cursor.fetchall() + cursor.close() + conn.close() + return results + +# Write to InfluxDB +def write_to_influx(calls): + client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org, verify_ssl=False) + write_api = client.write_api() + cet = pytz.timezone('Europe/Oslo') + counter = 0 + for call in calls: + try: + dt_cest = cet.localize(datetime.strptime(str(call['calldate']), "%Y-%m-%d %H:%M:%S")) + dt_utc = dt_cest.astimezone(pytz.utc) + ts_ns = int(dt_utc.timestamp() * 1_000_000_000) + counter = counter + 1 + logging.debug(f""" + ### + Result: {counter}, + Caller: {call['src']}, + Dest: {call['dst']}, + Duration: {call['duration']}, + App: {call['lastapp']}, + Channel: {call['channel']}, + time: {call['calldate']} CEST / {dt_utc} UTC / {ts_ns} UNIX + ###""") + point = ( + Point("pbx_call") + .tag("src", call["src"]) + .tag("dst", call["dst"]) + .tag("disposition", call["disposition"]) + .tag("did", call["did"]) + .tag("channel", call["channel"]) + .tag("outbound_cnum", call["outbound_cnum"]) + .tag("lastapp", call["lastapp"]) + .field("duration", int(call["duration"])) + .time(ts_ns) + ) + write_api.write(bucket=influx_bucket, org=influx_org, record=point) + except Exception as e: + logging.info("Error with call:", call, "Error:", e) + logging.info(f"""Result: {counter}""") + client.close() + write_api.close() + + +# Run +if __name__ == "__main__": + while True: + logging.info("Updating call history.") + calls = fetch_cdrs() + write_to_influx(calls) + logging.info(f"Sleeping for {general_config.get("interval")} seconds...") + time.sleep(general_config.get("interval"))