1#include "duckdb/storage/column_data.hpp"
2#include "duckdb/storage/table/persistent_segment.hpp"
3#include "duckdb/storage/table/transient_segment.hpp"
4#include "duckdb/storage/data_table.hpp"
5#include "duckdb/storage/storage_manager.hpp"
6
7using namespace duckdb;
8using namespace std;
9
10ColumnData::ColumnData(BufferManager &manager, DataTableInfo &table_info)
11 : table_info(table_info), manager(manager), persistent_rows(0) {
12}
13
14void ColumnData::Initialize(vector<unique_ptr<PersistentSegment>> &segments) {
15 for (auto &segment : segments) {
16 persistent_rows += segment->count;
17 data.AppendSegment(move(segment));
18 }
19}
20
21void ColumnData::InitializeScan(ColumnScanState &state) {
22 state.current = (ColumnSegment *)data.GetRootSegment();
23 state.vector_index = 0;
24 state.initialized = false;
25}
26
27void ColumnData::Scan(Transaction &transaction, ColumnScanState &state, Vector &result) {
28 if (!state.initialized) {
29 state.current->InitializeScan(state);
30 state.initialized = true;
31 }
32 // perform a scan of this segment
33 state.current->Scan(transaction, state, state.vector_index, result);
34 // move over to the next vector
35 state.Next();
36}
37
38void ColumnData::FilterScan(Transaction &transaction, ColumnScanState &state, Vector &result, SelectionVector &sel,
39 idx_t &approved_tuple_count) {
40 if (!state.initialized) {
41 state.current->InitializeScan(state);
42 state.initialized = true;
43 }
44 // perform a scan of this segment
45 state.current->FilterScan(transaction, state, result, sel, approved_tuple_count);
46 // move over to the next vector
47 state.Next();
48}
49
50void ColumnData::Select(Transaction &transaction, ColumnScanState &state, Vector &result, SelectionVector &sel,
51 idx_t &approved_tuple_count, vector<TableFilter> &tableFilter) {
52 if (!state.initialized) {
53 state.current->InitializeScan(state);
54 state.initialized = true;
55 }
56 // perform a scan of this segment
57 state.current->Select(transaction, state, result, sel, approved_tuple_count, tableFilter);
58 // move over to the next vector
59 state.Next();
60}
61
62void ColumnData::IndexScan(ColumnScanState &state, Vector &result) {
63 if (state.vector_index == 0) {
64 state.current->InitializeScan(state);
65 }
66 // perform a scan of this segment
67 state.current->IndexScan(state, result);
68 // move over to the next vector
69 state.Next();
70}
71
72void ColumnScanState::Next() {
73 //! There is no column segment
74 if (!current) {
75 return;
76 }
77 vector_index++;
78 if (vector_index * STANDARD_VECTOR_SIZE >= current->count) {
79 current = (ColumnSegment *)current->next.get();
80 vector_index = 0;
81 initialized = false;
82 segment_checked = false;
83 }
84}
85
86void TableScanState::NextVector() {
87 //! nothing to scan for this vector, skip the entire vector
88 for (idx_t j = 0; j < column_ids.size(); j++) {
89 auto column = column_ids[j];
90 if (column != COLUMN_IDENTIFIER_ROW_ID) {
91 column_scans[j].Next();
92 }
93 }
94}
95
96void ColumnData::InitializeAppend(ColumnAppendState &state) {
97 lock_guard<mutex> tree_lock(data.node_lock);
98 if (data.nodes.size() == 0) {
99 // no transient segments yet, append one
100 AppendTransientSegment(persistent_rows);
101 }
102 auto segment = (ColumnSegment *)data.GetLastSegment();
103 if (segment->segment_type == ColumnSegmentType::PERSISTENT) {
104 // cannot append to persistent segment, add a transient one
105 AppendTransientSegment(persistent_rows);
106 state.current = (TransientSegment *)data.GetLastSegment();
107 } else {
108 state.current = (TransientSegment *)segment;
109 }
110 assert(state.current->segment_type == ColumnSegmentType::TRANSIENT);
111 state.current->InitializeAppend(state);
112}
113
114void ColumnData::Append(ColumnAppendState &state, Vector &vector, idx_t count) {
115 idx_t offset = 0;
116 while (true) {
117 // append the data from the vector
118 idx_t copied_elements = state.current->Append(state, vector, offset, count);
119 if (copied_elements == count) {
120 // finished copying everything
121 break;
122 }
123
124 // we couldn't fit everything we wanted in the current column segment, create a new one
125 {
126 lock_guard<mutex> tree_lock(data.node_lock);
127 AppendTransientSegment(state.current->start + state.current->count);
128 state.current = (TransientSegment *)data.GetLastSegment();
129 state.current->InitializeAppend(state);
130 }
131 offset += copied_elements;
132 count -= copied_elements;
133 }
134}
135
136void ColumnData::RevertAppend(row_t start_row) {
137 lock_guard<mutex> tree_lock(data.node_lock);
138 // find the segment index that the current row belongs to
139 idx_t segment_index = data.GetSegmentIndex(start_row);
140 auto segment = data.nodes[segment_index].node;
141 auto &transient = (TransientSegment &)*segment;
142 assert(transient.segment_type == ColumnSegmentType::TRANSIENT);
143
144 // remove any segments AFTER this segment: they should be deleted entirely
145 if (segment_index < data.nodes.size() - 1) {
146 data.nodes.erase(data.nodes.begin() + segment_index + 1, data.nodes.end());
147 }
148 segment->next = nullptr;
149 transient.RevertAppend(start_row);
150}
151
152void ColumnData::Update(Transaction &transaction, Vector &updates, Vector &row_ids, idx_t count) {
153 // first find the segment that the update belongs to
154 idx_t first_id = FlatVector::GetValue<row_t>(row_ids, 0);
155 auto segment = (ColumnSegment *)data.GetSegment(first_id);
156 // now perform the update within the segment
157 segment->Update(*this, transaction, updates, FlatVector::GetData<row_t>(row_ids), count);
158}
159
160void ColumnData::Fetch(ColumnScanState &state, row_t row_id, Vector &result) {
161 // find the segment that the row belongs to
162 auto segment = (ColumnSegment *)data.GetSegment(row_id);
163 auto vector_index = (row_id - segment->start) / STANDARD_VECTOR_SIZE;
164 // now perform the fetch within the segment
165 segment->Fetch(state, vector_index, result);
166}
167
168void ColumnData::FetchRow(ColumnFetchState &state, Transaction &transaction, row_t row_id, Vector &result,
169 idx_t result_idx) {
170 // find the segment the row belongs to
171 auto segment = (TransientSegment *)data.GetSegment(row_id);
172 // now perform the fetch within the segment
173 segment->FetchRow(state, transaction, row_id, result, result_idx);
174}
175
176void ColumnData::AppendTransientSegment(idx_t start_row) {
177 auto new_segment = make_unique<TransientSegment>(manager, type, start_row);
178 data.AppendSegment(move(new_segment));
179}
180