OSG::PointMCastConnection Class Reference

#include <OSGPointMCastConnection.h>

Inheritance diagram for OSG::PointMCastConnection:

OSG::PointSockConnection OSG::PointConnection OSG::Connection OSG::ExceptionBinaryDataHandler

List of all members.

private helpers



bool recvNextDgram (Dgram *dgram)
void combineAck (Dgram *dgram, SocketAddress from)
bool recvQueue (void)
void initialize (void)
static void recvQueueThread (void *arg)

Public Types

public types


typedef Int32 Channel

Public Member Functions

Constructors


 PointMCastConnection (void)
virtual ~PointMCastConnection (void)
type info


virtual const ConnectionType * getType (void)
connection


virtual Channel connectGroup (const std::string &address, Time timeout=-1)
virtual void disconnect (void)
virtual Channel acceptGroup (Time timeout=-1)
synchronisation


virtual bool wait (Time timeout) throw (ReadError)
channel handling


virtual Channel selectChannel (Time timeout=-1) throw (ReadError)
connection


virtual Channel connectPoint (const std::string &address, Time timeout=-1)
virtual Channel acceptPoint (Time timeout=-1)
virtual std::string bind (const std::string &interf)
synchronisation


virtual void signal (void) throw (WriteError)
params


virtual void setParams (const std::string &params)
channel handling


const std::string & getInterface (void)
void setInterface (const std::string &interf)
Put


void put (void const *src, UInt32 size)
void putAndFree (MemoryHandle src, UInt32 size)
void putValue (const bool &value)
void putValue (const UInt8 &value)
void putValue (const UInt16 &value)
void putValue (const UInt32 &value)
void putValue (const UInt64 &value)
void putValue (const Int8 &value)
void putValue (const Int16 &value)
void putValue (const Int32 &value)
void putValue (const Int64 &value)
void putValue (const Real16 &value)
void putValue (const Fixed32 &value)
void putValue (const Real32 &value)
void putValue (const Real64 &value)
void putValue (const Real128 &value)
void putValue (const std::string &value)
void putValues (const bool *value, UInt32 size)
void putValues (const UInt8 *value, UInt32 size)
void putValues (const UInt16 *value, UInt32 size)
void putValues (const UInt32 *value, UInt32 size)
void putValues (const UInt64 *value, UInt32 size)
void putValues (const Int8 *value, UInt32 size)
void putValues (const Int16 *value, UInt32 size)
void putValues (const Int32 *value, UInt32 size)
void putValues (const Int64 *value, UInt32 size)
void putValues (const Real16 *value, UInt32 size)
void putValues (const Fixed32 *value, UInt32 size)
void putValues (const Real32 *value, UInt32 size)
void putValues (const Real64 *value, UInt32 size)
void putValues (const Real128 *value, UInt32 size)
void putValues (const std::string *value, UInt32 size)
Get


void get (void *dst, UInt32 size) throw (ReadError)
void getAndAlloc (MemoryHandle &src, UInt32 size) throw (ReadError)
void getValue (bool &value) throw (ReadError)
void getValue (UInt8 &value) throw (ReadError)
void getValue (UInt16 &value) throw (ReadError)
void getValue (UInt32 &value) throw (ReadError)
void getValue (UInt64 &value) throw (ReadError)
void getValue (Int8 &value) throw (ReadError)
void getValue (Int16 &value) throw (ReadError)
void getValue (Int32 &value) throw (ReadError)
void getValue (Int64 &value) throw (ReadError)
void getValue (Real16 &value) throw (ReadError)
void getValue (Fixed32 &value) throw (ReadError)
void getValue (Real32 &value) throw (ReadError)
void getValue (Real64 &value) throw (ReadError)
void getValue (Real128 &value) throw (ReadError)
void getValue (std::string &value) throw (ReadError)
void getValues (bool *value, UInt32 size) throw (ReadError)
void getValues (UInt8 *value, UInt32 size) throw (ReadError)
void getValues (UInt16 *value, UInt32 size) throw (ReadError)
void getValues (UInt32 *value, UInt32 size) throw (ReadError)
void getValues (UInt64 *value, UInt32 size) throw (ReadError)
void getValues (Int8 *value, UInt32 size) throw (ReadError)
void getValues (Int16 *value, UInt32 size) throw (ReadError)
void getValues (Int32 *value, UInt32 size) throw (ReadError)
void getValues (Int64 *value, UInt32 size) throw (ReadError)
void getValues (Real16 *value, UInt32 size) throw (ReadError)
void getValues (Fixed32 *value, UInt32 size) throw (ReadError)
void getValues (Real32 *value, UInt32 size) throw (ReadError)
void getValues (Real64 *value, UInt32 size) throw (ReadError)
void getValues (Real128 *value, UInt32 size) throw (ReadError)
void getValues (std::string *value, UInt32 size) throw (ReadError)
Helper


