| 1 | #include <sys/stat.h> |
| 2 | #include <sys/types.h> |
| 3 | #include <errno.h> |
| 4 | |
| 5 | #include <map> |
| 6 | |
| 7 | #include <Poco/Path.h> |
| 8 | #include <Poco/Util/XMLConfiguration.h> |
| 9 | |
| 10 | #include <Common/escapeForFileName.h> |
| 11 | |
| 12 | #include <Common/Exception.h> |
| 13 | |
| 14 | #include <IO/ReadBufferFromFile.h> |
| 15 | #include <IO/WriteBufferFromFile.h> |
| 16 | #include <Compression/CompressedReadBuffer.h> |
| 17 | #include <Compression/CompressedWriteBuffer.h> |
| 18 | #include <IO/ReadHelpers.h> |
| 19 | #include <IO/WriteHelpers.h> |
| 20 | |
| 21 | #include <DataTypes/NestedUtils.h> |
| 22 | |
| 23 | #include <DataStreams/IBlockInputStream.h> |
| 24 | #include <DataStreams/IBlockOutputStream.h> |
| 25 | |
| 26 | #include <Columns/ColumnArray.h> |
| 27 | |
| 28 | #include <Common/typeid_cast.h> |
| 29 | #include <Compression/CompressionFactory.h> |
| 30 | |
| 31 | #include <Interpreters/Context.h> |
| 32 | |
| 33 | #include <Storages/StorageTinyLog.h> |
| 34 | #include <Storages/StorageFactory.h> |
| 35 | #include <Storages/CheckResults.h> |
| 36 | |
| 37 | #include <Poco/DirectoryIterator.h> |
| 38 | |
| 39 | #define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin" |
| 40 | |
| 41 | |
| 42 | namespace DB |
| 43 | { |
| 44 | |
/// Error codes declared for use in this translation unit; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_CREATE_DIRECTORY;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int DUPLICATE_COLUMN;
    extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
    extern const int INCORRECT_FILE_NAME;
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
| 55 | |
| 56 | |
| 57 | class TinyLogBlockInputStream final : public IBlockInputStream |
| 58 | { |
| 59 | public: |
| 60 | TinyLogBlockInputStream(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_) |
| 61 | : block_size(block_size_), columns(columns_), |
| 62 | storage(storage_), lock(storage_.rwlock), |
| 63 | max_read_buffer_size(max_read_buffer_size_) {} |
| 64 | |
| 65 | String getName() const override { return "TinyLog" ; } |
| 66 | |
| 67 | Block () const override |
| 68 | { |
| 69 | Block res; |
| 70 | |
| 71 | for (const auto & name_type : columns) |
| 72 | res.insert({ name_type.type->createColumn(), name_type.type, name_type.name }); |
| 73 | |
| 74 | return Nested::flatten(res); |
| 75 | } |
| 76 | |
| 77 | protected: |
| 78 | Block readImpl() override; |
| 79 | private: |
| 80 | size_t block_size; |
| 81 | NamesAndTypesList columns; |
| 82 | StorageTinyLog & storage; |
| 83 | std::shared_lock<std::shared_mutex> lock; |
| 84 | bool finished = false; |
| 85 | size_t max_read_buffer_size; |
| 86 | |
| 87 | struct Stream |
| 88 | { |
| 89 | Stream(const std::string & data_path, size_t max_read_buffer_size_) |
| 90 | : plain(data_path, std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size_), Poco::File(data_path).getSize())), |
| 91 | compressed(plain) |
| 92 | { |
| 93 | } |
| 94 | |
| 95 | ReadBufferFromFile plain; |
| 96 | CompressedReadBuffer compressed; |
| 97 | }; |
| 98 | |
| 99 | using FileStreams = std::map<std::string, std::unique_ptr<Stream>>; |
| 100 | FileStreams streams; |
| 101 | |
| 102 | using DeserializeState = IDataType::DeserializeBinaryBulkStatePtr; |
| 103 | using DeserializeStates = std::map<String, DeserializeState>; |
| 104 | DeserializeStates deserialize_states; |
| 105 | |
| 106 | void readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit); |
| 107 | }; |
| 108 | |
| 109 | |
| 110 | class TinyLogBlockOutputStream final : public IBlockOutputStream |
| 111 | { |
| 112 | public: |
| 113 | explicit TinyLogBlockOutputStream(StorageTinyLog & storage_) |
| 114 | : storage(storage_), lock(storage_.rwlock) |
| 115 | { |
| 116 | } |
| 117 | |
| 118 | ~TinyLogBlockOutputStream() override |
| 119 | { |
| 120 | try |
| 121 | { |
| 122 | writeSuffix(); |
| 123 | } |
| 124 | catch (...) |
| 125 | { |
| 126 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | Block () const override { return storage.getSampleBlock(); } |
| 131 | |
| 132 | void write(const Block & block) override; |
| 133 | void writeSuffix() override; |
| 134 | |
| 135 | private: |
| 136 | StorageTinyLog & storage; |
| 137 | std::unique_lock<std::shared_mutex> lock; |
| 138 | bool done = false; |
| 139 | |
| 140 | struct Stream |
| 141 | { |
| 142 | Stream(const std::string & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) : |
| 143 | plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY), |
| 144 | compressed(plain, std::move(codec), max_compress_block_size) |
| 145 | { |
| 146 | } |
| 147 | |
| 148 | WriteBufferFromFile plain; |
| 149 | CompressedWriteBuffer compressed; |
| 150 | |
| 151 | void finalize() |
| 152 | { |
| 153 | compressed.next(); |
| 154 | plain.next(); |
| 155 | } |
| 156 | }; |
| 157 | |
| 158 | using FileStreams = std::map<std::string, std::unique_ptr<Stream>>; |
| 159 | FileStreams streams; |
| 160 | |
| 161 | using SerializeState = IDataType::SerializeBinaryBulkStatePtr; |
| 162 | using SerializeStates = std::map<String, SerializeState>; |
| 163 | SerializeStates serialize_states; |
| 164 | |
| 165 | using WrittenStreams = std::set<std::string>; |
| 166 | |
| 167 | IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams); |
| 168 | void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams); |
| 169 | }; |
| 170 | |
| 171 | |
| 172 | Block TinyLogBlockInputStream::readImpl() |
| 173 | { |
| 174 | Block res; |
| 175 | |
| 176 | if (finished || (!streams.empty() && streams.begin()->second->compressed.eof())) |
| 177 | { |
| 178 | /** Close the files (before destroying the object). |
| 179 | * When many sources are created, but simultaneously reading only a few of them, |
| 180 | * buffers don't waste memory. |
| 181 | */ |
| 182 | finished = true; |
| 183 | streams.clear(); |
| 184 | return res; |
| 185 | } |
| 186 | |
| 187 | { |
| 188 | /// if there are no files in the folder, it means that the table is empty |
| 189 | if (Poco::DirectoryIterator(storage.fullPath()) == Poco::DirectoryIterator()) |
| 190 | return res; |
| 191 | } |
| 192 | |
| 193 | for (const auto & name_type : columns) |
| 194 | { |
| 195 | MutableColumnPtr column = name_type.type->createColumn(); |
| 196 | |
| 197 | try |
| 198 | { |
| 199 | readData(name_type.name, *name_type.type, *column, block_size); |
| 200 | } |
| 201 | catch (Exception & e) |
| 202 | { |
| 203 | e.addMessage("while reading column " + name_type.name + " at " + storage.fullPath()); |
| 204 | throw; |
| 205 | } |
| 206 | |
| 207 | if (column->size()) |
| 208 | res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name)); |
| 209 | } |
| 210 | |
| 211 | if (!res || streams.begin()->second->compressed.eof()) |
| 212 | { |
| 213 | finished = true; |
| 214 | streams.clear(); |
| 215 | } |
| 216 | |
| 217 | return Nested::flatten(res); |
| 218 | } |
| 219 | |
| 220 | |
| 221 | void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit) |
| 222 | { |
| 223 | IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint. |
| 224 | settings.getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer * |
| 225 | { |
| 226 | String stream_name = IDataType::getFileNameForStream(name, path); |
| 227 | |
| 228 | if (!streams.count(stream_name)) |
| 229 | streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(), max_read_buffer_size); |
| 230 | |
| 231 | return &streams[stream_name]->compressed; |
| 232 | }; |
| 233 | |
| 234 | if (deserialize_states.count(name) == 0) |
| 235 | type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]); |
| 236 | |
| 237 | type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]); |
| 238 | } |
| 239 | |
| 240 | |
| 241 | IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(const String & name, |
| 242 | WrittenStreams & written_streams) |
| 243 | { |
| 244 | return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer * |
| 245 | { |
| 246 | String stream_name = IDataType::getFileNameForStream(name, path); |
| 247 | |
| 248 | if (!written_streams.insert(stream_name).second) |
| 249 | return nullptr; |
| 250 | |
| 251 | const auto & columns = storage.getColumns(); |
| 252 | if (!streams.count(stream_name)) |
| 253 | streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(), |
| 254 | columns.getCodecOrDefault(name), |
| 255 | storage.max_compress_block_size); |
| 256 | |
| 257 | return &streams[stream_name]->compressed; |
| 258 | }; |
| 259 | } |
| 260 | |
| 261 | |
| 262 | void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams) |
| 263 | { |
| 264 | IDataType::SerializeBinaryBulkSettings settings; |
| 265 | settings.getter = createStreamGetter(name, written_streams); |
| 266 | |
| 267 | if (serialize_states.count(name) == 0) |
| 268 | type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]); |
| 269 | |
| 270 | type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]); |
| 271 | } |
| 272 | |
| 273 | |
| 274 | void TinyLogBlockOutputStream::writeSuffix() |
| 275 | { |
| 276 | if (done) |
| 277 | return; |
| 278 | done = true; |
| 279 | |
| 280 | /// If nothing was written - leave the table in initial state. |
| 281 | if (streams.empty()) |
| 282 | return; |
| 283 | |
| 284 | WrittenStreams written_streams; |
| 285 | IDataType::SerializeBinaryBulkSettings settings; |
| 286 | for (const auto & column : getHeader()) |
| 287 | { |
| 288 | auto it = serialize_states.find(column.name); |
| 289 | if (it != serialize_states.end()) |
| 290 | { |
| 291 | settings.getter = createStreamGetter(column.name, written_streams); |
| 292 | column.type->serializeBinaryBulkStateSuffix(settings, it->second); |
| 293 | } |
| 294 | } |
| 295 | |
| 296 | /// Finish write. |
| 297 | for (auto & stream : streams) |
| 298 | stream.second->finalize(); |
| 299 | |
| 300 | std::vector<Poco::File> column_files; |
| 301 | for (auto & pair : streams) |
| 302 | column_files.push_back(storage.files[pair.first].data_file); |
| 303 | |
| 304 | storage.file_checker.update(column_files.begin(), column_files.end()); |
| 305 | |
| 306 | streams.clear(); |
| 307 | } |
| 308 | |
| 309 | |
| 310 | void TinyLogBlockOutputStream::write(const Block & block) |
| 311 | { |
| 312 | storage.check(block, true); |
| 313 | |
| 314 | /// The set of written offset columns so that you do not write shared columns for nested structures multiple times |
| 315 | WrittenStreams written_streams; |
| 316 | |
| 317 | for (size_t i = 0; i < block.columns(); ++i) |
| 318 | { |
| 319 | const ColumnWithTypeAndName & column = block.safeGetByPosition(i); |
| 320 | writeData(column.name, *column.type, *column.column, written_streams); |
| 321 | } |
| 322 | } |
| 323 | |
| 324 | |
| 325 | StorageTinyLog::StorageTinyLog( |
| 326 | const std::string & relative_path_, |
| 327 | const std::string & database_name_, |
| 328 | const std::string & table_name_, |
| 329 | const ColumnsDescription & columns_, |
| 330 | const ConstraintsDescription & constraints_, |
| 331 | bool attach, |
| 332 | size_t max_compress_block_size_, |
| 333 | const Context & context_) |
| 334 | : base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_), |
| 335 | max_compress_block_size(max_compress_block_size_), |
| 336 | file_checker(path + "sizes.json" ), |
| 337 | log(&Logger::get("StorageTinyLog" )) |
| 338 | { |
| 339 | setColumns(columns_); |
| 340 | setConstraints(constraints_); |
| 341 | |
| 342 | if (relative_path_.empty()) |
| 343 | throw Exception("Storage " + getName() + " requires data path" , ErrorCodes::INCORRECT_FILE_NAME); |
| 344 | |
| 345 | if (!attach) |
| 346 | Poco::File(path).createDirectories(); |
| 347 | |
| 348 | for (const auto & col : getColumns().getAllPhysical()) |
| 349 | addFiles(col.name, *col.type); |
| 350 | } |
| 351 | |
| 352 | |
| 353 | void StorageTinyLog::addFiles(const String & column_name, const IDataType & type) |
| 354 | { |
| 355 | if (files.end() != files.find(column_name)) |
| 356 | throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog." , |
| 357 | ErrorCodes::DUPLICATE_COLUMN); |
| 358 | |
| 359 | IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path) |
| 360 | { |
| 361 | String stream_name = IDataType::getFileNameForStream(column_name, substream_path); |
| 362 | if (!files.count(stream_name)) |
| 363 | { |
| 364 | ColumnData column_data; |
| 365 | files.insert(std::make_pair(stream_name, column_data)); |
| 366 | files[stream_name].data_file = Poco::File( |
| 367 | path + stream_name + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION); |
| 368 | } |
| 369 | }; |
| 370 | |
| 371 | IDataType::SubstreamPath substream_path; |
| 372 | type.enumerateStreams(stream_callback, substream_path); |
| 373 | } |
| 374 | |
| 375 | |
| 376 | void StorageTinyLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) |
| 377 | { |
| 378 | std::unique_lock<std::shared_mutex> lock(rwlock); |
| 379 | |
| 380 | /// Rename directory with data. |
| 381 | String new_path = base_path + new_path_to_table_data; |
| 382 | Poco::File(path).renameTo(new_path); |
| 383 | |
| 384 | path = new_path; |
| 385 | table_name = new_table_name; |
| 386 | database_name = new_database_name; |
| 387 | file_checker.setPath(path + "sizes.json" ); |
| 388 | |
| 389 | for (Files_t::iterator it = files.begin(); it != files.end(); ++it) |
| 390 | it->second.data_file = Poco::File(path + Poco::Path(it->second.data_file.path()).getFileName()); |
| 391 | } |
| 392 | |
| 393 | |
| 394 | BlockInputStreams StorageTinyLog::read( |
| 395 | const Names & column_names, |
| 396 | const SelectQueryInfo & /*query_info*/, |
| 397 | const Context & context, |
| 398 | QueryProcessingStage::Enum /*processed_stage*/, |
| 399 | const size_t max_block_size, |
| 400 | const unsigned /*num_streams*/) |
| 401 | { |
| 402 | check(column_names); |
| 403 | // When reading, we lock the entire storage, because we only have one file |
| 404 | // per column and can't modify it concurrently. |
| 405 | return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>( |
| 406 | max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size)); |
| 407 | } |
| 408 | |
| 409 | |
| 410 | BlockOutputStreamPtr StorageTinyLog::write( |
| 411 | const ASTPtr & /*query*/, const Context & /*context*/) |
| 412 | { |
| 413 | return std::make_shared<TinyLogBlockOutputStream>(*this); |
| 414 | } |
| 415 | |
| 416 | |
| 417 | CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */) |
| 418 | { |
| 419 | std::shared_lock<std::shared_mutex> lock(rwlock); |
| 420 | return file_checker.check(); |
| 421 | } |
| 422 | |
| 423 | void StorageTinyLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) |
| 424 | { |
| 425 | if (table_name.empty()) |
| 426 | throw Exception("Logical error: table name is empty" , ErrorCodes::LOGICAL_ERROR); |
| 427 | |
| 428 | std::unique_lock<std::shared_mutex> lock(rwlock); |
| 429 | |
| 430 | auto file = Poco::File(path); |
| 431 | file.remove(true); |
| 432 | file.createDirectories(); |
| 433 | |
| 434 | files.clear(); |
| 435 | file_checker = FileChecker{path + "sizes.json" }; |
| 436 | |
| 437 | for (const auto &column : getColumns().getAllPhysical()) |
| 438 | addFiles(column.name, *column.type); |
| 439 | } |
| 440 | |
| 441 | |
| 442 | void registerStorageTinyLog(StorageFactory & factory) |
| 443 | { |
| 444 | factory.registerStorage("TinyLog" , [](const StorageFactory::Arguments & args) |
| 445 | { |
| 446 | if (!args.engine_args.empty()) |
| 447 | throw Exception( |
| 448 | "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)" , |
| 449 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
| 450 | |
| 451 | return StorageTinyLog::create( |
| 452 | args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints, |
| 453 | args.attach, args.context.getSettings().max_compress_block_size, args.context); |
| 454 | }); |
| 455 | } |
| 456 | |
| 457 | } |
| 458 | |