1#include "duckdb/storage/string_segment.hpp"
2#include "duckdb/storage/buffer_manager.hpp"
3#include "duckdb/storage/numeric_segment.hpp"
4#include "duckdb/transaction/update_info.hpp"
5#include "duckdb/common/vector_operations/vector_operations.hpp"
6#include "duckdb/storage/data_table.hpp"
7#include "duckdb/common/operator/comparison_operators.hpp"
8
9using namespace duckdb;
10using namespace std;
11
12StringSegment::StringSegment(BufferManager &manager, idx_t row_start, block_id_t block)
13 : UncompressedSegment(manager, TypeId::VARCHAR, row_start) {
14 this->max_vector_count = 0;
15 this->dictionary_offset = 0;
16 // the vector_size is given in the size of the dictionary offsets
17 this->vector_size = STANDARD_VECTOR_SIZE * sizeof(int32_t) + sizeof(nullmask_t);
18 this->string_updates = nullptr;
19
20 this->block_id = block;
21 if (block_id == INVALID_BLOCK) {
22 // start off with an empty string segment: allocate space for it
23 auto handle = manager.Allocate(Storage::BLOCK_ALLOC_SIZE);
24 this->block_id = handle->block_id;
25
26 ExpandStringSegment(handle->node->buffer);
27 }
28}
29
30StringSegment::~StringSegment() {
31 while (head) {
32 manager.DestroyBuffer(head->block_id);
33 head = move(head->next);
34 }
35}
36
37void StringSegment::ExpandStringSegment(data_ptr_t baseptr) {
38 // clear the nullmask for this vector
39 auto mask = (nullmask_t *)(baseptr + (max_vector_count * vector_size));
40 mask->reset();
41
42 max_vector_count++;
43 if (versions) {
44 auto new_versions = unique_ptr<UpdateInfo *[]>(new UpdateInfo *[max_vector_count]);
45 memcpy(new_versions.get(), versions.get(), (max_vector_count - 1) * sizeof(UpdateInfo *));
46 new_versions[max_vector_count - 1] = nullptr;
47 versions = move(new_versions);
48 }
49
50 if (string_updates) {
51 auto new_string_updates = unique_ptr<string_update_info_t[]>(new string_update_info_t[max_vector_count]);
52 for (idx_t i = 0; i < max_vector_count - 1; i++) {
53 new_string_updates[i] = move(string_updates[i]);
54 }
55 new_string_updates[max_vector_count - 1] = 0;
56 string_updates = move(new_string_updates);
57 }
58}
59
60//===--------------------------------------------------------------------===//
61// Scan
62//===--------------------------------------------------------------------===//
63void StringSegment::InitializeScan(ColumnScanState &state) {
64 // pin the primary buffer
65 state.primary_handle = manager.Pin(block_id);
66}
67
68//===--------------------------------------------------------------------===//
69// Filter base data
70//===--------------------------------------------------------------------===//
71void StringSegment::read_string(string_t *result_data, buffer_handle_set_t &handles, data_ptr_t baseptr,
72 int32_t *dict_offset, idx_t src_idx, idx_t res_idx, idx_t &update_idx,
73 size_t vector_index) {
74 if (string_updates && string_updates[vector_index]) {
75 auto &info = *string_updates[vector_index];
76 while (info.ids[update_idx] < src_idx) {
77 //! We need to catch the update_idx up to the src_idx
78 update_idx++;
79 }
80 if (update_idx < info.count && info.ids[update_idx] == src_idx) {
81 result_data[res_idx] = ReadString(handles, info.block_ids[update_idx], info.offsets[update_idx]);
82 } else {
83 result_data[res_idx] = FetchStringFromDict(handles, baseptr, dict_offset[src_idx]);
84 }
85 } else {
86 result_data[res_idx] = FetchStringFromDict(handles, baseptr, dict_offset[src_idx]);
87 }
88}
89
90void StringSegment::Select(ColumnScanState &state, Vector &result, SelectionVector &sel, idx_t &approved_tuple_count,
91 vector<TableFilter> &tableFilter) {
92 auto vector_index = state.vector_index;
93 assert(vector_index < max_vector_count);
94 assert(vector_index * STANDARD_VECTOR_SIZE <= tuple_count);
95
96 auto handle = state.primary_handle.get();
97 state.handles.clear();
98 auto baseptr = handle->node->buffer;
99 // fetch the data from the base segment
100 auto base = baseptr + state.vector_index * vector_size;
101 auto base_data = (int32_t *)(base + sizeof(nullmask_t));
102 auto base_nullmask = (nullmask_t *)base;
103
104 if (tableFilter.size() == 1) {
105 switch (tableFilter[0].comparison_type) {
106 case ExpressionType::COMPARE_EQUAL: {
107 Select_String<Equals>(state.handles, result, baseptr, base_data, sel, tableFilter[0].constant.str_value,
108 approved_tuple_count, base_nullmask, vector_index);
109 break;
110 }
111 case ExpressionType::COMPARE_LESSTHAN: {
112 Select_String<LessThan>(state.handles, result, baseptr, base_data, sel, tableFilter[0].constant.str_value,
113 approved_tuple_count, base_nullmask, vector_index);
114 break;
115 }
116 case ExpressionType::COMPARE_GREATERTHAN: {
117 Select_String<GreaterThan>(state.handles, result, baseptr, base_data, sel,
118 tableFilter[0].constant.str_value, approved_tuple_count, base_nullmask,
119 vector_index);
120 break;
121 }
122 case ExpressionType::COMPARE_LESSTHANOREQUALTO: {
123 Select_String<LessThanEquals>(state.handles, result, baseptr, base_data, sel,
124 tableFilter[0].constant.str_value, approved_tuple_count, base_nullmask,
125 vector_index);
126 break;
127 }
128 case ExpressionType::COMPARE_GREATERTHANOREQUALTO: {
129 Select_String<GreaterThanEquals>(state.handles, result, baseptr, base_data, sel,
130 tableFilter[0].constant.str_value, approved_tuple_count, base_nullmask,
131 vector_index);
132
133 break;
134 }
135 default:
136 throw NotImplementedException("Unknown comparison type for filter pushed down to table!");
137 }
138 } else {
139 bool isFirstGreater = tableFilter[0].comparison_type == ExpressionType::COMPARE_GREATERTHAN ||
140 tableFilter[0].comparison_type == ExpressionType::COMPARE_GREATERTHANOREQUALTO;
141 auto less = isFirstGreater?tableFilter[1]:tableFilter[0];
142 auto greater = isFirstGreater?tableFilter[0]:tableFilter[1];
143 if (greater.comparison_type == ExpressionType::COMPARE_GREATERTHAN) {
144 if (less.comparison_type == ExpressionType::COMPARE_LESSTHAN) {
145 Select_String_Between<GreaterThan, LessThan>(
146 state.handles, result, baseptr, base_data, sel, greater.constant.str_value,
147 less.constant.str_value, approved_tuple_count, base_nullmask, vector_index);
148 } else {
149 Select_String_Between<GreaterThan, LessThanEquals>(
150 state.handles, result, baseptr, base_data, sel, greater.constant.str_value,
151 less.constant.str_value, approved_tuple_count, base_nullmask, vector_index);
152 }
153 } else {
154 if (less.comparison_type == ExpressionType::COMPARE_LESSTHAN) {
155 Select_String_Between<GreaterThanEquals, LessThan>(
156 state.handles, result, baseptr, base_data, sel, greater.constant.str_value,
157 less.constant.str_value, approved_tuple_count, base_nullmask, vector_index);
158 } else {
159 Select_String_Between<GreaterThanEquals, LessThanEquals>(
160 state.handles, result, baseptr, base_data, sel, greater.constant.str_value,
161 less.constant.str_value, approved_tuple_count, base_nullmask, vector_index);
162 }
163 }
164 }
165}
166
167//===--------------------------------------------------------------------===//
168// Fetch base data
169//===--------------------------------------------------------------------===//
170void StringSegment::FetchBaseData(ColumnScanState &state, idx_t vector_index, Vector &result) {
171 // clear any previously locked buffers and get the primary buffer handle
172 auto handle = state.primary_handle.get();
173 state.handles.clear();
174
175 // fetch the data from the base segment
176 FetchBaseData(state, handle->node->buffer, vector_index, result, GetVectorCount(vector_index));
177}
178
179void StringSegment::FetchBaseData(ColumnScanState &state, data_ptr_t baseptr, idx_t vector_index, Vector &result,
180 idx_t count) {
181 auto base = baseptr + vector_index * vector_size;
182
183 auto &base_nullmask = *((nullmask_t *)base);
184 auto base_data = (int32_t *)(base + sizeof(nullmask_t));
185 auto result_data = FlatVector::GetData<string_t>(result);
186
187 if (string_updates && string_updates[vector_index]) {
188 // there are updates: merge them in
189 auto &info = *string_updates[vector_index];
190 idx_t update_idx = 0;
191 for (idx_t i = 0; i < count; i++) {
192 if (update_idx < info.count && info.ids[update_idx] == i) {
193 // use update info
194 result_data[i] = ReadString(state.handles, info.block_ids[update_idx], info.offsets[update_idx]);
195 update_idx++;
196 } else {
197 // use base table info
198 result_data[i] = FetchStringFromDict(state.handles, baseptr, base_data[i]);
199 }
200 }
201 } else {
202 // no updates: fetch only from the string dictionary
203 for (idx_t i = 0; i < count; i++) {
204 result_data[i] = FetchStringFromDict(state.handles, baseptr, base_data[i]);
205 }
206 }
207 FlatVector::SetNullmask(result, base_nullmask);
208}
209
210void StringSegment::FilterFetchBaseData(ColumnScanState &state, Vector &result, SelectionVector &sel,
211 idx_t &approved_tuple_count) {
212 // clear any previously locked buffers and get the primary buffer handle
213 auto handle = state.primary_handle.get();
214 state.handles.clear();
215 auto baseptr = handle->node->buffer;
216 // fetch the data from the base segment
217 auto base = baseptr + state.vector_index * vector_size;
218 auto &base_nullmask = *((nullmask_t *)base);
219 auto base_data = (int32_t *)(base + sizeof(nullmask_t));
220 result.vector_type = VectorType::FLAT_VECTOR;
221 auto result_data = FlatVector::GetData<string_t>(result);
222 nullmask_t result_nullmask;
223 idx_t update_idx = 0;
224 if (base_nullmask.any()) {
225 for (idx_t i = 0; i < approved_tuple_count; i++) {
226 idx_t src_idx = sel.get_index(i);
227 if (base_nullmask[src_idx]) {
228 result_nullmask.set(i, true);
229 read_string(result_data, state.handles, baseptr, base_data, src_idx, i, update_idx, state.vector_index);
230 } else {
231 read_string(result_data, state.handles, baseptr, base_data, src_idx, i, update_idx, state.vector_index);
232 }
233 }
234 } else {
235 for (idx_t i = 0; i < approved_tuple_count; i++) {
236 idx_t src_idx = sel.get_index(i);
237 read_string(result_data, state.handles, baseptr, base_data, src_idx, i, update_idx, state.vector_index);
238 }
239 }
240 FlatVector::SetNullmask(result, result_nullmask);
241}
242
243//===--------------------------------------------------------------------===//
244// Fetch update data
245//===--------------------------------------------------------------------===//
246void StringSegment::FetchUpdateData(ColumnScanState &state, Transaction &transaction, UpdateInfo *info,
247 Vector &result) {
248 // fetch data from updates
249 auto handle = state.primary_handle.get();
250
251 auto result_data = FlatVector::GetData<string_t>(result);
252 auto &result_mask = FlatVector::Nullmask(result);
253 UpdateInfo::UpdatesForTransaction(info, transaction, [&](UpdateInfo *current) {
254 auto info_data = (string_location_t *)current->tuple_data;
255 for (idx_t i = 0; i < current->N; i++) {
256 auto string = FetchString(state.handles, handle->node->buffer, info_data[i]);
257 result_data[current->tuples[i]] = string;
258 result_mask[current->tuples[i]] = current->nullmask[current->tuples[i]];
259 }
260 });
261}
262
263//===--------------------------------------------------------------------===//
264// Fetch strings
265//===--------------------------------------------------------------------===//
266void StringSegment::FetchStringLocations(data_ptr_t baseptr, row_t *ids, idx_t vector_index, idx_t vector_offset,
267 idx_t count, string_location_t result[]) {
268 auto base = baseptr + vector_index * vector_size;
269 auto base_data = (int32_t *)(base + sizeof(nullmask_t));
270
271 if (string_updates && string_updates[vector_index]) {
272 // there are updates: merge them in
273 auto &info = *string_updates[vector_index];
274 idx_t update_idx = 0;
275 for (idx_t i = 0; i < count; i++) {
276 auto id = ids[i] - vector_offset;
277 while (update_idx < info.count && info.ids[update_idx] < id) {
278 update_idx++;
279 }
280 if (update_idx < info.count && info.ids[update_idx] == id) {
281 // use update info
282 result[i].block_id = info.block_ids[update_idx];
283 result[i].offset = info.offsets[update_idx];
284 update_idx++;
285 } else {
286 // use base table info
287 result[i] = FetchStringLocation(baseptr, base_data[id]);
288 }
289 }
290 } else {
291 // no updates: fetch strings from base vector
292 for (idx_t i = 0; i < count; i++) {
293 auto id = ids[i] - vector_offset;
294 result[i] = FetchStringLocation(baseptr, base_data[id]);
295 }
296 }
297}
298
299string_location_t StringSegment::FetchStringLocation(data_ptr_t baseptr, int32_t dict_offset) {
300 if (dict_offset == 0) {
301 return string_location_t(INVALID_BLOCK, 0);
302 }
303 // look up result in dictionary
304 auto dict_end = baseptr + Storage::BLOCK_SIZE;
305 auto dict_pos = dict_end - dict_offset;
306 auto string_length = *((uint16_t *)dict_pos);
307 string_location_t result;
308 if (string_length == BIG_STRING_MARKER) {
309 ReadStringMarker(dict_pos, result.block_id, result.offset);
310 } else {
311 result.block_id = INVALID_BLOCK;
312 result.offset = dict_offset;
313 }
314 return result;
315}
316
317string_t StringSegment::FetchStringFromDict(buffer_handle_set_t &handles, data_ptr_t baseptr, int32_t dict_offset) {
318 // fetch base data
319 assert(dict_offset <= Storage::BLOCK_SIZE);
320 string_location_t location = FetchStringLocation(baseptr, dict_offset);
321 return FetchString(handles, baseptr, location);
322}
323
324string_t StringSegment::FetchString(buffer_handle_set_t &handles, data_ptr_t baseptr, string_location_t location) {
325 if (location.block_id != INVALID_BLOCK) {
326 // big string marker: read from separate block
327 return ReadString(handles, location.block_id, location.offset);
328 } else {
329 if (location.offset == 0) {
330 return string_t(nullptr, 0);
331 }
332 // normal string: read string from this block
333 auto dict_end = baseptr + Storage::BLOCK_SIZE;
334 auto dict_pos = dict_end - location.offset;
335 auto string_length = *((uint16_t *)dict_pos);
336
337 auto str_ptr = (char *)(dict_pos + sizeof(uint16_t));
338 return string_t(str_ptr, string_length);
339 }
340}
341
342void StringSegment::FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result,
343 idx_t result_idx) {
344 auto read_lock = lock.GetSharedLock();
345
346 idx_t vector_index = row_id / STANDARD_VECTOR_SIZE;
347 idx_t id_in_vector = row_id - vector_index * STANDARD_VECTOR_SIZE;
348 assert(vector_index < max_vector_count);
349
350 data_ptr_t baseptr;
351
352 // fetch a single row from the string segment
353 // first pin the main buffer if it is not already pinned
354 auto entry = state.handles.find(block_id);
355 if (entry == state.handles.end()) {
356 // not pinned yet: pin it
357 auto handle = manager.Pin(block_id);
358 baseptr = handle->node->buffer;
359 state.handles[block_id] = move(handle);
360 } else {
361 // already pinned: use the pinned handle
362 baseptr = entry->second->node->buffer;
363 }
364
365 auto base = baseptr + vector_index * vector_size;
366 auto &base_nullmask = *((nullmask_t *)base);
367 auto base_data = (int32_t *)(base + sizeof(nullmask_t));
368 auto result_data = FlatVector::GetData<string_t>(result);
369 auto &result_mask = FlatVector::Nullmask(result);
370
371 bool found_data = false;
372 // first see if there is any updated version of this tuple we must fetch
373 if (versions && versions[vector_index]) {
374 UpdateInfo::UpdatesForTransaction(versions[vector_index], transaction, [&](UpdateInfo *current) {
375 auto info_data = (string_location_t *)current->tuple_data;
376 // loop over the tuples in this UpdateInfo
377 for (idx_t i = 0; i < current->N; i++) {
378 if (current->tuples[i] == row_id) {
379 // found the relevant tuple
380 found_data = true;
381 result_data[result_idx] = FetchString(state.handles, baseptr, info_data[i]);
382 result_mask[result_idx] = current->nullmask[current->tuples[i]];
383 break;
384 } else if (current->tuples[i] > row_id) {
385 // tuples are sorted: so if the current tuple is > row_id we will not find it anymore
386 break;
387 }
388 }
389 });
390 }
391 if (!found_data && string_updates && string_updates[vector_index]) {
392 // there are updates: check if we should use them
393 auto &info = *string_updates[vector_index];
394 for (idx_t i = 0; i < info.count; i++) {
395 if (info.ids[i] == id_in_vector) {
396 // use the update
397 result_data[result_idx] = ReadString(state.handles, info.block_ids[i], info.offsets[i]);
398 found_data = true;
399 break;
400 } else if (info.ids[i] > id_in_vector) {
401 break;
402 }
403 }
404 }
405 if (!found_data) {
406 // no version was found yet: fetch base table version
407 result_data[result_idx] = FetchStringFromDict(state.handles, baseptr, base_data[id_in_vector]);
408 }
409 result_mask[result_idx] = base_nullmask[id_in_vector];
410}
411
412//===--------------------------------------------------------------------===//
413// Append
414//===--------------------------------------------------------------------===//
415idx_t StringSegment::Append(SegmentStatistics &stats, Vector &data, idx_t offset, idx_t count) {
416 assert(data.type == TypeId::VARCHAR);
417 auto handle = manager.Pin(block_id);
418 idx_t initial_count = tuple_count;
419 while (count > 0) {
420 // get the vector index of the vector to append to and see how many tuples we can append to that vector
421 idx_t vector_index = tuple_count / STANDARD_VECTOR_SIZE;
422 if (vector_index == max_vector_count) {
423 // we are at the maximum vector, check if there is space to increase the maximum vector count
424 // as a heuristic, we only allow another vector to be added if we have at least 32 bytes per string
425 // remaining (32KB out of a 256KB block, or around 12% empty)
426 if (RemainingSpace() >= STANDARD_VECTOR_SIZE * 32) {
427 // we have enough remaining space to add another vector
428 ExpandStringSegment(handle->node->buffer);
429 } else {
430 break;
431 }
432 }
433 idx_t current_tuple_count = tuple_count - vector_index * STANDARD_VECTOR_SIZE;
434 idx_t append_count = std::min(STANDARD_VECTOR_SIZE - current_tuple_count, count);
435
436 // now perform the actual append
437 AppendData(stats, handle->node->buffer + vector_size * vector_index, handle->node->buffer + Storage::BLOCK_SIZE,
438 current_tuple_count, data, offset, append_count);
439
440 count -= append_count;
441 offset += append_count;
442 tuple_count += append_count;
443 }
444 return tuple_count - initial_count;
445}
446
//! Copies at most value_size characters of value into an 8-byte statistics buffer and zero-fills the remaining
//! bytes, so the buffer is always NUL-terminated (callers pass value_size <= 7).
static void copy_truncated_stat(const std::string &value, size_t value_size, char *target) {
	size_t written = value.copy(target, value_size);
	for (size_t i = written; i < 8; i++) {
		target[i] = '\0';
	}
}

