120 lines
3.8 KiB
Python
120 lines
3.8 KiB
Python
import mysql.connector
|
|
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
|
from datetime import datetime
|
|
import pytz
|
|
import time
|
|
import logging
|
|
import yaml
|
|
|
|
# Load YAML config
|
|
# Read the YAML configuration once at start-up.
# yaml.safe_load() returns None for an empty document; fall back to an empty
# mapping so the .get() lookups that follow use their defaults instead of
# failing with AttributeError.
with open("config.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f) or {}
|
|
|
|
# Extract logging config
|
|
# "or {}" also covers a section key that is present but empty in the YAML
# (which loads as None and would otherwise break the .get() calls).
log_config = config.get("logging") or {}
log_tofile = log_config.get("tofile", "true")
log_file = log_config.get("filename", "safey.log")
# str() guards against a non-string level in the YAML; unknown names fall
# back to INFO.
log_level = getattr(logging, str(log_config.get("level", "INFO")).upper(), logging.INFO)
log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s")

general_config = config.get("general") or {}
influx_config = config.get("influx") or {}
mariadb_config = config.get("mariadb") or {}

# Apply logging config
# Decide on the destination from log_tofile so that its declared default
# ("true" -> log to file) is actually honoured when the key is missing.
# YAML may deliver either a real boolean or a quoted string, so textual
# "false"-like values are treated as disabled rather than as a truthy string.
if isinstance(log_tofile, str):
    _log_to_file = log_tofile.strip().lower() not in ("", "false", "0", "no", "off")
else:
    _log_to_file = bool(log_tofile)

_logging_kwargs = {"level": log_level, "format": log_format}
if _log_to_file:
    # Without a filename basicConfig() logs to the console (stderr).
    _logging_kwargs["filename"] = log_file
logging.basicConfig(**_logging_kwargs)

logging.info(f"Logging config: {log_config}")
|
|
|
|
# PBXact MariaDB settings
|
|
# Keyword arguments for mysql.connector.connect(); every value comes from the
# "mariadb" config section and is None when the key is not set there.
db_config = {
    key: mariadb_config.get(key)
    for key in ("user", "password", "host", "database")
}
|
|
|
|
# InfluxDB settings
|
|
# Each setting is read from the "influx" config section (None when absent).
# Note that the API token is stored under the "secret" key.
influx_url, influx_token, influx_org, influx_bucket = (
    influx_config.get(key) for key in ("url", "secret", "org", "bucket")
)
|
|
|
|
# Fetch today's calls from MariaDB
|
|
def fetch_cdrs():
    """Return today's call detail records from the ``cdr`` table.

    Opens a fresh connection with the module-level ``db_config``, selects
    every row whose ``calldate`` is at or after midnight of the current day
    (newest first) and closes the cursor and connection again, even when
    the query fails.

    Returns:
        list[dict]: one dict per row, keyed by column name (the cursor is
        created with ``dictionary=True``).

    Raises:
        Any exception raised by mysql.connector while connecting or
        querying is propagated to the caller.
    """
    # datetime.today() is naive local time, so "today" follows the clock of
    # the host running this script.
    midnight = datetime.today().strftime('%Y-%m-%d 00:00:00')
    # The date is passed as a bound parameter rather than interpolated into
    # the SQL text.
    query = """
        SELECT calldate, src, dst, disposition, duration, did, channel, outbound_cnum, lastapp
        FROM cdr
        WHERE calldate >= %s
        ORDER BY calldate DESC
    """
    conn = mysql.connector.connect(**db_config)
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query, (midnight,))
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
|
|
|
|
# Write to InfluxDB
|
|
def write_to_influx(calls):
    """Write each call record to InfluxDB as a ``pbx_call`` point.

    For every record the ``calldate`` value is interpreted as Europe/Oslo
    local time, converted to UTC and used as the point timestamp in
    nanoseconds. ``src``, ``dst``, ``disposition``, ``did``, ``channel``,
    ``outbound_cnum`` and ``lastapp`` are written as tags and ``duration``
    as an integer field.

    A record that cannot be converted or written is logged and skipped; the
    remaining records are still processed.

    Args:
        calls: iterable of mappings with the keys listed above plus
            ``calldate`` (for example the rows returned by ``fetch_cdrs``).
    """
    # Lazily formatted so the message is only built when DEBUG is enabled.
    debug_format = (
        "\n###\n"
        "Result: %s,\n"
        "Caller: %s,\n"
        "Dest: %s,\n"
        "Duration: %s,\n"
        "App: %s,\n"
        "Channel: %s,\n"
        "time: %s CEST / %s UTC / %s UNIX\n"
        "###"
    )
    # NOTE(review): verify_ssl=False disables TLS certificate verification;
    # confirm this is intentional for the target InfluxDB instance.
    client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org, verify_ssl=False)
    try:
        write_api = client.write_api()
        try:
            cet = pytz.timezone('Europe/Oslo')
            counter = 0
            for call in calls:
                try:
                    local_dt = cet.localize(
                        datetime.strptime(str(call['calldate']), "%Y-%m-%d %H:%M:%S")
                    )
                    dt_utc = local_dt.astimezone(pytz.utc)
                    # The parsed value has whole-second resolution, so scale
                    # in integer arithmetic instead of multiplying a float up
                    # to ~1e18.
                    ts_ns = int(dt_utc.timestamp()) * 1_000_000_000
                    counter += 1
                    logging.debug(
                        debug_format,
                        counter,
                        call['src'],
                        call['dst'],
                        call['duration'],
                        call['lastapp'],
                        call['channel'],
                        call['calldate'],
                        dt_utc,
                        ts_ns,
                    )
                    point = (
                        Point("pbx_call")
                        .tag("src", call["src"])
                        .tag("dst", call["dst"])
                        .tag("disposition", call["disposition"])
                        .tag("did", call["did"])
                        .tag("channel", call["channel"])
                        .tag("outbound_cnum", call["outbound_cnum"])
                        .tag("lastapp", call["lastapp"])
                        .field("duration", int(call["duration"]))
                        .time(ts_ns)
                    )
                    write_api.write(bucket=influx_bucket, org=influx_org, record=point)
                except Exception as e:
                    # Best effort per record: report the failure and carry on.
                    # The values are passed through %s placeholders; passing
                    # them as bare extra arguments makes logging fail to
                    # format the record and the message is lost.
                    logging.error("Error with call: %s Error: %s", call, e)
            # counter only counts records whose timestamp could be parsed.
            logging.info("Result: %s", counter)
        finally:
            # Close the write API before the client it belongs to, so it can
            # finish its work while the client is still open.
            write_api.close()
    finally:
        client.close()
|
|
|
|
|
|
# Run
|
|
if __name__ == "__main__":
    # Poll forever: copy today's calls from MariaDB to InfluxDB, then wait
    # for the configured number of seconds before the next cycle.
    # The interval is read into a local first: reusing double quotes inside
    # a double-quoted f-string is a SyntaxError before Python 3.12.
    interval = general_config.get("interval")
    while True:
        logging.info("Updating call history.")
        calls = fetch_cdrs()
        write_to_influx(calls)
        logging.info(f"Sleeping for {interval} seconds...")
        time.sleep(interval)
|