1 | #pragma once |
2 | |
3 | #include <mutex> |
4 | #include <thread> |
5 | #include <ext/shared_ptr_helper.h> |
6 | #include <Core/NamesAndTypes.h> |
7 | #include <Common/ThreadPool.h> |
8 | #include <Storages/IStorage.h> |
9 | #include <DataStreams/IBlockOutputStream.h> |
10 | #include <Poco/Event.h> |
11 | |
12 | |
/// Forward declaration only; the full Poco::Logger definition is not needed in this header
/// because it is used solely through a pointer.
namespace Poco
{
class Logger;
}
14 | |
15 | |
16 | namespace DB |
17 | { |
18 | |
19 | class Context; |
20 | |
21 | |
22 | /** During insertion, buffers the data in the RAM until certain thresholds are exceeded. |
23 | * When thresholds are exceeded, flushes the data to another table. |
24 | * When reading, it reads both from its buffers and from the subordinate table. |
25 | * |
26 | * The buffer is a set of num_shards blocks. |
27 | * When writing, select the block number by the remainder of the `ThreadNumber` division by `num_shards` (or one of the others), |
28 | * and add rows to the corresponding block. |
29 | * When using a block, it is locked by some mutex. If during write the corresponding block is already occupied |
30 | * - try to lock the next block in a round-robin fashion, and so no more than `num_shards` times (then wait for lock). |
31 | * Thresholds are checked on insertion, and, periodically, in the background thread (to implement time thresholds). |
32 | * Thresholds act independently for each shard. Each shard can be flushed independently of the others. |
33 | * If a block is inserted into the table, which itself exceeds the max-thresholds, it is written directly to the subordinate table without buffering. |
34 | * Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already had 500 000 rows, |
35 | * and a part of 800 000 rows is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table. |
36 | * |
37 | * When you destroy a Buffer table, all remaining data is flushed to the subordinate table. |
38 | * The data in the buffer is not replicated, not logged to disk, not indexed. With a rough restart of the server, the data is lost. |
39 | */ |
40 | class StorageBuffer : public ext::shared_ptr_helper<StorageBuffer>, public IStorage |
41 | { |
42 | friend struct ext::shared_ptr_helper<StorageBuffer>; |
43 | friend class BufferBlockInputStream; |
44 | friend class BufferBlockOutputStream; |
45 | |
46 | public: |
47 | /// Thresholds. |
48 | struct Thresholds |
49 | { |
50 | time_t time; /// The number of seconds from the insertion of the first row into the block. |
51 | size_t rows; /// The number of rows in the block. |
52 | size_t bytes; /// The number of (uncompressed) bytes in the block. |
53 | }; |
54 | |
55 | std::string getName() const override { return "Buffer" ; } |
56 | std::string getTableName() const override { return table_name; } |
57 | std::string getDatabaseName() const override { return database_name; } |
58 | |
59 | QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override; |
60 | |
61 | BlockInputStreams read( |
62 | const Names & column_names, |
63 | const SelectQueryInfo & query_info, |
64 | const Context & context, |
65 | QueryProcessingStage::Enum processed_stage, |
66 | size_t max_block_size, |
67 | unsigned num_streams) override; |
68 | |
69 | BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; |
70 | |
71 | void startup() override; |
72 | /// Flush all buffers into the subordinate table and stop background thread. |
73 | void shutdown() override; |
74 | bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override; |
75 | |
76 | void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override |
77 | { |
78 | table_name = new_table_name; |
79 | database_name = new_database_name; |
80 | } |
81 | |
82 | bool supportsSampling() const override { return true; } |
83 | bool supportsPrewhere() const override |
84 | { |
85 | if (no_destination) |
86 | return false; |
87 | auto dest = global_context.tryGetTable(destination_database, destination_table); |
88 | if (dest && dest.get() != this) |
89 | return dest->supportsPrewhere(); |
90 | return false; |
91 | } |
92 | bool supportsFinal() const override { return true; } |
93 | bool supportsIndexForIn() const override { return true; } |
94 | |
95 | bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override; |
96 | |
97 | void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override; |
98 | |
99 | /// The structure of the subordinate table is not checked and does not change. |
100 | void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override; |
101 | |
102 | ~StorageBuffer() override; |
103 | |
104 | private: |
105 | String table_name; |
106 | String database_name; |
107 | |
108 | Context global_context; |
109 | |
110 | struct Buffer |
111 | { |
112 | time_t first_write_time = 0; |
113 | Block data; |
114 | std::mutex mutex; |
115 | }; |
116 | |
117 | /// There are `num_shards` of independent buffers. |
118 | const size_t num_shards; |
119 | std::vector<Buffer> buffers; |
120 | |
121 | const Thresholds min_thresholds; |
122 | const Thresholds max_thresholds; |
123 | |
124 | const String destination_database; |
125 | const String destination_table; |
126 | bool no_destination; /// If set, do not write data from the buffer, but simply empty the buffer. |
127 | bool allow_materialized; |
128 | |
129 | Poco::Logger * log; |
130 | |
131 | Poco::Event shutdown_event; |
132 | /// Resets data by timeout. |
133 | ThreadFromGlobalPool flush_thread; |
134 | |
135 | void flushAllBuffers(bool check_thresholds = true); |
136 | /// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded. |
137 | void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false); |
138 | bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const; |
139 | bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const; |
140 | |
141 | /// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`. |
142 | void writeBlockToDestination(const Block & block, StoragePtr table); |
143 | |
144 | void flushThread(); |
145 | |
146 | protected: |
147 | /** num_shards - the level of internal parallelism (the number of independent buffers) |
148 | * The buffer is flushed if all minimum thresholds or at least one of the maximum thresholds are exceeded. |
149 | */ |
150 | StorageBuffer(const std::string & database_name_, const std::string & table_name_, |
151 | const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, |
152 | Context & context_, |
153 | size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, |
154 | const String & destination_database_, const String & destination_table_, bool allow_materialized_); |
155 | }; |
156 | |
157 | } |
158 | |