1 | #include "duckdb/storage/checkpoint/table_data_writer.hpp" |
2 | |
3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
4 | #include "duckdb/common/types/null_value.hpp" |
5 | |
6 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
7 | #include "duckdb/common/serializer/buffered_serializer.hpp" |
8 | |
9 | #include "duckdb/storage/numeric_segment.hpp" |
10 | #include "duckdb/storage/string_segment.hpp" |
11 | #include "duckdb/storage/table/column_segment.hpp" |
12 | |
13 | using namespace duckdb; |
14 | using namespace std; |
15 | |
//! Writes "overflow" strings (strings too large for a regular string segment)
//! to dedicated disk blocks during a checkpoint. Blocks are chained: the last
//! sizeof(block_id_t) bytes of each block hold the id of the next block.
class WriteOverflowStringsToDisk : public OverflowStringWriter {
public:
	WriteOverflowStringsToDisk(CheckpointManager &manager);
	~WriteOverflowStringsToDisk();

	//! The checkpoint manager
	CheckpointManager &manager;
	//! In-memory buffer handle for the block currently being filled
	unique_ptr<BufferHandle> handle;
	//! Id of the on-disk block we are currently writing to (INVALID_BLOCK before the first write)
	block_id_t block_id;
	//! Current write offset within the block buffer
	idx_t offset;

	//! Usable bytes per block: the trailing sizeof(block_id_t) bytes are
	//! reserved for the id of the next block in the chain
	static constexpr idx_t STRING_SPACE = Storage::BLOCK_SIZE - sizeof(block_id_t);

public:
	//! Append a string to the overflow blocks; result_block/result_offset
	//! receive the location of the string's 4-byte length prefix
	void WriteString(string_t string, block_id_t &result_block, int32_t &result_offset) override;

private:
	//! Flush the current block to disk (if any) and switch to new_block_id
	void AllocateNewBlock(block_id_t new_block_id);
};
38 | |
//! Construct a writer that checkpoints the data of `table` through `manager`
TableDataWriter::TableDataWriter(CheckpointManager &manager, TableCatalogEntry &table)
    : manager(manager), table(table) {
}

