1#include "duckdb/storage/table/version_manager.hpp"
2#include "duckdb/transaction/transaction.hpp"
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4
5using namespace duckdb;
6using namespace std;
7
8idx_t VersionManager::GetSelVector(Transaction &transaction, idx_t index, SelectionVector &sel_vector,
9 idx_t max_count) {
10 // obtain a read lock
11 auto read_lock = lock.GetSharedLock();
12
13 auto entry = info.find(index);
14 if (entry == info.end()) {
15 // no info, use everything
16 return max_count;
17 } else {
18 // get the selection vector from the chunk info
19 return entry->second->GetSelVector(transaction, sel_vector, max_count);
20 }
21}
22
23bool VersionManager::Fetch(Transaction &transaction, idx_t row) {
24 row -= base_row;
25 idx_t vector_index = row / STANDARD_VECTOR_SIZE;
26
27 auto entry = info.find(vector_index);
28 if (entry == info.end()) {
29 // no info, use the row
30 return true;
31 } else {
32 // there is an info: need to figure out if we want to use the row or not
33 return entry->second->Fetch(transaction, row - vector_index * STANDARD_VECTOR_SIZE);
34 }
35}
36
37class VersionDeleteState {
38public:
39 VersionDeleteState(VersionManager &manager, Transaction &transaction, DataTable *table, idx_t base_row)
40 : manager(manager), transaction(transaction), table(table), current_info(nullptr), current_chunk((idx_t)-1),
41 count(0), base_row(base_row) {
42 }
43
44 VersionManager &manager;
45 Transaction &transaction;
46 DataTable *table;
47 ChunkInfo *current_info;
48 idx_t current_chunk;
49 row_t rows[STANDARD_VECTOR_SIZE];
50 idx_t count;
51 idx_t base_row;
52 idx_t chunk_row;
53
54public:
55 void Delete(row_t row_id);
56 void Flush();
57};
58
59void VersionManager::Delete(Transaction &transaction, DataTable *table, Vector &row_ids, idx_t count) {
60 VersionDeleteState del_state(*this, transaction, table, base_row);
61
62 VectorData rdata;
63 row_ids.Orrify(count, rdata);
64 // obtain a write lock
65 auto write_lock = lock.GetExclusiveLock();
66 auto ids = (row_t *)rdata.data;
67 for (idx_t i = 0; i < count; i++) {
68 auto ridx = rdata.sel->get_index(i);
69 del_state.Delete(ids[ridx] - base_row);
70 }
71 del_state.Flush();
72}
73
74void VersionDeleteState::Delete(row_t row_id) {
75 idx_t chunk_idx = row_id / STANDARD_VECTOR_SIZE;
76 idx_t idx_in_chunk = row_id - chunk_idx * STANDARD_VECTOR_SIZE;
77
78 // check if we are targetting a different chunk than the current chunk
79 if (chunk_idx != current_chunk) {
80 // if we are, first flush the previous chunk
81 Flush();
82
83 // then look up if the chunk already exists
84 auto entry = manager.info.find(chunk_idx);
85 if (entry == manager.info.end()) {
86 // no version info yet: have to create one
87 auto new_info = make_unique<ChunkDeleteInfo>(manager, chunk_idx * STANDARD_VECTOR_SIZE);
88 current_info = new_info.get();
89 manager.info[chunk_idx] = move(new_info);
90 } else {
91 // version info already exists: alter existing version info
92 current_info = entry->second.get();
93 }
94 current_chunk = chunk_idx;
95 chunk_row = chunk_idx * STANDARD_VECTOR_SIZE;
96 }
97
98 // now add the row to the set of to-be-deleted rows
99 rows[count++] = idx_in_chunk;
100}
101
102void VersionDeleteState::Flush() {
103 if (count == 0) {
104 return;
105 }
106 // delete in the current info
107 current_info->Delete(transaction, rows, count);
108 // now push the delete into the undo buffer
109 transaction.PushDelete(table, current_info, rows, count, base_row + chunk_row);
110 count = 0;
111}
112
113void VersionManager::Append(Transaction &transaction, row_t row_start, idx_t count, transaction_t commit_id) {
114 idx_t chunk_idx = row_start / STANDARD_VECTOR_SIZE;
115 idx_t idx_in_chunk = row_start - chunk_idx * STANDARD_VECTOR_SIZE;
116
117 // obtain a write lock
118 auto write_lock = lock.GetExclusiveLock();
119 auto current_info = GetInsertInfo(chunk_idx);
120 for (idx_t i = 0; i < count; i++) {
121 current_info->inserted[idx_in_chunk] = commit_id;
122 idx_in_chunk++;
123 if (idx_in_chunk == STANDARD_VECTOR_SIZE) {
124 chunk_idx++;
125 idx_in_chunk = 0;
126 current_info = GetInsertInfo(chunk_idx);
127 }
128 }
129 max_row += count;
130}
131
132ChunkInsertInfo *VersionManager::GetInsertInfo(idx_t chunk_idx) {
133 auto entry = info.find(chunk_idx);
134 if (entry == info.end()) {
135 // no version info yet: have to create one
136 auto new_info = make_unique<ChunkInsertInfo>(*this, chunk_idx * STANDARD_VECTOR_SIZE);
137 auto result = new_info.get();
138 info[chunk_idx] = move(new_info);
139 return result;
140 } else {
141 // version info already exists: check if it is insert or delete info
142 auto current_info = entry->second.get();
143 if (current_info->type == ChunkInfoType::INSERT_INFO) {
144 return (ChunkInsertInfo *)current_info;
145 } else {
146 assert(current_info->type == ChunkInfoType::DELETE_INFO);
147 // delete info, change to insert info
148 auto new_info = make_unique<ChunkInsertInfo>((ChunkDeleteInfo &)*current_info);
149 auto result = new_info.get();
150 info[chunk_idx] = move(new_info);
151 return result;
152 }
153 }
154}
155
156void VersionManager::RevertAppend(row_t row_start, row_t row_end) {
157 auto write_lock = lock.GetExclusiveLock();
158
159 idx_t chunk_start = row_start / STANDARD_VECTOR_SIZE + (row_start % STANDARD_VECTOR_SIZE == 0 ? 0 : 1);
160 idx_t chunk_end = row_end / STANDARD_VECTOR_SIZE;
161 for (; chunk_start <= chunk_end; chunk_start++) {
162 info.erase(chunk_start);
163 }
164}
165