PThreadWrappers return with the retval of called pthread. Task became an interface. WorkerThread moved to test_ThreadPool.hpp. Thread has stop() and m_isRunning members. ThreadPool is now Task and Thread type independent. No more assertions in PThreadWrappers. Unittests refactored. test dir cleaned up.

master
Denes Matetelki 14 years ago
parent 123b9fca17
commit 01cde4928d

@ -12,9 +12,9 @@ public:
ConditionVariable(Mutex& mutex); ConditionVariable(Mutex& mutex);
~ConditionVariable(); ~ConditionVariable();
void wait(const int interval = 0); int wait(const int interval = 0);
void signal(); int signal();
void broadcast(); int broadcast();
private: private:

@ -11,8 +11,8 @@ public:
Mutex(int kind = PTHREAD_MUTEX_DEFAULT); Mutex(int kind = PTHREAD_MUTEX_DEFAULT);
~Mutex(); ~Mutex();
void lock(); int lock();
void unlock(); int unlock();
bool tryLock(const int interval = 0); bool tryLock(const int interval = 0);
pthread_mutex_t* getPThreadMutex(); pthread_mutex_t* getPThreadMutex();

@ -8,13 +8,10 @@ class Task
public: public:
Task(); virtual void run () = 0;
virtual bool isItStucked () const = 0;
void run (); protected:
bool isItStucked () const;
private:
time_t m_startedToRun; time_t m_startedToRun;
time_t m_timeOut; time_t m_timeOut;

@ -13,6 +13,7 @@ class Thread
void start(); void start();
void* join() const; void* join() const;
void stop();
void sendSignal( const int nSignal ) const; void sendSignal( const int nSignal ) const;
private: private:
@ -20,10 +21,13 @@ class Thread
virtual void* run() = 0; virtual void* run() = 0;
static void* threadStarter( void* pData ); static void* threadStarter( void* pData );
private: protected:
bool m_isRunning;
pthread_t m_nThread; private:
pthread_t m_threadHandler;
}; };

@ -4,26 +4,25 @@
#include <vector> #include <vector>
#include "ConcurrentQueue.hpp" #include "ConcurrentQueue.hpp"
#include "WorkerThread.hpp"
#include "Task.hpp" #include "Task.hpp"
#include "Thread.hpp"
#include "Mutex.hpp" #include "Mutex.hpp"
class WorkerThread;
class ThreadPool class ThreadPool
{ {
public: public:
ThreadPool( const int threadNum ); ThreadPool( const int threadNum );
virtual ~ThreadPool(); ~ThreadPool();
void startWorkerThreads();
virtual void pushTask( Task* task ); void pushTask( Task* task );
Task* popTask(); Task* popTask();
void pushWorkerThread( Thread * thread);
void startWorkerThreads();
void stop(); void stop();
void join() const; void join() const;
@ -33,7 +32,7 @@ class ThreadPool
ThreadPool& operator=( const ThreadPool& ); ThreadPool& operator=( const ThreadPool& );
int m_threadNum; int m_threadNum;
std::vector<WorkerThread*> m_threads; std::vector<Thread*> m_threads;
ConcurrentQueue<Task*> m_tasks; ConcurrentQueue<Task*> m_tasks;
// Mutex m_mutex; // Mutex m_mutex;
}; };

@ -1,26 +0,0 @@
#ifndef WORKER_THREAD_HPP
#define WORKER_THREAD_HPP
#include "Thread.hpp"
#include "ThreadPool.hpp"
class ThreadPool;
class WorkerThread : public Thread
{
public:
WorkerThread( ThreadPool& tp );
void stop();
private:
void* run();
ThreadPool& m_tp;
bool m_isRunning;
};
#endif // WORKER_THREAD_HPP

