1#include "duckdb/execution/operator/schema/physical_create_index.hpp"
2
3#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
4#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
5#include "duckdb/catalog/catalog_entry/duck_index_entry.hpp"
6#include "duckdb/main/client_context.hpp"
7#include "duckdb/storage/storage_manager.hpp"
8#include "duckdb/main/database_manager.hpp"
9#include "duckdb/execution/index/art/art_key.hpp"
10#include "duckdb/execution/index/art/node.hpp"
11#include "duckdb/execution/index/art/leaf.hpp"
12
13namespace duckdb {
14
15PhysicalCreateIndex::PhysicalCreateIndex(LogicalOperator &op, TableCatalogEntry &table_p,
16 const vector<column_t> &column_ids, unique_ptr<CreateIndexInfo> info,
17 vector<unique_ptr<Expression>> unbound_expressions,
18 idx_t estimated_cardinality)
19 : PhysicalOperator(PhysicalOperatorType::CREATE_INDEX, op.types, estimated_cardinality),
20 table(table_p.Cast<DuckTableEntry>()), info(std::move(info)),
21 unbound_expressions(std::move(unbound_expressions)) {
22 // convert virtual column ids to storage column ids
23 for (auto &column_id : column_ids) {
24 storage_ids.push_back(x: table.GetColumns().LogicalToPhysical(index: LogicalIndex(column_id)).index);
25 }
26}
27
28//===--------------------------------------------------------------------===//
29// Sink
30//===--------------------------------------------------------------------===//
31
32class CreateIndexGlobalSinkState : public GlobalSinkState {
33public:
34 //! Global index to be added to the table
35 unique_ptr<Index> global_index;
36};
37
38class CreateIndexLocalSinkState : public LocalSinkState {
39public:
40 explicit CreateIndexLocalSinkState(ClientContext &context) : arena_allocator(Allocator::Get(context)) {};
41
42 unique_ptr<Index> local_index;
43 ArenaAllocator arena_allocator;
44 vector<ARTKey> keys;
45 DataChunk key_chunk;
46 vector<column_t> key_column_ids;
47};
48
49unique_ptr<GlobalSinkState> PhysicalCreateIndex::GetGlobalSinkState(ClientContext &context) const {
50 auto state = make_uniq<CreateIndexGlobalSinkState>();
51
52 // create the global index
53 switch (info->index_type) {
54 case IndexType::ART: {
55 auto &storage = table.GetStorage();
56 state->global_index = make_uniq<ART>(args: storage_ids, args&: TableIOManager::Get(table&: storage), args: unbound_expressions,
57 args&: info->constraint_type, args&: storage.db);
58 break;
59 }
60 default:
61 throw InternalException("Unimplemented index type");
62 }
63 return (std::move(state));
64}
65
66unique_ptr<LocalSinkState> PhysicalCreateIndex::GetLocalSinkState(ExecutionContext &context) const {
67 auto state = make_uniq<CreateIndexLocalSinkState>(args&: context.client);
68
69 // create the local index
70 switch (info->index_type) {
71 case IndexType::ART: {
72 auto &storage = table.GetStorage();
73 state->local_index = make_uniq<ART>(args: storage_ids, args&: TableIOManager::Get(table&: storage), args: unbound_expressions,
74 args&: info->constraint_type, args&: storage.db);
75 break;
76 }
77 default:
78 throw InternalException("Unimplemented index type");
79 }
80 state->keys = vector<ARTKey>(STANDARD_VECTOR_SIZE);
81 state->key_chunk.Initialize(allocator&: Allocator::Get(context&: context.client), types: state->local_index->logical_types);
82
83 for (idx_t i = 0; i < state->key_chunk.ColumnCount(); i++) {
84 state->key_column_ids.push_back(x: i);
85 }
86 return std::move(state);
87}
88
89SinkResultType PhysicalCreateIndex::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
90
91 D_ASSERT(chunk.ColumnCount() >= 2);
92 auto &lstate = input.local_state.Cast<CreateIndexLocalSinkState>();
93 auto &row_identifiers = chunk.data[chunk.ColumnCount() - 1];
94
95 // generate the keys for the given input
96 lstate.key_chunk.ReferenceColumns(other&: chunk, column_ids: lstate.key_column_ids);
97 lstate.arena_allocator.Reset();
98 ART::GenerateKeys(allocator&: lstate.arena_allocator, input&: lstate.key_chunk, keys&: lstate.keys);
99
100 auto &storage = table.GetStorage();
101 auto art = make_uniq<ART>(args&: lstate.local_index->column_ids, args&: lstate.local_index->table_io_manager,
102 args&: lstate.local_index->unbound_expressions, args&: lstate.local_index->constraint_type, args&: storage.db);
103 if (!art->ConstructFromSorted(count: lstate.key_chunk.size(), keys&: lstate.keys, row_identifiers)) {
104 throw ConstraintException("Data contains duplicates on indexed column(s)");
105 }
106
107 // merge into the local ART
108 if (!lstate.local_index->MergeIndexes(other_index&: *art)) {
109 throw ConstraintException("Data contains duplicates on indexed column(s)");
110 }
111
112#ifdef DEBUG
113 // ensure that all row IDs of this chunk exist in the ART
114 auto row_ids = FlatVector::GetData<row_t>(row_identifiers);
115 for (idx_t i = 0; i < lstate.key_chunk.size(); i++) {
116 auto leaf_node =
117 lstate.local_index->Cast<ART>().Lookup(*lstate.local_index->Cast<ART>().tree, lstate.keys[i], 0);
118 D_ASSERT(leaf_node.IsSet());
119 auto &leaf = Leaf::Get(lstate.local_index->Cast<ART>(), leaf_node);
120
121 if (leaf.IsInlined()) {
122 D_ASSERT(row_ids[i] == leaf.row_ids.inlined);
123 continue;
124 }
125
126 D_ASSERT(leaf.row_ids.ptr.IsSet());
127 Node leaf_segment = leaf.row_ids.ptr;
128 auto position = leaf.FindRowId(lstate.local_index->Cast<ART>(), leaf_segment, row_ids[i]);
129 D_ASSERT(position != (uint32_t)DConstants::INVALID_INDEX);
130 }
131#endif
132
133 return SinkResultType::NEED_MORE_INPUT;
134}
135
136void PhysicalCreateIndex::Combine(ExecutionContext &context, GlobalSinkState &gstate_p,
137 LocalSinkState &lstate_p) const {
138
139 auto &gstate = gstate_p.Cast<CreateIndexGlobalSinkState>();
140 auto &lstate = lstate_p.Cast<CreateIndexLocalSinkState>();
141
142 // merge the local index into the global index
143 if (!gstate.global_index->MergeIndexes(other_index&: *lstate.local_index)) {
144 throw ConstraintException("Data contains duplicates on indexed column(s)");
145 }
146
147 // vacuum excess memory
148 gstate.global_index->Vacuum();
149}
150
151SinkFinalizeType PhysicalCreateIndex::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
152 GlobalSinkState &gstate_p) const {
153
154 // here, we just set the resulting global index as the newly created index of the table
155
156 auto &state = gstate_p.Cast<CreateIndexGlobalSinkState>();
157 D_ASSERT(!state.global_index->VerifyAndToString(true).empty());
158
159 auto &storage = table.GetStorage();
160 if (!storage.IsRoot()) {
161 throw TransactionException("Transaction conflict: cannot add an index to a table that has been altered!");
162 }
163
164 auto &schema = table.schema;
165 auto index_entry = schema.CreateIndex(context, info&: *info, table).get();
166 if (!index_entry) {
167 D_ASSERT(info->on_conflict == OnCreateConflict::IGNORE_ON_CONFLICT);
168 // index already exists, but error ignored because of IF NOT EXISTS
169 return SinkFinalizeType::READY;
170 }
171 auto &index = index_entry->Cast<DuckIndexEntry>();
172
173 index.index = state.global_index.get();
174 index.info = storage.info;
175 for (auto &parsed_expr : info->parsed_expressions) {
176 index.parsed_expressions.push_back(x: parsed_expr->Copy());
177 }
178
179 // add index to storage
180 storage.info->indexes.AddIndex(index: std::move(state.global_index));
181 return SinkFinalizeType::READY;
182}
183
184//===--------------------------------------------------------------------===//
185// Source
186//===--------------------------------------------------------------------===//
187
188SourceResultType PhysicalCreateIndex::GetData(ExecutionContext &context, DataChunk &chunk,
189 OperatorSourceInput &input) const {
190 return SourceResultType::FINISHED;
191}
192
193} // namespace duckdb
194