//! Widens the string min/max statistics (8-byte buffers holding a truncated, NUL-terminated prefix) so they
//! cover value. A buffer starting with '\0' followed by '1' in both min and max marks statistics that have not
//! been initialized yet; in that case both are seeded from value first.
//! value is taken by const reference: callers pass a C string, which would otherwise be copied a second time.
static void update_min_max(const std::string &value, char *__restrict min, char *__restrict max) {
	//! only 8 bytes fit: keep at most 7 characters so the last byte stays a terminator
	size_t value_size = value.size() > 7 ? 7 : value.size();
	//! second byte of the "not initialized" sentinel
	const char marker = '1';
	if (min[0] == '\0' && min[1] == marker && max[0] == '\0' && max[1] == marker) {
		copy_truncated_stat(value, value_size, min);
		copy_truncated_stat(value, value_size, max);
	}
	if (strcmp(value.c_str(), min) < 0) {
		copy_truncated_stat(value, value_size, min);
	}
	if (strcmp(value.c_str(), max) > 0) {
		copy_truncated_stat(value, value_size, max);
	}
}
475
476void StringSegment::AppendData(SegmentStatistics &stats, data_ptr_t target, data_ptr_t end, idx_t target_offset,
477 Vector &source, idx_t offset, idx_t count) {
478 VectorData adata;
479 source.Orrify(count, adata);
480
481 auto sdata = (string_t *)adata.data;
482 auto &result_nullmask = *((nullmask_t *)target);
483 auto result_data = (int32_t *)(target + sizeof(nullmask_t));
484 auto min = (char *)stats.minimum.get();
485 auto max = (char *)stats.maximum.get();
486
487 idx_t remaining_strings = STANDARD_VECTOR_SIZE - (this->tuple_count % STANDARD_VECTOR_SIZE);
488 for (idx_t i = 0; i < count; i++) {
489 auto source_idx = adata.sel->get_index(offset + i);
490 auto target_idx = target_offset + i;
491 if ((*adata.nullmask)[source_idx]) {
492 // null value is stored as -1
493 result_data[target_idx] = 0;
494 result_nullmask[target_idx] = true;
495 stats.has_null = true;
496 } else {
497 assert(dictionary_offset < Storage::BLOCK_SIZE);
498 // non-null value, check if we can fit it within the block
499 idx_t string_length = sdata[source_idx].GetSize();
500 idx_t total_length = string_length + 1 + sizeof(uint16_t);
501
502 if (string_length > stats.max_string_length) {
503 stats.max_string_length = string_length;
504 }
505 // determine whether or not the string needs to be stored in an overflow block
506 // we never place small strings in the overflow blocks: the pointer would take more space than the
507 // string itself we always place big strings (>= STRING_BLOCK_LIMIT) in the overflow blocks we also have
508 // to always leave enough room for BIG_STRING_MARKER_SIZE for each of the remaining strings
509 if (total_length > BIG_STRING_MARKER_BASE_SIZE &&
510 (total_length >= STRING_BLOCK_LIMIT ||
511 total_length + (remaining_strings * BIG_STRING_MARKER_SIZE) > RemainingSpace())) {
512 assert(RemainingSpace() >= BIG_STRING_MARKER_SIZE);
513 // string is too big for block: write to overflow blocks
514 block_id_t block;
515 int32_t offset;
516 //! Update min/max of column segment
517 update_min_max(sdata[source_idx].GetData(), min, max);
518 // write the string into the current string block
519 WriteString(sdata[source_idx], block, offset);
520 dictionary_offset += BIG_STRING_MARKER_SIZE;
521 auto dict_pos = end - dictionary_offset;
522
523 // write a big string marker into the dictionary
524 WriteStringMarker(dict_pos, block, offset);
525
526 stats.has_overflow_strings = true;
527 } else {
528 // string fits in block, append to dictionary and increment dictionary position
529 assert(string_length < std::numeric_limits<uint16_t>::max());
530 dictionary_offset += total_length;
531 auto dict_pos = end - dictionary_offset;
532 //! Update min/max of column segment
533 update_min_max(sdata[source_idx].GetData(), min, max);
534 // first write the length as u16
535 uint16_t string_length_u16 = string_length;
536 memcpy(dict_pos, &string_length_u16, sizeof(uint16_t));
537 // now write the actual string data into the dictionary
538 memcpy(dict_pos + sizeof(uint16_t), sdata[source_idx].GetData(), string_length + 1);
539 }
540 // place the dictionary offset into the set of vectors
541 assert(dictionary_offset <= Storage::BLOCK_SIZE);
542 result_data[target_idx] = dictionary_offset;
543 }
544 remaining_strings--;
545 }
546}
547
548void StringSegment::WriteString(string_t string, block_id_t &result_block, int32_t &result_offset) {
549 assert(strlen(string.GetData()) == string.GetSize());
550 if (overflow_writer) {
551 // overflow writer is set: write string there
552 overflow_writer->WriteString(string, result_block, result_offset);
553 } else {
554 // default overflow behavior: use in-memory buffer to store the overflow string
555 WriteStringMemory(string, result_block, result_offset);
556 }
557}
558
559void StringSegment::WriteStringMemory(string_t string, block_id_t &result_block, int32_t &result_offset) {
560 uint32_t total_length = string.GetSize() + 1 + sizeof(uint32_t);
561 unique_ptr<BufferHandle> handle;
562 // check if the string fits in the current block
563 if (!head || head->offset + total_length >= head->size) {
564 // string does not fit, allocate space for it
565 // create a new string block
566 idx_t alloc_size = std::max((idx_t)total_length, (idx_t)Storage::BLOCK_ALLOC_SIZE);
567 auto new_block = make_unique<StringBlock>();
568 new_block->offset = 0;
569 new_block->size = alloc_size;
570 // allocate an in-memory buffer for it
571 handle = manager.Allocate(alloc_size);
572 new_block->block_id = handle->block_id;
573 new_block->next = move(head);
574 head = move(new_block);
575 } else {
576 // string fits, copy it into the current block
577 handle = manager.Pin(head->block_id);
578 }
579
580 result_block = head->block_id;
581 result_offset = head->offset;
582
583 // copy the string and the length there
584 auto ptr = handle->node->buffer + head->offset;
585 memcpy(ptr, &string.length, sizeof(uint32_t));
586 ptr += sizeof(uint32_t);
587 memcpy(ptr, string.GetData(), string.length + 1);
588 head->offset += total_length;
589}
590
591string_t StringSegment::ReadString(buffer_handle_set_t &handles, block_id_t block, int32_t offset) {
592 assert(offset < Storage::BLOCK_SIZE);
593 if (block == INVALID_BLOCK) {
594 return string_t(nullptr, 0);
595 }
596 if (block < MAXIMUM_BLOCK) {
597 // read the overflow string from disk
598 // pin the initial handle and read the length
599 auto handle = manager.Pin(block);
600 uint32_t length = *((uint32_t *)(handle->node->buffer + offset));
601 uint32_t remaining = length + 1;
602 offset += sizeof(uint32_t);
603
604 // allocate a buffer to store the string
605 auto alloc_size = std::max((idx_t)Storage::BLOCK_ALLOC_SIZE, (idx_t)length + 1 + sizeof(uint32_t));
606 auto target_handle = manager.Allocate(alloc_size, true);
607 auto target_ptr = target_handle->node->buffer;
608 // write the length in this block as well
609 *((uint32_t *)target_ptr) = length;
610 target_ptr += sizeof(uint32_t);
611 // now append the string to the single buffer
612 while (remaining > 0) {
613 idx_t to_write = std::min((idx_t)remaining, (idx_t)(Storage::BLOCK_SIZE - sizeof(block_id_t) - offset));
614 memcpy(target_ptr, handle->node->buffer + offset, to_write);
615
616 remaining -= to_write;
617 offset += to_write;
618 target_ptr += to_write;
619 if (remaining > 0) {
620 // read the next block
621 block_id_t next_block = *((block_id_t *)(handle->node->buffer + offset));
622 handle = manager.Pin(next_block);
623 offset = 0;
624 }
625 }
626
627 auto final_buffer = target_handle->node->buffer;
628 handles.insert(make_pair(target_handle->block_id, move(target_handle)));
629 return ReadString(final_buffer, 0);
630 } else {
631 // read the overflow string from memory
632 // first pin the handle, if it is not pinned yet
633 BufferHandle *handle;
634 auto entry = handles.find(block);
635 if (entry == handles.end()) {
636 auto pinned_handle = manager.Pin(block);
637 handle = pinned_handle.get();
638
639 handles.insert(make_pair(block, move(pinned_handle)));
640 } else {
641 handle = entry->second.get();
642 }
643 return ReadString(handle->node->buffer, offset);
644 }
645}
646
647string_t StringSegment::ReadString(data_ptr_t target, int32_t offset) {
648 auto ptr = target + offset;
649 auto str_length = *((uint32_t *)ptr);
650 auto str_ptr = (char *)(ptr + sizeof(uint32_t));
651 return string_t(str_ptr, str_length);
652}
653
654void StringSegment::WriteStringMarker(data_ptr_t target, block_id_t block_id, int32_t offset) {
655 uint16_t length = BIG_STRING_MARKER;
656 memcpy(target, &length, sizeof(uint16_t));
657 target += sizeof(uint16_t);
658 memcpy(target, &block_id, sizeof(block_id_t));
659 target += sizeof(block_id_t);
660 memcpy(target, &offset, sizeof(int32_t));
661}
662
663void StringSegment::ReadStringMarker(data_ptr_t target, block_id_t &block_id, int32_t &offset) {
664 target += sizeof(uint16_t);
665 memcpy(&block_id, target, sizeof(block_id_t));
666 target += sizeof(block_id_t);
667 memcpy(&offset, target, sizeof(int32_t));
668}
669
670//===--------------------------------------------------------------------===//
671// String Update
672//===--------------------------------------------------------------------===//
673string_update_info_t StringSegment::CreateStringUpdate(SegmentStatistics &stats, Vector &update, row_t *ids,
674 idx_t count, idx_t vector_offset) {
675 auto info = make_unique<StringUpdateInfo>();
676 info->count = count;
677 auto strings = FlatVector::GetData<string_t>(update);
678 auto &update_nullmask = FlatVector::Nullmask(update);
679 for (idx_t i = 0; i < count; i++) {
680 info->ids[i] = ids[i] - vector_offset;
681 // copy the string into the block
682 if (!update_nullmask[i]) {
683 auto min = (char *)stats.minimum.get();
684 auto max = (char *)stats.maximum.get();
685 update_min_max(strings[i].GetData(), min, max);
686 WriteString(strings[i], info->block_ids[i], info->offsets[i]);
687 } else {
688 info->block_ids[i] = INVALID_BLOCK;
689 info->offsets[i] = 0;
690 }
691 }
692 return info;
693}
694
695string_update_info_t StringSegment::MergeStringUpdate(SegmentStatistics &stats, Vector &update, row_t *ids,
696 idx_t update_count, idx_t vector_offset,
697 StringUpdateInfo &update_info) {
698 auto info = make_unique<StringUpdateInfo>();
699
700 // perform a merge between the new and old indexes
701 auto strings = FlatVector::GetData<string_t>(update);
702 auto &update_nullmask = FlatVector::Nullmask(update);
703 //! Check if we need to update the segment's nullmask
704 for (idx_t i = 0; i < update_count; i++) {
705 if (!update_nullmask[i]) {
706 auto min = (char *)stats.minimum.get();
707 auto max = (char *)stats.maximum.get();
708 update_min_max(strings[i].GetData(), min, max);
709 }
710 }
711 auto pick_new = [&](idx_t id, idx_t idx, idx_t count) {
712 info->ids[count] = id;
713 if (!update_nullmask[idx]) {
714 WriteString(strings[idx], info->block_ids[count], info->offsets[count]);
715 } else {
716 info->block_ids[count] = INVALID_BLOCK;
717 info->offsets[count] = 0;
718 }
719 };
720 auto merge = [&](idx_t id, idx_t aidx, idx_t bidx, idx_t count) {
721 // merge: only pick new entry
722 pick_new(id, aidx, count);
723 };
724 auto pick_old = [&](idx_t id, idx_t bidx, idx_t count) {
725 // pick old entry
726 info->ids[count] = id;
727 info->block_ids[count] = update_info.block_ids[bidx];
728 info->offsets[count] = update_info.offsets[bidx];
729 };
730
731 info->count =
732 merge_loop(ids, update_info.ids, update_count, update_info.count, vector_offset, merge, pick_new, pick_old);
733 return info;
734}
735
736//===--------------------------------------------------------------------===//
737// Update Info
738//===--------------------------------------------------------------------===//
739void StringSegment::MergeUpdateInfo(UpdateInfo *node, row_t *ids, idx_t update_count, idx_t vector_offset,
740 string_location_t base_data[], nullmask_t base_nullmask) {
741 auto info_data = (string_location_t *)node->tuple_data;
742
743 // first we copy the old update info into a temporary structure
744 sel_t old_ids[STANDARD_VECTOR_SIZE];
745 string_location_t old_data[STANDARD_VECTOR_SIZE];
746
747 memcpy(old_ids, node->tuples, node->N * sizeof(sel_t));
748 memcpy(old_data, node->tuple_data, node->N * sizeof(string_location_t));
749
750 // now we perform a merge of the new ids with the old ids
751 auto merge = [&](idx_t id, idx_t aidx, idx_t bidx, idx_t count) {
752 // new_id and old_id are the same, insert the old data in the UpdateInfo
753 assert(old_data[bidx].IsValid());
754 info_data[count] = old_data[bidx];
755 node->tuples[count] = id;
756 };
757 auto pick_new = [&](idx_t id, idx_t aidx, idx_t count) {
758 // new_id comes before the old id, insert the base table data into the update info
759 assert(base_data[aidx].IsValid());
760 info_data[count] = base_data[aidx];
761 node->nullmask[id] = base_nullmask[aidx];
762
763 node->tuples[count] = id;
764 };
765 auto pick_old = [&](idx_t id, idx_t bidx, idx_t count) {
766 // old_id comes before new_id, insert the old data
767 assert(old_data[bidx].IsValid());
768 info_data[count] = old_data[bidx];
769 node->tuples[count] = id;
770 };
771 // perform the merge
772 node->N = merge_loop(ids, old_ids, update_count, node->N, vector_offset, merge, pick_new, pick_old);
773}
774
775//===--------------------------------------------------------------------===//
776// Update
777//===--------------------------------------------------------------------===//
778void StringSegment::Update(ColumnData &column_data, SegmentStatistics &stats, Transaction &transaction, Vector &update,
779 row_t *ids, idx_t count, idx_t vector_index, idx_t vector_offset, UpdateInfo *node) {
780 if (!string_updates) {
781 string_updates = unique_ptr<string_update_info_t[]>(new string_update_info_t[max_vector_count]);
782 }
783
784 // first pin the base block
785 auto handle = manager.Pin(block_id);
786 auto baseptr = handle->node->buffer;
787 auto base = baseptr + vector_index * vector_size;
788 auto &base_nullmask = *((nullmask_t *)base);
789
790 // fetch the original string locations and copy the original nullmask
791 string_location_t string_locations[STANDARD_VECTOR_SIZE];
792 nullmask_t original_nullmask = base_nullmask;
793 FetchStringLocations(baseptr, ids, vector_index, vector_offset, count, string_locations);
794
795 string_update_info_t new_update_info;
796 // next up: create the updates
797 if (!string_updates[vector_index]) {
798 // no string updates yet, allocate a block and place the updates there
799 new_update_info = CreateStringUpdate(stats, update, ids, count, vector_offset);
800 } else {
801 // string updates already exist, merge the string updates together
802 new_update_info = MergeStringUpdate(stats, update, ids, count, vector_offset, *string_updates[vector_index]);
803 }
804
805 // now update the original nullmask
806 auto &update_nullmask = FlatVector::Nullmask(update);
807 for (idx_t i = 0; i < count; i++) {
808 base_nullmask[ids[i] - vector_offset] = update_nullmask[i];
809 }
810
811 // now that the original strings are placed in the undo buffer and the updated strings are placed in the base table
812 // create the update node
813 if (!node) {
814 // create a new node in the undo buffer for this update
815 node = CreateUpdateInfo(column_data, transaction, ids, count, vector_index, vector_offset,
816 sizeof(string_location_t));
817
818 // copy the string location data into the undo buffer
819 node->nullmask = original_nullmask;
820 memcpy(node->tuple_data, string_locations, sizeof(string_location_t) * count);
821 } else {
822 // node in the update info already exists, merge the new updates in
823 MergeUpdateInfo(node, ids, count, vector_offset, string_locations, original_nullmask);
824 }
825 // finally move the string updates in place
826 string_updates[vector_index] = move(new_update_info);
827}
828
829void StringSegment::RollbackUpdate(UpdateInfo *info) {
830 auto lock_handle = lock.GetExclusiveLock();
831
832 idx_t new_count = 0;
833 auto &update_info = *string_updates[info->vector_index];
834 auto string_locations = (string_location_t *)info->tuple_data;
835
836 // put the previous NULL values back
837 auto handle = manager.Pin(block_id);
838 auto baseptr = handle->node->buffer;
839 auto base = baseptr + info->vector_index * vector_size;
840 auto &base_nullmask = *((nullmask_t *)base);
841 for (idx_t i = 0; i < info->N; i++) {
842 base_nullmask[info->tuples[i]] = info->nullmask[info->tuples[i]];
843 }
844
845 // now put the original values back into the update info
846 idx_t old_idx = 0;
847 for (idx_t i = 0; i < update_info.count; i++) {
848 if (old_idx >= info->N || update_info.ids[i] != info->tuples[old_idx]) {
849 assert(old_idx >= info->N || update_info.ids[i] < info->tuples[old_idx]);
850 // this entry is not rolled back: insert entry directly
851 update_info.ids[new_count] = update_info.ids[i];
852 update_info.block_ids[new_count] = update_info.block_ids[i];
853 update_info.offsets[new_count] = update_info.offsets[i];
854 new_count++;
855 } else {
856 // this entry is being rolled back
857 auto &old_location = string_locations[old_idx];
858 if (old_location.block_id != INVALID_BLOCK) {
859 // not rolled back to base table: insert entry again
860 update_info.ids[new_count] = update_info.ids[i];
861 update_info.block_ids[new_count] = old_location.block_id;
862 update_info.offsets[new_count] = old_location.offset;
863 new_count++;
864 }
865 old_idx++;
866 }
867 }
868
869 if (new_count == 0) {
870 // all updates are rolled back: delete the string update vector
871 string_updates[info->vector_index].reset();
872 } else {
873 // set the count of the new string update vector
874 update_info.count = new_count;
875 }
876 CleanupUpdate(info);
877}
878