1#include "duckdb/common/operator/comparison_operators.hpp"
2#include "duckdb/storage/numeric_segment.hpp"
3#include "duckdb/storage/buffer_manager.hpp"
4#include "duckdb/common/types/vector.hpp"
5#include "duckdb/storage/table/append_state.hpp"
6#include "duckdb/transaction/update_info.hpp"
7#include "duckdb/transaction/transaction.hpp"
8#include "duckdb/common/vector_operations/vector_operations.hpp"
9#include "duckdb/storage/data_table.hpp"
10
11using namespace duckdb;
12using namespace std;
13
14static NumericSegment::append_function_t GetAppendFunction(TypeId type);
15
16static NumericSegment::update_function_t GetUpdateFunction(TypeId type);
17
18static NumericSegment::update_info_fetch_function_t GetUpdateInfoFetchFunction(TypeId type);
19
20static NumericSegment::rollback_update_function_t GetRollbackUpdateFunction(TypeId type);
21
22static NumericSegment::merge_update_function_t GetMergeUpdateFunction(TypeId type);
23
24static NumericSegment::update_info_append_function_t GetUpdateInfoAppendFunction(TypeId type);
25
26NumericSegment::NumericSegment(BufferManager &manager, TypeId type, idx_t row_start, block_id_t block)
27 : UncompressedSegment(manager, type, row_start) {
28 // set up the different functions for this type of segment
29 this->append_function = GetAppendFunction(type);
30 this->update_function = GetUpdateFunction(type);
31 this->fetch_from_update_info = GetUpdateInfoFetchFunction(type);
32 this->append_from_update_info = GetUpdateInfoAppendFunction(type);
33 this->rollback_update = GetRollbackUpdateFunction(type);
34 this->merge_update_function = GetMergeUpdateFunction(type);
35
36 // figure out how many vectors we want to store in this block
37 this->type_size = GetTypeIdSize(type);
38 this->vector_size = sizeof(nullmask_t) + type_size * STANDARD_VECTOR_SIZE;
39 this->max_vector_count = Storage::BLOCK_SIZE / vector_size;
40
41 this->block_id = block;
42 if (block_id == INVALID_BLOCK) {
43 // no block id specified: allocate a buffer for the uncompressed segment
44 auto handle = manager.Allocate(Storage::BLOCK_ALLOC_SIZE);
45 this->block_id = handle->block_id;
46 // initialize nullmasks to 0 for all vectors
47 for (idx_t i = 0; i < max_vector_count; i++) {
48 auto mask = (nullmask_t *)(handle->node->buffer + (i * vector_size));
49 mask->reset();
50 }
51 }
52}
53
54template <class T, class OP>
55void Select(SelectionVector &sel, Vector &result, unsigned char *source, nullmask_t *source_nullmask, T constant,
56 idx_t &approved_tuple_count) {
57 result.vector_type = VectorType::FLAT_VECTOR;
58 auto result_data = FlatVector::GetData(result);
59 SelectionVector new_sel(approved_tuple_count);
60 idx_t result_count = 0;
61 if (source_nullmask->any()) {
62 for (idx_t i = 0; i < approved_tuple_count; i++) {
63 idx_t src_idx = sel.get_index(i);
64 if (!(*source_nullmask)[src_idx] && OP::Operation(((T *)source)[src_idx], constant)) {
65 ((T *)result_data)[src_idx] = ((T *)source)[src_idx];
66 new_sel.set_index(result_count++, src_idx);
67 }
68 }
69 } else {
70 for (idx_t i = 0; i < approved_tuple_count; i++) {
71 idx_t src_idx = sel.get_index(i);
72 if (OP::Operation(((T *)source)[src_idx], constant)) {
73 ((T *)result_data)[src_idx] = ((T *)source)[src_idx];
74 new_sel.set_index(result_count++, src_idx);
75 }
76 }
77 }
78 sel.Initialize(new_sel);
79 approved_tuple_count = result_count;
80}
81
82template <class T, class OPL, class OPR>
83void Select(SelectionVector &sel, Vector &result, unsigned char *source, nullmask_t *source_nullmask,
84 const T constantLeft, const T constantRight, idx_t &approved_tuple_count) {
85 result.vector_type = VectorType::FLAT_VECTOR;
86 auto result_data = FlatVector::GetData(result);
87 SelectionVector new_sel(approved_tuple_count);
88 idx_t result_count = 0;
89 if (source_nullmask->any()) {
90 for (idx_t i = 0; i < approved_tuple_count; i++) {
91 idx_t src_idx = sel.get_index(i);
92 if (!(*source_nullmask)[src_idx] && OPL::Operation(((T *)source)[src_idx], constantLeft) &&
93 OPR::Operation(((T *)source)[src_idx], constantRight)) {
94 ((T *)result_data)[src_idx] = ((T *)source)[src_idx];
95 new_sel.set_index(result_count++, src_idx);
96 }
97 }
98 } else {
99 for (idx_t i = 0; i < approved_tuple_count; i++) {
100 idx_t src_idx = sel.get_index(i);
101 if (OPL::Operation(((T *)source)[src_idx], constantLeft) &&
102 OPR::Operation(((T *)source)[src_idx], constantRight)) {
103 ((T *)result_data)[src_idx] = ((T *)source)[src_idx];
104 new_sel.set_index(result_count++, src_idx);
105 }
106 }
107 }
108 sel.Initialize(new_sel);
109 approved_tuple_count = result_count;
110}
111
112template <class OP>
113static void templated_select_operation(SelectionVector &sel, Vector &result, TypeId type, unsigned char *source,
114 nullmask_t *source_mask, Value &constant, idx_t &approved_tuple_count) {
115 // the inplace loops take the result as the last parameter
116 switch (type) {
117 case TypeId::INT8: {
118 Select<int8_t, OP>(sel, result, source, source_mask, constant.value_.tinyint, approved_tuple_count);
119 break;
120 }
121 case TypeId::INT16: {
122 Select<int16_t, OP>(sel, result, source, source_mask, constant.value_.smallint, approved_tuple_count);
123 ;
124 break;
125 }
126 case TypeId::INT32: {
127 Select<int32_t, OP>(sel, result, source, source_mask, constant.value_.integer, approved_tuple_count);
128 break;
129 }
130 case TypeId::INT64: {
131 Select<int64_t, OP>(sel, result, source, source_mask, constant.value_.bigint, approved_tuple_count);
132 break;
133 }
134 case TypeId::FLOAT: {
135 Select<float, OP>(sel, result, source, source_mask, constant.value_.float_, approved_tuple_count);
136 break;
137 }
138 case TypeId::DOUBLE: {
139 Select<double, OP>(sel, result, source, source_mask, constant.value_.double_, approved_tuple_count);
140 break;
141 }
142 default:
143 throw InvalidTypeException(type, "Invalid type for filter pushed down to table comparison");
144 }
145}
146
147template <class OPL, class OPR>
148static void templated_select_operation_between(SelectionVector &sel, Vector &result, TypeId type, unsigned char *source,
149 nullmask_t *source_mask, Value &constantLeft, Value &constantRight,
150 idx_t &approved_tuple_count) {
151 // the inplace loops take the result as the last parameter
152 switch (type) {
153 case TypeId::INT8: {
154 Select<int8_t, OPL, OPR>(sel, result, source, source_mask, constantLeft.value_.tinyint,
155 constantRight.value_.tinyint, approved_tuple_count);
156 break;
157 }
158 case TypeId::INT16: {
159 Select<int16_t, OPL, OPR>(sel, result, source, source_mask, constantLeft.value_.smallint,
160 constantRight.value_.smallint, approved_tuple_count);
161 break;
162 }
163 case TypeId::INT32: {
164 Select<int32_t, OPL, OPR>(sel, result, source, source_mask, constantLeft.value_.integer,
165 constantRight.value_.integer, approved_tuple_count);
166 break;
167 }
168 case TypeId::INT64: {
169 Select<int64_t, OPL, OPR>(sel, result, source, source_mask, constantLeft.value_.bigint,
170 constantRight.value_.bigint, approved_tuple_count);
171 break;
172 }
173 case TypeId::FLOAT: {
174 Select<float, OPL, OPR>(sel, result, source, source_mask, constantLeft.value_.float_,
175 constantRight.value_.float_, approved_tuple_count);
176 break;
177 }
178 case TypeId::DOUBLE: {
179 Select<double, OPL, OPR>(sel, result, source, source_mask, constantLeft.value_.double_,
180 constantRight.value_.double_, approved_tuple_count);
181 break;
182 }
183 default:
184 throw InvalidTypeException(type, "Invalid type for filter pushed down to table comparison");
185 }
186}
187
188void NumericSegment::Select(ColumnScanState &state, Vector &result, SelectionVector &sel, idx_t &approved_tuple_count,
189 vector<TableFilter> &tableFilter) {
190 auto vector_index = state.vector_index;
191 assert(vector_index < max_vector_count);
192 assert(vector_index * STANDARD_VECTOR_SIZE <= tuple_count);
193
194 // pin the buffer for this segment
195 auto handle = manager.Pin(block_id);
196 auto data = handle->node->buffer;
197 auto offset = vector_index * vector_size;
198 auto source_nullmask = (nullmask_t *)(data + offset);
199 auto source_data = data + offset + sizeof(nullmask_t);
200
201 if (tableFilter.size() == 1) {
202 switch (tableFilter[0].comparison_type) {
203 case ExpressionType::COMPARE_EQUAL: {
204 templated_select_operation<Equals>(sel, result, state.current->type, source_data, source_nullmask,
205 tableFilter[0].constant, approved_tuple_count);
206 break;
207 }
208 case ExpressionType::COMPARE_LESSTHAN: {
209 templated_select_operation<LessThan>(sel, result, state.current->type, source_data, source_nullmask,
210 tableFilter[0].constant, approved_tuple_count);
211 break;
212 }
213 case ExpressionType::COMPARE_GREATERTHAN: {
214 templated_select_operation<GreaterThan>(sel, result, state.current->type, source_data, source_nullmask,
215 tableFilter[0].constant, approved_tuple_count);
216 break;
217 }
218 case ExpressionType::COMPARE_LESSTHANOREQUALTO: {
219 templated_select_operation<LessThanEquals>(sel, result, state.current->type, source_data, source_nullmask,
220 tableFilter[0].constant, approved_tuple_count);
221 break;
222 }
223 case ExpressionType::COMPARE_GREATERTHANOREQUALTO: {
224 templated_select_operation<GreaterThanEquals>(sel, result, state.current->type, source_data,
225 source_nullmask, tableFilter[0].constant,
226 approved_tuple_count);
227 break;
228 }
229 default:
230 throw NotImplementedException("Unknown comparison type for filter pushed down to table!");
231 }
232 } else {
233 assert(tableFilter[0].comparison_type == ExpressionType::COMPARE_GREATERTHAN ||
234 tableFilter[0].comparison_type == ExpressionType::COMPARE_GREATERTHANOREQUALTO);
235 assert(tableFilter[1].comparison_type == ExpressionType::COMPARE_LESSTHAN ||
236 tableFilter[1].comparison_type == ExpressionType::COMPARE_LESSTHANOREQUALTO);
237
238 if (tableFilter[0].comparison_type == ExpressionType::COMPARE_GREATERTHAN) {
239 if (tableFilter[1].comparison_type == ExpressionType::COMPARE_LESSTHAN) {
240 templated_select_operation_between<GreaterThan, LessThan>(
241 sel, result, state.current->type, source_data, source_nullmask, tableFilter[0].constant,
242 tableFilter[1].constant, approved_tuple_count);
243 } else {
244 templated_select_operation_between<GreaterThan, LessThanEquals>(
245 sel, result, state.current->type, source_data, source_nullmask, tableFilter[0].constant,
246 tableFilter[1].constant, approved_tuple_count);
247 }
248 } else {
249 if (tableFilter[1].comparison_type == ExpressionType::COMPARE_LESSTHAN) {
250 templated_select_operation_between<GreaterThanEquals, LessThan>(
251 sel, result, state.current->type, source_data, source_nullmask, tableFilter[0].constant,
252 tableFilter[1].constant, approved_tuple_count);
253 } else {
254 templated_select_operation_between<GreaterThanEquals, LessThanEquals>(
255 sel, result, state.current->type, source_data, source_nullmask, tableFilter[0].constant,
256 tableFilter[1].constant, approved_tuple_count);
257 }
258 }
259 }
260}
261
262//===--------------------------------------------------------------------===//
263// Fetch base data
264//===--------------------------------------------------------------------===//
265void NumericSegment::FetchBaseData(ColumnScanState &state, idx_t vector_index, Vector &result) {
266 assert(vector_index < max_vector_count);
267 assert(vector_index * STANDARD_VECTOR_SIZE <= tuple_count);
268
269 // pin the buffer for this segment
270 auto handle = manager.Pin(block_id);
271 auto data = handle->node->buffer;
272
273 auto offset = vector_index * vector_size;
274
275 idx_t count = GetVectorCount(vector_index);
276 auto source_nullmask = (nullmask_t *)(data + offset);
277 auto source_data = data + offset + sizeof(nullmask_t);
278
279 // fetch the nullmask and copy the data from the base table
280 result.vector_type = VectorType::FLAT_VECTOR;
281 FlatVector::SetNullmask(result, *source_nullmask);
282 memcpy(FlatVector::GetData(result), source_data, count * type_size);
283}
284
285void NumericSegment::FetchUpdateData(ColumnScanState &state, Transaction &transaction, UpdateInfo *version,
286 Vector &result) {
287 fetch_from_update_info(transaction, version, result);
288}
289
290template <class T>
291static void templated_assignment(SelectionVector &sel, data_ptr_t source, data_ptr_t result,
292 nullmask_t &source_nullmask, nullmask_t &result_nullmask, idx_t approved_tuple_count) {
293 if (source_nullmask.any()) {
294 for (size_t i = 0; i < approved_tuple_count; i++) {
295 if (source_nullmask[sel.get_index(i)]) {
296 result_nullmask.set(i, true);
297 } else {
298 ((T *)result)[i] = ((T *)source)[sel.get_index(i)];
299 }
300 }
301 } else {
302 for (size_t i = 0; i < approved_tuple_count; i++) {
303 ((T *)result)[i] = ((T *)source)[sel.get_index(i)];
304 }
305 }
306}
307
308void NumericSegment::FilterFetchBaseData(ColumnScanState &state, Vector &result, SelectionVector &sel,
309 idx_t &approved_tuple_count) {
310 auto vector_index = state.vector_index;
311 assert(vector_index < max_vector_count);
312 assert(vector_index * STANDARD_VECTOR_SIZE <= tuple_count);
313
314 // pin the buffer for this segment
315 auto handle = manager.Pin(block_id);
316 auto data = handle->node->buffer;
317
318 auto offset = vector_index * vector_size;
319 auto source_nullmask = (nullmask_t *)(data + offset);
320 auto source_data = data + offset + sizeof(nullmask_t);
321 // fetch the nullmask and copy the data from the base table
322 result.vector_type = VectorType::FLAT_VECTOR;
323 auto result_data = FlatVector::GetData(result);
324 nullmask_t result_nullmask;
325 // the inplace loops take the result as the last parameter
326 switch (type) {
327 case TypeId::BOOL:
328 case TypeId::INT8: {
329 templated_assignment<int8_t>(sel, source_data, result_data, *source_nullmask, result_nullmask,
330 approved_tuple_count);
331 break;
332 }
333 case TypeId::INT16: {
334 templated_assignment<int16_t>(sel, source_data, result_data, *source_nullmask, result_nullmask,
335 approved_tuple_count);
336 break;
337 }
338 case TypeId::INT32: {
339 templated_assignment<int32_t>(sel, source_data, result_data, *source_nullmask, result_nullmask,
340 approved_tuple_count);
341 break;
342 }
343 case TypeId::INT64: {
344 templated_assignment<int64_t>(sel, source_data, result_data, *source_nullmask, result_nullmask,
345 approved_tuple_count);
346 break;
347 }
348 case TypeId::FLOAT: {
349 templated_assignment<float>(sel, source_data, result_data, *source_nullmask, result_nullmask,
350 approved_tuple_count);
351 break;
352 }
353 case TypeId::DOUBLE: {
354 templated_assignment<double>(sel, source_data, result_data, *source_nullmask, result_nullmask,
355 approved_tuple_count);
356 break;
357 }
358 default:
359 throw InvalidTypeException(type, "Invalid type for filter scan");
360 }
361
362 FlatVector::SetNullmask(result, result_nullmask);
363}
364
365//===--------------------------------------------------------------------===//
366// Fetch
367//===--------------------------------------------------------------------===//
368void NumericSegment::FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result,
369 idx_t result_idx) {
370 auto read_lock = lock.GetSharedLock();
371 auto handle = manager.Pin(block_id);
372
373 // get the vector index
374 idx_t vector_index = row_id / STANDARD_VECTOR_SIZE;
375 idx_t id_in_vector = row_id - vector_index * STANDARD_VECTOR_SIZE;
376 assert(vector_index < max_vector_count);
377
378 // first fetch the data from the base table
379 auto data = handle->node->buffer + vector_index * vector_size;
380 auto &nullmask = *((nullmask_t *)(data));
381 auto vector_ptr = data + sizeof(nullmask_t);
382
383 FlatVector::SetNull(result, result_idx, nullmask[id_in_vector]);
384 memcpy(FlatVector::GetData(result) + result_idx * type_size, vector_ptr + id_in_vector * type_size, type_size);
385 if (versions && versions[vector_index]) {
386 // version information: follow the version chain to find out if we need to load this tuple data from any other
387 // version
388 append_from_update_info(transaction, versions[vector_index], id_in_vector, result, result_idx);
389 }
390}
391
392//===--------------------------------------------------------------------===//
393// Append
394//===--------------------------------------------------------------------===//
395idx_t NumericSegment::Append(SegmentStatistics &stats, Vector &data, idx_t offset, idx_t count) {
396 assert(data.type == type);
397 auto handle = manager.Pin(block_id);
398
399 idx_t initial_count = tuple_count;
400 while (count > 0) {
401 // get the vector index of the vector to append to and see how many tuples we can append to that vector
402 idx_t vector_index = tuple_count / STANDARD_VECTOR_SIZE;
403 if (vector_index == max_vector_count) {
404 break;
405 }
406 idx_t current_tuple_count = tuple_count - vector_index * STANDARD_VECTOR_SIZE;
407 idx_t append_count = std::min(STANDARD_VECTOR_SIZE - current_tuple_count, count);
408
409 // now perform the actual append
410 append_function(stats, handle->node->buffer + vector_size * vector_index, current_tuple_count, data, offset,
411 append_count);
412
413 count -= append_count;
414 offset += append_count;
415 tuple_count += append_count;
416 }
417 return tuple_count - initial_count;
418}
419
420//===--------------------------------------------------------------------===//
421// Update
422//===--------------------------------------------------------------------===//
423void NumericSegment::Update(ColumnData &column_data, SegmentStatistics &stats, Transaction &transaction, Vector &update,
424 row_t *ids, idx_t count, idx_t vector_index, idx_t vector_offset, UpdateInfo *node) {
425 if (!node) {
426 auto handle = manager.Pin(block_id);
427
428 // create a new node in the undo buffer for this update
429 node = CreateUpdateInfo(column_data, transaction, ids, count, vector_index, vector_offset, type_size);
430 // now move the original data into the UpdateInfo
431 update_function(stats, node, handle->node->buffer + vector_index * vector_size, update);
432 } else {
433 // node already exists for this transaction, we need to merge the new updates with the existing updates
434 auto handle = manager.Pin(block_id);
435
436 merge_update_function(stats, node, handle->node->buffer + vector_index * vector_size, update, ids, count,
437 vector_offset);
438 }
439}
440
441void NumericSegment::RollbackUpdate(UpdateInfo *info) {
442 // obtain an exclusive lock
443 auto lock_handle = lock.GetExclusiveLock();
444 auto handle = manager.Pin(block_id);
445
446 // move the data from the UpdateInfo back into the base table
447 rollback_update(info, handle->node->buffer + info->vector_index * vector_size);
448
449 CleanupUpdate(info);
450}
451
452//===--------------------------------------------------------------------===//
453// Append
454//===--------------------------------------------------------------------===//
// Widens the running [*min, *max] range so that it includes `value`.
// The two bounds are checked independently, so one value can move both of them.
template <class T> static void update_min_max(T value, T *__restrict min, T *__restrict max) {
	const bool raises_max = value > *max;
	const bool lowers_min = value < *min;
	if (lowers_min) {
		*min = value;
	}
	if (raises_max) {
		*max = value;
	}
}
463
464template <class T>
465static void append_loop(SegmentStatistics &stats, data_ptr_t target, idx_t target_offset, Vector &source, idx_t offset,
466 idx_t count) {
467 auto &nullmask = *((nullmask_t *)target);
468 auto min = (T *)stats.minimum.get();
469 auto max = (T *)stats.maximum.get();
470
471 VectorData adata;
472 source.Orrify(count, adata);
473
474 auto sdata = (T *)adata.data;
475 auto tdata = (T *)(target + sizeof(nullmask_t));
476 if (adata.nullmask->any()) {
477 for (idx_t i = 0; i < count; i++) {
478 auto source_idx = adata.sel->get_index(offset + i);
479 auto target_idx = target_offset + i;
480 bool is_null = (*adata.nullmask)[source_idx];
481 if (is_null) {
482 nullmask[target_idx] = true;
483 stats.has_null = true;
484 } else {
485 update_min_max(sdata[source_idx], min, max);
486 tdata[target_idx] = sdata[source_idx];
487 }
488 }
489 } else {
490 for (idx_t i = 0; i < count; i++) {
491 auto source_idx = adata.sel->get_index(offset + i);
492 auto target_idx = target_offset + i;
493 update_min_max(sdata[source_idx], min, max);
494 tdata[target_idx] = sdata[source_idx];
495 }
496 }
497}
498
499static NumericSegment::append_function_t GetAppendFunction(TypeId type) {
500 switch (type) {
501 case TypeId::BOOL:
502 case TypeId::INT8:
503 return append_loop<int8_t>;
504 case TypeId::INT16:
505 return append_loop<int16_t>;
506 case TypeId::INT32:
507 return append_loop<int32_t>;
508 case TypeId::INT64:
509 return append_loop<int64_t>;
510 case TypeId::FLOAT:
511 return append_loop<float>;
512 case TypeId::DOUBLE:
513 return append_loop<double>;
514 default:
515 throw NotImplementedException("Unimplemented type for uncompressed segment");
516 }
517}
518
519//===--------------------------------------------------------------------===//
520// Update
521//===--------------------------------------------------------------------===//
522template <class T>
523static void update_loop_null(T *__restrict undo_data, T *__restrict base_data, T *__restrict new_data,
524 nullmask_t &undo_nullmask, nullmask_t &base_nullmask, nullmask_t &new_nullmask,
525 idx_t count, sel_t *__restrict base_sel, T *__restrict min, T *__restrict max) {
526 for (idx_t i = 0; i < count; i++) {
527 // first move the base data into the undo buffer info
528 undo_data[i] = base_data[base_sel[i]];
529 undo_nullmask[base_sel[i]] = base_nullmask[base_sel[i]];
530 // now move the new data in-place into the base table
531 base_data[base_sel[i]] = new_data[i];
532 base_nullmask[base_sel[i]] = new_nullmask[i];
533 // update the min max with the new data
534 update_min_max(new_data[i], min, max);
535 }
536}
537
538template <class T>
539static void update_loop_no_null(T *__restrict undo_data, T *__restrict base_data, T *__restrict new_data, idx_t count,
540 sel_t *__restrict base_sel, T *__restrict min, T *__restrict max) {
541 for (idx_t i = 0; i < count; i++) {
542 // first move the base data into the undo buffer info
543 undo_data[i] = base_data[base_sel[i]];
544 // now move the new data in-place into the base table
545 base_data[base_sel[i]] = new_data[i];
546 // update the min max with the new data
547 update_min_max(new_data[i], min, max);
548 }
549}
550
551template <class T>
552static void update_loop(SegmentStatistics &stats, UpdateInfo *info, data_ptr_t base, Vector &update) {
553 auto update_data = FlatVector::GetData<T>(update);
554 auto &update_nullmask = FlatVector::Nullmask(update);
555 auto nullmask = (nullmask_t *)base;
556 auto base_data = (T *)(base + sizeof(nullmask_t));
557 auto undo_data = (T *)info->tuple_data;
558 auto min = (T *)stats.minimum.get();
559 auto max = (T *)stats.maximum.get();
560
561 if (update_nullmask.any() || nullmask->any()) {
562 update_loop_null(undo_data, base_data, update_data, info->nullmask, *nullmask, update_nullmask, info->N,
563 info->tuples, min, max);
564 } else {
565 update_loop_no_null(undo_data, base_data, update_data, info->N, info->tuples, min, max);
566 }
567}
568
569static NumericSegment::update_function_t GetUpdateFunction(TypeId type) {
570 switch (type) {
571 case TypeId::BOOL:
572 case TypeId::INT8:
573 return update_loop<int8_t>;
574 case TypeId::INT16:
575 return update_loop<int16_t>;
576 case TypeId::INT32:
577 return update_loop<int32_t>;
578 case TypeId::INT64:
579 return update_loop<int64_t>;
580 case TypeId::FLOAT:
581 return update_loop<float>;
582 case TypeId::DOUBLE:
583 return update_loop<double>;
584 default:
585 throw NotImplementedException("Unimplemented type for uncompressed segment");
586 }
587}
588
589//===--------------------------------------------------------------------===//
590// Merge Update
591//===--------------------------------------------------------------------===//
592template <class T>
593static void merge_update_loop(SegmentStatistics &stats, UpdateInfo *node, data_ptr_t base, Vector &update, row_t *ids,
594 idx_t count, idx_t vector_offset) {
595 auto &base_nullmask = *((nullmask_t *)base);
596 auto base_data = (T *)(base + sizeof(nullmask_t));
597 auto info_data = (T *)node->tuple_data;
598 auto update_data = FlatVector::GetData<T>(update);
599 auto &update_nullmask = FlatVector::Nullmask(update);
600 auto min = (T *)stats.minimum.get();
601 auto max = (T *)stats.maximum.get();
602 for (idx_t i = 0; i < count; i++) {
603 update_min_max<T>(update_data[i], min, max);
604 }
605
606 // first we copy the old update info into a temporary structure
607 sel_t old_ids[STANDARD_VECTOR_SIZE];
608 T old_data[STANDARD_VECTOR_SIZE];
609
610 memcpy(old_ids, node->tuples, node->N * sizeof(sel_t));
611 memcpy(old_data, node->tuple_data, node->N * sizeof(T));
612
613 // now we perform a merge of the new ids with the old ids
614 auto merge = [&](idx_t id, idx_t aidx, idx_t bidx, idx_t count) {
615 // new_id and old_id are the same:
616 // insert the new data into the base table
617 base_nullmask[id] = update_nullmask[aidx];
618 base_data[id] = update_data[aidx];
619 // insert the old data in the UpdateInfo
620 info_data[count] = old_data[bidx];
621 node->tuples[count] = id;
622 };
623 auto pick_new = [&](idx_t id, idx_t aidx, idx_t count) {
624 // new_id comes before the old id
625 // insert the base table data into the update info
626 info_data[count] = base_data[id];
627 node->nullmask[id] = base_nullmask[id];
628
629 // and insert the update info into the base table
630 base_nullmask[id] = update_nullmask[aidx];
631 base_data[id] = update_data[aidx];
632
633 node->tuples[count] = id;
634 };
635 auto pick_old = [&](idx_t id, idx_t bidx, idx_t count) {
636 // old_id comes before new_id, insert the old data
637 info_data[count] = old_data[bidx];
638 node->tuples[count] = id;
639 };
640 // perform the merge
641 node->N = merge_loop(ids, old_ids, count, node->N, vector_offset, merge, pick_new, pick_old);
642}
643
644static NumericSegment::merge_update_function_t GetMergeUpdateFunction(TypeId type) {
645 switch (type) {
646 case TypeId::BOOL:
647 case TypeId::INT8:
648 return merge_update_loop<int8_t>;
649 case TypeId::INT16:
650 return merge_update_loop<int16_t>;
651 case TypeId::INT32:
652 return merge_update_loop<int32_t>;
653 case TypeId::INT64:
654 return merge_update_loop<int64_t>;
655 case TypeId::FLOAT:
656 return merge_update_loop<float>;
657 case TypeId::DOUBLE:
658 return merge_update_loop<double>;
659 default:
660 throw NotImplementedException("Unimplemented type for uncompressed segment");
661 }
662}
663
664//===--------------------------------------------------------------------===//
665// Update Fetch
666//===--------------------------------------------------------------------===//
667template <class T> static void update_info_fetch(Transaction &transaction, UpdateInfo *info, Vector &result) {
668 auto result_data = FlatVector::GetData<T>(result);
669 auto &result_mask = FlatVector::Nullmask(result);
670 UpdateInfo::UpdatesForTransaction(info, transaction, [&](UpdateInfo *current) {
671 auto info_data = (T *)current->tuple_data;
672 for (idx_t i = 0; i < current->N; i++) {
673 result_data[current->tuples[i]] = info_data[i];
674 result_mask[current->tuples[i]] = current->nullmask[current->tuples[i]];
675 }
676 });
677}
678
679static NumericSegment::update_info_fetch_function_t GetUpdateInfoFetchFunction(TypeId type) {
680 switch (type) {
681 case TypeId::BOOL:
682 case TypeId::INT8:
683 return update_info_fetch<int8_t>;
684 case TypeId::INT16:
685 return update_info_fetch<int16_t>;
686 case TypeId::INT32:
687 return update_info_fetch<int32_t>;
688 case TypeId::INT64:
689 return update_info_fetch<int64_t>;
690 case TypeId::FLOAT:
691 return update_info_fetch<float>;
692 case TypeId::DOUBLE:
693 return update_info_fetch<double>;
694 default:
695 throw NotImplementedException("Unimplemented type for uncompressed segment");
696 }
697}
698
699//===--------------------------------------------------------------------===//
700// Update Append
701//===--------------------------------------------------------------------===//
702template <class T>
703static void update_info_append(Transaction &transaction, UpdateInfo *info, idx_t row_id, Vector &result,
704 idx_t result_idx) {
705 auto result_data = FlatVector::GetData<T>(result);
706 auto &result_mask = FlatVector::Nullmask(result);
707 UpdateInfo::UpdatesForTransaction(info, transaction, [&](UpdateInfo *current) {
708 auto info_data = (T *)current->tuple_data;
709 // loop over the tuples in this UpdateInfo
710 for (idx_t i = 0; i < current->N; i++) {
711 if (current->tuples[i] == row_id) {
712 // found the relevant tuple
713 result_data[result_idx] = info_data[i];
714 result_mask[result_idx] = current->nullmask[current->tuples[i]];
715 break;
716 } else if (current->tuples[i] > row_id) {
717 // tuples are sorted: so if the current tuple is > row_id we will not find it anymore
718 break;
719 }
720 }
721 });
722}
723
724static NumericSegment::update_info_append_function_t GetUpdateInfoAppendFunction(TypeId type) {
725 switch (type) {
726 case TypeId::BOOL:
727 case TypeId::INT8:
728 return update_info_append<int8_t>;
729 case TypeId::INT16:
730 return update_info_append<int16_t>;
731 case TypeId::INT32:
732 return update_info_append<int32_t>;
733 case TypeId::INT64:
734 return update_info_append<int64_t>;
735 case TypeId::FLOAT:
736 return update_info_append<float>;
737 case TypeId::DOUBLE:
738 return update_info_append<double>;
739 default:
740 throw NotImplementedException("Unimplemented type for uncompressed segment");
741 }
742}
743
744//===--------------------------------------------------------------------===//
745// Rollback Update
746//===--------------------------------------------------------------------===//
747template <class T> static void rollback_update(UpdateInfo *info, data_ptr_t base) {
748 auto &nullmask = *((nullmask_t *)base);
749 auto info_data = (T *)info->tuple_data;
750 auto base_data = (T *)(base + sizeof(nullmask_t));
751
752 for (idx_t i = 0; i < info->N; i++) {
753 base_data[info->tuples[i]] = info_data[i];
754 nullmask[info->tuples[i]] = info->nullmask[info->tuples[i]];
755 }
756}
757
758static NumericSegment::rollback_update_function_t GetRollbackUpdateFunction(TypeId type) {
759 switch (type) {
760 case TypeId::BOOL:
761 case TypeId::INT8:
762 return rollback_update<int8_t>;
763 case TypeId::INT16:
764 return rollback_update<int16_t>;
765 case TypeId::INT32:
766 return rollback_update<int32_t>;
767 case TypeId::INT64:
768 return rollback_update<int64_t>;
769 case TypeId::FLOAT:
770 return rollback_update<float>;
771 case TypeId::DOUBLE:
772 return rollback_update<double>;
773 default:
774 throw NotImplementedException("Unimplemented type for uncompressed segment");
775 }
776}
777