1 | #pragma once |
2 | |
3 | #include <list> |
4 | #include <queue> |
5 | #include <atomic> |
6 | #include <thread> |
7 | #include <mutex> |
8 | |
9 | #include <common/logger_useful.h> |
10 | |
11 | #include <DataStreams/IBlockStream_fwd.h> |
12 | #include <Common/setThreadName.h> |
13 | #include <Common/CurrentMetrics.h> |
14 | #include <Common/CurrentThread.h> |
15 | #include <Common/ThreadPool.h> |
16 | |
17 | |
/** Allows to process multiple block input streams (sources) in parallel, using the specified number of threads.
  * Reads (pulls) blocks from any available source and passes them to the specified handler.
  *
  * Before any reading, the "readPrefix" method of the sources is called in parallel.
  *
  * (As an example, "readPrefix" can prepare connections to remote servers,
  * and we want this work to be executed in parallel for different sources.)
  *
  * Implemented in the following way:
  * - there are multiple input sources to read blocks from;
  * - there are multiple threads that can simultaneously read blocks from different sources;
  * - "available" sources (those not being read by any thread right now) are kept in a queue of sources;
  * - when a thread takes a source to read from, it removes the source from the queue of sources,
  *   then reads a block from it and puts the source back into the queue of available sources.
  */
33 | |
34 | namespace CurrentMetrics |
35 | { |
36 | extern const Metric QueryThread; |
37 | } |
38 | |
39 | namespace DB |
40 | { |
41 | |
42 | /// Example of the handler. |
43 | struct ParallelInputsHandler |
44 | { |
45 | /// Processing the data block. |
46 | void onBlock(Block & /*block*/, size_t /*thread_num*/) {} |
47 | |
    /// Called for each thread when the thread has nothing else to do,
    /// because some of the sources have been exhausted and there are now fewer sources left than threads.
    /// Called unless the `onException` method throws an exception; always called before the `onFinish` method.
51 | void onFinishThread(size_t /*thread_num*/) {} |
52 | |
    /// The blocks are over, either because all sources were exhausted or because the work was cancelled.
    /// This method is always called exactly once, at the end of the work, unless the `onException` method throws an exception.
55 | void onFinish() {} |
56 | |
    /// Exception handling. It is reasonable to call ParallelInputsProcessor::cancel from here, and also to pass the exception on to the main thread.
58 | void onException(std::exception_ptr & /*exception*/, size_t /*thread_num*/) {} |
59 | }; |
60 | |
61 | |
62 | template <typename Handler> |
63 | class ParallelInputsProcessor |
64 | { |
65 | public: |
    /** additional_input_at_end - if not nullptr,
      * then blocks from this source will start to be processed only after all other sources are exhausted.
      * This is done by the last worker thread to finish (see the usage sketch after the constructor).
      *
      * Intended for implementing FULL and RIGHT JOIN:
      * the JOIN must first be performed in parallel, while noting which keys were not found,
      * and only after that work is complete, blocks of the keys that were not found are created.
      */
74 | ParallelInputsProcessor(const BlockInputStreams & inputs_, const BlockInputStreamPtr & additional_input_at_end_, size_t max_threads_, Handler & handler_) |
75 | : inputs(inputs_), additional_input_at_end(additional_input_at_end_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_) |
76 | { |
77 | for (size_t i = 0; i < inputs_.size(); ++i) |
78 | unprepared_inputs.emplace(inputs_[i], i); |
79 | } |
80 | |
81 | ~ParallelInputsProcessor() |
82 | { |
83 | try |
84 | { |
85 | wait(); |
86 | } |
87 | catch (...) |
88 | { |
89 | tryLogCurrentException(__PRETTY_FUNCTION__); |
90 | } |
91 | } |
92 | |
93 | /// Start background threads, start work. |
94 | void process() |
95 | { |
96 | active_threads = max_threads; |
97 | threads.reserve(max_threads); |
98 | |
99 | try |
100 | { |
101 | for (size_t i = 0; i < max_threads; ++i) |
102 | threads.emplace_back(&ParallelInputsProcessor::thread, this, CurrentThread::getGroup(), i); |
103 | } |
104 | catch (...) |
105 | { |
106 | cancel(false); |
107 | wait(); |
108 | if (active_threads) |
109 | { |
110 | active_threads = 0; |
111 | /// handler.onFinish() is supposed to be called from one of the threads when the number of |
112 | /// finished threads reaches max_threads. But since we weren't able to launch all threads, |
113 | /// we have to call onFinish() manually here. |
114 | handler.onFinish(); |
115 | } |
116 | throw; |
117 | } |
118 | } |
119 | |
    /// Ask all sources to stop sooner than they are exhausted.
121 | void cancel(bool kill) |
122 | { |
123 | finish = true; |
124 | |
125 | for (auto & input : inputs) |
126 | { |
127 | try |
128 | { |
129 | input->cancel(kill); |
130 | } |
131 | catch (...) |
132 | { |
                /** If we cannot ask one or more sources to stop
                  * (for example, the connection is broken during distributed query processing),
                  * then just ignore it.
                  */
137 | LOG_ERROR(log, "Exception while cancelling " << input->getName()); |
138 | } |
139 | } |
140 | } |
141 | |
142 | /// Wait until all threads are finished, before the destructor. |
143 | void wait() |
144 | { |
145 | if (joined_threads) |
146 | return; |
147 | |
148 | for (auto & thread : threads) |
149 | thread.join(); |
150 | |
151 | threads.clear(); |
152 | joined_threads = true; |
153 | } |
154 | |
155 | size_t getNumActiveThreads() const |
156 | { |
157 | return active_threads; |
158 | } |
159 | |
160 | private: |
161 | /// Single source data |
162 | struct InputData |
163 | { |
164 | BlockInputStreamPtr in; |
165 | size_t i = 0; /// The source number (for debugging). |
166 | |
167 | InputData() {} |
168 | InputData(const BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {} |
169 | }; |
170 | |
171 | void publishPayload(Block & block, size_t thread_num) |
172 | { |
173 | handler.onBlock(block, thread_num); |
174 | } |
175 | |
176 | void thread(ThreadGroupStatusPtr thread_group, size_t thread_num) |
177 | { |
178 | std::exception_ptr exception; |
179 | CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread}; |
180 | |
181 | try |
182 | { |
            setThreadName("ParalInputsProc");
184 | if (thread_group) |
185 | CurrentThread::attachTo(thread_group); |
186 | |
187 | while (!finish) |
188 | { |
189 | InputData unprepared_input; |
190 | { |
191 | std::lock_guard lock(unprepared_inputs_mutex); |
192 | |
193 | if (unprepared_inputs.empty()) |
194 | break; |
195 | |
196 | unprepared_input = unprepared_inputs.front(); |
197 | unprepared_inputs.pop(); |
198 | } |
199 | |
200 | unprepared_input.in->readPrefix(); |
201 | |
202 | { |
203 | std::lock_guard lock(available_inputs_mutex); |
204 | available_inputs.push(unprepared_input); |
205 | } |
206 | } |
207 | |
208 | loop(thread_num); |
209 | } |
210 | catch (...) |
211 | { |
212 | exception = std::current_exception(); |
213 | } |
214 | |
215 | if (exception) |
216 | { |
217 | handler.onException(exception, thread_num); |
218 | } |
219 | |
220 | handler.onFinishThread(thread_num); |
221 | |
        /// The last thread to exit reports that there is no more data.
223 | if (0 == --active_threads) |
224 | { |
225 | /// And then it processes an additional source, if there is one. |
226 | if (additional_input_at_end) |
227 | { |
228 | try |
229 | { |
230 | additional_input_at_end->readPrefix(); |
231 | while (Block block = additional_input_at_end->read()) |
232 | publishPayload(block, thread_num); |
233 | } |
234 | catch (...) |
235 | { |
236 | exception = std::current_exception(); |
237 | } |
238 | |
239 | if (exception) |
240 | { |
241 | handler.onException(exception, thread_num); |
242 | } |
243 | } |
244 | |
            handler.onFinish(); /// TODO If `onFinish` or `onFinishThread` throws an exception, std::terminate is called.
246 | } |
247 | } |
248 | |
249 | void loop(size_t thread_num) |
250 | { |
        while (!finish) /// We may need to stop working before all sources are exhausted.
252 | { |
253 | InputData input; |
254 | |
255 | /// Select the next source. |
256 | { |
257 | std::lock_guard lock(available_inputs_mutex); |
258 | |
259 | /// If there are no free sources, then this thread is no longer needed. (But other threads can work with their sources.) |
260 | if (available_inputs.empty()) |
261 | break; |
262 | |
263 | input = available_inputs.front(); |
264 | |
265 | /// We remove the source from the queue of available sources. |
266 | available_inputs.pop(); |
267 | } |
268 | |
269 | /// The main work. |
270 | Block block = input.in->read(); |
271 | |
272 | { |
273 | if (finish) |
274 | break; |
275 | |
                /// If this source is not exhausted yet, put it back into the queue of available sources.
277 | { |
278 | std::lock_guard lock(available_inputs_mutex); |
279 | |
280 | if (block) |
281 | { |
282 | available_inputs.push(input); |
283 | } |
284 | else |
285 | { |
286 | if (available_inputs.empty()) |
287 | break; |
288 | } |
289 | } |
290 | |
291 | if (finish) |
292 | break; |
293 | |
294 | if (block) |
295 | publishPayload(block, thread_num); |
296 | } |
297 | } |
298 | } |
299 | |
300 | BlockInputStreams inputs; |
301 | BlockInputStreamPtr additional_input_at_end; |
    size_t max_threads;
303 | |
304 | Handler & handler; |
305 | |
306 | /// Threads. |
307 | using ThreadsData = std::vector<ThreadFromGlobalPool>; |
308 | ThreadsData threads; |
309 | |
    /** A set of available sources that are not currently processed by any thread.
      * Each thread takes one source from this set, reads a block from it (at this moment the source does its calculations)
      * and (if the source is not exhausted) puts it back into the set of available sources.
      *
      * The question arises what is better to use:
      * - a queue (a just-processed source will be processed later than the rest);
      * - a stack (a just-processed source will be processed again as soon as possible).
      *
      * The stack is better than the queue when we need to keep reading from one source more sequentially,
      * and, theoretically, this allows to achieve more sequential reads from disk.
      *
      * But when using the stack, there is a problem with distributed query processing:
      * data is read only from a part of the servers, and on the other servers
      * a timeout occurs during sending, and query processing ends with an exception.
      *
      * Therefore, a queue is used. This can be improved in the future (a sketch of the stack variant is given below).
      */
327 | using AvailableInputs = std::queue<InputData>; |
328 | AvailableInputs available_inputs; |
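
    /** A hedged sketch of the stack alternative discussed above (not what this class does):
      * switching would mean replacing the container and the access pattern, roughly
      *
      *     using AvailableInputs = std::stack<InputData>;    /// requires <stack>
      *     ...
      *     input = available_inputs.top();                   /// instead of front()
      *     available_inputs.pop();
      */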
329 | |
330 | /** For parallel preparing (readPrefix) child streams. |
331 | * First, streams are located here. |
332 | * After a stream was prepared, it is moved to "available_inputs" for reading. |
333 | */ |
334 | using UnpreparedInputs = std::queue<InputData>; |
335 | UnpreparedInputs unprepared_inputs; |
336 | |
337 | /// For operations with available_inputs. |
338 | std::mutex available_inputs_mutex; |
339 | |
340 | /// For operations with unprepared_inputs. |
341 | std::mutex unprepared_inputs_mutex; |
342 | |
    /// The number of threads that have not finished their work yet.
344 | std::atomic<size_t> active_threads { 0 }; |
    /// A flag to stop the threads' work early (before the sources are exhausted).
346 | std::atomic<bool> finish { false }; |
    /// Whether all threads have already been joined.
348 | std::atomic<bool> joined_threads { false }; |
349 | |
    Logger * log = &Logger::get("ParallelInputsProcessor");
351 | }; |
352 | |
353 | |
354 | } |
355 | |