1 | #include "duckdb/transaction/local_storage.hpp" |
2 | #include "duckdb/execution/index/art/art.hpp" |
3 | #include "duckdb/storage/table/append_state.hpp" |
4 | #include "duckdb/storage/write_ahead_log.hpp" |
5 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
6 | #include "duckdb/storage/uncompressed_segment.hpp" |
7 | |
8 | using namespace duckdb; |
9 | using namespace std; |
10 | |
// Constructs transaction-local storage for "table".
// For every unique index on the base table, a matching transaction-local ART
// index is created so the same uniqueness constraint is also enforced on
// uncommitted (local) rows. Non-unique indexes are not mirrored locally.
LocalTableStorage::LocalTableStorage(DataTable &table) : max_row(0) {
	for (auto &index : table.info->indexes) {
		// only ART indexes are supported here
		assert(index->type == IndexType::ART);
		auto &art = (ART &)*index;
		if (art.is_unique) {
			// unique index: create a local ART index that maintains the same unique constraint
			// the index expressions are deep-copied so the local index owns them
			vector<unique_ptr<Expression>> unbound_expressions;
			for (auto &expr : art.unbound_expressions) {
				unbound_expressions.push_back(expr->Copy());
			}
			indexes.push_back(make_unique<ART>(art.column_ids, move(unbound_expressions), true));
		}
	}
}
25 | |
// Out-of-line destructor; presumably kept here because member types are only
// forward-declared in the header — confirm against local_storage.hpp.
LocalTableStorage::~LocalTableStorage() {
}
28 | |
29 | void LocalTableStorage::InitializeScan(LocalScanState &state) { |
30 | state.storage = this; |
31 | |
32 | state.chunk_index = 0; |
33 | state.max_index = collection.chunks.size() - 1; |
34 | state.last_chunk_count = collection.chunks.back()->size(); |
35 | } |
36 | |
37 | void LocalTableStorage::Clear() { |
38 | collection.chunks.clear(); |
39 | indexes.clear(); |
40 | deleted_entries.clear(); |
41 | } |
42 | |
43 | void LocalStorage::InitializeScan(DataTable *table, LocalScanState &state) { |
44 | auto entry = table_storage.find(table); |
45 | if (entry == table_storage.end()) { |
46 | // no local storage for table: set scan to nullptr |
47 | state.storage = nullptr; |
48 | return; |
49 | } |
50 | state.storage = entry->second.get(); |
51 | state.storage->InitializeScan(state); |
52 | } |
53 | |
// Scans the next chunk of transaction-local data into "result".
// Deleted rows are filtered out via a selection vector; optional table
// filters are applied per column. Chunks that end up fully deleted or fully
// filtered are skipped by recursing onto the next chunk. Produces an empty
// result when the scan is exhausted.
void LocalStorage::Scan(LocalScanState &state, const vector<column_t> &column_ids, DataChunk &result,
                        unordered_map<idx_t, vector<TableFilter>> *table_filters) {
	if (!state.storage || state.chunk_index > state.max_index) {
		// nothing left to scan
		result.Reset();
		return;
	}
	auto &chunk = *state.storage->collection.chunks[state.chunk_index];
	// the final chunk may be only partially valid (rows past last_chunk_count
	// were appended after the scan started)
	idx_t chunk_count = state.chunk_index == state.max_index ? state.last_chunk_count : chunk.size();
	idx_t count = chunk_count;

	// first create a selection vector from the deleted entries (if any)
	SelectionVector valid_sel(STANDARD_VECTOR_SIZE);
	auto entry = state.storage->deleted_entries.find(state.chunk_index);
	if (entry != state.storage->deleted_entries.end()) {
		// deleted entries! create a selection vector holding only the surviving rows
		auto deleted = entry->second.get();
		idx_t new_count = 0;
		for (idx_t i = 0; i < count; i++) {
			if (!deleted[i]) {
				valid_sel.set_index(new_count++, i);
			}
		}
		if (new_count == 0 && count > 0) {
			// all entries in this chunk were deleted: continue to next chunk
			state.chunk_index++;
			Scan(state, column_ids, result, table_filters);
			return;
		}
		count = new_count;
	}

	// "sel" selects the surviving rows; it may be further narrowed by the
	// table filters below (filterSelection mutates it in place)
	SelectionVector sel;
	if (count != chunk_count) {
		sel.Initialize(valid_sel);
	} else {
		sel.Initialize(FlatVector::IncrementalSelectionVector);
	}
	// now scan the vectors of the chunk
	for (idx_t i = 0; i < column_ids.size(); i++) {
		auto id = column_ids[i];
		if (id == COLUMN_IDENTIFIER_ROW_ID) {
			// row identifier: return a sequence of rowids starting from MAX_ROW_ID plus the row offset in the chunk
			result.data[i].Sequence(MAX_ROW_ID + state.chunk_index * STANDARD_VECTOR_SIZE, 1);
		} else {
			// reference the local chunk's data directly (zero-copy)
			result.data[i].Reference(chunk.data[id]);
		}
		idx_t approved_tuple_count = count;
		if (table_filters) {
			auto column_filters = table_filters->find(i);
			if (column_filters != table_filters->end()) {
				//! We have filters to apply here
				for (auto &column_filter : column_filters->second) {
					// NOTE(review): this copies the nullmask by value; confirm
					// filterSelection does not need to write changes back to
					// the vector's actual nullmask
					nullmask_t nullmask = FlatVector::Nullmask(result.data[i]);
					UncompressedSegment::filterSelection(sel, result.data[i], column_filter, approved_tuple_count,
					                                     nullmask);
				}
				count = approved_tuple_count;
			}
		}
	}
	if (count == 0) {
		// all entries in this chunk were filtered out: continue on the next chunk
		state.chunk_index++;
		Scan(state, column_ids, result, table_filters);
		return;
	}
	if (count == chunk_count) {
		// every row survived: no slicing necessary
		result.SetCardinality(count);
	} else {
		result.Slice(sel, count);
	}
	state.chunk_index++;
}
128 | |
// Appends "chunk" to the transaction-local storage of "table", creating the
// local storage on first use. Local unique indexes (if any) are updated first
// so that constraint violations are detected before the data is appended.
// Throws ConstraintException on a duplicate key.
void LocalStorage::Append(DataTable *table, DataChunk &chunk) {
	auto entry = table_storage.find(table);
	LocalTableStorage *storage;
	if (entry == table_storage.end()) {
		// first append to this table in this transaction: create the local storage
		auto new_storage = make_unique<LocalTableStorage>(*table);
		storage = new_storage.get();
		table_storage.insert(make_pair(table, move(new_storage)));
	} else {
		storage = entry->second.get();
	}
	// append to unique indices (if any)
	if (storage->indexes.size() > 0) {
		// local rows get row ids starting at MAX_ROW_ID, offset by the number
		// of rows already appended locally
		idx_t base_id = MAX_ROW_ID + storage->collection.count;

		// first generate the vector of row identifiers
		Vector row_ids(ROW_TYPE);
		VectorOperations::GenerateSequence(row_ids, chunk.size(), base_id, 1);

		// now append the entries to the indices
		// NOTE(review): if a later index rejects the chunk, entries already
		// appended to earlier indexes are not rolled back here — confirm that
		// transaction abort discards the local indexes entirely
		for (auto &index : storage->indexes) {
			if (!index->Append(chunk, row_ids)) {
				throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key" );
			}
		}
	}

	//! Append to the chunk
	storage->collection.Append(chunk);
}
158 | |
159 | LocalTableStorage *LocalStorage::GetStorage(DataTable *table) { |
160 | auto entry = table_storage.find(table); |
161 | assert(entry != table_storage.end()); |
162 | return entry->second.get(); |
163 | } |
164 | |
165 | static idx_t GetChunk(Vector &row_ids) { |
166 | auto ids = FlatVector::GetData<row_t>(row_ids); |
167 | auto first_id = ids[0] - MAX_ROW_ID; |
168 | |
169 | return first_id / STANDARD_VECTOR_SIZE; |
170 | } |
171 | |
172 | void LocalStorage::Delete(DataTable *table, Vector &row_ids, idx_t count) { |
173 | auto storage = GetStorage(table); |
174 | // figure out the chunk from which these row ids came |
175 | idx_t chunk_idx = GetChunk(row_ids); |
176 | assert(chunk_idx < storage->collection.chunks.size()); |
177 | |
178 | // get a pointer to the deleted entries for this chunk |
179 | bool *deleted; |
180 | auto entry = storage->deleted_entries.find(chunk_idx); |
181 | if (entry == storage->deleted_entries.end()) { |
182 | // nothing deleted yet, add the deleted entries |
183 | auto del_entries = unique_ptr<bool[]>(new bool[STANDARD_VECTOR_SIZE]); |
184 | memset(del_entries.get(), 0, sizeof(bool) * STANDARD_VECTOR_SIZE); |
185 | deleted = del_entries.get(); |
186 | storage->deleted_entries.insert(make_pair(chunk_idx, move(del_entries))); |
187 | } else { |
188 | deleted = entry->second.get(); |
189 | } |
190 | |
191 | // now actually mark the entries as deleted in the deleted vector |
192 | idx_t base_index = MAX_ROW_ID + chunk_idx * STANDARD_VECTOR_SIZE; |
193 | |
194 | auto ids = FlatVector::GetData<row_t>(row_ids); |
195 | for (idx_t i = 0; i < count; i++) { |
196 | auto id = ids[i] - base_index; |
197 | deleted[id] = true; |
198 | } |
199 | } |
200 | |
201 | template <class T> |
202 | static void update_data(Vector &data_vector, Vector &update_vector, Vector &row_ids, idx_t count, idx_t base_index) { |
203 | VectorData udata; |
204 | update_vector.Orrify(count, udata); |
205 | |
206 | auto target = FlatVector::GetData<T>(data_vector); |
207 | auto &nullmask = FlatVector::Nullmask(data_vector); |
208 | auto ids = FlatVector::GetData<row_t>(row_ids); |
209 | auto updates = (T *)udata.data; |
210 | |
211 | for (idx_t i = 0; i < count; i++) { |
212 | auto uidx = udata.sel->get_index(i); |
213 | |
214 | auto id = ids[i] - base_index; |
215 | target[id] = updates[uidx]; |
216 | nullmask[id] = (*udata.nullmask)[uidx]; |
217 | } |
218 | } |
219 | |
// Dispatches an in-place update to the typed update_data implementation based
// on the physical type of the column. Throws for types that are not supported
// for in-place updates.
static void update_chunk(Vector &data, Vector &updates, Vector &row_ids, idx_t count, idx_t base_index) {
	assert(data.type == updates.type);
	assert(row_ids.type == ROW_TYPE);

	switch (data.type) {
	case TypeId::INT8:
		update_data<int8_t>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::INT16:
		update_data<int16_t>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::INT32:
		update_data<int32_t>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::INT64:
		update_data<int64_t>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::FLOAT:
		update_data<float>(data, updates, row_ids, count, base_index);
		break;
	case TypeId::DOUBLE:
		update_data<double>(data, updates, row_ids, count, base_index);
		break;
	default:
		// e.g. VARCHAR and nested types are not updated in place
		throw Exception("Unsupported type for in-place update" );
	}
}
247 | |
248 | void LocalStorage::Update(DataTable *table, Vector &row_ids, vector<column_t> &column_ids, DataChunk &data) { |
249 | auto storage = GetStorage(table); |
250 | // figure out the chunk from which these row ids came |
251 | idx_t chunk_idx = GetChunk(row_ids); |
252 | assert(chunk_idx < storage->collection.chunks.size()); |
253 | |
254 | idx_t base_index = MAX_ROW_ID + chunk_idx * STANDARD_VECTOR_SIZE; |
255 | |
256 | // now perform the actual update |
257 | auto &chunk = *storage->collection.chunks[chunk_idx]; |
258 | for (idx_t i = 0; i < column_ids.size(); i++) { |
259 | auto col_idx = column_ids[i]; |
260 | update_chunk(chunk.data[col_idx], data.data[i], row_ids, data.size(), base_index); |
261 | } |
262 | } |
263 | |
264 | template <class T> bool LocalStorage::ScanTableStorage(DataTable *table, LocalTableStorage *storage, T &&fun) { |
265 | vector<column_t> column_ids; |
266 | for (idx_t i = 0; i < table->types.size(); i++) { |
267 | column_ids.push_back(i); |
268 | } |
269 | |
270 | DataChunk chunk; |
271 | chunk.Initialize(table->types); |
272 | |
273 | // initialize the scan |
274 | LocalScanState state; |
275 | storage->InitializeScan(state); |
276 | |
277 | while (true) { |
278 | Scan(state, column_ids, chunk); |
279 | if (chunk.size() == 0) { |
280 | return true; |
281 | } |
282 | if (!fun(chunk)) { |
283 | return false; |
284 | } |
285 | } |
286 | } |
287 | |
// Commits all transaction-local appends: for every table with local data, the
// pending chunks are appended to the table's indexes and base storage, and —
// for persistent tables — written to the WAL. The per-table append states are
// recorded in commit_state so RevertCommit can undo the appends on failure.
// Throws ConstraintException if an index append detects a duplicate key.
void LocalStorage::Commit(LocalStorage::CommitState &commit_state, Transaction &transaction, WriteAheadLog *log,
                          transaction_t commit_id) {
	// commit local storage, iterate over all entries in the table storage map
	for (auto &entry : table_storage) {
		auto table = entry.first;
		auto storage = entry.second.get();

		// initialize the append state
		auto append_state_ptr = make_unique<TableAppendState>();
		auto &append_state = *append_state_ptr;
		// add it to the set of append states BEFORE appending, so a failure
		// mid-append still leaves the state available for RevertCommit
		commit_state.append_states[table] = move(append_state_ptr);
		table->InitializeAppend(append_state);

		// temporary tables are not logged to the WAL
		if (log && !table->info->IsTemporary()) {
			log->WriteSetTable(table->info->schema, table->info->table);
		}

		// scan all chunks in this storage
		ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool {
			// append this chunk to the indexes of the table
			if (!table->AppendToIndexes(append_state, chunk, append_state.current_row)) {
				throw ConstraintException("PRIMARY KEY or UNIQUE constraint violated: duplicated key" );
			}

			// append to base table
			table->Append(transaction, commit_id, chunk, append_state);
			// if there is a WAL, write the chunk to there as well
			if (log && !table->info->IsTemporary()) {
				log->WriteInsert(chunk);
			}
			return true;
		});
	}
	// finished commit: clear local storage
	for (auto &entry : table_storage) {
		entry.second->Clear();
	}
	table_storage.clear();
}
328 | |
// Undoes the appends performed by Commit when the commit fails partway:
// re-scans the local storage to remove the appended rows from the table's
// indexes (persistent tables only), then reverts the base-table append via the
// recorded append state.
void LocalStorage::RevertCommit(LocalStorage::CommitState &commit_state) {
	for (auto &entry : commit_state.append_states) {
		auto table = entry.first;
		auto storage = table_storage[table].get();
		auto &append_state = *entry.second;
		if (table->info->indexes.size() > 0 && !table->info->IsTemporary()) {
			// walk the committed row range [row_start, current_row)
			row_t current_row = append_state.row_start;
			// remove the data from the indexes, if there are any indexes
			ScanTableStorage(table, storage, [&](DataChunk &chunk) -> bool {
				// remove this chunk from the indexes of the table
				table->RemoveFromIndexes(append_state, chunk, current_row);

				current_row += chunk.size();
				if (current_row >= append_state.current_row) {
					// finished deleting all rows from the index: abort now
					return false;
				}
				return true;
			});
		}

		table->RevertAppend(*entry.second);
	}
}
353 | |
354 | void LocalStorage::AddColumn(DataTable *old_dt, DataTable *new_dt, ColumnDefinition &new_column, |
355 | Expression *default_value) { |
356 | // check if there are any pending appends for the old version of the table |
357 | auto entry = table_storage.find(old_dt); |
358 | if (entry == table_storage.end()) { |
359 | return; |
360 | } |
361 | // take over the storage from the old entry |
362 | auto new_storage = move(entry->second); |
363 | |
364 | // now add the new column filled with the default value to all chunks |
365 | auto new_column_type = GetInternalType(new_column.type); |
366 | ExpressionExecutor executor; |
367 | DataChunk dummy_chunk; |
368 | if (default_value) { |
369 | executor.AddExpression(*default_value); |
370 | } |
371 | |
372 | new_storage->collection.types.push_back(new_column_type); |
373 | for (idx_t chunk_idx = 0; chunk_idx < new_storage->collection.chunks.size(); chunk_idx++) { |
374 | auto &chunk = new_storage->collection.chunks[chunk_idx]; |
375 | Vector result(new_column_type); |
376 | if (default_value) { |
377 | dummy_chunk.SetCardinality(chunk->size()); |
378 | executor.ExecuteExpression(dummy_chunk, result); |
379 | } else { |
380 | FlatVector::Nullmask(result).set(); |
381 | } |
382 | chunk->data.push_back(move(result)); |
383 | } |
384 | |
385 | table_storage.erase(entry); |
386 | table_storage[new_dt] = move(new_storage); |
387 | } |
388 | |
389 | void LocalStorage::ChangeType(DataTable *old_dt, DataTable *new_dt, idx_t changed_idx, SQLType target_type, |
390 | vector<column_t> bound_columns, Expression &cast_expr) { |
391 | // check if there are any pending appends for the old version of the table |
392 | auto entry = table_storage.find(old_dt); |
393 | if (entry == table_storage.end()) { |
394 | return; |
395 | } |
396 | throw NotImplementedException("FIXME: ALTER TYPE with transaction local data not currently supported" ); |
397 | } |
398 | |