new classes: concurrentqueue, Task, Thread, ThreadPool, WorkerThread, and some tests. Trying to have cxxtest and cmake

master
Denes Matetelki 14 years ago
parent a39371125e
commit dccd38d8e7

@ -0,0 +1,5 @@
cmake_minimum_required (VERSION 2.6)
project (CPP_UTILS_LIB)
add_subdirectory (build)
add_subdirectory (test)

@ -0,0 +1,17 @@
cmake_minimum_required (VERSION 2.6)
project (CPP_UTILS_LIB)
message(STATUS "Lib dir:")
set (CXX_FLAGS "-Wall -Wextra -pedantic")
add_definitions( ${CXX_FLAGS} )
message(STATUS "g++ flags: ${CXX_FLAGS}")
include_directories (../include)
message(STATUS "include dir: ${CPP_UTILS_LIB}/include")
aux_source_directory(../src CPP_UTILS_LIB_SOURCES)
message(STATUS "sources: ${CPP_UTILS_LIB_SOURCES}")
add_library (CppUtils ${CPP_UTILS_LIB_SOURCES})
target_link_libraries(CppUtils pthread rt)

@ -2,8 +2,9 @@
#ifndef NOTRACE
#define TRACE(x) std::cout << x << " " << __PRETTY_FUNCTION__ << \
" @ " << __FILE__ << " : " << __LINE__ << std::endl
" : " << __LINE__ << std::endl
#else
/// @todo Get rid of the "warning: statement has no effect" compiler msgs
#define TRACE(x) ""
#endif
#endif

@ -0,0 +1,103 @@
#ifndef CONCURRENTQUEUE_HPP
#define CONCURRENTQUEUE_HPP
#include <queue>
#include "Mutex.hpp"
#include "ConditionVariable.hpp"
#include "ScopedLock.hpp"
#include "Common.hpp"
class CancelledException {};
template <typename T>
class ConcurrentQueue {
public:
ConcurrentQueue()
: m_cancelled(false)
, m_mutex()
, m_condition(&m_mutex)
{
TRACE(this);
}
~ConcurrentQueue()
{
TRACE(this);
}
void push(const T task)
{
TRACE(this);
ScopedLock sl(&m_mutex);
if (m_cancelled) throw CancelledException();
m_queue.push( task );
m_condition.signal();
}
bool tryPop(T &popped_value)
{
TRACE(this);
ScopedLock sl(&m_mutex);
if (m_cancelled) throw CancelledException();
if ( m_queue.empty() ) return false;
popped_value = m_queue.front();
m_queue.pop();
return true;
}
T waitAndPop()
{
TRACE(this);
ScopedLock sl(&m_mutex);
while ( m_queue.empty() and not m_cancelled) {
m_condition.wait();
}
if (m_cancelled) throw CancelledException();
T retVal = m_queue.front(); // cctor
m_queue.pop();
return retVal;
}
bool empty() const
{
TRACE(this);
ScopedLock sl(&m_mutex);
if (m_cancelled) throw CancelledException();
return m_queue.empty();
}
void cancel()
{
TRACE(this);
ScopedLock sl(&m_mutex);
m_cancelled = true;
m_condition.broadcast();
}
private:
ConcurrentQueue& operator=( const ConcurrentQueue& );
ConcurrentQueue( const ConcurrentQueue& );
std::queue<T> m_queue;
bool m_cancelled;
Mutex m_mutex;
ConditionVariable m_condition;
};
#endif // CONCURRENTQUEUE_HPP

@ -9,7 +9,7 @@ class ConditionVariable
public:
ConditionVariable(Mutex* m);
ConditionVariable(const Mutex* const mutex);
~ConditionVariable();
void wait(const int interval = 0);
@ -21,7 +21,7 @@ private:
ConditionVariable(const ConditionVariable&);
ConditionVariable& operator=(const ConditionVariable&);
Mutex* m_mutex;
const Mutex* const m_mutex;
pthread_cond_t m_condVar;
};

@ -13,7 +13,7 @@ public:
void lock();
void unlock();
bool tryLock(int interval = 0);
bool tryLock(const int interval = 0);
pthread_mutex_t* getPThreadMutex();

