diff --git a/include/ConcurrentQueue.hpp b/include/ConcurrentQueue.hpp index 3b2da6e..57e309e 100644 --- a/include/ConcurrentQueue.hpp +++ b/include/ConcurrentQueue.hpp @@ -1,7 +1,7 @@ #ifndef CONCURRENTQUEUE_HPP #define CONCURRENTQUEUE_HPP -#include +#include #include #include "Mutex.hpp" @@ -39,7 +39,7 @@ public: TRACE; ScopedLock sl(m_mutex); if (m_cancelled) throw CancelledException(); - m_queue.push( value ); + m_queue.push_back( value ); m_condVar.signal(); } @@ -52,7 +52,7 @@ public: if ( m_queue.empty() ) return false; popped_value = m_queue.front(); - m_queue.pop(); + m_queue.pop_front(); return true; } @@ -68,7 +68,7 @@ public: if (m_cancelled) throw CancelledException(); T retVal = m_queue.front(); // cctor - m_queue.pop(); + m_queue.pop_front(); return retVal; } @@ -82,20 +82,44 @@ public: } - void cancel() + void cancel(T a) { TRACE; ScopedLock sl(m_mutex); m_cancelled = true; m_condVar.broadcast(); + + m_queue.clear(); + } + + void cancel(T *a) + { + TRACE; + ScopedLock sl(m_mutex); + m_cancelled = true; + m_condVar.broadcast(); + + typename std::deque::iterator it; + for ( it = m_queue.begin(); it != m_queue.end(); ++it ) { + LOG( Logger::INFO, std::string("Deleting: ").append(*it).c_str() ); + delete *it; + } + m_queue.clear(); + } + + bool cancelled() + { + TRACE; + return m_cancelled; } private: + ConcurrentQueue& operator=( const ConcurrentQueue& ); ConcurrentQueue( const ConcurrentQueue& ); - std::queue m_queue; + std::deque m_queue; bool m_cancelled; mutable Mutex m_mutex; ConditionVariable m_condVar; diff --git a/include/ObjectPool.hpp b/include/ObjectPool.hpp index af6ed6b..8fdb63d 100644 --- a/include/ObjectPool.hpp +++ b/include/ObjectPool.hpp @@ -9,7 +9,9 @@ class ObjectPool { public: - ObjectPool() : m_pool() + ObjectPool() + : m_pool() + , m_numberOfUsedObjects(0) { TRACE; } @@ -19,29 +21,62 @@ public: TRACE; } + void add(const T object) // throws CancelledException + { + TRACE; + m_pool.push(object); + } - T acquire() + + T acquire() // throws CancelledException { TRACE; - return m_pool.waitAndPop(); + T tmp = m_pool.waitAndPop(); + m_numberOfUsedObjects++; + return tmp; } - void release(const T object) + void release(T object) { TRACE; + if ( m_pool.cancelled() ) { + m_numberOfUsedObjects--; + return; + } + m_pool.push(object); + m_numberOfUsedObjects--; } - bool empty() const + void release(T* object) { TRACE; - return m_pool.empty(); + if ( m_pool.cancelled() ) { + m_numberOfUsedObjects--; + delete object; + return; + } + + m_pool.push(object); + m_numberOfUsedObjects--; } + void clear(T a) + { + TRACE; + m_pool.cancel(a); + } + + void clear(T* a) + { + TRACE; + m_pool.cancel(a); + } private: ConcurrentQueue m_pool; + int m_numberOfUsedObjects; }; #endif // OBJECT_POOL_HPP diff --git a/include/Thread.hpp b/include/Thread.hpp index bb9957b..592d212 100644 --- a/include/Thread.hpp +++ b/include/Thread.hpp @@ -17,17 +17,15 @@ public: void sendSignal( const int nSignal ) const; bool isRunning() const; -private: - - virtual void* run() = 0; - static void* threadStarter( void* pData ); - protected: bool m_isRunning; private: + virtual void* run() = 0; + static void* threadStarter( void* pData ); + pthread_t m_threadHandler; }; diff --git a/src/MysqlConnectionPool.cpp b/src/MysqlConnectionPool.cpp index 07259bb..315901f 100644 --- a/src/MysqlConnectionPool.cpp +++ b/src/MysqlConnectionPool.cpp @@ -27,12 +27,5 @@ void MysqlConnectionPool::create() MysqlClient *client = new MysqlClient ( m_host, m_user, m_passwd, m_db ); client->connect(); - release(client); -} - -void MysqlConnectionPool::clear() -{ - TRACE; - while ( !empty() ) - delete acquire(); + add(client); } diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp index 5cbb74a..2df53f1 100644 --- a/src/ThreadPool.cpp +++ b/src/ThreadPool.cpp @@ -61,7 +61,8 @@ void ThreadPool::stop() (*it)->stop(); } - m_tasks.cancel(); + /// @todo solve this! +// m_tasks.cancel( ); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 56e6b87..b76249b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -18,20 +18,21 @@ if(CXXTEST_FOUND) generated_main.cpp Fixture.hpp - test_ArgParse.hpp - test_Common.hpp - test_ConditionalVariable.hpp - test_Multiton.hpp - test_Mutex.hpp - test_ScopedLock.hpp - test_Semaphore.hpp - test_Singleton_DCLP.hpp - test_Singleton_call_once.hpp - # test_Singleton.hpp Cannot test private member, Ficture.hpp loads it - test_Singleton_meyers.hpp - test_Thread.hpp - test_ThreadPool.hpp - # test_Timer.hpp Takes too much time&buggy +# test_ArgParse.hpp +# test_Common.hpp +# test_ConditionalVariable.hpp +# test_Multiton.hpp +# test_Mutex.hpp + test_ObjectPool.hpp +# test_ScopedLock.hpp +# test_Semaphore.hpp +# test_Singleton_DCLP.hpp +# test_Singleton_call_once.hpp +# # test_Singleton.hpp Cannot test private member, Ficture.hpp loads it +# test_Singleton_meyers.hpp +# test_Thread.hpp +# test_ThreadPool.hpp +# # test_Timer.hpp Takes too much time&buggy ) target_link_libraries(testCppUtils CppUtils gcov) endif() diff --git a/test/test_ObjectPool.hpp b/test/test_ObjectPool.hpp new file mode 100644 index 0000000..64a8b05 --- /dev/null +++ b/test/test_ObjectPool.hpp @@ -0,0 +1,130 @@ +#include + +#include "Common.hpp" +#include "Fixture.hpp" + +#include "ObjectPool.hpp" +#include "Thread.hpp" + +class TestObjectPool : public CxxTest::TestSuite +{ + +public: + + void testBasic( void ) + { + TEST_HEADER; + + ObjectPool op; + + int a(1); + op.add(a); + + TS_ASSERT_EQUALS( op.acquire(), a ); + } + + void testPointers( void ) + { + TEST_HEADER; + + ObjectPool op; + + int *a = new int(1); + int *b = new int(2); + op.add(a); + op.add(b); + + TS_ASSERT_EQUALS( op.acquire(), a ); + TS_ASSERT_EQUALS( op.acquire(), b ); + + delete a; + delete b; + } + + +private: + + class ObjecPoolUserThread : public Thread + { + public: + ObjecPoolUserThread( ObjectPool &objectPool + ) + : m_objectPool(objectPool) + { + TRACE; + } + + private: + void* run() + { + TRACE; + int *a; + try { + a = m_objectPool.acquire(); + LOG( Logger::DEBUG, std::string("Acquired int: "). + append(TToStr(*a)).c_str() ); + } catch ( CancelledException ex ) { + LOG( Logger::DEBUG, "Cancelled while acquiring" ); + } + + sleep(1); + + try { + m_objectPool.release(a); + } catch ( CancelledException ex ) { + LOG( Logger::DEBUG, "Cancelled while releasing" ); + } + return 0; + } + + ObjectPool &m_objectPool; + }; + +public: + + void testCompetingThreads( void ) + { + TEST_HEADER; + + ObjectPool op; + + ObjecPoolUserThread t1(op); + ObjecPoolUserThread t2(op); + + int *a = new int(27); + op.add(a); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + delete a; + } + + +public: + + void testCleanUp( void ) + { + TEST_HEADER; + + ObjectPool cop; + int *a = new int(12); + int *b = new int(13); + int *c = new int(14); + cop.add(a); + cop.add(b); + cop.add(c); + +// ObjecPoolUserThread t1(cop); +// ObjecPoolUserThread t2(cop); + +// t1.start(); +// t2.start(); + + cop.clear(); + } + +}; diff --git a/test/test_Semaphore.hpp b/test/test_Semaphore.hpp index 036be00..c8d39b0 100644 --- a/test/test_Semaphore.hpp +++ b/test/test_Semaphore.hpp @@ -106,7 +106,6 @@ public: t1->join(); t2->join(); - sleep(1); delete t1; delete t2; }