switched bach to dependency injection from generic TCP client/server

master
Denes Matetelki 13 years ago
parent bc4301e1c1
commit 4acc2b31ed

@ -5,27 +5,26 @@
#include "Common.hpp" #include "Common.hpp"
#include "Socket.hpp" #include "Socket.hpp"
#include "Message.hpp"
#include <string> #include <string>
/** @todo make connection an iface and this class shall be a TcpConnection, /** @todo make connection an iface and this class shall be a TcpConnection,
* inherited from connection */ * inherited from connection */
template <typename T>
class Connection class Connection
{ {
public: public:
Connection ( const int socket, Connection ( const int socket,
void *msgParam = 0, Message *message,
const size_t bufferLength = 1024 ) const size_t bufferLength = 1024 )
: m_socket(socket) : m_socket(socket)
, m_host() , m_host()
, m_port() , m_port()
, m_message(this, msgParam) , m_message(message)
, m_buffer(0) , m_buffer(0)
, m_bufferLength(bufferLength) , m_bufferLength(bufferLength)
, m_msgParam(msgParam)
{ {
TRACE; TRACE;
@ -35,15 +34,14 @@ public:
Connection ( const std::string host, Connection ( const std::string host,
const std::string port, const std::string port,
void *msgParam = 0, Message *message,
const size_t bufferLength = 1024 ) const size_t bufferLength = 1024 )
: m_socket(AF_INET, SOCK_STREAM) : m_socket(AF_INET, SOCK_STREAM)
, m_host(host) , m_host(host)
, m_port(port) , m_port(port)
, m_message(this, msgParam) , m_message(message)
, m_buffer(0) , m_buffer(0)
, m_bufferLength(bufferLength) , m_bufferLength(bufferLength)
, m_msgParam(msgParam)
{ {
TRACE; TRACE;
m_socket.createSocket(); m_socket.createSocket();
@ -57,6 +55,16 @@ public:
delete[] m_buffer; delete[] m_buffer;
} }
Connection* create(const int socket)
{
TRACE;
Connection *conn = new Connection( socket,
m_message->clone(),
m_bufferLength);
conn->m_message->setConnection(conn);
return conn;
}
bool connectToHost() bool connectToHost()
{ {
TRACE; TRACE;
@ -107,7 +115,7 @@ public:
append(TToStr(length)).append(" bytes from: "). append(TToStr(length)).append(" bytes from: ").
append(m_host).append(":").append(m_port).c_str() ); append(m_host).append(":").append(m_port).c_str() );
return m_message.buildMessage( (void*)m_buffer, (size_t)length); return m_message->buildMessage( (void*)m_buffer, (size_t)length);
} }
@ -129,12 +137,6 @@ public:
return m_port; return m_port;
} }
void* getMsgParam() const
{
TRACE;
return m_msgParam;
}
private: private:
@ -144,11 +146,10 @@ private:
Socket m_socket; Socket m_socket;
std::string m_host; std::string m_host;
std::string m_port; std::string m_port;
T m_message; Message *m_message;
unsigned char *m_buffer; unsigned char *m_buffer;
size_t m_bufferLength; size_t m_bufferLength;
void *m_msgParam;
}; };

@ -1,7 +1,9 @@
#ifndef MESSAGE_HPP #ifndef MESSAGE_HPP
#define MESSAGE_HPP #define MESSAGE_HPP
#include "Connection.hpp" #include "Logger.hpp"
// #include "Connection.hpp"
#include <string> #include <string>
#include <stddef.h> // size_t #include <stddef.h> // size_t
@ -11,37 +13,56 @@
* getExpectedLength(). * getExpectedLength().
*/ */
template <typename T> class Connection;
class Message class Message
{ {
public: public:
Message( Connection<T> *connection, Message( Connection *connection,
void *msgParam = 0 ) void *msgParam = 0 )
: m_connection(connection) : m_connection(connection)
, m_param(msgParam) , m_param(msgParam)
, m_buffer() , m_buffer()
{}; {
TRACE;
};
Message( void *msgParam = 0 )
: m_connection(0)
, m_param(msgParam)
, m_buffer()
{
TRACE;
};
virtual ~Message() {}; virtual ~Message() {};
virtual Message* clone() = 0;
virtual bool buildMessage( const void *msgPart, virtual bool buildMessage( const void *msgPart,
const size_t msgLen ) = 0; const size_t msgLen ) = 0;
virtual void onMessageReady() = 0; virtual void onMessageReady() = 0;
void setConnection(Connection* conn )
{
TRACE;
m_connection = conn;
}
protected: protected:
virtual size_t getExpectedLength() = 0; virtual size_t getExpectedLength() = 0;
Connection<T> *m_connection; Connection *m_connection;
void *m_param; void *m_param;
std::string m_buffer; std::string m_buffer;
private: private:
Message<T>(const Message<T> &); Message(const Message &);
Message<T>& operator=(const Message<T> &); Message& operator=(const Message &);
}; };

