1#include "duckdb/common/vector_operations/binary_executor.hpp"
2#include "duckdb/storage/data_table.hpp"
3#include "duckdb/common/operator/comparison_operators.hpp"
4#include "duckdb/common/vector_operations/vector_operations.hpp"
5#include "duckdb/storage/uncompressed_segment.hpp"
6#include "duckdb/common/exception.hpp"
7#include "duckdb/common/types/vector.hpp"
8#include "duckdb/transaction/update_info.hpp"
9
10using namespace duckdb;
11using namespace std;
12
13UncompressedSegment::UncompressedSegment(BufferManager &manager, TypeId type, idx_t row_start)
14 : manager(manager), type(type), block_id(INVALID_BLOCK), max_vector_count(0), tuple_count(0),
15 row_start(row_start),
16 versions(nullptr) {
17}
18
19UncompressedSegment::~UncompressedSegment() {
20 if (block_id >= MAXIMUM_BLOCK) {
21 // if the uncompressed segment had an in-memory segment, destroy it when the uncompressed segment is destroyed
22 manager.DestroyBuffer(block_id);
23 }
24}
25
26void UncompressedSegment::Verify(Transaction &transaction) {
27#ifdef DEBUG
28 ColumnScanState state;
29 InitializeScan(state);
30
31 Vector result(this->type);
32 for (idx_t i = 0; i < this->tuple_count; i += STANDARD_VECTOR_SIZE) {
33 idx_t vector_idx = i / STANDARD_VECTOR_SIZE;
34 idx_t count = std::min((idx_t)STANDARD_VECTOR_SIZE, tuple_count - i);
35 Scan(transaction, state, vector_idx, result);
36 result.Verify(count);
37 }
38#endif
39}
40
41static void CheckForConflicts(UpdateInfo *info, Transaction &transaction, row_t *ids, idx_t count, row_t offset,
42 UpdateInfo *&node) {
43 if (info->version_number == transaction.transaction_id) {
44 // this UpdateInfo belongs to the current transaction, set it in the node
45 node = info;
46 } else if (info->version_number > transaction.start_time) {
47 // potential conflict, check that tuple ids do not conflict
48 // as both ids and info->tuples are sorted, this is similar to a merge join
49 idx_t i = 0, j = 0;
50 while (true) {
51 auto id = ids[i] - offset;
52 if (id == info->tuples[j]) {
53 throw TransactionException("Conflict on update!");
54 } else if (id < info->tuples[j]) {
55 // id < the current tuple in info, move to next id
56 i++;
57 if (i == count) {
58 break;
59 }
60 } else {
61 // id > the current tuple, move to next tuple in info
62 j++;
63 if (j == info->N) {
64 break;
65 }
66 }
67 }
68 }
69 if (info->next) {
70 CheckForConflicts(info->next, transaction, ids, count, offset, node);
71 }
72}
73
74void UncompressedSegment::Update(ColumnData &column_data, SegmentStatistics &stats, Transaction &transaction,
75 Vector &update, row_t *ids, idx_t count, row_t offset) {
76 // can only perform in-place updates on temporary blocks
77 assert(block_id >= MAXIMUM_BLOCK);
78
79 // obtain an exclusive lock
80 auto write_lock = lock.GetExclusiveLock();
81
82#ifdef DEBUG
83 // verify that the ids are sorted and there are no duplicates
84 for (idx_t i = 1; i < count; i++) {
85 assert(ids[i] > ids[i - 1]);
86 }
87#endif
88
89 // create the versions for this segment, if there are none yet
90 if (!versions) {
91 this->versions = unique_ptr<UpdateInfo *[]>(new UpdateInfo *[max_vector_count]);
92 for (idx_t i = 0; i < max_vector_count; i++) {
93 this->versions[i] = nullptr;
94 }
95 }
96
97 // get the vector index based on the first id
98 // we assert that all updates must be part of the same vector
99 auto first_id = ids[0];
100 idx_t vector_index = (first_id - offset) / STANDARD_VECTOR_SIZE;
101 idx_t vector_offset = offset + vector_index * STANDARD_VECTOR_SIZE;
102
103 assert(first_id >= offset);
104 assert(vector_index < max_vector_count);
105
106 // first check the version chain
107 UpdateInfo *node = nullptr;
108 if (versions[vector_index]) {
109 // there is already a version here, check if there are any conflicts and search for the node that belongs to
110 // this transaction in the version chain
111 CheckForConflicts(versions[vector_index], transaction, ids, count, vector_offset, node);
112 }
113 Update(column_data, stats, transaction, update, ids, count, vector_index, vector_offset, node);
114}
115
116UpdateInfo *UncompressedSegment::CreateUpdateInfo(ColumnData &column_data, Transaction &transaction, row_t *ids,
117 idx_t count, idx_t vector_index, idx_t vector_offset,
118 idx_t type_size) {
119 auto node = transaction.CreateUpdateInfo(type_size, STANDARD_VECTOR_SIZE);
120 node->column_data = &column_data;
121 node->segment = this;
122 node->vector_index = vector_index;
123 node->prev = nullptr;
124 node->next = versions[vector_index];
125 if (node->next) {
126 node->next->prev = node;
127 }
128 versions[vector_index] = node;
129
130 // set up the tuple ids
131 node->N = count;
132 for (idx_t i = 0; i < count; i++) {
133 assert((idx_t) ids[i] >= vector_offset && (idx_t) ids[i] < vector_offset + STANDARD_VECTOR_SIZE);
134 node->tuples[i] = ids[i] - vector_offset;
135 };
136 return node;
137}
138
139void UncompressedSegment::Fetch(ColumnScanState &state, idx_t vector_index, Vector &result) {
140 auto read_lock = lock.GetSharedLock();
141 InitializeScan(state);
142 FetchBaseData(state, vector_index, result);
143}
144
145//===--------------------------------------------------------------------===//
146// Filter
147//===--------------------------------------------------------------------===//
148template<class T>
149static void filterSelectionType(T *vec, T *predicate, SelectionVector &sel, idx_t &approved_tuple_count,
150 ExpressionType comparison_type, nullmask_t &nullmask) {
151 SelectionVector new_sel(approved_tuple_count);
152 // the inplace loops take the result as the last parameter
153 switch (comparison_type) {
154 case ExpressionType::COMPARE_EQUAL: {
155 if (nullmask.any()) {
156 approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, Equals, false, true, true, true, false>(
157 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
158 } else {
159 approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, Equals, false, true, false, true, false>(
160 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
161 }
162 break;
163 }
164 case ExpressionType::COMPARE_LESSTHAN: {
165 if (nullmask.any()) {
166 approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, LessThan, false, true, true, true, false>(
167 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
168 } else {
169 approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, LessThan, false, true, false, true, false>(
170 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
171 }
172 break;
173 }
174 case ExpressionType::COMPARE_GREATERTHAN: {
175 if (nullmask.any()) {
176 approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, GreaterThan, false, true, true, true, false>(
177 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
178 } else {
179 approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, GreaterThan, false, true, false, true, false>(
180 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
181 }
182 break;
183 }
184 case ExpressionType::COMPARE_LESSTHANOREQUALTO: {
185 if (nullmask.any()) {
186 approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, LessThanEquals, false, true, true, true, false>(
187 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
188 } else {
189 approved_tuple_count =
190 BinaryExecutor::SelectFlatLoop<T, T, LessThanEquals, false, true, false, true, false>(
191 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
192 }
193 break;
194 }
195 case ExpressionType::COMPARE_GREATERTHANOREQUALTO: {
196 if (nullmask.any()) {
197 approved_tuple_count =
198 BinaryExecutor::SelectFlatLoop<T, T, GreaterThanEquals, false, true, true, true, false>(
199 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
200 } else {
201 approved_tuple_count =
202 BinaryExecutor::SelectFlatLoop<T, T, GreaterThanEquals, false, true, false, true, false>(
203 vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel);
204 }
205 break;
206 }
207 default:
208 throw NotImplementedException("Unknown comparison type for filter pushed down to table!");
209 }
210 sel.Initialize(new_sel);
211}
212
213void UncompressedSegment::filterSelection(SelectionVector &sel, Vector &result, TableFilter filter,
214 idx_t &approved_tuple_count, nullmask_t &nullmask) {
215 // the inplace loops take the result as the last parameter
216 switch (result.type) {
217 case TypeId::INT8: {
218 auto result_flat = FlatVector::GetData<int8_t>(result);
219 auto predicate_vector = Vector(filter.constant.value_.tinyint);
220 auto predicate = FlatVector::GetData<int8_t>(predicate_vector);
221 filterSelectionType<int8_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type,
222 nullmask);
223 break;
224 }
225 case TypeId::INT16: {
226 auto result_flat = FlatVector::GetData<int16_t>(result);
227 auto predicate_vector = Vector(filter.constant.value_.smallint);
228 auto predicate = FlatVector::GetData<int16_t>(predicate_vector);
229 filterSelectionType<int16_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type,
230 nullmask);
231 break;
232 }
233 case TypeId::INT32: {
234 auto result_flat = FlatVector::GetData<int32_t>(result);
235 auto predicate_vector = Vector(filter.constant.value_.integer);
236 auto predicate = FlatVector::GetData<int32_t>(predicate_vector);
237 filterSelectionType<int32_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type,
238 nullmask);
239 break;
240 }
241 case TypeId::INT64: {
242 auto result_flat = FlatVector::GetData<int64_t>(result);
243 auto predicate_vector = Vector(filter.constant.value_.bigint);
244 auto predicate = FlatVector::GetData<int64_t>(predicate_vector);
245 filterSelectionType<int64_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type,
246 nullmask);
247 break;
248 }
249 case TypeId::FLOAT: {
250 auto result_flat = FlatVector::GetData<float>(result);
251 auto predicate_vector = Vector(filter.constant.value_.float_);
252 auto predicate = FlatVector::GetData<float>(predicate_vector);
253 filterSelectionType<float>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type,
254 nullmask);
255 break;
256 }
257 case TypeId::DOUBLE: {
258 auto result_flat = FlatVector::GetData<double>(result);
259 auto predicate_vector = Vector(filter.constant.value_.double_);
260 auto predicate = FlatVector::GetData<double>(predicate_vector);
261 filterSelectionType<double>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type,
262 nullmask);
263 break;
264 }
265 case TypeId::VARCHAR: {
266 auto result_flat = FlatVector::GetData<string_t>(result);
267 auto predicate_vector = Vector(filter.constant.str_value);
268 auto predicate = FlatVector::GetData<string_t>(predicate_vector);
269 filterSelectionType<string_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type,
270 nullmask);
271 break;
272 }
273 default:
274 throw InvalidTypeException(result.type, "Invalid type for filter pushed down to table comparison");
275 }
276}
277
278void UncompressedSegment::Select(Transaction &transaction, Vector &result, vector<TableFilter> &tableFilters,
279 SelectionVector &sel, idx_t &approved_tuple_count, ColumnScanState &state) {
280 auto read_lock = lock.GetSharedLock();
281 if (versions && versions[state.vector_index]) {
282 Scan(transaction, state, state.vector_index, result, false);
283 auto vector_index = state.vector_index;
284 // pin the buffer for this segment
285 auto handle = manager.Pin(block_id);
286 auto data = handle->node->buffer;
287 auto offset = vector_index * vector_size;
288 auto source_nullmask = (nullmask_t *) (data + offset);
289 for (auto &table_filter : tableFilters) {
290 filterSelection(sel, result, table_filter, approved_tuple_count, *source_nullmask);
291 }
292 } else {
293 //! Select the data from the base table
294 Select(state, result, sel, approved_tuple_count, tableFilters);
295 }
296}
297
298//===--------------------------------------------------------------------===//
299// Scan
300//===--------------------------------------------------------------------===//
301void UncompressedSegment::Scan(Transaction &transaction, ColumnScanState &state, idx_t vector_index, Vector &result,
302 bool get_lock) {
303 unique_ptr<StorageLockKey> read_lock;
304 if (get_lock) {
305 read_lock = lock.GetSharedLock();
306 }
307 // first fetch the data from the base table
308 FetchBaseData(state, vector_index, result);
309 if (versions && versions[vector_index]) {
310 // if there are any versions, check if we need to overwrite the data with the versioned data
311 FetchUpdateData(state, transaction, versions[vector_index], result);
312 }
313}
314
315void UncompressedSegment::FilterScan(Transaction &transaction, ColumnScanState &state, Vector &result,
316 SelectionVector &sel, idx_t &approved_tuple_count) {
317 auto read_lock = lock.GetSharedLock();
318 if (versions && versions[state.vector_index]) {
319 // if there are any versions, we do a regular scan
320 Scan(transaction, state, state.vector_index, result, false);
321 result.Slice(sel, approved_tuple_count);
322 } else {
323 FilterFetchBaseData(state, result, sel, approved_tuple_count);
324 }
325}
326
327void UncompressedSegment::IndexScan(ColumnScanState &state, idx_t vector_index, Vector &result) {
328 if (vector_index == 0) {
329 // vector_index = 0, obtain a shared lock on the segment that we keep until the index scan is complete
330 state.locks.push_back(lock.GetSharedLock());
331 }
332 if (versions && versions[vector_index]) {
333 throw TransactionException("Cannot create index with outstanding updates");
334 }
335 FetchBaseData(state, vector_index, result);
336}
337
338//===--------------------------------------------------------------------===//
339// Update
340//===--------------------------------------------------------------------===//
341void UncompressedSegment::CleanupUpdate(UpdateInfo *info) {
342 if (info->prev) {
343 // there is a prev info: remove from the chain
344 auto prev = info->prev;
345 prev->next = info->next;
346 if (prev->next) {
347 prev->next->prev = prev;
348 }
349 } else {
350 // there is no prev info: remove from base segment
351 info->segment->versions[info->vector_index] = info->next;
352 if (info->next) {
353 info->next->prev = nullptr;
354 }
355 }
356}
357
358//===--------------------------------------------------------------------===//
359// ToTemporary
360//===--------------------------------------------------------------------===//
361void UncompressedSegment::ToTemporary() {
362 auto write_lock = lock.GetExclusiveLock();
363
364 if (block_id >= MAXIMUM_BLOCK) {
365 // conversion has already been performed by a different thread
366 return;
367 }
368 // pin the current block
369 auto current = manager.Pin(block_id);
370
371 // now allocate a new block from the buffer manager
372 auto handle = manager.Allocate(Storage::BLOCK_ALLOC_SIZE);
373 // now copy the data over and switch to using the new block id
374 memcpy(handle->node->buffer, current->node->buffer, Storage::BLOCK_SIZE);
375 this->block_id = handle->block_id;
376}
377