TcpClient uses Poll class

master
Denes Matetelki 13 years ago
parent e85a403f44
commit 3fff13fdb7

3
.gitignore vendored

@ -22,4 +22,5 @@ html/*
test/testCppUtils test/testCppUtils
test/testCppUtils.out test/testCppUtils.out
*.kate-swp *.kate-swp
other/client
other/server

@ -10,15 +10,13 @@ public:
Poll( int &socket, const nfds_t maxClient ); Poll( int &socket, const nfds_t maxClient );
virtual ~Poll(); virtual ~Poll();
void setOwnSocket( const int socket );
void startPolling(); void startPolling();
void stopPolling(); void stopPolling();
virtual void acceptClient(); virtual void acceptClient();
virtual void handleClient( const int fd ); virtual void handleClient( const int socket );
virtual bool receive( const int fd ) = 0; virtual bool receive( const int socket ) = 0;
bool addFd( const int fd, const short events );
bool removeFd( const int fd );
protected: protected:
@ -31,6 +29,9 @@ private:
Poll(const Poll&); Poll(const Poll&);
Poll& operator=(const Poll&); Poll& operator=(const Poll&);
bool addFd( const int socket, const short events );
bool removeFd( const int socket );
int &m_pollSocket; int &m_pollSocket;
nfds_t m_maxclients; nfds_t m_maxclients;
pollfd *m_fds; pollfd *m_fds;

@ -3,6 +3,7 @@
#include "Socket.hpp" #include "Socket.hpp"
#include "Thread.hpp" #include "Thread.hpp"
#include "Poll.hpp"
#include <string> #include <string>
@ -29,20 +30,24 @@ private:
virtual void onDisconnect() = 0; virtual void onDisconnect() = 0;
class WatcherThread : public Thread class WatcherThread : public Thread
, public Poll
{ {
public: public:
WatcherThread( TcpClient &data ); WatcherThread( TcpClient &data );
// overringind Poll's accept behaviour
void acceptClient();
void handleClient( const int fd );
bool receive( const int fd );
private: private:
void* run(); void* run();
bool receive();
TcpClient &m_tcpClient; TcpClient &m_tcpClient;
}; };
std::string m_host; std::string m_host;
std::string m_port; std::string m_port;
bool m_connected;
WatcherThread m_watcher; WatcherThread m_watcher;
}; };

@ -5,10 +5,9 @@
#include "Poll.hpp" #include "Poll.hpp"
#include <string> #include <string>
#include <poll.h>
class TcpServer : public Socket, class TcpServer : public Socket
public Poll , public Poll
{ {
public: public:

@ -47,6 +47,8 @@ int main( int argc, char * argv[] )
sleep(2); sleep(2);
tcpclient.send("madao"); tcpclient.send("madao");
sleep(2); sleep(2);
tcpclient.send("this message is long. Cannot fit into one buffer");
sleep(2);
// std::string reply; // std::string reply;
// tcpclient.receive(reply); // tcpclient.receive(reply);

@ -3,6 +3,7 @@
#include "TcpServer.hpp" #include "TcpServer.hpp"
#include "Logger.hpp" #include "Logger.hpp"
#include "Common.hpp"
#include <iostream> #include <iostream>
#include <string> #include <string>
@ -25,6 +26,14 @@ public:
TRACE; TRACE;
LOG( Logger::DEBUG, std::string("Got msg: ").append(msg).c_str() ); LOG( Logger::DEBUG, std::string("Got msg: ").append(msg).c_str() );
std::string reply("Got your msg, buddy: \"");
reply.append(msg).append("\" see you!");
ssize_t n = write(clientSocket, reply.c_str(), reply.length());
if (n == -1) {
LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() );
}
} }
}; };

@ -5,12 +5,9 @@
#include "Socket.hpp" #include "Socket.hpp"
#include <sys/types.h> #include <stdlib.h> // malloc, free
#include <sys/socket.h>
#include <stdlib.h>
Poll::Poll ( int &socket, const nfds_t maxClient ) Poll::Poll ( int &socket, const nfds_t maxClient )
: m_polling(false) : m_polling(false)
, m_pollSocket(socket) , m_pollSocket(socket)
@ -32,12 +29,20 @@ Poll::~Poll()
} }
void Poll::setOwnSocket ( const int socket )
{
TRACE;
addFd(socket, POLLIN | POLLPRI);
}
void Poll::startPolling() void Poll::startPolling()
{ {
TRACE; TRACE;
m_polling = true; m_polling = true;
struct timespec tm = {0,1000}; struct timespec tm = {0,10000};
while ( m_polling ) { while ( m_polling ) {
@ -84,7 +89,7 @@ void Poll::acceptClient()
LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() ); LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() );
} else { } else {
/// @bug always "bas family" errors /// @bug does not works every time
std::string clientAddress, clientService; std::string clientAddress, clientService;
if ( Socket::convertNameInfo(&clientAddr, clientAddrLen, if ( Socket::convertNameInfo(&clientAddr, clientAddrLen,
clientAddress, clientService ) ) { clientAddress, clientService ) ) {
@ -97,24 +102,26 @@ void Poll::acceptClient()
} }
void Poll::handleClient( const int fd ) void Poll::handleClient( const int socket )
{ {
TRACE; TRACE;
if ( !receive( fd ) ) { if ( !receive( socket ) ) {
removeFd( fd ); removeFd( socket );
} }
} }
bool Poll::addFd( const int fd, short events ) bool Poll::addFd( const int socket, short events )
{ {
TRACE; TRACE;
LOG( Logger::DEBUG, std::string("Adding socket: ").
append(TToStr(socket)).c_str() );
if (m_num_of_fds >= m_maxclients ) if (m_num_of_fds >= m_maxclients )
return false; return false;
m_fds[m_num_of_fds].fd = fd; m_fds[m_num_of_fds].fd = socket;
m_fds[m_num_of_fds].events = events; m_fds[m_num_of_fds].events = events;
m_fds[m_num_of_fds].revents = 0; m_fds[m_num_of_fds].revents = 0;
m_num_of_fds++; m_num_of_fds++;
@ -123,12 +130,14 @@ bool Poll::addFd( const int fd, short events )
} }
bool Poll::removeFd( const int fd ) bool Poll::removeFd( const int socket )
{ {
TRACE; TRACE;
LOG( Logger::DEBUG, std::string("Removing socket: ").
append(TToStr(socket)).c_str() );
unsigned int i = 0 ; unsigned int i = 0 ;
while (i < m_maxclients && m_fds[i].fd != fd ) while (i < m_maxclients && m_fds[i].fd != socket )
i++; i++;
if ( i == m_maxclients ) if ( i == m_maxclients )

@ -3,19 +3,12 @@
#include "Logger.hpp" #include "Logger.hpp"
#include "Common.hpp" #include "Common.hpp"
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h> // inet_ntop
#include <poll.h>
#include <time.h>
TcpClient::TcpClient( const std::string host, TcpClient::TcpClient( const std::string host,
const std::string port ) const std::string port )
: Socket(AF_INET, SOCK_STREAM) : Socket(AF_INET, SOCK_STREAM)
, m_host(host) , m_host(host)
, m_port(port) , m_port(port)
, m_connected(false)
, m_watcher(*this) , m_watcher(*this)
{ {
TRACE; TRACE;
@ -26,8 +19,6 @@ TcpClient::~TcpClient()
TRACE; TRACE;
disconnect(); disconnect();
} }
@ -41,8 +32,7 @@ bool TcpClient::connect()
if ( !connectToHost(m_host, m_port) ) if ( !connectToHost(m_host, m_port) )
return false; return false;
m_connected = true; m_watcher.setOwnSocket(m_socket);
m_watcher.start(); m_watcher.start();
return true; return true;
} }
@ -53,9 +43,9 @@ void TcpClient::disconnect()
TRACE; TRACE;
closeSocket(); closeSocket();
m_connected = false;
if ( m_watcher.isRunning() ) { if ( m_watcher.isRunning() ) {
m_watcher.stopPolling();
m_watcher.stop(); m_watcher.stop();
m_watcher.join(); m_watcher.join();
} }
@ -66,9 +56,6 @@ bool TcpClient::send(const std::string msg)
{ {
TRACE; TRACE;
if ( !m_connected )
return false;
ssize_t n = write(m_socket, msg.c_str(), msg.length()); ssize_t n = write(m_socket, msg.c_str(), msg.length());
if (n == -1) { if (n == -1) {
LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() ); LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() );
@ -80,52 +67,36 @@ bool TcpClient::send(const std::string msg)
TcpClient::WatcherThread::WatcherThread( TcpClient &data ) TcpClient::WatcherThread::WatcherThread( TcpClient &data )
: m_tcpClient(data) : Poll(data.m_socket, 1)
, m_tcpClient(data)
{ {
TRACE; TRACE;
} }
void* TcpClient::WatcherThread::run() void TcpClient::WatcherThread::acceptClient()
{ {
TRACE; TRACE;
struct timespec tm = {0,1000}; // not accepting anything
while ( m_isRunning ) { receive( m_tcpClient.m_socket );
}
nanosleep(&tm, &tm) ;
if ( m_tcpClient.m_connected ) {
pollfd fds[1];
fds[0].fd = m_tcpClient.m_socket ;
fds[0].events = POLLIN | POLLPRI ;
fds[0].revents = 0 ;
int ret = poll( fds , 1, 1000) ;
if ( ret == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR polling. ").c_str() );
m_tcpClient.m_connected = false;
m_tcpClient.onDisconnect();
}
if ( ret != 0 && !receive() ) {
m_tcpClient.m_connected = false;
m_tcpClient.onDisconnect();
}
} void TcpClient::WatcherThread::handleClient( const int fd )
} {
TRACE;
return 0; LOG( Logger::DEBUG, "Server closed the connection." );
} }
bool TcpClient::WatcherThread::receive() bool TcpClient::WatcherThread::receive( const int fd)
{ {
TRACE; TRACE;
char buffer[256]; char buffer[14];
int len = recv( m_tcpClient.m_socket, buffer , 256, 0) ; int len = recv( fd, buffer , 14, 0) ;
if (len == -1) { if (len == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() ); LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );
@ -141,4 +112,13 @@ bool TcpClient::WatcherThread::receive()
m_tcpClient.msgArrived(msg); m_tcpClient.msgArrived(msg);
return true; return true;
} }
void* TcpClient::WatcherThread::run()
{
TRACE;
startPolling();
return 0;
}

@ -3,8 +3,6 @@
#include "Logger.hpp" #include "Logger.hpp"
#include "Common.hpp" #include "Common.hpp"
#include <poll.h>
#include <stdlib.h>
TcpServer::TcpServer( const std::string host, TcpServer::TcpServer( const std::string host,
const std::string port, const std::string port,
@ -39,7 +37,7 @@ bool TcpServer::start()
return false; return false;
} }
addFd( m_socket, POLLIN | POLLPRI ) ; setOwnSocket(m_socket);
startPolling(); startPolling();
return true; return true;
@ -59,8 +57,8 @@ bool TcpServer::receive(const int clientSocket)
{ {
TRACE; TRACE;
char buffer[256]; char buffer[10];
int len = recv( clientSocket, buffer , 256, 0) ; int len = recv( clientSocket, buffer , 10, 0) ;
if (len == -1) { if (len == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() ); LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );

Loading…
Cancel
Save