@ -10,12 +10,11 @@
template <typename T>
class Poll class Poll
{ {
public: public:
Poll( Connection<T> *connection, Poll( Connection *connection,
const nfds_t maxClient = 10 ) const nfds_t maxClient = 10 )
: m_connection(connection) : m_connection(connection)
, m_polling(false) , m_polling(false)
@ -95,9 +94,7 @@ protected:
return; return;
} }
Connection<T> *connection = new Connection<T>( Connection *connection = m_connection->create(client_socket);
client_socket,
m_connection->getMsgParam() );
LOG( Logger::INFO, std::string("New client connected: "). LOG( Logger::INFO, std::string("New client connected: ").
append(connection->getHost()).append(":"). append(connection->getHost()).append(":").
@ -170,9 +167,9 @@ private:
} }
typedef typename std::map< int, Connection<T>* > ConnectionPool; typedef typename std::map< int, Connection* > ConnectionPool;
Connection<T> *m_connection; Connection *m_connection;
volatile bool m_polling; volatile bool m_polling;
ConnectionPool m_connectionPool; ConnectionPool m_connectionPool;

@ -12,18 +12,17 @@
#include <stddef.h> // size_t #include <stddef.h> // size_t
template <typename T>
class TcpClient class TcpClient
{ {
private: private:
template <typename U>
class PollerThread : public Thread class PollerThread : public Thread
, public Poll<U> , public Poll
{ {
public: public:
PollerThread( TcpClient<U> &data )
: Poll<U>( &(data.m_connection) ) PollerThread( TcpClient* data )
: Poll( &(data->m_connection) )
, m_tcpClient(data) , m_tcpClient(data)
{ {
TRACE; TRACE;
@ -32,39 +31,42 @@ private:
void stopPoller() void stopPoller()
{ {
TRACE; TRACE;
Poll<U>::stopPolling(); stopPolling();
stop(); stop();
} }
protected: protected:
// overridig poll's behaviour // overridig poll's behaviour
virtual void acceptClient() virtual void acceptClient()
{ {
TRACE; TRACE;
m_tcpClient.m_connection.receive(); m_tcpClient->m_connection.receive();
Poll<U>::stopPolling(); stopPolling();
} }
// overridig poll's behaviour // overridig poll's behaviour
virtual void handleClient( const int ) virtual void handleClient( const int )
{ {
TRACE; TRACE;
LOG( Logger::DEBUG, "Server closed the connection." ); LOG( Logger::DEBUG, "Server closed the connection." );
Poll<U>::stopPolling(); stopPolling();
} }
private: private:
PollerThread(const PollerThread&);
PollerThread& operator=(const PollerThread&);
void* run() void* run()
{ {
TRACE; TRACE;
Poll<U>::startPolling(); startPolling();
return 0; return 0;
} }
TcpClient<U> &m_tcpClient; TcpClient *m_tcpClient;
}; // class PollerThread }; // class PollerThread
@ -73,11 +75,13 @@ public:
TcpClient ( const std::string host, TcpClient ( const std::string host,
const std::string port, const std::string port,
void *msgParam = 0 ) Message *message )
: m_connection (host, port, msgParam) : m_connection (host, port, message)
, m_watcher(*this) , m_watcher(this)
{ {
TRACE; TRACE;
message->setConnection(&m_connection);
} }
virtual ~TcpClient() virtual ~TcpClient()
@ -127,8 +131,8 @@ private:
TcpClient& operator=(const TcpClient& ); TcpClient& operator=(const TcpClient& );
Connection<T> m_connection; Connection m_connection;
PollerThread<T> m_watcher; PollerThread m_watcher;
}; };

@ -5,26 +5,27 @@
#include "Connection.hpp" #include "Connection.hpp"
#include "Poll.hpp" #include "Poll.hpp"
#include "Message.hpp"
#include <string> #include <string>
template <typename T>
class TcpServer class TcpServer
{ {
public: public:
TcpServer ( const std::string host, TcpServer ( const std::string host,
const std::string port, const std::string port,
void *msgParam = 0, Message *message,
const int maxClients = 5, const int maxClients = 5,
const int maxPendingQueueLen = 10 ) const int maxPendingQueueLen = 10 )
: m_connection(host, port, msgParam) : m_connection(host, port, message)
, m_poll( &m_connection, maxClients) , m_poll( &m_connection, maxClients)
, m_maxPendingQueueLen(maxPendingQueueLen) , m_maxPendingQueueLen(maxPendingQueueLen)
{ {
TRACE; TRACE;
message->setConnection(&m_connection);
} }
virtual ~TcpServer() virtual ~TcpServer()
@ -60,8 +61,8 @@ private:
TcpServer(const TcpServer&); TcpServer(const TcpServer&);
TcpServer& operator=(const TcpServer&); TcpServer& operator=(const TcpServer&);
Connection<T> m_connection; Connection m_connection;
Poll<T> m_poll; Poll m_poll;
const int m_maxPendingQueueLen; const int m_maxPendingQueueLen;
}; };

@ -15,13 +15,12 @@
class SimpleMessage : public Message<SimpleMessage> class SimpleMessage : public Message
{ {
public: public:
SimpleMessage( Connection<SimpleMessage> *connection, SimpleMessage( void *msgParam = 0)
void *msgParam = 0) : Message(msgParam)
: Message<SimpleMessage>(connection, msgParam)
{ {
TRACE; TRACE;
} }
@ -45,6 +44,12 @@ public:
*( static_cast<bool*>(m_param) ) = true; *( static_cast<bool*>(m_param) ) = true;
} }
Message* clone()
{
TRACE;
return new SimpleMessage(m_param);
}
protected: protected:
size_t getExpectedLength() size_t getExpectedLength()
@ -58,7 +63,7 @@ protected:
int main(int argc, char* argv[] ) int main(int argc, char* argv[] )
{ {
if ( argc != 4 ) { if ( argc != 4 ) {
std::cerr << "Usage: client <HOST> <PORT> <MSG>" << std::endl; std::cerr << "Usage: " << argv[0] << " <HOST> <PORT> <MSG>" << std::endl;
return 1; return 1;
} }
@ -68,7 +73,11 @@ int main(int argc, char* argv[] )
bool finished = false; bool finished = false;
TcpClient<SimpleMessage> tcpclient(argv[1], argv[2], &finished); SimpleMessage msg(&finished);
TcpClient tcpclient( argv[1],
argv[2],
&msg);
if ( !tcpclient.connect() ) { if ( !tcpclient.connect() ) {
LOG( Logger::ERR, "Couldn't connect to server, exiting..." ); LOG( Logger::ERR, "Couldn't connect to server, exiting..." );

@ -10,13 +10,12 @@
#include <iostream> #include <iostream>
#include <string> #include <string>
class EchoMessage : public Message<EchoMessage> class EchoMessage : public Message
{ {
public: public:
EchoMessage( Connection<EchoMessage> *connection, EchoMessage( void *msgParam = 0)
void *msgParam = 0) : Message(msgParam)
: Message<EchoMessage>(connection, msgParam)
{ {
TRACE; TRACE;
} }
@ -47,6 +46,12 @@ public:
m_connection->send( reply.c_str(), reply.length() ); m_connection->send( reply.c_str(), reply.length() );
} }
Message* clone()
{
TRACE;
return new EchoMessage(m_param);
}
protected: protected:
size_t getExpectedLength() size_t getExpectedLength()
@ -57,14 +62,23 @@ protected:
}; };
int main() int main(int argc, char* argv[] )
{ {
if ( argc != 3 ) {
std::cerr << "Usage: " << argv[0] << " <HOST> <PORT>" << std::endl;
return 1;
}
Logger::createInstance(); Logger::createInstance();
Logger::init(std::cout); Logger::init(std::cout);
Logger::setLogLevel(Logger::INFO); Logger::setLogLevel(Logger::FINEST);
Logger::setNoPrefix(); // Logger::setNoPrefix();
EchoMessage msg;
TcpServer<EchoMessage> tcpServer("localhost", "4455"); TcpServer tcpServer( argv[1],
argv[2],
&msg );
if ( !tcpServer.start() ) { if ( !tcpServer.start() ) {
LOG( Logger::ERR, "Failed to start TCP server, exiting..."); LOG( Logger::ERR, "Failed to start TCP server, exiting...");

Loading…
Cancel
Save