Mutex and Thread safety in Multithread Programs

What is a mutex lock?

Mutex stands for Mutual Exclusion. It is a tool used in programming to prevent multiple threads from accessing a shared resource at the same time.

Why is it needed?

If two threads try to add items to a shared list at the same time, the data may be overwritten or corrupted, or the application may crash. That is why we need to lock a shared resource while one thread is using it.

Think of a public toilet without a door lock: if multiple people try to use it at once, it will be chaos. With a lock on the door, only one person can use it at a time.
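The same idea can be shown with a minimal sketch (the names here are illustrative, not from the queue example below): two threads update a shared counter, and a threading.Lock ensures only one of them touches it at a time, so no increment is lost.

```python
import threading

counter = 0
lock = threading.Lock()

def safe_increment(n):
    global counter
    for _ in range(n):
        with lock:  # only one thread may update the counter at a time
            counter += 1

threads = [threading.Thread(target=safe_increment, args=(100_000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 200000 on every run, thanks to the lock
```

Without the `with lock:` line, the read-modify-write of `counter += 1` can interleave between threads and the final count can come out lower than 200,000.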

Now, in the following code, I have implemented a queue with multithreading. In Python, multithreading is limited by the GIL (Global Interpreter Lock), which ensures that only one thread executes Python bytecode at a time in CPython. So I have added a few intentional tweaks (the time.sleep calls) to make the race condition visible.

import threading
import time

class UnsafeQueue:
    def __init__(self):
        self.items = []

    def enqueue(self, item):
        self.items.append(item)
        time.sleep(0.00001)  # Force race condition

    def dequeue(self):
        if self.items:
            time.sleep(0.00001)  # Create race window
            return self.items.pop(0)
        return None

    def size(self):
        return len(self.items)


q = UnsafeQueue()

def producer():
    for i in range(50_000):
        q.enqueue(i)

def consumer():
    success = 0
    while success < 30_000:
        item = q.dequeue()
        if item is not None:
            success += 1


threads = [
    threading.Thread(target=producer),
    threading.Thread(target=producer),
    threading.Thread(target=consumer),
    threading.Thread(target=consumer),
]

for t in threads:
    t.start()
for t in threads:
    t.join()

print("Final size of queue (expected: 40,000):", q.size())        

What this code does: two producer threads each enqueue 50,000 integers into a shared list, 100,000 in total, while two consumer threads each dequeue 30,000 integers, 60,000 in total. So the queue should end up with 40,000 items. But without a mutex lock, multiple threads may modify the list at the same time, producing a wrong final count; worse, the dequeue method may see that the list is non-empty, lose the race during the sleep, and then pop from a list another thread has just emptied, raising an IndexError.

To rectify this, we must wrap the shared-state methods with a lock so that the same resource cannot be used by multiple threads at the same time. Please follow the corrected code below.

import threading
import time

class SafeQueue:
    def __init__(self):
        self.items = []
        self.lock = threading.Lock()

    def enqueue(self, item):
        with self.lock:
            self.items.append(item)
            time.sleep(0.00001)

    def dequeue(self):
        with self.lock:
            if self.items:
                time.sleep(0.00001)
                return self.items.pop(0)
            return None

    def size(self):
        with self.lock:
            return len(self.items)


q = SafeQueue()

def producer():
    for i in range(50000):
        q.enqueue(i)

def consumer():
    success = 0
    while success < 30000:
        item = q.dequeue()
        if item is not None:
            success += 1


threads = [
    threading.Thread(target=producer),
    threading.Thread(target=producer),
    threading.Thread(target=consumer),
    threading.Thread(target=consumer),
]

for t in threads:
    t.start()
for t in threads:
    t.join()

print("Final size of queue (expected: 40,000):", q.size())        

As we can see, self.lock (the mutex lock) wraps every method that touches the shared list, which ensures the correct result every time.
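It is worth noting that in real code we rarely need to build this ourselves: Python's standard library provides queue.Queue, which does the locking internally. A minimal sketch of the same producer/consumer setup using it:

```python
import queue
import threading

q = queue.Queue()  # thread-safe: no explicit lock needed

def producer():
    for i in range(50_000):
        q.put(i)

def consumer():
    for _ in range(30_000):
        q.get()  # blocks until an item is available

threads = [
    threading.Thread(target=producer),
    threading.Thread(target=producer),
    threading.Thread(target=consumer),
    threading.Thread(target=consumer),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("Final size of queue (expected: 40,000):", q.qsize())
```

Because q.get() blocks until an item arrives, the consumers do not need the `if item is not None` retry loop from the hand-rolled version.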

#TechIsInteresting
