From cff3093cad244834c7227fb01da65c66c7eba0cf Mon Sep 17 00:00:00 2001 From: Denes Matetelki Date: Mon, 14 Nov 2011 15:33:49 +0100 Subject: [PATCH] Messy Tcp connection stuff...starting the cleanup --- include/MessageBuilder.cpp | 23 +++++++ include/MessageBuilder.hpp | 32 ++++++++++ include/Poll.hpp | 3 +- include/Socket.hpp | 12 +++- include/TcpClient.hpp | 45 +++++++------- include/TcpConnection.hpp | 58 ++++++++++++++++++ include/TcpServer.hpp | 3 +- other/tcpclient_main.cpp | 40 ++++++++++--- other/tcpserver_main.cpp | 10 ++-- src/MessageBuilder.cpp | 41 +++++++++++++ src/MysqlClient.cpp | 4 -- src/Poll.cpp | 4 +- src/Socket.cpp | 56 ++++++++++++++++- src/TcpClient.cpp | 78 +++++++++++------------- src/TcpConnection.cpp | 120 +++++++++++++++++++++++++++++++++++++ src/TcpServer.cpp | 29 ++++++++- 16 files changed, 467 insertions(+), 91 deletions(-) create mode 100644 include/MessageBuilder.cpp create mode 100644 include/MessageBuilder.hpp create mode 100644 include/TcpConnection.hpp create mode 100644 src/MessageBuilder.cpp create mode 100644 src/TcpConnection.cpp diff --git a/include/MessageBuilder.cpp b/include/MessageBuilder.cpp new file mode 100644 index 0000000..855ef1e --- /dev/null +++ b/include/MessageBuilder.cpp @@ -0,0 +1,23 @@ +#ifndef MESSAGE_BUILDER_HPP +#define MESSAGE_BUILDER_HPP + +class MessageBuilder +{ +public: + + bool buildMessage( const unsigned char* message, unsigned int length ) ; + +protected: + + MessageBuilder( const int bufferLength ) ; + virtual ~MessageBuilder() ; + virtual void onMessageReady( const unsigned char* message, unsigned int length ) = 0 ; + +private: + + unsigned char *m_buffer; + int m_bufferLength; + int m_bufferUsed; +}; + +#endif // MESSAGE_BUILDER_HPP diff --git a/include/MessageBuilder.hpp b/include/MessageBuilder.hpp new file mode 100644 index 0000000..0d3c597 --- /dev/null +++ b/include/MessageBuilder.hpp @@ -0,0 +1,32 @@ +#ifndef MESSAGE_BUILDER_HPP +#define MESSAGE_BUILDER_HPP + +class TcpConnection; + + +class MessageBuilder +{ +public: + + MessageBuilder( TcpConnection *connection, + const int bufferLength = 1024 ); + + virtual ~MessageBuilder(); + + bool buildMessage( const unsigned char* message, + const int length ) ; + + +private: + + MessageBuilder(const MessageBuilder&); + MessageBuilder& operator=(const MessageBuilder&); + + + TcpConnection *m_connection; + unsigned char *m_buffer; + int m_bufferLength; + int m_bufferUsed; +}; + +#endif // MESSAGE_BUILDER_HPP diff --git a/include/Poll.hpp b/include/Poll.hpp index e767826..d0897bf 100644 --- a/include/Poll.hpp +++ b/include/Poll.hpp @@ -7,7 +7,7 @@ class Poll { public: - Poll( int &socket, const nfds_t maxClient ); + Poll( int &socket, const nfds_t maxClient = 10 ); virtual ~Poll(); void setOwnSocket( const int socket ); @@ -16,6 +16,7 @@ public: virtual void acceptClient(); virtual void handleClient( const int socket ); + virtual bool receive( const int socket ) = 0; diff --git a/include/Socket.hpp b/include/Socket.hpp index 9cf1da9..71eb014 100644 --- a/include/Socket.hpp +++ b/include/Socket.hpp @@ -14,9 +14,12 @@ public: Socket(const int domain, const int type, const int protocol = 0); + + Socket(const int socket ); + virtual ~Socket(); - bool openSocket(); + bool createSocket(); void closeSocket(); bool connectToHost(const std::string host, @@ -24,6 +27,13 @@ public: bool bindToHost(const std::string host, const std::string port ); + bool getPeerName(const int socket, + std::string &host, + std::string &port); + + bool send( const void *message, const int lenght ); + int& getSocket() const; + static bool convertNameInfo( sockaddr* addr, socklen_t addrLen, std::string &retAddr, diff --git a/include/TcpClient.hpp b/include/TcpClient.hpp index be3ed9d..b1752bf 100644 --- a/include/TcpClient.hpp +++ b/include/TcpClient.hpp @@ -1,34 +1,18 @@ #ifndef TCP_CLIENT_HPP #define TCP_CLIENT_HPP -#include "Socket.hpp" +#include "TcpConnection.hpp" +#include "MessageBuilder.hpp" #include "Thread.hpp" #include "Poll.hpp" #include -class TcpClient : public Socket +class TcpClient { - -public: - - TcpClient ( const std::string host, - const std::string port ); - - virtual ~TcpClient(); - - bool connect(); - void disconnect(); - - bool send(const std::string msg); - - private: - virtual void msgArrived(const std::string) = 0; - virtual void onDisconnect() = 0; - class WatcherThread : public Thread , public Poll { @@ -43,11 +27,28 @@ private: private: void* run(); - TcpClient &m_tcpClient; + TcpClient &m_tcpClient; }; - std::string m_host; - std::string m_port; + +public: + + TcpClient ( const std::string host, + const std::string port, + MessageBuilder *buidler ); + + virtual ~TcpClient(); + + bool connect(); + void disconnect(); + + bool send( const void* message, const int length ); + +private: + + virtual void onDisconnect() = 0; + + TcpConnection m_connection; WatcherThread m_watcher; }; diff --git a/include/TcpConnection.hpp b/include/TcpConnection.hpp new file mode 100644 index 0000000..fb66c57 --- /dev/null +++ b/include/TcpConnection.hpp @@ -0,0 +1,58 @@ +#ifndef TCP_CONNECTION_HPP +#define TCP_CONNECTION_HPP + +#include "MessageBuilder.hpp" +#include "Socket.hpp" + +#include + +class TcpConnection +{ +public: + + enum Status { + OPENED, + CLOSED + }; + + TcpConnection ( const int socket, + const MessageBuilder *m_builder, + const int bufferLenght = 1024 ); + + TcpConnection ( const std::string host, + const std::string port, + MessageBuilder *builder, + const int bufferLenght = 1024 ); + + virtual ~TcpConnection(); + + bool connectToHost(); + bool bindToHost(); + + void closeConnection(); + + bool sendMessage( const void* message, const int length ); + bool readFromSocket(); + virtual void onMessageReady ( const unsigned char * message, const int length ) = 0; + + + int getSocket() const; + + +private: + + TcpConnection(const TcpConnection&); + TcpConnection& operator=(const TcpConnection&); + + Socket m_socket; + std::string m_host; + std::string m_port; + MessageBuilder *m_builder; + Status m_status; + + unsigned char *m_buffer; + int m_bufferLength; +}; + + +#endif // TCP_CONNECTION_HPP diff --git a/include/TcpServer.hpp b/include/TcpServer.hpp index b38e231..0cec2b0 100644 --- a/include/TcpServer.hpp +++ b/include/TcpServer.hpp @@ -24,7 +24,8 @@ public: bool receive( const int fd ); virtual void msgArrived(const int clientSocket, - const std::string msg) = 0; + const unsigned char* msg, + const int msgLen ) = 0; private: diff --git a/other/tcpclient_main.cpp b/other/tcpclient_main.cpp index 6b98b1e..5b9f431 100644 --- a/other/tcpclient_main.cpp +++ b/other/tcpclient_main.cpp @@ -1,19 +1,39 @@ // gpp tcpclient_main.cpp -o client -I../include ../src/Logger.cpp ../src/TcpClient.cpp -#include "TcpClient.hpp" - #include "Logger.hpp" +#include "TcpClient.hpp" +#include "MessageBuilder.hpp" + #include #include +class DummyBuilder : public MessageBuilder +{ +private: + + void messageBuilt( const unsigned char* message, + const int length ) + { + TRACE; + + std::string reply((char *)message, length); + LOG( Logger::INFO, std::string("Got reply from server: "). + append(reply).c_str() ); + } + +}; + + class PrinterTcpClient : public TcpClient { public: PrinterTcpClient ( const std::string host, - const std::string port ) - : TcpClient(host, port) + const std::string port, + MessageBuilder *builder + ) + : TcpClient(host, port, builder) { TRACE; } @@ -40,14 +60,18 @@ int main( int argc, char * argv[] ) Logger::init(std::cout); Logger::setLogLevel(Logger::FINEST); - PrinterTcpClient tcpclient("localhost", "4455"); + MessageBuilder *builder = new DummyBuilder; + + PrinterTcpClient tcpclient("localhost", "4455", builder); tcpclient.connect(); sleep(2); - tcpclient.send("madao"); + std::string msg1("madao"); + tcpclient.send( msg1.c_str(), msg1.length()); sleep(2); - tcpclient.send("this message is long. Cannot fit into one buffer"); + std::string msg2("this message is long. Cannot fit into one buffer"); + tcpclient.send( msg2.c_str(), msg2.length()); sleep(2); // std::string reply; @@ -57,7 +81,7 @@ int main( int argc, char * argv[] ) tcpclient.disconnect(); - + delete builder; Logger::destroy(); return 0; } \ No newline at end of file diff --git a/other/tcpserver_main.cpp b/other/tcpserver_main.cpp index 06b6172..1edbbff 100644 --- a/other/tcpserver_main.cpp +++ b/other/tcpserver_main.cpp @@ -21,16 +21,18 @@ public: } void msgArrived(const int clientSocket, - const std::string msg) + const unsigned char*msg, + const int msgLen ) { TRACE; - LOG( Logger::DEBUG, std::string("Got msg: ").append(msg).c_str() ); + std::string message((char*)msg, msgLen); + LOG( Logger::DEBUG, std::string("Got msg: ").append(message).c_str() ); std::string reply("Got your msg, buddy: \""); - reply.append(msg).append("\" see you!"); + reply.append(message).append("\" see you!"); - ssize_t n = write(clientSocket, reply.c_str(), reply.length()); + ssize_t n = write(clientSocket,message.c_str(), message.length()); if (n == -1) { LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() ); } diff --git a/src/MessageBuilder.cpp b/src/MessageBuilder.cpp new file mode 100644 index 0000000..5bdbb97 --- /dev/null +++ b/src/MessageBuilder.cpp @@ -0,0 +1,41 @@ +#include "MessageBuilder.hpp" + +#include "Logger.hpp" + +#include // memcpy + + + +MessageBuilder::MessageBuilder( TcpConnection *connection, + const int bufferLength ) + : m_connection(connection) + , m_buffer(0) + , m_bufferLength(bufferLength) + , m_bufferUsed(0) +{ + TRACE; + + m_buffer = new unsigned char[bufferLength]; +} + + +MessageBuilder::~MessageBuilder() +{ + TRACE; + + delete [] m_buffer ; +} + + +bool MessageBuilder::buildMessage( const unsigned char *message, + const int length ) +{ + TRACE; + + /// @todo implement composing the message + memcpy(message, m_buffer, length ); + m_bufferUsed = length; + + m_connection->onMessageReady ( m_buffer, m_bufferUsed ); + return true; +} diff --git a/src/MysqlClient.cpp b/src/MysqlClient.cpp index f74a7d8..0c0f5a3 100644 --- a/src/MysqlClient.cpp +++ b/src/MysqlClient.cpp @@ -140,10 +140,6 @@ MysqlClient::queryResultToStringList(const MYSQL_RES *res_set, { TRACE_STATIC; -// unsigned int numrows = mysql_num_rows(res_set); -// LOG( Logger::DEBUG, std::string("MySQL query returned number of rows: "). -// append(TToStr(numrows)).c_str()); - unsigned int num_fields = mysql_num_fields(const_cast(res_set)); MYSQL_ROW row; diff --git a/src/Poll.cpp b/src/Poll.cpp index f1e52a6..b25b9d4 100644 --- a/src/Poll.cpp +++ b/src/Poll.cpp @@ -42,7 +42,7 @@ void Poll::startPolling() TRACE; m_polling = true; - struct timespec tm = {0,10000}; + struct timespec tm = {0,1000}; while ( m_polling ) { @@ -64,7 +64,6 @@ void Poll::startPolling() acceptClient() : handleClient(m_fds[i].fd); - } // while } @@ -89,7 +88,6 @@ void Poll::acceptClient() LOG( Logger::ERR, errnoToString("ERROR accepting. ").c_str() ); } else { - /// @bug does not works every time std::string clientAddress, clientService; if ( Socket::convertNameInfo(&clientAddr, clientAddrLen, clientAddress, clientService ) ) { diff --git a/src/Socket.cpp b/src/Socket.cpp index c73a97d..3d07e88 100644 --- a/src/Socket.cpp +++ b/src/Socket.cpp @@ -6,6 +6,7 @@ #include #include #include // inet_ntop +#include Socket::Socket(const int domain, @@ -17,9 +18,22 @@ Socket::Socket(const int domain, , m_protocol(protocol) , m_addr() , m_addrLen(0) +{ + TRACE; +} + + +Socket::Socket(const int socket) + : m_socket(socket) + , m_domain(-1) + , m_type(-1) + , m_protocol(-1) + , m_addr() + , m_addrLen(0) { TRACE; + /// @todo get domain type prot from socket } @@ -29,7 +43,7 @@ Socket::~Socket() } -bool Socket::openSocket() +bool Socket::createSocket() { TRACE; @@ -92,6 +106,27 @@ bool Socket::bindToHost( const std::string host, } +bool Socket::send ( const void *message, const int length ) +{ + TRACE; + + if ( ::send(m_socket, message, length, MSG_NOSIGNAL) == -1 ) { + LOG( Logger::ERR, errnoToString("ERROR sending. ").c_str() ); + return false; + } + + return true; +} + + +int& Socket::getSocket() const +{ + TRACE; + + return m_socket; +} + + bool Socket::connectToFirstAddress(struct addrinfo *servinfo) { TRACE; @@ -136,6 +171,25 @@ bool Socket::bindToFirstAddress(struct addrinfo *servinfo ) } +bool Socket::getPeerName( const int socket, + std::string &host, + std::string &port ) +{ + TRACE; + + struct sockaddr_in address ; + memset(&address, 0, sizeof(address)); + socklen_t addressLength = sizeof(address) ; + getpeername( socket, (struct sockaddr*)&address, &addressLength ) ; + + unsigned int ip = address.sin_addr.s_addr ; + + char tmp[INET_ADDRSTRLEN]; + host = inet_ntop(AF_INET, &address.sin_addr, tmp, INET_ADDRSTRLEN); + port = address.sin_port; +} + + bool Socket::getHostInfo( const std::string host, const std::string port, struct addrinfo **servinfo) diff --git a/src/TcpClient.cpp b/src/TcpClient.cpp index cd2fc7e..bcc446c 100644 --- a/src/TcpClient.cpp +++ b/src/TcpClient.cpp @@ -5,10 +5,9 @@ TcpClient::TcpClient( const std::string host, - const std::string port ) - : Socket(AF_INET, SOCK_STREAM) - , m_host(host) - , m_port(port) + const std::string port, + MessageBuilder *builder ) + : m_connection (host, port, builder) , m_watcher(*this) { TRACE; @@ -26,13 +25,10 @@ bool TcpClient::connect() { TRACE; - if ( !openSocket() ) + if ( !m_connection.connectToHost() ) return false; - if ( !connectToHost(m_host, m_port) ) - return false; - - m_watcher.setOwnSocket(m_socket); + m_watcher.setOwnSocket(m_connection.getSocket()); m_watcher.start(); return true; } @@ -42,7 +38,7 @@ void TcpClient::disconnect() { TRACE; - closeSocket(); + m_connection.closeConnection(); if ( m_watcher.isRunning() ) { m_watcher.stopPolling(); @@ -52,23 +48,19 @@ void TcpClient::disconnect() } -bool TcpClient::send(const std::string msg) +bool TcpClient::send( const void* message, const int length ) { TRACE; - ssize_t n = write(m_socket, msg.c_str(), msg.length()); - if (n == -1) { - LOG( Logger::ERR, errnoToString("ERROR writing to socket. ").c_str() ); - m_watcher.stopPolling(); - return false; - } - - return true; + return m_connection.sendMessage(message, length); } + +// WatcherThread + TcpClient::WatcherThread::WatcherThread( TcpClient &data ) - : Poll(data.m_socket, 1) + : Poll(data.m_connection.getSocket()) , m_tcpClient(data) { TRACE; @@ -80,7 +72,7 @@ void TcpClient::WatcherThread::acceptClient() TRACE; // not accepting anything - receive( m_tcpClient.m_socket ); +// receive( m_tcpClient.m_socket ); } @@ -93,29 +85,29 @@ void TcpClient::WatcherThread::handleClient( const int fd ) } -bool TcpClient::WatcherThread::receive( const int fd) -{ - TRACE; - - char buffer[14]; - int len = recv( fd, buffer , 14, 0) ; - - if (len == -1) { - LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() ); - return false; - } - - if (len == 0) { - LOG( Logger::DEBUG, "Connection closed by peer." ); - stopPolling(); - return false; - } - - std::string msg(buffer, len); - m_tcpClient.msgArrived(msg); - +// bool TcpClient::WatcherThread::receive( const int fd) +// { +// TRACE; + +// char buffer[14]; +// int len = recv( fd, buffer , 14, 0) ; +// +// if (len == -1) { +// LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() ); +// return false; +// } +// +// if (len == 0) { +// LOG( Logger::DEBUG, "Connection closed by peer." ); +// stopPolling(); +// return false; +// } +// +// std::string msg(buffer, len); +// m_tcpClient.msgArrived(msg); +/* return true; -} +}*/ void* TcpClient::WatcherThread::run() diff --git a/src/TcpConnection.cpp b/src/TcpConnection.cpp new file mode 100644 index 0000000..71e121d --- /dev/null +++ b/src/TcpConnection.cpp @@ -0,0 +1,120 @@ +#include "TcpConnection.hpp" + +#include "Logger.hpp" +#include "Common.hpp" + +#include +#include + + +TcpConnection::TcpConnection ( const int socket, + const int bufferLenght, + MessageBuilder* builder + ) + : m_socket(socket) + , m_host() + , m_port() + , m_status(CLOSED) + , m_builder(builder) + , m_buffer(0) + , m_bufferLength(bufferLenght) + +{ + TRACE; + + m_buffer = new unsigned char[m_bufferLength]; + m_socket.getPeerName(m_host, m_port); +} + + +TcpConnection::TcpConnection ( const std::string host, + const std::string port, + const int bufferLenght, + MessageBuilder* builder ) + : m_socket(AF_INET, SOCK_STREAM) + , m_host(host) + , m_port(port) + , m_status(CLOSED) + , m_builder(builder) + , m_buffer(0) + , m_bufferLength(bufferLenght) +{ + TRACE; + + m_buffer = new unsigned char[m_bufferLength]; + m_socket.createSocket(); +} + + +TcpConnection::~TcpConnection() +{ + TRACE; + + delete[] m_buffer; + m_socket.closeSocket(); +} + + +bool TcpConnection::connectToHost() +{ + TRACE; + + return m_socket.connectToHost(m_host, m_port); +} + + +bool TcpConnection::bindToHost() +{ + TRACE; + + return m_socket.bindToHost(m_host, m_port); +} + + +void TcpConnection::closeConnection() +{ + TRACE; + + m_socket.closeSocket(); +} + + +bool TcpConnection::sendMessage( const void* message, const int length ) +{ + TRACE; + + return m_socket.send( message, length ); +} + + +int& TcpConnection::getSocket() const +{ + TRACE; + + return m_socket.getSocket(); +} + + +bool TcpConnection::readFromSocket() +{ + TRACE; + + int len = recv(m_socket, m_buffer, m_bufferLength, 0); + + if (len == -1) { + LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() ); + return false; + } + + if (len == 0) { + LOG( Logger::DEBUG, "Connection closed by peer." ); + return false; + } + + if ( !m_builder ) { + onMessageReady(m_buffer, len); + return true; + } + + return m_builder->buildMessage(m_buffer, len); +} diff --git a/src/TcpServer.cpp b/src/TcpServer.cpp index 8e36622..81dbb99 100644 --- a/src/TcpServer.cpp +++ b/src/TcpServer.cpp @@ -3,6 +3,8 @@ #include "Logger.hpp" #include "Common.hpp" +#include "MessageBuilder.hpp" + TcpServer::TcpServer( const std::string host, const std::string port, @@ -29,6 +31,21 @@ bool TcpServer::start() if ( !openSocket() ) return false; +// // Set the socket REUSABLE flag for the quick restart ability. +// if (setsockopt(m_server_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) +// { +// errorArrived( EC_SETSOCKOPT ) ; +// } + + +// // Set the socket NONBLOCKING flag for polling. +// if (-1 == (fc_flags = fcntl(m_server_socket, F_GETFL, 0))) +// { +// fc_flags = 0; +// } +// fcntl(m_server_socket, F_SETFL, fc_flags | O_NONBLOCK); + + if ( !bindToHost(m_host, m_port) ) return false; @@ -57,7 +74,7 @@ bool TcpServer::receive(const int clientSocket) { TRACE; - char buffer[10]; + unsigned char buffer[10]; int len = recv( clientSocket, buffer , 10, 0) ; if (len == -1) { @@ -70,8 +87,14 @@ bool TcpServer::receive(const int clientSocket) return false; } - std::string msg(buffer, len); - msgArrived(clientSocket, msg); + MessageBuilder *m_builder(0); + + if ( !m_builder ) { + msgArrived(clientSocket, buffer, len); + return true; + } + + return m_builder->buildMessage(buffer, len); return true; }