This commit is contained in:
Anders Knutsen 2025-07-23 10:56:08 +02:00
commit 078effc355
9 changed files with 296 additions and 0 deletions

3
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

8
.idea/PBXActToInflux.iml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.12" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/PBXActToInflux.iml" filepath="$PROJECT_DIR$/.idea/PBXActToInflux.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

20
config.yaml Normal file
View File

@ -0,0 +1,20 @@
logging:
tofile: false # Use "true" to redirect output to a file.
filename: pbxsync.log # select filename of the logfile.
level: INFO # Set loglevel
format: "%(asctime)s [%(levelname)s] %(message)s" #Set log format
general:
interval: 10 #seconds
influx:
url: http://192.168.146.251:8086
secret:
org: Alarm24
bucket: PBXAct
mariadb:
host: 192.168.130.9
database: asteriskcdrdb
user: influx
password:

119
pbxtoinfux.py Normal file
View File

@ -0,0 +1,119 @@
import mysql.connector
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import pytz
import time
import logging
import yaml
# Load YAML config
with open("config.yaml", "r") as f:
config = yaml.safe_load(f)
# Extract logging config
log_config = config.get("logging", {})
log_tofile = log_config.get("tofile", "true")
log_file = log_config.get("filename", "safey.log")
log_level = getattr(logging, log_config.get("level", "INFO").upper(), logging.INFO)
log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s")
general_config = config.get("general", {})
influx_config = config.get("influx", {})
mariadb_config = config.get("mariadb", {})
# Apply logging config
if log_config.get("tofile"):
logging.basicConfig(
filename=log_file,
level=log_level,
format=log_format,
)
else:
logging.basicConfig(
level=log_level,
format=log_format,
)
logging.info(f"Logging config: {log_config}")
# PBXact MariaDB settings
db_config = {
'user': mariadb_config.get("user"),
'password': mariadb_config.get("password"),
'host': mariadb_config.get("host"),
'database': mariadb_config.get("database"),
}
# InfluxDB settings
influx_url = influx_config.get("url")
influx_token = influx_config.get("secret")
influx_org = influx_config.get("org")
influx_bucket = influx_config.get("bucket")
# Fetch last todays calls from MariaDB
def fetch_cdrs():
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
today = datetime.today().strftime('%Y-%m-%d')
query = f"""
SELECT calldate, src, dst, disposition, duration, did, channel, outbound_cnum, lastapp
FROM cdr
WHERE calldate >= '{today} 00:00:00'
ORDER BY calldate DESC
"""
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
conn.close()
return results
# Write to InfluxDB
def write_to_influx(calls):
client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org, verify_ssl=False)
write_api = client.write_api()
cet = pytz.timezone('Europe/Oslo')
counter = 0
for call in calls:
try:
dt_cest = cet.localize(datetime.strptime(str(call['calldate']), "%Y-%m-%d %H:%M:%S"))
dt_utc = dt_cest.astimezone(pytz.utc)
ts_ns = int(dt_utc.timestamp() * 1_000_000_000)
counter = counter + 1
logging.debug(f"""
###
Result: {counter},
Caller: {call['src']},
Dest: {call['dst']},
Duration: {call['duration']},
App: {call['lastapp']},
Channel: {call['channel']},
time: {call['calldate']} CEST / {dt_utc} UTC / {ts_ns} UNIX
###""")
point = (
Point("pbx_call")
.tag("src", call["src"])
.tag("dst", call["dst"])
.tag("disposition", call["disposition"])
.tag("did", call["did"])
.tag("channel", call["channel"])
.tag("outbound_cnum", call["outbound_cnum"])
.tag("lastapp", call["lastapp"])
.field("duration", int(call["duration"]))
.time(ts_ns)
)
write_api.write(bucket=influx_bucket, org=influx_org, record=point)
except Exception as e:
logging.info("Error with call:", call, "Error:", e)
logging.info(f"""Result: {counter}""")
client.close()
write_api.close()
# Run
if __name__ == "__main__":
while True:
logging.info("Updating call history.")
calls = fetch_cdrs()
write_to_influx(calls)
logging.info(f"Sleeping for {general_config.get("interval")} seconds...")
time.sleep(general_config.get("interval"))

119
pbxtoinfux_all.py Normal file
View File

@ -0,0 +1,119 @@
import mysql.connector
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import pytz
import time
import logging
import yaml
# Load YAML config
with open("config.yaml", "r") as f:
config = yaml.safe_load(f)
# Extract logging config
log_config = config.get("logging", {})
log_tofile = log_config.get("tofile", "true")
log_file = log_config.get("filename", "safey.log")
log_level = getattr(logging, log_config.get("level", "INFO").upper(), logging.INFO)
log_format = log_config.get("format", "%(asctime)s [%(levelname)s] %(message)s")
general_config = config.get("general", {})
influx_config = config.get("influx", {})
mariadb_config = config.get("mariadb", {})
# Apply logging config
if log_config.get("tofile"):
logging.basicConfig(
filename=log_file,
level=log_level,
format=log_format,
)
else:
logging.basicConfig(
level=log_level,
format=log_format,
)
logging.info(f"Logging config: {log_config}")
# PBXact MariaDB settings
db_config = {
'user': mariadb_config.get("user"),
'password': mariadb_config.get("password"),
'host': mariadb_config.get("host"),
'database': mariadb_config.get("database"),
}
# InfluxDB settings
influx_url = influx_config.get("url")
influx_token = influx_config.get("secret")
influx_org = influx_config.get("org")
influx_bucket = influx_config.get("bucket")
# Fetch last todays calls from MariaDB
def fetch_cdrs():
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
today = datetime.today().strftime('%Y-%m-%d')
query = f"""
SELECT calldate, src, dst, disposition, duration, did, channel, outbound_cnum, lastapp
FROM cdr
#WHERE calldate >= '{today} 00:00:00'
ORDER BY calldate DESC
"""
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
conn.close()
return results
# Write to InfluxDB
def write_to_influx(calls):
client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org, verify_ssl=False)
write_api = client.write_api()
cet = pytz.timezone('Europe/Oslo')
counter = 0
for call in calls:
try:
dt_cest = cet.localize(datetime.strptime(str(call['calldate']), "%Y-%m-%d %H:%M:%S"))
dt_utc = dt_cest.astimezone(pytz.utc)
ts_ns = int(dt_utc.timestamp() * 1_000_000_000)
counter = counter + 1
logging.debug(f"""
###
Result: {counter},
Caller: {call['src']},
Dest: {call['dst']},
Duration: {call['duration']},
App: {call['lastapp']},
Channel: {call['channel']},
time: {call['calldate']} CEST / {dt_utc} UTC / {ts_ns} UNIX
###""")
point = (
Point("pbx_call")
.tag("src", call["src"])
.tag("dst", call["dst"])
.tag("disposition", call["disposition"])
.tag("did", call["did"])
.tag("channel", call["channel"])
.tag("outbound_cnum", call["outbound_cnum"])
.tag("lastapp", call["lastapp"])
.field("duration", int(call["duration"]))
.time(ts_ns)
)
write_api.write(bucket=influx_bucket, org=influx_org, record=point)
except Exception as e:
logging.info("Error with call:", call, "Error:", e)
logging.info(f"""Result: {counter}""")
client.close()
write_api.close()
# Run
if __name__ == "__main__":
while True:
logging.info("Updating call history.")
calls = fetch_cdrs()
write_to_influx(calls)
logging.info(f"Sleeping for {general_config.get("interval")} seconds...")
time.sleep(general_config.get("interval"))