From 3fff13fdb7407746c0b05e62f1a339db7d4496f8 Mon Sep 17 00:00:00 2001 From: Denes Matetelki Date: Thu, 10 Nov 2011 16:40:55 +0100 Subject: [PATCH] TcpClient uses Poll class --- .gitignore | 3 +- include/Poll.hpp | 11 ++++--- include/TcpClient.hpp | 9 ++++-- include/TcpServer.hpp | 5 ++- other/tcpclient_main.cpp | 2 ++ other/tcpserver_main.cpp | 9 ++++++ src/Poll.cpp | 35 ++++++++++++-------- src/TcpClient.cpp | 70 ++++++++++++++-------------------------- src/TcpServer.cpp | 8 ++--- 9 files changed, 78 insertions(+), 74 deletions(-) diff --git a/.gitignore b/.gitignore index 0e1ccaa..99d0414 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,5 @@ html/* test/testCppUtils test/testCppUtils.out *.kate-swp - +other/client +other/server diff --git a/include/Poll.hpp b/include/Poll.hpp index 46309d1..e767826 100644 --- a/include/Poll.hpp +++ b/include/Poll.hpp @@ -10,15 +10,13 @@ public: Poll( int &socket, const nfds_t maxClient ); virtual ~Poll(); + void setOwnSocket( const int socket ); void startPolling(); void stopPolling(); virtual void acceptClient(); - virtual void handleClient( const int fd ); - virtual bool receive( const int fd ) = 0; - - bool addFd( const int fd, const short events ); - bool removeFd( const int fd ); + virtual void handleClient( const int socket ); + virtual bool receive( const int socket ) = 0; protected: @@ -31,6 +29,9 @@ private: Poll(const Poll&); Poll& operator=(const Poll&); + bool addFd( const int socket, const short events ); + bool removeFd( const int socket ); + int &m_pollSocket; nfds_t m_maxclients; pollfd *m_fds; diff --git a/include/TcpClient.hpp b/include/TcpClient.hpp index f697d9e..be3ed9d 100644 --- a/include/TcpClient.hpp +++ b/include/TcpClient.hpp @@ -3,6 +3,7 @@ #include "Socket.hpp" #include "Thread.hpp" +#include "Poll.hpp" #include @@ -29,20 +30,24 @@ private: virtual void onDisconnect() = 0; class WatcherThread : public Thread + , public Poll { public: WatcherThread( TcpClient &data ); + // overringind Poll's accept behaviour + void acceptClient(); + void handleClient( const int fd ); + bool receive( const int fd ); + private: void* run(); - bool receive(); TcpClient &m_tcpClient; }; std::string m_host; std::string m_port; - bool m_connected; WatcherThread m_watcher; }; diff --git a/include/TcpServer.hpp b/include/TcpServer.hpp index 6ae12ee..b38e231 100644 --- a/include/TcpServer.hpp +++ b/include/TcpServer.hpp @@ -5,10 +5,9 @@ #include "Poll.hpp" #include -#include -class TcpServer : public Socket, - public Poll +class TcpServer : public Socket + , public Poll { public: diff --git a/other/tcpclient_main.cpp b/other/tcpclient_main.cpp index 55aeecf..6b98b1e 100644 --- a/other/tcpclient_main.cpp +++ b/other/tcpclient_main.cpp @@ -47,6 +47,8 @@ int main( int argc, char * argv[] ) sleep(2); tcpclient.send("madao"); sleep(2); + tcpclient.send("this message is long. Cannot fit into one buffer"); + sleep(2); // std::string reply; // tcpclient.receive(reply); diff --git a/other/tcpserver_main.cpp b/other/tcpserver_main.cpp index f7cd81b..a24ae0c 100644 --- a/other/tcpserver_main.cpp +++ b/other/tcpserver_main.cpp @@ -3,6 +3,7 @@ #include "TcpServer.hpp" #include "Logger.hpp" +#include "Common.hpp" #include #include @@ -25,6 +26,14 @@ public: TRACE; LOG( Logger::DEBUG, std::string("Got msg: ").append(msg).c_str() ); + + std::string reply("Got your msg, buddy: \""); + reply.append(msg).append("\" see you!"); + + ssize_t n = write(clientSocket, reply.c_str(), reply.length()); + if (n == -1) { + LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() ); + } } }; diff --git a/src/Poll.cpp b/src/Poll.cpp index 8d0d06b..f1e52a6 100644 --- a/src/Poll.cpp +++ b/src/Poll.cpp @@ -5,12 +5,9 @@ #include "Socket.hpp" -#include -#include +#include // malloc, free -#include - Poll::Poll ( int &socket, const nfds_t maxClient ) : m_polling(false) , m_pollSocket(socket) @@ -32,12 +29,20 @@ Poll::~Poll() } +void Poll::setOwnSocket ( const int socket ) +{ + TRACE; + + addFd(socket, POLLIN | POLLPRI); +} + + void Poll::startPolling() { TRACE; m_polling = true; - struct timespec tm = {0,1000}; + struct timespec tm = {0,10000}; while ( m_polling ) { @@ -84,7 +89,7 @@ void Poll::acceptClient() LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() ); } else { - /// @bug always "bas family" errors + /// @bug does not works every time std::string clientAddress, clientService; if ( Socket::convertNameInfo(&clientAddr, clientAddrLen, clientAddress, clientService ) ) { @@ -97,24 +102,26 @@ void Poll::acceptClient() } -void Poll::handleClient( const int fd ) +void Poll::handleClient( const int socket ) { TRACE; - if ( !receive( fd ) ) { - removeFd( fd ); + if ( !receive( socket ) ) { + removeFd( socket ); } } -bool Poll::addFd( const int fd, short events ) +bool Poll::addFd( const int socket, short events ) { TRACE; + LOG( Logger::DEBUG, std::string("Adding socket: "). + append(TToStr(socket)).c_str() ); if (m_num_of_fds >= m_maxclients ) return false; - m_fds[m_num_of_fds].fd = fd; + m_fds[m_num_of_fds].fd = socket; m_fds[m_num_of_fds].events = events; m_fds[m_num_of_fds].revents = 0; m_num_of_fds++; @@ -123,12 +130,14 @@ bool Poll::addFd( const int fd, short events ) } -bool Poll::removeFd( const int fd ) +bool Poll::removeFd( const int socket ) { TRACE; + LOG( Logger::DEBUG, std::string("Removing socket: "). + append(TToStr(socket)).c_str() ); unsigned int i = 0 ; - while (i < m_maxclients && m_fds[i].fd != fd ) + while (i < m_maxclients && m_fds[i].fd != socket ) i++; if ( i == m_maxclients ) diff --git a/src/TcpClient.cpp b/src/TcpClient.cpp index f004c32..e56cdfc 100644 --- a/src/TcpClient.cpp +++ b/src/TcpClient.cpp @@ -3,19 +3,12 @@ #include "Logger.hpp" #include "Common.hpp" -#include -#include -#include // inet_ntop - -#include -#include TcpClient::TcpClient( const std::string host, const std::string port ) : Socket(AF_INET, SOCK_STREAM) , m_host(host) , m_port(port) - , m_connected(false) , m_watcher(*this) { TRACE; @@ -26,8 +19,6 @@ TcpClient::~TcpClient() TRACE; disconnect(); - - } @@ -41,8 +32,7 @@ bool TcpClient::connect() if ( !connectToHost(m_host, m_port) ) return false; - m_connected = true; - + m_watcher.setOwnSocket(m_socket); m_watcher.start(); return true; } @@ -53,9 +43,9 @@ void TcpClient::disconnect() TRACE; closeSocket(); - m_connected = false; if ( m_watcher.isRunning() ) { + m_watcher.stopPolling(); m_watcher.stop(); m_watcher.join(); } @@ -66,9 +56,6 @@ bool TcpClient::send(const std::string msg) { TRACE; - if ( !m_connected ) - return false; - ssize_t n = write(m_socket, msg.c_str(), msg.length()); if (n == -1) { LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() ); @@ -80,52 +67,36 @@ bool TcpClient::send(const std::string msg) TcpClient::WatcherThread::WatcherThread( TcpClient &data ) - : m_tcpClient(data) + : Poll(data.m_socket, 1) + , m_tcpClient(data) { TRACE; } -void* TcpClient::WatcherThread::run() +void TcpClient::WatcherThread::acceptClient() { TRACE; - struct timespec tm = {0,1000}; - while ( m_isRunning ) { - - nanosleep(&tm, &tm) ; - if ( m_tcpClient.m_connected ) { - - pollfd fds[1]; - fds[0].fd = m_tcpClient.m_socket ; - fds[0].events = POLLIN | POLLPRI ; - fds[0].revents = 0 ; - - int ret = poll( fds , 1, 1000) ; - if ( ret == -1 ) { - LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() ); - m_tcpClient.m_connected = false; - m_tcpClient.onDisconnect(); - } + // not accepting anything + receive( m_tcpClient.m_socket ); +} - if ( ret != 0 && !receive() ) { - m_tcpClient.m_connected = false; - m_tcpClient.onDisconnect(); - } - } - } +void TcpClient::WatcherThread::handleClient( const int fd ) +{ + TRACE; - return 0; + LOG( Logger::DEBUG, "Server closed the connection." ); } -bool TcpClient::WatcherThread::receive() +bool TcpClient::WatcherThread::receive( const int fd) { TRACE; - char buffer[256]; - int len = recv( m_tcpClient.m_socket, buffer , 256, 0) ; + char buffer[14]; + int len = recv( fd, buffer , 14, 0) ; if (len == -1) { LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() ); @@ -141,4 +112,13 @@ bool TcpClient::WatcherThread::receive() m_tcpClient.msgArrived(msg); return true; -} \ No newline at end of file +} + + +void* TcpClient::WatcherThread::run() +{ + TRACE; + + startPolling(); + return 0; +} diff --git a/src/TcpServer.cpp b/src/TcpServer.cpp index 0b118f6..8e36622 100644 --- a/src/TcpServer.cpp +++ b/src/TcpServer.cpp @@ -3,8 +3,6 @@ #include "Logger.hpp" #include "Common.hpp" -#include -#include TcpServer::TcpServer( const std::string host, const std::string port, @@ -39,7 +37,7 @@ bool TcpServer::start() return false; } - addFd( m_socket, POLLIN | POLLPRI ) ; + setOwnSocket(m_socket); startPolling(); return true; @@ -59,8 +57,8 @@ bool TcpServer::receive(const int clientSocket) { TRACE; - char buffer[256]; - int len = recv( clientSocket, buffer , 256, 0) ; + char buffer[10]; + int len = recv( clientSocket, buffer , 10, 0) ; if (len == -1) { LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );