1#include <Processors/ISimpleTransform.h>
2
3
4namespace DB
5{
6
7ISimpleTransform::ISimpleTransform(Block input_header_, Block output_header_, bool skip_empty_chunks_)
8 : IProcessor({std::move(input_header_)}, {std::move(output_header_)})
9 , input(inputs.front())
10 , output(outputs.front())
11 , skip_empty_chunks(skip_empty_chunks_)
12{
13}
14
15ISimpleTransform::Status ISimpleTransform::prepare()
16{
17 /// Check can output.
18
19 if (output.isFinished())
20 {
21 input.close();
22 return Status::Finished;
23 }
24
25 if (!output.canPush())
26 {
27 input.setNotNeeded();
28 return Status::PortFull;
29 }
30
31 /// Output if has data.
32 if (transformed)
33 {
34 output.pushData(std::move(current_data));
35 transformed = false;
36 }
37
38 /// Stop if don't need more data.
39 if (no_more_data_needed)
40 {
41 input.close();
42 output.finish();
43 return Status::Finished;
44 }
45
46 /// Check can input.
47 if (!has_input)
48 {
49 if (input.isFinished())
50 {
51 output.finish();
52 return Status::Finished;
53 }
54
55 input.setNeeded();
56
57 if (!input.hasData())
58 return Status::NeedData;
59
60 current_data = input.pullData();
61 has_input = true;
62
63 if (current_data.exception)
64 {
65 /// Skip transform in case of exception.
66 has_input = false;
67 transformed = true;
68
69 /// No more data needed. Exception will be thrown (or swallowed) later.
70 input.setNotNeeded();
71 }
72
73 if (set_input_not_needed_after_read)
74 input.setNotNeeded();
75 }
76
77 /// Now transform.
78 return Status::Ready;
79}
80
81void ISimpleTransform::work()
82{
83 if (current_data.exception)
84 return;
85
86 try
87 {
88 transform(current_data.chunk);
89 }
90 catch (DB::Exception &)
91 {
92 current_data.exception = std::current_exception();
93 transformed = true;
94 has_input = false;
95 return;
96 }
97
98 has_input = false;
99
100 if (!skip_empty_chunks || current_data.chunk)
101 transformed = true;
102
103 if (transformed && !current_data.chunk)
104 /// Support invariant that chunks must have the same number of columns as header.
105 current_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0);
106}
107
108}
109
110