OSG::GroupSockConnection Class Reference

#include <OSGGroupSockConnection.h>

Inheritance diagram for OSG::GroupSockConnection:

Base classes: OSG::GroupConnection, OSG::Connection, OSG::ExceptionBinaryDataHandler. Derived classes: OSG::GroupMCastConnection, OSG::GroupSockPipeline. List of all members.

Public Types

public types


typedef Int32 Channel

Public Member Functions

Constructors


 GroupSockConnection (void)
virtual ~GroupSockConnection (void)
type info


virtual const ConnectionType * getType (void)
connection


virtual Channel connectPoint (const std::string &address, Time timeout=-1)
virtual void disconnect (Channel channel)
virtual Channel acceptPoint (Time timeout=-1)
virtual std::string bind (const std::string &interf)
params


virtual void setParams (const std::string &params)
channel handling


virtual Channel selectChannel (Time timeout=-1) throw (ReadError)
channel handling


UInt32 getChannelCount (void)
void addSelection (Channel channel)
void subSelection (Channel channel)
void clearSelection (void)
void resetSelection (void)
UInt32 getSelectionCount (void)
group address


void setDestination (const std::string &destination)
std::string getDestination (void)
channel handling


const std::string & getInterface (void)
void setInterface (const std::string &interf)
Put


void put (void const *src, UInt32 size)
void putAndFree (MemoryHandle src, UInt32 size)
void putValue (const bool &value)
void putValue (const UInt8 &value)
void putValue (const UInt16 &value)
void putValue (const UInt32 &value)
void putValue (const UInt64 &value)
void putValue (const Int8 &value)
void putValue (const Int16 &value)
void putValue (const Int32 &value)
void putValue (const Int64 &value)
void putValue (const Real16 &value)
void putValue (const Fixed32 &value)
void putValue (const Real32 &value)
void putValue (const Real64 &value)
void putValue (const Real128 &value)
void putValue (const std::string &value)
void putValues (const bool *value, UInt32 size)
void putValues (const UInt8 *value, UInt32 size)
void putValues (const UInt16 *value, UInt32 size)
void putValues (const UInt32 *value, UInt32 size)
void putValues (const UInt64 *value, UInt32 size)
void putValues (const Int8 *value, UInt32 size)
void putValues (const Int16 *value, UInt32 size)
void putValues (const Int32 *value, UInt32 size)
void putValues (const Int64 *value, UInt32 size)
void putValues (const Real16 *value, UInt32 size)
void putValues (const Fixed32 *value, UInt32 size)
void putValues (const Real32 *value, UInt32 size)
void putValues (const Real64 *value, UInt32 size)
void putValues (const Real128 *value, UInt32 size)
void putValues (const std::string *value, UInt32 size)
Get


void get (void *dst, UInt32 size) throw (ReadError)
void getAndAlloc (MemoryHandle &src, UInt32 size) throw (ReadError)
void getValue (bool &value) throw (ReadError)
void getValue (UInt8 &value) throw (ReadError)
void getValue (UInt16 &value) throw (ReadError)
void getValue (UInt32 &value) throw (ReadError)
void getValue (UInt64 &value) throw (ReadError)
void getValue (Int8 &value) throw (ReadError)
void getValue (Int16 &value) throw (ReadError)
void getValue (Int32 &value) throw (ReadError)
void getValue (Int64 &value) throw (ReadError)
void getValue (Real16 &value) throw (ReadError)
void getValue (Fixed32 &value) throw (ReadError)
void getValue (Real32 &value) throw (ReadError)
void getValue (Real64 &value) throw (ReadError)
void getValue (Real128 &value) throw (ReadError)
void getValue (std::string &value) throw (ReadError)
void getValues (bool *value, UInt32 size) throw (ReadError)
void getValues (UInt8 *value, UInt32 size) throw (ReadError)
void getValues (UInt16 *value, UInt32 size) throw (ReadError)
void getValues (UInt32 *value, UInt32 size) throw (ReadError)
void getValues (UInt64 *value, UInt32 size) throw (ReadError)
void getValues (Int8 *value, UInt32 size) throw (ReadError)
void getValues (Int16 *value, UInt32 size) throw (ReadError)
void getValues (Int32 *value, UInt32 size) throw (ReadError)
void getValues (Int64 *value, UInt32 size) throw (ReadError)
void getValues (Real16 *value, UInt32 size) throw (ReadError)
void getValues (Fixed32 *value, UInt32 size) throw (ReadError)
void getValues (Real32 *value, UInt32 size) throw (ReadError)
void getValues (Real64 *value, UInt32 size) throw (ReadError)
void getValues (Real128 *value, UInt32 size) throw (ReadError)
void getValues (std::string *value, UInt32 size) throw (ReadError)
Helper


