1#pragma once
2
3#include <common/logger_useful.h>
4
5#include <DataStreams/MergingSortedBlockInputStream.h>
6#include <DataStreams/ColumnGathererStream.h>
7
8#include <deque>
9
10
11namespace DB
12{
13
/// Error codes are defined elsewhere in the project; only the one used in this header is declared here.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Row limit for the multiversion queue.
/// NOTE(review): not referenced in this header; presumably caps max_rows_in_queue
/// in the implementation file - confirm there.
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
20
21
22/* Deque with fixed memory size. Allows pushing gaps.
23 * frontGap() returns the number of gaps were inserted before front.
24 *
25 * This structure may be implemented via std::deque, but
26 * - Deque uses fixed amount of memory which is allocated in constructor. No more allocations are performed.
27 * - Gaps are not stored as separate values in queue, which is more memory efficient.
28 * - Deque is responsible for gaps invariant: after removing element, moves gaps into neighbor cell.
29 *
30 * Note: empty deque may have non-zero front gap.
31 */
32template <typename T>
33class FixedSizeDequeWithGaps
34{
35public:
36
37 struct ValueWithGap
38 {
39 /// The number of gaps before current element. The number of gaps after last element stores into end cell.
40 size_t gap;
41 /// Store char[] instead of T in order to make ValueWithGap POD.
42 /// Call placement constructors after push and and destructors after pop.
43 char value[sizeof(T)];
44 };
45
46 explicit FixedSizeDequeWithGaps(size_t size)
47 {
48 container.resize_fill(size + 1);
49 }
50
51 ~FixedSizeDequeWithGaps()
52 {
53 auto destruct_range = [this](size_t from, size_t to)
54 {
55 for (size_t i = from; i < to; ++i)
56 destructValue(i);
57 };
58
59 if (begin <= end)
60 destruct_range(begin, end);
61 else
62 {
63 destruct_range(0, end);
64 destruct_range(begin, container.size());
65 }
66 }
67
68 void pushBack(const T & value)
69 {
70 checkEnoughSpaceToInsert();
71 constructValue(end, value);
72 moveRight(end);
73 container[end].gap = 0;
74 }
75
76 void pushGap(size_t count) { container[end].gap += count; }
77
78 void popBack()
79 {
80 checkHasValuesToRemove();
81 size_t curr_gap = container[end].gap;
82 moveLeft(end);
83 destructValue(end);
84 container[end].gap += curr_gap;
85 }
86
87 void popFront()
88 {
89 checkHasValuesToRemove();
90 destructValue(begin);
91 moveRight(begin);
92 }
93
94 T & front()
95 {
96 checkHasValuesToGet();
97 return getValue(begin);
98 }
99 const T & front() const
100 {
101 checkHasValuesToGet();
102 return getValue(begin);
103 }
104
105 const T & back() const
106 {
107 size_t ps = end;
108 moveLeft(ps);
109 return getValue(ps);
110 }
111
112 size_t & frontGap() { return container[begin].gap; }
113 const size_t & frontGap() const { return container[begin].gap; }
114
115 size_t size() const
116 {
117 if (begin <= end)
118 return end - begin;
119 return end + (container.size() - begin);
120 }
121
122 bool empty() const { return begin == end; }
123
124private:
125 PODArray<ValueWithGap> container;
126
127 size_t gap_before_first = 0;
128 size_t begin = 0;
129 size_t end = 0;
130
131 void constructValue(size_t index, const T & value) { new (container[index].value) T(value); }
132 void destructValue(size_t index) { reinterpret_cast<T *>(container[index].value)->~T(); }
133
134 T & getValue(size_t index) { return *reinterpret_cast<T *>(container[index].value); }
135 const T & getValue(size_t index) const { return *reinterpret_cast<const T *>(container[index].value); }
136
137 void moveRight(size_t & index) const
138 {
139 ++index;
140
141 if (index == container.size())
142 index = 0;
143 }
144
145 void moveLeft(size_t & index) const
146 {
147 if (index == 0)
148 index = container.size();
149
150 --index;
151 }
152
153 void checkEnoughSpaceToInsert() const
154 {
155 if (size() + 1 == container.size())
156 throw Exception("Not enough space to insert into FixedSizeDequeWithGaps with capacity "
157 + toString(container.size() - 1), ErrorCodes::LOGICAL_ERROR);
158 }
159
160 void checkHasValuesToRemove() const
161 {
162 if (empty())
163 throw Exception("Cannot remove from empty FixedSizeDequeWithGaps", ErrorCodes::LOGICAL_ERROR);
164 }
165
166 void checkHasValuesToGet() const
167 {
168 if (empty())
169 throw Exception("Cannot get value from empty FixedSizeDequeWithGaps", ErrorCodes::LOGICAL_ERROR);
170 }
171};
172
173class VersionedCollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
174{
175public:
176 /// Don't need version column. It's in primary key.
177 /// max_rows_in_queue should be about max_block_size_ if we won't store a lot of extra blocks (RowRef holds SharedBlockPtr).
178 VersionedCollapsingSortedBlockInputStream(
179 const BlockInputStreams & inputs_, const SortDescription & description_,
180 const String & sign_column_, size_t max_block_size_,
181 WriteBuffer * out_row_sources_buf_ = nullptr, bool average_block_sizes_ = false);
182
183 String getName() const override { return "VersionedCollapsingSorted"; }
184
185protected:
186 /// Can return 1 more records than max_block_size.
187 Block readImpl() override;
188
189private:
190 size_t sign_column_number = 0;
191
192 Logger * log = &Logger::get("VersionedCollapsingSortedBlockInputStream");
193
194 /// Read is finished.
195 bool finished = false;
196
197 Int8 sign_in_queue = 0;
198 const size_t max_rows_in_queue;
199 /// Rows with the same primary key and sign.
200 FixedSizeDequeWithGaps<SharedBlockRowRef> current_keys;
201
202 size_t blocks_written = 0;
203
204 /// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys.
205 std::queue<RowSourcePart> current_row_sources;
206
207 void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
208
209 /// Output to result row for the current primary key.
210 void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns);
211
212 void insertGap(size_t gap_size);
213};
214
215}
216