1#include <DataStreams/CollapsingFinalBlockInputStream.h>
2#include <Common/typeid_cast.h>
3
/// Maximum number of messages about incorrect data written to the log.
/// A typed, internal-linkage constant instead of a macro: it respects scopes and has a
/// well-defined type for comparisons against the size_t-style counters it limits.
static constexpr size_t MAX_ERROR_MESSAGES = 10;
6
7
8namespace DB
9{
10
11CollapsingFinalBlockInputStream::~CollapsingFinalBlockInputStream()
12{
13 /// You must cancel all `MergingBlockPtr` so that they do not try to put blocks in `output_blocks`.
14 previous.block.cancel();
15 last_positive.block.cancel();
16
17 while (!queue.empty())
18 {
19 Cursor c = queue.top();
20 queue.pop();
21 c.block.cancel();
22 }
23
24 for (size_t i = 0; i < output_blocks.size(); ++i)
25 delete output_blocks[i];
26}
27
28void CollapsingFinalBlockInputStream::reportBadCounts()
29{
30 /// With inconsistent data, this is an unavoidable error that can not be easily fixed by admins. Therefore Warning.
31 LOG_WARNING(log, "Incorrect data: number of rows with sign = 1 (" << count_positive
32 << ") differs with number of rows with sign = -1 (" << count_negative
33 << ") by more than one");
34}
35
36void CollapsingFinalBlockInputStream::reportBadSign(Int8 sign)
37{
38 LOG_ERROR(log, "Invalid sign: " << static_cast<int>(sign));
39}
40
41void CollapsingFinalBlockInputStream::fetchNextBlock(size_t input_index)
42{
43 BlockInputStreamPtr stream = children[input_index];
44 Block block = stream->read();
45 if (!block)
46 return;
47 MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column_name, &output_blocks));
48 ++blocks_fetched;
49 queue.push(Cursor(merging_block));
50}
51
52void CollapsingFinalBlockInputStream::commitCurrent()
53{
54 if (count_positive || count_negative)
55 {
56 if (count_positive >= count_negative && last_is_positive)
57 {
58 last_positive.addToFilter();
59 }
60
61 if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
62 {
63 if (count_incorrect_data < MAX_ERROR_MESSAGES)
64 reportBadCounts();
65 ++count_incorrect_data;
66 }
67
68 last_positive = Cursor();
69 previous = Cursor();
70 }
71
72 count_negative = 0;
73 count_positive = 0;
74}
75
76Block CollapsingFinalBlockInputStream::readImpl()
77{
78 if (first)
79 {
80 for (size_t i = 0; i < children.size(); ++i)
81 fetchNextBlock(i);
82
83 first = false;
84 }
85
86 /// We will create blocks for the answer until we get a non-empty block.
87 while (true)
88 {
89 while (!queue.empty() && output_blocks.empty())
90 {
91 Cursor current = queue.top();
92 queue.pop();
93
94 bool has_next = !queue.empty();
95 Cursor next = has_next ? queue.top() : Cursor();
96
97 /// We will advance in the current block, not using the queue, as long as possible.
98 while (true)
99 {
100 if (!current.equal(previous))
101 {
102 commitCurrent();
103 previous = current;
104 }
105
106 Int8 sign = current.getSign();
107 if (sign == 1)
108 {
109 last_positive = current;
110 last_is_positive = true;
111 ++count_positive;
112 }
113 else if (sign == -1)
114 {
115 last_is_positive = false;
116 ++count_negative;
117 }
118 else
119 reportBadSign(sign);
120
121 if (current.isLast())
122 {
123 fetchNextBlock(current.block->stream_index);
124
125 /// All streams are over. We'll process the last key.
126 if (!has_next)
127 commitCurrent();
128
129 break;
130 }
131 else
132 {
133 current.next();
134
135 if (has_next && !(next < current))
136 {
137 queue.push(current);
138 break;
139 }
140 }
141 }
142 }
143
144 /// End of the stream.
145 if (output_blocks.empty())
146 {
147 if (blocks_fetched != blocks_output)
148 LOG_ERROR(log, "Logical error: CollapsingFinalBlockInputStream has output " << blocks_output << " blocks instead of " << blocks_fetched);
149
150 return Block();
151 }
152
153 MergingBlock * merging_block = output_blocks.back();
154 Block block = merging_block->block;
155
156 for (size_t i = 0; i < block.columns(); ++i)
157 block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(merging_block->filter, -1);
158
159 output_blocks.pop_back();
160 delete merging_block;
161
162 ++blocks_output;
163
164 if (block)
165 return block;
166 }
167}
168
169}
170