declarations and definitions are splitted to hpp and cpp files

master
Denes Matetelki 13 years ago
parent 4acc2b31ed
commit 271f8c25f4

@ -1,9 +1,6 @@
#ifndef CONNECTION_HPP
#define CONNECTION_HPP
#include "Logger.hpp"
#include "Common.hpp"
#include "Socket.hpp"
#include "Message.hpp"
@ -18,124 +15,28 @@ public:
Connection ( const int socket,
Message *message,
const size_t bufferLength = 1024 )
: m_socket(socket)
, m_host()
, m_port()
, m_message(message)
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.getPeerName(m_host, m_port);
m_buffer = new unsigned char[m_bufferLength];
}
const size_t bufferLength = 1024 );
Connection ( const std::string host,
const std::string port,
Message *message,
const size_t bufferLength = 1024 )
: m_socket(AF_INET, SOCK_STREAM)
, m_host(host)
, m_port(port)
, m_message(message)
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.createSocket();
m_buffer = new unsigned char[m_bufferLength];
}
virtual ~Connection()
{
TRACE;
m_socket.closeSocket();
delete[] m_buffer;
}
Connection* create(const int socket)
{
TRACE;
Connection *conn = new Connection( socket,
m_message->clone(),
m_bufferLength);
conn->m_message->setConnection(conn);
return conn;
}
bool connectToHost()
{
TRACE;
return m_socket.connectToHost(m_host, m_port);
}
bool bindToHost()
{
TRACE;
return m_socket.bindToHost(m_host, m_port);
}
bool listen( const int maxPendingQueueLen = 64 )
{
TRACE;
return m_socket.listen( maxPendingQueueLen );
}
void closeConnection()
{
TRACE;
m_socket.closeSocket();
}
bool send( const void* message, const size_t length )
{
TRACE;
return m_socket.send( message, length );
}
bool receive()
{
TRACE;
ssize_t length;
if ( !m_socket.receive(m_buffer, m_bufferLength, &length) ) {
if (length == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );
}
else if (length == 0) {
LOG( Logger::INFO, std::string("Connection closed by ").
append(m_host).append(":").append(m_port).c_str() );
}
return false;
}
LOG ( Logger::DEBUG, std::string("Received: ").
append(TToStr(length)).append(" bytes from: ").
append(m_host).append(":").append(m_port).c_str() );
return m_message->buildMessage( (void*)m_buffer, (size_t)length);
}
int getSocket() const
{
TRACE;
return m_socket.getSocket();
}
std::string getHost() const
{
TRACE;
return m_host;
}
std::string getPort() const
{
TRACE;
return m_port;
}
const size_t bufferLength = 1024 );
virtual ~Connection();
Connection* create(const int socket);
bool connectToHost();
bool bindToHost();
bool listen( const int maxPendingQueueLen = 64 );
void closeConnection();
bool send( const void* message, const size_t length );
bool receive();
int getSocket() const;
std::string getHost() const;
std::string getPort() const;
private:

@ -3,7 +3,6 @@
#include "Logger.hpp"
// #include "Connection.hpp"
#include <string>
#include <stddef.h> // size_t

