| 1 | #include <sys/stat.h> |
| 2 | #include <sys/types.h> |
| 3 | #include <errno.h> |
| 4 | |
| 5 | #include <map> |
| 6 | #include <optional> |
| 7 | |
| 8 | #include <Common/escapeForFileName.h> |
| 9 | |
| 10 | #include <Common/Exception.h> |
| 11 | |
| 12 | #include <IO/ReadBufferFromFile.h> |
| 13 | #include <IO/WriteBufferFromFile.h> |
| 14 | #include <Compression/CompressedReadBufferFromFile.h> |
| 15 | #include <Compression/CompressedWriteBuffer.h> |
| 16 | #include <IO/ReadHelpers.h> |
| 17 | #include <IO/WriteHelpers.h> |
| 18 | |
| 19 | #include <DataStreams/IBlockInputStream.h> |
| 20 | #include <DataStreams/IBlockOutputStream.h> |
| 21 | #include <DataStreams/NativeBlockInputStream.h> |
| 22 | #include <DataStreams/NativeBlockOutputStream.h> |
| 23 | #include <DataStreams/NullBlockInputStream.h> |
| 24 | |
| 25 | #include <DataTypes/DataTypeFactory.h> |
| 26 | |
| 27 | #include <Columns/ColumnArray.h> |
| 28 | |
| 29 | #include <Interpreters/Context.h> |
| 30 | |
| 31 | #include <Storages/StorageStripeLog.h> |
| 32 | #include <Storages/StorageFactory.h> |
| 33 | #include <Poco/DirectoryIterator.h> |
| 34 | |
| 35 | |
| 36 | namespace DB |
| 37 | { |
| 38 | |
/// Size in bytes of the buffers used for reading and writing the "index.mrk" file.
static constexpr size_t INDEX_BUFFER_SIZE = 4096;
| 40 | |
/// Error codes declared for use in this file. They are only declared here (extern);
/// the definitions live in another translation unit.
namespace ErrorCodes
{
    /// Argument / input validation.
    extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;

    /// File system related.
    extern const int CANNOT_CREATE_DIRECTORY;
    extern const int INCORRECT_FILE_NAME;

    /// Internal invariant violations.
    extern const int LOGICAL_ERROR;
}
| 49 | |
| 50 | |
| 51 | class StripeLogBlockInputStream final : public IBlockInputStream |
| 52 | { |
| 53 | public: |
| 54 | StripeLogBlockInputStream(StorageStripeLog & storage_, size_t max_read_buffer_size_, |
| 55 | std::shared_ptr<const IndexForNativeFormat> & index_, |
| 56 | IndexForNativeFormat::Blocks::const_iterator index_begin_, |
| 57 | IndexForNativeFormat::Blocks::const_iterator index_end_) |
| 58 | : storage(storage_), max_read_buffer_size(max_read_buffer_size_), |
| 59 | index(index_), index_begin(index_begin_), index_end(index_end_) |
| 60 | { |
| 61 | if (index_begin != index_end) |
| 62 | { |
| 63 | for (const auto & column : index_begin->columns) |
| 64 | { |
| 65 | auto type = DataTypeFactory::instance().get(column.type); |
| 66 | header.insert(ColumnWithTypeAndName{ type, column.name }); |
| 67 | } |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | String getName() const override { return "StripeLog" ; } |
| 72 | |
| 73 | Block () const override |
| 74 | { |
| 75 | return header; |
| 76 | } |
| 77 | |
| 78 | protected: |
| 79 | Block readImpl() override |
| 80 | { |
| 81 | Block res; |
| 82 | start(); |
| 83 | |
| 84 | if (block_in) |
| 85 | { |
| 86 | res = block_in->read(); |
| 87 | |
| 88 | /// Freeing memory before destroying the object. |
| 89 | if (!res) |
| 90 | { |
| 91 | block_in.reset(); |
| 92 | data_in.reset(); |
| 93 | index.reset(); |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | return res; |
| 98 | } |
| 99 | |
| 100 | private: |
| 101 | StorageStripeLog & storage; |
| 102 | size_t max_read_buffer_size; |
| 103 | |
| 104 | std::shared_ptr<const IndexForNativeFormat> index; |
| 105 | IndexForNativeFormat::Blocks::const_iterator index_begin; |
| 106 | IndexForNativeFormat::Blocks::const_iterator index_end; |
| 107 | Block ; |
| 108 | |
| 109 | /** optional - to create objects only on first reading |
| 110 | * and delete objects (release buffers) after the source is exhausted |
| 111 | * - to save RAM when using a large number of sources. |
| 112 | */ |
| 113 | bool started = false; |
| 114 | std::optional<CompressedReadBufferFromFile> data_in; |
| 115 | std::optional<NativeBlockInputStream> block_in; |
| 116 | |
| 117 | void start() |
| 118 | { |
| 119 | if (!started) |
| 120 | { |
| 121 | started = true; |
| 122 | |
| 123 | data_in.emplace( |
| 124 | storage.fullPath() + "data.bin" , 0, 0, |
| 125 | std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size), Poco::File(storage.fullPath() + "data.bin" ).getSize())); |
| 126 | |
| 127 | block_in.emplace(*data_in, 0, index_begin, index_end); |
| 128 | } |
| 129 | } |
| 130 | }; |
| 131 | |
| 132 | |
| 133 | class StripeLogBlockOutputStream final : public IBlockOutputStream |
| 134 | { |
| 135 | public: |
| 136 | explicit StripeLogBlockOutputStream(StorageStripeLog & storage_) |
| 137 | : storage(storage_), lock(storage.rwlock), |
| 138 | data_out_compressed(storage.fullPath() + "data.bin" , DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT), |
| 139 | data_out(data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), storage.max_compress_block_size), |
| 140 | index_out_compressed(storage.fullPath() + "index.mrk" , INDEX_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT), |
| 141 | index_out(index_out_compressed), |
| 142 | block_out(data_out, 0, storage.getSampleBlock(), false, &index_out, Poco::File(storage.fullPath() + "data.bin" ).getSize()) |
| 143 | { |
| 144 | } |
| 145 | |
| 146 | ~StripeLogBlockOutputStream() override |
| 147 | { |
| 148 | try |
| 149 | { |
| 150 | writeSuffix(); |
| 151 | } |
| 152 | catch (...) |
| 153 | { |
| 154 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 155 | } |
| 156 | } |
| 157 | |
| 158 | Block () const override { return storage.getSampleBlock(); } |
| 159 | |
| 160 | void write(const Block & block) override |
| 161 | { |
| 162 | block_out.write(block); |
| 163 | } |
| 164 | |
| 165 | void writeSuffix() override |
| 166 | { |
| 167 | if (done) |
| 168 | return; |
| 169 | |
| 170 | block_out.writeSuffix(); |
| 171 | data_out.next(); |
| 172 | data_out_compressed.next(); |
| 173 | index_out.next(); |
| 174 | index_out_compressed.next(); |
| 175 | |
| 176 | FileChecker::Files files{ data_out_compressed.getFileName(), index_out_compressed.getFileName() }; |
| 177 | storage.file_checker.update(files.begin(), files.end()); |
| 178 | |
| 179 | done = true; |
| 180 | } |
| 181 | |
| 182 | private: |
| 183 | StorageStripeLog & storage; |
| 184 | std::unique_lock<std::shared_mutex> lock; |
| 185 | |
| 186 | WriteBufferFromFile data_out_compressed; |
| 187 | CompressedWriteBuffer data_out; |
| 188 | WriteBufferFromFile index_out_compressed; |
| 189 | CompressedWriteBuffer index_out; |
| 190 | NativeBlockOutputStream block_out; |
| 191 | |
| 192 | bool done = false; |
| 193 | }; |
| 194 | |
| 195 | |
| 196 | StorageStripeLog::StorageStripeLog( |
| 197 | const std::string & relative_path_, |
| 198 | const std::string & database_name_, |
| 199 | const std::string & table_name_, |
| 200 | const ColumnsDescription & columns_, |
| 201 | const ConstraintsDescription & constraints_, |
| 202 | bool attach, |
| 203 | size_t max_compress_block_size_, |
| 204 | const Context & context_) |
| 205 | : base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_), |
| 206 | max_compress_block_size(max_compress_block_size_), |
| 207 | file_checker(path + "sizes.json" ), |
| 208 | log(&Logger::get("StorageStripeLog" )) |
| 209 | { |
| 210 | setColumns(columns_); |
| 211 | setConstraints(constraints_); |
| 212 | |
| 213 | if (relative_path_.empty()) |
| 214 | throw Exception("Storage " + getName() + " requires data path" , ErrorCodes::INCORRECT_FILE_NAME); |
| 215 | |
| 216 | if (!attach) |
| 217 | Poco::File(path).createDirectories(); |
| 218 | } |
| 219 | |
| 220 | |
| 221 | void StorageStripeLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) |
| 222 | { |
| 223 | std::unique_lock<std::shared_mutex> lock(rwlock); |
| 224 | |
| 225 | /// Rename directory with data. |
| 226 | String new_path = base_path + new_path_to_table_data; |
| 227 | Poco::File(path).renameTo(new_path); |
| 228 | |
| 229 | path = new_path; |
| 230 | table_name = new_table_name; |
| 231 | database_name = new_database_name; |
| 232 | file_checker.setPath(path + "sizes.json" ); |
| 233 | } |
| 234 | |
| 235 | |
| 236 | BlockInputStreams StorageStripeLog::read( |
| 237 | const Names & column_names, |
| 238 | const SelectQueryInfo & /*query_info*/, |
| 239 | const Context & context, |
| 240 | QueryProcessingStage::Enum /*processed_stage*/, |
| 241 | const size_t /*max_block_size*/, |
| 242 | unsigned num_streams) |
| 243 | { |
| 244 | std::shared_lock<std::shared_mutex> lock(rwlock); |
| 245 | |
| 246 | check(column_names); |
| 247 | |
| 248 | NameSet column_names_set(column_names.begin(), column_names.end()); |
| 249 | |
| 250 | if (!Poco::File(fullPath() + "index.mrk" ).exists()) |
| 251 | return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) }; |
| 252 | |
| 253 | CompressedReadBufferFromFile index_in(fullPath() + "index.mrk" , 0, 0, INDEX_BUFFER_SIZE); |
| 254 | std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)}; |
| 255 | |
| 256 | BlockInputStreams res; |
| 257 | |
| 258 | size_t size = index->blocks.size(); |
| 259 | if (num_streams > size) |
| 260 | num_streams = size; |
| 261 | |
| 262 | for (size_t stream = 0; stream < num_streams; ++stream) |
| 263 | { |
| 264 | IndexForNativeFormat::Blocks::const_iterator begin = index->blocks.begin(); |
| 265 | IndexForNativeFormat::Blocks::const_iterator end = index->blocks.begin(); |
| 266 | |
| 267 | std::advance(begin, stream * size / num_streams); |
| 268 | std::advance(end, (stream + 1) * size / num_streams); |
| 269 | |
| 270 | res.emplace_back(std::make_shared<StripeLogBlockInputStream>( |
| 271 | *this, context.getSettingsRef().max_read_buffer_size, index, begin, end)); |
| 272 | } |
| 273 | |
| 274 | /// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change. |
| 275 | |
| 276 | return res; |
| 277 | } |
| 278 | |
| 279 | |
| 280 | BlockOutputStreamPtr StorageStripeLog::write( |
| 281 | const ASTPtr & /*query*/, const Context & /*context*/) |
| 282 | { |
| 283 | return std::make_shared<StripeLogBlockOutputStream>(*this); |
| 284 | } |
| 285 | |
| 286 | |
| 287 | CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & /* context */) |
| 288 | { |
| 289 | std::shared_lock<std::shared_mutex> lock(rwlock); |
| 290 | return file_checker.check(); |
| 291 | } |
| 292 | |
| 293 | void StorageStripeLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) |
| 294 | { |
| 295 | if (table_name.empty()) |
| 296 | throw Exception("Logical error: table name is empty" , ErrorCodes::LOGICAL_ERROR); |
| 297 | |
| 298 | std::shared_lock<std::shared_mutex> lock(rwlock); |
| 299 | |
| 300 | auto file = Poco::File(path); |
| 301 | file.remove(true); |
| 302 | file.createDirectories(); |
| 303 | |
| 304 | file_checker = FileChecker{path + "sizes.json" }; |
| 305 | } |
| 306 | |
| 307 | |
| 308 | void registerStorageStripeLog(StorageFactory & factory) |
| 309 | { |
| 310 | factory.registerStorage("StripeLog" , [](const StorageFactory::Arguments & args) |
| 311 | { |
| 312 | if (!args.engine_args.empty()) |
| 313 | throw Exception( |
| 314 | "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)" , |
| 315 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
| 316 | |
| 317 | return StorageStripeLog::create( |
| 318 | args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints, |
| 319 | args.attach, args.context.getSettings().max_compress_block_size, args.context); |
| 320 | }); |
| 321 | } |
| 322 | |
| 323 | } |
| 324 | |