Poll has timeout handling

master
Denes Matetelki 12 years ago
parent 1fce1358a4
commit 3bdf10fe7a

@ -13,7 +13,8 @@ class Poll
public: public:
Poll( StreamConnection *connection, Poll( StreamConnection *connection,
const nfds_t maxClient = 10 ); const nfds_t maxClient = 10,
const int timeOut = 10 * 1000 ); // 10sec
virtual ~Poll(); virtual ~Poll();
@ -37,15 +38,21 @@ private:
Poll(const Poll&); Poll(const Poll&);
Poll& operator=(const Poll&); Poll& operator=(const Poll&);
typedef typename std::map< int, StreamConnection* > ConnectionMap;
// can be overriden: behaviour alters in server/client
virtual void removeTimeoutedConnections();
ConnectionMap::iterator removeConnection(int socket, ConnectionMap::iterator it);
bool addFd( const int socket, const short events ); bool addFd( const int socket, const short events );
bool removeFd( const int socket ); bool removeFd( const int socket );
typedef typename std::map< int, Connection* > ConnectionPool; int m_timeOut;
StreamConnection *m_connection; StreamConnection *m_connection;
volatile bool m_polling; volatile bool m_polling;
ConnectionPool m_connectionPool; ConnectionMap m_connections;
nfds_t m_maxclients; nfds_t m_maxclients;
pollfd *m_fds; pollfd *m_fds;

@ -10,10 +10,12 @@
Poll::Poll( StreamConnection *connection, Poll::Poll( StreamConnection *connection,
const nfds_t maxClient ) const nfds_t maxClient,
: m_connection(connection) const int timeOut )
: m_timeOut(timeOut)
, m_connection(connection)
, m_polling(false) , m_polling(false)
, m_connectionPool() , m_connections()
, m_maxclients(maxClient) , m_maxclients(maxClient)
, m_fds(0) , m_fds(0)
, m_num_of_fds(0) , m_num_of_fds(0)
@ -36,28 +38,25 @@ void Poll::startPolling()
TRACE; TRACE;
m_polling = true; m_polling = true;
struct timespec tm = {0,1000};
while ( m_polling ) { while ( m_polling ) {
nanosleep(&tm, &tm) ; int ret = poll( m_fds , m_maxclients, m_timeOut);
/// @todo put poll into Socket class
int ret = poll( m_fds , m_maxclients, 1000);
if ( ret == -1 ) { if ( ret == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() ); LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() );
/// @todo reconnect /// @todo reconnect at client case?
return; return;
} }
if ( ret == 0 ) // timeout
continue;
for ( nfds_t i = 0; i < m_num_of_fds; ++i ) if ( ret != 0 ) { // not timeout
if ( m_fds[i].revents != 0 ) for ( nfds_t i = 0; i < m_num_of_fds; ++i )
m_fds[i].fd == m_connection->getSocket() ? if ( m_fds[i].revents != 0 )
acceptClient() : m_fds[i].fd == m_connection->getSocket() ?
handleClient(m_fds[i].fd); acceptClient() :
handleClient(m_fds[i].fd);
}
removeTimeoutedConnections();
} // while } // while
} }
@ -81,19 +80,20 @@ void Poll::acceptClient()
{ {
TRACE; TRACE;
int client_socket = m_connection->accept(); int client_socket(-1);
if (!m_connection->accept(client_socket))
if ( client_socket == -1 ) {
return; return;
}
Connection *connection = m_connection->clone(client_socket); StreamConnection *streamConnection = dynamic_cast<StreamConnection*>(
m_connection->clone(client_socket));
LOG( Logger::INFO, std::string("New client connected: "). LOG_BEGIN(Logger::INFO)
append(connection->getHost()).append(":"). LOG_PROP("host", streamConnection->getHost())
append(TToStr(connection->getPort())).c_str() ); LOG_PROP("port", streamConnection->getPort())
LOG_PROP("socket", client_socket)
LOG_END("New client connected.");
m_connectionPool[client_socket] = connection; m_connections[client_socket] = streamConnection;
addFd( client_socket, POLLIN | POLLPRI ); addFd( client_socket, POLLIN | POLLPRI );
} }
@ -102,21 +102,52 @@ void Poll::handleClient( const int socket )
{ {
TRACE; TRACE;
typename ConnectionPool::iterator it = m_connectionPool.find(socket); ConnectionMap::iterator it = m_connections.find(socket);
if (it == m_connections.end()) {
if ( it == m_connectionPool.end() || !it->second->receive() ) { LOG_BEGIN(Logger::ERR)
delete it->second; LOG_SPROP(socket)
m_connectionPool.erase(it); LOG_END("Socket not found in map.");
removeFd(socket); return;
} }
if (!it->second->receive())
removeConnection(socket, it);
}
void Poll::removeTimeoutedConnections()
{
TRACE;
if (m_connections.empty())
return;
ConnectionMap::iterator it;
for (it = m_connections.begin(); it != m_connections.end(); )
if (it->second->closed()) {
it = removeConnection(it->second->getSocket(), it++);
} else {
++it;
}
}
Poll::ConnectionMap::iterator Poll::removeConnection(int socket, ConnectionMap::iterator it)
{
TRACE;
removeFd(socket);
delete it->second;
return m_connections.erase(it);
} }
bool Poll::addFd( const int socket, const short events ) bool Poll::addFd( const int socket, const short events )
{ {
TRACE; TRACE;
LOG( Logger::DEBUG, std::string("Adding socket: "). LOG_BEGIN(Logger::DEBUG)
append(TToStr(socket)).c_str() ); LOG_SPROP(socket)
LOG_END("Adding socket.");
if (m_num_of_fds >= m_maxclients ) if (m_num_of_fds >= m_maxclients )
return false; return false;
@ -133,8 +164,9 @@ bool Poll::addFd( const int socket, const short events )
bool Poll::removeFd( const int socket ) bool Poll::removeFd( const int socket )
{ {
TRACE; TRACE;
LOG( Logger::DEBUG, std::string("Removing socket: "). LOG_BEGIN(Logger::DEBUG)
append(TToStr(socket)).c_str() ); LOG_SPROP(socket)
LOG_END("Removing socket.");
unsigned int i = 0 ; unsigned int i = 0 ;
while (i < m_maxclients && m_fds[i].fd != socket ) while (i < m_maxclients && m_fds[i].fd != socket )

Loading…
Cancel
Save