1 | #include "duckdb/execution/operator/persistent/physical_batch_insert.hpp" |
2 | |
3 | #include "duckdb/parallel/thread_context.hpp" |
4 | #include "duckdb/storage/data_table.hpp" |
5 | #include "duckdb/storage/table/row_group_collection.hpp" |
6 | #include "duckdb/storage/table_io_manager.hpp" |
7 | #include "duckdb/transaction/local_storage.hpp" |
8 | #include "duckdb/catalog/catalog_entry/duck_table_entry.hpp" |
9 | #include "duckdb/storage/table/append_state.hpp" |
10 | #include "duckdb/storage/table/scan_state.hpp" |
11 | |
12 | namespace duckdb { |
13 | |
14 | PhysicalBatchInsert::PhysicalBatchInsert(vector<LogicalType> types, TableCatalogEntry &table, |
15 | physical_index_vector_t<idx_t> column_index_map, |
16 | vector<unique_ptr<Expression>> bound_defaults, idx_t estimated_cardinality) |
17 | : PhysicalOperator(PhysicalOperatorType::BATCH_INSERT, std::move(types), estimated_cardinality), |
18 | column_index_map(std::move(column_index_map)), insert_table(&table), insert_types(table.GetTypes()), |
19 | bound_defaults(std::move(bound_defaults)) { |
20 | } |
21 | |
22 | PhysicalBatchInsert::PhysicalBatchInsert(LogicalOperator &op, SchemaCatalogEntry &schema, |
23 | unique_ptr<BoundCreateTableInfo> info_p, idx_t estimated_cardinality) |
24 | : PhysicalOperator(PhysicalOperatorType::BATCH_CREATE_TABLE_AS, op.types, estimated_cardinality), |
25 | insert_table(nullptr), schema(&schema), info(std::move(info_p)) { |
26 | PhysicalInsert::GetInsertInfo(info: *info, insert_types, bound_defaults); |
27 | } |
28 | |
29 | //===--------------------------------------------------------------------===// |
30 | // Sink |
31 | //===--------------------------------------------------------------------===// |
32 | |
33 | class CollectionMerger { |
34 | public: |
35 | explicit CollectionMerger(ClientContext &context) : context(context) { |
36 | } |
37 | |
38 | ClientContext &context; |
39 | vector<unique_ptr<RowGroupCollection>> current_collections; |
40 | |
41 | public: |
42 | void AddCollection(unique_ptr<RowGroupCollection> collection) { |
43 | current_collections.push_back(x: std::move(collection)); |
44 | } |
45 | |
46 | bool Empty() { |
47 | return current_collections.empty(); |
48 | } |
49 | |
50 | unique_ptr<RowGroupCollection> Flush(OptimisticDataWriter &writer) { |
51 | if (Empty()) { |
52 | return nullptr; |
53 | } |
54 | unique_ptr<RowGroupCollection> new_collection = std::move(current_collections[0]); |
55 | if (current_collections.size() > 1) { |
56 | // we have gathered multiple collections: create one big collection and merge that |
57 | auto &types = new_collection->GetTypes(); |
58 | TableAppendState append_state; |
59 | new_collection->InitializeAppend(state&: append_state); |
60 | |
61 | DataChunk scan_chunk; |
62 | scan_chunk.Initialize(context, types); |
63 | |
64 | vector<column_t> column_ids; |
65 | for (idx_t i = 0; i < types.size(); i++) { |
66 | column_ids.push_back(x: i); |
67 | } |
68 | for (auto &collection : current_collections) { |
69 | if (!collection) { |
70 | continue; |
71 | } |
72 | TableScanState scan_state; |
73 | scan_state.Initialize(column_ids); |
74 | collection->InitializeScan(state&: scan_state.local_state, column_ids, table_filters: nullptr); |
75 | |
76 | while (true) { |
77 | scan_chunk.Reset(); |
78 | scan_state.local_state.ScanCommitted(result&: scan_chunk, type: TableScanType::TABLE_SCAN_COMMITTED_ROWS); |
79 | if (scan_chunk.size() == 0) { |
80 | break; |
81 | } |
82 | auto new_row_group = new_collection->Append(chunk&: scan_chunk, state&: append_state); |
83 | if (new_row_group) { |
84 | writer.WriteNewRowGroup(row_groups&: *new_collection); |
85 | } |
86 | } |
87 | } |
88 | new_collection->FinalizeAppend(transaction: TransactionData(0, 0), state&: append_state); |
89 | writer.WriteLastRowGroup(row_groups&: *new_collection); |
90 | } |
91 | current_collections.clear(); |
92 | return new_collection; |
93 | } |
94 | }; |
95 | |
//! Tags a batch entry as either FLUSHED (excluded from further merging) or NOT_FLUSHED (still a merge candidate)
enum class RowGroupBatchType : uint8_t {
	FLUSHED,
	NOT_FLUSHED
};
97 | struct RowGroupBatchEntry { |
98 | RowGroupBatchEntry(idx_t batch_idx, unique_ptr<RowGroupCollection> collection_p, RowGroupBatchType type) |
99 | : batch_idx(batch_idx), total_rows(collection_p->GetTotalRows()), collection(std::move(collection_p)), |
100 | type(type) { |
101 | } |
102 | |
103 | idx_t batch_idx; |
104 | idx_t total_rows; |
105 | unique_ptr<RowGroupCollection> collection; |
106 | RowGroupBatchType type; |
107 | }; |
108 | |
109 | class BatchInsertGlobalState : public GlobalSinkState { |
110 | public: |
111 | static constexpr const idx_t BATCH_FLUSH_THRESHOLD = LocalStorage::MERGE_THRESHOLD * 3; |
112 | |
113 | public: |
114 | explicit BatchInsertGlobalState(DuckTableEntry &table) : table(table), insert_count(0) { |
115 | } |
116 | |
117 | mutex lock; |
118 | DuckTableEntry &table; |
119 | idx_t insert_count; |
120 | vector<RowGroupBatchEntry> collections; |
121 | idx_t next_start = 0; |
122 | |
123 | void FindMergeCollections(idx_t min_batch_index, optional_idx &merged_batch_index, |
124 | vector<unique_ptr<RowGroupCollection>> &result) { |
125 | bool merge = false; |
126 | idx_t start_index = next_start; |
127 | idx_t current_idx; |
128 | idx_t total_count = 0; |
129 | for (current_idx = start_index; current_idx < collections.size(); current_idx++) { |
130 | auto &entry = collections[current_idx]; |
131 | if (entry.batch_idx >= min_batch_index) { |
132 | // this entry is AFTER the min_batch_index |
133 | // we might still find new entries! |
134 | break; |
135 | } |
136 | if (entry.type == RowGroupBatchType::FLUSHED) { |
137 | // already flushed: cannot flush anything here |
138 | if (total_count > 0) { |
139 | merge = true; |
140 | break; |
141 | } |
142 | start_index = current_idx + 1; |
143 | if (start_index > next_start) { |
144 | // avoid checking this segment again in the future |
145 | next_start = start_index; |
146 | } |
147 | total_count = 0; |
148 | continue; |
149 | } |
150 | // not flushed - add to set of indexes to flush |
151 | total_count += entry.total_rows; |
152 | if (total_count >= BATCH_FLUSH_THRESHOLD) { |
153 | merge = true; |
154 | break; |
155 | } |
156 | } |
157 | if (merge && total_count > 0) { |
158 | D_ASSERT(current_idx > start_index); |
159 | merged_batch_index = collections[start_index].batch_idx; |
160 | for (idx_t idx = start_index; idx < current_idx; idx++) { |
161 | auto &entry = collections[idx]; |
162 | if (!entry.collection || entry.type == RowGroupBatchType::FLUSHED) { |
163 | throw InternalException("Adding a row group collection that should not be flushed" ); |
164 | } |
165 | result.push_back(x: std::move(entry.collection)); |
166 | entry.total_rows = total_count; |
167 | entry.type = RowGroupBatchType::FLUSHED; |
168 | } |
169 | if (start_index + 1 < current_idx) { |
170 | // erase all entries except the first one |
171 | collections.erase(first: collections.begin() + start_index + 1, last: collections.begin() + current_idx); |
172 | } |
173 | } |
174 | } |
175 | |
176 | unique_ptr<RowGroupCollection> MergeCollections(ClientContext &context, |
177 | vector<unique_ptr<RowGroupCollection>> merge_collections, |
178 | OptimisticDataWriter &writer) { |
179 | CollectionMerger merger(context); |
180 | for (auto &collection : merge_collections) { |
181 | merger.AddCollection(collection: std::move(collection)); |
182 | } |
183 | return merger.Flush(writer); |
184 | } |
185 | |
186 | void AddCollection(ClientContext &context, idx_t batch_index, idx_t min_batch_index, |
187 | unique_ptr<RowGroupCollection> current_collection, |
188 | optional_ptr<OptimisticDataWriter> writer = nullptr, |
189 | optional_ptr<bool> written_to_disk = nullptr) { |
190 | if (batch_index < min_batch_index) { |
191 | throw InternalException( |
192 | "Batch index of the added collection (%llu) is smaller than the min batch index (%llu)" , batch_index, |
193 | min_batch_index); |
194 | } |
195 | auto new_count = current_collection->GetTotalRows(); |
196 | auto batch_type = |
197 | new_count < RowGroup::ROW_GROUP_SIZE ? RowGroupBatchType::NOT_FLUSHED : RowGroupBatchType::FLUSHED; |
198 | if (batch_type == RowGroupBatchType::FLUSHED && writer) { |
199 | writer->WriteLastRowGroup(row_groups&: *current_collection); |
200 | } |
201 | optional_idx merged_batch_index; |
202 | vector<unique_ptr<RowGroupCollection>> merge_collections; |
203 | { |
204 | lock_guard<mutex> l(lock); |
205 | insert_count += new_count; |
206 | |
207 | // add the collection to the batch index |
208 | RowGroupBatchEntry new_entry(batch_index, std::move(current_collection), batch_type); |
209 | |
210 | auto it = std::lower_bound( |
211 | first: collections.begin(), last: collections.end(), val: new_entry, |
212 | comp: [&](const RowGroupBatchEntry &a, const RowGroupBatchEntry &b) { return a.batch_idx < b.batch_idx; }); |
213 | if (it != collections.end() && it->batch_idx == new_entry.batch_idx) { |
214 | throw InternalException( |
215 | "PhysicalBatchInsert::AddCollection error: batch index %d is present in multiple " |
216 | "collections. This occurs when " |
217 | "batch indexes are not uniquely distributed over threads" , |
218 | batch_index); |
219 | } |
220 | collections.insert(position: it, x: std::move(new_entry)); |
221 | if (writer) { |
222 | FindMergeCollections(min_batch_index, merged_batch_index, result&: merge_collections); |
223 | } |
224 | } |
225 | if (!merge_collections.empty()) { |
226 | // merge together the collections |
227 | D_ASSERT(writer); |
228 | auto final_collection = MergeCollections(context, merge_collections: std::move(merge_collections), writer&: *writer); |
229 | if (written_to_disk) { |
230 | *written_to_disk = true; |
231 | } |
232 | // add the merged-together collection to the set of batch indexes |
233 | { |
234 | lock_guard<mutex> l(lock); |
235 | RowGroupBatchEntry new_entry(merged_batch_index.GetIndex(), std::move(final_collection), |
236 | RowGroupBatchType::FLUSHED); |
237 | auto it = std::lower_bound(first: collections.begin(), last: collections.end(), val: new_entry, |
238 | comp: [&](const RowGroupBatchEntry &a, const RowGroupBatchEntry &b) { |
239 | return a.batch_idx < b.batch_idx; |
240 | }); |
241 | if (it->batch_idx != merged_batch_index.GetIndex()) { |
242 | throw InternalException("Merged batch index was no longer present in collection" ); |
243 | } |
244 | it->collection = std::move(new_entry.collection); |
245 | } |
246 | } |
247 | } |
248 | }; |
249 | |
250 | class BatchInsertLocalState : public LocalSinkState { |
251 | public: |
252 | BatchInsertLocalState(ClientContext &context, const vector<LogicalType> &types, |
253 | const vector<unique_ptr<Expression>> &bound_defaults) |
254 | : default_executor(context, bound_defaults), written_to_disk(false) { |
255 | insert_chunk.Initialize(allocator&: Allocator::Get(context), types); |
256 | } |
257 | |
258 | DataChunk insert_chunk; |
259 | ExpressionExecutor default_executor; |
260 | idx_t current_index; |
261 | TableAppendState current_append_state; |
262 | unique_ptr<RowGroupCollection> current_collection; |
263 | optional_ptr<OptimisticDataWriter> writer; |
264 | bool written_to_disk; |
265 | |
266 | void CreateNewCollection(DuckTableEntry &table, const vector<LogicalType> &insert_types) { |
267 | auto &table_info = table.GetStorage().info; |
268 | auto &block_manager = TableIOManager::Get(table&: table.GetStorage()).GetBlockManagerForRowData(); |
269 | current_collection = make_uniq<RowGroupCollection>(args&: table_info, args&: block_manager, args: insert_types, args: MAX_ROW_ID); |
270 | current_collection->InitializeEmpty(); |
271 | current_collection->InitializeAppend(state&: current_append_state); |
272 | written_to_disk = false; |
273 | } |
274 | }; |
275 | |
276 | unique_ptr<GlobalSinkState> PhysicalBatchInsert::GetGlobalSinkState(ClientContext &context) const { |
277 | optional_ptr<TableCatalogEntry> table; |
278 | if (info) { |
279 | // CREATE TABLE AS |
280 | D_ASSERT(!insert_table); |
281 | auto &catalog = schema->catalog; |
282 | auto created_table = catalog.CreateTable(transaction: catalog.GetCatalogTransaction(context), schema&: *schema.get_mutable(), info&: *info); |
283 | table = &created_table->Cast<TableCatalogEntry>(); |
284 | } else { |
285 | D_ASSERT(insert_table); |
286 | D_ASSERT(insert_table->IsDuckTable()); |
287 | table = insert_table.get_mutable(); |
288 | } |
289 | auto result = make_uniq<BatchInsertGlobalState>(args&: table->Cast<DuckTableEntry>()); |
290 | return std::move(result); |
291 | } |
292 | |
293 | unique_ptr<LocalSinkState> PhysicalBatchInsert::GetLocalSinkState(ExecutionContext &context) const { |
294 | return make_uniq<BatchInsertLocalState>(args&: context.client, args: insert_types, args: bound_defaults); |
295 | } |
296 | |
297 | void PhysicalBatchInsert::NextBatch(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const { |
298 | auto &gstate = state.Cast<BatchInsertGlobalState>(); |
299 | auto &lstate = lstate_p.Cast<BatchInsertLocalState>(); |
300 | |
301 | auto &table = gstate.table; |
302 | auto batch_index = lstate.partition_info.batch_index.GetIndex(); |
303 | if (lstate.current_collection) { |
304 | if (lstate.current_index == batch_index) { |
305 | throw InternalException("NextBatch called with the same batch index?" ); |
306 | } |
307 | // batch index has changed: move the old collection to the global state and create a new collection |
308 | TransactionData tdata(0, 0); |
309 | lstate.current_collection->FinalizeAppend(transaction: tdata, state&: lstate.current_append_state); |
310 | gstate.AddCollection(context&: context.client, batch_index: lstate.current_index, min_batch_index: lstate.partition_info.min_batch_index.GetIndex(), |
311 | current_collection: std::move(lstate.current_collection), writer: lstate.writer, written_to_disk: &lstate.written_to_disk); |
312 | lstate.CreateNewCollection(table, insert_types); |
313 | } |
314 | lstate.current_index = batch_index; |
315 | } |
316 | |
317 | SinkResultType PhysicalBatchInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
318 | auto &gstate = input.global_state.Cast<BatchInsertGlobalState>(); |
319 | auto &lstate = input.local_state.Cast<BatchInsertLocalState>(); |
320 | |
321 | auto &table = gstate.table; |
322 | PhysicalInsert::ResolveDefaults(table, chunk, column_index_map, defaults_executor&: lstate.default_executor, result&: lstate.insert_chunk); |
323 | |
324 | auto batch_index = lstate.partition_info.batch_index.GetIndex(); |
325 | if (!lstate.current_collection) { |
326 | lock_guard<mutex> l(gstate.lock); |
327 | // no collection yet: create a new one |
328 | lstate.CreateNewCollection(table, insert_types); |
329 | lstate.writer = &table.GetStorage().CreateOptimisticWriter(context&: context.client); |
330 | } else if (lstate.current_index != batch_index) { |
331 | throw InternalException("Current batch differs from batch - but NextBatch was not called!?" ); |
332 | } |
333 | lstate.current_index = batch_index; |
334 | |
335 | table.GetStorage().VerifyAppendConstraints(table, context&: context.client, chunk&: lstate.insert_chunk); |
336 | |
337 | auto new_row_group = lstate.current_collection->Append(chunk&: lstate.insert_chunk, state&: lstate.current_append_state); |
338 | if (new_row_group) { |
339 | // we have already written to disk - flush the next row group as well |
340 | lstate.writer->WriteNewRowGroup(row_groups&: *lstate.current_collection); |
341 | lstate.written_to_disk = true; |
342 | } |
343 | return SinkResultType::NEED_MORE_INPUT; |
344 | } |
345 | |
346 | void PhysicalBatchInsert::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, |
347 | LocalSinkState &lstate_p) const { |
348 | auto &gstate = gstate_p.Cast<BatchInsertGlobalState>(); |
349 | auto &lstate = lstate_p.Cast<BatchInsertLocalState>(); |
350 | auto &client_profiler = QueryProfiler::Get(context&: context.client); |
351 | context.thread.profiler.Flush(phys_op: *this, expression_executor&: lstate.default_executor, name: "default_executor" , id: 1); |
352 | client_profiler.Flush(profiler&: context.thread.profiler); |
353 | |
354 | if (!lstate.current_collection) { |
355 | return; |
356 | } |
357 | |
358 | if (lstate.current_collection->GetTotalRows() > 0) { |
359 | TransactionData tdata(0, 0); |
360 | lstate.current_collection->FinalizeAppend(transaction: tdata, state&: lstate.current_append_state); |
361 | gstate.AddCollection(context&: context.client, batch_index: lstate.current_index, min_batch_index: lstate.partition_info.min_batch_index.GetIndex(), |
362 | current_collection: std::move(lstate.current_collection)); |
363 | } |
364 | { |
365 | lock_guard<mutex> l(gstate.lock); |
366 | gstate.table.GetStorage().FinalizeOptimisticWriter(context&: context.client, writer&: *lstate.writer); |
367 | } |
368 | } |
369 | |
370 | SinkFinalizeType PhysicalBatchInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
371 | GlobalSinkState &gstate_p) const { |
372 | auto &gstate = gstate_p.Cast<BatchInsertGlobalState>(); |
373 | |
374 | // in the finalize, do a final pass over all of the collections we created and try to merge smaller collections |
375 | // together |
376 | vector<unique_ptr<CollectionMerger>> mergers; |
377 | unique_ptr<CollectionMerger> current_merger; |
378 | |
379 | auto &storage = gstate.table.GetStorage(); |
380 | for (auto &entry : gstate.collections) { |
381 | if (entry.type == RowGroupBatchType::NOT_FLUSHED) { |
382 | // this collection has not been flushed: add it to the merge set |
383 | if (!current_merger) { |
384 | current_merger = make_uniq<CollectionMerger>(args&: context); |
385 | } |
386 | current_merger->AddCollection(collection: std::move(entry.collection)); |
387 | } else { |
388 | // this collection has been flushed: it does not need to be merged |
389 | // create a separate collection merger only for this entry |
390 | if (current_merger) { |
391 | // we have small collections remaining: flush them |
392 | mergers.push_back(x: std::move(current_merger)); |
393 | current_merger.reset(); |
394 | } |
395 | auto larger_merger = make_uniq<CollectionMerger>(args&: context); |
396 | larger_merger->AddCollection(collection: std::move(entry.collection)); |
397 | mergers.push_back(x: std::move(larger_merger)); |
398 | } |
399 | } |
400 | if (current_merger) { |
401 | mergers.push_back(x: std::move(current_merger)); |
402 | } |
403 | |
404 | // now that we have created all of the mergers, perform the actual merging |
405 | vector<unique_ptr<RowGroupCollection>> final_collections; |
406 | final_collections.reserve(n: mergers.size()); |
407 | auto &writer = storage.CreateOptimisticWriter(context); |
408 | for (auto &merger : mergers) { |
409 | final_collections.push_back(x: merger->Flush(writer)); |
410 | } |
411 | storage.FinalizeOptimisticWriter(context, writer); |
412 | |
413 | // finally, merge the row groups into the local storage |
414 | for (auto &collection : final_collections) { |
415 | storage.LocalMerge(context, collection&: *collection); |
416 | } |
417 | return SinkFinalizeType::READY; |
418 | } |
419 | |
420 | //===--------------------------------------------------------------------===// |
421 | // Source |
422 | //===--------------------------------------------------------------------===// |
423 | |
424 | SourceResultType PhysicalBatchInsert::GetData(ExecutionContext &context, DataChunk &chunk, |
425 | OperatorSourceInput &input) const { |
426 | auto &insert_gstate = sink_state->Cast<BatchInsertGlobalState>(); |
427 | |
428 | chunk.SetCardinality(1); |
429 | chunk.SetValue(col_idx: 0, index: 0, val: Value::BIGINT(value: insert_gstate.insert_count)); |
430 | |
431 | return SourceResultType::FINISHED; |
432 | } |
433 | |
434 | } // namespace duckdb |
435 | |