1#pragma once
2
3#include <ext/shared_ptr_helper.h>
4
5#include <Storages/IStorage.h>
6#include <Common/SimpleIncrement.h>
7#include <Client/ConnectionPool.h>
8#include <Client/ConnectionPoolWithFailover.h>
9#include <Core/Settings.h>
10#include <Interpreters/Cluster.h>
11#include <Interpreters/ExpressionActions.h>
12#include <Parsers/ASTFunction.h>
13#include <common/logger_useful.h>
14#include <Common/ActionBlocker.h>
15
16
17namespace DB
18{
19
/// Forward declarations of types that the class below refers to in its
/// signatures, friend declarations and members.
20class Context;
21class StorageDistributedDirectoryMonitor;
22
23
24/** A distributed table that resides on multiple servers.
25 * Uses data from the specified database and tables on each server.
26 *
27 * You can pass one address, not several.
28 * In this case, the table can be considered remote, rather than distributed.
 *
 * This header only declares the class. Apart from a handful of inline accessors,
 * every method is defined out of line, so the notes below describe signatures and
 * members only, not the behaviour of the implementations.
 *
 * Instances are reachable through two static factories (createWithOwnCluster);
 * the constructors themselves are protected.
29 */
30class StorageDistributed : public ext::shared_ptr_helper<StorageDistributed>, public IStorage
31{
 /// NOTE(review): presumably befriended so that shared_ptr_helper can reach the
 /// protected constructors — confirm against ext/shared_ptr_helper.h.
32 friend struct ext::shared_ptr_helper<StorageDistributed>;
 /// These two classes are granted access to the non-public parts of the storage.
33 friend class DistributedBlockOutputStream;
34 friend class StorageDistributedDirectoryMonitor;
35
36public:
 /// Declared here, defined out of line.
37 ~StorageDistributed() override;
38
 /// Factory overload that identifies the remote side by database and table name.
 /// The cluster handle is taken by value.
39 static StoragePtr createWithOwnCluster(
40 const std::string & table_name_,
41 const ColumnsDescription & columns_,
42 const String & remote_database_, /// database on remote servers.
43 const String & remote_table_, /// The name of the table on the remote servers.
44 ClusterPtr owned_cluster_,
45 const Context & context_);
46
 /// Factory overload that identifies the remote side by a table function AST
 /// instead of a database/table pair.
 /// NOTE(review): unlike the overload above, both the AST pointer and the cluster
 /// pointer are taken by non-const reference — confirm this asymmetry is intended.
47 static StoragePtr createWithOwnCluster(
48 const std::string & table_name_,
49 const ColumnsDescription & columns_,
50 ASTPtr & remote_table_function_ptr_, /// Table function ptr.
51 ClusterPtr & owned_cluster_,
52 const Context & context_);
53
 /// Engine name reported by this storage.
54 std::string getName() const override { return "Distributed"; }
 /// Return copies of the stored table and database names.
55 std::string getTableName() const override { return table_name; }
56 std::string getDatabaseName() const override { return database_name; }
57
 /// Capability queries; all three answer `true` unconditionally.
58 bool supportsSampling() const override { return true; }
59 bool supportsFinal() const override { return true; }
60 bool supportsPrewhere() const override { return true; }
61
 /// Column lookup overrides; defined out of line.
62 NameAndTypePair getColumn(const String & column_name) const override;
63 bool hasColumn(const String & column_name) const override;
64
 /// Always `true` for this storage.
65 bool isRemote() const override { return true; }
66
 /// The first overload overrides the base-class method; the second one takes an
 /// explicit cluster in addition and is not an override.
67 QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
68 QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ClusterPtr & cluster) const;
69
 /// Read entry point; defined out of line.
70 BlockInputStreams read(
71 const Names & column_names,
72 const SelectQueryInfo & query_info,
73 const Context & context,
74 QueryProcessingStage::Enum processed_stage,
75 size_t max_block_size,
76 unsigned num_streams) override;
77
 /// Write entry point; defined out of line.
78 BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
79
 /// No-op: the body is empty.
80 void drop(TableStructureWriteLockHolder &) override {}
81
82 /// Removes temporary data in local filesystem.
83 void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
84
 /// Takes the new data path, database name and table name; defined out of line.
85 void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
86
87
 /// The settings argument is deliberately left unnamed in this declaration.
88 void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
89
90 /// Columns must be added to and deleted from the sub-tables manually;
91 /// the structure of the sub-tables is not checked.
92 void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
93
 /// Lifecycle hooks; defined out of line.
94 void startup() override;
95 void shutdown() override;
96
 /// Returns a single-element list containing `path`.
97 Strings getDataPaths() const override { return {path}; }
98
 /// Accessors. The sharding-key ones and getPath() return references to members;
 /// the remote/cluster name ones return copies.
99 const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
100 const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
101 size_t getShardCount() const;
102 const String & getPath() const { return path; }
103 std::string getRemoteDatabaseName() const { return remote_database; }
104 std::string getRemoteTableName() const { return remote_table; }
105 std::string getClusterName() const { return cluster_name; } /// Returns an empty string if the table is used by TableFunctionRemote
106
107 /// create directory monitors for each existing subdirectory
108 void createDirectoryMonitors();
109 /// ensure directory monitor thread creation by subdirectory name
110 void requireDirectoryMonitor(const std::string & name);
111 /// ensure connection pool creation and return it
112 ConnectionPoolPtr requireConnectionPool(const std::string & name);
113
 /// NOTE(review): by its name this flushes every entry of cluster_nodes_data;
 /// the body is not visible here — confirm in the implementation file.
114 void flushClusterNodesAllData();
115
 /// Defined out of line.
116 ClusterPtr getCluster() const;
117
 /// Defined out of line.
118 ActionLock getActionLock(StorageActionBlockType type) override;
119
 /// NOTE(review): no access specifier appears between `public:` above and
 /// `protected:` below, so every data member from here on is public.
120 String table_name;
121 String database_name;
122 String remote_database;
123 String remote_table;
124 ASTPtr remote_table_function_ptr;
125
 /// Held by value, not by reference or pointer.
126 Context global_context;
 /// Logger obtained under the name "StorageDistributed".
127 Logger * log = &Logger::get("StorageDistributed");
128
129 /// Used to implement TableFunctionRemote.
130 std::shared_ptr<Cluster> owned_cluster;
131
132 /// Is empty if this storage implements TableFunctionRemote.
133 const String cluster_name;
134
 /// No in-class initializer; presumably assigned by the constructors — confirm.
135 bool has_sharding_key;
136 ExpressionActionsPtr sharding_key_expr;
137 String sharding_key_column_name;
 /// NOTE(review): the trailing comment mentions `data_path_`, but the constructors
 /// declared below take `relative_data_path_`; the comment may be stale.
138 String path; /// Can be empty if data_path_ is empty. In this case, a directory for the data to be sent is not created.
139
 /// Per-node state stored in cluster_nodes_data under a string key.
 /// NOTE(review): StorageDistributedDirectoryMonitor is only forward-declared in
 /// this header, so code that destroys a ClusterNodeData needs the full
 /// definition in scope.
140 struct ClusterNodeData
141 {
142 std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
 /// NOTE(review): the member name is misspelled ("conneciton"); the comment on
 /// requireConnectionPool below spells it "connection_pool". Renaming it would
 /// require updating every user outside this header.
143 ConnectionPoolPtr conneciton_pool;
144
145 /// Creates connection_pool if not exists.
146 void requireConnectionPool(const std::string & name, const StorageDistributed & storage);
147 /// Creates directory_monitor if not exists.
148 void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker);
149
150 void flushAllData();
151
152 void shutdownAndDropAllData();
153 };
 /// Node state keyed by string; presumably the key is the `name` argument of
 /// requireDirectoryMonitor / requireConnectionPool — confirm in the implementation.
