#include "duckdb/execution/operator/persistent/physical_batch_insert.hpp"

#include "duckdb/parallel/thread_context.hpp"
#include "duckdb/storage/data_table.hpp"
#include "duckdb/storage/table/row_group_collection.hpp"
#include "duckdb/storage/table_io_manager.hpp"
#include "duckdb/transaction/local_storage.hpp"
#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
#include "duckdb/storage/table/append_state.hpp"
#include "duckdb/storage/table/scan_state.hpp"

namespace duckdb {

PhysicalBatchInsert::PhysicalBatchInsert(vector<LogicalType> types, TableCatalogEntry &table,
                                         physical_index_vector_t<idx_t> column_index_map,
                                         vector<unique_ptr<Expression>> bound_defaults, idx_t estimated_cardinality)
    : PhysicalOperator(PhysicalOperatorType::BATCH_INSERT, std::move(types), estimated_cardinality),
      column_index_map(std::move(column_index_map)), insert_table(&table), insert_types(table.GetTypes()),
      bound_defaults(std::move(bound_defaults)) {
}

PhysicalBatchInsert::PhysicalBatchInsert(LogicalOperator &op, SchemaCatalogEntry &schema,
                                         unique_ptr<BoundCreateTableInfo> info_p, idx_t estimated_cardinality)
    : PhysicalOperator(PhysicalOperatorType::BATCH_CREATE_TABLE_AS, op.types, estimated_cardinality),
      insert_table(nullptr), schema(&schema), info(std::move(info_p)) {
	PhysicalInsert::GetInsertInfo(*info, insert_types, bound_defaults);
}

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//

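// CollectionMerger is a helper class that gathers a set of row group collections and
// merges them into a single collection by scanning the gathered rows and re-appending them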
class CollectionMerger {
public:
	explicit CollectionMerger(ClientContext &context) : context(context) {
	}

	ClientContext &context;
	vector<unique_ptr<RowGroupCollection>> current_collections;

public:
	void AddCollection(unique_ptr<RowGroupCollection> collection) {
		current_collections.push_back(std::move(collection));
	}

	bool Empty() {
		return current_collections.empty();
	}

	unique_ptr<RowGroupCollection> Flush(OptimisticDataWriter &writer) {
		if (Empty()) {
			return nullptr;
		}
		unique_ptr<RowGroupCollection> new_collection = std::move(current_collections[0]);
		if (current_collections.size() > 1) {
			// we have gathered multiple collections: merge them all into the first collection
			auto &types = new_collection->GetTypes();
			TableAppendState append_state;
			new_collection->InitializeAppend(append_state);

			DataChunk scan_chunk;
			scan_chunk.Initialize(context, types);

			vector<column_t> column_ids;
			for (idx_t i = 0; i < types.size(); i++) {
				column_ids.push_back(i);
			}
			for (auto &collection : current_collections) {
				if (!collection) {
					continue;
				}
				TableScanState scan_state;
				scan_state.Initialize(column_ids);
				collection->InitializeScan(scan_state.local_state, column_ids, nullptr);

				while (true) {
					scan_chunk.Reset();
					scan_state.local_state.ScanCommitted(scan_chunk, TableScanType::TABLE_SCAN_COMMITTED_ROWS);
					if (scan_chunk.size() == 0) {
						break;
					}
					auto new_row_group = new_collection->Append(scan_chunk, append_state);
					if (new_row_group) {
						writer.WriteNewRowGroup(*new_collection);
					}
				}
			}
			new_collection->FinalizeAppend(TransactionData(0, 0), append_state);
			writer.WriteLastRowGroup(*new_collection);
		}
		current_collections.clear();
		return new_collection;
	}
};

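// a batch entry is either NOT_FLUSHED (still in memory, eligible for merging with
// neighboring batches) or FLUSHED (already written to disk by the optimistic writer)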
enum class RowGroupBatchType : uint8_t { FLUSHED, NOT_FLUSHED };
struct RowGroupBatchEntry {
	RowGroupBatchEntry(idx_t batch_idx, unique_ptr<RowGroupCollection> collection_p, RowGroupBatchType type)
	    : batch_idx(batch_idx), total_rows(collection_p->GetTotalRows()), collection(std::move(collection_p)),
	      type(type) {
	}

	idx_t batch_idx;
	idx_t total_rows;
	unique_ptr<RowGroupCollection> collection;
	RowGroupBatchType type;
};

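// global sink state: tracks the collections of all batches, ordered by batch index,
// and merges runs of small unflushed collections once they exceed BATCH_FLUSH_THRESHOLD rows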
class BatchInsertGlobalState : public GlobalSinkState {
public:
	static constexpr const idx_t BATCH_FLUSH_THRESHOLD = LocalStorage::MERGE_THRESHOLD * 3;

public:
	explicit BatchInsertGlobalState(DuckTableEntry &table) : table(table), insert_count(0) {
	}

	mutex lock;
	DuckTableEntry &table;
	idx_t insert_count;
	vector<RowGroupBatchEntry> collections;
	idx_t next_start = 0;

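	// scan the collections in batch index order and gather a run of unflushed collections
	// (all with a batch index below min_batch_index) that together hold at least
	// BATCH_FLUSH_THRESHOLD rows; the gathered collections are returned through "result"
	// and their entries are marked as FLUSHED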
	void FindMergeCollections(idx_t min_batch_index, optional_idx &merged_batch_index,
	                          vector<unique_ptr<RowGroupCollection>> &result) {
		bool merge = false;
		idx_t start_index = next_start;
		idx_t current_idx;
		idx_t total_count = 0;
		for (current_idx = start_index; current_idx < collections.size(); current_idx++) {
			auto &entry = collections[current_idx];
			if (entry.batch_idx >= min_batch_index) {
				// this entry is AFTER the min_batch_index
				// we might still find new entries!
				break;
			}
			if (entry.type == RowGroupBatchType::FLUSHED) {
				// already flushed: cannot flush anything here
				if (total_count > 0) {
					merge = true;
					break;
				}
				start_index = current_idx + 1;
				if (start_index > next_start) {
					// avoid checking this segment again in the future
					next_start = start_index;
				}
				total_count = 0;
				continue;
			}
			// not flushed - add to set of indexes to flush
			total_count += entry.total_rows;
			if (total_count >= BATCH_FLUSH_THRESHOLD) {
				merge = true;
				break;
			}
		}
		if (merge && total_count > 0) {
			D_ASSERT(current_idx > start_index);
			merged_batch_index = collections[start_index].batch_idx;
			for (idx_t idx = start_index; idx < current_idx; idx++) {
				auto &entry = collections[idx];
				if (!entry.collection || entry.type == RowGroupBatchType::FLUSHED) {
					throw InternalException("Adding a row group collection that should not be flushed");
				}
				result.push_back(std::move(entry.collection));
				entry.total_rows = total_count;
				entry.type = RowGroupBatchType::FLUSHED;
			}
			if (start_index + 1 < current_idx) {
				// erase all entries except the first one
				collections.erase(collections.begin() + start_index + 1, collections.begin() + current_idx);
			}
		}
	}

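	// merge a set of collections into a single collection, writing full row groups
	// to disk through the optimistic writer as they are completed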
	unique_ptr<RowGroupCollection> MergeCollections(ClientContext &context,
	                                                vector<unique_ptr<RowGroupCollection>> merge_collections,
	                                                OptimisticDataWriter &writer) {
		CollectionMerger merger(context);
		for (auto &collection : merge_collections) {
			merger.AddCollection(std::move(collection));
		}
		return merger.Flush(writer);
	}

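	// add a completed collection for the given batch index; collections that already span
	// a full row group are flushed to disk immediately, while smaller collections are kept
	// in memory and merged with neighboring batches once enough rows have accumulated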
	void AddCollection(ClientContext &context, idx_t batch_index, idx_t min_batch_index,
	                   unique_ptr<RowGroupCollection> current_collection,
	                   optional_ptr<OptimisticDataWriter> writer = nullptr,
	                   optional_ptr<bool> written_to_disk = nullptr) {
		if (batch_index < min_batch_index) {
			throw InternalException(
			    "Batch index of the added collection (%llu) is smaller than the min batch index (%llu)", batch_index,
			    min_batch_index);
		}
		auto new_count = current_collection->GetTotalRows();
		auto batch_type =
		    new_count < RowGroup::ROW_GROUP_SIZE ? RowGroupBatchType::NOT_FLUSHED : RowGroupBatchType::FLUSHED;
		if (batch_type == RowGroupBatchType::FLUSHED && writer) {
			writer->WriteLastRowGroup(*current_collection);
		}
		optional_idx merged_batch_index;
		vector<unique_ptr<RowGroupCollection>> merge_collections;
		{
			lock_guard<mutex> l(lock);
			insert_count += new_count;

			// insert the collection into the set of batch entries, ordered by batch index
			RowGroupBatchEntry new_entry(batch_index, std::move(current_collection), batch_type);

			auto it = std::lower_bound(
			    collections.begin(), collections.end(), new_entry,
			    [&](const RowGroupBatchEntry &a, const RowGroupBatchEntry &b) { return a.batch_idx < b.batch_idx; });
			if (it != collections.end() && it->batch_idx == new_entry.batch_idx) {
				throw InternalException(
				    "PhysicalBatchInsert::AddCollection error: batch index %d is present in multiple "
				    "collections. This occurs when "
				    "batch indexes are not uniquely distributed over threads",
				    batch_index);
			}
			collections.insert(it, std::move(new_entry));
			if (writer) {
				FindMergeCollections(min_batch_index, merged_batch_index, merge_collections);
			}
		}
		if (!merge_collections.empty()) {
			// merge the collections together outside of the lock
			D_ASSERT(writer);
			auto final_collection = MergeCollections(context, std::move(merge_collections), *writer);
			if (written_to_disk) {
				*written_to_disk = true;
			}
			// add the merged-together collection back to the set of batch entries
			{
				lock_guard<mutex> l(lock);
				RowGroupBatchEntry new_entry(merged_batch_index.GetIndex(), std::move(final_collection),
				                             RowGroupBatchType::FLUSHED);
				auto it = std::lower_bound(collections.begin(), collections.end(), new_entry,
				                           [&](const RowGroupBatchEntry &a, const RowGroupBatchEntry &b) {
					                           return a.batch_idx < b.batch_idx;
				                           });
				if (it->batch_idx != merged_batch_index.GetIndex()) {
					throw InternalException("Merged batch index was no longer present in collection");
				}
				it->collection = std::move(new_entry.collection);
			}
		}
	}
};

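// local sink state: every thread appends into its own in-memory row group collection,
// which is handed over to the global state whenever the batch index changes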
class BatchInsertLocalState : public LocalSinkState {
public:
	BatchInsertLocalState(ClientContext &context, const vector<LogicalType> &types,
	                      const vector<unique_ptr<Expression>> &bound_defaults)
	    : default_executor(context, bound_defaults), written_to_disk(false) {
		insert_chunk.Initialize(Allocator::Get(context), types);
	}

	DataChunk insert_chunk;
	ExpressionExecutor default_executor;
	idx_t current_index;
	TableAppendState current_append_state;
	unique_ptr<RowGroupCollection> current_collection;
	optional_ptr<OptimisticDataWriter> writer;
	bool written_to_disk;

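	// start a fresh in-memory collection for the current batch, initialized for appending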
	void CreateNewCollection(DuckTableEntry &table, const vector<LogicalType> &insert_types) {
		auto &table_info = table.GetStorage().info;
		auto &block_manager = TableIOManager::Get(table.GetStorage()).GetBlockManagerForRowData();
		current_collection = make_uniq<RowGroupCollection>(table_info, block_manager, insert_types, MAX_ROW_ID);
		current_collection->InitializeEmpty();
		current_collection->InitializeAppend(current_append_state);
		written_to_disk = false;
	}
};

unique_ptr<GlobalSinkState> PhysicalBatchInsert::GetGlobalSinkState(ClientContext &context) const {
	optional_ptr<TableCatalogEntry> table;
	if (info) {
		// CREATE TABLE AS
		D_ASSERT(!insert_table);
		auto &catalog = schema->catalog;
		auto created_table = catalog.CreateTable(catalog.GetCatalogTransaction(context), *schema.get_mutable(), *info);
		table = &created_table->Cast<TableCatalogEntry>();
	} else {
		D_ASSERT(insert_table);
		D_ASSERT(insert_table->IsDuckTable());
		table = insert_table.get_mutable();
	}
	auto result = make_uniq<BatchInsertGlobalState>(table->Cast<DuckTableEntry>());
	return std::move(result);
}

unique_ptr<LocalSinkState> PhysicalBatchInsert::GetLocalSinkState(ExecutionContext &context) const {
	return make_uniq<BatchInsertLocalState>(context.client, insert_types, bound_defaults);
}

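// NextBatch is called whenever the batch index changes for a thread: finalize the collection
// of the previous batch, hand it over to the global state and start a new collection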
void PhysicalBatchInsert::NextBatch(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate_p) const {
	auto &gstate = state.Cast<BatchInsertGlobalState>();
	auto &lstate = lstate_p.Cast<BatchInsertLocalState>();

	auto &table = gstate.table;
	auto batch_index = lstate.partition_info.batch_index.GetIndex();
	if (lstate.current_collection) {
		if (lstate.current_index == batch_index) {
			throw InternalException("NextBatch called with the same batch index?");
		}
		// batch index has changed: move the old collection to the global state and create a new collection
		TransactionData tdata(0, 0);
		lstate.current_collection->FinalizeAppend(tdata, lstate.current_append_state);
		gstate.AddCollection(context.client, lstate.current_index, lstate.partition_info.min_batch_index.GetIndex(),
		                     std::move(lstate.current_collection), lstate.writer, &lstate.written_to_disk);
		lstate.CreateNewCollection(table, insert_types);
	}
	lstate.current_index = batch_index;
}

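// Sink resolves default values, verifies constraints and appends the chunk to the
// thread-local collection, optimistically writing full row groups to disk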
SinkResultType PhysicalBatchInsert::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
	auto &gstate = input.global_state.Cast<BatchInsertGlobalState>();
	auto &lstate = input.local_state.Cast<BatchInsertLocalState>();

	auto &table = gstate.table;
	PhysicalInsert::ResolveDefaults(table, chunk, column_index_map, lstate.default_executor, lstate.insert_chunk);

	auto batch_index = lstate.partition_info.batch_index.GetIndex();
	if (!lstate.current_collection) {
		lock_guard<mutex> l(gstate.lock);
		// no collection yet: create a new one
		lstate.CreateNewCollection(table, insert_types);
		lstate.writer = &table.GetStorage().CreateOptimisticWriter(context.client);
	} else if (lstate.current_index != batch_index) {
		throw InternalException("Current batch differs from the sink's batch index - but NextBatch was not called!?");
	}
	lstate.current_index = batch_index;

	table.GetStorage().VerifyAppendConstraints(table, context.client, lstate.insert_chunk);

	auto new_row_group = lstate.current_collection->Append(lstate.insert_chunk, lstate.current_append_state);
	if (new_row_group) {
		// a new row group has been created: flush the previous (filled) row group to disk
		lstate.writer->WriteNewRowGroup(*lstate.current_collection);
		lstate.written_to_disk = true;
	}
	return SinkResultType::NEED_MORE_INPUT;
}

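// Combine flushes the remaining thread-local collection to the global state and
// finalizes the optimistic writer of this thread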
void PhysicalBatchInsert::Combine(ExecutionContext &context, GlobalSinkState &gstate_p,
                                  LocalSinkState &lstate_p) const {
	auto &gstate = gstate_p.Cast<BatchInsertGlobalState>();
	auto &lstate = lstate_p.Cast<BatchInsertLocalState>();
	auto &client_profiler = QueryProfiler::Get(context.client);
	context.thread.profiler.Flush(*this, lstate.default_executor, "default_executor", 1);
	client_profiler.Flush(context.thread.profiler);

	if (!lstate.current_collection) {
		return;
	}

	if (lstate.current_collection->GetTotalRows() > 0) {
		TransactionData tdata(0, 0);
		lstate.current_collection->FinalizeAppend(tdata, lstate.current_append_state);
		gstate.AddCollection(context.client, lstate.current_index, lstate.partition_info.min_batch_index.GetIndex(),
		                     std::move(lstate.current_collection));
	}
	{
		lock_guard<mutex> l(gstate.lock);
		gstate.table.GetStorage().FinalizeOptimisticWriter(context.client, *lstate.writer);
	}
}

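// Finalize merges any remaining unflushed collections, flushes them through a fresh
// optimistic writer and merges all resulting collections into the transaction-local storage
// in batch index order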
SinkFinalizeType PhysicalBatchInsert::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                               GlobalSinkState &gstate_p) const {
	auto &gstate = gstate_p.Cast<BatchInsertGlobalState>();

	// do a final pass over all of the collections we created and merge adjacent small collections together
	vector<unique_ptr<CollectionMerger>> mergers;
	unique_ptr<CollectionMerger> current_merger;

	auto &storage = gstate.table.GetStorage();
	for (auto &entry : gstate.collections) {
		if (entry.type == RowGroupBatchType::NOT_FLUSHED) {
			// this collection has not been flushed: add it to the merge set
			if (!current_merger) {
				current_merger = make_uniq<CollectionMerger>(context);
			}
			current_merger->AddCollection(std::move(entry.collection));
		} else {
			// this collection has already been flushed: it does not need to be merged
			// create a separate collection merger for just this entry
			if (current_merger) {
				// we have small collections remaining: flush them first
				mergers.push_back(std::move(current_merger));
				current_merger.reset();
			}
			auto larger_merger = make_uniq<CollectionMerger>(context);
			larger_merger->AddCollection(std::move(entry.collection));
			mergers.push_back(std::move(larger_merger));
		}
	}
	if (current_merger) {
		mergers.push_back(std::move(current_merger));
	}

	// now that we have created all of the mergers, perform the actual merging
	vector<unique_ptr<RowGroupCollection>> final_collections;
	final_collections.reserve(mergers.size());
	auto &writer = storage.CreateOptimisticWriter(context);
	for (auto &merger : mergers) {
		final_collections.push_back(merger->Flush(writer));
	}
	storage.FinalizeOptimisticWriter(context, writer);

	// finally, merge the row groups into the local storage
	for (auto &collection : final_collections) {
		storage.LocalMerge(context, *collection);
	}
	return SinkFinalizeType::READY;
}

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//

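// the source phase returns the total number of inserted rows as a single BIGINT value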
SourceResultType PhysicalBatchInsert::GetData(ExecutionContext &context, DataChunk &chunk,
                                              OperatorSourceInput &input) const {
	auto &insert_gstate = sink_state->Cast<BatchInsertGlobalState>();

	chunk.SetCardinality(1);
	chunk.SetValue(0, 0, Value::BIGINT(insert_gstate.insert_count));

	return SourceResultType::FINISHED;
}

} // namespace duckdb