TcpClient, Poll, Connection went generic

master
Denes Matetelki 13 years ago
parent ede802cc36
commit 8816e1989f

@ -0,0 +1,136 @@
#ifndef CONNECTION_HPP
#define CONNECTION_HPP
#include "Logger.hpp"
#include "Common.hpp"
#include "Socket.hpp"
#include <string>
template <typename T>
class Connection
{
public:
enum Status {
OPENED,
CLOSED
};
Connection ( const int socket,
const size_t bufferLength = 1024 )
: m_socket(socket)
, m_host()
, m_port()
, m_status(CLOSED)
, m_message()
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.getPeerName(m_host, m_port);
m_buffer = new unsigned char[m_bufferLength];
}
Connection ( const std::string host,
const std::string port,
const size_t bufferLength = 1024 )
: m_socket(AF_INET, SOCK_STREAM)
, m_host(host)
, m_port(port)
, m_status(CLOSED)
, m_message()
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.createSocket();
m_buffer = new unsigned char[m_bufferLength];
}
virtual ~Connection()
{
TRACE;
m_socket.closeSocket();
delete[] m_buffer;
}
bool connectToHost()
{
TRACE;
return m_socket.connectToHost(m_host, m_port);
}
bool bindToHost()
{
TRACE;
return m_socket.bindToHost(m_host, m_port);
}
void closeConnection()
{
TRACE;
m_socket.closeSocket();
}
bool send( const void* message, const size_t length )
{
TRACE;
return m_socket.send( message, length );
}
bool receive()
{
TRACE;
LOG ( Logger::DEBUG, std::string("receving on socket: ").
append(TToStr(m_socket.getSocket())).c_str() );
ssize_t len = recv(m_socket.getSocket(), m_buffer, m_bufferLength, 0);
LOG ( Logger::DEBUG, std::string("len: ").
append(TToStr(len)).append(" errno: ").
append(TToStr(errno)).c_str() );
if (len == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );
return false;
}
if (len == 0) {
LOG( Logger::DEBUG, "Connection closed by peer." );
return false;
}
return m_message.buildMessage( (void*)m_buffer, (size_t)len);
}
int getSocket() const
{
TRACE;
return m_socket.getSocket();
}
private:
Connection(const Connection&);
Connection& operator=(const Connection&);
Socket m_socket;
std::string m_host;
std::string m_port;
Status m_status;
T m_message;
unsigned char *m_buffer;
size_t m_bufferLength;
};
#endif // CONNECTION_HPP

@ -2,9 +2,9 @@
#define MESSAGE_HPP #define MESSAGE_HPP
#include <string> #include <string>
#include <stddef.h> // size_t
/** Append msgParts with buildMessage() to m_buffer.
/** Append messageParts with buildMessage() to m_buffer.
* Call onMessageReady() if the length of the buffer equals the value from * Call onMessageReady() if the length of the buffer equals the value from
* getExpectedLength(). * getExpectedLength().
*/ */
@ -16,13 +16,13 @@ public:
Message() : m_buffer() {}; Message() : m_buffer() {};
virtual ~Message() {}; virtual ~Message() {};
virtual bool buildMessage( const unsigned char* messagePart, virtual bool buildMessage( const void *msgPart,
const int length ) = 0; const size_t msgLen ) = 0;
virtual void onMessageReady() = 0; virtual void onMessageReady() = 0;
protected: protected:
virtual int getExpectedLength() = 0; virtual size_t getExpectedLength() = 0;
/// @todo shall i use dinamic array? /// @todo shall i use dinamic array?
std::string m_buffer; std::string m_buffer;

