From 8816e1989fccdced931acbc9779c26f96f018f5c Mon Sep 17 00:00:00 2001 From: Denes Matetelki Date: Wed, 16 Nov 2011 13:09:03 +0100 Subject: [PATCH] TcpClient, Poll, Connection went generic --- include/Connection.hpp | 136 ++++++++++++++++++ include/Message.hpp | 10 +- include/Poll.hpp | 167 +++++++++++++++++++--- include/Socket.hpp | 5 +- include/TcpClient.hpp | 122 +++++++++++++--- include/TcpConnection.hpp | 55 ------- other/tcpclient_main.cpp | 18 +-- src/{TcpConnection.cpp => Connection.cpp} | 0 src/Poll.cpp | 36 +++-- src/Socket.cpp | 10 +- 10 files changed, 427 insertions(+), 132 deletions(-) create mode 100644 include/Connection.hpp delete mode 100644 include/TcpConnection.hpp rename src/{TcpConnection.cpp => Connection.cpp} (100%) diff --git a/include/Connection.hpp b/include/Connection.hpp new file mode 100644 index 0000000..d544bf2 --- /dev/null +++ b/include/Connection.hpp @@ -0,0 +1,136 @@ +#ifndef CONNECTION_HPP +#define CONNECTION_HPP + +#include "Logger.hpp" +#include "Common.hpp" + +#include "Socket.hpp" + +#include + + +template +class Connection +{ +public: + + enum Status { + OPENED, + CLOSED + }; + + Connection ( const int socket, + const size_t bufferLength = 1024 ) + : m_socket(socket) + , m_host() + , m_port() + , m_status(CLOSED) + , m_message() + , m_buffer(0) + , m_bufferLength(bufferLength) + + { + TRACE; + + m_socket.getPeerName(m_host, m_port); + m_buffer = new unsigned char[m_bufferLength]; + } + + Connection ( const std::string host, + const std::string port, + const size_t bufferLength = 1024 ) + : m_socket(AF_INET, SOCK_STREAM) + , m_host(host) + , m_port(port) + , m_status(CLOSED) + , m_message() + , m_buffer(0) + , m_bufferLength(bufferLength) + { + TRACE; + m_socket.createSocket(); + m_buffer = new unsigned char[m_bufferLength]; + } + + virtual ~Connection() + { + TRACE; + m_socket.closeSocket(); + delete[] m_buffer; + } + + bool connectToHost() + { + TRACE; + return m_socket.connectToHost(m_host, m_port); + } + + bool bindToHost() + { + TRACE; + return m_socket.bindToHost(m_host, m_port); + } + + void closeConnection() + { + TRACE; + m_socket.closeSocket(); + } + + bool send( const void* message, const size_t length ) + { + TRACE; + return m_socket.send( message, length ); + } + + bool receive() + { + TRACE; + + LOG ( Logger::DEBUG, std::string("receving on socket: "). + append(TToStr(m_socket.getSocket())).c_str() ); + + ssize_t len = recv(m_socket.getSocket(), m_buffer, m_bufferLength, 0); + + LOG ( Logger::DEBUG, std::string("len: "). + append(TToStr(len)).append(" errno: "). + append(TToStr(errno)).c_str() ); + + if (len == -1) { + LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() ); + return false; + } + + if (len == 0) { + LOG( Logger::DEBUG, "Connection closed by peer." ); + return false; + } + + return m_message.buildMessage( (void*)m_buffer, (size_t)len); + } + + + int getSocket() const + { + TRACE; + return m_socket.getSocket(); + } + + +private: + + Connection(const Connection&); + Connection& operator=(const Connection&); + + Socket m_socket; + std::string m_host; + std::string m_port; + Status m_status; + T m_message; + + unsigned char *m_buffer; + size_t m_bufferLength; +}; + + +#endif // CONNECTION_HPP diff --git a/include/Message.hpp b/include/Message.hpp index c298046..54114fd 100644 --- a/include/Message.hpp +++ b/include/Message.hpp @@ -2,9 +2,9 @@ #define MESSAGE_HPP #include +#include // size_t - - /** Append messageParts with buildMessage() to m_buffer. + /** Append msgParts with buildMessage() to m_buffer. * Call onMessageReady() if the length of the buffer equals the value from * getExpectedLength(). */ @@ -16,13 +16,13 @@ public: Message() : m_buffer() {}; virtual ~Message() {}; - virtual bool buildMessage( const unsigned char* messagePart, - const int length ) = 0; + virtual bool buildMessage( const void *msgPart, + const size_t msgLen ) = 0; virtual void onMessageReady() = 0; protected: - virtual int getExpectedLength() = 0; + virtual size_t getExpectedLength() = 0; /// @todo shall i use dinamic array? std::string m_buffer; diff --git a/include/Poll.hpp b/include/Poll.hpp index d0897bf..90a6f4e 100644 --- a/include/Poll.hpp +++ b/include/Poll.hpp @@ -1,28 +1,119 @@ #ifndef POLL_HPP #define POLL_HPP +#include "Logger.hpp" + +#include "Connection.hpp" + #include +#include + + +template class Poll { public: - Poll( int &socket, const nfds_t maxClient = 10 ); - virtual ~Poll(); + Poll( Connection &connection, + const nfds_t maxClient = 10, + void *MessageParam ) + ) + : m_connection(connection) + , m_polling(false) + , m_connectionPool() + , m_maxclients(maxClient) + , m_fds(0) + , m_num_of_fds(0) + { + TRACE; + m_fds = new pollfd[m_maxclients]; + addFd( m_connection.getSocket(), POLLIN | POLLPRI ); + } + + virtual ~Poll() + { + TRACE; + delete[] m_fds; + } + + void startPolling() + { + TRACE; + + m_polling = true; + struct timespec tm = {0,1000}; + + while ( m_polling ) { + + nanosleep(&tm, &tm) ; + int ret = poll( m_fds , m_maxclients, 1000); + + if ( ret == -1 ) { + LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() ); + /// @todo reconnect + return; + } + + if ( ret == 0 ) // timeout + continue; + + for ( nfds_t i = 0; i < m_num_of_fds; ++i ) + if ( m_fds[i].revents != 0 ) + m_fds[i].fd == m_connection.getSocket() ? + acceptClient(m_fds[i].fd) : + handleClient(m_fds[i].fd); + + } // while + } + + void stopPolling() + { + TRACE; + m_polling = false; + } - void setOwnSocket( const int socket ); - void startPolling(); - void stopPolling(); - virtual void acceptClient(); - virtual void handleClient( const int socket ); +protected: - virtual bool receive( const int socket ) = 0; + virtual void acceptClient( const int socket ) + { + TRACE; + sockaddr clientAddr; + socklen_t clientAddrLen; + int client_socket = accept( m_connection.getSocket(), + &clientAddr, &clientAddrLen ) ; -protected: + if ( client_socket == -1 ) { + LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() ); + } else { - bool m_polling; + std::string clientAddress, clientService; + if ( Socket::convertNameInfo(&clientAddr, clientAddrLen, + clientAddress, clientService ) ) { + LOG( Logger::DEBUG, std::string("New client connected: "). + append(clientAddress).append(":"). + append(clientService).c_str() ); + } + + m_connectionPool[client_socket] = new Connection(client_socket); + addFd( client_socket, POLLIN | POLLPRI ); + } + } + + virtual void handleClient( const int socket ) + { + TRACE; + + typename ConnectionPool::iterator it = m_connectionPool.find(socket); + + if ( it == m_connectionPool.end() || !it->second->receive() ) { + delete it->second; + m_connectionPool.erase(it); + removeFd(socket); + } + } private: @@ -30,13 +121,57 @@ private: Poll(const Poll&); Poll& operator=(const Poll&); - bool addFd( const int socket, const short events ); - bool removeFd( const int socket ); + bool addFd( const int socket, const short events ) + { + TRACE; + LOG( Logger::DEBUG, std::string("Adding socket: "). + append(TToStr(socket)).c_str() ); + + if (m_num_of_fds >= m_maxclients ) + return false; + + m_fds[m_num_of_fds].fd = socket; + m_fds[m_num_of_fds].events = events; + m_fds[m_num_of_fds].revents = 0; + m_num_of_fds++; + + return true; + } + + bool removeFd( const int socket ) + { + TRACE; + LOG( Logger::DEBUG, std::string("Removing socket: "). + append(TToStr(socket)).c_str() ); + + unsigned int i = 0 ; + while (i < m_maxclients && m_fds[i].fd != socket ) + i++; + + if ( i == m_maxclients ) + return false; + + for ( ; i < m_maxclients - 1; ++i ) + m_fds[i] = m_fds[i+1] ; + + m_fds[i].fd = 0 ; + m_fds[i].events = 0 ; + m_fds[i].revents = 0 ; + m_num_of_fds--; + + return true; + } + + + typedef typename std::map< int, Connection* > ConnectionPool; + + Connection &m_connection; + bool m_polling; + ConnectionPool m_connectionPool; - int &m_pollSocket; - nfds_t m_maxclients; - pollfd *m_fds; - nfds_t m_num_of_fds; + nfds_t m_maxclients; + pollfd *m_fds; + nfds_t m_num_of_fds; }; diff --git a/include/Socket.hpp b/include/Socket.hpp index 71eb014..97f8bb4 100644 --- a/include/Socket.hpp +++ b/include/Socket.hpp @@ -27,12 +27,11 @@ public: bool bindToHost(const std::string host, const std::string port ); - bool getPeerName(const int socket, - std::string &host, + void getPeerName(std::string &host, std::string &port); bool send( const void *message, const int lenght ); - int& getSocket() const; + int getSocket() const; static bool convertNameInfo( sockaddr* addr, socklen_t addrLen, diff --git a/include/TcpClient.hpp b/include/TcpClient.hpp index 6ac9656..7c6a21d 100644 --- a/include/TcpClient.hpp +++ b/include/TcpClient.hpp @@ -1,53 +1,137 @@ #ifndef TCP_CLIENT_HPP #define TCP_CLIENT_HPP -#include "TcpConnection.hpp" +#include "Logger.hpp" + +#include "Connection.hpp" #include "Thread.hpp" #include "Poll.hpp" + #include +#include // size_t +template class TcpClient { private: - class WatcherThread : public Thread - , public Poll + template + class PollerThread : public Thread + , public Poll { public: - WatcherThread( TcpClient &data ); + PollerThread( TcpClient &data ) + : Poll(data.m_connection) + , m_tcpClient(data) + { + TRACE; + } + + void stopPoller() + { + TRACE; + stopPolling(); + stop(); + } + + protected: + + /// @todo this is unclear and nasty hack + virtual void acceptClient( const int socket ) + { + TRACE; + + LOG( Logger::DEBUG, std::string("own socket: "). + append( TToStr(m_tcpClient.m_connection.getSocket())). + append( " param socket: "). + append( TToStr( socket) ).c_str() ); + + m_tcpClient.m_connection.receive(); + stopPolling(); + } - // overringind Poll's accept behaviour - void acceptClient(); - void handleClient( const int fd ); - bool receive( const int fd ); + /// @todo this is unclear and nasty hack + virtual void handleClient( const int socket ) + { + TRACE; + LOG( Logger::DEBUG, "Server closed the connection." ); + stopPolling(); + } private: - void* run(); - TcpClient &m_tcpClient; - }; + void* run() + { + TRACE; + startPolling(); + return 0; + } + + TcpClient &m_tcpClient; + + }; // class PollerThread public: TcpClient ( const std::string host, - const std::string port ); + const std::string port ) + : m_connection (host, port) + , m_watcher(*this) + { + TRACE; + } + + virtual ~TcpClient() + { + TRACE; + disconnect(); + } + + bool connect() + { + TRACE; + + if ( !m_connection.connectToHost() ) + return false; + + m_watcher.start(); + return true; + } - virtual ~TcpClient(); + void disconnect() + { + TRACE; + + if ( m_watcher.isRunning() ) { + m_watcher.stopPoller(); + m_watcher.join(); + } - bool connect(); - void disconnect(); + m_connection.closeConnection(); + } - bool send( const void* message, const int length ); + bool send( const void* msg, const size_t msgLen ) + { + TRACE; + return m_connection.send(msg, msgLen); + } private: -// virtual void onDisconnect() = 0; + TcpClient(const TcpClient& ); + TcpClient& operator=(const TcpClient& ); + + Connection& getConnection() + { + TRACE; + return m_connection; + } - TcpConnection m_connection; - WatcherThread m_watcher; + Connection m_connection; + PollerThread m_watcher; }; diff --git a/include/TcpConnection.hpp b/include/TcpConnection.hpp deleted file mode 100644 index 1097cc3..0000000 --- a/include/TcpConnection.hpp +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef TCP_CONNECTION_HPP -#define TCP_CONNECTION_HPP - -#include "Socket.hpp" - -#include - -class TcpConnection -{ -public: - - enum Status { - OPENED, - CLOSED - }; - - TcpConnection ( const int socket, - const int bufferLenght = 1024 ); - - TcpConnection ( const std::string host, - const std::string port, - const int bufferLenght = 1024 ); - - virtual ~TcpConnection(); - - bool connectToHost(); - bool bindToHost(); - - void closeConnection(); - - bool sendMessage( const void* message, const int length ); - bool readFromSocket(); - virtual void onMessageReady ( const unsigned char * message, - const int length ) = 0; - - - int getSocket() const; - - -private: - - TcpConnection(const TcpConnection&); - TcpConnection& operator=(const TcpConnection&); - - Socket m_socket; - std::string m_host; - std::string m_port; - Status m_status; - - unsigned char *m_buffer; - int m_bufferLength; -}; - - -#endif // TCP_CONNECTION_HPP diff --git a/other/tcpclient_main.cpp b/other/tcpclient_main.cpp index e881639..59fe7bf 100644 --- a/other/tcpclient_main.cpp +++ b/other/tcpclient_main.cpp @@ -10,26 +10,28 @@ class SimpleMessage : public Message { -private: +public: - bool buildMessage( const unsigned char* messagePart, - const int length ) + bool buildMessage( const void *msgPart, + const size_t msgLen ) { TRACE; - m_buffer = std::string( (const char*) messagePart, length ); + m_buffer = std::string( (const char*) msgPart, msgLen ); onMessageReady(); + return true; } void onMessageReady() { TRACE; + LOG( Logger::INFO, std::string("Got reply from server: "). append(m_buffer).c_str() ); } protected: - int getExpectedLength() + size_t getExpectedLength() { TRACE; return 0; @@ -51,9 +53,9 @@ int main( int argc, char * argv[] ) std::string msg1("madao"); tcpclient.send( msg1.c_str(), msg1.length()); sleep(2); - std::string msg2("this message is long. Cannot fit into one buffer"); - tcpclient.send( msg2.c_str(), msg2.length()); - sleep(2); +// std::string msg2("this message is long. Cannot fit into one buffer"); +// tcpclient.send( msg2.c_str(), msg2.length()); +// sleep(2); tcpclient.disconnect(); diff --git a/src/TcpConnection.cpp b/src/Connection.cpp similarity index 100% rename from src/TcpConnection.cpp rename to src/Connection.cpp diff --git a/src/Poll.cpp b/src/Poll.cpp index b25b9d4..c0d102f 100644 --- a/src/Poll.cpp +++ b/src/Poll.cpp @@ -8,16 +8,16 @@ #include // malloc, free -Poll::Poll ( int &socket, const nfds_t maxClient ) - : m_polling(false) - , m_pollSocket(socket) +Poll::Poll ( Connection &connection, const nfds_t maxClient ) + : m_connection(connection) + , m_polling(false) , m_maxclients(maxClient) , m_fds(0) , m_num_of_fds(0) { TRACE; - m_fds = (pollfd*) malloc (sizeof(struct pollfd)*m_maxclients); + m_fds = new pollfd[m_maxclients]; } @@ -25,15 +25,7 @@ Poll::~Poll() { TRACE; - free(m_fds); -} - - -void Poll::setOwnSocket ( const int socket ) -{ - TRACE; - - addFd(socket, POLLIN | POLLPRI); + delete[] m_fds; } @@ -51,7 +43,7 @@ void Poll::startPolling() if ( ret == -1 ) { LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() ); - /// @todo shall we handle this? + /// @todo reconnect return; } @@ -60,7 +52,7 @@ void Poll::startPolling() for ( nfds_t i = 0; i < m_num_of_fds; ++i ) if ( m_fds[i].revents != 0 ) - m_fds[i].fd == m_pollSocket ? + m_fds[i].fd == m_connection.getSocket() ? acceptClient() : handleClient(m_fds[i].fd); @@ -82,7 +74,7 @@ void Poll::acceptClient() sockaddr clientAddr; socklen_t clientAddrLen; - int client_socket = accept( m_pollSocket, &clientAddr, &clientAddrLen ) ; + int client_socket = accept( m_connection.getSocket(), &clientAddr, &clientAddrLen ) ; if ( client_socket == -1 ) { LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() ); @@ -95,6 +87,9 @@ void Poll::acceptClient() append(clientAddress).append(":"). append(clientService).c_str() ); } + + m_connectionPool.insert ( + std::pair( client_socket, Connection(client_socket)) ); addFd( client_socket, POLLIN | POLLPRI ); } } @@ -104,13 +99,16 @@ void Poll::handleClient( const int socket ) { TRACE; - if ( !receive( socket ) ) { - removeFd( socket ); + ConnectionPool::iterator it = m_connectionPool.find(socket); + + if ( it == m_connectionPool.end() || !it->second.receive() ) { + m_connectionPool.erase(socket); + removeFd(socket); } } -bool Poll::addFd( const int socket, short events ) +bool Poll::addFd( const int socket, const short events ) { TRACE; LOG( Logger::DEBUG, std::string("Adding socket: "). diff --git a/src/Socket.cpp b/src/Socket.cpp index 3d07e88..1f93a7e 100644 --- a/src/Socket.cpp +++ b/src/Socket.cpp @@ -119,10 +119,9 @@ bool Socket::send ( const void *message, const int length ) } -int& Socket::getSocket() const +int Socket::getSocket() const { TRACE; - return m_socket; } @@ -171,8 +170,7 @@ bool Socket::bindToFirstAddress(struct addrinfo *servinfo ) } -bool Socket::getPeerName( const int socket, - std::string &host, +void Socket::getPeerName( std::string &host, std::string &port ) { TRACE; @@ -180,9 +178,7 @@ bool Socket::getPeerName( const int socket, struct sockaddr_in address ; memset(&address, 0, sizeof(address)); socklen_t addressLength = sizeof(address) ; - getpeername( socket, (struct sockaddr*)&address, &addressLength ) ; - - unsigned int ip = address.sin_addr.s_addr ; + getpeername( m_socket, (struct sockaddr*)&address, &addressLength ) ; char tmp[INET_ADDRSTRLEN]; host = inet_ntop(AF_INET, &address.sin_addr, tmp, INET_ADDRSTRLEN);