@ -1,8 +1,6 @@
#ifndef POLL_HPP
#define POLL_HPP
#include "Logger.hpp"
#include "Connection.hpp"
#include <poll.h>
@ -15,109 +13,23 @@ class Poll
public:
Poll( Connection *connection,
const nfds_t maxClient = 10 )
: m_connection(connection)
, m_polling(false)
, m_connectionPool()
, m_maxclients(maxClient)
, m_fds(0)
, m_num_of_fds(0)
{
TRACE;
m_fds = new pollfd[m_maxclients+1]; // plus the server socket
addFd( m_connection->getSocket(), POLLIN | POLLPRI );
}
virtual ~Poll()
{
TRACE;
delete[] m_fds;
}
void startPolling()
{
TRACE;
m_polling = true;
struct timespec tm = {0,1000};
while ( m_polling ) {
nanosleep(&tm, &tm) ;
int ret = poll( m_fds , m_maxclients, 1000);
if ( ret == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() );
/// @todo reconnect
return;
}
if ( ret == 0 ) // timeout
continue;
for ( nfds_t i = 0; i < m_num_of_fds; ++i )
if ( m_fds[i].revents != 0 )
m_fds[i].fd == m_connection->getSocket() ?
acceptClient() :
handleClient(m_fds[i].fd);
} // while
}
void stopPolling()
{
TRACE;
m_polling = false;
}
bool isPolling() const
{
TRACE;
return m_polling;
}
protected:
// can be overriden: behaviour alters in server/client
virtual void acceptClient()
{
TRACE;
sockaddr clientAddr;
socklen_t clientAddrLen;
int client_socket = accept( m_connection->getSocket(),
&clientAddr, &clientAddrLen ) ;
const nfds_t maxClient = 10 );
if ( client_socket == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() );
return;
}
virtual ~Poll();
Connection *connection = m_connection->create(client_socket);
void startPolling();
void stopPolling();
LOG( Logger::INFO, std::string("New client connected: ").
append(connection->getHost()).append(":").
append(connection->getPort()).c_str() );
bool isPolling() const;
m_connectionPool[client_socket] = connection;
addFd( client_socket, POLLIN | POLLPRI );
}
protected:
// can be overriden: behaviour alters in server/client
virtual void handleClient( const int socket )
{
TRACE;
virtual void acceptClient();
typename ConnectionPool::iterator it = m_connectionPool.find(socket);
if ( it == m_connectionPool.end() || !it->second->receive() ) {
delete it->second;
m_connectionPool.erase(it);
removeFd(socket);
}
}
// can be overriden: behaviour alters in server/client
virtual void handleClient( const int socket );
private:
@ -125,46 +37,8 @@ private:
Poll(const Poll&);
Poll& operator=(const Poll&);
bool addFd( const int socket, const short events )
{
TRACE;
LOG( Logger::DEBUG, std::string("Adding socket: ").
append(TToStr(socket)).c_str() );
if (m_num_of_fds >= m_maxclients )
return false;
m_fds[m_num_of_fds].fd = socket;
m_fds[m_num_of_fds].events = events;
m_fds[m_num_of_fds].revents = 0;
m_num_of_fds++;
return true;
}
bool removeFd( const int socket )
{
TRACE;
LOG( Logger::DEBUG, std::string("Removing socket: ").
append(TToStr(socket)).c_str() );
unsigned int i = 0 ;
while (i < m_maxclients && m_fds[i].fd != socket )
i++;
if ( i == m_maxclients )
return false;
for ( ; i < m_maxclients - 1; ++i )
m_fds[i] = m_fds[i+1] ;
m_fds[i].fd = 0 ;
m_fds[i].events = 0 ;
m_fds[i].revents = 0 ;
m_num_of_fds--;
return true;
}
bool addFd( const int socket, const short events );
bool removeFd( const int socket );
typedef typename std::map< int, Connection* > ConnectionPool;

