#include "duckdb/common/types/chunk_collection.hpp"

#include "duckdb/common/exception.hpp"
#include "duckdb/common/printer.hpp"
#include "duckdb/common/value_operations/value_operations.hpp"
#include "duckdb/common/operator/comparison_operators.hpp"
#include "duckdb/common/assert.hpp"

#include <algorithm>
#include <cstring>

using namespace duckdb;
using namespace std;

void ChunkCollection::Verify() {
#ifdef DEBUG
	for (auto &chunk : chunks) {
		chunk->Verify();
	}
#endif
}

void ChunkCollection::Append(ChunkCollection &other) {
	for (auto &chunk : other.chunks) {
		Append(*chunk.get());
	}
}

void ChunkCollection::Append(DataChunk &new_chunk) {
	if (new_chunk.size() == 0) {
		return;
	}
	new_chunk.Verify();

	// we have to ensure that every chunk in the ChunkCollection is completely
	// filled, otherwise our O(1) lookup in GetValue and SetValue does not work
	// first fill the latest chunk, if it exists
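	// because every chunk except possibly the last one is full, a row index maps to a
	// chunk and an offset with plain division, e.g. with STANDARD_VECTOR_SIZE == 1024,
	// row 3000 lives in chunks[3000 / 1024] == chunks[2] at offset 3000 % 1024 == 952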
	count += new_chunk.size();

	idx_t remaining_data = new_chunk.size();
	idx_t offset = 0;
	if (chunks.size() == 0) {
		// first chunk
		types = new_chunk.GetTypes();
	} else {
		// the types of the new chunk should match the types of the previous one
		assert(types.size() == new_chunk.column_count());
		auto new_types = new_chunk.GetTypes();
		for (idx_t i = 0; i < types.size(); i++) {
			if (new_types[i] != types[i]) {
				throw TypeMismatchException(new_types[i], types[i], "Type mismatch when combining rows");
			}
			if (types[i] == TypeId::LIST) {
				// need to check all the chunks because they can have only-null list entries
				for (auto &chunk : chunks) {
					auto &chunk_vec = chunk->data[i];
					auto &new_vec = new_chunk.data[i];
					if (ListVector::HasEntry(chunk_vec) && ListVector::HasEntry(new_vec)) {
						auto &chunk_types = ListVector::GetEntry(chunk_vec).types;
						auto &new_types = ListVector::GetEntry(new_vec).types;
						if (chunk_types.size() > 0 && new_types.size() > 0 && chunk_types != new_types) {
							throw TypeMismatchException(chunk_types[0], new_types[0],
							                            "Type mismatch when combining lists");
						}
					}
				}
			}
			// TODO check structs, too
		}

		// first append data to the current chunk
		DataChunk &last_chunk = *chunks.back();
		idx_t added_data = std::min(remaining_data, (idx_t)(STANDARD_VECTOR_SIZE - last_chunk.size()));
		if (added_data > 0) {
			// copy <added_data> elements to the last chunk
			idx_t old_count = new_chunk.size();
			new_chunk.SetCardinality(added_data);

			last_chunk.Append(new_chunk);
			remaining_data -= added_data;
			// reset the chunk to the old data
			new_chunk.SetCardinality(old_count);
			offset = added_data;
		}
	}

	if (remaining_data > 0) {
		// create a new chunk and fill it with the remainder
		auto chunk = make_unique<DataChunk>();
		chunk->Initialize(types);
		new_chunk.Copy(*chunk, offset);
		chunks.push_back(move(chunk));
	}
}

// returns an int similar to a C comparator:
// -1 if left < right
// 0 if left == right
// 1 if left > right
template <class TYPE>
static int8_t templated_compare_value(Vector &left_vec, Vector &right_vec, idx_t left_idx, idx_t right_idx) {
	assert(left_vec.type == right_vec.type);
	auto left_val = FlatVector::GetData<TYPE>(left_vec)[left_idx];
	auto right_val = FlatVector::GetData<TYPE>(right_vec)[right_idx];
	if (Equals::Operation<TYPE>(left_val, right_val)) {
		return 0;
	}
	if (LessThan::Operation<TYPE>(left_val, right_val)) {
		return -1;
	}
	return 1;
}

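// compares the values at the given positions of two flat vectors;
// NULLs compare equal to each other and order before any non-NULL value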
// return type here is int32 because strcmp() on some platforms returns rather large values
static int32_t compare_value(Vector &left_vec, Vector &right_vec, idx_t vector_idx_left, idx_t vector_idx_right) {
	auto left_null = FlatVector::Nullmask(left_vec)[vector_idx_left];
	auto right_null = FlatVector::Nullmask(right_vec)[vector_idx_right];

	if (left_null && right_null) {
		return 0;
	} else if (right_null) {
		return 1;
	} else if (left_null) {
		return -1;
	}

	switch (left_vec.type) {
	case TypeId::BOOL:
	case TypeId::INT8:
		return templated_compare_value<int8_t>(left_vec, right_vec, vector_idx_left, vector_idx_right);
	case TypeId::INT16:
		return templated_compare_value<int16_t>(left_vec, right_vec, vector_idx_left, vector_idx_right);
	case TypeId::INT32:
		return templated_compare_value<int32_t>(left_vec, right_vec, vector_idx_left, vector_idx_right);
	case TypeId::INT64:
		return templated_compare_value<int64_t>(left_vec, right_vec, vector_idx_left, vector_idx_right);
	case TypeId::FLOAT:
		return templated_compare_value<float>(left_vec, right_vec, vector_idx_left, vector_idx_right);
	case TypeId::DOUBLE:
		return templated_compare_value<double>(left_vec, right_vec, vector_idx_left, vector_idx_right);
	case TypeId::VARCHAR:
		return templated_compare_value<string_t>(left_vec, right_vec, vector_idx_left, vector_idx_right);
	default:
		throw NotImplementedException("Type for comparison");
	}
	return 0; // unreachable: the default case above throws
}

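// compares two rows of the collection, identified by their global row indices;
// the first column that differs decides the result, with the sign flipped for
// columns that are ordered descending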
static int compare_tuple(ChunkCollection *sort_by, vector<OrderType> &desc, idx_t left, idx_t right) {
	assert(sort_by);

	idx_t chunk_idx_left = left / STANDARD_VECTOR_SIZE;
	idx_t chunk_idx_right = right / STANDARD_VECTOR_SIZE;
	idx_t vector_idx_left = left % STANDARD_VECTOR_SIZE;
	idx_t vector_idx_right = right % STANDARD_VECTOR_SIZE;

	auto &left_chunk = sort_by->chunks[chunk_idx_left];
	auto &right_chunk = sort_by->chunks[chunk_idx_right];

	for (idx_t col_idx = 0; col_idx < desc.size(); col_idx++) {
		auto order_type = desc[col_idx];

		Vector &left_vec = left_chunk->data[col_idx];
		Vector &right_vec = right_chunk->data[col_idx];

		assert(left_vec.vector_type == VectorType::FLAT_VECTOR);
		assert(right_vec.vector_type == VectorType::FLAT_VECTOR);
		assert(left_vec.type == right_vec.type);

		auto comp_res = compare_value(left_vec, right_vec, vector_idx_left, vector_idx_right);

		if (comp_res == 0) {
			continue;
		}
		return comp_res < 0 ? (order_type == OrderType::ASCENDING ? -1 : 1)
		                    : (order_type == OrderType::ASCENDING ? 1 : -1);
	}
	return 0;
}

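// initial quicksort pass: uses row 0 as the pivot and partitions all other rows into
// result[], with rows <= pivot filling up from the front and rows > pivot from the
// back; returns the final position of the pivot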
static int64_t _quicksort_initial(ChunkCollection *sort_by, vector<OrderType> &desc, idx_t *result) {
	// select pivot
	int64_t pivot = 0;
	int64_t low = 0, high = sort_by->count - 1;
	// now insert elements
	for (idx_t i = 1; i < sort_by->count; i++) {
		if (compare_tuple(sort_by, desc, i, pivot) <= 0) {
			result[low++] = i;
		} else {
			result[high--] = i;
		}
	}
	assert(low == high);
	result[low] = pivot;
	return low;
}

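// recursive quicksort over a slice of result[]: picks the middle element as the pivot,
// moves it to the front, partitions the remaining indices around it and recurses into
// both halves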
static void _quicksort_inplace(ChunkCollection *sort_by, vector<OrderType> &desc, idx_t *result, int64_t left,
                               int64_t right) {
	if (left >= right) {
		return;
	}

	int64_t middle = left + (right - left) / 2;
	int64_t pivot = result[middle];
	// move the mid point value to the front.
	int64_t i = left + 1;
	int64_t j = right;

	std::swap(result[middle], result[left]);
	while (i <= j) {
		while (i <= j && compare_tuple(sort_by, desc, result[i], pivot) <= 0) {
			i++;
		}

		while (i <= j && compare_tuple(sort_by, desc, result[j], pivot) > 0) {
			j--;
		}

		if (i < j) {
			std::swap(result[i], result[j]);
		}
	}
	std::swap(result[i - 1], result[left]);
	int64_t part = i - 1;

	_quicksort_inplace(sort_by, desc, result, left, part - 1);
	_quicksort_inplace(sort_by, desc, result, part + 1, right);
}

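// Sort does not move any data: it fills result[] with a permutation of [0, count) such
// that visiting the rows in that order yields the requested ordering; the permutation
// can then be applied with Reorder() or consumed via MaterializeSortedChunk()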
void ChunkCollection::Sort(vector<OrderType> &desc, idx_t result[]) {
	assert(result);
	if (count == 0) {
		return;
	}
	// quicksort
	int64_t part = _quicksort_initial(this, desc, result);
	_quicksort_inplace(this, desc, result, 0, part);
	_quicksort_inplace(this, desc, result, part + 1, count - 1);
}

// FIXME make this more efficient by not using the Value API
// just use memcpy in the vectors
// assert that there is no selection list
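// applies the permutation in order_org[] to the collection in place by following the
// cycles of the permutation, buffering a single row of Values at a time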
void ChunkCollection::Reorder(idx_t order_org[]) {
	auto order = unique_ptr<idx_t[]>(new idx_t[count]);
	memcpy(order.get(), order_org, sizeof(idx_t) * count);

	// adapted from https://stackoverflow.com/a/7366196/2652376

	auto val_buf = vector<Value>();
	val_buf.resize(column_count());

	idx_t j, k;
	for (idx_t i = 0; i < count; i++) {
		for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) {
			val_buf[col_idx] = GetValue(col_idx, i);
		}
		j = i;
		while (true) {
			k = order[j];
			order[j] = j;
			if (k == i) {
				break;
			}
			for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) {
				SetValue(col_idx, j, GetValue(col_idx, k));
			}
			j = k;
		}
		for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) {
			SetValue(col_idx, j, val_buf[col_idx]);
		}
	}
}

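// gathers remaining_data rows, selected by order[start_offset..], from the source
// collection into a single flat target vector, copying values and propagating NULLs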
template <class TYPE>
static void templated_set_values(ChunkCollection *src_coll, Vector &tgt_vec, idx_t order[], idx_t col_idx,
                                 idx_t start_offset, idx_t remaining_data) {
	assert(src_coll);

	for (idx_t row_idx = 0; row_idx < remaining_data; row_idx++) {
		idx_t chunk_idx_src = order[start_offset + row_idx] / STANDARD_VECTOR_SIZE;
		idx_t vector_idx_src = order[start_offset + row_idx] % STANDARD_VECTOR_SIZE;

		auto &src_chunk = src_coll->chunks[chunk_idx_src];
		Vector &src_vec = src_chunk->data[col_idx];
		auto source_data = FlatVector::GetData<TYPE>(src_vec);
		auto target_data = FlatVector::GetData<TYPE>(tgt_vec);

		if (FlatVector::IsNull(src_vec, vector_idx_src)) {
			FlatVector::SetNull(tgt_vec, row_idx, true);
		} else {
			target_data[row_idx] = source_data[vector_idx_src];
		}
	}
}

// TODO: reorder functionality is similar, perhaps merge
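// materializes one vector-sized chunk of the sorted output; a typical caller (sketch,
// assuming a precomputed order[] obtained from Sort()) advances start_offset by
// STANDARD_VECTOR_SIZE per call:
//   collection.Sort(order_types, order);
//   for (idx_t offset = 0; offset < collection.count; offset += STANDARD_VECTOR_SIZE) {
//       collection.MaterializeSortedChunk(chunk, order, offset);
//       // ... consume chunk ...
//   }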
void ChunkCollection::MaterializeSortedChunk(DataChunk &target, idx_t order[], idx_t start_offset) {
	idx_t remaining_data = min((idx_t)STANDARD_VECTOR_SIZE, count - start_offset);
	assert(target.GetTypes() == types);

	target.SetCardinality(remaining_data);
	for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) {
		switch (types[col_idx]) {
		case TypeId::BOOL:
		case TypeId::INT8:
			templated_set_values<int8_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::INT16:
			templated_set_values<int16_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::INT32:
			templated_set_values<int32_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::INT64:
			templated_set_values<int64_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::FLOAT:
			templated_set_values<float>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::DOUBLE:
			templated_set_values<double>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::VARCHAR:
			templated_set_values<string_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;

		case TypeId::LIST:
		case TypeId::STRUCT: {
			for (idx_t row_idx = 0; row_idx < remaining_data; row_idx++) {
				idx_t chunk_idx_src = order[start_offset + row_idx] / STANDARD_VECTOR_SIZE;
				idx_t vector_idx_src = order[start_offset + row_idx] % STANDARD_VECTOR_SIZE;

				auto &src_chunk = chunks[chunk_idx_src];
				Vector &src_vec = src_chunk->data[col_idx];
				auto &tgt_vec = target.data[col_idx];
				if (FlatVector::IsNull(src_vec, vector_idx_src)) {
					FlatVector::SetNull(tgt_vec, row_idx, true);
				} else {
					tgt_vec.SetValue(row_idx, src_vec.GetValue(vector_idx_src));
				}
			}
		} break;
		default:
			throw NotImplementedException("Type is unsupported in MaterializeSortedChunk()");
		}
	}
	target.Verify();
}

Value ChunkCollection::GetValue(idx_t column, idx_t index) {
	return chunks[LocateChunk(index)]->GetValue(column, index % STANDARD_VECTOR_SIZE);
}

vector<Value> ChunkCollection::GetRow(idx_t index) {
	vector<Value> values;
	values.resize(column_count());

	for (idx_t p_idx = 0; p_idx < column_count(); p_idx++) {
		values[p_idx] = GetValue(p_idx, index);
	}
	return values;
}

void ChunkCollection::SetValue(idx_t column, idx_t index, Value value) {
	chunks[LocateChunk(index)]->SetValue(column, index % STANDARD_VECTOR_SIZE, value);
}

void ChunkCollection::Print() {
	Printer::Print(ToString());
}

bool ChunkCollection::Equals(ChunkCollection &other) {
	if (count != other.count) {
		return false;
	}
	if (column_count() != other.column_count()) {
		return false;
	}
	if (types != other.types) {
		return false;
	}
	// if the counts are equal, the number of chunks should be equal as well
	for (idx_t row_idx = 0; row_idx < count; row_idx++) {
		for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) {
			auto lvalue = GetValue(col_idx, row_idx);
			auto rvalue = other.GetValue(col_idx, row_idx);
			if (!Value::ValuesAreEqual(lvalue, rvalue)) {
				return false;
			}
		}
	}
	return true;
}
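
// sift-down for a binary max-heap (ordered by compare_tuple) of row indices:
// restores the heap property for the subtree rooted at current_index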
static void _heapify(ChunkCollection *input, vector<OrderType> &desc, idx_t *heap, idx_t heap_size,
                     idx_t current_index) {
	if (current_index >= heap_size) {
		return;
	}
	idx_t left_child_index = current_index * 2 + 1;
	idx_t right_child_index = current_index * 2 + 2;
	idx_t swap_index = current_index;

	if (left_child_index < heap_size) {
		swap_index =
		    compare_tuple(input, desc, heap[swap_index], heap[left_child_index]) <= 0 ? left_child_index : swap_index;
	}

	if (right_child_index < heap_size) {
		swap_index =
		    compare_tuple(input, desc, heap[swap_index], heap[right_child_index]) <= 0 ? right_child_index : swap_index;
	}

	if (swap_index != current_index) {
		std::swap(heap[current_index], heap[swap_index]);
		_heapify(input, desc, heap, heap_size, swap_index);
	}
}

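// fills heap[] with the first heap_size row indices, heapifies them, and then streams
// over the remaining rows, replacing the current root (the row that sorts last under
// the requested ordering) whenever a row that sorts before it is found; afterwards
// heap[] holds the top heap_size rows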
static void _heap_create(ChunkCollection *input, vector<OrderType> &desc, idx_t *heap, idx_t heap_size) {
	for (idx_t i = 0; i < heap_size; i++) {
		heap[i] = i;
	}

	// build heap
	for (int64_t i = heap_size / 2 - 1; i >= 0; i--) {
		_heapify(input, desc, heap, heap_size, i);
	}

	// Run through all the rows.
	for (idx_t i = heap_size; i < input->count; i++) {
		if (compare_tuple(input, desc, i, heap[0]) <= 0) {
			heap[0] = i;
			_heapify(input, desc, heap, heap_size, 0);
		}
	}
}

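// computes the top heap_size rows of the collection: builds the heap and then runs a
// heapsort over it, so heap[] ends up holding the selected row indices in sorted order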
void ChunkCollection::Heap(vector<OrderType> &desc, idx_t heap[], idx_t heap_size) {
	assert(heap);
	if (count == 0) {
		return;
	}

	_heap_create(this, desc, heap, heap_size);

	// Heap is ready. Now do a heapsort
	for (int64_t i = heap_size - 1; i >= 0; i--) {
		std::swap(heap[i], heap[0]);
		_heapify(this, desc, heap, i, 0);
	}
}

idx_t ChunkCollection::MaterializeHeapChunk(DataChunk &target, idx_t order[], idx_t start_offset, idx_t heap_size) {
	idx_t remaining_data = min((idx_t)STANDARD_VECTOR_SIZE, heap_size - start_offset);
	assert(target.GetTypes() == types);

	target.SetCardinality(remaining_data);
	for (idx_t col_idx = 0; col_idx < column_count(); col_idx++) {
		switch (types[col_idx]) {
		case TypeId::BOOL:
		case TypeId::INT8:
			templated_set_values<int8_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::INT16:
			templated_set_values<int16_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::INT32:
			templated_set_values<int32_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::INT64:
			templated_set_values<int64_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::FLOAT:
			templated_set_values<float>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::DOUBLE:
			templated_set_values<double>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		case TypeId::VARCHAR:
			templated_set_values<string_t>(this, target.data[col_idx], order, col_idx, start_offset, remaining_data);
			break;
		// TODO this is ugly and sloooow!
		case TypeId::STRUCT:
		case TypeId::LIST: {
			for (idx_t row_idx = 0; row_idx < remaining_data; row_idx++) {
				idx_t chunk_idx_src = order[start_offset + row_idx] / STANDARD_VECTOR_SIZE;
				idx_t vector_idx_src = order[start_offset + row_idx] % STANDARD_VECTOR_SIZE;

				auto &src_chunk = chunks[chunk_idx_src];
				Vector &src_vec = src_chunk->data[col_idx];
				auto &tgt_vec = target.data[col_idx];
				if (FlatVector::IsNull(src_vec, vector_idx_src)) {
					FlatVector::SetNull(tgt_vec, row_idx, true);
				} else {
					tgt_vec.SetValue(row_idx, src_vec.GetValue(vector_idx_src));
				}
			}
		} break;

		default:
			throw NotImplementedException("Type is unsupported in MaterializeHeapChunk()");
		}
	}
	target.Verify();
	return remaining_data;
}