1 | #include "duckdb/storage/string_segment.hpp" |
2 | #include "duckdb/storage/buffer_manager.hpp" |
3 | #include "duckdb/storage/numeric_segment.hpp" |
4 | #include "duckdb/transaction/update_info.hpp" |
5 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
6 | #include "duckdb/storage/data_table.hpp" |
7 | #include "duckdb/common/operator/comparison_operators.hpp" |
8 | |
9 | using namespace duckdb; |
10 | using namespace std; |
11 | |
12 | StringSegment::StringSegment(BufferManager &manager, idx_t row_start, block_id_t block) |
13 | : UncompressedSegment(manager, TypeId::VARCHAR, row_start) { |
14 | this->max_vector_count = 0; |
15 | this->dictionary_offset = 0; |
16 | // the vector_size is given in the size of the dictionary offsets |
17 | this->vector_size = STANDARD_VECTOR_SIZE * sizeof(int32_t) + sizeof(nullmask_t); |
18 | this->string_updates = nullptr; |
19 | |
20 | this->block_id = block; |
21 | if (block_id == INVALID_BLOCK) { |
22 | // start off with an empty string segment: allocate space for it |
23 | auto handle = manager.Allocate(Storage::BLOCK_ALLOC_SIZE); |
24 | this->block_id = handle->block_id; |
25 | |
26 | ExpandStringSegment(handle->node->buffer); |
27 | } |
28 | } |
29 | |
30 | StringSegment::~StringSegment() { |
31 | while (head) { |
32 | manager.DestroyBuffer(head->block_id); |
33 | head = move(head->next); |
34 | } |
35 | } |
36 | |
37 | void StringSegment::ExpandStringSegment(data_ptr_t baseptr) { |
38 | // clear the nullmask for this vector |
39 | auto mask = (nullmask_t *)(baseptr + (max_vector_count * vector_size)); |
40 | mask->reset(); |
41 | |
42 | max_vector_count++; |
43 | if (versions) { |
44 | auto new_versions = unique_ptr<UpdateInfo *[]>(new UpdateInfo *[max_vector_count]); |
45 | memcpy(new_versions.get(), versions.get(), (max_vector_count - 1) * sizeof(UpdateInfo *)); |
46 | new_versions[max_vector_count - 1] = nullptr; |
47 | versions = move(new_versions); |
48 | } |
49 | |
50 | if (string_updates) { |
51 | auto new_string_updates = unique_ptr<string_update_info_t[]>(new string_update_info_t[max_vector_count]); |
52 | for (idx_t i = 0; i < max_vector_count - 1; i++) { |
53 | new_string_updates[i] = move(string_updates[i]); |
54 | } |
55 | new_string_updates[max_vector_count - 1] = 0; |
56 | string_updates = move(new_string_updates); |
57 | } |
58 | } |
59 | |
60 | //===--------------------------------------------------------------------===// |
61 | // Scan |
62 | //===--------------------------------------------------------------------===// |
63 | void StringSegment::InitializeScan(ColumnScanState &state) { |
64 | // pin the primary buffer |
65 | state.primary_handle = manager.Pin(block_id); |
66 | } |
67 | |
68 | //===--------------------------------------------------------------------===// |
69 | // Filter base data |
70 | //===--------------------------------------------------------------------===// |
71 | void StringSegment::read_string(string_t *result_data, buffer_handle_set_t &handles, data_ptr_t baseptr, |
72 | int32_t *dict_offset, idx_t src_idx, idx_t res_idx, idx_t &update_idx, |
73 | size_t vector_index) { |
74 | if (string_updates && string_updates[vector_index]) { |
75 | auto &info = *string_updates[vector_index]; |
76 | while (info.ids[update_idx] < src_idx) { |
77 | //! We need to catch the update_idx up to the src_idx |
78 | update_idx++; |
79 | } |
80 | if (update_idx < info.count && info.ids[update_idx] == src_idx) { |
81 | result_data[res_idx] = ReadString(handles, info.block_ids[update_idx], info.offsets[update_idx]); |
82 | } else { |
83 | result_data[res_idx] = FetchStringFromDict(handles, baseptr, dict_offset[src_idx]); |
84 | } |
85 | } else { |
86 | result_data[res_idx] = FetchStringFromDict(handles, baseptr, dict_offset[src_idx]); |
87 | } |
88 | } |
89 | |
90 | void StringSegment::Select(ColumnScanState &state, Vector &result, SelectionVector &sel, idx_t &approved_tuple_count, |
91 | vector<TableFilter> &tableFilter) { |
92 | auto vector_index = state.vector_index; |
93 | assert(vector_index < max_vector_count); |
94 | assert(vector_index * STANDARD_VECTOR_SIZE <= tuple_count); |
95 | |
96 | auto handle = state.primary_handle.get(); |
97 | state.handles.clear(); |
98 | auto baseptr = handle->node->buffer; |
99 | // fetch the data from the base segment |
100 | auto base = baseptr + state.vector_index * vector_size; |
101 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
102 | auto base_nullmask = (nullmask_t *)base; |
103 | |
104 | if (tableFilter.size() == 1) { |
105 | switch (tableFilter[0].comparison_type) { |
106 | case ExpressionType::COMPARE_EQUAL: { |
107 | Select_String<Equals>(state.handles, result, baseptr, base_data, sel, tableFilter[0].constant.str_value, |
108 | approved_tuple_count, base_nullmask, vector_index); |
109 | break; |
110 | } |
111 | case ExpressionType::COMPARE_LESSTHAN: { |
112 | Select_String<LessThan>(state.handles, result, baseptr, base_data, sel, tableFilter[0].constant.str_value, |
113 | approved_tuple_count, base_nullmask, vector_index); |
114 | break; |
115 | } |
116 | case ExpressionType::COMPARE_GREATERTHAN: { |
117 | Select_String<GreaterThan>(state.handles, result, baseptr, base_data, sel, |
118 | tableFilter[0].constant.str_value, approved_tuple_count, base_nullmask, |
119 | vector_index); |
120 | break; |
121 | } |
122 | case ExpressionType::COMPARE_LESSTHANOREQUALTO: { |
123 | Select_String<LessThanEquals>(state.handles, result, baseptr, base_data, sel, |
124 | tableFilter[0].constant.str_value, approved_tuple_count, base_nullmask, |
125 | vector_index); |
126 | break; |
127 | } |
128 | case ExpressionType::COMPARE_GREATERTHANOREQUALTO: { |
129 | Select_String<GreaterThanEquals>(state.handles, result, baseptr, base_data, sel, |
130 | tableFilter[0].constant.str_value, approved_tuple_count, base_nullmask, |
131 | vector_index); |
132 | |
133 | break; |
134 | } |
135 | default: |
136 | throw NotImplementedException("Unknown comparison type for filter pushed down to table!" ); |
137 | } |
138 | } else { |
139 | bool isFirstGreater = tableFilter[0].comparison_type == ExpressionType::COMPARE_GREATERTHAN || |
140 | tableFilter[0].comparison_type == ExpressionType::COMPARE_GREATERTHANOREQUALTO; |
141 | auto less = isFirstGreater?tableFilter[1]:tableFilter[0]; |
142 | auto greater = isFirstGreater?tableFilter[0]:tableFilter[1]; |
143 | if (greater.comparison_type == ExpressionType::COMPARE_GREATERTHAN) { |
144 | if (less.comparison_type == ExpressionType::COMPARE_LESSTHAN) { |
145 | Select_String_Between<GreaterThan, LessThan>( |
146 | state.handles, result, baseptr, base_data, sel, greater.constant.str_value, |
147 | less.constant.str_value, approved_tuple_count, base_nullmask, vector_index); |
148 | } else { |
149 | Select_String_Between<GreaterThan, LessThanEquals>( |
150 | state.handles, result, baseptr, base_data, sel, greater.constant.str_value, |
151 | less.constant.str_value, approved_tuple_count, base_nullmask, vector_index); |
152 | } |
153 | } else { |
154 | if (less.comparison_type == ExpressionType::COMPARE_LESSTHAN) { |
155 | Select_String_Between<GreaterThanEquals, LessThan>( |
156 | state.handles, result, baseptr, base_data, sel, greater.constant.str_value, |
157 | less.constant.str_value, approved_tuple_count, base_nullmask, vector_index); |
158 | } else { |
159 | Select_String_Between<GreaterThanEquals, LessThanEquals>( |
160 | state.handles, result, baseptr, base_data, sel, greater.constant.str_value, |
161 | less.constant.str_value, approved_tuple_count, base_nullmask, vector_index); |
162 | } |
163 | } |
164 | } |
165 | } |
166 | |
167 | //===--------------------------------------------------------------------===// |
168 | // Fetch base data |
169 | //===--------------------------------------------------------------------===// |
170 | void StringSegment::FetchBaseData(ColumnScanState &state, idx_t vector_index, Vector &result) { |
171 | // clear any previously locked buffers and get the primary buffer handle |
172 | auto handle = state.primary_handle.get(); |
173 | state.handles.clear(); |
174 | |
175 | // fetch the data from the base segment |
176 | FetchBaseData(state, handle->node->buffer, vector_index, result, GetVectorCount(vector_index)); |
177 | } |
178 | |
179 | void StringSegment::FetchBaseData(ColumnScanState &state, data_ptr_t baseptr, idx_t vector_index, Vector &result, |
180 | idx_t count) { |
181 | auto base = baseptr + vector_index * vector_size; |
182 | |
183 | auto &base_nullmask = *((nullmask_t *)base); |
184 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
185 | auto result_data = FlatVector::GetData<string_t>(result); |
186 | |
187 | if (string_updates && string_updates[vector_index]) { |
188 | // there are updates: merge them in |
189 | auto &info = *string_updates[vector_index]; |
190 | idx_t update_idx = 0; |
191 | for (idx_t i = 0; i < count; i++) { |
192 | if (update_idx < info.count && info.ids[update_idx] == i) { |
193 | // use update info |
194 | result_data[i] = ReadString(state.handles, info.block_ids[update_idx], info.offsets[update_idx]); |
195 | update_idx++; |
196 | } else { |
197 | // use base table info |
198 | result_data[i] = FetchStringFromDict(state.handles, baseptr, base_data[i]); |
199 | } |
200 | } |
201 | } else { |
202 | // no updates: fetch only from the string dictionary |
203 | for (idx_t i = 0; i < count; i++) { |
204 | result_data[i] = FetchStringFromDict(state.handles, baseptr, base_data[i]); |
205 | } |
206 | } |
207 | FlatVector::SetNullmask(result, base_nullmask); |
208 | } |
209 | |
210 | void StringSegment::FilterFetchBaseData(ColumnScanState &state, Vector &result, SelectionVector &sel, |
211 | idx_t &approved_tuple_count) { |
212 | // clear any previously locked buffers and get the primary buffer handle |
213 | auto handle = state.primary_handle.get(); |
214 | state.handles.clear(); |
215 | auto baseptr = handle->node->buffer; |
216 | // fetch the data from the base segment |
217 | auto base = baseptr + state.vector_index * vector_size; |
218 | auto &base_nullmask = *((nullmask_t *)base); |
219 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
220 | result.vector_type = VectorType::FLAT_VECTOR; |
221 | auto result_data = FlatVector::GetData<string_t>(result); |
222 | nullmask_t result_nullmask; |
223 | idx_t update_idx = 0; |
224 | if (base_nullmask.any()) { |
225 | for (idx_t i = 0; i < approved_tuple_count; i++) { |
226 | idx_t src_idx = sel.get_index(i); |
227 | if (base_nullmask[src_idx]) { |
228 | result_nullmask.set(i, true); |
229 | read_string(result_data, state.handles, baseptr, base_data, src_idx, i, update_idx, state.vector_index); |
230 | } else { |
231 | read_string(result_data, state.handles, baseptr, base_data, src_idx, i, update_idx, state.vector_index); |
232 | } |
233 | } |
234 | } else { |
235 | for (idx_t i = 0; i < approved_tuple_count; i++) { |
236 | idx_t src_idx = sel.get_index(i); |
237 | read_string(result_data, state.handles, baseptr, base_data, src_idx, i, update_idx, state.vector_index); |
238 | } |
239 | } |
240 | FlatVector::SetNullmask(result, result_nullmask); |
241 | } |
242 | |
243 | //===--------------------------------------------------------------------===// |
244 | // Fetch update data |
245 | //===--------------------------------------------------------------------===// |
246 | void StringSegment::FetchUpdateData(ColumnScanState &state, Transaction &transaction, UpdateInfo *info, |
247 | Vector &result) { |
248 | // fetch data from updates |
249 | auto handle = state.primary_handle.get(); |
250 | |
251 | auto result_data = FlatVector::GetData<string_t>(result); |
252 | auto &result_mask = FlatVector::Nullmask(result); |
253 | UpdateInfo::UpdatesForTransaction(info, transaction, [&](UpdateInfo *current) { |
254 | auto info_data = (string_location_t *)current->tuple_data; |
255 | for (idx_t i = 0; i < current->N; i++) { |
256 | auto string = FetchString(state.handles, handle->node->buffer, info_data[i]); |
257 | result_data[current->tuples[i]] = string; |
258 | result_mask[current->tuples[i]] = current->nullmask[current->tuples[i]]; |
259 | } |
260 | }); |
261 | } |
262 | |
263 | //===--------------------------------------------------------------------===// |
264 | // Fetch strings |
265 | //===--------------------------------------------------------------------===// |
266 | void StringSegment::FetchStringLocations(data_ptr_t baseptr, row_t *ids, idx_t vector_index, idx_t vector_offset, |
267 | idx_t count, string_location_t result[]) { |
268 | auto base = baseptr + vector_index * vector_size; |
269 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
270 | |
271 | if (string_updates && string_updates[vector_index]) { |
272 | // there are updates: merge them in |
273 | auto &info = *string_updates[vector_index]; |
274 | idx_t update_idx = 0; |
275 | for (idx_t i = 0; i < count; i++) { |
276 | auto id = ids[i] - vector_offset; |
277 | while (update_idx < info.count && info.ids[update_idx] < id) { |
278 | update_idx++; |
279 | } |
280 | if (update_idx < info.count && info.ids[update_idx] == id) { |
281 | // use update info |
282 | result[i].block_id = info.block_ids[update_idx]; |
283 | result[i].offset = info.offsets[update_idx]; |
284 | update_idx++; |
285 | } else { |
286 | // use base table info |
287 | result[i] = FetchStringLocation(baseptr, base_data[id]); |
288 | } |
289 | } |
290 | } else { |
291 | // no updates: fetch strings from base vector |
292 | for (idx_t i = 0; i < count; i++) { |
293 | auto id = ids[i] - vector_offset; |
294 | result[i] = FetchStringLocation(baseptr, base_data[id]); |
295 | } |
296 | } |
297 | } |
298 | |
299 | string_location_t StringSegment::FetchStringLocation(data_ptr_t baseptr, int32_t dict_offset) { |
300 | if (dict_offset == 0) { |
301 | return string_location_t(INVALID_BLOCK, 0); |
302 | } |
303 | // look up result in dictionary |
304 | auto dict_end = baseptr + Storage::BLOCK_SIZE; |
305 | auto dict_pos = dict_end - dict_offset; |
306 | auto string_length = *((uint16_t *)dict_pos); |
307 | string_location_t result; |
308 | if (string_length == BIG_STRING_MARKER) { |
309 | ReadStringMarker(dict_pos, result.block_id, result.offset); |
310 | } else { |
311 | result.block_id = INVALID_BLOCK; |
312 | result.offset = dict_offset; |
313 | } |
314 | return result; |
315 | } |
316 | |
317 | string_t StringSegment::FetchStringFromDict(buffer_handle_set_t &handles, data_ptr_t baseptr, int32_t dict_offset) { |
318 | // fetch base data |
319 | assert(dict_offset <= Storage::BLOCK_SIZE); |
320 | string_location_t location = FetchStringLocation(baseptr, dict_offset); |
321 | return FetchString(handles, baseptr, location); |
322 | } |
323 | |
324 | string_t StringSegment::FetchString(buffer_handle_set_t &handles, data_ptr_t baseptr, string_location_t location) { |
325 | if (location.block_id != INVALID_BLOCK) { |
326 | // big string marker: read from separate block |
327 | return ReadString(handles, location.block_id, location.offset); |
328 | } else { |
329 | if (location.offset == 0) { |
330 | return string_t(nullptr, 0); |
331 | } |
332 | // normal string: read string from this block |
333 | auto dict_end = baseptr + Storage::BLOCK_SIZE; |
334 | auto dict_pos = dict_end - location.offset; |
335 | auto string_length = *((uint16_t *)dict_pos); |
336 | |
337 | auto str_ptr = (char *)(dict_pos + sizeof(uint16_t)); |
338 | return string_t(str_ptr, string_length); |
339 | } |
340 | } |
341 | |
342 | void StringSegment::FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result, |
343 | idx_t result_idx) { |
344 | auto read_lock = lock.GetSharedLock(); |
345 | |
346 | idx_t vector_index = row_id / STANDARD_VECTOR_SIZE; |
347 | idx_t id_in_vector = row_id - vector_index * STANDARD_VECTOR_SIZE; |
348 | assert(vector_index < max_vector_count); |
349 | |
350 | data_ptr_t baseptr; |
351 | |
352 | // fetch a single row from the string segment |
353 | // first pin the main buffer if it is not already pinned |
354 | auto entry = state.handles.find(block_id); |
355 | if (entry == state.handles.end()) { |
356 | // not pinned yet: pin it |
357 | auto handle = manager.Pin(block_id); |
358 | baseptr = handle->node->buffer; |
359 | state.handles[block_id] = move(handle); |
360 | } else { |
361 | // already pinned: use the pinned handle |
362 | baseptr = entry->second->node->buffer; |
363 | } |
364 | |
365 | auto base = baseptr + vector_index * vector_size; |
366 | auto &base_nullmask = *((nullmask_t *)base); |
367 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
368 | auto result_data = FlatVector::GetData<string_t>(result); |
369 | auto &result_mask = FlatVector::Nullmask(result); |
370 | |
371 | bool found_data = false; |
372 | // first see if there is any updated version of this tuple we must fetch |
373 | if (versions && versions[vector_index]) { |
374 | UpdateInfo::UpdatesForTransaction(versions[vector_index], transaction, [&](UpdateInfo *current) { |
375 | auto info_data = (string_location_t *)current->tuple_data; |
376 | // loop over the tuples in this UpdateInfo |
377 | for (idx_t i = 0; i < current->N; i++) { |
378 | if (current->tuples[i] == row_id) { |
379 | // found the relevant tuple |
380 | found_data = true; |
381 | result_data[result_idx] = FetchString(state.handles, baseptr, info_data[i]); |
382 | result_mask[result_idx] = current->nullmask[current->tuples[i]]; |
383 | break; |
384 | } else if (current->tuples[i] > row_id) { |
385 | // tuples are sorted: so if the current tuple is > row_id we will not find it anymore |
386 | break; |
387 | } |
388 | } |
389 | }); |
390 | } |
391 | if (!found_data && string_updates && string_updates[vector_index]) { |
392 | // there are updates: check if we should use them |
393 | auto &info = *string_updates[vector_index]; |
394 | for (idx_t i = 0; i < info.count; i++) { |
395 | if (info.ids[i] == id_in_vector) { |
396 | // use the update |
397 | result_data[result_idx] = ReadString(state.handles, info.block_ids[i], info.offsets[i]); |
398 | found_data = true; |
399 | break; |
400 | } else if (info.ids[i] > id_in_vector) { |
401 | break; |
402 | } |
403 | } |
404 | } |
405 | if (!found_data) { |
406 | // no version was found yet: fetch base table version |
407 | result_data[result_idx] = FetchStringFromDict(state.handles, baseptr, base_data[id_in_vector]); |
408 | } |
409 | result_mask[result_idx] = base_nullmask[id_in_vector]; |
410 | } |
411 | |
412 | //===--------------------------------------------------------------------===// |
413 | // Append |
414 | //===--------------------------------------------------------------------===// |
415 | idx_t StringSegment::Append(SegmentStatistics &stats, Vector &data, idx_t offset, idx_t count) { |
416 | assert(data.type == TypeId::VARCHAR); |
417 | auto handle = manager.Pin(block_id); |
418 | idx_t initial_count = tuple_count; |
419 | while (count > 0) { |
420 | // get the vector index of the vector to append to and see how many tuples we can append to that vector |
421 | idx_t vector_index = tuple_count / STANDARD_VECTOR_SIZE; |
422 | if (vector_index == max_vector_count) { |
423 | // we are at the maximum vector, check if there is space to increase the maximum vector count |
424 | // as a heuristic, we only allow another vector to be added if we have at least 32 bytes per string |
425 | // remaining (32KB out of a 256KB block, or around 12% empty) |
426 | if (RemainingSpace() >= STANDARD_VECTOR_SIZE * 32) { |
427 | // we have enough remaining space to add another vector |
428 | ExpandStringSegment(handle->node->buffer); |
429 | } else { |
430 | break; |
431 | } |
432 | } |
433 | idx_t current_tuple_count = tuple_count - vector_index * STANDARD_VECTOR_SIZE; |
434 | idx_t append_count = std::min(STANDARD_VECTOR_SIZE - current_tuple_count, count); |
435 | |
436 | // now perform the actual append |
437 | AppendData(stats, handle->node->buffer + vector_size * vector_index, handle->node->buffer + Storage::BLOCK_SIZE, |
438 | current_tuple_count, data, offset, append_count); |
439 | |
440 | count -= append_count; |
441 | offset += append_count; |
442 | tuple_count += append_count; |
443 | } |
444 | return tuple_count - initial_count; |
445 | } |
446 | |
//! Folds value into the min/max statistics buffers.
//! min and max are 8-byte buffers: at most the first 7 bytes of value are stored and the remainder of
//! the buffer is filled with '\0'. A pair of buffers that both start with {'\0', '1'} is treated as
//! "not initialized yet" and is seeded with value before the comparisons run.
static void update_min_max(string value, char *__restrict min, char *__restrict max) {
	constexpr size_t STATS_BUFFER_SIZE = 8;
	//! we can only fit 8 bytes, so we might need to trim our string
	const size_t prefix_length = value.size() > 7 ? 7 : value.size();

	// stores the (possibly trimmed) value in a statistics buffer and zero-fills the unused tail
	auto store_prefix = [&](char *target) {
		size_t written = value.copy(target, prefix_length);
		while (written < STATS_BUFFER_SIZE) {
			target[written++] = '\0';
		}
	};

	//! This marks the min/max was not initialized
	const char marker = '1';
	const bool uninitialized = min[0] == '\0' && min[1] == marker && max[0] == '\0' && max[1] == marker;
	if (uninitialized) {
		store_prefix(min);
		store_prefix(max);
	}
	// the full value is compared against the stored prefixes
	if (strcmp(value.data(), min) < 0) {
		store_prefix(min);
	}
	if (strcmp(value.data(), max) > 0) {
		store_prefix(max);
	}
}
475 | |
476 | void StringSegment::AppendData(SegmentStatistics &stats, data_ptr_t target, data_ptr_t end, idx_t target_offset, |
477 | Vector &source, idx_t offset, idx_t count) { |
478 | VectorData adata; |
479 | source.Orrify(count, adata); |
480 | |
481 | auto sdata = (string_t *)adata.data; |
482 | auto &result_nullmask = *((nullmask_t *)target); |
483 | auto result_data = (int32_t *)(target + sizeof(nullmask_t)); |
484 | auto min = (char *)stats.minimum.get(); |
485 | auto max = (char *)stats.maximum.get(); |
486 | |
487 | idx_t remaining_strings = STANDARD_VECTOR_SIZE - (this->tuple_count % STANDARD_VECTOR_SIZE); |
488 | for (idx_t i = 0; i < count; i++) { |
489 | auto source_idx = adata.sel->get_index(offset + i); |
490 | auto target_idx = target_offset + i; |
491 | if ((*adata.nullmask)[source_idx]) { |
492 | // null value is stored as -1 |
493 | result_data[target_idx] = 0; |
494 | result_nullmask[target_idx] = true; |
495 | stats.has_null = true; |
496 | } else { |
497 | assert(dictionary_offset < Storage::BLOCK_SIZE); |
498 | // non-null value, check if we can fit it within the block |
499 | idx_t string_length = sdata[source_idx].GetSize(); |
500 | idx_t total_length = string_length + 1 + sizeof(uint16_t); |
501 | |
502 | if (string_length > stats.max_string_length) { |
503 | stats.max_string_length = string_length; |
504 | } |
505 | // determine whether or not the string needs to be stored in an overflow block |
506 | // we never place small strings in the overflow blocks: the pointer would take more space than the |
507 | // string itself we always place big strings (>= STRING_BLOCK_LIMIT) in the overflow blocks we also have |
508 | // to always leave enough room for BIG_STRING_MARKER_SIZE for each of the remaining strings |
509 | if (total_length > BIG_STRING_MARKER_BASE_SIZE && |
510 | (total_length >= STRING_BLOCK_LIMIT || |
511 | total_length + (remaining_strings * BIG_STRING_MARKER_SIZE) > RemainingSpace())) { |
512 | assert(RemainingSpace() >= BIG_STRING_MARKER_SIZE); |
513 | // string is too big for block: write to overflow blocks |
514 | block_id_t block; |
515 | int32_t offset; |
516 | //! Update min/max of column segment |
517 | update_min_max(sdata[source_idx].GetData(), min, max); |
518 | // write the string into the current string block |
519 | WriteString(sdata[source_idx], block, offset); |
520 | dictionary_offset += BIG_STRING_MARKER_SIZE; |
521 | auto dict_pos = end - dictionary_offset; |
522 | |
523 | // write a big string marker into the dictionary |
524 | WriteStringMarker(dict_pos, block, offset); |
525 | |
526 | stats.has_overflow_strings = true; |
527 | } else { |
528 | // string fits in block, append to dictionary and increment dictionary position |
529 | assert(string_length < std::numeric_limits<uint16_t>::max()); |
530 | dictionary_offset += total_length; |
531 | auto dict_pos = end - dictionary_offset; |
532 | //! Update min/max of column segment |
533 | update_min_max(sdata[source_idx].GetData(), min, max); |
534 | // first write the length as u16 |
535 | uint16_t string_length_u16 = string_length; |
536 | memcpy(dict_pos, &string_length_u16, sizeof(uint16_t)); |
537 | // now write the actual string data into the dictionary |
538 | memcpy(dict_pos + sizeof(uint16_t), sdata[source_idx].GetData(), string_length + 1); |
539 | } |
540 | // place the dictionary offset into the set of vectors |
541 | assert(dictionary_offset <= Storage::BLOCK_SIZE); |
542 | result_data[target_idx] = dictionary_offset; |
543 | } |
544 | remaining_strings--; |
545 | } |
546 | } |
547 | |
548 | void StringSegment::WriteString(string_t string, block_id_t &result_block, int32_t &result_offset) { |
549 | assert(strlen(string.GetData()) == string.GetSize()); |
550 | if (overflow_writer) { |
551 | // overflow writer is set: write string there |
552 | overflow_writer->WriteString(string, result_block, result_offset); |
553 | } else { |
554 | // default overflow behavior: use in-memory buffer to store the overflow string |
555 | WriteStringMemory(string, result_block, result_offset); |
556 | } |
557 | } |
558 | |
559 | void StringSegment::WriteStringMemory(string_t string, block_id_t &result_block, int32_t &result_offset) { |
560 | uint32_t total_length = string.GetSize() + 1 + sizeof(uint32_t); |
561 | unique_ptr<BufferHandle> handle; |
562 | // check if the string fits in the current block |
563 | if (!head || head->offset + total_length >= head->size) { |
564 | // string does not fit, allocate space for it |
565 | // create a new string block |
566 | idx_t alloc_size = std::max((idx_t)total_length, (idx_t)Storage::BLOCK_ALLOC_SIZE); |
567 | auto new_block = make_unique<StringBlock>(); |
568 | new_block->offset = 0; |
569 | new_block->size = alloc_size; |
570 | // allocate an in-memory buffer for it |
571 | handle = manager.Allocate(alloc_size); |
572 | new_block->block_id = handle->block_id; |
573 | new_block->next = move(head); |
574 | head = move(new_block); |
575 | } else { |
576 | // string fits, copy it into the current block |
577 | handle = manager.Pin(head->block_id); |
578 | } |
579 | |
580 | result_block = head->block_id; |
581 | result_offset = head->offset; |
582 | |
583 | // copy the string and the length there |
584 | auto ptr = handle->node->buffer + head->offset; |
585 | memcpy(ptr, &string.length, sizeof(uint32_t)); |
586 | ptr += sizeof(uint32_t); |
587 | memcpy(ptr, string.GetData(), string.length + 1); |
588 | head->offset += total_length; |
589 | } |
590 | |
591 | string_t StringSegment::ReadString(buffer_handle_set_t &handles, block_id_t block, int32_t offset) { |
592 | assert(offset < Storage::BLOCK_SIZE); |
593 | if (block == INVALID_BLOCK) { |
594 | return string_t(nullptr, 0); |
595 | } |
596 | if (block < MAXIMUM_BLOCK) { |
597 | // read the overflow string from disk |
598 | // pin the initial handle and read the length |
599 | auto handle = manager.Pin(block); |
600 | uint32_t length = *((uint32_t *)(handle->node->buffer + offset)); |
601 | uint32_t remaining = length + 1; |
602 | offset += sizeof(uint32_t); |
603 | |
604 | // allocate a buffer to store the string |
605 | auto alloc_size = std::max((idx_t)Storage::BLOCK_ALLOC_SIZE, (idx_t)length + 1 + sizeof(uint32_t)); |
606 | auto target_handle = manager.Allocate(alloc_size, true); |
607 | auto target_ptr = target_handle->node->buffer; |
608 | // write the length in this block as well |
609 | *((uint32_t *)target_ptr) = length; |
610 | target_ptr += sizeof(uint32_t); |
611 | // now append the string to the single buffer |
612 | while (remaining > 0) { |
613 | idx_t to_write = std::min((idx_t)remaining, (idx_t)(Storage::BLOCK_SIZE - sizeof(block_id_t) - offset)); |
614 | memcpy(target_ptr, handle->node->buffer + offset, to_write); |
615 | |
616 | remaining -= to_write; |
617 | offset += to_write; |
618 | target_ptr += to_write; |
619 | if (remaining > 0) { |
620 | // read the next block |
621 | block_id_t next_block = *((block_id_t *)(handle->node->buffer + offset)); |
622 | handle = manager.Pin(next_block); |
623 | offset = 0; |
624 | } |
625 | } |
626 | |
627 | auto final_buffer = target_handle->node->buffer; |
628 | handles.insert(make_pair(target_handle->block_id, move(target_handle))); |
629 | return ReadString(final_buffer, 0); |
630 | } else { |
631 | // read the overflow string from memory |
632 | // first pin the handle, if it is not pinned yet |
633 | BufferHandle *handle; |
634 | auto entry = handles.find(block); |
635 | if (entry == handles.end()) { |
636 | auto pinned_handle = manager.Pin(block); |
637 | handle = pinned_handle.get(); |
638 | |
639 | handles.insert(make_pair(block, move(pinned_handle))); |
640 | } else { |
641 | handle = entry->second.get(); |
642 | } |
643 | return ReadString(handle->node->buffer, offset); |
644 | } |
645 | } |
646 | |
647 | string_t StringSegment::ReadString(data_ptr_t target, int32_t offset) { |
648 | auto ptr = target + offset; |
649 | auto str_length = *((uint32_t *)ptr); |
650 | auto str_ptr = (char *)(ptr + sizeof(uint32_t)); |
651 | return string_t(str_ptr, str_length); |
652 | } |
653 | |
654 | void StringSegment::WriteStringMarker(data_ptr_t target, block_id_t block_id, int32_t offset) { |
655 | uint16_t length = BIG_STRING_MARKER; |
656 | memcpy(target, &length, sizeof(uint16_t)); |
657 | target += sizeof(uint16_t); |
658 | memcpy(target, &block_id, sizeof(block_id_t)); |
659 | target += sizeof(block_id_t); |
660 | memcpy(target, &offset, sizeof(int32_t)); |
661 | } |
662 | |
663 | void StringSegment::ReadStringMarker(data_ptr_t target, block_id_t &block_id, int32_t &offset) { |
664 | target += sizeof(uint16_t); |
665 | memcpy(&block_id, target, sizeof(block_id_t)); |
666 | target += sizeof(block_id_t); |
667 | memcpy(&offset, target, sizeof(int32_t)); |
668 | } |
669 | |
670 | //===--------------------------------------------------------------------===// |
671 | // String Update |
672 | //===--------------------------------------------------------------------===// |
673 | string_update_info_t StringSegment::CreateStringUpdate(SegmentStatistics &stats, Vector &update, row_t *ids, |
674 | idx_t count, idx_t vector_offset) { |
675 | auto info = make_unique<StringUpdateInfo>(); |
676 | info->count = count; |
677 | auto strings = FlatVector::GetData<string_t>(update); |
678 | auto &update_nullmask = FlatVector::Nullmask(update); |
679 | for (idx_t i = 0; i < count; i++) { |
680 | info->ids[i] = ids[i] - vector_offset; |
681 | // copy the string into the block |
682 | if (!update_nullmask[i]) { |
683 | auto min = (char *)stats.minimum.get(); |
684 | auto max = (char *)stats.maximum.get(); |
685 | update_min_max(strings[i].GetData(), min, max); |
686 | WriteString(strings[i], info->block_ids[i], info->offsets[i]); |
687 | } else { |
688 | info->block_ids[i] = INVALID_BLOCK; |
689 | info->offsets[i] = 0; |
690 | } |
691 | } |
692 | return info; |
693 | } |
694 | |
695 | string_update_info_t StringSegment::MergeStringUpdate(SegmentStatistics &stats, Vector &update, row_t *ids, |
696 | idx_t update_count, idx_t vector_offset, |
697 | StringUpdateInfo &update_info) { |
698 | auto info = make_unique<StringUpdateInfo>(); |
699 | |
700 | // perform a merge between the new and old indexes |
701 | auto strings = FlatVector::GetData<string_t>(update); |
702 | auto &update_nullmask = FlatVector::Nullmask(update); |
703 | //! Check if we need to update the segment's nullmask |
704 | for (idx_t i = 0; i < update_count; i++) { |
705 | if (!update_nullmask[i]) { |
706 | auto min = (char *)stats.minimum.get(); |
707 | auto max = (char *)stats.maximum.get(); |
708 | update_min_max(strings[i].GetData(), min, max); |
709 | } |
710 | } |
711 | auto pick_new = [&](idx_t id, idx_t idx, idx_t count) { |
712 | info->ids[count] = id; |
713 | if (!update_nullmask[idx]) { |
714 | WriteString(strings[idx], info->block_ids[count], info->offsets[count]); |
715 | } else { |
716 | info->block_ids[count] = INVALID_BLOCK; |
717 | info->offsets[count] = 0; |
718 | } |
719 | }; |
720 | auto merge = [&](idx_t id, idx_t aidx, idx_t bidx, idx_t count) { |
721 | // merge: only pick new entry |
722 | pick_new(id, aidx, count); |
723 | }; |
724 | auto pick_old = [&](idx_t id, idx_t bidx, idx_t count) { |
725 | // pick old entry |
726 | info->ids[count] = id; |
727 | info->block_ids[count] = update_info.block_ids[bidx]; |
728 | info->offsets[count] = update_info.offsets[bidx]; |
729 | }; |
730 | |
731 | info->count = |
732 | merge_loop(ids, update_info.ids, update_count, update_info.count, vector_offset, merge, pick_new, pick_old); |
733 | return info; |
734 | } |
735 | |
736 | //===--------------------------------------------------------------------===// |
737 | // Update Info |
738 | //===--------------------------------------------------------------------===// |
739 | void StringSegment::MergeUpdateInfo(UpdateInfo *node, row_t *ids, idx_t update_count, idx_t vector_offset, |
740 | string_location_t base_data[], nullmask_t base_nullmask) { |
741 | auto info_data = (string_location_t *)node->tuple_data; |
742 | |
743 | // first we copy the old update info into a temporary structure |
744 | sel_t old_ids[STANDARD_VECTOR_SIZE]; |
745 | string_location_t old_data[STANDARD_VECTOR_SIZE]; |
746 | |
747 | memcpy(old_ids, node->tuples, node->N * sizeof(sel_t)); |
748 | memcpy(old_data, node->tuple_data, node->N * sizeof(string_location_t)); |
749 | |
750 | // now we perform a merge of the new ids with the old ids |
751 | auto merge = [&](idx_t id, idx_t aidx, idx_t bidx, idx_t count) { |
752 | // new_id and old_id are the same, insert the old data in the UpdateInfo |
753 | assert(old_data[bidx].IsValid()); |
754 | info_data[count] = old_data[bidx]; |
755 | node->tuples[count] = id; |
756 | }; |
757 | auto pick_new = [&](idx_t id, idx_t aidx, idx_t count) { |
758 | // new_id comes before the old id, insert the base table data into the update info |
759 | assert(base_data[aidx].IsValid()); |
760 | info_data[count] = base_data[aidx]; |
761 | node->nullmask[id] = base_nullmask[aidx]; |
762 | |
763 | node->tuples[count] = id; |
764 | }; |
765 | auto pick_old = [&](idx_t id, idx_t bidx, idx_t count) { |
766 | // old_id comes before new_id, insert the old data |
767 | assert(old_data[bidx].IsValid()); |
768 | info_data[count] = old_data[bidx]; |
769 | node->tuples[count] = id; |
770 | }; |
771 | // perform the merge |
772 | node->N = merge_loop(ids, old_ids, update_count, node->N, vector_offset, merge, pick_new, pick_old); |
773 | } |
774 | |
775 | //===--------------------------------------------------------------------===// |
776 | // Update |
777 | //===--------------------------------------------------------------------===// |
778 | void StringSegment::Update(ColumnData &column_data, SegmentStatistics &stats, Transaction &transaction, Vector &update, |
779 | row_t *ids, idx_t count, idx_t vector_index, idx_t vector_offset, UpdateInfo *node) { |
780 | if (!string_updates) { |
781 | string_updates = unique_ptr<string_update_info_t[]>(new string_update_info_t[max_vector_count]); |
782 | } |
783 | |
784 | // first pin the base block |
785 | auto handle = manager.Pin(block_id); |
786 | auto baseptr = handle->node->buffer; |
787 | auto base = baseptr + vector_index * vector_size; |
788 | auto &base_nullmask = *((nullmask_t *)base); |
789 | |
790 | // fetch the original string locations and copy the original nullmask |
791 | string_location_t string_locations[STANDARD_VECTOR_SIZE]; |
792 | nullmask_t original_nullmask = base_nullmask; |
793 | FetchStringLocations(baseptr, ids, vector_index, vector_offset, count, string_locations); |
794 | |
795 | string_update_info_t new_update_info; |
796 | // next up: create the updates |
797 | if (!string_updates[vector_index]) { |
798 | // no string updates yet, allocate a block and place the updates there |
799 | new_update_info = CreateStringUpdate(stats, update, ids, count, vector_offset); |
800 | } else { |
801 | // string updates already exist, merge the string updates together |
802 | new_update_info = MergeStringUpdate(stats, update, ids, count, vector_offset, *string_updates[vector_index]); |
803 | } |
804 | |
805 | // now update the original nullmask |
806 | auto &update_nullmask = FlatVector::Nullmask(update); |
807 | for (idx_t i = 0; i < count; i++) { |
808 | base_nullmask[ids[i] - vector_offset] = update_nullmask[i]; |
809 | } |
810 | |
811 | // now that the original strings are placed in the undo buffer and the updated strings are placed in the base table |
812 | // create the update node |
813 | if (!node) { |
814 | // create a new node in the undo buffer for this update |
815 | node = CreateUpdateInfo(column_data, transaction, ids, count, vector_index, vector_offset, |
816 | sizeof(string_location_t)); |
817 | |
818 | // copy the string location data into the undo buffer |
819 | node->nullmask = original_nullmask; |
820 | memcpy(node->tuple_data, string_locations, sizeof(string_location_t) * count); |
821 | } else { |
822 | // node in the update info already exists, merge the new updates in |
823 | MergeUpdateInfo(node, ids, count, vector_offset, string_locations, original_nullmask); |
824 | } |
825 | // finally move the string updates in place |
826 | string_updates[vector_index] = move(new_update_info); |
827 | } |
828 | |
829 | void StringSegment::RollbackUpdate(UpdateInfo *info) { |
830 | auto lock_handle = lock.GetExclusiveLock(); |
831 | |
832 | idx_t new_count = 0; |
833 | auto &update_info = *string_updates[info->vector_index]; |
834 | auto string_locations = (string_location_t *)info->tuple_data; |
835 | |
836 | // put the previous NULL values back |
837 | auto handle = manager.Pin(block_id); |
838 | auto baseptr = handle->node->buffer; |
839 | auto base = baseptr + info->vector_index * vector_size; |
840 | auto &base_nullmask = *((nullmask_t *)base); |
841 | for (idx_t i = 0; i < info->N; i++) { |
842 | base_nullmask[info->tuples[i]] = info->nullmask[info->tuples[i]]; |
843 | } |
844 | |
845 | // now put the original values back into the update info |
846 | idx_t old_idx = 0; |
847 | for (idx_t i = 0; i < update_info.count; i++) { |
848 | if (old_idx >= info->N || update_info.ids[i] != info->tuples[old_idx]) { |
849 | assert(old_idx >= info->N || update_info.ids[i] < info->tuples[old_idx]); |
850 | // this entry is not rolled back: insert entry directly |
851 | update_info.ids[new_count] = update_info.ids[i]; |
852 | update_info.block_ids[new_count] = update_info.block_ids[i]; |
853 | update_info.offsets[new_count] = update_info.offsets[i]; |
854 | new_count++; |
855 | } else { |
856 | // this entry is being rolled back |
857 | auto &old_location = string_locations[old_idx]; |
858 | if (old_location.block_id != INVALID_BLOCK) { |
859 | // not rolled back to base table: insert entry again |
860 | update_info.ids[new_count] = update_info.ids[i]; |
861 | update_info.block_ids[new_count] = old_location.block_id; |
862 | update_info.offsets[new_count] = old_location.offset; |
863 | new_count++; |
864 | } |
865 | old_idx++; |
866 | } |
867 | } |
868 | |
869 | if (new_count == 0) { |
870 | // all updates are rolled back: delete the string update vector |
871 | string_updates[info->vector_index].reset(); |
872 | } else { |
873 | // set the count of the new string update vector |
874 | update_info.count = new_count; |
875 | } |
876 | CleanupUpdate(info); |
877 | } |
878 | |