@ -1,7 +1,6 @@
#ifndef TCP_CLIENT_HPP
#define TCP_CLIENT_HPP
#include "Logger.hpp"
#include "Connection.hpp"
#include "Thread.hpp"
@ -21,50 +20,24 @@ private:
{
public:
PollerThread( TcpClient* data )
: Poll( &(data->m_connection) )
, m_tcpClient(data)
{
TRACE;
}
PollerThread( TcpClient* data );
void stopPoller()
{
TRACE;
stopPolling();
stop();
}
void stopPoller();
protected:
// overridig poll's behaviour
virtual void acceptClient()
{
TRACE;
m_tcpClient->m_connection.receive();
stopPolling();
}
virtual void acceptClient();
// overridig poll's behaviour
virtual void handleClient( const int )
{
TRACE;
LOG( Logger::DEBUG, "Server closed the connection." );
stopPolling();
}
virtual void handleClient( const int );
private:
PollerThread(const PollerThread&);
PollerThread& operator=(const PollerThread&);
void* run()
{
TRACE;
startPolling();
return 0;
}
void* run();
TcpClient *m_tcpClient;
@ -75,55 +48,16 @@ public:
TcpClient ( const std::string host,
const std::string port,
Message *message )
: m_connection (host, port, message)
, m_watcher(this)
{
TRACE;
message->setConnection(&m_connection);
}
virtual ~TcpClient()
{
TRACE;
disconnect();
}
bool connect()
{
TRACE;
Message *message );
if ( !m_connection.connectToHost() )
return false;
virtual ~TcpClient();
m_watcher.start();
return true;
}
void disconnect()
{
TRACE;
bool connect();
void disconnect();
if ( m_watcher.isRunning() ) {
m_watcher.stopPoller();
m_watcher.join();
}
bool send( const void* msg, const size_t msgLen );
m_connection.closeConnection();
}
bool send( const void* msg, const size_t msgLen )
{
TRACE;
return m_connection.send(msg, msgLen);
}
bool isPolling() const
{
TRACE;
return m_watcher.isPolling();
}
bool isPolling() const;
private:

@ -1,8 +1,6 @@
#ifndef TCP_SERVER_HPP
#define TCP_SERVER_HPP
#include "Logger.hpp"
#include "Connection.hpp"
#include "Poll.hpp"
#include "Message.hpp"
@ -18,42 +16,12 @@ public:
const std::string port,
Message *message,
const int maxClients = 5,
const int maxPendingQueueLen = 10 )
: m_connection(host, port, message)
, m_poll( &m_connection, maxClients)
, m_maxPendingQueueLen(maxPendingQueueLen)
{
TRACE;
message->setConnection(&m_connection);
}
virtual ~TcpServer()
{
TRACE;
}
bool start()
{
TRACE;
if ( !m_connection.bindToHost() )
return false;
if ( m_connection.listen( m_maxPendingQueueLen ) == -1 ) {
return false;
}
const int maxPendingQueueLen = 10 );
m_poll.startPolling();
return true;
}
virtual ~TcpServer();
void stop()
{
TRACE;
m_poll.stopPolling();
m_connection.closeConnection();
}
bool start();
void stop();
private:

