Addressing Performance Bottlenecks: Implementing SQS + Python for Efficient Queue-Worker Services
Introduction
If you are looking to relieve pressure from your main application, background workers paired with a queue system work really well.
In this tutorial, we will create a simple AWS SQS implementation that you can easily plug into existing Python code. By the end, you should be able to put a Python function onto the queue as a job and have a background worker process it.
The setup may seem tedious, but once it is in place we can insert any Python function into the queue as a job and the worker will execute it, so please bear with me!
Use Cases
Here are some use cases that may benefit from a queue system:
- Sending emails or notifications without blocking a web request
- Processing uploaded files, such as resizing images or parsing documents
- Generating reports or exports that take longer than a typical request timeout
- Calling slow third-party APIs that don't need an immediate response
If your use case falls within (or near) the scenarios above, or you simply think it would benefit from a queue, let's get started!
Queue Client Configuration
Queue connection file
The Python file below will serve as our global client for sending and receiving queue messages.
pip install boto3
import boto3

# NOTE: hardcoding credentials is fine for a quick demo, but in production
# prefer environment variables, shared credential files, or IAM roles
sqs_client = boto3.client(
    'sqs',
    aws_access_key_id='ACCESS_KEY',
    aws_secret_access_key='SECRET_ACCESS_KEY',
    region_name='us-east-1',  # replace with your queue's region
)

QUEUE_URL = 'SQS_QUEUE_URL'  # Replace with the URL of your SQS queue
Prepare IAM User
Here we will create an IAM user whose credentials the application will use to access the queue.
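The console steps vary, but the user only needs SQS permissions on this one queue. A sketch of a least-privilege policy document, built in Python; the account ID and queue name in the ARN are placeholders:

```python
import json

# hypothetical placeholders: replace with your account ID, region, and queue name
QUEUE_ARN = 'arn:aws:sqs:us-east-1:123456789012:my-job-queue'

# least-privilege policy: only the actions our client and worker actually call
policy_document = {
    'Version': '2012-10-17',
    'Statement': [
        {
            'Effect': 'Allow',
            'Action': [
                'sqs:SendMessage',     # used by enqueue.py
                'sqs:ReceiveMessage',  # used by worker.py
                'sqs:DeleteMessage',   # used by worker.py after processing
            ],
            'Resource': QUEUE_ARN,
        }
    ],
}

# paste this JSON into the IAM console as the user's inline policy
print(json.dumps(policy_document, indent=2))
```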
Create SQS Queue
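You can create the queue in the AWS console, or programmatically with boto3. A minimal sketch; the queue name and attribute values are assumptions, so adjust them to your workload:

```python
def create_job_queue(sqs_client, name='my-job-queue'):
    """Create a standard SQS queue and return its URL."""
    response = sqs_client.create_queue(
        QueueName=name,
        Attributes={
            # how long a received message stays hidden from other consumers;
            # should exceed your longest job's runtime
            'VisibilityTimeout': '60',
            # enable long polling so receive_message waits for work by default
            'ReceiveMessageWaitTimeSeconds': '10',
        },
    )
    return response['QueueUrl']

# usage (requires valid AWS credentials):
# import boto3
# print(create_job_queue(boto3.client('sqs')))
```

The returned URL is what goes into QUEUE_URL in the queue client file.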
Create the Worker
Now we will create the worker file along with the functions we want to enqueue. For simplicity, we define those functions in the same file as the queue-processing function, so everything lives in one namespace and the worker can resolve each function by name easily.
import json

from queue_client import sqs_client, QUEUE_URL

# a simple function that prints whatever message we give it
def display_message(message):
    print(f'your message is: {message}')

# a simple function that takes the sum of two values
def add_values(value_1, value_2):
    print(f'your sum is {value_1 + value_2}')

# this function converts a function name into the Python function object
# defined in the global scope of this file/module
# there are other ways to do it, but this is the simplest way I can think of
# to run a function based on its name
def execute_function_by_name(func_name, *args, **kwargs):
    if func_name in globals() and callable(globals()[func_name]):
        print(func_name)
        return globals()[func_name](*args, **kwargs)
    else:
        return f"No function named '{func_name}' found."

# this function fetches messages from the queue and executes each function
# with its given parameters
def worker():
    # infinite loop to keep fetching from the queue and processing messages
    while True:
        # fetch messages
        response = sqs_client.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,  # adjust as needed
            WaitTimeSeconds=10,  # adjust as needed
            MessageAttributeNames=['All'],
        )

        messages = response.get('Messages', [])
        if not messages:
            print("No messages to process.")
            continue

        # process each message
        for message in messages:
            print(f"\r\nReceived message: function: {message['Body']} :: parameters: {message['MessageAttributes']}")
            # get the function name and its keyword arguments
            function_name = message['MessageAttributes']['function']['StringValue']
            kwargs = json.loads(message['MessageAttributes']['kwargs']['StringValue'])
            print(function_name)
            print(kwargs)
            execute_function_by_name(function_name, **kwargs)
            # delete the message so it is not delivered and processed again
            sqs_client.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=message['ReceiptHandle']
            )

if __name__ == "__main__":
    worker()
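The name-based dispatch is the only non-obvious part of the worker, so here is a standalone sketch you can run without AWS to see it in action; the greet function is a hypothetical stand-in redefined locally for the example:

```python
def greet(name):
    return f'hello, {name}'

def execute_function_by_name(func_name, *args, **kwargs):
    # look the name up in this module's global namespace and call it if found
    if func_name in globals() and callable(globals()[func_name]):
        return globals()[func_name](*args, **kwargs)
    return f"No function named '{func_name}' found."

print(execute_function_by_name('greet', name='world'))  # hello, world
print(execute_function_by_name('missing_func'))         # No function named 'missing_func' found.
```

Because lookup goes through globals(), only functions defined (or imported) in the worker module can be dispatched, which is also a basic safety net against arbitrary names.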
Insert jobs to queue
Here we will create a function that inserts Python functions and their parameters as jobs into the queue.
import json

from queue_client import sqs_client, QUEUE_URL
# notice that we import the functions from the worker, so we can pass their references
from worker import display_message, add_values

# this function inserts a function call as a message in the queue
def enqueue(func, kwargs=None):
    kwargs = kwargs or {}  # avoid a mutable default argument
    response = sqs_client.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=func.__name__,
        MessageAttributes={
            'function': {
                'DataType': 'String',
                'StringValue': func.__name__
            },
            'kwargs': {
                'DataType': 'String',
                'StringValue': json.dumps(kwargs)
            }
        },
    )
    print(f'{func.__name__} added to queue')
    return response['MessageId']

if __name__ == "__main__":
    # insert the display message as a job
    enqueue(display_message, kwargs={"message": "hello"})
    # insert the add values as a job
    enqueue(add_values, kwargs={"value_1": 1, "value_2": 2})
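One constraint worth noting: because the kwargs travel through SQS as a JSON string attribute, they must be JSON-serializable. A quick sketch of the round trip each job's parameters go through:

```python
import json

kwargs = {"value_1": 1, "value_2": 2}

# enqueue side: kwargs are packed into a string message attribute
attribute_value = json.dumps(kwargs)

# worker side: the string is decoded back into a dict before the call
restored = json.loads(attribute_value)
assert restored == kwargs

# non-JSON-serializable values (datetimes, custom objects, ...) would fail
# at json.dumps, so convert them to strings or primitives before enqueueing
print(restored)  # {'value_1': 1, 'value_2': 2}
```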
Test it out
Enqueue the two jobs, then start the worker (in a separate terminal) to process them:
python enqueue.py
python worker.py
Conclusion
That's it! We were able to create a Python queue service with AWS SQS. Using this as a baseline, we can keep adding functions, insert them into the queue with the enqueue function, and the worker will always process them.