1 | #pragma once |
2 | |
3 | #include <common/logger_useful.h> |
4 | |
5 | #include <DataStreams/MergingSortedBlockInputStream.h> |
6 | #include <DataStreams/ColumnGathererStream.h> |
7 | |
8 | #include <deque> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
/// Forward declaration of the error code thrown by the checks in FixedSizeDequeWithGaps.
/// Only declared here (extern); the definition lives elsewhere in the project.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
18 | |
/// NOTE(review): not referenced anywhere in this header; presumably an upper bound for
/// VersionedCollapsingSortedBlockInputStream::max_rows_in_queue - confirm against the implementation file.
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
20 | |
21 | |
/* Deque with fixed memory size. Allows pushing gaps.
 * frontGap() returns the number of gaps that were inserted before the front element.
 *
 * This structure could be implemented via std::deque, but
 *  - This deque uses a fixed amount of memory which is allocated in the constructor. No more allocations are performed.
 *  - Gaps are not stored as separate values in the queue, which is more memory efficient.
 *  - The deque is responsible for the gaps invariant: after removing an element, it moves the gaps into the neighbor cell.
 *
 * Note: an empty deque may have a non-zero front gap.
 */
32 | template <typename T> |
33 | class FixedSizeDequeWithGaps |
34 | { |
35 | public: |
36 | |
37 | struct ValueWithGap |
38 | { |
39 | /// The number of gaps before current element. The number of gaps after last element stores into end cell. |
40 | size_t gap; |
41 | /// Store char[] instead of T in order to make ValueWithGap POD. |
42 | /// Call placement constructors after push and and destructors after pop. |
43 | char value[sizeof(T)]; |
44 | }; |
45 | |
46 | explicit FixedSizeDequeWithGaps(size_t size) |
47 | { |
48 | container.resize_fill(size + 1); |
49 | } |
50 | |
51 | ~FixedSizeDequeWithGaps() |
52 | { |
53 | auto destruct_range = [this](size_t from, size_t to) |
54 | { |
55 | for (size_t i = from; i < to; ++i) |
56 | destructValue(i); |
57 | }; |
58 | |
59 | if (begin <= end) |
60 | destruct_range(begin, end); |
61 | else |
62 | { |
63 | destruct_range(0, end); |
64 | destruct_range(begin, container.size()); |
65 | } |
66 | } |
67 | |
68 | void pushBack(const T & value) |
69 | { |
70 | checkEnoughSpaceToInsert(); |
71 | constructValue(end, value); |
72 | moveRight(end); |
73 | container[end].gap = 0; |
74 | } |
75 | |
76 | void pushGap(size_t count) { container[end].gap += count; } |
77 | |
78 | void popBack() |
79 | { |
80 | checkHasValuesToRemove(); |
81 | size_t curr_gap = container[end].gap; |
82 | moveLeft(end); |
83 | destructValue(end); |
84 | container[end].gap += curr_gap; |
85 | } |
86 | |
87 | void popFront() |
88 | { |
89 | checkHasValuesToRemove(); |
90 | destructValue(begin); |
91 | moveRight(begin); |
92 | } |
93 | |
94 | T & front() |
95 | { |
96 | checkHasValuesToGet(); |
97 | return getValue(begin); |
98 | } |
99 | const T & front() const |
100 | { |
101 | checkHasValuesToGet(); |
102 | return getValue(begin); |
103 | } |
104 | |
105 | const T & back() const |
106 | { |
107 | size_t ps = end; |
108 | moveLeft(ps); |
109 | return getValue(ps); |
110 | } |
111 | |
112 | size_t & frontGap() { return container[begin].gap; } |
113 | const size_t & frontGap() const { return container[begin].gap; } |
114 | |
115 | size_t size() const |
116 | { |
117 | if (begin <= end) |
118 | return end - begin; |
119 | return end + (container.size() - begin); |
120 | } |
121 | |
122 | bool empty() const { return begin == end; } |
123 | |
124 | private: |
125 | PODArray<ValueWithGap> container; |
126 | |
127 | size_t gap_before_first = 0; |
128 | size_t begin = 0; |
129 | size_t end = 0; |
130 | |
131 | void constructValue(size_t index, const T & value) { new (container[index].value) T(value); } |
132 | void destructValue(size_t index) { reinterpret_cast<T *>(container[index].value)->~T(); } |
133 | |
134 | T & getValue(size_t index) { return *reinterpret_cast<T *>(container[index].value); } |
135 | const T & getValue(size_t index) const { return *reinterpret_cast<const T *>(container[index].value); } |
136 | |
137 | void moveRight(size_t & index) const |
138 | { |
139 | ++index; |
140 | |
141 | if (index == container.size()) |
142 | index = 0; |
143 | } |
144 | |
145 | void moveLeft(size_t & index) const |
146 | { |
147 | if (index == 0) |
148 | index = container.size(); |
149 | |
150 | --index; |
151 | } |
152 | |
153 | void checkEnoughSpaceToInsert() const |
154 | { |
155 | if (size() + 1 == container.size()) |
156 | throw Exception("Not enough space to insert into FixedSizeDequeWithGaps with capacity " |
157 | + toString(container.size() - 1), ErrorCodes::LOGICAL_ERROR); |
158 | } |
159 | |
160 | void checkHasValuesToRemove() const |
161 | { |
162 | if (empty()) |
163 | throw Exception("Cannot remove from empty FixedSizeDequeWithGaps" , ErrorCodes::LOGICAL_ERROR); |
164 | } |
165 | |
166 | void checkHasValuesToGet() const |
167 | { |
168 | if (empty()) |
169 | throw Exception("Cannot get value from empty FixedSizeDequeWithGaps" , ErrorCodes::LOGICAL_ERROR); |
170 | } |
171 | }; |
172 | |
/// Block input stream built on top of MergingSortedBlockInputStream.
/// NOTE(review): only the declaration is visible here; presumably it merges sorted inputs and collapses
/// rows with the same primary key (which includes the version) and opposite signs - confirm against the implementation.
class VersionedCollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
    /// Don't need version column. It's in primary key.
    /// max_rows_in_queue should be about max_block_size_ if we won't store a lot of extra blocks (RowRef holds SharedBlockPtr).
    /// out_row_sources_buf_ and average_block_sizes_ are optional (default to nullptr and false).
    VersionedCollapsingSortedBlockInputStream(
        const BlockInputStreams & inputs_, const SortDescription & description_,
        const String & sign_column_, size_t max_block_size_,
        WriteBuffer * out_row_sources_buf_ = nullptr, bool average_block_sizes_ = false);

    /// Name of the stream.
    String getName() const override { return "VersionedCollapsingSorted"; }

