1#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
2#include <Interpreters/InterpreterSelectQuery.h>
3#include <DataStreams/RemoteBlockInputStream.h>
4#include <DataStreams/MaterializingBlockInputStream.h>
5#include <DataStreams/LazyBlockInputStream.h>
6#include <Storages/StorageReplicatedMergeTree.h>
7#include <Storages/VirtualColumnUtils.h>
8#include <Common/Exception.h>
9#include <Common/ProfileEvents.h>
10#include <Common/checkStackSize.h>
11#include <TableFunctions/TableFunctionFactory.h>
12
13#include <common/logger_useful.h>
14#include <DataStreams/ConvertingBlockInputStream.h>
15
16
/// Profile counters used by this file; they are only declared here.
namespace ProfileEvents
{
    /// Incremented in createForShard when the main table is not found on the local server.
    extern const Event DistributedConnectionMissingTable;
    /// Incremented in createForShard when the local replica's delay is not below the allowed maximum.
    extern const Event DistributedConnectionStaleReplica;
}
22
23namespace DB
24{
25
/// Error codes referenced in this file; they are only declared here.
namespace ErrorCodes
{
    /// Checked in createForShard to recognise "no remote replica could be reached" and fall back to the local replica.
    extern const int ALL_CONNECTION_TRIES_FAILED;
    /// Thrown by createForShard when the local replica is stale and there is nothing else to use.
    extern const int ALL_REPLICAS_ARE_STALE;
}
30
31namespace ClusterProxy
32{
33
34SelectStreamFactory::SelectStreamFactory(
35 const Block & header_,
36 QueryProcessingStage::Enum processed_stage_,
37 QualifiedTableName main_table_,
38 const Scalars & scalars_,
39 bool has_virtual_shard_num_column_,
40 const Tables & external_tables_)
41 : header(header_),
42 processed_stage{processed_stage_},
43 main_table(std::move(main_table_)),
44 table_func_ptr{nullptr},
45 scalars{scalars_},
46 has_virtual_shard_num_column(has_virtual_shard_num_column_),
47 external_tables{external_tables_}
48{
49}
50
51SelectStreamFactory::SelectStreamFactory(
52 const Block & header_,
53 QueryProcessingStage::Enum processed_stage_,
54 ASTPtr table_func_ptr_,
55 const Scalars & scalars_,
56 bool has_virtual_shard_num_column_,
57 const Tables & external_tables_)
58 : header(header_),
59 processed_stage{processed_stage_},
60 table_func_ptr{table_func_ptr_},
61 scalars{scalars_},
62 has_virtual_shard_num_column(has_virtual_shard_num_column_),
63 external_tables{external_tables_}
64{
65}
66
67namespace
68{
69
70BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage)
71{
72 checkStackSize();
73
74 InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)};
75 BlockInputStreamPtr stream = interpreter.execute().in;
76
77 /** Materialization is needed, since from remote servers the constants come materialized.
78 * If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
79 * And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
80 */
81
82 /* Now we don't need to materialize constants, because RemoteBlockInputStream will ignore constant and take it from header.
83 * So, streams from different threads will always have the same header.
84 */
85 /// return std::make_shared<MaterializingBlockInputStream>(stream);
86
87 return std::make_shared<ConvertingBlockInputStream>(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
88}
89
90static String formattedAST(const ASTPtr & ast)
91{
92 if (!ast)
93 return "";
94 std::stringstream ss;
95 formatAST(*ast, ss, false, true);
96 return ss.str();
97}
98
99}
100
101void SelectStreamFactory::createForShard(
102 const Cluster::ShardInfo & shard_info,
103 const String &, const ASTPtr & query_ast,
104 const Context & context, const ThrottlerPtr & throttler,
105 BlockInputStreams & res)
106{
107 auto modified_query_ast = query_ast->clone();
108 if (has_virtual_shard_num_column)
109 VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32");
110
111 auto emplace_local_stream = [&]()
112 {
113 res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage));
114 };
115
116 String modified_query = formattedAST(modified_query_ast);
117
118 auto emplace_remote_stream = [&]()
119 {
120 auto stream = std::make_shared<RemoteBlockInputStream>(
121 shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
122 stream->setPoolMode(PoolMode::GET_MANY);
123 if (!table_func_ptr)
124 stream->setMainTable(main_table);
125 res.emplace_back(std::move(stream));
126 };
127
128 const auto & settings = context.getSettingsRef();
129
130 if (settings.prefer_localhost_replica && shard_info.isLocal())
131 {
132 StoragePtr main_table_storage;
133
134 if (table_func_ptr)
135 {
136 const auto * table_function = table_func_ptr->as<ASTFunction>();
137 TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
138 main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
139 }
140 else
141 main_table_storage = context.tryGetTable(main_table.database, main_table.table);
142
143
144 if (!main_table_storage) /// Table is absent on a local server.
145 {
146 ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
147 if (shard_info.hasRemoteConnections())
148 {
149 LOG_WARNING(
150 &Logger::get("ClusterProxy::SelectStreamFactory"),
151 "There is no table " << main_table.database << "." << main_table.table
152 << " on local replica of shard " << shard_info.shard_num << ", will try remote replicas.");
153 emplace_remote_stream();
154 }
155 else
156 emplace_local_stream(); /// Let it fail the usual way.
157
158 return;
159 }
160
161 const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());
162
163 if (!replicated_storage)
164 {
165 /// Table is not replicated, use local server.
166 emplace_local_stream();
167 return;
168 }
169
170 UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
171
172 if (!max_allowed_delay)
173 {
174 emplace_local_stream();
175 return;
176 }
177
178 UInt32 local_delay = replicated_storage->getAbsoluteDelay();
179
180 if (local_delay < max_allowed_delay)
181 {
182 emplace_local_stream();
183 return;
184 }
185
186 /// If we reached this point, local replica is stale.
187 ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
188 LOG_WARNING(
189 &Logger::get("ClusterProxy::SelectStreamFactory"),
190 "Local replica of shard " << shard_info.shard_num << " is stale (delay: " << local_delay << "s.)");
191
192 if (!settings.fallback_to_stale_replicas_for_distributed_queries)
193 {
194 if (shard_info.hasRemoteConnections())
195 {
196 /// If we cannot fallback, then we cannot use local replica. Try our luck with remote replicas.
197 emplace_remote_stream();
198 return;
199 }
200 else
201 throw Exception(
202 "Local replica of shard " + toString(shard_info.shard_num)
203 + " is stale (delay: " + toString(local_delay) + "s.), but no other replica configured",
204 ErrorCodes::ALL_REPLICAS_ARE_STALE);
205 }
206
207 if (!shard_info.hasRemoteConnections())
208 {
209 /// There are no remote replicas but we are allowed to fall back to stale local replica.
210 emplace_local_stream();
211 return;
212 }
213
214 /// Try our luck with remote replicas, but if they are stale too, then fallback to local replica.
215 /// Do it lazily to avoid connecting in the main thread.
216
217 auto lazily_create_stream = [
218 pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
219 main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
220 stage = processed_stage, local_delay]()
221 -> BlockInputStreamPtr
222 {
223 auto current_settings = context.getSettingsRef();
224 auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
225 current_settings).getSaturated(
226 current_settings.max_execution_time);
227 std::vector<ConnectionPoolWithFailover::TryResult> try_results;
228 try
229 {
230 if (table_func_ptr)
231 try_results = pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
232 else
233 try_results = pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table);
234 }
235 catch (const Exception & ex)
236 {
237 if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
238 LOG_WARNING(
239 &Logger::get("ClusterProxy::SelectStreamFactory"),
240 "Connections to remote replicas of local shard " << shard_num << " failed, will use stale local replica");
241 else
242 throw;
243 }
244
245 double max_remote_delay = 0.0;
246 for (const auto & try_result : try_results)
247 {
248 if (!try_result.is_up_to_date)
249 max_remote_delay = std::max(try_result.staleness, max_remote_delay);
250 }
251
252 if (try_results.empty() || local_delay < max_remote_delay)
253 return createLocalStream(modified_query_ast, header, context, stage);
254 else
255 {
256 std::vector<IConnectionPool::Entry> connections;
257 connections.reserve(try_results.size());
258 for (auto & try_result : try_results)
259 connections.emplace_back(std::move(try_result.entry));
260
261 return std::make_shared<RemoteBlockInputStream>(
262 std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage);
263 }
264 };
265
266 res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));
267 }
268 else
269 emplace_remote_stream();
270}
271
272}
273}
274