1#include "duckdb/storage/table/column_segment.hpp"
2#include "duckdb/common/limits.hpp"
3#include "duckdb/storage/table/update_segment.hpp"
4#include "duckdb/common/types/null_value.hpp"
5#include "duckdb/common/types/vector.hpp"
6#include "duckdb/storage/table/append_state.hpp"
7#include "duckdb/storage/storage_manager.hpp"
8#include "duckdb/planner/filter/conjunction_filter.hpp"
9#include "duckdb/planner/filter/constant_filter.hpp"
10#include "duckdb/main/config.hpp"
11#include "duckdb/storage/table/scan_state.hpp"
12
13#include <cstring>
14
15namespace duckdb {
16
17unique_ptr<ColumnSegment> ColumnSegment::CreatePersistentSegment(DatabaseInstance &db, BlockManager &block_manager,
18 block_id_t block_id, idx_t offset,
19 const LogicalType &type, idx_t start, idx_t count,
20 CompressionType compression_type,
21 BaseStatistics statistics) {
22 auto &config = DBConfig::GetConfig(db);
23 optional_ptr<CompressionFunction> function;
24 shared_ptr<BlockHandle> block;
25 if (block_id == INVALID_BLOCK) {
26 // constant segment, no need to allocate an actual block
27 function = config.GetCompressionFunction(type: CompressionType::COMPRESSION_CONSTANT, data_type: type.InternalType());
28 } else {
29 function = config.GetCompressionFunction(type: compression_type, data_type: type.InternalType());
30 block = block_manager.RegisterBlock(block_id);
31 }
32 auto segment_size = Storage::BLOCK_SIZE;
33 return make_uniq<ColumnSegment>(args&: db, args: std::move(block), args: type, args: ColumnSegmentType::PERSISTENT, args&: start, args&: count, args&: *function,
34 args: std::move(statistics), args&: block_id, args&: offset, args&: segment_size);
35}
36
37unique_ptr<ColumnSegment> ColumnSegment::CreateTransientSegment(DatabaseInstance &db, const LogicalType &type,
38 idx_t start, idx_t segment_size) {
39 auto &config = DBConfig::GetConfig(db);
40 auto function = config.GetCompressionFunction(type: CompressionType::COMPRESSION_UNCOMPRESSED, data_type: type.InternalType());
41 auto &buffer_manager = BufferManager::GetBufferManager(db);
42 shared_ptr<BlockHandle> block;
43 // transient: allocate a buffer for the uncompressed segment
44 if (segment_size < Storage::BLOCK_SIZE) {
45 block = buffer_manager.RegisterSmallMemory(block_size: segment_size);
46 } else {
47 buffer_manager.Allocate(block_size: segment_size, can_destroy: false, block: &block);
48 }
49 return make_uniq<ColumnSegment>(args&: db, args: std::move(block), args: type, args: ColumnSegmentType::TRANSIENT, args&: start, args: 0, args&: *function,
50 args: BaseStatistics::CreateEmpty(type), INVALID_BLOCK, args: 0, args&: segment_size);
51}
52
53unique_ptr<ColumnSegment> ColumnSegment::CreateSegment(ColumnSegment &other, idx_t start) {
54 return make_uniq<ColumnSegment>(args&: other, args&: start);
55}
56
57ColumnSegment::ColumnSegment(DatabaseInstance &db, shared_ptr<BlockHandle> block, LogicalType type_p,
58 ColumnSegmentType segment_type, idx_t start, idx_t count, CompressionFunction &function_p,
59 BaseStatistics statistics, block_id_t block_id_p, idx_t offset_p, idx_t segment_size_p)
60 : SegmentBase<ColumnSegment>(start, count), db(db), type(std::move(type_p)),
61 type_size(GetTypeIdSize(type: type.InternalType())), segment_type(segment_type), function(function_p),
62 stats(std::move(statistics)), block(std::move(block)), block_id(block_id_p), offset(offset_p),
63 segment_size(segment_size_p) {
64 if (function.get().init_segment) {
65 segment_state = function.get().init_segment(*this, block_id);
66 }
67}
68
69ColumnSegment::ColumnSegment(ColumnSegment &other, idx_t start)
70 : SegmentBase<ColumnSegment>(start, other.count.load()), db(other.db), type(std::move(other.type)),
71 type_size(other.type_size), segment_type(other.segment_type), function(other.function),
72 stats(std::move(other.stats)), block(std::move(other.block)), block_id(other.block_id), offset(other.offset),
73 segment_size(other.segment_size), segment_state(std::move(other.segment_state)) {
74}
75
76ColumnSegment::~ColumnSegment() {
77}
78
79//===--------------------------------------------------------------------===//
80// Scan
81//===--------------------------------------------------------------------===//
82void ColumnSegment::InitializeScan(ColumnScanState &state) {
83 state.scan_state = function.get().init_scan(*this);
84}
85
86void ColumnSegment::Scan(ColumnScanState &state, idx_t scan_count, Vector &result, idx_t result_offset,
87 bool entire_vector) {
88 if (entire_vector) {
89 D_ASSERT(result_offset == 0);
90 Scan(state, scan_count, result);
91 } else {
92 D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
93 ScanPartial(state, scan_count, result, result_offset);
94 D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
95 }
96}
97
98void ColumnSegment::Skip(ColumnScanState &state) {
99 function.get().skip(*this, state, state.row_index - state.internal_index);
100 state.internal_index = state.row_index;
101}
102
103void ColumnSegment::Scan(ColumnScanState &state, idx_t scan_count, Vector &result) {
104 function.get().scan_vector(*this, state, scan_count, result);
105}
106
107void ColumnSegment::ScanPartial(ColumnScanState &state, idx_t scan_count, Vector &result, idx_t result_offset) {
108 function.get().scan_partial(*this, state, scan_count, result, result_offset);
109}
110
111//===--------------------------------------------------------------------===//
112// Fetch
113//===--------------------------------------------------------------------===//
114void ColumnSegment::FetchRow(ColumnFetchState &state, row_t row_id, Vector &result, idx_t result_idx) {
115 function.get().fetch_row(*this, state, row_id - this->start, result, result_idx);
116}
117
118//===--------------------------------------------------------------------===//
119// Append
120//===--------------------------------------------------------------------===//
121idx_t ColumnSegment::SegmentSize() const {
122 return segment_size;
123}
124
125void ColumnSegment::Resize(idx_t new_size) {
126 D_ASSERT(new_size > this->segment_size);
127 D_ASSERT(offset == 0);
128 auto &buffer_manager = BufferManager::GetBufferManager(db);
129 auto old_handle = buffer_manager.Pin(handle&: block);
130 shared_ptr<BlockHandle> new_block;
131 auto new_handle = buffer_manager.Allocate(block_size: Storage::BLOCK_SIZE, can_destroy: false, block: &new_block);
132 memcpy(dest: new_handle.Ptr(), src: old_handle.Ptr(), n: segment_size);
133 this->block_id = new_block->BlockId();
134 this->block = std::move(new_block);
135 this->segment_size = new_size;
136}
137
138void ColumnSegment::InitializeAppend(ColumnAppendState &state) {
139 D_ASSERT(segment_type == ColumnSegmentType::TRANSIENT);
140 if (!function.get().init_append) {
141 throw InternalException("Attempting to init append to a segment without init_append method");
142 }
143 state.append_state = function.get().init_append(*this);
144}
145
146idx_t ColumnSegment::Append(ColumnAppendState &state, UnifiedVectorFormat &append_data, idx_t offset, idx_t count) {
147 D_ASSERT(segment_type == ColumnSegmentType::TRANSIENT);
148 if (!function.get().append) {
149 throw InternalException("Attempting to append to a segment without append method");
150 }
151 return function.get().append(*state.append_state, *this, stats, append_data, offset, count);
152}
153
154idx_t ColumnSegment::FinalizeAppend(ColumnAppendState &state) {
155 D_ASSERT(segment_type == ColumnSegmentType::TRANSIENT);
156 if (!function.get().finalize_append) {
157 throw InternalException("Attempting to call FinalizeAppend on a segment without a finalize_append method");
158 }
159 auto result_count = function.get().finalize_append(*this, stats);
160 state.append_state.reset();
161 return result_count;
162}
163
164void ColumnSegment::RevertAppend(idx_t start_row) {
165 D_ASSERT(segment_type == ColumnSegmentType::TRANSIENT);
166 if (function.get().revert_append) {
167 function.get().revert_append(*this, start_row);
168 }
169 this->count = start_row - this->start;
170}
171
172//===--------------------------------------------------------------------===//
173// Convert To Persistent
174//===--------------------------------------------------------------------===//
175void ColumnSegment::ConvertToPersistent(optional_ptr<BlockManager> block_manager, block_id_t block_id_p) {
176 D_ASSERT(segment_type == ColumnSegmentType::TRANSIENT);
177 segment_type = ColumnSegmentType::PERSISTENT;
178
179 block_id = block_id_p;
180 offset = 0;
181
182 if (block_id == INVALID_BLOCK) {
183 // constant block: reset the block buffer
184 D_ASSERT(stats.statistics.IsConstant());
185 block.reset();
186 } else {
187 D_ASSERT(!stats.statistics.IsConstant());
188 // non-constant block: write the block to disk
189 // the data for the block already exists in-memory of our block
190 // instead of copying the data we alter some metadata so the buffer points to an on-disk block
191 block = block_manager->ConvertToPersistent(block_id, old_block: std::move(block));
192 }
193
194 segment_state.reset();
195 if (function.get().init_segment) {
196 segment_state = function.get().init_segment(*this, block_id);
197 }
198}
199
200void ColumnSegment::MarkAsPersistent(shared_ptr<BlockHandle> block_p, uint32_t offset_p) {
201 D_ASSERT(segment_type == ColumnSegmentType::TRANSIENT);
202 segment_type = ColumnSegmentType::PERSISTENT;
203
204 block_id = block_p->BlockId();
205 offset = offset_p;
206 block = std::move(block_p);
207
208 segment_state.reset();
209 if (function.get().init_segment) {
210 segment_state = function.get().init_segment(*this, block_id);
211 }
212}
213
214//===--------------------------------------------------------------------===//
215// Filter Selection
216//===--------------------------------------------------------------------===//
217template <class T, class OP, bool HAS_NULL>
218static idx_t TemplatedFilterSelection(T *vec, T predicate, SelectionVector &sel, idx_t approved_tuple_count,
219 ValidityMask &mask, SelectionVector &result_sel) {
220 idx_t result_count = 0;
221 for (idx_t i = 0; i < approved_tuple_count; i++) {
222 auto idx = sel.get_index(idx: i);
223 if ((!HAS_NULL || mask.RowIsValid(row_idx: idx)) && OP::Operation(vec[idx], predicate)) {
224 result_sel.set_index(idx: result_count++, loc: idx);
225 }
226 }
227 return result_count;
228}
229
230template <class T>
231static void FilterSelectionSwitch(T *vec, T predicate, SelectionVector &sel, idx_t &approved_tuple_count,
232 ExpressionType comparison_type, ValidityMask &mask) {
233 SelectionVector new_sel(approved_tuple_count);
234 // the inplace loops take the result as the last parameter
235 switch (comparison_type) {
236 case ExpressionType::COMPARE_EQUAL: {
237 if (mask.AllValid()) {
238 approved_tuple_count =
239 TemplatedFilterSelection<T, Equals, false>(vec, predicate, sel, approved_tuple_count, mask, new_sel);
240 } else {
241 approved_tuple_count =
242 TemplatedFilterSelection<T, Equals, true>(vec, predicate, sel, approved_tuple_count, mask, new_sel);
243 }
244 break;
245 }
246 case ExpressionType::COMPARE_NOTEQUAL: {
247 if (mask.AllValid()) {
248 approved_tuple_count =
249 TemplatedFilterSelection<T, NotEquals, false>(vec, predicate, sel, approved_tuple_count, mask, new_sel);
250 } else {
251 approved_tuple_count =
252 TemplatedFilterSelection<T, NotEquals, true>(vec, predicate, sel, approved_tuple_count, mask, new_sel);
253 }
254 break;
255 }
256 case ExpressionType::COMPARE_LESSTHAN: {
257 if (mask.AllValid()) {
258 approved_tuple_count =
259 TemplatedFilterSelection<T, LessThan, false>(vec, predicate, sel, approved_tuple_count, mask, new_sel);
260 } else {
261 approved_tuple_count =
262 TemplatedFilterSelection<T, LessThan, true>(vec, predicate, sel, approved_tuple_count, mask, new_sel);
263 }
264 break;
265 }
266 case ExpressionType::COMPARE_GREATERTHAN: {
267 if (mask.AllValid()) {
268 approved_tuple_count = TemplatedFilterSelection<T, GreaterThan, false>(vec, predicate, sel,
269 approved_tuple_count, mask, new_sel);
270 } else {
271 approved_tuple_count = TemplatedFilterSelection<T, GreaterThan, true>(vec, predicate, sel,
272 approved_tuple_count, mask, new_sel);
273 }
274 break;
275 }
276 case ExpressionType::COMPARE_LESSTHANOREQUALTO: {
277 if (mask.AllValid()) {
278 approved_tuple_count = TemplatedFilterSelection<T, LessThanEquals, false>(
279 vec, predicate, sel, approved_tuple_count, mask, new_sel);
280 } else {
281 approved_tuple_count = TemplatedFilterSelection<T, LessThanEquals, true>(
282 vec, predicate, sel, approved_tuple_count, mask, new_sel);
283 }
284 break;
285 }
286 case ExpressionType::COMPARE_GREATERTHANOREQUALTO: {
287 if (mask.AllValid()) {
288 approved_tuple_count = TemplatedFilterSelection<T, GreaterThanEquals, false>(
289 vec, predicate, sel, approved_tuple_count, mask, new_sel);
290 } else {
291 approved_tuple_count = TemplatedFilterSelection<T, GreaterThanEquals, true>(
292 vec, predicate, sel, approved_tuple_count, mask, new_sel);
293 }
294 break;
295 }
296 default:
297 throw NotImplementedException("Unknown comparison type for filter pushed down to table!");
298 }
299 sel.Initialize(other: new_sel);
300}
301
302template <bool IS_NULL>
303static idx_t TemplatedNullSelection(SelectionVector &sel, idx_t &approved_tuple_count, ValidityMask &mask) {
304 if (mask.AllValid()) {
305 // no NULL values
306 if (IS_NULL) {
307 approved_tuple_count = 0;
308 return 0;
309 } else {
310 return approved_tuple_count;
311 }
312 } else {
313 SelectionVector result_sel(approved_tuple_count);
314 idx_t result_count = 0;
315 for (idx_t i = 0; i < approved_tuple_count; i++) {
316 auto idx = sel.get_index(idx: i);
317 if (mask.RowIsValid(row_idx: idx) != IS_NULL) {
318 result_sel.set_index(idx: result_count++, loc: idx);
319 }
320 }
321 sel.Initialize(other: result_sel);
322 approved_tuple_count = result_count;
323 return result_count;
324 }
325}
326
327idx_t ColumnSegment::FilterSelection(SelectionVector &sel, Vector &result, const TableFilter &filter,
328 idx_t &approved_tuple_count, ValidityMask &mask) {
329 switch (filter.filter_type) {
330 case TableFilterType::CONJUNCTION_OR: {
331 // similar to the CONJUNCTION_AND, but we need to take care of the SelectionVectors (OR all of them)
332 idx_t count_total = 0;
333 SelectionVector result_sel(approved_tuple_count);
334 auto &conjunction_or = filter.Cast<ConjunctionOrFilter>();
335 for (auto &child_filter : conjunction_or.child_filters) {
336 SelectionVector temp_sel;
337 temp_sel.Initialize(other: sel);
338 idx_t temp_tuple_count = approved_tuple_count;
339 idx_t temp_count = FilterSelection(sel&: temp_sel, result, filter: *child_filter, approved_tuple_count&: temp_tuple_count, mask);
340 // tuples passed, move them into the actual result vector
341 for (idx_t i = 0; i < temp_count; i++) {
342 auto new_idx = temp_sel.get_index(idx: i);
343 bool is_new_idx = true;
344 for (idx_t res_idx = 0; res_idx < count_total; res_idx++) {
345 if (result_sel.get_index(idx: res_idx) == new_idx) {
346 is_new_idx = false;
347 break;
348 }
349 }
350 if (is_new_idx) {
351 result_sel.set_index(idx: count_total++, loc: new_idx);
352 }
353 }
354 }
355 sel.Initialize(other: result_sel);
356 approved_tuple_count = count_total;
357 return approved_tuple_count;
358 }
359 case TableFilterType::CONJUNCTION_AND: {
360 auto &conjunction_and = filter.Cast<ConjunctionAndFilter>();
361 for (auto &child_filter : conjunction_and.child_filters) {
362 FilterSelection(sel, result, filter: *child_filter, approved_tuple_count, mask);
363 }
364 return approved_tuple_count;
365 }
366 case TableFilterType::CONSTANT_COMPARISON: {
367 auto &constant_filter = filter.Cast<ConstantFilter>();
368 // the inplace loops take the result as the last parameter
369 switch (result.GetType().InternalType()) {
370 case PhysicalType::UINT8: {
371 auto result_flat = FlatVector::GetData<uint8_t>(vector&: result);
372 auto predicate = UTinyIntValue::Get(value: constant_filter.constant);
373 FilterSelectionSwitch<uint8_t>(vec: result_flat, predicate, sel, approved_tuple_count,
374 comparison_type: constant_filter.comparison_type, mask);
375 break;
376 }
377 case PhysicalType::UINT16: {
378 auto result_flat = FlatVector::GetData<uint16_t>(vector&: result);
379 auto predicate = USmallIntValue::Get(value: constant_filter.constant);
380 FilterSelectionSwitch<uint16_t>(vec: result_flat, predicate, sel, approved_tuple_count,
381 comparison_type: constant_filter.comparison_type, mask);
382 break;
383 }
384 case PhysicalType::UINT32: {
385 auto result_flat = FlatVector::GetData<uint32_t>(vector&: result);
386 auto predicate = UIntegerValue::Get(value: constant_filter.constant);
387 FilterSelectionSwitch<uint32_t>(vec: result_flat, predicate, sel, approved_tuple_count,
388 comparison_type: constant_filter.comparison_type, mask);
389 break;
390 }
391 case PhysicalType::UINT64: {
392 auto result_flat = FlatVector::GetData<uint64_t>(vector&: result);
393 auto predicate = UBigIntValue::Get(value: constant_filter.constant);
394 FilterSelectionSwitch<uint64_t>(vec: result_flat, predicate, sel, approved_tuple_count,
395 comparison_type: constant_filter.comparison_type, mask);
396 break;
397 }
398 case PhysicalType::INT8: {
399 auto result_flat = FlatVector::GetData<int8_t>(vector&: result);
400 auto predicate = TinyIntValue::Get(value: constant_filter.constant);
401 FilterSelectionSwitch<int8_t>(vec: result_flat, predicate, sel, approved_tuple_count,
402 comparison_type: constant_filter.comparison_type, mask);
403 break;
404 }
405 case PhysicalType::INT16: {
406 auto result_flat = FlatVector::GetData<int16_t>(vector&: result);
407 auto predicate = SmallIntValue::Get(value: constant_filter.constant);
408 FilterSelectionSwitch<int16_t>(vec: result_flat, predicate, sel, approved_tuple_count,
409 comparison_type: constant_filter.comparison_type, mask);
410 break;
411 }
412 case PhysicalType::INT32: {
413 auto result_flat = FlatVector::GetData<int32_t>(vector&: result);
414 auto predicate = IntegerValue::Get(value: constant_filter.constant);
415 FilterSelectionSwitch<int32_t>(vec: result_flat, predicate, sel, approved_tuple_count,
416 comparison_type: constant_filter.comparison_type, mask);
417 break;
418 }
419 case PhysicalType::INT64: {
420 auto result_flat = FlatVector::GetData<int64_t>(vector&: result);
421 auto predicate = BigIntValue::Get(value: constant_filter.constant);
422 FilterSelectionSwitch<int64_t>(vec: result_flat, predicate, sel, approved_tuple_count,
423 comparison_type: constant_filter.comparison_type, mask);
424 break;
425 }
426 case PhysicalType::INT128: {
427 auto result_flat = FlatVector::GetData<hugeint_t>(vector&: result);
428 auto predicate = HugeIntValue::Get(value: constant_filter.constant);
429 FilterSelectionSwitch<hugeint_t>(vec: result_flat, predicate, sel, approved_tuple_count,
430 comparison_type: constant_filter.comparison_type, mask);
431 break;
432 }
433 case PhysicalType::FLOAT: {
434 auto result_flat = FlatVector::GetData<float>(vector&: result);
435 auto predicate = FloatValue::Get(value: constant_filter.constant);
436 FilterSelectionSwitch<float>(vec: result_flat, predicate, sel, approved_tuple_count,
437 comparison_type: constant_filter.comparison_type, mask);
438 break;
439 }
440 case PhysicalType::DOUBLE: {
441 auto result_flat = FlatVector::GetData<double>(vector&: result);
442 auto predicate = DoubleValue::Get(value: constant_filter.constant);
443 FilterSelectionSwitch<double>(vec: result_flat, predicate, sel, approved_tuple_count,
444 comparison_type: constant_filter.comparison_type, mask);
445 break;
446 }
447 case PhysicalType::VARCHAR: {
448 auto result_flat = FlatVector::GetData<string_t>(vector&: result);
449 auto predicate = string_t(StringValue::Get(value: constant_filter.constant));
450 FilterSelectionSwitch<string_t>(vec: result_flat, predicate, sel, approved_tuple_count,
451 comparison_type: constant_filter.comparison_type, mask);
452 break;
453 }
454 case PhysicalType::BOOL: {
455 auto result_flat = FlatVector::GetData<bool>(vector&: result);
456 auto predicate = BooleanValue::Get(value: constant_filter.constant);
457 FilterSelectionSwitch<bool>(vec: result_flat, predicate, sel, approved_tuple_count,
458 comparison_type: constant_filter.comparison_type, mask);
459 break;
460 }
461 default:
462 throw InvalidTypeException(result.GetType(), "Invalid type for filter pushed down to table comparison");
463 }
464 return approved_tuple_count;
465 }
466 case TableFilterType::IS_NULL:
467 return TemplatedNullSelection<true>(sel, approved_tuple_count, mask);
468 case TableFilterType::IS_NOT_NULL:
469 return TemplatedNullSelection<false>(sel, approved_tuple_count, mask);
470 default:
471 throw InternalException("FIXME: unsupported type for filter selection");
472 }
473}
474
475} // namespace duckdb
476