Connection has been splitted to abstract SocketConnection and concrete TcpConnection. TcpClient/Server is now depending on only SocketConnection and renamed to SocketClient/Server

master
Denes Matetelki 13 years ago
parent 271f8c25f4
commit 338b1364f1

@ -1,57 +0,0 @@
#ifndef CONNECTION_HPP
#define CONNECTION_HPP
#include "Socket.hpp"
#include "Message.hpp"
#include <string>
/** @todo make connection an iface and this class shall be a TcpConnection,
* inherited from connection */
class Connection
{
public:
Connection ( const int socket,
Message *message,
const size_t bufferLength = 1024 );
Connection ( const std::string host,
const std::string port,
Message *message,
const size_t bufferLength = 1024 );
virtual ~Connection();
Connection* create(const int socket);
bool connectToHost();
bool bindToHost();
bool listen( const int maxPendingQueueLen = 64 );
void closeConnection();
bool send( const void* message, const size_t length );
bool receive();
int getSocket() const;
std::string getHost() const;
std::string getPort() const;
private:
Connection(const Connection&);
Connection& operator=(const Connection&);
Socket m_socket;
std::string m_host;
std::string m_port;
Message *m_message;
unsigned char *m_buffer;
size_t m_bufferLength;
};
#endif // CONNECTION_HPP

