1 | #pragma once |
2 | |
3 | #include <common/logger_useful.h> |
4 | |
5 | #include <Common/ConcurrentBoundedQueue.h> |
6 | #include <DataStreams/IBlockInputStream.h> |
7 | #include <DataStreams/ParallelInputsProcessor.h> |
8 | |
9 | |
10 | namespace DB |
11 | { |
12 | |
/// Error codes referenced in this file; only declared here (extern), defined elsewhere in the project.
namespace ErrorCodes { extern const int LOGICAL_ERROR; }
17 | |
18 | |
19 | /** Merges several sources into one. |
20 | * Blocks from different sources are interleaved with each other in an arbitrary way. |
21 | * You can specify the number of threads (max_threads), |
22 | * in which data will be retrieved from different sources. |
23 | * |
24 | * It's managed like this: |
25 | * - with the help of ParallelInputsProcessor in several threads it takes out blocks from the sources; |
26 | * - the completed blocks are added to a limited queue of finished blocks; |
27 | * - the main thread takes out completed blocks from the queue of finished blocks; |
28 | */ |
29 | class UnionBlockInputStream final : public IBlockInputStream |
30 | { |
31 | private: |
32 | /// A block or an exception. |
33 | struct OutputData |
34 | { |
35 | Block block; |
36 | std::exception_ptr exception; |
37 | |
38 | OutputData() {} |
39 | OutputData(Block & block_) : block(block_) {} |
40 | OutputData(std::exception_ptr & exception_) : exception(exception_) {} |
41 | }; |
42 | |
43 | public: |
44 | using ExceptionCallback = std::function<void()>; |
45 | |
46 | UnionBlockInputStream( |
47 | BlockInputStreams inputs, |
48 | BlockInputStreamPtr additional_input_at_end, |
49 | size_t max_threads, |
50 | ExceptionCallback exception_callback_ = ExceptionCallback() |
51 | ) : |
52 | output_queue(std::min(inputs.size(), max_threads)), handler(*this), |
53 | processor(inputs, additional_input_at_end, max_threads, handler), |
54 | exception_callback(exception_callback_) |
55 | { |
56 | children = inputs; |
57 | if (additional_input_at_end) |
58 | children.push_back(additional_input_at_end); |
59 | |
60 | size_t num_children = children.size(); |
61 | if (num_children > 1) |
62 | { |
63 | Block = children.at(0)->getHeader(); |
64 | for (size_t i = 1; i < num_children; ++i) |
65 | assertBlocksHaveEqualStructure(children[i]->getHeader(), header, "UNION" ); |
66 | } |
67 | } |
68 | |
69 | String getName() const override { return "Union" ; } |
70 | |
71 | ~UnionBlockInputStream() override |
72 | { |
73 | try |
74 | { |
75 | if (!all_read) |
76 | cancel(false); |
77 | |
78 | finalize(); |
79 | } |
80 | catch (...) |
81 | { |
82 | tryLogCurrentException(__PRETTY_FUNCTION__); |
83 | } |
84 | } |
85 | |
86 | /** Different from the default implementation by trying to stop all sources, |
87 | * skipping failed by execution. |
88 | */ |
89 | void cancel(bool kill) override |
90 | { |
91 | if (kill) |
92 | is_killed = true; |
93 | |
94 | bool old_val = false; |
95 | if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) |
96 | return; |
97 | |
98 | //std::cerr << "cancelling\n"; |
99 | processor.cancel(kill); |
100 | } |
101 | |
102 | Block () const override { return children.at(0)->getHeader(); } |
103 | |
104 | protected: |
105 | void finalize() |
106 | { |
107 | if (!started) |
108 | return; |
109 | |
110 | LOG_TRACE(log, "Waiting for threads to finish" ); |
111 | |
112 | std::exception_ptr exception; |
113 | if (!all_read) |
114 | { |
115 | /** Let's read everything up to the end, so that ParallelInputsProcessor is not blocked when trying to insert into the queue. |
116 | * Maybe there is an exception in the queue. |
117 | */ |
118 | OutputData res; |
119 | while (true) |
120 | { |
121 | //std::cerr << "popping\n"; |
122 | output_queue.pop(res); |
123 | |
124 | if (res.exception) |
125 | { |
126 | if (!exception) |
127 | exception = res.exception; |
128 | else if (Exception * e = exception_cast<Exception *>(exception)) |
129 | e->addMessage("\n" + getExceptionMessage(res.exception, false)); |
130 | } |
131 | else if (!res.block) |
132 | break; |
133 | } |
134 | |
135 | all_read = true; |
136 | } |
137 | |
138 | processor.wait(); |
139 | |
140 | LOG_TRACE(log, "Waited for threads to finish" ); |
141 | |
142 | if (exception) |
143 | std::rethrow_exception(exception); |
144 | } |
145 | |
146 | /// Do nothing, to make the preparation for the query execution in parallel, in ParallelInputsProcessor. |
147 | void readPrefix() override |
148 | { |
149 | } |
150 | |
151 | /** The following options are possible: |
152 | * 1. `readImpl` function is called until it returns an empty block. |
153 | * Then `readSuffix` function is called and then destructor. |
154 | * 2. `readImpl` function is called. At some point, `cancel` function is called perhaps from another thread. |
155 | * Then `readSuffix` function is called and then destructor. |
156 | * 3. At any time, the object can be destroyed (destructor called). |
157 | */ |
158 | |
159 | Block readImpl() override |
160 | { |
161 | if (all_read) |
162 | return received_payload.block; |
163 | |
164 | /// Run threads if this has not already been done. |
165 | if (!started) |
166 | { |
167 | started = true; |
168 | processor.process(); |
169 | } |
170 | |
171 | /// We will wait until the next block is ready or an exception is thrown. |
172 | //std::cerr << "popping\n"; |
173 | output_queue.pop(received_payload); |
174 | |
175 | if (received_payload.exception) |
176 | { |
177 | if (exception_callback) |
178 | exception_callback(); |
179 | std::rethrow_exception(received_payload.exception); |
180 | } |
181 | |
182 | if (!received_payload.block) |
183 | all_read = true; |
184 | |
185 | return received_payload.block; |
186 | } |
187 | |
188 | /// Called either after everything is read, or after cancel. |
189 | void readSuffix() override |
190 | { |
191 | //std::cerr << "readSuffix\n"; |
192 | if (!all_read && !isCancelled()) |
193 | throw Exception("readSuffix called before all data is read" , ErrorCodes::LOGICAL_ERROR); |
194 | |
195 | finalize(); |
196 | |
197 | for (size_t i = 0; i < children.size(); ++i) |
198 | children[i]->readSuffix(); |
199 | } |
200 | |
201 | private: |
202 | using Payload = OutputData; |
203 | using OutputQueue = ConcurrentBoundedQueue<Payload>; |
204 | |
205 | /** The queue of the finished blocks. Also, you can put an exception instead of a block. |
206 | * When data is run out, an empty block is inserted into the queue. |
207 | * Sooner or later, an empty block is always inserted into the queue (even after exception or query cancellation). |
208 | * The queue is always (even after exception or canceling the query, even in destructor) you must read up to an empty block, |
209 | * otherwise ParallelInputsProcessor can be blocked during insertion into the queue. |
210 | */ |
211 | OutputQueue output_queue; |
212 | |
213 | struct Handler |
214 | { |
215 | Handler(UnionBlockInputStream & parent_) : parent(parent_) {} |
216 | |
217 | void onBlock(Block & block, size_t /*thread_num*/) |
218 | { |
219 | parent.output_queue.push(Payload(block)); |
220 | } |
221 | |
222 | void onFinish() |
223 | { |
224 | parent.output_queue.push(Payload()); |
225 | } |
226 | |
227 | void onFinishThread(size_t /*thread_num*/) |
228 | { |
229 | } |
230 | |
231 | void onException(std::exception_ptr & exception, size_t /*thread_num*/) |
232 | { |
233 | //std::cerr << "pushing exception\n"; |
234 | |
235 | /// The order of the rows matters. If it is changed, then the situation is possible, |
236 | /// when before exception, an empty block (end of data) will be put into the queue, |
237 | /// and the exception is lost. |
238 | |
239 | parent.output_queue.push(exception); |
240 | parent.cancel(false); /// Does not throw exceptions. |
241 | } |
242 | |
243 | UnionBlockInputStream & parent; |
244 | }; |
245 | |
246 | Handler handler; |
247 | ParallelInputsProcessor<Handler> processor; |
248 | |
249 | ExceptionCallback exception_callback; |
250 | |
251 | Payload received_payload; |
252 | |
253 | bool started = false; |
254 | bool all_read = false; |
255 | |
256 | Logger * log = &Logger::get("UnionBlockInputStream" ); |
257 | }; |
258 | |
259 | } |
260 | |