@ -1,14 +1,15 @@
#ifndef SCOPED_LOCK_HPP
#define SCOPED_LOCK_HPP
#ifndef SCOPEDLOCK_HPP
#define SCOPEDLOCK_HPP
#include "Mutex.hpp"
class ScopedLock
{
public:
ScopedLock(Mutex* m);
ScopedLock( const Mutex* const mutex );
~ScopedLock();
private:
@ -16,8 +17,9 @@ private:
ScopedLock(const ScopedLock&);
ScopedLock& operator=(const ScopedLock&);
Mutex* m_mutex;
const Mutex* const m_mutex;
};
#endif // SCOPED_LOCK_HPP
#endif // SCOPEDLOCK_HPP

@ -0,0 +1,55 @@
#ifndef SINGLETON_HPP
#define SINGLETON_HPP
#include "Common.hpp"
template<typename T>
class Singleton
{
protected:
Singleton() { TRACE("Simgleton::Singleton()"); }
virtual ~Singleton() { TRACE("Simgleton::~Singleton()"); }
private:
Singleton( const Singleton& );
Singleton& operator=( const Singleton& );
public:
static void createInstance()
{
TRACE("Simgleton::createInstance()");
if ( m_instance ) {
delete m_instance;
}
m_instance = new T();
}
static T* getInstance()
{
TRACE("Simgleton::getInstance()");
return m_instance;
}
static void destroy()
{
TRACE("Simgleton::destroy()");
if ( m_instance ) {
delete m_instance;
}
m_instance = 0;
}
private:
static T* m_instance;
};
template<class T> T* Singleton<T>::m_instance = 0;
#endif // SINGLETON_HPP

@ -0,0 +1,25 @@
#ifndef TASK_HPP
#define TASK_HPP
#include <time.h>
class Task
{
public:
Task();
void run ();
bool isItStucked () const;
private:
time_t m_startedToRun;
time_t m_timeOut;
};
#endif // TASK_HPP

@ -0,0 +1,30 @@
#ifndef THREAD_HPP
#define THREAD_HPP
#include <pthread.h>
class Thread
{
public:
Thread();
virtual ~Thread();
void start();
void* join() const;
void sendSignal( const int nSignal ) const;
private:
virtual void* run() = 0;
static void* threadStarter( void* pData );
private:
pthread_t m_nThread;
};
#endif // THREAD_HPP

@ -0,0 +1,42 @@
#ifndef THREADPOOL_HPP
#define THREADPOOL_HPP
#include <vector>
#include "ConcurrentQueue.hpp"
#include "WorkerThread.hpp"
#include "Task.hpp"
#include "Mutex.hpp"
class WorkerThread;
class ThreadPool
{
public:
ThreadPool( const int threadNum );
virtual ~ThreadPool();
void startWorkerThreads();
virtual void pushTask( Task* task );
Task* popTask();
void stop();
void join() const;
private:
ThreadPool( const ThreadPool& );
ThreadPool& operator=( const ThreadPool& );
int m_threadNum;
std::vector<WorkerThread*> m_threads;
ConcurrentQueue<Task*> m_tasks;
// Mutex m_mutex;
};
#endif // THREADPOOL_HPP */

@ -0,0 +1,26 @@
#ifndef WORKER_THREAD_HPP
#define WORKER_THREAD_HPP
#include "Thread.hpp"
#include "ThreadPool.hpp"
class ThreadPool;
class WorkerThread : public Thread
{
public:
WorkerThread( ThreadPool& tp );
void stop();
private:
void* run();
ThreadPool& m_tp;
bool m_isRunning;
};
#endif // WORKER_THREAD_HPP

