00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB__aggregator_H
00022 #define __TBB__aggregator_H
00023
00024 #if !TBB_PREVIEW_AGGREGATOR
00025 #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
00026 #endif
00027
00028 #include "atomic.h"
00029 #include "tbb_profiling.h"
00030
00031 namespace tbb {
00032 namespace interface6 {
00033
00034 using namespace tbb::internal;
00035
00036 class aggregator_operation {
00037 template<typename handler_type> friend class aggregator_ext;
00038 uintptr_t status;
00039 aggregator_operation* my_next;
00040 public:
00041 enum aggregator_operation_status { agg_waiting=0, agg_finished };
00042 aggregator_operation() : status(agg_waiting), my_next(NULL) {}
00044 void start() { call_itt_notify(acquired, &status); }
00046
00047 void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
00048 aggregator_operation* next() { return itt_hide_load_word(my_next);}
00049 void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); }
00050 };
00051
00052 namespace internal {
00053
00054 class basic_operation_base : public aggregator_operation {
00055 friend class basic_handler;
00056 virtual void apply_body() = 0;
00057 public:
00058 basic_operation_base() : aggregator_operation() {}
00059 virtual ~basic_operation_base() {}
00060 };
00061
00062 template<typename Body>
00063 class basic_operation : public basic_operation_base, no_assign {
00064 const Body& my_body;
00065 void apply_body() { my_body(); }
00066 public:
00067 basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
00068 };
00069
00070 class basic_handler {
00071 public:
00072 basic_handler() {}
00073 void operator()(aggregator_operation* op_list) const {
00074 while (op_list) {
00075
00076
00077
00078
00079
00080
00081 basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
00082
00083 op_list = op_list->next();
00084 request.start();
00085 request.apply_body();
00086 request.finish();
00087 }
00088 }
00089 };
00090
00091 }
00092
00094
00096 template <typename handler_type>
00097 class aggregator_ext : tbb::internal::no_copy {
00098 public:
00099 aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
00100
00102
00103 void process(aggregator_operation *op) { execute_impl(*op); }
00104
00105 protected:
00108 void execute_impl(aggregator_operation& op) {
00109 aggregator_operation* res;
00110
00111
00112
00113
00114
00115
00116 call_itt_notify(releasing, &(op.status));
00117
00118 do {
00119
00120
00121 op.my_next = res = mailbox;
00122 } while (mailbox.compare_and_swap(&op, res) != res);
00123 if (!res) {
00124
00125
00126 call_itt_notify(acquired, &mailbox);
00127 start_handle_operations();
00128 __TBB_ASSERT(op.status, NULL);
00129 }
00130 else {
00131 call_itt_notify(prepare, &(op.status));
00132 spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting));
00133 itt_load_word_with_acquire(op.status);
00134 }
00135 }
00136
00137
00138 private:
00140 atomic<aggregator_operation *> mailbox;
00141
00143
00144 uintptr_t handler_busy;
00145
00146 handler_type handle_operations;
00147
00149 void start_handle_operations() {
00150 aggregator_operation *pending_operations;
00151
00152
00153
00154
00155
00156
00157
00158 call_itt_notify(prepare, &handler_busy);
00159
00160 spin_wait_until_eq(handler_busy, uintptr_t(0));
00161 call_itt_notify(acquired, &handler_busy);
00162
00163 __TBB_store_with_release(handler_busy, uintptr_t(1));
00164
00165
00166
00167
00168 call_itt_notify(releasing, &mailbox);
00169
00170 pending_operations = mailbox.fetch_and_store(NULL);
00171
00172
00173 handle_operations(pending_operations);
00174
00175
00176 itt_store_word_with_release(handler_busy, uintptr_t(0));
00177 }
00178 };
00179
00181 class aggregator : private aggregator_ext<internal::basic_handler> {
00182 public:
00183 aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
00185
00187 template<typename Body>
00188 void execute(const Body& b) {
00189 internal::basic_operation<Body> op(b);
00190 this->execute_impl(op);
00191 }
00192 };
00193
00194 }
00195
00196 using interface6::aggregator;
00197 using interface6::aggregator_ext;
00198 using interface6::aggregator_operation;
00199
00200 }
00201
00202 #endif // __TBB__aggregator_H