307 lines
8.9 KiB
Python
307 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
import random
|
|
import string
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
|
|
import requests
|
|
|
|
# ================== CONFIG ==================
|
|
|
|
BASE_URL = "http://10.181.149.220:8081" # through nginx
|
|
API_KEY = "_2sW6roe2ZQ4V6Cldo0v295fakHT8vBHqHScfliX445tZuzxDwMRqjPeCE7FDcVVr" # your real API key
|
|
|
|
# Auth style: choose one of these:
|
|
USE_X_API_KEY = False # send X-API-Key header
|
|
USE_BEARER = True # send Authorization: Bearer <API_KEY>
|
|
|
|
# <<< IMPORTANT >>>
|
|
# List of existing 4-digit client IDs you want to mutate.
|
|
# Fill this with IDs you know exist (created by your previous test/import).
|
|
TARGET_CLIENT_IDS = [
|
|
# Example: 1234, 5678, 9012
|
|
]
|
|
|
|
# If TARGET_CLIENT_IDS is empty, we fallback to random 4-digit IDs (you'll get some 404s)
|
|
FALLBACK_RANDOM_RANGE = (1235, 1244)
|
|
|
|
# How aggressive should this be?
|
|
SLEEP_BETWEEN_OPS = 10 # seconds between operations
|
|
|
|
# Relative probabilities of operations
|
|
OP_WEIGHTS = {
|
|
"update_client": 0.4,
|
|
"create_or_update_user": 0.3,
|
|
"create_or_update_zone": 0.3,
|
|
}
|
|
|
|
# ============================================
|
|
|
|
|
|
def auth_headers():
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
}
|
|
if USE_X_API_KEY:
|
|
headers["X-API-Key"] = API_KEY
|
|
if USE_BEARER:
|
|
headers["Authorization"] = f"Bearer {API_KEY}"
|
|
return headers
|
|
|
|
|
|
def pick_client_id() -> int:
|
|
if TARGET_CLIENT_IDS:
|
|
return random.choice(TARGET_CLIENT_IDS)
|
|
return random.randint(*FALLBACK_RANDOM_RANGE)
|
|
|
|
|
|
def random_phone():
|
|
return "+47" + "".join(random.choice("0123456789") for _ in range(8))
|
|
|
|
|
|
def random_email(name_stub: str):
|
|
domains = ["example.com", "test.com", "mailinator.com", "demo.net"]
|
|
clean = "".join(c for c in name_stub.lower() if c.isalnum())
|
|
return f"{clean or 'user'}@{random.choice(domains)}"
|
|
|
|
|
|
def random_string(prefix: str, length: int = 8):
|
|
tail = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(length))
|
|
return f"{prefix}{tail}"
|
|
|
|
|
|
def random_client_info():
|
|
# Random-ish basic client fields
|
|
name = random_string("Client_", 5)
|
|
alias = random_string("AL", 4)
|
|
street_no = random.randint(1, 200)
|
|
street_name = random.choice(["Gate", "Veien", "Stien", "Allé"])
|
|
location = f"{random.choice(['Test', 'Demo', 'Fake'])} {street_name} {street_no}"
|
|
|
|
area_code = str(random.randint(1000, 9999))
|
|
area = random.choice(["Oslo", "Sarpsborg", "Bergen", "Trondheim", "Stavanger"])
|
|
|
|
bus_phone = random_phone()
|
|
email = random_email(name)
|
|
|
|
ok_password = random_string("pwd_", 6)
|
|
spec_request = random.choice(
|
|
[
|
|
"Script-endret testkunde.",
|
|
"Oppdatert spesialinstruks fra load-test.",
|
|
"Ingen reelle tiltak, kun test.",
|
|
]
|
|
)
|
|
|
|
no_sigs_mon = random.choice(["ActiveAny", "Disabled"])
|
|
since_days = random.randint(0, 7)
|
|
since_hrs = random.randint(0, 23)
|
|
since_mins = random.randint(0, 59)
|
|
|
|
reset_ignored = random.choice([True, False])
|
|
reset_days = random.randint(0, 14)
|
|
reset_hrs = random.randint(0, 23)
|
|
reset_mins = random.randint(0, 59)
|
|
|
|
install_date = (datetime.now() - timedelta(days=random.randint(0, 365))).date().isoformat()
|
|
|
|
panel_name = random.choice(["Ajax", "Future"])
|
|
panel_site = random.choice(["Stue", "Gang", "Kontor", "Lager"])
|
|
keypad_location = random.choice(["Inngang", "Bakdør", "Garasje", "2. etg"])
|
|
sp_page = random.choice(
|
|
[
|
|
"Ekstra info endret av load-test.",
|
|
"Test entry, kan ignoreres.",
|
|
"API performance-test.",
|
|
]
|
|
)
|
|
|
|
return {
|
|
"Name": name,
|
|
"Alias": alias,
|
|
"Location": location,
|
|
"area_code": area_code,
|
|
"area": area,
|
|
"BusPhone": bus_phone,
|
|
"Email": email,
|
|
"OKPassword": ok_password,
|
|
"SpecRequest": spec_request,
|
|
"NoSigsMon": no_sigs_mon,
|
|
"SinceDays": since_days,
|
|
"SinceHrs": since_hrs,
|
|
"SinceMins": since_mins,
|
|
"ResetNosigsIgnored": reset_ignored,
|
|
"ResetNosigsDays": reset_days,
|
|
"ResetNosigsHrs": reset_hrs,
|
|
"ResetNosigsMins": reset_mins,
|
|
"InstallDateTime": install_date,
|
|
"PanelName": panel_name,
|
|
"PanelSite": panel_site,
|
|
"KeypadLocation": keypad_location,
|
|
"SPPage": sp_page,
|
|
}
|
|
|
|
|
|
def random_user_payload(existing_user_no: int | None = None):
|
|
first_names = ["Anders", "Per", "Lise", "Kari", "Ole", "Nina"]
|
|
last_names = ["Knutsen", "Olsen", "Hansen", "Johansen", "Pedersen"]
|
|
name = f"{random.choice(first_names)} {random.choice(last_names)}"
|
|
|
|
mobile = random_phone()
|
|
email = random_email(name.replace(" ", "."))
|
|
|
|
user_type = "U"
|
|
instructions = random.choice(
|
|
[
|
|
"Oppdatert instrukser.",
|
|
"Ring ved alarm.",
|
|
"Kun SMS.",
|
|
"Kontakt vaktmester først.",
|
|
]
|
|
)
|
|
|
|
call_order = random.randint(0, 3)
|
|
mobile_order = random.randint(1, 3)
|
|
|
|
if existing_user_no is None:
|
|
user_no = random.randint(1, 5)
|
|
else:
|
|
user_no = existing_user_no
|
|
|
|
return {
|
|
"User_Name": name,
|
|
"MobileNo": mobile,
|
|
"MobileNoOrder": mobile_order,
|
|
"Email": email,
|
|
"Type": user_type,
|
|
"UserNo": user_no,
|
|
"Instructions": instructions,
|
|
"CallOrder": call_order,
|
|
}
|
|
|
|
|
|
def random_zone_payload(existing_zone_no: int | None = None):
|
|
zone_names = [
|
|
"Stue",
|
|
"Kjøkken",
|
|
"Gang",
|
|
"Soverom",
|
|
"Garasje",
|
|
"Kontor",
|
|
"Lager",
|
|
"Uteområde",
|
|
]
|
|
if existing_zone_no is None:
|
|
zone_no = random.randint(1, 20)
|
|
else:
|
|
zone_no = existing_zone_no
|
|
|
|
zone_text = random.choice(zone_names) + " " + random.choice(["1", "2", "A", "B"])
|
|
return {"ZoneNo": zone_no, "ZoneText": zone_text}
|
|
|
|
|
|
def do_update_client(client_id: int):
|
|
url = f"{BASE_URL}/clients"
|
|
payload = {
|
|
"client_id": client_id,
|
|
"info": random_client_info(),
|
|
}
|
|
r = requests.put(url, json=payload, headers=auth_headers(), timeout=10)
|
|
print(f"[update_client] client_id={client_id} -> {r.status_code}")
|
|
if r.status_code >= 400:
|
|
print(f" body: {r.text[:500]}")
|
|
return r.status_code
|
|
|
|
|
|
def do_create_or_update_user(client_id: int):
|
|
url = f"{BASE_URL}/users"
|
|
# Randomly choose new or existing user no
|
|
if random.random() < 0.5:
|
|
user_no = random.randint(1, 3) # likely already created by your first script
|
|
else:
|
|
user_no = random.randint(4, 10) # maybe new user
|
|
|
|
user_payload = random_user_payload(existing_user_no=user_no)
|
|
payload = {
|
|
"client_id": client_id,
|
|
"user": user_payload,
|
|
}
|
|
r = requests.post(url, json=payload, headers=auth_headers(), timeout=10)
|
|
print(
|
|
f"[user] client_id={client_id} UserNo={user_no} -> {r.status_code}"
|
|
)
|
|
if r.status_code >= 400:
|
|
print(f" body: {r.text[:500]}")
|
|
return r.status_code
|
|
|
|
|
|
def do_create_or_update_zone(client_id: int):
|
|
url = f"{BASE_URL}/zones"
|
|
# Same trick: sometimes hit likely existing zones, sometimes new
|
|
if random.random() < 0.5:
|
|
zone_no = random.randint(1, 10)
|
|
else:
|
|
zone_no = random.randint(11, 30)
|
|
|
|
zone_payload = random_zone_payload(existing_zone_no=zone_no)
|
|
payload = {
|
|
"client_id": client_id,
|
|
"zone": zone_payload,
|
|
}
|
|
r = requests.post(url, json=payload, headers=auth_headers(), timeout=10)
|
|
print(
|
|
f"[zone] client_id={client_id} ZoneNo={zone_no} -> {r.status_code}"
|
|
)
|
|
if r.status_code >= 400:
|
|
print(f" body: {r.text[:500]}")
|
|
return r.status_code
|
|
|
|
|
|
def pick_operation():
|
|
ops = list(OP_WEIGHTS.keys())
|
|
weights = [OP_WEIGHTS[o] for o in ops]
|
|
return random.choices(ops, weights=weights, k=1)[0]
|
|
|
|
|
|
def main():
|
|
if not TARGET_CLIENT_IDS:
|
|
print(
|
|
"WARNING: TARGET_CLIENT_IDS is empty.\n"
|
|
" Script will use random 4-digit client IDs and you may see many 404s."
|
|
)
|
|
else:
|
|
print(f"Targeting these client IDs: {TARGET_CLIENT_IDS}")
|
|
|
|
print("Starting random modification loop. Press Ctrl+C to stop.\n")
|
|
|
|
op_count = 0
|
|
try:
|
|
while True:
|
|
client_id = pick_client_id()
|
|
op = pick_operation()
|
|
|
|
print(f"\n=== op #{op_count} ===")
|
|
print(f"Client: {client_id}, operation: {op}")
|
|
|
|
try:
|
|
if op == "update_client":
|
|
do_update_client(client_id)
|
|
elif op == "create_or_update_user":
|
|
do_create_or_update_user(client_id)
|
|
elif op == "create_or_update_zone":
|
|
do_create_or_update_zone(client_id)
|
|
else:
|
|
print(f"Unknown operation {op}, skipping.")
|
|
except requests.RequestException as e:
|
|
print(f"HTTP error: {e}")
|
|
|
|
op_count += 1
|
|
time.sleep(SLEEP_BETWEEN_OPS)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nStopping on user request (Ctrl+C).")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|