00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00044 #ifndef CCXX_RTP_IQUEUE_H_
00045 #define CCXX_RTP_IQUEUE_H_
00046
00047 #include <ccrtp/queuebase.h>
00048 #include <ccrtp/CryptoContext.h>
00049
00050 #include <list>
00051
00052 #ifdef CCXX_NAMESPACES
00053 namespace ost {
00054 #endif
00055
00070 class __EXPORT Members
00071 {
00072 public:
00073 inline void
00074 setMembersCount(uint32 n)
00075 { members = n; }
00076
00077 inline void
00078 increaseMembersCount()
00079 { members++; }
00080
00081 inline void
00082 decreaseMembersCount()
00083 { members--; }
00084
00085 inline uint32
00086 getMembersCount() const
00087 { return members; }
00088
00089 inline void
00090 setSendersCount(uint32 n)
00091 { activeSenders = n; }
00092
00093 inline void
00094 increaseSendersCount()
00095 { activeSenders++; }
00096
00097 inline void
00098 decreaseSendersCount()
00099 { activeSenders--; }
00100
00101 inline uint32
00102 getSendersCount() const
00103 { return activeSenders; }
00104
00105 protected:
00106 Members() :
00107 members(0),
00108 activeSenders(0)
00109 { }
00110
00111 inline virtual ~Members()
00112 { }
00113
00114 private:
00116 uint32 members;
00118 uint32 activeSenders;
00119 };
00120
00127 class __EXPORT SyncSourceHandler
00128 {
00129 public:
00136 inline void*
00137 getLink(const SyncSource& source) const
00138 { return source.getLink(); }
00139
00140 inline void
00141 setLink(SyncSource& source, void* link)
00142 { source.setLink(link); }
00143
00144 inline void
00145 setParticipant(SyncSource& source, Participant& p)
00146 { source.setParticipant(p); }
00147
00148 inline void
00149 setState(SyncSource& source, SyncSource::State ns)
00150 { source.setState(ns); }
00151
00152 inline void
00153 setSender(SyncSource& source, bool active)
00154 { source.setSender(active); }
00155
00156 inline void
00157 setDataTransportPort(SyncSource& source, tpport_t p)
00158 { source.setDataTransportPort(p); }
00159
00160 inline void
00161 setControlTransportPort(SyncSource& source, tpport_t p)
00162 { source.setControlTransportPort(p); }
00163
00164 inline void
00165 setNetworkAddress(SyncSource& source, InetAddress addr)
00166 { source.setNetworkAddress(addr); }
00167
00168 protected:
00169 SyncSourceHandler()
00170 { }
00171
00172 inline virtual ~SyncSourceHandler()
00173 { }
00174 };
00175
00182 class __EXPORT ParticipantHandler
00183 {
00184 public:
00185 inline void
00186 setSDESItem(Participant* part, SDESItemType item,
00187 const std::string& val)
00188 { part->setSDESItem(item,val); }
00189
00190 inline void
00191 setPRIVPrefix(Participant* part, const std::string val)
00192 { part->setPRIVPrefix(val); }
00193
00194 protected:
00195 ParticipantHandler()
00196 { }
00197
00198 inline virtual ~ParticipantHandler()
00199 { }
00200 };
00201
00208 class __EXPORT ApplicationHandler
00209 {
00210 public:
00211 inline void
00212 addParticipant(RTPApplication& app, Participant& part)
00213 { app.addParticipant(part); }
00214
00215 inline void
00216 removeParticipant(RTPApplication& app,
00217 RTPApplication::ParticipantLink* pl)
00218 { app.removeParticipant(pl); }
00219
00220 protected:
00221 ApplicationHandler()
00222 { }
00223
00224 inline virtual ~ApplicationHandler()
00225 { }
00226 };
00227
00235 class __EXPORT ConflictHandler
00236 {
00237 public:
00238 struct ConflictingTransportAddress
00239 {
00240 ConflictingTransportAddress(InetAddress na,
00241 tpport_t dtp, tpport_t ctp);
00242
00243 void setNext(ConflictingTransportAddress* nc)
00244 { next = nc; }
00245
00246 inline const InetAddress& getNetworkAddress( ) const
00247 { return networkAddress; }
00248
00249 inline tpport_t getDataTransportPort() const
00250 { return dataTransportPort; }
00251
00252 inline tpport_t getControlTransportPort() const
00253 { return controlTransportPort; }
00254
00255 InetAddress networkAddress;
00256 tpport_t dataTransportPort;
00257 tpport_t controlTransportPort;
00258 ConflictingTransportAddress* next;
00259
00260 timeval lastPacketTime;
00261 };
00262
00267 ConflictingTransportAddress* searchDataConflict(InetAddress na,
00268 tpport_t dtp);
00273 ConflictingTransportAddress* searchControlConflict(InetAddress na,
00274 tpport_t ctp);
00275
00276 void updateConflict(ConflictingTransportAddress& ca)
00277 { gettimeofday(&(ca.lastPacketTime),NULL); }
00278
00279 void addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp);
00280
00281 protected:
00282 ConflictHandler()
00283 { firstConflict = lastConflict = NULL; }
00284
00285 inline virtual ~ConflictHandler()
00286 { }
00287
00288 ConflictingTransportAddress* firstConflict, * lastConflict;
00289 };
00290
00301 class __EXPORT MembershipBookkeeping :
00302 public SyncSourceHandler,
00303 public ParticipantHandler,
00304 public ApplicationHandler,
00305 public ConflictHandler,
00306 private Members
00307 {
00308 public:
00309 inline size_t getDefaultMembersHashSize()
00310 { return defaultMembersHashSize; }
00311
00312 protected:
00313
00327 MembershipBookkeeping(uint32 initialSize = defaultMembersHashSize);
00328
00333 inline virtual
00334 ~MembershipBookkeeping()
00335 { endMembers(); }
00336
00337 struct SyncSourceLink;
00338
00339 inline SyncSourceLink* getLink(const SyncSource& source) const
00340 { return static_cast<SyncSourceLink*>(SyncSourceHandler::getLink(source)); }
00345 inline bool isMine(const SyncSource& source) const
00346 { return getLink(source)->getMembership() == this; }
00347
00354 struct IncomingRTPPktLink
00355 {
00356 IncomingRTPPktLink(IncomingRTPPkt* pkt, SyncSourceLink* sLink,
00357 struct timeval& recv_ts,
00358 uint32 shifted_ts,
00359 IncomingRTPPktLink* sp,
00360 IncomingRTPPktLink* sn,
00361 IncomingRTPPktLink* p,
00362 IncomingRTPPktLink* n) :
00363 packet(pkt),
00364 sourceLink(sLink),
00365 prev(p), next(n),
00366 srcPrev(sp), srcNext(sn),
00367 receptionTime(recv_ts),
00368 shiftedTimestamp(shifted_ts)
00369 { }
00370
00371 ~IncomingRTPPktLink()
00372 { }
00373
00374 inline SyncSourceLink* getSourceLink() const
00375 { return sourceLink; }
00376
00377 inline void setSourceLink(SyncSourceLink* src)
00378 { sourceLink = src; }
00379
00380 inline IncomingRTPPktLink* getNext() const
00381 { return next; }
00382
00383 inline void setNext(IncomingRTPPktLink* nl)
00384 { next = nl; }
00385
00386 inline IncomingRTPPktLink* getPrev() const
00387 { return prev; }
00388
00389 inline void setPrev(IncomingRTPPktLink* pl)
00390 { prev = pl; }
00391
00392 inline IncomingRTPPktLink* getSrcNext() const
00393 { return srcNext; }
00394
00395 inline void setSrcNext(IncomingRTPPktLink* sn)
00396 { srcNext = sn; }
00397
00398 inline IncomingRTPPktLink* getSrcPrev() const
00399 { return srcPrev; }
00400
00401 inline void setSrcPrev(IncomingRTPPktLink* sp)
00402 { srcPrev = sp; }
00403
00404 inline IncomingRTPPkt* getPacket() const
00405 { return packet; }
00406
00407 inline void setPacket(IncomingRTPPkt* pkt)
00408 { packet = pkt; }
00409
00417 inline void setRecvTime(const timeval &t)
00418 { receptionTime = t; }
00419
00423 inline timeval getRecvTime() const
00424 { return receptionTime; }
00425
00434 inline uint32 getTimestamp() const
00435 { return shiftedTimestamp; };
00436
00437 inline void setTimestamp(uint32 ts)
00438 { shiftedTimestamp = ts;}
00439
00440
00441 IncomingRTPPkt* packet;
00442
00443 SyncSourceLink* sourceLink;
00444
00445 IncomingRTPPktLink* prev, * next;
00446
00447 IncomingRTPPktLink* srcPrev, * srcNext;
00448
00449 struct timeval receptionTime;
00450
00451
00452
00453 uint32 shiftedTimestamp;
00454 };
00455
00472 struct SyncSourceLink
00473 {
00474
00475 static const uint32 SEQNUMMOD;
00476
00477 SyncSourceLink(MembershipBookkeeping* m,
00478 SyncSource* s,
00479 IncomingRTPPktLink* fp = NULL,
00480 IncomingRTPPktLink* lp = NULL,
00481 SyncSourceLink* ps = NULL,
00482 SyncSourceLink* ns = NULL,
00483 SyncSourceLink* ncollis = NULL) :
00484 membership(m), source(s), first(fp), last(lp),
00485 prev(ps), next(ns), nextCollis(ncollis),
00486 prevConflict(NULL)
00487 { m->setLink(*s,this);
00488 initStats();
00489 }
00490
00494 ~SyncSourceLink();
00495
00496 inline MembershipBookkeeping* getMembership()
00497 { return membership; }
00498
00503 inline SyncSource* getSource() { return source; }
00504
00509 inline IncomingRTPPktLink* getFirst()
00510 { return first; }
00511
00512 inline void setFirst(IncomingRTPPktLink* fp)
00513 { first = fp; }
00514
00519 inline IncomingRTPPktLink* getLast()
00520 { return last; }
00521
00522 inline void setLast(IncomingRTPPktLink* lp)
00523 { last = lp; }
00524
00528 inline SyncSourceLink* getPrev()
00529 { return prev; }
00530
00531 inline void setPrev(SyncSourceLink* ps)
00532 { prev = ps; }
00533
00537 inline SyncSourceLink* getNext()
00538 { return next; }
00539
00540 inline void setNext(SyncSourceLink *ns)
00541 { next = ns; }
00542
00549 inline SyncSourceLink* getNextCollis()
00550 { return nextCollis; }
00551
00552 inline void setNextCollis(SyncSourceLink* ns)
00553 { nextCollis = ns; }
00554
00555 inline ConflictingTransportAddress* getPrevConflict() const
00556 { return prevConflict; }
00557
00561 void setPrevConflict(InetAddress& addr, tpport_t dataPort,
00562 tpport_t controlPort);
00563
00564 unsigned char* getSenderInfo()
00565 { return senderInfo; }
00566
00567 void setSenderInfo(unsigned char* si);
00568
00569 unsigned char* getReceiverInfo()
00570 { return receiverInfo; }
00571
00572 void setReceiverInfo(unsigned char* ri);
00573
00574 inline timeval getLastPacketTime() const
00575 { return lastPacketTime; }
00576
00577 inline timeval getLastRTCPPacketTime() const
00578 { return lastRTCPPacketTime; }
00579
00580 inline timeval getLastRTCPSRTime() const
00581 { return lastRTCPSRTime; }
00582
00587 inline uint32 getObservedPacketCount() const
00588 { return obsPacketCount; }
00589
00590 inline void incObservedPacketCount()
00591 { obsPacketCount++; }
00592
00597 inline uint32 getObservedOctetCount() const
00598 { return obsOctetCount; }
00599
00600 inline void incObservedOctetCount(uint32 n)
00601 { obsOctetCount += n; }
00602
00606 uint16
00607 getMaxSeqNum() const
00608 { return maxSeqNum; }
00609
00614 void
00615 setMaxSeqNum(uint16 max)
00616 { maxSeqNum = max; }
00617
00618 inline uint32
00619 getExtendedMaxSeqNum() const
00620 { return extendedMaxSeqNum; }
00621
00622 inline void
00623 setExtendedMaxSeqNum(uint32 seq)
00624 { extendedMaxSeqNum = seq; }
00625
00626 inline uint32 getCumulativePacketLost() const
00627 { return cumulativePacketLost; }
00628
00629 inline void setCumulativePacketLost(uint32 pl)
00630 { cumulativePacketLost = pl; }
00631
00632 inline uint8 getFractionLost() const
00633 { return fractionLost; }
00634
00635 inline void setFractionLost(uint8 fl)
00636 { fractionLost = fl; }
00637
00638 inline uint32 getLastPacketTransitTime()
00639 { return lastPacketTransitTime; }
00640
00641 inline void setLastPacketTransitTime(uint32 time)
00642 { lastPacketTransitTime = time; }
00643
00644 inline float getJitter() const
00645 { return jitter; }
00646
00647 inline void setJitter(float j)
00648 { jitter = j; }
00649
00650 inline uint32 getInitialDataTimestamp() const
00651 { return initialDataTimestamp; }
00652
00653 inline void setInitialDataTimestamp(uint32 ts)
00654 { initialDataTimestamp = ts; }
00655
00656 inline timeval getInitialDataTime() const
00657 { return initialDataTime; }
00658
00659 inline void setInitialDataTime(timeval it)
00660 { initialDataTime = it; }
00661
00669 bool getGoodbye()
00670 {
00671 if(!flag)
00672 return false;
00673 flag = false;
00674 return true;
00675 }
00676
00683 bool getHello() {
00684 if(flag)
00685 return false;
00686 flag = true;
00687 return true;
00688 }
00689
00690 inline uint32 getBadSeqNum() const
00691 { return badSeqNum; }
00692
00693 inline void setBadSeqNum(uint32 seq)
00694 { badSeqNum = seq; }
00695
00696 uint8 getProbation() const
00697 { return probation; }
00698
00699 inline void setProbation(uint8 p)
00700 { probation = p; }
00701
00702 inline void decProbation()
00703 { --probation; }
00704
00705 bool isValid() const
00706 { return 0 == probation; }
00707
00708 inline uint16 getBaseSeqNum() const
00709 { return baseSeqNum; }
00710
00711 inline uint32 getSeqNumAccum() const
00712 { return seqNumAccum; }
00713
00714 inline void incSeqNumAccum()
00715 { seqNumAccum += SEQNUMMOD; }
00716
00720 inline void initSequence(uint16 seqnum)
00721 { maxSeqNum = seqNumAccum = seqnum; }
00722
00733 void recordInsertion(const IncomingRTPPktLink& pl);
00734
00735 void initStats();
00736
00741 void computeStats();
00742
00743 MembershipBookkeeping* membership;
00744
00745 SyncSource* source;
00746
00747 IncomingRTPPktLink* first, * last;
00748
00749
00750 SyncSourceLink* prev, * next;
00751
00752 SyncSourceLink* nextCollis;
00753 ConflictingTransportAddress* prevConflict;
00754 unsigned char* senderInfo;
00755 unsigned char* receiverInfo;
00756
00757
00758 timeval lastPacketTime;
00759
00760 timeval lastRTCPPacketTime;
00761
00762
00763 timeval lastRTCPSRTime;
00764
00765
00766
00767 uint32 obsPacketCount;
00768
00769 uint32 obsOctetCount;
00770
00771 uint16 maxSeqNum;
00772 uint32 extendedMaxSeqNum;
00773 uint32 cumulativePacketLost;
00774 uint8 fractionLost;
00775
00776 uint32 lastPacketTransitTime;
00777
00778 float jitter;
00779 uint32 initialDataTimestamp;
00780 timeval initialDataTime;
00781
00782
00783
00784 bool flag;
00785
00786
00787 uint32 badSeqNum;
00788 uint8 probation;
00789 uint16 baseSeqNum;
00790 uint32 expectedPrior;
00791 uint32 receivedPrior;
00792 uint32 seqNumAccum;
00793 };
00794
00799 bool
00800 isRegistered(uint32 ssrc);
00801
00810 SyncSourceLink*
00811 getSourceBySSRC(uint32 ssrc, bool& created);
00812
00823 bool
00824 BYESource(uint32 ssrc);
00825
00833 bool
00834 removeSource(uint32 ssrc);
00835
00836 inline SyncSourceLink* getFirst()
00837 { return first; }
00838
00839 inline SyncSourceLink* getLast()
00840 { return last; }
00841
00842 inline uint32
00843 getMembersCount()
00844 { return Members::getMembersCount(); }
00845
00846 inline void
00847 setMembersCount(uint32 n)
00848 { Members::setMembersCount(n); }
00849
00850 inline uint32
00851 getSendersCount()
00852 { return Members::getSendersCount(); }
00853
00854 static const size_t defaultMembersHashSize;
00855 static const uint32 SEQNUMMOD;
00856
00857 private:
00858 MembershipBookkeeping(const MembershipBookkeeping &o);
00859
00860 MembershipBookkeeping&
00861 operator=(const MembershipBookkeeping &o);
00862
00867 void
00868 endMembers();
00869
00870
00871 uint32 sourceBucketsNum;
00872 SyncSourceLink** sourceLinks;
00873
00874 SyncSourceLink* first, * last;
00875 };
00876
00883 class __EXPORT IncomingDataQueue: public IncomingDataQueueBase,
00884 protected MembershipBookkeeping
00885 {
00886 public:
00892 class SyncSourcesIterator
00893 {
00894 public:
00895 typedef std::forward_iterator_tag iterator_category;
00896 typedef SyncSource value_type;
00897 typedef ptrdiff_t difference_type;
00898 typedef const SyncSource* pointer;
00899 typedef const SyncSource& reference;
00900
00901 SyncSourcesIterator(SyncSourceLink* l = NULL) :
00902 link(l)
00903 { }
00904
00905 SyncSourcesIterator(const SyncSourcesIterator& si) :
00906 link(si.link)
00907 { }
00908
00909 reference operator*() const
00910 { return *(link->getSource()); }
00911
00912 pointer operator->() const
00913 { return link->getSource(); }
00914
00915 SyncSourcesIterator& operator++() {
00916 link = link->getNext();
00917 return *this;
00918 }
00919
00920 SyncSourcesIterator operator++(int) {
00921 SyncSourcesIterator result(*this);
00922 ++(*this);
00923 return result;
00924 }
00925
00926 friend bool operator==(const SyncSourcesIterator& l,
00927 const SyncSourcesIterator& r)
00928 { return l.link == r.link; }
00929
00930 friend bool operator!=(const SyncSourcesIterator& l,
00931 const SyncSourcesIterator& r)
00932 { return l.link != r.link; }
00933
00934 private:
00935 SyncSourceLink *link;
00936 };
00937
00938 SyncSourcesIterator begin()
00939 { return SyncSourcesIterator(MembershipBookkeeping::getFirst()); }
00940
00941 SyncSourcesIterator end()
00942 { return SyncSourcesIterator(NULL); }
00943
00952 const AppDataUnit*
00953 getData(uint32 stamp, const SyncSource* src = NULL);
00954
00955
00962 bool
00963 isWaiting(const SyncSource* src = NULL) const;
00964
00971 uint32
00972 getFirstTimestamp(const SyncSource* src = NULL) const;
00973
00996 void
00997 setMinValidPacketSequence(uint8 packets)
00998 { minValidPacketSequence = packets; }
00999
01000 uint8
01001 getDefaultMinValidPacketSequence() const
01002 { return defaultMinValidPacketSequence; }
01003
01008 uint8
01009 getMinValidPacketSequence() const
01010 { return minValidPacketSequence; }
01011
01012 void
01013 setMaxPacketMisorder(uint16 packets)
01014 { maxPacketMisorder = packets; }
01015
01016 uint16
01017 getDefaultMaxPacketMisorder() const
01018 { return defaultMaxPacketMisorder; }
01019
01020 uint16
01021 getMaxPacketMisorder() const
01022 { return maxPacketMisorder; }
01023
01029 void
01030 setMaxPacketDropout(uint16 packets)
01031 { maxPacketDropout = packets; }
01032
01033 uint16
01034 getDefaultMaxPacketDropout() const
01035 { return defaultMaxPacketDropout; }
01036
01037 uint16
01038 getMaxPacketDropout() const
01039 { return maxPacketDropout; }
01040
01041
01042
01043 inline static size_t
01044 getDefaultMembersSize()
01045 { return defaultMembersSize; }
01046
01055 void
01056 setInQueueCryptoContext(CryptoContext* cc);
01057
01068 void
01069 removeInQueueCryptoContext(CryptoContext* cc);
01070
01078 CryptoContext*
01079 getInQueueCryptoContext(uint32 ssrc);
01080
01081 protected:
01085 IncomingDataQueue(uint32 size);
01086
01087 virtual ~IncomingDataQueue()
01088 { }
01089
01102 bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
01103 bool is_new, InetAddress& na,
01104 tpport_t tp);
01105
01121 void setSourceExpirationPeriod(uint8 intervals)
01122 { sourceExpirationPeriod = intervals; }
01123
01130 virtual size_t
01131 takeInDataPacket();
01132
01133 void renewLocalSSRC();
01134
01144 IncomingDataQueue::IncomingRTPPktLink*
01145 getWaiting(uint32 timestamp, const SyncSource *src = NULL);
01146
01162 bool
01163 recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt,
01164 const timeval recvtime);
01165
01172 void
01173 recordExtraction(const IncomingRTPPkt& pkt);
01174
01175 void purgeIncomingQueue();
01176
01183 inline virtual void
01184 onNewSyncSource(const SyncSource&)
01185 { }
01186
01187 protected:
01204 inline virtual bool
01205 onRTPPacketRecv(IncomingRTPPkt&)
01206 { return true; }
01207
01216 inline virtual void onExpireRecv(IncomingRTPPkt&)
01217 { return; }
01218
01232 inline virtual bool
01233 onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode)
01234 { return false; }
01235
01236 inline virtual bool
01237 end2EndDelayed(IncomingRTPPktLink&)
01238 { return false; }
01239
01255 bool
01256 insertRecvPacket(IncomingRTPPktLink* packetLink);
01257
01269 virtual size_t
01270 recvData(unsigned char* buffer, size_t length,
01271 InetHostAddress& host, tpport_t& port) = 0;
01272
01273 virtual size_t
01274 getNextDataPacketSize() const = 0;
01275
01276 mutable ThreadLock recvLock;
01277
01278 IncomingRTPPktLink* recvFirst, * recvLast;
01279
01280 static const uint8 defaultMinValidPacketSequence;
01281 static const uint16 defaultMaxPacketMisorder;
01282 static const uint16 defaultMaxPacketDropout;
01283 uint8 minValidPacketSequence;
01284 uint16 maxPacketMisorder;
01285 uint16 maxPacketDropout;
01286 static const size_t defaultMembersSize;
01287 uint8 sourceExpirationPeriod;
01288 mutable Mutex cryptoMutex;
01289 std::list<CryptoContext *> cryptoContexts;
01290 };
01291
01293
01294 #ifdef CCXX_NAMESPACES
01295 }
01296 #endif
01297
01298 #endif //CCXX_RTP_IQUEUE_H_
01299