1#pragma once
2
#include <atomic>
#include <cstdint>
#include <exception>
#include <list>
#include <memory>
#include <utility>
#include <variant>
#include <vector>

#include <Core/Block.h>
#include <Core/Defines.h>
#include <Processors/Chunk.h>
#include <Common/Exception.h>
#include <common/likely.h>
13
14namespace DB
15{
16
/// Forward declarations: in this header ports and processors refer to each other
/// only through pointers and references.
class IProcessor;
class InputPort;
class OutputPort;

/// Error codes are defined in another translation unit; only the ones used here are declared.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
25
26class Port
27{
28 friend void connect(OutputPort &, InputPort &);
29 friend class IProcessor;
30
31public:
32 struct UpdateInfo
33 {
34 std::vector<void *> * update_list = nullptr;
35 void * id = nullptr;
36 UInt64 version = 0;
37 UInt64 prev_version = 0;
38
39 void inline ALWAYS_INLINE update()
40 {
41 if (version == prev_version && update_list)
42 update_list->push_back(id);
43
44 ++version;
45 }
46
47 void inline ALWAYS_INLINE trigger() { prev_version = version; }
48 };
49
50protected:
51 /// Shared state of two connected ports.
52 class State
53 {
54 public:
55
56 struct Data
57 {
58 /// Note: std::variant can be used. But move constructor for it can't be inlined.
59 Chunk chunk;
60 std::exception_ptr exception;
61 };
62
63 private:
64 static std::uintptr_t getUInt(Data * data) { return reinterpret_cast<std::uintptr_t>(data); }
65 static Data * getPtr(std::uintptr_t data) { return reinterpret_cast<Data *>(data); }
66
67 public:
68
69 /// Flags for Port state.
70 /// Will store them in least pointer bits.
71
72 /// Port was set finished or closed.
73 static constexpr std::uintptr_t IS_FINISHED = 1;
74 /// Block is not needed right now, but may be will be needed later.
75 /// This allows to pause calculations if we are not sure that we need more data.
76 static constexpr std::uintptr_t IS_NEEDED = 2;
77 /// Check if port has data.
78 static constexpr std::uintptr_t HAS_DATA = 4;
79
80 static constexpr std::uintptr_t FLAGS_MASK = IS_FINISHED | IS_NEEDED | HAS_DATA;
81 static constexpr std::uintptr_t PTR_MASK = ~FLAGS_MASK;
82
83 /// Tiny smart ptr class for Data. Takes into account that ptr can have flags in least bits.
84 class DataPtr
85 {
86 public:
87 DataPtr() : data(new Data())
88 {
89 if (unlikely((getUInt(data) & FLAGS_MASK) != 0))
90 throw Exception("Not alignment memory for Port.", ErrorCodes::LOGICAL_ERROR);
91 }
92 /// Pointer can store flags in case of exception in swap.
93 ~DataPtr() { delete getPtr(getUInt(data) & PTR_MASK); }
94
95 DataPtr(DataPtr const &) : data(new Data()) {}
96 DataPtr& operator=(DataPtr const &) = delete;
97
98 Data * operator->() const { return data; }
99 Data & operator*() const { return *data; }
100
101 Data * get() const { return data; }
102 explicit operator bool() const { return data; }
103
104 Data * release()
105 {
106 Data * result = nullptr;
107 std::swap(result, data);
108 return result;
109 }
110
111 uintptr_t ALWAYS_INLINE swap(std::atomic<Data *> & value, std::uintptr_t flags, std::uintptr_t mask)
112 {
113 Data * expected = nullptr;
114 Data * desired = getPtr(flags | getUInt(data));
115
116 while (!value.compare_exchange_weak(expected, desired))
117 desired = getPtr((getUInt(expected) & FLAGS_MASK & (~mask)) | flags | getUInt(data));
118
119 /// It's not very safe. In case of exception after exchange and before assigment we will get leak.
120 /// Don't know how to make it better.
121 data = getPtr(getUInt(expected) & PTR_MASK);
122
123 return getUInt(expected) & FLAGS_MASK;
124 }
125
126 private:
127 Data * data = nullptr;
128 };
129
130 /// Not finished, not needed, has not data.
131 State() : data(new Data())
132 {
133 if (unlikely((getUInt(data) & FLAGS_MASK) != 0))
134 throw Exception("Not alignment memory for Port.", ErrorCodes::LOGICAL_ERROR);
135 }
136
137 ~State()
138 {
139 Data * desired = nullptr;
140 Data * expected = nullptr;
141
142 while (!data.compare_exchange_weak(expected, desired));
143
144 expected = getPtr(getUInt(expected) & PTR_MASK);
145 delete expected;
146 }
147
148 void ALWAYS_INLINE push(DataPtr & data_, std::uintptr_t & flags)
149 {
150 flags = data_.swap(data, HAS_DATA, HAS_DATA);
151
152 /// It's possible to push data into finished port. Will just ignore it.
153 /// if (flags & IS_FINISHED)
154 /// throw Exception("Cannot push block to finished port.", ErrorCodes::LOGICAL_ERROR);
155
156 /// It's possible to push data into port which is not needed now.
157 /// if ((flags & IS_NEEDED) == 0)
158 /// throw Exception("Cannot push block to port which is not needed.", ErrorCodes::LOGICAL_ERROR);
159
160 if (unlikely(flags & HAS_DATA))
161 throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
162 }
163
164 void ALWAYS_INLINE pull(DataPtr & data_, std::uintptr_t & flags)
165 {
166 flags = data_.swap(data, 0, HAS_DATA);
167
168 /// It's ok to check because this flag can be changed only by pulling thread.
169 if (unlikely((flags & IS_NEEDED) == 0))
170 throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
171
172 if (unlikely((flags & HAS_DATA) == 0))
173 throw Exception("Cannot pull block from port which has no data.", ErrorCodes::LOGICAL_ERROR);
174 }
175
176 std::uintptr_t ALWAYS_INLINE setFlags(std::uintptr_t flags, std::uintptr_t mask)
177 {
178 Data * expected = nullptr;
179 Data * desired = getPtr(flags);
180
181 while (!data.compare_exchange_weak(expected, desired))
182 desired = getPtr((getUInt(expected) & FLAGS_MASK & (~mask)) | flags | (getUInt(expected) & PTR_MASK));
183
184 return getUInt(expected) & FLAGS_MASK;
185 }
186
187 std::uintptr_t ALWAYS_INLINE getFlags() const
188 {
189 return getUInt(data.load()) & FLAGS_MASK;
190 }
191
192 private:
193 std::atomic<Data *> data;
194 };
195
196 Block header;
197 std::shared_ptr<State> state;
198
199 /// This object is only used for data exchange between port and shared state.
200 State::DataPtr data;
201
202 IProcessor * processor = nullptr;
203
204 /// If update_info was set, will call update() for it in case port's state have changed.
205 UpdateInfo * update_info = nullptr;
206
207public:
208 using Data = State::Data;
209
210 Port(Block header_) : header(std::move(header_)) {}
211 Port(Block header_, IProcessor * processor_) : header(std::move(header_)), processor(processor_) {}
212
213 void setUpdateInfo(UpdateInfo * info) { update_info = info; }
214
215 const Block & getHeader() const { return header; }
216 bool ALWAYS_INLINE isConnected() const { return state != nullptr; }
217
218 void ALWAYS_INLINE assumeConnected() const
219 {
220 if (unlikely(!isConnected()))
221 throw Exception("Port is not connected", ErrorCodes::LOGICAL_ERROR);
222 }
223
224 bool ALWAYS_INLINE hasData() const
225 {
226 assumeConnected();
227 return state->getFlags() & State::HAS_DATA;
228 }
229
230 IProcessor & getProcessor()
231 {
232 if (!processor)
233 throw Exception("Port does not belong to Processor", ErrorCodes::LOGICAL_ERROR);
234 return *processor;
235 }
236
237 const IProcessor & getProcessor() const
238 {
239 if (!processor)
240 throw Exception("Port does not belong to Processor", ErrorCodes::LOGICAL_ERROR);
241 return *processor;
242 }
243
244protected:
245 void inline ALWAYS_INLINE updateVersion()
246 {
247 if (likely(update_info))
248 update_info->update();
249 }
250};
251
252/// Invariants:
253/// * If you close port, it isFinished().
254/// * If port isFinished(), you can do nothing with it.
255/// * If port is not needed, you can only setNeeded() or close() it.
256/// * You can pull only if port hasData().
257class InputPort : public Port
258{
259 friend void connect(OutputPort &, InputPort &);
260
261private:
262 OutputPort * output_port = nullptr;
263
264 mutable bool is_finished = false;
265
266public:
267 using Port::Port;
268
269 Data ALWAYS_INLINE pullData()
270 {
271 updateVersion();
272
273 assumeConnected();
274
275 std::uintptr_t flags = 0;
276 state->pull(data, flags);
277
278 is_finished = flags & State::IS_FINISHED;
279
280 if (unlikely(!data->exception && data->chunk.getNumColumns() != header.columns()))
281 {
282 auto & chunk = data->chunk;
283
284 String msg = "Invalid number of columns in chunk pulled from OutputPort. Expected "
285 + std::to_string(header.columns()) + ", found " + std::to_string(chunk.getNumColumns()) + '\n';
286
287 msg += "Header: " + header.dumpStructure() + '\n';
288 msg += "Chunk: " + chunk.dumpStructure() + '\n';
289
290 throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
291 }
292
293 return std::move(*data);
294 }
295
296 Chunk ALWAYS_INLINE pull()
297 {
298 auto data_ = pullData();
299
300 if (data_.exception)
301 std::rethrow_exception(data_.exception);
302
303 return std::move(data_.chunk);
304 }
305
306 bool ALWAYS_INLINE isFinished() const
307 {
308 assumeConnected();
309
310 if (is_finished)
311 return true;
312
313 auto flags = state->getFlags();
314
315 is_finished = (flags & State::IS_FINISHED) && ((flags & State::HAS_DATA) == 0);
316
317 return is_finished;
318 }
319
320 void ALWAYS_INLINE setNeeded()
321 {
322 assumeConnected();
323
324 if ((state->setFlags(State::IS_NEEDED, State::IS_NEEDED) & State::IS_NEEDED) == 0)
325 updateVersion();
326 }
327
328 void ALWAYS_INLINE setNotNeeded()
329 {
330 assumeConnected();
331 state->setFlags(0, State::IS_NEEDED);
332 }
333
334 void ALWAYS_INLINE close()
335 {
336 assumeConnected();
337
338 if ((state->setFlags(State::IS_FINISHED, State::IS_FINISHED) & State::IS_FINISHED) == 0)
339 updateVersion();
340
341 is_finished = true;
342 }
343
344 void ALWAYS_INLINE reopen()
345 {
346 assumeConnected();
347
348 if (!isFinished())
349 return;
350
351 state->setFlags(0, State::IS_FINISHED);
352 is_finished = false;
353 }
354
355 OutputPort & getOutputPort()
356 {
357 assumeConnected();
358 return *output_port;
359 }
360
361 const OutputPort & getOutputPort() const
362 {
363 assumeConnected();
364 return *output_port;
365 }
366};
367
368
369/// Invariants:
370/// * If you finish port, it isFinished().
371/// * If port isFinished(), you can do nothing with it.
372/// * If port not isNeeded(), you can only finish() it.
373/// * You can hush only if port doesn't hasData().
374class OutputPort : public Port
375{
376 friend void connect(OutputPort &, InputPort &);
377
378private:
379 InputPort * input_port = nullptr;
380
381public:
382 using Port::Port;
383
384 void ALWAYS_INLINE push(Chunk chunk)
385 {
386 pushData({.chunk = std::move(chunk), .exception = {}});
387 }
388
389 void ALWAYS_INLINE push(std::exception_ptr exception)
390 {
391 pushData({.chunk = {}, .exception = std::move(exception)});
392 }
393
394 void ALWAYS_INLINE pushData(Data data_)
395 {
396 if (unlikely(!data_.exception && data_.chunk.getNumColumns() != header.columns()))
397 {
398 String msg = "Invalid number of columns in chunk pushed to OutputPort. Expected "
399 + std::to_string(header.columns())
400 + ", found " + std::to_string(data_.chunk.getNumColumns()) + '\n';
401
402 msg += "Header: " + header.dumpStructure() + '\n';
403 msg += "Chunk: " + data_.chunk.dumpStructure() + '\n';
404
405 throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
406 }
407
408 updateVersion();
409
410 assumeConnected();
411
412 std::uintptr_t flags = 0;
413 *data = std::move(data_);
414 state->push(data, flags);
415 }
416
417 void ALWAYS_INLINE finish()
418 {
419 assumeConnected();
420
421 auto flags = state->setFlags(State::IS_FINISHED, State::IS_FINISHED);
422
423 if ((flags & State::IS_FINISHED) == 0)
424 updateVersion();
425 }
426
427 bool ALWAYS_INLINE isNeeded() const
428 {
429 assumeConnected();
430 return state->getFlags() & State::IS_NEEDED;
431 }
432
433 bool ALWAYS_INLINE isFinished() const
434 {
435 assumeConnected();
436 return state->getFlags() & State::IS_FINISHED;
437 }
438
439 bool ALWAYS_INLINE canPush() const
440 {
441 assumeConnected();
442 auto flags = state->getFlags();
443 return (flags & State::IS_NEEDED) && ((flags & State::HAS_DATA) == 0);
444 }
445
446 InputPort & getInputPort()
447 {
448 assumeConnected();
449 return *input_port;
450 }
451
452 const InputPort & getInputPort() const
453 {
454 assumeConnected();
455 return *input_port;
456 }
457};
458
459
460using InputPorts = std::list<InputPort>;
461using OutputPorts = std::list<OutputPort>;
462
463
464void connect(OutputPort & output, InputPort & input);
465
466}
467