@ -12,14 +12,14 @@
* getExpectedLength(). * getExpectedLength().
*/ */
class Connection; class SocketConnection;
class Message class Message
{ {
public: public:
Message( Connection *connection, Message( SocketConnection *connection,
void *msgParam = 0 ) void *msgParam = 0 )
: m_connection(connection) : m_connection(connection)
, m_param(msgParam) , m_param(msgParam)
@ -43,7 +43,7 @@ public:
const size_t msgLen ) = 0; const size_t msgLen ) = 0;
virtual void onMessageReady() = 0; virtual void onMessageReady() = 0;
void setConnection(Connection* conn ) void setConnection(SocketConnection* conn )
{ {
TRACE; TRACE;
m_connection = conn; m_connection = conn;
@ -54,7 +54,7 @@ protected:
virtual size_t getExpectedLength() = 0; virtual size_t getExpectedLength() = 0;
Connection *m_connection; SocketConnection *m_connection;
void *m_param; void *m_param;
std::string m_buffer; std::string m_buffer;

@ -1,7 +1,7 @@
#ifndef POLL_HPP #ifndef POLL_HPP
#define POLL_HPP #define POLL_HPP
#include "Connection.hpp" #include "SocketConnection.hpp"
#include <poll.h> #include <poll.h>
#include <map> #include <map>
@ -12,7 +12,7 @@ class Poll
{ {
public: public:
Poll( Connection *connection, Poll( SocketConnection *connection,
const nfds_t maxClient = 10 ); const nfds_t maxClient = 10 );
virtual ~Poll(); virtual ~Poll();
@ -41,9 +41,9 @@ private:
bool removeFd( const int socket ); bool removeFd( const int socket );
typedef typename std::map< int, Connection* > ConnectionPool; typedef typename std::map< int, SocketConnection* > ConnectionPool;
Connection *m_connection; SocketConnection *m_connection;
volatile bool m_polling; volatile bool m_polling;
ConnectionPool m_connectionPool; ConnectionPool m_connectionPool;

@ -1,8 +1,8 @@
#ifndef TCP_CLIENT_HPP #ifndef SOCKET_CLIENT_HPP
#define TCP_CLIENT_HPP #define SOCKET_CLIENT_HPP
#include "Connection.hpp" #include "SocketConnection.hpp"
#include "Thread.hpp" #include "Thread.hpp"
#include "Poll.hpp" #include "Poll.hpp"
@ -11,7 +11,7 @@
#include <stddef.h> // size_t #include <stddef.h> // size_t
class TcpClient class SocketClient
{ {
private: private:
@ -20,7 +20,7 @@ private:
{ {
public: public:
PollerThread( TcpClient* data ); PollerThread( SocketClient* data );
void stopPoller(); void stopPoller();
@ -39,18 +39,16 @@ private:
void* run(); void* run();
TcpClient *m_tcpClient; SocketClient *m_tcpClient;
}; // class PollerThread }; // class PollerThread
public: public:
TcpClient ( const std::string host, SocketClient (SocketConnection *connection );
const std::string port,
Message *message );
virtual ~TcpClient(); virtual ~SocketClient();
bool connect(); bool connect();
void disconnect(); void disconnect();
@ -61,13 +59,13 @@ public:
private: private:
TcpClient(const TcpClient& ); SocketClient(const SocketClient& );
TcpClient& operator=(const TcpClient& ); SocketClient& operator=(const SocketClient& );
Connection m_connection; SocketConnection *m_connection;
PollerThread m_watcher; PollerThread m_watcher;
}; };
#endif // TCP_CLIENT_HPP #endif // SOCKET_CLIENT_HPP

@ -0,0 +1,56 @@
#ifndef SOCKET_CONNECTION_HPP
#define SOCKET_CONNECTION_HPP
#include "Socket.hpp"
#include "Message.hpp"
#include <string>
class SocketConnection
{
public:
SocketConnection ( const int socket,
Message *message,
const size_t bufferLength = 1024 );
SocketConnection ( const std::string host,
const std::string port,
Message *message,
const size_t bufferLength = 1024 );
virtual ~SocketConnection();
virtual SocketConnection* clone(const int socket) = 0;
virtual bool connectToHost() = 0;
virtual bool bindToHost() = 0;
virtual bool listen( const int maxPendingQueueLen = 64 ) = 0;
virtual void closeConnection() = 0;
virtual bool send( const void* message, const size_t length ) = 0;
virtual bool receive() = 0;
int getSocket() const;
std::string getHost() const;
std::string getPort() const;
protected:
Socket m_socket;
std::string m_host;
std::string m_port;
Message *m_message;
unsigned char *m_buffer;
size_t m_bufferLength;
private:
SocketConnection(const SocketConnection&);
SocketConnection& operator=(const SocketConnection&);
};
#endif // SOCKET_CONNECTION_HPP

@ -0,0 +1,32 @@
#ifndef SOCKET_SERVER_HPP
#define SOCKET_SERVER_HPP
#include "SocketConnection.hpp"
#include "Poll.hpp"
class SocketServer
{
public:
SocketServer ( SocketConnection *connection,
const int maxClients = 5,
const int maxPendingQueueLen = 10 );
virtual ~SocketServer();
bool start();
void stop();
private:
SocketServer(const SocketServer&);
SocketServer& operator=(const SocketServer&);
SocketConnection *m_connection;
Poll m_poll;
const int m_maxPendingQueueLen;
};
#endif // SOCKET_SERVER_HPP

@ -1,26 +0,0 @@
#ifndef SUBJECT_HPP
#define SUBJECT_HPP
#include <list>
class Observer;
class Subject
{
public:
virtual ~Subject();
/// @todo listen only to aspect?
virtual void attach( Observer* );
virtual void detach( Observer* );
virtual void notify();
private:
/// @todo list can be a priority queue
std::list< Observer* > m_observers;
};
#endif // SUBJECT_HPP

@ -0,0 +1,45 @@
#ifndef TCP_CONNECTION_HPP
#define TCP_CONNECTION_HPP
#include "SocketConnection.hpp"
#include "Socket.hpp"
#include "Message.hpp"
#include <string>
class TcpConnection : public SocketConnection
{
public:
TcpConnection ( const int socket,
Message *message,
const size_t bufferLength = 1024 );
TcpConnection ( const std::string host,
const std::string port,
Message *message,
const size_t bufferLength = 1024 );
virtual ~TcpConnection();
SocketConnection* clone(const int socket);
bool connectToHost();
bool bindToHost();
bool listen( const int maxPendingQueueLen = 64 );
void closeConnection();
bool send( const void* message, const size_t length );
bool receive();
private:
TcpConnection(const TcpConnection&);
TcpConnection& operator=(const TcpConnection&);
};
#endif // TCP_CONNECTION_HPP

@ -1,37 +0,0 @@
#ifndef TCP_SERVER_HPP
#define TCP_SERVER_HPP
#include "Connection.hpp"
#include "Poll.hpp"
#include "Message.hpp"
#include <string>
class TcpServer
{
public:
TcpServer ( const std::string host,
const std::string port,
Message *message,
const int maxClients = 5,
const int maxPendingQueueLen = 10 );
virtual ~TcpServer();
bool start();
void stop();
private:
TcpServer(const TcpServer&);
TcpServer& operator=(const TcpServer&);
Connection m_connection;
Poll m_poll;
const int m_maxPendingQueueLen;
};
#endif // TCP_SERVER_HPP

@ -1,10 +1,11 @@
// gpp tcpclient_main.cpp -o client -I../include ../src/Logger.cpp ../src/TcpClient.cpp // gpp tcpclient_main.cpp -o client -I../include ../src/Logger.cpp ../src/Thread.cpp ../src/Socket.cpp -lpthread ../src/SocketClient.cpp ../src/Poll.cpp ../src/SocketConnection.cpp ../src/TcpConnection.cpp
#include "Logger.hpp" #include "Logger.hpp"
#include "TcpClient.hpp"
#include "Connection.hpp"
#include "Message.hpp" #include "Message.hpp"
#include "TcpConnection.hpp"
#include "SocketClient.hpp"
#include <iostream> #include <iostream>
@ -74,12 +75,10 @@ int main(int argc, char* argv[] )
bool finished = false; bool finished = false;
SimpleMessage msg(&finished); SimpleMessage msg(&finished);
TcpConnection conn(argv[1], argv[2], &msg);
SocketClient socketClient(&conn);
TcpClient tcpclient( argv[1], if ( !socketClient.connect() ) {
argv[2],
&msg);
if ( !tcpclient.connect() ) {
LOG( Logger::ERR, "Couldn't connect to server, exiting..." ); LOG( Logger::ERR, "Couldn't connect to server, exiting..." );
Logger::destroy(); Logger::destroy();
return 1; return 1;
@ -90,7 +89,7 @@ int main(int argc, char* argv[] )
// send message to server // send message to server
std::string msg1(argv[3]); std::string msg1(argv[3]);
if ( !tcpclient.send( msg1.c_str(), msg1.length()) ) { if ( !socketClient.send( msg1.c_str(), msg1.length()) ) {
LOG( Logger::ERR, "Couldn't send message to server, exiting..." ); LOG( Logger::ERR, "Couldn't send message to server, exiting..." );
Logger::destroy(); Logger::destroy();
return 1; return 1;
@ -98,10 +97,10 @@ int main(int argc, char* argv[] )
// wait for the complate &handled reply // wait for the complate &handled reply
struct timespec tm = {0,1000}; struct timespec tm = {0,1000};
while ( !finished && tcpclient.isPolling() ) while ( !finished && socketClient.isPolling() )
nanosleep(&tm, &tm) ; nanosleep(&tm, &tm) ;
tcpclient.disconnect(); socketClient.disconnect();
Logger::destroy(); Logger::destroy();
return 0; return 0;
} }

@ -1,11 +1,13 @@
// gpp tcpserver_main.cpp -o server -I../include ../src/Logger.cpp ../src/Socket.cpp // gpp tcpserver_main.cpp -o server -I../include ../src/Logger.cpp ../src/Socket.cpp -ggdb ../src/SocketServer.cpp ../src/SocketConnection.cpp ../src/Poll.cpp ../src/TcpConnection.cpp
#include "Logger.hpp" #include "Logger.hpp"
#include "Common.hpp" #include "Common.hpp"
#include "TcpServer.hpp"
#include "Message.hpp" #include "Message.hpp"
#include "TcpConnection.hpp"
#include "SocketServer.hpp"
#include <iostream> #include <iostream>
#include <string> #include <string>
@ -75,12 +77,10 @@ int main(int argc, char* argv[] )
// Logger::setNoPrefix(); // Logger::setNoPrefix();
EchoMessage msg; EchoMessage msg;
TcpConnection conn(argv[1], argv[2], &msg);
SocketServer socketServer(&conn);
TcpServer tcpServer( argv[1], if ( !socketServer.start() ) {
argv[2],
&msg );
if ( !tcpServer.start() ) {
LOG( Logger::ERR, "Failed to start TCP server, exiting..."); LOG( Logger::ERR, "Failed to start TCP server, exiting...");
Logger::destroy(); Logger::destroy();
return 1; return 1;
@ -89,7 +89,7 @@ int main(int argc, char* argv[] )
// never reached // never reached
sleep(1); sleep(1);
tcpServer.stop(); socketServer.stop();
Logger::destroy(); Logger::destroy();
return 0; return 0;
} }

@ -1,136 +0,0 @@
#include "Connection.hpp"
#include "Logger.hpp"
#include "Common.hpp"
Connection::Connection ( const int socket,
Message *message,
const size_t bufferLength )
: m_socket(socket)
, m_host()
, m_port()
, m_message(message)
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.getPeerName(m_host, m_port);
m_buffer = new unsigned char[m_bufferLength];
}
Connection::Connection ( const std::string host,
const std::string port,
Message *message,
const size_t bufferLength )
: m_socket(AF_INET, SOCK_STREAM)
, m_host(host)
, m_port(port)
, m_message(message)
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.createSocket();
m_buffer = new unsigned char[m_bufferLength];
}
Connection::~Connection()
{
TRACE;
m_socket.closeSocket();
delete[] m_buffer;
}
Connection* Connection::create(const int socket)
{
TRACE;
Connection *conn = new Connection( socket,
m_message->clone(),
m_bufferLength);
conn->m_message->setConnection(conn);
return conn;
}
bool Connection::connectToHost()
{
TRACE;
return m_socket.connectToHost(m_host, m_port);
}
bool Connection::bindToHost()
{
TRACE;
return m_socket.bindToHost(m_host, m_port);
}
bool Connection::listen( const int maxPendingQueueLen)
{
TRACE;
return m_socket.listen( maxPendingQueueLen );
}
void Connection::closeConnection()
{
TRACE;
m_socket.closeSocket();
}
bool Connection::send( const void* message, const size_t length )
{
TRACE;
return m_socket.send( message, length );
}
bool Connection::receive()
{
TRACE;
ssize_t length;
if ( !m_socket.receive(m_buffer, m_bufferLength, &length) ) {
if (length == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );
}
else if (length == 0) {
LOG( Logger::INFO, std::string("Connection closed by ").
append(m_host).append(":").append(m_port).c_str() );
}
return false;
}
LOG ( Logger::DEBUG, std::string("Received: ").
append(TToStr(length)).append(" bytes from: ").
append(m_host).append(":").append(m_port).c_str() );
return m_message->buildMessage( (void*)m_buffer, (size_t)length);
}
int Connection::getSocket() const
{
TRACE;
return m_socket.getSocket();
}
std::string Connection::getHost() const
{
TRACE;
return m_host;
}
std::string Connection::getPort() const
{
TRACE;
return m_port;
}

@ -4,7 +4,7 @@
#include "Common.hpp" #include "Common.hpp"
Poll::Poll( Connection *connection, Poll::Poll( SocketConnection *connection,
const nfds_t maxClient ) const nfds_t maxClient )
: m_connection(connection) : m_connection(connection)
, m_polling(false) , m_polling(false)
@ -85,7 +85,7 @@ void Poll::acceptClient()
return; return;
} }
Connection *connection = m_connection->create(client_socket); SocketConnection *connection = m_connection->clone(client_socket);
LOG( Logger::INFO, std::string("New client connected: "). LOG( Logger::INFO, std::string("New client connected: ").
append(connection->getHost()).append(":"). append(connection->getHost()).append(":").

@ -0,0 +1,102 @@
#include "SocketClient.hpp"
#include "Logger.hpp"
// PollerThread
SocketClient::PollerThread::PollerThread( SocketClient* data )
: Poll(data->m_connection)
, m_tcpClient(data)
{
TRACE;
}
void SocketClient::PollerThread::stopPoller()
{
TRACE;
stopPolling();
stop();
}
void SocketClient::PollerThread::acceptClient()
{
TRACE;
m_tcpClient->m_connection->receive();
stopPolling();
}
void SocketClient::PollerThread::handleClient( const int )
{
TRACE;
LOG( Logger::DEBUG, "Server closed the connection." );
stopPolling();
}
void* SocketClient::PollerThread::run()
{
TRACE;
startPolling();
return 0;
}
// SocketClient
SocketClient::SocketClient (SocketConnection *connection )
: m_connection (connection)
, m_watcher(this)
{
TRACE;
}
SocketClient::~SocketClient()
{
TRACE;
disconnect();
}
bool SocketClient::connect()
{
TRACE;
if ( !m_connection->connectToHost() )
return false;
m_watcher.start();
return true;
}
void SocketClient::disconnect()
{
TRACE;
if ( m_watcher.isRunning() ) {
m_watcher.stopPoller();
m_watcher.join();
}
m_connection->closeConnection();
}
bool SocketClient::send( const void* msg, const size_t msgLen )
{
TRACE;
return m_connection->send(msg, msgLen);
}
bool SocketClient::isPolling() const
{
TRACE;
return m_watcher.isPolling();
}

@ -0,0 +1,69 @@
#include "SocketConnection.hpp"
#include "Logger.hpp"
#include "Common.hpp"
SocketConnection::SocketConnection ( const int socket,
Message *message,
const size_t bufferLength )
: m_socket(socket)
, m_host()
, m_port()
, m_message(message)
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.getPeerName(m_host, m_port);
m_buffer = new unsigned char[m_bufferLength];
m_message->setConnection(this);
}
SocketConnection::SocketConnection ( const std::string host,
const std::string port,
Message *message,
const size_t bufferLength )
: m_socket(AF_INET, SOCK_STREAM)
, m_host(host)
, m_port(port)
, m_message(message)
, m_buffer(0)
, m_bufferLength(bufferLength)
{
TRACE;
m_socket.createSocket();
m_buffer = new unsigned char[m_bufferLength];
m_message->setConnection(this);
}
SocketConnection::~SocketConnection()
{
TRACE;
m_socket.closeSocket();
delete[] m_buffer;
}
int SocketConnection::getSocket() const
{
TRACE;
return m_socket.getSocket();
}
std::string SocketConnection::getHost() const
{
TRACE;
return m_host;
}
std::string SocketConnection::getPort() const
{
TRACE;
return m_port;
}

@ -0,0 +1,43 @@
#include "SocketServer.hpp"
#include "Logger.hpp"
SocketServer::SocketServer ( SocketConnection *connection,
const int maxClients,
const int maxPendingQueueLen )
: m_connection(connection)
, m_poll( m_connection, maxClients)
, m_maxPendingQueueLen(maxPendingQueueLen)
{
TRACE;
}
SocketServer::~SocketServer()
{
TRACE;
}
bool SocketServer::start()
{
TRACE;
if ( !m_connection->bindToHost() )
return false;
if ( m_connection->listen( m_maxPendingQueueLen ) == -1 ) {
return false;
}
m_poll.startPolling();
return true;
}
void SocketServer::stop()
{
TRACE;
m_poll.stopPolling();
m_connection->closeConnection();
}

@ -1,106 +0,0 @@
#include "TcpClient.hpp"
#include "Logger.hpp"
// PollerThread
TcpClient::PollerThread::PollerThread( TcpClient* data )
: Poll( &(data->m_connection) )
, m_tcpClient(data)
{
TRACE;
}
void TcpClient::PollerThread::stopPoller()
{
TRACE;
stopPolling();
stop();
}
void TcpClient::PollerThread::acceptClient()
{
TRACE;
m_tcpClient->m_connection.receive();
stopPolling();
}
void TcpClient::PollerThread::handleClient( const int )
{
TRACE;
LOG( Logger::DEBUG, "Server closed the connection." );
stopPolling();
}
void* TcpClient::PollerThread::run()
{
TRACE;
startPolling();
return 0;
}
// TcpClient
TcpClient::TcpClient ( const std::string host,
const std::string port,
Message *message )
: m_connection (host, port, message)
, m_watcher(this)
{
TRACE;
message->setConnection(&m_connection);
}
TcpClient::~TcpClient()
{
TRACE;
disconnect();
}
bool TcpClient::connect()
{
TRACE;
if ( !m_connection.connectToHost() )
return false;
m_watcher.start();
return true;
}
void TcpClient::disconnect()
{
TRACE;
if ( m_watcher.isRunning() ) {
m_watcher.stopPoller();
m_watcher.join();
}
m_connection.closeConnection();
}
bool TcpClient::send( const void* msg, const size_t msgLen )
{
TRACE;
return m_connection.send(msg, msgLen);
}
bool TcpClient::isPolling() const
{
TRACE;
return m_watcher.isPolling();
}

@ -0,0 +1,98 @@
#include "TcpConnection.hpp"
#include "Logger.hpp"
#include "Common.hpp"
TcpConnection::TcpConnection ( const int socket,
Message *message,
const size_t bufferLength )
: SocketConnection(socket, message, bufferLength)
{
TRACE;
}
TcpConnection::TcpConnection ( const std::string host,
const std::string port,
Message *message,
const size_t bufferLength )
: SocketConnection(host, port, message, bufferLength)
{
TRACE;
}
TcpConnection::~TcpConnection()
{
TRACE;
}
SocketConnection* TcpConnection::clone(const int socket)
{
SocketConnection *conn = new TcpConnection(socket,
m_message->clone(),
m_bufferLength );
return conn;
}
bool TcpConnection::connectToHost()
{
TRACE;
return m_socket.connectToHost(m_host, m_port);
}
bool TcpConnection::bindToHost()
{
TRACE;
return m_socket.bindToHost(m_host, m_port);
}
bool TcpConnection::listen( const int maxPendingQueueLen )
{
TRACE;
return m_socket.listen( maxPendingQueueLen );
}
void TcpConnection::closeConnection()
{
TRACE;
m_socket.closeSocket();
}
bool TcpConnection::send( const void* message, const size_t length )
{
TRACE;
return m_socket.send( message, length );
}
bool TcpConnection::receive()
{
TRACE;
ssize_t length;
if ( !m_socket.receive(m_buffer, m_bufferLength, &length) ) {
if (length == -1) {
LOG( Logger::ERR, errnoToString("ERROR reading from socket. ").c_str() );
}
else if (length == 0) {
LOG( Logger::INFO, std::string("Connection closed by ").
append(m_host).append(":").append(m_port).c_str() );
}
return false;
}
LOG ( Logger::DEBUG, std::string("Received: ").
append(TToStr(length)).append(" bytes from: ").
append(m_host).append(":").append(m_port).c_str() );
return m_message->buildMessage( (void*)m_buffer, (size_t)length);
}

@ -1,47 +0,0 @@
#include "TcpServer.hpp"
#include "Logger.hpp"
TcpServer::TcpServer ( const std::string host,
const std::string port,
Message *message,
const int maxClients,
const int maxPendingQueueLen )
: m_connection(host, port, message)
, m_poll( &m_connection, maxClients)
, m_maxPendingQueueLen(maxPendingQueueLen)
{
TRACE;
message->setConnection(&m_connection);
}
TcpServer::~TcpServer()
{
TRACE;
}
bool TcpServer::start()
{
TRACE;
if ( !m_connection.bindToHost() )
return false;
if ( m_connection.listen( m_maxPendingQueueLen ) == -1 ) {
return false;
}
m_poll.startPolling();
return true;
}
void TcpServer::stop()
{
TRACE;
m_poll.stopPolling();
m_connection.closeConnection();
}
Loading…
Cancel
Save