Thread-Safe Queue - Two Serious Errors

This post is a cross-post from www.ModernesCpp.com.

In my last post "Monitor Object" I implemented a thread-safe queue. I made two serious errors. Sorry. Today, I will fix these issues.

First, I want to show you the erroneous implementation from my last post again, so that you understand the context.

// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

class Monitor {
public:
    void lock() const {
        monitMutex.lock();
    }

    void unlock() const {
        monitMutex.unlock();
    }

    void notify_one() const noexcept {
        monitCond.notify_one();
    }

    template <typename Predicate>
    void wait(Predicate pred) const {                 // (10)
        std::unique_lock<std::mutex> monitLock(monitMutex);
        monitCond.wait(monitLock, pred);
    }
    
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                  // (1)
class ThreadSafeQueue: public Monitor {
 public:
    void add(T val){ 
        lock();
        myQueue.push(val);                             // (6)
        unlock();
        notify_one();
    }
    
    T get(){ 
        wait( [this] { return ! myQueue.empty(); } );  // (2)
        lock();
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        unlock();
        return val;
    }

private:
    std::queue<T> myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};


int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    ThreadSafeQueue<int> safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}        

 

The key idea of the example is that the Monitor Object is encapsulated in a class and can, therefore, be reused. The class Monitor uses a std::mutex as monitor lock and a std::condition_variable as monitor condition. The class Monitor provides the minimal interface that a Monitor Object should support.

 

ThreadSafeQueue in line (1) extends std::queue in line (3) with a thread-safe interface. ThreadSafeQueue derives from the class Monitor and uses its member functions to support the synchronized member functions add and get. The member functions add and get use the monitor lock to protect the Monitor Object, particularly the non-thread-safe myQueue. add notifies the waiting thread when a new item has been added to myQueue. This notification is thread-safe.

The member function get (line (2)) deserves more attention. First, the wait member function of the underlying condition variable is called. This wait call needs an additional predicate to protect against spurious and lost wakeups (C++ Core Guidelines: Be Aware of the Traps of Condition Variables). The operations modifying myQueue (lines (4) and (5)) must also be protected because they can interleave with the call myQueue.push(val) (line (6)).

The Monitor Object safeQueue (line (7)) uses the lambda functions in lines (8) and (9) to add or remove a number from the synchronized safeQueue. ThreadSafeQueue itself is a class template and can hold values of an arbitrary type. One hundred clients add 100 random numbers between 1 and 6 to safeQueue (line (7)), while one hundred clients concurrently remove these 100 numbers from safeQueue. The output of the program shows the numbers and the thread ids.
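Why the predicate matters can be shown in isolation. The following is only a minimal sketch and not part of the original program; the names Signal, send, receive, and lostWakeupDemo are made up for illustration. It relies on the fact that cond.wait(lock, pred) behaves like a loop that checks pred before blocking and after each wakeup.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical helper type, not part of monitorObject.cpp.
struct Signal {
    std::mutex mut;
    std::condition_variable cond;
    bool dataReady = false;
};

// Sender: changes the shared state under the lock, then notifies.
void send(Signal& sig) {
    {
        std::lock_guard<std::mutex> guard(sig.mut);
        sig.dataReady = true;
    }
    sig.cond.notify_one();
}

// Receiver: cond.wait(lock, pred) is equivalent to
//     while (!pred()) cond.wait(lock);
// The predicate is checked before blocking and after every wakeup.
bool receive(Signal& sig) {
    std::unique_lock<std::mutex> lock(sig.mut);
    sig.cond.wait(lock, [&sig] { return sig.dataReady; });
    assert(lock.owns_lock());   // wait returns with the lock held
    return sig.dataReady;       // read while still holding the lock
}

// The notification is sent before the receiver starts to wait.
// A wait without a predicate would miss it and block; with the
// predicate, receive() sees dataReady == true and returns at once.
bool lostWakeupDemo() {
    Signal sig;
    std::thread sender(send, std::ref(sig));
    sender.join();              // the notification has already happened
    return receive(sig);
}
```

Note that the predicate is evaluated and the flag is read under the same lock. The erroneous get() gives up exactly this property by releasing the lock between wait and the access to myQueue.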

This program has two serious issues. Dietmar Kühl and Frank Birbacher wrote me an e-mail about it. These are their words, which I translated into English. My additions are in italics and bold.

 

  1. In ThreadSafeQueue::get(), Monitor::wait() tests whether myQueue contains an element and, if not, waits until one is available. However, the lock is only held inside wait(), i.e. in get() you cannot be sure that the element is still in myQueue: another thread may get the lock and remove the element, resulting in undefined behavior on the call to myQueue.front().
  2. If the copy/move constructor of T throws an exception, the ThreadSafeQueue is in an inconsistent state: no member function is active, but the mutex is locked.
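The second issue can be reproduced in isolation. The following is a minimal sketch under stated assumptions, not code from the e-mail: ThrowOnCopy, Store, addNaked, addGuarded, and isFree are hypothetical names, and the element type is constructed so that its copy constructor always throws.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>

// Hypothetical element type whose copy constructor always throws.
struct ThrowOnCopy {
    ThrowOnCopy() = default;
    ThrowOnCopy(const ThrowOnCopy&) { throw std::runtime_error("copy failed"); }
};

// Hypothetical stand-in for the queue and its mutex.
struct Store {
    std::mutex mut;
    std::queue<ThrowOnCopy> que;
};

// Naked mutex: if push throws, unlock() is never reached.
void addNaked(Store& s, const ThrowOnCopy& val) {
    s.mut.lock();
    s.que.push(val);
    s.mut.unlock();
}

// std::lock_guard unlocks in its destructor, also during stack unwinding.
void addGuarded(Store& s, const ThrowOnCopy& val) {
    std::lock_guard<std::mutex> guard(s.mut);
    s.que.push(val);
}

// Probe from a second thread: calling try_lock on a std::mutex that the
// calling thread already owns is undefined behavior.
bool isFree(std::mutex& mut) {
    bool acquired = false;
    std::thread probe([&] {
        // try_lock is allowed to fail spuriously, so retry a few times.
        for (int i = 0; i < 100 && !acquired; ++i) acquired = mut.try_lock();
        if (acquired) mut.unlock();
    });
    probe.join();
    return acquired;
}

bool nakedLeavesMutexLocked() {
    Store s;
    try { addNaked(s, ThrowOnCopy{}); } catch (const std::runtime_error&) {}
    assert(s.que.empty());          // the failed push left the queue unchanged
    const bool locked = !isFree(s.mut);
    if (locked) s.mut.unlock();     // clean up: this thread still owns the mutex
    return locked;
}

bool guardReleasesMutex() {
    Store s;
    try { addGuarded(s, ThrowOnCopy{}); } catch (const std::runtime_error&) {}
    return isFree(s.mut);
}
```

In this sketch, nakedLeavesMutexLocked() reports that the mutex stays locked after the exception, while guardReleasesMutex() reports that the lock_guard released it.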

 

The fix is that Monitor::wait() can only be called if a unique_lock is held. This can be achieved, for example, by having Monitor provide an appropriate (protected?) function that returns a suitable object, and by having wait() require a reference to it:

struct Monitor {
   using Lock = std::unique_lock<std::mutex>; // could be wrapper if you prefer
   [[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); }
   template <typename Predicate>
   void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); }
   // …
};

template <typename T>
T ThreadSafeQueue<T>::get() {
   auto kerberos = receiveGuard();
   wait(kerberos, [this]{ return not myQueue.empty(); });
   T rc = std::move(myQueue.front());
   myQueue.pop();
   return rc;
}        

This version corrects the exception problem for get(). For add() you can simply use the monitor object with a lock_guard: 

template <typename T>
void ThreadSafeQueue<T>::add(T val) {
    {
        std::lock_guard<Monitor> kerberos(*this);
        myQueue.push(std::move(val));
    }
    notify_one();
}

I would probably wrap the notification in a "SendGuard" that contains a lock_guard and a reference to the condition_variable and sends the notification upon destruction:

class SendGuard {
    friend class Monitor;
    using deleter = decltype([](auto* cond){ cond->notify_one(); });
    std::unique_ptr<std::condition_variable, deleter> notifier;
    std::lock_guard<std::mutex> kerberos;
    SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};

The move constructor and destructor should still be public and represent the whole interface! This would also make it much easier to use in add():

template <typename T>
void ThreadSafeQueue<T>::add(T val) {
    auto kerberos = sendGuard();
    myQueue.push(val);
}
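The SendGuard idea depends on the order in which its members are destroyed: non-static data members are destroyed in reverse order of declaration, so the lock_guard (declared last) releases the mutex before the unique_ptr deleter sends the notification. The following sketch only illustrates this language rule; Logger, Guard, and destructionOrder are hypothetical names, and the real mutex and condition variable are replaced by logging stand-ins.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in that records when it is destroyed.
struct Logger {
    std::string& log;
    const char* text;
    ~Logger() { log += text; }
};

// Same member order as in SendGuard: notifier first, kerberos second.
struct Guard {
    Logger notifier;   // declared first  -> destroyed last
    Logger kerberos;   // declared second -> destroyed first
    explicit Guard(std::string& log)
        : notifier{log, "notify;"}, kerberos{log, "unlock;"} {}
};

std::string destructionOrder() {
    std::string log;
    {
        Guard guard(log);
        log += "push;";            // the protected operation
        assert(log == "push;");    // nothing has been destroyed yet
    }                              // guard goes out of scope here
    return log;                    // "push;unlock;notify;"
}
```

Swapping the two member declarations would send the notification while the mutex is still held. That would still be correct, but the woken thread might immediately block on the mutex.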

Finally, here is Dietmar's full implementation. The numbers correspond to the numbers in my monitorObject.cpp example.

// threadsafequeue1.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

class Monitor {
public:
    using Lock = std::unique_lock<std::mutex>;
    [[nodiscard]] Lock receiveGuard() {
        return Lock(monitMutex);
    }

    template <typename Predicate>
    void wait(Lock& kerberos, Predicate pred) {
        monitCond.wait(kerberos, pred);
    }

    class SendGuard {
        friend class Monitor;
        using deleter = decltype([](auto* cond){ cond->notify_one(); });
        std::unique_ptr<std::condition_variable, deleter> notifier;
        std::lock_guard<std::mutex> kerberos;
        SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
    };

    SendGuard sendGuard() { return {monitMutex, monitCond}; }
    
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                  // (1)
class ThreadSafeQueue: public Monitor {
 public:
    void add(T val){ 
        auto kerberos = sendGuard();
        myQueue.push(val);                             // (6)
    }
    
    T get(){ 
        auto kerberos = receiveGuard();
        wait(kerberos, [this] { return ! myQueue.empty(); } );  // (2)
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        return val;
    }

private:
    std::queue<T> myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};


int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    ThreadSafeQueue<int> safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}        

As a result of the discussion above, Frank proposed the following version, which has a consistent and easy-to-use interface for Monitor.

// threadSafeQueue.cpp

#include <atomic>
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <iterator>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>


class Monitor {
public:
    struct UnlockAndNotify {
        std::mutex d_mutex;
        std::condition_variable d_condition;

        void lock() { d_mutex.lock(); }
        void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
    };

private:
    UnlockAndNotify d_combined;

public:
    std::unique_lock<UnlockAndNotify> makeLockWithNotify() {
        return std::unique_lock{d_combined};
    }

    template <typename PRED>
    std::unique_lock<std::mutex> makeLockWithWait(PRED waitForCondition) {
        std::unique_lock lock{d_combined.d_mutex};
        d_combined.d_condition.wait(lock, waitForCondition);
        return lock;
    }
};

class ThreadQueue {
    Monitor d_monitor;
    std::deque<int> d_numberQueue;

    auto makeLockWhenNotEmpty() {
        return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); });
    }

public:
    void addNumber(int number) {
        const auto lock = d_monitor.makeLockWithNotify();
        d_numberQueue.push_back(number);
    }

    int removeNumber() {
        const auto lock = makeLockWhenNotEmpty();
        const auto number = d_numberQueue.front();
        d_numberQueue.pop_front();
        return number;
    }
};

int main() {
   ThreadQueue queue;
   std::atomic<int> sharedSum{};
   std::atomic<int> sharedCounter{};

   std::vector<std::jthread> threads;
   threads.reserve(200);
   std::generate_n(std::back_inserter(threads), 100, [&] {
       return std::jthread{[&] { sharedSum += queue.removeNumber(); }};
   });
   std::generate_n(std::back_inserter(threads), 100, [&] {
       return std::jthread{[&] { queue.addNumber(++sharedCounter); }};
   });

   threads.clear(); // wait for all threads to finish
   if (sharedSum.load() != 5050) {
       throw std::logic_error("Wrong result for sum of 1..100");
   }
}        

The implementation of the monitor pattern here is based on the flexibility of std::unique_lock through its template parameter. All of the std RAII lock guards can be used with any class that has lock() and unlock() methods. The UnlockAndNotify class implements this interface and notifies its condition variable from within the unlock() method. On top of that, the Monitor class provides a reduced public interface to create two different kinds of locks, one with notification and one without, by creating a std::unique_lock on either the whole UnlockAndNotify instance or just the contained std::mutex.
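The claim that the standard RAII guards work with any class providing lock() and unlock() can be checked with a small stand-alone type. This is only a sketch; CountingLock and guardedIncrements are hypothetical names and not part of Frank's code.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical lockable type: wraps a std::mutex and counts the calls.
// Providing lock() and unlock() is enough for lock_guard and unique_lock.
struct CountingLock {
    std::mutex mut;
    int locks = 0;
    int unlocks = 0;

    void lock()   { mut.lock(); ++locks; }
    void unlock() { ++unlocks; mut.unlock(); }
};

// Alternates between std::lock_guard and std::unique_lock on the same
// user-defined lockable and returns the number of guarded increments.
int guardedIncrements(CountingLock& lockable, int n) {
    int value = 0;
    for (int i = 0; i < n; ++i) {
        if (i % 2 == 0) {
            std::lock_guard<CountingLock> guard(lockable);
            ++value;
        } else {
            std::unique_lock<CountingLock> guard(lockable);
            ++value;
        }
    }
    assert(lockable.locks == lockable.unlocks);   // every lock was released
    return value;
}
```

UnlockAndNotify uses the same hook: because the guard calls the user-defined unlock(), extra work such as notify_one() can be attached to the release of the lock.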

On the choice of std::unique_lock versus std::lock_guard: I (Frank) prefer unique_lock in the interface. This choice gives the user of the Monitor class more flexibility. I value this flexibility more than a possible performance advantage of lock_guard, which would need to be measured first anyway. I acknowledge that the given examples don't make use of this extra flexibility.
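What this extra flexibility can look like is sketched below. The example is an illustration only and not taken from the e-mail; acquire and flexibilityDemo are hypothetical names. It uses three things a std::unique_lock supports and a std::lock_guard does not: being returned from a function, being unlocked and re-locked explicitly, and transferring ownership by move.

```cpp
#include <cassert>
#include <mutex>
#include <utility>

// A unique_lock is movable, so a function can hand out a held lock.
std::unique_lock<std::mutex> acquire(std::mutex& mut) {
    return std::unique_lock<std::mutex>(mut);
}

bool flexibilityDemo() {
    std::mutex mut;

    auto lock = acquire(mut);
    const bool ownedAfterAcquire = lock.owns_lock();

    lock.unlock();                                  // release early
    const bool ownedAfterUnlock = lock.owns_lock();

    lock.lock();                                    // re-acquire the same mutex

    // Transfer ownership; the moved-from lock no longer owns the mutex.
    std::unique_lock<std::mutex> other = std::move(lock);
    assert(other.mutex() == &mut);

    return ownedAfterAcquire && !ownedAfterUnlock
        && other.owns_lock() && !lock.owns_lock();
}
```

Returning the lock by value is what makeLockWithNotify and makeLockWithWait in the Monitor above do.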

Afterward, Dietmar further developed Frank's idea. Here, the protected data is kept in the Monitor, making it harder to access it unprotected:

// threadsafequeue2.cpp

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <mutex>
#include <random>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <vector>

namespace patterns::monitor3 {

template <typename T>
class Monitor {
public:
   struct UnlockAndNotify {
       std::mutex d_mutex;
       std::condition_variable d_condition;
   
       void lock() { d_mutex.lock(); }
       void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
   };

private:
   mutable UnlockAndNotify d_combined;
   mutable T               d_data;

public:
   std::tuple<T&, std::unique_lock<UnlockAndNotify>> makeProducerLock() const {
       return { d_data, std::unique_lock{d_combined} };
   }

   template <typename PRED>
   std::tuple<T&, std::unique_lock<std::mutex>> makeConsumerLockWhen(PRED predicate) const {
       std::unique_lock lock{d_combined.d_mutex};
       d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
       return { d_data, std::move(lock) };
   }
};

template <typename T>
class ThreadQueue {
   Monitor<std::deque<T>> d_monitor;

public:
   void add(T number) {
       auto[numberQueue, lock] = d_monitor.makeProducerLock();
       numberQueue.push_back(number);
   }

   T remove() {
       auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); });
       const auto number = numberQueue.front();
       numberQueue.pop_front();
       return number;
   }
};
}

class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};

int main(){
    
    std::cout << '\n';
    
    constexpr auto NumberThreads = 100;
    
    patterns::monitor3::ThreadQueue<int> safeQueue;                     

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);         
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.remove(); };  

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "\n\n";
     
}        

Once more, thanks a lot to Frank and Dietmar. It was not my intention to prove, with my erroneous implementation of a thread-safe queue in my previous post, that concurrency is hard to get right. I'm particularly annoyed that I didn't put the mutex inside a lock (error 2). I teach this in my C++ classes: NNM (No Naked Mutex).

What's next?

In my next post, I will dive into the future of C++: C++23.

 

Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Animus24, Jozo Leko, John Breland, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Robert Blanch, Truels Wissneth, Kris Kafka, Mario Luoni, Friedrich Huber, lennonli, Pramod Tikare Muralidhara, Peter Ware, Daniel Hufschläger, Alessandro Pezzato, Bob Perry, Satish Vangipuram, Andi Ireland, Richard Ohnemus, Michael Dunsky, Leo Goodstadt, John Wiederhirn, Yacob Cohen-Arazi, Florian Tischler, Robin Furness, Michael Young, Holger Detering, Bernd Mühlhaus, Matthieu Bolt, Stephen Kelley, Kyle Dean, Tusar Palauri, Dmitry Farberov, Juan Dent, George Liao, Daniel Ceperley, Jon T Hess, Stephen Totten, Wolfgang Fütterer, Matthias Grün, Phillip Diekmann, Ben Atakora, Ann Shatoff, and Rob North.

Thanks, in particular, to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, Dendi Suhubdy, Sudhakar Belagurusamy, Richard Sargeant, Rusty Fleming, John Nebel, Mipko, Alicja Kaminska, and Slavko Radman.

My special thanks to Embarcadero, PVS-Studio, Tipi.build, and Take Up Code.

 
