1 | #include "duckdb/storage/column_data.hpp" |
2 | #include "duckdb/storage/table/persistent_segment.hpp" |
3 | #include "duckdb/storage/table/transient_segment.hpp" |
4 | #include "duckdb/storage/data_table.hpp" |
5 | #include "duckdb/storage/storage_manager.hpp" |
6 | |
7 | using namespace duckdb; |
8 | using namespace std; |
9 | |
10 | ColumnData::ColumnData(BufferManager &manager, DataTableInfo &table_info) |
11 | : table_info(table_info), manager(manager), persistent_rows(0) { |
12 | } |
13 | |
14 | void ColumnData::Initialize(vector<unique_ptr<PersistentSegment>> &segments) { |
15 | for (auto &segment : segments) { |
16 | persistent_rows += segment->count; |
17 | data.AppendSegment(move(segment)); |
18 | } |
19 | } |
20 | |
21 | void ColumnData::InitializeScan(ColumnScanState &state) { |
22 | state.current = (ColumnSegment *)data.GetRootSegment(); |
23 | state.vector_index = 0; |
24 | state.initialized = false; |
25 | } |
26 | |
27 | void ColumnData::Scan(Transaction &transaction, ColumnScanState &state, Vector &result) { |
28 | if (!state.initialized) { |
29 | state.current->InitializeScan(state); |
30 | state.initialized = true; |
31 | } |
32 | // perform a scan of this segment |
33 | state.current->Scan(transaction, state, state.vector_index, result); |
34 | // move over to the next vector |
35 | state.Next(); |
36 | } |
37 | |
38 | void ColumnData::FilterScan(Transaction &transaction, ColumnScanState &state, Vector &result, SelectionVector &sel, |
39 | idx_t &approved_tuple_count) { |
40 | if (!state.initialized) { |
41 | state.current->InitializeScan(state); |
42 | state.initialized = true; |
43 | } |
44 | // perform a scan of this segment |
45 | state.current->FilterScan(transaction, state, result, sel, approved_tuple_count); |
46 | // move over to the next vector |
47 | state.Next(); |
48 | } |
49 | |
50 | void ColumnData::Select(Transaction &transaction, ColumnScanState &state, Vector &result, SelectionVector &sel, |
51 | idx_t &approved_tuple_count, vector<TableFilter> &tableFilter) { |
52 | if (!state.initialized) { |
53 | state.current->InitializeScan(state); |
54 | state.initialized = true; |
55 | } |
56 | // perform a scan of this segment |
57 | state.current->Select(transaction, state, result, sel, approved_tuple_count, tableFilter); |
58 | // move over to the next vector |
59 | state.Next(); |
60 | } |
61 | |
62 | void ColumnData::IndexScan(ColumnScanState &state, Vector &result) { |
63 | if (state.vector_index == 0) { |
64 | state.current->InitializeScan(state); |
65 | } |
66 | // perform a scan of this segment |
67 | state.current->IndexScan(state, result); |
68 | // move over to the next vector |
69 | state.Next(); |
70 | } |
71 | |
72 | void ColumnScanState::Next() { |
73 | //! There is no column segment |
74 | if (!current) { |
75 | return; |
76 | } |
77 | vector_index++; |
78 | if (vector_index * STANDARD_VECTOR_SIZE >= current->count) { |
79 | current = (ColumnSegment *)current->next.get(); |
80 | vector_index = 0; |
81 | initialized = false; |
82 | segment_checked = false; |
83 | } |
84 | } |
85 | |
86 | void TableScanState::NextVector() { |
87 | //! nothing to scan for this vector, skip the entire vector |
88 | for (idx_t j = 0; j < column_ids.size(); j++) { |
89 | auto column = column_ids[j]; |
90 | if (column != COLUMN_IDENTIFIER_ROW_ID) { |
91 | column_scans[j].Next(); |
92 | } |
93 | } |
94 | } |
95 | |
96 | void ColumnData::InitializeAppend(ColumnAppendState &state) { |
97 | lock_guard<mutex> tree_lock(data.node_lock); |
98 | if (data.nodes.size() == 0) { |
99 | // no transient segments yet, append one |
100 | AppendTransientSegment(persistent_rows); |
101 | } |
102 | auto segment = (ColumnSegment *)data.GetLastSegment(); |
103 | if (segment->segment_type == ColumnSegmentType::PERSISTENT) { |
104 | // cannot append to persistent segment, add a transient one |
105 | AppendTransientSegment(persistent_rows); |
106 | state.current = (TransientSegment *)data.GetLastSegment(); |
107 | } else { |
108 | state.current = (TransientSegment *)segment; |
109 | } |
110 | assert(state.current->segment_type == ColumnSegmentType::TRANSIENT); |
111 | state.current->InitializeAppend(state); |
112 | } |
113 | |
114 | void ColumnData::Append(ColumnAppendState &state, Vector &vector, idx_t count) { |
115 | idx_t offset = 0; |
116 | while (true) { |
117 | // append the data from the vector |
118 | idx_t copied_elements = state.current->Append(state, vector, offset, count); |
119 | if (copied_elements == count) { |
120 | // finished copying everything |
121 | break; |
122 | } |
123 | |
124 | // we couldn't fit everything we wanted in the current column segment, create a new one |
125 | { |
126 | lock_guard<mutex> tree_lock(data.node_lock); |
127 | AppendTransientSegment(state.current->start + state.current->count); |
128 | state.current = (TransientSegment *)data.GetLastSegment(); |
129 | state.current->InitializeAppend(state); |
130 | } |
131 | offset += copied_elements; |
132 | count -= copied_elements; |
133 | } |
134 | } |
135 | |
136 | void ColumnData::RevertAppend(row_t start_row) { |
137 | lock_guard<mutex> tree_lock(data.node_lock); |
138 | // find the segment index that the current row belongs to |
139 | idx_t segment_index = data.GetSegmentIndex(start_row); |
140 | auto segment = data.nodes[segment_index].node; |
141 | auto &transient = (TransientSegment &)*segment; |
142 | assert(transient.segment_type == ColumnSegmentType::TRANSIENT); |
143 | |
144 | // remove any segments AFTER this segment: they should be deleted entirely |
145 | if (segment_index < data.nodes.size() - 1) { |
146 | data.nodes.erase(data.nodes.begin() + segment_index + 1, data.nodes.end()); |
147 | } |
148 | segment->next = nullptr; |
149 | transient.RevertAppend(start_row); |
150 | } |
151 | |
152 | void ColumnData::Update(Transaction &transaction, Vector &updates, Vector &row_ids, idx_t count) { |
153 | // first find the segment that the update belongs to |
154 | idx_t first_id = FlatVector::GetValue<row_t>(row_ids, 0); |
155 | auto segment = (ColumnSegment *)data.GetSegment(first_id); |
156 | // now perform the update within the segment |
157 | segment->Update(*this, transaction, updates, FlatVector::GetData<row_t>(row_ids), count); |
158 | } |
159 | |
160 | void ColumnData::Fetch(ColumnScanState &state, row_t row_id, Vector &result) { |
161 | // find the segment that the row belongs to |
162 | auto segment = (ColumnSegment *)data.GetSegment(row_id); |
163 | auto vector_index = (row_id - segment->start) / STANDARD_VECTOR_SIZE; |
164 | // now perform the fetch within the segment |
165 | segment->Fetch(state, vector_index, result); |
166 | } |
167 | |
168 | void ColumnData::FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result, |
169 | idx_t result_idx) { |
170 | // find the segment the row belongs to |
171 | auto segment = (TransientSegment *)data.GetSegment(row_id); |
172 | // now perform the fetch within the segment |
173 | segment->FetchRow(state, transaction, row_id, result, result_idx); |
174 | } |
175 | |
176 | void ColumnData::AppendTransientSegment(idx_t start_row) { |
177 | auto new_segment = make_unique<TransientSegment>(manager, type, start_row); |
178 | data.AppendSegment(move(new_segment)); |
179 | } |
180 | |