1 | #include <DataStreams/CollapsingFinalBlockInputStream.h> |
2 | #include <Common/typeid_cast.h> |
3 | |
/// Maximum number of messages about incorrect data in the log.
/// commitCurrent() compares `count_incorrect_data` against this limit before calling reportBadCounts().
#define MAX_ERROR_MESSAGES 10
6 | |
7 | |
8 | namespace DB |
9 | { |
10 | |
11 | CollapsingFinalBlockInputStream::~CollapsingFinalBlockInputStream() |
12 | { |
13 | /// You must cancel all `MergingBlockPtr` so that they do not try to put blocks in `output_blocks`. |
14 | previous.block.cancel(); |
15 | last_positive.block.cancel(); |
16 | |
17 | while (!queue.empty()) |
18 | { |
19 | Cursor c = queue.top(); |
20 | queue.pop(); |
21 | c.block.cancel(); |
22 | } |
23 | |
24 | for (size_t i = 0; i < output_blocks.size(); ++i) |
25 | delete output_blocks[i]; |
26 | } |
27 | |
28 | void CollapsingFinalBlockInputStream::reportBadCounts() |
29 | { |
30 | /// With inconsistent data, this is an unavoidable error that can not be easily fixed by admins. Therefore Warning. |
31 | LOG_WARNING(log, "Incorrect data: number of rows with sign = 1 (" << count_positive |
32 | << ") differs with number of rows with sign = -1 (" << count_negative |
33 | << ") by more than one" ); |
34 | } |
35 | |
36 | void CollapsingFinalBlockInputStream::reportBadSign(Int8 sign) |
37 | { |
38 | LOG_ERROR(log, "Invalid sign: " << static_cast<int>(sign)); |
39 | } |
40 | |
41 | void CollapsingFinalBlockInputStream::fetchNextBlock(size_t input_index) |
42 | { |
43 | BlockInputStreamPtr stream = children[input_index]; |
44 | Block block = stream->read(); |
45 | if (!block) |
46 | return; |
47 | MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column_name, &output_blocks)); |
48 | ++blocks_fetched; |
49 | queue.push(Cursor(merging_block)); |
50 | } |
51 | |
52 | void CollapsingFinalBlockInputStream::commitCurrent() |
53 | { |
54 | if (count_positive || count_negative) |
55 | { |
56 | if (count_positive >= count_negative && last_is_positive) |
57 | { |
58 | last_positive.addToFilter(); |
59 | } |
60 | |
61 | if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1)) |
62 | { |
63 | if (count_incorrect_data < MAX_ERROR_MESSAGES) |
64 | reportBadCounts(); |
65 | ++count_incorrect_data; |
66 | } |
67 | |
68 | last_positive = Cursor(); |
69 | previous = Cursor(); |
70 | } |
71 | |
72 | count_negative = 0; |
73 | count_positive = 0; |
74 | } |
75 | |
76 | Block CollapsingFinalBlockInputStream::readImpl() |
77 | { |
78 | if (first) |
79 | { |
80 | for (size_t i = 0; i < children.size(); ++i) |
81 | fetchNextBlock(i); |
82 | |
83 | first = false; |
84 | } |
85 | |
86 | /// We will create blocks for the answer until we get a non-empty block. |
87 | while (true) |
88 | { |
89 | while (!queue.empty() && output_blocks.empty()) |
90 | { |
91 | Cursor current = queue.top(); |
92 | queue.pop(); |
93 | |
94 | bool has_next = !queue.empty(); |
95 | Cursor next = has_next ? queue.top() : Cursor(); |
96 | |
97 | /// We will advance in the current block, not using the queue, as long as possible. |
98 | while (true) |
99 | { |
100 | if (!current.equal(previous)) |
101 | { |
102 | commitCurrent(); |
103 | previous = current; |
104 | } |
105 | |
106 | Int8 sign = current.getSign(); |
107 | if (sign == 1) |
108 | { |
109 | last_positive = current; |
110 | last_is_positive = true; |
111 | ++count_positive; |
112 | } |
113 | else if (sign == -1) |
114 | { |
115 | last_is_positive = false; |
116 | ++count_negative; |
117 | } |
118 | else |
119 | reportBadSign(sign); |
120 | |
121 | if (current.isLast()) |
122 | { |
123 | fetchNextBlock(current.block->stream_index); |
124 | |
125 | /// All streams are over. We'll process the last key. |
126 | if (!has_next) |
127 | commitCurrent(); |
128 | |
129 | break; |
130 | } |
131 | else |
132 | { |
133 | current.next(); |
134 | |
135 | if (has_next && !(next < current)) |
136 | { |
137 | queue.push(current); |
138 | break; |
139 | } |
140 | } |
141 | } |
142 | } |
143 | |
144 | /// End of the stream. |
145 | if (output_blocks.empty()) |
146 | { |
147 | if (blocks_fetched != blocks_output) |
148 | LOG_ERROR(log, "Logical error: CollapsingFinalBlockInputStream has output " << blocks_output << " blocks instead of " << blocks_fetched); |
149 | |
150 | return Block(); |
151 | } |
152 | |
153 | MergingBlock * merging_block = output_blocks.back(); |
154 | Block block = merging_block->block; |
155 | |
156 | for (size_t i = 0; i < block.columns(); ++i) |
157 | block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(merging_block->filter, -1); |
158 | |
159 | output_blocks.pop_back(); |
160 | delete merging_block; |
161 | |
162 | ++blocks_output; |
163 | |
164 | if (block) |
165 | return block; |
166 | } |
167 | } |
168 | |
169 | } |
170 | |