protected:
    /// Can return one more record than max_block_size.
    Block readImpl() override;

private:
    /// Position of the sign column; presumably resolved from sign_column_ in the constructor - confirm.
    size_t sign_column_number = 0;

    Logger * log = &Logger::get("VersionedCollapsingSortedBlockInputStream");

    /// Read is finished.
    bool finished = false;

    /// Presumably the sign shared by the rows currently stored in current_keys - confirm against the implementation.
    Int8 sign_in_queue = 0;
    /// Limit for the number of rows kept in current_keys; set in the constructor (see the note on the constructor).
    const size_t max_rows_in_queue;
    /// Rows with the same primary key and sign.
    FixedSizeDequeWithGaps<SharedBlockRowRef> current_keys;

    /// NOTE(review): presumably counts the blocks already returned by readImpl() - confirm.
    size_t blocks_written = 0;

    /// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys.
    std::queue<RowSourcePart> current_row_sources;

    /// Merge step which fills merged_columns from the cursors in the priority queue.
    void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);

    /// Output to result row for the current primary key.
    /// NOTE(review): skip_rows is presumably the number of gaps preceding the row - confirm.
    void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns);

    /// NOTE(review): presumably accounts for gap_size skipped rows in current_row_sources - confirm.
    void insertGap(size_t gap_size);
};
214 | |
215 | } |
216 | |