@ -8,26 +8,23 @@ ConditionVariable::ConditionVariable(Mutex& mutex)
: m_mutex(mutex) : m_mutex(mutex)
{ {
TRACE(this); TRACE(this);
int ret = pthread_cond_init( &m_condVar, 0 ); pthread_cond_init( &m_condVar, 0 );
assert( ret == 0);
} }
ConditionVariable::~ConditionVariable() ConditionVariable::~ConditionVariable()
{ {
TRACE(this); TRACE(this);
int ret = pthread_cond_destroy( &m_condVar ); pthread_cond_destroy( &m_condVar );
assert( ret == 0);
} }
void ConditionVariable::wait(const int interval) int ConditionVariable::wait(const int interval)
{ {
TRACE(this); TRACE(this);
if ( interval == 0 ) { if ( interval == 0 ) {
int ret = pthread_cond_wait( &m_condVar, return pthread_cond_wait( &m_condVar,
m_mutex.getPThreadMutex() ); m_mutex.getPThreadMutex() );
assert( ret == 0);
} else { } else {
timespec abs_time; timespec abs_time;
clock_gettime ( CLOCK_REALTIME, &abs_time ); clock_gettime ( CLOCK_REALTIME, &abs_time );
@ -36,23 +33,20 @@ void ConditionVariable::wait(const int interval)
abs_time.tv_nsec -= 1000000000; abs_time.tv_nsec -= 1000000000;
abs_time.tv_sec += 1; abs_time.tv_sec += 1;
} }
int ret = pthread_cond_timedwait( &m_condVar, return pthread_cond_timedwait( &m_condVar,
m_mutex.getPThreadMutex(), m_mutex.getPThreadMutex(),
&abs_time ); &abs_time );
assert( ret == 0);
} }
} }
void ConditionVariable::signal() int ConditionVariable::signal()
{ {
TRACE(this); TRACE(this);
int ret = pthread_cond_signal( &m_condVar ); return pthread_cond_signal( &m_condVar );
assert( ret == 0);
} }
void ConditionVariable::broadcast() int ConditionVariable::broadcast()
{ {
TRACE(this); TRACE(this);
int ret = pthread_cond_broadcast( &m_condVar ); return pthread_cond_broadcast( &m_condVar );
assert( ret == 0);
} }

@ -1,47 +1,41 @@
#include "Mutex.hpp" #include "Mutex.hpp"
#include "Common.hpp" #include "Common.hpp"
#include <assert.h>
#include <time.h> #include <time.h>
Mutex::Mutex(int kind) Mutex::Mutex(int kind)
{ {
TRACE(this); TRACE(this);
int ret;
if ( kind == PTHREAD_MUTEX_DEFAULT ) { if ( kind == PTHREAD_MUTEX_DEFAULT ) {
ret = pthread_mutex_init( &m_mutex, 0 ); pthread_mutex_init( &m_mutex, 0 );
} else { } else {
pthread_mutexattr_t attr; pthread_mutexattr_t attr;
pthread_mutexattr_init( &attr ); pthread_mutexattr_init( &attr );
pthread_mutexattr_settype( &attr, kind ); pthread_mutexattr_settype( &attr, kind );
ret = pthread_mutex_init( &m_mutex, &attr ); pthread_mutex_init( &m_mutex, &attr );
} }
assert( ret == 0 );
} }
Mutex::~Mutex() Mutex::~Mutex()
{ {
TRACE(this); TRACE(this);
int ret = pthread_mutex_destroy ( &m_mutex ); pthread_mutex_destroy ( &m_mutex );
assert( ret == 0);
} }
void Mutex::lock() int Mutex::lock()
{ {
TRACE(this); TRACE(this);
int ret = pthread_mutex_lock( &m_mutex ); return pthread_mutex_lock( &m_mutex );
assert( ret == 0);
} }
void Mutex::unlock() int Mutex::unlock()
{ {
TRACE(this); TRACE(this);
int ret = pthread_mutex_unlock ( &m_mutex ); return pthread_mutex_unlock ( &m_mutex );
assert( ret == 0);
} }

@ -1,30 +0,0 @@
#include "Task.hpp"
#include <iostream>
#include "Common.hpp"
Task::Task() : m_timeOut(5)
{
TRACE(this);
}
void Task::run()
{
TRACE(this);
m_startedToRun = time(NULL);
std::cout << "I'm a task..." << std::endl;
// other stuff
m_startedToRun = 0;
}
bool Task::isItStucked () const
{
TRACE(this);
return ( m_startedToRun + m_timeOut < time(NULL) );
}

