| 1 | #include "duckdb/storage/column_data.hpp" | 
| 2 | #include "duckdb/storage/table/persistent_segment.hpp" | 
| 3 | #include "duckdb/storage/table/transient_segment.hpp" | 
| 4 | #include "duckdb/storage/data_table.hpp" | 
| 5 | #include "duckdb/storage/storage_manager.hpp" | 
| 6 |  | 
| 7 | using namespace duckdb; | 
| 8 | using namespace std; | 
| 9 |  | 
| 10 | ColumnData::ColumnData(BufferManager &manager, DataTableInfo &table_info) | 
| 11 |     : table_info(table_info), manager(manager), persistent_rows(0) { | 
| 12 | } | 
| 13 |  | 
| 14 | void ColumnData::Initialize(vector<unique_ptr<PersistentSegment>> &segments) { | 
| 15 | 	for (auto &segment : segments) { | 
| 16 | 		persistent_rows += segment->count; | 
| 17 | 		data.AppendSegment(move(segment)); | 
| 18 | 	} | 
| 19 | } | 
| 20 |  | 
| 21 | void ColumnData::InitializeScan(ColumnScanState &state) { | 
| 22 | 	state.current = (ColumnSegment *)data.GetRootSegment(); | 
| 23 | 	state.vector_index = 0; | 
| 24 | 	state.initialized = false; | 
| 25 | } | 
| 26 |  | 
| 27 | void ColumnData::Scan(Transaction &transaction, ColumnScanState &state, Vector &result) { | 
| 28 | 	if (!state.initialized) { | 
| 29 | 		state.current->InitializeScan(state); | 
| 30 | 		state.initialized = true; | 
| 31 | 	} | 
| 32 | 	// perform a scan of this segment | 
| 33 | 	state.current->Scan(transaction, state, state.vector_index, result); | 
| 34 | 	// move over to the next vector | 
| 35 | 	state.Next(); | 
| 36 | } | 
| 37 |  | 
| 38 | void ColumnData::FilterScan(Transaction &transaction, ColumnScanState &state, Vector &result, SelectionVector &sel, | 
| 39 |                             idx_t &approved_tuple_count) { | 
| 40 | 	if (!state.initialized) { | 
| 41 | 		state.current->InitializeScan(state); | 
| 42 | 		state.initialized = true; | 
| 43 | 	} | 
| 44 | 	// perform a scan of this segment | 
| 45 | 	state.current->FilterScan(transaction, state, result, sel, approved_tuple_count); | 
| 46 | 	// move over to the next vector | 
| 47 | 	state.Next(); | 
| 48 | } | 
| 49 |  | 
| 50 | void ColumnData::Select(Transaction &transaction, ColumnScanState &state, Vector &result, SelectionVector &sel, | 
| 51 |                         idx_t &approved_tuple_count, vector<TableFilter> &tableFilter) { | 
| 52 | 	if (!state.initialized) { | 
| 53 | 		state.current->InitializeScan(state); | 
| 54 | 		state.initialized = true; | 
| 55 | 	} | 
| 56 | 	// perform a scan of this segment | 
| 57 | 	state.current->Select(transaction, state, result, sel, approved_tuple_count, tableFilter); | 
| 58 | 	// move over to the next vector | 
| 59 | 	state.Next(); | 
| 60 | } | 
| 61 |  | 
| 62 | void ColumnData::IndexScan(ColumnScanState &state, Vector &result) { | 
| 63 | 	if (state.vector_index == 0) { | 
| 64 | 		state.current->InitializeScan(state); | 
| 65 | 	} | 
| 66 | 	// perform a scan of this segment | 
| 67 | 	state.current->IndexScan(state, result); | 
| 68 | 	// move over to the next vector | 
| 69 | 	state.Next(); | 
| 70 | } | 
| 71 |  | 
| 72 | void ColumnScanState::Next() { | 
| 73 | 	//! There is no column segment | 
| 74 | 	if (!current) { | 
| 75 | 		return; | 
| 76 | 	} | 
| 77 | 	vector_index++; | 
| 78 | 	if (vector_index * STANDARD_VECTOR_SIZE >= current->count) { | 
| 79 | 		current = (ColumnSegment *)current->next.get(); | 
| 80 | 		vector_index = 0; | 
| 81 | 		initialized = false; | 
| 82 | 		segment_checked = false; | 
| 83 | 	} | 
| 84 | } | 
| 85 |  | 
| 86 | void TableScanState::NextVector() { | 
| 87 | 	//! nothing to scan for this vector, skip the entire vector | 
| 88 | 	for (idx_t j = 0; j < column_ids.size(); j++) { | 
| 89 | 		auto column = column_ids[j]; | 
| 90 | 		if (column != COLUMN_IDENTIFIER_ROW_ID) { | 
| 91 | 			column_scans[j].Next(); | 
| 92 | 		} | 
| 93 | 	} | 
| 94 | } | 
| 95 |  | 
| 96 | void ColumnData::InitializeAppend(ColumnAppendState &state) { | 
| 97 | 	lock_guard<mutex> tree_lock(data.node_lock); | 
| 98 | 	if (data.nodes.size() == 0) { | 
| 99 | 		// no transient segments yet, append one | 
| 100 | 		AppendTransientSegment(persistent_rows); | 
| 101 | 	} | 
| 102 | 	auto segment = (ColumnSegment *)data.GetLastSegment(); | 
| 103 | 	if (segment->segment_type == ColumnSegmentType::PERSISTENT) { | 
| 104 | 		// cannot append to persistent segment, add a transient one | 
| 105 | 		AppendTransientSegment(persistent_rows); | 
| 106 | 		state.current = (TransientSegment *)data.GetLastSegment(); | 
| 107 | 	} else { | 
| 108 | 		state.current = (TransientSegment *)segment; | 
| 109 | 	} | 
| 110 | 	assert(state.current->segment_type == ColumnSegmentType::TRANSIENT); | 
| 111 | 	state.current->InitializeAppend(state); | 
| 112 | } | 
| 113 |  | 
| 114 | void ColumnData::Append(ColumnAppendState &state, Vector &vector, idx_t count) { | 
| 115 | 	idx_t offset = 0; | 
| 116 | 	while (true) { | 
| 117 | 		// append the data from the vector | 
| 118 | 		idx_t copied_elements = state.current->Append(state, vector, offset, count); | 
| 119 | 		if (copied_elements == count) { | 
| 120 | 			// finished copying everything | 
| 121 | 			break; | 
| 122 | 		} | 
| 123 |  | 
| 124 | 		// we couldn't fit everything we wanted in the current column segment, create a new one | 
| 125 | 		{ | 
| 126 | 			lock_guard<mutex> tree_lock(data.node_lock); | 
| 127 | 			AppendTransientSegment(state.current->start + state.current->count); | 
| 128 | 			state.current = (TransientSegment *)data.GetLastSegment(); | 
| 129 | 			state.current->InitializeAppend(state); | 
| 130 | 		} | 
| 131 | 		offset += copied_elements; | 
| 132 | 		count -= copied_elements; | 
| 133 | 	} | 
| 134 | } | 
| 135 |  | 
| 136 | void ColumnData::RevertAppend(row_t start_row) { | 
| 137 | 	lock_guard<mutex> tree_lock(data.node_lock); | 
| 138 | 	// find the segment index that the current row belongs to | 
| 139 | 	idx_t segment_index = data.GetSegmentIndex(start_row); | 
| 140 | 	auto segment = data.nodes[segment_index].node; | 
| 141 | 	auto &transient = (TransientSegment &)*segment; | 
| 142 | 	assert(transient.segment_type == ColumnSegmentType::TRANSIENT); | 
| 143 |  | 
| 144 | 	// remove any segments AFTER this segment: they should be deleted entirely | 
| 145 | 	if (segment_index < data.nodes.size() - 1) { | 
| 146 | 		data.nodes.erase(data.nodes.begin() + segment_index + 1, data.nodes.end()); | 
| 147 | 	} | 
| 148 | 	segment->next = nullptr; | 
| 149 | 	transient.RevertAppend(start_row); | 
| 150 | } | 
| 151 |  | 
| 152 | void ColumnData::Update(Transaction &transaction, Vector &updates, Vector &row_ids, idx_t count) { | 
| 153 | 	// first find the segment that the update belongs to | 
| 154 | 	idx_t first_id = FlatVector::GetValue<row_t>(row_ids, 0); | 
| 155 | 	auto segment = (ColumnSegment *)data.GetSegment(first_id); | 
| 156 | 	// now perform the update within the segment | 
| 157 | 	segment->Update(*this, transaction, updates, FlatVector::GetData<row_t>(row_ids), count); | 
| 158 | } | 
| 159 |  | 
| 160 | void ColumnData::Fetch(ColumnScanState &state, row_t row_id, Vector &result) { | 
| 161 | 	// find the segment that the row belongs to | 
| 162 | 	auto segment = (ColumnSegment *)data.GetSegment(row_id); | 
| 163 | 	auto vector_index = (row_id - segment->start) / STANDARD_VECTOR_SIZE; | 
| 164 | 	// now perform the fetch within the segment | 
| 165 | 	segment->Fetch(state, vector_index, result); | 
| 166 | } | 
| 167 |  | 
| 168 | void ColumnData::FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result, | 
| 169 |                           idx_t result_idx) { | 
| 170 | 	// find the segment the row belongs to | 
| 171 | 	auto segment = (TransientSegment *)data.GetSegment(row_id); | 
| 172 | 	// now perform the fetch within the segment | 
| 173 | 	segment->FetchRow(state, transaction, row_id, result, result_idx); | 
| 174 | } | 
| 175 |  | 
| 176 | void ColumnData::AppendTransientSegment(idx_t start_row) { | 
| 177 | 	auto new_segment = make_unique<TransientSegment>(manager, type, start_row); | 
| 178 | 	data.AppendSegment(move(new_segment)); | 
| 179 | } | 
| 180 |  |