00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00049 #ifndef CCXX_RTP_RTP_H_
00050 #define CCXX_RTP_RTP_H_
00051
00052 #include <ccrtp/cqueue.h>
00053 #include <ccrtp/channel.h>
00054
00055 #ifdef CCXX_NAMESPACES
00056 namespace ost {
00057 #endif
00058
00085 template <class RTPDataChannel = DualRTPUDPIPv4Channel,
00086 class RTCPChannel = DualRTPUDPIPv4Channel,
00087 class ServiceQueue = AVPQueue>
00088 class __EXPORT TRTPSessionBase : public ServiceQueue
00089 {
00090 public:
00100 TRTPSessionBase(const InetHostAddress& ia, tpport_t dataPort,
00101 tpport_t controlPort, uint32 membersSize,
00102 RTPApplication& app) :
00103 ServiceQueue(membersSize,app)
00104 { build(ia,dataPort,controlPort); }
00105
00117 TRTPSessionBase(uint32 ssrc,
00118 const InetHostAddress& ia,
00119 tpport_t dataPort, tpport_t controlPort,
00120 uint32 membersSize, RTPApplication& app):
00121 ServiceQueue(ssrc,membersSize,app)
00122 { build(ia,dataPort,controlPort); }
00123
00136 TRTPSessionBase(const InetMcastAddress& ia, tpport_t dataPort,
00137 tpport_t controlPort, uint32 membersSize,
00138 RTPApplication& app, uint32 iface) :
00139 ServiceQueue(membersSize,app)
00140 { build(ia,dataPort,controlPort,iface); }
00141
00156 TRTPSessionBase(uint32 ssrc,
00157 const InetMcastAddress& ia, tpport_t dataPort,
00158 tpport_t controlPort, uint32 membersSize,
00159 RTPApplication& app, uint32 iface) :
00160 ServiceQueue(ssrc,membersSize,app)
00161 { build(ia,dataPort,controlPort,iface); }
00162
00163 virtual size_t dispatchBYE(const std::string &str)
00164 {
00165 return QueueRTCPManager::dispatchBYE(str);
00166 }
00167
00168 inline virtual
00169 ~TRTPSessionBase()
00170 {
00171 dispatchBYE("RTP session being destroyed, GNU ccRTP stack finishing.");
00172 endSocket();
00173 }
00174
00175 inline RTPDataChannel *getDSO(void)
00176 {return dso;};
00177
00178 protected:
00182 inline bool
00183 isPendingData(microtimeout_t timeout)
00184 { return dso->isPendingRecv(timeout); }
00185
00186 InetHostAddress
00187 getDataSender(tpport_t *port = NULL) const
00188 { return dso->getSender(port); }
00189
00190 inline size_t
00191 getNextDataPacketSize() const
00192 { return dso->getNextPacketSize(); }
00193
00203 inline size_t
00204 recvData(unsigned char* buffer, size_t len,
00205 InetHostAddress& na, tpport_t& tp)
00206 { na = dso->getSender(tp); return dso->recv(buffer, len); }
00207
00208 inline void
00209 setDataPeer(const InetAddress &host, tpport_t port)
00210 { dso->setPeer(host,port); }
00211
00216 inline size_t
00217 sendData(const unsigned char* const buffer, size_t len)
00218 { return dso->send(buffer, len); }
00219
00220 inline SOCKET getDataRecvSocket() const
00221 { return dso->getRecvSocket(); }
00222
00227 inline bool
00228 isPendingControl(microtimeout_t timeout)
00229 { return cso->isPendingRecv(timeout); }
00230
00231 InetHostAddress
00232 getControlSender(tpport_t *port = NULL) const
00233 { return cso->getSender(port); }
00234
00244 inline size_t
00245 recvControl(unsigned char *buffer, size_t len,
00246 InetHostAddress& na, tpport_t& tp)
00247 { na = cso->getSender(tp); return cso->recv(buffer,len); }
00248
00249 inline void
00250 setControlPeer(const InetAddress &host, tpport_t port)
00251 { cso->setPeer(host,port); }
00252
00258 inline size_t
00259 sendControl(const unsigned char* const buffer, size_t len)
00260 { return cso->send(buffer,len); }
00261
00262 inline SOCKET getControlRecvSocket() const
00263 { return cso->getRecvSocket(); }
00264
00265 inline void
00266 endSocket()
00267 {
00268 dso->endSocket();
00269 cso->endSocket();
00270 if (dso) delete dso;
00271 dso = NULL;
00272 if (cso) delete cso;
00273 cso = NULL;
00274 }
00275
00276 private:
00277 void
00278 build(const InetHostAddress& ia, tpport_t dataPort,
00279 tpport_t controlPort)
00280 {
00281 if ( 0 == controlPort ) {
00282 dataBasePort = even_port(dataPort);
00283 controlBasePort = dataBasePort + 1;
00284 } else {
00285 dataBasePort = dataPort;
00286 controlBasePort = controlPort;
00287 }
00288 dso = new RTPDataChannel(ia,dataBasePort);
00289 cso = new RTCPChannel(ia,controlBasePort);
00290 }
00291
00292 void
00293 build(const InetMcastAddress& ia, tpport_t dataPort,
00294 tpport_t controlPort, uint32 iface)
00295 {
00296 if ( 0 == controlPort ) {
00297 dataBasePort = even_port(dataPort);
00298 controlBasePort = dataBasePort + 1;
00299 } else {
00300 dataBasePort = dataPort;
00301 controlBasePort = controlPort;
00302 }
00303 dso = new RTPDataChannel(InetHostAddress("0.0.0.0"),dataBasePort);
00304 cso = new RTCPChannel(InetHostAddress("0.0.0.0"),controlBasePort);
00305 joinGroup(ia,iface);
00306 }
00307
00314 inline Socket::Error
00315 joinGroup(const InetMcastAddress& ia, uint32 iface)
00316 {
00317 Socket::Error error = dso->setMulticast(true);
00318 if ( error ) return error;
00319 error = dso->join(ia,iface);
00320 if ( error ) return error;
00321 error = cso->setMulticast(true);
00322 if ( error ) {
00323 dso->drop(ia);
00324 return error;
00325 }
00326 error = cso->join(ia,iface);
00327 if ( error ) {
00328 dso->drop(ia);
00329 return error;
00330 }
00331 return Socket::errSuccess;
00332 }
00333
00340 inline Socket::Error
00341 leaveGroup(const InetMcastAddress& ia)
00342 {
00343 Socket::Error error = dso->setMulticast(false);
00344 if ( error ) return error;
00345 error = dso->leaveGroup(ia);
00346 if ( error ) return error;
00347 error = cso->setMulticast(false);
00348 if ( error ) return error;
00349 return cso->leaveGroup(ia);
00350 }
00351
00358 inline Socket::Error
00359 setMcastTTL(uint8 ttl)
00360 {
00361 Socket::Error error = dso->setMulticast(true);
00362 if ( error ) return error;
00363 error = dso->setTimeToLive(ttl);
00364 if ( error ) return error;
00365 error = cso->setMulticast(true);
00366 if ( error ) return error;
00367 return cso->setTimeToLive(ttl);
00368 }
00369
00377 inline tpport_t
00378 odd_port(tpport_t port)
00379 { return (port & 0x01)? (port) : (port - 1); }
00380
00388 inline tpport_t
00389 even_port(tpport_t port)
00390 { return (port & 0x01)? (port - 1) : (port); }
00391
00392 tpport_t dataBasePort;
00393 tpport_t controlBasePort;
00394
00395 protected:
00396 RTPDataChannel* dso;
00397 RTCPChannel* cso;
00398 friend class RTPSessionBaseHandler;
00399 };
00400
00411 template
00412 <class RTPDataChannel = DualRTPUDPIPv4Channel,
00413 class RTCPChannel = DualRTPUDPIPv4Channel,
00414 class ServiceQueue = AVPQueue>
00415 class __EXPORT SingleThreadRTPSession :
00416 protected Thread,
00417 public TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>
00418 {
00419 public:
00420 SingleThreadRTPSession(const InetHostAddress& ia,
00421 tpport_t dataPort = DefaultRTPDataPort,
00422 tpport_t controlPort = 0,
00423 int pri = 0,
00424 uint32 memberssize =
00425 MembershipBookkeeping::defaultMembersHashSize,
00426 RTPApplication& app = defaultApplication()
00427 #if defined(_MSC_VER) && _MSC_VER >= 1300
00428 );
00429 #else
00430 ):
00431 Thread(pri),
00432 TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>
00433 (ia,dataPort,controlPort,memberssize,app)
00434 { }
00435 #endif
00436
00437 SingleThreadRTPSession(const InetMcastAddress& ia,
00438 tpport_t dataPort = DefaultRTPDataPort,
00439 tpport_t controlPort = 0,
00440 int pri = 0,
00441 uint32 memberssize =
00442 MembershipBookkeeping::defaultMembersHashSize,
00443 RTPApplication& app = defaultApplication(),
00444 uint32 iface = 0)
00445 #if defined(_MSC_VER) && _MSC_VER >= 1300
00446 ;
00447 #else
00448 :
00449 Thread(pri),
00450 TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>
00451 (ia,dataPort,controlPort,memberssize,app,iface)
00452 { }
00453 #endif
00454
00455 ~SingleThreadRTPSession()
00456 { terminate(); }
00457
00458 #if defined(_MSC_VER) && _MSC_VER >= 1300
00459 virtual void startRunning();
00460 #else
00461
00464 void
00465 startRunning()
00466 { enableStack(); Thread::start(); }
00467 #endif
00468
00469
00470 protected:
00471 inline void enableStack(void)
00472 {TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::enableStack();}
00473
00474 inline microtimeout_t getSchedulingTimeout(void)
00475 {return TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::getSchedulingTimeout();}
00476
00477 inline void controlReceptionService(void)
00478 {TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::controlReceptionService();}
00479
00480 inline void controlTransmissionService(void)
00481 {TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::controlTransmissionService();}
00482
00483 inline timeval getRTCPCheckInterval(void)
00484 {return TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::getRTCPCheckInterval();};
00485
00486 inline size_t dispatchDataPacket(void)
00487 {return TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::dispatchDataPacket();};
00488
00489 #if defined(_MSC_VER) && _MSC_VER >= 1300
00490 virtual void run(void);
00491
00492 virtual void timerTick(void);
00493
00494 virtual bool isPendingData(microtimeout_t timeout);
00495 #else
00496
00497 virtual void timerTick(void)
00498 {return;}
00499
00500 virtual bool isPendingData(microtimeout_t timeout)
00501 {return TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::isPendingData(timeout);}
00502
00507 virtual void run(void)
00508 {
00509 microtimeout_t timeout = 0;
00510 while ( ServiceQueue::isActive() ) {
00511 if ( timeout < 1000 ){
00512 timeout = getSchedulingTimeout();
00513 }
00514 setCancel(cancelDeferred);
00515 controlReceptionService();
00516 controlTransmissionService();
00517 setCancel(cancelImmediate);
00518 microtimeout_t maxWait =
00519 timeval2microtimeout(getRTCPCheckInterval());
00520
00521
00522
00523 timeout = (timeout > maxWait)? maxWait : timeout;
00524 if ( timeout < 1000 ) {
00525 setCancel(cancelDeferred);
00526 dispatchDataPacket();
00527 setCancel(cancelImmediate);
00528 timerTick();
00529 } else {
00530 if ( isPendingData(timeout/1000) ) {
00531 setCancel(cancelDeferred);
00532 takeInDataPacket();
00533 setCancel(cancelImmediate);
00534 }
00535 timeout = 0;
00536 }
00537 }
00538 dispatchBYE("GNU ccRTP stack finishing.");
00539 sleep((timeout_t)~0);
00540 }
00541
00542 #endif
00543
00544 inline size_t takeInDataPacket(void)
00545 {return TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::takeInDataPacket();}
00546
00547 inline size_t dispatchBYE(const std::string &str)
00548 {return TRTPSessionBase<RTPDataChannel,RTCPChannel,ServiceQueue>::dispatchBYE(str);}
00549 };
00550
00559 typedef SingleThreadRTPSession<> RTPSession;
00560
00566 typedef RTPSession RTPSocket;
00567
00576 typedef SingleThreadRTPSession<SymmetricRTPChannel,
00577 SymmetricRTPChannel> SymmetricRTPSession;
00578
00580
00581 #ifdef CCXX_NAMESPACES
00582 }
00583 #endif
00584
00585 #endif //CCXX_RTP_RTP_H_
00586