PBXActToInflux/pbxtoinfux.py

120 lines
3.8 KiB
Python

import mysql.connector
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import pytz
import time
import logging
import yaml
# Load YAML config
with open("config.yaml", "r") as f:
config = yaml.safe_load(f)
# Extract logging config
log_config = config.get("logging", {})
log_tofile = log_config.get("tofile", "true")
log_file = log_config.get("filename", "safey.log")
log_level = getattr(logging, log_config.get("level", "INFO").upper(), logging.INFO)
log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s")
general_config = config.get("general", {})
influx_config = config.get("influx", {})
mariadb_config = config.get("mariadb", {})
# Apply logging config
if log_config.get("tofile"):
logging.basicConfig(
filename=log_file,
level=log_level,
format=log_format,
)
else:
logging.basicConfig(
level=log_level,
format=log_format,
)
logging.info(f"Logging config: {log_config}")
# PBXact MariaDB settings
db_config = {
'user': mariadb_config.get("user"),
'password': mariadb_config.get("password"),
'host': mariadb_config.get("host"),
'database': mariadb_config.get("database"),
}
# InfluxDB settings
influx_url = influx_config.get("url")
influx_token = influx_config.get("secret")
influx_org = influx_config.get("org")
influx_bucket = influx_config.get("bucket")
# Fetch last todays calls from MariaDB
def fetch_cdrs():
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
today = datetime.today().strftime('%Y-%m-%d')
query = f"""
SELECT calldate, src, dst, disposition, duration, did, channel, outbound_cnum, lastapp
FROM cdr
WHERE calldate >= '{today} 00:00:00'
ORDER BY calldate DESC
"""
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
conn.close()
return results
# Write to InfluxDB
def write_to_influx(calls):
client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org, verify_ssl=False)
write_api = client.write_api()
cet = pytz.timezone('Europe/Oslo')
counter = 0
for call in calls:
try:
dt_cest = cet.localize(datetime.strptime(str(call['calldate']), "%Y-%m-%d %H:%M:%S"))
dt_utc = dt_cest.astimezone(pytz.utc)
ts_ns = int(dt_utc.timestamp() * 1_000_000_000)
counter = counter + 1
logging.debug(f"""
###
Result: {counter},
Caller: {call['src']},
Dest: {call['dst']},
Duration: {call['duration']},
App: {call['lastapp']},
Channel: {call['channel']},
time: {call['calldate']} CEST / {dt_utc} UTC / {ts_ns} UNIX
###""")
point = (
Point("pbx_call")
.tag("src", call["src"])
.tag("dst", call["dst"])
.tag("disposition", call["disposition"])
.tag("did", call["did"])
.tag("channel", call["channel"])
.tag("outbound_cnum", call["outbound_cnum"])
.tag("lastapp", call["lastapp"])
.field("duration", int(call["duration"]))
.time(ts_ns)
)
write_api.write(bucket=influx_bucket, org=influx_org, record=point)
except Exception as e:
logging.info("Error with call:", call, "Error:", e)
logging.info(f"""Result: {counter}""")
client.close()
write_api.close()
# Run
if __name__ == "__main__":
while True:
logging.info("Updating call history.")
calls = fetch_cdrs()
write_to_influx(calls)
logging.info(f"Sleeping for {general_config.get("interval")} seconds...")
time.sleep(general_config.get("interval"))