From 11f1df7a99c0503d709391ed574a051973b90037 Mon Sep 17 00:00:00 2001 From: dmatetelki Date: Thu, 12 May 2016 15:32:18 +0200 Subject: [PATCH] ConcurrentDeque using c++-11 mutex, unique_lock and condition_variable --- lib/cpp_utils/ConcurrentDeque.hpp | 90 +++++++------------------------ 1 file changed, 20 insertions(+), 70 deletions(-) diff --git a/lib/cpp_utils/ConcurrentDeque.hpp b/lib/cpp_utils/ConcurrentDeque.hpp index a5908f7..a855911 100644 --- a/lib/cpp_utils/ConcurrentDeque.hpp +++ b/lib/cpp_utils/ConcurrentDeque.hpp @@ -2,14 +2,10 @@ #define CONCURRENTQUEUE_HPP #include -#include -#include - -#include "Mutex.hpp" -#include "ConditionVariable.hpp" -#include "ScopedLock.hpp" -#include "Common.hpp" +#include +#include +#include "Logger.hpp" class CancelledException {}; @@ -24,61 +20,47 @@ public: : m_queue() , m_cancelled(false) , m_mutex() - , m_condVar(m_mutex) + , m_condVar() { TRACE; } + ConcurrentDeque& operator=(const ConcurrentDeque&) = delete; + ConcurrentDeque(const ConcurrentDeque&) = delete; + ~ConcurrentDeque() { TRACE; - freeDeque(); } - void push(const T value) { TRACE; - ScopedLock sl(m_mutex); + std::unique_lock lock(m_mutex); if (m_cancelled) throw CancelledException(); - m_queue.push_back( value ); - m_condVar.signal(); + m_queue.push_back(value); + m_condVar.notify_one(); } - - bool tryPop(T &popped_value) - { - TRACE; - ScopedLock sl(m_mutex); - if (m_cancelled) throw CancelledException(); - if ( m_queue.empty() ) return false; - - popped_value = m_queue.front(); - m_queue.pop_front(); - return true; - } - - T waitAndPop() { TRACE; - ScopedLock sl(m_mutex); + std::unique_lock lock(m_mutex); + + while (m_queue.empty() && !m_cancelled) + m_condVar.wait(lock); - while ( m_queue.empty() and not m_cancelled) { - m_condVar.wait(); - } if (m_cancelled) throw CancelledException(); - T retVal = m_queue.front(); // cctor + T retVal = m_queue.front(); m_queue.pop_front(); return retVal; } - bool empty() const { TRACE; - ScopedLock sl(m_mutex); + std::unique_lock lock(m_mutex); if (m_cancelled) throw CancelledException(); return m_queue.empty(); } @@ -86,49 +68,17 @@ public: void cancel() { TRACE; - ScopedLock sl(m_mutex); + std::unique_lock lock(m_mutex); m_cancelled = true; - m_condVar.broadcast(); - freeDeque(); - } - - template - typename std::enable_if< std::is_pointer::value >::type - freeDeque() - { - TRACE; - typename std::deque::iterator it; - for ( it = m_queue.begin(); it != m_queue.end(); ++it ) - delete *it; - - m_queue.clear(); - } - - template - typename std::enable_if< !(std::is_pointer::value) >::type - freeDeque() - { - TRACE; - m_queue.clear(); - } - - bool cancelled() - { - TRACE; - return m_cancelled; + m_condVar.notify_all(); } private: - - ConcurrentDeque& operator=( const ConcurrentDeque& ); - ConcurrentDeque( const ConcurrentDeque& ); - std::deque m_queue; bool m_cancelled; - mutable Mutex m_mutex; - ConditionVariable m_condVar; - + std::mutex m_mutex; + std::condition_variable m_condVar; }; #endif // CONCURRENTQUEUE_HPP