From 3bdf10fe7a9c7d3522a8b80ad7194243e39ae9c4 Mon Sep 17 00:00:00 2001 From: Denes Matetelki Date: Fri, 5 Jul 2013 19:44:18 +0200 Subject: [PATCH] Poll has timeout handling --- include/Poll.hpp | 15 +++++-- src/Poll.cpp | 104 +++++++++++++++++++++++++++++++---------------- 2 files changed, 79 insertions(+), 40 deletions(-) diff --git a/include/Poll.hpp b/include/Poll.hpp index 31150ca..175ee58 100644 --- a/include/Poll.hpp +++ b/include/Poll.hpp @@ -13,7 +13,8 @@ class Poll public: Poll( StreamConnection *connection, - const nfds_t maxClient = 10 ); + const nfds_t maxClient = 10, + const int timeOut = 10 * 1000 ); // 10sec virtual ~Poll(); @@ -37,15 +38,21 @@ private: Poll(const Poll&); Poll& operator=(const Poll&); + typedef typename std::map< int, StreamConnection* > ConnectionMap; + + // can be overriden: behaviour alters in server/client + virtual void removeTimeoutedConnections(); + + ConnectionMap::iterator removeConnection(int socket, ConnectionMap::iterator it); + bool addFd( const int socket, const short events ); bool removeFd( const int socket ); - typedef typename std::map< int, Connection* > ConnectionPool; - + int m_timeOut; StreamConnection *m_connection; volatile bool m_polling; - ConnectionPool m_connectionPool; + ConnectionMap m_connections; nfds_t m_maxclients; pollfd *m_fds; diff --git a/src/Poll.cpp b/src/Poll.cpp index 4b9452e..ddfb706 100644 --- a/src/Poll.cpp +++ b/src/Poll.cpp @@ -10,10 +10,12 @@ Poll::Poll( StreamConnection *connection, - const nfds_t maxClient ) - : m_connection(connection) + const nfds_t maxClient, + const int timeOut ) + : m_timeOut(timeOut) + , m_connection(connection) , m_polling(false) - , m_connectionPool() + , m_connections() , m_maxclients(maxClient) , m_fds(0) , m_num_of_fds(0) @@ -36,28 +38,25 @@ void Poll::startPolling() TRACE; m_polling = true; - struct timespec tm = {0,1000}; - while ( m_polling ) { - nanosleep(&tm, &tm) ; - - /// @todo put poll into Socket class - int ret = poll( m_fds , m_maxclients, 1000); + int ret = poll( m_fds , m_maxclients, m_timeOut); if ( ret == -1 ) { LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() ); - /// @todo reconnect + /// @todo reconnect at client case? return; } - if ( ret == 0 ) // timeout - continue; - for ( nfds_t i = 0; i < m_num_of_fds; ++i ) - if ( m_fds[i].revents != 0 ) - m_fds[i].fd == m_connection->getSocket() ? - acceptClient() : - handleClient(m_fds[i].fd); + if ( ret != 0 ) { // not timeout + for ( nfds_t i = 0; i < m_num_of_fds; ++i ) + if ( m_fds[i].revents != 0 ) + m_fds[i].fd == m_connection->getSocket() ? + acceptClient() : + handleClient(m_fds[i].fd); + } + + removeTimeoutedConnections(); } // while } @@ -81,19 +80,20 @@ void Poll::acceptClient() { TRACE; - int client_socket = m_connection->accept(); - - if ( client_socket == -1 ) { + int client_socket(-1); + if (!m_connection->accept(client_socket)) return; - } - Connection *connection = m_connection->clone(client_socket); + StreamConnection *streamConnection = dynamic_cast( + m_connection->clone(client_socket)); - LOG( Logger::INFO, std::string("New client connected: "). - append(connection->getHost()).append(":"). - append(TToStr(connection->getPort())).c_str() ); + LOG_BEGIN(Logger::INFO) + LOG_PROP("host", streamConnection->getHost()) + LOG_PROP("port", streamConnection->getPort()) + LOG_PROP("socket", client_socket) + LOG_END("New client connected."); - m_connectionPool[client_socket] = connection; + m_connections[client_socket] = streamConnection; addFd( client_socket, POLLIN | POLLPRI ); } @@ -102,21 +102,52 @@ void Poll::handleClient( const int socket ) { TRACE; - typename ConnectionPool::iterator it = m_connectionPool.find(socket); - - if ( it == m_connectionPool.end() || !it->second->receive() ) { - delete it->second; - m_connectionPool.erase(it); - removeFd(socket); + ConnectionMap::iterator it = m_connections.find(socket); + if (it == m_connections.end()) { + LOG_BEGIN(Logger::ERR) + LOG_SPROP(socket) + LOG_END("Socket not found in map."); + return; } + + if (!it->second->receive()) + removeConnection(socket, it); +} + + +void Poll::removeTimeoutedConnections() +{ + TRACE; + + if (m_connections.empty()) + return; + + ConnectionMap::iterator it; + for (it = m_connections.begin(); it != m_connections.end(); ) + if (it->second->closed()) { + it = removeConnection(it->second->getSocket(), it++); + } else { + ++it; + } +} + + +Poll::ConnectionMap::iterator Poll::removeConnection(int socket, ConnectionMap::iterator it) +{ + TRACE; + + removeFd(socket); + delete it->second; + return m_connections.erase(it); } bool Poll::addFd( const int socket, const short events ) { TRACE; - LOG( Logger::DEBUG, std::string("Adding socket: "). - append(TToStr(socket)).c_str() ); + LOG_BEGIN(Logger::DEBUG) + LOG_SPROP(socket) + LOG_END("Adding socket."); if (m_num_of_fds >= m_maxclients ) return false; @@ -133,8 +164,9 @@ bool Poll::addFd( const int socket, const short events ) bool Poll::removeFd( const int socket ) { TRACE; - LOG( Logger::DEBUG, std::string("Removing socket: "). - append(TToStr(socket)).c_str() ); + LOG_BEGIN(Logger::DEBUG) + LOG_SPROP(socket) + LOG_END("Removing socket."); unsigned int i = 0 ; while (i < m_maxclients && m_fds[i].fd != socket )