ObjectPool, MysqlConnectionPool fixes

master
Denes Matetelki 13 years ago
parent 9ea53b322c
commit c1fbd3bf9b

@ -2,6 +2,7 @@
#define CONCURRENTQUEUE_HPP #define CONCURRENTQUEUE_HPP
#include <queue> #include <queue>
#include <algorithm>
#include "Mutex.hpp" #include "Mutex.hpp"
#include "ConditionVariable.hpp" #include "ConditionVariable.hpp"
@ -14,91 +15,90 @@ class CancelledException {};
template <typename T> template <typename T>
class ConcurrentQueue { class ConcurrentQueue
{
public: public:
ConcurrentQueue()
ConcurrentQueue() : m_queue()
: m_queue() , m_cancelled(false)
, m_cancelled(false) , m_mutex()
, m_mutex() , m_condVar(m_mutex)
, m_condVar(m_mutex) {
{ TRACE;
TRACE; }
}
~ConcurrentQueue()
~ConcurrentQueue() {
{ TRACE;
TRACE; }
}
void push(const T value)
void push(const T value) {
{ TRACE;
TRACE; ScopedLock sl(m_mutex);
ScopedLock sl(m_mutex); if (m_cancelled) throw CancelledException();
if (m_cancelled) throw CancelledException(); m_queue.push( value );
m_queue.push( value ); m_condVar.signal();
m_condVar.signal(); }
}
bool tryPop(T &popped_value)
bool tryPop(T &popped_value) {
{ TRACE;
TRACE; ScopedLock sl(m_mutex);
ScopedLock sl(m_mutex); if (m_cancelled) throw CancelledException();
if (m_cancelled) throw CancelledException(); if ( m_queue.empty() ) return false;
if ( m_queue.empty() ) return false;
popped_value = m_queue.front();
popped_value = m_queue.front(); m_queue.pop();
m_queue.pop(); return true;
return true; }
T waitAndPop()
{
TRACE;
ScopedLock sl(m_mutex);
while ( m_queue.empty() and not m_cancelled) {
m_condVar.wait();
} }
if (m_cancelled) throw CancelledException();
T retVal = m_queue.front(); // cctor
m_queue.pop();
return retVal;
}
T waitAndPop()
{
TRACE;
ScopedLock sl(m_mutex);
while ( m_queue.empty() and not m_cancelled) { bool empty() const
m_condVar.wait(); {
} TRACE;
if (m_cancelled) throw CancelledException(); ScopedLock sl(m_mutex);
if (m_cancelled) throw CancelledException();
T retVal = m_queue.front(); // cctor return m_queue.empty();
m_queue.pop(); }
return retVal;
}
bool empty() const void cancel()
{ {
TRACE; TRACE;
ScopedLock sl(m_mutex); ScopedLock sl(m_mutex);
if (m_cancelled) throw CancelledException(); m_cancelled = true;
return m_queue.empty(); m_condVar.broadcast();
} }
void cancel()
{
TRACE;
ScopedLock sl(m_mutex);
m_cancelled = true;
m_condVar.broadcast();
}
private: private:
ConcurrentQueue& operator=( const ConcurrentQueue& ); ConcurrentQueue& operator=( const ConcurrentQueue& );
ConcurrentQueue( const ConcurrentQueue& ); ConcurrentQueue( const ConcurrentQueue& );
std::queue<T> m_queue; std::queue<T> m_queue;
bool m_cancelled; bool m_cancelled;
mutable Mutex m_mutex; mutable Mutex m_mutex;
ConditionVariable m_condVar; ConditionVariable m_condVar;
}; };

@ -9,18 +9,27 @@ class MysqlConnectionPool : public ObjectPool<MysqlClient *>
{ {
public: public:
MysqlConnectionPool(); MysqlConnectionPool( const char *host = NULL,
~MysqlConnectionPool();
MysqlClient* create( const char *host = NULL,
const char *user = NULL, const char *user = NULL,
const char *passwd = NULL, const char *passwd = NULL,
const char *db = NULL, const char *db = NULL );
unsigned int port = 0, ~MysqlConnectionPool();
const char *unix_socket = NULL,
unsigned long clientflag = 0 ); void create();
/// @note Shall this be a specialized ObjectPool::clear?
void clear();
private:
MysqlConnectionPool(const MysqlConnectionPool&);
MysqlConnectionPool& operator=(const MysqlConnectionPool&);
const char *m_host;
const char *m_user;
const char *m_passwd;
const char *m_db;
bool reset(const MysqlClient* client);
}; };