virtual void forceCopy (void)
virtual void forceDirectIO (void)
void flush (void)
 write data not yet written
void setNetworkOrder (bool value)
bool getNetworkOrder (void)

Static Public Member Functions

create


static PointConnection * create (void)
 create connection

Protected Types

typedef std::vector< MemoryBlock > BuffersT
typedef std::list< MemoryHandle > FreeMemT

Protected Member Functions

IO Implementation


virtual void read (MemoryHandle mem, UInt32 size)
virtual void readBuffer (void) throw (ReadError)
IO Implementation


virtual void write (MemoryHandle mem, UInt32 size)
virtual void writeBuffer (void)
Read


BuffersT::iterator readBufBegin (void)
BuffersT::iterator readBufEnd (void)
void readBufAdd (MemoryHandle mem, UInt32 size, UInt32 dataSize=0)
void readBufClear (void)
Write


BuffersT::iterator writeBufBegin (void)
BuffersT::iterator writeBufEnd (void)
void writeBufAdd (MemoryHandle mem, UInt32 size, UInt32 dataSize=0)
void writeBufClear (void)
Helper


bool isReadBufferEmpty (void)

Protected Attributes

members


DgramSocket _mcastSocket
DgramSocket _responseSocket
BaseThread * _recvQueueThread
bool _recvQueueThreadRunning
bool _recvQueueThreadStop
UInt16 _seqNumber
SocketAddress _mcastAddress
DgramQueue _queue
DgramQueue _free
Lock * _lock
SocketAddress _sender
SocketAddress _ackDestination
Dgram * _lastDgram
UInt32 _lastDgramPos
bool _initialized
std::map< SocketAddress, UInt16 > _combineAck
UInt16 _maxAck
members


StreamSocket _acceptSocket
StreamSocket _socket
SocketAddress _remoteAddress
std::vector< UInt8 > _socketReadBuffer
std::vector< UInt8 > _socketWriteBuffer
members


bool _pointToPoint
protected members


std::string _interface
Member


BuffersT _readBuffers
BuffersT _writeBuffers
BuffersT _zeroCopyBuffers
UInt32 _zeroCopyThreshold
FreeMemT _freeMem
BuffersT::iterator _currentReadBuffer
UInt32 _currentReadBufferPos
BuffersT::iterator _currentWriteBuffer
UInt32 _currentWriteBufferPos
bool _networkOrder

Private Types

typedef PointSockConnection Inherited

Private Member Functions

 PointMCastConnection (const PointMCastConnection &source)
PointMCastConnection & operator= (const PointMCastConnection &source)

Static Private Attributes

static type


static ConnectionType _type

Classes

struct  SocketBufferHeader

Detailed Description

Definition at line 63 of file OSGPointMCastConnection.h.


Member Typedef Documentation

typedef PointSockConnection OSG::PointMCastConnection::Inherited [private]

Reimplemented from OSG::PointSockConnection.

Definition at line 181 of file OSGPointMCastConnection.h.

typedef Int32 OSG::Connection::Channel [inherited]

Definition at line 71 of file OSGConnection.h.

typedef std::vector<MemoryBlock> OSG::ExceptionBinaryDataHandler::BuffersT [protected, inherited]

Definition at line 229 of file OSGExceptionBinaryDataHandler.h.

typedef std::list<MemoryHandle> OSG::ExceptionBinaryDataHandler::FreeMemT [protected, inherited]

