| 1 | #include "duckdb/storage/string_segment.hpp" |
| 2 | #include "duckdb/storage/buffer_manager.hpp" |
| 3 | #include "duckdb/storage/numeric_segment.hpp" |
| 4 | #include "duckdb/transaction/update_info.hpp" |
| 5 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 6 | #include "duckdb/storage/data_table.hpp" |
| 7 | #include "duckdb/common/operator/comparison_operators.hpp" |
| 8 | |
| 9 | using namespace duckdb; |
| 10 | using namespace std; |
| 11 | |
| 12 | StringSegment::StringSegment(BufferManager &manager, idx_t row_start, block_id_t block) |
| 13 | : UncompressedSegment(manager, TypeId::VARCHAR, row_start) { |
| 14 | this->max_vector_count = 0; |
| 15 | this->dictionary_offset = 0; |
	// the size of a single vector in this segment: a nullmask followed by one int32_t dictionary offset per tuple
| 17 | this->vector_size = STANDARD_VECTOR_SIZE * sizeof(int32_t) + sizeof(nullmask_t); |
| 18 | this->string_updates = nullptr; |
| 19 | |
| 20 | this->block_id = block; |
| 21 | if (block_id == INVALID_BLOCK) { |
| 22 | // start off with an empty string segment: allocate space for it |
| 23 | auto handle = manager.Allocate(Storage::BLOCK_ALLOC_SIZE); |
| 24 | this->block_id = handle->block_id; |
| 25 | |
| 26 | ExpandStringSegment(handle->node->buffer); |
| 27 | } |
| 28 | } |
| 29 | |
| 30 | StringSegment::~StringSegment() { |
| 31 | while (head) { |
| 32 | manager.DestroyBuffer(head->block_id); |
| 33 | head = move(head->next); |
| 34 | } |
| 35 | } |
| 36 | |
| 37 | void StringSegment::ExpandStringSegment(data_ptr_t baseptr) { |
| 38 | // clear the nullmask for this vector |
| 39 | auto mask = (nullmask_t *)(baseptr + (max_vector_count * vector_size)); |
| 40 | mask->reset(); |
| 41 | |
| 42 | max_vector_count++; |
| 43 | if (versions) { |
| 44 | auto new_versions = unique_ptr<UpdateInfo *[]>(new UpdateInfo *[max_vector_count]); |
| 45 | memcpy(new_versions.get(), versions.get(), (max_vector_count - 1) * sizeof(UpdateInfo *)); |
| 46 | new_versions[max_vector_count - 1] = nullptr; |
| 47 | versions = move(new_versions); |
| 48 | } |
| 49 | |
| 50 | if (string_updates) { |
| 51 | auto new_string_updates = unique_ptr<string_update_info_t[]>(new string_update_info_t[max_vector_count]); |
| 52 | for (idx_t i = 0; i < max_vector_count - 1; i++) { |
| 53 | new_string_updates[i] = move(string_updates[i]); |
| 54 | } |
		new_string_updates[max_vector_count - 1] = nullptr;
| 56 | string_updates = move(new_string_updates); |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | //===--------------------------------------------------------------------===// |
| 61 | // Scan |
| 62 | //===--------------------------------------------------------------------===// |
| 63 | void StringSegment::InitializeScan(ColumnScanState &state) { |
| 64 | // pin the primary buffer |
| 65 | state.primary_handle = manager.Pin(block_id); |
| 66 | } |
| 67 | |
| 68 | //===--------------------------------------------------------------------===// |
| 69 | // Filter base data |
| 70 | //===--------------------------------------------------------------------===// |
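// Reads a single string value for the row at src_idx and writes it into result_data[res_idx].
// If the vector has in-place string updates, the most recent value is read from the update info;
// otherwise the string is fetched from the dictionary at the offset stored in dict_offset[src_idx].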
| 71 | void StringSegment::read_string(string_t *result_data, buffer_handle_set_t &handles, data_ptr_t baseptr, |
| 72 | int32_t *dict_offset, idx_t src_idx, idx_t res_idx, idx_t &update_idx, |
| 73 | size_t vector_index) { |
| 74 | if (string_updates && string_updates[vector_index]) { |
| 75 | auto &info = *string_updates[vector_index]; |
		// advance update_idx until it catches up with src_idx, without running past the end of the update list
		while (update_idx < info.count && info.ids[update_idx] < src_idx) {
			update_idx++;
		}
| 80 | if (update_idx < info.count && info.ids[update_idx] == src_idx) { |
| 81 | result_data[res_idx] = ReadString(handles, info.block_ids[update_idx], info.offsets[update_idx]); |
| 82 | } else { |
| 83 | result_data[res_idx] = FetchStringFromDict(handles, baseptr, dict_offset[src_idx]); |
| 84 | } |
| 85 | } else { |
| 86 | result_data[res_idx] = FetchStringFromDict(handles, baseptr, dict_offset[src_idx]); |
| 87 | } |
| 88 | } |
| 89 | |
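// Filter pushdown on string columns: applies the table filter(s) directly while scanning the base data.
// A single filter dispatches on its comparison type; two filters are treated as a range (between) predicate.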
| 90 | void StringSegment::Select(ColumnScanState &state, Vector &result, SelectionVector &sel, idx_t &approved_tuple_count, |
| 91 | vector<TableFilter> &tableFilter) { |
| 92 | auto vector_index = state.vector_index; |
| 93 | assert(vector_index < max_vector_count); |
| 94 | assert(vector_index * STANDARD_VECTOR_SIZE <= tuple_count); |
| 95 | |
| 96 | auto handle = state.primary_handle.get(); |
| 97 | state.handles.clear(); |
| 98 | auto baseptr = handle->node->buffer; |
| 99 | // fetch the data from the base segment |
| 100 | auto base = baseptr + state.vector_index * vector_size; |
| 101 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
| 102 | auto base_nullmask = (nullmask_t *)base; |
| 103 | |
| 104 | if (tableFilter.size() == 1) { |
| 105 | switch (tableFilter[0].comparison_type) { |
| 106 | case ExpressionType::COMPARE_EQUAL: { |
| 107 | Select_String<Equals>(state.handles, result, baseptr, base_data, sel, tableFilter[0].constant.str_value, |
| 108 | approved_tuple_count, base_nullmask, vector_index); |
| 109 | break; |
| 110 | } |
| 111 | case ExpressionType::COMPARE_LESSTHAN: { |
| 112 | Select_String<LessThan>(state.handles, result, baseptr, base_data, sel, tableFilter[0].constant.str_value, |
| 113 | approved_tuple_count, base_nullmask, vector_index); |
| 114 | break; |
| 115 | } |
| 116 | case ExpressionType::COMPARE_GREATERTHAN: { |
| 117 | Select_String<GreaterThan>(state.handles, result, baseptr, base_data, sel, |
| 118 | tableFilter[0].constant.str_value, approved_tuple_count, base_nullmask, |
| 119 | vector_index); |
| 120 | break; |
| 121 | } |
| 122 | case ExpressionType::COMPARE_LESSTHANOREQUALTO: { |
| 123 | Select_String<LessThanEquals>(state.handles, result, baseptr, base_data, sel, |
| 124 | tableFilter[0].constant.str_value, approved_tuple_count, base_nullmask, |
| 125 | vector_index); |
| 126 | break; |
| 127 | } |
| 128 | case ExpressionType::COMPARE_GREATERTHANOREQUALTO: { |
| 129 | Select_String<GreaterThanEquals>(state.handles, result, baseptr, base_data, sel, |
| 130 | tableFilter[0].constant.str_value, approved_tuple_count, base_nullmask, |
| 131 | vector_index); |
| 132 | |
| 133 | break; |
| 134 | } |
| 135 | default: |
			throw NotImplementedException("Unknown comparison type for filter pushed down to table!");
| 137 | } |
| 138 | } else { |
| 139 | bool isFirstGreater = tableFilter[0].comparison_type == ExpressionType::COMPARE_GREATERTHAN || |
| 140 | tableFilter[0].comparison_type == ExpressionType::COMPARE_GREATERTHANOREQUALTO; |
		auto less = isFirstGreater ? tableFilter[1] : tableFilter[0];
		auto greater = isFirstGreater ? tableFilter[0] : tableFilter[1];
| 143 | if (greater.comparison_type == ExpressionType::COMPARE_GREATERTHAN) { |
| 144 | if (less.comparison_type == ExpressionType::COMPARE_LESSTHAN) { |
| 145 | Select_String_Between<GreaterThan, LessThan>( |
| 146 | state.handles, result, baseptr, base_data, sel, greater.constant.str_value, |
| 147 | less.constant.str_value, approved_tuple_count, base_nullmask, vector_index); |
| 148 | } else { |
| 149 | Select_String_Between<GreaterThan, LessThanEquals>( |
| 150 | state.handles, result, baseptr, base_data, sel, greater.constant.str_value, |
| 151 | less.constant.str_value, approved_tuple_count, base_nullmask, vector_index); |
| 152 | } |
| 153 | } else { |
| 154 | if (less.comparison_type == ExpressionType::COMPARE_LESSTHAN) { |
| 155 | Select_String_Between<GreaterThanEquals, LessThan>( |
| 156 | state.handles, result, baseptr, base_data, sel, greater.constant.str_value, |
| 157 | less.constant.str_value, approved_tuple_count, base_nullmask, vector_index); |
| 158 | } else { |
| 159 | Select_String_Between<GreaterThanEquals, LessThanEquals>( |
| 160 | state.handles, result, baseptr, base_data, sel, greater.constant.str_value, |
| 161 | less.constant.str_value, approved_tuple_count, base_nullmask, vector_index); |
| 162 | } |
| 163 | } |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | //===--------------------------------------------------------------------===// |
| 168 | // Fetch base data |
| 169 | //===--------------------------------------------------------------------===// |
| 170 | void StringSegment::FetchBaseData(ColumnScanState &state, idx_t vector_index, Vector &result) { |
| 171 | // clear any previously locked buffers and get the primary buffer handle |
| 172 | auto handle = state.primary_handle.get(); |
| 173 | state.handles.clear(); |
| 174 | |
| 175 | // fetch the data from the base segment |
| 176 | FetchBaseData(state, handle->node->buffer, vector_index, result, GetVectorCount(vector_index)); |
| 177 | } |
| 178 | |
| 179 | void StringSegment::FetchBaseData(ColumnScanState &state, data_ptr_t baseptr, idx_t vector_index, Vector &result, |
| 180 | idx_t count) { |
| 181 | auto base = baseptr + vector_index * vector_size; |
| 182 | |
| 183 | auto &base_nullmask = *((nullmask_t *)base); |
| 184 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
| 185 | auto result_data = FlatVector::GetData<string_t>(result); |
| 186 | |
| 187 | if (string_updates && string_updates[vector_index]) { |
| 188 | // there are updates: merge them in |
| 189 | auto &info = *string_updates[vector_index]; |
| 190 | idx_t update_idx = 0; |
| 191 | for (idx_t i = 0; i < count; i++) { |
| 192 | if (update_idx < info.count && info.ids[update_idx] == i) { |
| 193 | // use update info |
| 194 | result_data[i] = ReadString(state.handles, info.block_ids[update_idx], info.offsets[update_idx]); |
| 195 | update_idx++; |
| 196 | } else { |
| 197 | // use base table info |
| 198 | result_data[i] = FetchStringFromDict(state.handles, baseptr, base_data[i]); |
| 199 | } |
| 200 | } |
| 201 | } else { |
| 202 | // no updates: fetch only from the string dictionary |
| 203 | for (idx_t i = 0; i < count; i++) { |
| 204 | result_data[i] = FetchStringFromDict(state.handles, baseptr, base_data[i]); |
| 205 | } |
| 206 | } |
| 207 | FlatVector::SetNullmask(result, base_nullmask); |
| 208 | } |
| 209 | |
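// Fetches the strings that passed the filter (given by the selection vector) into a flat result vector,
// preserving NULL entries from the base nullmask.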
| 210 | void StringSegment::FilterFetchBaseData(ColumnScanState &state, Vector &result, SelectionVector &sel, |
| 211 | idx_t &approved_tuple_count) { |
| 212 | // clear any previously locked buffers and get the primary buffer handle |
| 213 | auto handle = state.primary_handle.get(); |
| 214 | state.handles.clear(); |
| 215 | auto baseptr = handle->node->buffer; |
| 216 | // fetch the data from the base segment |
| 217 | auto base = baseptr + state.vector_index * vector_size; |
| 218 | auto &base_nullmask = *((nullmask_t *)base); |
| 219 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
| 220 | result.vector_type = VectorType::FLAT_VECTOR; |
| 221 | auto result_data = FlatVector::GetData<string_t>(result); |
| 222 | nullmask_t result_nullmask; |
| 223 | idx_t update_idx = 0; |
| 224 | if (base_nullmask.any()) { |
| 225 | for (idx_t i = 0; i < approved_tuple_count; i++) { |
| 226 | idx_t src_idx = sel.get_index(i); |
			if (base_nullmask[src_idx]) {
				result_nullmask.set(i, true);
			}
			read_string(result_data, state.handles, baseptr, base_data, src_idx, i, update_idx, state.vector_index);
| 233 | } |
| 234 | } else { |
| 235 | for (idx_t i = 0; i < approved_tuple_count; i++) { |
| 236 | idx_t src_idx = sel.get_index(i); |
| 237 | read_string(result_data, state.handles, baseptr, base_data, src_idx, i, update_idx, state.vector_index); |
| 238 | } |
| 239 | } |
| 240 | FlatVector::SetNullmask(result, result_nullmask); |
| 241 | } |
| 242 | |
| 243 | //===--------------------------------------------------------------------===// |
| 244 | // Fetch update data |
| 245 | //===--------------------------------------------------------------------===// |
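// Fetches the versions of this vector that are visible to the given transaction from the undo buffer
// and overlays them on top of the result vector.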
| 246 | void StringSegment::FetchUpdateData(ColumnScanState &state, Transaction &transaction, UpdateInfo *info, |
| 247 | Vector &result) { |
| 248 | // fetch data from updates |
| 249 | auto handle = state.primary_handle.get(); |
| 250 | |
| 251 | auto result_data = FlatVector::GetData<string_t>(result); |
| 252 | auto &result_mask = FlatVector::Nullmask(result); |
| 253 | UpdateInfo::UpdatesForTransaction(info, transaction, [&](UpdateInfo *current) { |
| 254 | auto info_data = (string_location_t *)current->tuple_data; |
| 255 | for (idx_t i = 0; i < current->N; i++) { |
| 256 | auto string = FetchString(state.handles, handle->node->buffer, info_data[i]); |
| 257 | result_data[current->tuples[i]] = string; |
| 258 | result_mask[current->tuples[i]] = current->nullmask[current->tuples[i]]; |
| 259 | } |
| 260 | }); |
| 261 | } |
| 262 | |
| 263 | //===--------------------------------------------------------------------===// |
| 264 | // Fetch strings |
| 265 | //===--------------------------------------------------------------------===// |
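// Resolves the physical location (block id + offset) of the strings for the given row ids,
// taking any in-place string updates for this vector into account.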
| 266 | void StringSegment::FetchStringLocations(data_ptr_t baseptr, row_t *ids, idx_t vector_index, idx_t vector_offset, |
| 267 | idx_t count, string_location_t result[]) { |
| 268 | auto base = baseptr + vector_index * vector_size; |
| 269 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
| 270 | |
| 271 | if (string_updates && string_updates[vector_index]) { |
| 272 | // there are updates: merge them in |
| 273 | auto &info = *string_updates[vector_index]; |
| 274 | idx_t update_idx = 0; |
| 275 | for (idx_t i = 0; i < count; i++) { |
| 276 | auto id = ids[i] - vector_offset; |
| 277 | while (update_idx < info.count && info.ids[update_idx] < id) { |
| 278 | update_idx++; |
| 279 | } |
| 280 | if (update_idx < info.count && info.ids[update_idx] == id) { |
| 281 | // use update info |
| 282 | result[i].block_id = info.block_ids[update_idx]; |
| 283 | result[i].offset = info.offsets[update_idx]; |
| 284 | update_idx++; |
| 285 | } else { |
| 286 | // use base table info |
| 287 | result[i] = FetchStringLocation(baseptr, base_data[id]); |
| 288 | } |
| 289 | } |
| 290 | } else { |
| 291 | // no updates: fetch strings from base vector |
| 292 | for (idx_t i = 0; i < count; i++) { |
| 293 | auto id = ids[i] - vector_offset; |
| 294 | result[i] = FetchStringLocation(baseptr, base_data[id]); |
| 295 | } |
| 296 | } |
| 297 | } |
| 298 | |
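// A dictionary offset of 0 denotes a NULL/empty entry. Otherwise the entry lives at dict_end - dict_offset:
// either an inlined string (uint16_t length followed by the string data) or a big string marker pointing to
// an overflow block.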
| 299 | string_location_t StringSegment::FetchStringLocation(data_ptr_t baseptr, int32_t dict_offset) { |
| 300 | if (dict_offset == 0) { |
| 301 | return string_location_t(INVALID_BLOCK, 0); |
| 302 | } |
| 303 | // look up result in dictionary |
| 304 | auto dict_end = baseptr + Storage::BLOCK_SIZE; |
| 305 | auto dict_pos = dict_end - dict_offset; |
| 306 | auto string_length = *((uint16_t *)dict_pos); |
| 307 | string_location_t result; |
| 308 | if (string_length == BIG_STRING_MARKER) { |
| 309 | ReadStringMarker(dict_pos, result.block_id, result.offset); |
| 310 | } else { |
| 311 | result.block_id = INVALID_BLOCK; |
| 312 | result.offset = dict_offset; |
| 313 | } |
| 314 | return result; |
| 315 | } |
| 316 | |
| 317 | string_t StringSegment::FetchStringFromDict(buffer_handle_set_t &handles, data_ptr_t baseptr, int32_t dict_offset) { |
| 318 | // fetch base data |
| 319 | assert(dict_offset <= Storage::BLOCK_SIZE); |
| 320 | string_location_t location = FetchStringLocation(baseptr, dict_offset); |
| 321 | return FetchString(handles, baseptr, location); |
| 322 | } |
| 323 | |
| 324 | string_t StringSegment::FetchString(buffer_handle_set_t &handles, data_ptr_t baseptr, string_location_t location) { |
| 325 | if (location.block_id != INVALID_BLOCK) { |
| 326 | // big string marker: read from separate block |
| 327 | return ReadString(handles, location.block_id, location.offset); |
| 328 | } else { |
| 329 | if (location.offset == 0) { |
| 330 | return string_t(nullptr, 0); |
| 331 | } |
| 332 | // normal string: read string from this block |
| 333 | auto dict_end = baseptr + Storage::BLOCK_SIZE; |
| 334 | auto dict_pos = dict_end - location.offset; |
| 335 | auto string_length = *((uint16_t *)dict_pos); |
| 336 | |
| 337 | auto str_ptr = (char *)(dict_pos + sizeof(uint16_t)); |
| 338 | return string_t(str_ptr, string_length); |
| 339 | } |
| 340 | } |
| 341 | |
| 342 | void StringSegment::FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result, |
| 343 | idx_t result_idx) { |
| 344 | auto read_lock = lock.GetSharedLock(); |
| 345 | |
| 346 | idx_t vector_index = row_id / STANDARD_VECTOR_SIZE; |
| 347 | idx_t id_in_vector = row_id - vector_index * STANDARD_VECTOR_SIZE; |
| 348 | assert(vector_index < max_vector_count); |
| 349 | |
| 350 | data_ptr_t baseptr; |
| 351 | |
| 352 | // fetch a single row from the string segment |
| 353 | // first pin the main buffer if it is not already pinned |
| 354 | auto entry = state.handles.find(block_id); |
| 355 | if (entry == state.handles.end()) { |
| 356 | // not pinned yet: pin it |
| 357 | auto handle = manager.Pin(block_id); |
| 358 | baseptr = handle->node->buffer; |
| 359 | state.handles[block_id] = move(handle); |
| 360 | } else { |
| 361 | // already pinned: use the pinned handle |
| 362 | baseptr = entry->second->node->buffer; |
| 363 | } |
| 364 | |
| 365 | auto base = baseptr + vector_index * vector_size; |
| 366 | auto &base_nullmask = *((nullmask_t *)base); |
| 367 | auto base_data = (int32_t *)(base + sizeof(nullmask_t)); |
| 368 | auto result_data = FlatVector::GetData<string_t>(result); |
| 369 | auto &result_mask = FlatVector::Nullmask(result); |
| 370 | |
| 371 | bool found_data = false; |
| 372 | // first see if there is any updated version of this tuple we must fetch |
| 373 | if (versions && versions[vector_index]) { |
| 374 | UpdateInfo::UpdatesForTransaction(versions[vector_index], transaction, [&](UpdateInfo *current) { |
| 375 | auto info_data = (string_location_t *)current->tuple_data; |
| 376 | // loop over the tuples in this UpdateInfo |
| 377 | for (idx_t i = 0; i < current->N; i++) { |
| 378 | if (current->tuples[i] == row_id) { |
| 379 | // found the relevant tuple |
| 380 | found_data = true; |
| 381 | result_data[result_idx] = FetchString(state.handles, baseptr, info_data[i]); |
| 382 | result_mask[result_idx] = current->nullmask[current->tuples[i]]; |
| 383 | break; |
| 384 | } else if (current->tuples[i] > row_id) { |
| 385 | // tuples are sorted: so if the current tuple is > row_id we will not find it anymore |
| 386 | break; |
| 387 | } |
| 388 | } |
| 389 | }); |
| 390 | } |
	if (!found_data && string_updates && string_updates[vector_index]) {
		// there are updates: check if we should use them
		auto &info = *string_updates[vector_index];
		for (idx_t i = 0; i < info.count; i++) {
			if (info.ids[i] == id_in_vector) {
				// use the update
				result_data[result_idx] = ReadString(state.handles, info.block_ids[i], info.offsets[i]);
				result_mask[result_idx] = base_nullmask[id_in_vector];
				found_data = true;
				break;
			} else if (info.ids[i] > id_in_vector) {
				// ids are sorted: we will not find the row anymore
				break;
			}
		}
	}
	if (!found_data) {
		// no newer version was found: fetch the base table version
		result_data[result_idx] = FetchStringFromDict(state.handles, baseptr, base_data[id_in_vector]);
		result_mask[result_idx] = base_nullmask[id_in_vector];
	}
| 410 | } |
| 411 | |
| 412 | //===--------------------------------------------------------------------===// |
| 413 | // Append |
| 414 | //===--------------------------------------------------------------------===// |
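// Appends up to "count" tuples from the source vector to this segment, vector by vector, and returns the
// number of tuples actually appended. Appending stops early when the block runs out of dictionary space.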
| 415 | idx_t StringSegment::Append(SegmentStatistics &stats, Vector &data, idx_t offset, idx_t count) { |
| 416 | assert(data.type == TypeId::VARCHAR); |
| 417 | auto handle = manager.Pin(block_id); |
| 418 | idx_t initial_count = tuple_count; |
| 419 | while (count > 0) { |
| 420 | // get the vector index of the vector to append to and see how many tuples we can append to that vector |
| 421 | idx_t vector_index = tuple_count / STANDARD_VECTOR_SIZE; |
| 422 | if (vector_index == max_vector_count) { |
| 423 | // we are at the maximum vector, check if there is space to increase the maximum vector count |
| 424 | // as a heuristic, we only allow another vector to be added if we have at least 32 bytes per string |
| 425 | // remaining (32KB out of a 256KB block, or around 12% empty) |
| 426 | if (RemainingSpace() >= STANDARD_VECTOR_SIZE * 32) { |
| 427 | // we have enough remaining space to add another vector |
| 428 | ExpandStringSegment(handle->node->buffer); |
| 429 | } else { |
| 430 | break; |
| 431 | } |
| 432 | } |
| 433 | idx_t current_tuple_count = tuple_count - vector_index * STANDARD_VECTOR_SIZE; |
| 434 | idx_t append_count = std::min(STANDARD_VECTOR_SIZE - current_tuple_count, count); |
| 435 | |
| 436 | // now perform the actual append |
| 437 | AppendData(stats, handle->node->buffer + vector_size * vector_index, handle->node->buffer + Storage::BLOCK_SIZE, |
| 438 | current_tuple_count, data, offset, append_count); |
| 439 | |
| 440 | count -= append_count; |
| 441 | offset += append_count; |
| 442 | tuple_count += append_count; |
| 443 | } |
| 444 | return tuple_count - initial_count; |
| 445 | } |
| 446 | |
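// Updates the (at most 8 byte, null-terminated) min/max string statistics of the segment with a new value.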
| 447 | static void update_min_max(string value, char *__restrict min, char *__restrict max) { |
	//! the min/max buffers hold at most 8 bytes, so longer values are truncated to 7 characters plus a terminator
	size_t value_size = value.size() > 7 ? 7 : value.size();
	//! the sentinel pattern '\0' followed by '1' marks min/max as not yet initialized
	char marker = '1';
| 452 | if (min[0] == '\0' && min[1] == marker && max[0] == '\0' && max[1] == marker) { |
| 453 | size_t min_end = value.copy(min, value_size); |
| 454 | size_t max_end = value.copy(max, value_size); |
| 455 | for (size_t i = min_end; i < 8; i++) { |
| 456 | min[i] = '\0'; |
| 457 | } |
| 458 | for (size_t i = max_end; i < 8; i++) { |
| 459 | max[i] = '\0'; |
| 460 | } |
| 461 | } |
| 462 | if (strcmp(value.data(), min) < 0) { |
| 463 | size_t min_end = value.copy(min, value_size); |
| 464 | for (size_t i = min_end; i < 8; i++) { |
| 465 | min[i] = '\0'; |
| 466 | } |
| 467 | } |
| 468 | if (strcmp(value.data(), max) > 0) { |
| 469 | size_t max_end = value.copy(max, value_size); |
| 470 | for (size_t i = max_end; i < 8; i++) { |
| 471 | max[i] = '\0'; |
| 472 | } |
| 473 | } |
| 474 | } |
| 475 | |
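// Appends the dictionary offsets to the target vector and the string data to the dictionary, which grows
// backwards from the end of the block. Strings that do not fit (or exceed STRING_BLOCK_LIMIT) are written
// to overflow blocks and represented by a big string marker in the dictionary.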
| 476 | void StringSegment::AppendData(SegmentStatistics &stats, data_ptr_t target, data_ptr_t end, idx_t target_offset, |
| 477 | Vector &source, idx_t offset, idx_t count) { |
| 478 | VectorData adata; |
| 479 | source.Orrify(count, adata); |
| 480 | |
| 481 | auto sdata = (string_t *)adata.data; |
| 482 | auto &result_nullmask = *((nullmask_t *)target); |
| 483 | auto result_data = (int32_t *)(target + sizeof(nullmask_t)); |
| 484 | auto min = (char *)stats.minimum.get(); |
| 485 | auto max = (char *)stats.maximum.get(); |
| 486 | |
| 487 | idx_t remaining_strings = STANDARD_VECTOR_SIZE - (this->tuple_count % STANDARD_VECTOR_SIZE); |
| 488 | for (idx_t i = 0; i < count; i++) { |
| 489 | auto source_idx = adata.sel->get_index(offset + i); |
| 490 | auto target_idx = target_offset + i; |
| 491 | if ((*adata.nullmask)[source_idx]) { |
			// null values are stored as a dictionary offset of 0
| 493 | result_data[target_idx] = 0; |
| 494 | result_nullmask[target_idx] = true; |
| 495 | stats.has_null = true; |
| 496 | } else { |
| 497 | assert(dictionary_offset < Storage::BLOCK_SIZE); |
| 498 | // non-null value, check if we can fit it within the block |
| 499 | idx_t string_length = sdata[source_idx].GetSize(); |
| 500 | idx_t total_length = string_length + 1 + sizeof(uint16_t); |
| 501 | |
| 502 | if (string_length > stats.max_string_length) { |
| 503 | stats.max_string_length = string_length; |
| 504 | } |
			// determine whether or not the string needs to be stored in an overflow block
			// we never place small strings in the overflow blocks: the pointer would take more space than the
			// string itself; we always place big strings (>= STRING_BLOCK_LIMIT) in the overflow blocks; we also
			// have to leave enough room for BIG_STRING_MARKER_SIZE for each of the remaining strings
| 509 | if (total_length > BIG_STRING_MARKER_BASE_SIZE && |
| 510 | (total_length >= STRING_BLOCK_LIMIT || |
| 511 | total_length + (remaining_strings * BIG_STRING_MARKER_SIZE) > RemainingSpace())) { |
| 512 | assert(RemainingSpace() >= BIG_STRING_MARKER_SIZE); |
| 513 | // string is too big for block: write to overflow blocks |
| 514 | block_id_t block; |
| 515 | int32_t offset; |
| 516 | //! Update min/max of column segment |
| 517 | update_min_max(sdata[source_idx].GetData(), min, max); |
				// write the string into an overflow string block
| 519 | WriteString(sdata[source_idx], block, offset); |
| 520 | dictionary_offset += BIG_STRING_MARKER_SIZE; |
| 521 | auto dict_pos = end - dictionary_offset; |
| 522 | |
| 523 | // write a big string marker into the dictionary |
| 524 | WriteStringMarker(dict_pos, block, offset); |
| 525 | |
| 526 | stats.has_overflow_strings = true; |
| 527 | } else { |
| 528 | // string fits in block, append to dictionary and increment dictionary position |
| 529 | assert(string_length < std::numeric_limits<uint16_t>::max()); |
| 530 | dictionary_offset += total_length; |
| 531 | auto dict_pos = end - dictionary_offset; |
| 532 | //! Update min/max of column segment |
| 533 | update_min_max(sdata[source_idx].GetData(), min, max); |
| 534 | // first write the length as u16 |
| 535 | uint16_t string_length_u16 = string_length; |
| 536 | memcpy(dict_pos, &string_length_u16, sizeof(uint16_t)); |
| 537 | // now write the actual string data into the dictionary |
| 538 | memcpy(dict_pos + sizeof(uint16_t), sdata[source_idx].GetData(), string_length + 1); |
| 539 | } |
| 540 | // place the dictionary offset into the set of vectors |
| 541 | assert(dictionary_offset <= Storage::BLOCK_SIZE); |
| 542 | result_data[target_idx] = dictionary_offset; |
| 543 | } |
| 544 | remaining_strings--; |
| 545 | } |
| 546 | } |
| 547 | |
| 548 | void StringSegment::WriteString(string_t string, block_id_t &result_block, int32_t &result_offset) { |
| 549 | assert(strlen(string.GetData()) == string.GetSize()); |
| 550 | if (overflow_writer) { |
| 551 | // overflow writer is set: write string there |
| 552 | overflow_writer->WriteString(string, result_block, result_offset); |
| 553 | } else { |
| 554 | // default overflow behavior: use in-memory buffer to store the overflow string |
| 555 | WriteStringMemory(string, result_block, result_offset); |
| 556 | } |
| 557 | } |
| 558 | |
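// Writes an overflow string into an in-memory string block (allocating a new block if the current head
// block is full) and reports the block id and offset at which it was written.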
| 559 | void StringSegment::WriteStringMemory(string_t string, block_id_t &result_block, int32_t &result_offset) { |
| 560 | uint32_t total_length = string.GetSize() + 1 + sizeof(uint32_t); |
| 561 | unique_ptr<BufferHandle> handle; |
| 562 | // check if the string fits in the current block |
| 563 | if (!head || head->offset + total_length >= head->size) { |
| 564 | // string does not fit, allocate space for it |
| 565 | // create a new string block |
| 566 | idx_t alloc_size = std::max((idx_t)total_length, (idx_t)Storage::BLOCK_ALLOC_SIZE); |
| 567 | auto new_block = make_unique<StringBlock>(); |
| 568 | new_block->offset = 0; |
| 569 | new_block->size = alloc_size; |
| 570 | // allocate an in-memory buffer for it |
| 571 | handle = manager.Allocate(alloc_size); |
| 572 | new_block->block_id = handle->block_id; |
| 573 | new_block->next = move(head); |
| 574 | head = move(new_block); |
| 575 | } else { |
		// string fits in the current block: pin it so the string can be appended below
| 577 | handle = manager.Pin(head->block_id); |
| 578 | } |
| 579 | |
| 580 | result_block = head->block_id; |
| 581 | result_offset = head->offset; |
| 582 | |
| 583 | // copy the string and the length there |
| 584 | auto ptr = handle->node->buffer + head->offset; |
| 585 | memcpy(ptr, &string.length, sizeof(uint32_t)); |
| 586 | ptr += sizeof(uint32_t); |
| 587 | memcpy(ptr, string.GetData(), string.length + 1); |
| 588 | head->offset += total_length; |
| 589 | } |
| 590 | |
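// Reads an overflow string given its block id and offset. On-disk strings may span multiple blocks and are
// reassembled into a freshly allocated buffer; in-memory strings are read directly from the pinned block.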
| 591 | string_t StringSegment::ReadString(buffer_handle_set_t &handles, block_id_t block, int32_t offset) { |
| 592 | assert(offset < Storage::BLOCK_SIZE); |
| 593 | if (block == INVALID_BLOCK) { |
| 594 | return string_t(nullptr, 0); |
| 595 | } |
| 596 | if (block < MAXIMUM_BLOCK) { |
| 597 | // read the overflow string from disk |
| 598 | // pin the initial handle and read the length |
| 599 | auto handle = manager.Pin(block); |
| 600 | uint32_t length = *((uint32_t *)(handle->node->buffer + offset)); |
| 601 | uint32_t remaining = length + 1; |
| 602 | offset += sizeof(uint32_t); |
| 603 | |
| 604 | // allocate a buffer to store the string |
| 605 | auto alloc_size = std::max((idx_t)Storage::BLOCK_ALLOC_SIZE, (idx_t)length + 1 + sizeof(uint32_t)); |
| 606 | auto target_handle = manager.Allocate(alloc_size, true); |
| 607 | auto target_ptr = target_handle->node->buffer; |
| 608 | // write the length in this block as well |
| 609 | *((uint32_t *)target_ptr) = length; |
| 610 | target_ptr += sizeof(uint32_t); |
| 611 | // now append the string to the single buffer |
| 612 | while (remaining > 0) { |
| 613 | idx_t to_write = std::min((idx_t)remaining, (idx_t)(Storage::BLOCK_SIZE - sizeof(block_id_t) - offset)); |
| 614 | memcpy(target_ptr, handle->node->buffer + offset, to_write); |
| 615 | |
| 616 | remaining -= to_write; |
| 617 | offset += to_write; |
| 618 | target_ptr += to_write; |
| 619 | if (remaining > 0) { |
| 620 | // read the next block |
| 621 | block_id_t next_block = *((block_id_t *)(handle->node->buffer + offset)); |
| 622 | handle = manager.Pin(next_block); |
| 623 | offset = 0; |
| 624 | } |
| 625 | } |
| 626 | |
| 627 | auto final_buffer = target_handle->node->buffer; |
| 628 | handles.insert(make_pair(target_handle->block_id, move(target_handle))); |
| 629 | return ReadString(final_buffer, 0); |
| 630 | } else { |
| 631 | // read the overflow string from memory |
| 632 | // first pin the handle, if it is not pinned yet |
| 633 | BufferHandle *handle; |
| 634 | auto entry = handles.find(block); |
| 635 | if (entry == handles.end()) { |
| 636 | auto pinned_handle = manager.Pin(block); |
| 637 | handle = pinned_handle.get(); |
| 638 | |
| 639 | handles.insert(make_pair(block, move(pinned_handle))); |
| 640 | } else { |
| 641 | handle = entry->second.get(); |
| 642 | } |
| 643 | return ReadString(handle->node->buffer, offset); |
| 644 | } |
| 645 | } |
| 646 | |
| 647 | string_t StringSegment::ReadString(data_ptr_t target, int32_t offset) { |
| 648 | auto ptr = target + offset; |
| 649 | auto str_length = *((uint32_t *)ptr); |
| 650 | auto str_ptr = (char *)(ptr + sizeof(uint32_t)); |
| 651 | return string_t(str_ptr, str_length); |
| 652 | } |
| 653 | |
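// A big string marker consists of the BIG_STRING_MARKER length sentinel (uint16_t) followed by the
// overflow block id and the offset within that block.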
| 654 | void StringSegment::WriteStringMarker(data_ptr_t target, block_id_t block_id, int32_t offset) { |
| 655 | uint16_t length = BIG_STRING_MARKER; |
| 656 | memcpy(target, &length, sizeof(uint16_t)); |
| 657 | target += sizeof(uint16_t); |
| 658 | memcpy(target, &block_id, sizeof(block_id_t)); |
| 659 | target += sizeof(block_id_t); |
| 660 | memcpy(target, &offset, sizeof(int32_t)); |
| 661 | } |
| 662 | |
| 663 | void StringSegment::ReadStringMarker(data_ptr_t target, block_id_t &block_id, int32_t &offset) { |
| 664 | target += sizeof(uint16_t); |
| 665 | memcpy(&block_id, target, sizeof(block_id_t)); |
| 666 | target += sizeof(block_id_t); |
| 667 | memcpy(&offset, target, sizeof(int32_t)); |
| 668 | } |
| 669 | |
| 670 | //===--------------------------------------------------------------------===// |
| 671 | // String Update |
| 672 | //===--------------------------------------------------------------------===// |
| 673 | string_update_info_t StringSegment::CreateStringUpdate(SegmentStatistics &stats, Vector &update, row_t *ids, |
| 674 | idx_t count, idx_t vector_offset) { |
| 675 | auto info = make_unique<StringUpdateInfo>(); |
| 676 | info->count = count; |
| 677 | auto strings = FlatVector::GetData<string_t>(update); |
| 678 | auto &update_nullmask = FlatVector::Nullmask(update); |
| 679 | for (idx_t i = 0; i < count; i++) { |
| 680 | info->ids[i] = ids[i] - vector_offset; |
| 681 | // copy the string into the block |
| 682 | if (!update_nullmask[i]) { |
| 683 | auto min = (char *)stats.minimum.get(); |
| 684 | auto max = (char *)stats.maximum.get(); |
| 685 | update_min_max(strings[i].GetData(), min, max); |
| 686 | WriteString(strings[i], info->block_ids[i], info->offsets[i]); |
| 687 | } else { |
| 688 | info->block_ids[i] = INVALID_BLOCK; |
| 689 | info->offsets[i] = 0; |
| 690 | } |
| 691 | } |
| 692 | return info; |
| 693 | } |
| 694 | |
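// Merges a new batch of string updates into the existing update info of a vector; when a row id appears in
// both, the newly updated value takes precedence, otherwise the existing entry is carried over.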
| 695 | string_update_info_t StringSegment::MergeStringUpdate(SegmentStatistics &stats, Vector &update, row_t *ids, |
| 696 | idx_t update_count, idx_t vector_offset, |
| 697 | StringUpdateInfo &update_info) { |
| 698 | auto info = make_unique<StringUpdateInfo>(); |
| 699 | |
| 700 | // perform a merge between the new and old indexes |
| 701 | auto strings = FlatVector::GetData<string_t>(update); |
| 702 | auto &update_nullmask = FlatVector::Nullmask(update); |
	//! update the segment's min/max statistics with all non-null update values
| 704 | for (idx_t i = 0; i < update_count; i++) { |
| 705 | if (!update_nullmask[i]) { |
| 706 | auto min = (char *)stats.minimum.get(); |
| 707 | auto max = (char *)stats.maximum.get(); |
| 708 | update_min_max(strings[i].GetData(), min, max); |
| 709 | } |
| 710 | } |
| 711 | auto pick_new = [&](idx_t id, idx_t idx, idx_t count) { |
| 712 | info->ids[count] = id; |
| 713 | if (!update_nullmask[idx]) { |
| 714 | WriteString(strings[idx], info->block_ids[count], info->offsets[count]); |
| 715 | } else { |
| 716 | info->block_ids[count] = INVALID_BLOCK; |
| 717 | info->offsets[count] = 0; |
| 718 | } |
| 719 | }; |
| 720 | auto merge = [&](idx_t id, idx_t aidx, idx_t bidx, idx_t count) { |
| 721 | // merge: only pick new entry |
| 722 | pick_new(id, aidx, count); |
| 723 | }; |
| 724 | auto pick_old = [&](idx_t id, idx_t bidx, idx_t count) { |
| 725 | // pick old entry |
| 726 | info->ids[count] = id; |
| 727 | info->block_ids[count] = update_info.block_ids[bidx]; |
| 728 | info->offsets[count] = update_info.offsets[bidx]; |
| 729 | }; |
| 730 | |
| 731 | info->count = |
| 732 | merge_loop(ids, update_info.ids, update_count, update_info.count, vector_offset, merge, pick_new, pick_old); |
| 733 | return info; |
| 734 | } |
| 735 | |
| 736 | //===--------------------------------------------------------------------===// |
| 737 | // Update Info |
| 738 | //===--------------------------------------------------------------------===// |
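// Merges new row ids into an existing undo-buffer entry. Row ids already present keep their previously
// saved locations; new row ids get the current base table locations and nullmask so they can be restored
// on rollback.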
| 739 | void StringSegment::MergeUpdateInfo(UpdateInfo *node, row_t *ids, idx_t update_count, idx_t vector_offset, |
| 740 | string_location_t base_data[], nullmask_t base_nullmask) { |
| 741 | auto info_data = (string_location_t *)node->tuple_data; |
| 742 | |
| 743 | // first we copy the old update info into a temporary structure |
| 744 | sel_t old_ids[STANDARD_VECTOR_SIZE]; |
| 745 | string_location_t old_data[STANDARD_VECTOR_SIZE]; |
| 746 | |
| 747 | memcpy(old_ids, node->tuples, node->N * sizeof(sel_t)); |
| 748 | memcpy(old_data, node->tuple_data, node->N * sizeof(string_location_t)); |
| 749 | |
| 750 | // now we perform a merge of the new ids with the old ids |
| 751 | auto merge = [&](idx_t id, idx_t aidx, idx_t bidx, idx_t count) { |
| 752 | // new_id and old_id are the same, insert the old data in the UpdateInfo |
| 753 | assert(old_data[bidx].IsValid()); |
| 754 | info_data[count] = old_data[bidx]; |
| 755 | node->tuples[count] = id; |
| 756 | }; |
| 757 | auto pick_new = [&](idx_t id, idx_t aidx, idx_t count) { |
| 758 | // new_id comes before the old id, insert the base table data into the update info |
| 759 | assert(base_data[aidx].IsValid()); |
| 760 | info_data[count] = base_data[aidx]; |
| 761 | node->nullmask[id] = base_nullmask[aidx]; |
| 762 | |
| 763 | node->tuples[count] = id; |
| 764 | }; |
| 765 | auto pick_old = [&](idx_t id, idx_t bidx, idx_t count) { |
| 766 | // old_id comes before new_id, insert the old data |
| 767 | assert(old_data[bidx].IsValid()); |
| 768 | info_data[count] = old_data[bidx]; |
| 769 | node->tuples[count] = id; |
| 770 | }; |
| 771 | // perform the merge |
| 772 | node->N = merge_loop(ids, old_ids, update_count, node->N, vector_offset, merge, pick_new, pick_old); |
| 773 | } |
| 774 | |
| 775 | //===--------------------------------------------------------------------===// |
| 776 | // Update |
| 777 | //===--------------------------------------------------------------------===// |
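// Performs an in-place update of this segment: the new strings are written to overflow blocks and recorded
// in string_updates, the original locations and nullmask are stored in the undo buffer for rollback, and
// the base nullmask is updated to reflect the new values.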
| 778 | void StringSegment::Update(ColumnData &column_data, SegmentStatistics &stats, Transaction &transaction, Vector &update, |
| 779 | row_t *ids, idx_t count, idx_t vector_index, idx_t vector_offset, UpdateInfo *node) { |
| 780 | if (!string_updates) { |
| 781 | string_updates = unique_ptr<string_update_info_t[]>(new string_update_info_t[max_vector_count]); |
| 782 | } |
| 783 | |
| 784 | // first pin the base block |
| 785 | auto handle = manager.Pin(block_id); |
| 786 | auto baseptr = handle->node->buffer; |
| 787 | auto base = baseptr + vector_index * vector_size; |
| 788 | auto &base_nullmask = *((nullmask_t *)base); |
| 789 | |
| 790 | // fetch the original string locations and copy the original nullmask |
| 791 | string_location_t string_locations[STANDARD_VECTOR_SIZE]; |
| 792 | nullmask_t original_nullmask = base_nullmask; |
| 793 | FetchStringLocations(baseptr, ids, vector_index, vector_offset, count, string_locations); |
| 794 | |
| 795 | string_update_info_t new_update_info; |
| 796 | // next up: create the updates |
| 797 | if (!string_updates[vector_index]) { |
| 798 | // no string updates yet, allocate a block and place the updates there |
| 799 | new_update_info = CreateStringUpdate(stats, update, ids, count, vector_offset); |
| 800 | } else { |
| 801 | // string updates already exist, merge the string updates together |
| 802 | new_update_info = MergeStringUpdate(stats, update, ids, count, vector_offset, *string_updates[vector_index]); |
| 803 | } |
| 804 | |
| 805 | // now update the original nullmask |
| 806 | auto &update_nullmask = FlatVector::Nullmask(update); |
| 807 | for (idx_t i = 0; i < count; i++) { |
| 808 | base_nullmask[ids[i] - vector_offset] = update_nullmask[i]; |
| 809 | } |
| 810 | |
	// create the update node in the undo buffer: it stores the original string locations and nullmask so the
	// update can be reverted on rollback
| 813 | if (!node) { |
| 814 | // create a new node in the undo buffer for this update |
| 815 | node = CreateUpdateInfo(column_data, transaction, ids, count, vector_index, vector_offset, |
| 816 | sizeof(string_location_t)); |
| 817 | |
| 818 | // copy the string location data into the undo buffer |
| 819 | node->nullmask = original_nullmask; |
| 820 | memcpy(node->tuple_data, string_locations, sizeof(string_location_t) * count); |
| 821 | } else { |
| 822 | // node in the update info already exists, merge the new updates in |
| 823 | MergeUpdateInfo(node, ids, count, vector_offset, string_locations, original_nullmask); |
| 824 | } |
| 825 | // finally move the string updates in place |
| 826 | string_updates[vector_index] = move(new_update_info); |
| 827 | } |
| 828 | |
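// Reverts an update: restores the previous nullmask, re-inserts the string locations saved in the undo
// buffer into the string update info (dropping entries that roll back to the base table), and cleans up
// the undo entry.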
| 829 | void StringSegment::RollbackUpdate(UpdateInfo *info) { |
| 830 | auto lock_handle = lock.GetExclusiveLock(); |
| 831 | |
| 832 | idx_t new_count = 0; |
| 833 | auto &update_info = *string_updates[info->vector_index]; |
| 834 | auto string_locations = (string_location_t *)info->tuple_data; |
| 835 | |
| 836 | // put the previous NULL values back |
| 837 | auto handle = manager.Pin(block_id); |
| 838 | auto baseptr = handle->node->buffer; |
| 839 | auto base = baseptr + info->vector_index * vector_size; |
| 840 | auto &base_nullmask = *((nullmask_t *)base); |
| 841 | for (idx_t i = 0; i < info->N; i++) { |
| 842 | base_nullmask[info->tuples[i]] = info->nullmask[info->tuples[i]]; |
| 843 | } |
| 844 | |
| 845 | // now put the original values back into the update info |
| 846 | idx_t old_idx = 0; |
| 847 | for (idx_t i = 0; i < update_info.count; i++) { |
| 848 | if (old_idx >= info->N || update_info.ids[i] != info->tuples[old_idx]) { |
| 849 | assert(old_idx >= info->N || update_info.ids[i] < info->tuples[old_idx]); |
| 850 | // this entry is not rolled back: insert entry directly |
| 851 | update_info.ids[new_count] = update_info.ids[i]; |
| 852 | update_info.block_ids[new_count] = update_info.block_ids[i]; |
| 853 | update_info.offsets[new_count] = update_info.offsets[i]; |
| 854 | new_count++; |
| 855 | } else { |
| 856 | // this entry is being rolled back |
| 857 | auto &old_location = string_locations[old_idx]; |
| 858 | if (old_location.block_id != INVALID_BLOCK) { |
| 859 | // not rolled back to base table: insert entry again |
| 860 | update_info.ids[new_count] = update_info.ids[i]; |
| 861 | update_info.block_ids[new_count] = old_location.block_id; |
| 862 | update_info.offsets[new_count] = old_location.offset; |
| 863 | new_count++; |
| 864 | } |
| 865 | old_idx++; |
| 866 | } |
| 867 | } |
| 868 | |
| 869 | if (new_count == 0) { |
| 870 | // all updates are rolled back: delete the string update vector |
| 871 | string_updates[info->vector_index].reset(); |
| 872 | } else { |
| 873 | // set the count of the new string update vector |
| 874 | update_info.count = new_count; |
| 875 | } |
| 876 | CleanupUpdate(info); |
| 877 | } |
| 878 | |