| 1 | #include "duckdb/storage/checkpoint/table_data_writer.hpp" |
| 2 | |
| 3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 4 | #include "duckdb/common/types/null_value.hpp" |
| 5 | |
| 6 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
| 7 | #include "duckdb/common/serializer/buffered_serializer.hpp" |
| 8 | |
| 9 | #include "duckdb/storage/numeric_segment.hpp" |
| 10 | #include "duckdb/storage/string_segment.hpp" |
| 11 | #include "duckdb/storage/table/column_segment.hpp" |
| 12 | |
| 13 | using namespace duckdb; |
| 14 | using namespace std; |
| 15 | |
//! Writes overflow (big) strings to dedicated on-disk blocks during a checkpoint.
//! Strings are packed sequentially into blocks; when a string does not fit in the
//! remaining space of the current block it spills into a chain of follow-up blocks,
//! each linked via a block_id_t written at the end of the previous block.
class WriteOverflowStringsToDisk : public OverflowStringWriter {
public:
	WriteOverflowStringsToDisk(CheckpointManager &manager);
	//! Flushes the current (partially filled) block to disk, if any
	~WriteOverflowStringsToDisk();

	//! The checkpoint manager
	CheckpointManager &manager;
	//! Block handle use for writing to
	unique_ptr<BufferHandle> handle;
	//! The current block we are writing to
	block_id_t block_id;
	//! The offset within the current block
	idx_t offset;