virtual void forceCopy (void)
virtual void forceDirectIO (void)
void flush (void)
 write data not yet written
void setNetworkOrder (bool value)
bool getNetworkOrder (void)

Static Public Member Functions

create


static GroupConnection * create (void)
 create connection

Protected Types

typedef std::vector< MemoryBlock > BuffersT
typedef std::list< MemoryHandle > FreeMemT
protected types


typedef Int32 ChannelIndex

Protected Member Functions

IO Implementation


virtual void read (MemoryHandle mem, UInt32 size)
virtual void readBuffer (void) throw (ReadError)
virtual void write (MemoryHandle mem, UInt32 size)
virtual void writeBuffer (void)
synchronisation


virtual bool wait (Time timeout) throw (ReadError)
virtual void signal (void) throw (WriteError)
internal channel handling


Channel newChannelIndex (ChannelIndex index)
void delChannelIndex (ChannelIndex index)
channel index mapping


ChannelIndex channelToIndex (Channel channel) const
Channel indexToChannel (ChannelIndex index) const
Read


BuffersT::iterator readBufBegin (void)
BuffersT::iterator readBufEnd (void)
void readBufAdd (MemoryHandle mem, UInt32 size, UInt32 dataSize=0)
void readBufClear (void)
Write


BuffersT::iterator writeBufBegin (void)
BuffersT::iterator writeBufEnd (void)
void writeBufAdd (MemoryHandle mem, UInt32 size, UInt32 dataSize=0)
void writeBufClear (void)
Helper


bool isReadBufferEmpty (void)

Protected Attributes

members


StreamSocket _acceptSocket
std::vector< StreamSocket > _sockets
std::vector< SocketAddress > _remoteAddresses
ChannelIndex _readIndex
std::vector< UInt8 > _socketReadBuffer
std::vector< UInt8 > _socketWriteBuffer
protected fields


std::vector< UInt8 > _selection
std::string _destination
std::set< Channel > _disconnectedChannel
protected members


std::string _interface
Member


BuffersT _readBuffers
BuffersT _writeBuffers
BuffersT _zeroCopyBuffers
UInt32 _zeroCopyThreshold
FreeMemT _freeMem
BuffersT::iterator _currentReadBuffer
UInt32 _currentReadBufferPos
BuffersT::iterator _currentWriteBuffer
UInt32 _currentWriteBufferPos
bool _networkOrder

Private Types

typedef GroupConnection Inherited

Private Member Functions

 GroupSockConnection (const GroupSockConnection &source)
GroupSockConnection & operator= (const GroupSockConnection &source)

Static Private Member Functions

internal methods


static bool connectSocket (StreamSocket &socket, std::string address, SocketAddress &destination, Time timeout)
static bool acceptSocket (StreamSocket &accept, StreamSocket &from, SocketAddress &destination, Time timeout)

Static Private Attributes

static type


static ConnectionType _type

Friends

class PointSockConnection

Classes

struct  SocketBufferHeader

Detailed Description

Definition at line 62 of file OSGGroupSockConnection.h.


Member Typedef Documentation

typedef GroupConnection OSG::GroupSockConnection::Inherited [private]

Reimplemented from OSG::GroupConnection.

Reimplemented in OSG::GroupSockPipeline, and OSG::GroupMCastConnection.

Definition at line 185 of file OSGGroupSockConnection.h.

typedef Int32 OSG::GroupConnection::ChannelIndex [protected, inherited]

Definition at line 123 of file OSGGroupConnection.h.

typedef Int32 OSG::Connection::Channel [inherited]

Definition at line 71 of file OSGConnection.h.

