00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_concurrent_queue_internal_H
00022 #define __TBB_concurrent_queue_internal_H
00023
00024 #include "tbb_stddef.h"
00025 #include "tbb_machine.h"
00026 #include "atomic.h"
00027 #include "spin_mutex.h"
00028 #include "cache_aligned_allocator.h"
00029 #include "tbb_exception.h"
00030 #include <new>
00031
00032 #if !TBB_USE_EXCEPTIONS && _MSC_VER
00033
00034 #pragma warning (push)
00035 #pragma warning (disable: 4530)
00036 #endif
00037
00038 #include <iterator>
00039
00040 #if !TBB_USE_EXCEPTIONS && _MSC_VER
00041 #pragma warning (pop)
00042 #endif
00043
00044
00045 namespace tbb {
00046
00047 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00048
00049
00050 namespace strict_ppl {
00051 template<typename T, typename A> class concurrent_queue;
00052 }
00053
00054 template<typename T, typename A> class concurrent_bounded_queue;
00055
00056 namespace deprecated {
00057 template<typename T, typename A> class concurrent_queue;
00058 }
00059 #endif
00060
00062 namespace strict_ppl {
00063
00065 namespace internal {
00066
00067 using namespace tbb::internal;
00068
00069 typedef size_t ticket;
00070
00071 template<typename T> class micro_queue ;
00072 template<typename T> class micro_queue_pop_finalizer ;
00073 template<typename T> class concurrent_queue_base_v3;
00074
00076
00079 struct concurrent_queue_rep_base : no_copy {
00080 template<typename T> friend class micro_queue;
00081 template<typename T> friend class concurrent_queue_base_v3;
00082
00083 protected:
00085 static const size_t phi = 3;
00086
00087 public:
00088
00089 static const size_t n_queue = 8;
00090
00092 struct page {
00093 page* next;
00094 uintptr_t mask;
00095 };
00096
00097 atomic<ticket> head_counter;
00098 char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
00099 atomic<ticket> tail_counter;
00100 char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
00101
00103 size_t items_per_page;
00104
00106 size_t item_size;
00107
00109 atomic<size_t> n_invalid_entries;
00110
00111 char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
00112 } ;
00113
00114 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
00115 return uintptr_t(p)>1;
00116 }
00117
00119
00122 class concurrent_queue_page_allocator
00123 {
00124 template<typename T> friend class micro_queue ;
00125 template<typename T> friend class micro_queue_pop_finalizer ;
00126 protected:
00127 virtual ~concurrent_queue_page_allocator() {}
00128 private:
00129 virtual concurrent_queue_rep_base::page* allocate_page() = 0;
00130 virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
00131 } ;
00132
00133 #if _MSC_VER && !defined(__INTEL_COMPILER)
00134
00135 #pragma warning( push )
00136 #pragma warning( disable: 4146 )
00137 #endif
00138
00140
00142 template<typename T>
00143 class micro_queue : no_copy {
00144 typedef concurrent_queue_rep_base::page page;
00145
00147 class destroyer: no_copy {
00148 T& my_value;
00149 public:
00150 destroyer( T& value ) : my_value(value) {}
00151 ~destroyer() {my_value.~T();}
00152 };
00153
00154 void copy_item( page& dst, size_t index, const void* src ) {
00155 new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
00156 }
00157
00158 void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
00159 new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
00160 }
00161
00162 void assign_and_destroy_item( void* dst, page& src, size_t index ) {
00163 T& from = get_ref(src,index);
00164 destroyer d(from);
00165 *static_cast<T*>(dst) = from;
00166 }
00167
00168 void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
00169
00170 public:
00171 friend class micro_queue_pop_finalizer<T>;
00172
00173 struct padded_page: page {
00175 padded_page();
00177 void operator=( const padded_page& );
00179 T last;
00180 };
00181
00182 static T& get_ref( page& p, size_t index ) {
00183 return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
00184 }
00185
00186 atomic<page*> head_page;
00187 atomic<ticket> head_counter;
00188
00189 atomic<page*> tail_page;
00190 atomic<ticket> tail_counter;
00191
00192 spin_mutex page_mutex;
00193
00194 void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;
00195
00196 bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
00197
00198 micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;
00199
00200 page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;
00201
00202 void invalidate_page_and_rethrow( ticket k ) ;
00203 };
00204
00205 template<typename T>
00206 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
00207 atomic_backoff backoff;
00208 do {
00209 backoff.pause();
00210 if( counter&1 ) {
00211 ++rb.n_invalid_entries;
00212 throw_exception( eid_bad_last_alloc );
00213 }
00214 } while( counter!=k ) ;
00215 }
00216
00217 template<typename T>
00218 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
00219 k &= -concurrent_queue_rep_base::n_queue;
00220 page* p = NULL;
00221 size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00222 if( !index ) {
00223 __TBB_TRY {
00224 concurrent_queue_page_allocator& pa = base;
00225 p = pa.allocate_page();
00226 } __TBB_CATCH (...) {
00227 ++base.my_rep->n_invalid_entries;
00228 invalidate_page_and_rethrow( k );
00229 }
00230 p->mask = 0;
00231 p->next = NULL;
00232 }
00233
00234 if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
00235
00236 if( p ) {
00237 spin_mutex::scoped_lock lock( page_mutex );
00238 page* q = tail_page;
00239 if( is_valid_page(q) )
00240 q->next = p;
00241 else
00242 head_page = p;
00243 tail_page = p;
00244 } else {
00245 p = tail_page;
00246 }
00247
00248 __TBB_TRY {
00249 copy_item( *p, index, item );
00250
00251 p->mask |= uintptr_t(1)<<index;
00252 tail_counter += concurrent_queue_rep_base::n_queue;
00253 } __TBB_CATCH (...) {
00254 ++base.my_rep->n_invalid_entries;
00255 tail_counter += concurrent_queue_rep_base::n_queue;
00256 __TBB_RETHROW();
00257 }
00258 }
00259
00260 template<typename T>
00261 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
00262 k &= -concurrent_queue_rep_base::n_queue;
00263 if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
00264 if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
00265 page& p = *head_page;
00266 __TBB_ASSERT( &p, NULL );
00267 size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00268 bool success = false;
00269 {
00270 micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
00271 if( p.mask & uintptr_t(1)<<index ) {
00272 success = true;
00273 assign_and_destroy_item( dst, p, index );
00274 } else {
00275 --base.my_rep->n_invalid_entries;
00276 }
00277 }
00278 return success;
00279 }
00280
00281 template<typename T>
00282 micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
00283 head_counter = src.head_counter;
00284 tail_counter = src.tail_counter;
00285 page_mutex = src.page_mutex;
00286
00287 const page* srcp = src.head_page;
00288 if( is_valid_page(srcp) ) {
00289 ticket g_index = head_counter;
00290 __TBB_TRY {
00291 size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
00292 size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00293 size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
00294
00295 head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
00296 page* cur_page = head_page;
00297
00298 if( srcp != src.tail_page ) {
00299 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
00300 cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
00301 cur_page = cur_page->next;
00302 }
00303
00304 __TBB_ASSERT( srcp==src.tail_page, NULL );
00305 size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1);
00306 if( last_index==0 ) last_index = base.my_rep->items_per_page;
00307
00308 cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
00309 cur_page = cur_page->next;
00310 }
00311 tail_page = cur_page;
00312 } __TBB_CATCH (...) {
00313 invalidate_page_and_rethrow( g_index );
00314 }
00315 } else {
00316 head_page = tail_page = NULL;
00317 }
00318 return *this;
00319 }
00320
00321 template<typename T>
00322 void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
00323
00324 page* invalid_page = (page*)uintptr_t(1);
00325 {
00326 spin_mutex::scoped_lock lock( page_mutex );
00327 tail_counter = k+concurrent_queue_rep_base::n_queue+1;
00328 page* q = tail_page;
00329 if( is_valid_page(q) )
00330 q->next = invalid_page;
00331 else
00332 head_page = invalid_page;
00333 tail_page = invalid_page;
00334 }
00335 __TBB_RETHROW();
00336 }
00337
00338 template<typename T>
00339 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
00340 concurrent_queue_page_allocator& pa = base;
00341 page* new_page = pa.allocate_page();
00342 new_page->next = NULL;
00343 new_page->mask = src_page->mask;
00344 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
00345 if( new_page->mask & uintptr_t(1)<<begin_in_page )
00346 copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
00347 return new_page;
00348 }
00349
00350 template<typename T>
00351 class micro_queue_pop_finalizer: no_copy {
00352 typedef concurrent_queue_rep_base::page page;
00353 ticket my_ticket;
00354 micro_queue<T>& my_queue;
00355 page* my_page;
00356 concurrent_queue_page_allocator& allocator;
00357 public:
00358 micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
00359 my_ticket(k), my_queue(queue), my_page(p), allocator(b)
00360 {}
00361 ~micro_queue_pop_finalizer() ;
00362 };
00363
00364 template<typename T>
00365 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
00366 page* p = my_page;
00367 if( is_valid_page(p) ) {
00368 spin_mutex::scoped_lock lock( my_queue.page_mutex );
00369 page* q = p->next;
00370 my_queue.head_page = q;
00371 if( !is_valid_page(q) ) {
00372 my_queue.tail_page = NULL;
00373 }
00374 }
00375 my_queue.head_counter = my_ticket;
00376 if( is_valid_page(p) ) {
00377 allocator.deallocate_page( p );
00378 }
00379 }
00380
00381 #if _MSC_VER && !defined(__INTEL_COMPILER)
00382 #pragma warning( pop )
00383 #endif // warning 4146 is back
00384
00385 template<typename T> class concurrent_queue_iterator_rep ;
00386 template<typename T> class concurrent_queue_iterator_base_v3;
00387
00389
00392 template<typename T>
00393 struct concurrent_queue_rep : public concurrent_queue_rep_base {
00394 micro_queue<T> array[n_queue];
00395
00397 static size_t index( ticket k ) {
00398 return k*phi%n_queue;
00399 }
00400
00401 micro_queue<T>& choose( ticket k ) {
00402
00403 return array[index(k)];
00404 }
00405 };
00406
00408
00412 template<typename T>
00413 class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
00415 concurrent_queue_rep<T>* my_rep;
00416
00417 friend struct concurrent_queue_rep<T>;
00418 friend class micro_queue<T>;
00419 friend class concurrent_queue_iterator_rep<T>;
00420 friend class concurrent_queue_iterator_base_v3<T>;
00421
00422 protected:
00423 typedef typename concurrent_queue_rep<T>::page page;
00424
00425 private:
00426 typedef typename micro_queue<T>::padded_page padded_page;
00427
00428 virtual page *allocate_page() {
00429 concurrent_queue_rep<T>& r = *my_rep;
00430 size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
00431 return reinterpret_cast<page*>(allocate_block ( n ));
00432 }
00433
00434 virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
00435 concurrent_queue_rep<T>& r = *my_rep;
00436 size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
00437 deallocate_block( reinterpret_cast<void*>(p), n );
00438 }
00439
00441 virtual void *allocate_block( size_t n ) = 0;
00442
00444 virtual void deallocate_block( void *p, size_t n ) = 0;
00445
00446 protected:
00447 concurrent_queue_base_v3();
00448
00449 virtual ~concurrent_queue_base_v3() {
00450 #if __TBB_USE_ASSERT
00451 size_t nq = my_rep->n_queue;
00452 for( size_t i=0; i<nq; i++ )
00453 __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
00454 #endif
00455 cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
00456 }
00457
00459 void internal_push( const void* src ) {
00460 concurrent_queue_rep<T>& r = *my_rep;
00461 ticket k = r.tail_counter++;
00462 r.choose(k).push( src, k, *this );
00463 }
00464
00466
00467 bool internal_try_pop( void* dst ) ;
00468
00470 size_t internal_size() const ;
00471
00473 bool internal_empty() const ;
00474
00476
00477 void internal_finish_clear() ;
00478
00480 void internal_throw_exception() const {
00481 throw_exception( eid_bad_alloc );
00482 }
00483
00485 void assign( const concurrent_queue_base_v3& src ) ;
00486 };
00487
00488 template<typename T>
00489 concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
00490 const size_t item_size = sizeof(T);
00491 my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
00492 __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
00493 __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
00494 __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
00495 __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
00496 memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
00497 my_rep->item_size = item_size;
00498 my_rep->items_per_page = item_size<=8 ? 32 :
00499 item_size<=16 ? 16 :
00500 item_size<=32 ? 8 :
00501 item_size<=64 ? 4 :
00502 item_size<=128 ? 2 :
00503 1;
00504 }
00505
00506 template<typename T>
00507 bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
00508 concurrent_queue_rep<T>& r = *my_rep;
00509 ticket k;
00510 do {
00511 k = r.head_counter;
00512 for(;;) {
00513 if( r.tail_counter<=k ) {
00514
00515 return false;
00516 }
00517
00518 ticket tk=k;
00519 #if defined(_MSC_VER) && defined(_Wp64)
00520 #pragma warning (push)
00521 #pragma warning (disable: 4267)
00522 #endif
00523 k = r.head_counter.compare_and_swap( tk+1, tk );
00524 #if defined(_MSC_VER) && defined(_Wp64)
00525 #pragma warning (pop)
00526 #endif
00527 if( k==tk )
00528 break;
00529
00530 }
00531 } while( !r.choose( k ).pop( dst, k, *this ) );
00532 return true;
00533 }
00534
00535 template<typename T>
00536 size_t concurrent_queue_base_v3<T>::internal_size() const {
00537 concurrent_queue_rep<T>& r = *my_rep;
00538 __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
00539 ticket hc = r.head_counter;
00540 size_t nie = r.n_invalid_entries;
00541 ticket tc = r.tail_counter;
00542 __TBB_ASSERT( hc!=tc || !nie, NULL );
00543 ptrdiff_t sz = tc-hc-nie;
00544 return sz<0 ? 0 : size_t(sz);
00545 }
00546
00547 template<typename T>
00548 bool concurrent_queue_base_v3<T>::internal_empty() const {
00549 concurrent_queue_rep<T>& r = *my_rep;
00550 ticket tc = r.tail_counter;
00551 ticket hc = r.head_counter;
00552
00553 return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
00554 }
00555
00556 template<typename T>
00557 void concurrent_queue_base_v3<T>::internal_finish_clear() {
00558 concurrent_queue_rep<T>& r = *my_rep;
00559 size_t nq = r.n_queue;
00560 for( size_t i=0; i<nq; ++i ) {
00561 page* tp = r.array[i].tail_page;
00562 if( is_valid_page(tp) ) {
00563 __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
00564 deallocate_page( tp );
00565 r.array[i].tail_page = NULL;
00566 } else
00567 __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
00568 }
00569 }
00570
00571 template<typename T>
00572 void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
00573 concurrent_queue_rep<T>& r = *my_rep;
00574 r.items_per_page = src.my_rep->items_per_page;
00575
00576
00577 r.head_counter = src.my_rep->head_counter;
00578 r.tail_counter = src.my_rep->tail_counter;
00579 r.n_invalid_entries = src.my_rep->n_invalid_entries;
00580
00581
00582 for( size_t i = 0; i<r.n_queue; ++i )
00583 r.array[i].assign( src.my_rep->array[i], *this);
00584
00585 __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
00586 "the source concurrent queue should not be concurrently modified." );
00587 }
00588
00589 template<typename Container, typename Value> class concurrent_queue_iterator;
00590
00591 template<typename T>
00592 class concurrent_queue_iterator_rep: no_assign {
00593 typedef typename micro_queue<T>::padded_page padded_page;
00594 public:
00595 ticket head_counter;
00596 const concurrent_queue_base_v3<T>& my_queue;
00597 typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
00598 concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
00599 head_counter(queue.my_rep->head_counter),
00600 my_queue(queue)
00601 {
00602 for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
00603 array[k] = queue.my_rep->array[k].head_page;
00604 }
00605
00607 bool get_item( T*& item, size_t k ) ;
00608 };
00609
00610 template<typename T>
00611 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
00612 if( k==my_queue.my_rep->tail_counter ) {
00613 item = NULL;
00614 return true;
00615 } else {
00616 typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
00617 __TBB_ASSERT(p,NULL);
00618 size_t i = k/concurrent_queue_rep<T>::n_queue & (my_queue.my_rep->items_per_page-1);
00619 item = µ_queue<T>::get_ref(*p,i);
00620 return (p->mask & uintptr_t(1)<<i)!=0;
00621 }
00622 }
00623
00625
00626 template<typename Value>
00627 class concurrent_queue_iterator_base_v3 : no_assign {
00629
00630 concurrent_queue_iterator_rep<Value>* my_rep;
00631
00632 template<typename C, typename T, typename U>
00633 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00634
00635 template<typename C, typename T, typename U>
00636 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00637 protected:
00639 Value* my_item;
00640
00642 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
00643 #if __GNUC__==4&&__GNUC_MINOR__==3
00644
00645 __asm__ __volatile__("": : :"memory");
00646 #endif
00647 }
00648
00650 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
00651 assign(i);
00652 }
00653
00655 concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;
00656
00658 void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
00659
00661 void advance() ;
00662
00664 ~concurrent_queue_iterator_base_v3() {
00665 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00666 my_rep = NULL;
00667 }
00668 };
00669
00670 template<typename Value>
00671 concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
00672 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00673 new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
00674 size_t k = my_rep->head_counter;
00675 if( !my_rep->get_item(my_item, k) ) advance();
00676 }
00677
00678 template<typename Value>
00679 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
00680 if( my_rep!=other.my_rep ) {
00681 if( my_rep ) {
00682 cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
00683 my_rep = NULL;
00684 }
00685 if( other.my_rep ) {
00686 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
00687 new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
00688 }
00689 }
00690 my_item = other.my_item;
00691 }
00692
00693 template<typename Value>
00694 void concurrent_queue_iterator_base_v3<Value>::advance() {
00695 __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
00696 size_t k = my_rep->head_counter;
00697 const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
00698 #if TBB_USE_ASSERT
00699 Value* tmp;
00700 my_rep->get_item(tmp,k);
00701 __TBB_ASSERT( my_item==tmp, NULL );
00702 #endif
00703 size_t i = k/concurrent_queue_rep<Value>::n_queue & (queue.my_rep->items_per_page-1);
00704 if( i==queue.my_rep->items_per_page-1 ) {
00705 typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
00706 root = root->next;
00707 }
00708
00709 my_rep->head_counter = ++k;
00710 if( !my_rep->get_item(my_item, k) ) advance();
00711 }
00712
00714
//! Strips top-level const and/or volatile from T; the result is the nested `type`.
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
00719
00721
00723 template<typename Container, typename Value>
00724 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
00725 public std::iterator<std::forward_iterator_tag,Value> {
00726 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
00727 template<typename T, class A>
00728 friend class ::tbb::strict_ppl::concurrent_queue;
00729 #else
00730 public:
00731 #endif
00733 concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
00734 concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
00735 {
00736 }
00737
00738 public:
00739 concurrent_queue_iterator() {}
00740
00741 concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
00742 concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
00743 {}
00744
00746 concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00747 assign(other);
00748 return *this;
00749 }
00750
00752 Value& operator*() const {
00753 return *static_cast<Value*>(this->my_item);
00754 }
00755
00756 Value* operator->() const {return &operator*();}
00757
00759 concurrent_queue_iterator& operator++() {
00760 this->advance();
00761 return *this;
00762 }
00763
00765 Value* operator++(int) {
00766 Value* result = &operator*();
00767 operator++();
00768 return result;
00769 }
00770 };
00771
00772
00773 template<typename C, typename T, typename U>
00774 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00775 return i.my_item==j.my_item;
00776 }
00777
00778 template<typename C, typename T, typename U>
00779 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00780 return i.my_item!=j.my_item;
00781 }
00782
00783 }
00784
00786
00787 }
00788
00790 namespace internal {
00791
00792 class concurrent_queue_rep;
00793 class concurrent_queue_iterator_rep;
00794 class concurrent_queue_iterator_base_v3;
00795 template<typename Container, typename Value> class concurrent_queue_iterator;
00796
00798
00800 class concurrent_queue_base_v3: no_copy {
00802 concurrent_queue_rep* my_rep;
00803
00804 friend class concurrent_queue_rep;
00805 friend struct micro_queue;
00806 friend class micro_queue_pop_finalizer;
00807 friend class concurrent_queue_iterator_rep;
00808 friend class concurrent_queue_iterator_base_v3;
00809 protected:
00811 struct page {
00812 page* next;
00813 uintptr_t mask;
00814 };
00815
00817 ptrdiff_t my_capacity;
00818
00820 size_t items_per_page;
00821
00823 size_t item_size;
00824
00825 #if __GNUC__==3&&__GNUC_MINOR__==3
00826 public:
00827 #endif
00828 template<typename T>
00829 struct padded_page: page {
00831 padded_page();
00833 void operator=( const padded_page& );
00835 T last;
00836 };
00837
00838 private:
00839 virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
00840 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
00841 protected:
00842 __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
00843 virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
00844
00846 void __TBB_EXPORTED_METHOD internal_push( const void* src );
00847
00849 void __TBB_EXPORTED_METHOD internal_pop( void* dst );
00850
00852 bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
00853
00855
00856 bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
00857
00859 ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
00860
00862 bool __TBB_EXPORTED_METHOD internal_empty() const;
00863
00865 void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
00866
00868 virtual page *allocate_page() = 0;
00869
00871 virtual void deallocate_page( page *p ) = 0;
00872
00874
00875 void __TBB_EXPORTED_METHOD internal_finish_clear() ;
00876
00878 void __TBB_EXPORTED_METHOD internal_throw_exception() const;
00879
00881 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
00882
00883 private:
00884 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
00885 };
00886
00888
00889 class concurrent_queue_iterator_base_v3 {
00891
00892 concurrent_queue_iterator_rep* my_rep;
00893
00894 template<typename C, typename T, typename U>
00895 friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00896
00897 template<typename C, typename T, typename U>
00898 friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
00899
00900 void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
00901 protected:
00903 void* my_item;
00904
00906 concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
00907
00909 concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
00910 assign(i);
00911 }
00912
00914
00915 __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );
00916
00918 __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );
00919
00921 void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
00922
00924 void __TBB_EXPORTED_METHOD advance();
00925
00927 __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
00928 };
00929
00930 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
00931
00933
00935 template<typename Container, typename Value>
00936 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
00937 public std::iterator<std::forward_iterator_tag,Value> {
00938
00939 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
00940 template<typename T, class A>
00941 friend class ::tbb::concurrent_bounded_queue;
00942
00943 template<typename T, class A>
00944 friend class ::tbb::deprecated::concurrent_queue;
00945 #else
00946 public:
00947 #endif
00949 concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
00950 concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
00951 {
00952 }
00953
00954 public:
00955 concurrent_queue_iterator() {}
00956
00959 concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
00960 concurrent_queue_iterator_base_v3(other)
00961 {}
00962
00964 concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
00965 assign(other);
00966 return *this;
00967 }
00968
00970 Value& operator*() const {
00971 return *static_cast<Value*>(my_item);
00972 }
00973
00974 Value* operator->() const {return &operator*();}
00975
00977 concurrent_queue_iterator& operator++() {
00978 advance();
00979 return *this;
00980 }
00981
00983 Value* operator++(int) {
00984 Value* result = &operator*();
00985 operator++();
00986 return result;
00987 }
00988 };
00989
00990
00991 template<typename C, typename T, typename U>
00992 bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00993 return i.my_item==j.my_item;
00994 }
00995
00996 template<typename C, typename T, typename U>
00997 bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
00998 return i.my_item!=j.my_item;
00999 }
01000
01001 }
01002
01004
01005 }
01006
01007 #endif