| 1 | #pragma once |
| 2 | |
| 3 | #include <mutex> |
| 4 | #include <thread> |
| 5 | #include <ext/shared_ptr_helper.h> |
| 6 | #include <Core/NamesAndTypes.h> |
| 7 | #include <Common/ThreadPool.h> |
| 8 | #include <Storages/IStorage.h> |
| 9 | #include <DataStreams/IBlockOutputStream.h> |
| 10 | #include <Poco/Event.h> |
| 11 | |
| 12 | |
| 13 | namespace Poco { class Logger; } |
| 14 | |
| 15 | |
| 16 | namespace DB |
| 17 | { |
| 18 | |
| 19 | class Context; |
| 20 | |
| 21 | |
| 22 | /** During insertion, buffers the data in the RAM until certain thresholds are exceeded. |
| 23 | * When thresholds are exceeded, flushes the data to another table. |
| 24 | * When reading, it reads both from its buffers and from the subordinate table. |
| 25 | * |
| 26 | * The buffer is a set of num_shards blocks. |
| 27 | * On write, the block is chosen as the remainder of `ThreadNumber` divided by `num_shards` (or another block, see below), |
| 28 | * and the rows are appended to that block. |
| 29 | * Each block is locked by its own mutex while it is in use. If the chosen block is already locked during a write, |
| 30 | * the next blocks are tried in round-robin order, at most `num_shards` times; after that the writer waits for the lock. |
| 31 | * Thresholds are checked on insertion, and, periodically, in the background thread (to implement time thresholds). |
| 32 | * Thresholds act independently for each shard. Each shard can be flushed independently of the others. |
| 33 | * If an inserted block by itself exceeds the max thresholds, it is written directly to the subordinate table without buffering. |
| 34 | * Thresholds may be overshot. For example, if max_rows = 1 000 000, the buffer already holds 500 000 rows, |
| 35 | * and a block of 800 000 rows is added, the buffer will contain 1 300 000 rows, and this combined block is then written to the subordinate table. |
| 36 | * |
| 37 | * When you destroy a Buffer table, all remaining data is flushed to the subordinate table. |
| 38 | * The data in the buffer is not replicated, not logged to disk, and not indexed. If the server is restarted abnormally, the buffered data is lost. |
| 39 | */ |
| 40 | class StorageBuffer : public ext::shared_ptr_helper<StorageBuffer>, public IStorage |
| 41 | { |
| 42 | friend struct ext::shared_ptr_helper<StorageBuffer>; |
| 43 | friend class BufferBlockInputStream; |
| 44 | friend class BufferBlockOutputStream; |
| 45 | |
| 46 | public: |
| 47 | /// Flush thresholds. Every field defaults to 0, so a default-constructed instance never holds indeterminate values. |
| 48 | struct Thresholds |
| 49 | { |
| 50 | time_t time = 0; /// The number of seconds since the first row was inserted into the block. |
| 51 | size_t rows = 0; /// The number of rows in the block. |
| 52 | size_t bytes = 0; /// The number of (uncompressed) bytes in the block. |
| 53 | }; |
| 54 | |
| 55 | std::string getName() const override { return "Buffer"; } |
| 56 | std::string getTableName() const override { return table_name; } |
| 57 | std::string getDatabaseName() const override { return database_name; } |
| 58 | |
| 59 | QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override; |
| 60 | |
| 61 | BlockInputStreams read( |
| 62 | const Names & column_names, |
| 63 | const SelectQueryInfo & query_info, |
| 64 | const Context & context, |
| 65 | QueryProcessingStage::Enum processed_stage, |
| 66 | size_t max_block_size, |
| 67 | unsigned num_streams) override; |
| 68 | |
| 69 | BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; |
| 70 | |
| 71 | void startup() override; |
| 72 | /// Flush all buffers into the subordinate table and stop the background flush thread. |
| 73 | void shutdown() override; |
| 74 | bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override; |
| 75 | |
| 76 | void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override |
| 77 | { |
| 78 | table_name = new_table_name; |
| 79 | database_name = new_database_name; |
| 80 | } |
| 81 | |
| 82 | bool supportsSampling() const override { return true; } |
| 83 | bool supportsPrewhere() const override /// Delegates to the destination table, if there is one. |
| 84 | { |
| 85 | if (no_destination) |
| 86 | return false; |
| 87 | auto dest = global_context.tryGetTable(destination_database, destination_table); |
| 88 | if (dest && dest.get() != this) /// A Buffer whose destination is itself must not recurse into itself. |
| 89 | return dest->supportsPrewhere(); |
| 90 | return false; |
| 91 | } |
| 92 | bool supportsFinal() const override { return true; } |
| 93 | bool supportsIndexForIn() const override { return true; } |
| 94 | |
| 95 | bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override; |
| 96 | |
| 97 | void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override; |
| 98 | |
| 99 | /// The structure of the subordinate table is not checked and does not change. |
| 100 | void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override; |
| 101 | |
| 102 | ~StorageBuffer() override; |
| 103 | |
| 104 | private: |
| 105 | String table_name; |
| 106 | String database_name; |
| 107 | |
| 108 | Context global_context; |
| 109 | |
| 110 | struct Buffer |
| 111 | { |
| 112 | time_t first_write_time = 0; |
| 113 | Block data; |
| 114 | std::mutex mutex; /// Each shard has its own lock, held while the shard is in use. |
| 115 | }; |
| 116 | |
| 117 | /// There are `num_shards` independent buffers. |
| 118 | const size_t num_shards; |
| 119 | std::vector<Buffer> buffers; |
| 120 | |
| 121 | const Thresholds min_thresholds; |
| 122 | const Thresholds max_thresholds; |
| 123 | |
| 124 | const String destination_database; |
| 125 | const String destination_table; |
| 126 | bool no_destination = false; /// If set, do not write data from the buffer, but simply empty the buffer. |
| 127 | bool allow_materialized = false; |
| 128 | |
| 129 | Poco::Logger * log = nullptr; |
| 130 | |
| 131 | Poco::Event shutdown_event; |
| 132 | /// Periodically flushes the buffers in the background to enforce the time thresholds. |
| 133 | ThreadFromGlobalPool flush_thread; |
| 134 | |
| 135 | void flushAllBuffers(bool check_thresholds = true); |
| 136 | /// Flush the buffer. If check_thresholds is set - flushes only if thresholds are exceeded. |
| 137 | void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false); /// NOTE(review): `locked` presumably means the caller already holds buffer.mutex - confirm. |
| 138 | bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const; |
| 139 | bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const; |
| 140 | |
| 141 | /// The `table` argument is passed explicitly because it is sometimes looked up beforehand. It must match the destination table. |
| 142 | void writeBlockToDestination(const Block & block, StoragePtr table); |
| 143 | |
| 144 | void flushThread(); |
| 145 | |
| 146 | protected: |
| 147 | /** num_shards - the level of internal parallelism (the number of independent buffers) |
| 148 | * The buffer is flushed if all minimum thresholds or at least one of the maximum thresholds are exceeded. |
| 149 | */ |
| 150 | StorageBuffer(const std::string & database_name_, const std::string & table_name_, |
| 151 | const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, |
| 152 | Context & context_, |
| 153 | size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, |
| 154 | const String & destination_database_, const String & destination_table_, bool allow_materialized_); |
| 155 | }; |
| 156 | |
| 157 | } |
| 158 | |