typedef std::vector<MemoryBlock> OSG::ExceptionBinaryDataHandler::BuffersT [protected, inherited]

Definition at line 229 of file OSGExceptionBinaryDataHandler.h.

typedef std::list<MemoryHandle> OSG::ExceptionBinaryDataHandler::FreeMemT [protected, inherited]

Definition at line 230 of file OSGExceptionBinaryDataHandler.h.


Constructor & Destructor Documentation

GroupSockConnection::GroupSockConnection ( void   ) 

Definition at line 67 of file OSGGroupSockConnection.cpp.

References _acceptSocket, _socketReadBuffer, _socketWriteBuffer, OSG::StreamSocket::open(), OSG::ExceptionBinaryDataHandler::readBufAdd(), OSG::Socket::setReusePort(), and OSG::ExceptionBinaryDataHandler::writeBufAdd().

Referenced by create().

00067                                         :
00068     GroupConnection(0)
00069 {
00070     _acceptSocket.open();
00071     _acceptSocket.setReusePort(true);
00072 
00073     _socketReadBuffer.resize(131071);
00074     _socketWriteBuffer.resize( _socketReadBuffer.size() );
00075     // reserve first bytes for buffer size
00076     readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)],
00077                 _socketReadBuffer.size() -sizeof(SocketBufferHeader));
00078     writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00079                 _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00080 }

GroupSockConnection::~GroupSockConnection ( void   )  [virtual]

Destructor

Definition at line 84 of file OSGGroupSockConnection.cpp.

References _acceptSocket, _sockets, and OSG::StreamSocket::close().

00085 {
00086     // close and remove sockets
00087     while(_sockets.size())
00088     {
00089         try
00090         {
00091             _sockets.begin()->close();
00092             _sockets.erase(_sockets.begin());
00093         }
00094         catch(...)
00095         {
00096         }
00097     }
00098     _acceptSocket.close();
00099 }

OSG::GroupSockConnection::GroupSockConnection ( const GroupSockConnection &  source  )  [private]


Member Function Documentation

const ConnectionType * GroupSockConnection::getType ( void   )  [virtual]

Implements OSG::GroupConnection.

Reimplemented in OSG::GroupSockPipeline, and OSG::GroupMCastConnection.

Definition at line 103 of file OSGGroupSockConnection.cpp.

References _type.

00104 {
00105     return &_type;
00106 }

GroupConnection::Channel GroupSockConnection::connectPoint ( const std::string &  address,
Time  timeout = -1 
) [virtual]

Implements OSG::Connection.

Reimplemented in OSG::GroupSockPipeline, and OSG::GroupMCastConnection.

Definition at line 114 of file OSGGroupSockConnection.cpp.

References _readIndex, _remoteAddresses, _sockets, connectSocket(), and OSG::GroupConnection::newChannelIndex().

00117 {
00118     Channel channel = -1;
00119     StreamSocket socket;
00120     SocketAddress destination;
00121     if(connectSocket(socket,address,destination,timeout))
00122     {
00123         channel = newChannelIndex(_sockets.size());
00124         _sockets.push_back(socket);
00125         _remoteAddresses.push_back(destination);
00126         _readIndex = 0;
00127     }
00128     return channel;
00129 }

void GroupSockConnection::disconnect ( Channel  channel  )  [virtual]

disconnect the given channel

Implements OSG::GroupConnection.

Reimplemented in OSG::GroupSockPipeline, and OSG::GroupMCastConnection.

Definition at line 133 of file OSGGroupSockConnection.cpp.

References _readIndex, _sockets, OSG::GroupConnection::channelToIndex(), and OSG::GroupConnection::delChannelIndex().

00134 {
00135     ChannelIndex index = channelToIndex(channel);
00136     try
00137     {
00138         _sockets[index].close();
00139     }
00140     catch(...)
00141     {
00142     }
00143     _sockets.erase(_sockets.begin() + index);
00144     delChannelIndex(index);
00145     _readIndex = 0;
00146 }

GroupConnection::Channel GroupSockConnection::acceptPoint ( Time  timeout = -1  )  [virtual]

Accept an incoming point connection. If the timeout is reached, -1 is returned. If timeout is -1, wait without a timeout.

Implements OSG::Connection.

