| 1 | #include "duckdb/common/vector_operations/binary_executor.hpp" |
| 2 | #include "duckdb/storage/data_table.hpp" |
| 3 | #include "duckdb/common/operator/comparison_operators.hpp" |
| 4 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 5 | #include "duckdb/storage/uncompressed_segment.hpp" |
| 6 | #include "duckdb/common/exception.hpp" |
| 7 | #include "duckdb/common/types/vector.hpp" |
| 8 | #include "duckdb/transaction/update_info.hpp" |
| 9 | |
| 10 | using namespace duckdb; |
| 11 | using namespace std; |
| 12 | |
| 13 | UncompressedSegment::UncompressedSegment(BufferManager &manager, TypeId type, idx_t row_start) |
| 14 | : manager(manager), type(type), block_id(INVALID_BLOCK), max_vector_count(0), tuple_count(0), |
| 15 | row_start(row_start), |
| 16 | versions(nullptr) { |
| 17 | } |
| 18 | |
| 19 | UncompressedSegment::~UncompressedSegment() { |
| 20 | if (block_id >= MAXIMUM_BLOCK) { |
| 21 | // if the uncompressed segment had an in-memory segment, destroy it when the uncompressed segment is destroyed |
| 22 | manager.DestroyBuffer(block_id); |
| 23 | } |
| 24 | } |
| 25 | |
| 26 | void UncompressedSegment::Verify(Transaction &transaction) { |
| 27 | #ifdef DEBUG |
| 28 | ColumnScanState state; |
| 29 | InitializeScan(state); |
| 30 | |
| 31 | Vector result(this->type); |
| 32 | for (idx_t i = 0; i < this->tuple_count; i += STANDARD_VECTOR_SIZE) { |
| 33 | idx_t vector_idx = i / STANDARD_VECTOR_SIZE; |
| 34 | idx_t count = std::min((idx_t)STANDARD_VECTOR_SIZE, tuple_count - i); |
| 35 | Scan(transaction, state, vector_idx, result); |
| 36 | result.Verify(count); |
| 37 | } |
| 38 | #endif |
| 39 | } |
| 40 | |
| 41 | static void CheckForConflicts(UpdateInfo *info, Transaction &transaction, row_t *ids, idx_t count, row_t offset, |
| 42 | UpdateInfo *&node) { |
| 43 | if (info->version_number == transaction.transaction_id) { |
| 44 | // this UpdateInfo belongs to the current transaction, set it in the node |
| 45 | node = info; |
| 46 | } else if (info->version_number > transaction.start_time) { |
| 47 | // potential conflict, check that tuple ids do not conflict |
| 48 | // as both ids and info->tuples are sorted, this is similar to a merge join |
| 49 | idx_t i = 0, j = 0; |
| 50 | while (true) { |
| 51 | auto id = ids[i] - offset; |
| 52 | if (id == info->tuples[j]) { |
| 53 | throw TransactionException("Conflict on update!" ); |
| 54 | } else if (id < info->tuples[j]) { |
| 55 | // id < the current tuple in info, move to next id |
| 56 | i++; |
| 57 | if (i == count) { |
| 58 | break; |
| 59 | } |
| 60 | } else { |
| 61 | // id > the current tuple, move to next tuple in info |
| 62 | j++; |
| 63 | if (j == info->N) { |
| 64 | break; |
| 65 | } |
| 66 | } |
| 67 | } |
| 68 | } |
| 69 | if (info->next) { |
| 70 | CheckForConflicts(info->next, transaction, ids, count, offset, node); |
| 71 | } |
| 72 | } |
| 73 | |
| 74 | void UncompressedSegment::Update(ColumnData &column_data, SegmentStatistics &stats, Transaction &transaction, |
| 75 | Vector &update, row_t *ids, idx_t count, row_t offset) { |
| 76 | // can only perform in-place updates on temporary blocks |
| 77 | assert(block_id >= MAXIMUM_BLOCK); |
| 78 | |
| 79 | // obtain an exclusive lock |
| 80 | auto write_lock = lock.GetExclusiveLock(); |
| 81 | |
| 82 | #ifdef DEBUG |
| 83 | // verify that the ids are sorted and there are no duplicates |
| 84 | for (idx_t i = 1; i < count; i++) { |
| 85 | assert(ids[i] > ids[i - 1]); |
| 86 | } |
| 87 | #endif |
| 88 | |
| 89 | // create the versions for this segment, if there are none yet |
| 90 | if (!versions) { |
| 91 | this->versions = unique_ptr<UpdateInfo *[]>(new UpdateInfo *[max_vector_count]); |
| 92 | for (idx_t i = 0; i < max_vector_count; i++) { |
| 93 | this->versions[i] = nullptr; |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | // get the vector index based on the first id |
| 98 | // we assert that all updates must be part of the same vector |
| 99 | auto first_id = ids[0]; |
| 100 | idx_t vector_index = (first_id - offset) / STANDARD_VECTOR_SIZE; |
| 101 | idx_t vector_offset = offset + vector_index * STANDARD_VECTOR_SIZE; |
| 102 | |
| 103 | assert(first_id >= offset); |
| 104 | assert(vector_index < max_vector_count); |
| 105 | |
| 106 | // first check the version chain |
| 107 | UpdateInfo *node = nullptr; |
| 108 | if (versions[vector_index]) { |
| 109 | // there is already a version here, check if there are any conflicts and search for the node that belongs to |
| 110 | // this transaction in the version chain |
| 111 | CheckForConflicts(versions[vector_index], transaction, ids, count, vector_offset, node); |
| 112 | } |
| 113 | Update(column_data, stats, transaction, update, ids, count, vector_index, vector_offset, node); |
| 114 | } |
| 115 | |
| 116 | UpdateInfo *UncompressedSegment::CreateUpdateInfo(ColumnData &column_data, Transaction &transaction, row_t *ids, |
| 117 | idx_t count, idx_t vector_index, idx_t vector_offset, |
| 118 | idx_t type_size) { |
| 119 | auto node = transaction.CreateUpdateInfo(type_size, STANDARD_VECTOR_SIZE); |
| 120 | node->column_data = &column_data; |
| 121 | node->segment = this; |
| 122 | node->vector_index = vector_index; |
| 123 | node->prev = nullptr; |
| 124 | node->next = versions[vector_index]; |
| 125 | if (node->next) { |
| 126 | node->next->prev = node; |
| 127 | } |
| 128 | versions[vector_index] = node; |
| 129 | |
| 130 | // set up the tuple ids |
| 131 | node->N = count; |
| 132 | for (idx_t i = 0; i < count; i++) { |
| 133 | assert((idx_t) ids[i] >= vector_offset && (idx_t) ids[i] < vector_offset + STANDARD_VECTOR_SIZE); |
| 134 | node->tuples[i] = ids[i] - vector_offset; |
| 135 | }; |
| 136 | return node; |
| 137 | } |
| 138 | |
| 139 | void UncompressedSegment::Fetch(ColumnScanState &state, idx_t vector_index, Vector &result) { |
| 140 | auto read_lock = lock.GetSharedLock(); |
| 141 | InitializeScan(state); |
| 142 | FetchBaseData(state, vector_index, result); |
| 143 | } |
| 144 | |
| 145 | //===--------------------------------------------------------------------===// |
| 146 | // Filter |
| 147 | //===--------------------------------------------------------------------===// |
| 148 | template<class T> |
| 149 | static void filterSelectionType(T *vec, T *predicate, SelectionVector &sel, idx_t &approved_tuple_count, |
| 150 | ExpressionType comparison_type, nullmask_t &nullmask) { |
| 151 | SelectionVector new_sel(approved_tuple_count); |
| 152 | // the inplace loops take the result as the last parameter |
| 153 | switch (comparison_type) { |
| 154 | case ExpressionType::COMPARE_EQUAL: { |
| 155 | if (nullmask.any()) { |
| 156 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, Equals, false, true, true, true, false>( |
| 157 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 158 | } else { |
| 159 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, Equals, false, true, false, true, false>( |
| 160 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 161 | } |
| 162 | break; |
| 163 | } |
| 164 | case ExpressionType::COMPARE_LESSTHAN: { |
| 165 | if (nullmask.any()) { |
| 166 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, LessThan, false, true, true, true, false>( |
| 167 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 168 | } else { |
| 169 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, LessThan, false, true, false, true, false>( |
| 170 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 171 | } |
| 172 | break; |
| 173 | } |
| 174 | case ExpressionType::COMPARE_GREATERTHAN: { |
| 175 | if (nullmask.any()) { |
| 176 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, GreaterThan, false, true, true, true, false>( |
| 177 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 178 | } else { |
| 179 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, GreaterThan, false, true, false, true, false>( |
| 180 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 181 | } |
| 182 | break; |
| 183 | } |
| 184 | case ExpressionType::COMPARE_LESSTHANOREQUALTO: { |
| 185 | if (nullmask.any()) { |
| 186 | approved_tuple_count = BinaryExecutor::SelectFlatLoop<T, T, LessThanEquals, false, true, true, true, false>( |
| 187 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 188 | } else { |
| 189 | approved_tuple_count = |
| 190 | BinaryExecutor::SelectFlatLoop<T, T, LessThanEquals, false, true, false, true, false>( |
| 191 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 192 | } |
| 193 | break; |
| 194 | } |
| 195 | case ExpressionType::COMPARE_GREATERTHANOREQUALTO: { |
| 196 | if (nullmask.any()) { |
| 197 | approved_tuple_count = |
| 198 | BinaryExecutor::SelectFlatLoop<T, T, GreaterThanEquals, false, true, true, true, false>( |
| 199 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 200 | } else { |
| 201 | approved_tuple_count = |
| 202 | BinaryExecutor::SelectFlatLoop<T, T, GreaterThanEquals, false, true, false, true, false>( |
| 203 | vec, predicate, &sel, approved_tuple_count, nullmask, &new_sel, &sel); |
| 204 | } |
| 205 | break; |
| 206 | } |
| 207 | default: |
| 208 | throw NotImplementedException("Unknown comparison type for filter pushed down to table!" ); |
| 209 | } |
| 210 | sel.Initialize(new_sel); |
| 211 | } |
| 212 | |
| 213 | void UncompressedSegment::filterSelection(SelectionVector &sel, Vector &result, TableFilter filter, |
| 214 | idx_t &approved_tuple_count, nullmask_t &nullmask) { |
| 215 | // the inplace loops take the result as the last parameter |
| 216 | switch (result.type) { |
| 217 | case TypeId::INT8: { |
| 218 | auto result_flat = FlatVector::GetData<int8_t>(result); |
| 219 | auto predicate_vector = Vector(filter.constant.value_.tinyint); |
| 220 | auto predicate = FlatVector::GetData<int8_t>(predicate_vector); |
| 221 | filterSelectionType<int8_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
| 222 | nullmask); |
| 223 | break; |
| 224 | } |
| 225 | case TypeId::INT16: { |
| 226 | auto result_flat = FlatVector::GetData<int16_t>(result); |
| 227 | auto predicate_vector = Vector(filter.constant.value_.smallint); |
| 228 | auto predicate = FlatVector::GetData<int16_t>(predicate_vector); |
| 229 | filterSelectionType<int16_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
| 230 | nullmask); |
| 231 | break; |
| 232 | } |
| 233 | case TypeId::INT32: { |
| 234 | auto result_flat = FlatVector::GetData<int32_t>(result); |
| 235 | auto predicate_vector = Vector(filter.constant.value_.integer); |
| 236 | auto predicate = FlatVector::GetData<int32_t>(predicate_vector); |
| 237 | filterSelectionType<int32_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
| 238 | nullmask); |
| 239 | break; |
| 240 | } |
| 241 | case TypeId::INT64: { |
| 242 | auto result_flat = FlatVector::GetData<int64_t>(result); |
| 243 | auto predicate_vector = Vector(filter.constant.value_.bigint); |
| 244 | auto predicate = FlatVector::GetData<int64_t>(predicate_vector); |
| 245 | filterSelectionType<int64_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
| 246 | nullmask); |
| 247 | break; |
| 248 | } |
| 249 | case TypeId::FLOAT: { |
| 250 | auto result_flat = FlatVector::GetData<float>(result); |
| 251 | auto predicate_vector = Vector(filter.constant.value_.float_); |
| 252 | auto predicate = FlatVector::GetData<float>(predicate_vector); |
| 253 | filterSelectionType<float>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
| 254 | nullmask); |
| 255 | break; |
| 256 | } |
| 257 | case TypeId::DOUBLE: { |
| 258 | auto result_flat = FlatVector::GetData<double>(result); |
| 259 | auto predicate_vector = Vector(filter.constant.value_.double_); |
| 260 | auto predicate = FlatVector::GetData<double>(predicate_vector); |
| 261 | filterSelectionType<double>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
| 262 | nullmask); |
| 263 | break; |
| 264 | } |
| 265 | case TypeId::VARCHAR: { |
| 266 | auto result_flat = FlatVector::GetData<string_t>(result); |
| 267 | auto predicate_vector = Vector(filter.constant.str_value); |
| 268 | auto predicate = FlatVector::GetData<string_t>(predicate_vector); |
| 269 | filterSelectionType<string_t>(result_flat, predicate, sel, approved_tuple_count, filter.comparison_type, |
| 270 | nullmask); |
| 271 | break; |
| 272 | } |
| 273 | default: |
| 274 | throw InvalidTypeException(result.type, "Invalid type for filter pushed down to table comparison" ); |
| 275 | } |
| 276 | } |
| 277 | |
| 278 | void UncompressedSegment::Select(Transaction &transaction, Vector &result, vector<TableFilter> &tableFilters, |
| 279 | SelectionVector &sel, idx_t &approved_tuple_count, ColumnScanState &state) { |
| 280 | auto read_lock = lock.GetSharedLock(); |
| 281 | if (versions && versions[state.vector_index]) { |
| 282 | Scan(transaction, state, state.vector_index, result, false); |
| 283 | auto vector_index = state.vector_index; |
| 284 | // pin the buffer for this segment |
| 285 | auto handle = manager.Pin(block_id); |
| 286 | auto data = handle->node->buffer; |
| 287 | auto offset = vector_index * vector_size; |
| 288 | auto source_nullmask = (nullmask_t *) (data + offset); |
| 289 | for (auto &table_filter : tableFilters) { |
| 290 | filterSelection(sel, result, table_filter, approved_tuple_count, *source_nullmask); |
| 291 | } |
| 292 | } else { |
| 293 | //! Select the data from the base table |
| 294 | Select(state, result, sel, approved_tuple_count, tableFilters); |
| 295 | } |
| 296 | } |
| 297 | |
| 298 | //===--------------------------------------------------------------------===// |
| 299 | // Scan |
| 300 | //===--------------------------------------------------------------------===// |
| 301 | void UncompressedSegment::Scan(Transaction &transaction, ColumnScanState &state, idx_t vector_index, Vector &result, |
| 302 | bool get_lock) { |
| 303 | unique_ptr<StorageLockKey> read_lock; |
| 304 | if (get_lock) { |
| 305 | read_lock = lock.GetSharedLock(); |
| 306 | } |
| 307 | // first fetch the data from the base table |
| 308 | FetchBaseData(state, vector_index, result); |
| 309 | if (versions && versions[vector_index]) { |
| 310 | // if there are any versions, check if we need to overwrite the data with the versioned data |
| 311 | FetchUpdateData(state, transaction, versions[vector_index], result); |
| 312 | } |
| 313 | } |
| 314 | |
| 315 | void UncompressedSegment::FilterScan(Transaction &transaction, ColumnScanState &state, Vector &result, |
| 316 | SelectionVector &sel, idx_t &approved_tuple_count) { |
| 317 | auto read_lock = lock.GetSharedLock(); |
| 318 | if (versions && versions[state.vector_index]) { |
| 319 | // if there are any versions, we do a regular scan |
| 320 | Scan(transaction, state, state.vector_index, result, false); |
| 321 | result.Slice(sel, approved_tuple_count); |
| 322 | } else { |
| 323 | FilterFetchBaseData(state, result, sel, approved_tuple_count); |
| 324 | } |
| 325 | } |
| 326 | |
| 327 | void UncompressedSegment::IndexScan(ColumnScanState &state, idx_t vector_index, Vector &result) { |
| 328 | if (vector_index == 0) { |
| 329 | // vector_index = 0, obtain a shared lock on the segment that we keep until the index scan is complete |
| 330 | state.locks.push_back(lock.GetSharedLock()); |
| 331 | } |
| 332 | if (versions && versions[vector_index]) { |
| 333 | throw TransactionException("Cannot create index with outstanding updates" ); |
| 334 | } |
| 335 | FetchBaseData(state, vector_index, result); |
| 336 | } |
| 337 | |
| 338 | //===--------------------------------------------------------------------===// |
| 339 | // Update |
| 340 | //===--------------------------------------------------------------------===// |
| 341 | void UncompressedSegment::CleanupUpdate(UpdateInfo *info) { |
| 342 | if (info->prev) { |
| 343 | // there is a prev info: remove from the chain |
| 344 | auto prev = info->prev; |
| 345 | prev->next = info->next; |
| 346 | if (prev->next) { |
| 347 | prev->next->prev = prev; |
| 348 | } |
| 349 | } else { |
| 350 | // there is no prev info: remove from base segment |
| 351 | info->segment->versions[info->vector_index] = info->next; |
| 352 | if (info->next) { |
| 353 | info->next->prev = nullptr; |
| 354 | } |
| 355 | } |
| 356 | } |
| 357 | |
| 358 | //===--------------------------------------------------------------------===// |
| 359 | // ToTemporary |
| 360 | //===--------------------------------------------------------------------===// |
| 361 | void UncompressedSegment::ToTemporary() { |
| 362 | auto write_lock = lock.GetExclusiveLock(); |
| 363 | |
| 364 | if (block_id >= MAXIMUM_BLOCK) { |
| 365 | // conversion has already been performed by a different thread |
| 366 | return; |
| 367 | } |
| 368 | // pin the current block |
| 369 | auto current = manager.Pin(block_id); |
| 370 | |
| 371 | // now allocate a new block from the buffer manager |
| 372 | auto handle = manager.Allocate(Storage::BLOCK_ALLOC_SIZE); |
| 373 | // now copy the data over and switch to using the new block id |
| 374 | memcpy(handle->node->buffer, current->node->buffer, Storage::BLOCK_SIZE); |
| 375 | this->block_id = handle->block_id; |
| 376 | } |
| 377 | |