Background
So at the enterprise I'm working with, we are consuming messages from Confluent Cloud topics. These messages should end up in S3 to be available for further processing. Due to enterprise regulations (network technicalities, but also the list of allowed services), we cannot consume messages from Confluent Cloud via serverless options like EventBridge Pipes. We also cannot use managed (SaaS) solutions provided by Confluent Cloud, such as the S3 sink connector, due to our network setup.
So long story short, we are back to using my good old friend EC2 with a custom consumer Python script. The consumer Python script we use for consuming messages from Confluent Cloud can be found on the Confluent Cloud Developer website.
To make it more robust, we run the EC2 instance inside an Auto Scaling group. This makes sure that when a glitch in the Matrix occurs, the instance is recreated and starts running the consumer Python script again.
The second step was to create a systemd service (managed with systemctl) for the consumer Python script, so that when the EC2 instance is stopped, the Kafka consumer closes its connection to Confluent Cloud gracefully and commits its offsets.
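A graceful stop like this usually comes down to handling SIGTERM, which systemd sends by default when a service is stopped. The sketch below is an assumption about how such a loop could look, not our actual consumer: `StubConsumer` and `GracefulRunner` are hypothetical names, and the stub only mimics the `poll()` and `close()` calls so the example runs without a broker.

```python
import signal


class StubConsumer:
    """Hypothetical stand-in for a Kafka consumer so the sketch runs without a broker."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.closed = False

    def poll(self, timeout):
        # Return the next message, or None when nothing is available.
        return self._messages.pop(0) if self._messages else None

    def close(self):
        self.closed = True


class GracefulRunner:
    """Polls until a stop is requested, then always closes the consumer."""

    def __init__(self, consumer, handle_message):
        self.consumer = consumer
        self.handle_message = handle_message
        self.running = True

    def request_stop(self, signum=None, frame=None):
        # The (signum, frame) signature is what signal.signal() passes to a handler.
        self.running = False

    def install_signal_handlers(self):
        # SIGTERM is the default stop signal of systemd; SIGINT covers Ctrl+C.
        signal.signal(signal.SIGTERM, self.request_stop)
        signal.signal(signal.SIGINT, self.request_stop)

    def run(self):
        try:
            while self.running:
                message = self.consumer.poll(1.0)
                if message is None:
                    continue
                self.handle_message(message)
        finally:
            # Runs on a normal stop and on errors, so the connection is never left open.
            self.consumer.close()
```

In a real service, `install_signal_handlers()` would be called once before `run()`, and the stub would be replaced by the actual Kafka consumer, whose `close()` is where the final offsets are expected to be committed.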
Real-world scenario
When we scaled up the number of messages on the topics, we noticed that our consumer Python script on the EC2 instance was falling behind with processing messages, the so-called consumer lag.
After looking into the issue, we pinpointed the problem to:
the number of messages (quantity)
the duration of each API call that stores an object in S3 (put_object)
So how can we fix this? Well, almost all services in AWS offer some type of scaling. Whether horizontal or vertical, there is often a solution available. Horizontal scaling means adding more instances, while vertical scaling means moving to bigger instances.
But Auto Scaling, whether vertical or horizontal, unfortunately doesn't cut it for the consumer Python script. Horizontal scaling gives us problems with maintaining the state of the consumer, the so-called offset. Vertical scaling is not needed, because we have enough compute and memory available with our current EC2 instance class.
So tackling the number of messages is not possible for now. This got us looking into problem number two: we will try to reduce the duration of the API call, which is the main reason our consumer is lagging.
Performance testing
So how can we optimise this API behaviour? Well, first we need numbers. So let's create a test that creates 1000 unique JSON messages and stores them in S3.
import boto3
import json
import time

s3 = boto3.client("s3", region_name="eu-central-1")
bucket_name = "my-test-bucket"  # placeholder; S3 bucket names cannot contain underscores

# Upload 1000 small JSON objects one after another and time the whole run.
s3_start_time = time.time()
for i in range(1000):
    message_body = {"message_id": i, "message_content": f"This is message {i}"}
    object_key = f"message_{i}.json"
    object_body = json.dumps(message_body)
    s3.put_object(Bucket=bucket_name, Key=object_key, Body=object_body)
s3_end_time = time.time()

s3_elapsed_time = s3_end_time - s3_start_time
print(f"Elapsed time S3 copy: {s3_elapsed_time:.2f} seconds")
Running the above Python code takes about 73 seconds, or roughly 73 milliseconds per put_object call. So this is our baseline to beat.
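To relate this baseline to consumer lag, it helps to turn the elapsed time into a per-message latency and a throughput figure. The helper below is a small illustrative sketch; `per_message_stats` is a name made up for this example.

```python
def per_message_stats(elapsed_seconds, message_count):
    """Return (milliseconds per message, messages per second) for a timed run."""
    ms_per_message = elapsed_seconds / message_count * 1000
    messages_per_second = message_count / elapsed_seconds
    return ms_per_message, messages_per_second


# 1000 sequential uploads in about 73 seconds, as measured above.
ms, rate = per_message_stats(73.0, 1000)
print(f"{ms:.1f} ms per message, {rate:.1f} messages per second")
```

At just under 14 messages per second, a single sequential loop falls behind as soon as the topic delivers messages faster than that.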
What can we do to optimize? Maybe send the messages to SQS first, so that we can then bulk-process them with scalable Lambda functions? So let's test how fast SQS is at accepting messages, by appending the following SQS test to our code block above:
sqs = boto3.resource("sqs", region_name="eu-central-1")
queue_name = "my_test_sqs_queue"
queue = sqs.get_queue_by_name(QueueName=queue_name)

sqs_start_time = time.time()
for i in range(1000):
    message_body = {"message_id": i, "message_content": f"This is message {i}"}
    response = queue.send_message(MessageBody=json.dumps(message_body))
sqs_end_time = time.time()

sqs_elapsed_time = sqs_end_time - sqs_start_time
print(f"Elapsed time SQS: {sqs_elapsed_time:.2f} seconds")
# S3 time expressed as a percentage of the SQS time (100% would mean equal).
percentage = (s3_elapsed_time / sqs_elapsed_time) * 100
print(f"S3 time: {s3_elapsed_time} is {percentage:.2f}% of SQS time: {sqs_elapsed_time}")
So let's run it again:
(.venv) ➜ consumer git:(feature/a_ec2_love_story) ✗ python3 ./test_performance.py
Elapsed time S3 copy: 73.57 seconds
Elapsed time SQS: 34.14 seconds
S3 time: 73.57360005378723 is 215.51% of SQS time: 34.139643907547
Wow, S3 takes roughly 2.2 times as long as SQS, which makes it about 115% slower. But at 34 to 74 milliseconds per message, neither latency is anything to get excited about. So how can we optimize some more? Well, what about parallelism?
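A ratio between two timings can be read in more than one way, and "X% of" is easily confused with "X% slower". The helper below is an illustrative sketch that spells out the difference using the two measurements above; `compare_timings` is a hypothetical name.

```python
def compare_timings(slower_seconds, faster_seconds):
    """Return (ratio, percent_of, percent_slower) for two elapsed times."""
    ratio = slower_seconds / faster_seconds
    percent_of = ratio * 100            # slower time as a percentage of the faster one
    percent_slower = (ratio - 1) * 100  # extra time the slower one needs, in percent
    return ratio, percent_of, percent_slower


ratio, percent_of, percent_slower = compare_timings(73.57360005378723, 34.139643907547)
print(f"{ratio:.2f}x as long, {percent_of:.2f}% of the SQS time, {percent_slower:.2f}% slower")
```

The second value is the figure the test script prints. The third is what "slower" means in the strict sense, and it is always exactly 100 percentage points lower.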
According to Wikipedia:
Parallel computing is a type of computation in which many calculations or processes are carried out simultaneously
So basically, we speed things up by running actions in parallel. Each put_object call spends most of its time waiting on the network, so several calls can be in flight at once. So what if we store the objects in S3 again, but in parallel instead of sequentially? OK, let's update our code with threads.
import boto3
import json
import time
import concurrent.futures

# boto3 low-level clients are generally thread-safe, so all threads share this one.
s3 = boto3.client("s3", region_name="eu-central-1")
bucket_name = "my-test-bucket"  # placeholder; S3 bucket names cannot contain underscores


def upload_message(i):
    message_body = {"message_id": i, "message_content": f"This is message {i}"}
    object_key = f"message_{i}.json"
    object_body = json.dumps(message_body)
    s3.put_object(Bucket=bucket_name, Key=object_key, Body=object_body)


s3_parallelism_start_time = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for i in range(1000):
        futures.append(executor.submit(upload_message, i))
    # result() re-raises any exception from a worker, so failed uploads are not silently lost.
    for future in futures:
        future.result()
s3_parallelism_end_time = time.time()

s3_parallelism_elapsed_time = s3_parallelism_end_time - s3_parallelism_start_time
print(f"Elapsed time S3_parallelism: {s3_parallelism_elapsed_time:.2f} seconds")
So now we are down to 5.11 seconds for 1000 messages. That is more like the numbers we are looking for. But what about SQS? Is it still faster to write messages to SQS than to S3? Adding the following code to the previous block:
sqs = boto3.resource("sqs", region_name="eu-central-1")
queue_name = "my_test_sqs_queue"
# Note: boto3 documents resources as not thread-safe; a low-level client is the safer choice outside a quick test.
queue = sqs.get_queue_by_name(QueueName=queue_name)


def upload_sqs_message(i):
    message_body = {"message_id": i, "message_content": f"This is message {i}"}
    response = queue.send_message(MessageBody=json.dumps(message_body))


sqs_parallelism_start_time = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for i in range(1000):  # same message count as the S3 test, for a like-for-like comparison
        futures.append(executor.submit(upload_sqs_message, i))
    # result() re-raises any exception from a worker thread.
    for future in futures:
        future.result()
sqs_parallelism_end_time = time.time()

sqs_parallelism_elapsed_time = sqs_parallelism_end_time - sqs_parallelism_start_time
print(f"Elapsed time SQS_parallelism: {sqs_parallelism_elapsed_time:.2f} seconds")
# S3 time expressed as a percentage of the SQS time (100% would mean equal).
percentage = (s3_parallelism_elapsed_time / sqs_parallelism_elapsed_time) * 100
print(f"S3 time: {s3_parallelism_elapsed_time} is {percentage:.2f}% of SQS time: {sqs_parallelism_elapsed_time}")
The result
So running the Python test_performance.py script with parallelism gives us the following results:
(.venv) ➜ consumer git:(feature/a_ec2_love_story) ✗ python3 ./test_performance.py
Elapsed time S3_parallelism: 4.93 seconds
Elapsed time SQS_parallelism: 2.26 seconds
S3 time: 4.93480110168457 is 218.22% of SQS time: 2.261399030685425
With or without parallelism, S3 takes about 2.2 times as long as SQS. But the parallel code is roughly 15x faster than the sequential code for both services (73.57 s vs 4.93 s for S3, 34.14 s vs 2.26 s for SQS).
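The speedup comes from overlapping network waits, not from extra CPU work. That effect can be illustrated without AWS by simulating the latency of one call with `time.sleep`. The sketch below makes that assumption explicit: `fake_put_object` and `SIMULATED_LATENCY` are made up for this example and stand in for a real put_object round trip.

```python
import time
from concurrent.futures import ThreadPoolExecutor

SIMULATED_LATENCY = 0.02  # seconds; hypothetical stand-in for one network round trip


def fake_put_object(i):
    # time.sleep releases the GIL, much like a thread blocked on network I/O.
    time.sleep(SIMULATED_LATENCY)
    return i


def run_sequential(count):
    start = time.perf_counter()
    results = [fake_put_object(i) for i in range(count)]
    return results, time.perf_counter() - start


def run_threaded(count, max_workers=10):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() keeps the input order and re-raises worker exceptions.
        results = list(executor.map(fake_put_object, range(count)))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    _, sequential_time = run_sequential(50)
    _, threaded_time = run_threaded(50)
    print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

With a fixed latency per call, the threaded run time shrinks roughly in proportion to the number of workers, which matches the pattern in the measurements above.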
Is this the final solution for our consumer Python script? No, it isn't. There is more code optimisation possible. But for now, a simple change, adding parallelism to our put_object calls, makes our code more than 10x faster and is sufficient to avoid consumer lag.
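One way this change could be folded into a consumer loop is to upload each polled batch of messages concurrently and only move on once every upload has succeeded. The sketch below is a hedged illustration rather than our production code: `upload_batch` is a hypothetical helper, and the `put_object` argument is any callable taking a key and a body, so the example runs without AWS credentials.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def upload_batch(messages, put_object, max_workers=16):
    """Upload (key, payload) pairs concurrently and return the uploaded keys.

    Any exception raised by an upload propagates to the caller, so a batch
    only counts as done when every object was stored.
    """

    def upload_one(item):
        key, payload = item
        put_object(key, json.dumps(payload))
        return key

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in input order and re-raises worker errors.
        return list(executor.map(upload_one, messages))
```

With boto3, the callable could be something like `lambda key, body: s3.put_object(Bucket=bucket_name, Key=key, Body=body)`. Committing Kafka offsets only after `upload_batch` returns would keep a failed upload from being skipped, although that ordering is an assumption about the consumer design, not something measured here.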