@ -2,28 +2,46 @@
#define OBJECT_POOL_HPP #define OBJECT_POOL_HPP
#include "ConcurrentQueue.hpp" #include "ConcurrentQueue.hpp"
#include "Logger.hpp"
template <typename T> template <typename T>
class ObjectPool class ObjectPool
{ {
public: public:
ObjectPool(); ObjectPool() : m_pool()
virtual ~ObjectPool(); {
TRACE;
}
void add(const T object); virtual ~ObjectPool()
void remove(const T object); {
void clear(); TRACE;
}
T get();
virtual void reset(const T object) = 0; T acquire()
void release(const T object); {
TRACE;
return m_pool.waitAndPop();
}
void release(const T object)
{
TRACE;
m_pool.push(object);
}
bool empty() const
{
TRACE;
return m_pool.empty();
}
private: private:
ConcurrentQueue<T> m_pool; ConcurrentQueue<T> m_pool;
}; };
#endif // OBJECT_POOL_HPP #endif // OBJECT_POOL_HPP

@ -6,6 +6,8 @@
#include "ArgParse.hpp" #include "ArgParse.hpp"
#include "MysqlClient.hpp" #include "MysqlClient.hpp"
#include "MysqlConnectionPool.hpp"
#include <mysql/errmsg.h> #include <mysql/errmsg.h>
#include <string> #include <string>
@ -15,9 +17,12 @@
void setUpArgs(ArgParse &argParse) void setUpArgs(ArgParse &argParse)
{ {
TRACE_STATIC;
argParse.addArgument("--host", argParse.addArgument("--host",
"Hostname/IP", "Hostname/IP",
ArgParse::STRING ); ArgParse::STRING,
ArgParse::REQUIRED );
argParse.addArgument("-u, --user", argParse.addArgument("-u, --user",
"Username", "Username",
ArgParse::STRING, ArgParse::STRING,
@ -30,14 +35,9 @@ void setUpArgs(ArgParse &argParse)
"Password", "Password",
ArgParse::STRING, ArgParse::STRING,
ArgParse::REQUIRED ); ArgParse::REQUIRED );
argParse.addArgument("-port",
"Port", argParse.addArgument("-n, --number-of-connections",
ArgParse::INT ); "Number of connections. Default is 5",
argParse.addArgument("-s, --unix-socket",
"Unix socket",
ArgParse::STRING );
argParse.addArgument("-f, --client-flags",
"Client flags",
ArgParse::INT ); ArgParse::INT );
} }
@ -48,24 +48,59 @@ void getArgs( int argc, char* argv[],
std::string &user, std::string &user,
std::string &db, std::string &db,
std::string &pass, std::string &pass,
std::string &unixsocket, int &numberOfConnections )
int &port,
int &clientflags )
{ {
TRACE_STATIC;
argParse.parseArgs(argc, argv); argParse.parseArgs(argc, argv);
argParse.argAsString("--host", host); argParse.argAsString("--host", host);
argParse.argAsString("-u, --user", user); argParse.argAsString("-u, --user", user);
argParse.argAsString("-db, --database", db); argParse.argAsString("-db, --database", db);
argParse.argAsString("-p, --password", pass); argParse.argAsString("-p, --password", pass);
argParse.argAsInt("-port", port);
argParse.argAsString("-s, --unix-socket", unixsocket); argParse.argAsInt("-n, --number-of-connections", numberOfConnections);
argParse.argAsInt("-f, --client-flags", clientflags); }
bool checkArgs( int argc, char* argv[],
ArgParse &argParse,
std::string &host,
std::string &user,
std::string &db,
std::string &pass,
int &numberOfConnections )
{
TRACE_STATIC;
try {
getArgs( argc, argv,
argParse,
host, user, db, pass,
numberOfConnections );
} catch (std::runtime_error e) {
if ( argParse.foundArg("-h, --help") ) {
std::cout << argParse.usage() << std::endl;
return false;
}
std::cerr << e.what() << std::endl
<< "Check usage: " << argv[0] << " --help" << std::endl;
return false;
}
if ( argParse.foundArg("-h, --help") ) {
std::cout << argParse.usage() << std::endl;
return false;
}
return true;
} }
void printResults(std::list<std::string> &results) void printResults(std::list<std::string> &results)
{ {
TRACE_STATIC;
LOG ( Logger::DEBUG, std::string("Got query result number of rows: "). LOG ( Logger::DEBUG, std::string("Got query result number of rows: ").
append(TToStr(results.size())).c_str() ); append(TToStr(results.size())).c_str() );
@ -83,55 +118,44 @@ int main(int argc, char* argv[] )
Logger::init(std::cout); Logger::init(std::cout);
Logger::setLogLevel(Logger::FINEST); Logger::setLogLevel(Logger::FINEST);
// args
ArgParse argParse("Simple MySQL client", ArgParse argParse("Simple MySQL client",
"Report bugs to: denes.matetelki@gmail.com"); "Report bugs to: denes.matetelki@gmail.com");
setUpArgs(argParse); setUpArgs(argParse);
std::string host, user, db, pass, unixsocket; std::string host, user, db, pass;
int port, clientflags; int numberOfConnections(5);
try {
getArgs( argc, argv,
argParse,
host, user, db, pass, unixsocket,
port, clientflags );
} catch (std::runtime_error e) {
if ( argParse.foundArg("-h, --help") ) {
std::cout << argParse.usage() << std::endl;
return 1;
}
std::cerr << e.what() << std::endl
<< "Check usage: " << argv[0] << " --help" << std::endl;
return 1;
}
if ( argParse.foundArg("-h, --help") ) { if ( !checkArgs(argc, argv, argParse,
std::cout << argParse.usage() << std::endl; host, user, db, pass, numberOfConnections ) )
return 1; return 1;
}
// init
init_client_errs(); init_client_errs();
MysqlConnectionPool cp (
argParse.foundArg("--host") ? host.c_str() : NULL,
argParse.foundArg("-u, --user") ? user.c_str() : NULL,
argParse.foundArg("-p, --password") ? pass.c_str() : NULL,
argParse.foundArg("-db, --database") ? db .c_str() : NULL );
for ( int i = 0; i < numberOfConnections; ++i )
cp.create();
MysqlClient mysqlClient (
argParse.foundArg("--host") ? host.c_str() : NULL,
argParse.foundArg("-u, --user") ? user.c_str() : NULL,
argParse.foundArg("-p, --password") ? pass.c_str() : NULL,
argParse.foundArg("-db, --database") ? db .c_str() : NULL,
argParse.foundArg("-port") ? port : 0,
argParse.foundArg("-s, --unix-socket") ? unixsocket.c_str() : NULL,
argParse.foundArg("-f, --client-flags") ? clientflags : 0 );
// work
std::list<std::string> results; std::list<std::string> results;
if ( !mysqlClient.querty("SELECT * FROM seats", results) ) { MysqlClient *c = cp.acquire();
if ( !c->querty("SELECT * FROM seats", results) ) {
LOG ( Logger::ERR, "Could not execute query." ); LOG ( Logger::ERR, "Could not execute query." );
} else { } else {
printResults(results); printResults(results);
} }
cp.release(c);
// end
cp.clear();
finish_client_errs(); finish_client_errs();
Logger::destroy(); Logger::destroy();
return 0; return 0;
} }