Definition at line 230 of file OSGExceptionBinaryDataHandler.h.


Constructor & Destructor Documentation

PointMCastConnection::PointMCastConnection ( void   ) 

Definition at line 67 of file OSGPointMCastConnection.cpp.

References OSG::PointSockConnection::_acceptSocket, _free, _lock, OSG::Lock::get(), OSG::StreamSocket::open(), OSG_DGRAM_QUEUE_LEN, OSG::DgramQueue::put(), and OSG::Socket::setReusePort().

Referenced by create().

// Default constructor. Starts with no partially consumed datagram
// (_lastDgram == NULL) and _initialized == false, then obtains the lock,
// fills the free datagram queue and opens the accept socket.
00067                                           :
00068     Inherited(),
00069     _lastDgram(NULL),
00070     _initialized(false)
00071 {
00072     char lockName[256];
          // the lock name embeds this object's address (%p)
00073     sprintf(lockName,"PointMCastConnection%p",this);
00074 
00075     // create locks
00076     _lock     = Lock::get(lockName);
00077 
00078     // fill dgramqueue
          // OSG_DGRAM_QUEUE_LEN Dgram objects are allocated up front and put
          // into _free; the destructor deletes whatever is left in the queues
00079     for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI)
00080         _free.put(new Dgram());
00081 
          // open the accept socket and enable port reuse on it
00082     _acceptSocket.open();
00083     _acceptSocket.setReusePort(true);
00084 
          // disabled code: would size _socketWriteBuffer and register it as
          // write buffer behind a SocketBufferHeader
00085 /*
00086     _socketWriteBuffer.resize(131071);
00087     // reserve first bytes for buffer size
00088     writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)],
00089                 _socketWriteBuffer.size()-sizeof(SocketBufferHeader));
00090 */
00091 }

PointMCastConnection::~PointMCastConnection ( void   )  [virtual]

Destructor

Definition at line 95 of file OSGPointMCastConnection.cpp.

References OSG::PointSockConnection::_acceptSocket, _free, _lock, _mcastSocket, _queue, _recvQueueThread, _recvQueueThreadStop, OSG::Lock::acquire(), OSG::StreamSocket::close(), OSG::DgramSocket::close(), OSG::DgramQueue::empty(), OSG::DgramQueue::get(), OSG::BaseThread::join(), and OSG::Dgram::release().

// Destructor. Sets the stop flag for the receive thread, joins it, closes
// the multicast socket, deletes every Dgram left in the free and pending
// queues (under _lock) and finally closes the accept socket.
// NOTE(review): join() is called unconditionally, and the constructor listing
// does not initialise _recvQueueThread or _recvQueueThreadStop -- confirm
// that join() is safe when the receive thread was never started.
00096 {
00097     // indicate thread stop
00098     _recvQueueThreadStop = true;
00099     // wait for stop
00100     BaseThread::join(_recvQueueThread);    
00101     // close socket
00102     _mcastSocket.close();
00103     // free queues
          // both queues are drained while holding _lock; get() is passed the
          // same lock
00104     _lock->acquire();
00105     while(!_free.empty())
00106         delete _free.get(_lock);
00107     while(!_queue.empty())
00108         delete _queue.get(_lock);
00109     _lock->release();
00110     // close socket
00111     _acceptSocket.close();
00112 }

OSG::PointMCastConnection::PointMCastConnection ( const PointMCastConnection &  source  )  [private]


Member Function Documentation

const ConnectionType * PointMCastConnection::getType ( void   )  [virtual]

Reimplemented from OSG::PointSockConnection.

Definition at line 116 of file OSGPointMCastConnection.cpp.

References _type.

// Returns a pointer to the static ConnectionType member _type of this class.
00117 {
00118     return &_type;
00119 }

Connection::Channel PointMCastConnection::connectGroup ( const std::string &  address,
Time  timeout = -1 
) [virtual]

Reimplemented from OSG::PointSockConnection.

Definition at line 127 of file OSGPointMCastConnection.cpp.

References OSG::PointConnection::connectGroup().

// Forwards address and timeout to Inherited::connectGroup() and returns the
// channel it reports, unchanged.
00130 {
00131     Channel channel = Inherited::connectGroup(address,timeout);
00132     return channel;
00133 }