	//! Usable string bytes per block: the last sizeof(block_id_t) bytes are reserved
	//! for the pointer to the next block in the overflow chain
	static constexpr idx_t STRING_SPACE = Storage::BLOCK_SIZE - sizeof(block_id_t);

public:
	//! Writes "string" to disk and reports where it was placed: the block id and the
	//! offset (of the 4-byte length prefix) within that block
	void WriteString(string_t string, block_id_t &result_block, int32_t &result_offset) override;

private:
	//! Flushes the current block (if valid) and makes new_block_id the active block
	void AllocateNewBlock(block_id_t new_block_id);
};
| 38 | |
//! Constructs a writer that checkpoints the data of "table" through "manager"
TableDataWriter::TableDataWriter(CheckpointManager &manager, TableCatalogEntry &table)
    : manager(manager), table(table) {
}
| 42 | |
//! Defaulted out-of-line so member unique_ptr destructors see complete types here
TableDataWriter::~TableDataWriter() {
}
| 45 | |
| 46 | void TableDataWriter::WriteTableData(Transaction &transaction) { |
| 47 | // allocate segments to write the table to |
| 48 | segments.resize(table.columns.size()); |
| 49 | data_pointers.resize(table.columns.size()); |
| 50 | for (idx_t i = 0; i < table.columns.size(); i++) { |
| 51 | auto type_id = GetInternalType(table.columns[i].type); |
| 52 | stats.push_back(make_unique<SegmentStatistics>(type_id, GetTypeIdSize(type_id))); |
| 53 | CreateSegment(i); |
| 54 | } |
| 55 | |
| 56 | // now start scanning the table and append the data to the uncompressed segments |
| 57 | vector<column_t> column_ids; |
| 58 | for (auto &column : table.columns) { |
| 59 | column_ids.push_back(column.oid); |
| 60 | } |
| 61 | // initialize scan structures to prepare for the scan |
| 62 | TableScanState state; |
| 63 | table.storage->InitializeScan(transaction, state, column_ids); |
| 64 | //! get all types of the table and initialize the chunk |
| 65 | auto types = table.GetTypes(); |
| 66 | DataChunk chunk; |
| 67 | chunk.Initialize(types); |
| 68 | |
| 69 | while (true) { |
| 70 | chunk.Reset(); |
| 71 | // now scan the table to construct the blocks |
| 72 | unordered_map<idx_t, vector<TableFilter>> mock; |
| 73 | table.storage->Scan(transaction, chunk, state, mock); |
| 74 | if (chunk.size() == 0) { |
| 75 | break; |
| 76 | } |
| 77 | // for each column, we append whatever we can fit into the block |
| 78 | idx_t chunk_size = chunk.size(); |
| 79 | for (idx_t i = 0; i < table.columns.size(); i++) { |
| 80 | assert(chunk.data[i].type == GetInternalType(table.columns[i].type)); |
| 81 | AppendData(transaction, i, chunk.data[i], chunk_size); |
| 82 | } |
| 83 | } |
| 84 | // flush any remaining data and write the data pointers to disk |
| 85 | for (idx_t i = 0; i < table.columns.size(); i++) { |
| 86 | FlushSegment(transaction, i); |
| 87 | } |
| 88 | VerifyDataPointers(); |
| 89 | WriteDataPointers(); |
| 90 | } |
| 91 | |
| 92 | void TableDataWriter::CreateSegment(idx_t col_idx) { |
| 93 | auto type_id = GetInternalType(table.columns[col_idx].type); |
| 94 | if (type_id == TypeId::VARCHAR) { |
| 95 | auto string_segment = make_unique<StringSegment>(manager.buffer_manager, 0); |
| 96 | string_segment->overflow_writer = make_unique<WriteOverflowStringsToDisk>(manager); |
| 97 | segments[col_idx] = move(string_segment); |
| 98 | } else { |
| 99 | segments[col_idx] = make_unique<NumericSegment>(manager.buffer_manager, type_id, 0); |
| 100 | } |
| 101 | } |
| 102 | |
| 103 | void TableDataWriter::AppendData(Transaction &transaction, idx_t col_idx, Vector &data, idx_t count) { |
| 104 | idx_t offset = 0; |
| 105 | while (count > 0) { |
| 106 | idx_t appended = segments[col_idx]->Append(*stats[col_idx], data, offset, count); |
| 107 | if (appended == count) { |
| 108 | // appended everything: finished |
| 109 | return; |
| 110 | } |
| 111 | // the segment is full: flush it to disk |
| 112 | FlushSegment(transaction, col_idx); |
| 113 | |
| 114 | // now create a new segment and continue appending |
| 115 | CreateSegment(col_idx); |
| 116 | offset += appended; |
| 117 | count -= appended; |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | void TableDataWriter::FlushSegment(Transaction &transaction, idx_t col_idx) { |
| 122 | auto tuple_count = segments[col_idx]->tuple_count; |
| 123 | if (tuple_count == 0) { |
| 124 | return; |
| 125 | } |
| 126 | |
| 127 | // get the buffer of the segment and pin it |
| 128 | auto handle = manager.buffer_manager.Pin(segments[col_idx]->block_id); |
| 129 | |
| 130 | // get a free block id to write to |
| 131 | auto block_id = manager.block_manager.GetFreeBlockId(); |
| 132 | |
| 133 | // construct the data pointer, FIXME: add statistics as well |
| 134 | DataPointer data_pointer; |
| 135 | data_pointer.block_id = block_id; |
| 136 | data_pointer.offset = 0; |
| 137 | data_pointer.row_start = 0; |
| 138 | if (data_pointers[col_idx].size() > 0) { |
| 139 | auto &last_pointer = data_pointers[col_idx].back(); |
| 140 | data_pointer.row_start = last_pointer.row_start + last_pointer.tuple_count; |
| 141 | } |
| 142 | data_pointer.tuple_count = tuple_count; |
| 143 | idx_t type_size = stats[col_idx]->type == TypeId::VARCHAR ? 8 : stats[col_idx]->type_size; |
| 144 | memcpy(&data_pointer.min_stats, stats[col_idx]->minimum.get(), type_size); |
| 145 | memcpy(&data_pointer.max_stats, stats[col_idx]->maximum.get(), type_size); |
| 146 | data_pointers[col_idx].push_back(move(data_pointer)); |
| 147 | // write the block to disk |
| 148 | manager.block_manager.Write(*handle->node, block_id); |
| 149 | |
| 150 | handle.reset(); |
| 151 | segments[col_idx] = nullptr; |
| 152 | } |
| 153 | |
| 154 | void TableDataWriter::VerifyDataPointers() { |
| 155 | // verify the data pointers |
| 156 | idx_t table_count = 0; |
| 157 | for (idx_t i = 0; i < data_pointers.size(); i++) { |
| 158 | auto &data_pointer_list = data_pointers[i]; |
| 159 | idx_t column_count = 0; |
| 160 | // then write the data pointers themselves |
| 161 | for (idx_t k = 0; k < data_pointer_list.size(); k++) { |
| 162 | auto &data_pointer = data_pointer_list[k]; |
| 163 | column_count += data_pointer.tuple_count; |
| 164 | } |
| 165 | if (segments[i]) { |
| 166 | column_count += segments[i]->tuple_count; |
| 167 | } |
| 168 | if (i == 0) { |
| 169 | table_count = column_count; |
| 170 | } else { |
| 171 | if (table_count != column_count) { |
| 172 | throw Exception("Column count mismatch in data write!" ); |
| 173 | } |
| 174 | } |
| 175 | } |
| 176 | } |
| 177 | |
| 178 | void TableDataWriter::WriteDataPointers() { |
| 179 | for (idx_t i = 0; i < data_pointers.size(); i++) { |
| 180 | // get a reference to the data column |
| 181 | auto &data_pointer_list = data_pointers[i]; |
| 182 | manager.tabledata_writer->Write<idx_t>(data_pointer_list.size()); |
| 183 | // then write the data pointers themselves |
| 184 | for (idx_t k = 0; k < data_pointer_list.size(); k++) { |
| 185 | auto &data_pointer = data_pointer_list[k]; |
| 186 | manager.tabledata_writer->Write<double>(data_pointer.min); |
| 187 | manager.tabledata_writer->Write<double>(data_pointer.max); |
| 188 | manager.tabledata_writer->Write<idx_t>(data_pointer.row_start); |
| 189 | manager.tabledata_writer->Write<idx_t>(data_pointer.tuple_count); |
| 190 | manager.tabledata_writer->Write<block_id_t>(data_pointer.block_id); |
| 191 | manager.tabledata_writer->Write<uint32_t>(data_pointer.offset); |
| 192 | manager.tabledata_writer->WriteData(data_pointer.min_stats, 8); |
| 193 | manager.tabledata_writer->WriteData(data_pointer.max_stats, 8); |
| 194 | } |
| 195 | } |
| 196 | } |
| 197 | |
//! Starts with no buffer and no active block; both are allocated lazily on the
//! first WriteString call
WriteOverflowStringsToDisk::WriteOverflowStringsToDisk(CheckpointManager &manager)
    : manager(manager), handle(nullptr), block_id(INVALID_BLOCK), offset(0) {
}
| 201 | |
//! Flushes the last, partially filled block to disk on destruction.
//! NOTE(review): offset > 0 implies WriteString ran, which allocates both the
//! handle and a block first — so handle/block_id are presumed valid here; confirm
//! no path resets the handle while leaving offset non-zero.
WriteOverflowStringsToDisk::~WriteOverflowStringsToDisk() {
	if (offset > 0) {
		manager.block_manager.Write(*handle->node, block_id);
	}
}
| 207 | |
//! Writes "string" into the overflow-string block chain. Layout: a uint32_t
//! length prefix followed by the string bytes plus terminating NUL; the payload
//! may span multiple blocks, each chained to the next by a block_id_t stored in
//! the block's last sizeof(block_id_t) bytes (the region excluded from
//! STRING_SPACE). Returns the location of the length prefix via
//! result_block/result_offset.
void WriteOverflowStringsToDisk::WriteString(string_t string, block_id_t &result_block, int32_t &result_offset) {
	if (!handle) {
		// lazily allocate the staging buffer on first use
		handle = manager.buffer_manager.Allocate(Storage::BLOCK_ALLOC_SIZE);
	}
	// start a new block if we have none yet, or if the 4-byte length prefix
	// would not fit (entirely) before the reserved next-pointer region
	if (block_id == INVALID_BLOCK || offset + sizeof(uint32_t) >= STRING_SPACE) {
		AllocateNewBlock(manager.block_manager.GetFreeBlockId());
	}
	// report where this string's length prefix lives
	result_block = block_id;
	result_offset = offset;

	// write the length field
	auto string_length = string.GetSize();
	*((uint32_t *)(handle->node->buffer + offset)) = string_length;
	offset += sizeof(uint32_t);
	// now write the remainder of the string
	auto strptr = string.GetData();
	// +1: the terminating NUL byte is stored as well
	uint32_t remaining = string_length + 1;
	while (remaining > 0) {
		// copy as much as fits in the current block's string space
		uint32_t to_write = std::min((uint32_t)remaining, (uint32_t)(STRING_SPACE - offset));
		if (to_write > 0) {
			memcpy(handle->node->buffer + offset, strptr, to_write);

			remaining -= to_write;
			offset += to_write;
			strptr += to_write;
		}
		if (remaining > 0) {
			// there is still remaining stuff to write
			// first get the new block id and write it to the end of the previous block
			// (offset == STRING_SPACE here, i.e. the reserved trailing slot)
			auto new_block_id = manager.block_manager.GetFreeBlockId();
			*((block_id_t *)(handle->node->buffer + offset)) = new_block_id;
			// now write the current block to disk and allocate a new block
			AllocateNewBlock(new_block_id);
		}
	}
}
| 245 | |
| 246 | void WriteOverflowStringsToDisk::AllocateNewBlock(block_id_t new_block_id) { |
| 247 | if (block_id != INVALID_BLOCK) { |
| 248 | // there is an old block, write it first |
| 249 | manager.block_manager.Write(*handle->node, block_id); |
| 250 | } |
| 251 | offset = 0; |
| 252 | block_id = new_block_id; |
| 253 | } |
| 254 | |