1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #ifndef __TBB__aggregator_H |
18 | #define __TBB__aggregator_H |
19 | |
20 | #if !TBB_PREVIEW_AGGREGATOR |
21 | #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h |
22 | #endif |
23 | |
24 | #include "atomic.h" |
25 | #include "tbb_profiling.h" |
26 | |
27 | namespace tbb { |
28 | namespace interface6 { |
29 | |
30 | using namespace tbb::internal; |
31 | |
32 | class aggregator_operation { |
33 | template<typename handler_type> friend class aggregator_ext; |
34 | uintptr_t status; |
35 | aggregator_operation* my_next; |
36 | public: |
37 | enum aggregator_operation_status { agg_waiting=0, agg_finished }; |
38 | aggregator_operation() : status(agg_waiting), my_next(NULL) {} |
39 | /// Call start before handling this operation |
40 | void start() { call_itt_notify(acquired, &status); } |
41 | /// Call finish when done handling this operation |
42 | /** The operation will be released to its originating thread, and possibly deleted. */ |
43 | void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); } |
44 | aggregator_operation* next() { return itt_hide_load_word(my_next);} |
45 | void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); } |
46 | }; |
47 | |
48 | namespace internal { |
49 | |
50 | class basic_operation_base : public aggregator_operation { |
51 | friend class basic_handler; |
52 | virtual void apply_body() = 0; |
53 | public: |
54 | basic_operation_base() : aggregator_operation() {} |
55 | virtual ~basic_operation_base() {} |
56 | }; |
57 | |
58 | template<typename Body> |
59 | class basic_operation : public basic_operation_base, no_assign { |
60 | const Body& my_body; |
61 | void apply_body() __TBB_override { my_body(); } |
62 | public: |
63 | basic_operation(const Body& b) : basic_operation_base(), my_body(b) {} |
64 | }; |
65 | |
66 | class basic_handler { |
67 | public: |
68 | basic_handler() {} |
69 | void operator()(aggregator_operation* op_list) const { |
70 | while (op_list) { |
71 | // ITT note: &(op_list->status) tag is used to cover accesses to the operation data. |
72 | // The executing thread "acquires" the tag (see start()) and then performs |
73 | // the associated operation w/o triggering a race condition diagnostics. |
74 | // A thread that created the operation is waiting for its status (see execute_impl()), |
75 | // so when this thread is done with the operation, it will "release" the tag |
76 | // and update the status (see finish()) to give control back to the waiting thread. |
77 | basic_operation_base& request = static_cast<basic_operation_base&>(*op_list); |
78 | // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish() |
79 | op_list = op_list->next(); |
80 | request.start(); |
81 | request.apply_body(); |
82 | request.finish(); |
83 | } |
84 | } |
85 | }; |
86 | |
87 | } // namespace internal |
88 | |
89 | //! Aggregator base class and expert interface |
90 | /** An aggregator for collecting operations coming from multiple sources and executing |
91 | them serially on a single thread. */ |
92 | template <typename handler_type> |
93 | class aggregator_ext : tbb::internal::no_copy { |
94 | public: |
95 | aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; } |
96 | |
97 | //! EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox. |
98 | /** Details of user-made operations must be handled by user-provided handler */ |
99 | void process(aggregator_operation *op) { execute_impl(*op); } |
100 | |
101 | protected: |
102 | /** Place operation in mailbox, then either handle mailbox or wait for the operation |
103 | to be completed by a different thread. */ |
104 | void execute_impl(aggregator_operation& op) { |
105 | aggregator_operation* res; |
106 | |
107 | // ITT note: &(op.status) tag is used to cover accesses to this operation. This |
108 | // thread has created the operation, and now releases it so that the handler |
109 | // thread may handle the associated operation w/o triggering a race condition; |
110 | // thus this tag will be acquired just before the operation is handled in the |
111 | // handle_operations functor. |
112 | call_itt_notify(releasing, &(op.status)); |
113 | // insert the operation into the list |
114 | do { |
115 | // ITT may flag the following line as a race; it is a false positive: |
116 | // This is an atomic read; we don't provide itt_hide_load_word for atomics |
117 | op.my_next = res = mailbox; // NOT A RACE |
118 | } while (mailbox.compare_and_swap(&op, res) != res); |
119 | if (!res) { // first in the list; handle the operations |
120 | // ITT note: &mailbox tag covers access to the handler_busy flag, which this |
121 | // waiting handler thread will try to set before entering handle_operations. |
122 | call_itt_notify(acquired, &mailbox); |
123 | start_handle_operations(); |
124 | __TBB_ASSERT(op.status, NULL); |
125 | } |
126 | else { // not first; wait for op to be ready |
127 | call_itt_notify(prepare, &(op.status)); |
128 | spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting)); |
129 | itt_load_word_with_acquire(op.status); |
130 | } |
131 | } |
132 | |
133 | |
134 | private: |
135 | //! An atomically updated list (aka mailbox) of aggregator_operations |
136 | atomic<aggregator_operation *> mailbox; |
137 | |
138 | //! Controls thread access to handle_operations |
139 | /** Behaves as boolean flag where 0=false, 1=true */ |
140 | uintptr_t handler_busy; |
141 | |
142 | handler_type handle_operations; |
143 | |
144 | //! Trigger the handling of operations when the handler is free |
145 | void start_handle_operations() { |
146 | aggregator_operation *pending_operations; |
147 | |
148 | // ITT note: &handler_busy tag covers access to mailbox as it is passed |
149 | // between active and waiting handlers. Below, the waiting handler waits until |
150 | // the active handler releases, and the waiting handler acquires &handler_busy as |
151 | // it becomes the active_handler. The release point is at the end of this |
152 | // function, when all operations in mailbox have been handled by the |
153 | // owner of this aggregator. |
154 | call_itt_notify(prepare, &handler_busy); |
155 | // get handler_busy: only one thread can possibly spin here at a time |
156 | spin_wait_until_eq(handler_busy, uintptr_t(0)); |
157 | call_itt_notify(acquired, &handler_busy); |
158 | // acquire fence not necessary here due to causality rule and surrounding atomics |
159 | __TBB_store_with_release(handler_busy, uintptr_t(1)); |
160 | |
161 | // ITT note: &mailbox tag covers access to the handler_busy flag itself. |
162 | // Capturing the state of the mailbox signifies that handler_busy has been |
163 | // set and a new active handler will now process that list's operations. |
164 | call_itt_notify(releasing, &mailbox); |
165 | // grab pending_operations |
166 | pending_operations = mailbox.fetch_and_store(NULL); |
167 | |
168 | // handle all the operations |
169 | handle_operations(pending_operations); |
170 | |
171 | // release the handler |
172 | itt_store_word_with_release(handler_busy, uintptr_t(0)); |
173 | } |
174 | }; |
175 | |
176 | //! Basic aggregator interface |
177 | class aggregator : private aggregator_ext<internal::basic_handler> { |
178 | public: |
179 | aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {} |
180 | //! BASIC INTERFACE: Enter a function for exclusive execution by the aggregator. |
181 | /** The calling thread stores the function object in a basic_operation and |
182 | places the operation in the aggregator's mailbox */ |
183 | template<typename Body> |
184 | void execute(const Body& b) { |
185 | internal::basic_operation<Body> op(b); |
186 | this->execute_impl(op); |
187 | } |
188 | }; |
189 | |
190 | } // namespace interface6 |
191 | |
192 | using interface6::aggregator; |
193 | using interface6::aggregator_ext; |
194 | using interface6::aggregator_operation; |
195 | |
196 | } // namespace tbb |
197 | |
198 | #endif // __TBB__aggregator_H |
199 | |