@ -2,20 +2,22 @@
#include "Common.hpp"
#include <time.h>
#include <assert.h>
ConditionVariable::ConditionVariable(Mutex* m)
: m_mutex(m)
, m_condVar(PTHREAD_COND_INITIALIZER)
ConditionVariable::ConditionVariable(const Mutex* const mutex)
: m_mutex(mutex)
{
TRACE(this);
pthread_cond_init( &m_condVar, 0 );
int ret = pthread_cond_init( &m_condVar, 0 );
assert( ret == 0);
}
ConditionVariable::~ConditionVariable()
{
TRACE(this);
pthread_cond_destroy( &m_condVar );
int ret = pthread_cond_destroy( &m_condVar );
assert( ret == 0);
}
@ -23,7 +25,9 @@ void ConditionVariable::wait(const int interval)
{
TRACE(this);
if ( interval == 0 ) {
pthread_cond_wait( &m_condVar, m_mutex->getPThreadMutex() );
int ret = pthread_cond_wait( &m_condVar,
const_cast<Mutex*>(m_mutex)->getPThreadMutex() );
assert( ret == 0);
} else {
timespec abs_time;
clock_gettime ( CLOCK_REALTIME, &abs_time );
@ -32,18 +36,23 @@ void ConditionVariable::wait(const int interval)
abs_time.tv_nsec -= 1000000000;
abs_time.tv_sec += 1;
}
pthread_cond_timedwait( &m_condVar, m_mutex->getPThreadMutex(), &abs_time );
int ret = pthread_cond_timedwait( &m_condVar,
const_cast<Mutex*>(m_mutex)->getPThreadMutex(),
&abs_time );
assert( ret == 0);
}
}
void ConditionVariable::signal()
{
TRACE(this);
pthread_cond_signal( &m_condVar );
int ret = pthread_cond_signal( &m_condVar );
assert( ret == 0);
}
void ConditionVariable::broadcast()
{
TRACE(this);
pthread_cond_broadcast( &m_condVar );
int ret = pthread_cond_broadcast( &m_condVar );
assert( ret == 0);
}