Reimplemented in OSG::GroupSockPipeline, and OSG::GroupMCastConnection.

Definition at line 151 of file OSGGroupSockConnection.cpp.

References _acceptSocket, _readIndex, _remoteAddresses, _sockets, acceptSocket(), and OSG::GroupConnection::newChannelIndex().

00152 {
00153     StreamSocket from;
00154     SocketAddress destination;
00155     if(GroupSockConnection::acceptSocket(_acceptSocket,
00156                                          from,
00157                                          destination,
00158                                          timeout))
00159     {
00160         Channel channel = newChannelIndex(_sockets.size());
00161         _sockets.push_back(from);
00162         _remoteAddresses.push_back(destination);
00163         _readIndex = 0;
00164         return channel;
00165     }
00166     else
00167     {
00168         return -1;
00169     }
00170 }

std::string GroupSockConnection::bind ( const std::string &  address  )  [virtual]

Bind the connection to a network interface. The address on which the port can be connected is returned. The interface is determined by the connection's interface field and the address parameter. The address can be empty, which means a free port is used, or it can contain a port number.

Implements OSG::Connection.

Definition at line 178 of file OSGGroupSockConnection.cpp.

References _acceptSocket, OSG::Socket::bind(), OSG::Socket::getAddress(), OSG::SocketAddress::getHost(), OSG::Connection::getInterface(), OSG::SocketAddress::getPort(), OSG::Socket::listen(), OSG::osgGetHostname(), OSG::Socket::setReusePort(), and SINFO.

00179 {
00180     int         port=0;
00181     char        localhost[256];
00182     char        portStr[256];
00183     std::string interf;
00184     std::string boundedAddress;
00185 
00186     // get local host name
00187     osgGetHostname(localhost,255);
00188     if(!getInterface().empty())
00189         interf = getInterface();
00190     else
00191         interf = localhost;
00192     // parse address
00193     if(!address.empty())
00194         if(sscanf(address.c_str(),"%*[^:]:%d",&port) != 1)
00195             if(sscanf(address.c_str(),":%d",&port) != 1)
00196                 port = 0;
00197     // bind port
00198     _acceptSocket.setReusePort(true);
00199     _acceptSocket.bind(SocketAddress(interf.c_str(),port));
00200     SINFO << "Connection bound to "
00201           << _acceptSocket.getAddress().getHost() << ":"
00202           << _acceptSocket.getAddress().getPort() << std::endl;
00203     _acceptSocket.listen();
00204     // create address
00205     sprintf(portStr,"%d",_acceptSocket.getAddress().getPort());
00206     return interf + ":" + portStr;
00207 }

void GroupSockConnection::setParams ( const std::string &  params  )  [virtual]

Reimplemented from OSG::Connection.

Reimplemented in OSG::GroupMCastConnection.

Definition at line 211 of file OSGGroupSockConnection.cpp.

References _socketReadBuffer, _socketWriteBuffer, FINFO, OSG::ExceptionBinaryDataHandler::readBufAdd(), OSG::ExceptionBinaryDataHandler::readBufClear(), OSG::ExceptionBinaryDataHandler::writeBufAdd(), and OSG::ExceptionBinaryDataHandler::writeBufClear().

00212 {
00213     if(params.empty())
00214         return;
00215 
00216     std::string option = "bufferSize=";
00217     std::string::size_type i = 0;
00218     if((i=params.find(option)) != std::string::npos)
00219     {
00220         std::string str = params.substr(i + option.size());
00221 
00222         std::stringstream ss;
00223         std::string::size_type j = 0;
00224         while(j < str.length() && str[j] != ',' && isdigit(str[j]))
00225         {
00226             ss << str[j++];
00227         }
00228         UInt32 bufferSize;
00229         ss >> bufferSize;
00230 
00231         // clear old buffer.
00232         readBufClear();
00233         writeBufClear();
00234 
00235         _socketReadBuffer.resize(bufferSize);
00236         _socketWriteBuffer.resize(_socketReadBuffer.size());
00237         
00238         // reserve first bytes for buffer size
00239         readBufAdd (&_socketReadBuffer [sizeof(SocketBufferHeader)],
00240                     _socketReadBuffer.size() -sizeof(SocketBufferHeader));
00241         writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00242                     _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00243 
00244         FINFO(("GroupSockConnection::setParams : setting buffer size to %u.\n", bufferSize));
00245     }
00246 }

