_concurrent_queue_internal.h

/*
    Copyright 2005-2010 Intel Corporation.  All Rights Reserved.

    The source code contained or described herein and all documents related
    to the source code ("Material") are owned by Intel Corporation or its
    suppliers or licensors.  Title to the Material remains with Intel
    Corporation or its suppliers and licensors.  The Material is protected
    by worldwide copyright laws and treaty provisions.  No part of the
    Material may be used, copied, reproduced, modified, published, uploaded,
    posted, transmitted, distributed, or disclosed in any way without
    Intel's prior express written permission.

    No license under any patent, copyright, trade secret or other
    intellectual property right is granted to or conferred upon you by
    disclosure or delivery of the Materials, either expressly, by
    implication, inducement, estoppel or otherwise.  Any license under such
    intellectual property rights must be express and approved by Intel in
    writing.
*/

#ifndef __TBB_concurrent_queue_internal_H
#define __TBB_concurrent_queue_internal_H

#include "tbb_stddef.h"
#include "tbb_machine.h"
#include "atomic.h"
#include "spin_mutex.h"
#include "cache_aligned_allocator.h"
#include "tbb_exception.h"
#include <new>

#if !TBB_USE_EXCEPTIONS && _MSC_VER
    // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
    #pragma warning (push)
    #pragma warning (disable: 4530)
#endif

#include <iterator>

#if !TBB_USE_EXCEPTIONS && _MSC_VER
    #pragma warning (pop)
#endif
00044 
00045 namespace tbb {
00046 
00047 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00048 
00049 // forward declaration
00050 namespace strict_ppl {
00051 template<typename T, typename A> class concurrent_queue;
00052 }
00053 
00054 template<typename T, typename A> class concurrent_bounded_queue;
00055 
00056 namespace deprecated {
00057 template<typename T, typename A> class concurrent_queue;
00058 }
00059 #endif
00060 
00062 namespace strict_ppl {
00063 
00065 namespace internal {
00066 
00067 using namespace tbb::internal;
00068 
00069 typedef size_t ticket;
00070 
00071 template<typename T> class micro_queue ;
00072 template<typename T> class micro_queue_pop_finalizer ;
00073 template<typename T> class concurrent_queue_base_v3;
00074 

//! Parts of concurrent_queue_rep that do not have references to micro_queue.
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;     // bit i is set iff slot i holds a validly constructed item
    };

    // head_counter and tail_counter are padded onto separate cache lines to avoid false sharing.
    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! Number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;

inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    return uintptr_t(p)>1;
}
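
/* Illustrative sketch (not part of the original header): the two sentinel
   values that is_valid_page() rejects.  Address 0 means "no page"; address 1
   is the invalid-page marker installed by invalidate_page_and_rethrow()
   below after a page allocation fails.

       typedef concurrent_queue_rep_base::page page;
       is_valid_page( static_cast<page*>(NULL) );    // false: empty micro_queue
       is_valid_page( reinterpret_cast<page*>(1) );  // false: queue poisoned by a failed push
       // any real pointer returned by allocate_page() compares > 1, hence true
*/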


//! Abstract class that defines the interface for allocating and deallocating pages.
/** The derived queue class passes itself as the allocator, so page memory can be
    sized for the queue's item type. */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
template<typename T>
class micro_queue : no_copy {
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception safety of method "pop".
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    void copy_item( page& dst, size_t index, const void* src ) {
        new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
        new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = from;
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};
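
/* Illustrative sketch (not part of the original header): how items are laid
   out in a page.  allocate_page() (defined in concurrent_queue_base_v3 below)
   obtains sizeof(padded_page) + (items_per_page-1)*sizeof(T) bytes, so the
   page header is followed by an array of items_per_page slots of T starting
   at the 'last' member, and get_ref(p,i) is equivalent to:

       T* slots = &static_cast<padded_page*>(static_cast<void*>(&p))->last;
       T& item  = slots[i];   // i in [0, items_per_page)

   Slots are raw memory until push() placement-new-constructs into them; the
   page's mask records which slots currently hold constructed items. */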

template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    atomic_backoff backoff;
    do {
        backoff.pause();
        if( counter&1 ) {
            // An odd counter marks a queue poisoned by invalidate_page_and_rethrow().
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    } while( counter!=k ) ;
}

template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;   // normalize the ticket to this micro_queue's local sequence
    page* p = NULL;
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    if( !index ) {
        // This ticket starts a new page; allocate it before waiting for our turn.
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );

    if( p ) {
        // Link the freshly allocated page at the tail.
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item );
        // If no exception was thrown, mark item as present.
        p->mask |= uintptr_t(1)<<index;
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        // Copy construction failed: advance the counter anyway so later pushes are
        // not blocked, but leave the mask bit clear and count the slot as invalid.
        ++base.my_rep->n_invalid_entries;
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
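
/* Illustrative sketch (not part of the original header): the ticket
   arithmetic above, assuming n_queue==8 and items_per_page==32.  A given
   micro_queue only ever sees tickets that are congruent mod 8:

       ticket k = 57;          // global ticket; 57*phi%8 == 3 routes it to lane 3
       k &= -size_t(8);        // 56: normalize to the lane-local sequence number times 8
       size_t seq   = k / 8;   // 7: this is the lane's 8th operation (0-based)
       size_t index = seq & 31;// 7: slot within the current page
       // index == 0 would mean this push must allocate a fresh page

   Because items_per_page is a power of 2, '& (items_per_page-1)' is the
   cheap form of '% items_per_page'. */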

template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    page& p = *head_page;
    __TBB_ASSERT( &p, NULL );
    size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
    bool success = false;
    {
        // The finalizer advances head_counter on scope exit, and frees the page
        // if this pop consumed its last slot.
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
        if( p.mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, p, index );
        } else {
            // Slot was marked invalid by a failed push; consume it without producing an item.
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}

template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex   = src.page_mutex;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items  = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}

template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        // k is a multiple of n_queue, so the +1 leaves tail_counter odd;
        // spin_wait_until_my_turn() treats an odd counter as "queue broken".
        tail_counter = k+concurrent_queue_rep_base::n_queue+1;
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}

template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
    return new_page;
}

//! RAII helper used by pop: advances head_counter and frees a fully consumed page.
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        // This pop consumed the last slot of the head page: unlink it under the page mutex.
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    // Publish the new head ticket, then free the unlinked page outside the lock.
    my_queue.head_counter = my_ticket;
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template<typename T> class concurrent_queue_iterator_rep ;
template<typename T> class concurrent_queue_iterator_base_v3;

//! Internal representation of a concurrent queue: n_queue micro_queues sharing one set of counters.
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket k to the index of its micro_queue.
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
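
/* Illustrative sketch (not part of the original header): because
   gcd(phi,n_queue) == gcd(3,8) == 1, index() is a permutation of the lanes,
   so consecutive tickets spread across all 8 micro_queues before any lane
   repeats:

       k        : 0  1  2  3  4  5  6  7  8 ...
       k*3 % 8  : 0  3  6  1  4  7  2  5  0 ...

   Each lane therefore receives every 8th ticket, which is what lets push()
   and pop() above normalize a ticket with k &= -n_queue. */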


//! Type-independent base class of concurrent_queue.
/** Implements concurrent_queue_page_allocator and owns the pointer to
    the concurrent_queue_rep. */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;

    /* override */ virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    /* override */ virtual ~concurrent_queue_base_v3() {
#if __TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* __TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue
    void internal_push( const void* src ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this );
    }

    //! Attempt to dequeue item from queue.
    /** Does not wait; returns false if the queue is empty. */
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; result may be invalid if the queue is modified concurrently
    size_t internal_size() const ;

    //! Check if the queue is empty
    bool internal_empty() const ;

    //! Free any remaining pages
    /* note that the name may be misleading, but it is retained for historical reasons. */
    void internal_finish_clear() ;

    //! Throw an exception
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! Copy internal representation
    void assign( const concurrent_queue_base_v3& src ) ;
};

template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    // micro_queue has no constructor by design; zero-initialize the whole representation.
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    // Choose items_per_page so that a page carries roughly 256 bytes of payload.
    my_rep->items_per_page = item_size<=8 ? 32 :
                             item_size<=16 ? 16 :
                             item_size<=32 ? 8 :
                             item_size<=64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}
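
/* Illustrative sketch (not part of the original header): the resulting page
   geometry for a few item types:

       sizeof(T)   items_per_page   payload bytes (items_per_page * sizeof(T))
           4             32              128
           8             32              256
          16             16              256
          64              4              256
         256              1              256

   plus the page header (next pointer and mask) contributed by padded_page.
   Keeping items_per_page a power of 2 is what allows the '& (items_per_page-1)'
   indexing in push()/pop(). */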

template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( r.tail_counter<=k ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked.  Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
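
/* Illustrative trace (not part of the original header) of the claim loop,
   assuming head_counter==5 and tail_counter==7:

       thread A: reads k=5, CAS(head, 5 -> 6) succeeds, pops ticket 5
       thread B: reads k=5, CAS(head, 5 -> 6) fails and returns 6,
                 retries with tk=6, CAS(head, 6 -> 7) succeeds, pops ticket 6

   If pop() returns false (the claimed slot was invalidated by a failed push),
   the outer do-while claims a fresh ticket rather than reporting empty. */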

template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    // The unsynchronized reads can make sz transiently negative under concurrent modification; clamp to 0.
    return sz<0 ? 0 : size_t(sz);
}

template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}

template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}

template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep.
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy micro_queues
    for( size_t i = 0; i<r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}
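
/* Illustrative sketch (not part of the original header): the shape of a
   concrete queue built on concurrent_queue_base_v3, as the public
   tbb::strict_ppl::concurrent_queue in concurrent_queue.h does.  The name
   toy_queue is hypothetical; only allocate_block and deallocate_block must
   be supplied, since the base class computes page sizes itself:

       template<typename T>
       class toy_queue: public concurrent_queue_base_v3<T> {
           // called back by allocate_page()/deallocate_page() with the page byte size
           virtual void* allocate_block( size_t n ) {
               void* b = operator new( n );   // the real queue uses its Allocator here
               if( !b ) this->internal_throw_exception();
               return b;
           }
           virtual void deallocate_block( void* b, size_t ) { operator delete( b ); }
       public:
           void push( const T& item ) { this->internal_push( &item ); }
           bool try_pop( T& dst )     { return this->internal_try_pop( &dst ); }
           ~toy_queue() {
               T tmp;                                  // assumes T is default-constructible
               while( this->internal_try_pop( &tmp ) ) {}
               this->internal_finish_clear();          // frees the one page left per lane
           }
       };
*/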

template<typename Container, typename Value> class concurrent_queue_iterator;

template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element.  Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};
00609 
00610 template<typename T>
00611 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
00612     if( k==my_queue.my_rep->tail_counter ) {
00613         item = NULL;
00614         return true;
00615     } else {
00616         typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
00617         __TBB_ASSERT(p,NULL);
00618         size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
00619         item = &micro_queue<T>::get_ref(*p,i);
00620         return (p->mask & uintptr_t(1)<<i)!=0;
00621     }
00622 }

//! Type-independent portion of concurrent_queue_iterator.
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents the concurrent_queue position over which we are iterating.
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __GNUC__==4&&__GNUC_MINOR__==3
        // to get around a possible gcc 4.3 bug
        __asm__ __volatile__("": : :"memory");
#endif
    }

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};

template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    // If the head entry is invalid, advance() skips forward to the first valid item.
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
    if( i==queue.my_rep->items_per_page-1 ) {
        // Leaving the last slot of this lane's page: step to the next page.
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    // Recurse past entries whose slots were invalidated by failed pushes.
    if( !my_rep->get_item(my_item, k) ) advance();
}

//! Similar to C++0x std::remove_cv
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    //! Copy constructor; also converts a non-const iterator to a const iterator.
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment; returns a pointer to the item advanced past.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
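
/* Illustrative usage (not part of this header): in TBB 3.0's concurrent_queue.h
   these iterators surface in the public API as unsafe_begin()/unsafe_end().
   They are intended for debugging only and are not thread safe:

       tbb::concurrent_queue<int> q;
       // ... all producer/consumer threads stopped ...
       for( tbb::concurrent_queue<int>::iterator it = q.unsafe_begin();
            it != q.unsafe_end(); ++it )
           process( *it );        // process() is a hypothetical user function

   Entries whose push failed are skipped automatically: advance() consults
   each page's mask and recurses past invalid slots. */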


template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace strict_ppl

//! For internal use only: implementation details of concurrent_bounded_queue and the deprecated concurrent_queue.
namespace internal {

class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;


//! For internal use only.
/** Type-independent portion of concurrent_queue.  Unlike the strict_ppl
    version above, its implementation lives in the TBB library rather than
    in this header. */
class concurrent_queue_base_v3: no_copy {
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

#if __GNUC__==3&&__GNUC_MINOR__==3
public:
#endif /* __GNUC__==3&&__GNUC_MINOR__==3 */
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Attempt to enqueue item onto queue.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** Does not wait; returns false if the queue was empty. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! custom allocator
    virtual page *allocate_page() = 0;

    //! custom de-allocator
    virtual void deallocate_page( page *p ) = 0;

    //! Free any remaining pages
    /* note that the name may be misleading, but it is retained for historical reasons. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! Throw an exception
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! Copy internal representation
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};
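
/* Note the split between the two implementations in this header: the
   strict_ppl classes above are fully inlined templates, while this older
   interface exports its logic from the TBB binary (__TBB_EXPORTED_METHOD)
   and calls back into the derived class through the copy_item /
   assign_and_destroy_item / allocate_page virtuals, since the library cannot
   know sizeof(T).  concurrent_bounded_queue and deprecated::concurrent_queue
   build on this class, which is also where capacity (my_capacity) and the
   blocking internal_push/internal_pop live. */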

//! Type-independent portion of concurrent_queue_iterator.
class concurrent_queue_iterator_base_v3 {
    //! Representation of the iterator's position within the queue.
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item
    void* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue, where item data lies at offset_of_data within a page.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;

    template<typename T, class A>
    friend class ::tbb::deprecated::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    concurrent_queue_iterator() {}

    //! Copy constructor; also converts a non-const iterator to a const iterator.
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment; returns a pointer to the item advanced past.
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator


template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace tbb

#endif /* __TBB_concurrent_queue_internal_H */
