1#include "duckdb/storage/table/update_segment.hpp"
2
3#include "duckdb/storage/statistics/distinct_statistics.hpp"
4
5#include "duckdb/storage/table/column_data.hpp"
6#include "duckdb/transaction/duck_transaction.hpp"
7#include "duckdb/transaction/update_info.hpp"
8#include "duckdb/common/printer.hpp"
9
10#include <algorithm>
11
12namespace duckdb {
13
14static UpdateSegment::initialize_update_function_t GetInitializeUpdateFunction(PhysicalType type);
15static UpdateSegment::fetch_update_function_t GetFetchUpdateFunction(PhysicalType type);
16static UpdateSegment::fetch_committed_function_t GetFetchCommittedFunction(PhysicalType type);
17static UpdateSegment::fetch_committed_range_function_t GetFetchCommittedRangeFunction(PhysicalType type);
18
19static UpdateSegment::merge_update_function_t GetMergeUpdateFunction(PhysicalType type);
20static UpdateSegment::rollback_update_function_t GetRollbackUpdateFunction(PhysicalType type);
21static UpdateSegment::statistics_update_function_t GetStatisticsUpdateFunction(PhysicalType type);
22static UpdateSegment::fetch_row_function_t GetFetchRowFunction(PhysicalType type);
23
24UpdateSegment::UpdateSegment(ColumnData &column_data)
25 : column_data(column_data), stats(column_data.type), heap(BufferAllocator::Get(db&: column_data.GetDatabase())) {
26 auto physical_type = column_data.type.InternalType();
27
28 this->type_size = GetTypeIdSize(type: physical_type);
29
30 this->initialize_update_function = GetInitializeUpdateFunction(type: physical_type);
31 this->fetch_update_function = GetFetchUpdateFunction(type: physical_type);
32 this->fetch_committed_function = GetFetchCommittedFunction(type: physical_type);
33 this->fetch_committed_range = GetFetchCommittedRangeFunction(type: physical_type);
34 this->fetch_row_function = GetFetchRowFunction(type: physical_type);
35 this->merge_update_function = GetMergeUpdateFunction(type: physical_type);
36 this->rollback_update_function = GetRollbackUpdateFunction(type: physical_type);
37 this->statistics_update_function = GetStatisticsUpdateFunction(type: physical_type);
38}
39
40UpdateSegment::~UpdateSegment() {
41}
42
43//===--------------------------------------------------------------------===//
44// Update Info Helpers
45//===--------------------------------------------------------------------===//
46Value UpdateInfo::GetValue(idx_t index) {
47 auto &type = segment->column_data.type;
48
49 switch (type.id()) {
50 case LogicalTypeId::VALIDITY:
51 return Value::BOOLEAN(value: reinterpret_cast<bool *>(tuple_data)[index]);
52 case LogicalTypeId::INTEGER:
53 return Value::INTEGER(value: reinterpret_cast<int32_t *>(tuple_data)[index]);
54 default:
55 throw NotImplementedException("Unimplemented type for UpdateInfo::GetValue");
56 }
57}
58
59void UpdateInfo::Print() {
60 Printer::Print(str: ToString());
61}
62
63string UpdateInfo::ToString() {
64 auto &type = segment->column_data.type;
65 string result = "Update Info [" + type.ToString() + ", Count: " + to_string(val: N) +
66 ", Transaction Id: " + to_string(val: version_number) + "]\n";
67 for (idx_t i = 0; i < N; i++) {
68 result += to_string(val: tuples[i]) + ": " + GetValue(index: i).ToString() + "\n";
69 }
70 if (next) {
71 result += "\nChild Segment: " + next->ToString();
72 }
73 return result;
74}
75
76void UpdateInfo::Verify() {
77#ifdef DEBUG
78 for (idx_t i = 1; i < N; i++) {
79 D_ASSERT(tuples[i] > tuples[i - 1] && tuples[i] < STANDARD_VECTOR_SIZE);
80 }
81#endif
82}
83
84//===--------------------------------------------------------------------===//
85// Update Fetch
86//===--------------------------------------------------------------------===//
87static void MergeValidityInfo(UpdateInfo *current, ValidityMask &result_mask) {
88 auto info_data = reinterpret_cast<bool *>(current->tuple_data);
89 for (idx_t i = 0; i < current->N; i++) {
90 result_mask.Set(row_idx: current->tuples[i], valid: info_data[i]);
91 }
92}
93
94static void UpdateMergeValidity(transaction_t start_time, transaction_t transaction_id, UpdateInfo *info,
95 Vector &result) {
96 auto &result_mask = FlatVector::Validity(vector&: result);
97 UpdateInfo::UpdatesForTransaction(current: info, start_time, transaction_id,
98 callback: [&](UpdateInfo *current) { MergeValidityInfo(current, result_mask); });
99}
100
101template <class T>
102static void MergeUpdateInfo(UpdateInfo *current, T *result_data) {
103 auto info_data = reinterpret_cast<T *>(current->tuple_data);
104 if (current->N == STANDARD_VECTOR_SIZE) {
105 // special case: update touches ALL tuples of this vector
106 // in this case we can just memcpy the data
107 // since the layout of the update info is guaranteed to be [0, 1, 2, 3, ...]
108 memcpy(result_data, info_data, sizeof(T) * current->N);
109 } else {
110 for (idx_t i = 0; i < current->N; i++) {
111 result_data[current->tuples[i]] = info_data[i];
112 }
113 }
114}
115
116template <class T>
117static void UpdateMergeFetch(transaction_t start_time, transaction_t transaction_id, UpdateInfo *info, Vector &result) {
118 auto result_data = FlatVector::GetData<T>(result);
119 UpdateInfo::UpdatesForTransaction(info, start_time, transaction_id,
120 [&](UpdateInfo *current) { MergeUpdateInfo<T>(current, result_data); });
121}
122
123static UpdateSegment::fetch_update_function_t GetFetchUpdateFunction(PhysicalType type) {
124 switch (type) {
125 case PhysicalType::BIT:
126 return UpdateMergeValidity;
127 case PhysicalType::BOOL:
128 case PhysicalType::INT8:
129 return UpdateMergeFetch<int8_t>;
130 case PhysicalType::INT16:
131 return UpdateMergeFetch<int16_t>;
132 case PhysicalType::INT32:
133 return UpdateMergeFetch<int32_t>;
134 case PhysicalType::INT64:
135 return UpdateMergeFetch<int64_t>;
136 case PhysicalType::UINT8:
137 return UpdateMergeFetch<uint8_t>;
138 case PhysicalType::UINT16:
139 return UpdateMergeFetch<uint16_t>;
140 case PhysicalType::UINT32:
141 return UpdateMergeFetch<uint32_t>;
142 case PhysicalType::UINT64:
143 return UpdateMergeFetch<uint64_t>;
144 case PhysicalType::INT128:
145 return UpdateMergeFetch<hugeint_t>;
146 case PhysicalType::FLOAT:
147 return UpdateMergeFetch<float>;
148 case PhysicalType::DOUBLE:
149 return UpdateMergeFetch<double>;
150 case PhysicalType::INTERVAL:
151 return UpdateMergeFetch<interval_t>;
152 case PhysicalType::VARCHAR:
153 return UpdateMergeFetch<string_t>;
154 default:
155 throw NotImplementedException("Unimplemented type for update segment");
156 }
157}
158
159void UpdateSegment::FetchUpdates(TransactionData transaction, idx_t vector_index, Vector &result) {
160 auto lock_handle = lock.GetSharedLock();
161 if (!root) {
162 return;
163 }
164 if (!root->info[vector_index]) {
165 return;
166 }
167 // FIXME: normalify if this is not the case... need to pass in count?
168 D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
169
170 fetch_update_function(transaction.start_time, transaction.transaction_id, root->info[vector_index]->info.get(),
171 result);
172}
173
174//===--------------------------------------------------------------------===//
175// Fetch Committed
176//===--------------------------------------------------------------------===//
177static void FetchCommittedValidity(UpdateInfo *info, Vector &result) {
178 auto &result_mask = FlatVector::Validity(vector&: result);
179 MergeValidityInfo(current: info, result_mask);
180}
181
182template <class T>
183static void TemplatedFetchCommitted(UpdateInfo *info, Vector &result) {
184 auto result_data = FlatVector::GetData<T>(result);
185 MergeUpdateInfo<T>(info, result_data);
186}
187
188static UpdateSegment::fetch_committed_function_t GetFetchCommittedFunction(PhysicalType type) {
189 switch (type) {
190 case PhysicalType::BIT:
191 return FetchCommittedValidity;
192 case PhysicalType::BOOL:
193 case PhysicalType::INT8:
194 return TemplatedFetchCommitted<int8_t>;
195 case PhysicalType::INT16:
196 return TemplatedFetchCommitted<int16_t>;
197 case PhysicalType::INT32:
198 return TemplatedFetchCommitted<int32_t>;
199 case PhysicalType::INT64:
200 return TemplatedFetchCommitted<int64_t>;
201 case PhysicalType::UINT8:
202 return TemplatedFetchCommitted<uint8_t>;
203 case PhysicalType::UINT16:
204 return TemplatedFetchCommitted<uint16_t>;
205 case PhysicalType::UINT32:
206 return TemplatedFetchCommitted<uint32_t>;
207 case PhysicalType::UINT64:
208 return TemplatedFetchCommitted<uint64_t>;
209 case PhysicalType::INT128:
210 return TemplatedFetchCommitted<hugeint_t>;
211 case PhysicalType::FLOAT:
212 return TemplatedFetchCommitted<float>;
213 case PhysicalType::DOUBLE:
214 return TemplatedFetchCommitted<double>;
215 case PhysicalType::INTERVAL:
216 return TemplatedFetchCommitted<interval_t>;
217 case PhysicalType::VARCHAR:
218 return TemplatedFetchCommitted<string_t>;
219 default:
220 throw NotImplementedException("Unimplemented type for update segment");
221 }
222}
223
224void UpdateSegment::FetchCommitted(idx_t vector_index, Vector &result) {
225 auto lock_handle = lock.GetSharedLock();
226
227 if (!root) {
228 return;
229 }
230 if (!root->info[vector_index]) {
231 return;
232 }
233 // FIXME: normalify if this is not the case... need to pass in count?
234 D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
235
236 fetch_committed_function(root->info[vector_index]->info.get(), result);
237}
238
239//===--------------------------------------------------------------------===//
240// Fetch Range
241//===--------------------------------------------------------------------===//
242static void MergeUpdateInfoRangeValidity(UpdateInfo *current, idx_t start, idx_t end, idx_t result_offset,
243 ValidityMask &result_mask) {
244 auto info_data = reinterpret_cast<bool *>(current->tuple_data);
245 for (idx_t i = 0; i < current->N; i++) {
246 auto tuple_idx = current->tuples[i];
247 if (tuple_idx < start) {
248 continue;
249 } else if (tuple_idx >= end) {
250 break;
251 }
252 auto result_idx = result_offset + tuple_idx - start;
253 result_mask.Set(row_idx: result_idx, valid: info_data[i]);
254 }
255}
256
257static void FetchCommittedRangeValidity(UpdateInfo *info, idx_t start, idx_t end, idx_t result_offset, Vector &result) {
258 auto &result_mask = FlatVector::Validity(vector&: result);
259 MergeUpdateInfoRangeValidity(current: info, start, end, result_offset, result_mask);
260}
261
262template <class T>
263static void MergeUpdateInfoRange(UpdateInfo *current, idx_t start, idx_t end, idx_t result_offset, T *result_data) {
264 auto info_data = reinterpret_cast<T *>(current->tuple_data);
265 for (idx_t i = 0; i < current->N; i++) {
266 auto tuple_idx = current->tuples[i];
267 if (tuple_idx < start) {
268 continue;
269 } else if (tuple_idx >= end) {
270 break;
271 }
272 auto result_idx = result_offset + tuple_idx - start;
273 result_data[result_idx] = info_data[i];
274 }
275}
276
277template <class T>
278static void TemplatedFetchCommittedRange(UpdateInfo *info, idx_t start, idx_t end, idx_t result_offset,
279 Vector &result) {
280 auto result_data = FlatVector::GetData<T>(result);
281 MergeUpdateInfoRange<T>(info, start, end, result_offset, result_data);
282}
283
284static UpdateSegment::fetch_committed_range_function_t GetFetchCommittedRangeFunction(PhysicalType type) {
285 switch (type) {
286 case PhysicalType::BIT:
287 return FetchCommittedRangeValidity;
288 case PhysicalType::BOOL:
289 case PhysicalType::INT8:
290 return TemplatedFetchCommittedRange<int8_t>;
291 case PhysicalType::INT16:
292 return TemplatedFetchCommittedRange<int16_t>;
293 case PhysicalType::INT32:
294 return TemplatedFetchCommittedRange<int32_t>;
295 case PhysicalType::INT64:
296 return TemplatedFetchCommittedRange<int64_t>;
297 case PhysicalType::UINT8:
298 return TemplatedFetchCommittedRange<uint8_t>;
299 case PhysicalType::UINT16:
300 return TemplatedFetchCommittedRange<uint16_t>;
301 case PhysicalType::UINT32:
302 return TemplatedFetchCommittedRange<uint32_t>;
303 case PhysicalType::UINT64:
304 return TemplatedFetchCommittedRange<uint64_t>;
305 case PhysicalType::INT128:
306 return TemplatedFetchCommittedRange<hugeint_t>;
307 case PhysicalType::FLOAT:
308 return TemplatedFetchCommittedRange<float>;
309 case PhysicalType::DOUBLE:
310 return TemplatedFetchCommittedRange<double>;
311 case PhysicalType::INTERVAL:
312 return TemplatedFetchCommittedRange<interval_t>;
313 case PhysicalType::VARCHAR:
314 return TemplatedFetchCommittedRange<string_t>;
315 default:
316 throw NotImplementedException("Unimplemented type for update segment");
317 }
318}
319
320void UpdateSegment::FetchCommittedRange(idx_t start_row, idx_t count, Vector &result) {
321 D_ASSERT(count > 0);
322 if (!root) {
323 return;
324 }
325 D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
326
327 idx_t end_row = start_row + count;
328 idx_t start_vector = start_row / STANDARD_VECTOR_SIZE;
329 idx_t end_vector = (end_row - 1) / STANDARD_VECTOR_SIZE;
330 D_ASSERT(start_vector <= end_vector);
331 D_ASSERT(end_vector < RowGroup::ROW_GROUP_VECTOR_COUNT);
332
333 for (idx_t vector_idx = start_vector; vector_idx <= end_vector; vector_idx++) {
334 if (!root->info[vector_idx]) {
335 continue;
336 }
337 idx_t start_in_vector = vector_idx == start_vector ? start_row - start_vector * STANDARD_VECTOR_SIZE : 0;
338 idx_t end_in_vector =
339 vector_idx == end_vector ? end_row - end_vector * STANDARD_VECTOR_SIZE : STANDARD_VECTOR_SIZE;
340 D_ASSERT(start_in_vector < end_in_vector);
341 D_ASSERT(end_in_vector > 0 && end_in_vector <= STANDARD_VECTOR_SIZE);
342 idx_t result_offset = ((vector_idx * STANDARD_VECTOR_SIZE) + start_in_vector) - start_row;
343 fetch_committed_range(root->info[vector_idx]->info.get(), start_in_vector, end_in_vector, result_offset,
344 result);
345 }
346}
347
348//===--------------------------------------------------------------------===//
349// Fetch Row
350//===--------------------------------------------------------------------===//
351static void FetchRowValidity(transaction_t start_time, transaction_t transaction_id, UpdateInfo *info, idx_t row_idx,
352 Vector &result, idx_t result_idx) {
353 auto &result_mask = FlatVector::Validity(vector&: result);
354 UpdateInfo::UpdatesForTransaction(current: info, start_time, transaction_id, callback: [&](UpdateInfo *current) {
355 auto info_data = reinterpret_cast<bool *>(current->tuple_data);
356 // FIXME: we could do a binary search in here
357 for (idx_t i = 0; i < current->N; i++) {
358 if (current->tuples[i] == row_idx) {
359 result_mask.Set(row_idx: result_idx, valid: info_data[i]);
360 break;
361 } else if (current->tuples[i] > row_idx) {
362 break;
363 }
364 }
365 });
366}
367
368template <class T>
369static void TemplatedFetchRow(transaction_t start_time, transaction_t transaction_id, UpdateInfo *info, idx_t row_idx,
370 Vector &result, idx_t result_idx) {
371 auto result_data = FlatVector::GetData<T>(result);
372 UpdateInfo::UpdatesForTransaction(info, start_time, transaction_id, [&](UpdateInfo *current) {
373 auto info_data = (T *)current->tuple_data;
374 // FIXME: we could do a binary search in here
375 for (idx_t i = 0; i < current->N; i++) {
376 if (current->tuples[i] == row_idx) {
377 result_data[result_idx] = info_data[i];
378 break;
379 } else if (current->tuples[i] > row_idx) {
380 break;
381 }
382 }
383 });
384}
385
386static UpdateSegment::fetch_row_function_t GetFetchRowFunction(PhysicalType type) {
387 switch (type) {
388 case PhysicalType::BIT:
389 return FetchRowValidity;
390 case PhysicalType::BOOL:
391 case PhysicalType::INT8:
392 return TemplatedFetchRow<int8_t>;
393 case PhysicalType::INT16:
394 return TemplatedFetchRow<int16_t>;
395 case PhysicalType::INT32:
396 return TemplatedFetchRow<int32_t>;
397 case PhysicalType::INT64:
398 return TemplatedFetchRow<int64_t>;
399 case PhysicalType::UINT8:
400 return TemplatedFetchRow<uint8_t>;
401 case PhysicalType::UINT16:
402 return TemplatedFetchRow<uint16_t>;
403 case PhysicalType::UINT32:
404 return TemplatedFetchRow<uint32_t>;
405 case PhysicalType::UINT64:
406 return TemplatedFetchRow<uint64_t>;
407 case PhysicalType::INT128:
408 return TemplatedFetchRow<hugeint_t>;
409 case PhysicalType::FLOAT:
410 return TemplatedFetchRow<float>;
411 case PhysicalType::DOUBLE:
412 return TemplatedFetchRow<double>;
413 case PhysicalType::INTERVAL:
414 return TemplatedFetchRow<interval_t>;
415 case PhysicalType::VARCHAR:
416 return TemplatedFetchRow<string_t>;
417 default:
418 throw NotImplementedException("Unimplemented type for update segment fetch row");
419 }
420}
421
422void UpdateSegment::FetchRow(TransactionData transaction, idx_t row_id, Vector &result, idx_t result_idx) {
423 if (!root) {
424 return;
425 }
426 idx_t vector_index = (row_id - column_data.start) / STANDARD_VECTOR_SIZE;
427 if (!root->info[vector_index]) {
428 return;
429 }
430 idx_t row_in_vector = row_id - vector_index * STANDARD_VECTOR_SIZE;
431 fetch_row_function(transaction.start_time, transaction.transaction_id, root->info[vector_index]->info.get(),
432 row_in_vector, result, result_idx);
433}
434
435//===--------------------------------------------------------------------===//
436// Rollback update
437//===--------------------------------------------------------------------===//
438template <class T>
439static void RollbackUpdate(UpdateInfo &base_info, UpdateInfo &rollback_info) {
440 auto base_data = (T *)base_info.tuple_data;
441 auto rollback_data = (T *)rollback_info.tuple_data;
442 idx_t base_offset = 0;
443 for (idx_t i = 0; i < rollback_info.N; i++) {
444 auto id = rollback_info.tuples[i];
445 while (base_info.tuples[base_offset] < id) {
446 base_offset++;
447 D_ASSERT(base_offset < base_info.N);
448 }
449 base_data[base_offset] = rollback_data[i];
450 }
451}
452
453static UpdateSegment::rollback_update_function_t GetRollbackUpdateFunction(PhysicalType type) {
454 switch (type) {
455 case PhysicalType::BIT:
456 return RollbackUpdate<bool>;
457 case PhysicalType::BOOL:
458 case PhysicalType::INT8:
459 return RollbackUpdate<int8_t>;
460 case PhysicalType::INT16:
461 return RollbackUpdate<int16_t>;
462 case PhysicalType::INT32:
463 return RollbackUpdate<int32_t>;
464 case PhysicalType::INT64:
465 return RollbackUpdate<int64_t>;
466 case PhysicalType::UINT8:
467 return RollbackUpdate<uint8_t>;
468 case PhysicalType::UINT16:
469 return RollbackUpdate<uint16_t>;
470 case PhysicalType::UINT32:
471 return RollbackUpdate<uint32_t>;
472 case PhysicalType::UINT64:
473 return RollbackUpdate<uint64_t>;
474 case PhysicalType::INT128:
475 return RollbackUpdate<hugeint_t>;
476 case PhysicalType::FLOAT:
477 return RollbackUpdate<float>;
478 case PhysicalType::DOUBLE:
479 return RollbackUpdate<double>;
480 case PhysicalType::INTERVAL:
481 return RollbackUpdate<interval_t>;
482 case PhysicalType::VARCHAR:
483 return RollbackUpdate<string_t>;
484 default:
485 throw NotImplementedException("Unimplemented type for uncompressed segment");
486 }
487}
488
489void UpdateSegment::RollbackUpdate(UpdateInfo &info) {
490 // obtain an exclusive lock
491 auto lock_handle = lock.GetExclusiveLock();
492
493 // move the data from the UpdateInfo back into the base info
494 D_ASSERT(root->info[info.vector_index]);
495 rollback_update_function(*root->info[info.vector_index]->info, info);
496
497 // clean up the update chain
498 CleanupUpdateInternal(lock: *lock_handle, info);
499}
500
501//===--------------------------------------------------------------------===//
502// Cleanup Update
503//===--------------------------------------------------------------------===//
504void UpdateSegment::CleanupUpdateInternal(const StorageLockKey &lock, UpdateInfo &info) {
505 D_ASSERT(info.prev);
506 auto prev = info.prev;
507 prev->next = info.next;
508 if (prev->next) {
509 prev->next->prev = prev;
510 }
511}
512
513void UpdateSegment::CleanupUpdate(UpdateInfo &info) {
514 // obtain an exclusive lock
515 auto lock_handle = lock.GetExclusiveLock();
516 CleanupUpdateInternal(lock: *lock_handle, info);
517}
518
519//===--------------------------------------------------------------------===//
520// Check for conflicts in update
521//===--------------------------------------------------------------------===//
522static void CheckForConflicts(UpdateInfo *info, TransactionData transaction, row_t *ids, const SelectionVector &sel,
523 idx_t count, row_t offset, UpdateInfo *&node) {
524 if (!info) {
525 return;
526 }
527 if (info->version_number == transaction.transaction_id) {
528 // this UpdateInfo belongs to the current transaction, set it in the node
529 node = info;
530 } else if (info->version_number > transaction.start_time) {
531 // potential conflict, check that tuple ids do not conflict
532 // as both ids and info->tuples are sorted, this is similar to a merge join
533 idx_t i = 0, j = 0;
534 while (true) {
535 auto id = ids[sel.get_index(idx: i)] - offset;
536 if (id == info->tuples[j]) {
537 throw TransactionException("Conflict on update!");
538 } else if (id < info->tuples[j]) {
539 // id < the current tuple in info, move to next id
540 i++;
541 if (i == count) {
542 break;
543 }
544 } else {
545 // id > the current tuple, move to next tuple in info
546 j++;
547 if (j == info->N) {
548 break;
549 }
550 }
551 }
552 }
553 CheckForConflicts(info: info->next, transaction, ids, sel, count, offset, node);
554}
555
556//===--------------------------------------------------------------------===//
557// Initialize update info
558//===--------------------------------------------------------------------===//
559void UpdateSegment::InitializeUpdateInfo(UpdateInfo &info, row_t *ids, const SelectionVector &sel, idx_t count,
560 idx_t vector_index, idx_t vector_offset) {
561 info.segment = this;
562 info.vector_index = vector_index;
563 info.prev = nullptr;
564 info.next = nullptr;
565
566 // set up the tuple ids
567 info.N = count;
568 for (idx_t i = 0; i < count; i++) {
569 auto idx = sel.get_index(idx: i);
570 auto id = ids[idx];
571 D_ASSERT(idx_t(id) >= vector_offset && idx_t(id) < vector_offset + STANDARD_VECTOR_SIZE);
572 info.tuples[i] = id - vector_offset;
573 };
574}
575
576static void InitializeUpdateValidity(UpdateInfo *base_info, Vector &base_data, UpdateInfo *update_info, Vector &update,
577 const SelectionVector &sel) {
578 auto &update_mask = FlatVector::Validity(vector&: update);
579 auto tuple_data = reinterpret_cast<bool *>(update_info->tuple_data);
580
581 if (!update_mask.AllValid()) {
582 for (idx_t i = 0; i < update_info->N; i++) {
583 auto idx = sel.get_index(idx: i);
584 tuple_data[i] = update_mask.RowIsValidUnsafe(row_idx: idx);
585 }
586 } else {
587 for (idx_t i = 0; i < update_info->N; i++) {
588 tuple_data[i] = true;
589 }
590 }
591
592 auto &base_mask = FlatVector::Validity(vector&: base_data);
593 auto base_tuple_data = reinterpret_cast<bool *>(base_info->tuple_data);
594 if (!base_mask.AllValid()) {
595 for (idx_t i = 0; i < base_info->N; i++) {
596 base_tuple_data[i] = base_mask.RowIsValidUnsafe(row_idx: base_info->tuples[i]);
597 }
598 } else {
599 for (idx_t i = 0; i < base_info->N; i++) {
600 base_tuple_data[i] = true;
601 }
602 }
603}
604
605struct UpdateSelectElement {
606 template <class T>
607 static T Operation(UpdateSegment *segment, T element) {
608 return element;
609 }
610};
611
612template <>
613string_t UpdateSelectElement::Operation(UpdateSegment *segment, string_t element) {
614 return element.IsInlined() ? element : segment->GetStringHeap().AddBlob(data: element);
615}
616
617template <class T>
618static void InitializeUpdateData(UpdateInfo *base_info, Vector &base_data, UpdateInfo *update_info, Vector &update,
619 const SelectionVector &sel) {
620 auto update_data = FlatVector::GetData<T>(update);
621 auto tuple_data = (T *)update_info->tuple_data;
622
623 for (idx_t i = 0; i < update_info->N; i++) {
624 auto idx = sel.get_index(idx: i);
625 tuple_data[i] = update_data[idx];
626 }
627
628 auto base_array_data = FlatVector::GetData<T>(base_data);
629 auto &base_validity = FlatVector::Validity(vector&: base_data);
630 auto base_tuple_data = (T *)base_info->tuple_data;
631 for (idx_t i = 0; i < base_info->N; i++) {
632 auto base_idx = base_info->tuples[i];
633 if (!base_validity.RowIsValid(row_idx: base_idx)) {
634 continue;
635 }
636 base_tuple_data[i] = UpdateSelectElement::Operation<T>(base_info->segment, base_array_data[base_idx]);
637 }
638}
639
640static UpdateSegment::initialize_update_function_t GetInitializeUpdateFunction(PhysicalType type) {
641 switch (type) {
642 case PhysicalType::BIT:
643 return InitializeUpdateValidity;
644 case PhysicalType::BOOL:
645 case PhysicalType::INT8:
646 return InitializeUpdateData<int8_t>;
647 case PhysicalType::INT16:
648 return InitializeUpdateData<int16_t>;
649 case PhysicalType::INT32:
650 return InitializeUpdateData<int32_t>;
651 case PhysicalType::INT64:
652 return InitializeUpdateData<int64_t>;
653 case PhysicalType::UINT8:
654 return InitializeUpdateData<uint8_t>;
655 case PhysicalType::UINT16:
656 return InitializeUpdateData<uint16_t>;
657 case PhysicalType::UINT32:
658 return InitializeUpdateData<uint32_t>;
659 case PhysicalType::UINT64:
660 return InitializeUpdateData<uint64_t>;
661 case PhysicalType::INT128:
662 return InitializeUpdateData<hugeint_t>;
663 case PhysicalType::FLOAT:
664 return InitializeUpdateData<float>;
665 case PhysicalType::DOUBLE:
666 return InitializeUpdateData<double>;
667 case PhysicalType::INTERVAL:
668 return InitializeUpdateData<interval_t>;
669 case PhysicalType::VARCHAR:
670 return InitializeUpdateData<string_t>;
671 default:
672 throw NotImplementedException("Unimplemented type for update segment");
673 }
674}
675
676//===--------------------------------------------------------------------===//
677// Merge update info
678//===--------------------------------------------------------------------===//
679template <class F1, class F2, class F3>
680static idx_t MergeLoop(row_t a[], sel_t b[], idx_t acount, idx_t bcount, idx_t aoffset, F1 merge, F2 pick_a, F3 pick_b,
681 const SelectionVector &asel) {
682 idx_t aidx = 0, bidx = 0;
683 idx_t count = 0;
684 while (aidx < acount && bidx < bcount) {
685 auto a_index = asel.get_index(idx: aidx);
686 auto a_id = a[a_index] - aoffset;
687 auto b_id = b[bidx];
688 if (a_id == b_id) {
689 merge(a_id, a_index, bidx, count);
690 aidx++;
691 bidx++;
692 count++;
693 } else if (a_id < b_id) {
694 pick_a(a_id, a_index, count);
695 aidx++;
696 count++;
697 } else {
698 pick_b(b_id, bidx, count);
699 bidx++;
700 count++;
701 }
702 }
703 for (; aidx < acount; aidx++) {
704 auto a_index = asel.get_index(idx: aidx);
705 pick_a(a[a_index] - aoffset, a_index, count);
706 count++;
707 }
708 for (; bidx < bcount; bidx++) {
709 pick_b(b[bidx], bidx, count);
710 count++;
711 }
712 return count;
713}
714
715struct ExtractStandardEntry {
716 template <class T, class V>
717 static T Extract(V *data, idx_t entry) {
718 return data[entry];
719 }
720};
721
722struct ExtractValidityEntry {
723 template <class T, class V>
724 static T Extract(V *data, idx_t entry) {
725 return data->RowIsValid(entry);
726 }
727};
728
729template <class T, class V, class OP = ExtractStandardEntry>
730static void MergeUpdateLoopInternal(UpdateInfo *base_info, V *base_table_data, UpdateInfo *update_info,
731 V *update_vector_data, row_t *ids, idx_t count, const SelectionVector &sel) {
732 auto base_id = base_info->segment->column_data.start + base_info->vector_index * STANDARD_VECTOR_SIZE;
733#ifdef DEBUG
734 // all of these should be sorted, otherwise the below algorithm does not work
735 for (idx_t i = 1; i < count; i++) {
736 auto prev_idx = sel.get_index(i - 1);
737 auto idx = sel.get_index(i);
738 D_ASSERT(ids[idx] > ids[prev_idx] && ids[idx] >= row_t(base_id) &&
739 ids[idx] < row_t(base_id + STANDARD_VECTOR_SIZE));
740 }
741#endif
742
743 // we have a new batch of updates (update, ids, count)
744 // we already have existing updates (base_info)
745 // and potentially, this transaction already has updates present (update_info)
746 // we need to merge these all together so that the latest updates get merged into base_info
747 // and the "old" values (fetched from EITHER base_info OR from base_data) get placed into update_info
748 auto base_info_data = (T *)base_info->tuple_data;
749 auto update_info_data = (T *)update_info->tuple_data;
750
751 // we first do the merging of the old values
752 // what we are trying to do here is update the "update_info" of this transaction with all the old data we require
753 // this means we need to merge (1) any previously updated values (stored in update_info->tuples)
754 // together with (2)
755 // to simplify this, we create new arrays here
756 // we memcpy these over afterwards
757 T result_values[STANDARD_VECTOR_SIZE];
758 sel_t result_ids[STANDARD_VECTOR_SIZE];
759
760 idx_t base_info_offset = 0;
761 idx_t update_info_offset = 0;
762 idx_t result_offset = 0;
763 for (idx_t i = 0; i < count; i++) {
764 auto idx = sel.get_index(idx: i);
765 // we have to merge the info for "ids[i]"
766 auto update_id = ids[idx] - base_id;
767
768 while (update_info_offset < update_info->N && update_info->tuples[update_info_offset] < update_id) {
769 // old id comes before the current id: write it
770 result_values[result_offset] = update_info_data[update_info_offset];
771 result_ids[result_offset++] = update_info->tuples[update_info_offset];
772 update_info_offset++;
773 }
774 // write the new id
775 if (update_info_offset < update_info->N && update_info->tuples[update_info_offset] == update_id) {
776 // we have an id that is equivalent in the current update info: write the update info
777 result_values[result_offset] = update_info_data[update_info_offset];
778 result_ids[result_offset++] = update_info->tuples[update_info_offset];
779 update_info_offset++;
780 continue;
781 }
782
783 /// now check if we have the current update_id in the base_info, or if we should fetch it from the base data
784 while (base_info_offset < base_info->N && base_info->tuples[base_info_offset] < update_id) {
785 base_info_offset++;
786 }
787 if (base_info_offset < base_info->N && base_info->tuples[base_info_offset] == update_id) {
788 // it is! we have to move the tuple from base_info->ids[base_info_offset] to update_info
789 result_values[result_offset] = base_info_data[base_info_offset];
790 } else {
791 // it is not! we have to move base_table_data[update_id] to update_info
792 result_values[result_offset] = UpdateSelectElement::Operation<T>(
793 base_info->segment, OP::template Extract<T, V>(base_table_data, update_id));
794 }
795 result_ids[result_offset++] = update_id;
796 }
797 // write any remaining entries from the old updates
798 while (update_info_offset < update_info->N) {
799 result_values[result_offset] = update_info_data[update_info_offset];
800 result_ids[result_offset++] = update_info->tuples[update_info_offset];
801 update_info_offset++;
802 }
803 // now copy them back
804 update_info->N = result_offset;
805 memcpy(update_info_data, result_values, result_offset * sizeof(T));
806 memcpy(dest: update_info->tuples, src: result_ids, n: result_offset * sizeof(sel_t));
807
808 // now we merge the new values into the base_info
809 result_offset = 0;
810 auto pick_new = [&](idx_t id, idx_t aidx, idx_t count) {
811 result_values[result_offset] = OP::template Extract<T, V>(update_vector_data, aidx);
812 result_ids[result_offset] = id;
813 result_offset++;
814 };
815 auto pick_old = [&](idx_t id, idx_t bidx, idx_t count) {
816 result_values[result_offset] = base_info_data[bidx];
817 result_ids[result_offset] = id;
818 result_offset++;
819 };
820 // now we perform a merge of the new ids with the old ids
821 auto merge = [&](idx_t id, idx_t aidx, idx_t bidx, idx_t count) {
822 pick_new(id, aidx, count);
823 };
824 MergeLoop(ids, base_info->tuples, count, base_info->N, base_id, merge, pick_new, pick_old, sel);
825
826 base_info->N = result_offset;
827 memcpy(base_info_data, result_values, result_offset * sizeof(T));
828 memcpy(dest: base_info->tuples, src: result_ids, n: result_offset * sizeof(sel_t));
829}
830
831static void MergeValidityLoop(UpdateInfo *base_info, Vector &base_data, UpdateInfo *update_info, Vector &update,
832 row_t *ids, idx_t count, const SelectionVector &sel) {
833 auto &base_validity = FlatVector::Validity(vector&: base_data);
834 auto &update_validity = FlatVector::Validity(vector&: update);
835 MergeUpdateLoopInternal<bool, ValidityMask, ExtractValidityEntry>(base_info, base_table_data: &base_validity, update_info,
836 update_vector_data: &update_validity, ids, count, sel);
837}
838
839template <class T>
840static void MergeUpdateLoop(UpdateInfo *base_info, Vector &base_data, UpdateInfo *update_info, Vector &update,
841 row_t *ids, idx_t count, const SelectionVector &sel) {
842 auto base_table_data = FlatVector::GetData<T>(base_data);
843 auto update_vector_data = FlatVector::GetData<T>(update);
844 MergeUpdateLoopInternal<T, T>(base_info, base_table_data, update_info, update_vector_data, ids, count, sel);
845}
846
847static UpdateSegment::merge_update_function_t GetMergeUpdateFunction(PhysicalType type) {
848 switch (type) {
849 case PhysicalType::BIT:
850 return MergeValidityLoop;
851 case PhysicalType::BOOL:
852 case PhysicalType::INT8:
853 return MergeUpdateLoop<int8_t>;
854 case PhysicalType::INT16:
855 return MergeUpdateLoop<int16_t>;
856 case PhysicalType::INT32:
857 return MergeUpdateLoop<int32_t>;
858 case PhysicalType::INT64:
859 return MergeUpdateLoop<int64_t>;
860 case PhysicalType::UINT8:
861 return MergeUpdateLoop<uint8_t>;
862 case PhysicalType::UINT16:
863 return MergeUpdateLoop<uint16_t>;
864 case PhysicalType::UINT32:
865 return MergeUpdateLoop<uint32_t>;
866 case PhysicalType::UINT64:
867 return MergeUpdateLoop<uint64_t>;
868 case PhysicalType::INT128:
869 return MergeUpdateLoop<hugeint_t>;
870 case PhysicalType::FLOAT:
871 return MergeUpdateLoop<float>;
872 case PhysicalType::DOUBLE:
873 return MergeUpdateLoop<double>;
874 case PhysicalType::INTERVAL:
875 return MergeUpdateLoop<interval_t>;
876 case PhysicalType::VARCHAR:
877 return MergeUpdateLoop<string_t>;
878 default:
879 throw NotImplementedException("Unimplemented type for uncompressed segment");
880 }
881}
882
883//===--------------------------------------------------------------------===//
884// Update statistics
885//===--------------------------------------------------------------------===//
886unique_ptr<BaseStatistics> UpdateSegment::GetStatistics() {
887 lock_guard<mutex> stats_guard(stats_lock);
888 return stats.statistics.ToUnique();
889}
890
891idx_t UpdateValidityStatistics(UpdateSegment *segment, SegmentStatistics &stats, Vector &update, idx_t count,
892 SelectionVector &sel) {
893 auto &mask = FlatVector::Validity(vector&: update);
894 auto &validity = stats.statistics;
895 if (!mask.AllValid() && !validity.CanHaveNull()) {
896 for (idx_t i = 0; i < count; i++) {
897 if (!mask.RowIsValid(row_idx: i)) {
898 validity.SetHasNull();
899 break;
900 }
901 }
902 }
903 sel.Initialize(sel: nullptr);
904 return count;
905}
906
907template <class T>
908idx_t TemplatedUpdateNumericStatistics(UpdateSegment *segment, SegmentStatistics &stats, Vector &update, idx_t count,
909 SelectionVector &sel) {
910 auto update_data = FlatVector::GetData<T>(update);
911 auto &mask = FlatVector::Validity(vector&: update);
912
913 if (mask.AllValid()) {
914 for (idx_t i = 0; i < count; i++) {
915 NumericStats::Update<T>(stats.statistics, update_data[i]);
916 }
917 sel.Initialize(sel: nullptr);
918 return count;
919 } else {
920 idx_t not_null_count = 0;
921 sel.Initialize(STANDARD_VECTOR_SIZE);
922 for (idx_t i = 0; i < count; i++) {
923 if (mask.RowIsValid(row_idx: i)) {
924 sel.set_index(idx: not_null_count++, loc: i);
925 NumericStats::Update<T>(stats.statistics, update_data[i]);
926 }
927 }
928 return not_null_count;
929 }
930}
931
932idx_t UpdateStringStatistics(UpdateSegment *segment, SegmentStatistics &stats, Vector &update, idx_t count,
933 SelectionVector &sel) {
934 auto update_data = FlatVector::GetData<string_t>(vector&: update);
935 auto &mask = FlatVector::Validity(vector&: update);
936 if (mask.AllValid()) {
937 for (idx_t i = 0; i < count; i++) {
938 StringStats::Update(stats&: stats.statistics, value: update_data[i]);
939 if (!update_data[i].IsInlined()) {
940 update_data[i] = segment->GetStringHeap().AddBlob(data: update_data[i]);
941 }
942 }
943 sel.Initialize(sel: nullptr);
944 return count;
945 } else {
946 idx_t not_null_count = 0;
947 sel.Initialize(STANDARD_VECTOR_SIZE);
948 for (idx_t i = 0; i < count; i++) {
949 if (mask.RowIsValid(row_idx: i)) {
950 sel.set_index(idx: not_null_count++, loc: i);
951 StringStats::Update(stats&: stats.statistics, value: update_data[i]);
952 if (!update_data[i].IsInlined()) {
953 update_data[i] = segment->GetStringHeap().AddBlob(data: update_data[i]);
954 }
955 }
956 }
957 return not_null_count;
958 }
959}
960
961UpdateSegment::statistics_update_function_t GetStatisticsUpdateFunction(PhysicalType type) {
962 switch (type) {
963 case PhysicalType::BIT:
964 return UpdateValidityStatistics;
965 case PhysicalType::BOOL:
966 case PhysicalType::INT8:
967 return TemplatedUpdateNumericStatistics<int8_t>;
968 case PhysicalType::INT16:
969 return TemplatedUpdateNumericStatistics<int16_t>;
970 case PhysicalType::INT32:
971 return TemplatedUpdateNumericStatistics<int32_t>;
972 case PhysicalType::INT64:
973 return TemplatedUpdateNumericStatistics<int64_t>;
974 case PhysicalType::UINT8:
975 return TemplatedUpdateNumericStatistics<uint8_t>;
976 case PhysicalType::UINT16:
977 return TemplatedUpdateNumericStatistics<uint16_t>;
978 case PhysicalType::UINT32:
979 return TemplatedUpdateNumericStatistics<uint32_t>;
980 case PhysicalType::UINT64:
981 return TemplatedUpdateNumericStatistics<uint64_t>;
982 case PhysicalType::INT128:
983 return TemplatedUpdateNumericStatistics<hugeint_t>;
984 case PhysicalType::FLOAT:
985 return TemplatedUpdateNumericStatistics<float>;
986 case PhysicalType::DOUBLE:
987 return TemplatedUpdateNumericStatistics<double>;
988 case PhysicalType::INTERVAL:
989 return TemplatedUpdateNumericStatistics<interval_t>;
990 case PhysicalType::VARCHAR:
991 return UpdateStringStatistics;
992 default:
993 throw NotImplementedException("Unimplemented type for uncompressed segment");
994 }
995}
996
997//===--------------------------------------------------------------------===//
998// Update
999//===--------------------------------------------------------------------===//
1000static idx_t SortSelectionVector(SelectionVector &sel, idx_t count, row_t *ids) {
1001 D_ASSERT(count > 0);
1002
1003 bool is_sorted = true;
1004 for (idx_t i = 1; i < count; i++) {
1005 auto prev_idx = sel.get_index(idx: i - 1);
1006 auto idx = sel.get_index(idx: i);
1007 if (ids[idx] <= ids[prev_idx]) {
1008 is_sorted = false;
1009 break;
1010 }
1011 }
1012 if (is_sorted) {
1013 // already sorted: bailout
1014 return count;
1015 }
1016 // not sorted: need to sort the selection vector
1017 SelectionVector sorted_sel(count);
1018 for (idx_t i = 0; i < count; i++) {
1019 sorted_sel.set_index(idx: i, loc: sel.get_index(idx: i));
1020 }
1021 std::sort(first: sorted_sel.data(), last: sorted_sel.data() + count, comp: [&](sel_t l, sel_t r) { return ids[l] < ids[r]; });
1022 // eliminate any duplicates
1023 idx_t pos = 1;
1024 for (idx_t i = 1; i < count; i++) {
1025 auto prev_idx = sorted_sel.get_index(idx: i - 1);
1026 auto idx = sorted_sel.get_index(idx: i);
1027 D_ASSERT(ids[idx] >= ids[prev_idx]);
1028 if (ids[prev_idx] != ids[idx]) {
1029 sorted_sel.set_index(idx: pos++, loc: idx);
1030 }
1031 }
1032#ifdef DEBUG
1033 for (idx_t i = 1; i < pos; i++) {
1034 auto prev_idx = sorted_sel.get_index(i - 1);
1035 auto idx = sorted_sel.get_index(i);
1036 D_ASSERT(ids[idx] > ids[prev_idx]);
1037 }
1038#endif
1039
1040 sel.Initialize(other: sorted_sel);
1041 D_ASSERT(pos > 0);
1042 return pos;
1043}
1044
1045UpdateInfo *CreateEmptyUpdateInfo(TransactionData transaction, idx_t type_size, idx_t count,
1046 unsafe_unique_array<char> &data) {
1047 data = make_unsafe_uniq_array<char>(n: sizeof(UpdateInfo) + (sizeof(sel_t) + type_size) * STANDARD_VECTOR_SIZE);
1048 auto update_info = reinterpret_cast<UpdateInfo *>(data.get());
1049 update_info->max = STANDARD_VECTOR_SIZE;
1050 update_info->tuples = reinterpret_cast<sel_t *>((data_ptr_cast(src: update_info)) + sizeof(UpdateInfo));
1051 update_info->tuple_data = (data_ptr_cast(src: update_info)) + sizeof(UpdateInfo) + sizeof(sel_t) * update_info->max;
1052 update_info->version_number = transaction.transaction_id;
1053 return update_info;
1054}
1055
1056void UpdateSegment::Update(TransactionData transaction, idx_t column_index, Vector &update, row_t *ids, idx_t count,
1057 Vector &base_data) {
1058 // obtain an exclusive lock
1059 auto write_lock = lock.GetExclusiveLock();
1060
1061 update.Flatten(count);
1062
1063 // update statistics
1064 SelectionVector sel;
1065 {
1066 lock_guard<mutex> stats_guard(stats_lock);
1067 count = statistics_update_function(this, stats, update, count, sel);
1068 }
1069 if (count == 0) {
1070 return;
1071 }
1072
1073 // subsequent algorithms used by the update require row ids to be (1) sorted, and (2) unique
1074 // this is usually the case for "standard" queries (e.g. UPDATE tbl SET x=bla WHERE cond)
1075 // however, for more exotic queries involving e.g. cross products/joins this might not be the case
1076 // hence we explicitly check here if the ids are sorted and, if not, sort + duplicate eliminate them
1077 count = SortSelectionVector(sel, count, ids);
1078 D_ASSERT(count > 0);
1079
1080 // create the versions for this segment, if there are none yet
1081 if (!root) {
1082 root = make_uniq<UpdateNode>();
1083 }
1084
1085 // get the vector index based on the first id
1086 // we assert that all updates must be part of the same vector
1087 auto first_id = ids[sel.get_index(idx: 0)];
1088 idx_t vector_index = (first_id - column_data.start) / STANDARD_VECTOR_SIZE;
1089 idx_t vector_offset = column_data.start + vector_index * STANDARD_VECTOR_SIZE;
1090
1091 D_ASSERT(idx_t(first_id) >= column_data.start);
1092 D_ASSERT(vector_index < RowGroup::ROW_GROUP_VECTOR_COUNT);
1093
1094 // first check the version chain
1095 UpdateInfo *node = nullptr;
1096
1097 if (root->info[vector_index]) {
1098 // there is already a version here, check if there are any conflicts and search for the node that belongs to
1099 // this transaction in the version chain
1100 auto base_info = root->info[vector_index]->info.get();
1101 CheckForConflicts(info: base_info->next, transaction, ids, sel, count, offset: vector_offset, node);
1102
1103 // there are no conflicts
1104 // first, check if this thread has already done any updates
1105 auto node = base_info->next;
1106 while (node) {
1107 if (node->version_number == transaction.transaction_id) {
1108 // it has! use this node
1109 break;
1110 }
1111 node = node->next;
1112 }
1113 unsafe_unique_array<char> update_info_data;
1114 if (!node) {
1115 // no updates made yet by this transaction: initially the update info to empty
1116 if (transaction.transaction) {
1117 auto &dtransaction = transaction.transaction->Cast<DuckTransaction>();
1118 node = dtransaction.CreateUpdateInfo(type_size, entries: count);
1119 } else {
1120 node = CreateEmptyUpdateInfo(transaction, type_size, count, data&: update_info_data);
1121 }
1122 node->segment = this;
1123 node->vector_index = vector_index;
1124 node->N = 0;
1125 node->column_index = column_index;
1126
1127 // insert the new node into the chain
1128 node->next = base_info->next;
1129 if (node->next) {
1130 node->next->prev = node;
1131 }
1132 node->prev = base_info;
1133 base_info->next = transaction.transaction ? node : nullptr;
1134 }
1135 base_info->Verify();
1136 node->Verify();
1137
1138 // now we are going to perform the merge
1139 merge_update_function(base_info, base_data, node, update, ids, count, sel);
1140
1141 base_info->Verify();
1142 node->Verify();
1143 } else {
1144 // there is no version info yet: create the top level update info and fill it with the updates
1145 auto result = make_uniq<UpdateNodeData>();
1146
1147 result->info = make_uniq<UpdateInfo>();
1148 result->tuples = make_unsafe_uniq_array<sel_t>(STANDARD_VECTOR_SIZE);
1149 result->tuple_data = make_unsafe_uniq_array<data_t>(STANDARD_VECTOR_SIZE * type_size);
1150 result->info->tuples = result->tuples.get();
1151 result->info->tuple_data = result->tuple_data.get();
1152 result->info->version_number = TRANSACTION_ID_START - 1;
1153 result->info->column_index = column_index;
1154 InitializeUpdateInfo(info&: *result->info, ids, sel, count, vector_index, vector_offset);
1155
1156 // now create the transaction level update info in the undo log
1157 unsafe_unique_array<char> update_info_data;
1158 UpdateInfo *transaction_node;
1159 if (transaction.transaction) {
1160 transaction_node = transaction.transaction->CreateUpdateInfo(type_size, entries: count);
1161 } else {
1162 transaction_node = CreateEmptyUpdateInfo(transaction, type_size, count, data&: update_info_data);
1163 }
1164
1165 InitializeUpdateInfo(info&: *transaction_node, ids, sel, count, vector_index, vector_offset);
1166
1167 // we write the updates in the update node data, and write the updates in the info
1168 initialize_update_function(transaction_node, base_data, result->info.get(), update, sel);
1169
1170 result->info->next = transaction.transaction ? transaction_node : nullptr;
1171 result->info->prev = nullptr;
1172 transaction_node->next = nullptr;
1173 transaction_node->prev = result->info.get();
1174 transaction_node->column_index = column_index;
1175
1176 transaction_node->Verify();
1177 result->info->Verify();
1178
1179 root->info[vector_index] = std::move(result);
1180 }
1181}
1182
1183bool UpdateSegment::HasUpdates() const {
1184 return root.get() != nullptr;
1185}
1186
1187bool UpdateSegment::HasUpdates(idx_t vector_index) const {
1188 if (!HasUpdates()) {
1189 return false;
1190 }
1191 return root->info[vector_index].get();
1192}
1193
1194bool UpdateSegment::HasUncommittedUpdates(idx_t vector_index) {
1195 if (!HasUpdates(vector_index)) {
1196 return false;
1197 }
1198 auto read_lock = lock.GetSharedLock();
1199 auto entry = root->info[vector_index].get();
1200 if (entry->info->next) {
1201 return true;
1202 }
1203 return false;
1204}
1205
1206bool UpdateSegment::HasUpdates(idx_t start_row_index, idx_t end_row_index) {
1207 if (!HasUpdates()) {
1208 return false;
1209 }
1210 auto read_lock = lock.GetSharedLock();
1211 idx_t base_vector_index = start_row_index / STANDARD_VECTOR_SIZE;
1212 idx_t end_vector_index = end_row_index / STANDARD_VECTOR_SIZE;
1213 for (idx_t i = base_vector_index; i <= end_vector_index; i++) {
1214 if (root->info[i]) {
1215 return true;
1216 }
1217 }
1218 return false;
1219}
1220
1221} // namespace duckdb
1222