Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
httpx==0.27.0
matplotlib==3.9.2
pydantic==2.7.1
# reasoner_pydantic==4.1.6
setproctitle==1.3.3
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name="sri-test-harness",
version="0.6.4",
version="0.6.5",
author="Max Wang",
author_email="max@covar.com",
url="https://github.com/TranslatorSRI/TestHarness",
Expand Down
5 changes: 5 additions & 0 deletions test_harness/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ def main(args):
"json",
collector.performance_stats,
)
for filename, content in collector.render_performance_artifacts():
try:
slacker.upload_binary_file(filename, content)
except Exception as e:
logger.warning(f"Failed to upload perf artifact {filename}: {e}")

logger.info("Finishing up test run...")
reporter.finish_test_run()
Expand Down
120 changes: 120 additions & 0 deletions test_harness/perf_plots.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Render Locust-style time-series charts from a stats_history snapshot."""

from __future__ import annotations

import io
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence, Tuple

import matplotlib

matplotlib.use("Agg")

import matplotlib.dates as mdates
import matplotlib.pyplot as plt

logger = logging.getLogger(__name__)


def _parse_timestamp(value: Any) -> Optional[datetime]:
    """Convert a history timestamp into a naive UTC ``datetime``.

    Accepted inputs:
      * ``int`` / ``float`` -- interpreted as seconds since the Unix epoch.
      * ``str`` -- ISO 8601 text, optionally ending in ``Z``. If the text
        carries a UTC offset it is converted to UTC and the tzinfo dropped.

    Every successful result is timezone-naive so that values from different
    input forms can be compared and plotted on one axis.

    Returns:
        The parsed ``datetime``, or ``None`` when the value is of an
        unsupported type or cannot be parsed (malformed text, NaN, infinity,
        or a number outside the platform's supported range).
    """
    # Local import: the module only imports ``datetime`` from the datetime
    # package, and this keeps the block self-contained.
    from datetime import timezone

    # bool is a subclass of int, but True/False is never a real timestamp.
    if isinstance(value, bool):
        return None

    if isinstance(value, (int, float)):
        try:
            # Timezone-aware replacement for the deprecated
            # datetime.utcfromtimestamp(); tzinfo is stripped afterwards so
            # the result stays naive like before.
            parsed = datetime.fromtimestamp(value, tz=timezone.utc)
        except (ValueError, OverflowError, OSError):
            # NaN, infinity, or a value the platform cannot represent.
            return None
        return parsed.replace(tzinfo=None)

    if isinstance(value, str):
        # Locust uses ISO 8601 with a trailing 'Z' for UTC.
        text = value.rstrip("Z")
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        if parsed.tzinfo is not None:
            # Normalise explicit offsets to naive UTC; mixing aware and
            # naive datetimes breaks comparisons further down the line.
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    return None


def _series(
    history: Sequence[Dict[str, Any]], key: str
) -> Tuple[List[datetime], List[float]]:
    """Extract the plottable ``(times, values)`` lists for one series.

    For every snapshot in ``history`` the X coordinate comes from the
    snapshot-level ``time`` field (not from the per-series pair), so that all
    series drawn from the same history line up on a shared X axis. The Y
    coordinate is the second element of the list/tuple stored under ``key``.

    A snapshot is skipped when its ``time`` cannot be parsed, when the entry
    under ``key`` is not a list/tuple of at least two elements, or when the
    entry's value is ``None``.
    """
    points: List[Tuple[datetime, float]] = []
    for snapshot in history:
        stamp = _parse_timestamp(snapshot.get("time"))
        pair = snapshot.get(key)
        usable = (
            stamp is not None
            and isinstance(pair, (list, tuple))
            and len(pair) >= 2
            and pair[1] is not None
        )
        if usable:
            points.append((stamp, float(pair[1])))

    # Split the collected points back into the two parallel lists callers plot.
    return [point[0] for point in points], [point[1] for point in points]


def _percentile_keys(history: Sequence[Dict[str, Any]]) -> List[str]:
    """Return the percentile series keys present in the snapshots, sorted.

    A percentile key is any string key starting with
    ``response_time_percentile_`` whose final ``_``-separated component parses
    as a float. Keys are ordered by that numeric value, with the key text as a
    tie-breaker so the result is deterministic.

    Keys with a non-numeric suffix are skipped (and logged at debug level)
    rather than raising, so one malformed key cannot abort a whole chart.
    """
    prefix = "response_time_percentile_"
    candidates = {
        key
        for row in history
        for key in row
        if isinstance(key, str) and key.startswith(prefix)
    }

    ranked: Dict[str, float] = {}
    for key in candidates:
        try:
            ranked[key] = float(key.split("_")[-1])
        except ValueError:
            # Deliberate best-effort: an unparseable suffix cannot be ordered
            # or labelled, so leave that series out instead of failing.
            logging.getLogger(__name__).debug(
                "Ignoring percentile key with non-numeric suffix: %s", key
            )

    return sorted(ranked, key=lambda k: (ranked[k], k))


def _percentile_label(key: str) -> str:
    """Build a legend label such as ``p50`` or ``p95`` from a percentile key."""
    suffix = key.split("_")[-1]
    try:
        number = float(suffix)
    except ValueError:
        return "p" + suffix
    # Suffixes are presumably fractions (0.5, 0.95, ...); scale those to a
    # percentage so 0.5 reads "p50" rather than "p5". Values above 1 are
    # assumed to already be percentages -- NOTE(review): confirm against the
    # Locust version in use.
    percent = number * 100 if number <= 1 else number
    return f"p{percent:g}"


def render_history_png(history: Sequence[Dict[str, Any]], title: str) -> bytes:
    """Render a Locust-style three-panel chart for the given history.

    Panels (top to bottom):
      1. Total RPS, with failures/sec overlaid.
      2. Response time percentiles in milliseconds.
      3. Active user count.

    Args:
        history: Sequence of stats-history snapshots (dicts). Series are read
            from the ``current_rps``, ``current_fail_per_sec``, ``user_count``
            and ``response_time_percentile_*`` keys.
        title: Text used as the figure's overall title.

    Returns:
        The rendered chart as PNG bytes.

    Raises:
        ValueError: If ``history`` has fewer than two snapshots. Callers are
            responsible for handling empty/short history.
    """
    if len(history) < 2:
        raise ValueError("history too short to plot")

    fig, (ax_rps, ax_rt, ax_users) = plt.subplots(
        nrows=3, ncols=1, figsize=(10, 12), sharex=True
    )
    # Everything after figure creation lives inside the try so that a failure
    # while plotting (not just while saving) still releases the figure;
    # pyplot keeps figures alive until they are explicitly closed.
    try:
        rps_times, rps_values = _series(history, "current_rps")
        fail_times, fail_values = _series(history, "current_fail_per_sec")
        ax_rps.plot(rps_times, rps_values, color="#2ca02c", label="RPS")
        ax_rps.plot(fail_times, fail_values, color="#d62728", label="Failures/s")
        ax_rps.set_ylabel("Requests / s")
        ax_rps.set_title("Total Requests per Second")
        ax_rps.grid(True, alpha=0.3)
        ax_rps.legend(loc="upper right")

        percentile_colors = ["#1f77b4", "#ff7f0e", "#9467bd", "#8c564b"]
        percentile_keys = _percentile_keys(history)
        for index, key in enumerate(percentile_keys):
            times, values = _series(history, key)
            # Cycle through the palette so extra percentiles are still drawn
            # instead of being silently dropped.
            color = percentile_colors[index % len(percentile_colors)]
            ax_rt.plot(times, values, color=color, label=_percentile_label(key))
        ax_rt.set_ylabel("Milliseconds")
        ax_rt.set_title("Response Times")
        ax_rt.grid(True, alpha=0.3)
        if percentile_keys:
            # Only ask for a legend when there is something to list;
            # otherwise matplotlib emits a "no artists with labels" warning.
            ax_rt.legend(loc="upper right")

        user_times, user_values = _series(history, "user_count")
        ax_users.step(user_times, user_values, color="#1f77b4", where="post")
        ax_users.set_ylabel("Users")
        ax_users.set_title("Number of Users")
        ax_users.set_xlabel("Time (UTC)")
        ax_users.grid(True, alpha=0.3)

        ax_users.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
        fig.autofmt_xdate()
        fig.suptitle(title)
        # Leave a little headroom at the top for the suptitle.
        fig.tight_layout(rect=(0, 0, 1, 0.97))

        buf = io.BytesIO()
        fig.savefig(buf, format="png", dpi=110, bbox_inches="tight")
        return buf.getvalue()
    finally:
        plt.close(fig)
79 changes: 61 additions & 18 deletions test_harness/performance_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import logging
import time
from typing import Dict
from typing import Dict, List

import gevent
from gevent import GreenletExit
from locust import HttpUser, LoadTestShape, task
from locust.env import Environment
from locust.html import get_html_report
from locust.stats import stats_history, stats_printer
from translator_testing_model.datamodel.pydanticmodel import (
AcceptanceTestAsset,
Expand All @@ -20,7 +21,6 @@
from test_harness.runner.generate_query import generate_query
from test_harness.runner.query_runner import QueryRunner, env_map


# Custom request_type values used to distinguish layers of the test in stats.
# Locust groups stats by (method, name); using these as the "method" lets us
# pull each layer out cleanly in the result collector.
Expand All @@ -32,6 +32,10 @@
# one row per parent_pk.
SUBMIT_NAME = "submit_query"
POLL_NAME = "poll_status"
# The /trace poll only returns status metadata; the final TRAPI message lives
# at the merged_version PK. Fetch it on completion so we can record the size
# of the actual response.
MERGED_FETCH_NAME = "fetch_merged"

# Per-outcome names for the end-to-end QUERY event. Distinct names give us
# a count for each outcome directly out of Locust's stats serialization.
Expand Down Expand Up @@ -106,9 +110,7 @@ def send_query(self):
len(response.content) if response.content else 0
)
else:
failure_reason = (
f"Got a bad response: {response.status_code}"
)
failure_reason = f"Got a bad response: {response.status_code}"
response.failure(failure_reason)
except GreenletExit:
outcome = ARA_QUERY_FAILED
Expand All @@ -125,6 +127,29 @@ def send_query(self):
)

class ARSUser(HttpUser):
def _fetch_merged_size(self, merged_pk, trace_response) -> int:
    """Return the byte size of the final response for a completed query.

    The merged message is requested from ``/ars/api/messages/{merged_pk}``
    and its content length is returned. When ``merged_pk`` is falsy, or the
    fetch does not come back with HTTP 200, the content length of
    ``trace_response`` is returned instead so the caller still has a size
    to record. An empty body counts as 0 bytes in either case.
    """
    trace_body = trace_response.content
    trace_size = len(trace_body) if trace_body else 0
    if not merged_pk:
        return trace_size

    merged_url = f"/ars/api/messages/{merged_pk}"
    with self.client.get(
        merged_url,
        catch_response=True,
        name=MERGED_FETCH_NAME,
    ) as merged_res:
        if merged_res.status_code == 200:
            merged_res.success()
            merged_body = merged_res.content
            return len(merged_body) if merged_body else 0

        # Mark the request as failed in the stats, then fall back to the
        # size of the trace response.
        merged_res.failure(
            f"Failed to fetch merged {merged_pk}: "
            f"{merged_res.status_code}"
        )
        return trace_size

@task
def send_query(self):
query_started = time.time()
Expand Down Expand Up @@ -162,9 +187,7 @@ def send_query(self):
while True:
if remaining_test_time() <= 0:
outcome = OUTCOME_ABANDONED
failure_reason = (
f"Test ended while polling {parent_pk}"
)
failure_reason = f"Test ended while polling {parent_pk}"
return

with self.client.get(
Expand All @@ -185,20 +208,17 @@ def send_query(self):
try:
res = response.json()
except ValueError:
failure_reason = (
f"Non-JSON poll body for {parent_pk}"
)
failure_reason = f"Non-JSON poll body for {parent_pk}"
outcome = OUTCOME_POLLING_FAILED
return

status = res.get("status")
if status == "Done":
outcome = OUTCOME_COMPLETED
failure_reason = None
response_length = (
len(response.content)
if response.content
else 0
response_length = self._fetch_merged_size(
res.get("merged_version"),
response,
)
return
if status == "Error":
Expand All @@ -210,9 +230,7 @@ def send_query(self):
sleep_for = min(POLL_INTERVAL_SECONDS, remaining_test_time())
if sleep_for <= 0:
outcome = OUTCOME_ABANDONED
failure_reason = (
f"Test ended while polling {parent_pk}"
)
failure_reason = f"Test ended while polling {parent_pk}"
return
time.sleep(sleep_for)
except GreenletExit:
Expand Down Expand Up @@ -244,6 +262,20 @@ def send_query(self):
env = Environment(user_classes=[user_class], host=host, shape_class=TestShape())
runner = env.create_local_runner()

# Capture per-query response sizes (one entry per query, keyed by
# outcome name) so the report can flag cases where queries reported the
# same status but came back with different payload sizes.
query_response_sizes: Dict[str, List[int]] = {}

def _record_query_size(
    request_type, name, response_time, response_length, exception, context, **kwargs
):
    """Request-event listener that records the size of each QUERY event.

    Events whose ``request_type`` is not ``QUERY_TYPE`` are ignored. For the
    rest, the response length is appended to ``query_response_sizes`` under
    the event ``name``; a falsy length (``None`` or 0) is stored as 0.
    """
    if request_type == QUERY_TYPE:
        sizes_for_name = query_response_sizes.setdefault(name, [])
        sizes_for_name.append(response_length or 0)

env.events.request.add_listener(_record_query_size)

# Start stats printer
gevent.spawn(stats_printer(env.stats))
gevent.spawn(stats_history, runner)
Expand All @@ -260,12 +292,23 @@ def send_query(self):

print("Done with locust testing!")

try:
summary_html = get_html_report(env, show_download_link=False)
except Exception as e:
logging.getLogger(__name__).warning(
"Failed to render Locust HTML report: %s", e
)
summary_html = None

return {
"stats": env.stats.serialize_stats(),
"failures": env.stats.serialize_errors(),
"test_run_time": test_run_time,
"spawn_rate": spawn_rate,
"target": target,
"query_response_sizes": query_response_sizes,
"stats_history": list(env.runner.stats.history),
"summary_html": summary_html,
}


Expand Down
Loading
Loading