diff --git a/include/Connection.hpp b/include/Connection.hpp index 718291d..6b9448b 100644 --- a/include/Connection.hpp +++ b/include/Connection.hpp @@ -31,7 +31,7 @@ public: , m_message(this, msgParam) , m_buffer(0) , m_bufferLength(bufferLength) - + , m_msgParam(msgParam) { TRACE; @@ -50,6 +50,7 @@ public: , m_message(this, msgParam) , m_buffer(0) , m_bufferLength(bufferLength) + , m_msgParam(msgParam) { TRACE; m_socket.createSocket(); @@ -135,6 +136,12 @@ public: return m_port; } + void* getMsgParam() const + { + TRACE; + return m_msgParam; + } + private: @@ -149,6 +156,7 @@ private: unsigned char *m_buffer; size_t m_bufferLength; + void *m_msgParam; }; diff --git a/include/MysqlConnectionPool.hpp b/include/MysqlConnectionPool.hpp index 668b6ac..9867d77 100644 --- a/include/MysqlConnectionPool.hpp +++ b/include/MysqlConnectionPool.hpp @@ -17,7 +17,6 @@ public: void create(); - void clear(); private: diff --git a/include/Poll.hpp b/include/Poll.hpp index 5731b4c..4aa7d8a 100644 --- a/include/Poll.hpp +++ b/include/Poll.hpp @@ -71,6 +71,12 @@ public: m_polling = false; } + bool isPolling() const + { + TRACE; + return m_polling; + } + protected: @@ -95,7 +101,9 @@ protected: append(connection->getHost()).append(":"). append(connection->getPort()).c_str() ); - m_connectionPool[client_socket] = new Connection(client_socket); + m_connectionPool[client_socket] = new Connection( + client_socket, + m_connection->getMsgParam() ); addFd( client_socket, POLLIN | POLLPRI ); } @@ -165,7 +173,7 @@ private: typedef typename std::map< int, Connection* > ConnectionPool; Connection *m_connection; - bool m_polling; + volatile bool m_polling; ConnectionPool m_connectionPool; nfds_t m_maxclients; diff --git a/include/Singleton_DCLP.hpp b/include/Singleton_DCLP.hpp index 55da169..aa9f529 100644 --- a/include/Singleton_DCLP.hpp +++ b/include/Singleton_DCLP.hpp @@ -25,8 +25,6 @@ public: { std::lock_guard guard(m_lock); - // this is now the critical section - if ( not m_instance ) // re-check pinstance { // Douglas Schmidt proposed volatile @@ -48,7 +46,9 @@ public: private: static std::mutex m_lock; - static T* m_instance; + + // instance chack shall not be cached + static volatile T* m_instance; }; template std::mutex Singleton_DCLP::m_lock; diff --git a/include/Task.hpp b/include/Task.hpp index 5eef25e..4946a70 100644 --- a/include/Task.hpp +++ b/include/Task.hpp @@ -8,16 +8,11 @@ class Task public: -// Task() {}; virtual ~Task() {}; - virtual void run () = 0; - virtual bool isItStucked () const = 0; protected: -// time_t m_startedToRun; -// time_t m_timeOut; }; diff --git a/include/TcpClient.hpp b/include/TcpClient.hpp index 4771044..4e789f3 100644 --- a/include/TcpClient.hpp +++ b/include/TcpClient.hpp @@ -115,6 +115,12 @@ public: return m_connection.send(msg, msgLen); } + bool isPolling() const + { + TRACE; + return m_watcher.isPolling(); + } + private: TcpClient(const TcpClient& ); diff --git a/include/TcpServer.hpp b/include/TcpServer.hpp index 685d5dc..f1099cf 100644 --- a/include/TcpServer.hpp +++ b/include/TcpServer.hpp @@ -15,11 +15,12 @@ class TcpServer { public: - TcpServer ( const std::string host, - const std::string port, - const int maxClients = 5, - const int maxPendingQueueLen = 10 ) - : m_connection(host, port) + TcpServer ( const std::string host, + const std::string port, + void *msgParam = 0, + const int maxClients = 5, + const int maxPendingQueueLen = 10 ) + : m_connection(host, port, msgParam) , m_poll( &m_connection, maxClients) , m_maxPendingQueueLen(maxPendingQueueLen) { diff --git a/include/WorkerThread.hpp b/include/WorkerThread.hpp new file mode 100644 index 0000000..cd609be --- /dev/null +++ b/include/WorkerThread.hpp @@ -0,0 +1,23 @@ +#ifndef WORKER_THREAD_HPP +#define WORKER_THREAD_HPP + +#include "Thread.hpp" +#include "ThreadPool.hpp" + + +class WorkerThread : public Thread +{ + +public: + + WorkerThread( ThreadPool& tp ); + +private: + + void* run(); + + ThreadPool& m_tp; +}; + + +#endif // WORKER_THREAD_HPP diff --git a/other/EchoMessage.cpp b/other/EchoMessage.cpp new file mode 100644 index 0000000..d70f100 --- /dev/null +++ b/other/EchoMessage.cpp @@ -0,0 +1,45 @@ +#include "EchoMessage.hpp" + +#include "Logger.hpp" + +#include "MysqlTask.hpp" + + +EchoMessage::EchoMessage( Connection *connection, + void *msgParam ) + : Message(connection, msgParam) +{ + TRACE; +} + +bool EchoMessage::buildMessage( const void *msgPart, + const size_t msgLen ) +{ + TRACE; + m_buffer = std::string( (const char*) msgPart, msgLen ); + onMessageReady(); + return true; +} + +void EchoMessage::onMessageReady() +{ + TRACE; + + LOG( Logger::INFO, std::string("Got message: \""). + append(m_buffer).append("\" from: "). + append(m_connection->getHost().append(":"). + append(m_connection->getPort()) ).c_str() ); + + + MsgParam *msgParam = static_cast(m_param); + msgParam->m_tp->pushTask(new MysqlTask( msgParam->m_cp, + m_connection, + m_buffer )); +} + + +size_t EchoMessage::getExpectedLength() +{ + TRACE; + return 0; +} diff --git a/other/EchoMessage.hpp b/other/EchoMessage.hpp new file mode 100644 index 0000000..7d3e861 --- /dev/null +++ b/other/EchoMessage.hpp @@ -0,0 +1,35 @@ +#ifndef ECHO_MESSAGE_HPP +#define ECHO_MESSAGE_HPP + +#include "Message.hpp" +#include "Connection.hpp" +#include "ThreadPool.hpp" +#include "MysqlConnectionPool.hpp" + +struct MsgParam +{ + MysqlConnectionPool *m_cp; + ThreadPool *m_tp; + MsgParam(MysqlConnectionPool *cp, ThreadPool *tp) : m_cp(cp), m_tp(tp) {}; +}; + + +class EchoMessage : public Message +{ +public: + + EchoMessage( Connection *connection, + void *msgParam = 0); + + bool buildMessage( const void *msgPart, + const size_t msgLen ); + + void onMessageReady(); + +protected: + + size_t getExpectedLength(); + +}; + +#endif // ECHO_MESSAGE_HPP diff --git a/other/MysqlTask.cpp b/other/MysqlTask.cpp new file mode 100644 index 0000000..c370d3f --- /dev/null +++ b/other/MysqlTask.cpp @@ -0,0 +1,42 @@ +#include "MysqlTask.hpp" + +#include + + +MysqlTask::MysqlTask( MysqlConnectionPool *cp, + Connection *connection, + const std::string message ) + : m_connectionPool(cp) + , m_connection(connection) + , m_message(message) +{ + TRACE; +} + +void MysqlTask::run() +{ + TRACE; + LOG( Logger::FINEST, std::string("I'm a task, provessing message: \""). + append(m_message).append("\"").c_str() ); + + MYSQL_RES *res_set(0); + MysqlClient *c = m_connectionPool->acquire(); + if ( !c->querty(m_message.c_str(), m_message.length(), &res_set) ) { + + std::string errorMsg("Could not execute query."); + LOG ( Logger::ERR, errorMsg.c_str() ); + m_connection->send(errorMsg.c_str(), errorMsg.length() ); + } else { + + std::list results; + MysqlClient::queryResultToStringList(res_set, results); + + std::string joinedLines; + std::list::const_iterator it; + for(it = results.begin(); it != results.end(); ++it) + joinedLines.append(*it).append(";"); + + m_connection->send(joinedLines.c_str(), joinedLines.length() ); + } + m_connectionPool->release(c); +} diff --git a/other/MysqlTask.hpp b/other/MysqlTask.hpp new file mode 100644 index 0000000..fbc0daf --- /dev/null +++ b/other/MysqlTask.hpp @@ -0,0 +1,32 @@ +#ifndef MYSQL_TASK_HPP +#define MYSQL_TASK_HPP + +#include "Task.hpp" +#include "MysqlConnectionPool.hpp" +#include "Connection.hpp" +#include "EchoMessage.hpp" + + +class MysqlTask : public Task +{ + +public: + + MysqlTask( MysqlConnectionPool *cp, + Connection *connection, + const std::string message ); + + void run(); + +private: + + MysqlTask(const MysqlTask&); + MysqlTask& operator=(const MysqlTask&); + + MysqlConnectionPool *m_connectionPool; + Connection *m_connection; + std::string m_message; + +}; + +#endif // MYSQL_TASK_HPP diff --git a/other/mysqlclient_main.cpp b/other/mysqlclient_main.cpp index dae693b..0dd9d9a 100644 --- a/other/mysqlclient_main.cpp +++ b/other/mysqlclient_main.cpp @@ -8,8 +8,6 @@ #include "MysqlClient.hpp" #include "MysqlConnectionPool.hpp" -#include - #include #include #include diff --git a/other/mysqlclient_tcpwrapper.cpp b/other/mysqlclient_tcpwrapper.cpp new file mode 100644 index 0000000..2688aea --- /dev/null +++ b/other/mysqlclient_tcpwrapper.cpp @@ -0,0 +1,223 @@ +// g++ mysqlclient_main.cpp src/Logger.cpp src/MysqlClient.cpp src/ArgParse.cpp -I./include -lmysqlclient + + +#include "Logger.hpp" +#include "Common.hpp" + +#include "ArgParse.hpp" +#include "MysqlClient.hpp" +#include "MysqlConnectionPool.hpp" + +#include "TcpServer.hpp" +#include "Message.hpp" + +#include "ThreadPool.hpp" +#include "WorkerThread.hpp" + +#include "EchoMessage.hpp" +#include "MysqlTask.hpp" + +#include + +#include +#include +#include +#include + + + + + + + +void setUpArgs(ArgParse &argParse) +{ + TRACE_STATIC; + + argParse.addArgument("--host", + "MySQL server hostname/IP", + ArgParse::STRING, + ArgParse::REQUIRED ); + argParse.addArgument("-u, --user", + "MsSQL username", + ArgParse::STRING, + ArgParse::REQUIRED ); + argParse.addArgument("-db, --database", + "MySQL database", + ArgParse::STRING, + ArgParse::REQUIRED ); + argParse.addArgument("-p, --password", + "MySQL password", + ArgParse::STRING, + ArgParse::REQUIRED ); + argParse.addArgument("-n, --number-of-connections", + "MySQL connections in connection pool. Default is 5", + ArgParse::INT ); + + argParse.addArgument("--port", + "Listening port. Default is 4455", + ArgParse::INT ); + argParse.addArgument("-cl, --clients", + "Maximum number of served clients. Default is 5.", + ArgParse::INT ); + argParse.addArgument("--pending", + "Maximum number of pending clients. Default is 5.", + ArgParse::INT ); + + argParse.addArgument("-t, --worker-threads", + "Number of worker threads. Default is 5.", + ArgParse::INT ); +} + + +void getArgs( int argc, char* argv[], + ArgParse &argParse, + std::string &host, + std::string &user, + std::string &db, + std::string &pass, + int &conns, + int &port, + int &clients, + int &pending, + int &threads + ) +{ + TRACE_STATIC; + + argParse.parseArgs(argc, argv); + + argParse.argAsString("--host", host); + argParse.argAsString("-u, --user", user); + argParse.argAsString("-db, --database", db); + argParse.argAsString("-p, --password", pass); + argParse.argAsInt("-n, --number-of-connections", conns); + + argParse.argAsInt("--port", port); + argParse.argAsInt("-cl, --clients", clients); + argParse.argAsInt("--pending", pending); + + argParse.argAsInt("-t, --worker-threads", threads); +} + + +bool checkArgs( int argc, char* argv[], + ArgParse &argParse, + std::string &host, + std::string &user, + std::string &db, + std::string &pass, + int &conns, + int &port, + int &clients, + int &pending, + int &threads + ) +{ + TRACE_STATIC; + + try { + getArgs( argc, argv, + argParse, + host, user, db, pass, + conns, port, clients, pending, threads ); + } catch (std::runtime_error e) { + if ( argParse.foundArg("-h, --help") ) { + std::cout << argParse.usage() << std::endl; + return false; + } + std::cerr << e.what() << std::endl + << "Check usage: " << argv[0] << " --help" << std::endl; + return false; + } + + if ( argParse.foundArg("-h, --help") ) { + std::cout << argParse.usage() << std::endl; + return false; + } + + return true; +} + + +void printResults(std::list &results) +{ + TRACE_STATIC; + + LOG ( Logger::DEBUG, std::string("Got query result number of rows: "). + append(TToStr(results.size())).c_str() ); + + for (std::list::const_iterator it = results.begin(); + it != results.end(); + ++it ) { + LOG ( Logger::DEBUG, (*it).c_str() ); + } +} + + +int main(int argc, char* argv[] ) +{ + Logger::createInstance(); + Logger::init(std::cout); + Logger::setLogLevel(Logger::FINEST); + + + // args + ArgParse argParse("TCP server wrapper on a MySQL client", + "Report bugs to: denes.matetelki@gmail.com"); + setUpArgs(argParse); + + std::string host, user, db, pass; + int conns(5), port(4455), clients(5), pending(5), threads(5); + + if ( !checkArgs(argc, argv, argParse, + host, user, db, pass, + conns, port, clients, pending, threads ) ) + return 1; + + // init MySQL connection pool + init_client_errs(); + MysqlConnectionPool mysqlConnectionPool ( + argParse.foundArg("--host") ? host.c_str() : NULL, + argParse.foundArg("-u, --user") ? user.c_str() : NULL, + argParse.foundArg("-p, --password") ? pass.c_str() : NULL, + argParse.foundArg("-db, --database") ? db .c_str() : NULL ); + + for ( int i = 0; i < conns; ++i ) + mysqlConnectionPool.create(); + + + // threadpool + ThreadPool threadPool; + for ( int i = 0; i < threads; ++i ) + threadPool.pushWorkerThread(new WorkerThread(threadPool)); + + threadPool.startWorkerThreads(); + + // TCP server + MsgParam msgParam(&mysqlConnectionPool, &threadPool); + TcpServer tcpServer(std::string("127.0.0.1"), + TToStr(port), + &msgParam, + clients, + pending ); + + if ( !tcpServer.start() ) { + LOG( Logger::ERR, "Failed to start TCP server, exiting..."); + mysqlConnectionPool.clear(); + finish_client_errs(); + Logger::destroy(); + return 1; + } + + // never reached + sleep(1); + tcpServer.stop(); + + + // end + mysqlConnectionPool.clear(); + finish_client_errs(); + Logger::destroy(); + return 0; +} diff --git a/other/tcpclient_main.cpp b/other/tcpclient_main.cpp index 4e082bb..f25c4b2 100644 --- a/other/tcpclient_main.cpp +++ b/other/tcpclient_main.cpp @@ -64,7 +64,7 @@ int main(int argc, char* argv[] ) Logger::createInstance(); Logger::init(std::cout); - Logger::setLogLevel(Logger::DEBUG); + Logger::setLogLevel(Logger::FINEST); bool finished = false; @@ -89,7 +89,7 @@ int main(int argc, char* argv[] ) // wait for the complate &handled reply struct timespec tm = {0,1000}; - while ( !finished ) + while ( !finished && tcpclient.isPolling() ) nanosleep(&tm, &tm) ; tcpclient.disconnect(); diff --git a/src/WorkerThread.cpp b/src/WorkerThread.cpp new file mode 100644 index 0000000..6f6c49d --- /dev/null +++ b/src/WorkerThread.cpp @@ -0,0 +1,28 @@ +#include "WorkerThread.hpp" + +#include "Logger.hpp" + + +WorkerThread::WorkerThread( ThreadPool& tp ) + : m_tp(tp) +{ + TRACE; +} + + +void* WorkerThread::run() +{ + TRACE; + while ( m_isRunning ) + { + Task* task(0); + try { + task = m_tp.popTask(); + task->run(); + delete task; + } catch (CancelledException) { + LOG( Logger::FINEST, "Now I die."); + } + } + return 0; +}