1#pragma once
2
3#include <Processors/IProcessor.h>
4#include <queue>
5
6
7namespace DB
8{
9
10/** Has arbitary non zero number of inputs and arbitary non zero number of outputs.
11 * All of them have the same structure.
12 *
13 * Pulls data from arbitary input (whenever it is ready) and pushes it to arbitary output (whenever is is not full).
14 * Doesn't do any heavy calculations.
15 * Doesn't preserve an order of data.
16 *
17 * Examples:
18 * - union data from multiple inputs to single output - to serialize data that was processed in parallel.
19 * - split data from single input to multiple outputs - to allow further parallel processing.
20 */
21class ResizeProcessor : public IProcessor
22{
23public:
24 /// TODO Check that there is non zero number of inputs and outputs.
25 ResizeProcessor(const Block & header, size_t num_inputs, size_t num_outputs)
26 : IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header))
27 , current_input(inputs.begin())
28 , current_output(outputs.begin())
29 {
30 }
31
32 String getName() const override { return "Resize"; }
33
34 Status prepare() override;
35 Status prepare(const PortNumbers &, const PortNumbers &) override;
36
37private:
38 InputPorts::iterator current_input;
39 OutputPorts::iterator current_output;
40
41 size_t num_finished_inputs = 0;
42 size_t num_finished_outputs = 0;
43 std::queue<UInt64> waiting_outputs;
44 std::queue<UInt64> inputs_with_data;
45 bool initialized = false;
46
47 enum class OutputStatus
48 {
49 NotActive,
50 NeedData,
51 Finished,
52 };
53
54 enum class InputStatus
55 {
56 NotActive,
57 HasData,
58 Finished,
59 };
60
61 struct InputPortWithStatus
62 {
63 InputPort * port;
64 InputStatus status;
65 };
66
67 struct OutputPortWithStatus
68 {
69 OutputPort * port;
70 OutputStatus status;
71 };
72
73 std::vector<InputPortWithStatus> input_ports;
74 std::vector<OutputPortWithStatus> output_ports;
75};
76
77}
78