1#include <DataStreams/RemoteBlockInputStream.h>
2#include <DataStreams/OneBlockInputStream.h>
3#include <Common/NetException.h>
4#include <Common/CurrentThread.h>
5#include <Columns/ColumnConst.h>
6#include <Interpreters/Context.h>
7#include <Interpreters/castColumn.h>
8#include <Interpreters/InternalTextLogsQueue.h>
9#include <Storages/IStorage.h>
10
11#include <IO/ConnectionTimeouts.h>
12
13
14namespace DB
15{
16
/// Error codes referenced by this translation unit; they are only declared here.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    /// Thrown when a replica sends a packet type that the stream does not handle.
    extern const int UNKNOWN_PACKET_FROM_SERVER;
}
22
23
24RemoteBlockInputStream::RemoteBlockInputStream(
25 Connection & connection,
26 const String & query_, const Block & header_, const Context & context_, const Settings * settings,
27 const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
28 : header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
29{
30 if (settings)
31 context.setSettings(*settings);
32
33 create_multiplexed_connections = [this, &connection, throttler]()
34 {
35 return std::make_unique<MultiplexedConnections>(connection, context.getSettingsRef(), throttler);
36 };
37}
38
39RemoteBlockInputStream::RemoteBlockInputStream(
40 std::vector<IConnectionPool::Entry> && connections,
41 const String & query_, const Block & header_, const Context & context_, const Settings * settings,
42 const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
43 : header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
44{
45 if (settings)
46 context.setSettings(*settings);
47
48 create_multiplexed_connections = [this, connections, throttler]() mutable
49 {
50 return std::make_unique<MultiplexedConnections>(
51 std::move(connections), context.getSettingsRef(), throttler);
52 };
53}
54
55RemoteBlockInputStream::RemoteBlockInputStream(
56 const ConnectionPoolWithFailoverPtr & pool,
57 const String & query_, const Block & header_, const Context & context_, const Settings * settings,
58 const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
59 : header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
60{
61 if (settings)
62 context.setSettings(*settings);
63
64 create_multiplexed_connections = [this, pool, throttler]()
65 {
66 const Settings & current_settings = context.getSettingsRef();
67 auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
68 std::vector<IConnectionPool::Entry> connections;
69 if (main_table)
70 {
71 auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, *main_table);
72 connections.reserve(try_results.size());
73 for (auto & try_result : try_results)
74 connections.emplace_back(std::move(try_result.entry));
75 }
76 else
77 connections = pool->getMany(timeouts, &current_settings, pool_mode);
78
79 return std::make_unique<MultiplexedConnections>(
80 std::move(connections), current_settings, throttler);
81 };
82}
83
84RemoteBlockInputStream::~RemoteBlockInputStream()
85{
86 /** If interrupted in the middle of the loop of communication with replicas, then interrupt
87 * all connections, then read and skip the remaining packets to make sure
88 * these connections did not remain hanging in the out-of-sync state.
89 */
90 if (established || isQueryPending())
91 multiplexed_connections->disconnect();
92}
93
94void RemoteBlockInputStream::readPrefix()
95{
96 if (!sent_query)
97 sendQuery();
98}
99
100void RemoteBlockInputStream::cancel(bool kill)
101{
102 if (kill)
103 is_killed = true;
104
105 bool old_val = false;
106 if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
107 return;
108
109 {
110 std::lock_guard lock(external_tables_mutex);
111
112 /// Stop sending external data.
113 for (auto & vec : external_tables_data)
114 for (auto & elem : vec)
115 elem.first->cancel(kill);
116 }
117
118 if (!isQueryPending() || hasThrownException())
119 return;
120
121 tryCancel("Cancelling query");
122}
123
124void RemoteBlockInputStream::sendScalars()
125{
126 multiplexed_connections->sendScalarsData(scalars);
127}
128
129void RemoteBlockInputStream::sendExternalTables()
130{
131 size_t count = multiplexed_connections->size();
132
133 {
134 std::lock_guard lock(external_tables_mutex);
135
136 external_tables_data.reserve(count);
137
138 for (size_t i = 0; i < count; ++i)
139 {
140 ExternalTablesData res;
141 for (const auto & table : external_tables)
142 {
143 StoragePtr cur = table.second;
144 QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(context);
145 BlockInputStreams input = cur->read(cur->getColumns().getNamesOfPhysical(), {}, context,
146 read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
147 if (input.size() == 0)
148 res.push_back(std::make_pair(std::make_shared<OneBlockInputStream>(cur->getSampleBlock()), table.first));
149 else
150 res.push_back(std::make_pair(input[0], table.first));
151 }
152 external_tables_data.push_back(std::move(res));
153 }
154 }
155
156 multiplexed_connections->sendExternalTablesData(external_tables_data);
157}
158
159
160/** If we receive a block with slightly different column types, or with excessive columns,
161 * we will adapt it to expected structure.
162 */
163static Block adaptBlockStructure(const Block & block, const Block & header, const Context & context)
164{
165 /// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest.
166 if (!header)
167 return block;
168
169 Block res;
170 res.info = block.info;
171
172 for (const auto & elem : header)
173 {
174 ColumnPtr column;
175
176 if (elem.column && isColumnConst(*elem.column))
177 {
178 /// We expect constant column in block.
179 /// If block is not empty, then get value for constant from it,
180 /// because it may be different for remote server for functions like version(), uptime(), ...
181 if (block.rows() > 0 && block.has(elem.name))
182 {
183 /// Const column is passed as materialized. Get first value from it.
184 ///
185 /// TODO: check that column contains the same value.
186 /// TODO: serialize const columns.
187 auto col = block.getByName(elem.name);
188 col.column = block.getByName(elem.name).column->cut(0, 1);
189
190 column = castColumn(col, elem.type, context);
191
192 if (!isColumnConst(*column))
193 column = ColumnConst::create(column, block.rows());
194 else
195 /// It is not possible now. Just in case we support const columns serialization.
196 column = column->cloneResized(block.rows());
197 }
198 else
199 column = elem.column->cloneResized(block.rows());
200 }
201 else
202 column = castColumn(block.getByName(elem.name), elem.type, context);
203
204 res.insert({column, elem.type, elem.name});
205 }
206 return res;
207}
208
209
210Block RemoteBlockInputStream::readImpl()
211{
212 if (!sent_query)
213 {
214 sendQuery();
215
216 if (context.getSettingsRef().skip_unavailable_shards && (0 == multiplexed_connections->size()))
217 return {};
218 }
219
220 while (true)
221 {
222 if (isCancelledOrThrowIfKilled())
223 return Block();
224
225 Packet packet = multiplexed_connections->receivePacket();
226
227 switch (packet.type)
228 {
229 case Protocol::Server::Data:
230 /// If the block is not empty and is not a header block
231 if (packet.block && (packet.block.rows() > 0))
232 return adaptBlockStructure(packet.block, header, context);
233 break; /// If the block is empty - we will receive other packets before EndOfStream.
234
235 case Protocol::Server::Exception:
236 got_exception_from_replica = true;
237 packet.exception->rethrow();
238 break;
239
240 case Protocol::Server::EndOfStream:
241 if (!multiplexed_connections->hasActiveConnections())
242 {
243 finished = true;
244 return Block();
245 }
246 break;
247
248 case Protocol::Server::Progress:
249 /** We use the progress from a remote server.
250 * We also include in ProcessList,
251 * and we use it to check
252 * constraints (for example, the minimum speed of query execution)
253 * and quotas (for example, the number of lines to read).
254 */
255 progressImpl(packet.progress);
256 break;
257
258 case Protocol::Server::ProfileInfo:
259 /// Use own (client-side) info about read bytes, it is more correct info than server-side one.
260 info.setFrom(packet.profile_info, true);
261 break;
262
263 case Protocol::Server::Totals:
264 totals = packet.block;
265 break;
266
267 case Protocol::Server::Extremes:
268 extremes = packet.block;
269 break;
270
271 case Protocol::Server::Log:
272 /// Pass logs from remote server to client
273 if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
274 log_queue->pushBlock(std::move(packet.block));
275 break;
276
277 default:
278 got_unknown_packet_from_replica = true;
279 throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
280 }
281 }
282}
283
284void RemoteBlockInputStream::readSuffixImpl()
285{
286 /** If one of:
287 * - nothing started to do;
288 * - received all packets before EndOfStream;
289 * - received exception from one replica;
290 * - received an unknown packet from one replica;
291 * then you do not need to read anything.
292 */
293 if (!isQueryPending() || hasThrownException())
294 return;
295
296 /** If you have not read all the data yet, but they are no longer needed.
297 * This may be due to the fact that the data is sufficient (for example, when using LIMIT).
298 */
299
300 /// Send the request to abort the execution of the request, if not already sent.
301 tryCancel("Cancelling query because enough data has been read");
302
303 /// Get the remaining packets so that there is no out of sync in the connections to the replicas.
304 Packet packet = multiplexed_connections->drain();
305 switch (packet.type)
306 {
307 case Protocol::Server::EndOfStream:
308 finished = true;
309 break;
310
311 case Protocol::Server::Exception:
312 got_exception_from_replica = true;
313 packet.exception->rethrow();
314 break;
315
316 default:
317 got_unknown_packet_from_replica = true;
318 throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
319 }
320}
321
322void RemoteBlockInputStream::sendQuery()
323{
324 multiplexed_connections = create_multiplexed_connections();
325
326 const auto& settings = context.getSettingsRef();
327 if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
328 return;
329
330 established = true;
331
332 auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
333 multiplexed_connections->sendQuery(timeouts, query, query_id, stage, &context.getClientInfo(), true);
334
335 established = false;
336 sent_query = true;
337
338 if (settings.enable_scalar_subquery_optimization)
339 sendScalars();
340 sendExternalTables();
341}
342
343void RemoteBlockInputStream::tryCancel(const char * reason)
344{
345 bool old_val = false;
346 if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
347 return;
348
349 LOG_TRACE(log, "(" << multiplexed_connections->dumpAddresses() << ") " << reason);
350 multiplexed_connections->sendCancel();
351}
352
353bool RemoteBlockInputStream::isQueryPending() const
354{
355 return sent_query && !finished;
356}
357
358bool RemoteBlockInputStream::hasThrownException() const
359{
360 return got_exception_from_replica || got_unknown_packet_from_replica;
361}
362
363}
364