void PointMCastConnection::disconnect ( void   )  [virtual]

disconnect the given channel

Reimplemented from OSG::PointSockConnection.

Definition at line 137 of file OSGPointMCastConnection.cpp.

References OSG::PointSockConnection::_socket, and OSG::StreamSocket::close().

// Closes the stream socket _socket. No other socket is touched here; the
// multicast socket is closed in the destructor.
00138 {
00139     _socket.close();
00140 }

Connection::Channel PointMCastConnection::acceptGroup ( Time  timeout = -1  )  [virtual]

Accept an incoming group connection. If the timeout is reached, -1 is returned. If timeout is -1, wait without a timeout.

Reimplemented from OSG::PointSockConnection.

Definition at line 145 of file OSGPointMCastConnection.cpp.

References OSG::PointConnection::acceptGroup().

// Forwards timeout to Inherited::acceptGroup() and returns the channel it
// reports, unchanged.
00146 {
00147     Channel channel = Inherited::acceptGroup(timeout);
00148     return channel;
00149 }

bool PointMCastConnection::wait ( Time  timeout  )  throw (ReadError) [virtual]

Reimplemented from OSG::PointSockConnection.

Definition at line 202 of file OSGPointMCastConnection.cpp.

References OSG::PointConnection::_pointToPoint, FFATAL, OSG::ExceptionBinaryDataHandler::getValue(), selectChannel(), OSG::PointConnection::wait(), and OSG::Exception::what().

// Waits for a synchronisation tag on the connection.
// In point-to-point mode the call is delegated to Inherited::wait().
// Otherwise: returns false when selectChannel() reports a negative channel,
// returns true when the next UInt32 read equals 314156, and throws ReadError
// when the tag differs or a SocketError is caught.
// NOTE(review): presumably 314156 is the tag written by the matching
// signal() -- confirm against the sender side.
// NOTE(review): selectChannel() catches SocketException while this function
// catches SocketError -- confirm which type the socket layer throws.
00203 {
00204     UInt32 tag;
00205 
00206     if(_pointToPoint)
00207         return Inherited::wait(timeout);
00208     try
00209     {
              // a negative channel means nothing became available
00210         if(selectChannel(timeout) < 0)
00211             return false;
00212         getValue(tag);
              // any value other than the expected tag is treated as fatal
00213         if(tag != 314156)
00214         {
00215             FFATAL(("Stream out of sync in PointMCastConnection\n"));
00216             throw ReadError("Stream out of sync");
00217         }
00218     }
00219     catch(SocketError &e)
00220     {
              // report socket failures to the caller as ReadError
00221         throw ReadError(e.what());
00222     }
00223     return true;
00224 }

Connection::Channel PointMCastConnection::selectChannel ( Time  timeout = -1  )  throw (ReadError) [virtual]

Reimplemented from OSG::PointSockConnection.

Definition at line 157 of file OSGPointMCastConnection.cpp.

References _initialized, _lastDgram, _lock, _mcastSocket, OSG::PointConnection::_pointToPoint, _queue, OSG::Lock::acquire(), OSG::DgramQueue::empty(), initialize(), OSG::ExceptionBinaryDataHandler::isReadBufferEmpty(), OSG::Lock::release(), OSG::Connection::selectChannel(), OSG::DgramQueue::wait(), OSG::Socket::waitReadable(), and OSG::Exception::what().

Referenced by wait().