Connection::Channel GroupSockConnection::selectChannel ( Time  timeout = -1  )  throw (ReadError) [virtual]

Implements OSG::Connection.

Definition at line 254 of file OSGGroupSockConnection.cpp.

References OSG::ExceptionBinaryDataHandler::_currentReadBuffer, _readIndex, OSG::GroupConnection::_selection, _sockets, OSG::ExceptionBinaryDataHandler::_zeroCopyThreshold, FFATAL, OSG::GroupConnection::indexToChannel(), OSG::SocketSelection::isSetRead(), OSG::ExceptionBinaryDataHandler::readBufEnd(), OSG::SocketSelection::select(), OSG::SocketSelection::setRead(), and OSG::Exception::what().

00256 {
00257     Int32 maxnread=0,nread;
00258     ChannelIndex index;
00259     SocketSelection selection,result;
00260 
00261     // if there is data in the read buffer, return current channel
00262     if(_zeroCopyThreshold != 1 &&
00263        _currentReadBuffer != readBufEnd())
00264     {
00265         FFATAL(("Channel change ignores data in current buffer"))
00266         return indexToChannel(_readIndex);
00267     }    
00268 
00269     if(_selection[_readIndex] &&
00270        _sockets[_readIndex].getAvailable())
00271     {
00272         return indexToChannel(_readIndex);;
00273     }
00274 
00275     // wait for first socket to deliver data
00276     for(index = 0 ; 
00277     index < static_cast<ChannelIndex>(_sockets.size()); 
00278     ++index)
00279     {
00280         if(_selection[index])
00281             selection.setRead(_sockets[index]);
00282     }
00283     
00284     try 
00285     {
00286         // select ok ?
00287         if(!selection.select(timeout,result))
00288             return -1;
00289 
00290         // use socket with most data
00291         for(index = 0 ; index < _sockets.size() ; ++index)
00292         {
00293             if(result.isSetRead(_sockets[index]))
00294             {
00295                 nread=_sockets[index].getAvailable();
00296                 if(maxnread < nread)
00297                 {
00298                     maxnread = nread;
00299                     _readIndex=index;
00300                 }
00301             }
00302         }
00303     }
00304     catch(SocketException &e)
00305     {
00306         throw ReadError(e.what());
00307     }
00308 
00309     // return channel id
00310     return indexToChannel(_readIndex);
00311 }

GroupConnection * GroupSockConnection::create ( void   )  [static]

Reimplemented in OSG::GroupSockPipeline, and OSG::GroupMCastConnection.

Definition at line 384 of file OSGGroupSockConnection.cpp.

References GroupSockConnection().

00385 {
00386     return new GroupSockConnection();
00387 }

void GroupSockConnection::read ( MemoryHandle  mem,
UInt32  size 
) [protected, virtual]

Reimplemented from OSG::ExceptionBinaryDataHandler.

Definition at line 399 of file OSGGroupSockConnection.cpp.

References _readIndex, and _sockets.

00400 {
00401     int len;
00402 
00403     // read data
00404     len=_sockets[_readIndex].recv(mem,size);
00405     if(len==0)
00406     {
00407 //        throw ChannelClosed(indexToChannel(_readIndex));
00408         throw ReadError("Channel closed");
00409     }
00410 } 

void GroupSockConnection::readBuffer ( void   )  throw (ReadError) [protected, virtual]

Read next data block

The stream connection uses only one BinaryDataHandler buffer. If more than one buffer is present, then this method must be changed!

Reimplemented from OSG::ExceptionBinaryDataHandler.

Definition at line 419 of file OSGGroupSockConnection.cpp.

References _readIndex, _socketReadBuffer, _sockets, OSG::ExceptionBinaryDataHandler::readBufBegin(), and size.

