1 | #include <Processors/ISimpleTransform.h> |
---|---|
2 | |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | ISimpleTransform::ISimpleTransform(Block input_header_, Block output_header_, bool skip_empty_chunks_) |
8 | : IProcessor({std::move(input_header_)}, {std::move(output_header_)}) |
9 | , input(inputs.front()) |
10 | , output(outputs.front()) |
11 | , skip_empty_chunks(skip_empty_chunks_) |
12 | { |
13 | } |
14 | |
15 | ISimpleTransform::Status ISimpleTransform::prepare() |
16 | { |
17 | /// Check can output. |
18 | |
19 | if (output.isFinished()) |
20 | { |
21 | input.close(); |
22 | return Status::Finished; |
23 | } |
24 | |
25 | if (!output.canPush()) |
26 | { |
27 | input.setNotNeeded(); |
28 | return Status::PortFull; |
29 | } |
30 | |
31 | /// Output if has data. |
32 | if (transformed) |
33 | { |
34 | output.pushData(std::move(current_data)); |
35 | transformed = false; |
36 | } |
37 | |
38 | /// Stop if don't need more data. |
39 | if (no_more_data_needed) |
40 | { |
41 | input.close(); |
42 | output.finish(); |
43 | return Status::Finished; |
44 | } |
45 | |
46 | /// Check can input. |
47 | if (!has_input) |
48 | { |
49 | if (input.isFinished()) |
50 | { |
51 | output.finish(); |
52 | return Status::Finished; |
53 | } |
54 | |
55 | input.setNeeded(); |
56 | |
57 | if (!input.hasData()) |
58 | return Status::NeedData; |
59 | |
60 | current_data = input.pullData(); |
61 | has_input = true; |
62 | |
63 | if (current_data.exception) |
64 | { |
65 | /// Skip transform in case of exception. |
66 | has_input = false; |
67 | transformed = true; |
68 | |
69 | /// No more data needed. Exception will be thrown (or swallowed) later. |
70 | input.setNotNeeded(); |
71 | } |
72 | |
73 | if (set_input_not_needed_after_read) |
74 | input.setNotNeeded(); |
75 | } |
76 | |
77 | /// Now transform. |
78 | return Status::Ready; |
79 | } |
80 | |
81 | void ISimpleTransform::work() |
82 | { |
83 | if (current_data.exception) |
84 | return; |
85 | |
86 | try |
87 | { |
88 | transform(current_data.chunk); |
89 | } |
90 | catch (DB::Exception &) |
91 | { |
92 | current_data.exception = std::current_exception(); |
93 | transformed = true; |
94 | has_input = false; |
95 | return; |
96 | } |
97 | |
98 | has_input = false; |
99 | |
100 | if (!skip_empty_chunks || current_data.chunk) |
101 | transformed = true; |
102 | |
103 | if (transformed && !current_data.chunk) |
104 | /// Support invariant that chunks must have the same number of columns as header. |
105 | current_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0); |
106 | } |
107 | |
108 | } |
109 | |
110 |