#include <OSGPointMCastConnection.h>
Inheritance diagram for OSG::PointMCastConnection:

private helpers | |
| bool | recvNextDgram (Dgram *dgram) |
| void | combineAck (Dgram *dgram, SocketAddress from) |
| bool | recvQueue (void) |
| void | initialize (void) |
| static void | recvQueueThread (void *arg) |
Public Types | |
public types | |
| typedef Int32 | Channel |
Public Member Functions | |
Constructors | |
| PointMCastConnection (void) | |
| virtual | ~PointMCastConnection (void) |
type info | |
| virtual const ConnectionType * | getType (void) |
connection | |
| virtual Channel | connectGroup (const std::string &address, Time timeout=-1) |
| virtual void | disconnect (void) |
| virtual Channel | acceptGroup (Time timeout=-1) |
synchronisation | |
| virtual bool | wait (Time timeout) throw (ReadError) |
channel handling | |
| virtual Channel | selectChannel (Time timeout=-1) throw (ReadError) |
connection | |
| virtual Channel | connectPoint (const std::string &address, Time timeout=-1) |
| virtual Channel | acceptPoint (Time timeout=-1) |
| virtual std::string | bind (const std::string &interf) |
synchronisation | |
| virtual void | signal (void) throw (WriteError) |
params | |
| virtual void | setParams (const std::string ¶ms) |
channel handling | |
| const std::string & | getInterface (void) |
| void | setInterface (const std::string &interf) |
Put | |
| void | put (void const *src, UInt32 size) |
| void | putAndFree (MemoryHandle src, UInt32 size) |
| void | putValue (const bool &value) |
| void | putValue (const UInt8 &value) |
| void | putValue (const UInt16 &value) |
| void | putValue (const UInt32 &value) |
| void | putValue (const UInt64 &value) |
| void | putValue (const Int8 &value) |
| void | putValue (const Int16 &value) |
| void | putValue (const Int32 &value) |
| void | putValue (const Int64 &value) |
| void | putValue (const Real16 &value) |
| void | putValue (const Fixed32 &value) |
| void | putValue (const Real32 &value) |
| void | putValue (const Real64 &value) |
| void | putValue (const Real128 &value) |
| void | putValue (const std::string &value) |
| void | putValues (const bool *value, UInt32 size) |
| void | putValues (const UInt8 *value, UInt32 size) |
| void | putValues (const UInt16 *value, UInt32 size) |
| void | putValues (const UInt32 *value, UInt32 size) |
| void | putValues (const UInt64 *value, UInt32 size) |
| void | putValues (const Int8 *value, UInt32 size) |
| void | putValues (const Int16 *value, UInt32 size) |
| void | putValues (const Int32 *value, UInt32 size) |
| void | putValues (const Int64 *value, UInt32 size) |
| void | putValues (const Real16 *value, UInt32 size) |
| void | putValues (const Fixed32 *value, UInt32 size) |
| void | putValues (const Real32 *value, UInt32 size) |
| void | putValues (const Real64 *value, UInt32 size) |
| void | putValues (const Real128 *value, UInt32 size) |
| void | putValues (const std::string *value, UInt32 size) |
Get | |
| void | get (void *dst, UInt32 size) throw (ReadError) |
| void | getAndAlloc (MemoryHandle &src, UInt32 size) throw (ReadError) |
| void | getValue (bool &value) throw (ReadError) |
| void | getValue (UInt8 &value) throw (ReadError) |
| void | getValue (UInt16 &value) throw (ReadError) |
| void | getValue (UInt32 &value) throw (ReadError) |
| void | getValue (UInt64 &value) throw (ReadError) |
| void | getValue (Int8 &value) throw (ReadError) |
| void | getValue (Int16 &value) throw (ReadError) |
| void | getValue (Int32 &value) throw (ReadError) |
| void | getValue (Int64 &value) throw (ReadError) |
| void | getValue (Real16 &value) throw (ReadError) |
| void | getValue (Fixed32 &value) throw (ReadError) |
| void | getValue (Real32 &value) throw (ReadError) |
| void | getValue (Real64 &value) throw (ReadError) |
| void | getValue (Real128 &value) throw (ReadError) |
| void | getValue (std::string &value) throw (ReadError) |
| void | getValues (bool *value, UInt32 size) throw (ReadError) |
| void | getValues (UInt8 *value, UInt32 size) throw (ReadError) |
| void | getValues (UInt16 *value, UInt32 size) throw (ReadError) |
| void | getValues (UInt32 *value, UInt32 size) throw (ReadError) |
| void | getValues (UInt64 *value, UInt32 size) throw (ReadError) |
| void | getValues (Int8 *value, UInt32 size) throw (ReadError) |
| void | getValues (Int16 *value, UInt32 size) throw (ReadError) |
| void | getValues (Int32 *value, UInt32 size) throw (ReadError) |
| void | getValues (Int64 *value, UInt32 size) throw (ReadError) |
| void | getValues (Real16 *value, UInt32 size) throw (ReadError) |
| void | getValues (Fixed32 *value, UInt32 size) throw (ReadError) |
| void | getValues (Real32 *value, UInt32 size) throw (ReadError) |
| void | getValues (Real64 *value, UInt32 size) throw (ReadError) |
| void | getValues (Real128 *value, UInt32 size) throw (ReadError) |
| void | getValues (std::string *value, UInt32 size) throw (ReadError) |
Helper | |
| virtual void | forceCopy (void) |
| virtual void | forceDirectIO (void) |
| void | flush (void) |
| write data not yet written | |
| void | setNetworkOrder (bool value) |
| bool | getNetworkOrder (void) |
Static Public Member Functions | |
create | |
| static PointConnection * | create (void) |
| create conneciton | |
Protected Types | |
| typedef std::vector< MemoryBlock > | BuffersT |
| typedef std::list< MemoryHandle > | FreeMemT |
Protected Member Functions | |
IO Implementation | |
| virtual void | read (MemoryHandle mem, UInt32 size) |
| virtual void | readBuffer (void) throw (ReadError) |
IO Implementation | |
| virtual void | write (MemoryHandle mem, UInt32 size) |
| virtual void | writeBuffer (void) |
Read | |
| BuffersT::iterator | readBufBegin (void) |
| BuffersT::iterator | readBufEnd (void) |
| void | readBufAdd (MemoryHandle mem, UInt32 size, UInt32 dataSize=0) |
| void | readBufClear (void) |
Write | |
| BuffersT::iterator | writeBufBegin (void) |
| BuffersT::iterator | writeBufEnd (void) |
| void | writeBufAdd (MemoryHandle mem, UInt32 size, UInt32 dataSize=0) |
| void | writeBufClear (void) |
Helper | |
| bool | isReadBufferEmpty (void) |
Protected Attributes | |
members | |
| DgramSocket | _mcastSocket |
| DgramSocket | _responseSocket |
| BaseThread * | _recvQueueThread |
| bool | _recvQueueThreadRunning |
| bool | _recvQueueThreadStop |
| UInt16 | _seqNumber |
| SocketAddress | _mcastAddress |
| DgramQueue | _queue |
| DgramQueue | _free |
| Lock * | _lock |
| SocketAddress | _sender |
| SocketAddress | _ackDestination |
| Dgram * | _lastDgram |
| UInt32 | _lastDgramPos |
| bool | _initialized |
| std::map< SocketAddress, UInt16 > | _combineAck |
| UInt16 | _maxAck |
members | |
| StreamSocket | _acceptSocket |
| StreamSocket | _socket |
| SocketAddress | _remoteAddress |
| std::vector< UInt8 > | _socketReadBuffer |
| std::vector< UInt8 > | _socketWriteBuffer |
members | |
| bool | _pointToPoint |
protected members | |
| std::string | _interface |
Member | |
| BuffersT | _readBuffers |
| BuffersT | _writeBuffers |
| BuffersT | _zeroCopyBuffers |
| UInt32 | _zeroCopyThreshold |
| FreeMemT | _freeMem |
| BuffersT::iterator | _currentReadBuffer |
| UInt32 | _currentReadBufferPos |
| BuffersT::iterator | _currentWriteBuffer |
| UInt32 | _currentWriteBufferPos |
| bool | _networkOrder |
Private Types | |
| typedef PointSockConnection | Inherited |
Private Member Functions | |
| PointMCastConnection (const PointMCastConnection &source) | |
| PointMCastConnection & | operator= (const PointMCastConnection &source) |
Static Private Attributes | |
static type | |
| static ConnectionType | _type |
Classes | |
| struct | SocketBufferHeader |
Definition at line 63 of file OSGPointMCastConnection.h.
typedef PointSockConnection OSG::PointMCastConnection::Inherited [private] |
Reimplemented from OSG::PointSockConnection.
Definition at line 181 of file OSGPointMCastConnection.h.
typedef Int32 OSG::Connection::Channel [inherited] |
Definition at line 71 of file OSGConnection.h.
typedef std::vector<MemoryBlock> OSG::ExceptionBinaryDataHandler::BuffersT [protected, inherited] |
Definition at line 229 of file OSGExceptionBinaryDataHandler.h.
typedef std::list<MemoryHandle> OSG::ExceptionBinaryDataHandler::FreeMemT [protected, inherited] |
Definition at line 230 of file OSGExceptionBinaryDataHandler.h.
| PointMCastConnection::PointMCastConnection | ( | void | ) |
Definition at line 67 of file OSGPointMCastConnection.cpp.
References OSG::PointSockConnection::_acceptSocket, _free, _lock, OSG::Lock::get(), OSG::StreamSocket::open(), OSG_DGRAM_QUEUE_LEN, OSG::DgramQueue::put(), and OSG::Socket::setReusePort().
Referenced by create().
00067 : 00068 Inherited(), 00069 _lastDgram(NULL), 00070 _initialized(false) 00071 { 00072 char lockName[256]; 00073 sprintf(lockName,"PointMCastConnection%p",this); 00074 00075 // create locks 00076 _lock = Lock::get(lockName); 00077 00078 // fill dgramqueue 00079 for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI) 00080 _free.put(new Dgram()); 00081 00082 _acceptSocket.open(); 00083 _acceptSocket.setReusePort(true); 00084 00085 /* 00086 _socketWriteBuffer.resize(131071); 00087 // reserve first bytes for buffer size 00088 writeBufAdd(&_socketWriteBuffer[sizeof(SocketBufferHeader)], 00089 _socketWriteBuffer.size()-sizeof(SocketBufferHeader)); 00090 */ 00091 }
| PointMCastConnection::~PointMCastConnection | ( | void | ) | [virtual] |
Destructor
Definition at line 95 of file OSGPointMCastConnection.cpp.
References OSG::PointSockConnection::_acceptSocket, _free, _lock, _mcastSocket, _queue, _recvQueueThread, _recvQueueThreadStop, OSG::Lock::acquire(), OSG::StreamSocket::close(), OSG::DgramSocket::close(), OSG::DgramQueue::empty(), OSG::DgramQueue::get(), OSG::BaseThread::join(), and OSG::Dgram::release().
00096 { 00097 // indicate thread stop 00098 _recvQueueThreadStop = true; 00099 // wait for stop 00100 BaseThread::join(_recvQueueThread); 00101 // close socket 00102 _mcastSocket.close(); 00103 // free queues 00104 _lock->acquire(); 00105 while(!_free.empty()) 00106 delete _free.get(_lock); 00107 while(!_queue.empty()) 00108 delete _queue.get(_lock); 00109 _lock->release(); 00110 // close socket 00111 _acceptSocket.close(); 00112 }
| OSG::PointMCastConnection::PointMCastConnection | ( | const PointMCastConnection & | source | ) | [private] |
| const ConnectionType * PointMCastConnection::getType | ( | void | ) | [virtual] |
Reimplemented from OSG::PointSockConnection.
Definition at line 116 of file OSGPointMCastConnection.cpp.
References _type.
00117 { 00118 return &_type; 00119 }
| Connection::Channel PointMCastConnection::connectGroup | ( | const std::string & | address, | |
| Time | timeout = -1 | |||
| ) | [virtual] |
Reimplemented from OSG::PointSockConnection.
Definition at line 127 of file OSGPointMCastConnection.cpp.
References OSG::PointConnection::connectGroup().
00130 { 00131 Channel channel = Inherited::connectGroup(address,timeout); 00132 return channel; 00133 }
| void PointMCastConnection::disconnect | ( | void | ) | [virtual] |
disconnect the given channel
Reimplemented from OSG::PointSockConnection.
Definition at line 137 of file OSGPointMCastConnection.cpp.
References OSG::PointSockConnection::_socket, and OSG::StreamSocket::close().
| Connection::Channel PointMCastConnection::acceptGroup | ( | Time | timeout = -1 |
) | [virtual] |
accept an icomming grop connection. If timeout is reached, -1 is returned. If timeout is -1 then wait without timeout
Reimplemented from OSG::PointSockConnection.
Definition at line 145 of file OSGPointMCastConnection.cpp.
References OSG::PointConnection::acceptGroup().
00146 { 00147 Channel channel = Inherited::acceptGroup(timeout); 00148 return channel; 00149 }
| bool PointMCastConnection::wait | ( | Time | timeout | ) | throw (ReadError) [virtual] |
Reimplemented from OSG::PointSockConnection.
Definition at line 202 of file OSGPointMCastConnection.cpp.
References OSG::PointConnection::_pointToPoint, FFATAL, OSG::ExceptionBinaryDataHandler::getValue(), selectChannel(), OSG::PointConnection::wait(), and OSG::Exception::what().
00203 { 00204 UInt32 tag; 00205 00206 if(_pointToPoint) 00207 return Inherited::wait(timeout); 00208 try 00209 { 00210 if(selectChannel(timeout) < 0) 00211 return false; 00212 getValue(tag); 00213 if(tag != 314156) 00214 { 00215 FFATAL(("Stream out of sync in PointMCastConnection\n")); 00216 throw ReadError("Stream out of sync"); 00217 } 00218 } 00219 catch(SocketError &e) 00220 { 00221 throw ReadError(e.what()); 00222 } 00223 return true; 00224 }
| Connection::Channel PointMCastConnection::selectChannel | ( | Time | timeout = -1 |
) | throw (ReadError) [virtual] |
Reimplemented from OSG::PointSockConnection.
Definition at line 157 of file OSGPointMCastConnection.cpp.
References _initialized, _lastDgram, _lock, _mcastSocket, OSG::PointConnection::_pointToPoint, _queue, OSG::Lock::acquire(), OSG::DgramQueue::empty(), initialize(), OSG::ExceptionBinaryDataHandler::isReadBufferEmpty(), OSG::Lock::release(), OSG::Connection::selectChannel(), OSG::DgramQueue::wait(), OSG::Socket::waitReadable(), and OSG::Exception::what().
Referenced by wait().
00159 { 00160 if(_pointToPoint) 00161 return Inherited::selectChannel(timeout); 00162 try 00163 { 00164 if(!_initialized) 00165 initialize(); 00166 // todo 00167 if(isReadBufferEmpty() && 00168 !_lastDgram && 00169 _queue.empty()) 00170 { 00171 if(timeout == -1) 00172 { 00173 // wait for a dgram 00174 _lock->acquire(); 00175 _queue.wait(_lock); 00176 _lock->release(); 00177 return 0; 00178 } 00179 if(timeout == 0) 00180 return -1; 00181 while(_queue.empty() && timeout > 0) 00182 { 00183 _mcastSocket.waitReadable(.1); 00184 timeout-=.1; 00185 } 00186 if(_queue.empty()) 00187 return -1; 00188 } 00189 } 00190 catch(SocketException &e) 00191 { 00192 throw ReadError(e.what()); 00193 } 00194 return 0; 00195 }
| PointConnection * PointMCastConnection::create | ( | void | ) | [static] |
Reimplemented from OSG::PointSockConnection.
Definition at line 231 of file OSGPointMCastConnection.cpp.
References PointMCastConnection().
00232 { 00233 return new PointMCastConnection(); 00234 }
| void PointMCastConnection::read | ( | MemoryHandle | mem, | |
| UInt32 | size | |||
| ) | [protected, virtual] |
Reimplemented from OSG::PointSockConnection.
Definition at line 246 of file OSGPointMCastConnection.cpp.
References _free, _initialized, _lastDgram, _lastDgramPos, _lock, OSG::PointConnection::_pointToPoint, _queue, OSG::Lock::acquire(), OSG::DgramQueue::get(), OSG::Dgram::getData(), OSG::Dgram::getSize(), initialize(), OSG::osgMin(), OSG::DgramQueue::put(), OSG::ExceptionBinaryDataHandler::read(), and OSG::Lock::release().
00247 { 00248 if(_pointToPoint) 00249 { 00250 Inherited::read(mem,size); 00251 return; 00252 } 00253 Dgram *dgram = NULL; 00254 char *buffer = (char*)mem; 00255 UInt32 len; 00256 UInt32 dgramPos; 00257 00258 if(!_initialized) 00259 initialize(); 00260 00261 while(size) 00262 { 00263 if(_lastDgram) 00264 { 00265 dgramPos = _lastDgramPos; 00266 dgram = _lastDgram; 00267 } 00268 else 00269 { 00270 // get next dgram 00271 _lock->acquire(); 00272 dgram = _queue.get(_lock); 00273 _lock->release(); 00274 dgramPos = 0; 00275 if(dgram->getSize() == 0) 00276 throw ReadError("Channel closed\n"); 00277 } 00278 // copy to buffer 00279 len = osgMin(size,dgram->getSize()-dgramPos); 00280 memcpy(buffer,dgram->getData()+dgramPos,len); 00281 buffer += len; 00282 size -= len; 00283 dgramPos += len; 00284 if(dgramPos == dgram->getSize()) 00285 { 00286 // put to free queue 00287 _lock->acquire(); 00288 _free.put(dgram); 00289 _lock->release(); 00290 _lastDgram = NULL; 00291 } 00292 else 00293 { 00294 _lastDgram = dgram; 00295 _lastDgramPos = dgramPos; 00296 } 00297 } 00298 }
| void PointMCastConnection::readBuffer | ( | void | ) | throw (ReadError) [protected, virtual] |
Read next data block
The stream connection uses only BinaryDataHandler buffer. If more then one buffer is present, then this methode must be changed!
Reimplemented from OSG::PointSockConnection.
Definition at line 307 of file OSGPointMCastConnection.cpp.
References _free, _initialized, _lastDgram, _lastDgramPos, _lock, OSG::PointConnection::_pointToPoint, _queue, OSG::Lock::acquire(), OSG::DgramQueue::empty(), OSG::DgramQueue::get(), OSG::Dgram::getData(), OSG::Dgram::getSize(), initialize(), OSG::osgMin(), OSG::DgramQueue::put(), OSG::ExceptionBinaryDataHandler::readBufBegin(), OSG::ExceptionBinaryDataHandler::readBuffer(), OSG::Lock::release(), and size.
00308 { 00309 if(_pointToPoint) 00310 { 00311 Inherited::readBuffer(); 00312 return; 00313 } 00314 00315 static int sumSize=0; 00316 Dgram *dgram = NULL; 00317 UInt32 size = readBufBegin()->getSize(); 00318 MemoryHandle buffer = readBufBegin()->getMem(); 00319 UInt32 len; 00320 UInt32 dgramPos; 00321 00322 if(!_initialized) 00323 initialize(); 00324 00325 do 00326 { 00327 if(_lastDgram) 00328 { 00329 dgramPos = _lastDgramPos; 00330 dgram = _lastDgram; 00331 } 00332 else 00333 { 00334 // get next dgram 00335 _lock->acquire(); 00336 dgram = _queue.get(_lock); 00337 _lock->release(); 00338 dgramPos = 0; 00339 if(dgram->getSize() == 0) 00340 throw ReadError("Channel closed"); 00341 } 00342 // copy to buffer 00343 len = osgMin(size,dgram->getSize()-dgramPos); 00344 memcpy(buffer,dgram->getData()+dgramPos,len); 00345 buffer += len; 00346 size -= len; 00347 dgramPos += len; 00348 if(dgramPos == dgram->getSize()) 00349 { 00350 // put to free queue 00351 _lock->acquire(); 00352 _free.put(dgram); 00353 _lock->release(); 00354 _lastDgram = NULL; 00355 } 00356 else 00357 { 00358 _lastDgram = dgram; 00359 _lastDgramPos = dgramPos; 00360 } 00361 } 00362 while(size && !_queue.empty()); 00363 // set data size 00364 readBufBegin()->setDataSize(readBufBegin()->getSize()-size); 00365 sumSize += readBufBegin()->getDataSize(); 00366 }
| bool PointMCastConnection::recvNextDgram | ( | Dgram * | dgram | ) | [private] |
Definition at line 373 of file OSGPointMCastConnection.cpp.
References _ackDestination, _combineAck, _maxAck, _mcastSocket, _responseSocket, _sender, combineAck(), OSG::Dgram::getBuffer(), OSG::Dgram::getBufferCapacity(), OSG::Dgram::getBufferSize(), OSG::Dgram::getId(), OSG::SocketSelection::isSetRead(), OSG::DgramSocket::recvFrom(), OSG::SocketSelection::select(), OSG::Dgram::setBufferSize(), OSG::Dgram::setId(), OSG::SocketSelection::setRead(), OSG::Dgram::setResponseAck(), and OSG::Dgram::setResponseSize().
Referenced by recvQueue().
00374 { 00375 SocketSelection selection; 00376 SocketAddress from; 00377 UInt32 length; 00378 00379 selection.setRead(_mcastSocket); 00380 selection.setRead(_responseSocket); 00381 if(!selection.select(0.5)) 00382 return false; 00383 if(selection.isSetRead(_responseSocket)) 00384 { 00385 length = _responseSocket.recvFrom(dgram->getBuffer(), 00386 dgram->getBufferCapacity(), 00387 from); 00388 dgram->setBufferSize(length); 00389 #if 0 00390 // ???? 00391 // from sender 00392 if(from == _sender && !_combineAck.empty()) 00393 { 00394 exit(0); 00395 00396 00397 if(_maxAck == dgram->getId()) 00398 { 00399 // do we have all acks ? 00400 dgram->setId(_maxAck); 00401 dgram->setResponseSize(); 00402 dgram->setResponseAck(true); 00403 #ifdef TEST_LOST_DGRAM_RATE 00404 if(drand48()>TEST_LOST_DGRAM_RATE) 00405 #endif 00406 _responseSocket.sendTo( 00407 dgram->getBuffer(), 00408 dgram->getBufferSize(), 00409 _ackDestination); 00410 return false; 00411 } 00412 else 00413 { 00414 return true; 00415 } 00416 } 00417 #endif 00418 combineAck(dgram,from); 00419 } 00420 if(selection.isSetRead(_mcastSocket)) 00421 { 00422 length = _mcastSocket.recvFrom(dgram->getBuffer(), 00423 dgram->getBufferCapacity(), 00424 from); 00425 dgram->setBufferSize(length); 00426 // ignore packages from wrong destination 00427 if(from != _sender) 00428 return false; 00429 else 00430 return true; 00431 } 00432 else 00433 { 00434 return false; 00435 } 00436 }
| void PointMCastConnection::combineAck | ( | Dgram * | dgram, | |
| SocketAddress | from | |||
| ) | [private] |
combine several acks to 1 ack stream
Definition at line 440 of file OSGPointMCastConnection.cpp.
References _ackDestination, _combineAck, _maxAck, _responseSocket, _seqNumber, FFATAL, OSG::Dgram::getBuffer(), OSG::Dgram::getBufferSize(), OSG::Dgram::getId(), OSG::Dgram::less(), OSG::DgramSocket::sendTo(), OSG::Dgram::setId(), OSG::Dgram::setResponseAck(), and OSG::Dgram::setResponseSize().
Referenced by recvNextDgram(), and recvQueue().
00441 { 00442 UInt16 maxAck; 00443 00444 if(dgram) 00445 { 00446 // do we expect acks from different source 00447 if(_combineAck.count(from)==0) 00448 { 00449 FFATAL(("no ack from other expected\n")); 00450 return; 00451 } 00452 // ack retransmission 00453 if( Dgram::less(dgram->getId(),_combineAck[from] ) ) 00454 { 00455 // printf("Ack restranmisson\n"); 00456 return; 00457 } 00458 _combineAck[from] = dgram->getId(); 00459 } 00460 00461 maxAck =