Connection augments messageParam, not just passing on to Message. TcpClient stops if Polling ended (server crashed). Polling's loop's statusvariable is volatile now. New class: WorkerThread, MysqlTask. New end-user program: mysqlclient_tcpwrapper.cpp

master
Denes Matetelki 13 years ago
parent 524063f565
commit db3433c59f

@ -31,7 +31,7 @@ public:
, m_message(this, msgParam)
, m_buffer(0)
, m_bufferLength(bufferLength)
, m_msgParam(msgParam)
{
TRACE;
@ -50,6 +50,7 @@ public:
, m_message(this, msgParam)
, m_buffer(0)
, m_bufferLength(bufferLength)
, m_msgParam(msgParam)
{
TRACE;
m_socket.createSocket();
@ -135,6 +136,12 @@ public:
return m_port;
}
void* getMsgParam() const
{
TRACE;
return m_msgParam;
}
private:
@ -149,6 +156,7 @@ private:
unsigned char *m_buffer;
size_t m_bufferLength;
void *m_msgParam;
};

@ -17,7 +17,6 @@ public:
void create();
void clear();
private:

@ -71,6 +71,12 @@ public:
m_polling = false;
}
bool isPolling() const
{
TRACE;
return m_polling;
}
protected:
@ -95,7 +101,9 @@ protected:
append(connection->getHost()).append(":").
append(connection->getPort()).c_str() );
m_connectionPool[client_socket] = new Connection<T>(client_socket);
m_connectionPool[client_socket] = new Connection<T>(
client_socket,
m_connection->getMsgParam() );
addFd( client_socket, POLLIN | POLLPRI );
}
@ -165,7 +173,7 @@ private:
typedef typename std::map< int, Connection<T>* > ConnectionPool;
Connection<T> *m_connection;
bool m_polling;
volatile bool m_polling;
ConnectionPool m_connectionPool;
nfds_t m_maxclients;

@ -25,8 +25,6 @@ public:
{
std::lock_guard<std::mutex> guard(m_lock);
// this is now the critical section
if ( not m_instance ) // re-check pinstance
{
// Douglas Schmidt proposed volatile
@ -48,7 +46,9 @@ public:
private:
static std::mutex m_lock;
static T* m_instance;
// instance chack shall not be cached
static volatile T* m_instance;
};
template<class T> std::mutex Singleton_DCLP<T>::m_lock;

@ -8,16 +8,11 @@ class Task
public:
// Task() {};
virtual ~Task() {};
virtual void run () = 0;
virtual bool isItStucked () const = 0;
protected:
// time_t m_startedToRun;
// time_t m_timeOut;
};

@ -115,6 +115,12 @@ public:
return m_connection.send(msg, msgLen);
}
bool isPolling() const
{
TRACE;
return m_watcher.isPolling();
}
private:
TcpClient(const TcpClient& );