00420 {
00421     int size;
00422     int len;
00423 
00424     // read buffer header
00425     len=_sockets[_readIndex].recv(&_socketReadBuffer[0],sizeof(SocketBufferHeader));
00426     if(len==0)
00427         throw ReadError("Channel closed");
00428     // read remaining data
00429     size=osgNetToHost<UInt32>(((SocketBufferHeader*)&_socketReadBuffer[0])->size);
00430     len=_sockets[_readIndex].recv(&_socketReadBuffer[sizeof(SocketBufferHeader)],
00431                          size);
00432     if(len==0)
00433         throw ReadError("Channel closed");
00434     readBufBegin()->setDataSize(size);
00435 }    

void GroupSockConnection::write ( MemoryHandle  mem,
UInt32  size 
) [protected, virtual]

Write data to all destinations

Parameters:
mem Pointer to data buffer
size Number of bytes to write

Reimplemented from OSG::ExceptionBinaryDataHandler.

Reimplemented in OSG::GroupSockPipeline, and OSG::GroupMCastConnection.

Definition at line 444 of file OSGGroupSockConnection.cpp.

References _sockets, and OSG::Exception::what().

00445 {
00446     Int32 index;
00447 
00448     try
00449     {
00450         // write to all connected sockets
00451         for(index = 0 ; index < _sockets.size() ; ++index)
00452             _sockets[index].send(mem,size);
00453     }
00454     catch(SocketException &e)
00455     {
00456         throw WriteError(e.what());
00457     }
00458 }

void GroupSockConnection::writeBuffer ( void   )  [protected, virtual]

Write buffer

Write blocksize and data.

Reimplemented from OSG::ExceptionBinaryDataHandler.

Reimplemented in OSG::GroupSockPipeline, and OSG::GroupMCastConnection.

Definition at line 465 of file OSGGroupSockConnection.cpp.

References _sockets, _socketWriteBuffer, size, and OSG::ExceptionBinaryDataHandler::writeBufBegin().

00466 {
00467     Int32 index;
00468     UInt32 size = writeBufBegin()->getDataSize();
00469     // write size to header
00470     ((SocketBufferHeader*)&_socketWriteBuffer[0])->size = 
00471         osgHostToNet<UInt32>(size);
00472     if(size)
00473     {
00474         // write data to all sockets
00475         for(index = 0 ; index < _sockets.size() ; ++index)
00476         {
00477             // write whole block
00478             _sockets[index].send(&_socketWriteBuffer[0],
00479                                  size+sizeof(SocketBufferHeader));
00480         }
00481     }
00482 }

bool GroupSockConnection::wait ( Time  timeout  )  throw (ReadError) [protected, virtual]

Implements OSG::Connection.

Reimplemented in OSG::GroupMCastConnection.

Definition at line 318 of file OSGGroupSockConnection.cpp.

References _sockets, OSG::SocketSelection::clearRead(), FFATAL, OSG::SocketSelection::isSetRead(), OSG::SocketSelection::select(), OSG::SocketSelection::setRead(), and OSG::Exception::what().

00319 {
00320     UInt32 len;
00321     UInt32 index;
00322     UInt32 tag=314156;
00323     UInt32 missing = _sockets.size();
00324     SocketSelection selection,result;
00325 
00326     for(index = 0 ; index < _sockets.size() ; ++index)
00327         selection.setRead(_sockets[index]);
00328 
00329     try
00330     {
00331         while(missing)
00332         {
00333             if(!selection.select(timeout,result))
00334                 return false;
00335             for(index = 0 ; index < _sockets.size() ; ++index)
00336             {
00337                 if(result.isSetRead(_sockets[index]))
00338                 {
00339                     len = _sockets[index].recv(&tag,sizeof(tag));
00340                     tag = osgNetToHost<UInt32>(tag);
00341                     if(len == 0)
00342                         throw ReadError("Channel closed");
00343                     selection.clearRead(_sockets[index]);
00344                     missing--;
00345                     if(tag != 314156)
00346                     {
00347                         FFATAL(("Stream out of sync in SockConnection\n"));
00348                         throw ReadError("Stream out of sync");
00349                     }
00350                 }
00351             }
00352         }
00353     }
00354     catch(SocketException &e)
00355     {
00356         throw ReadError(e.what());
00357     }
00358     return true;
00359 }

void GroupSockConnection::signal ( void   )  throw (WriteError) [protected, virtual]

send signal

Implements OSG::Connection.

Reimpleme