1 | #include "duckdb/common/vector_operations/binary_executor.hpp" |
2 | #include "duckdb/storage/data_table.hpp" |
3 | #include "duckdb/common/operator/comparison_operators.hpp" |
4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
5 | #include "duckdb/storage/uncompressed_segment.hpp" |
6 | #include "duckdb/common/exception.hpp" |
7 | #include "duckdb/common/types/vector.hpp" |
8 | #include "duckdb/transaction/update_info.hpp" |
9 | |
10 | using namespace duckdb; |
11 | using namespace std; |
12 | |
13 | UncompressedSegment::UncompressedSegment(BufferManager &manager, TypeId type, idx_t row_start) |
14 | : manager(manager), type(type), block_id(INVALID_BLOCK), max_vector_count(0), tuple_count(0), |
15 | row_start(row_start), |
16 | versions(nullptr) { |
17 | } |
18 | |
19 | UncompressedSegment::~UncompressedSegment() { |
20 | if (block_id >= MAXIMUM_BLOCK) { |
21 | // if the uncompressed segment had an in-memory segment, destroy it when the uncompressed segment is destroyed |
22 | manager.DestroyBuffer(block_id); |
23 | } |
24 | } |
25 | |
26 | void UncompressedSegment::Verify(Transaction &transaction) { |
27 | #ifdef DEBUG |
28 | ColumnScanState state; |
29 | InitializeScan(state); |
30 | |
31 | Vector result(this->type); |
32 | for (idx_t i = 0; i < this->tuple_count; i += STANDARD_VECTOR_SIZE) { |
33 | idx_t vector_idx = i / STANDARD_VECTOR_SIZE; |
34 | idx_t count = std::min((idx_t)STANDARD_VECTOR_SIZE, tuple_count - i); |
35 | Scan(transaction, state, vector_idx, result); |
36 | result.Verify(count); |
37 | } |
38 | #endif |
39 | } |
40 | |
41 | static void CheckForConflicts(UpdateInfo *info, Transaction &transaction, row_t *ids, idx_t count, row_t offset, |
42 | UpdateInfo *&node) { |
43 | if (info->version_number == transaction.transaction_id) { |
44 | // this UpdateInfo belongs to the current transaction, set it in the node |
45 | node = info; |
46 | } else if (info->version_number > transaction.start_time) { |
47 | // potential conflict, check that tuple ids do not conflict |
48 | // as both ids and info->tuples are sorted, this is similar to a merge join |
49 | idx_t i = 0, j = 0; |
50 | while (true) { |
51 | auto id = ids[i] - offset; |
52 | if (id == info->tuples[j]) { |
53 | throw TransactionException("Conflict on update!" ); |
54 | } else if (id < info->tuples[j]) { |
55 | // id < the current tuple in info, move to next id |
56 | i++; |
57 | if (i == count) { |
58 | break; |
59 | } |
60 | } else { |
61 | // id > the current tuple, move to next tuple in info |
62 | j++; |
63 | if (j == info->N) { |
64 | break; |
65 | } |
66 | } |
67 | } |
68 | } |
69 | if (info->next) { |
70 | CheckForConflicts(info->next, transaction, ids, count, offset, node); |
71 | } |
72 | } |
73 | |
74 | void UncompressedSegment::Update(ColumnData &column_data, SegmentStatistics &stats, Transaction &transaction, |
75 | Vector &update, row_t *ids, idx_t count, row_t offset) { |
76 | // can only perform in-place updates on temporary blocks |
77 | assert(block_id >= MAXIMUM_BLOCK); |
78 | |
79 | // obtain an exclusive lock |
80 | auto write_lock = lock.GetExclusiveLock(); |
81 | |
82 | #ifdef DEBUG |
83 | // verify that the ids are sorted and there are no duplicates |
84 | for (idx_t i = 1; i < count; i++) { |
85 | assert(ids[i] > ids[i - 1]); |
86 | } |
87 | #endif |
88 | |
89 | // create the versions for this segment, if there are none yet |
90 | if (!versions) { |
91 | this->versions = unique_ptr<UpdateInfo *[]>(new UpdateInfo *[max_vector_count]); |
92 | for (idx_t i = 0; i < max_vector_count; i++) { |
93 | this->versions[i] = nullptr; |
94 | } |
95 | } |
96 | |
97 | // get the vector index based on the first id |
98 | // we assert that all updates must be part of the same vector |
99 | auto first_id = ids[0]; |
100 | idx_t vector_index = (first_id - offset) / STANDARD_VECTOR_SIZE; |
101 | idx_t vector_offset = offset + vector_index * STANDARD_VECTOR_SIZE; |
102 | |
103 | assert(first_id >= offset); |
104 | assert(vector_index < max_vector_count); |
105 | |
106 | // first check the version chain |
107 | UpdateInfo *node = nullptr; |
108 | if (versions[vector_index]) { |
109 | // there is already a version here, check if there are any conflicts and search for the node that belongs to |
110 | // this transaction in the version chain |
111 | CheckForConflicts(versions[vector_index], transaction, ids, count, vector_offset, node); |
112 | } |
113 | Update(column_data, stats, transaction, update, ids, count, vector_index, vector_offset, node); |
114 | } |
115 | |
116 | UpdateInfo *UncompressedSegment::CreateUpdateInfo(ColumnData &column_data, Transaction &transaction, row_t *ids, |
117 | idx_t count, idx_t vector_index, idx_t vector_offset, |
118 | idx_t type_size) { |
119 | auto node = transaction.CreateUpdateInfo(type_size, STANDARD_VECTOR_SIZE); |
120 | node->column_data = &column_data; |
121 | node->segment = this; |
122 | node->vector_index = vector_index; |
123 | node->prev = nullptr; |
124 | node->next = versions[vector_index]; |
125 | if (node->next) { |
126 | node->next->prev = node; |
127 | } |
128 | versions[vector_index] = node; |
129 | |
130 | // set up the tuple ids |
131 | node->N = count; |
132 | for (idx_t i = 0; i < count; i++) { |
133 | assert((idx_t) ids[i] >= vector_offset && (idx_t) ids[i] < vector_offset + STANDARD_VECTOR_SIZE); |
134 | node->tuples[i] = ids[i] - vector_offset; |
135 | }; |
136 | return node; |
137 | } |
138 | |
139 | void UncompressedSegment::Fetch(ColumnScanState &state, idx_t vector_index, Vector &result) { |
140 | auto read_lock = lock.GetSharedLock(); |
141 | InitializeScan(state); |
142 | FetchBaseData(state, vector_index, result); |
143 | } |
144 | |
145 | //===--------------------------------------------------------------------===// |
146 | // Filter |
147 | //===--------------------------------------------------------------------===// |
148 | template<class T> |
149 | static void filterSelectionType(T *vec, T *predicate, SelectionVector &sel, idx_t &approved_tuple_count, |
150 | ExpressionType comparison_type, nullmask_t &nullmask) { |
151 | SelectionVector new_sel(approved_tuple_count); |
152 | // the inplace loops take the result as the last parameter |
153 | switch (comparison_type) { |
154 | case ExpressionType::COMPARE_EQUAL: { |
155 | if (nullmask.any()) { |
156 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, Equals, false, true, true, true, false>( |
157 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
158 | } else { |
159 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, Equals, false, true, false, true, false>( |
160 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
161 | } |
162 | break; |
163 | } |
164 | case ExpressionType::COMPARE_LESSTHAN: { |
165 | if (nullmask.any()) { |
166 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, LessThan, false, true, true, true, false>( |
167 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
168 | } else { |
169 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, LessThan, false, true, false, true, false>( |
170 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
171 | } |
172 | break; |
173 | } |
174 | case ExpressionType::COMPARE_GREATERTHAN: { |
175 | if (nullmask.any()) { |
176 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, GreaterThan, false, true, true, true, false>( |
177 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
178 | } else { |
179 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, GreaterThan, false, true, false, true, false>( |
180 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
181 | } |
182 | break; |
183 | } |
184 | case ExpressionType::COMPARE_LESSTHANOREQUALTO: { |
185 | if (nullmask.any()) { |
186 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, LessThanEquals, false, true, true, true, false>( |
187 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
188 | } else { |
189 | approved_tuple_count = |
190 | BinaryExecutor::SelectFlatLoop<T, T, LessThanEquals, false, true, false, true, false>( |
191 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
192 | } |
193 | break; |
194 | } |
195 | case ExpressionType::COMPARE_GREATERTHANOREQUALTO: { |
196 | if (nullmask.any()) { |
197 | approved_tuple_count = |
198 | BinaryExecutor::SelectFlatLoop<T, T, GreaterThanEquals, false, true, true, true, false>( |
199 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
200 | } else { |
201 | approved_tuple_count = |
202 | BinaryExecutor::SelectFlatLoop<T, T, GreaterThanEquals, false, true, false, true, false>( |
203 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
204 | } |
205 | break; |
206 | } |
207 | default: |
208 | throw NotImplementedException("Unknown comparison type for filter pushed down to table!" ); |
209 | } |
210 | sel.Initialize(new_sel); |
211 | } |
212 | |
213 | void UncompressedSegment::filterSelection(SelectionVector &sel, Vector &result, TableFilter filter, |
214 | idx_t &approved_tuple_count, nullmask_t &nullmask) { |
215 | // the inplace loops take the result as the last parameter |
216 | switch (result.type) { |
217 | case TypeId::INT8: { |
218 | auto result_flat = FlatVector::GetData<int8_t>(result); |
219 | auto predicate_vector = Vector(filter.constant.value_.tinyint); |
220 | auto predicate = FlatVector::GetData<int8_t>(predicate_vector); |
221 | filterSelectionType<int8_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
222 | nullmask); |
223 | break; |
224 | } |
225 | case TypeId::INT16: { |
226 | auto result_flat = FlatVector::GetData<int16_t>(result); |
227 | auto predicate_vector = Vector(filter.constant.value_.smallint); |
228 | auto predicate = FlatVector::GetData<int16_t>(predicate_vector); |
229 | filterSelectionType<int16_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
230 | nullmask); |
231 | break; |
232 | } |
233 | case TypeId::INT32: { |
234 | auto result_flat = FlatVector::GetData<int32_t>(result); |
235 | auto predicate_vector = Vector(filter.constant.value_.integer); |
236 | auto predicate = FlatVector::GetData<int32_t>(predicate_vector); |
237 | filterSelectionType<int32_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
238 | nullmask); |
239 | break; |
240 | } |
241 | case TypeId::INT64: { |
242 | auto result_flat = FlatVector::GetData<int64_t>(result); |
243 | auto predicate_vector = Vector(filter.constant.value_.bigint); |
244 | auto predicate = FlatVector::GetData<int64_t>(predicate_vector); |
245 | filterSelectionType<int64_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
246 | nullmask); |
247 | break; |
248 | } |
249 | case TypeId::FLOAT: { |
250 | auto result_flat = FlatVector::GetData<float>(result); |
251 | auto predicate_vector = Vector(filter.constant.value_.float_); |
252 | auto predicate = FlatVector::GetData<float>(predicate_vector); |
253 | filterSelectionType<float>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
254 | nullmask); |
255 | break; |
256 | } |
257 | case TypeId::DOUBLE: { |
258 | auto result_flat = FlatVector::GetData<double>(result); |
259 | auto predicate_vector = Vector(filter.constant.value_.double_); |
260 | auto predicate = FlatVector::GetData<double>(predicate_vector); |
261 | filterSelectionType<double>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
262 | nullmask); |
263 | break; |
264 | } |
265 | case TypeId::VARCHAR: { |
266 | auto result_flat = FlatVector::GetData<string_t>(result); |
267 | auto predicate_vector = Vector(filter.constant.str_value); |
268 | auto predicate = FlatVector::GetData<string_t>(predicate_vector); |
269 | filterSelectionType<string_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
270 | nullmask); |
271 | break; |
272 | } |
273 | default: |
274 | throw InvalidTypeException(result.type, "Invalid type for filter pushed down to table comparison" ); |
275 | } |
276 | } |
277 | |
278 | void UncompressedSegment::Select(Transaction &transaction, Vector &result, vector<TableFilter> &tableFilters, |
279 | SelectionVector &sel, idx_t &approved_tuple_count, ColumnScanState &state) { |
280 | auto read_lock = lock.GetSharedLock(); |
281 | if (versions && versions[state.vector_index]) { |
282 | Scan(transaction, state, state.vector_index, result, false); |
283 | auto vector_index = state.vector_index; |
284 | // pin the buffer for this segment |
285 | auto handle = manager.Pin(block_id); |
286 | auto data = handle->node->buffer; |
287 | auto offset = vector_index * vector_size; |
288 | auto source_nullmask = (nullmask_t *) (data + offset); |
289 | for (auto &table_filter : tableFilters) { |
290 | filterSelection(sel, result, table_filter, approved_tuple_count, *source_nullmask); |
291 | } |
292 | } else { |
293 | //! Select the data from the base table |
294 | Select(state, result, sel, approved_tuple_count, tableFilters); |
295 | } |
296 | } |
297 | |
298 | //===--------------------------------------------------------------------===// |
299 | // Scan |
300 | //===--------------------------------------------------------------------===// |
301 | void UncompressedSegment::Scan(Transaction &transaction, ColumnScanState &state, idx_t vector_index, Vector &result, |
302 | bool get_lock) { |
303 | unique_ptr<StorageLockKey> read_lock; |
304 | if (get_lock) { |
305 | read_lock = lock.GetSharedLock(); |
306 | } |
307 | // first fetch the data from the base table |
308 | FetchBaseData(state, vector_index, result); |
309 | if (versions && versions[vector_index]) { |
310 | // if there are any versions, check if we need to overwrite the data with the versioned data |
311 | FetchUpdateData(state, transaction, versions[vector_index], result); |
312 | } |
313 | } |
314 | |
315 | void UncompressedSegment::FilterScan(Transaction &transaction, ColumnScanState &state, Vector &result, |
316 | SelectionVector &sel, idx_t &approved_tuple_count) { |
317 | auto read_lock = lock.GetSharedLock(); |
318 | if (versions && versions[state.vector_index]) { |
319 | // if there are any versions, we do a regular scan |
320 | Scan(transaction, state, state.vector_index, result, false); |
321 | result.Slice(sel, approved_tuple_count); |
322 | } else { |
323 | FilterFetchBaseData(state, result, sel, approved_tuple_count); |
324 | } |
325 | } |
326 | |
327 | void UncompressedSegment::IndexScan(ColumnScanState &state, idx_t vector_index, Vector &result) { |
328 | if (vector_index == 0) { |
329 | // vector_index = 0, obtain a shared lock on the segment that we keep until the index scan is complete |
330 | state.locks.push_back(lock.GetSharedLock()); |
331 | } |
332 | if (versions && versions[vector_index]) { |
333 | throw TransactionException("Cannot create index with outstanding updates" ); |
334 | } |
335 | FetchBaseData(state, vector_index, result); |
336 | } |
337 | |
338 | //===--------------------------------------------------------------------===// |
339 | // Update |
340 | //===--------------------------------------------------------------------===// |
341 | void UncompressedSegment::CleanupUpdate(UpdateInfo *info) { |
342 | if (info->prev) { |
343 | // there is a prev info: remove from the chain |
344 | auto prev = info->prev; |
345 | prev->next = info->next; |
346 | if (prev->next) { |
347 | prev->next->prev = prev; |
348 | } |
349 | } else { |
350 | // there is no prev info: remove from base segment |
351 | info->segment->versions[info->vector_index] = info->next; |
352 | if (info->next) { |
353 | info->next->prev = nullptr; |
354 | } |
355 | } |
356 | } |
357 | |
358 | //===--------------------------------------------------------------------===// |
359 | // ToTemporary |
360 | //===--------------------------------------------------------------------===// |
361 | void UncompressedSegment::ToTemporary() { |
362 | auto write_lock = lock.GetExclusiveLock(); |
363 | |
364 | if (block_id >= MAXIMUM_BLOCK) { |
365 | // conversion has already been performed by a different thread |
366 | return; |
367 | } |
368 | // pin the current block |
369 | auto current = manager.Pin(block_id); |
370 | |
371 | // now allocate a new block from the buffer manager |
372 | auto handle = manager.Allocate(Storage::BLOCK_ALLOC_SIZE); |
373 | // now copy the data over and switch to using the new block id |
374 | memcpy(handle->node->buffer, current->node->buffer, Storage::BLOCK_SIZE); |
375 | this->block_id = handle->block_id; |
376 | } |
377 | |