#pragma once

#include <list>
#include <queue>
#include <atomic>
#include <thread>
#include <mutex>

#include <common/logger_useful.h>

#include <DataStreams/IBlockStream_fwd.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>


/** Allows to process multiple block input streams (sources) in parallel, using a specified number of threads.
  * Reads (pulls) blocks from any available source and passes them to the specified handler.
  *
  * Before any reading, the "readPrefix" method of the sources is called in parallel.
  *
  * (As an example, "readPrefix" can prepare connections to remote servers,
  * and we want this work to be done in parallel for different sources.)
  *
  * Implemented in the following way:
  * - there are multiple input sources to read blocks from;
  * - there are multiple threads that can simultaneously read blocks from different sources;
  * - "available" sources (those not being read by any thread right now) are kept in a queue of sources;
  * - when a thread takes a source to read from, it removes the source from the queue of sources,
  *   reads a block from it, and then puts the source back into the queue of available sources.
  */
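
/** A minimal usage sketch (illustrative only, not compiled as part of this header;
  * `MyHandler` and `getInputStreams()` are hypothetical names, not provided here):
  *
  *     BlockInputStreams inputs = getInputStreams();    /// sources to read in parallel
  *     MyHandler handler;                               /// see ParallelInputsHandler below for the expected interface
  *     ParallelInputsProcessor<MyHandler> processor(inputs, nullptr, 4, handler);
  *
  *     processor.process();    /// starts up to 4 background threads
  *     processor.wait();       /// join the threads before destroying the processor
  */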

namespace CurrentMetrics
{
    extern const Metric QueryThread;
}

namespace DB
{

/// Example of the handler.
struct ParallelInputsHandler
{
    /// Processing of a data block.
    void onBlock(Block & /*block*/, size_t /*thread_num*/) {}

    /// Called for each thread when it has nothing left to do:
    /// some of the sources have run out, so there are now fewer remaining sources than threads.
    /// Called if the `onException` method does not throw an exception; called before the `onFinish` method.
    void onFinishThread(size_t /*thread_num*/) {}

    /// No more blocks: either all sources have run out or the work was cancelled.
    /// This method is always called exactly once, at the end of the work, if the `onException` method does not throw an exception.
    void onFinish() {}

    /// Exception handling. It is reasonable to call ParallelInputsProcessor::cancel from this method and to pass the exception on to the main thread.
    void onException(std::exception_ptr & /*exception*/, size_t /*thread_num*/) {}
};
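
/** A sketch of a concrete handler (illustrative only, not compiled as part of this header):
  * it simply counts rows and remembers the first exception. A real handler would typically
  * also call ParallelInputsProcessor::cancel from onException.
  *
  *     struct CountingHandler
  *     {
  *         std::atomic<size_t> rows { 0 };
  *         std::mutex mutex;
  *         std::exception_ptr first_exception;
  *
  *         void onBlock(Block & block, size_t) { rows += block.rows(); }
  *         void onFinishThread(size_t) {}
  *         void onFinish() {}
  *
  *         void onException(std::exception_ptr & exception, size_t)
  *         {
  *             std::lock_guard lock(mutex);
  *             if (!first_exception)
  *                 first_exception = exception;
  *         }
  *     };
  */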


template <typename Handler>
class ParallelInputsProcessor
{
public:
    /** additional_input_at_end - if not nullptr,
      * then the blocks from this source start to be processed only after all other sources have been processed.
      * This is done by the last worker thread to finish (see the `thread` method).
      *
      * Intended for the implementation of FULL and RIGHT JOIN:
      * the JOIN itself must first be performed in parallel, while noting which keys were not found,
      * and only after that work is complete, blocks of the keys that were not found are created.
      * (See the construction sketch after the constructor.)
      */
    ParallelInputsProcessor(const BlockInputStreams & inputs_, const BlockInputStreamPtr & additional_input_at_end_, size_t max_threads_, Handler & handler_)
        : inputs(inputs_), additional_input_at_end(additional_input_at_end_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_)
    {
        for (size_t i = 0; i < inputs_.size(); ++i)
            unprepared_inputs.emplace(inputs_[i], i);
    }
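
    /** Construction sketch for the FULL/RIGHT JOIN case mentioned above (illustrative only;
      * `joined_inputs`, `non_joined_stream`, `MyHandler` and `handler` are hypothetical names):
      *
      *     BlockInputStreams joined_inputs = ...;        /// streams that perform the JOIN in parallel
      *     BlockInputStreamPtr non_joined_stream = ...;  /// produces blocks of the keys that were not found
      *     ParallelInputsProcessor<MyHandler> processor(joined_inputs, non_joined_stream, max_threads, handler);
      *     processor.process();    /// non_joined_stream is read only after all joined_inputs are exhausted
      */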

    ~ParallelInputsProcessor()
    {
        try
        {
            wait();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    /// Start background threads, start work.
    void process()
    {
        active_threads = max_threads;
        threads.reserve(max_threads);

        try
        {
            for (size_t i = 0; i < max_threads; ++i)
                threads.emplace_back(&ParallelInputsProcessor::thread, this, CurrentThread::getGroup(), i);
        }
        catch (...)
        {
            cancel(false);
            wait();
            if (active_threads)
            {
                active_threads = 0;
                /// handler.onFinish() is supposed to be called from one of the threads when the number of
                /// finished threads reaches max_threads. But since we weren't able to launch all threads,
                /// we have to call onFinish() manually here.
                handler.onFinish();
            }
            throw;
        }
    }

    /// Ask all sources to stop before they are exhausted.
    void cancel(bool kill)
    {
        finish = true;

        for (auto & input : inputs)
        {
            try
            {
                input->cancel(kill);
            }
            catch (...)
            {
                /** If one or more sources cannot be asked to stop
                  * (for example, the connection is broken during distributed query processing),
                  * then just ignore it.
                  */
                LOG_ERROR(log, "Exception while cancelling " << input->getName());
            }
        }
    }

    /// Wait until all threads are finished, before the destructor.
    void wait()
    {
        if (joined_threads)
            return;

        for (auto & thread : threads)
            thread.join();

        threads.clear();
        joined_threads = true;
    }

    size_t getNumActiveThreads() const
    {
        return active_threads;
    }

private:
    /// Single source data.
    struct InputData
    {
        BlockInputStreamPtr in;
        size_t i = 0;    /// The source number (for debugging).

        InputData() {}
        InputData(const BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
    };

    void publishPayload(Block & block, size_t thread_num)
    {
        handler.onBlock(block, thread_num);
    }

    void thread(ThreadGroupStatusPtr thread_group, size_t thread_num)
    {
        std::exception_ptr exception;
        CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};

        try
        {
            setThreadName("ParalInputsProc");
            if (thread_group)
                CurrentThread::attachTo(thread_group);

            while (!finish)
            {
                InputData unprepared_input;
                {
                    std::lock_guard lock(unprepared_inputs_mutex);

                    if (unprepared_inputs.empty())
                        break;

                    unprepared_input = unprepared_inputs.front();
                    unprepared_inputs.pop();
                }

                unprepared_input.in->readPrefix();

                {
                    std::lock_guard lock(available_inputs_mutex);
                    available_inputs.push(unprepared_input);
                }
            }

            loop(thread_num);
        }
        catch (...)
        {
            exception = std::current_exception();
        }

        if (exception)
        {
            handler.onException(exception, thread_num);
        }

        handler.onFinishThread(thread_num);

        /// The last thread to finish reports that there is no more data.
        if (0 == --active_threads)
        {
            /// It also processes the additional source, if there is one.
            if (additional_input_at_end)
            {
                try
                {
                    additional_input_at_end->readPrefix();
                    while (Block block = additional_input_at_end->read())
                        publishPayload(block, thread_num);
                }
                catch (...)
                {
                    exception = std::current_exception();
                }

                if (exception)
                {
                    handler.onException(exception, thread_num);
                }
            }

            handler.onFinish();    /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called.
        }
    }

    void loop(size_t thread_num)
    {
        while (!finish)    /// The work may need to be stopped before all sources are exhausted.
        {
            InputData input;

            /// Select the next source.
            {
                std::lock_guard lock(available_inputs_mutex);

                /// If there are no free sources, then this thread is no longer needed. (But other threads can work with their sources.)
                if (available_inputs.empty())
                    break;

                input = available_inputs.front();

                /// We remove the source from the queue of available sources.
                available_inputs.pop();
            }

            /// The main work.
            Block block = input.in->read();

            {
                if (finish)
                    break;

                /// If this source has not run out yet, return it to the queue of available sources.
                {
                    std::lock_guard lock(available_inputs_mutex);

                    if (block)
                    {
                        available_inputs.push(input);
                    }
                    else
                    {
                        if (available_inputs.empty())
                            break;
                    }
                }

                if (finish)
                    break;

                if (block)
                    publishPayload(block, thread_num);
            }
        }
    }

    BlockInputStreams inputs;
    BlockInputStreamPtr additional_input_at_end;
    unsigned max_threads;

    Handler & handler;

    /// Threads.
    using ThreadsData = std::vector<ThreadFromGlobalPool>;
    ThreadsData threads;

    /** A set of available sources that are not currently being processed by any thread.
      * Each thread takes one source from this set, pulls a block out of it (at this moment the source does its calculations)
      * and (if the source has not run out) puts it back into the set of available sources.
      *
      * The question arises as to what is better to use:
      * - a queue (the source that was just processed will be processed again later than the rest);
      * - a stack (the source that was just processed will be processed again as soon as possible).
      *
      * The stack is better than the queue when reading from one source should be kept more consecutive,
      * which, in theory, allows more sequential reads from the disk.
      *
      * But when using the stack, there is a problem with distributed query processing:
      * data is read only from a part of the servers, and on the other servers
      * a timeout occurs during send, and request processing ends with an exception.
      *
      * Therefore, a queue is used. This can be improved in the future.
      */
    using AvailableInputs = std::queue<InputData>;
    AvailableInputs available_inputs;

    /** For preparing (readPrefix) the child streams in parallel.
      * Initially, the streams are placed here.
      * After a stream has been prepared, it is moved to "available_inputs" for reading.
      */
    using UnpreparedInputs = std::queue<InputData>;
    UnpreparedInputs unprepared_inputs;

    /// For operations with available_inputs.
    std::mutex available_inputs_mutex;

    /// For operations with unprepared_inputs.
    std::mutex unprepared_inputs_mutex;

    /// The number of worker threads that have not finished yet.
    std::atomic<size_t> active_threads { 0 };
    /// Tell the threads to finish their work (before the sources are exhausted).
    std::atomic<bool> finish { false };
    /// Whether all threads have already been joined.
    std::atomic<bool> joined_threads { false };

    Logger * log = &Logger::get("ParallelInputsProcessor");
};


}