// Reports whether data can be read from the connection.
// In point-to-point mode the call is delegated to Inherited::selectChannel().
// Otherwise returns channel 0 when data is (or becomes) available and -1
// when the timeout runs out first:
//   timeout == -1 : block on the datagram queue, then return 0
//   timeout ==  0 : do not wait, return -1 if nothing is pending
//   timeout  >  0 : poll in steps of .1 until a datagram is queued
// A caught SocketException is rethrown as ReadError.
00159 {
00160     if(_pointToPoint)
00161         return Inherited::selectChannel(timeout);
00162     try
00163     {
              // set up on first use
00164         if(!_initialized)
00165             initialize();
00166         // todo
              // only wait when nothing at all is pending: read buffer empty,
              // no partially consumed datagram and no queued datagram
00167         if(isReadBufferEmpty() && 
00168            !_lastDgram && 
00169            _queue.empty())
00170         {
00171             if(timeout == -1)
00172             {
00173                 // wait for a dgram
00174                 _lock->acquire();
00175                 _queue.wait(_lock);
00176                 _lock->release();
00177                 return 0;
00178             }
00179             if(timeout == 0)
00180                 return -1;
                  // each pass waits up to .1 on the multicast socket and
                  // subtracts .1 from the remaining timeout
00181             while(_queue.empty() && timeout > 0)
00182             {
00183                 _mcastSocket.waitReadable(.1);
00184                 timeout-=.1;
00185             }
                  // still nothing queued after the timeout was used up
00186             if(_queue.empty())
00187                 return -1;
00188         }
00189     }
00190     catch(SocketException &e)
00191     {
00192         throw ReadError(e.what());
00193     }
00194     return 0;
00195 }

PointConnection * PointMCastConnection::create ( void   )  [static]

Reimplemented from OSG::PointSockConnection.

Definition at line 231 of file OSGPointMCastConnection.cpp.

References PointMCastConnection().

// Factory function: returns a PointMCastConnection allocated with new.
// NOTE(review): presumably the caller takes ownership of the returned
// object -- confirm against the connection factory that calls this.
00232 {
00233     return new PointMCastConnection();
00234 }

void PointMCastConnection::read ( MemoryHandle  mem,
UInt32  size 
) [protected, virtual]

Reimplemented from OSG::PointSockConnection.

Definition at line 246 of file OSGPointMCastConnection.cpp.

References _free, _initialized, _lastDgram, _lastDgramPos, _lock, OSG::PointConnection::_pointToPoint, _queue, OSG::Lock::acquire(), OSG::DgramQueue::get(), OSG::Dgram::getData(), OSG::Dgram::getSize(), initialize(), OSG::osgMin(), OSG::DgramQueue::put(), OSG::ExceptionBinaryDataHandler::read(), and OSG::Lock::release().

// Copies exactly `size` bytes into `mem`.
// In point-to-point mode the call is delegated to Inherited::read().
// Otherwise the bytes are taken from received datagrams: first from the
// partially consumed datagram remembered in _lastDgram/_lastDgramPos, then
// from datagrams fetched from _queue. Fully consumed datagrams are handed
// back to _free.
// Throws ReadError("Channel closed\n") when a datagram of size 0 is fetched.
// NOTE(review): the zero-size datagram is not put back into _free before the
// throw -- confirm whether this is intended.
00247 {
00248     if(_pointToPoint)
00249     {
00250         Inherited::read(mem,size);
00251         return;
00252     }
00253     Dgram *dgram  = NULL;
00254     char  *buffer = (char*)mem;
00255     UInt32 len;
00256     UInt32 dgramPos;
00257 
          // set up on first use
00258     if(!_initialized)
00259         initialize();
00260 
          // loop until the requested number of bytes has been copied
00261     while(size)
00262     {
00263         if(_lastDgram)
00264         {
                  // continue with the datagram left over from a previous
                  // pass or call
00265             dgramPos = _lastDgramPos;
00266             dgram = _lastDgram;
00267         }
00268         else
00269         {
00270             // get next dgram
00271             _lock->acquire();
00272             dgram = _queue.get(_lock);
00273             _lock->release();
00274             dgramPos = 0;
                  // a datagram without data is reported as a closed channel
00275             if(dgram->getSize() == 0)
00276                 throw ReadError("Channel closed\n");
00277         }
00278         // copy to buffer
              // take the smaller of what is still wanted and what is left in
              // this datagram
00279         len = osgMin(size,dgram->getSize()-dgramPos);
00280         memcpy(buffer,dgram->getData()+dgramPos,len);
00281         buffer   += len;
00282         size     -= len;
00283         dgramPos += len;
00284         if(dgramPos == dgram->getSize())
00285         {
00286             // put to free queue
00287             _lock->acquire();
00288             _free.put(dgram);
00289             _lock->release();
00290             _lastDgram = NULL;
00291         }
00292         else
00293         {
                  // remember the unread remainder for the next pass or call
00294             _lastDgram    = dgram;
00295             _lastDgramPos = dgramPos;
00296         }
00297     }
00298 }

