ConcurrentDeque using c++-11 mutex, unique_lock and condition_variable

master
dmatetelki 9 years ago
parent 544be0dcf7
commit 11f1df7a99

@ -2,14 +2,10 @@
#define CONCURRENTQUEUE_HPP #define CONCURRENTQUEUE_HPP
#include <deque> #include <deque>
#include <algorithm> #include <mutex>
#include <type_traits> #include <condition_variable>
#include "Mutex.hpp"
#include "ConditionVariable.hpp"
#include "ScopedLock.hpp"
#include "Common.hpp"
#include "Logger.hpp"
class CancelledException {}; class CancelledException {};
@ -24,61 +20,47 @@ public:
: m_queue() : m_queue()
, m_cancelled(false) , m_cancelled(false)
, m_mutex() , m_mutex()
, m_condVar(m_mutex) , m_condVar()
{ {
TRACE; TRACE;
} }
ConcurrentDeque& operator=(const ConcurrentDeque&) = delete;
ConcurrentDeque(const ConcurrentDeque&) = delete;
~ConcurrentDeque() ~ConcurrentDeque()
{ {
TRACE; TRACE;
freeDeque();
} }
void push(const T value) void push(const T value)
{ {
TRACE; TRACE;
ScopedLock sl(m_mutex); std::unique_lock<std::mutex> lock(m_mutex);
if (m_cancelled) throw CancelledException();
m_queue.push_back( value );
m_condVar.signal();
}
bool tryPop(T &popped_value)
{
TRACE;
ScopedLock sl(m_mutex);
if (m_cancelled) throw CancelledException(); if (m_cancelled) throw CancelledException();
if ( m_queue.empty() ) return false; m_queue.push_back(value);
m_condVar.notify_one();
popped_value = m_queue.front();
m_queue.pop_front();
return true;
} }
T waitAndPop() T waitAndPop()
{ {
TRACE; TRACE;
ScopedLock sl(m_mutex); std::unique_lock<std::mutex> lock(m_mutex);
while (m_queue.empty() && !m_cancelled)
m_condVar.wait(lock);
while ( m_queue.empty() and not m_cancelled) {
m_condVar.wait();
}
if (m_cancelled) throw CancelledException(); if (m_cancelled) throw CancelledException();
T retVal = m_queue.front(); // cctor T retVal = m_queue.front();
m_queue.pop_front(); m_queue.pop_front();
return retVal; return retVal;
} }
bool empty() const bool empty() const
{ {
TRACE; TRACE;
ScopedLock sl(m_mutex); std::unique_lock<std::mutex> lock(m_mutex);
if (m_cancelled) throw CancelledException(); if (m_cancelled) throw CancelledException();
return m_queue.empty(); return m_queue.empty();
} }
@ -86,49 +68,17 @@ public:
void cancel() void cancel()
{ {
TRACE; TRACE;
ScopedLock sl(m_mutex); std::unique_lock<std::mutex> lock(m_mutex);
m_cancelled = true; m_cancelled = true;
m_condVar.broadcast(); m_condVar.notify_all();
freeDeque();
}
template<class U = T>
typename std::enable_if< std::is_pointer<U>::value >::type
freeDeque()
{
TRACE;
typename std::deque<T>::iterator it;
for ( it = m_queue.begin(); it != m_queue.end(); ++it )
delete *it;
m_queue.clear();
}
template<class U = T>
typename std::enable_if< !(std::is_pointer<U>::value) >::type
freeDeque()
{
TRACE;
m_queue.clear();
}
bool cancelled()
{
TRACE;
return m_cancelled;
} }
private: private:
ConcurrentDeque& operator=( const ConcurrentDeque& );
ConcurrentDeque( const ConcurrentDeque& );
std::deque<T> m_queue; std::deque<T> m_queue;
bool m_cancelled; bool m_cancelled;
mutable Mutex m_mutex; std::mutex m_mutex;
ConditionVariable m_condVar; std::condition_variable m_condVar;
}; };
#endif // CONCURRENTQUEUE_HPP #endif // CONCURRENTQUEUE_HPP

Loading…
Cancel
Save