@ -1,28 +1,119 @@
#ifndef POLL_HPP #ifndef POLL_HPP
#define POLL_HPP #define POLL_HPP
#include "Logger.hpp"
#include "Connection.hpp"
#include <poll.h> #include <poll.h>
#include <map>
template <typename T>
class Poll class Poll
{ {
public: public:
Poll( int &socket, const nfds_t maxClient = 10 ); Poll( Connection<T> &connection,
virtual ~Poll(); const nfds_t maxClient = 10,
void *MessageParam )
)
: m_connection(connection)
, m_polling(false)
, m_connectionPool()
, m_maxclients(maxClient)
, m_fds(0)
, m_num_of_fds(0)
{
TRACE;
m_fds = new pollfd[m_maxclients];
addFd( m_connection.getSocket(), POLLIN | POLLPRI );
}
virtual ~Poll()
{
TRACE;
delete[] m_fds;
}
void startPolling()
{
TRACE;
m_polling = true;
struct timespec tm = {0,1000};
while ( m_polling ) {
nanosleep(&tm, &tm) ;
int ret = poll( m_fds , m_maxclients, 1000);
if ( ret == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() );
/// @todo reconnect
return;
}
if ( ret == 0 ) // timeout
continue;
for ( nfds_t i = 0; i < m_num_of_fds; ++i )
if ( m_fds[i].revents != 0 )
m_fds[i].fd == m_connection.getSocket() ?
acceptClient(m_fds[i].fd) :
handleClient(m_fds[i].fd);
} // while
}
void stopPolling()
{
TRACE;
m_polling = false;
}
void setOwnSocket( const int socket );
void startPolling();
void stopPolling();
virtual void acceptClient(); protected:
virtual void handleClient( const int socket );
virtual bool receive( const int socket ) = 0; virtual void acceptClient( const int socket )
{
TRACE;
sockaddr clientAddr;
socklen_t clientAddrLen;
int client_socket = accept( m_connection.getSocket(),
&clientAddr, &clientAddrLen ) ;
protected: if ( client_socket == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() );
} else {
bool m_polling; std::string clientAddress, clientService;
if ( Socket::convertNameInfo(&clientAddr, clientAddrLen,
clientAddress, clientService ) ) {
LOG( Logger::DEBUG, std::string("New client connected: ").
append(clientAddress).append(":").
append(clientService).c_str() );
}
m_connectionPool[client_socket] = new Connection<T>(client_socket);
addFd( client_socket, POLLIN | POLLPRI );
}
}
virtual void handleClient( const int socket )
{
TRACE;
typename ConnectionPool::iterator it = m_connectionPool.find(socket);
if ( it == m_connectionPool.end() || !it->second->receive() ) {
delete it->second;
m_connectionPool.erase(it);
removeFd(socket);
}
}
private: private:
@ -30,10 +121,54 @@ private:
Poll(const Poll&); Poll(const Poll&);
Poll& operator=(const Poll&); Poll& operator=(const Poll&);
bool addFd( const int socket, const short events ); bool addFd( const int socket, const short events )
bool removeFd( const int socket ); {
TRACE;
LOG( Logger::DEBUG, std::string("Adding socket: ").
append(TToStr(socket)).c_str() );
if (m_num_of_fds >= m_maxclients )
return false;
m_fds[m_num_of_fds].fd = socket;
m_fds[m_num_of_fds].events = events;
m_fds[m_num_of_fds].revents = 0;
m_num_of_fds++;
return true;
}
bool removeFd( const int socket )
{
TRACE;
LOG( Logger::DEBUG, std::string("Removing socket: ").
append(TToStr(socket)).c_str() );
unsigned int i = 0 ;
while (i < m_maxclients && m_fds[i].fd != socket )
i++;
if ( i == m_maxclients )
return false;
for ( ; i < m_maxclients - 1; ++i )
m_fds[i] = m_fds[i+1] ;
m_fds[i].fd = 0 ;
m_fds[i].events = 0 ;
m_fds[i].revents = 0 ;
m_num_of_fds--;
return true;
}
typedef typename std::map< int, Connection<T>* > ConnectionPool;
Connection<T> &m_connection;
bool m_polling;
ConnectionPool m_connectionPool;
int &m_pollSocket;
nfds_t m_maxclients; nfds_t m_maxclients;
pollfd *m_fds; pollfd *m_fds;
nfds_t m_num_of_fds; nfds_t m_num_of_fds;