TableDataWriter::~TableDataWriter() {
}
45 | |
46 | void TableDataWriter::WriteTableData(Transaction &transaction) { |
47 | // allocate segments to write the table to |
48 | segments.resize(table.columns.size()); |
49 | data_pointers.resize(table.columns.size()); |
50 | for (idx_t i = 0; i < table.columns.size(); i++) { |
51 | auto type_id = GetInternalType(table.columns[i].type); |
52 | stats.push_back(make_unique<SegmentStatistics>(type_id, GetTypeIdSize(type_id))); |
53 | CreateSegment(i); |
54 | } |
55 | |
56 | // now start scanning the table and append the data to the uncompressed segments |
57 | vector<column_t> column_ids; |
58 | for (auto &column : table.columns) { |
59 | column_ids.push_back(column.oid); |
60 | } |
61 | // initialize scan structures to prepare for the scan |
62 | TableScanState state; |
63 | table.storage->InitializeScan(transaction, state, column_ids); |
64 | //! get all types of the table and initialize the chunk |
65 | auto types = table.GetTypes(); |
66 | DataChunk chunk; |
67 | chunk.Initialize(types); |
68 | |
69 | while (true) { |
70 | chunk.Reset(); |
71 | // now scan the table to construct the blocks |
72 | unordered_map<idx_t, vector<TableFilter>> mock; |
73 | table.storage->Scan(transaction, chunk, state, mock); |
74 | if (chunk.size() == 0) { |
75 | break; |
76 | } |
77 | // for each column, we append whatever we can fit into the block |
78 | idx_t chunk_size = chunk.size(); |
79 | for (idx_t i = 0; i < table.columns.size(); i++) { |
80 | assert(chunk.data[i].type == GetInternalType(table.columns[i].type)); |
81 | AppendData(transaction, i, chunk.data[i], chunk_size); |
82 | } |
83 | } |
84 | // flush any remaining data and write the data pointers to disk |
85 | for (idx_t i = 0; i < table.columns.size(); i++) { |
86 | FlushSegment(transaction, i); |
87 | } |
88 | VerifyDataPointers(); |
89 | WriteDataPointers(); |
90 | } |
91 | |
92 | void TableDataWriter::CreateSegment(idx_t col_idx) { |
93 | auto type_id = GetInternalType(table.columns[col_idx].type); |
94 | if (type_id == TypeId::VARCHAR) { |
95 | auto string_segment = make_unique<StringSegment>(manager.buffer_manager, 0); |
96 | string_segment->overflow_writer = make_unique<WriteOverflowStringsToDisk>(manager); |
97 | segments[col_idx] = move(string_segment); |
98 | } else { |
99 | segments[col_idx] = make_unique<NumericSegment>(manager.buffer_manager, type_id, 0); |
100 | } |
101 | } |
102 | |
103 | void TableDataWriter::AppendData(Transaction &transaction, idx_t col_idx, Vector &data, idx_t count) { |
104 | idx_t offset = 0; |
105 | while (count > 0) { |
106 | idx_t appended = segments[col_idx]->Append(*stats[col_idx], data, offset, count); |
107 | if (appended == count) { |
108 | // appended everything: finished |
109 | return; |
110 | } |
111 | // the segment is full: flush it to disk |
112 | FlushSegment(transaction, col_idx); |
113 | |
114 | // now create a new segment and continue appending |
115 | CreateSegment(col_idx); |
116 | offset += appended; |
117 | count -= appended; |
118 | } |
119 | } |
120 | |
//! Persist the in-memory segment of column col_idx to a fresh on-disk block
//! and record a DataPointer for it; no-op if the segment is empty.
void TableDataWriter::FlushSegment(Transaction &transaction, idx_t col_idx) {
	auto tuple_count = segments[col_idx]->tuple_count;
	if (tuple_count == 0) {
		// nothing was appended to this segment: nothing to write
		return;
	}

	// get the buffer of the segment and pin it
	auto handle = manager.buffer_manager.Pin(segments[col_idx]->block_id);

	// get a free block id to write to
	auto block_id = manager.block_manager.GetFreeBlockId();

	// construct the data pointer, FIXME: add statistics as well
	DataPointer data_pointer;
	data_pointer.block_id = block_id;
	data_pointer.offset = 0;
	data_pointer.row_start = 0;
	if (data_pointers[col_idx].size() > 0) {
		// this segment's rows start right after the previous segment's rows
		auto &last_pointer = data_pointers[col_idx].back();
		data_pointer.row_start = last_pointer.row_start + last_pointer.tuple_count;
	}
	data_pointer.tuple_count = tuple_count;
	// copy the min/max statistics; VARCHAR stats use a fixed 8-byte slot,
	// other types copy the type's own size
	idx_t type_size = stats[col_idx]->type == TypeId::VARCHAR ? 8 : stats[col_idx]->type_size;
	memcpy(&data_pointer.min_stats, stats[col_idx]->minimum.get(), type_size);
	memcpy(&data_pointer.max_stats, stats[col_idx]->maximum.get(), type_size);
	data_pointers[col_idx].push_back(move(data_pointer));
	// write the block to disk
	manager.block_manager.Write(*handle->node, block_id);

	// unpin the buffer and release the in-memory segment
	handle.reset();
	segments[col_idx] = nullptr;
}
153 | |
154 | void TableDataWriter::VerifyDataPointers() { |
155 | // verify the data pointers |
156 | idx_t table_count = 0; |
157 | for (idx_t i = 0; i < data_pointers.size(); i++) { |
158 | auto &data_pointer_list = data_pointers[i]; |
159 | idx_t column_count = 0; |
160 | // then write the data pointers themselves |
161 | for (idx_t k = 0; k < data_pointer_list.size(); k++) { |
162 | auto &data_pointer = data_pointer_list[k]; |
163 | column_count += data_pointer.tuple_count; |
164 | } |
165 | if (segments[i]) { |
166 | column_count += segments[i]->tuple_count; |
167 | } |
168 | if (i == 0) { |
169 | table_count = column_count; |
170 | } else { |
171 | if (table_count != column_count) { |
172 | throw Exception("Column count mismatch in data write!" ); |
173 | } |
174 | } |
175 | } |
176 | } |
177 | |
178 | void TableDataWriter::WriteDataPointers() { |
179 | for (idx_t i = 0; i < data_pointers.size(); i++) { |
180 | // get a reference to the data column |
181 | auto &data_pointer_list = data_pointers[i]; |
182 | manager.tabledata_writer->Write<idx_t>(data_pointer_list.size()); |
183 | // then write the data pointers themselves |
184 | for (idx_t k = 0; k < data_pointer_list.size(); k++) { |
185 | auto &data_pointer = data_pointer_list[k]; |
186 | manager.tabledata_writer->Write<double>(data_pointer.min); |
187 | manager.tabledata_writer->Write<double>(data_pointer.max); |
188 | manager.tabledata_writer->Write<idx_t>(data_pointer.row_start); |
189 | manager.tabledata_writer->Write<idx_t>(data_pointer.tuple_count); |
190 | manager.tabledata_writer->Write<block_id_t>(data_pointer.block_id); |
191 | manager.tabledata_writer->Write<uint32_t>(data_pointer.offset); |
192 | manager.tabledata_writer->WriteData(data_pointer.min_stats, 8); |
193 | manager.tabledata_writer->WriteData(data_pointer.max_stats, 8); |
194 | } |
195 | } |
196 | } |
197 | |
//! Start with no buffer and no block: both are allocated lazily on the first
//! call to WriteString
WriteOverflowStringsToDisk::WriteOverflowStringsToDisk(CheckpointManager &manager)
    : manager(manager), handle(nullptr), block_id(INVALID_BLOCK), offset(0) {
}
201 | |
WriteOverflowStringsToDisk::~WriteOverflowStringsToDisk() {
	// flush the final, partially-filled block if anything was written to it.
	// NOTE(review): a write failure here cannot propagate out of a destructor
	if (offset > 0) {
		manager.block_manager.Write(*handle->node, block_id);
	}
}
207 | |
//! Append a string (4-byte length prefix followed by the string data) to the
//! overflow blocks, spilling into freshly allocated chained blocks as needed.
//! result_block/result_offset receive the location of the length prefix.
void WriteOverflowStringsToDisk::WriteString(string_t string, block_id_t &result_block, int32_t &result_offset) {
	if (!handle) {
		// lazily allocate the in-memory staging buffer on first use
		handle = manager.buffer_manager.Allocate(Storage::BLOCK_ALLOC_SIZE);
	}
	// first write the length of the string
	// ensure the 4-byte length prefix fits entirely within the usable string
	// space of the current block (never straddles a block boundary)
	if (block_id == INVALID_BLOCK || offset + sizeof(uint32_t) >= STRING_SPACE) {
		AllocateNewBlock(manager.block_manager.GetFreeBlockId());
	}
	result_block = block_id;
	result_offset = offset;

	// write the length field
	auto string_length = string.GetSize();
	*((uint32_t *)(handle->node->buffer + offset)) = string_length;
	offset += sizeof(uint32_t);
	// now write the remainder of the string
	auto strptr = string.GetData();
	// +1 byte: presumably includes the string's terminating character —
	// TODO confirm against string_t's layout
	uint32_t remaining = string_length + 1;
	while (remaining > 0) {
		// copy as much as fits in the usable space of the current block
		uint32_t to_write = std::min((uint32_t)remaining, (uint32_t)(STRING_SPACE - offset));
		if (to_write > 0) {
			memcpy(handle->node->buffer + offset, strptr, to_write);

			remaining -= to_write;
			offset += to_write;
			strptr += to_write;
		}
		if (remaining > 0) {
			// there is still remaining stuff to write
			// first get the new block id and write it to the end of the previous block
			// (offset now sits at STRING_SPACE, i.e. the reserved chain slot)
			auto new_block_id = manager.block_manager.GetFreeBlockId();
			*((block_id_t *)(handle->node->buffer + offset)) = new_block_id;
			// now write the current block to disk and allocate a new block
			AllocateNewBlock(new_block_id);
		}
	}
}
245 | |
246 | void WriteOverflowStringsToDisk::AllocateNewBlock(block_id_t new_block_id) { |
247 | if (block_id != INVALID_BLOCK) { |
248 | // there is an old block, write it first |
249 | manager.block_manager.Write(*handle->node, block_id); |
250 | } |
251 | offset = 0; |
252 | block_id = new_block_id; |
253 | } |
254 | |