1//===--------------------------------------------------------------------===//
2// row_scatter.cpp
3// Description: This file contains the implementation of the row scattering
4// operators
5//===--------------------------------------------------------------------===//
6
7#include "duckdb/common/exception.hpp"
8#include "duckdb/common/helper.hpp"
9#include "duckdb/common/row_operations/row_operations.hpp"
10#include "duckdb/common/types/null_value.hpp"
11#include "duckdb/common/types/row/row_data_collection.hpp"
12#include "duckdb/common/types/row/row_layout.hpp"
13#include "duckdb/common/types/selection_vector.hpp"
14#include "duckdb/common/types/vector.hpp"
15
16namespace duckdb {
17
// Alias for the validity-mask type declared by RowLayout. The functions in this file construct it over a
// row pointer to mark a single column invalid or to reset all columns of a row to valid.
using ValidityBytes = RowLayout::ValidityBytes;
19
20template <class T>
21static void TemplatedScatter(UnifiedVectorFormat &col, Vector &rows, const SelectionVector &sel, const idx_t count,
22 const idx_t col_offset, const idx_t col_no) {
23 auto data = UnifiedVectorFormat::GetData<T>(col);
24 auto ptrs = FlatVector::GetData<data_ptr_t>(vector&: rows);
25
26 if (!col.validity.AllValid()) {
27 for (idx_t i = 0; i < count; i++) {
28 auto idx = sel.get_index(idx: i);
29 auto col_idx = col.sel->get_index(idx);
30 auto row = ptrs[idx];
31
32 auto isnull = !col.validity.RowIsValid(row_idx: col_idx);
33 T store_value = isnull ? NullValue<T>() : data[col_idx];
34 Store<T>(store_value, row + col_offset);
35 if (isnull) {
36 ValidityBytes col_mask(ptrs[idx]);
37 col_mask.SetInvalidUnsafe(col_no);
38 }
39 }
40 } else {
41 for (idx_t i = 0; i < count; i++) {
42 auto idx = sel.get_index(idx: i);
43 auto col_idx = col.sel->get_index(idx);
44 auto row = ptrs[idx];
45
46 Store<T>(data[col_idx], row + col_offset);
47 }
48 }
49}
50
51static void ComputeStringEntrySizes(const UnifiedVectorFormat &col, idx_t entry_sizes[], const SelectionVector &sel,
52 const idx_t count, const idx_t offset = 0) {
53 auto data = UnifiedVectorFormat::GetData<string_t>(format: col);
54 for (idx_t i = 0; i < count; i++) {
55 auto idx = sel.get_index(idx: i);
56 auto col_idx = col.sel->get_index(idx) + offset;
57 const auto &str = data[col_idx];
58 if (col.validity.RowIsValid(row_idx: col_idx) && !str.IsInlined()) {
59 entry_sizes[i] += str.GetSize();
60 }
61 }
62}
63
64static void ScatterStringVector(UnifiedVectorFormat &col, Vector &rows, data_ptr_t str_locations[],
65 const SelectionVector &sel, const idx_t count, const idx_t col_offset,
66 const idx_t col_no) {
67 auto string_data = UnifiedVectorFormat::GetData<string_t>(format: col);
68 auto ptrs = FlatVector::GetData<data_ptr_t>(vector&: rows);
69
70 // Write out zero length to avoid swizzling problems.
71 const string_t null(nullptr, 0);
72 for (idx_t i = 0; i < count; i++) {
73 auto idx = sel.get_index(idx: i);
74 auto col_idx = col.sel->get_index(idx);
75 auto row = ptrs[idx];
76 if (!col.validity.RowIsValid(row_idx: col_idx)) {
77 ValidityBytes col_mask(row);
78 col_mask.SetInvalidUnsafe(col_no);
79 Store<string_t>(val: null, ptr: row + col_offset);
80 } else if (string_data[col_idx].IsInlined()) {
81 Store<string_t>(val: string_data[col_idx], ptr: row + col_offset);
82 } else {
83 const auto &str = string_data[col_idx];
84 string_t inserted(const_char_ptr_cast(src: str_locations[i]), str.GetSize());
85 memcpy(dest: inserted.GetDataWriteable(), src: str.GetData(), n: str.GetSize());
86 str_locations[i] += str.GetSize();
87 inserted.Finalize();
88 Store<string_t>(val: inserted, ptr: row + col_offset);
89 }
90 }
91}
92
93static void ScatterNestedVector(Vector &vec, UnifiedVectorFormat &col, Vector &rows, data_ptr_t data_locations[],
94 const SelectionVector &sel, const idx_t count, const idx_t col_offset,
95 const idx_t col_no, const idx_t vcount) {
96 // Store pointers to the data in the row
97 // Do this first because SerializeVector destroys the locations
98 auto ptrs = FlatVector::GetData<data_ptr_t>(vector&: rows);
99 data_ptr_t validitymask_locations[STANDARD_VECTOR_SIZE];
100 for (idx_t i = 0; i < count; i++) {
101 auto idx = sel.get_index(idx: i);
102 auto row = ptrs[idx];
103 validitymask_locations[i] = row;
104
105 Store<data_ptr_t>(val: data_locations[i], ptr: row + col_offset);
106 }
107
108 // Serialise the data
109 RowOperations::HeapScatter(v&: vec, vcount, sel, ser_count: count, col_idx: col_no, key_locations: data_locations, validitymask_locations);
110}
111
112void RowOperations::Scatter(DataChunk &columns, UnifiedVectorFormat col_data[], const RowLayout &layout, Vector &rows,
113 RowDataCollection &string_heap, const SelectionVector &sel, idx_t count) {
114 if (count == 0) {
115 return;
116 }
117
118 // Set the validity mask for each row before inserting data
119 auto ptrs = FlatVector::GetData<data_ptr_t>(vector&: rows);
120 for (idx_t i = 0; i < count; ++i) {
121 auto row_idx = sel.get_index(idx: i);
122 auto row = ptrs[row_idx];
123 ValidityBytes(row).SetAllValid(layout.ColumnCount());
124 }
125
126 const auto vcount = columns.size();
127 auto &offsets = layout.GetOffsets();
128 auto &types = layout.GetTypes();
129
130 // Compute the entry size of the variable size columns
131 vector<BufferHandle> handles;
132 data_ptr_t data_locations[STANDARD_VECTOR_SIZE];
133 if (!layout.AllConstant()) {
134 idx_t entry_sizes[STANDARD_VECTOR_SIZE];
135 std::fill_n(entry_sizes, count, sizeof(uint32_t));
136 for (idx_t col_no = 0; col_no < types.size(); col_no++) {
137 if (TypeIsConstantSize(type: types[col_no].InternalType())) {
138 continue;
139 }
140
141 auto &vec = columns.data[col_no];
142 auto &col = col_data[col_no];
143 switch (types[col_no].InternalType()) {
144 case PhysicalType::VARCHAR:
145 ComputeStringEntrySizes(col, entry_sizes, sel, count);
146 break;
147 case PhysicalType::LIST:
148 case PhysicalType::STRUCT:
149 RowOperations::ComputeEntrySizes(v&: vec, vdata&: col, entry_sizes, vcount, ser_count: count, sel);
150 break;
151 default:
152 throw InternalException("Unsupported type for RowOperations::Scatter");
153 }
154 }
155
156 // Build out the buffer space
157 handles = string_heap.Build(added_count: count, key_locations: data_locations, entry_sizes);
158
159 // Serialize information that is needed for swizzling if the computation goes out-of-core
160 const idx_t heap_pointer_offset = layout.GetHeapOffset();
161 for (idx_t i = 0; i < count; i++) {
162 auto row_idx = sel.get_index(idx: i);
163 auto row = ptrs[row_idx];
164 // Pointer to this row in the heap block
165 Store<data_ptr_t>(val: data_locations[i], ptr: row + heap_pointer_offset);
166 // Row size is stored in the heap in front of each row
167 Store<uint32_t>(val: entry_sizes[i], ptr: data_locations[i]);
168 data_locations[i] += sizeof(uint32_t);
169 }
170 }
171
172 for (idx_t col_no = 0; col_no < types.size(); col_no++) {
173 auto &vec = columns.data[col_no];
174 auto &col = col_data[col_no];
175 auto col_offset = offsets[col_no];
176
177 switch (types[col_no].InternalType()) {
178 case PhysicalType::BOOL:
179 case PhysicalType::INT8:
180 TemplatedScatter<int8_t>(col, rows, sel, count, col_offset, col_no);
181 break;
182 case PhysicalType::INT16:
183 TemplatedScatter<int16_t>(col, rows, sel, count, col_offset, col_no);
184 break;
185 case PhysicalType::INT32:
186 TemplatedScatter<int32_t>(col, rows, sel, count, col_offset, col_no);
187 break;
188 case PhysicalType::INT64:
189 TemplatedScatter<int64_t>(col, rows, sel, count, col_offset, col_no);
190 break;
191 case PhysicalType::UINT8:
192 TemplatedScatter<uint8_t>(col, rows, sel, count, col_offset, col_no);
193 break;
194 case PhysicalType::UINT16:
195 TemplatedScatter<uint16_t>(col, rows, sel, count, col_offset, col_no);
196 break;
197 case PhysicalType::UINT32:
198 TemplatedScatter<uint32_t>(col, rows, sel, count, col_offset, col_no);
199 break;
200 case PhysicalType::UINT64:
201 TemplatedScatter<uint64_t>(col, rows, sel, count, col_offset, col_no);
202 break;
203 case PhysicalType::INT128:
204 TemplatedScatter<hugeint_t>(col, rows, sel, count, col_offset, col_no);
205 break;
206 case PhysicalType::FLOAT:
207 TemplatedScatter<float>(col, rows, sel, count, col_offset, col_no);
208 break;
209 case PhysicalType::DOUBLE:
210 TemplatedScatter<double>(col, rows, sel, count, col_offset, col_no);
211 break;
212 case PhysicalType::INTERVAL:
213 TemplatedScatter<interval_t>(col, rows, sel, count, col_offset, col_no);
214 break;
215 case PhysicalType::VARCHAR:
216 ScatterStringVector(col, rows, str_locations: data_locations, sel, count, col_offset, col_no);
217 break;
218 case PhysicalType::LIST:
219 case PhysicalType::STRUCT:
220 ScatterNestedVector(vec, col, rows, data_locations, sel, count, col_offset, col_no, vcount);
221 break;
222 default:
223 throw InternalException("Unsupported type for RowOperations::Scatter");
224 }
225 }
226}
227
228} // namespace duckdb
229