void PointMCastConnection::readBuffer ( void   )  throw (ReadError) [protected, virtual]

Read next data block

The stream connection uses only the BinaryDataHandler buffer. If more than one buffer is present, then this method must be changed!

Reimplemented from OSG::PointSockConnection.

Definition at line 307 of file OSGPointMCastConnection.cpp.

References _free, _initialized, _lastDgram, _lastDgramPos, _lock, OSG::PointConnection::_pointToPoint, _queue, OSG::Lock::acquire(), OSG::DgramQueue::empty(), OSG::DgramQueue::get(), OSG::Dgram::getData(), OSG::Dgram::getSize(), initialize(), OSG::osgMin(), OSG::DgramQueue::put(), OSG::ExceptionBinaryDataHandler::readBufBegin(), OSG::ExceptionBinaryDataHandler::readBuffer(), OSG::Lock::release(), and size.

// Fills the first read buffer (readBufBegin()) with received data.
// In point-to-point mode the call is delegated to Inherited::readBuffer().
// Otherwise at least one datagram (or the remainder kept in _lastDgram) is
// consumed; copying goes on while the buffer has room and _queue still
// holds datagrams. The number of bytes stored is recorded with
// setDataSize().
// Throws ReadError("Channel closed") when a datagram of size 0 is fetched.
// NOTE(review): the zero-size datagram is not put back into _free before the
// throw -- confirm whether this is intended.
00308 {
00309     if(_pointToPoint)
00310     {
00311         Inherited::readBuffer();
00312         return;
00313     }
00314 
          // running total of bytes stored by this function; it is only
          // accumulated here and never read within this function
00315     static int sumSize=0;
00316     Dgram       *dgram  = NULL;
          // remaining capacity and write position inside the first read buffer
00317     UInt32       size   = readBufBegin()->getSize();
00318     MemoryHandle buffer = readBufBegin()->getMem();
00319     UInt32       len;
00320     UInt32       dgramPos;
00321 
          // set up on first use
00322     if(!_initialized)
00323         initialize();
00324 
00325     do
00326     {
00327         if(_lastDgram)
00328         {
                  // continue with the datagram left over from a previous
                  // pass or call
00329             dgramPos = _lastDgramPos;
00330             dgram = _lastDgram;
00331         }
00332         else
00333         {
00334             // get next dgram
00335             _lock->acquire();
00336             dgram = _queue.get(_lock);
00337             _lock->release();
00338             dgramPos = 0;
                  // a datagram without data is reported as a closed channel
00339             if(dgram->getSize() == 0)
00340                 throw ReadError("Channel closed");
00341         }
00342         // copy to buffer
              // take the smaller of the free buffer space and what is left in
              // this datagram
00343         len = osgMin(size,dgram->getSize()-dgramPos);
00344         memcpy(buffer,dgram->getData()+dgramPos,len);
00345         buffer   += len;
00346         size     -= len;
00347         dgramPos += len;
00348         if(dgramPos == dgram->getSize())
00349         {
00350             // put to free queue
00351             _lock->acquire();
00352             _free.put(dgram);
00353             _lock->release();
00354             _lastDgram = NULL;
00355         }
00356         else
00357         {
                  // remember the unread remainder for the next call
00358             _lastDgram    = dgram;
00359             _lastDgramPos = dgramPos;
00360         }
00361     }
          // stop when the buffer is full or no further datagram is queued
00362     while(size && !_queue.empty());
00363     // set data size
          // bytes stored = buffer capacity minus the space left unused
00364     readBufBegin()->setDataSize(readBufBegin()->getSize()-size);
00365     sumSize += readBufBegin()->getDataSize();
00366 }    

bool PointMCastConnection::recvNextDgram ( Dgram *  dgram  )  [private]

Definition at line 373 of file OSGPointMCastConnection.cpp.

