1 | #include <iomanip> |
2 | |
3 | #include <Poco/Net/NetException.h> |
4 | #include <Core/Defines.h> |
5 | #include <Compression/CompressedReadBuffer.h> |
6 | #include <Compression/CompressedWriteBuffer.h> |
7 | #include <IO/ReadBufferFromPocoSocket.h> |
8 | #include <IO/WriteBufferFromPocoSocket.h> |
9 | #include <IO/ReadHelpers.h> |
10 | #include <IO/WriteHelpers.h> |
11 | #include <IO/copyData.h> |
12 | #include <DataStreams/NativeBlockInputStream.h> |
13 | #include <DataStreams/NativeBlockOutputStream.h> |
14 | #include <Client/Connection.h> |
15 | #include <Client/TimeoutSetter.h> |
16 | #include <Common/ClickHouseRevision.h> |
17 | #include <Common/Exception.h> |
18 | #include <Common/NetException.h> |
19 | #include <Common/CurrentMetrics.h> |
20 | #include <Common/DNSResolver.h> |
21 | #include <Common/StringUtils/StringUtils.h> |
22 | #include <Common/config_version.h> |
23 | #include <Interpreters/ClientInfo.h> |
24 | #include <Compression/CompressionFactory.h> |
25 | |
26 | #include <Common/config.h> |
27 | #if USE_POCO_NETSSL |
28 | #include <Poco/Net/SecureStreamSocket.h> |
29 | #endif |
30 | |
31 | namespace CurrentMetrics |
32 | { |
33 | extern const Metric SendScalars; |
34 | extern const Metric SendExternalTables; |
35 | } |
36 | |
37 | namespace DB |
38 | { |
39 | |
40 | namespace ErrorCodes |
41 | { |
42 | extern const int NETWORK_ERROR; |
43 | extern const int SOCKET_TIMEOUT; |
44 | extern const int SERVER_REVISION_IS_TOO_OLD; |
45 | extern const int UNEXPECTED_PACKET_FROM_SERVER; |
46 | extern const int UNKNOWN_PACKET_FROM_SERVER; |
47 | extern const int SUPPORT_IS_DISABLED; |
48 | extern const int BAD_ARGUMENTS; |
49 | } |
50 | |
51 | |
52 | void Connection::connect(const ConnectionTimeouts & timeouts) |
53 | { |
54 | try |
55 | { |
56 | if (connected) |
57 | disconnect(); |
58 | |
59 | LOG_TRACE(log_wrapper.get(), "Connecting. Database: " << (default_database.empty() ? "(not specified)" : default_database) << ". User: " << user |
60 | << (static_cast<bool>(secure) ? ". Secure" : "" ) << (static_cast<bool>(compression) ? "" : ". Uncompressed" )); |
61 | |
62 | if (static_cast<bool>(secure)) |
63 | { |
64 | #if USE_POCO_NETSSL |
65 | socket = std::make_unique<Poco::Net::SecureStreamSocket>(); |
66 | #else |
67 | throw Exception{"tcp_secure protocol is disabled because poco library was built without NetSSL support." , ErrorCodes::SUPPORT_IS_DISABLED}; |
68 | #endif |
69 | } |
70 | else |
71 | { |
72 | socket = std::make_unique<Poco::Net::StreamSocket>(); |
73 | } |
74 | |
75 | current_resolved_address = DNSResolver::instance().resolveAddress(host, port); |
76 | |
77 | socket->connect(*current_resolved_address, timeouts.connection_timeout); |
78 | socket->setReceiveTimeout(timeouts.receive_timeout); |
79 | socket->setSendTimeout(timeouts.send_timeout); |
80 | socket->setNoDelay(true); |
81 | if (timeouts.tcp_keep_alive_timeout.totalSeconds()) |
82 | { |
83 | socket->setKeepAlive(true); |
84 | socket->setOption(IPPROTO_TCP, |
85 | #if defined(TCP_KEEPALIVE) |
86 | TCP_KEEPALIVE |
87 | #else |
88 | TCP_KEEPIDLE // __APPLE__ |
89 | #endif |
90 | , timeouts.tcp_keep_alive_timeout); |
91 | } |
92 | |
93 | in = std::make_shared<ReadBufferFromPocoSocket>(*socket); |
94 | out = std::make_shared<WriteBufferFromPocoSocket>(*socket); |
95 | |
96 | connected = true; |
97 | |
98 | sendHello(); |
99 | receiveHello(); |
100 | |
101 | LOG_TRACE(log_wrapper.get(), "Connected to " << server_name |
102 | << " server version " << server_version_major |
103 | << "." << server_version_minor |
104 | << "." << server_version_patch |
105 | << "." ); |
106 | } |
107 | catch (Poco::Net::NetException & e) |
108 | { |
109 | disconnect(); |
110 | |
111 | /// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost. |
112 | throw NetException(e.displayText() + " (" + getDescription() + ")" , ErrorCodes::NETWORK_ERROR); |
113 | } |
114 | catch (Poco::TimeoutException & e) |
115 | { |
116 | disconnect(); |
117 | |
118 | /// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost. |
119 | throw NetException(e.displayText() + " (" + getDescription() + ")" , ErrorCodes::SOCKET_TIMEOUT); |
120 | } |
121 | } |
122 | |
123 | |
124 | void Connection::disconnect() |
125 | { |
126 | //LOG_TRACE(log_wrapper.get(), "Disconnecting"); |
127 | |
128 | in = nullptr; |
129 | last_input_packet_type.reset(); |
130 | out = nullptr; // can write to socket |
131 | if (socket) |
132 | socket->close(); |
133 | socket = nullptr; |
134 | connected = false; |
135 | } |
136 | |
137 | |
138 | void Connection::sendHello() |
139 | { |
140 | /** Disallow control characters in user controlled parameters |
141 | * to mitigate the possibility of SSRF. |
142 | * The user may do server side requests with 'remote' table function. |
143 | * Malicious user with full r/w access to ClickHouse |
144 | * may use 'remote' table function to forge requests |
145 | * to another services in the network other than ClickHouse (examples: SMTP). |
146 | * Limiting number of possible characters in user-controlled part of handshake |
147 | * will mitigate this possibility but doesn't solve it completely. |
148 | */ |
149 | auto has_control_character = [](const std::string & s) |
150 | { |
151 | for (auto c : s) |
152 | if (isControlASCII(c)) |
153 | return true; |
154 | return false; |
155 | }; |
156 | |
157 | if (has_control_character(default_database) |
158 | || has_control_character(user) |
159 | || has_control_character(password)) |
160 | throw Exception("Parameters 'default_database', 'user' and 'password' must not contain ASCII control characters" , ErrorCodes::BAD_ARGUMENTS); |
161 | |
162 | writeVarUInt(Protocol::Client::Hello, *out); |
163 | writeStringBinary((DBMS_NAME " " ) + client_name, *out); |
164 | writeVarUInt(DBMS_VERSION_MAJOR, *out); |
165 | writeVarUInt(DBMS_VERSION_MINOR, *out); |
166 | // NOTE For backward compatibility of the protocol, client cannot send its version_patch. |
167 | writeVarUInt(ClickHouseRevision::get(), *out); |
168 | writeStringBinary(default_database, *out); |
169 | writeStringBinary(user, *out); |
170 | writeStringBinary(password, *out); |
171 | |
172 | out->next(); |
173 | } |
174 | |
175 | |
176 | void Connection::receiveHello() |
177 | { |
178 | //LOG_TRACE(log_wrapper.get(), "Receiving hello"); |
179 | |
180 | /// Receive hello packet. |
181 | UInt64 packet_type = 0; |
182 | |
183 | readVarUInt(packet_type, *in); |
184 | if (packet_type == Protocol::Server::Hello) |
185 | { |
186 | readStringBinary(server_name, *in); |
187 | readVarUInt(server_version_major, *in); |
188 | readVarUInt(server_version_minor, *in); |
189 | readVarUInt(server_revision, *in); |
190 | if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) |
191 | readStringBinary(server_timezone, *in); |
192 | if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) |
193 | readStringBinary(server_display_name, *in); |
194 | if (server_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) |
195 | readVarUInt(server_version_patch, *in); |
196 | else |
197 | server_version_patch = server_revision; |
198 | } |
199 | else if (packet_type == Protocol::Server::Exception) |
200 | receiveException()->rethrow(); |
201 | else |
202 | { |
203 | /// Close connection, to not stay in unsynchronised state. |
204 | disconnect(); |
205 | throwUnexpectedPacket(packet_type, "Hello or Exception" ); |
206 | } |
207 | } |
208 | |
209 | void Connection::setDefaultDatabase(const String & database) |
210 | { |
211 | default_database = database; |
212 | } |
213 | |
214 | const String & Connection::getDefaultDatabase() const |
215 | { |
216 | return default_database; |
217 | } |
218 | |
219 | const String & Connection::getDescription() const |
220 | { |
221 | return description; |
222 | } |
223 | |
224 | const String & Connection::getHost() const |
225 | { |
226 | return host; |
227 | } |
228 | |
229 | UInt16 Connection::getPort() const |
230 | { |
231 | return port; |
232 | } |
233 | |
234 | void Connection::getServerVersion(const ConnectionTimeouts & timeouts, |
235 | String & name, |
236 | UInt64 & version_major, |
237 | UInt64 & version_minor, |
238 | UInt64 & version_patch, |
239 | UInt64 & revision) |
240 | { |
241 | if (!connected) |
242 | connect(timeouts); |
243 | |
244 | name = server_name; |
245 | version_major = server_version_major; |
246 | version_minor = server_version_minor; |
247 | version_patch = server_version_patch; |
248 | revision = server_revision; |
249 | } |
250 | |
251 | UInt64 Connection::getServerRevision(const ConnectionTimeouts & timeouts) |
252 | { |
253 | if (!connected) |
254 | connect(timeouts); |
255 | |
256 | return server_revision; |
257 | } |
258 | |
259 | const String & Connection::getServerTimezone(const ConnectionTimeouts & timeouts) |
260 | { |
261 | if (!connected) |
262 | connect(timeouts); |
263 | |
264 | return server_timezone; |
265 | } |
266 | |
267 | const String & Connection::getServerDisplayName(const ConnectionTimeouts & timeouts) |
268 | { |
269 | if (!connected) |
270 | connect(timeouts); |
271 | |
272 | return server_display_name; |
273 | } |
274 | |
275 | void Connection::forceConnected(const ConnectionTimeouts & timeouts) |
276 | { |
277 | if (!connected) |
278 | { |
279 | connect(timeouts); |
280 | } |
281 | else if (!ping()) |
282 | { |
283 | LOG_TRACE(log_wrapper.get(), "Connection was closed, will reconnect." ); |
284 | connect(timeouts); |
285 | } |
286 | } |
287 | |
288 | bool Connection::ping() |
289 | { |
290 | // LOG_TRACE(log_wrapper.get(), "Ping"); |
291 | |
292 | TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); |
293 | try |
294 | { |
295 | UInt64 pong = 0; |
296 | writeVarUInt(Protocol::Client::Ping, *out); |
297 | out->next(); |
298 | |
299 | if (in->eof()) |
300 | return false; |
301 | |
302 | readVarUInt(pong, *in); |
303 | |
304 | /// Could receive late packets with progress. TODO: Maybe possible to fix. |
305 | while (pong == Protocol::Server::Progress) |
306 | { |
307 | receiveProgress(); |
308 | |
309 | if (in->eof()) |
310 | return false; |
311 | |
312 | readVarUInt(pong, *in); |
313 | } |
314 | |
315 | if (pong != Protocol::Server::Pong) |
316 | throwUnexpectedPacket(pong, "Pong" ); |
317 | } |
318 | catch (const Poco::Exception & e) |
319 | { |
320 | LOG_TRACE(log_wrapper.get(), e.displayText()); |
321 | return false; |
322 | } |
323 | |
324 | return true; |
325 | } |
326 | |
327 | TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & timeouts, |
328 | const TablesStatusRequest & request) |
329 | { |
330 | if (!connected) |
331 | connect(timeouts); |
332 | |
333 | TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); |
334 | |
335 | writeVarUInt(Protocol::Client::TablesStatusRequest, *out); |
336 | request.write(*out, server_revision); |
337 | out->next(); |
338 | |
339 | UInt64 response_type = 0; |
340 | readVarUInt(response_type, *in); |
341 | |
342 | if (response_type == Protocol::Server::Exception) |
343 | receiveException()->rethrow(); |
344 | else if (response_type != Protocol::Server::TablesStatusResponse) |
345 | throwUnexpectedPacket(response_type, "TablesStatusResponse" ); |
346 | |
347 | TablesStatusResponse response; |
348 | response.read(*in, server_revision); |
349 | return response; |
350 | } |
351 | |
352 | |
353 | void Connection::sendQuery( |
354 | const ConnectionTimeouts & timeouts, |
355 | const String & query, |
356 | const String & query_id_, |
357 | UInt64 stage, |
358 | const Settings * settings, |
359 | const ClientInfo * client_info, |
360 | bool with_pending_data) |
361 | { |
362 | if (!connected) |
363 | connect(timeouts); |
364 | |
365 | TimeoutSetter timeout_setter(*socket, timeouts.send_timeout, timeouts.receive_timeout, true); |
366 | |
367 | if (settings) |
368 | { |
369 | std::optional<int> level; |
370 | std::string method = Poco::toUpper(settings->network_compression_method.toString()); |
371 | |
372 | /// Bad custom logic |
373 | if (method == "ZSTD" ) |
374 | level = settings->network_zstd_compression_level; |
375 | |
376 | compression_codec = CompressionCodecFactory::instance().get(method, level); |
377 | } |
378 | else |
379 | compression_codec = CompressionCodecFactory::instance().getDefaultCodec(); |
380 | |
381 | query_id = query_id_; |
382 | |
383 | //LOG_TRACE(log_wrapper.get(), "Sending query"); |
384 | |
385 | writeVarUInt(Protocol::Client::Query, *out); |
386 | writeStringBinary(query_id, *out); |
387 | |
388 | /// Client info. |
389 | if (server_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) |
390 | { |
391 | ClientInfo client_info_to_send; |
392 | |
393 | if (!client_info || client_info->empty()) |
394 | { |
395 | /// No client info passed - means this query initiated by me. |
396 | client_info_to_send.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; |
397 | client_info_to_send.fillOSUserHostNameAndVersionInfo(); |
398 | client_info_to_send.client_name = (DBMS_NAME " " ) + client_name; |
399 | } |
400 | else |
401 | { |
402 | /// This query is initiated by another query. |
403 | client_info_to_send = *client_info; |
404 | client_info_to_send.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; |
405 | } |
406 | |
407 | client_info_to_send.write(*out, server_revision); |
408 | } |
409 | |
410 | /// Per query settings. |
411 | if (settings) |
412 | { |
413 | auto settings_format = (server_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsBinaryFormat::STRINGS |
414 | : SettingsBinaryFormat::OLD; |
415 | settings->serialize(*out, settings_format); |
416 | } |
417 | else |
418 | writeStringBinary("" /* empty string is a marker of the end of settings */, *out); |
419 | |
420 | writeVarUInt(stage, *out); |
421 | writeVarUInt(static_cast<bool>(compression), *out); |
422 | |
423 | writeStringBinary(query, *out); |
424 | |
425 | maybe_compressed_in.reset(); |
426 | maybe_compressed_out.reset(); |
427 | block_in.reset(); |
428 | block_logs_in.reset(); |
429 | block_out.reset(); |
430 | |
431 | /// Send empty block which means end of data. |
432 | if (!with_pending_data) |
433 | { |
434 | sendData(Block()); |
435 | out->next(); |
436 | } |
437 | } |
438 | |
439 | |
440 | void Connection::sendCancel() |
441 | { |
442 | /// If we already disconnected. |
443 | if (!out) |
444 | return; |
445 | |
446 | //LOG_TRACE(log_wrapper.get(), "Sending cancel"); |
447 | |
448 | writeVarUInt(Protocol::Client::Cancel, *out); |
449 | out->next(); |
450 | } |
451 | |
452 | |
453 | void Connection::sendData(const Block & block, const String & name, bool scalar) |
454 | { |
455 | //LOG_TRACE(log_wrapper.get(), "Sending data"); |
456 | |
457 | if (!block_out) |
458 | { |
459 | if (compression == Protocol::Compression::Enable) |
460 | maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_codec); |
461 | else |
462 | maybe_compressed_out = out; |
463 | |
464 | block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty()); |
465 | } |
466 | |
467 | if (scalar) |
468 | writeVarUInt(Protocol::Client::Scalar, *out); |
469 | else |
470 | writeVarUInt(Protocol::Client::Data, *out); |
471 | writeStringBinary(name, *out); |
472 | |
473 | size_t prev_bytes = out->count(); |
474 | |
475 | block_out->write(block); |
476 | maybe_compressed_out->next(); |
477 | out->next(); |
478 | |
479 | if (throttler) |
480 | throttler->add(out->count() - prev_bytes); |
481 | } |
482 | |
483 | |
484 | void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name) |
485 | { |
486 | /// NOTE 'Throttler' is not used in this method (could use, but it's not important right now). |
487 | |
488 | writeVarUInt(Protocol::Client::Data, *out); |
489 | writeStringBinary(name, *out); |
490 | |
491 | if (0 == size) |
492 | copyData(input, *out); |
493 | else |
494 | copyData(input, *out, size); |
495 | out->next(); |
496 | } |
497 | |
498 | |
499 | void Connection::sendScalarsData(Scalars & data) |
500 | { |
501 | if (data.empty()) |
502 | return; |
503 | |
504 | Stopwatch watch; |
505 | size_t out_bytes = out ? out->count() : 0; |
506 | size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0; |
507 | size_t rows = 0; |
508 | |
509 | CurrentMetrics::Increment metric_increment{CurrentMetrics::SendScalars}; |
510 | |
511 | for (auto & elem : data) |
512 | { |
513 | rows += elem.second.rows(); |
514 | sendData(elem.second, elem.first, true /* scalar */); |
515 | } |
516 | |
517 | out_bytes = out->count() - out_bytes; |
518 | maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes; |
519 | double elapsed = watch.elapsedSeconds(); |
520 | |
521 | std::stringstream msg; |
522 | msg << std::fixed << std::setprecision(3); |
523 | msg << "Sent data for " << data.size() << " scalars, total " << rows << " rows in " << elapsed << " sec., " |
524 | << static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " |
525 | << maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)" ; |
526 | |
527 | if (compression == Protocol::Compression::Enable) |
528 | msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to " |
529 | << out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)" ; |
530 | else |
531 | msg << ", no compression." ; |
532 | |
533 | LOG_DEBUG(log_wrapper.get(), msg.rdbuf()); |
534 | } |
535 | |
536 | |
537 | void Connection::sendExternalTablesData(ExternalTablesData & data) |
538 | { |
539 | if (data.empty()) |
540 | { |
541 | /// Send empty block, which means end of data transfer. |
542 | sendData(Block()); |
543 | return; |
544 | } |
545 | |
546 | Stopwatch watch; |
547 | size_t out_bytes = out ? out->count() : 0; |
548 | size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0; |
549 | size_t rows = 0; |
550 | |
551 | CurrentMetrics::Increment metric_increment{CurrentMetrics::SendExternalTables}; |
552 | |
553 | for (auto & elem : data) |
554 | { |
555 | elem.first->readPrefix(); |
556 | while (Block block = elem.first->read()) |
557 | { |
558 | rows += block.rows(); |
559 | sendData(block, elem.second); |
560 | } |
561 | elem.first->readSuffix(); |
562 | } |
563 | |
564 | /// Send empty block, which means end of data transfer. |
565 | sendData(Block()); |
566 | |
567 | out_bytes = out->count() - out_bytes; |
568 | maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes; |
569 | double elapsed = watch.elapsedSeconds(); |
570 | |
571 | std::stringstream msg; |
572 | msg << std::fixed << std::setprecision(3); |
573 | msg << "Sent data for " << data.size() << " external tables, total " << rows << " rows in " << elapsed << " sec., " |
574 | << static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " |
575 | << maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)" ; |
576 | |
577 | if (compression == Protocol::Compression::Enable) |
578 | msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to " |
579 | << out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)" ; |
580 | else |
581 | msg << ", no compression." ; |
582 | |
583 | LOG_DEBUG(log_wrapper.get(), msg.rdbuf()); |
584 | } |
585 | |
586 | std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const |
587 | { |
588 | return current_resolved_address; |
589 | } |
590 | |
591 | |
592 | bool Connection::poll(size_t timeout_microseconds) |
593 | { |
594 | return static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_microseconds); |
595 | } |
596 | |
597 | |
598 | bool Connection::hasReadPendingData() const |
599 | { |
600 | return last_input_packet_type.has_value() || static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData(); |
601 | } |
602 | |
603 | |
604 | std::optional<UInt64> Connection::checkPacket(size_t timeout_microseconds) |
605 | { |
606 | if (last_input_packet_type.has_value()) |
607 | return last_input_packet_type; |
608 | |
609 | if (hasReadPendingData() || poll(timeout_microseconds)) |
610 | { |
611 | // LOG_TRACE(log_wrapper.get(), "Receiving packet type"); |
612 | UInt64 packet_type; |
613 | readVarUInt(packet_type, *in); |
614 | |
615 | last_input_packet_type.emplace(packet_type); |
616 | return last_input_packet_type; |
617 | } |
618 | |
619 | return {}; |
620 | } |
621 | |
622 | |
623 | Packet Connection::receivePacket() |
624 | { |
625 | try |
626 | { |
627 | Packet res; |
628 | |
629 | /// Have we already read packet type? |
630 | if (last_input_packet_type) |
631 | { |
632 | res.type = *last_input_packet_type; |
633 | last_input_packet_type.reset(); |
634 | } |
635 | else |
636 | { |
637 | //LOG_TRACE(log_wrapper.get(), "Receiving packet type"); |
638 | readVarUInt(res.type, *in); |
639 | } |
640 | |
641 | //LOG_TRACE(log_wrapper.get(), "Receiving packet " << res.type << " " << Protocol::Server::toString(res.type)); |
642 | //std::cerr << "Client got packet: " << Protocol::Server::toString(res.type) << "\n"; |
643 | switch (res.type) |
644 | { |
645 | case Protocol::Server::Data: [[fallthrough]]; |
646 | case Protocol::Server::Totals: [[fallthrough]]; |
647 | case Protocol::Server::Extremes: |
648 | res.block = receiveData(); |
649 | return res; |
650 | |
651 | case Protocol::Server::Exception: |
652 | res.exception = receiveException(); |
653 | return res; |
654 | |
655 | case Protocol::Server::Progress: |
656 | res.progress = receiveProgress(); |
657 | return res; |
658 | |
659 | case Protocol::Server::ProfileInfo: |
660 | res.profile_info = receiveProfileInfo(); |
661 | return res; |
662 | |
663 | case Protocol::Server::Log: |
664 | res.block = receiveLogData(); |
665 | return res; |
666 | |
667 | case Protocol::Server::TableColumns: |
668 | res.multistring_message = receiveMultistringMessage(res.type); |
669 | return res; |
670 | |
671 | case Protocol::Server::EndOfStream: |
672 | return res; |
673 | |
674 | default: |
675 | /// In unknown state, disconnect - to not leave unsynchronised connection. |
676 | disconnect(); |
677 | throw Exception("Unknown packet " |
678 | + toString(res.type) |
679 | + " from server " + getDescription(), ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); |
680 | } |
681 | } |
682 | catch (Exception & e) |
683 | { |
684 | /// Add server address to exception message, if need. |
685 | if (e.code() != ErrorCodes::UNKNOWN_PACKET_FROM_SERVER) |
686 | e.addMessage("while receiving packet from " + getDescription()); |
687 | |
688 | throw; |
689 | } |
690 | } |
691 | |
692 | |
693 | Block Connection::receiveData() |
694 | { |
695 | //LOG_TRACE(log_wrapper.get(), "Receiving data"); |
696 | |
697 | initBlockInput(); |
698 | return receiveDataImpl(block_in); |
699 | } |
700 | |
701 | |
702 | Block Connection::receiveLogData() |
703 | { |
704 | initBlockLogsInput(); |
705 | return receiveDataImpl(block_logs_in); |
706 | } |
707 | |
708 | |
709 | Block Connection::receiveDataImpl(BlockInputStreamPtr & stream) |
710 | { |
711 | String external_table_name; |
712 | readStringBinary(external_table_name, *in); |
713 | |
714 | size_t prev_bytes = in->count(); |
715 | |
716 | /// Read one block from network. |
717 | Block res = stream->read(); |
718 | |
719 | if (throttler) |
720 | throttler->add(in->count() - prev_bytes); |
721 | |
722 | return res; |
723 | } |
724 | |
725 | |
726 | void Connection::initInputBuffers() |
727 | { |
728 | |
729 | } |
730 | |
731 | |
732 | void Connection::initBlockInput() |
733 | { |
734 | if (!block_in) |
735 | { |
736 | if (!maybe_compressed_in) |
737 | { |
738 | if (compression == Protocol::Compression::Enable) |
739 | maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in); |
740 | else |
741 | maybe_compressed_in = in; |
742 | } |
743 | |
744 | block_in = std::make_shared<NativeBlockInputStream>(*maybe_compressed_in, server_revision); |
745 | } |
746 | } |
747 | |
748 | |
749 | void Connection::initBlockLogsInput() |
750 | { |
751 | if (!block_logs_in) |
752 | { |
753 | /// Have to return superset of SystemLogsQueue::getSampleBlock() columns |
754 | block_logs_in = std::make_shared<NativeBlockInputStream>(*in, server_revision); |
755 | } |
756 | } |
757 | |
758 | |
759 | void Connection::setDescription() |
760 | { |
761 | auto resolved_address = getResolvedAddress(); |
762 | description = host + ":" + toString(port); |
763 | |
764 | if (resolved_address) |
765 | { |
766 | auto ip_address = resolved_address->host().toString(); |
767 | if (host != ip_address) |
768 | description += ", " + ip_address; |
769 | } |
770 | } |
771 | |
772 | |
773 | std::unique_ptr<Exception> Connection::receiveException() |
774 | { |
775 | //LOG_TRACE(log_wrapper.get(), "Receiving exception"); |
776 | |
777 | Exception e; |
778 | readException(e, *in, "Received from " + getDescription()); |
779 | return std::unique_ptr<Exception>{ e.clone() }; |
780 | } |
781 | |
782 | |
783 | std::vector<String> Connection::receiveMultistringMessage(UInt64 msg_type) |
784 | { |
785 | size_t num = Protocol::Server::stringsInMessage(msg_type); |
786 | std::vector<String> strings(num); |
787 | for (size_t i = 0; i < num; ++i) |
788 | readStringBinary(strings[i], *in); |
789 | return strings; |
790 | } |
791 | |
792 | |
793 | Progress Connection::receiveProgress() |
794 | { |
795 | //LOG_TRACE(log_wrapper.get(), "Receiving progress"); |
796 | |
797 | Progress progress; |
798 | progress.read(*in, server_revision); |
799 | return progress; |
800 | } |
801 | |
802 | |
803 | BlockStreamProfileInfo Connection::receiveProfileInfo() |
804 | { |
805 | BlockStreamProfileInfo profile_info; |
806 | profile_info.read(*in); |
807 | return profile_info; |
808 | } |
809 | |
810 | |
811 | void Connection::throwUnexpectedPacket(UInt64 packet_type, const char * expected) const |
812 | { |
813 | throw NetException( |
814 | "Unexpected packet from server " + getDescription() + " (expected " + expected |
815 | + ", got " + String(Protocol::Server::toString(packet_type)) + ")" , |
816 | ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); |
817 | } |
818 | |
819 | } |
820 | |