1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB__aggregator_H
18#define __TBB__aggregator_H
19
20#if !TBB_PREVIEW_AGGREGATOR
21#error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
22#endif
23
24#include "atomic.h"
25#include "tbb_profiling.h"
26
27namespace tbb {
28namespace interface6 {
29
30using namespace tbb::internal;
31
32class aggregator_operation {
33 template<typename handler_type> friend class aggregator_ext;
34 uintptr_t status;
35 aggregator_operation* my_next;
36public:
37 enum aggregator_operation_status { agg_waiting=0, agg_finished };
38 aggregator_operation() : status(agg_waiting), my_next(NULL) {}
39 /// Call start before handling this operation
40 void start() { call_itt_notify(acquired, &status); }
41 /// Call finish when done handling this operation
42 /** The operation will be released to its originating thread, and possibly deleted. */
43 void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
44 aggregator_operation* next() { return itt_hide_load_word(my_next);}
45 void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); }
46};
47
48namespace internal {
49
50class basic_operation_base : public aggregator_operation {
51 friend class basic_handler;
52 virtual void apply_body() = 0;
53public:
54 basic_operation_base() : aggregator_operation() {}
55 virtual ~basic_operation_base() {}
56};
57
58template<typename Body>
59class basic_operation : public basic_operation_base, no_assign {
60 const Body& my_body;
61 void apply_body() __TBB_override { my_body(); }
62public:
63 basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
64};
65
66class basic_handler {
67public:
68 basic_handler() {}
69 void operator()(aggregator_operation* op_list) const {
70 while (op_list) {
71 // ITT note: &(op_list->status) tag is used to cover accesses to the operation data.
72 // The executing thread "acquires" the tag (see start()) and then performs
73 // the associated operation w/o triggering a race condition diagnostics.
74 // A thread that created the operation is waiting for its status (see execute_impl()),
75 // so when this thread is done with the operation, it will "release" the tag
76 // and update the status (see finish()) to give control back to the waiting thread.
77 basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
78 // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
79 op_list = op_list->next();
80 request.start();
81 request.apply_body();
82 request.finish();
83 }
84 }
85};
86
87} // namespace internal
88
89//! Aggregator base class and expert interface
90/** An aggregator for collecting operations coming from multiple sources and executing
91 them serially on a single thread. */
92template <typename handler_type>
93class aggregator_ext : tbb::internal::no_copy {
94public:
95 aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
96
97 //! EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
98 /** Details of user-made operations must be handled by user-provided handler */
99 void process(aggregator_operation *op) { execute_impl(*op); }
100
101protected:
102 /** Place operation in mailbox, then either handle mailbox or wait for the operation
103 to be completed by a different thread. */
104 void execute_impl(aggregator_operation& op) {
105 aggregator_operation* res;
106
107 // ITT note: &(op.status) tag is used to cover accesses to this operation. This
108 // thread has created the operation, and now releases it so that the handler
109 // thread may handle the associated operation w/o triggering a race condition;
110 // thus this tag will be acquired just before the operation is handled in the
111 // handle_operations functor.
112 call_itt_notify(releasing, &(op.status));
113 // insert the operation into the list
114 do {
115 // ITT may flag the following line as a race; it is a false positive:
116 // This is an atomic read; we don't provide itt_hide_load_word for atomics
117 op.my_next = res = mailbox; // NOT A RACE
118 } while (mailbox.compare_and_swap(&op, res) != res);
119 if (!res) { // first in the list; handle the operations
120 // ITT note: &mailbox tag covers access to the handler_busy flag, which this
121 // waiting handler thread will try to set before entering handle_operations.
122 call_itt_notify(acquired, &mailbox);
123 start_handle_operations();
124 __TBB_ASSERT(op.status, NULL);
125 }
126 else { // not first; wait for op to be ready
127 call_itt_notify(prepare, &(op.status));
128 spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting));
129 itt_load_word_with_acquire(op.status);
130 }
131 }
132
133
134private:
135 //! An atomically updated list (aka mailbox) of aggregator_operations
136 atomic<aggregator_operation *> mailbox;
137
138 //! Controls thread access to handle_operations
139 /** Behaves as boolean flag where 0=false, 1=true */
140 uintptr_t handler_busy;
141
142 handler_type handle_operations;
143
144 //! Trigger the handling of operations when the handler is free
145 void start_handle_operations() {
146 aggregator_operation *pending_operations;
147
148 // ITT note: &handler_busy tag covers access to mailbox as it is passed
149 // between active and waiting handlers. Below, the waiting handler waits until
150 // the active handler releases, and the waiting handler acquires &handler_busy as
151 // it becomes the active_handler. The release point is at the end of this
152 // function, when all operations in mailbox have been handled by the
153 // owner of this aggregator.
154 call_itt_notify(prepare, &handler_busy);
155 // get handler_busy: only one thread can possibly spin here at a time
156 spin_wait_until_eq(handler_busy, uintptr_t(0));
157 call_itt_notify(acquired, &handler_busy);
158 // acquire fence not necessary here due to causality rule and surrounding atomics
159 __TBB_store_with_release(handler_busy, uintptr_t(1));
160
161 // ITT note: &mailbox tag covers access to the handler_busy flag itself.
162 // Capturing the state of the mailbox signifies that handler_busy has been
163 // set and a new active handler will now process that list's operations.
164 call_itt_notify(releasing, &mailbox);
165 // grab pending_operations
166 pending_operations = mailbox.fetch_and_store(NULL);
167
168 // handle all the operations
169 handle_operations(pending_operations);
170
171 // release the handler
172 itt_store_word_with_release(handler_busy, uintptr_t(0));
173 }
174};
175
176//! Basic aggregator interface
177class aggregator : private aggregator_ext<internal::basic_handler> {
178public:
179 aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
180 //! BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
181 /** The calling thread stores the function object in a basic_operation and
182 places the operation in the aggregator's mailbox */
183 template<typename Body>
184 void execute(const Body& b) {
185 internal::basic_operation<Body> op(b);
186 this->execute_impl(op);
187 }
188};
189
190} // namespace interface6
191
192using interface6::aggregator;
193using interface6::aggregator_ext;
194using interface6::aggregator_operation;
195
196} // namespace tbb
197
198#endif // __TBB__aggregator_H
199