wake-up-neo.net

Thread-sichere Warteschlange für C++ 11

Ein Projekt, an dem ich arbeite, verwendet mehrere Threads, um an einer Sammlung von Dateien zu arbeiten. Jeder Thread kann Dateien zur Liste der zu bearbeitenden Dateien hinzufügen, sodass ich eine Thread-sichere Warteschlange zusammenstellen würde (was ich dachte). Relevante Teile folgen:

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.Push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

Ich bin jedoch gelegentlich im if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }-Block segfaulting, und eine Überprüfung in gdb zeigt an, dass die Segfaults auftreten, weil die Warteschlange leer ist. Wie ist das möglich? Ich habe verstanden, dass wait_for nur cv_status::no_timeout zurückgibt, wenn es benachrichtigt wurde, und dies sollte nur geschehen, nachdem FileQueue::enqueue gerade ein neues Element in die Warteschlange verschoben hat.

52
Matt Kline

Gemäß dem Standard dürfen condition_variables fehlerhaft aufwachen, auch wenn das Ereignis nicht aufgetreten ist. Im Falle eines unechten Aufwachens wird cv_status::no_timeout zurückgegeben (da es statt des Zeitlimits aufgewacht ist), obwohl es nicht benachrichtigt wurde. Die richtige Lösung dafür ist natürlich zu prüfen, ob das Aufwecken tatsächlich legitim war, bevor die Prozedur durchgeführt wurde.

Die Details sind in der Norm §30.5.1 [thread.condition.condvar] angegeben:

—Die Funktion wird die Blockierung aufheben, wenn durch einen Aufruf von notify_one (), einem Aufruf von notify_all (), dem Ablauf des absoluten Timeouts (30.2.4), der durch abs_time festgelegt wurde, oder falsch gemeldet wird. 

... 

Gibt Folgendes zurück: cv_status :: timeout, wenn das durch abs_time angegebene absolute Timeout (30.2.4) abgelaufen ist, andernfalls cv_status :: no_timeout.

26
Grizzly

Wenn Sie nur eine Bedingungsvariable prüfen, ist es am besten, eine while-Schleife zu verwenden (wenn sie aufwacht und immer noch nicht ungültig ist, überprüfen Sie sie erneut). Ich habe gerade eine Vorlage für eine asynchrone Warteschlange geschrieben. Ich hoffe, das hilft.

#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
  SafeQueue(void)
    : q()
    , m()
    , c()
  {}

  ~SafeQueue(void)
  {}

  // Add an element to the queue.
  void enqueue(T t)
  {
    std::lock_guard<std::mutex> lock(m);
    q.Push(t);
    c.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait till a element is avaiable.
  T dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // release lock as long as the wait and reaquire it afterwards.
      c.wait(lock);
    }
    T val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<T> q;
  mutable std::mutex m;
  std::condition_variable c;
};
#endif
43

So sollten Sie es wahrscheinlich machen:

void Push(std::string&& filename)
{
    {
        std::lock_guard<std::mutex> lock(qMutex);

        q.Push(std::move(filename));
    }

    populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
        return false;

    filename = std::move(q.front());
    q.pop();

    return true;    
}
12
ronag

Zusätzlich zu der akzeptierten Antwort würde ich sagen, dass die Implementierung einer richtigen Warteschlange für Multiproduzenten/Multi-Consumer schwierig ist (obwohl dies seit C++ 11 einfacher ist).

Ich würde Ihnen empfehlen, die (sehr gute) lock-freie Boost-Bibliothek auszuprobieren , die "Queue" -Struktur wird tun, was Sie wollen, mit wartungsfreien/lock-freien Garantien und ohne die Notwendigkeit eines C + +11 Compiler .

Ich füge diese Antwort jetzt hinzu, weil die schlossfreie Bibliothek recht neu ist (seit 1.53, glaube ich).

10
quantdev

Ich würde Ihre Dequeue-Funktion wie folgt umschreiben:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    while(q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) 
           return std::string();
    }
    std::string ret = q.front();
    q.pop();
    return ret;
}

Es ist kürzer und hat keinen doppelten Code wie Sie. Nur das Problem kann länger als das Timeout warten. Um dies zu verhindern, müssen Sie sich die Startzeit vor der Schleife merken, überprüfen Sie das Zeitlimit und passen Sie die Wartezeit entsprechend an. Oder geben Sie die absolute Wartezeit an.

5
Slava

Es gibt auch eine GLib-Lösung für diesen Fall, ich habe es noch nicht ausprobiert, aber ich glaube, dass es eine gute Lösung ist . https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues) .html # g-async-queue-new

1
ransh

BlockingCollection ist eine sichere C++ 11-Thread-Collection, die Unterstützung für Warteschlangen-, Stapel- und Prioritätscontainer bereitstellt. Es behandelt das von Ihnen beschriebene "leere" Warteschlangen-Szenario. Sowie eine "volle" Warteschlange.

1
gm127

Sie mögen lfqueue, https://github.com/Taymindis/lfqueue . Die gleichzeitige Warteschlange für Sperren ist kostenlos Ich verwende es derzeit, um die Warteschlange von mehreren eingehenden Anrufen zu nutzen und funktioniert wie ein Zauber.

0
woon minika