154 std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
 /// NOTE(review): presumably guards cluster_nodes_data — confirm in the implementation.
155 std::mutex cluster_nodes_mutex;
156
157 /// Used for global monotonic ordering of files to send.
158 SimpleIncrement file_names_increment;
159
 /// NOTE(review): presumably the blocker passed to
 /// ClusterNodeData::requireDirectoryMonitor as `monitor_blocker` — confirm.
160 ActionBlocker monitors_blocker;
161
162protected:
 /// Constructor taking the remote side as a database/table name pair.
163 StorageDistributed(
164 const String & database_name_,
165 const String & table_name_,
166 const ColumnsDescription & columns_,
167 const ConstraintsDescription & constraints_,
168 const String & remote_database_,
169 const String & remote_table_,
170 const String & cluster_name_,
171 const Context & context_,
172 const ASTPtr & sharding_key_,
173 const String & relative_data_path_,
174 bool attach_);
175
 /// Constructor taking the remote side as a table function AST.
 /// NOTE(review): the first and last parameters are named `database_name` and
 /// `attach` here but `database_name_` and `attach_` in the constructor above;
 /// `database_name` also coincides with the data member of the same name.
176 StorageDistributed(
177 const String & database_name,
178 const String & table_name_,
179 const ColumnsDescription & columns_,
180 const ConstraintsDescription & constraints_,
181 ASTPtr remote_table_function_ptr_,
182 const String & cluster_name_,
183 const Context & context_,
184 const ASTPtr & sharding_key_,
185 const String & relative_data_path_,
186 bool attach);
187
 /// Takes a cluster and the query info and returns a cluster.
 /// NOTE(review): by its name the result omits shards the query cannot touch;
 /// the body is not visible here — confirm in the implementation file.
188 ClusterPtr skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info);
189};
190
191}
192