Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
concurrent_queue.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19 
22 
23 namespace tbb {
24 
25 namespace strict_ppl {
26 
28 
31 template<typename T, typename A = cache_aligned_allocator<T> >
32 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
33  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
34 
38 
// Allocates a block of size n (bytes) through my_allocator.
// A null result from the allocator is reported by throwing (via
// internal::throw_exception) rather than being handed back to the caller;
// on success the block is returned as void*.
40  virtual void *allocate_block( size_t n ) __TBB_override {
41  void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
42  if( !b )
43  internal::throw_exception(internal::eid_bad_alloc);
44  return b;
45  }
46 
// Deallocates a block created by allocate_block: b is handed back to
// my_allocator as a char* together with the same byte count n.
48  virtual void deallocate_block( void *b, size_t n ) __TBB_override {
49  my_allocator.deallocate( reinterpret_cast<char*>(b), n );
50  }
51 
// Item constructor used for copying: placement-constructs a T at `location`
// as a copy of the T that `src` points to (src is read as const T*).
52  static void copy_construct_item(T* location, const void* src){
53  new (location) T(*static_cast<const T*>(src));
54  }
55 
56 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Item constructor used for moving: placement-constructs a T at `location`
// by moving from the T that `src` points to. The const on src is cast away
// so the source object can be bound to an rvalue reference.
57  static void move_construct_item(T* location, const void* src) {
58  new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
59  }
60 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
61 public:
63  typedef T value_type;
64 
66  typedef T& reference;
67 
69  typedef const T& const_reference;
70 
72  typedef size_t size_type;
73 
75  typedef ptrdiff_t difference_type;
76 
78  typedef A allocator_type;
79 
82  my_allocator( a )
83  {
84  }
85 
// [begin,end) constructor: stores allocator `a`, then pushes a copy of every
// element of the range, in iteration order.
87  template<typename InputIterator>
88  concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
89  my_allocator( a )
90  {
91  for( ; begin != end; ++begin )
92  this->push(*begin);
93  }
94 
97  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
98  {
99  this->assign( src, copy_construct_item );
100  }
101 
102 #if __TBB_CPP11_RVALUE_REF_PRESENT
105  internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) )
106  {
107  this->internal_swap( src );
108  }
109 
111  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
112  {
113  // checking that memory allocated by one instance of allocator can be deallocated
114  // with another
115  if( my_allocator == src.my_allocator) {
116  this->internal_swap( src );
117  } else {
118  // allocators are different => performing per-element move
119  this->assign( src, move_construct_item );
120  src.clear();
121  }
122  }
123 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
124 
127 
// Enqueue an item at tail of queue; the stored item is built from `source`
// with copy_construct_item.
129  void push( const T& source ) {
130  this->internal_push( &source, copy_construct_item );
131  }
132 
133 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Rvalue overload of push: enqueues at the tail, building the stored item
// from `source` with move_construct_item.
134  void push( T&& source ) {
135  this->internal_push( &source, move_construct_item );
136  }
137 
138 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
// Builds a temporary T from the perfectly-forwarded arguments and enqueues
// it through push; the temporary is an rvalue, so the T&& overload is used.
139  template<typename... Arguments>
140  void emplace( Arguments&&... args ) {
141  push( T(std::forward<Arguments>( args )...) );
142  }
143 #endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
144 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
145 
147 
// Attempt to dequeue an item from head of queue into `result`.
// Returns whatever internal_try_pop reports — presumably true when an item
// was written to `result`; confirm against concurrent_queue_base_v3.
149  bool try_pop( T& result ) {
150  return this->internal_try_pop( &result );
151  }
152 
// Return the number of items in the queue; thread unsafe (forwards to internal_size()).
154  size_type unsafe_size() const {return this->internal_size();}
155 
// Equivalent to size()==0 (forwards to internal_empty()).
157  bool empty() const {return this->internal_empty();}
158 
160  void clear() ;
161 
// Return allocator object: a copy of my_allocator converted to allocator_type.
163  allocator_type get_allocator() const { return this->my_allocator; }
164 
165  typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
166  typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
167 
168  //------------------------------------------------------------------------
169  // The iterators are intended only for debugging. They are slow and not thread safe.
170  //------------------------------------------------------------------------
// Debug-only iterator constructed from this queue; slow and not thread safe.
171  iterator unsafe_begin() {return iterator(*this);}
// Const overload: debug-only const_iterator constructed from this queue; slow and not thread safe.
173  const_iterator unsafe_begin() const {return const_iterator(*this);}
175 } ;
176 
177 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
178 // Deduction guide for the constructor from two iterators
// T defaults to the iterator's value_type and A to cache_aligned_allocator<T>,
// so concurrent_queue(first, last) deduces concurrent_queue<T, A> from the
// iterators alone; passing an allocator as third argument deduces A from it.
179 template<typename InputIterator,
180  typename T = typename std::iterator_traits<InputIterator>::value_type,
181  typename A = cache_aligned_allocator<T>
182 > concurrent_queue(InputIterator, InputIterator, const A& = A())
183 -> concurrent_queue<T, A>;
184 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
185 
186 template<typename T, class A>
188  clear();
189  this->internal_finish_clear();
190 }
191 
192 template<typename T, class A>
194  T value;
195  while( !empty() ) try_pop(value);
196 }
197 
198 } // namespace strict_ppl
199 
201 
206 template<typename T, class A = cache_aligned_allocator<T> >
207 class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 {
208  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
210 
213 
216 
218  class destroyer: internal::no_copy {
220  public:
223  };
224 
// Returns a reference to item number `index` stored in page p.
// The page is reinterpreted as a padded_page and items are addressed as an
// array beginning at its `last` member; index must be below items_per_page
// (checked by __TBB_ASSERT).
225  T& get_ref( page& p, size_t index ) {
226  __TBB_ASSERT( index<items_per_page, NULL );
227  return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
228  }
229 
// Placement-constructs slot `index` of page dst as a copy of the T that
// `src` points to.
230  virtual void copy_item( page& dst, size_t index, const void* src ) __TBB_override {
231  new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
232  }
233 
234 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Placement-constructs slot `index` of page dst by moving from the T that
// `src` points to; const is cast away so the source can be moved from.
235  virtual void move_item( page& dst, size_t index, const void* src ) __TBB_override {
236  new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
237  }
238 #else
// Stub used when __TBB_CPP11_RVALUE_REF_PRESENT is off: it only satisfies the
// virtual interface and asserts if it is ever called.
239  virtual void move_item( page&, size_t, const void* ) __TBB_override {
240  __TBB_ASSERT( false, "Unreachable code" );
241  }
242 #endif
243 
// Copy-constructs slot `dindex` of page dst from slot `sindex` of page src.
// src is const_cast only because get_ref takes a non-const page&; the source
// item is passed to T's constructor as-is.
244  virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
245  new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
246  }
247 
248 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Move-constructs slot `dindex` of page dst from slot `sindex` of page src.
// Note that src is declared const but its item is moved from after the
// const_cast.
249  virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
250  new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) );
251  }
252 #else
// Stub used when __TBB_CPP11_RVALUE_REF_PRESENT is off: it only satisfies the
// virtual interface and asserts if it is ever called.
253  virtual void move_page_item( page&, size_t, const page&, size_t ) __TBB_override {
254  __TBB_ASSERT( false, "Unreachable code" );
255  }
256 #endif
257 
// Move-assigns the item in slot `index` of page src into the T that `dst`
// points to.
// NOTE(review): `destroyer d(from)` presumably destroys the source item when
// d goes out of scope, including when the assignment throws — the destroyer
// body is not visible in this listing; confirm there.
258  virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) __TBB_override {
259  T& from = get_ref(src,index);
260  destroyer d(from);
261  *static_cast<T*>(dst) = tbb::internal::move( from );
262  }
263 
// Custom allocator for one queue page.
// The byte count is sizeof(padded_page) plus room for (items_per_page-1)
// further items of type T. A null result from my_allocator is reported by
// throwing (via internal::throw_exception); on success the page is returned.
264  virtual page *allocate_page() __TBB_override {
265  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
266  page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
267  if( !p )
268  internal::throw_exception(internal::eid_bad_alloc);
269  return p;
270  }
271 
// Custom de-allocator for one queue page.
// Recomputes the same byte count used by allocate_page and hands the page
// back to my_allocator as a char*.
272  virtual void deallocate_page( page *p ) __TBB_override {
273  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
274  my_allocator.deallocate( reinterpret_cast<char*>(p), n );
275  }
276 
277 public:
279  typedef T value_type;
280 
282  typedef A allocator_type;
283 
285  typedef T& reference;
286 
288  typedef const T& const_reference;
289 
291 
293  typedef std::ptrdiff_t size_type;
294 
296  typedef std::ptrdiff_t difference_type;
297 
300  concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
301  {
302  }
303 
306  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
307  {
308  assign( src );
309  }
310 
311 #if __TBB_CPP11_RVALUE_REF_PRESENT
314  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( std::move(src.my_allocator) )
315  {
316  internal_swap( src );
317  }
318 
320  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
321  {
322  // checking that memory allocated by one instance of allocator can be deallocated
323  // with another
324  if( my_allocator == src.my_allocator) {
325  this->internal_swap( src );
326  } else {
327  // allocators are different => performing per-element move
328  this->move_content( src );
329  src.clear();
330  }
331  }
332 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
333 
335  template<typename InputIterator>
336  concurrent_bounded_queue( InputIterator begin, InputIterator end,
337  const allocator_type& a = allocator_type())
338  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
339  {
340  for( ; begin != end; ++begin )
342  }
343 
346 
// Enqueue an item at tail of queue, using the copy operation (internal_push).
348  void push( const T& source ) {
349  internal_push( &source );
350  }
351 
352 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Rvalue overload: enqueue an item at tail of queue using the move operation
// (internal_push_move).
353  void push( T&& source ) {
355  internal_push_move( &source );
356  }
357 
358 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
// Builds a temporary T from the perfectly-forwarded arguments and enqueues
// it through push; the temporary is an rvalue, so the T&& overload is used.
359  template<typename... Arguments>
360  void emplace( Arguments&&... args ) {
361  push( T(std::forward<Arguments>( args )...) );
362  }
363 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
364 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
365 
367 
// Dequeue item from head of queue into `destination` (forwards to internal_pop).
368  void pop( T& destination ) {
369  internal_pop( &destination );
370  }
371 
372 #if TBB_USE_EXCEPTIONS
// Abort all pending queue operations (forwards to internal_abort).
// Only compiled when TBB_USE_EXCEPTIONS is set.
373  void abort() {
375  internal_abort();
376  }
377 #endif
378 
380 
// Enqueue an item at tail of queue if queue is not already full, using the
// copy operation. Returns the result of internal_push_if_not_full —
// presumably true when the item was enqueued; confirm against the base class.
382  bool try_push( const T& source ) {
383  return internal_push_if_not_full( &source );
384  }
385 
386 #if __TBB_CPP11_RVALUE_REF_PRESENT
387 
// Move an item at tail of queue if queue is not already full. Returns the
// result of internal_push_move_if_not_full — presumably true when the item
// was enqueued; confirm against the base class.
390  bool try_push( T&& source ) {
391  return internal_push_move_if_not_full( &source );
392  }
393 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
// Builds a temporary T from the perfectly-forwarded arguments and offers it
// to try_push (rvalue overload); returns try_push's result. The temporary is
// constructed whether or not the push succeeds.
394  template<typename... Arguments>
395  bool try_emplace( Arguments&&... args ) {
396  return try_push( T(std::forward<Arguments>( args )...) );
397  }
398 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
399 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
400 
402 
// Attempt to dequeue an item from head of queue into `destination`.
// Returns the result of internal_pop_if_present — presumably true when an
// item was written to `destination`; confirm against the base class.
404  bool try_pop( T& destination ) {
405  return internal_pop_if_present( &destination );
406  }
407 
409 
// Return number of pushes minus number of pops. size_type is a signed
// std::ptrdiff_t here — presumably so the value can go negative; confirm.
412  size_type size() const {return internal_size();}
413 
// Equivalent to size()<=0 (forwards to internal_empty()).
415  bool empty() const {return internal_empty();}
416 
// Maximum number of allowed elements: returns the stored my_capacity.
418  size_type capacity() const {
419  return my_capacity;
420  }
421 
423 
// Set the capacity: forwards new_capacity and the element size sizeof(T) to
// internal_set_capacity.
425  void set_capacity( size_type new_capacity ) {
426  internal_set_capacity( new_capacity, sizeof(T) );
427  }
428 
// Return allocator object: a copy of my_allocator converted to allocator_type.
430  allocator_type get_allocator() const { return this->my_allocator; }
431 
433  void clear() ;
434 
435  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
436  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
437 
438  //------------------------------------------------------------------------
439  // The iterators are intended only for debugging. They are slow and not thread safe.
440  //------------------------------------------------------------------------
// Debug-only iterator constructed from this queue; slow and not thread safe.
441  iterator unsafe_begin() {return iterator(*this);}
// Const overload: debug-only const_iterator constructed from this queue; slow and not thread safe.
443  const_iterator unsafe_begin() const {return const_iterator(*this);}
445 
446 };
447 
448 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
449 // guide for concurrent_bounded_queue(InputIterator, InputIterator, ...)
// T defaults to the iterator's value_type and A to cache_aligned_allocator<T>,
// so both template arguments can be deduced from the two iterators alone;
// passing an allocator as third argument deduces A from it.
450 template<typename InputIterator,
451  typename T = typename std::iterator_traits<InputIterator>::value_type,
452  typename A = cache_aligned_allocator<T>
453 > concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
454 -> concurrent_bounded_queue<T, A>;
455 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
456 
457 template<typename T, class A>
459  clear();
460  internal_finish_clear();
461 }
462 
463 template<typename T, class A>
465  T value;
466  while( try_pop(value) ) /*noop*/;
467 }
468 
470 
471 } // namespace tbb
472 
473 #endif /* __TBB_concurrent_queue_H */
internal::concurrent_queue_iterator< concurrent_bounded_queue, T > iterator
concurrent_bounded_queue(const allocator_type &a=allocator_type())
Construct empty queue.
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex) __TBB_override
const_iterator unsafe_begin() const
#define __TBB_override
Definition: tbb_stddef.h:240
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
internal::concurrent_queue_iterator< concurrent_queue, const T > const_iterator
virtual void deallocate_block(void *b, size_t n) __TBB_override
Deallocates block created by allocate_block.
bool internal_empty() const
Check if the queue is empty; thread safe.
auto last(Container &c) -> decltype(begin(c))
bool empty() const
Equivalent to size()==0.
static void move_construct_item(T *location, const void *src)
void push(const T &source)
Enqueue an item at tail of queue.
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
bool try_push(const T &source)
Enqueue an item at tail of queue if queue is not already full.
ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const
Get size of queue.
void emplace(Arguments &&... args)
bool try_emplace(Arguments &&... args)
void push(const T &source)
Enqueue an item at tail of queue.
ptrdiff_t difference_type
Difference type for iterator.
tbb::internal::allocator_rebind< A, char >::type page_allocator_type
virtual void assign_and_destroy_item(void *dst, page &src, size_t index) __TBB_override
virtual void * allocate_block(size_t n) __TBB_override
Allocates a block of size n (bytes)
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_base_v3 &src)
Copy internal representation.
T & get_ref(page &p, size_t index)
static void copy_construct_item(T *location, const void *src)
T value_type
Element type in the queue.
tbb::internal::allocator_rebind< A, char >::type page_allocator_type
Allocator type.
size_type capacity() const
Maximum number of allowed elements.
The graph class.
bool __TBB_EXPORTED_METHOD internal_push_if_not_full(const void *src)
Attempt to enqueue item onto queue using copy operation.
~concurrent_bounded_queue()
Destroy queue.
concurrent_bounded_queue(concurrent_bounded_queue &&src, const allocator_type &a)
concurrent_queue_base_v3::copy_specifics copy_specifics
virtual void copy_page_item(page &dst, size_t dindex, const page &src, size_t sindex) __TBB_override
void clear()
Clear the queue. Not thread-safe.
const_iterator unsafe_begin() const
friend class internal::concurrent_queue_iterator
concurrent_queue(const allocator_type &a=allocator_type())
Construct empty queue.
void __TBB_EXPORTED_METHOD internal_push(const void *src)
Enqueue item at tail of queue using copy operation.
size_t size_type
Integral type for representing size of the queue.
std::ptrdiff_t size_type
Integral type for representing size of the queue.
allocator_type get_allocator() const
Return allocator object.
concurrent_queue(const concurrent_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
A high-performance thread-safe blocking concurrent bounded queue.
concurrent_bounded_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
const_iterator unsafe_end() const
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
void set_capacity(size_type new_capacity)
Set the capacity.
bool try_pop(T &result)
Attempt to dequeue an item from head of queue.
virtual void move_item(page &dst, size_t index, const void *src) __TBB_override
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
virtual void copy_item(page &dst, size_t index, const void *src) __TBB_override
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
virtual page * allocate_page() __TBB_override
custom allocator
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
const T & const_reference
Const reference type.
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:305
size_type size() const
Return number of pushes minus number of pops.
size_type unsafe_size() const
Return the number of items in the queue; thread unsafe.
allocator_traits< Alloc >::template rebind_alloc< T >::other type
bool try_push(T &&source)
Move an item at tail of queue if queue is not already full.
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
void __TBB_EXPORTED_METHOD internal_pop(void *dst)
Dequeue item from head of queue.
concurrent_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
internal::concurrent_queue_iterator< concurrent_queue, T > iterator
A high-performance thread-safe non-blocking concurrent queue.
bool __TBB_EXPORTED_METHOD internal_pop_if_present(void *dst)
Attempt to dequeue item from queue.
bool empty() const
Equivalent to size()<=0.
allocator_type get_allocator() const
Return allocator object.
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
friend class internal::concurrent_queue_iterator
void clear()
Clear the queue. Not thread-safe.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
void const char const char int ITT_FORMAT __itt_group_sync p
Class used to ensure exception-safety of method "pop".
void pop(T &destination)
Dequeue item from head of queue.
concurrent_queue(concurrent_queue &&src, const allocator_type &a)
void internal_swap(concurrent_queue_base_v3 &src)
swap queues
void emplace(Arguments &&... args)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
internal::concurrent_queue_iterator< concurrent_bounded_queue, const T > const_iterator
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
concurrent_bounded_queue(const concurrent_bounded_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
const_iterator unsafe_end() const
std::ptrdiff_t difference_type
Difference type for iterator.
concurrent_queue_base_v3::padded_page< T > padded_page
bool __TBB_EXPORTED_METHOD internal_empty() const
Check if the queue is empty.
ptrdiff_t my_capacity
Capacity of the queue.
void __TBB_EXPORTED_METHOD internal_abort()
Abort all pending queue operations.
T value_type
Element type in the queue.
bool try_pop(T &destination)
Attempt to dequeue an item from head of queue.
void __TBB_EXPORTED_METHOD internal_set_capacity(ptrdiff_t capacity, size_t element_size)
Set the queue capacity.
const T & const_reference
Const reference type.
virtual void deallocate_page(page *p) __TBB_override
custom de-allocator
page_allocator_type my_allocator
Allocator type.

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.