Ein Projekt, an dem ich arbeite, verwendet mehrere Threads, um an einer Sammlung von Dateien zu arbeiten. Jeder Thread kann Dateien zur Liste der zu bearbeitenden Dateien hinzufügen, sodass ich eine Thread-sichere Warteschlange zusammenstellen würde (was ich dachte). Relevante Teile folgen:
// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
// notify waiting threads of a new item in the queue
void FileQueue::enqueue(std::string&& filename)
{
std::lock_guard<std::mutex> lock(qMutex);
q.Push(std::move(filename));
// Notify anyone waiting for additional files that more have arrived
populatedNotifier.notify_one();
}
std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
if (q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
std::string ret = q.front();
q.pop();
return ret;
}
else {
return std::string();
}
}
else {
std::string ret = q.front();
q.pop();
return ret;
}
}
Ich bin jedoch gelegentlich im if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }
-Block segfaulting, und eine Überprüfung in gdb zeigt an, dass die Segfaults auftreten, weil die Warteschlange leer ist. Wie ist das möglich? Ich habe verstanden, dass wait_for
nur cv_status::no_timeout
zurückgibt, wenn es benachrichtigt wurde, und dies sollte nur geschehen, nachdem FileQueue::enqueue
gerade ein neues Element in die Warteschlange verschoben hat.
Gemäß dem Standard dürfen condition_variables
fehlerhaft aufwachen, auch wenn das Ereignis nicht aufgetreten ist. Im Falle eines unechten Aufwachens wird cv_status::no_timeout
zurückgegeben (da es statt des Zeitlimits aufgewacht ist), obwohl es nicht benachrichtigt wurde. Die richtige Lösung dafür ist natürlich zu prüfen, ob das Aufwecken tatsächlich legitim war, bevor die Prozedur durchgeführt wurde.
Die Details sind in der Norm §30.5.1 [thread.condition.condvar] angegeben:
—Die Funktion wird die Blockierung aufheben, wenn durch einen Aufruf von notify_one (), einem Aufruf von notify_all (), dem Ablauf des absoluten Timeouts (30.2.4), der durch abs_time festgelegt wurde, oder falsch gemeldet wird.
...
Gibt Folgendes zurück: cv_status :: timeout, wenn das durch abs_time angegebene absolute Timeout (30.2.4) abgelaufen ist, andernfalls cv_status :: no_timeout.
Wenn Sie nur eine Bedingungsvariable prüfen, ist es am besten, eine while-Schleife zu verwenden (wenn sie aufwacht und immer noch nicht ungültig ist, überprüfen Sie sie erneut). Ich habe gerade eine Vorlage für eine asynchrone Warteschlange geschrieben. Ich hoffe, das hilft.
#ifndef SAFE_QUEUE
#define SAFE_QUEUE
#include <queue>
#include <mutex>
#include <condition_variable>
// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
SafeQueue(void)
: q()
, m()
, c()
{}
~SafeQueue(void)
{}
// Add an element to the queue.
void enqueue(T t)
{
std::lock_guard<std::mutex> lock(m);
q.Push(t);
c.notify_one();
}
// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
T dequeue(void)
{
std::unique_lock<std::mutex> lock(m);
while(q.empty())
{
// release lock as long as the wait and reaquire it afterwards.
c.wait(lock);
}
T val = q.front();
q.pop();
return val;
}
private:
std::queue<T> q;
mutable std::mutex m;
std::condition_variable c;
};
#endif
So sollten Sie es wahrscheinlich machen:
void Push(std::string&& filename)
{
{
std::lock_guard<std::mutex> lock(qMutex);
q.Push(std::move(filename));
}
populatedNotifier.notify_one();
}
bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
return false;
filename = std::move(q.front());
q.pop();
return true;
}
Zusätzlich zu der akzeptierten Antwort würde ich sagen, dass die Implementierung einer richtigen Warteschlange für Multiproduzenten/Multi-Consumer schwierig ist (obwohl dies seit C++ 11 einfacher ist).
Ich würde Ihnen empfehlen, die (sehr gute) lock-freie Boost-Bibliothek auszuprobieren , die "Queue" -Struktur wird tun, was Sie wollen, mit wartungsfreien/lock-freien Garantien und ohne die Notwendigkeit eines C + +11 Compiler .
Ich füge diese Antwort jetzt hinzu, weil die schlossfreie Bibliothek recht neu ist (seit 1.53, glaube ich).
Ich würde Ihre Dequeue-Funktion wie folgt umschreiben:
std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
while(q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout )
return std::string();
}
std::string ret = q.front();
q.pop();
return ret;
}
Es ist kürzer und hat keinen doppelten Code wie Sie. Nur das Problem kann länger als das Timeout warten. Um dies zu verhindern, müssen Sie sich die Startzeit vor der Schleife merken, überprüfen Sie das Zeitlimit und passen Sie die Wartezeit entsprechend an. Oder geben Sie die absolute Wartezeit an.
Es gibt auch eine GLib-Lösung für diesen Fall, ich habe es noch nicht ausprobiert, aber ich glaube, dass es eine gute Lösung ist . https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues) .html # g-async-queue-new
BlockingCollection ist eine sichere C++ 11-Thread-Collection, die Unterstützung für Warteschlangen-, Stapel- und Prioritätscontainer bereitstellt. Es behandelt das von Ihnen beschriebene "leere" Warteschlangen-Szenario. Sowie eine "volle" Warteschlange.
Sie mögen lfqueue, https://github.com/Taymindis/lfqueue . Die gleichzeitige Warteschlange für Sperren ist kostenlos Ich verwende es derzeit, um die Warteschlange von mehreren eingehenden Anrufen zu nutzen und funktioniert wie ein Zauber.