@ -0,0 +1,166 @@
#include "Logger.hpp"
#include "Common.hpp"
#include <iostream>
#include "ArgParse.hpp"
void setUpArgs(ArgParse &argParse)
{
TRACE_STATIC;
argParse.addArgument("--host",
"MySQL server hostname/IP",
ArgParse::STRING,
ArgParse::REQUIRED,
ArgParse::REQUIRED);
argParse.addArgument("-u, --user",
"MsSQL username",
ArgParse::STRING,
ArgParse::REQUIRED,
ArgParse::REQUIRED );
argParse.addArgument("-db, --database",
"MySQL database",
ArgParse::STRING,
ArgParse::REQUIRED,
ArgParse::REQUIRED );
argParse.addArgument("-p, --password",
"MySQL password",
ArgParse::STRING,
ArgParse::REQUIRED,
ArgParse::REQUIRED );
argParse.addArgument("-n, --number-of-connections",
"MySQL connections in connection pool. Default is 5",
ArgParse::INT );
argParse.addArgument("--port",
"Listening port. Default is 4455",
ArgParse::INT );
argParse.addArgument("-cl, --clients",
"Maximum number of served clients. Default is 5.",
ArgParse::INT );
argParse.addArgument("--pending",
"Maximum number of pending clients. Default is 5.",
ArgParse::INT );
argParse.addArgument("-t, --worker-threads",
"Number of worker threads. Default is 5.",
ArgParse::INT );
}
void getArgs( int argc, char* argv[],
ArgParse &argParse,
std::string &host,
std::string &user,
std::string &db,
std::string &pass,
int &conns,
int &port,
int &clients,
int &pending,
int &threads
)
{
TRACE_STATIC;
argParse.parseArgs(argc, argv);
argParse.argAsString("--host", host);
argParse.argAsString("-u, --user", user);
argParse.argAsString("-db, --database", db);
argParse.argAsString("-p, --password", pass);
argParse.argAsInt("-n, --number-of-connections", conns);
argParse.argAsInt("--port", port);
argParse.argAsInt("-cl, --clients", clients);
argParse.argAsInt("--pending", pending);
argParse.argAsInt("-t, --worker-threads", threads);
}
bool checkArgs( int argc, char* argv[],
ArgParse &argParse,
std::string &host,
std::string &user,
std::string &db,
std::string &pass,
int &conns,
int &port,
int &clients,
int &pending,
int &threads
)
{
TRACE_STATIC;
if ( argc == 1 || ( argc == 2 && argv[1][0] != '-' ) ) {
std::cout << argParse.usage() << std::endl;
return false;
}
try {
getArgs( argc, argv,
argParse,
host, user, db, pass,
conns, port, clients, pending, threads );
} catch (std::runtime_error e) {
if ( argParse.foundArg("-h, --help") ) {
std::cout << argParse.usage() << std::endl;
return false;
}
std::cerr << e.what() << std::endl
<< "Check usage: " << argv[0] << " --help" << std::endl;
return false;
}
if ( argParse.foundArg("-h, --help") ) {
std::cout << argParse.usage() << std::endl;
return false;
}
return true;
}
void printResults(std::list<std::string> &results)
{
TRACE_STATIC;
LOG ( Logger::DEBUG, std::string("Got query result number of rows: ").
append(TToStr(results.size())).c_str() );
for (std::list<std::string>::const_iterator it = results.begin();
it != results.end();
++it ) {
LOG ( Logger::DEBUG, (*it).c_str() );
}
}
int main(int argc, char* argv[] )
{
Logger::createInstance();
Logger::init(std::cout);
Logger::setLogLevel(Logger::FINEST);
// args
ArgParse argParse("TCP server wrapper on a MySQL client",
"Report bugs to: denes.matetelki@gmail.com");
setUpArgs(argParse);
std::string host, user, db, pass;
int conns(5), port(4455), clients(5), pending(5), threads(5);
if ( !checkArgs(argc, argv, argParse,
host, user, db, pass,
conns, port, clients, pending, threads ) )
return 1;
Logger::destroy();
return 0;
}

@ -0,0 +1,136 @@
#include "Connection.hpp"
#include "Logger.hpp"
#include "Common.hpp"
Connection::Connection ( const int socket,
Message *message,
const size_t bufferLength )
: m_socket(socket)
, m_host()
, m_port()
, m_message(message)
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.getPeerName(m_host, m_port);
m_buffer = new unsigned char[m_bufferLength];
}
Connection::Connection ( const std::string host,
const std::string port,
Message *message,
const size_t bufferLength )
: m_socket(AF_INET, SOCK_STREAM)
, m_host(host)
, m_port(port)
, m_message(message)
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.createSocket();
m_buffer = new unsigned char[m_bufferLength];
}
Connection::~Connection()
{
TRACE;
m_socket.closeSocket();
delete[] m_buffer;
}
Connection* Connection::create(const int socket)
{
TRACE;
Connection *conn = new Connection( socket,
m_message->clone(),
m_bufferLength);
conn->m_message->setConnection(conn);
return conn;
}
bool Connection::connectToHost()
{
TRACE;
return m_socket.connectToHost(m_host, m_port);
}
bool Connection::bindToHost()
{
TRACE;
return m_socket.bindToHost(m_host, m_port);
}
bool Connection::listen( const int maxPendingQueueLen)
{
TRACE;
return m_socket.listen( maxPendingQueueLen );
}
void Connection::closeConnection()
{
TRACE;
m_socket.closeSocket();
}
bool Connection::send( const void* message, const size_t length )
{
TRACE;
return m_socket.send( message, length );
}
bool Connection::receive()
{
TRACE;
ssize_t length;
if ( !m_socket.receive(m_buffer, m_bufferLength, &length) ) {
if (length == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );
}
else if (length == 0) {
LOG( Logger::INFO, std::string("Connection closed by ").
append(m_host).append(":").append(m_port).c_str() );
}
return false;
}
LOG ( Logger::DEBUG, std::string("Received: ").
append(TToStr(length)).append(" bytes from: ").
append(m_host).append(":").append(m_port).c_str() );
return m_message->buildMessage( (void*)m_buffer, (size_t)length);
}
int Connection::getSocket() const
{
TRACE;
return m_socket.getSocket();
}
std::string Connection::getHost() const
{
TRACE;
return m_host;
}
std::string Connection::getPort() const
{
TRACE;
return m_port;
}

