I'm implementing a lock-free, multiple consumer, multiple producer FIFO queue/pipe as an exercise in thinking about atomicity in operations.
My main concern is correctness of operation, my second concern is good practices around atomics and general C++11. Performance is interesting but not important for this exercise.
Without further ado, here's the code:
#include <atomic>
#include <cassert>
#include <cstddef>
#include <exception>
#include <utility>
// For dump
#include <iostream>
#include <string>
/// <summary> A lock free queue implementation.
///
/// Design notes: Here be dragons. The queue is implemented as a single linked
/// list with a head, divider and tail pointer. These are always ordered such
/// that "head -> divider -> tail" and are always non-null. The divider's next
/// pointer points to the first node with data or is null. In other words, this
/// means that "divider == tail -> empty container". Nodes between head and
/// divider are empty and will be freed lazily. </summary>
///
/// <remarks> * Thread Safety   : All shared state is accessed through
///                               sequentially consistent atomics.
///             NOTE(review): nodes are reclaimed without hazard pointers or
///             epochs. A consumer that has advanced the divider may still be
///             reading a node while another consumer frees it in cleanup_pop,
///             and a producer may dereference a stale tail that has already
///             been freed. Confirm a safe reclamation scheme before relying
///             on this with more than one consumer.
///           * Exception Safety: Basic. </remarks>
///
/// <tparam name="T"> Generic type parameter. </tparam>
template<typename T>
class lockfree_queue{
    struct link;
    using link_ptr = std::atomic<link*>;

    /// Intrusive list hook; the sentinel head is a bare link without data.
    struct link{
        link() noexcept = default;
        link(const link&) = delete;
        link& operator = (const link&) = delete;
        link_ptr m_next{ nullptr };
    };

    /// A link that additionally carries one queued value.
    struct node : link{
        template<typename... Args>
        node(Args&&... args)
            : m_data(std::forward<Args>(args)...)
        {}
        T m_data;
    };

public:
    using size_type = std::size_t;
    using value_type = T;

    /// <summary> Constructs an empty queue. </summary>
    lockfree_queue() = default;

    // The divider and tail point into this object's own sentinel, so the
    // queue can be neither copied nor moved.
    lockfree_queue(const lockfree_queue&) = delete;
    lockfree_queue& operator = (const lockfree_queue&) = delete;

    /// <summary> Destructor, it's the user's responsibility to make sure that
    ///           no one uses the class after its destruction and that no
    ///           thread is in any of the function bodies. </summary>
    ~lockfree_queue(){
        // When no thread is inside consume() the head is always linked, so
        // walking from the sentinel's successor reaches every live node.
        free_nodes(m_head.m_next.load());
    }

    /// <summary> Tests if this container is empty. This operation only makes
    ///           sense if there is only one thread reading/consuming the queue.
    ///           </summary>
    /// <returns> True if the queue is empty, false otherwise. </returns>
    bool empty() const noexcept{
        return m_divider.load() == m_tail.load();
    }

    /// <summary> Gets the instantaneous number of elements in the queue.
    ///           Mostly useful as a debug probe to monitor the queue size.
    ///           The counter is updated after the list itself, so it may
    ///           briefly disagree with the list contents. </summary>
    /// <returns> The number of elements in the queue. </returns>
    size_type size() const noexcept{
        return m_size.load();
    }

    /// <summary> Emplaces a new node on the queue. If the construction of the
    ///           data throws, the queue is unmodified. </summary>
    /// <tparam name="Args"> Type of the arguments. </tparam>
    /// <param name="args">  Variable arguments providing the arguments to
    ///                      construct the data with.</param>
    template<typename... Args>
    void emplace(Args&&... args){
        auto l_new_node = new node(std::forward<Args>(args)...);
        // m_tail->m_next can have two states:
        // 1) It's non-null, means an insertion is in progress but has not been completed.
        // 2) It's null, means no insertion is in progress.
        // m_tail->m_next will only be written from this function.
        // This loop does a CAS with m_tail->m_next to see if it is null and if it is it
        // inserts the new node. At which point any concurrent push will retry until (3)
        // below completes.
        link* l_expected = nullptr;
        while (!m_tail.load()->m_next.compare_exchange_weak(l_expected, l_new_node)){
            // A failed CAS stores the observed (non-null) pointer in
            // l_expected. Without this reset the next attempt would succeed
            // against an in-progress insertion and overwrite that node.
            l_expected = nullptr;
        }
        m_tail = l_new_node; // 3) Commit/publish the new tail.
        m_size++;
    }

