Addressing Performance Bottlenecks: Implementing SQS + Python for Efficient Queue-Worker Services

Introduction

If you are looking to relieve pressure on your main application, using background workers with a queue system works really well.

In this tutorial, we will create a simple implementation of AWS SQS that you can easily use with existing Python code. At the end, you should be able to put a Python function into the queue as a job and have a background worker process it.

The process may seem tedious, but once we set it up, we can insert any Python function into the queue as a job and the worker will execute it, so please bear with me!

Use Cases

Here are some use cases that may benefit from a queue system:

  1. High-Traffic Websites: During traffic surges, websites can become sluggish or even crash. Queued processing ensures tasks like data writing or notifications are handled in the background, maintaining site responsiveness.
  2. E-commerce Transactions: During sales or peak hours, e-commerce platforms witness a surge in transaction requests. By queuing payment processing tasks, the system can handle each transaction systematically, reducing errors and improving user experience.
  3. Batch Processing: Handling vast amounts of data at once can be resource-intensive. Breaking down large tasks into smaller queued jobs ensures efficient processing without overloading the system.
  4. Real-time Analytics: Analyzing high-volume real-time data can strain the main application. Queues can offload data analytics tasks to background workers, ensuring the primary system remains nimble while still deriving actionable insights.
  5. Non-Response-Essential Writes: Often, an API call includes writes to multiple tables or multiple record insertions that are not actually required for the response. By moving these writes or queries to the queue, your API response time can be drastically improved.

If your use case falls within the scenarios above, or you think it would otherwise benefit from a queue, let's get started!

Queue Client Configuration

Queue connection file

The Python file here will serve as our global client to send and receive queue messages.

  1. Install the AWS SDK:

pip install boto3        

  2. Create a file named queue_client.py and paste the following content:

import boto3

sqs_client = boto3.client(
    'sqs',
    aws_access_key_id='ACCESS_KEY',  # replace with the access key we create below
    aws_secret_access_key='SECRET_ACCESS_KEY',  # replace with the secret key we create below
    region_name='us-east-1',  # the region your queue lives in
)

QUEUE_URL = 'SQS_QUEUE_URL'  # replace with the URL of your SQS queue
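Hardcoding credentials in source is fine for a quick demo but risky in practice. A minimal sketch of the same client that reads the values from environment variables instead (the variable names here are my own choice, not part of the tutorial):

```python
import os

import boto3

# same client as queue_client.py, but secrets come from the environment
# instead of being committed to source control
sqs_client = boto3.client(
    'sqs',
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
    region_name=os.environ.get('AWS_REGION', 'us-east-1'),
)

QUEUE_URL = os.environ.get('SQS_QUEUE_URL', '')
```

If the credential variables are unset, boto3 falls back to its default credential chain (shared config files, instance roles, and so on), which is often what you want on a server anyway.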

  3. Proceed to the next step to get the SQS_QUEUE_URL.

Prepare IAM User

Here we will create an IAM user whose credentials the application will use to access the queue.

  1. Go to https://us-east-1.console.aws.amazon.com/iamv2/home?region=us-east-1#/users and click "Create User".
  2. Under the username, enter "queue_user". Remember queue_user, as we will use it later when setting up the queue.
  3. Click Next until you reach the end (skip the other settings, as we do not need them), then create the user.
  4. Now, in the list of users, select "queue_user".
  5. Go to Security credentials and find "Access keys".

  6. Click "Create access key" and select "Command Line Interface".
  7. Click Next, then "Create access key", then download the .csv file.
  8. Replace ACCESS_KEY and SECRET_ACCESS_KEY in queue_client.py with the values shown on your screen.

Create SQS Queue

  1. Go to https://us-east-1.console.aws.amazon.com/sqs/v2/home?region=us-east-1#/queues
  2. Click on Create Queue

  3. Enter "test" under Name.
  4. Scroll down to Access policy, choose Basic, and select "Only the specified AWS accounts, IAM users and roles" for both options.

  5. Then, in the text box below each "Only the specified AWS accounts, IAM users and roles" option, paste the ARN of the user we created above. You can find it under IAM -> Users -> queue_user -> Permissions -> Summary -> ARN.

  6. Now create the queue.
  7. After creation, copy the URL from the SQS queue details into SQS_QUEUE_URL in queue_client.py.
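Before moving on, you can sanity-check that the credentials and queue URL work together. A small sketch (assuming queue_client.py is already filled in; note this makes a live call to AWS):

```python
from queue_client import sqs_client, QUEUE_URL

# ask SQS for the approximate message count; if this call succeeds,
# the credentials, permissions, and queue URL are all working
attrs = sqs_client.get_queue_attributes(
    QueueUrl=QUEUE_URL,
    AttributeNames=['ApproximateNumberOfMessages'],
)
print(attrs['Attributes']['ApproximateNumberOfMessages'])
```

A fresh queue should report 0 messages. If you get an access-denied error instead, re-check the access policy step above.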

Create the Worker

Now we will create the worker file and the functions that we want to enqueue. For simplicity, we define all the functions we want to enqueue together with the queue-processing function, so that they reside in the same namespace and the worker can access them easily.

  1. Create a file named worker.py
  2. Paste the code below

import json
from queue_client import sqs_client, QUEUE_URL


# a simple function that prints whatever message we give it
def display_message(message):
    print(f'your message is: {message}')


# a simple function that prints the sum of two values
def add_values(value_1, value_2):
    print(f'your sum is {value_1 + value_2}')


# this function looks up a function by name in the global scope of this file/module
# there are other ways to do it, but this is the simplest I can think of to run a function based on its name
def execute_function_by_name(func_name, *args, **kwargs):
    if func_name in globals() and callable(globals()[func_name]):
        return globals()[func_name](*args, **kwargs)
    else:
        return f"No function named '{func_name}' found."


# this function fetches messages from the queue and executes each function with the given parameters
def worker():
    # infinite loop to keep fetching from the queue and processing messages
    while True:
        # fetch messages (long polling for up to 10 seconds)
        response = sqs_client.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,  # adjust as needed
            WaitTimeSeconds=10,  # adjust as needed
            MessageAttributeNames=['All'],
        )

        messages = response.get('Messages', [])
        if not messages:
            print("No messages to process.")
            continue

        # process each message
        for message in messages:
            print(f"\r\nReceived message: function: {message['Body']} :: parameters: {message['MessageAttributes']}")

            # get the function name and its keyword arguments
            function_name = message['MessageAttributes']['function']['StringValue']
            kwargs = json.loads(message['MessageAttributes']['kwargs']['StringValue'])
            execute_function_by_name(function_name, **kwargs)

            # delete the message so it is not delivered again
            sqs_client.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=message['ReceiptHandle']
            )


if __name__ == "__main__":
    worker()
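One caveat of the globals() lookup: any function defined or imported in this module can be executed if its name shows up in a message. A safer variant (my own sketch, not part of the tutorial) dispatches through an explicit allowlist instead:

```python
# the same example functions as in worker.py
def display_message(message):
    print(f'your message is: {message}')

def add_values(value_1, value_2):
    print(f'your sum is {value_1 + value_2}')

# only functions registered here can be executed from queue messages
ALLOWED_FUNCTIONS = {
    'display_message': display_message,
    'add_values': add_values,
}

def execute_function_by_name(func_name, *args, **kwargs):
    func = ALLOWED_FUNCTIONS.get(func_name)
    if func is None:
        return f"No function named '{func_name}' found."
    return func(*args, **kwargs)
```

This way a malformed or malicious message naming, say, an imported helper cannot trigger it; only the functions you deliberately registered are callable.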

Insert jobs to queue

Here we will create a function that inserts Python functions and their parameters as jobs into the queue.

  1. Create a file named enqueue.py
  2. Paste the following code into the file

import json
from queue_client import sqs_client, QUEUE_URL
from worker import display_message, add_values  # imported from the worker so we can pass their references


# this function inserts a function call as a message into the queue
def enqueue(func, kwargs=None):
    kwargs = kwargs or {}  # avoid a mutable default argument
    response = sqs_client.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=func.__name__,
        MessageAttributes={
            'function': {
                'DataType': 'String',
                'StringValue': func.__name__
            },
            'kwargs': {
                'DataType': 'String',
                'StringValue': json.dumps(kwargs)
            }
        },
    )
    print(f'{func.__name__} added to queue')
    return response['MessageId']


if __name__ == "__main__":
    # insert the display message as a job
    enqueue(display_message, kwargs={"message": "hello"})
    # insert the add values as a job
    enqueue(add_values, kwargs={"value_1": 1, "value_2": 2})
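Note that the job parameters travel through json.dumps on enqueue and json.loads in the worker, so they must be JSON-serializable (strings, numbers, lists, dicts, booleans, None). A quick sketch of the round trip the kwargs attribute goes through:

```python
import json

# the kwargs dict is serialized into a string message attribute on send...
encoded = json.dumps({"value_1": 1, "value_2": 2})
# ...and deserialized back into a dict by the worker on receive
decoded = json.loads(encoded)
print(decoded == {"value_1": 1, "value_2": 2})  # True: the values survive the round trip
```

Objects like datetimes or ORM models will raise a TypeError at enqueue time, so convert them to plain types first.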

Test it out

  1. Open a terminal.
  2. Insert the messages into the queue:

python enqueue.py         

  3. Go to your SQS queue in the console and you should now see messages under "Messages available".
  4. Open another terminal tab or window.
  5. Now we will execute the jobs in the queue:

python worker.py         

  6. You should see that the worker processed the jobs.
  7. If you go back to the first terminal and keep running enqueue.py, you will see the worker pick up and process the new jobs as well.

Conclusion

That's it! We were able to create a Python queue service with AWS SQS. Using this as a baseline, we can keep adding functions, insert them into the queue with the enqueue function, and the worker will process them.