@ -17,9 +17,10 @@ public:
TcpServer ( const std::string host,
const std::string port,
void *msgParam = 0,
const int maxClients = 5,
const int maxPendingQueueLen = 10 )
: m_connection(host, port)
: m_connection(host, port, msgParam)
, m_poll( &m_connection, maxClients)
, m_maxPendingQueueLen(maxPendingQueueLen)
{

@ -0,0 +1,23 @@
#ifndef WORKER_THREAD_HPP
#define WORKER_THREAD_HPP
#include "Thread.hpp"
#include "ThreadPool.hpp"
class WorkerThread : public Thread
{
public:
WorkerThread( ThreadPool& tp );
private:
void* run();
ThreadPool& m_tp;
};
#endif // WORKER_THREAD_HPP

@ -0,0 +1,45 @@
#include "EchoMessage.hpp"
#include "Logger.hpp"
#include "MysqlTask.hpp"
EchoMessage::EchoMessage( Connection<EchoMessage> *connection,
void *msgParam )
: Message<EchoMessage>(connection, msgParam)
{
TRACE;
}
bool EchoMessage::buildMessage( const void *msgPart,
const size_t msgLen )
{
TRACE;
m_buffer = std::string( (const char*) msgPart, msgLen );
onMessageReady();
return true;
}
void EchoMessage::onMessageReady()
{
TRACE;
LOG( Logger::INFO, std::string("Got message: \"").
append(m_buffer).append("\" from: ").
append(m_connection->getHost().append(":").
append(m_connection->getPort()) ).c_str() );
MsgParam *msgParam = static_cast<MsgParam*>(m_param);
msgParam->m_tp->pushTask(new MysqlTask( msgParam->m_cp,
m_connection,
m_buffer ));
}
size_t EchoMessage::getExpectedLength()
{
TRACE;
return 0;
}

@ -0,0 +1,35 @@
#ifndef ECHO_MESSAGE_HPP
#define ECHO_MESSAGE_HPP
#include "Message.hpp"
#include "Connection.hpp"
#include "ThreadPool.hpp"
#include "MysqlConnectionPool.hpp"
struct MsgParam
{
MysqlConnectionPool *m_cp;
ThreadPool *m_tp;
MsgParam(MysqlConnectionPool *cp, ThreadPool *tp) : m_cp(cp), m_tp(tp) {};
};
class EchoMessage : public Message<EchoMessage>
{
public:
EchoMessage( Connection<EchoMessage> *connection,
void *msgParam = 0);
bool buildMessage( const void *msgPart,
const size_t msgLen );
void onMessageReady();
protected:
size_t getExpectedLength();
};
#endif // ECHO_MESSAGE_HPP

@ -0,0 +1,42 @@
#include "MysqlTask.hpp"
#include <mysql/mysql.h>
MysqlTask::MysqlTask( MysqlConnectionPool *cp,
Connection<EchoMessage> *connection,
const std::string message )
: m_connectionPool(cp)
, m_connection(connection)
, m_message(message)
{
TRACE;
}
void MysqlTask::run()
{
TRACE;
LOG( Logger::FINEST, std::string("I'm a task, provessing message: \"").
append(m_message).append("\"").c_str() );
MYSQL_RES *res_set(0);
MysqlClient *c = m_connectionPool->acquire();
if ( !c->querty(m_message.c_str(), m_message.length(), &res_set) ) {
std::string errorMsg("Could not execute query.");
LOG ( Logger::ERR, errorMsg.c_str() );
m_connection->send(errorMsg.c_str(), errorMsg.length() );
} else {
std::list<std::string> results;
MysqlClient::queryResultToStringList(res_set, results);
std::string joinedLines;
std::list<std::string>::const_iterator it;
for(it = results.begin(); it != results.end(); ++it)
joinedLines.append(*it).append(";");
m_connection->send(joinedLines.c_str(), joinedLines.length() );
}
m_connectionPool->release(c);
}

@ -0,0 +1,32 @@
#ifndef MYSQL_TASK_HPP
#define MYSQL_TASK_HPP
#include "Task.hpp"
#include "MysqlConnectionPool.hpp"
#include "Connection.hpp"
#include "EchoMessage.hpp"
class MysqlTask : public Task
{
public:
MysqlTask( MysqlConnectionPool *cp,
Connection<EchoMessage> *connection,
const std::string message );
void run();
private:
MysqlTask(const MysqlTask&);
MysqlTask& operator=(const MysqlTask&);
MysqlConnectionPool *m_connectionPool;
Connection<EchoMessage> *m_connection;
std::string m_message;
};
#endif // MYSQL_TASK_HPP

@ -8,8 +8,6 @@
#include "MysqlClient.hpp"
#include "MysqlConnectionPool.hpp"
#include <mysql/errmsg.h>
#include <string>
#include <list>
#include <iostream>

@ -0,0 +1,223 @@
// g++ mysqlclient_main.cpp src/Logger.cpp src/MysqlClient.cpp src/ArgParse.cpp -I./include -lmysqlclient
#include "Logger.hpp"
#include "Common.hpp"
#include "ArgParse.hpp"
#include "MysqlClient.hpp"
#include "MysqlConnectionPool.hpp"
#include "TcpServer.hpp"
#include "Message.hpp"
#include "ThreadPool.hpp"
#include "WorkerThread.hpp"
#include "EchoMessage.hpp"
#include "MysqlTask.hpp"
#include <mysql/errmsg.h>
#include <string>
#include <list>
#include <iostream>
#include <stdexcept>
void setUpArgs(ArgParse &argParse)
{
TRACE_STATIC;
argParse.addArgument("--host",
"MySQL server hostname/IP",
ArgParse::STRING,
ArgParse::REQUIRED );
argParse.addArgument("-u, --user",
"MsSQL username",
ArgParse::STRING,
ArgParse::REQUIRED );
argParse.addArgument("-db, --database",
"MySQL database",
ArgParse::STRING,
ArgParse::REQUIRED );
argParse.addArgument("-p, --password",
"MySQL password",
ArgParse::STRING,
ArgParse::REQUIRED );
argParse.addArgument("-n, --number-of-connections",
"MySQL connections in connection pool. Default is 5",
ArgParse::INT );
argParse.addArgument("--port",
"Listening port. Default is 4455",
ArgParse::INT );
argParse.addArgument("-cl, --clients",
"Maximum number of served clients. Default is 5.",
ArgParse::INT );
argParse.addArgument("--pending",
"Maximum number of pending clients. Default is 5.",
ArgParse::INT );
argParse.addArgument("-t, --worker-threads",
"Number of worker threads. Default is 5.",
ArgParse::INT );
}
void getArgs( int argc, char* argv[],
ArgParse &argParse,
std::string &host,
std::string &user,
std::string &db,
std::string &pass,
int &conns,
int &port,
int &clients,
int &pending,
int &threads
)
{
TRACE_STATIC;
argParse.parseArgs(argc, argv);
argParse.argAsString("--host", host);
argParse.argAsString("-u, --user", user);
argParse.argAsString("-db, --database", db);
argParse.argAsString("-p, --password", pass);
argParse.argAsInt("-n, --number-of-connections", conns);
argParse.argAsInt("--port", port);
argParse.argAsInt("-cl, --clients", clients);
argParse.argAsInt("--pending", pending);
argParse.argAsInt("-t, --worker-threads", threads);
}
bool checkArgs( int argc, char* argv[],
ArgParse &argParse,
std::string &host,
std::string &user,
std::string &db,
std::string &pass,
int &conns,
int &port,
int &clients,
int &pending,
int &threads
)
{
TRACE_STATIC;
try {
getArgs( argc, argv,
argParse,
host, user, db, pass,
conns, port, clients, pending, threads );
} catch (std::runtime_error e) {
if ( argParse.foundArg("-h, --help") ) {
std::cout << argParse.usage() << std::endl;
return false;
}
std::cerr << e.what() << std::endl
<< "Check usage: " << argv[0] << " --help" << std::endl;
return false;
}
if ( argParse.foundArg("-h, --help") ) {
std::cout << argParse.usage() << std::endl;
return false;
}
return true;
}
void printResults(std::list<std::string> &results)
{
TRACE_STATIC;
LOG ( Logger::DEBUG, std::string("Got query result number of rows: ").
append(TToStr(results.size())).c_str() );
for (std::list<std::string>::const_iterator it = results.begin();
it != results.end();
++it ) {
LOG ( Logger::DEBUG, (*it).c_str() );
}
}
int main(int argc, char* argv[] )
{
Logger::createInstance();
Logger::init(std::cout);
Logger::setLogLevel(Logger::FINEST);
// args
ArgParse argParse("TCP server wrapper on a MySQL client",
"Report bugs to: denes.matetelki@gmail.com");
setUpArgs(argParse);
std::string host, user, db, pass;
int conns(5), port(4455), clients(5), pending(5), threads(5);
if ( !checkArgs(argc, argv, argParse,
host, user, db, pass,
conns, port, clients, pending, threads ) )
return 1;
// init MySQL connection pool
init_client_errs();
MysqlConnectionPool mysqlConnectionPool (
argParse.foundArg("--host") ? host.c_str() : NULL,
argParse.foundArg("-u, --user") ? user.c_str() : NULL,
argParse.foundArg("-p, --password") ? pass.c_str() : NULL,
argParse.foundArg("-db, --database") ? db .c_str() : NULL );
for ( int i = 0; i < conns; ++i )
mysqlConnectionPool.create();
// threadpool
ThreadPool threadPool;
for ( int i = 0; i < threads; ++i )
threadPool.pushWorkerThread(new WorkerThread(threadPool));
threadPool.startWorkerThreads();
// TCP server
MsgParam msgParam(&mysqlConnectionPool, &threadPool);
TcpServer<EchoMessage> tcpServer(std::string("127.0.0.1"),
TToStr(port),
&msgParam,
clients,
pending );
if ( !tcpServer.start() ) {
LOG( Logger::ERR, "Failed to start TCP server, exiting...");
mysqlConnectionPool.clear();
finish_client_errs();
Logger::destroy();
return 1;
}
// never reached
sleep(1);
tcpServer.stop();
// end
mysqlConnectionPool.clear();
finish_client_errs();
Logger::destroy();
return 0;
}

@ -64,7 +64,7 @@ int main(int argc, char* argv[] )
Logger::createInstance();
Logger::init(std::cout);
Logger::setLogLevel(Logger::DEBUG);
Logger::setLogLevel(Logger::FINEST);
bool finished = false;
@ -89,7 +89,7 @@ int main(int argc, char* argv[] )
// wait for the complate &handled reply
struct timespec tm = {0,1000};
while ( !finished )
while ( !finished && tcpclient.isPolling() )
nanosleep(&tm, &tm) ;
tcpclient.disconnect();

@ -0,0 +1,28 @@
#include "WorkerThread.hpp"
#include "Logger.hpp"
WorkerThread::WorkerThread( ThreadPool& tp )
: m_tp(tp)
{
TRACE;
}
void* WorkerThread::run()
{
TRACE;
while ( m_isRunning )
{
Task* task(0);
try {
task = m_tp.popTask();
task->run();
delete task;
} catch (CancelledException) {
LOG( Logger::FINEST, "Now I die.");
}
}
return 0;
}
Loading…
Cancel
Save