1 | #include "duckdb/storage/table/update_segment.hpp" |
2 | |
3 | #include "duckdb/storage/statistics/distinct_statistics.hpp" |
4 | |
5 | #include "duckdb/storage/table/column_data.hpp" |
6 | #include "duckdb/transaction/duck_transaction.hpp" |
7 | #include "duckdb/transaction/update_info.hpp" |
8 | #include "duckdb/common/printer.hpp" |
9 | |
10 | #include <algorithm> |
11 | |
12 | namespace duckdb { |
13 | |
14 | static UpdateSegment::initialize_update_function_t GetInitializeUpdateFunction(PhysicalType type); |
15 | static UpdateSegment::fetch_update_function_t GetFetchUpdateFunction(PhysicalType type); |
16 | static UpdateSegment::fetch_committed_function_t GetFetchCommittedFunction(PhysicalType type); |
17 | static UpdateSegment::fetch_committed_range_function_t GetFetchCommittedRangeFunction(PhysicalType type); |
18 | |
19 | static UpdateSegment::merge_update_function_t GetMergeUpdateFunction(PhysicalType type); |
20 | static UpdateSegment::rollback_update_function_t GetRollbackUpdateFunction(PhysicalType type); |
21 | static UpdateSegment::statistics_update_function_t GetStatisticsUpdateFunction(PhysicalType type); |
22 | static UpdateSegment::fetch_row_function_t GetFetchRowFunction(PhysicalType type); |
23 | |
24 | UpdateSegment::UpdateSegment(ColumnData &column_data) |
25 | : column_data(column_data), stats(column_data.type), heap(BufferAllocator::Get(db&: column_data.GetDatabase())) { |
26 | auto physical_type = column_data.type.InternalType(); |
27 | |
28 | this->type_size = GetTypeIdSize(type: physical_type); |
29 | |
30 | this->initialize_update_function = GetInitializeUpdateFunction(type: physical_type); |
31 | this->fetch_update_function = GetFetchUpdateFunction(type: physical_type); |
32 | this->fetch_committed_function = GetFetchCommittedFunction(type: physical_type); |
33 | this->fetch_committed_range = GetFetchCommittedRangeFunction(type: physical_type); |
34 | this->fetch_row_function = GetFetchRowFunction(type: physical_type); |
35 | this->merge_update_function = GetMergeUpdateFunction(type: physical_type); |
36 | this->rollback_update_function = GetRollbackUpdateFunction(type: physical_type); |
37 | this->statistics_update_function = GetStatisticsUpdateFunction(type: physical_type); |
38 | } |
39 | |
40 | UpdateSegment::~UpdateSegment() { |
41 | } |
42 | |
43 | //===--------------------------------------------------------------------===// |
44 | // Update Info Helpers |
45 | //===--------------------------------------------------------------------===// |
46 | Value UpdateInfo::GetValue(idx_t index) { |
47 | auto &type = segment->column_data.type; |
48 | |
49 | switch (type.id()) { |
50 | case LogicalTypeId::VALIDITY: |
51 | return Value::BOOLEAN(value: reinterpret_cast<bool *>(tuple_data)[index]); |
52 | case LogicalTypeId::INTEGER: |
53 | return Value::INTEGER(value: reinterpret_cast<int32_t *>(tuple_data)[index]); |
54 | default: |
55 | throw NotImplementedException("Unimplemented type for UpdateInfo::GetValue" ); |
56 | } |
57 | } |
58 | |
59 | void UpdateInfo::Print() { |
60 | Printer::Print(str: ToString()); |
61 | } |
62 | |
63 | string UpdateInfo::ToString() { |
64 | auto &type = segment->column_data.type; |
65 | string result = "Update Info [" + type.ToString() + ", Count: " + to_string(val: N) + |
66 | ", Transaction Id: " + to_string(val: version_number) + "]\n" ; |
67 | for (idx_t i = 0; i < N; i++) { |
68 | result += to_string(val: tuples[i]) + ": " + GetValue(index: i).ToString() + "\n" ; |
69 | } |
70 | if (next) { |
71 | result += "\nChild Segment: " + next->ToString(); |
72 | } |
73 | return result; |
74 | } |
75 | |
76 | void UpdateInfo::Verify() { |
77 | #ifdef DEBUG |
78 | for (idx_t i = 1; i < N; i++) { |
79 | D_ASSERT(tuples[i] > tuples[i - 1] && tuples[i] < STANDARD_VECTOR_SIZE); |
80 | } |
81 | #endif |
82 | } |
83 | |
84 | //===--------------------------------------------------------------------===// |
85 | // Update Fetch |
86 | //===--------------------------------------------------------------------===// |
87 | static void MergeValidityInfo(UpdateInfo *current, ValidityMask &result_mask) { |
88 | auto info_data = reinterpret_cast<bool *>(current->tuple_data); |
89 | for (idx_t i = 0; i < current->N; i++) { |
90 | result_mask.Set(row_idx: current->tuples[i], valid: info_data[i]); |
91 | } |
92 | } |
93 | |
94 | static void UpdateMergeValidity(transaction_t start_time, transaction_t transaction_id, UpdateInfo *info, |
95 | Vector &result) { |
96 | auto &result_mask = FlatVector::Validity(vector&: result); |
97 | UpdateInfo::UpdatesForTransaction(current: info, start_time, transaction_id, |
98 | callback: [&](UpdateInfo *current) { MergeValidityInfo(current, result_mask); }); |
99 | } |
100 | |
101 | template <class T> |
102 | static void MergeUpdateInfo(UpdateInfo *current, T *result_data) { |
103 | auto info_data = reinterpret_cast<T *>(current->tuple_data); |
104 | if (current->N == STANDARD_VECTOR_SIZE) { |
105 | // special case: update touches ALL tuples of this vector |
106 | // in this case we can just memcpy the data |
107 | // since the layout of the update info is guaranteed to be [0, 1, 2, 3, ...] |
108 | memcpy(result_data, info_data, sizeof(T) * current->N); |
109 | } else { |
110 | for (idx_t i = 0; i < current->N; i++) { |
111 | result_data[current->tuples[i]] = info_data[i]; |
112 | } |
113 | } |
114 | } |
115 | |
116 | template <class T> |
117 | static void UpdateMergeFetch(transaction_t start_time, transaction_t transaction_id, UpdateInfo *info, Vector &result) { |
118 | auto result_data = FlatVector::GetData<T>(result); |
119 | UpdateInfo::UpdatesForTransaction(info, start_time, transaction_id, |
120 | [&](UpdateInfo *current) { MergeUpdateInfo<T>(current, result_data); }); |
121 | } |
122 | |
123 | static UpdateSegment::fetch_update_function_t GetFetchUpdateFunction(PhysicalType type) { |
124 | switch (type) { |
125 | case PhysicalType::BIT: |
126 | return UpdateMergeValidity; |
127 | case PhysicalType::BOOL: |
128 | case PhysicalType::INT8: |
129 | return UpdateMergeFetch<int8_t>; |
130 | case PhysicalType::INT16: |
131 | return UpdateMergeFetch<int16_t>; |
132 | case PhysicalType::INT32: |
133 | return UpdateMergeFetch<int32_t>; |
134 | case PhysicalType::INT64: |
135 | return UpdateMergeFetch<int64_t>; |
136 | case PhysicalType::UINT8: |
137 | return UpdateMergeFetch<uint8_t>; |
138 | case PhysicalType::UINT16: |
139 | return UpdateMergeFetch<uint16_t>; |
140 | case PhysicalType::UINT32: |
141 | return UpdateMergeFetch<uint32_t>; |
142 | case PhysicalType::UINT64: |
143 | return UpdateMergeFetch<uint64_t>; |
144 | case PhysicalType::INT128: |
145 | return UpdateMergeFetch<hugeint_t>; |
146 | case PhysicalType::FLOAT: |
147 | return UpdateMergeFetch<float>; |
148 | case PhysicalType::DOUBLE: |
149 | return UpdateMergeFetch<double>; |
150 | case PhysicalType::INTERVAL: |
151 | return UpdateMergeFetch<interval_t>; |
152 | case PhysicalType::VARCHAR: |
153 | return UpdateMergeFetch<string_t>; |
154 | default: |
155 | throw NotImplementedException("Unimplemented type for update segment" ); |
156 | } |
157 | } |
158 | |
159 | void UpdateSegment::FetchUpdates(TransactionData transaction, idx_t vector_index, Vector &result) { |
160 | auto lock_handle = lock.GetSharedLock(); |
161 | if (!root) { |
162 | return; |
163 | } |
164 | if (!root->info[vector_index]) { |
165 | return; |
166 | } |
167 | // FIXME: normalify if this is not the case... need to pass in count? |
168 | D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR); |
169 | |
170 | fetch_update_function(transaction.start_time, transaction.transaction_id, root->info[vector_index]->info.get(), |
171 | result); |
172 | } |
173 | |
174 | //===--------------------------------------------------------------------===// |
175 | // Fetch Committed |
176 | //===--------------------------------------------------------------------===// |
177 | static void FetchCommittedValidity(UpdateInfo *info, Vector &result) { |
178 | auto &result_mask = FlatVector::Validity(vector&: result); |
179 | MergeValidityInfo(current: info, result_mask); |
180 | } |
181 | |
182 | template <class T> |
183 | static void TemplatedFetchCommitted(UpdateInfo *info, Vector &result) { |
184 | auto result_data = FlatVector::GetData<T>(result); |
185 | MergeUpdateInfo<T>(info, result_data); |
186 | } |
187 | |
188 | static UpdateSegment::fetch_committed_function_t GetFetchCommittedFunction(PhysicalType type) { |
189 | switch (type) { |
190 | case PhysicalType::BIT: |
191 | return FetchCommittedValidity; |
192 | case PhysicalType::BOOL: |
193 | case PhysicalType::INT8: |
194 | return TemplatedFetchCommitted<int8_t>; |
195 | case PhysicalType::INT16: |
196 | return TemplatedFetchCommitted<int16_t>; |
197 | case PhysicalType::INT32: |
198 | return TemplatedFetchCommitted<int32_t>; |
199 | case PhysicalType::INT64: |
200 | return TemplatedFetchCommitted<int64_t>; |
201 | case PhysicalType::UINT8: |
202 | return TemplatedFetchCommitted<uint8_t>; |
203 | case PhysicalType::UINT16: |
204 | return TemplatedFetchCommitted<uint16_t>; |
205 | case PhysicalType::UINT32: |
206 | return TemplatedFetchCommitted<uint32_t>; |
207 | case PhysicalType::UINT64: |
208 | return TemplatedFetchCommitted<uint64_t>; |
209 | case PhysicalType::INT128: |
210 | return TemplatedFetchCommitted<hugeint_t>; |
211 | case PhysicalType::FLOAT: |
212 | return TemplatedFetchCommitted<float>; |
213 | case PhysicalType::DOUBLE: |
214 | return TemplatedFetchCommitted<double>; |
215 | case PhysicalType::INTERVAL: |
216 | return TemplatedFetchCommitted<interval_t>; |
217 | case PhysicalType::VARCHAR: |
218 | return TemplatedFetchCommitted<string_t>; |
219 | default: |
220 | throw NotImplementedException("Unimplemented type for update segment" ); |
221 | } |
222 | } |
223 | |
224 | void UpdateSegment::FetchCommitted(idx_t vector_index, Vector &result) { |
225 | auto lock_handle = lock.GetSharedLock(); |
226 | |
227 | if (!root) { |
228 | return; |
229 | } |
230 | if (!root->info[vector_index]) { |
231 | return; |
232 | } |
233 | // FIXME: normalify if this is not the case... need to pass in count? |
234 | D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR); |
235 | |
236 | fetch_committed_function(root->info[vector_index]->info.get(), result); |
237 | } |
238 | |
239 | //===--------------------------------------------------------------------===// |
240 | // Fetch Range |
241 | //===--------------------------------------------------------------------===// |
242 | static void MergeUpdateInfoRangeValidity(UpdateInfo *current, idx_t start, idx_t end, idx_t result_offset, |
243 | ValidityMask &result_mask) { |
244 | auto info_data = reinterpret_cast<bool *>(current->tuple_data); |
245 | for (idx_t i = 0; i < current->N; i++) { |
246 | auto tuple_idx = current->tuples[i]; |
247 | if (tuple_idx < start) { |
248 | continue; |
249 | } else if (tuple_idx >= end) { |
250 | break; |
251 | } |
252 | auto result_idx = result_offset + tuple_idx - start; |
253 | result_mask.Set(row_idx: result_idx, valid: info_data[i]); |
254 | } |
255 | } |
256 | |
257 | static void FetchCommittedRangeValidity(UpdateInfo *info, idx_t start, idx_t end, idx_t result_offset, Vector &result) { |
258 | auto &result_mask = FlatVector::Validity(vector&: result); |
259 | MergeUpdateInfoRangeValidity(current: info, start, end, result_offset, result_mask); |
260 | } |
261 | |
262 | template <class T> |
263 | static void MergeUpdateInfoRange(UpdateInfo *current, idx_t start, idx_t end, idx_t result_offset, T *result_data) { |
264 | auto info_data = reinterpret_cast<T *>(current->tuple_data); |
265 | for (idx_t i = 0; i < current->N; i++) { |
266 | auto tuple_idx = current->tuples[i]; |
267 | if (tuple_idx < start) { |
268 | continue; |
269 | } else if (tuple_idx >= end) { |
270 | break; |
271 | } |
272 | auto result_idx = result_offset + tuple_idx - start; |
273 | result_data[result_idx] = info_data[i]; |
274 | } |
275 | } |
276 | |
277 | template <class T> |
278 | static void TemplatedFetchCommittedRange(UpdateInfo *info, idx_t start, idx_t end, idx_t result_offset, |
279 | Vector &result) { |
280 | auto result_data = FlatVector::GetData<T>(result); |
281 | MergeUpdateInfoRange<T>(info, start, end, result_offset, result_data); |
282 | } |
283 | |
284 | static UpdateSegment::fetch_committed_range_function_t GetFetchCommittedRangeFunction(PhysicalType type) { |
285 | switch (type) { |
286 | case PhysicalType::BIT: |
287 | return FetchCommittedRangeValidity; |
288 | case PhysicalType::BOOL: |
289 | case PhysicalType::INT8: |
290 | return TemplatedFetchCommittedRange<int8_t>; |
291 | case PhysicalType::INT16: |
292 | return TemplatedFetchCommittedRange<int16_t>; |
293 | case PhysicalType::INT32: |
294 | return TemplatedFetchCommittedRange<int32_t>; |
295 | case PhysicalType::INT64: |
296 | return TemplatedFetchCommittedRange<int64_t>; |
297 | case PhysicalType::UINT8: |
298 | return TemplatedFetchCommittedRange<uint8_t>; |
299 | case PhysicalType::UINT16: |
300 | return TemplatedFetchCommittedRange<uint16_t>; |
301 | case PhysicalType::UINT32: |
302 | return TemplatedFetchCommittedRange<uint32_t>; |
303 | case PhysicalType::UINT64: |
304 | return TemplatedFetchCommittedRange<uint64_t>; |
305 | case PhysicalType::INT128: |
306 | return TemplatedFetchCommittedRange<hugeint_t>; |
307 | case PhysicalType::FLOAT: |
308 | return TemplatedFetchCommittedRange<float>; |
309 | case PhysicalType::DOUBLE: |
310 | return TemplatedFetchCommittedRange<double>; |
311 | case PhysicalType::INTERVAL: |
312 | return TemplatedFetchCommittedRange<interval_t>; |
313 | case PhysicalType::VARCHAR: |
314 | return TemplatedFetchCommittedRange<string_t>; |
315 | default: |
316 | throw NotImplementedException("Unimplemented type for update segment" ); |
317 | } |
318 | } |
319 | |
320 | void UpdateSegment::FetchCommittedRange(idx_t start_row, idx_t count, Vector &result) { |
321 | D_ASSERT(count > 0); |
322 | if (!root) { |
323 | return; |
324 | } |
325 | D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR); |
326 | |
327 | idx_t end_row = start_row + count; |
328 | idx_t start_vector = start_row / STANDARD_VECTOR_SIZE; |
329 | idx_t end_vector = (end_row - 1) / STANDARD_VECTOR_SIZE; |
330 | D_ASSERT(start_vector <= end_vector); |
331 | D_ASSERT(end_vector < RowGroup::ROW_GROUP_VECTOR_COUNT); |
332 | |
333 | for (idx_t vector_idx = start_vector; vector_idx <= end_vector; vector_idx++) { |
334 | if (!root->info[vector_idx]) { |
335 | continue; |
336 | } |
337 | idx_t start_in_vector = vector_idx == start_vector ? start_row - start_vector * STANDARD_VECTOR_SIZE : 0; |
338 | idx_t end_in_vector = |
339 | vector_idx == end_vector ? end_row - end_vector * STANDARD_VECTOR_SIZE : STANDARD_VECTOR_SIZE; |
340 | D_ASSERT(start_in_vector < end_in_vector); |
341 | D_ASSERT(end_in_vector > 0 && end_in_vector <= STANDARD_VECTOR_SIZE); |
342 | idx_t result_offset = ((vector_idx * STANDARD_VECTOR_SIZE) + start_in_vector) - start_row; |
343 | fetch_committed_range(root->info[vector_idx]->info.get(), start_in_vector, end_in_vector, result_offset, |
344 | result); |
345 | } |
346 | } |
347 | |
348 | //===--------------------------------------------------------------------===// |
349 | // Fetch Row |
350 | //===--------------------------------------------------------------------===// |
351 | static void FetchRowValidity(transaction_t start_time, transaction_t transaction_id, UpdateInfo *info, idx_t row_idx, |
352 | Vector &result, idx_t result_idx) { |
353 | auto &result_mask = FlatVector::Validity(vector&: result); |
354 | UpdateInfo::UpdatesForTransaction(current: info, start_time, transaction_id, callback: [&](UpdateInfo *current) { |
355 | auto info_data = reinterpret_cast<bool *>(current->tuple_data); |
356 | // FIXME: we could do a binary search in here |
357 | for (idx_t i = 0; i < current->N; i++) { |
358 | if (current->tuples[i] == row_idx) { |
359 | result_mask.Set(row_idx: result_idx, valid: info_data[i]); |
360 | break; |
361 | } else if (current->tuples[i] > row_idx) { |
362 | break; |
363 | } |
364 | } |
365 | }); |
366 | } |
367 | |
368 | template <class T> |
369 | static void TemplatedFetchRow(transaction_t start_time, transaction_t transaction_id, UpdateInfo *info, idx_t row_idx, |
370 | Vector &result, idx_t result_idx) { |
371 | auto result_data = FlatVector::GetData<T>(result); |
372 | UpdateInfo::UpdatesForTransaction(info, start_time, transaction_id, [&](UpdateInfo *current) { |
373 | auto info_data = (T *)current->tuple_data; |
374 | // FIXME: we could do a binary search in here |
375 | for (idx_t i = 0; i < current->N; i++) { |
376 | if (current->tuples[i] == row_idx) { |
377 | result_data[result_idx] = info_data[i]; |
378 | break; |
379 | } else if (current->tuples[i] > row_idx) { |
380 | break; |
381 | } |
382 | } |
383 | }); |
384 | } |
385 | |
386 | static UpdateSegment::fetch_row_function_t GetFetchRowFunction(PhysicalType type) { |
387 | switch (type) { |
388 | case PhysicalType::BIT: |
389 | return FetchRowValidity; |
390 | case PhysicalType::BOOL: |
391 | case PhysicalType::INT8: |
392 | return TemplatedFetchRow<int8_t>; |
393 | case PhysicalType::INT16: |
394 | return TemplatedFetchRow<int16_t>; |
395 | case PhysicalType::INT32: |
396 | return TemplatedFetchRow<int32_t>; |
397 | case PhysicalType::INT64: |
398 | return TemplatedFetchRow<int64_t>; |
399 | case PhysicalType::UINT8: |
400 | return TemplatedFetchRow<uint8_t>; |
401 | case PhysicalType::UINT16: |
402 | return TemplatedFetchRow<uint16_t>; |
403 | case PhysicalType::UINT32: |
404 | return TemplatedFetchRow<uint32_t>; |
405 | case PhysicalType::UINT64: |
406 | return TemplatedFetchRow<uint64_t>; |
407 | case PhysicalType::INT128: |
408 | return TemplatedFetchRow<hugeint_t>; |
409 | case PhysicalType::FLOAT: |
410 | return TemplatedFetchRow<float>; |
411 | case PhysicalType::DOUBLE: |
412 | return TemplatedFetchRow<double>; |
413 | case PhysicalType::INTERVAL: |
414 | return TemplatedFetchRow<interval_t>; |
415 | case PhysicalType::VARCHAR: |
416 | return TemplatedFetchRow<string_t>; |
417 | default: |
418 | throw NotImplementedException("Unimplemented type for update segment fetch row" ); |
419 | } |
420 | } |
421 | |
422 | void UpdateSegment::FetchRow(TransactionData transaction, idx_t row_id, Vector &result, idx_t result_idx) { |
423 | if (!root) { |
424 | return; |
425 | } |
426 | idx_t vector_index = (row_id - column_data.start) / STANDARD_VECTOR_SIZE; |
427 | if (!root->info[vector_index]) { |
428 | return; |
429 | } |
430 | idx_t row_in_vector = row_id - vector_index * STANDARD_VECTOR_SIZE; |
431 | fetch_row_function(transaction.start_time, transaction.transaction_id, root->info[vector_index]->info.get(), |
432 | row_in_vector, result, result_idx); |
433 | } |
434 | |
435 | //===--------------------------------------------------------------------===// |
436 | // Rollback update |
437 | //===--------------------------------------------------------------------===// |
438 | template <class T> |
439 | static void RollbackUpdate(UpdateInfo &base_info, UpdateInfo &rollback_info) { |
440 | auto base_data = (T *)base_info.tuple_data; |
441 | auto rollback_data = (T *)rollback_info.tuple_data; |
442 | idx_t base_offset = 0; |
443 | for (idx_t i = 0; i < rollback_info.N; i++) { |
444 | auto id = rollback_info.tuples[i]; |
445 | while (base_info.tuples[base_offset] < id) { |
446 | base_offset++; |
447 | D_ASSERT(base_offset < base_info.N); |
448 | } |
449 | base_data[base_offset] = rollback_data[i]; |
450 | } |
451 | } |
452 | |
453 | static UpdateSegment::rollback_update_function_t GetRollbackUpdateFunction(PhysicalType type) { |
454 | switch (type) { |
455 | case PhysicalType::BIT: |
456 | return RollbackUpdate<bool>; |
457 | case PhysicalType::BOOL: |
458 | case PhysicalType::INT8: |
459 | return RollbackUpdate<int8_t>; |
460 | case PhysicalType::INT16: |
461 | return RollbackUpdate<int16_t>; |
462 | case PhysicalType::INT32: |
463 | return RollbackUpdate<int32_t>; |
464 | case PhysicalType::INT64: |
465 | return RollbackUpdate<int64_t>; |
466 | case PhysicalType::UINT8: |
467 | return RollbackUpdate<uint8_t>; |
468 | case PhysicalType::UINT16: |
469 | return RollbackUpdate<uint16_t>; |
470 | case PhysicalType::UINT32: |
471 | return RollbackUpdate<uint32_t>; |
472 | case PhysicalType::UINT64: |
473 | return RollbackUpdate<uint64_t>; |
474 | case PhysicalType::INT128: |
475 | return RollbackUpdate<hugeint_t>; |
476 | case PhysicalType::FLOAT: |
477 | return RollbackUpdate<float>; |
478 | case PhysicalType::DOUBLE: |
479 | return RollbackUpdate<double>; |
480 | case PhysicalType::INTERVAL: |
481 | return RollbackUpdate<interval_t>; |
482 | case PhysicalType::VARCHAR: |
483 | return RollbackUpdate<string_t>; |
484 | default: |
485 | throw NotImplementedException("Unimplemented type for uncompressed segment" ); |
486 | } |
487 | } |
488 | |
489 | void UpdateSegment::RollbackUpdate(UpdateInfo &info) { |
490 | // obtain an exclusive lock |
491 | auto lock_handle = lock.GetExclusiveLock(); |
492 | |
493 | // move the data from the UpdateInfo back into the base info |
494 | D_ASSERT(root->info[info.vector_index]); |
495 | rollback_update_function(*root->info[info.vector_index]->info, info); |
496 | |
497 | // clean up the update chain |
498 | CleanupUpdateInternal(lock: *lock_handle, info); |
499 | } |
500 | |
501 | //===--------------------------------------------------------------------===// |
502 | // Cleanup Update |
503 | //===--------------------------------------------------------------------===// |
504 | void UpdateSegment::CleanupUpdateInternal(const StorageLockKey &lock, UpdateInfo &info) { |
505 | D_ASSERT(info.prev); |
506 | auto prev = info.prev; |
507 | prev->next = info.next; |
508 | if (prev->next) { |
509 | prev->next->prev = prev; |
510 | } |
511 | } |
512 | |
513 | void UpdateSegment::CleanupUpdate(UpdateInfo &info) { |
514 | // obtain an exclusive lock |
515 | auto lock_handle = lock.GetExclusiveLock(); |
516 | CleanupUpdateInternal(lock: *lock_handle, info); |
517 | } |
518 | |
519 | //===--------------------------------------------------------------------===// |
520 | // Check for conflicts in update |
521 | //===--------------------------------------------------------------------===// |
522 | static void CheckForConflicts(UpdateInfo *info, TransactionData transaction, row_t *ids, const SelectionVector &sel, |
523 | idx_t count, row_t offset, UpdateInfo *&node) { |
524 | if (!info) { |
525 | return; |
526 | } |
527 | if (info->version_number == transaction.transaction_id) { |
528 | // this UpdateInfo belongs to the current transaction, set it in the node |
529 | node = info; |
530 | } else if (info->version_number > transaction.start_time) { |
531 | // potential conflict, check that tuple ids do not conflict |
532 | // as both ids and info->tuples are sorted, this is similar to a merge join |
533 | idx_t i = 0, j = 0; |
534 | while (true) { |
535 | auto id = ids[sel.get_index(idx: i)] - offset; |
536 | if (id == info->tuples[j]) { |
537 | throw TransactionException("Conflict on update!" ); |
538 | } else if (id < info->tuples[j]) { |
539 | // id < the current tuple in info, move to next id |
540 | i++; |
541 | if (i == count) { |
542 | break; |
543 | } |
544 | } else { |
545 | // id > the current tuple, move to next tuple in info |
546 | j++; |
547 | if (j == info->N) { |
548 | break; |
549 | } |
550 | } |
551 | } |
552 | } |
553 | CheckForConflicts(info: info->next, transaction, ids, sel, count, offset, node); |
554 | } |
555 | |
556 | //===--------------------------------------------------------------------===// |
557 | // Initialize update info |
558 | //===--------------------------------------------------------------------===// |
559 | void UpdateSegment::InitializeUpdateInfo(UpdateInfo &info, row_t *ids, const SelectionVector &sel, idx_t count, |
560 | idx_t vector_index, idx_t vector_offset) { |
561 | info.segment = this; |
562 | info.vector_index = vector_index; |
563 | info.prev = nullptr; |
564 | info.next = nullptr; |
565 | |
566 | // set up the tuple ids |
567 | info.N = count; |
568 | for (idx_t i = 0; i < count; i++) { |
569 | auto idx = sel.get_index(idx: i); |
570 | auto id = ids[idx]; |
571 | D_ASSERT(idx_t(id) >= vector_offset && idx_t(id) < vector_offset + STANDARD_VECTOR_SIZE); |
572 | info.tuples[i] = id - vector_offset; |
573 | }; |
574 | } |
575 | |
576 | static void InitializeUpdateValidity(UpdateInfo *base_info, Vector &base_data, UpdateInfo *update_info, Vector &update, |
577 | const SelectionVector &sel) { |
578 | auto &update_mask = FlatVector::Validity(vector&: update); |
579 | auto tuple_data = reinterpret_cast<bool *>(update_info->tuple_data); |
580 | |
581 | if (!update_mask.AllValid()) { |
582 | for (idx_t i = 0; i < update_info->N; i++) { |
583 | auto idx = sel.get_index(idx: i); |
584 | tuple_data[i] = update_mask.RowIsValidUnsafe(row_idx: idx); |
585 | } |
586 | } else { |
587 | for (idx_t i = 0; i < update_info->N; i++) { |
588 | tuple_data[i] = true; |
589 | } |
590 | } |
591 | |
592 | auto &base_mask = FlatVector::Validity(vector&: base_data); |
593 | auto base_tuple_data = reinterpret_cast<bool *>(base_info->tuple_data); |
594 | if (!base_mask.AllValid()) { |
595 | for (idx_t i = 0; i < base_info->N; i++) { |
596 | base_tuple_data[i] = base_mask.RowIsValidUnsafe(row_idx: base_info->tuples[i]); |
597 | } |
598 | } else { |
599 | for (idx_t i = 0; i < base_info->N; i++) { |
600 | base_tuple_data[i] = true; |
601 | } |
602 | } |
603 | } |
604 | |
605 | struct UpdateSelectElement { |
606 | template <class T> |
607 | static T Operation(UpdateSegment *segment, T element) { |
608 | return element; |
609 | } |
610 | }; |
611 | |
612 | template <> |
613 | string_t UpdateSelectElement::Operation(UpdateSegment *segment, string_t element) { |
614 | return element.IsInlined() ? element : segment->GetStringHeap().AddBlob(data: element); |
615 | } |
616 | |
617 | template <class T> |
618 | static void InitializeUpdateData(UpdateInfo *base_info, Vector &base_data, UpdateInfo *update_info, Vector &update, |
619 | const SelectionVector &sel) { |
620 | auto update_data = FlatVector::GetData<T>(update); |
621 | auto tuple_data = (T *)update_info->tuple_data; |
622 | |
623 | for (idx_t i = 0; i < update_info->N; i++) { |
624 | auto idx = sel.get_index(idx: i); |
625 | tuple_data[i] = update_data[idx]; |
626 | } |
627 | |
628 | auto base_array_data = FlatVector::GetData<T>(base_data); |
629 | auto &base_validity = FlatVector::Validity(vector&: base_data); |
630 | auto base_tuple_data = (T *)base_info->tuple_data; |
631 | for (idx_t i = 0; i < base_info->N; i++) { |
632 | auto base_idx = base_info->tuples[i]; |
633 | if (!base_validity.RowIsValid(row_idx: base_idx)) { |
634 | continue; |
635 | } |
636 | base_tuple_data[i] = UpdateSelectElement::Operation<T>(base_info->segment, base_array_data[base_idx]); |
637 | } |
638 | } |
639 | |
640 | static UpdateSegment::initialize_update_function_t GetInitializeUpdateFunction(PhysicalType type) { |
641 | switch (type) { |
642 | case PhysicalType::BIT: |
643 | return InitializeUpdateValidity; |
644 | case PhysicalType::BOOL: |
645 | case PhysicalType::INT8: |
646 | return InitializeUpdateData<int8_t>; |
647 | case PhysicalType::INT16: |
648 | return InitializeUpdateData<int16_t>; |
649 | case PhysicalType::INT32: |
650 | return InitializeUpdateData<int32_t>; |
651 | case PhysicalType::INT64: |
652 | return InitializeUpdateData<int64_t>; |
653 | case PhysicalType::UINT8: |
654 | return InitializeUpdateData<uint8_t>; |
655 | case PhysicalType::UINT16: |
656 | return InitializeUpdateData<uint16_t>; |
657 | case PhysicalType::UINT32: |
658 | return InitializeUpdateData<uint32_t>; |
659 | case PhysicalType::UINT64: |
660 | return InitializeUpdateData<uint64_t>; |
661 | case PhysicalType::INT128: |
662 | return InitializeUpdateData<hugeint_t>; |
663 | case PhysicalType::FLOAT: |
664 | return InitializeUpdateData<float>; |
665 | case PhysicalType::DOUBLE: |
666 | return InitializeUpdateData<double>; |
667 | case PhysicalType::INTERVAL: |
668 | return InitializeUpdateData<interval_t>; |
669 | case PhysicalType::VARCHAR: |
670 | return InitializeUpdateData<string_t>; |
671 | default: |
672 | throw NotImplementedException("Unimplemented type for update segment" ); |
673 | } |
674 | } |
675 | |
676 | //===--------------------------------------------------------------------===// |
677 | // Merge update info |
678 | //===--------------------------------------------------------------------===// |
679 | template <class F1, class F2, class F3> |
680 | static idx_t MergeLoop(row_t a[], sel_t b[], idx_t acount, idx_t bcount, idx_t aoffset, F1 merge, F2 pick_a, F3 pick_b, |
681 | const SelectionVector &asel) { |
682 | idx_t aidx = 0, bidx = 0; |
683 | idx_t count = 0; |
684 | while (aidx < acount && bidx < bcount) { |
685 | auto a_index = asel.get_index(idx: aidx); |
686 | auto a_id = a[a_index] - aoffset; |
687 | auto b_id = b[bidx]; |
688 | if (a_id == b_id) { |
689 | merge(a_id, a_index, bidx, count); |
690 | aidx++; |
691 | bidx++; |
692 | count++; |
693 | } else if (a_id < b_id) { |
694 | pick_a(a_id, a_index, count); |
695 | aidx++; |
696 | count++; |
697 | } else { |
698 | pick_b(b_id, bidx, count); |
699 | bidx++; |
700 | count++; |
701 | } |
702 | } |
703 | for (; aidx < acount; aidx++) { |
704 | auto a_index = asel.get_index(idx: aidx); |
705 | pick_a(a[a_index] - aoffset, a_index, count); |
706 | count++; |
707 | } |
708 | for (; bidx < bcount; bidx++) { |
709 | pick_b(b[bidx], bidx, count); |
710 | count++; |
711 | } |
712 | return count; |
713 | } |
714 | |
715 | struct ExtractStandardEntry { |
716 | template <class T, class V> |
717 | static T Extract(V *data, idx_t entry) { |
718 | return data[entry]; |
719 | } |
720 | }; |
721 | |
722 | struct { |
723 | template <class T, class V> |
724 | static T (V *data, idx_t entry) { |
725 | return data->RowIsValid(entry); |
726 | } |
727 | }; |
728 | |
//! Merges a new batch of updates for one vector into that vector's version information.
//!
//! Template parameters:
//!   T  - the value type stored in the UpdateInfo tuple_data arrays
//!   V  - the type of the base table data and of the update vector data (for validity merges T = bool and
//!        V = ValidityMask, see MergeValidityLoop)
//!   OP - extractor used as OP::Extract<T, V>(data, offset) to read a T out of a V array
//!
//! base_info          : vector-level UpdateInfo; after the call it holds the newest value of every updated row
//! base_table_data    : the vector's base data; read for rows that have no entry in base_info yet
//! update_info        : UpdateInfo of the current transaction; after the call it holds the values that were
//!                      overwritten (the "old" values), merged with any it already contained
//! update_vector_data : the new values, addressed through `sel`
//! ids, count, sel    : row ids of the new values. ids[sel.get_index(i)] for i < count must be strictly increasing
//!                      and lie inside this vector (only checked in DEBUG builds)
template <class T, class V, class OP = ExtractStandardEntry>
static void MergeUpdateLoopInternal(UpdateInfo *base_info, V *base_table_data, UpdateInfo *update_info,
                                    V *update_vector_data, row_t *ids, idx_t count, const SelectionVector &sel) {
	// first row id covered by this vector; all tuple offsets stored in UpdateInfo are relative to it
	auto base_id = base_info->segment->column_data.start + base_info->vector_index * STANDARD_VECTOR_SIZE;
#ifdef DEBUG
	// all of these should be sorted, otherwise the below algorithm does not work
	for (idx_t i = 1; i < count; i++) {
		auto prev_idx = sel.get_index(i - 1);
		auto idx = sel.get_index(i);
		D_ASSERT(ids[idx] > ids[prev_idx] && ids[idx] >= row_t(base_id) &&
		         ids[idx] < row_t(base_id + STANDARD_VECTOR_SIZE));
	}
#endif

	// we have a new batch of updates (update, ids, count)
	// we already have existing updates (base_info)
	// and potentially, this transaction already has updates present (update_info)
	// we need to merge these all together so that the latest updates get merged into base_info
	// and the "old" values (fetched from EITHER base_info OR from base_data) get placed into update_info
	auto base_info_data = (T *)base_info->tuple_data;
	auto update_info_data = (T *)update_info->tuple_data;

	// we first do the merging of the old values
	// what we are trying to do here is update the "update_info" of this transaction with all the old data we require
	// this means we need to merge (1) any previously updated values (stored in update_info->tuples)
	// together with (2) the values this batch overwrites for rows not yet in update_info: taken from base_info if
	// the row was already updated, otherwise from the base table data
	// to simplify this, we create new arrays here
	// we memcpy these over afterwards
	T result_values[STANDARD_VECTOR_SIZE];
	sel_t result_ids[STANDARD_VECTOR_SIZE];

	idx_t base_info_offset = 0;
	idx_t update_info_offset = 0;
	idx_t result_offset = 0;
	for (idx_t i = 0; i < count; i++) {
		auto idx = sel.get_index(idx: i);
		// we have to merge the info for "ids[i]"
		auto update_id = ids[idx] - base_id;

		while (update_info_offset < update_info->N && update_info->tuples[update_info_offset] < update_id) {
			// old id comes before the current id: write it
			result_values[result_offset] = update_info_data[update_info_offset];
			result_ids[result_offset++] = update_info->tuples[update_info_offset];
			update_info_offset++;
		}
		// write the new id
		if (update_info_offset < update_info->N && update_info->tuples[update_info_offset] == update_id) {
			// we have an id that is equivalent in the current update info: write the update info
			// (the transaction already saved this row's old value earlier, so that saved value is kept)
			result_values[result_offset] = update_info_data[update_info_offset];
			result_ids[result_offset++] = update_info->tuples[update_info_offset];
			update_info_offset++;
			continue;
		}

		/// now check if we have the current update_id in the base_info, or if we should fetch it from the base data
		while (base_info_offset < base_info->N && base_info->tuples[base_info_offset] < update_id) {
			base_info_offset++;
		}
		if (base_info_offset < base_info->N && base_info->tuples[base_info_offset] == update_id) {
			// it is! we have to move the tuple from base_info->ids[base_info_offset] to update_info
			result_values[result_offset] = base_info_data[base_info_offset];
		} else {
			// it is not! we have to move base_table_data[update_id] to update_info
			// (the extracted value is passed through UpdateSelectElement::Operation, defined elsewhere in this file)
			result_values[result_offset] = UpdateSelectElement::Operation<T>(
			    base_info->segment, OP::template Extract<T, V>(base_table_data, update_id));
		}
		result_ids[result_offset++] = update_id;
	}
	// write any remaining entries from the old updates
	while (update_info_offset < update_info->N) {
		result_values[result_offset] = update_info_data[update_info_offset];
		result_ids[result_offset++] = update_info->tuples[update_info_offset];
		update_info_offset++;
	}
	// now copy them back
	update_info->N = result_offset;
	memcpy(update_info_data, result_values, result_offset * sizeof(T));
	memcpy(dest: update_info->tuples, src: result_ids, n: result_offset * sizeof(sel_t));

	// now we merge the new values into the base_info
	// (result_values / result_ids are reused as scratch space for the second merge)
	result_offset = 0;
	// emit the new value for row `id`, read from the update vector at position aidx
	auto pick_new = [&](idx_t id, idx_t aidx, idx_t count) {
		result_values[result_offset] = OP::template Extract<T, V>(update_vector_data, aidx);
		result_ids[result_offset] = id;
		result_offset++;
	};
	// keep the existing base_info value at position bidx for row `id`
	auto pick_old = [&](idx_t id, idx_t bidx, idx_t count) {
		result_values[result_offset] = base_info_data[bidx];
		result_ids[result_offset] = id;
		result_offset++;
	};
	// now we perform a merge of the new ids with the old ids
	// a row present in both: the new value wins
	auto merge = [&](idx_t id, idx_t aidx, idx_t bidx, idx_t count) {
		pick_new(id, aidx, count);
	};
	MergeLoop(ids, base_info->tuples, count, base_info->N, base_id, merge, pick_new, pick_old, sel);

	base_info->N = result_offset;
	memcpy(base_info_data, result_values, result_offset * sizeof(T));
	memcpy(dest: base_info->tuples, src: result_ids, n: result_offset * sizeof(sel_t));
}
830 | |
831 | static void MergeValidityLoop(UpdateInfo *base_info, Vector &base_data, UpdateInfo *update_info, Vector &update, |
832 | row_t *ids, idx_t count, const SelectionVector &sel) { |
833 | auto &base_validity = FlatVector::Validity(vector&: base_data); |
834 | auto &update_validity = FlatVector::Validity(vector&: update); |
835 | MergeUpdateLoopInternal<bool, ValidityMask, ExtractValidityEntry>(base_info, base_table_data: &base_validity, update_info, |
836 | update_vector_data: &update_validity, ids, count, sel); |
837 | } |
838 | |
839 | template <class T> |
840 | static void MergeUpdateLoop(UpdateInfo *base_info, Vector &base_data, UpdateInfo *update_info, Vector &update, |
841 | row_t *ids, idx_t count, const SelectionVector &sel) { |
842 | auto base_table_data = FlatVector::GetData<T>(base_data); |
843 | auto update_vector_data = FlatVector::GetData<T>(update); |
844 | MergeUpdateLoopInternal<T, T>(base_info, base_table_data, update_info, update_vector_data, ids, count, sel); |
845 | } |
846 | |
847 | static UpdateSegment::merge_update_function_t GetMergeUpdateFunction(PhysicalType type) { |
848 | switch (type) { |
849 | case PhysicalType::BIT: |
850 | return MergeValidityLoop; |
851 | case PhysicalType::BOOL: |
852 | case PhysicalType::INT8: |
853 | return MergeUpdateLoop<int8_t>; |
854 | case PhysicalType::INT16: |
855 | return MergeUpdateLoop<int16_t>; |
856 | case PhysicalType::INT32: |
857 | return MergeUpdateLoop<int32_t>; |
858 | case PhysicalType::INT64: |
859 | return MergeUpdateLoop<int64_t>; |
860 | case PhysicalType::UINT8: |
861 | return MergeUpdateLoop<uint8_t>; |
862 | case PhysicalType::UINT16: |
863 | return MergeUpdateLoop<uint16_t>; |
864 | case PhysicalType::UINT32: |
865 | return MergeUpdateLoop<uint32_t>; |
866 | case PhysicalType::UINT64: |
867 | return MergeUpdateLoop<uint64_t>; |
868 | case PhysicalType::INT128: |
869 | return MergeUpdateLoop<hugeint_t>; |
870 | case PhysicalType::FLOAT: |
871 | return MergeUpdateLoop<float>; |
872 | case PhysicalType::DOUBLE: |
873 | return MergeUpdateLoop<double>; |
874 | case PhysicalType::INTERVAL: |
875 | return MergeUpdateLoop<interval_t>; |
876 | case PhysicalType::VARCHAR: |
877 | return MergeUpdateLoop<string_t>; |
878 | default: |
879 | throw NotImplementedException("Unimplemented type for uncompressed segment" ); |
880 | } |
881 | } |
882 | |
883 | //===--------------------------------------------------------------------===// |
884 | // Update statistics |
885 | //===--------------------------------------------------------------------===// |
886 | unique_ptr<BaseStatistics> UpdateSegment::GetStatistics() { |
887 | lock_guard<mutex> stats_guard(stats_lock); |
888 | return stats.statistics.ToUnique(); |
889 | } |
890 | |
891 | idx_t UpdateValidityStatistics(UpdateSegment *segment, SegmentStatistics &stats, Vector &update, idx_t count, |
892 | SelectionVector &sel) { |
893 | auto &mask = FlatVector::Validity(vector&: update); |
894 | auto &validity = stats.statistics; |
895 | if (!mask.AllValid() && !validity.CanHaveNull()) { |
896 | for (idx_t i = 0; i < count; i++) { |
897 | if (!mask.RowIsValid(row_idx: i)) { |
898 | validity.SetHasNull(); |
899 | break; |
900 | } |
901 | } |
902 | } |
903 | sel.Initialize(sel: nullptr); |
904 | return count; |
905 | } |
906 | |
907 | template <class T> |
908 | idx_t TemplatedUpdateNumericStatistics(UpdateSegment *segment, SegmentStatistics &stats, Vector &update, idx_t count, |
909 | SelectionVector &sel) { |
910 | auto update_data = FlatVector::GetData<T>(update); |
911 | auto &mask = FlatVector::Validity(vector&: update); |
912 | |
913 | if (mask.AllValid()) { |
914 | for (idx_t i = 0; i < count; i++) { |
915 | NumericStats::Update<T>(stats.statistics, update_data[i]); |
916 | } |
917 | sel.Initialize(sel: nullptr); |
918 | return count; |
919 | } else { |
920 | idx_t not_null_count = 0; |
921 | sel.Initialize(STANDARD_VECTOR_SIZE); |
922 | for (idx_t i = 0; i < count; i++) { |
923 | if (mask.RowIsValid(row_idx: i)) { |
924 | sel.set_index(idx: not_null_count++, loc: i); |
925 | NumericStats::Update<T>(stats.statistics, update_data[i]); |
926 | } |
927 | } |
928 | return not_null_count; |
929 | } |
930 | } |
931 | |
932 | idx_t UpdateStringStatistics(UpdateSegment *segment, SegmentStatistics &stats, Vector &update, idx_t count, |
933 | SelectionVector &sel) { |
934 | auto update_data = FlatVector::GetData<string_t>(vector&: update); |
935 | auto &mask = FlatVector::Validity(vector&: update); |
936 | if (mask.AllValid()) { |
937 | for (idx_t i = 0; i < count; i++) { |
938 | StringStats::Update(stats&: stats.statistics, value: update_data[i]); |
939 | if (!update_data[i].IsInlined()) { |
940 | update_data[i] = segment->GetStringHeap().AddBlob(data: update_data[i]); |
941 | } |
942 | } |
943 | sel.Initialize(sel: nullptr); |
944 | return count; |
945 | } else { |
946 | idx_t not_null_count = 0; |
947 | sel.Initialize(STANDARD_VECTOR_SIZE); |
948 | for (idx_t i = 0; i < count; i++) { |
949 | if (mask.RowIsValid(row_idx: i)) { |
950 | sel.set_index(idx: not_null_count++, loc: i); |
951 | StringStats::Update(stats&: stats.statistics, value: update_data[i]); |
952 | if (!update_data[i].IsInlined()) { |
953 | update_data[i] = segment->GetStringHeap().AddBlob(data: update_data[i]); |
954 | } |
955 | } |
956 | } |
957 | return not_null_count; |
958 | } |
959 | } |
960 | |
961 | UpdateSegment::statistics_update_function_t GetStatisticsUpdateFunction(PhysicalType type) { |
962 | switch (type) { |
963 | case PhysicalType::BIT: |
964 | return UpdateValidityStatistics; |
965 | case PhysicalType::BOOL: |
966 | case PhysicalType::INT8: |
967 | return TemplatedUpdateNumericStatistics<int8_t>; |
968 | case PhysicalType::INT16: |
969 | return TemplatedUpdateNumericStatistics<int16_t>; |
970 | case PhysicalType::INT32: |
971 | return TemplatedUpdateNumericStatistics<int32_t>; |
972 | case PhysicalType::INT64: |
973 | return TemplatedUpdateNumericStatistics<int64_t>; |
974 | case PhysicalType::UINT8: |
975 | return TemplatedUpdateNumericStatistics<uint8_t>; |
976 | case PhysicalType::UINT16: |
977 | return TemplatedUpdateNumericStatistics<uint16_t>; |
978 | case PhysicalType::UINT32: |
979 | return TemplatedUpdateNumericStatistics<uint32_t>; |
980 | case PhysicalType::UINT64: |
981 | return TemplatedUpdateNumericStatistics<uint64_t>; |
982 | case PhysicalType::INT128: |
983 | return TemplatedUpdateNumericStatistics<hugeint_t>; |
984 | case PhysicalType::FLOAT: |
985 | return TemplatedUpdateNumericStatistics<float>; |
986 | case PhysicalType::DOUBLE: |
987 | return TemplatedUpdateNumericStatistics<double>; |
988 | case PhysicalType::INTERVAL: |
989 | return TemplatedUpdateNumericStatistics<interval_t>; |
990 | case PhysicalType::VARCHAR: |
991 | return UpdateStringStatistics; |
992 | default: |
993 | throw NotImplementedException("Unimplemented type for uncompressed segment" ); |
994 | } |
995 | } |
996 | |
997 | //===--------------------------------------------------------------------===// |
998 | // Update |
999 | //===--------------------------------------------------------------------===// |
1000 | static idx_t SortSelectionVector(SelectionVector &sel, idx_t count, row_t *ids) { |
1001 | D_ASSERT(count > 0); |
1002 | |
1003 | bool is_sorted = true; |
1004 | for (idx_t i = 1; i < count; i++) { |
1005 | auto prev_idx = sel.get_index(idx: i - 1); |
1006 | auto idx = sel.get_index(idx: i); |
1007 | if (ids[idx] <= ids[prev_idx]) { |
1008 | is_sorted = false; |
1009 | break; |
1010 | } |
1011 | } |
1012 | if (is_sorted) { |
1013 | // already sorted: bailout |
1014 | return count; |
1015 | } |
1016 | // not sorted: need to sort the selection vector |
1017 | SelectionVector sorted_sel(count); |
1018 | for (idx_t i = 0; i < count; i++) { |
1019 | sorted_sel.set_index(idx: i, loc: sel.get_index(idx: i)); |
1020 | } |
1021 | std::sort(first: sorted_sel.data(), last: sorted_sel.data() + count, comp: [&](sel_t l, sel_t r) { return ids[l] < ids[r]; }); |
1022 | // eliminate any duplicates |
1023 | idx_t pos = 1; |
1024 | for (idx_t i = 1; i < count; i++) { |
1025 | auto prev_idx = sorted_sel.get_index(idx: i - 1); |
1026 | auto idx = sorted_sel.get_index(idx: i); |
1027 | D_ASSERT(ids[idx] >= ids[prev_idx]); |
1028 | if (ids[prev_idx] != ids[idx]) { |
1029 | sorted_sel.set_index(idx: pos++, loc: idx); |
1030 | } |
1031 | } |
1032 | #ifdef DEBUG |
1033 | for (idx_t i = 1; i < pos; i++) { |
1034 | auto prev_idx = sorted_sel.get_index(i - 1); |
1035 | auto idx = sorted_sel.get_index(i); |
1036 | D_ASSERT(ids[idx] > ids[prev_idx]); |
1037 | } |
1038 | #endif |
1039 | |
1040 | sel.Initialize(other: sorted_sel); |
1041 | D_ASSERT(pos > 0); |
1042 | return pos; |
1043 | } |
1044 | |
1045 | UpdateInfo *CreateEmptyUpdateInfo(TransactionData transaction, idx_t type_size, idx_t count, |
1046 | unsafe_unique_array<char> &data) { |
1047 | data = make_unsafe_uniq_array<char>(n: sizeof(UpdateInfo) + (sizeof(sel_t) + type_size) * STANDARD_VECTOR_SIZE); |
1048 | auto update_info = reinterpret_cast<UpdateInfo *>(data.get()); |
1049 | update_info->max = STANDARD_VECTOR_SIZE; |
1050 | update_info->tuples = reinterpret_cast<sel_t *>((data_ptr_cast(src: update_info)) + sizeof(UpdateInfo)); |
1051 | update_info->tuple_data = (data_ptr_cast(src: update_info)) + sizeof(UpdateInfo) + sizeof(sel_t) * update_info->max; |
1052 | update_info->version_number = transaction.transaction_id; |
1053 | return update_info; |
1054 | } |
1055 | |
//! Applies `count` new values to rows that all lie in a single vector of this column segment.
//!
//! The new values are merged into the vector's base UpdateInfo (root->info[vector_index]->info). The values they
//! replace are stored in this transaction's UpdateInfo node. When a transaction is present, that node is linked
//! right after the base info in the version chain.
//!
//! transaction : the updating transaction. If `transaction.transaction` is null, the transaction-level UpdateInfo
//!               is allocated through CreateEmptyUpdateInfo into a buffer local to this call and is not linked
//!               from the base info.
//! column_index: stored in the UpdateInfo nodes created here
//! update      : the new values. This vector is flattened here, and the statistics function may modify it (for
//!               VARCHAR, non-inlined strings are redirected into the segment's string heap).
//! ids         : row ids of the updated rows, in the same numbering as column_data.start; all must fall into the
//!               same vector
//! count       : number of entries in `update` / `ids`; NULL entries are dropped by the statistics function
//! base_data   : the vector's base data, used to obtain the original value of rows that had no prior update
void UpdateSegment::Update(TransactionData transaction, idx_t column_index, Vector &update, row_t *ids, idx_t count,
                           Vector &base_data) {
	// obtain an exclusive lock
	auto write_lock = lock.GetExclusiveLock();

	update.Flatten(count);

	// update statistics
	// the statistics function also decides which rows are applied: it returns the new count and fills `sel`
	SelectionVector sel;
	{
		lock_guard<mutex> stats_guard(stats_lock);
		count = statistics_update_function(this, stats, update, count, sel);
	}
	if (count == 0) {
		return;
	}

	// subsequent algorithms used by the update require row ids to be (1) sorted, and (2) unique
	// this is usually the case for "standard" queries (e.g. UPDATE tbl SET x=bla WHERE cond)
	// however, for more exotic queries involving e.g. cross products/joins this might not be the case
	// hence we explicitly check here if the ids are sorted and, if not, sort + duplicate eliminate them
	count = SortSelectionVector(sel, count, ids);
	D_ASSERT(count > 0);

	// create the versions for this segment, if there are none yet
	if (!root) {
		root = make_uniq<UpdateNode>();
	}

	// get the vector index based on the first id
	// we assert that all updates must be part of the same vector
	auto first_id = ids[sel.get_index(idx: 0)];
	idx_t vector_index = (first_id - column_data.start) / STANDARD_VECTOR_SIZE;
	idx_t vector_offset = column_data.start + vector_index * STANDARD_VECTOR_SIZE;

	D_ASSERT(idx_t(first_id) >= column_data.start);
	D_ASSERT(vector_index < RowGroup::ROW_GROUP_VECTOR_COUNT);

	// first check the version chain
	UpdateInfo *node = nullptr;

	if (root->info[vector_index]) {
		// there is already a version here, check if there are any conflicts and search for the node that belongs to
		// this transaction in the version chain
		// (CheckForConflicts is defined elsewhere; presumably it throws on a conflicting update)
		auto base_info = root->info[vector_index]->info.get();
		CheckForConflicts(info: base_info->next, transaction, ids, sel, count, offset: vector_offset, node);

		// there are no conflicts
		// first, check if this thread has already done any updates
		// NOTE(review): this `node` shadows the outer `node` handed to CheckForConflicts above; whatever
		// CheckForConflicts may have stored there is not used - the chain is searched again here.
		auto node = base_info->next;
		while (node) {
			if (node->version_number == transaction.transaction_id) {
				// it has! use this node
				break;
			}
			node = node->next;
		}
		// backing storage for the node when there is no transaction; freed when this branch ends
		unsafe_unique_array<char> update_info_data;
		if (!node) {
			// no updates made yet by this transaction: initialize the update info to empty
			if (transaction.transaction) {
				auto &dtransaction = transaction.transaction->Cast<DuckTransaction>();
				node = dtransaction.CreateUpdateInfo(type_size, entries: count);
			} else {
				node = CreateEmptyUpdateInfo(transaction, type_size, count, data&: update_info_data);
			}
			node->segment = this;
			node->vector_index = vector_index;
			node->N = 0;
			node->column_index = column_index;

			// insert the new node into the chain
			// NOTE(review): without a transaction, base_info->next is set to nullptr below, but if a chain already
			// existed its head's `prev` has been pointed at this node, whose storage (update_info_data) is freed at
			// the end of this branch. Confirm that no chain can exist when transaction.transaction is null.
			node->next = base_info->next;
			if (node->next) {
				node->next->prev = node;
			}
			node->prev = base_info;
			base_info->next = transaction.transaction ? node : nullptr;
		}
		base_info->Verify();
		node->Verify();

		// now we are going to perform the merge
		// new values go into base_info, the overwritten values go into node
		merge_update_function(base_info, base_data, node, update, ids, count, sel);

		base_info->Verify();
		node->Verify();
	} else {
		// there is no version info yet: create the top level update info and fill it with the updates
		auto result = make_uniq<UpdateNodeData>();

		// the base info owns its arrays through `result` and gets a version number below TRANSACTION_ID_START
		result->info = make_uniq<UpdateInfo>();
		result->tuples = make_unsafe_uniq_array<sel_t>(STANDARD_VECTOR_SIZE);
		result->tuple_data = make_unsafe_uniq_array<data_t>(STANDARD_VECTOR_SIZE * type_size);
		result->info->tuples = result->tuples.get();
		result->info->tuple_data = result->tuple_data.get();
		result->info->version_number = TRANSACTION_ID_START - 1;
		result->info->column_index = column_index;
		InitializeUpdateInfo(info&: *result->info, ids, sel, count, vector_index, vector_offset);

		// now create the transaction level update info in the undo log
		// (without a transaction it lives in update_info_data, freed at the end of this branch)
		unsafe_unique_array<char> update_info_data;
		UpdateInfo *transaction_node;
		if (transaction.transaction) {
			transaction_node = transaction.transaction->CreateUpdateInfo(type_size, entries: count);
		} else {
			transaction_node = CreateEmptyUpdateInfo(transaction, type_size, count, data&: update_info_data);
		}

		InitializeUpdateInfo(info&: *transaction_node, ids, sel, count, vector_index, vector_offset);

		// we write the updates in the update node data, and write the updates in the info
		initialize_update_function(transaction_node, base_data, result->info.get(), update, sel);

		// link base info -> transaction node (only when a transaction exists)
		result->info->next = transaction.transaction ? transaction_node : nullptr;
		result->info->prev = nullptr;
		transaction_node->next = nullptr;
		transaction_node->prev = result->info.get();
		transaction_node->column_index = column_index;

		transaction_node->Verify();
		result->info->Verify();

		root->info[vector_index] = std::move(result);
	}
}
1182 | |
1183 | bool UpdateSegment::HasUpdates() const { |
1184 | return root.get() != nullptr; |
1185 | } |
1186 | |
1187 | bool UpdateSegment::HasUpdates(idx_t vector_index) const { |
1188 | if (!HasUpdates()) { |
1189 | return false; |
1190 | } |
1191 | return root->info[vector_index].get(); |
1192 | } |
1193 | |
1194 | bool UpdateSegment::HasUncommittedUpdates(idx_t vector_index) { |
1195 | if (!HasUpdates(vector_index)) { |
1196 | return false; |
1197 | } |
1198 | auto read_lock = lock.GetSharedLock(); |
1199 | auto entry = root->info[vector_index].get(); |
1200 | if (entry->info->next) { |
1201 | return true; |
1202 | } |
1203 | return false; |
1204 | } |
1205 | |
1206 | bool UpdateSegment::HasUpdates(idx_t start_row_index, idx_t end_row_index) { |
1207 | if (!HasUpdates()) { |
1208 | return false; |
1209 | } |
1210 | auto read_lock = lock.GetSharedLock(); |
1211 | idx_t base_vector_index = start_row_index / STANDARD_VECTOR_SIZE; |
1212 | idx_t end_vector_index = end_row_index / STANDARD_VECTOR_SIZE; |
1213 | for (idx_t i = base_vector_index; i <= end_vector_index; i++) { |
1214 | if (root->info[i]) { |
1215 | return true; |
1216 | } |
1217 | } |
1218 | return false; |
1219 | } |
1220 | |
1221 | } // namespace duckdb |
1222 | |