1 | #pragma once |
2 | |
#include <atomic>
#include <cstdint>
#include <exception>
#include <list>
#include <memory>
#include <utility>
#include <variant>
#include <vector>

#include <Core/Block.h>
#include <Core/Defines.h>
#include <Processors/Chunk.h>
#include <Common/Exception.h>
#include <common/likely.h>
13 | |
14 | namespace DB |
15 | { |
16 | |
/// Forward declarations: Port refers to these before they are defined
/// (IProcessor is only used through pointers/references in this header).
class IProcessor;
class InputPort;
class OutputPort;

namespace ErrorCodes
{
    /// Error code used for every invariant violation reported by ports; defined elsewhere.
    extern const int LOGICAL_ERROR;
}
25 | |
26 | class Port |
27 | { |
28 | friend void connect(OutputPort &, InputPort &); |
29 | friend class IProcessor; |
30 | |
31 | public: |
32 | struct UpdateInfo |
33 | { |
34 | std::vector<void *> * update_list = nullptr; |
35 | void * id = nullptr; |
36 | UInt64 version = 0; |
37 | UInt64 prev_version = 0; |
38 | |
39 | void inline ALWAYS_INLINE update() |
40 | { |
41 | if (version == prev_version && update_list) |
42 | update_list->push_back(id); |
43 | |
44 | ++version; |
45 | } |
46 | |
47 | void inline ALWAYS_INLINE trigger() { prev_version = version; } |
48 | }; |
49 | |
50 | protected: |
51 | /// Shared state of two connected ports. |
52 | class State |
53 | { |
54 | public: |
55 | |
56 | struct Data |
57 | { |
58 | /// Note: std::variant can be used. But move constructor for it can't be inlined. |
59 | Chunk chunk; |
60 | std::exception_ptr exception; |
61 | }; |
62 | |
63 | private: |
64 | static std::uintptr_t getUInt(Data * data) { return reinterpret_cast<std::uintptr_t>(data); } |
65 | static Data * getPtr(std::uintptr_t data) { return reinterpret_cast<Data *>(data); } |
66 | |
67 | public: |
68 | |
69 | /// Flags for Port state. |
70 | /// Will store them in least pointer bits. |
71 | |
72 | /// Port was set finished or closed. |
73 | static constexpr std::uintptr_t IS_FINISHED = 1; |
74 | /// Block is not needed right now, but may be will be needed later. |
75 | /// This allows to pause calculations if we are not sure that we need more data. |
76 | static constexpr std::uintptr_t IS_NEEDED = 2; |
77 | /// Check if port has data. |
78 | static constexpr std::uintptr_t HAS_DATA = 4; |
79 | |
80 | static constexpr std::uintptr_t FLAGS_MASK = IS_FINISHED | IS_NEEDED | HAS_DATA; |
81 | static constexpr std::uintptr_t PTR_MASK = ~FLAGS_MASK; |
82 | |
83 | /// Tiny smart ptr class for Data. Takes into account that ptr can have flags in least bits. |
84 | class DataPtr |
85 | { |
86 | public: |
87 | DataPtr() : data(new Data()) |
88 | { |
89 | if (unlikely((getUInt(data) & FLAGS_MASK) != 0)) |
90 | throw Exception("Not alignment memory for Port." , ErrorCodes::LOGICAL_ERROR); |
91 | } |
92 | /// Pointer can store flags in case of exception in swap. |
93 | ~DataPtr() { delete getPtr(getUInt(data) & PTR_MASK); } |
94 | |
95 | DataPtr(DataPtr const &) : data(new Data()) {} |
96 | DataPtr& operator=(DataPtr const &) = delete; |
97 | |
98 | Data * operator->() const { return data; } |
99 | Data & operator*() const { return *data; } |
100 | |
101 | Data * get() const { return data; } |
102 | explicit operator bool() const { return data; } |
103 | |
104 | Data * release() |
105 | { |
106 | Data * result = nullptr; |
107 | std::swap(result, data); |
108 | return result; |
109 | } |
110 | |
111 | uintptr_t ALWAYS_INLINE swap(std::atomic<Data *> & value, std::uintptr_t flags, std::uintptr_t mask) |
112 | { |
113 | Data * expected = nullptr; |
114 | Data * desired = getPtr(flags | getUInt(data)); |
115 | |
116 | while (!value.compare_exchange_weak(expected, desired)) |
117 | desired = getPtr((getUInt(expected) & FLAGS_MASK & (~mask)) | flags | getUInt(data)); |
118 | |
119 | /// It's not very safe. In case of exception after exchange and before assigment we will get leak. |
120 | /// Don't know how to make it better. |
121 | data = getPtr(getUInt(expected) & PTR_MASK); |
122 | |
123 | return getUInt(expected) & FLAGS_MASK; |
124 | } |
125 | |
126 | private: |
127 | Data * data = nullptr; |
128 | }; |
129 | |
130 | /// Not finished, not needed, has not data. |
131 | State() : data(new Data()) |
132 | { |
133 | if (unlikely((getUInt(data) & FLAGS_MASK) != 0)) |
134 | throw Exception("Not alignment memory for Port." , ErrorCodes::LOGICAL_ERROR); |
135 | } |
136 | |
137 | ~State() |
138 | { |
139 | Data * desired = nullptr; |
140 | Data * expected = nullptr; |
141 | |
142 | while (!data.compare_exchange_weak(expected, desired)); |
143 | |
144 | expected = getPtr(getUInt(expected) & PTR_MASK); |
145 | delete expected; |
146 | } |
147 | |
148 | void ALWAYS_INLINE push(DataPtr & data_, std::uintptr_t & flags) |
149 | { |
150 | flags = data_.swap(data, HAS_DATA, HAS_DATA); |
151 | |
152 | /// It's possible to push data into finished port. Will just ignore it. |
153 | /// if (flags & IS_FINISHED) |
154 | /// throw Exception("Cannot push block to finished port.", ErrorCodes::LOGICAL_ERROR); |
155 | |
156 | /// It's possible to push data into port which is not needed now. |
157 | /// if ((flags & IS_NEEDED) == 0) |
158 | /// throw Exception("Cannot push block to port which is not needed.", ErrorCodes::LOGICAL_ERROR); |
159 | |
160 | if (unlikely(flags & HAS_DATA)) |
161 | throw Exception("Cannot push block to port which already has data." , ErrorCodes::LOGICAL_ERROR); |
162 | } |
163 | |
164 | void ALWAYS_INLINE pull(DataPtr & data_, std::uintptr_t & flags) |
165 | { |
166 | flags = data_.swap(data, 0, HAS_DATA); |
167 | |
168 | /// It's ok to check because this flag can be changed only by pulling thread. |
169 | if (unlikely((flags & IS_NEEDED) == 0)) |
170 | throw Exception("Cannot pull block from port which is not needed." , ErrorCodes::LOGICAL_ERROR); |
171 | |
172 | if (unlikely((flags & HAS_DATA) == 0)) |
173 | throw Exception("Cannot pull block from port which has no data." , ErrorCodes::LOGICAL_ERROR); |
174 | } |
175 | |
176 | std::uintptr_t ALWAYS_INLINE setFlags(std::uintptr_t flags, std::uintptr_t mask) |
177 | { |
178 | Data * expected = nullptr; |
179 | Data * desired = getPtr(flags); |
180 | |
181 | while (!data.compare_exchange_weak(expected, desired)) |
182 | desired = getPtr((getUInt(expected) & FLAGS_MASK & (~mask)) | flags | (getUInt(expected) & PTR_MASK)); |
183 | |
184 | return getUInt(expected) & FLAGS_MASK; |
185 | } |
186 | |
187 | std::uintptr_t ALWAYS_INLINE getFlags() const |
188 | { |
189 | return getUInt(data.load()) & FLAGS_MASK; |
190 | } |
191 | |
192 | private: |
193 | std::atomic<Data *> data; |
194 | }; |
195 | |
196 | Block ; |
197 | std::shared_ptr<State> state; |
198 | |
199 | /// This object is only used for data exchange between port and shared state. |
200 | State::DataPtr data; |
201 | |
202 | IProcessor * processor = nullptr; |
203 | |
204 | /// If update_info was set, will call update() for it in case port's state have changed. |
205 | UpdateInfo * update_info = nullptr; |
206 | |
207 | public: |
208 | using Data = State::Data; |
209 | |
210 | Port(Block ) : header(std::move(header_)) {} |
211 | Port(Block , IProcessor * processor_) : header(std::move(header_)), processor(processor_) {} |
212 | |
213 | void setUpdateInfo(UpdateInfo * info) { update_info = info; } |
214 | |
215 | const Block & () const { return header; } |
216 | bool ALWAYS_INLINE isConnected() const { return state != nullptr; } |
217 | |
218 | void ALWAYS_INLINE assumeConnected() const |
219 | { |
220 | if (unlikely(!isConnected())) |
221 | throw Exception("Port is not connected" , ErrorCodes::LOGICAL_ERROR); |
222 | } |
223 | |
224 | bool ALWAYS_INLINE hasData() const |
225 | { |
226 | assumeConnected(); |
227 | return state->getFlags() & State::HAS_DATA; |
228 | } |
229 | |
230 | IProcessor & getProcessor() |
231 | { |
232 | if (!processor) |
233 | throw Exception("Port does not belong to Processor" , ErrorCodes::LOGICAL_ERROR); |
234 | return *processor; |
235 | } |
236 | |
237 | const IProcessor & getProcessor() const |
238 | { |
239 | if (!processor) |
240 | throw Exception("Port does not belong to Processor" , ErrorCodes::LOGICAL_ERROR); |
241 | return *processor; |
242 | } |
243 | |
244 | protected: |
245 | void inline ALWAYS_INLINE updateVersion() |
246 | { |
247 | if (likely(update_info)) |
248 | update_info->update(); |
249 | } |
250 | }; |
251 | |
252 | /// Invariants: |
253 | /// * If you close port, it isFinished(). |
254 | /// * If port isFinished(), you can do nothing with it. |
255 | /// * If port is not needed, you can only setNeeded() or close() it. |
256 | /// * You can pull only if port hasData(). |
257 | class InputPort : public Port |
258 | { |
259 | friend void connect(OutputPort &, InputPort &); |
260 | |
261 | private: |
262 | OutputPort * output_port = nullptr; |
263 | |
264 | mutable bool is_finished = false; |
265 | |
266 | public: |
267 | using Port::Port; |
268 | |
269 | Data ALWAYS_INLINE pullData() |
270 | { |
271 | updateVersion(); |
272 | |
273 | assumeConnected(); |
274 | |
275 | std::uintptr_t flags = 0; |
276 | state->pull(data, flags); |
277 | |
278 | is_finished = flags & State::IS_FINISHED; |
279 | |
280 | if (unlikely(!data->exception && data->chunk.getNumColumns() != header.columns())) |
281 | { |
282 | auto & chunk = data->chunk; |
283 | |
284 | String msg = "Invalid number of columns in chunk pulled from OutputPort. Expected " |
285 | + std::to_string(header.columns()) + ", found " + std::to_string(chunk.getNumColumns()) + '\n'; |
286 | |
287 | msg += "Header: " + header.dumpStructure() + '\n'; |
288 | msg += "Chunk: " + chunk.dumpStructure() + '\n'; |
289 | |
290 | throw Exception(msg, ErrorCodes::LOGICAL_ERROR); |
291 | } |
292 | |
293 | return std::move(*data); |
294 | } |
295 | |
296 | Chunk ALWAYS_INLINE pull() |
297 | { |
298 | auto data_ = pullData(); |
299 | |
300 | if (data_.exception) |
301 | std::rethrow_exception(data_.exception); |
302 | |
303 | return std::move(data_.chunk); |
304 | } |
305 | |
306 | bool ALWAYS_INLINE isFinished() const |
307 | { |
308 | assumeConnected(); |
309 | |
310 | if (is_finished) |
311 | return true; |
312 | |
313 | auto flags = state->getFlags(); |
314 | |
315 | is_finished = (flags & State::IS_FINISHED) && ((flags & State::HAS_DATA) == 0); |
316 | |
317 | return is_finished; |
318 | } |
319 | |
320 | void ALWAYS_INLINE setNeeded() |
321 | { |
322 | assumeConnected(); |
323 | |
324 | if ((state->setFlags(State::IS_NEEDED, State::IS_NEEDED) & State::IS_NEEDED) == 0) |
325 | updateVersion(); |
326 | } |
327 | |
328 | void ALWAYS_INLINE setNotNeeded() |
329 | { |
330 | assumeConnected(); |
331 | state->setFlags(0, State::IS_NEEDED); |
332 | } |
333 | |
334 | void ALWAYS_INLINE close() |
335 | { |
336 | assumeConnected(); |
337 | |
338 | if ((state->setFlags(State::IS_FINISHED, State::IS_FINISHED) & State::IS_FINISHED) == 0) |
339 | updateVersion(); |
340 | |
341 | is_finished = true; |
342 | } |
343 | |
344 | void ALWAYS_INLINE reopen() |
345 | { |
346 | assumeConnected(); |
347 | |
348 | if (!isFinished()) |
349 | return; |
350 | |
351 | state->setFlags(0, State::IS_FINISHED); |
352 | is_finished = false; |
353 | } |
354 | |
355 | OutputPort & getOutputPort() |
356 | { |
357 | assumeConnected(); |
358 | return *output_port; |
359 | } |
360 | |
361 | const OutputPort & getOutputPort() const |
362 | { |
363 | assumeConnected(); |
364 | return *output_port; |
365 | } |
366 | }; |
367 | |
368 | |
369 | /// Invariants: |
370 | /// * If you finish port, it isFinished(). |
371 | /// * If port isFinished(), you can do nothing with it. |
372 | /// * If port not isNeeded(), you can only finish() it. |
373 | /// * You can hush only if port doesn't hasData(). |
374 | class OutputPort : public Port |
375 | { |
376 | friend void connect(OutputPort &, InputPort &); |
377 | |
378 | private: |
379 | InputPort * input_port = nullptr; |
380 | |
381 | public: |
382 | using Port::Port; |
383 | |
384 | void ALWAYS_INLINE push(Chunk chunk) |
385 | { |
386 | pushData({.chunk = std::move(chunk), .exception = {}}); |
387 | } |
388 | |
389 | void ALWAYS_INLINE push(std::exception_ptr exception) |
390 | { |
391 | pushData({.chunk = {}, .exception = std::move(exception)}); |
392 | } |
393 | |
394 | void ALWAYS_INLINE pushData(Data data_) |
395 | { |
396 | if (unlikely(!data_.exception && data_.chunk.getNumColumns() != header.columns())) |
397 | { |
398 | String msg = "Invalid number of columns in chunk pushed to OutputPort. Expected " |
399 | + std::to_string(header.columns()) |
400 | + ", found " + std::to_string(data_.chunk.getNumColumns()) + '\n'; |
401 | |
402 | msg += "Header: " + header.dumpStructure() + '\n'; |
403 | msg += "Chunk: " + data_.chunk.dumpStructure() + '\n'; |
404 | |
405 | throw Exception(msg, ErrorCodes::LOGICAL_ERROR); |
406 | } |
407 | |
408 | updateVersion(); |
409 | |
410 | assumeConnected(); |
411 | |
412 | std::uintptr_t flags = 0; |
413 | *data = std::move(data_); |
414 | state->push(data, flags); |
415 | } |
416 | |
417 | void ALWAYS_INLINE finish() |
418 | { |
419 | assumeConnected(); |
420 | |
421 | auto flags = state->setFlags(State::IS_FINISHED, State::IS_FINISHED); |
422 | |
423 | if ((flags & State::IS_FINISHED) == 0) |
424 | updateVersion(); |
425 | } |
426 | |
427 | bool ALWAYS_INLINE isNeeded() const |
428 | { |
429 | assumeConnected(); |
430 | return state->getFlags() & State::IS_NEEDED; |
431 | } |
432 | |
433 | bool ALWAYS_INLINE isFinished() const |
434 | { |
435 | assumeConnected(); |
436 | return state->getFlags() & State::IS_FINISHED; |
437 | } |
438 | |
439 | bool ALWAYS_INLINE canPush() const |
440 | { |
441 | assumeConnected(); |
442 | auto flags = state->getFlags(); |
443 | return (flags & State::IS_NEEDED) && ((flags & State::HAS_DATA) == 0); |
444 | } |
445 | |
446 | InputPort & getInputPort() |
447 | { |
448 | assumeConnected(); |
449 | return *input_port; |
450 | } |
451 | |
452 | const InputPort & getInputPort() const |
453 | { |
454 | assumeConnected(); |
455 | return *input_port; |
456 | } |
457 | }; |
458 | |
459 | |
460 | using InputPorts = std::list<InputPort>; |
461 | using OutputPorts = std::list<OutputPort>; |
462 | |
463 | |
464 | void connect(OutputPort & output, InputPort & input); |
465 | |
466 | } |
467 | |