@ -0,0 +1,153 @@
#include "Poll.hpp"
#include "Logger.hpp"
#include "Common.hpp"
Poll::Poll( Connection *connection,
const nfds_t maxClient )
: m_connection(connection)
, m_polling(false)
, m_connectionPool()
, m_maxclients(maxClient)
, m_fds(0)
, m_num_of_fds(0)
{
TRACE;
m_fds = new pollfd[m_maxclients+1]; // plus the server socket
addFd( m_connection->getSocket(), POLLIN | POLLPRI );
}
Poll::~Poll()
{
TRACE;
delete[] m_fds;
}
void Poll::startPolling()
{
TRACE;
m_polling = true;
struct timespec tm = {0,1000};
while ( m_polling ) {
nanosleep(&tm, &tm) ;
int ret = poll( m_fds , m_maxclients, 1000);
if ( ret == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() );
/// @todo reconnect
return;
}
if ( ret == 0 ) // timeout
continue;
for ( nfds_t i = 0; i < m_num_of_fds; ++i )
if ( m_fds[i].revents != 0 )
m_fds[i].fd == m_connection->getSocket() ?
acceptClient() :
handleClient(m_fds[i].fd);
} // while
}
void Poll::stopPolling()
{
TRACE;
m_polling = false;
}
bool Poll::isPolling() const
{
TRACE;
return m_polling;
}
void Poll::acceptClient()
{
TRACE;
sockaddr clientAddr;
socklen_t clientAddrLen;
int client_socket = accept( m_connection->getSocket(),
&clientAddr, &clientAddrLen ) ;
if ( client_socket == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() );
return;
}
Connection *connection = m_connection->create(client_socket);
LOG( Logger::INFO, std::string("New client connected: ").
append(connection->getHost()).append(":").
append(connection->getPort()).c_str() );
m_connectionPool[client_socket] = connection;
addFd( client_socket, POLLIN | POLLPRI );
}
void Poll::handleClient( const int socket )
{
TRACE;
typename ConnectionPool::iterator it = m_connectionPool.find(socket);
if ( it == m_connectionPool.end() || !it->second->receive() ) {
delete it->second;
m_connectionPool.erase(it);
removeFd(socket);
}
}
bool Poll::addFd( const int socket, const short events )
{
TRACE;
LOG( Logger::DEBUG, std::string("Adding socket: ").
append(TToStr(socket)).c_str() );
if (m_num_of_fds >= m_maxclients )
return false;
m_fds[m_num_of_fds].fd = socket;
m_fds[m_num_of_fds].events = events;
m_fds[m_num_of_fds].revents = 0;
m_num_of_fds++;
return true;
}
bool Poll::removeFd( const int socket )
{
TRACE;
LOG( Logger::DEBUG, std::string("Removing socket: ").
append(TToStr(socket)).c_str() );
unsigned int i = 0 ;
while (i < m_maxclients && m_fds[i].fd != socket )
i++;
if ( i == m_maxclients )
return false;
for ( ; i < m_maxclients - 1; ++i )
m_fds[i] = m_fds[i+1] ;
m_fds[i].fd = 0 ;
m_fds[i].events = 0 ;
m_fds[i].revents = 0 ;
m_num_of_fds--;
return true;
}

