
I am trying to learn how to get condition variables working with some threads, but am having some trouble. The goal is for the main thread to spawn some 'tasks', each lasting a set period of time. Each task instantiates a Countdown object containing a thread that uses the condition_variable's timeout to know that the requested time has indeed elapsed before notifying the caller.

Each time I run the example below I get a different result: sometimes it errors out, sometimes it runs, but (assuming it gets that far) the timeout for both child threads is shorter than requested.

I appreciate that there are other ways to skin the cat when timing threads, and I could, for instance, use an atomic or some other means instead of a callback, but I would just like to see how to get the spirit of this implementation working (i.e. a condition variable, so I don't have to poll).

#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

using Time = std::chrono::system_clock;
using Seconds = std::chrono::seconds;
using Timepoint = Time::time_point;

class Countdown {
private:
    Timepoint               target;
    std::thread             t;
    std::condition_variable cv;
    std::mutex              cv_m;
    unsigned int            guid;
    std::string             name;

public:
    Countdown() 
    { // Needed to compile, but doesn't appear to run 
        std::cout << "empty Countdown constructor" << std::endl;
    }

    Countdown(unsigned int guid_, std::string name_, unsigned int waitFor, std::function<void(unsigned int)> callback)
        : guid(guid_)
        , name(name_)
        , target(Time::now() + Seconds(waitFor))
    {

        auto exec_run = [this, guid_, waitFor, callback]() mutable {
            std::unique_lock<std::mutex> lk(cv_m);
            std::cout << "[Thread " << guid_ << "] waiting for " << waitFor << " seconds." << std::endl;
 
            Timepoint before = Time::now();
            if (cv.wait_until(lk, target) == std::cv_status::timeout)
            {
                Timepoint after = Time::now();
                std::chrono::duration<float> difference = after - before;
                std::cout << "[Thread " << guid_ << "] Elapsed " << difference.count() << " seconds." << std::endl;
                callback(guid_);
            } 
        };
        
        t = std::thread(exec_run);
    }

    Countdown(Countdown &&from) // move constructor
    {
        //std::cout << "Countdown move constructor" << std::endl; 
        target = from.target;
        t = std::move(from.t);
        name = from.name;
        guid = from.guid;
    }

    ~Countdown()
    {
        //std::cout << "~Countdown()" << std::endl; 
        if (t.joinable()) t.join();
    }
};

class Holder {
private:
    std::map<unsigned int, Countdown>   waitlist;
    unsigned int                        id;
    std::vector<unsigned int>           completed;

public:
    Holder()
        : id(0)
    { }

    // Create a new task with a name for WaitFor (s) period of time
    unsigned int addTask(std::string name, unsigned int waitFor) {
        id++;
        waitlist.emplace(std::pair(id, Countdown(id, name, waitFor, 
                                  std::bind(&Holder::taskComplete, this, std::placeholders::_1))));

        return id;
    }

    void taskComplete(unsigned int id)
    {
        std::cout << "[Thread " << id << "] taskComplete" << std::endl;
        // Add task id to the completed list to be picked up by main thread
        completed.push_back(id);
    }

    void cleanupCompleted()
    {
        // Purge the completed entries from the waitlist
        for (auto& id : completed)
        {
            std::cout << "[Main] Erasing task: " << id << std::endl;
            waitlist.erase(id);
        }

        // Empty the completed list
        completed.clear();
    }
};

int main()
{
    Holder *h = new Holder();
    // Create a task which spawns a thread, which notifies us when complete
    unsigned int id1 = h->addTask("fluffy", 1); // 1 second task
    unsigned int id2 = h->addTask("woof", 4);   // 4 second task
    std::cout << "[Main]: Done adding tasks.." << std::endl;
 
    // Rest a while..
    std::this_thread::sleep_for(Seconds(5));
    h->cleanupCompleted();

    // Just to show the main thread continues on.
    std::cout << "[Main]: Doing other stuff.." << std::endl;
    delete(h);

    return 0;
}

Actual / Expected Result:

[Main]: Done adding tasks..
[Thread 2] waiting for 4 seconds.
[Thread 1] waiting for 1 seconds.
[Thread 2] Elapsed 8.243e-06 seconds. **(This should be ~ 4 seconds)**
[Thread 2] taskComplete
[Thread 1] Elapsed 0.000124505 seconds. **(This should be ~1 second)**
[Thread 1] taskComplete
[Main] Erasing task: 2
[Main] Erasing task: 1
[Main]: Doing other stuff..

I 'think' the problem might be around the in-place addition of the 'Countdown' object to the map in the 'Holder' class, which needs a move constructor in which I manually set each field (except the mutex and the condition_variable).

Any advice on how to do this correctly?

  • Are you using g++ or clang++? If you are, use the power of the sanitizers. Compile with -g -fsanitize=address,undefined, run your program and fix all the issues. Then compile with -g -fsanitize=thread, run your program and fix all the issues. Go back to using address,undefined and repeat until none of those sanitizers report any errors. Unfortunately, you can't combine the thread sanitizer with address,undefined, so you'll have to compile them separately. Commented Aug 24, 2023 at 16:12
  • Hey Ted. Funny you should respond. I'm investigating this based on your suggestion from yesterday. I'm using g++. I will give your advice a try =). Commented Aug 24, 2023 at 16:43
  • It's a small world :-) Hope you find the sanitizers as useful as I do! Commented Aug 24, 2023 at 16:48
  • Apparently all the action is in -fsanitize=thread. Now on to my next rabbit hole: understanding and fixing it =) Commented Aug 24, 2023 at 16:50
  • Yeah, the thread sanitizer's output isn't as easy to understand as the address,undefined output imo, but as you can see in the link, asan & ubsan also report "stack-use-after-return", so you could start by trying to understand that one. When compiled with -Wall -Wextra, the compiler also reports "...will be initialized after", which isn't a problem here, but it's a sign that you're trying to initialize things in the wrong order. Try to get it free of those kinds of compiler warnings too. Commented Aug 24, 2023 at 17:43

