
Writing Stress Tests

Cicada's ultimate goal is to answer one question: at what point does my system break? In this tutorial, we'll use Cicada's scaling features to push the demo API to its limit.

Creating the Test

The key to this test is the built-in load model ramp_users_to_threshold, which lets the scenario keep adding users until a threshold is reached. To keep this test short, the threshold is an average response time of 100 ms.

Adding an Aggregator

Before we start, we need a function to aggregate runtimes from the users. The threshold function will look at the aggregate of user results to determine if it has reached the threshold or not.

...
import statistics

from cicadad.core.decorators import (
    scenario,
    result_aggregator,
)
...

def runtime_aggregator(previous_aggregate, latest_results):
    # On the first call there is no previous aggregate, so start from zero
    if previous_aggregate is None:
        num_results = 0
        mean_ms = 0
    else:
        num_results = previous_aggregate["num_results"]
        mean_ms = previous_aggregate["mean_ms"]

    # Collect runtimes only from results that did not raise an exception
    runtimes = []

    for result in latest_results:
        if result.exception is None:
            runtimes.append(result.output)

    if runtimes:
        latest_num_results = len(runtimes)
        latest_mean_ms = statistics.mean(runtimes)

        # Combine the two means, weighting each by its number of results
        new_num_results = num_results + latest_num_results
        new_mean = (
            (mean_ms * num_results) + (latest_mean_ms * latest_num_results)
        ) / new_num_results
    else:
        # No successful results in this batch: keep the previous aggregate
        new_num_results = num_results
        new_mean = mean_ms

    return {
        "num_results": new_num_results,
        "mean_ms": new_mean,
    }

@scenario(engine)
@result_aggregator(runtime_aggregator)
def post_user(context):
    start = datetime.now()

    requests.post(
        url="http://demo-api:8080/users",
        json={
            "name": "jeremy",
            "age": 23,
            "email": f"{str(uuid.uuid4())[:8]}@gmail.com",
        },
    )

    end = datetime.now()

    # Return the request duration in milliseconds
    return (end - start).total_seconds() * 1000

This aggregator finds the mean duration of the latest results and combines it with the mean of all previous results, weighting each mean by its number of results. Results that raised an exception are left out.
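
To make the weighting concrete, here is a minimal standalone sketch of the same running-mean update. It does not use Cicada; the helper name combine_means and the sample runtimes are made up for illustration.

```python
import statistics


def combine_means(prev_mean, prev_count, batch):
    """Fold a batch of runtimes into a running mean, weighting by sample count."""
    if not batch:
        # Nothing new to add: keep the previous mean and count
        return prev_mean, prev_count

    total = prev_count + len(batch)
    new_mean = (prev_mean * prev_count + statistics.mean(batch) * len(batch)) / total
    return new_mean, total


# Hypothetical runtimes in milliseconds
mean_ms, count = combine_means(0, 0, [80, 100])        # first batch: mean of 90 over 2 results
mean_ms, count = combine_means(mean_ms, count, [120])  # second batch adds a single result
print(mean_ms, count)  # 100.0 3
```

The second batch holds one result, so it counts for one third of the combined mean rather than one half.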

Adding the Load Model

Next, add the ramp_users_to_threshold load model. The load model starts with 10 users and adds 5 users every scale-up period. The threshold function checks whether the aggregate mean has gone over 100 ms.

...
from cicadad.core.decorators import (
    scenario,
    load_model,
    user_loop,
    result_aggregator,
)
from cicadad.core.scenario import ramp_users_to_threshold, while_alive
...

@scenario(engine)
@load_model(
    ramp_users_to_threshold(
        initial_users=10,
        # Stop once an aggregate exists and its mean exceeds 100 ms
        threshold_fn=lambda agg: agg is not None and agg["mean_ms"] > 100,
        # Add 5 users at each scale-up step
        next_users_fn=lambda n: n + 5,
    )
)
@user_loop(while_alive())
@result_aggregator(runtime_aggregator)
def post_user(context):
    start = datetime.now()

    requests.post(
        url="http://demo-api:8080/users",
        json={
            "name": "jeremy",
            "age": 23,
            "email": f"{str(uuid.uuid4())[:8]}@gmail.com",
        },
    )

    end = datetime.now()

    # Return the request duration in milliseconds
    return (end - start).total_seconds() * 1000
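
The way the three arguments interact can be sketched in plain Python. This is only an illustration of the ramp-until-threshold idea under stated assumptions, not Cicada's implementation: ramp_until_threshold, measure_fn, and the linear latency model in fake_measure are all hypothetical.

```python
def ramp_until_threshold(initial_users, next_users_fn, threshold_fn, measure_fn, max_steps=100):
    """Grow the user count until threshold_fn(aggregate) is true.

    Returns (last user count under the threshold, user count that crossed it).
    """
    users = initial_users
    last_ok = None

    for _ in range(max_steps):
        aggregate = measure_fn(users)
        if threshold_fn(aggregate):
            return last_ok, users
        last_ok = users
        users = next_users_fn(users)

    # Threshold never reached within max_steps
    return last_ok, None


def fake_measure(users):
    # Hypothetical latency model: 20 ms base plus 2 ms per concurrent user
    return {"mean_ms": 20 + 2 * users}


last_ok, breaking = ramp_until_threshold(
    initial_users=10,
    next_users_fn=lambda n: n + 5,
    threshold_fn=lambda agg: agg is not None and agg["mean_ms"] > 100,
    measure_fn=fake_measure,
)
print(last_ok, breaking)  # 40 45
```

With this made-up model, 40 users give a mean of exactly 100 ms, which does not exceed the threshold, so the ramp stops at the next step of 45 users.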

Run the test with the cicada-distributed run command and wait for it to complete. On the machine this was tested on, the API was able to handle 40 users before the average response time went over 100 ms.