References _ackDestination, _combineAck, _maxAck, _mcastSocket, _responseSocket, _sender, combineAck(), OSG::Dgram::getBuffer(), OSG::Dgram::getBufferCapacity(), OSG::Dgram::getBufferSize(), OSG::Dgram::getId(), OSG::SocketSelection::isSetRead(), OSG::DgramSocket::recvFrom(), OSG::SocketSelection::select(), OSG::Dgram::setBufferSize(), OSG::Dgram::setId(), OSG::SocketSelection::setRead(), OSG::Dgram::setResponseAck(), and OSG::Dgram::setResponseSize().

Referenced by recvQueue().

// Receives at most one datagram into `dgram`.
// Selects on the multicast and the response socket with a timeout of 0.5.
// Data on the response socket is read into dgram and passed to combineAck().
// Data on the multicast socket is then read into the same dgram.
// Returns true only when a datagram was read from the multicast socket and
// its source address equals _sender; returns false in every other case
// (select timeout, only response data, or a foreign sender).
00374 {
00375     SocketSelection selection;
00376     SocketAddress   from;
00377     UInt32          length;
00378     
          // watch both sockets for readability
00379     selection.setRead(_mcastSocket);
00380     selection.setRead(_responseSocket);
00381     if(!selection.select(0.5))
00382         return false;
00383     if(selection.isSetRead(_responseSocket))
00384     {
00385         length = _responseSocket.recvFrom(dgram->getBuffer(),
00386                                           dgram->getBufferCapacity(),
00387                                           from);
00388         dgram->setBufferSize(length);
00389 #if 0
00390 // ????
00391         // from sender
00392         if(from == _sender && !_combineAck.empty())
00393         {
00394             exit(0);
00395 
00396 
00397             if(_maxAck == dgram->getId())
00398             {
00399                 // do we have all acks ?
00400                 dgram->setId(_maxAck);
00401                 dgram->setResponseSize();
00402                 dgram->setResponseAck(true);
00403 #ifdef TEST_LOST_DGRAM_RATE
00404                 if(drand48()>TEST_LOST_DGRAM_RATE)
00405 #endif
00406                     _responseSocket.sendTo(
00407                         dgram->getBuffer(),
00408                         dgram->getBufferSize(),
00409                         _ackDestination);
00410                 return false;
00411             }
00412             else
00413             {
00414                 return true;
00415             }
00416         }
00417 #endif
              // hand the received response to the ack combiner
00418         combineAck(dgram,from);
00419     } 
          // when both sockets were readable, the multicast data read below
          // overwrites the response data already handled above
00420     if(selection.isSetRead(_mcastSocket))
00421     {
00422         length = _mcastSocket.recvFrom(dgram->getBuffer(),
00423                                        dgram->getBufferCapacity(),
00424                                        from);
00425         dgram->setBufferSize(length);
00426         // ignore packages from wrong destination
00427         if(from != _sender)
00428             return false;
00429         else
00430             return true;
00431     }
00432     else
00433     {
00434         return false;
00435     }
00436 }

void PointMCastConnection::combineAck ( Dgram *  dgram,
SocketAddress  from 
) [private]

combine several acks to 1 ack stream

Definition at line 440 of file OSGPointMCastConnection.cpp.

References _ackDestination, _combineAck, _maxAck, _responseSocket, _seqNumber, FFATAL, OSG::Dgram::getBuffer(), OSG::Dgram::getBufferSize(), OSG::Dgram::getId(), OSG::Dgram::less(), OSG::DgramSocket::sendTo(), OSG::Dgram::setId(), OSG::Dgram::setResponseAck(), and OSG::Dgram::setResponseSize().

Referenced by recvNextDgram(), and recvQueue().

00441 {
00442     UInt16 maxAck;
00443 
00444     if(dgram)
00445     {
00446         // do we expect acks from different source
00447         if(_combineAck.count(from)==0)
00448         {
00449             FFATAL(("no ack from other expected\n"));
00450             return;
00451         }
00452         // ack retransmission
00453         if( Dgram::less(dgram->getId(),_combineAck[from] ) )
00454         {
00455 //        printf("Ack restranmisson\n");
00456             return;
00457         }
00458         _combineAck[from] = dgram->getId();
00459     }
00460 
00461     maxAck =