1#pragma once
2
3#include <mutex>
4#include <thread>
5#include <ext/shared_ptr_helper.h>
6#include <Core/NamesAndTypes.h>
7#include <Common/ThreadPool.h>
8#include <Storages/IStorage.h>
9#include <DataStreams/IBlockOutputStream.h>
10#include <Poco/Event.h>
11
12
13namespace Poco { class Logger; }
14
15
16namespace DB
17{
18
19class Context;
20
21
22/** During insertion, buffers the data in the RAM until certain thresholds are exceeded.
23 * When thresholds are exceeded, flushes the data to another table.
24 * When reading, it reads both from its buffers and from the subordinate table.
25 *
26 * The buffer is a set of num_shards blocks.
 * When writing, the block number is chosen as the remainder of dividing `ThreadNumber` by `num_shards`
 * (or one of the other blocks is chosen, see below), and the rows are added to the corresponding block.
 * While a block is in use, it is locked by a mutex. If the chosen block is already locked during a write,
 * the next block is tried in round-robin fashion, and so on, at most `num_shards` times (after that, the writer waits for the lock).
31 * Thresholds are checked on insertion, and, periodically, in the background thread (to implement time thresholds).
32 * Thresholds act independently for each shard. Each shard can be flushed independently of the others.
 * If an inserted block by itself exceeds the max thresholds, it is written directly to the subordinate table without buffering.
 * Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already holds 500 000 rows,
 * and a block of 800 000 rows is added, then the buffer will hold 1 300 000 rows, and that whole block will then be written to the subordinate table.
36 *
37 * When you destroy a Buffer table, all remaining data is flushed to the subordinate table.
 * The data in the buffer is not replicated, not logged to disk, and not indexed. If the server is restarted abruptly, the data is lost.
39 */
40class StorageBuffer : public ext::shared_ptr_helper<StorageBuffer>, public IStorage
41{
42friend struct ext::shared_ptr_helper<StorageBuffer>;
43friend class BufferBlockInputStream;
44friend class BufferBlockOutputStream;
45
46public:
47 /// Thresholds.
48 struct Thresholds
49 {
50 time_t time; /// The number of seconds from the insertion of the first row into the block.
51 size_t rows; /// The number of rows in the block.
52 size_t bytes; /// The number of (uncompressed) bytes in the block.
53 };
54
55 std::string getName() const override { return "Buffer"; }
56 std::string getTableName() const override { return table_name; }
57 std::string getDatabaseName() const override { return database_name; }
58
59 QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
60
61 BlockInputStreams read(
62 const Names & column_names,
63 const SelectQueryInfo & query_info,
64 const Context & context,
65 QueryProcessingStage::Enum processed_stage,
66 size_t max_block_size,
67 unsigned num_streams) override;
68
69 BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
70
71 void startup() override;
72 /// Flush all buffers into the subordinate table and stop background thread.
73 void shutdown() override;
74 bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
75
76 void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
77 {
78 table_name = new_table_name;
79 database_name = new_database_name;
80 }
81
82 bool supportsSampling() const override { return true; }
83 bool supportsPrewhere() const override
84 {
85 if (no_destination)
86 return false;
87 auto dest = global_context.tryGetTable(destination_database, destination_table);
88 if (dest && dest.get() != this)
89 return dest->supportsPrewhere();
90 return false;
91 }
92 bool supportsFinal() const override { return true; }
93 bool supportsIndexForIn() const override { return true; }
94
95 bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;
96
97 void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
98
99 /// The structure of the subordinate table is not checked and does not change.
100 void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
101
102 ~StorageBuffer() override;
103
104private:
105 String table_name;
106 String database_name;
107
108 Context global_context;
109
110 struct Buffer
111 {
112 time_t first_write_time = 0;
113 Block data;
114 std::mutex mutex;
115 };
116
117 /// There are `num_shards` of independent buffers.
118 const size_t num_shards;
119 std::vector<Buffer> buffers;
120
121 const Thresholds min_thresholds;
122 const Thresholds max_thresholds;
123
124 const String destination_database;
125 const String destination_table;
126 bool no_destination; /// If set, do not write data from the buffer, but simply empty the buffer.
127 bool allow_materialized;
128
129 Poco::Logger * log;
130
131 Poco::Event shutdown_event;
132 /// Resets data by timeout.
133 ThreadFromGlobalPool flush_thread;
134
135 void flushAllBuffers(bool check_thresholds = true);
136 /// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded.
137 void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false);
138 bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
139 bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
140
141 /// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`.
142 void writeBlockToDestination(const Block & block, StoragePtr table);
143
144 void flushThread();
145
146protected:
147 /** num_shards - the level of internal parallelism (the number of independent buffers)
148 * The buffer is flushed if all minimum thresholds or at least one of the maximum thresholds are exceeded.
149 */
150 StorageBuffer(const std::string & database_name_, const std::string & table_name_,
151 const ColumnsDescription & columns_, const ConstraintsDescription & constraints_,
152 Context & context_,
153 size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
154 const String & destination_database_, const String & destination_table_, bool allow_materialized_);
155};
156
157}
158