1 | #include "duckdb/common/types/chunk_collection.hpp" |
2 | |
3 | #include "duckdb/common/exception.hpp" |
4 | #include "duckdb/common/printer.hpp" |
5 | #include "duckdb/common/value_operations/value_operations.hpp" |
6 | #include "duckdb/common/operator/comparison_operators.hpp" |
7 | #include "duckdb/common/assert.hpp" |
8 | |
9 | #include <algorithm> |
10 | #include <cstring> |
11 | |
12 | using namespace duckdb; |
13 | using namespace std; |
14 | |
15 | void ChunkCollection::Verify() { |
16 | #ifdef DEBUG |
17 | for (auto &chunk : chunks) { |
18 | chunk->Verify(); |
19 | } |
20 | #endif |
21 | } |
22 | |
23 | void ChunkCollection::Append(ChunkCollection &other) { |
24 | for (auto &chunk : other.chunks) { |
25 | Append(*chunk.get()); |
26 | } |
27 | } |
28 | |
29 | void ChunkCollection::Append(DataChunk &new_chunk) { |
30 | if (new_chunk.size() == 0) { |
31 | return; |
32 | } |
33 | new_chunk.Verify(); |
34 | |
35 | // we have to ensure that every chunk in the ChunkCollection is completely |
36 | // filled, otherwise our O(1) lookup in GetValue and SetValue does not work |
37 | // first fill the latest chunk, if it exists |
38 | count += new_chunk.size(); |
39 | |
40 | idx_t remaining_data = new_chunk.size(); |
41 | idx_t offset = 0; |
42 | if (chunks.size() == 0) { |
43 | // first chunk |
44 | types = new_chunk.GetTypes(); |
45 | } else { |
46 | // the types of the new chunk should match the types of the previous one |
47 | assert(types.size() == new_chunk.column_count()); |
48 | auto new_types = new_chunk.GetTypes(); |
49 | for (idx_t i = 0; i < types.size(); i++) { |
50 | if (new_types[i] != types[i]) { |
51 | throw TypeMismatchException(new_types[i], types[i], "Type mismatch when combining rows" ); |
52 | } |
53 | if (types[i] == TypeId::LIST) { |
54 | for (auto &chunk : |
55 | chunks) { // need to check all the chunks because they can have only-null list entries |
56 | auto &chunk_vec = chunk->data[i]; |
57 | auto &new_vec = new_chunk.data[i]; |
58 | if (ListVector::HasEntry(chunk_vec) && ListVector::HasEntry(new_vec)) { |
59 | auto &chunk_types = ListVector::GetEntry(chunk_vec).types; |
60 | auto &new_types = ListVector::GetEntry(new_vec).types; |
61 | if (chunk_types.size() > 0 && new_types.size() > 0 && chunk_types != new_types) { |
62 | throw TypeMismatchException(chunk_types[0], new_types[i], |
63 | "Type mismatch when combining lists" ); |
64 | } |
65 | } |
66 | } |
67 | } |
68 | // TODO check structs, too |
69 | } |
70 | |
71 | // first append data to the current chunk |
72 | DataChunk &last_chunk = *chunks.back(); |
73 | idx_t added_data = std::min(remaining_data, (idx_t)(STANDARD_VECTOR_SIZE - last_chunk.size())); |
74 | if (added_data > 0) { |
75 | // copy <added_data> elements to the last chunk |
76 | idx_t old_count = new_chunk.size(); |
77 | new_chunk.SetCardinality(added_data); |
78 | |
79 | last_chunk.Append(new_chunk); |
80 | remaining_data -= added_data; |
81 | // reset the chunk to the old data |
82 | new_chunk.SetCardinality(old_count); |
83 | offset = added_data; |
84 | } |
85 | } |
86 | |
87 | if (remaining_data > 0) { |
88 | // create a new chunk and fill it with the remainder |
89 | auto chunk = make_unique<DataChunk>(); |
90 | chunk->Initialize(types); |
91 | new_chunk.Copy(*chunk, offset); |
92 | chunks.push_back(move(chunk)); |
93 | } |
94 | } |
95 | |
96 | // returns an int similar to a C comparator: |
97 | // -1 if left < right |
98 | // 0 if left == right |
99 | // 1 if left > right |
100 | |
101 | template <class TYPE> |
102 | static int8_t templated_compare_value(Vector &left_vec, Vector &right_vec, idx_t left_idx, idx_t right_idx) { |
103 | assert(left_vec.type == right_vec.type); |
104 | auto left_val = FlatVector::GetData<TYPE>(left_vec)[left_idx]; |
105 | auto right_val = FlatVector::GetData<TYPE>(right_vec)[right_idx]; |
106 | if (Equals::Operation<TYPE>(left_val, right_val)) { |
107 | return 0; |
108 | } |
109 | if (LessThan::Operation<TYPE>(left_val, right_val)) { |
110 | return -1; |
111 | } |
112 | return 1; |
113 | } |
114 | |
115 | // return type here is int32 because strcmp() on some platforms returns rather large values |
116 | static int32_t compare_value(Vector &left_vec, Vector &right_vec, idx_t vector_idx_left, idx_t vector_idx_right) { |
117 | auto left_null = FlatVector::Nullmask(left_vec)[vector_idx_left]; |
118 | auto right_null = FlatVector::Nullmask(right_vec)[vector_idx_right]; |
119 | |
120 | if (left_null && right_null) { |
121 | return 0; |
122 | } else if (right_null) { |
123 | return 1; |
124 | } else if (left_null) { |
125 | return -1; |
126 | } |
127 | |
128 | switch (left_vec.type) { |
129 | case TypeId::BOOL: |
130 | case TypeId::INT8: |
131 | return templated_compare_value<int8_t>(left_vec, right_vec, vector_idx_left, vector_idx_right); |
132 | case TypeId::INT16: |
133 | return templated_compare_value<int16_t>(left_vec, right_vec, vector_idx_left, vector_idx_right); |
134 | case TypeId::INT32: |
135 | return templated_compare_value<int32_t>(left_vec, right_vec, vector_idx_left, vector_idx_right); |
136 | case TypeId::INT64: |
137 | return templated_compare_value<int64_t>(left_vec, right_vec, vector_idx_left, vector_idx_right); |
138 | case TypeId::FLOAT: |
139 | return templated_compare_value<float>(left_vec, right_vec, vector_idx_left, vector_idx_right); |
140 | case TypeId::DOUBLE: |
141 | return templated_compare_value<double>(left_vec, right_vec, vector_idx_left, vector_idx_right); |
142 | case TypeId::VARCHAR: |
143 | return templated_compare_value<string_t>(left_vec, right_vec, vector_idx_left, vector_idx_right); |
144 | default: |
145 | throw NotImplementedException("Type for comparison" ); |
146 | } |
147 | return false; |
148 | } |
149 | |
150 | static int compare_tuple(ChunkCollection *sort_by, vector<OrderType> &desc, idx_t left, idx_t right) { |
151 | assert(sort_by); |
152 | |
153 | idx_t chunk_idx_left = left / STANDARD_VECTOR_SIZE; |
154 | idx_t chunk_idx_right = right / STANDARD_VECTOR_SIZE; |
155 | idx_t vector_idx_left = left % STANDARD_VECTOR_SIZE; |
156 | idx_t vector_idx_right = right % STANDARD_VECTOR_SIZE; |
157 | |
158 | auto &left_chunk = sort_by->chunks[chunk_idx_left]; |
159 | auto &right_chunk = sort_by->chunks[chunk_idx_right]; |
160 | |
161 | for (idx_t col_idx = 0; col_idx < desc.size(); col_idx++) { |
162 | auto order_type = desc[col_idx]; |
163 | |
164 | Vector &left_vec = left_chunk->data[col_idx]; |
165 | Vector &right_vec = right_chunk->data[col_idx]; |
166 | |
167 | assert(left_vec.vector_type == VectorType::FLAT_VECTOR); |
168 | assert(right_vec.vector_type == VectorType::FLAT_VECTOR); |
169 | assert(left_vec.type == right_vec.type); |
170 | |
171 | auto comp_res = compare_value(left_vec, right_vec, vector_idx_left, vector_idx_right); |
172 | |
173 | if (comp_res == 0) { |
174 | continue; |
175 | } |
176 | return comp_res < 0 ? (order_type == OrderType::ASCENDING ? -1 : 1) |
177 | : (order_type == OrderType::ASCENDING ? 1 : -1); |
178 | } |
179 | return 0; |
180 | } |
181 | |
182 | static int64_t _quicksort_initial(ChunkCollection *sort_by, vector<OrderType> &desc, idx_t *result) { |
183 | // select pivot |
184 | int64_t pivot = 0; |
185 | int64_t low = 0, high = sort_by->count - 1; |
186 | // now insert elements |
187 | for (idx_t i = 1; i < sort_by->count; i++) { |
188 | if (compare_tuple(sort_by, desc, i, pivot) <= 0) { |
189 | result[low++] = i; |
190 | } else { |
191 | result[high--] = i; |
192 | } |
193 | } |
194 | assert(low == high); |
195 | result[low] = pivot; |
196 | return low; |
197 | } |
198 | |
199 | static void _quicksort_inplace(ChunkCollection *sort_by, vector<OrderType> &desc, idx_t *result, int64_t left, |
200 | int64_t right) { |
201 | if (left >= right) { |
202 | return; |
203 | } |
204 | |
205 | int64_t middle = left + (right - left) / 2; |
206 | int64_t pivot = result[middle]; |
207 | // move the mid point value to the front. |
208 | int64_t i = left + 1; |
209 | int64_t j = right; |
210 | |
211 | std::swap(result[middle], result[left]); |
212 | while (i <= j) { |
213 | while (i <= j && compare_tuple(sort_by, desc, result[i], pivot) <= 0) { |
214 | i++; |
215 | } |
216 | |
217 | while (i <= j && compare_tuple(sort_by, desc, result[j], pivot) > 0) { |
218 | j--; |
219 | } |
220 | |
221 | if (i < j) { |
222 | std::swap(result[i], result[j]); |
223 | } |
224 | } |
225 | std::swap(result[i - 1], result[left]); |
226 | int64_t part = i - 1; |
227 | |
228 | _quicksort_inplace(sort_by, desc, result, left, part - 1); |
229 | _quicksort_inplace(sort_by, desc, result, part + 1, right); |
230 | } |
231 | |
232 | void ChunkCollection::Sort(vector<OrderType> &desc, idx_t result[]) { |
233 | assert(result); |
234 | if (count == 0) |
235 | return; |
236 | // quicksort |
237 | int64_t part = _quicksort_initial(this, desc, result); |
238 | _quicksort_inplace(this, desc, result, 0, part); |
239 | _quicksort_inplace(this, desc, result, part + 1, count - 1); |
240 | } |
241 | |
242 | // FIXME make this more efficient by not using the Value API |
243 | // just use memcpy in the vectors |
244 | // assert that there is no selection list |
245 | void ChunkCollection::Reorder(idx_t order_org[]) { |
246 | auto order = unique_ptr<idx_t[]>(new idx_t[count]); |
247 | memcpy(order.get(), order_org, sizeof(idx_t) * count); |
248 | |
249 | // adapted from https://stackoverflow.com/a/7366196/2652376 |
250 | |
251 | auto val_buf = vector<Value>(); |
252 | val_buf.resize(column_count()); |
253 | |
254 | idx_t j, k; |
255 | for (idx_t i = 0; i < count; i++) { |
256 | for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) { |
257 | val_buf[col_idx] = GetValue(col_idx, i); |
258 | } |
259 | j = i; |
260 | while (true) { |
261 | k = order[j]; |
262 | order[j] = j; |
263 | if (k == i) { |
264 | break; |
265 | } |
266 | for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) { |
267 | SetValue(col_idx, j, GetValue(col_idx, k)); |
268 | } |
269 | j = k; |
270 | } |
271 | for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) { |
272 | SetValue(col_idx, j, val_buf[col_idx]); |
273 | } |
274 | } |
275 | } |
276 | |
277 | template <class TYPE> |
278 | static void templated_set_values(ChunkCollection *src_coll, Vector &tgt_vec, idx_t order[], idx_t col_idx, |
279 | idx_t start_offset, idx_t remaining_data) { |
280 | assert(src_coll); |
281 | |
282 | for (idx_t row_idx = 0; row_idx < remaining_data; row_idx++) { |
283 | idx_t chunk_idx_src = order[start_offset + row_idx] / STANDARD_VECTOR_SIZE; |
284 | idx_t vector_idx_src = order[start_offset + row_idx] % STANDARD_VECTOR_SIZE; |
285 | |
286 | auto &src_chunk = src_coll->chunks[chunk_idx_src]; |
287 | Vector &src_vec = src_chunk->data[col_idx]; |
288 | auto source_data = FlatVector::GetData<TYPE>(src_vec); |
289 | auto target_data = FlatVector::GetData<TYPE>(tgt_vec); |
290 | |
291 | if (FlatVector::IsNull(src_vec, vector_idx_src)) { |
292 | FlatVector::SetNull(tgt_vec, row_idx, true); |
293 | } else { |
294 | target_data[row_idx] = source_data[vector_idx_src]; |
295 | } |
296 | } |
297 | } |
298 | |
299 | // TODO: reorder functionality is similar, perhaps merge |
300 | void ChunkCollection::MaterializeSortedChunk(DataChunk &target, idx_t order[], idx_t start_offset) { |
301 | idx_t remaining_data = min((idx_t)STANDARD_VECTOR_SIZE, count - start_offset); |
302 | assert(target.GetTypes() == types); |
303 | |
304 | target.SetCardinality(remaining_data); |
305 | for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) { |
306 | switch (types[col_idx]) { |
307 | case TypeId::BOOL: |
308 | case TypeId::INT8: |
309 | templated_set_values<int8_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
310 | break; |
311 | case TypeId::INT16: |
312 | templated_set_values<int16_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
313 | break; |
314 | case TypeId::INT32: |
315 | templated_set_values<int32_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
316 | break; |
317 | case TypeId::INT64: |
318 | templated_set_values<int64_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
319 | break; |
320 | case TypeId::FLOAT: |
321 | templated_set_values<float>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
322 | break; |
323 | case TypeId::DOUBLE: |
324 | templated_set_values<double>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
325 | break; |
326 | case TypeId::VARCHAR: |
327 | templated_set_values<string_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
328 | break; |
329 | |
330 | case TypeId::LIST: |
331 | case TypeId::STRUCT: { |
332 | for (idx_t row_idx = 0; row_idx < remaining_data; row_idx++) { |
333 | idx_t chunk_idx_src = order[start_offset + row_idx] / STANDARD_VECTOR_SIZE; |
334 | idx_t vector_idx_src = order[start_offset + row_idx] % STANDARD_VECTOR_SIZE; |
335 | |
336 | auto &src_chunk = chunks[chunk_idx_src]; |
337 | Vector &src_vec = src_chunk->data[col_idx]; |
338 | auto &tgt_vec = target.data[col_idx]; |
339 | if (FlatVector::IsNull(src_vec, vector_idx_src)) { |
340 | FlatVector::SetNull(tgt_vec, row_idx, true); |
341 | } else { |
342 | tgt_vec.SetValue(row_idx, src_vec.GetValue(vector_idx_src)); |
343 | } |
344 | } |
345 | } break; |
346 | default: |
347 | throw NotImplementedException("Type is unsupported in MaterializeSortedChunk()" ); |
348 | } |
349 | } |
350 | target.Verify(); |
351 | } |
352 | |
353 | Value ChunkCollection::GetValue(idx_t column, idx_t index) { |
354 | return chunks[LocateChunk(index)]->GetValue(column, index % STANDARD_VECTOR_SIZE); |
355 | } |
356 | |
357 | vector<Value> ChunkCollection::GetRow(idx_t index) { |
358 | vector<Value> values; |
359 | values.resize(column_count()); |
360 | |
361 | for (idx_t p_idx = 0; p_idx < column_count(); p_idx++) { |
362 | values[p_idx] = GetValue(p_idx, index); |
363 | } |
364 | return values; |
365 | } |
366 | |
367 | void ChunkCollection::SetValue(idx_t column, idx_t index, Value value) { |
368 | chunks[LocateChunk(index)]->SetValue(column, index % STANDARD_VECTOR_SIZE, value); |
369 | } |
370 | |
371 | void ChunkCollection::Print() { |
372 | Printer::Print(ToString()); |
373 | } |
374 | |
375 | bool ChunkCollection::Equals(ChunkCollection &other) { |
376 | if (count != other.count) { |
377 | return false; |
378 | } |
379 | if (column_count() != other.column_count()) { |
380 | return false; |
381 | } |
382 | if (types != other.types) { |
383 | return false; |
384 | } |
385 | // if count is equal amount of chunks should be equal |
386 | for (idx_t row_idx = 0; row_idx < count; row_idx++) { |
387 | for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) { |
388 | auto lvalue = GetValue(col_idx, row_idx); |
389 | auto rvalue = other.GetValue(col_idx, row_idx); |
390 | if (!Value::ValuesAreEqual(lvalue, rvalue)) { |
391 | return false; |
392 | } |
393 | } |
394 | } |
395 | return true; |
396 | } |
397 | static void _heapify(ChunkCollection *input, vector<OrderType> &desc, idx_t *heap, idx_t heap_size, |
398 | idx_t current_index) { |
399 | if (current_index >= heap_size) { |
400 | return; |
401 | } |
402 | idx_t left_child_index = current_index * 2 + 1; |
403 | idx_t right_child_index = current_index * 2 + 2; |
404 | idx_t swap_index = current_index; |
405 | |
406 | if (left_child_index < heap_size) { |
407 | swap_index = |
408 | compare_tuple(input, desc, heap[swap_index], heap[left_child_index]) <= 0 ? left_child_index : swap_index; |
409 | } |
410 | |
411 | if (right_child_index < heap_size) { |
412 | swap_index = |
413 | compare_tuple(input, desc, heap[swap_index], heap[right_child_index]) <= 0 ? right_child_index : swap_index; |
414 | } |
415 | |
416 | if (swap_index != current_index) { |
417 | std::swap(heap[current_index], heap[swap_index]); |
418 | _heapify(input, desc, heap, heap_size, swap_index); |
419 | } |
420 | } |
421 | |
422 | static void _heap_create(ChunkCollection *input, vector<OrderType> &desc, idx_t *heap, idx_t heap_size) { |
423 | for (idx_t i = 0; i < heap_size; i++) { |
424 | heap[i] = i; |
425 | } |
426 | |
427 | // build heap |
428 | for (int64_t i = heap_size / 2 - 1; i >= 0; i--) { |
429 | _heapify(input, desc, heap, heap_size, i); |
430 | } |
431 | |
432 | // Run through all the rows. |
433 | for (idx_t i = heap_size; i < input->count; i++) { |
434 | if (compare_tuple(input, desc, i, heap[0]) <= 0) { |
435 | heap[0] = i; |
436 | _heapify(input, desc, heap, heap_size, 0); |
437 | } |
438 | } |
439 | } |
440 | |
441 | void ChunkCollection::Heap(vector<OrderType> &desc, idx_t heap[], idx_t heap_size) { |
442 | assert(heap); |
443 | if (count == 0) |
444 | return; |
445 | |
446 | _heap_create(this, desc, heap, heap_size); |
447 | |
448 | // Heap is ready. Now do a heapsort |
449 | for (int64_t i = heap_size - 1; i >= 0; i--) { |
450 | std::swap(heap[i], heap[0]); |
451 | _heapify(this, desc, heap, i, 0); |
452 | } |
453 | } |
454 | |
455 | idx_t ChunkCollection::MaterializeHeapChunk(DataChunk &target, idx_t order[], idx_t start_offset, idx_t heap_size) { |
456 | idx_t remaining_data = min((idx_t)STANDARD_VECTOR_SIZE, heap_size - start_offset); |
457 | assert(target.GetTypes() == types); |
458 | |
459 | target.SetCardinality(remaining_data); |
460 | for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) { |
461 | switch (types[col_idx]) { |
462 | case TypeId::BOOL: |
463 | case TypeId::INT8: |
464 | templated_set_values<int8_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
465 | break; |
466 | case TypeId::INT16: |
467 | templated_set_values<int16_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
468 | break; |
469 | case TypeId::INT32: |
470 | templated_set_values<int32_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
471 | break; |
472 | case TypeId::INT64: |
473 | templated_set_values<int64_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
474 | break; |
475 | case TypeId::FLOAT: |
476 | templated_set_values<float>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
477 | break; |
478 | case TypeId::DOUBLE: |
479 | templated_set_values<double>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
480 | break; |
481 | case TypeId::VARCHAR: |
482 | templated_set_values<string_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data); |
483 | break; |
484 | // TODO this is ugly and sloooow! |
485 | case TypeId::STRUCT: |
486 | case TypeId::LIST: { |
487 | for (idx_t row_idx = 0; row_idx < remaining_data; row_idx++) { |
488 | idx_t chunk_idx_src = order[start_offset + row_idx] / STANDARD_VECTOR_SIZE; |
489 | idx_t vector_idx_src = order[start_offset + row_idx] % STANDARD_VECTOR_SIZE; |
490 | |
491 | auto &src_chunk = chunks[chunk_idx_src]; |
492 | Vector &src_vec = src_chunk->data[col_idx]; |
493 | auto &tgt_vec = target.data[col_idx]; |
494 | if (FlatVector::IsNull(src_vec, vector_idx_src)) { |
495 | FlatVector::SetNull(tgt_vec, row_idx, true); |
496 | } else { |
497 | tgt_vec.SetValue(row_idx, src_vec.GetValue(vector_idx_src)); |
498 | } |
499 | } |
500 | } break; |
501 | |
502 | default: |
503 | throw NotImplementedException("Type is unsupported in MaterializeHeapChunk()" ); |
504 | } |
505 | } |
506 | target.Verify(); |
507 | return remaining_data; |
508 | } |
509 | |