1#include "duckdb/storage/optimistic_data_writer.hpp"
2#include "duckdb/storage/table/column_segment.hpp"
3#include "duckdb/storage/partial_block_manager.hpp"
4#include "duckdb/storage/table/column_checkpoint_state.hpp"
5
6namespace duckdb {
7
8OptimisticDataWriter::OptimisticDataWriter(DataTable &table) : table(table) {
9}
10
11OptimisticDataWriter::OptimisticDataWriter(DataTable &table, OptimisticDataWriter &parent) : table(table) {
12 if (parent.partial_manager) {
13 parent.partial_manager->ClearBlocks();
14 }
15}
16
17OptimisticDataWriter::~OptimisticDataWriter() {
18}
19
20bool OptimisticDataWriter::PrepareWrite() {
21 // check if we should pre-emptively write the table to disk
22 if (table.info->IsTemporary() || StorageManager::Get(db&: table.info->db).InMemory()) {
23 return false;
24 }
25 // we should! write the second-to-last row group to disk
26 // allocate the partial block-manager if none is allocated yet
27 if (!partial_manager) {
28 auto &block_manager = table.info->table_io_manager->GetBlockManagerForRowData();
29 partial_manager = make_uniq<PartialBlockManager>(args&: block_manager, args: CheckpointType::APPEND_TO_TABLE);
30 }
31 return true;
32}
33
34void OptimisticDataWriter::WriteNewRowGroup(RowGroupCollection &row_groups) {
35 // we finished writing a complete row group
36 if (!PrepareWrite()) {
37 return;
38 }
39 // flush second-to-last row group
40 auto row_group = row_groups.GetRowGroup(index: -2);
41 FlushToDisk(row_group);
42}
43
44void OptimisticDataWriter::WriteLastRowGroup(RowGroupCollection &row_groups) {
45 // we finished writing a complete row group
46 if (!PrepareWrite()) {
47 return;
48 }
49 // flush second-to-last row group
50 auto row_group = row_groups.GetRowGroup(index: -1);
51 if (!row_group) {
52 return;
53 }
54 FlushToDisk(row_group);
55}
56
57void OptimisticDataWriter::FlushToDisk(RowGroup *row_group) {
58 if (!row_group) {
59 throw InternalException("FlushToDisk called without a RowGroup");
60 }
61 //! The set of column compression types (if any)
62 vector<CompressionType> compression_types;
63 D_ASSERT(compression_types.empty());
64 for (auto &column : table.column_definitions) {
65 compression_types.push_back(x: column.CompressionType());
66 }
67 row_group->WriteToDisk(manager&: *partial_manager, compression_types);
68}
69
70void OptimisticDataWriter::Merge(OptimisticDataWriter &other) {
71 if (!other.partial_manager) {
72 return;
73 }
74 if (!partial_manager) {
75 partial_manager = std::move(other.partial_manager);
76 return;
77 }
78 partial_manager->Merge(other&: *other.partial_manager);
79 other.partial_manager.reset();
80}
81
82void OptimisticDataWriter::FinalFlush() {
83 if (partial_manager) {
84 partial_manager->FlushPartialBlocks();
85 partial_manager.reset();
86 }
87}
88
89void OptimisticDataWriter::Rollback() {
90 if (partial_manager) {
91 partial_manager->Rollback();
92 partial_manager.reset();
93 }
94}
95
96} // namespace duckdb
97