@ -27,12 +27,11 @@ public:
bool bindToHost(const std::string host, bool bindToHost(const std::string host,
const std::string port ); const std::string port );
bool getPeerName(const int socket, void getPeerName(std::string &host,
std::string &host,
std::string &port); std::string &port);
bool send( const void *message, const int lenght ); bool send( const void *message, const int lenght );
int& getSocket() const; int getSocket() const;
static bool convertNameInfo( sockaddr* addr, static bool convertNameInfo( sockaddr* addr,
socklen_t addrLen, socklen_t addrLen,

@ -1,53 +1,137 @@
#ifndef TCP_CLIENT_HPP #ifndef TCP_CLIENT_HPP
#define TCP_CLIENT_HPP #define TCP_CLIENT_HPP
#include "TcpConnection.hpp" #include "Logger.hpp"
#include "Connection.hpp"
#include "Thread.hpp" #include "Thread.hpp"
#include "Poll.hpp" #include "Poll.hpp"
#include <string> #include <string>
#include <stddef.h> // size_t
template <typename T>
class TcpClient class TcpClient
{ {
private: private:
class WatcherThread : public Thread template <typename U>
, public Poll class PollerThread : public Thread
, public Poll<U>
{ {
public: public:
WatcherThread( TcpClient &data ); PollerThread( TcpClient<U> &data )
: Poll<U>(data.m_connection)
, m_tcpClient(data)
{
TRACE;
}
// overringind Poll's accept behaviour void stopPoller()
void acceptClient(); {
void handleClient( const int fd ); TRACE;
bool receive( const int fd ); stopPolling();
stop();
}
protected:
/// @todo this is unclear and nasty hack
virtual void acceptClient( const int socket )
{
TRACE;
LOG( Logger::DEBUG, std::string("own socket: ").
append( TToStr(m_tcpClient.m_connection.getSocket())).
append( " param socket: ").
append( TToStr( socket) ).c_str() );
m_tcpClient.m_connection.receive();
stopPolling();
}
/// @todo this is unclear and nasty hack
virtual void handleClient( const int socket )
{
TRACE;
LOG( Logger::DEBUG, "Server closed the connection." );
stopPolling();
}
private: private:
void* run();
TcpClient &m_tcpClient; void* run()
}; {
TRACE;
startPolling();
return 0;
}
TcpClient<U> &m_tcpClient;
}; // class PollerThread
public: public:
TcpClient ( const std::string host, TcpClient ( const std::string host,
const std::string port ); const std::string port )
: m_connection (host, port)
, m_watcher(*this)
{
TRACE;
}
virtual ~TcpClient(); virtual ~TcpClient()
{
TRACE;
disconnect();
}
bool connect(); bool connect()
void disconnect(); {
TRACE;
bool send( const void* message, const int length ); if ( !m_connection.connectToHost() )
return false;
m_watcher.start();
return true;
}
void disconnect()
{
TRACE;
if ( m_watcher.isRunning() ) {
m_watcher.stopPoller();
m_watcher.join();
}
m_connection.closeConnection();
}
bool send( const void* msg, const size_t msgLen )
{
TRACE;
return m_connection.send(msg, msgLen);
}
private: private:
// virtual void onDisconnect() = 0; TcpClient(const TcpClient& );
TcpClient& operator=(const TcpClient& );
Connection<T>& getConnection()
{
TRACE;
return m_connection;
}
TcpConnection m_connection; Connection<T> m_connection;
WatcherThread m_watcher; PollerThread<T> m_watcher;
}; };

@ -1,55 +0,0 @@
#ifndef TCP_CONNECTION_HPP
#define TCP_CONNECTION_HPP
#include "Socket.hpp"
#include <string>
class TcpConnection
{
public:
enum Status {
OPENED,
CLOSED
};
TcpConnection ( const int socket,
const int bufferLenght = 1024 );
TcpConnection ( const std::string host,
const std::string port,
const int bufferLenght = 1024 );
virtual ~TcpConnection();
bool connectToHost();
bool bindToHost();
void closeConnection();
bool sendMessage( const void* message, const int length );
bool readFromSocket();
virtual void onMessageReady ( const unsigned char * message,
const int length ) = 0;
int getSocket() const;
private:
TcpConnection(const TcpConnection&);
TcpConnection& operator=(const TcpConnection&);
Socket m_socket;
std::string m_host;
std::string m_port;
Status m_status;
unsigned char *m_buffer;
int m_bufferLength;
};
#endif // TCP_CONNECTION_HPP

@ -10,26 +10,28 @@
class SimpleMessage : public Message class SimpleMessage : public Message
{ {
private: public:
bool buildMessage( const unsigned char* messagePart, bool buildMessage( const void *msgPart,
const int length ) const size_t msgLen )
{ {
TRACE; TRACE;
m_buffer = std::string( (const char*) messagePart, length ); m_buffer = std::string( (const char*) msgPart, msgLen );
onMessageReady(); onMessageReady();
return true;
} }
void onMessageReady() void onMessageReady()
{ {
TRACE; TRACE;
LOG( Logger::INFO, std::string("Got reply from server: "). LOG( Logger::INFO, std::string("Got reply from server: ").
append(m_buffer).c_str() ); append(m_buffer).c_str() );
} }
protected: protected:
int getExpectedLength() size_t getExpectedLength()
{ {
TRACE; TRACE;
return 0; return 0;
@ -51,9 +53,9 @@ int main( int argc, char * argv[] )
std::string msg1("madao"); std::string msg1("madao");
tcpclient.send( msg1.c_str(), msg1.length()); tcpclient.send( msg1.c_str(), msg1.length());
sleep(2); sleep(2);
std::string msg2("this message is long. Cannot fit into one buffer"); // std::string msg2("this message is long. Cannot fit into one buffer");
tcpclient.send( msg2.c_str(), msg2.length()); // tcpclient.send( msg2.c_str(), msg2.length());
sleep(2); // sleep(2);
tcpclient.disconnect(); tcpclient.disconnect();

@ -8,16 +8,16 @@
#include <stdlib.h> // malloc, free #include <stdlib.h> // malloc, free
Poll::Poll ( int &socket, const nfds_t maxClient ) Poll::Poll ( Connection &connection, const nfds_t maxClient )
: m_polling(false) : m_connection(connection)
, m_pollSocket(socket) , m_polling(false)
, m_maxclients(maxClient) , m_maxclients(maxClient)
, m_fds(0) , m_fds(0)
, m_num_of_fds(0) , m_num_of_fds(0)
{ {
TRACE; TRACE;
m_fds = (pollfd*) malloc (sizeof(struct pollfd)*m_maxclients); m_fds = new pollfd[m_maxclients];
} }
@ -25,15 +25,7 @@ Poll::~Poll()
{ {
TRACE; TRACE;
free(m_fds); delete[] m_fds;
}
void Poll::setOwnSocket ( const int socket )
{
TRACE;
addFd(socket, POLLIN | POLLPRI);
} }
@ -51,7 +43,7 @@ void Poll::startPolling()
if ( ret == -1 ) { if ( ret == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() ); LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() );
/// @todo shall we handle this? /// @todo reconnect
return; return;
} }
@ -60,7 +52,7 @@ void Poll::startPolling()
for ( nfds_t i = 0; i < m_num_of_fds; ++i ) for ( nfds_t i = 0; i < m_num_of_fds; ++i )
if ( m_fds[i].revents != 0 ) if ( m_fds[i].revents != 0 )
m_fds[i].fd == m_pollSocket ? m_fds[i].fd == m_connection.getSocket() ?
acceptClient() : acceptClient() :
handleClient(m_fds[i].fd); handleClient(m_fds[i].fd);
@ -82,7 +74,7 @@ void Poll::acceptClient()
sockaddr clientAddr; sockaddr clientAddr;
socklen_t clientAddrLen; socklen_t clientAddrLen;
int client_socket = accept( m_pollSocket, &clientAddr, &clientAddrLen ) ; int client_socket = accept( m_connection.getSocket(), &clientAddr, &clientAddrLen ) ;
if ( client_socket == -1 ) { if ( client_socket == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() ); LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() );
@ -95,6 +87,9 @@ void Poll::acceptClient()
append(clientAddress).append(":"). append(clientAddress).append(":").
append(clientService).c_str() ); append(clientService).c_str() );
} }
m_connectionPool.insert (
std::pair<int, Connection>( client_socket, Connection(client_socket)) );
addFd( client_socket, POLLIN | POLLPRI ); addFd( client_socket, POLLIN | POLLPRI );
} }
} }
@ -104,13 +99,16 @@ void Poll::handleClient( const int socket )
{ {
TRACE; TRACE;
if ( !receive( socket ) ) { ConnectionPool::iterator it = m_connectionPool.find(socket);
removeFd( socket );
if ( it == m_connectionPool.end() || !it->second.receive() ) {
m_connectionPool.erase(socket);
removeFd(socket);
} }
} }
bool Poll::addFd( const int socket, short events ) bool Poll::addFd( const int socket, const short events )
{ {
TRACE; TRACE;
LOG( Logger::DEBUG, std::string("Adding socket: "). LOG( Logger::DEBUG, std::string("Adding socket: ").

@ -119,10 +119,9 @@ bool Socket::send ( const void *message, const int length )
} }
int& Socket::getSocket() const int Socket::getSocket() const
{ {
TRACE; TRACE;
return m_socket; return m_socket;
} }
@ -171,8 +170,7 @@ bool Socket::bindToFirstAddress(struct addrinfo *servinfo )
} }
bool Socket::getPeerName( const int socket, void Socket::getPeerName( std::string &host,
std::string &host,
std::string &port ) std::string &port )
{ {
TRACE; TRACE;
@ -180,9 +178,7 @@ bool Socket::getPeerName( const int socket,
struct sockaddr_in address ; struct sockaddr_in address ;
memset(&address, 0, sizeof(address)); memset(&address, 0, sizeof(address));
socklen_t addressLength = sizeof(address) ; socklen_t addressLength = sizeof(address) ;
getpeername( socket, (struct sockaddr*)&address, &addressLength ) ; getpeername( m_socket, (struct sockaddr*)&address, &addressLength ) ;
unsigned int ip = address.sin_addr.s_addr ;
char tmp[INET_ADDRSTRLEN]; char tmp[INET_ADDRSTRLEN];
host = inet_ntop(AF_INET, &address.sin_addr, tmp, INET_ADDRSTRLEN); host = inet_ntop(AF_INET, &address.sin_addr, tmp, INET_ADDRSTRLEN);

Loading…
Cancel
Save