@ -4,7 +4,10 @@
#include <signal.h> #include <signal.h>
#include <iostream> #include <iostream>
Thread::Thread() : m_nThread( 0 ) Thread::Thread()
: m_isRunning(false)
, m_threadHandler( 0 )
{ {
TRACE(this); TRACE(this);
} }
@ -19,7 +22,8 @@ Thread::~Thread()
void Thread::start() void Thread::start()
{ {
TRACE(this); TRACE(this);
pthread_create( &m_nThread, NULL, threadStarter, ( void* )this ); m_isRunning = true;
pthread_create( &m_threadHandler, NULL, threadStarter, ( void* )this );
} }
@ -27,15 +31,22 @@ void* Thread::join() const
{ {
TRACE(this); TRACE(this);
void* retVal; void* retVal;
pthread_join( m_nThread, &retVal ); pthread_join( m_threadHandler, &retVal );
return retVal; return retVal;
} }
void Thread::stop()
{
TRACE(this);
m_isRunning = false;
}
void Thread::sendSignal( const int nSignal ) const void Thread::sendSignal( const int nSignal ) const
{ {
TRACE(this); TRACE(this);
pthread_kill( m_nThread, nSignal ); pthread_kill( m_threadHandler, nSignal );
} }

@ -2,7 +2,7 @@
#include "Common.hpp" #include "Common.hpp"
ThreadPool::ThreadPool( const int threadNum ) : m_threadNum( threadNum ) ThreadPool::ThreadPool()
{ {
TRACE(this); TRACE(this);
} }
@ -10,7 +10,7 @@ ThreadPool::ThreadPool( const int threadNum ) : m_threadNum( threadNum )
ThreadPool::~ThreadPool() ThreadPool::~ThreadPool()
{ {
TRACE(this); TRACE(this);
std::vector<WorkerThread*>::iterator it; std::vector<Thread*>::iterator it;
for( it = m_threads.begin() ; it != m_threads.end(); it++ ) for( it = m_threads.begin() ; it != m_threads.end(); it++ )
{ {
delete (*it); delete (*it);
@ -24,6 +24,7 @@ void ThreadPool::pushTask( Task* task )
m_tasks.push(task); m_tasks.push(task);
} }
Task* ThreadPool::popTask() Task* ThreadPool::popTask()
{ {
TRACE(this); TRACE(this);
@ -31,21 +32,28 @@ Task* ThreadPool::popTask()
} }
void ThreadPool::pushWorkerThread( Thread * thread)
{
TRACE(this);
m_threads.push_back( thread );
}
void ThreadPool::startWorkerThreads() void ThreadPool::startWorkerThreads()
{ {
TRACE(this); TRACE(this);
for( int i = 0; i<m_threadNum; i++ ) std::vector<Thread*>::iterator it;
for( it = m_threads.begin() ; it != m_threads.end(); it++ )
{ {
WorkerThread* t = new WorkerThread ( *this ); (*it)->start();
m_threads.push_back( t );
t->start();
} }
} }
void ThreadPool::stop() void ThreadPool::stop()
{ {
TRACE(this); TRACE(this);
std::vector<WorkerThread*>::iterator it; std::vector<Thread*>::iterator it;
for( it = m_threads.begin() ; it != m_threads.end(); it++ ) for( it = m_threads.begin() ; it != m_threads.end(); it++ )
{ {
(*it)->stop(); (*it)->stop();
@ -58,7 +66,7 @@ void ThreadPool::stop()
void ThreadPool::join() const void ThreadPool::join() const
{ {
TRACE(this); TRACE(this);
std::vector<WorkerThread*>::const_iterator it; std::vector<Thread*>::const_iterator it;
for( it = m_threads.begin() ; it != m_threads.end(); it++ ) for( it = m_threads.begin() ; it != m_threads.end(); it++ )
{ {
(*it)->join(); (*it)->join();

@ -1,40 +0,0 @@
#include "WorkerThread.hpp"
#include "Task.hpp"
#include "Common.hpp"
WorkerThread::WorkerThread( ThreadPool& tp )
: m_tp(tp)
, m_isRunning(true)
{
TRACE(this);
}
void WorkerThread::stop()
{
TRACE(this);
m_isRunning = false;
}
void* WorkerThread::run()
{
TRACE(this);
while ( m_isRunning )
{
Task* task(0);
try {
task = m_tp.popTask();
task->run();
delete task;
} catch (CancelledException) {
std::cout << "Now I die." << std::endl;
}
}
return 0;
}

@ -6,9 +6,15 @@ if(CXXTEST_FOUND)
set(CXXTEST_USE_PERL TRUE) set(CXXTEST_USE_PERL TRUE)
include_directories(${CXXTEST_INCLUDE_DIR} ../include) include_directories(${CXXTEST_INCLUDE_DIR} ../include)
enable_testing() enable_testing()
CXXTEST_ADD_TEST(test generated_main.cpp CXXTEST_ADD_TEST(test
test_threadpool.hpp generated_main.cpp
test_Singelton.hpp test_Singelton.hpp
test_Mutex.hpp
test_ScopedLock.hpp
test_ConditionalVariable.hpp
test_Thread.hpp
test_ThreadPool.hpp
) )
target_link_libraries(test CppUtils gcov) target_link_libraries(test CppUtils gcov)
endif() endif()

@ -1,41 +0,0 @@
// g++ -Wall -Wextra src/*.cpp test/main_Mutex.cpp -Iinclude -lpthread -lrt
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
Mutex m;
int counter = 0;
void *functionC( void* params )
{
m.lock();
counter++;
std::cout << "Counter value: " << counter << std::endl;
m.unlock();
return 0;
}
int main()
{
pthread_t thread1, thread2;
pthread_create( &thread1, 0, &functionC, 0 );
pthread_create( &thread2, 0, &functionC, 0 );
pthread_join( thread1, 0 );
pthread_join( thread2, 0 );
Mutex m2(PTHREAD_MUTEX_ERRORCHECK);
m2.lock();
m2.unlock();
m2.unlock();
m2.unlock();
return 0;
}

@ -1,40 +0,0 @@
// g++ -Wall -Wextra src/*.cpp test/main_Mutex.cpp -Iinclude -lpthread -lrt
#include "ScopedLock.hpp"
#include "Mutex.hpp"
#include "Common.hpp"
#include <stdexcept>
class User
{
public:
User() : m_mutex() {
TRACE(this);
}
~User() {
TRACE(this);
}
void fv() {
TRACE(this);
ScopedLock sl(&m_mutex);
throw std::logic_error("whoops");
}
private:
Mutex m_mutex;
};
int main()
{
User u;
try {
u.fv();
} catch (std::logic_error ex) {
std::cout << "std::logicexception: " << ex.what() << std::endl;
}
return 0;
}

@ -1,110 +0,0 @@
#include <iostream>
#include "../ConcurrentQueue.hpp"
#include "../Thread.hpp"
#define TRACE std::cout << __FILE__ << " @ " << __PRETTY_FUNCTION__ << ":" << __LINE__ << std::endl;
class WaitingThread : public Thread
{
public:
WaitingThread (ConcurrentQueue<int>& cq)
: m_isRunning( true ),
m_cq(cq)
{
TRACE
}
~WaitingThread()
{
TRACE
}
void stop ()
{
TRACE
m_isRunning = false;
}
private:
void* run ()
{
TRACE
while ( m_isRunning )
{
std::cout << "waiting..." << std::endl;
int retval = m_cq.waitAndPop();
std::cout << "waiting...ENDED! Got retval: " << retval << std::endl;
}
return 0;
}
bool m_isRunning;
ConcurrentQueue<int>& m_cq;
};
class fake_type
{
public:
fake_type(int i) : m_int(i)
{
TRACE
}
~fake_type()
{
TRACE
}
fake_type( const fake_type& m)
{
TRACE
m_int = m.m_int;
}
fake_type& operator=( const fake_type& m)
{
TRACE
m_int = m.m_int;
return *this;
}
private:
int m_int;
};
int main()
{
ConcurrentQueue<fake_type> cq1;
cq1.push(fake_type(7));
fake_type m = cq1.waitAndPop();
// other case
ConcurrentQueue<int> cq2;
cq2.push(7);
cq2.push(13);
WaitingThread* wt1 = new WaitingThread(cq2);
wt1->start();
sleep(2);
cq2.push(34);
sleep(5);
wt1->stop();
sleep(5);
return 0;
}

@ -1,85 +0,0 @@
#include <iostream>
#include <stdexcept>
#include <pthread.h>
pthread_mutex_t p_mutex;
// #include <boost/thread/mutex.hpp>
// boost::mutex b_mutex;
#define TRACE std::cout << __FILE__ << " @ " << __PRETTY_FUNCTION__ << ":" << __LINE__ << std::endl;
class ScopedLock
{
public:
ScopedLock(pthread_mutex_t & mutex) : m_mutex(mutex)
{
TRACE;
pthread_mutex_lock( &m_mutex );
}
~ScopedLock(void )
{
TRACE;
pthread_mutex_unlock( &m_mutex );
}
private:
pthread_mutex_t& m_mutex;
};
void fv ()
{
ScopedLock sl(p_mutex);
throw std::logic_error("p_thread stuff");
}
// void fv2 ()
// {
// boost::mutex::scoped_lock lock(&b_mutex);
//
// throw std::logic_error("boost_thread stuff");
// }
int main()
{
pthread_mutex_init( &p_mutex, NULL );
try {
fv();
} catch (...) {
TRACE;
if (pthread_mutex_trylock( &p_mutex) == 0 ) {
// std::cout << "pthread mutex is OK, unlocked " << std::endl;
// pthread_mutex_unlock( &p_mutex );
} else {
// std::cout << "pthread mutex is STILL LOCKED!" << std::endl;
}
}
pthread_mutex_destroy( &p_mutex );
// try {
// fv2();
// } catch (...) {
// TRACE;
// if ( b_mutex.try_lock() == true ) {
// std::cout << "boost mutex is OK, unlocked " << std::endl;
// b_mutex.unlock();
// } else {
// std::cout << "boost mutex is STILL LOCKED!" << std::endl;
// }
// }
return 0;
}

@ -1,206 +0,0 @@
#include <iostream>
#ifndef NOTRACE
#define TRACE(x) std::cout << x << " " << __PRETTY_FUNCTION__ << \
" : " << __LINE__ << std::endl
#else
/// @todo Get rid of the "warning: statement has no effect" compiler msgs
#define TRACE(x) ""
#endif
#include <assert.h>
#include <pthread.h>
#include <stdexcept>
#include <time.h>
class Mutex
{
public:
Mutex() {
TRACE(this);
int ret = pthread_mutex_init( &m_mutex, 0 );
// assert( ret == 0 );
}
~Mutex() {
TRACE(this);
int ret = pthread_mutex_destroy ( &m_mutex );
// assert( ret == 0 );
}
void lock() {
TRACE(this);
int ret = pthread_mutex_lock( &m_mutex );
// assert( ret == 0 );
}
void unlock() {
TRACE(this);
int ret = pthread_mutex_unlock( &m_mutex );
// assert( ret == 0 );
}
// private:
Mutex(const Mutex& m){
TRACE(this);
}
Mutex& operator=(const Mutex& m) {
TRACE(this);
return *this;
}
int tryLock() {
TRACE(this);
// return pthread_mutex_trylock(&m_mutex);
int interval = 1;
timespec abs_time;
clock_gettime ( CLOCK_REALTIME, &abs_time );
abs_time.tv_nsec += interval * 1000000;
if ( abs_time.tv_nsec >= 1000000000 ) {
abs_time.tv_nsec -= 1000000000;
abs_time.tv_sec += 1;
}
return pthread_mutex_timedlock ( &m_mutex, &abs_time );
}
private:
pthread_mutex_t m_mutex;
};
class BadScopedLock
{
public:
BadScopedLock(Mutex& m)
: m_mutex(m) /// @note cctor called!
{
TRACE(this);
m_mutex.lock();
}
~BadScopedLock() {
TRACE(this);
m_mutex.unlock();
}
private:
BadScopedLock(const BadScopedLock&);
BadScopedLock& operator=(const BadScopedLock&);
Mutex& m_mutex;
};
class UnluckyUser
{
public:
UnluckyUser() : m_mutex() {
TRACE(this);
}
~UnluckyUser() {
TRACE(this);
}
void fv() {
TRACE(this);
BadScopedLock sl(m_mutex);
throw std::logic_error("whoops");
}
int tryLock() {
TRACE(this);
return m_mutex.tryLock();
}
private:
Mutex m_mutex;
};
// class GoodScopedLock
// {
// public:
//
// GoodScopedLock(Mutex* m) : m_mutex(m)
// {
// TRACE(this);
// m_mutex->lock();
// }
// ~GoodScopedLock() {
// TRACE(this);
// m_mutex->unlock();
// }
//
// private:
// GoodScopedLock(const GoodScopedLock&);
// GoodScopedLock& operator=(const GoodScopedLock&);
//
// Mutex* m_mutex;
// };
/*
class LuckyUser
{
public:
LuckyUser() : m_mutex() {
TRACE(this);
}
~LuckyUser() {
TRACE(this);
}
void fv() {
TRACE(this);
GoodScopedLock sl(&m_mutex);
throw std::logic_error("whoops");
}
int tryLock() {
TRACE(this);
return m_mutex.tryLock();
}
private:
Mutex m_mutex;
};*/
int main()
{
TRACE("main begin");
// pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// // pthread_mutex_t mutex = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
// std::cout << "bla1" << std::endl;
// std::cout << pthread_mutex_unlock( &mutex )<< std::endl;
// std::cout << pthread_mutex_trylock( &mutex ) << std::endl;
// std::cout << "bla2" << std::endl;
// // std::cout << pthread_mutex_unlock( &mutex ) << std::endl;
//
// std::cout << "bla3" << std::endl;
//
// std::cout << pthread_mutex_lock( &mutex ) << std::endl;
// std::cout << pthread_mutex_trylock( &mutex ) << std::endl;
// // std::cout << pthread_mutex_lock( &mutex ) << std::endl;
UnluckyUser u;
try {
u.fv();
} catch (std::logic_error ex) {
std::cout << "std::logicexception: " << ex.what() << std::endl;
if (u.tryLock() == 0) {
std::cout << "UnluckyUser: Ok, mutex is unlocked" << std::endl;
} else {
std::cout << "UnluckyUser: Failed, mutex is still locked" << std::endl;
}
}
// TRACE("main middle");
//
// LuckyUser u2;
// try {
// u2.fv();
// } catch (std::logic_error ex) {
// std::cout << "std::logicexception: " << ex.what() << std::endl;
// if (u2.tryLock() == 0) {
// std::cout << "LuckyUser: Ok, mutex is unlocked" << std::endl;
// } else {
// std::cout << "LuckyUser: Failed, mutex is still locked" << std::endl;
// }
// }
TRACE("main end");
return 0;
}

@ -1,87 +0,0 @@
#include <iostream>
#include <signal.h> // signal to thread
#include <stdlib.h> // malloc
// #include <string.h>
#include "Thread.hpp"
#include "Common.hpp"
class ThreadClass : public Thread
{
public:
~ThreadClass() { TRACE(this); }
ThreadClass() { TRACE(this); }
private:
void* run( void ) {
TRACE(this);
void *retval = malloc(sizeof(int));
*((int*)retval) = 14;
return retval;
return 0;
}
};
class ThreadClassWithSignal : public Thread
{
public:
~ThreadClassWithSignal() {
TRACE(this);
}
ThreadClassWithSignal() {
TRACE(this);
signal(SIGINT, signal_handler);
}
private:
void* run( void ) {
TRACE(this);
void *retval = malloc(sizeof(int));
*((int*)retval) = 14;
return retval;
return 0;
}
void static signal_handler(int sig)
{
TRACE("ThreadClassWithSignal::signal_handler");
if (sig==SIGINT) {
TRACE("signal_handler got SIGINT");
pthread_exit(0);
}
}
};
int main()
{
TRACE("main start");
ThreadClass *m = new ThreadClass;
m->start();
void *retVal = m->join();
std::cout << "got retVal: " << *((int*)retVal) << std::endl;
free(retVal);
delete m;
ThreadClass *m2 = new ThreadClass;
m2->start();
m2->sendSignal(SIGINT);
sleep(3);
std::cout << "after sendSignal the:" << m2 <<std::endl;
delete m2;
TRACE("main end");
return 0;
}

@ -1,30 +0,0 @@
#include <iostream>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Common.hpp"
int main()
{
TRACE("Main start");
ThreadPool* tp = new ThreadPool(5);
tp->startWorkerThreads();
Task* t1 = new Task();
tp->pushTask(t1);
sleep(2);
tp->stop();
tp->join();
delete tp;
TRACE("Main end");
return 0;
}

@ -0,0 +1,19 @@
#include <cxxtest/TestSuite.h>
#include "Mutex.hpp"
#include "ConditionVariable.hpp"
class TestConditionVariable : public CxxTest::TestSuite
{
public:
void testBasic( void )
{
Mutex m;
ConditionVariable c(m);
}
};

@ -0,0 +1,47 @@
#include <cxxtest/TestSuite.h>
#include "Common.hpp"
#include "Mutex.hpp"
#include <errno.h> // EDEADLK, EPERM
class TestMutex : public CxxTest::TestSuite
{
public:
void testBasic( void )
{
Mutex m;
TS_ASSERT_EQUALS ( m.lock() , 0 );
// TS_ASSERT_EQUALS ( m.lock() , 0 ); that would be a deadlock
TS_ASSERT_EQUALS ( m.tryLock(0), false );
TS_ASSERT_EQUALS ( m.tryLock(2), false );
TS_ASSERT_EQUALS ( m.unlock() , 0 );
TS_ASSERT_EQUALS ( m.unlock() , 0 );
}
void testErrorCheck( void )
{
Mutex m(PTHREAD_MUTEX_ERRORCHECK);
TS_ASSERT_EQUALS ( m.lock() , 0 );
TS_ASSERT_EQUALS ( m.lock(), EDEADLK );
TS_ASSERT_EQUALS ( m.unlock() , 0 );
TS_ASSERT_EQUALS ( m.unlock() , EPERM );
}
void testRecursive( void )
{
Mutex m(PTHREAD_MUTEX_RECURSIVE);
TS_ASSERT_EQUALS ( m.lock() , 0 );
TS_ASSERT_EQUALS ( m.lock() , 0 );
TS_ASSERT_EQUALS ( m.unlock() , 0 );
TS_ASSERT_EQUALS ( m.unlock() , 0 );
TS_ASSERT_EQUALS ( m.unlock() , EPERM );
}
};

@ -0,0 +1,32 @@
#include <cxxtest/TestSuite.h>
#define private public // need to reach private variables
#include "Common.hpp"
#include "Mutex.hpp"
class TestPThreadWrappers : public CxxTest::TestSuite
{
public:
void testMutexBasic( void )
{
Mutex m;
m.lock();
TS_ASSERT_EQUALS ( m.tryLock(0), 0 );
m.unlock();
}
void testMutexCreate( void )
{
Mutex m(PTHREAD_MUTEX_ERRORCHECK);
m.lock();
TS_ASSERT_EQUALS ( m.lock(), 1 );
m.unlock();
}
};

@ -0,0 +1,23 @@
#include <cxxtest/TestSuite.h>
#include "ScopedLock.hpp"
class TestScopedLock : public CxxTest::TestSuite
{
public:
void testBasic( void )
{
Mutex m;
{
ScopedLock sl(m);
TS_ASSERT_EQUALS ( m.tryLock(0), false );
}
TS_ASSERT_EQUALS ( m.tryLock(0), true );
}
};

@ -1,8 +1,3 @@
// gpp ./generated_main.cpp ../src/*.cpp -I../include -lpthread -lrt
#include <cxxtest/TestSuite.h> #include <cxxtest/TestSuite.h>
#define private public // need to reach Singleton's private m_instance #define private public // need to reach Singleton's private m_instance

@ -0,0 +1,116 @@
#include <cxxtest/TestSuite.h>
#include "Thread.hpp"
#include "Common.hpp"
#include "Mutex.hpp"
#include "ScopedLock.hpp"
#include <stdlib.h> // malloc
#include <signal.h> // SIGINT
class TestThread : public CxxTest::TestSuite
{
private:
class ThreadClass : public Thread
{
public:
ThreadClass() { TRACE(this); }
~ThreadClass() { TRACE(this); }
private:
ThreadClass(const ThreadClass&) { TRACE(this); }
ThreadClass& operator=(const ThreadClass&) { TRACE(this); return*this; }
void* run( void ) {
TRACE(this);
void* retVal = malloc(sizeof(int));
*((int*)retVal) = 14;
return retVal;
}
};
public:
void testBasic( void )
{
ThreadClass *m = new ThreadClass;
m->start();
void *retVal = m->join();
TS_ASSERT_EQUALS ( *((int*)retVal) , 14 );
free(retVal);
delete m;
}
/**
* @note send a signal to a thread
*/
private:
class ThreadClassWithSignal : public Thread
{
public:
ThreadClassWithSignal() {
TRACE(this);
signal(SIGINT, signal_handler);
}
~ThreadClassWithSignal() {
TRACE(this);
}
private:
ThreadClassWithSignal(const ThreadClassWithSignal&) { TRACE(this); }
ThreadClassWithSignal& operator=(const ThreadClassWithSignal&) { TRACE(this); return*this; }
void* run( void ) {
TRACE(this);
/** @note the function will get stopped before it finishes sleeping
* If signal arrives after malloc, it will be a memory leak.
*/
sleep(32);
void* retVal = malloc(sizeof(int));
*((int*)retVal) = 15;
return retVal;
}
static void signal_handler(int sig)
{
TRACE("ThreadClassWithSignal::signal_handler");
if (sig==SIGINT) {
TRACE("signal_handler got SIGINT");
void* retVal = malloc(sizeof(int));
*((int*)retVal) = 16;
pthread_exit(retVal);
}
}
};
public:
void testSignalSend( void )
{
ThreadClassWithSignal *m2 = new ThreadClassWithSignal;
m2->start();
m2->sendSignal(SIGINT);
sleep(3);
void *retVal = m2->join();
TS_ASSERT_EQUALS ( *((int*)retVal) , 16 );
free(retVal);
delete m2;
}
};

@ -0,0 +1,97 @@
#include <cxxtest/TestSuite.h>
// #include <time.h> // time
#include "Task.hpp"
#include "Thread.hpp"
#include "ThreadPool.hpp"
#include "Common.hpp"
class TestThreadPoolSuite : public CxxTest::TestSuite
{
class DummyTask : public Task
{
public:
DummyTask() { m_timeOut = 5; TRACE(this); }
void run()
{
TRACE(this);
m_startedToRun = time(NULL);
TRACE("I'm a task...");
m_startedToRun = 0;
}
bool isItStucked () const
{
TRACE(this);
return ( m_startedToRun + m_timeOut < time(NULL) );
}
};
class WorkerThread : public Thread
{
public:
WorkerThread( ThreadPool& tp )
: m_tp(tp)
{
TRACE(this);
}
private:
void* run()
{
TRACE(this);
while ( m_isRunning )
{
Task* task(0);
try {
task = m_tp.popTask();
task->run();
delete task;
} catch (CancelledException) {
TRACE("Now I die.");
}
}
return 0;
}
ThreadPool& m_tp;
};
public:
void testBasic()
{
ThreadPool* tp = new ThreadPool();
Thread* wt1 = new WorkerThread(*tp);
Thread* wt2 = new WorkerThread(*tp);
Thread* wt3 = new WorkerThread(*tp);
tp->pushWorkerThread(wt1);
tp->pushWorkerThread(wt2);
tp->pushWorkerThread(wt3);
tp->startWorkerThreads();
Task* t1 = new DummyTask();
tp->pushTask(t1);
Task* t2 = new DummyTask();
tp->pushTask(t2);
sleep(2);
tp->stop();
tp->join();
delete tp;
}
};

@ -1,13 +1,79 @@
#include <cxxtest/TestSuite.h> #include <cxxtest/TestSuite.h>
#include "ThreadPool.hpp" // #include <time.h> // time
#include "Task.hpp" #include "Task.hpp"
#include "Thread.hpp"
#include "ThreadPool.hpp"
#include "Common.hpp" #include "Common.hpp"
class TestThreadPoolSuite : public CxxTest::TestSuite class TestThreadPoolSuite : public CxxTest::TestSuite
{ {
class DummyTask : public Task
{
public:
DummyTask() { m_timeOut = 5; TRACE(this); }
void run()
{
TRACE(this);
m_startedToRun = time(NULL);
std::cout << "I'm a task..." << std::endl;
// other stuff
m_startedToRun = 0;
}
bool isItStucked () const
{
TRACE(this);
return ( m_startedToRun + m_timeOut < time(NULL) );
}
};
class WorkerThread : public Thread
{
public:
WorkerThread( ThreadPool& tp )
: m_tp(tp)
{
TRACE(this);
}
private:
void* run()
{
TRACE(this);
while ( m_isRunning )
{
Task* task(0);
try {
task = m_tp.popTask();
task->run();
delete task;
} catch (CancelledException) {
std::cout << "Now I die." << std::endl;
}
}
return 0;
}
ThreadPool& m_tp;
bool m_isRunning;
};
public: public:
void testBasic() void testBasic()
@ -15,9 +81,15 @@ public:
TRACE("testBasic begin"); TRACE("testBasic begin");
ThreadPool* tp = new ThreadPool(5); ThreadPool* tp = new ThreadPool(5);
Thread* wt1 = new WorkerThread(*tp);
// Thread* wt2 = new WorkerThread(*tp);
// Thread* wt3 = new WorkerThread(*tp);
tp->pushWorkerThread(wt1);
// tp->pushWorkerThread(wt2);
// tp->pushWorkerThread(wt3);
tp->startWorkerThreads(); tp->startWorkerThreads();
Task* t1 = new Task(); Task* t1 = new DummyTask();
tp->pushTask(t1); tp->pushTask(t1);
sleep(2); sleep(2);

@ -44,7 +44,6 @@
fun:calloc fun:calloc
fun:_dl_allocate_tls fun:_dl_allocate_tls
fun:pthread_create@@GLIBC_2.2.5 fun:pthread_create@@GLIBC_2.2.5
fun:_ZN6Thread5startEv
fun:_ZN10ThreadPool18startWorkerThreadsEv
fun:main
} }

Loading…
Cancel
Save