I'm working on a programming exercise (for university, nothing related to industry) that asks me to implement a Buffer shared by two threads, a producer and a consumer. The producer enqueues data by calling next(T t); the consumer retrieves the oldest value (FIFO order) by calling consume(), or waits if the buffer is empty. The producer can send a stop signal to declare that enqueuing has ended. The exercise also requires a fail() method in case anything goes wrong, but I'd like to ignore it for this question.
This is my solution:
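#include <condition_variable>
#include <exception>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <utility>

template <typename T>
class Buffer {
    std::mutex m;
    std::condition_variable cv;
    std::queue<T> values;
    bool stop, failed;
    std::exception_ptr _eptr;
public:
    Buffer() : stop(false), failed(false) {}
    // Record a failure and wake every waiting thread.
    void fail(const std::exception_ptr &eptr) {
        {
            std::unique_lock ul{m};
            failed = true;
            _eptr = eptr;
        }
        cv.notify_all();
    }
    // Signal that the producer will not enqueue anything else.
    void terminate() {
        {
            std::unique_lock ul{m};
            if (stop || failed) throw std::runtime_error("enqueuing has stopped");
            stop = true;
        }
        cv.notify_one(); // notify stop signal
    }
    // Enqueue a value; throws once the buffer has been stopped or has failed.
    void next(T t) {
        {
            std::unique_lock ul{m};
            if (stop || failed) throw std::runtime_error("enqueuing has stopped");
            values.push(std::move(t));
        }
        cv.notify_one(); // notify the consumer (if waiting)
    }
    // Return the oldest value, blocking while the buffer is empty.
    // Once the queue is empty, returns std::nullopt if terminate() was called,
    // otherwise rethrows the exception passed to fail().
    std::optional<T> consume() {
        std::unique_lock ul{m};
        cv.wait(ul, [this]() { return !values.empty() || stop || failed; });
        if (values.empty()) { // woken with an empty queue: stop or failed has been set
            if (stop)
                return std::nullopt;
            else
                std::rethrow_exception(_eptr);
        }
        // extract the value to consume
        T val = std::move(values.front());
        values.pop();
        return std::optional<T>(std::move(val));
    }
};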
This is how I think the Buffer might be used (I'm still ignoring the fail() method):
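#include <iostream>
#include <thread>

constexpr int N = 10000;

int main() {
    Buffer<int> buf;
    // Producer: enqueue N values, then send the stop signal.
    std::thread prod([&buf]() {
        for (int i = 0; i < N; ++i) {
            std::cout << "enqueuing: " << i << std::endl;
            buf.next(i);
        }
        buf.terminate();
    });
    // Consumer: dequeue exactly N values, blocking while the buffer is empty.
    std::thread cons([&buf]() {
        for (int i = 0; i < N; ++i)
            std::cout << "consuming: " << buf.consume().value() << std::endl;
    });
    prod.join();
    cons.join();
}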
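In the usage above the consumer counts to N, so it never actually sees the stop signal. Since consume() returns std::nullopt once terminate() has been called and the queue is empty, I think a consumer that does not know N could loop until that value shows up. Here is a minimal sketch of that idea, assuming C++17. The helper name sum_until_stopped is hypothetical (not part of the exercise), and the Buffer is a condensed copy of my class so that the snippet compiles on its own:

```cpp
#include <cassert>
#include <condition_variable>
#include <exception>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>

// Condensed copy of the Buffer from the question, so this sketch is self-contained.
template <typename T>
class Buffer {
    std::mutex m;
    std::condition_variable cv;
    std::queue<T> values;
    bool stop = false, failed = false;
    std::exception_ptr eptr_;
public:
    void fail(const std::exception_ptr& e) {
        { std::lock_guard<std::mutex> lg{m}; failed = true; eptr_ = e; }
        cv.notify_all();
    }
    void terminate() {
        {
            std::lock_guard<std::mutex> lg{m};
            if (stop || failed) throw std::runtime_error("enqueuing has stopped");
            stop = true;
        }
        cv.notify_all();
    }
    void next(T t) {
        {
            std::lock_guard<std::mutex> lg{m};
            if (stop || failed) throw std::runtime_error("enqueuing has stopped");
            values.push(std::move(t));
        }
        cv.notify_one();
    }
    std::optional<T> consume() {
        std::unique_lock<std::mutex> ul{m};
        cv.wait(ul, [this] { return !values.empty() || stop || failed; });
        if (values.empty()) {
            if (stop) return std::nullopt;
            std::rethrow_exception(eptr_);
        }
        T val = std::move(values.front());
        values.pop();
        return val;
    }
};

// Hypothetical helper: the producer enqueues 0..n-1 and then terminates;
// the consumer does not know n and drains until it sees the stop signal.
long long sum_until_stopped(int n) {
    Buffer<int> buf;
    long long sum = 0;
    std::thread prod([&buf, n] {
        for (int i = 0; i < n; ++i) buf.next(i);
        buf.terminate();
    });
    std::thread cons([&buf, &sum] {
        // An engaged optional is truthy even when it holds 0;
        // only std::nullopt ends the loop.
        while (auto v = buf.consume()) sum += *v;
    });
    prod.join();
    cons.join();
    // After the stop signal, consume() returns std::nullopt without blocking.
    assert(!buf.consume().has_value());
    return sum;
}
```

With this helper, sum_until_stopped(100) should return 4950 (0 + 1 + ... + 99). Both threads are joined before buf goes out of scope, so the Buffer outlives every thread that uses it.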
I have some questions:
Do you agree this is nothing but a blocking queue, or am I missing something?
Do I need to implement a destructor? If so, can you please show me an example of usage that requires one?
What happens if the object goes out of scope and nobody has called terminate()? Should I handle this case? Is it a Buffer's problem at all, or should the programmer using the class take care of it? Can you please show me an example where this happens? (I was thinking of the threads being detached instead of joined; does that fit?)