1 | #pragma once |
2 | |
3 | #include <Processors/IProcessor.h> |
4 | #include <queue> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
/** Has an arbitrary non-zero number of inputs and an arbitrary non-zero number of outputs.
  * All of them have the same structure.
  *
  * Pulls data from an arbitrary input (whenever it is ready) and pushes it to an arbitrary output (whenever it is not full).
  * Doesn't do any heavy calculations.
  * Doesn't preserve the order of data.
  *
  * Examples:
  * - union data from multiple inputs to a single output - to serialize data that was processed in parallel.
  * - split data from a single input to multiple outputs - to allow further parallel processing.
  */
21 | class ResizeProcessor : public IProcessor |
22 | { |
23 | public: |
24 | /// TODO Check that there is non zero number of inputs and outputs. |
25 | ResizeProcessor(const Block & , size_t num_inputs, size_t num_outputs) |
26 | : IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header)) |
27 | , current_input(inputs.begin()) |
28 | , current_output(outputs.begin()) |
29 | { |
30 | } |
31 | |
32 | String getName() const override { return "Resize" ; } |
33 | |
34 | Status prepare() override; |
35 | Status prepare(const PortNumbers &, const PortNumbers &) override; |
36 | |
37 | private: |
38 | InputPorts::iterator current_input; |
39 | OutputPorts::iterator current_output; |
40 | |
41 | size_t num_finished_inputs = 0; |
42 | size_t num_finished_outputs = 0; |
43 | std::queue<UInt64> waiting_outputs; |
44 | std::queue<UInt64> inputs_with_data; |
45 | bool initialized = false; |
46 | |
47 | enum class OutputStatus |
48 | { |
49 | NotActive, |
50 | NeedData, |
51 | Finished, |
52 | }; |
53 | |
54 | enum class InputStatus |
55 | { |
56 | NotActive, |
57 | HasData, |
58 | Finished, |
59 | }; |
60 | |
61 | struct InputPortWithStatus |
62 | { |
63 | InputPort * port; |
64 | InputStatus status; |
65 | }; |
66 | |
67 | struct OutputPortWithStatus |
68 | { |
69 | OutputPort * port; |
70 | OutputStatus status; |
71 | }; |
72 | |
73 | std::vector<InputPortWithStatus> input_ports; |
74 | std::vector<OutputPortWithStatus> output_ports; |
75 | }; |
76 | |
77 | } |
78 | |