Messy Tcp connection stuff...starting the cleanup

master
Denes Matetelki 13 years ago
parent 5f5d376b4b
commit cff3093cad

@ -0,0 +1,23 @@
#ifndef MESSAGE_BUILDER_HPP
#define MESSAGE_BUILDER_HPP
class MessageBuilder
{
public:
bool buildMessage( const unsigned char* message, unsigned int length ) ;
protected:
MessageBuilder( const int bufferLength ) ;
virtual ~MessageBuilder() ;
virtual void onMessageReady( const unsigned char* message, unsigned int length ) = 0 ;
private:
unsigned char *m_buffer;
int m_bufferLength;
int m_bufferUsed;
};
#endif // MESSAGE_BUILDER_HPP

@ -0,0 +1,32 @@
#ifndef MESSAGE_BUILDER_HPP
#define MESSAGE_BUILDER_HPP
class TcpConnection;
class MessageBuilder
{
public:
MessageBuilder( TcpConnection *connection,
const int bufferLength = 1024 );
virtual ~MessageBuilder();
bool buildMessage( const unsigned char* message,
const int length ) ;
private:
MessageBuilder(const MessageBuilder&);
MessageBuilder& operator=(const MessageBuilder&);
TcpConnection *m_connection;
unsigned char *m_buffer;
int m_bufferLength;
int m_bufferUsed;
};
#endif // MESSAGE_BUILDER_HPP