@ -3,45 +3,36 @@
#include "Logger.hpp" #include "Logger.hpp"
MysqlConnectionPool::MysqlConnectionPool() MysqlConnectionPool::MysqlConnectionPool( const char *host,
const char *user,
const char *passwd,
const char *db )
: m_host(host)
, m_user(user)
, m_passwd(passwd)
, m_db(db)
{ {
TRACE; TRACE;
} }
MysqlConnectionPool::~MysqlConnectionPool() MysqlConnectionPool::~MysqlConnectionPool()
{ {
TRACE; TRACE;
} }
MysqlClient* MysqlConnectionPool::create( const char* host, void MysqlConnectionPool::create()
const char* user,
const char* passwd,
const char* db,
unsigned int port,
const char* unix_socket,
long unsigned int clientflag )
{ {
TRACE; TRACE;
MysqlClient *client = new MysqlClient(host, MysqlClient *client = new MysqlClient ( m_host, m_user, m_passwd, m_db );
user, client->connect();
passwd, release(client);
db,
port,
unix_socket,
clientflag);
return client;
} }
void MysqlConnectionPool::clear()
bool MysqlConnectionPool::reset(const MysqlClient* client)
{ {
TRACE; TRACE;
while ( !empty() )
// The MysqlClient is stateless delete acquire();
}
return true;
}

@ -1,63 +0,0 @@
#include "ObjectPool.hpp"
#include "Logger.hpp"
template <typename T>
ObjectPool<T>::ObjectPool()
: m_pool()
{
TRACE;
}
template <typename T>
ObjectPool<T>::~ObjectPool()
{
TRACE;
}
template <typename T>
void ObjectPool<T>::add(const T object)
{
TRACE;
m_pool.push(object);
}
template <typename T>
void ObjectPool<T>::remove(const T object)
{
TRACE;
// m_pool.tryPop(object);
}
template <typename T>
void ObjectPool<T>::clear()
{
TRACE;
// while ( !m_pool.empty() )
// m_pool.
}
template <typename T>
T ObjectPool<T>::get()
{
TRACE;
return m_pool.waitAndPop();
}
template <typename T>
void ObjectPool<T>::release(const T object)
{
TRACE;
m_pool.push(object);
}
Loading…
Cancel
Save