1#include "duckdb/storage/checkpoint/table_data_writer.hpp"
2
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4#include "duckdb/common/types/null_value.hpp"
5
6#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
7#include "duckdb/common/serializer/buffered_serializer.hpp"
8
9#include "duckdb/storage/numeric_segment.hpp"
10#include "duckdb/storage/string_segment.hpp"
11#include "duckdb/storage/table/column_segment.hpp"
12
13using namespace duckdb;
14using namespace std;
15
//! Writes strings that do not fit inline into a segment to dedicated overflow
//! blocks on disk, chaining multiple blocks together when a string spans more
//! than one block
class WriteOverflowStringsToDisk : public OverflowStringWriter {
public:
	WriteOverflowStringsToDisk(CheckpointManager &manager);
	~WriteOverflowStringsToDisk();

	//! The checkpoint manager
	CheckpointManager &manager;
	//! In-memory buffer handle of the block currently being filled with string data
	unique_ptr<BufferHandle> handle;
	//! Id of the on-disk block we are writing to (INVALID_BLOCK before the first write)
	block_id_t block_id;
	//! Current write offset within the active block
	idx_t offset;

	//! Usable string space per block: the tail of every block is reserved for
	//! the id of the next block in the overflow chain
	static constexpr idx_t STRING_SPACE = Storage::BLOCK_SIZE - sizeof(block_id_t);

public:
	//! Writes [string] to overflow storage; returns the block id and offset at
	//! which the string (length prefix) was placed
	void WriteString(string_t string, block_id_t &result_block, int32_t &result_offset) override;

private:
	//! Flushes the current block (if any) to disk and makes [new_block_id] the active block
	void AllocateNewBlock(block_id_t new_block_id);
};
38
// Constructs a writer that checkpoints the data of [table] through [manager]
TableDataWriter::TableDataWriter(CheckpointManager &manager, TableCatalogEntry &table)
    : manager(manager), table(table) {
}
42
// Out-of-line destructor; members release their resources automatically
TableDataWriter::~TableDataWriter() {
}
45
46void TableDataWriter::WriteTableData(Transaction &transaction) {
47 // allocate segments to write the table to
48 segments.resize(table.columns.size());
49 data_pointers.resize(table.columns.size());
50 for (idx_t i = 0; i < table.columns.size(); i++) {
51 auto type_id = GetInternalType(table.columns[i].type);
52 stats.push_back(make_unique<SegmentStatistics>(type_id, GetTypeIdSize(type_id)));
53 CreateSegment(i);
54 }
55
56 // now start scanning the table and append the data to the uncompressed segments
57 vector<column_t> column_ids;
58 for (auto &column : table.columns) {
59 column_ids.push_back(column.oid);
60 }
61 // initialize scan structures to prepare for the scan
62 TableScanState state;
63 table.storage->InitializeScan(transaction, state, column_ids);
64 //! get all types of the table and initialize the chunk
65 auto types = table.GetTypes();
66 DataChunk chunk;
67 chunk.Initialize(types);
68
69 while (true) {
70 chunk.Reset();
71 // now scan the table to construct the blocks
72 unordered_map<idx_t, vector<TableFilter>> mock;
73 table.storage->Scan(transaction, chunk, state, mock);
74 if (chunk.size() == 0) {
75 break;
76 }
77 // for each column, we append whatever we can fit into the block
78 idx_t chunk_size = chunk.size();
79 for (idx_t i = 0; i < table.columns.size(); i++) {
80 assert(chunk.data[i].type == GetInternalType(table.columns[i].type));
81 AppendData(transaction, i, chunk.data[i], chunk_size);
82 }
83 }
84 // flush any remaining data and write the data pointers to disk
85 for (idx_t i = 0; i < table.columns.size(); i++) {
86 FlushSegment(transaction, i);
87 }
88 VerifyDataPointers();
89 WriteDataPointers();
90}
91
92void TableDataWriter::CreateSegment(idx_t col_idx) {
93 auto type_id = GetInternalType(table.columns[col_idx].type);
94 if (type_id == TypeId::VARCHAR) {
95 auto string_segment = make_unique<StringSegment>(manager.buffer_manager, 0);
96 string_segment->overflow_writer = make_unique<WriteOverflowStringsToDisk>(manager);
97 segments[col_idx] = move(string_segment);
98 } else {
99 segments[col_idx] = make_unique<NumericSegment>(manager.buffer_manager, type_id, 0);
100 }
101}
102
103void TableDataWriter::AppendData(Transaction &transaction, idx_t col_idx, Vector &data, idx_t count) {
104 idx_t offset = 0;
105 while (count > 0) {
106 idx_t appended = segments[col_idx]->Append(*stats[col_idx], data, offset, count);
107 if (appended == count) {
108 // appended everything: finished
109 return;
110 }
111 // the segment is full: flush it to disk
112 FlushSegment(transaction, col_idx);
113
114 // now create a new segment and continue appending
115 CreateSegment(col_idx);
116 offset += appended;
117 count -= appended;
118 }
119}
120
// Flushes the in-memory segment of column [col_idx] to a fresh on-disk block
// and records a DataPointer (block location, row range and statistics) for it.
// No-op if the segment holds no tuples.
void TableDataWriter::FlushSegment(Transaction &transaction, idx_t col_idx) {
	auto tuple_count = segments[col_idx]->tuple_count;
	if (tuple_count == 0) {
		// nothing was appended to this segment: nothing to flush
		return;
	}

	// get the buffer of the segment and pin it
	auto handle = manager.buffer_manager.Pin(segments[col_idx]->block_id);

	// get a free block id to write to
	auto block_id = manager.block_manager.GetFreeBlockId();

	// construct the data pointer, FIXME: add statistics as well
	DataPointer data_pointer;
	data_pointer.block_id = block_id;
	data_pointer.offset = 0;
	// row_start continues where the previous segment of this column ended
	data_pointer.row_start = 0;
	if (data_pointers[col_idx].size() > 0) {
		auto &last_pointer = data_pointers[col_idx].back();
		data_pointer.row_start = last_pointer.row_start + last_pointer.tuple_count;
	}
	data_pointer.tuple_count = tuple_count;
	// copy the min/max statistics into the pointer; VARCHAR stats use a fixed
	// 8-byte representation (presumably matching the 8-byte min_stats/max_stats
	// fields serialized in WriteDataPointers — TODO confirm against DataPointer)
	idx_t type_size = stats[col_idx]->type == TypeId::VARCHAR ? 8 : stats[col_idx]->type_size;
	memcpy(&data_pointer.min_stats, stats[col_idx]->minimum.get(), type_size);
	memcpy(&data_pointer.max_stats, stats[col_idx]->maximum.get(), type_size);
	data_pointers[col_idx].push_back(move(data_pointer));
	// write the block to disk
	manager.block_manager.Write(*handle->node, block_id);

	// unpin the buffer and destroy the in-memory segment: its data is now on disk
	handle.reset();
	segments[col_idx] = nullptr;
}
153
154void TableDataWriter::VerifyDataPointers() {
155 // verify the data pointers
156 idx_t table_count = 0;
157 for (idx_t i = 0; i < data_pointers.size(); i++) {
158 auto &data_pointer_list = data_pointers[i];
159 idx_t column_count = 0;
160 // then write the data pointers themselves
161 for (idx_t k = 0; k < data_pointer_list.size(); k++) {
162 auto &data_pointer = data_pointer_list[k];
163 column_count += data_pointer.tuple_count;
164 }
165 if (segments[i]) {
166 column_count += segments[i]->tuple_count;
167 }
168 if (i == 0) {
169 table_count = column_count;
170 } else {
171 if (table_count != column_count) {
172 throw Exception("Column count mismatch in data write!");
173 }
174 }
175 }
176}
177
178void TableDataWriter::WriteDataPointers() {
179 for (idx_t i = 0; i < data_pointers.size(); i++) {
180 // get a reference to the data column
181 auto &data_pointer_list = data_pointers[i];
182 manager.tabledata_writer->Write<idx_t>(data_pointer_list.size());
183 // then write the data pointers themselves
184 for (idx_t k = 0; k < data_pointer_list.size(); k++) {
185 auto &data_pointer = data_pointer_list[k];
186 manager.tabledata_writer->Write<double>(data_pointer.min);
187 manager.tabledata_writer->Write<double>(data_pointer.max);
188 manager.tabledata_writer->Write<idx_t>(data_pointer.row_start);
189 manager.tabledata_writer->Write<idx_t>(data_pointer.tuple_count);
190 manager.tabledata_writer->Write<block_id_t>(data_pointer.block_id);
191 manager.tabledata_writer->Write<uint32_t>(data_pointer.offset);
192 manager.tabledata_writer->WriteData(data_pointer.min_stats, 8);
193 manager.tabledata_writer->WriteData(data_pointer.max_stats, 8);
194 }
195 }
196}
197
// Starts with no active block; the first WriteString call allocates the buffer
// and grabs a free block id
WriteOverflowStringsToDisk::WriteOverflowStringsToDisk(CheckpointManager &manager)
    : manager(manager), handle(nullptr), block_id(INVALID_BLOCK), offset(0) {
}
201
// Flushes the final, partially-filled overflow block (if any data was written) to disk
WriteOverflowStringsToDisk::~WriteOverflowStringsToDisk() {
	if (offset > 0) {
		// offset > 0 implies a block was allocated, so handle and block_id are valid
		manager.block_manager.Write(*handle->node, block_id);
	}
}
207
// Writes [string] into the overflow blocks: a uint32 length prefix followed by
// the string bytes (including the terminating zero), spilling across multiple
// chained blocks when necessary. Returns via [result_block]/[result_offset]
// the position of the length prefix.
void WriteOverflowStringsToDisk::WriteString(string_t string, block_id_t &result_block, int32_t &result_offset) {
	if (!handle) {
		// lazily allocate the in-memory staging buffer on first use
		handle = manager.buffer_manager.Allocate(Storage::BLOCK_ALLOC_SIZE);
	}
	// first write the length of the string
	// make sure the 4-byte length prefix fits entirely inside the string space
	// of a single block; allocate a new block if it does not
	if (block_id == INVALID_BLOCK || offset + sizeof(uint32_t) >= STRING_SPACE) {
		AllocateNewBlock(manager.block_manager.GetFreeBlockId());
	}
	// record where the string starts (position of its length prefix)
	result_block = block_id;
	result_offset = offset;

	// write the length field
	auto string_length = string.GetSize();
	*((uint32_t *)(handle->node->buffer + offset)) = string_length;
	offset += sizeof(uint32_t);
	// now write the remainder of the string
	auto strptr = string.GetData();
	// +1 to include the null terminator in the written bytes
	uint32_t remaining = string_length + 1;
	while (remaining > 0) {
		// write as much as fits in the current block's string space
		uint32_t to_write = std::min((uint32_t)remaining, (uint32_t)(STRING_SPACE - offset));
		if (to_write > 0) {
			memcpy(handle->node->buffer + offset, strptr, to_write);

			remaining -= to_write;
			offset += to_write;
			strptr += to_write;
		}
		if (remaining > 0) {
			// there is still remaining stuff to write
			// first get the new block id and write it to the end of the previous block
			// (the tail bytes past STRING_SPACE are reserved for exactly this link)
			auto new_block_id = manager.block_manager.GetFreeBlockId();
			*((block_id_t *)(handle->node->buffer + offset)) = new_block_id;
			// now write the current block to disk and allocate a new block
			AllocateNewBlock(new_block_id);
		}
	}
}
245
246void WriteOverflowStringsToDisk::AllocateNewBlock(block_id_t new_block_id) {
247 if (block_id != INVALID_BLOCK) {
248 // there is an old block, write it first
249 manager.block_manager.Write(*handle->node, block_id);
250 }
251 offset = 0;
252 block_id = new_block_id;
253}
254