@ -0,0 +1,106 @@
#include "TcpClient.hpp"
#include "Logger.hpp"
// PollerThread
TcpClient::PollerThread::PollerThread( TcpClient* data )
: Poll( &(data->m_connection) )
, m_tcpClient(data)
{
TRACE;
}
void TcpClient::PollerThread::stopPoller()
{
TRACE;
stopPolling();
stop();
}
void TcpClient::PollerThread::acceptClient()
{
TRACE;
m_tcpClient->m_connection.receive();
stopPolling();
}
void TcpClient::PollerThread::handleClient( const int )
{
TRACE;
LOG( Logger::DEBUG, "Server closed the connection." );
stopPolling();
}
void* TcpClient::PollerThread::run()
{
TRACE;
startPolling();
return 0;
}
// TcpClient
TcpClient::TcpClient ( const std::string host,
const std::string port,
Message *message )
: m_connection (host, port, message)
, m_watcher(this)
{
TRACE;
message->setConnection(&m_connection);
}
TcpClient::~TcpClient()
{
TRACE;
disconnect();
}
bool TcpClient::connect()
{
TRACE;
if ( !m_connection.connectToHost() )
return false;
m_watcher.start();
return true;
}
void TcpClient::disconnect()
{
TRACE;
if ( m_watcher.isRunning() ) {
m_watcher.stopPoller();
m_watcher.join();
}
m_connection.closeConnection();
}
bool TcpClient::send( const void* msg, const size_t msgLen )
{
TRACE;
return m_connection.send(msg, msgLen);
}
bool TcpClient::isPolling() const
{
TRACE;
return m_watcher.isPolling();
}

@ -1,20 +1,19 @@
#include "TcpServer.hpp"
#include "Logger.hpp"
#include "Common.hpp"
#include "MessageBuilder.hpp"
TcpServer::TcpServer( const std::string host,
TcpServer::TcpServer ( const std::string host,
const std::string port,
const int maxClients )
: Socket(AF_INET, SOCK_STREAM)
, Poll(m_socket, maxClients)
, m_host(host)
, m_port(port)
Message *message,
const int maxClients,
const int maxPendingQueueLen )
: m_connection(host, port, message)
, m_poll( &m_connection, maxClients)
, m_maxPendingQueueLen(maxPendingQueueLen)
{
TRACE;
message->setConnection(&m_connection);
}
@ -28,35 +27,14 @@ bool TcpServer::start()
{
TRACE;
if ( !openSocket() )
if ( !m_connection.bindToHost() )
return false;
// // Set the socket REUSABLE flag for the quick restart ability.
// if (setsockopt(m_server_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
// {
// errorArrived( EC_SETSOCKOPT ) ;
// }
// // Set the socket NONBLOCKING flag for polling.
// if (-1 == (fc_flags = fcntl(m_server_socket, F_GETFL, 0)))
// {
// fc_flags = 0;
// }
// fcntl(m_server_socket, F_SETFL, fc_flags | O_NONBLOCK);
if ( !bindToHost(m_host, m_port) )
return false;
if ( listen(m_socket, 64) == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR listening. ").c_str() );
if ( m_connection.listen( m_maxPendingQueueLen ) == -1 ) {
return false;
}
setOwnSocket(m_socket);
startPolling();
m_poll.startPolling();
return true;
}
@ -64,37 +42,6 @@ bool TcpServer::start()
void TcpServer::stop()
{
TRACE;
stopPolling();
closeSocket();
}
bool TcpServer::receive(const int clientSocket)
{
TRACE;
unsigned char buffer[10];
int len = recv( clientSocket, buffer , 10, 0) ;
if (len == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );
return false;
}
if (len == 0) {
LOG( Logger::DEBUG, "Connection closed by peer." );
return false;
}
MessageBuilder *m_builder(0);
if ( !m_builder ) {
msgArrived(clientSocket, buffer, len);
return true;
}
return m_builder->buildMessage(buffer, len);
return true;
m_poll.stopPolling();
m_connection.closeConnection();
}

Loading…
Cancel
Save