@ -7,7 +7,7 @@ class Poll
{ {
public: public:
Poll( int &socket, const nfds_t maxClient ); Poll( int &socket, const nfds_t maxClient = 10 );
virtual ~Poll(); virtual ~Poll();
void setOwnSocket( const int socket ); void setOwnSocket( const int socket );
@ -16,6 +16,7 @@ public:
virtual void acceptClient(); virtual void acceptClient();
virtual void handleClient( const int socket ); virtual void handleClient( const int socket );
virtual bool receive( const int socket ) = 0; virtual bool receive( const int socket ) = 0;

@ -14,9 +14,12 @@ public:
Socket(const int domain, Socket(const int domain,
const int type, const int type,
const int protocol = 0); const int protocol = 0);
Socket(const int socket );
virtual ~Socket(); virtual ~Socket();
bool openSocket(); bool createSocket();
void closeSocket(); void closeSocket();
bool connectToHost(const std::string host, bool connectToHost(const std::string host,
@ -24,6 +27,13 @@ public:
bool bindToHost(const std::string host, bool bindToHost(const std::string host,
const std::string port ); const std::string port );
bool getPeerName(const int socket,
std::string &host,
std::string &port);
bool send( const void *message, const int lenght );
int& getSocket() const;
static bool convertNameInfo( sockaddr* addr, static bool convertNameInfo( sockaddr* addr,
socklen_t addrLen, socklen_t addrLen,
std::string &retAddr, std::string &retAddr,

@ -1,34 +1,18 @@
#ifndef TCP_CLIENT_HPP #ifndef TCP_CLIENT_HPP
#define TCP_CLIENT_HPP #define TCP_CLIENT_HPP
#include "Socket.hpp" #include "TcpConnection.hpp"
#include "MessageBuilder.hpp"
#include "Thread.hpp" #include "Thread.hpp"
#include "Poll.hpp" #include "Poll.hpp"
#include <string> #include <string>
class TcpClient : public Socket class TcpClient
{ {
public:
TcpClient ( const std::string host,
const std::string port );
virtual ~TcpClient();
bool connect();
void disconnect();
bool send(const std::string msg);
private: private:
virtual void msgArrived(const std::string) = 0;
virtual void onDisconnect() = 0;
class WatcherThread : public Thread class WatcherThread : public Thread
, public Poll , public Poll
{ {
@ -46,8 +30,25 @@ private:
TcpClient &m_tcpClient; TcpClient &m_tcpClient;
}; };
std::string m_host;
std::string m_port; public:
TcpClient ( const std::string host,
const std::string port,
MessageBuilder *buidler );
virtual ~TcpClient();
bool connect();
void disconnect();
bool send( const void* message, const int length );
private:
virtual void onDisconnect() = 0;
TcpConnection m_connection;
WatcherThread m_watcher; WatcherThread m_watcher;
}; };

@ -0,0 +1,58 @@
#ifndef TCP_CONNECTION_HPP
#define TCP_CONNECTION_HPP
#include "MessageBuilder.hpp"
#include "Socket.hpp"
#include <string>
class TcpConnection
{
public:
enum Status {
OPENED,
CLOSED
};
TcpConnection ( const int socket,
const MessageBuilder *m_builder,
const int bufferLenght = 1024 );
TcpConnection ( const std::string host,
const std::string port,
MessageBuilder *builder,
const int bufferLenght = 1024 );
virtual ~TcpConnection();
bool connectToHost();
bool bindToHost();
void closeConnection();
bool sendMessage( const void* message, const int length );
bool readFromSocket();
virtual void onMessageReady ( const unsigned char * message, const int length ) = 0;
int getSocket() const;
private:
TcpConnection(const TcpConnection&);
TcpConnection& operator=(const TcpConnection&);
Socket m_socket;
std::string m_host;
std::string m_port;
MessageBuilder *m_builder;
Status m_status;
unsigned char *m_buffer;
int m_bufferLength;
};
#endif // TCP_CONNECTION_HPP

@ -24,7 +24,8 @@ public:
bool receive( const int fd ); bool receive( const int fd );
virtual void msgArrived(const int clientSocket, virtual void msgArrived(const int clientSocket,
const std::string msg) = 0; const unsigned char* msg,
const int msgLen ) = 0;
private: private:

@ -1,19 +1,39 @@
// gpp tcpclient_main.cpp -o client -I../include ../src/Logger.cpp ../src/TcpClient.cpp // gpp tcpclient_main.cpp -o client -I../include ../src/Logger.cpp ../src/TcpClient.cpp
#include "TcpClient.hpp"
#include "Logger.hpp" #include "Logger.hpp"
#include "TcpClient.hpp"
#include "MessageBuilder.hpp"
#include <iostream> #include <iostream>
#include <string> #include <string>
class DummyBuilder : public MessageBuilder
{
private:
void messageBuilt( const unsigned char* message,
const int length )
{
TRACE;
std::string reply((char *)message, length);
LOG( Logger::INFO, std::string("Got reply from server: ").
append(reply).c_str() );
}
};
class PrinterTcpClient : public TcpClient class PrinterTcpClient : public TcpClient
{ {
public: public:
PrinterTcpClient ( const std::string host, PrinterTcpClient ( const std::string host,
const std::string port ) const std::string port,
: TcpClient(host, port) MessageBuilder *builder
)
: TcpClient(host, port, builder)
{ {
TRACE; TRACE;
} }
@ -40,14 +60,18 @@ int main( int argc, char * argv[] )
Logger::init(std::cout); Logger::init(std::cout);
Logger::setLogLevel(Logger::FINEST); Logger::setLogLevel(Logger::FINEST);
PrinterTcpClient tcpclient("localhost", "4455"); MessageBuilder *builder = new DummyBuilder;
PrinterTcpClient tcpclient("localhost", "4455", builder);
tcpclient.connect(); tcpclient.connect();
sleep(2); sleep(2);
tcpclient.send("madao"); std::string msg1("madao");
tcpclient.send( msg1.c_str(), msg1.length());
sleep(2); sleep(2);
tcpclient.send("this message is long. Cannot fit into one buffer"); std::string msg2("this message is long. Cannot fit into one buffer");
tcpclient.send( msg2.c_str(), msg2.length());
sleep(2); sleep(2);
// std::string reply; // std::string reply;
@ -57,7 +81,7 @@ int main( int argc, char * argv[] )
tcpclient.disconnect(); tcpclient.disconnect();
delete builder;
Logger::destroy(); Logger::destroy();
return 0; return 0;
} }

@ -21,16 +21,18 @@ public:
} }
void msgArrived(const int clientSocket, void msgArrived(const int clientSocket,
const std::string msg) const unsigned char*msg,
const int msgLen )
{ {
TRACE; TRACE;
LOG( Logger::DEBUG, std::string("Got msg: ").append(msg).c_str() ); std::string message((char*)msg, msgLen);
LOG( Logger::DEBUG, std::string("Got msg: ").append(message).c_str() );
std::string reply("Got your msg, buddy: \""); std::string reply("Got your msg, buddy: \"");
reply.append(msg).append("\" see you!"); reply.append(message).append("\" see you!");
ssize_t n = write(clientSocket, reply.c_str(), reply.length()); ssize_t n = write(clientSocket,message.c_str(), message.length());
if (n == -1) { if (n == -1) {
LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() ); LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() );
} }

@ -0,0 +1,41 @@
#include "MessageBuilder.hpp"
#include "Logger.hpp"
#include <string.h> // memcpy
MessageBuilder::MessageBuilder( TcpConnection *connection,
const int bufferLength )
: m_connection(connection)
, m_buffer(0)
, m_bufferLength(bufferLength)
, m_bufferUsed(0)
{
TRACE;
m_buffer = new unsigned char[bufferLength];
}
MessageBuilder::~MessageBuilder()
{
TRACE;
delete [] m_buffer ;
}
bool MessageBuilder::buildMessage( const unsigned char *message,
const int length )
{
TRACE;
/// @todo implement composing the message
memcpy(message, m_buffer, length );
m_bufferUsed = length;
m_connection->onMessageReady ( m_buffer, m_bufferUsed );
return true;
}

@ -140,10 +140,6 @@ MysqlClient::queryResultToStringList(const MYSQL_RES *res_set,
{ {
TRACE_STATIC; TRACE_STATIC;
// unsigned int numrows = mysql_num_rows(res_set);
// LOG( Logger::DEBUG, std::string("MySQL query returned number of rows: ").
// append(TToStr(numrows)).c_str());
unsigned int num_fields = mysql_num_fields(const_cast<MYSQL_RES *>(res_set)); unsigned int num_fields = mysql_num_fields(const_cast<MYSQL_RES *>(res_set));
MYSQL_ROW row; MYSQL_ROW row;

@ -42,7 +42,7 @@ void Poll::startPolling()
TRACE; TRACE;
m_polling = true; m_polling = true;
struct timespec tm = {0,10000}; struct timespec tm = {0,1000};
while ( m_polling ) { while ( m_polling ) {
@ -64,7 +64,6 @@ void Poll::startPolling()
acceptClient() : acceptClient() :
handleClient(m_fds[i].fd); handleClient(m_fds[i].fd);
} // while } // while
} }
@ -89,7 +88,6 @@ void Poll::acceptClient()
LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() ); LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() );
} else { } else {
/// @bug does not works every time
std::string clientAddress, clientService; std::string clientAddress, clientService;
if ( Socket::convertNameInfo(&clientAddr, clientAddrLen, if ( Socket::convertNameInfo(&clientAddr, clientAddrLen,
clientAddress, clientService ) ) { clientAddress, clientService ) ) {

@ -6,6 +6,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <arpa/inet.h> // inet_ntop #include <arpa/inet.h> // inet_ntop
#include <sys/select.h>
Socket::Socket(const int domain, Socket::Socket(const int domain,
@ -17,9 +18,22 @@ Socket::Socket(const int domain,
, m_protocol(protocol) , m_protocol(protocol)
, m_addr() , m_addr()
, m_addrLen(0) , m_addrLen(0)
{
TRACE;
}
Socket::Socket(const int socket)
: m_socket(socket)
, m_domain(-1)
, m_type(-1)
, m_protocol(-1)
, m_addr()
, m_addrLen(0)
{ {
TRACE; TRACE;
/// @todo get domain type prot from socket
} }
@ -29,7 +43,7 @@ Socket::~Socket()
} }
bool Socket::openSocket() bool Socket::createSocket()
{ {
TRACE; TRACE;
@ -92,6 +106,27 @@ bool Socket::bindToHost( const std::string host,
} }
bool Socket::send ( const void *message, const int length )
{
TRACE;
if ( ::send(m_socket, message, length, MSG_NOSIGNAL) == -1 ) {
LOG( Logger::ERR, errnoToString("ERROR sending. ").c_str() );
return false;
}
return true;
}
int& Socket::getSocket() const
{
TRACE;
return m_socket;
}
bool Socket::connectToFirstAddress(struct addrinfo *servinfo) bool Socket::connectToFirstAddress(struct addrinfo *servinfo)
{ {
TRACE; TRACE;
@ -136,6 +171,25 @@ bool Socket::bindToFirstAddress(struct addrinfo *servinfo )
} }
bool Socket::getPeerName( const int socket,
std::string &host,
std::string &port )
{
TRACE;
struct sockaddr_in address ;
memset(&address, 0, sizeof(address));
socklen_t addressLength = sizeof(address) ;
getpeername( socket, (struct sockaddr*)&address, &addressLength ) ;
unsigned int ip = address.sin_addr.s_addr ;
char tmp[INET_ADDRSTRLEN];
host = inet_ntop(AF_INET, &address.sin_addr, tmp, INET_ADDRSTRLEN);
port = address.sin_port;
}
bool Socket::getHostInfo( const std::string host, bool Socket::getHostInfo( const std::string host,
const std::string port, const std::string port,
struct addrinfo **servinfo) struct addrinfo **servinfo)

@ -5,10 +5,9 @@
TcpClient::TcpClient( const std::string host, TcpClient::TcpClient( const std::string host,
const std::string port ) const std::string port,
: Socket(AF_INET, SOCK_STREAM) MessageBuilder *builder )
, m_host(host) : m_connection (host, port, builder)
, m_port(port)
, m_watcher(*this) , m_watcher(*this)
{ {
TRACE; TRACE;
@ -26,13 +25,10 @@ bool TcpClient::connect()
{ {
TRACE; TRACE;
if ( !openSocket() ) if ( !m_connection.connectToHost() )
return false; return false;
if ( !connectToHost(m_host, m_port) ) m_watcher.setOwnSocket(m_connection.getSocket());
return false;
m_watcher.setOwnSocket(m_socket);
m_watcher.start(); m_watcher.start();
return true; return true;
} }
@ -42,7 +38,7 @@ void TcpClient::disconnect()
{ {
TRACE; TRACE;
closeSocket(); m_connection.closeConnection();
if ( m_watcher.isRunning() ) { if ( m_watcher.isRunning() ) {
m_watcher.stopPolling(); m_watcher.stopPolling();
@ -52,23 +48,19 @@ void TcpClient::disconnect()
} }
bool TcpClient::send(const std::string msg) bool TcpClient::send( const void* message, const int length )
{ {
TRACE; TRACE;
ssize_t n = write(m_socket, msg.c_str(), msg.length()); return m_connection.sendMessage(message, length);
if (n == -1) {
LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() );
m_watcher.stopPolling();
return false;
} }
return true;
}
// WatcherThread
TcpClient::WatcherThread::WatcherThread( TcpClient &data ) TcpClient::WatcherThread::WatcherThread( TcpClient &data )
: Poll(data.m_socket, 1) : Poll(data.m_connection.getSocket())
, m_tcpClient(data) , m_tcpClient(data)
{ {
TRACE; TRACE;
@ -80,7 +72,7 @@ void TcpClient::WatcherThread::acceptClient()
TRACE; TRACE;
// not accepting anything // not accepting anything
receive( m_tcpClient.m_socket ); // receive( m_tcpClient.m_socket );
} }
@ -93,29 +85,29 @@ void TcpClient::WatcherThread::handleClient( const int fd )
} }
bool TcpClient::WatcherThread::receive( const int fd) // bool TcpClient::WatcherThread::receive( const int fd)
{ // {
TRACE; // TRACE;
char buffer[14]; // char buffer[14];
int len = recv( fd, buffer , 14, 0) ; // int len = recv( fd, buffer , 14, 0) ;
//
if (len == -1) { // if (len == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() ); // LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );
return false; // return false;
} // }
//
if (len == 0) { // if (len == 0) {
LOG( Logger::DEBUG, "Connection closed by peer." ); // LOG( Logger::DEBUG, "Connection closed by peer." );
stopPolling(); // stopPolling();
return false; // return false;
} // }
//
std::string msg(buffer, len); // std::string msg(buffer, len);
m_tcpClient.msgArrived(msg); // m_tcpClient.msgArrived(msg);
/*
return true; return true;
} }*/
void* TcpClient::WatcherThread::run() void* TcpClient::WatcherThread::run()

@ -0,0 +1,120 @@
#include "TcpConnection.hpp"
#include "Logger.hpp"
#include "Common.hpp"
#include <sys/types.h>
#include <sys/socket.h>
TcpConnection::TcpConnection ( const int socket,
const int bufferLenght,
MessageBuilder* builder
)
: m_socket(socket)
, m_host()
, m_port()
, m_status(CLOSED)
, m_builder(builder)
, m_buffer(0)
, m_bufferLength(bufferLenght)
{
TRACE;
m_buffer = new unsigned char[m_bufferLength];
m_socket.getPeerName(m_host, m_port);
}
TcpConnection::TcpConnection ( const std::string host,
const std::string port,
const int bufferLenght,
MessageBuilder* builder )
: m_socket(AF_INET, SOCK_STREAM)
, m_host(host)
, m_port(port)
, m_status(CLOSED)
, m_builder(builder)
, m_buffer(0)
, m_bufferLength(bufferLenght)
{
TRACE;
m_buffer = new unsigned char[m_bufferLength];
m_socket.createSocket();
}
TcpConnection::~TcpConnection()
{
TRACE;
delete[] m_buffer;
m_socket.closeSocket();
}
bool TcpConnection::connectToHost()
{
TRACE;
return m_socket.connectToHost(m_host, m_port);
}
bool TcpConnection::bindToHost()
{
TRACE;
return m_socket.bindToHost(m_host, m_port);
}
void TcpConnection::closeConnection()
{
TRACE;
m_socket.closeSocket();
}
bool TcpConnection::sendMessage( const void* message, const int length )
{
TRACE;
return m_socket.send( message, length );
}
int& TcpConnection::getSocket() const
{
TRACE;
return m_socket.getSocket();
}
bool TcpConnection::readFromSocket()
{
TRACE;
int len = recv(m_socket, m_buffer, m_bufferLength, 0);
if (len == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );
return false;
}
if (len == 0) {
LOG( Logger::DEBUG, "Connection closed by peer." );
return false;
}
if ( !m_builder ) {
onMessageReady(m_buffer, len);
return true;
}
return m_builder->buildMessage(m_buffer, len);
}

@ -3,6 +3,8 @@
#include "Logger.hpp" #include "Logger.hpp"
#include "Common.hpp" #include "Common.hpp"
#include "MessageBuilder.hpp"
TcpServer::TcpServer( const std::string host, TcpServer::TcpServer( const std::string host,
const std::string port, const std::string port,
@ -29,6 +31,21 @@ bool TcpServer::start()
if ( !openSocket() ) if ( !openSocket() )
return false; return false;
// // Set the socket REUSABLE flag for the quick restart ability.
// if (setsockopt(m_server_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
// {
// errorArrived( EC_SETSOCKOPT ) ;
// }
// // Set the socket NONBLOCKING flag for polling.
// if (-1 == (fc_flags = fcntl(m_server_socket, F_GETFL, 0)))
// {
// fc_flags = 0;
// }
// fcntl(m_server_socket, F_SETFL, fc_flags | O_NONBLOCK);
if ( !bindToHost(m_host, m_port) ) if ( !bindToHost(m_host, m_port) )
return false; return false;
@ -57,7 +74,7 @@ bool TcpServer::receive(const int clientSocket)
{ {
TRACE; TRACE;
char buffer[10]; unsigned char buffer[10];
int len = recv( clientSocket, buffer , 10, 0) ; int len = recv( clientSocket, buffer , 10, 0) ;
if (len == -1) { if (len == -1) {
@ -70,8 +87,14 @@ bool TcpServer::receive(const int clientSocket)
return false; return false;
} }
std::string msg(buffer, len); MessageBuilder *m_builder(0);
msgArrived(clientSocket, msg);
if ( !m_builder ) {
msgArrived(clientSocket, buffer, len);
return true;
}
return m_builder->buildMessage(buffer, len);
return true; return true;
} }

Loading…
Cancel
Save