@ -5,7 +5,7 @@
#include <time.h>
Mutex::Mutex(int kind) : m_mutex(PTHREAD_MUTEX_INITIALIZER)
Mutex::Mutex(int kind)
{
TRACE(this);
int ret;
@ -45,7 +45,7 @@ void Mutex::unlock()
}
bool Mutex::tryLock(int interval)
bool Mutex::tryLock(const int interval)
{
TRACE(this);
if ( interval == 0 ) {

@ -2,15 +2,15 @@
#include "Common.hpp"
ScopedLock::ScopedLock(Mutex* m) : m_mutex(m)
ScopedLock::ScopedLock( const Mutex* const mutex) : m_mutex(mutex)
{
TRACE(this);
m_mutex->lock();
const_cast<Mutex*>(m_mutex)->lock();
}
ScopedLock::~ScopedLock()
{
TRACE(this);
m_mutex->unlock();
const_cast<Mutex*>(m_mutex)->unlock();
}

@ -0,0 +1,30 @@
#include "Task.hpp"
#include <iostream>
#include "Common.hpp"
Task::Task() : m_timeOut(5)
{
TRACE(this);
}
void Task::run()
{
TRACE(this);
m_startedToRun = time(NULL);
std::cout << "I'm a task..." << std::endl;
// other stuff
m_startedToRun = 0;
}
bool Task::isItStucked () const
{
TRACE(this);
return ( m_startedToRun + m_timeOut < time(NULL) );
}

@ -0,0 +1,46 @@
#include "Thread.hpp"
#include "Common.hpp"
#include <signal.h>
#include <iostream>
Thread::Thread() : m_nThread( 0 )
{
TRACE(this);
}
Thread::~Thread()
{
TRACE(this);
}
void Thread::start()
{
TRACE(this);
pthread_create( &m_nThread, NULL, threadStarter, ( void* )this );
}
void* Thread::join() const
{
TRACE(this);
void* retVal;
pthread_join( m_nThread, &retVal );
return retVal;
}
void Thread::sendSignal( const int nSignal ) const
{
TRACE(this);
pthread_kill( m_nThread, nSignal );
}
void* Thread::threadStarter( void* pData )
{
TRACE("Thread::threadStarter");
return static_cast<Thread *>(pData)->run();
}

@ -0,0 +1,66 @@
#include "ThreadPool.hpp"
#include "Common.hpp"
ThreadPool::ThreadPool( const int threadNum ) : m_threadNum( threadNum )
{
TRACE(this);
}
ThreadPool::~ThreadPool()
{
TRACE(this);
std::vector<WorkerThread*>::iterator it;
for( it = m_threads.begin() ; it != m_threads.end(); it++ )
{
delete (*it);
}
m_threads.clear();
}
void ThreadPool::pushTask( Task* task )
{
TRACE(this);
m_tasks.push(task);
}
Task* ThreadPool::popTask()
{
TRACE(this);
return m_tasks.waitAndPop();
}
void ThreadPool::startWorkerThreads()
{
TRACE(this);
for( int i = 0; i<m_threadNum; i++ )
{
WorkerThread* t = new WorkerThread ( *this );
m_threads.push_back( t );
t->start();
}
}
void ThreadPool::stop()
{
TRACE(this);
std::vector<WorkerThread*>::iterator it;
for( it = m_threads.begin() ; it != m_threads.end(); it++ )
{
(*it)->stop();
}
m_tasks.cancel();
}
void ThreadPool::join() const
{
TRACE(this);
std::vector<WorkerThread*>::const_iterator it;
for( it = m_threads.begin() ; it != m_threads.end(); it++ )
{
(*it)->join();
}
}

@ -0,0 +1,40 @@
#include "WorkerThread.hpp"
#include "Task.hpp"
#include "Common.hpp"
WorkerThread::WorkerThread( ThreadPool& tp )
: m_tp(tp)
, m_isRunning(true)
{
TRACE(this);
}
void WorkerThread::stop()
{
TRACE(this);
m_isRunning = false;
}
void* WorkerThread::run()
{
TRACE(this);
while ( m_isRunning )
{
Task* task(0);
try {
task = m_tp.popTask();
task->run();
delete task;
} catch (CancelledException) {
std::cout << "Now I die." << std::endl;
}
}
return 0;
}

@ -0,0 +1,24 @@
cmake_minimum_required (VERSION 2.6)
project (CPP_UTILS_TEST)
message(STATUS "Test dir:")
set (CXX_FLAGS "-Wall -Wextra -pedantic")
add_definitions( ${CXX_FLAGS} )
message(STATUS "g++ flags: ${CXX_FLAGS}")
include_directories (../include)
message(STATUS "include dir: ${CPP_UTILS_LIB}/include")
add_executable (testThreadPool main_threadpool.cpp)
target_link_libraries (testThreadPool CppUtils)
find_package(CxxTest)
if(CXXTEST_FOUND)
set(CXXTEST_USE_PERL TRUE)
include_directories(${CXXTEST_INCLUDE_DIR})
enable_testing()
# CXXTEST_ADD_TEST(unittest_sos check_sos.cpp ${SOS_TEST_PATH}/check_sos.h)
# target_link_libraries(unittest_sos CppUtils)
endif()

@ -0,0 +1,110 @@
#include <iostream>
#include "../ConcurrentQueue.hpp"
#include "../Thread.hpp"
#define TRACE std::cout << __FILE__ << " @ " << __PRETTY_FUNCTION__ << ":" << __LINE__ << std::endl;
class WaitingThread : public Thread
{
public:
WaitingThread (ConcurrentQueue<int>& cq)
: m_isRunning( true ),
m_cq(cq)
{
TRACE
}
~WaitingThread()
{
TRACE
}
void stop ()
{
TRACE
m_isRunning = false;
}
private:
void* run ()
{
TRACE
while ( m_isRunning )
{
std::cout << "waiting..." << std::endl;
int retval = m_cq.waitAndPop();
std::cout << "waiting...ENDED! Got retval: " << retval << std::endl;
}
return 0;
}
bool m_isRunning;
ConcurrentQueue<int>& m_cq;
};
class fake_type
{
public:
fake_type(int i) : m_int(i)
{
TRACE
}
~fake_type()
{
TRACE
}
fake_type( const fake_type& m)
{
TRACE
m_int = m.m_int;
}
fake_type& operator=( const fake_type& m)
{
TRACE
m_int = m.m_int;
return *this;
}
private:
int m_int;
};
int main()
{
ConcurrentQueue<fake_type> cq1;
cq1.push(fake_type(7));
fake_type m = cq1.waitAndPop();
// other case
ConcurrentQueue<int> cq2;
cq2.push(7);
cq2.push(13);
WaitingThread* wt1 = new WaitingThread(cq2);
wt1->start();
sleep(2);
cq2.push(34);
sleep(5);
wt1->stop();
sleep(5);
return 0;
}

@ -0,0 +1,85 @@
#include <iostream>
#include <stdexcept>
#include <pthread.h>
pthread_mutex_t p_mutex;
#include <boost/thread/mutex.hpp>
boost::mutex b_mutex;
#define TRACE std::cout << __FILE__ << " @ " << __PRETTY_FUNCTION__ << ":" << __LINE__ << std::endl;
class ScopedLock
{
public:
ScopedLock(pthread_mutex_t const& mutex) : m_mutex(mutex)
{
TRACE;
pthread_mutex_lock( &m_mutex );
}
~ScopedLock(void )
{
TRACE;
pthread_mutex_unlock( &m_mutex );
}
private:
pthread_mutex_t m_mutex;
};
void fv ()
{
ScopedLock sl(p_mutex);
throw std::logic_error("p_thread stuff");
}
void fv2 ()
{
boost::mutex::scoped_lock lock(b_mutex);
throw std::logic_error("boost_thread stuff");
}
int main()
{
pthread_mutex_init( &p_mutex, NULL );
try {
fv();
} catch (...) {
TRACE;
if (pthread_mutex_trylock( &p_mutex) == 0 ) {
std::cout << "pthread mutex is OK, unlocked " << std::endl;
pthread_mutex_unlock( &p_mutex );
} else {
std::cout << "pthread mutex is STILL LOCKED!" << std::endl;
}
}
pthread_mutex_destroy( &p_mutex );
try {
fv2();
} catch (...) {
TRACE;
if ( b_mutex.try_lock() == true ) {
std::cout << "boost mutex is OK, unlocked " << std::endl;
b_mutex.unlock();
} else {
std::cout << "boost mutex is STILL LOCKED!" << std::endl;
}
}
return 0;
}

@ -0,0 +1,87 @@
#include <iostream>
#include <signal.h> // signal to thread
#include <stdlib.h> // malloc
// #include <string.h>
#include "Thread.hpp"
#include "Common.hpp"
class ThreadClass : public Thread
{
public:
~ThreadClass() { TRACE(this); }
ThreadClass() { TRACE(this); }
private:
void* run( void ) {
TRACE(this);
void *retval = malloc(sizeof(int));
*((int*)retval) = 14;
return retval;
return 0;
}
};
class ThreadClassWithSignal : public Thread
{
public:
~ThreadClassWithSignal() {
TRACE(this);
}
ThreadClassWithSignal() {
TRACE(this);
signal(SIGINT, signal_handler);
}
private:
void* run( void ) {
TRACE(this);
void *retval = malloc(sizeof(int));
*((int*)retval) = 14;
return retval;
return 0;
}
void static signal_handler(int sig)
{
TRACE("ThreadClassWithSignal::signal_handler");
if (sig==SIGINT) {
TRACE("signal_handler got SIGINT");
pthread_exit(0);
}
}
};
int main()
{
TRACE("main start");
ThreadClass *m = new ThreadClass;
m->start();
void *retVal = m->join();
std::cout << "got retVal: " << *((int*)retVal) << std::endl;
free(retVal);
delete m;
ThreadClass *m2 = new ThreadClass;
m2->start();
m2->sendSignal(SIGINT);
sleep(3);
std::cout << "after sendSignal the:" << m2 <<std::endl;
delete m2;
TRACE("main end");
return 0;
}

@ -0,0 +1,30 @@
#include <iostream>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Common.hpp"
int main()
{
TRACE("Main start");
ThreadPool* tp = new ThreadPool(5);
tp->startWorkerThreads();
Task* t1 = new Task();
tp->pushTask(t1);
sleep(2);
tp->stop();
tp->join();
delete tp;
TRACE("Main end");
return 0;
}
Loading…
Cancel
Save