1 Answer


OK, I have it, so I'll post the solution below. It came down to a number of mistakes on my part:

  • Compiling with -g -fsanitize=thread showed that I had a number of read/write data races and a bunch of other excitement, which needed some love from a few mutexes.
  • Following from the above, the thread was reading from an object that had already been destroyed. The lambda captured `this` of the temporary Countdown, but constructing that temporary and inserting it into the waitlist map invoked the move constructor twice, after which the temporary was destroyed while the thread was still using its mutex, condition variable and target. So I separated out another function (start) that starts the thread only after all the construction and moving has finished.
  • Compiling with -Wall showed that my member initializer lists were out of order relative to the order in which the members are declared (I didn't know that was a thing).


#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

using Time = std::chrono::system_clock;
using Seconds = std::chrono::seconds;
using Timepoint = Time::time_point;

class Countdown {
private:
    Timepoint               target;
    unsigned int            guid;
    std::string             name;
    std::thread             t;
    std::condition_variable cv;
    std::mutex              cv_m;

public:
    Countdown() // Required by waitlist[id] in addTask; never called, because that key already exists
    { }

    Countdown(unsigned int guid_, std::string name_, unsigned int waitFor)
        : target(Time::now() + Seconds(waitFor))
        , guid(guid_)
        , name(name_)
    { }

    Countdown(Countdown &&from) // move constructor
    {
        //std::cout << "Countdown move constructor" << std::endl; 
        target = from.target;
        t = std::move(from.t);
        name = from.name;
        guid = from.guid;
    }

    ~Countdown()
    {
        //std::cout << "~Countdown()" << std::endl; 
        if (t.joinable()) t.join();
    }

    void start(std::function<void(unsigned int)> callback)
    {
        auto exec_run = [this, callback]() mutable {
            std::unique_lock<std::mutex> lk(cv_m);
            unsigned int guid = getGUID();
            //std::time_t target = std::chrono::system_clock::to_time_t(getTarget());
            //std::cout << "[Thread " << guid << "] waiting for " << std::ctime(&target) << " seconds." << std::endl;
 
            Timepoint before = Time::now();
            if (cv.wait_until(lk, target) == std::cv_status::timeout)
            {
                Timepoint after = Time::now();
                std::chrono::duration<float> difference = after - before;
                std::cout << "[Thread " << guid << "] Elapsed " << difference.count() << " seconds." << std::endl;
                callback(guid);
            } 
        };
        
        t = std::thread(exec_run);
    }

    unsigned int getGUID() { return guid; }

};

class Holder {
private:
    unsigned int                        id;
    std::map<unsigned int, Countdown>   waitlist;
    std::mutex                          waitlist_mutex;
    std::vector<unsigned int>           completed;
    std::mutex                          completed_mutex;

public:
    Holder()
        : id(0)
    { }

    // Create a new task with a name for WaitFor (s) period of time
    unsigned int addTask(std::string name, unsigned int waitFor) {
        id++;
        std::lock_guard<std::mutex> guard(waitlist_mutex);
        Countdown newCountdown(id, name, waitFor);
        
        // Needed to separate the following into 3 lines to give the move constructors a chance
        // to settle down, and separate out a start which initiates the thread so that the thread
        // isn't trying to access stuff as the destructors after the move constructor are called
        std::pair<unsigned int, Countdown> pair(id, std::move(newCountdown));
        waitlist.emplace(std::move(pair));
        
        waitlist[id].start(std::bind(&Holder::taskComplete, this, std::placeholders::_1)); 
        
        return id;
    }

    void taskComplete(unsigned int id)
    {
        std::cout << "[Thread " << id << "] taskComplete" << std::endl;
        // Add task id to the completed list to be picked up by main thread
        std::lock_guard<std::mutex> guard(completed_mutex);
        completed.push_back(id);
    }

    void cleanupCompleted()
    {
        // Purge the completed entries from the waitlist
        std::lock_guard<std::mutex> guard_c(completed_mutex);

        for (auto& id : completed)
        {
            std::cout << "[Main] Erasing task: " << id << std::endl;
            std::lock_guard<std::mutex> guard_w(waitlist_mutex);
            waitlist.erase(id);
        }

        // Empty the completed list
        completed.clear();
    }
};

int main()
{
    Holder *h = new Holder();
    // Create a task which spawns a thread, which notifies us when complete
    h->addTask("fluffy", 1);
    h->addTask("woof", 4);
    std::cout << "[Main]: Done adding tasks.." << std::endl;
 
    // Rest a while..
    std::this_thread::sleep_for(Seconds(5));
    h->cleanupCompleted();

    // Just to show the main thread continues on.
    std::cout << "[Main]: Doing other stuff.." << std::endl;
    delete(h);

    return 0;
}

...and the proof =)

[Main]: Done adding tasks..
[Thread 1] Elapsed 0.999498 seconds.
[Thread 1] taskComplete
[Thread 2] Elapsed 3.99984 seconds.
[Thread 2] taskComplete
[Main] Erasing task: 1
[Main] Erasing task: 2
[Main]: Doing other stuff..

Thanks again Ted =)


1 Comment

Cool! Next, compile your program normally (without sanitizers) and run it under valgrind --tool=helgrind --exit-on-first-error=yes --error-exitcode=1 ./your_program. Helgrind is a thread error detector that's also pretty good, though I'm not sure how good. It does report a possible data race that might be worth looking into.
