Cicada

Async App Guide

In this guide, you'll create tests for a service that receives messages from a Kafka server and modifies files on S3.

Source code

App

First, we'll need a Kafka server and an S3-compatible data store. We'll use MinIO as the S3 server and docker-compose to start both:

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
  s3:
    image: minio/minio:latest
    ports:
      - 9000:9000
    command: server /data
    environment:
      - MINIO_REGION_NAME=local
      - MINIO_ACCESS_KEY=EXAMPLE_ACCESS_KEY
      - MINIO_SECRET_KEY=EXAMPLE_SECRET_KEY

Run docker-compose up to start the services and make sure all three (zookeeper, kafka, and s3) are running.

Next, we'll need to create a client that connects to Kafka and S3:

app.py

import time

import boto3
import kafka
from kafka.errors import NoBrokersAvailable


def main():
    consumer = None
    producer = None
    sleep_time = 1

    # Exponential backoff to connect to brokers
    for i in range(5):
        try:
            consumer = kafka.KafkaConsumer("inbound-files", bootstrap_servers=["kafka:9092"])
            producer = kafka.KafkaProducer(bootstrap_servers=["kafka:9092"])
            break
        except NoBrokersAvailable:
            print(f"waiting {sleep_time} seconds for brokers")
            time.sleep(sleep_time)
            sleep_time *= 2

    if consumer is None or producer is None:
        raise RuntimeError("Timed out waiting for message brokers")

    print("Connected to brokers!")

    s3_client = boto3.client(
        "s3",
        region_name="local",
        endpoint_url="http://s3:9000",
        aws_access_key_id="EXAMPLE_ACCESS_KEY",
        aws_secret_access_key="EXAMPLE_SECRET_KEY",
    )


if __name__ == "__main__":
    main()

requirements.txt

kafka-python==2.0.1
boto3
pyyaml

Dockerfile

FROM python:3.8-slim-buster

WORKDIR /app

ADD requirements.txt .
RUN pip install -r requirements.txt

ADD app.py .

ENTRYPOINT ["python", "-u", "app.py"]

docker-compose.yml

services:
  ...
  service:
    build: .
    depends_on:
      - kafka

This app connects to the Kafka and S3 services defined in the docker-compose.yml file and then exits. Run docker-compose up again and ensure the service container exits successfully.
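
The retry loop in app.py can also be factored into a small helper. The following is a minimal sketch rather than part of the guide's source: connect_with_backoff and its parameters are hypothetical names, and the sleep function is injectable so the logic can be exercised without a broker.

```python
import time


def connect_with_backoff(connect, retry_on, attempts=5, initial_delay=1, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure.

    All names here are illustrative; none of them come from kafka-python
    or Cicada.
    """
    delay = initial_delay
    for _ in range(attempts):
        try:
            return connect()
        except retry_on:
            print(f"waiting {delay} seconds for brokers")
            sleep(delay)
            delay *= 2
    # Every attempt failed, mirroring the RuntimeError raised in app.py.
    raise RuntimeError("Timed out waiting for message brokers")
```

In app.py, this could be called once for the consumer and once for the producer, for example connect_with_backoff(lambda: kafka.KafkaProducer(bootstrap_servers=["kafka:9092"]), NoBrokersAvailable).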

Tests

Once we can confirm the app can connect to Kafka and S3, it's time to begin sending messages to it. First, add the following to the app:

app.py

...
def main():
    ...
    s3_client = ...

    # consume message off stream and process
    for message in consumer:
        print(
            "%s:%d:%d: key=%s value=%s"
            % (
                message.topic,
                message.partition,
                message.offset,
                message.key,
                message.value,
            )
        )

        # forward the original key (already bytes) to the outbound topic
        producer.send("outbound-files", key=message.key)
    ...

This will print the contents of a message received in the stream and send a message to a stream called outbound-files with the same message key.
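
To try the loop without a running broker, the consumer and producer can be replaced with stand-ins. This is a sketch under stated assumptions: Message, RecordingProducer, and forward_keys are hypothetical names introduced for illustration, and the stand-ins only model the attributes the loop touches.

```python
from collections import namedtuple

# Stand-in for the records yielded by a Kafka consumer; only the
# attributes used by the loop in app.py are modelled.
Message = namedtuple("Message", ["topic", "partition", "offset", "key", "value"])


class RecordingProducer:
    """Hypothetical test double that records send() calls instead of using a broker."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value=None, key=None):
        self.sent.append((topic, key))


def forward_keys(consumer, producer):
    """Print each inbound message and echo its key to outbound-files."""
    for message in consumer:
        print(
            "%s:%d:%d: key=%s value=%s"
            % (message.topic, message.partition, message.offset, message.key, message.value)
        )
        producer.send("outbound-files", key=message.key)


# A plain list works as the consumer because the loop only iterates over it.
inbound = [
    Message("inbound-files", 0, offset, key, None)
    for offset, key in enumerate([b"file_a", b"file_b"])
]
producer = RecordingProducer()
forward_keys(inbound, producer)
```

After the call, producer.sent holds one (topic, key) pair per inbound message, which is the behavior the Cicada test checks for on the real outbound-files stream.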

For our test, we will be sending 3 messages to the stream in a batch and checking that the app processes them successfully.

One way to specify the messages is through the globals section of the state container defined before the test begins. Create a directory called test_data and add a file called initial.json to it:

initial.json

{
  "globals": {
    "transform_files": ["file_a", "file_b", "file_c"]
  }
}

Next, use the globals section (its structure is entirely up to you) to create a test that sends the messages:

test.cicada.yaml

description: Example file transform test
tests:
  - name: send-messages
    description: Send a message to service
    runner: kafka-runner
    config:
      servers: kafka:9092
    template: >
      actions:
        {% for tf in state["globals"]["transform_files"] %}
        - type: Send
          params:
            topic: inbound-files
            messages:
              - key: {{ tf }}
        {% endfor %}
      asserts:
        {% for tf in state["globals"]["transform_files"] %}
        - type: FindMessage
          params:
            actionParams:
              topic: outbound-files
            expected:
              key: {{ tf }}
        {% endfor %}

The template section generates one Send action and one FindMessage assert for each entry in transform_files, so the test sends three messages to inbound-files and expects three matching messages on outbound-files.
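
To see what the template loops produce, the expansion can be mimicked in plain Python. This sketch does not use Cicada's templating engine; it only rebuilds the same actions and asserts structure from the state shown in initial.json.

```python
import json

# Same structure as test_data/initial.json.
state = {"globals": {"transform_files": ["file_a", "file_b", "file_c"]}}

# Plain-Python equivalent of the two template loops: one Send action and
# one FindMessage assert per entry in transform_files.
actions = [
    {"type": "Send", "params": {"topic": "inbound-files", "messages": [{"key": tf}]}}
    for tf in state["globals"]["transform_files"]
]
asserts = [
    {
        "type": "FindMessage",
        "params": {"actionParams": {"topic": "outbound-files"}, "expected": {"key": tf}},
    }
    for tf in state["globals"]["transform_files"]
]

print(json.dumps({"actions": actions, "asserts": asserts}, indent=2))
```

Adding a fourth name to transform_files would add a fourth action and a fourth assert without any change to the test file.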

Finally, add the following to the docker compose file:

docker-compose.yml

services:
  ...
  cicada:
    image: cicadatesting/cicada-2-engine
    environment:
      - CONTAINER_NETWORK=file_transform_service_default
      - INITIAL_STATE_FILE=/initial-data/initial.json
      - WORKDIR=${WORKDIR}
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - .:/tests
      - ./reports:/reports
      - ./test_data:/initial-data
    depends_on:
      - service

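This starts Cicada alongside the other services, mounts the initial data file into the Cicada container, and tells Cicada to load it into state. The CONTAINER_NETWORK value assumes the Compose project name (by default, the directory name) is file_transform_service, since Compose names its default network <project>_default; adjust it if your directory is named differently.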

Run docker-compose up again and ensure Cicada runs successfully by checking the report in reports/report.md.

Business Logic

Let's add some more functionality to the app. For this use case, we want the app to receive a message through Kafka, locate a file in S3 with that file key, and convert it from YAML to JSON.

Add the following to the app:

app.py

import json
import yaml

...

def main():
    ...
    for message in consumer:
        ...
        file_key = message.key.decode("utf-8")

        # get file from s3 by key
        file = s3_client.get_object(
            Bucket="file-transform-service", Key=f"{file_key}.yaml"
        )

        contents = file["Body"].read().decode("utf-8")

        # transform file contents (yaml format) to JSON
        mapping = yaml.safe_load(contents)

        # save new file contents and send completed message
        s3_client.put_object(
            Bucket="file-transform-service",
            Key=f"{file_key}.json",
            Body=bytes(json.dumps(mapping), "utf-8"),
        )
...

Next, we'll want to seed S3 to make sure there are YAML files available to the app. We can do this by uploading them through Cicada.

First, create three files in the test_data directory: file_a.yaml, file_b.yaml, and file_c.yaml. They should follow this format:

file_{x}.yaml

foo_{x}: bar_{x}
fizz_{x}: buzz_{x}
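
The three seed files can be written by hand or generated. The following is a small sketch; write_seed_files is a hypothetical helper, and the demo writes to a temporary directory so that it has no side effects.

```python
import tempfile
from pathlib import Path


def write_seed_files(directory, suffixes=("a", "b", "c")):
    """Write file_{x}.yaml for each suffix, using the two-key format shown above."""
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)
    paths = []
    for x in suffixes:
        path = directory / f"file_{x}.yaml"
        path.write_text(f"foo_{x}: bar_{x}\nfizz_{x}: buzz_{x}\n", encoding="utf-8")
        paths.append(path)
    return paths


# Demo against a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    written = write_seed_files(tmp)
    names = [p.name for p in written]
    first = written[0].read_text(encoding="utf-8")
print(names)
```

For the guide itself, the helper would be pointed at the real directory with write_seed_files("test_data").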

Next, use Cicada to upload them (place this test before send-messages):

test.cicada.yaml

tests:
  - name: seed
    description: create test bucket
    runner: s3-runner
    config:
      accessKeyID: EXAMPLE_ACCESS_KEY
      secretAccessKey: EXAMPLE_SECRET_KEY
      region: local
      endpointURL: http://s3:9000
    template: >
      volumes:
        - source: {{ getenv("WORKDIR") }}/test_data
          destination: /test_data
      actions:
        - type: cb
          params:
            bucketName: file-transform-service
        {% for tf in state["globals"]["transform_files"] %}
        - type: put
          params:
            sourcePath: /test_data/{{ tf }}.yaml
            destinationPath: s3://file-transform-service/{{ tf }}.yaml
        {% endfor %}
    ...

This mounts the local test_data directory into the runner at /test_data (the destination can be any path you choose), creates the bucket, and generates one upload action for each of the three files.

Next, we'll want to check that the app creates JSON versions of the input files. One way to do this is to create the three expected output files and compare the files the app produces against these controls.

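Create the following files in the test_data directory: file_a.json, file_b.json, and file_c.json. The names must match the entries in transform_files, because the assert below reads /test_data/{{ tf }}.json. They should follow this format:

file_{x}.json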

{"foo_{x}": "bar_{x}", "fizz_{x}": "buzz_{x}"}
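
The control files need to match what app.py writes. The sketch below assumes the comparison is on exact file contents (this guide does not say how strict FilesEqual is), so it serializes each mapping with json.dumps and default arguments, as app.py does, and adds no trailing newline. write_control_files is a hypothetical helper, and the file names follow the entries in transform_files, since the assert in the next test reads /test_data/{{ tf }}.json.

```python
import json
import tempfile
from pathlib import Path


def write_control_files(directory, names=("file_a", "file_b", "file_c")):
    """Write {name}.json controls serialized the same way app.py serializes them."""
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)
    written = {}
    for name in names:
        x = name.rsplit("_", 1)[-1]  # "file_a" -> "a"
        mapping = {f"foo_{x}": f"bar_{x}", f"fizz_{x}": f"buzz_{x}"}
        # Default json.dumps arguments, as in app.py; no trailing newline.
        body = json.dumps(mapping)
        (directory / f"{name}.json").write_bytes(body.encode("utf-8"))
        written[name] = body
    return written


# Demo against a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    controls = write_control_files(tmp)
print(controls["file_a"])  # {"foo_a": "bar_a", "fizz_a": "buzz_a"}
```

If an assert fails on files that look identical, differences in whitespace or a trailing newline in the hand-written controls are a likely cause under that assumption.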

Finally, add a test whose asserts compare the files:

test.cicada.yaml

tests:
  ...

  - name: check-file-transform
    description: Check that file has been updated
    runner: s3-runner
    config:
      accessKeyID: EXAMPLE_ACCESS_KEY
      secretAccessKey: EXAMPLE_SECRET_KEY
      region: local
      endpointURL: http://s3:9000
    template: >
      volumes:
        - source: {{ getenv("WORKDIR") }}/test_data
          destination: /test_data
      asserts:
        {% for tf in state["globals"]["transform_files"] %}
        - type: FilesEqual
          params:
            path: s3://file-transform-service/{{ tf }}.json
            expected: /test_data/{{ tf }}.json
        {% endfor %}
    dependencies:
      - send-messages

This test will use the template to create 3 FilesEqual asserts, one for each of the expected files.

Once ready, run docker-compose up. The asserts should pass, indicating that the files were uploaded and transformed. You can also double-check the files using the MinIO UI.

Excellent! We're almost there! The last step is to clean up our bucket. Simply add the following to the test file:

test.cicada.yaml

tests:
  ...
  - name: teardown
    description: Delete temporary S3 bucket
    runner: s3-runner
    config:
      accessKeyID: EXAMPLE_ACCESS_KEY
      secretAccessKey: EXAMPLE_SECRET_KEY
      region: local
      endpointURL: http://s3:9000
    actions:
      - type: rm
        params:
          path: s3://file-transform-service
          recursive: true
    dependencies:
      - check-file-transform

This recursively deletes the objects in the bucket we created, file-transform-service, and then removes the bucket itself.
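
The dependencies entries determine the order in which these tests can run. The sketch below only illustrates that ordering with Python's standard graphlib module; how Cicada schedules tests internally is not covered in this guide, and the seed test is left out because its listing is truncated.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Test name -> names listed under its "dependencies" key in test.cicada.yaml.
# Only the dependencies spelled out in this guide are included.
dependencies = {
    "send-messages": set(),
    "check-file-transform": {"send-messages"},
    "teardown": {"check-file-transform"},
}

# static_order() yields each test only after everything it depends on.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['send-messages', 'check-file-transform', 'teardown']
```

Because teardown depends on check-file-transform, the bucket is not deleted until the file comparison has had a chance to run.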

Once you have verified that the test finishes successfully and that the bucket was removed (you can check in the MinIO UI), pat yourself on the back! You successfully created and tested an asynchronous app!

Copyright © 2021 Cicada