1#include "duckdb/transaction/local_storage.hpp"
2#include "duckdb/execution/index/art/art.hpp"
3#include "duckdb/storage/table/append_state.hpp"
4#include "duckdb/storage/write_ahead_log.hpp"
5#include "duckdb/common/vector_operations/vector_operations.hpp"
6#include "duckdb/storage/uncompressed_segment.hpp"
7
8using namespace duckdb;
9using namespace std;
10
11LocalTableStorage::LocalTableStorage(DataTable &table) : max_row(0) {
12 for (auto &index : table.info->indexes) {
13 assert(index->type == IndexType::ART);
14 auto &art = (ART &)*index;
15 if (art.is_unique) {
16 // unique index: create a local ART index that maintains the same unique constraint
17 vector<unique_ptr<Expression>> unbound_expressions;
18 for (auto &expr : art.unbound_expressions) {
19 unbound_expressions.push_back(expr->Copy());
20 }
21 indexes.push_back(make_unique<ART>(art.column_ids, move(unbound_expressions), true));
22 }
23 }
24}
25
26LocalTableStorage::~LocalTableStorage() {
27}
28
29void LocalTableStorage::InitializeScan(LocalScanState &state) {
30 state.storage = this;
31
32 state.chunk_index = 0;
33 state.max_index = collection.chunks.size() - 1;
34 state.last_chunk_count = collection.chunks.back()->size();
35}
36
37void LocalTableStorage::Clear() {
38 collection.chunks.clear();
39 indexes.clear();
40 deleted_entries.clear();
41}
42
43void LocalStorage::InitializeScan(DataTable *table, LocalScanState &state) {
44 auto entry = table_storage.find(table);
45 if (entry == table_storage.end()) {
46 // no local storage for table: set scan to nullptr
47 state.storage = nullptr;
48 return;
49 }
50 state.storage = entry->second.get();
51 state.storage->InitializeScan(state);
52}
53
// Scan the next chunk of transaction-local data into "result".
// Rows deleted locally are filtered out via a selection vector, and optional
// per-column table filters are applied. Chunks that end up fully deleted or
// fully filtered are skipped by tail-recursing into the next chunk.
// Local row ids start at MAX_ROW_ID so they never collide with base-table rows.
void LocalStorage::Scan(LocalScanState &state, const vector<column_t> &column_ids, DataChunk &result,
                        unordered_map<idx_t, vector<TableFilter>> *table_filters) {
	if (!state.storage || state.chunk_index > state.max_index) {
		// nothing left to scan
		result.Reset();
		return;
	}
	auto &chunk = *state.storage->collection.chunks[state.chunk_index];
	// the last chunk is only scanned up to the row count recorded at
	// InitializeScan time; earlier chunks are scanned in full
	idx_t chunk_count = state.chunk_index == state.max_index ? state.last_chunk_count : chunk.size();
	idx_t count = chunk_count;

	// first create a selection vector from the deleted entries (if any)
	SelectionVector valid_sel(STANDARD_VECTOR_SIZE);
	auto entry = state.storage->deleted_entries.find(state.chunk_index);
	if (entry != state.storage->deleted_entries.end()) {
		// deleted entries! create a selection vector of the surviving rows
		auto deleted = entry->second.get();
		idx_t new_count = 0;
		for (idx_t i = 0; i < count; i++) {
			if (!deleted[i]) {
				valid_sel.set_index(new_count++, i);
			}
		}
		if (new_count == 0 && count > 0) {
			// all entries in this chunk were deleted: continue to next chunk
			state.chunk_index++;
			Scan(state, column_ids, result, table_filters);
			return;
		}
		count = new_count;
	}

	// sel points either at the surviving rows or at the incremental
	// (identity) selection vector when nothing was deleted
	SelectionVector sel;
	if (count != chunk_count) {
		sel.Initialize(valid_sel);
	} else {
		sel.Initialize(FlatVector::IncrementalSelectionVector);
	}
	// now scan the vectors of the chunk
	for (idx_t i = 0; i < column_ids.size(); i++) {
		auto id = column_ids[i];
		if (id == COLUMN_IDENTIFIER_ROW_ID) {
			// row identifier: return a sequence of rowids starting from MAX_ROW_ID plus the row offset in the chunk
			result.data[i].Sequence(MAX_ROW_ID + state.chunk_index * STANDARD_VECTOR_SIZE, 1);
		} else {
			result.data[i].Reference(chunk.data[id]);
		}
		// apply this column's table filters (if any); each filter shrinks the
		// selection, so "count" decreases monotonically across columns
		idx_t approved_tuple_count = count;
		if (table_filters) {
			auto column_filters = table_filters->find(i);
			if (column_filters != table_filters->end()) {
				//! We have filters to apply here
				for (auto &column_filter : column_filters->second) {
					// NOTE(review): this copies the nullmask by value — presumably
					// filterSelection only reads it; verify against its signature
					nullmask_t nullmask = FlatVector::Nullmask(result.data[i]);
					UncompressedSegment::filterSelection(sel, result.data[i], column_filter, approved_tuple_count,
					                                     nullmask);
				}
				count = approved_tuple_count;
			}
		}
	}
	if (count == 0) {
		// all entries in this chunk were filtered:: Continue on next chunk
		state.chunk_index++;
		Scan(state, column_ids, result, table_filters);
		return;
	}
	if (count == chunk_count) {
		// no rows removed: the referenced data can be used as-is
		result.SetCardinality(count);
	} else {
		// restrict the result to the surviving rows
		result.Slice(sel, count);
	}
	state.chunk_index++;
}
128
129void LocalStorage::Append(DataTable *table, DataChunk &chunk) {
130 auto entry = table_storage.find(table);
131 LocalTableStorage *storage;
132 if (entry == table_storage.end()) {
133 auto new_storage = make_unique<LocalTableStorage>(*table);
134 storage = new_storage.get();
135 table_storage.insert(make_pair(table, move(new_storage)));
136 } else {
137 storage = entry->second.get();
138 }
139 // append to unique indices (if any)
140 if (storage->indexes.size() > 0) {
141 idx_t base_id = MAX_ROW_ID + storage->collection.count;
142
143 // first generate the vector of row identifiers
144 Vector row_ids(ROW_TYPE);
145 VectorOperations::GenerateSequence(row_ids, chunk.size(), base_id, 1);
146
147 // now append the entries to the indices
148 for (auto &index : storage->indexes) {
149 if (!index->Append(chunk, row_ids)) {
150 throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key");
151 }
152 }
153 }
154
155 //! Append to the chunk
156 storage->collection.Append(chunk);
157}
158
159LocalTableStorage *LocalStorage::GetStorage(DataTable *table) {
160 auto entry = table_storage.find(table);
161 assert(entry != table_storage.end());
162 return entry->second.get();
163}
164
165static idx_t GetChunk(Vector &row_ids) {
166 auto ids = FlatVector::GetData<row_t>(row_ids);
167 auto first_id = ids[0] - MAX_ROW_ID;
168
169 return first_id / STANDARD_VECTOR_SIZE;
170}
171
172void LocalStorage::Delete(DataTable *table, Vector &row_ids, idx_t count) {
173 auto storage = GetStorage(table);
174 // figure out the chunk from which these row ids came
175 idx_t chunk_idx = GetChunk(row_ids);
176 assert(chunk_idx < storage->collection.chunks.size());
177
178 // get a pointer to the deleted entries for this chunk
179 bool *deleted;
180 auto entry = storage->deleted_entries.find(chunk_idx);
181 if (entry == storage->deleted_entries.end()) {
182 // nothing deleted yet, add the deleted entries
183 auto del_entries = unique_ptr<bool[]>(new bool[STANDARD_VECTOR_SIZE]);
184 memset(del_entries.get(), 0, sizeof(bool) * STANDARD_VECTOR_SIZE);
185 deleted = del_entries.get();
186 storage->deleted_entries.insert(make_pair(chunk_idx, move(del_entries)));
187 } else {
188 deleted = entry->second.get();
189 }
190
191 // now actually mark the entries as deleted in the deleted vector
192 idx_t base_index = MAX_ROW_ID + chunk_idx * STANDARD_VECTOR_SIZE;
193
194 auto ids = FlatVector::GetData<row_t>(row_ids);
195 for (idx_t i = 0; i < count; i++) {
196 auto id = ids[i] - base_index;
197 deleted[id] = true;
198 }
199}
200
201template <class T>
202static void update_data(Vector &data_vector, Vector &update_vector, Vector &row_ids, idx_t count, idx_t base_index) {
203 VectorData udata;
204 update_vector.Orrify(count, udata);
205
206 auto target = FlatVector::GetData<T>(data_vector);
207 auto &nullmask = FlatVector::Nullmask(data_vector);
208 auto ids = FlatVector::GetData<row_t>(row_ids);
209 auto updates = (T *)udata.data;
210
211 for (idx_t i = 0; i < count; i++) {
212 auto uidx = udata.sel->get_index(i);
213
214 auto id = ids[i] - base_index;
215 target[id] = updates[uidx];
216 nullmask[id] = (*udata.nullmask)[uidx];
217 }
218}
219
220static void update_chunk(Vector &data, Vector &updates, Vector &row_ids, idx_t count, idx_t base_index) {
221 assert(data.type == updates.type);
222 assert(row_ids.type == ROW_TYPE);
223
224 switch (data.type) {
225 case TypeId::INT8:
226 update_data<int8_t>(data, updates, row_ids, count, base_index);
227 break;
228 case TypeId::INT16:
229 update_data<int16_t>(data, updates, row_ids, count, base_index);
230 break;
231 case TypeId::INT32:
232 update_data<int32_t>(data, updates, row_ids, count, base_index);
233 break;
234 case TypeId::INT64:
235 update_data<int64_t>(data, updates, row_ids, count, base_index);
236 break;
237 case TypeId::FLOAT:
238 update_data<float>(data, updates, row_ids, count, base_index);
239 break;
240 case TypeId::DOUBLE:
241 update_data<double>(data, updates, row_ids, count, base_index);
242 break;
243 default:
244 throw Exception("Unsupported type for in-place update");
245 }
246}
247
248void LocalStorage::Update(DataTable *table, Vector &row_ids, vector<column_t> &column_ids, DataChunk &data) {
249 auto storage = GetStorage(table);
250 // figure out the chunk from which these row ids came
251 idx_t chunk_idx = GetChunk(row_ids);
252 assert(chunk_idx < storage->collection.chunks.size());
253
254 idx_t base_index = MAX_ROW_ID + chunk_idx * STANDARD_VECTOR_SIZE;
255
256 // now perform the actual update
257 auto &chunk = *storage->collection.chunks[chunk_idx];
258 for (idx_t i = 0; i < column_ids.size(); i++) {
259 auto col_idx = column_ids[i];
260 update_chunk(chunk.data[col_idx], data.data[i], row_ids, data.size(), base_index);
261 }
262}
263
264template <class T> bool LocalStorage::ScanTableStorage(DataTable *table, LocalTableStorage *storage, T &&fun) {
265 vector<column_t> column_ids;
266 for (idx_t i = 0; i < table->types.size(); i++) {
267 column_ids.push_back(i);
268 }
269
270 DataChunk chunk;
271 chunk.Initialize(table->types);
272
273 // initialize the scan
274 LocalScanState state;
275 storage->InitializeScan(state);
276
277 while (true) {
278 Scan(state, column_ids, chunk);
279 if (chunk.size() == 0) {
280 return true;
281 }
282 if (!fun(chunk)) {
283 return false;
284 }
285 }
286}
287
// Commit all transaction-local data to the base tables.
// For every table with local data: the append state is created and recorded
// in commit_state (so RevertCommit can undo a partial commit), then each
// local chunk is appended to the table's indexes, the base table, and — for
// non-temporary tables with a WAL — the write-ahead log, in that order.
// Throws ConstraintException if an index append detects a duplicate key.
void LocalStorage::Commit(LocalStorage::CommitState &commit_state, Transaction &transaction, WriteAheadLog *log,
                          transaction_t commit_id) {
	// commit local storage, iterate over all entries in the table storage map
	for (auto &entry : table_storage) {
		auto table = entry.first;
		auto storage = entry.second.get();

		// initialize the append state
		auto append_state_ptr = make_unique<TableAppendState>();
		auto &append_state = *append_state_ptr;
		// add it to the set of append states BEFORE appending, so a revert
		// after a mid-commit failure can find it
		commit_state.append_states[table] = move(append_state_ptr);
		table->InitializeAppend(append_state);

		if (log && !table->info->IsTemporary()) {
			// mark in the WAL which table the following inserts belong to
			log->WriteSetTable(table->info->schema, table->info->table);
		}

		// scan all chunks in this storage
		ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool {
			// append this chunk to the indexes of the table
			if (!table->AppendToIndexes(append_state, chunk, append_state.current_row)) {
				throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key");
			}

			// append to base table
			table->Append(transaction, commit_id, chunk, append_state);
			// if there is a WAL, write the chunk to there as well
			if (log && !table->info->IsTemporary()) {
				log->WriteInsert(chunk);
			}
			return true;
		});
	}
	// finished commit: clear local storage
	for (auto &entry : table_storage) {
		entry.second->Clear();
	}
	table_storage.clear();
}
328
// Undo the effects of a (possibly partial) Commit.
// For every table that Commit started appending to, the appended rows are
// removed from the table's indexes (by re-scanning the local storage and
// deleting row by row up to append_state.current_row) and the base-table
// append is reverted.
void LocalStorage::RevertCommit(LocalStorage::CommitState &commit_state) {
	for (auto &entry : commit_state.append_states) {
		auto table = entry.first;
		auto storage = table_storage[table].get();
		auto &append_state = *entry.second;
		if (table->info->indexes.size() > 0 && !table->info->IsTemporary()) {
			// current_row tracks how far into the base table the commit got;
			// only rows in [row_start, current_row) were actually appended
			row_t current_row = append_state.row_start;
			// remove the data from the indexes, if there are any indexes
			ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool {
				// remove this chunk from the indexes of the table
				table->RemoveFromIndexes(append_state, chunk, current_row);

				current_row += chunk.size();
				if (current_row >= append_state.current_row) {
					// finished deleting all rows from the index: abort now
					return false;
				}
				return true;
			});
		}

		// undo the base-table append itself
		table->RevertAppend(*entry.second);
	}
}
353
// Migrate transaction-local data after an ALTER TABLE ... ADD COLUMN.
// If old_dt has pending local appends, its storage is moved over to new_dt
// and the new column is appended to every buffered chunk, filled either with
// the evaluated default expression or with NULLs when no default is given.
void LocalStorage::AddColumn(DataTable *old_dt, DataTable *new_dt, ColumnDefinition &new_column,
                             Expression *default_value) {
	// check if there are any pending appends for the old version of the table
	auto entry = table_storage.find(old_dt);
	if (entry == table_storage.end()) {
		return;
	}
	// take over the storage from the old entry
	auto new_storage = move(entry->second);

	// now add the new column filled with the default value to all chunks
	auto new_column_type = GetInternalType(new_column.type);
	ExpressionExecutor executor;
	// dummy_chunk only supplies a cardinality to the executor; the default
	// expression does not reference any columns
	DataChunk dummy_chunk;
	if (default_value) {
		executor.AddExpression(*default_value);
	}

	new_storage->collection.types.push_back(new_column_type);
	for (idx_t chunk_idx = 0; chunk_idx < new_storage->collection.chunks.size(); chunk_idx++) {
		auto &chunk = new_storage->collection.chunks[chunk_idx];
		Vector result(new_column_type);
		if (default_value) {
			// evaluate the default expression once per chunk, sized to the chunk
			dummy_chunk.SetCardinality(chunk->size());
			executor.ExecuteExpression(dummy_chunk, result);
		} else {
			// no default: the new column is all NULL
			FlatVector::Nullmask(result).set();
		}
		chunk->data.push_back(move(result));
	}

	// re-key the storage map from the old table version to the new one
	table_storage.erase(entry);
	table_storage[new_dt] = move(new_storage);
}
388
389void LocalStorage::ChangeType(DataTable *old_dt, DataTable *new_dt, idx_t changed_idx, SQLType target_type,
390 vector<column_t> bound_columns, Expression &cast_expr) {
391 // check if there are any pending appends for the old version of the table
392 auto entry = table_storage.find(old_dt);
393 if (entry == table_storage.end()) {
394 return;
395 }
396 throw NotImplementedException("FIXME: ALTER TYPE with transaction local data not currently supported");
397}
398