1#include <Processors/Transforms/ExtremesTransform.h>
2
3#include <Core/Field.h>
4
5namespace DB
6{
7
8ExtremesTransform::ExtremesTransform(const Block & header)
9 : ISimpleTransform(header, header, true)
10{
11 /// Port for Extremes.
12 outputs.emplace_back(outputs.front().getHeader(), this);
13}
14
15IProcessor::Status ExtremesTransform::prepare()
16{
17 if (!finished_transform)
18 {
19 auto status = ISimpleTransform::prepare();
20
21 if (status != Status::Finished)
22 return status;
23
24 finished_transform = true;
25 }
26
27 auto & totals_output = getExtremesPort();
28
29 /// Check can output.
30 if (totals_output.isFinished())
31 return Status::Finished;
32
33 if (!totals_output.canPush())
34 return Status::PortFull;
35
36 if (!extremes && !extremes_columns.empty())
37 return Status::Ready;
38
39 if (extremes)
40 totals_output.push(std::move(extremes));
41
42 totals_output.finish();
43 return Status::Finished;
44}
45
46void ExtremesTransform::work()
47{
48 if (finished_transform)
49 {
50 if (!extremes && !extremes_columns.empty())
51 extremes.setColumns(std::move(extremes_columns), 2);
52 }
53 else
54 ISimpleTransform::work();
55}
56
57void ExtremesTransform::transform(DB::Chunk & chunk)
58{
59
60 if (chunk.getNumRows() == 0)
61 return;
62
63 size_t num_columns = chunk.getNumColumns();
64 auto & columns = chunk.getColumns();
65
66 if (extremes_columns.empty())
67 {
68 extremes_columns.resize(num_columns);
69
70 for (size_t i = 0; i < num_columns; ++i)
71 {
72 const ColumnPtr & src = columns[i];
73
74 if (isColumnConst(*src))
75 {
76 /// Equal min and max.
77 extremes_columns[i] = src->cloneResized(2);
78 }
79 else
80 {
81 Field min_value;
82 Field max_value;
83
84 src->getExtremes(min_value, max_value);
85
86 extremes_columns[i] = src->cloneEmpty();
87
88 extremes_columns[i]->insert(min_value);
89 extremes_columns[i]->insert(max_value);
90 }
91 }
92 }
93 else
94 {
95 for (size_t i = 0; i < num_columns; ++i)
96 {
97 if (isColumnConst(*extremes_columns[i]))
98 continue;
99
100 Field min_value = (*extremes_columns[i])[0];
101 Field max_value = (*extremes_columns[i])[1];
102
103 Field cur_min_value;
104 Field cur_max_value;
105
106 columns[i]->getExtremes(cur_min_value, cur_max_value);
107
108 if (cur_min_value < min_value)
109 min_value = cur_min_value;
110 if (cur_max_value > max_value)
111 max_value = cur_max_value;
112
113 MutableColumnPtr new_extremes = extremes_columns[i]->cloneEmpty();
114
115 new_extremes->insert(min_value);
116 new_extremes->insert(max_value);
117
118 extremes_columns[i] = std::move(new_extremes);
119 }
120 }
121}
122
123}
124