1#pragma once
2
3#include <common/logger_useful.h>
4
5#include <Common/ConcurrentBoundedQueue.h>
6#include <DataStreams/IBlockInputStream.h>
7#include <DataStreams/ParallelInputsProcessor.h>
8
9
10namespace DB
11{
12
/// Error codes referenced in this header (LOGICAL_ERROR is thrown by UnionBlockInputStream::readSuffix).
/// Only declared here; the definition lives in another translation unit.
namespace ErrorCodes { extern const int LOGICAL_ERROR; }
17
18
19/** Merges several sources into one.
20 * Blocks from different sources are interleaved with each other in an arbitrary way.
21 * You can specify the number of threads (max_threads),
22 * in which data will be retrieved from different sources.
23 *
24 * It's managed like this:
25 * - with the help of ParallelInputsProcessor in several threads it takes out blocks from the sources;
26 * - the completed blocks are added to a limited queue of finished blocks;
27 * - the main thread takes out completed blocks from the queue of finished blocks;
28 */
29class UnionBlockInputStream final : public IBlockInputStream
30{
31private:
32 /// A block or an exception.
33 struct OutputData
34 {
35 Block block;
36 std::exception_ptr exception;
37
38 OutputData() {}
39 OutputData(Block & block_) : block(block_) {}
40 OutputData(std::exception_ptr & exception_) : exception(exception_) {}
41 };
42
43public:
44 using ExceptionCallback = std::function<void()>;
45
46 UnionBlockInputStream(
47 BlockInputStreams inputs,
48 BlockInputStreamPtr additional_input_at_end,
49 size_t max_threads,
50 ExceptionCallback exception_callback_ = ExceptionCallback()
51 ) :
52 output_queue(std::min(inputs.size(), max_threads)), handler(*this),
53 processor(inputs, additional_input_at_end, max_threads, handler),
54 exception_callback(exception_callback_)
55 {
56 children = inputs;
57 if (additional_input_at_end)
58 children.push_back(additional_input_at_end);
59
60 size_t num_children = children.size();
61 if (num_children > 1)
62 {
63 Block header = children.at(0)->getHeader();
64 for (size_t i = 1; i < num_children; ++i)
65 assertBlocksHaveEqualStructure(children[i]->getHeader(), header, "UNION");
66 }
67 }
68
69 String getName() const override { return "Union"; }
70
71 ~UnionBlockInputStream() override
72 {
73 try
74 {
75 if (!all_read)
76 cancel(false);
77
78 finalize();
79 }
80 catch (...)
81 {
82 tryLogCurrentException(__PRETTY_FUNCTION__);
83 }
84 }
85
86 /** Different from the default implementation by trying to stop all sources,
87 * skipping failed by execution.
88 */
89 void cancel(bool kill) override
90 {
91 if (kill)
92 is_killed = true;
93
94 bool old_val = false;
95 if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
96 return;
97
98 //std::cerr << "cancelling\n";
99 processor.cancel(kill);
100 }
101
102 Block getHeader() const override { return children.at(0)->getHeader(); }
103
104protected:
105 void finalize()
106 {
107 if (!started)
108 return;
109
110 LOG_TRACE(log, "Waiting for threads to finish");
111
112 std::exception_ptr exception;
113 if (!all_read)
114 {
115 /** Let's read everything up to the end, so that ParallelInputsProcessor is not blocked when trying to insert into the queue.
116 * Maybe there is an exception in the queue.
117 */
118 OutputData res;
119 while (true)
120 {
121 //std::cerr << "popping\n";
122 output_queue.pop(res);
123
124 if (res.exception)
125 {
126 if (!exception)
127 exception = res.exception;
128 else if (Exception * e = exception_cast<Exception *>(exception))
129 e->addMessage("\n" + getExceptionMessage(res.exception, false));
130 }
131 else if (!res.block)
132 break;
133 }
134
135 all_read = true;
136 }
137
138 processor.wait();
139
140 LOG_TRACE(log, "Waited for threads to finish");
141
142 if (exception)
143 std::rethrow_exception(exception);
144 }
145
146 /// Do nothing, to make the preparation for the query execution in parallel, in ParallelInputsProcessor.
147 void readPrefix() override
148 {
149 }
150
151 /** The following options are possible:
152 * 1. `readImpl` function is called until it returns an empty block.
153 * Then `readSuffix` function is called and then destructor.
154 * 2. `readImpl` function is called. At some point, `cancel` function is called perhaps from another thread.
155 * Then `readSuffix` function is called and then destructor.
156 * 3. At any time, the object can be destroyed (destructor called).
157 */
158
159 Block readImpl() override
160 {
161 if (all_read)
162 return received_payload.block;
163
164 /// Run threads if this has not already been done.
165 if (!started)
166 {
167 started = true;
168 processor.process();
169 }
170
171 /// We will wait until the next block is ready or an exception is thrown.
172 //std::cerr << "popping\n";
173 output_queue.pop(received_payload);
174
175 if (received_payload.exception)
176 {
177 if (exception_callback)
178 exception_callback();
179 std::rethrow_exception(received_payload.exception);
180 }
181
182 if (!received_payload.block)
183 all_read = true;
184
185 return received_payload.block;
186 }
187
188 /// Called either after everything is read, or after cancel.
189 void readSuffix() override
190 {
191 //std::cerr << "readSuffix\n";
192 if (!all_read && !isCancelled())
193 throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
194
195 finalize();
196
197 for (size_t i = 0; i < children.size(); ++i)
198 children[i]->readSuffix();
199 }
200
201private:
202 using Payload = OutputData;
203 using OutputQueue = ConcurrentBoundedQueue<Payload>;
204
205 /** The queue of the finished blocks. Also, you can put an exception instead of a block.
206 * When data is run out, an empty block is inserted into the queue.
207 * Sooner or later, an empty block is always inserted into the queue (even after exception or query cancellation).
208 * The queue is always (even after exception or canceling the query, even in destructor) you must read up to an empty block,
209 * otherwise ParallelInputsProcessor can be blocked during insertion into the queue.
210 */
211 OutputQueue output_queue;
212
213 struct Handler
214 {
215 Handler(UnionBlockInputStream & parent_) : parent(parent_) {}
216
217 void onBlock(Block & block, size_t /*thread_num*/)
218 {
219 parent.output_queue.push(Payload(block));
220 }
221
222 void onFinish()
223 {
224 parent.output_queue.push(Payload());
225 }
226
227 void onFinishThread(size_t /*thread_num*/)
228 {
229 }
230
231 void onException(std::exception_ptr & exception, size_t /*thread_num*/)
232 {
233 //std::cerr << "pushing exception\n";
234
235 /// The order of the rows matters. If it is changed, then the situation is possible,
236 /// when before exception, an empty block (end of data) will be put into the queue,
237 /// and the exception is lost.
238
239 parent.output_queue.push(exception);
240 parent.cancel(false); /// Does not throw exceptions.
241 }
242
243 UnionBlockInputStream & parent;
244 };
245
246 Handler handler;
247 ParallelInputsProcessor<Handler> processor;
248
249 ExceptionCallback exception_callback;
250
251 Payload received_payload;
252
253 bool started = false;
254 bool all_read = false;
255
256 Logger * log = &Logger::get("UnionBlockInputStream");
257};
258
259}
260