    /// <summary> Debug helper: prints every link from the sentinel onwards to
    ///           std::cout, tagging the divider with "D" and the tail with
    ///           "T". Requires T to be streamable. Not synchronised with
    ///           concurrent consumers. </summary>
    void dump() const{
        const link* n = &m_head;
        while (n != nullptr) {
            std::string special = "";
            if (n == m_divider.load())
                special += "D";
            if (n == m_tail.load())
                special += "T";
            std::cout << "[(" << special << ")";
            if (n != &m_head)
                std::cout << "\"" << static_cast<const node*>(n)->m_data << "\"";
            else
                std::cout << "sentinel";
            std::cout << "(" << n << ")] -> ";
            n = n->m_next.load();
        }
        std::cout << "[null]" << std::endl;
    }

    /// <summary> Consumes one item from the queue. The item is move assigned
    ///           to result. If the assignment throws, the queue will have
    ///           dropped the consumed item but is otherwise unmodified.
    ///           </summary>
    /// <param name="result"> [in,out] Receives the consumed item. </param>
    /// <returns> True if an item was consumed, false if the queue was empty.
    ///           </returns>
    bool consume(T& result){
        link* l_divider = nullptr;
        link* l_snack = nullptr;

        // Try to temporarily unlink the head if it is not already unlinked and
        // it's not the divider
        auto l_head = m_head.m_next.load();
        if (l_head == nullptr || &m_head == m_divider.load() ||
            !m_head.m_next.compare_exchange_strong(l_head, nullptr)){
            l_head = nullptr; // We didn't get to unlink the head this time.
        }

        do{
            // The divider's next pointer points to the next node with data.
            l_divider = m_divider.load();
            l_snack = l_divider->m_next.load(); // divider is never null.
            if (nullptr == l_snack){
                // Empty. If we detached the head above we must put it back,
                // otherwise it stays null forever and neither later consumers
                // nor the destructor will ever free the nodes behind it.
                if (l_head)
                    m_head.m_next = l_head;
                return false;
            }
            // If the CAS below succeeds, then no one has moved the divider since
            // we loaded the new divider position (which is non-null) and we have
            // moved the divider to the next node without interruption.
        } while (!m_divider.compare_exchange_weak(l_divider, l_snack));
        m_size--;

        try{
            result = std::move(static_cast<node*>(l_snack)->m_data);
        }
        catch (...){
            // The item is already dequeued; still reclaim and re-link.
            cleanup_pop(l_head, l_divider);
            throw;
        }
        cleanup_pop(l_head, l_divider);
        return true;
    }

private:
    /// Deletes every node in [from, up_until). The sentinel head must never
    /// be part of the range because it is not heap allocated.
    void free_nodes(link* from, link* up_until = nullptr) noexcept {
        assert(from != &m_head);
        while (from != up_until){
            auto next = from->m_next.load();
            // All links but the head are nodes, necessary to destroy data.
            delete static_cast<node*>(from);
            from = next;
        }
    }

    /// Finishes a pop: if this thread detached the head (l_head non-null),
    /// frees the consumed nodes in front of the new divider and re-links the
    /// head to it. Does nothing when l_head is null.
    void cleanup_pop(link* l_head, link* l_divider) noexcept {
        if (l_head){
            // The head has been unlinked by us and we are the only ones
            // with a handle to the detached head. We can now safely free
            // all nodes from the detached head up until the divider.
            auto new_divider = l_divider->m_next.load();
            free_nodes(l_head, new_divider);
            // Oh, and re-link the head
            m_head.m_next = new_divider;
        }
    }

    link m_head;                         // Sentinel; never carries data.
    link_ptr m_divider{ &m_head };       // Last consumed link.
    link_ptr m_tail{ &m_head };          // Last published link.
    std::atomic<size_type> m_size{ 0 };  // Approximate element count.
};
/// Single-threaded smoke test: pushes 0..3, pops them back in FIFO order,
/// then does one more push/pop round trip, dumping the list after every step.
int main(){
    lockfree_queue<double> q;
    assert(true == q.empty());
    assert(0 == q.size());
    q.dump();

    // Fill the queue with 0, 1, 2, 3.
    for (int value = 0; value < 4; ++value){
        q.emplace(value);
        q.dump();
    }

    // Drain it again; items must come back in insertion order.
    double ans;
    for (int expected = 0; expected < 4; ++expected){
        q.consume(ans);
        q.dump();
        assert(ans == expected);
    }

    // The queue must remain usable after having been emptied.
    q.emplace(3.14);
    q.dump();
    q.consume(ans);
    q.dump();
    assert(ans == 3.14);
}
I'm also interested in whether anyone has ideas for good test cases to check correctness under concurrency.