| 1 | #include "duckdb/storage/column_data.hpp" | 
|---|
| 2 | #include "duckdb/storage/table/persistent_segment.hpp" | 
|---|
| 3 | #include "duckdb/storage/table/transient_segment.hpp" | 
|---|
| 4 | #include "duckdb/storage/data_table.hpp" | 
|---|
| 5 | #include "duckdb/storage/storage_manager.hpp" | 
|---|
| 6 |  | 
|---|
| 7 | using namespace duckdb; | 
|---|
| 8 | using namespace std; | 
|---|
| 9 |  | 
|---|
| 10 | ColumnData::ColumnData(BufferManager &manager, DataTableInfo &table_info) | 
|---|
| 11 | : table_info(table_info), manager(manager), persistent_rows(0) { | 
|---|
| 12 | } | 
|---|
| 13 |  | 
|---|
| 14 | void ColumnData::Initialize(vector<unique_ptr<PersistentSegment>> &segments) { | 
|---|
| 15 | for (auto &segment : segments) { | 
|---|
| 16 | persistent_rows += segment->count; | 
|---|
| 17 | data.AppendSegment(move(segment)); | 
|---|
| 18 | } | 
|---|
| 19 | } | 
|---|
| 20 |  | 
|---|
| 21 | void ColumnData::InitializeScan(ColumnScanState &state) { | 
|---|
| 22 | state.current = (ColumnSegment *)data.GetRootSegment(); | 
|---|
| 23 | state.vector_index = 0; | 
|---|
| 24 | state.initialized = false; | 
|---|
| 25 | } | 
|---|
| 26 |  | 
|---|
| 27 | void ColumnData::Scan(Transaction &transaction, ColumnScanState &state, Vector &result) { | 
|---|
| 28 | if (!state.initialized) { | 
|---|
| 29 | state.current->InitializeScan(state); | 
|---|
| 30 | state.initialized = true; | 
|---|
| 31 | } | 
|---|
| 32 | // perform a scan of this segment | 
|---|
| 33 | state.current->Scan(transaction, state, state.vector_index, result); | 
|---|
| 34 | // move over to the next vector | 
|---|
| 35 | state.Next(); | 
|---|
| 36 | } | 
|---|
| 37 |  | 
|---|
| 38 | void ColumnData::FilterScan(Transaction &transaction, ColumnScanState &state, Vector &result, SelectionVector &sel, | 
|---|
| 39 | idx_t &approved_tuple_count) { | 
|---|
| 40 | if (!state.initialized) { | 
|---|
| 41 | state.current->InitializeScan(state); | 
|---|
| 42 | state.initialized = true; | 
|---|
| 43 | } | 
|---|
| 44 | // perform a scan of this segment | 
|---|
| 45 | state.current->FilterScan(transaction, state, result, sel, approved_tuple_count); | 
|---|
| 46 | // move over to the next vector | 
|---|
| 47 | state.Next(); | 
|---|
| 48 | } | 
|---|
| 49 |  | 
|---|
| 50 | void ColumnData::Select(Transaction &transaction, ColumnScanState &state, Vector &result, SelectionVector &sel, | 
|---|
| 51 | idx_t &approved_tuple_count, vector<TableFilter> &tableFilter) { | 
|---|
| 52 | if (!state.initialized) { | 
|---|
| 53 | state.current->InitializeScan(state); | 
|---|
| 54 | state.initialized = true; | 
|---|
| 55 | } | 
|---|
| 56 | // perform a scan of this segment | 
|---|
| 57 | state.current->Select(transaction, state, result, sel, approved_tuple_count, tableFilter); | 
|---|
| 58 | // move over to the next vector | 
|---|
| 59 | state.Next(); | 
|---|
| 60 | } | 
|---|
| 61 |  | 
|---|
| 62 | void ColumnData::IndexScan(ColumnScanState &state, Vector &result) { | 
|---|
| 63 | if (state.vector_index == 0) { | 
|---|
| 64 | state.current->InitializeScan(state); | 
|---|
| 65 | } | 
|---|
| 66 | // perform a scan of this segment | 
|---|
| 67 | state.current->IndexScan(state, result); | 
|---|
| 68 | // move over to the next vector | 
|---|
| 69 | state.Next(); | 
|---|
| 70 | } | 
|---|
| 71 |  | 
|---|
| 72 | void ColumnScanState::Next() { | 
|---|
| 73 | //! There is no column segment | 
|---|
| 74 | if (!current) { | 
|---|
| 75 | return; | 
|---|
| 76 | } | 
|---|
| 77 | vector_index++; | 
|---|
| 78 | if (vector_index * STANDARD_VECTOR_SIZE >= current->count) { | 
|---|
| 79 | current = (ColumnSegment *)current->next.get(); | 
|---|
| 80 | vector_index = 0; | 
|---|
| 81 | initialized = false; | 
|---|
| 82 | segment_checked = false; | 
|---|
| 83 | } | 
|---|
| 84 | } | 
|---|
| 85 |  | 
|---|
| 86 | void TableScanState::NextVector() { | 
|---|
| 87 | //! nothing to scan for this vector, skip the entire vector | 
|---|
| 88 | for (idx_t j = 0; j < column_ids.size(); j++) { | 
|---|
| 89 | auto column = column_ids[j]; | 
|---|
| 90 | if (column != COLUMN_IDENTIFIER_ROW_ID) { | 
|---|
| 91 | column_scans[j].Next(); | 
|---|
| 92 | } | 
|---|
| 93 | } | 
|---|
| 94 | } | 
|---|
| 95 |  | 
|---|
| 96 | void ColumnData::InitializeAppend(ColumnAppendState &state) { | 
|---|
| 97 | lock_guard<mutex> tree_lock(data.node_lock); | 
|---|
| 98 | if (data.nodes.size() == 0) { | 
|---|
| 99 | // no transient segments yet, append one | 
|---|
| 100 | AppendTransientSegment(persistent_rows); | 
|---|
| 101 | } | 
|---|
| 102 | auto segment = (ColumnSegment *)data.GetLastSegment(); | 
|---|
| 103 | if (segment->segment_type == ColumnSegmentType::PERSISTENT) { | 
|---|
| 104 | // cannot append to persistent segment, add a transient one | 
|---|
| 105 | AppendTransientSegment(persistent_rows); | 
|---|
| 106 | state.current = (TransientSegment *)data.GetLastSegment(); | 
|---|
| 107 | } else { | 
|---|
| 108 | state.current = (TransientSegment *)segment; | 
|---|
| 109 | } | 
|---|
| 110 | assert(state.current->segment_type == ColumnSegmentType::TRANSIENT); | 
|---|
| 111 | state.current->InitializeAppend(state); | 
|---|
| 112 | } | 
|---|
| 113 |  | 
|---|
| 114 | void ColumnData::Append(ColumnAppendState &state, Vector &vector, idx_t count) { | 
|---|
| 115 | idx_t offset = 0; | 
|---|
| 116 | while (true) { | 
|---|
| 117 | // append the data from the vector | 
|---|
| 118 | idx_t copied_elements = state.current->Append(state, vector, offset, count); | 
|---|
| 119 | if (copied_elements == count) { | 
|---|
| 120 | // finished copying everything | 
|---|
| 121 | break; | 
|---|
| 122 | } | 
|---|
| 123 |  | 
|---|
| 124 | // we couldn't fit everything we wanted in the current column segment, create a new one | 
|---|
| 125 | { | 
|---|
| 126 | lock_guard<mutex> tree_lock(data.node_lock); | 
|---|
| 127 | AppendTransientSegment(state.current->start + state.current->count); | 
|---|
| 128 | state.current = (TransientSegment *)data.GetLastSegment(); | 
|---|
| 129 | state.current->InitializeAppend(state); | 
|---|
| 130 | } | 
|---|
| 131 | offset += copied_elements; | 
|---|
| 132 | count -= copied_elements; | 
|---|
| 133 | } | 
|---|
| 134 | } | 
|---|
| 135 |  | 
|---|
| 136 | void ColumnData::RevertAppend(row_t start_row) { | 
|---|
| 137 | lock_guard<mutex> tree_lock(data.node_lock); | 
|---|
| 138 | // find the segment index that the current row belongs to | 
|---|
| 139 | idx_t segment_index = data.GetSegmentIndex(start_row); | 
|---|
| 140 | auto segment = data.nodes[segment_index].node; | 
|---|
| 141 | auto &transient = (TransientSegment &)*segment; | 
|---|
| 142 | assert(transient.segment_type == ColumnSegmentType::TRANSIENT); | 
|---|
| 143 |  | 
|---|
| 144 | // remove any segments AFTER this segment: they should be deleted entirely | 
|---|
| 145 | if (segment_index < data.nodes.size() - 1) { | 
|---|
| 146 | data.nodes.erase(data.nodes.begin() + segment_index + 1, data.nodes.end()); | 
|---|
| 147 | } | 
|---|
| 148 | segment->next = nullptr; | 
|---|
| 149 | transient.RevertAppend(start_row); | 
|---|
| 150 | } | 
|---|
| 151 |  | 
|---|
| 152 | void ColumnData::Update(Transaction &transaction, Vector &updates, Vector &row_ids, idx_t count) { | 
|---|
| 153 | // first find the segment that the update belongs to | 
|---|
| 154 | idx_t first_id = FlatVector::GetValue<row_t>(row_ids, 0); | 
|---|
| 155 | auto segment = (ColumnSegment *)data.GetSegment(first_id); | 
|---|
| 156 | // now perform the update within the segment | 
|---|
| 157 | segment->Update(*this, transaction, updates, FlatVector::GetData<row_t>(row_ids), count); | 
|---|
| 158 | } | 
|---|
| 159 |  | 
|---|
| 160 | void ColumnData::Fetch(ColumnScanState &state, row_t row_id, Vector &result) { | 
|---|
| 161 | // find the segment that the row belongs to | 
|---|
| 162 | auto segment = (ColumnSegment *)data.GetSegment(row_id); | 
|---|
| 163 | auto vector_index = (row_id - segment->start) / STANDARD_VECTOR_SIZE; | 
|---|
| 164 | // now perform the fetch within the segment | 
|---|
| 165 | segment->Fetch(state, vector_index, result); | 
|---|
| 166 | } | 
|---|
| 167 |  | 
|---|
| 168 | void ColumnData::FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result, | 
|---|
| 169 | idx_t result_idx) { | 
|---|
| 170 | // find the segment the row belongs to | 
|---|
| 171 | auto segment = (TransientSegment *)data.GetSegment(row_id); | 
|---|
| 172 | // now perform the fetch within the segment | 
|---|
| 173 | segment->FetchRow(state, transaction, row_id, result, result_idx); | 
|---|
| 174 | } | 
|---|
| 175 |  | 
|---|
| 176 | void ColumnData::AppendTransientSegment(idx_t start_row) { | 
|---|
| 177 | auto new_segment = make_unique<TransientSegment>(manager, type, start_row); | 
|---|
| 178 | data.AppendSegment(move(new_segment)); | 
|---|
| 179 | } | 
|---|
| 180 |  | 
|---|