| 1 | #include <iomanip> | 
| 2 |  | 
| 3 | #include <Poco/Net/NetException.h> | 
| 4 | #include <Core/Defines.h> | 
| 5 | #include <Compression/CompressedReadBuffer.h> | 
| 6 | #include <Compression/CompressedWriteBuffer.h> | 
| 7 | #include <IO/ReadBufferFromPocoSocket.h> | 
| 8 | #include <IO/WriteBufferFromPocoSocket.h> | 
| 9 | #include <IO/ReadHelpers.h> | 
| 10 | #include <IO/WriteHelpers.h> | 
| 11 | #include <IO/copyData.h> | 
| 12 | #include <DataStreams/NativeBlockInputStream.h> | 
| 13 | #include <DataStreams/NativeBlockOutputStream.h> | 
| 14 | #include <Client/Connection.h> | 
| 15 | #include <Client/TimeoutSetter.h> | 
| 16 | #include <Common/ClickHouseRevision.h> | 
| 17 | #include <Common/Exception.h> | 
| 18 | #include <Common/NetException.h> | 
| 19 | #include <Common/CurrentMetrics.h> | 
| 20 | #include <Common/DNSResolver.h> | 
| 21 | #include <Common/StringUtils/StringUtils.h> | 
| 22 | #include <Common/config_version.h> | 
| 23 | #include <Interpreters/ClientInfo.h> | 
| 24 | #include <Compression/CompressionFactory.h> | 
| 25 |  | 
| 26 | #include <Common/config.h> | 
| 27 | #if USE_POCO_NETSSL | 
| 28 | #include <Poco/Net/SecureStreamSocket.h> | 
| 29 | #endif | 
| 30 |  | 
| 31 | namespace CurrentMetrics | 
| 32 | { | 
| 33 |     extern const Metric SendScalars; | 
| 34 |     extern const Metric SendExternalTables; | 
| 35 | } | 
| 36 |  | 
| 37 | namespace DB | 
| 38 | { | 
| 39 |  | 
| 40 | namespace ErrorCodes | 
| 41 | { | 
| 42 |     extern const int NETWORK_ERROR; | 
| 43 |     extern const int SOCKET_TIMEOUT; | 
| 44 |     extern const int SERVER_REVISION_IS_TOO_OLD; | 
| 45 |     extern const int UNEXPECTED_PACKET_FROM_SERVER; | 
| 46 |     extern const int UNKNOWN_PACKET_FROM_SERVER; | 
| 47 |     extern const int SUPPORT_IS_DISABLED; | 
| 48 |     extern const int BAD_ARGUMENTS; | 
| 49 | } | 
| 50 |  | 
| 51 |  | 
| 52 | void Connection::connect(const ConnectionTimeouts & timeouts) | 
| 53 | { | 
| 54 |     try | 
| 55 |     { | 
| 56 |         if (connected) | 
| 57 |             disconnect(); | 
| 58 |  | 
| 59 |         LOG_TRACE(log_wrapper.get(), "Connecting. Database: "  << (default_database.empty() ? "(not specified)"  : default_database) << ". User: "  << user | 
| 60 |         << (static_cast<bool>(secure) ? ". Secure"  : "" ) << (static_cast<bool>(compression) ? ""  : ". Uncompressed" )); | 
| 61 |  | 
| 62 |         if (static_cast<bool>(secure)) | 
| 63 |         { | 
| 64 | #if USE_POCO_NETSSL | 
| 65 |             socket = std::make_unique<Poco::Net::SecureStreamSocket>(); | 
| 66 | #else | 
| 67 |             throw Exception{"tcp_secure protocol is disabled because poco library was built without NetSSL support." , ErrorCodes::SUPPORT_IS_DISABLED}; | 
| 68 | #endif | 
| 69 |         } | 
| 70 |         else | 
| 71 |         { | 
| 72 |             socket = std::make_unique<Poco::Net::StreamSocket>(); | 
| 73 |         } | 
| 74 |  | 
| 75 |         current_resolved_address = DNSResolver::instance().resolveAddress(host, port); | 
| 76 |  | 
| 77 |         socket->connect(*current_resolved_address, timeouts.connection_timeout); | 
| 78 |         socket->setReceiveTimeout(timeouts.receive_timeout); | 
| 79 |         socket->setSendTimeout(timeouts.send_timeout); | 
| 80 |         socket->setNoDelay(true); | 
| 81 |         if (timeouts.tcp_keep_alive_timeout.totalSeconds()) | 
| 82 |         { | 
| 83 |             socket->setKeepAlive(true); | 
| 84 |             socket->setOption(IPPROTO_TCP, | 
| 85 | #if defined(TCP_KEEPALIVE) | 
| 86 |                 TCP_KEEPALIVE | 
| 87 | #else | 
| 88 |                 TCP_KEEPIDLE  // __APPLE__ | 
| 89 | #endif | 
| 90 |                 , timeouts.tcp_keep_alive_timeout); | 
| 91 |         } | 
| 92 |  | 
| 93 |         in = std::make_shared<ReadBufferFromPocoSocket>(*socket); | 
| 94 |         out = std::make_shared<WriteBufferFromPocoSocket>(*socket); | 
| 95 |  | 
| 96 |         connected = true; | 
| 97 |  | 
| 98 |         sendHello(); | 
| 99 |         receiveHello(); | 
| 100 |  | 
| 101 |         LOG_TRACE(log_wrapper.get(), "Connected to "  << server_name | 
| 102 |             << " server version "  << server_version_major | 
| 103 |             << "."  << server_version_minor | 
| 104 |             << "."  << server_version_patch | 
| 105 |             << "." ); | 
| 106 |     } | 
| 107 |     catch (Poco::Net::NetException & e) | 
| 108 |     { | 
| 109 |         disconnect(); | 
| 110 |  | 
| 111 |         /// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost. | 
| 112 |         throw NetException(e.displayText() + " ("  + getDescription() + ")" , ErrorCodes::NETWORK_ERROR); | 
| 113 |     } | 
| 114 |     catch (Poco::TimeoutException & e) | 
| 115 |     { | 
| 116 |         disconnect(); | 
| 117 |  | 
| 118 |         /// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost. | 
| 119 |         throw NetException(e.displayText() + " ("  + getDescription() + ")" , ErrorCodes::SOCKET_TIMEOUT); | 
| 120 |     } | 
| 121 | } | 
| 122 |  | 
| 123 |  | 
| 124 | void Connection::disconnect() | 
| 125 | { | 
| 126 |     //LOG_TRACE(log_wrapper.get(), "Disconnecting"); | 
| 127 |  | 
| 128 |     in = nullptr; | 
| 129 |     last_input_packet_type.reset(); | 
| 130 |     out = nullptr; // can write to socket | 
| 131 |     if (socket) | 
| 132 |         socket->close(); | 
| 133 |     socket = nullptr; | 
| 134 |     connected = false; | 
| 135 | } | 
| 136 |  | 
| 137 |  | 
| 138 | void Connection::sendHello() | 
| 139 | { | 
| 140 |     /** Disallow control characters in user controlled parameters | 
| 141 |       *  to mitigate the possibility of SSRF. | 
| 142 |       * The user may do server side requests with 'remote' table function. | 
| 143 |       * Malicious user with full r/w access to ClickHouse | 
| 144 |       *  may use 'remote' table function to forge requests | 
| 145 |       *  to another services in the network other than ClickHouse (examples: SMTP). | 
| 146 |       * Limiting number of possible characters in user-controlled part of handshake | 
| 147 |       *  will mitigate this possibility but doesn't solve it completely. | 
| 148 |       */ | 
| 149 |     auto has_control_character = [](const std::string & s) | 
| 150 |     { | 
| 151 |         for (auto c : s) | 
| 152 |             if (isControlASCII(c)) | 
| 153 |                 return true; | 
| 154 |         return false; | 
| 155 |     }; | 
| 156 |  | 
| 157 |     if (has_control_character(default_database) | 
| 158 |         || has_control_character(user) | 
| 159 |         || has_control_character(password)) | 
| 160 |         throw Exception("Parameters 'default_database', 'user' and 'password' must not contain ASCII control characters" , ErrorCodes::BAD_ARGUMENTS); | 
| 161 |  | 
| 162 |     writeVarUInt(Protocol::Client::Hello, *out); | 
| 163 |     writeStringBinary((DBMS_NAME " " ) + client_name, *out); | 
| 164 |     writeVarUInt(DBMS_VERSION_MAJOR, *out); | 
| 165 |     writeVarUInt(DBMS_VERSION_MINOR, *out); | 
| 166 |     // NOTE For backward compatibility of the protocol, client cannot send its version_patch. | 
| 167 |     writeVarUInt(ClickHouseRevision::get(), *out); | 
| 168 |     writeStringBinary(default_database, *out); | 
| 169 |     writeStringBinary(user, *out); | 
| 170 |     writeStringBinary(password, *out); | 
| 171 |  | 
| 172 |     out->next(); | 
| 173 | } | 
| 174 |  | 
| 175 |  | 
| 176 | void Connection::receiveHello() | 
| 177 | { | 
| 178 |     //LOG_TRACE(log_wrapper.get(), "Receiving hello"); | 
| 179 |  | 
| 180 |     /// Receive hello packet. | 
| 181 |     UInt64 packet_type = 0; | 
| 182 |  | 
| 183 |     readVarUInt(packet_type, *in); | 
| 184 |     if (packet_type == Protocol::Server::Hello) | 
| 185 |     { | 
| 186 |         readStringBinary(server_name, *in); | 
| 187 |         readVarUInt(server_version_major, *in); | 
| 188 |         readVarUInt(server_version_minor, *in); | 
| 189 |         readVarUInt(server_revision, *in); | 
| 190 |         if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) | 
| 191 |             readStringBinary(server_timezone, *in); | 
| 192 |         if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) | 
| 193 |             readStringBinary(server_display_name, *in); | 
| 194 |         if (server_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) | 
| 195 |             readVarUInt(server_version_patch, *in); | 
| 196 |         else | 
| 197 |             server_version_patch = server_revision; | 
| 198 |     } | 
| 199 |     else if (packet_type == Protocol::Server::Exception) | 
| 200 |         receiveException()->rethrow(); | 
| 201 |     else | 
| 202 |     { | 
| 203 |         /// Close connection, to not stay in unsynchronised state. | 
| 204 |         disconnect(); | 
| 205 |         throwUnexpectedPacket(packet_type, "Hello or Exception" ); | 
| 206 |     } | 
| 207 | } | 
| 208 |  | 
| 209 | void Connection::setDefaultDatabase(const String & database) | 
| 210 | { | 
| 211 |     default_database = database; | 
| 212 | } | 
| 213 |  | 
| 214 | const String & Connection::getDefaultDatabase() const | 
| 215 | { | 
| 216 |     return default_database; | 
| 217 | } | 
| 218 |  | 
| 219 | const String & Connection::getDescription() const | 
| 220 | { | 
| 221 |     return description; | 
| 222 | } | 
| 223 |  | 
| 224 | const String & Connection::getHost() const | 
| 225 | { | 
| 226 |     return host; | 
| 227 | } | 
| 228 |  | 
| 229 | UInt16 Connection::getPort() const | 
| 230 | { | 
| 231 |     return port; | 
| 232 | } | 
| 233 |  | 
| 234 | void Connection::getServerVersion(const ConnectionTimeouts & timeouts, | 
| 235 |                                   String & name, | 
| 236 |                                   UInt64 & version_major, | 
| 237 |                                   UInt64 & version_minor, | 
| 238 |                                   UInt64 & version_patch, | 
| 239 |                                   UInt64 & revision) | 
| 240 | { | 
| 241 |     if (!connected) | 
| 242 |         connect(timeouts); | 
| 243 |  | 
| 244 |     name = server_name; | 
| 245 |     version_major = server_version_major; | 
| 246 |     version_minor = server_version_minor; | 
| 247 |     version_patch = server_version_patch; | 
| 248 |     revision = server_revision; | 
| 249 | } | 
| 250 |  | 
| 251 | UInt64 Connection::getServerRevision(const ConnectionTimeouts & timeouts) | 
| 252 | { | 
| 253 |     if (!connected) | 
| 254 |         connect(timeouts); | 
| 255 |  | 
| 256 |     return server_revision; | 
| 257 | } | 
| 258 |  | 
| 259 | const String & Connection::getServerTimezone(const ConnectionTimeouts & timeouts) | 
| 260 | { | 
| 261 |     if (!connected) | 
| 262 |         connect(timeouts); | 
| 263 |  | 
| 264 |     return server_timezone; | 
| 265 | } | 
| 266 |  | 
| 267 | const String & Connection::getServerDisplayName(const ConnectionTimeouts & timeouts) | 
| 268 | { | 
| 269 |     if (!connected) | 
| 270 |         connect(timeouts); | 
| 271 |  | 
| 272 |     return server_display_name; | 
| 273 | } | 
| 274 |  | 
| 275 | void Connection::forceConnected(const ConnectionTimeouts & timeouts) | 
| 276 | { | 
| 277 |     if (!connected) | 
| 278 |     { | 
| 279 |         connect(timeouts); | 
| 280 |     } | 
| 281 |     else if (!ping()) | 
| 282 |     { | 
| 283 |         LOG_TRACE(log_wrapper.get(), "Connection was closed, will reconnect." ); | 
| 284 |         connect(timeouts); | 
| 285 |     } | 
| 286 | } | 
| 287 |  | 
| 288 | bool Connection::ping() | 
| 289 | { | 
| 290 |     // LOG_TRACE(log_wrapper.get(), "Ping"); | 
| 291 |  | 
| 292 |     TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); | 
| 293 |     try | 
| 294 |     { | 
| 295 |         UInt64 pong = 0; | 
| 296 |         writeVarUInt(Protocol::Client::Ping, *out); | 
| 297 |         out->next(); | 
| 298 |  | 
| 299 |         if (in->eof()) | 
| 300 |             return false; | 
| 301 |  | 
| 302 |         readVarUInt(pong, *in); | 
| 303 |  | 
| 304 |         /// Could receive late packets with progress. TODO: Maybe possible to fix. | 
| 305 |         while (pong == Protocol::Server::Progress) | 
| 306 |         { | 
| 307 |             receiveProgress(); | 
| 308 |  | 
| 309 |             if (in->eof()) | 
| 310 |                 return false; | 
| 311 |  | 
| 312 |             readVarUInt(pong, *in); | 
| 313 |         } | 
| 314 |  | 
| 315 |         if (pong != Protocol::Server::Pong) | 
| 316 |             throwUnexpectedPacket(pong, "Pong" ); | 
| 317 |     } | 
| 318 |     catch (const Poco::Exception & e) | 
| 319 |     { | 
| 320 |         LOG_TRACE(log_wrapper.get(), e.displayText()); | 
| 321 |         return false; | 
| 322 |     } | 
| 323 |  | 
| 324 |     return true; | 
| 325 | } | 
| 326 |  | 
| 327 | TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & timeouts, | 
| 328 |                                                  const TablesStatusRequest & request) | 
| 329 | { | 
| 330 |     if (!connected) | 
| 331 |         connect(timeouts); | 
| 332 |  | 
| 333 |     TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); | 
| 334 |  | 
| 335 |     writeVarUInt(Protocol::Client::TablesStatusRequest, *out); | 
| 336 |     request.write(*out, server_revision); | 
| 337 |     out->next(); | 
| 338 |  | 
| 339 |     UInt64 response_type = 0; | 
| 340 |     readVarUInt(response_type, *in); | 
| 341 |  | 
| 342 |     if (response_type == Protocol::Server::Exception) | 
| 343 |         receiveException()->rethrow(); | 
| 344 |     else if (response_type != Protocol::Server::TablesStatusResponse) | 
| 345 |         throwUnexpectedPacket(response_type, "TablesStatusResponse" ); | 
| 346 |  | 
| 347 |     TablesStatusResponse response; | 
| 348 |     response.read(*in, server_revision); | 
| 349 |     return response; | 
| 350 | } | 
| 351 |  | 
| 352 |  | 
| 353 | void Connection::sendQuery( | 
| 354 |     const ConnectionTimeouts & timeouts, | 
| 355 |     const String & query, | 
| 356 |     const String & query_id_, | 
| 357 |     UInt64 stage, | 
| 358 |     const Settings * settings, | 
| 359 |     const ClientInfo * client_info, | 
| 360 |     bool with_pending_data) | 
| 361 | { | 
| 362 |     if (!connected) | 
| 363 |         connect(timeouts); | 
| 364 |  | 
| 365 |     TimeoutSetter timeout_setter(*socket, timeouts.send_timeout, timeouts.receive_timeout, true); | 
| 366 |  | 
| 367 |     if (settings) | 
| 368 |     { | 
| 369 |         std::optional<int> level; | 
| 370 |         std::string method = Poco::toUpper(settings->network_compression_method.toString()); | 
| 371 |  | 
| 372 |         /// Bad custom logic | 
| 373 |         if (method == "ZSTD" ) | 
| 374 |             level = settings->network_zstd_compression_level; | 
| 375 |  | 
| 376 |         compression_codec = CompressionCodecFactory::instance().get(method, level); | 
| 377 |     } | 
| 378 |     else | 
| 379 |         compression_codec = CompressionCodecFactory::instance().getDefaultCodec(); | 
| 380 |  | 
| 381 |     query_id = query_id_; | 
| 382 |  | 
| 383 |     //LOG_TRACE(log_wrapper.get(), "Sending query"); | 
| 384 |  | 
| 385 |     writeVarUInt(Protocol::Client::Query, *out); | 
| 386 |     writeStringBinary(query_id, *out); | 
| 387 |  | 
| 388 |     /// Client info. | 
| 389 |     if (server_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) | 
| 390 |     { | 
| 391 |         ClientInfo client_info_to_send; | 
| 392 |  | 
| 393 |         if (!client_info || client_info->empty()) | 
| 394 |         { | 
| 395 |             /// No client info passed - means this query initiated by me. | 
| 396 |             client_info_to_send.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; | 
| 397 |             client_info_to_send.fillOSUserHostNameAndVersionInfo(); | 
| 398 |             client_info_to_send.client_name = (DBMS_NAME " " ) + client_name; | 
| 399 |         } | 
| 400 |         else | 
| 401 |         { | 
| 402 |             /// This query is initiated by another query. | 
| 403 |             client_info_to_send = *client_info; | 
| 404 |             client_info_to_send.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; | 
| 405 |         } | 
| 406 |  | 
| 407 |         client_info_to_send.write(*out, server_revision); | 
| 408 |     } | 
| 409 |  | 
| 410 |     /// Per query settings. | 
| 411 |     if (settings) | 
| 412 |     { | 
| 413 |         auto settings_format = (server_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsBinaryFormat::STRINGS | 
| 414 |                                                                                                           : SettingsBinaryFormat::OLD; | 
| 415 |         settings->serialize(*out, settings_format); | 
| 416 |     } | 
| 417 |     else | 
| 418 |         writeStringBinary(""  /* empty string is a marker of the end of settings */, *out); | 
| 419 |  | 
| 420 |     writeVarUInt(stage, *out); | 
| 421 |     writeVarUInt(static_cast<bool>(compression), *out); | 
| 422 |  | 
| 423 |     writeStringBinary(query, *out); | 
| 424 |  | 
| 425 |     maybe_compressed_in.reset(); | 
| 426 |     maybe_compressed_out.reset(); | 
| 427 |     block_in.reset(); | 
| 428 |     block_logs_in.reset(); | 
| 429 |     block_out.reset(); | 
| 430 |  | 
| 431 |     /// Send empty block which means end of data. | 
| 432 |     if (!with_pending_data) | 
| 433 |     { | 
| 434 |         sendData(Block()); | 
| 435 |         out->next(); | 
| 436 |     } | 
| 437 | } | 
| 438 |  | 
| 439 |  | 
| 440 | void Connection::sendCancel() | 
| 441 | { | 
| 442 |     /// If we already disconnected. | 
| 443 |     if (!out) | 
| 444 |         return; | 
| 445 |  | 
| 446 |     //LOG_TRACE(log_wrapper.get(), "Sending cancel"); | 
| 447 |  | 
| 448 |     writeVarUInt(Protocol::Client::Cancel, *out); | 
| 449 |     out->next(); | 
| 450 | } | 
| 451 |  | 
| 452 |  | 
| 453 | void Connection::sendData(const Block & block, const String & name, bool scalar) | 
| 454 | { | 
| 455 |     //LOG_TRACE(log_wrapper.get(), "Sending data"); | 
| 456 |  | 
| 457 |     if (!block_out) | 
| 458 |     { | 
| 459 |         if (compression == Protocol::Compression::Enable) | 
| 460 |             maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_codec); | 
| 461 |         else | 
| 462 |             maybe_compressed_out = out; | 
| 463 |  | 
| 464 |         block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty()); | 
| 465 |     } | 
| 466 |  | 
| 467 |     if (scalar) | 
| 468 |         writeVarUInt(Protocol::Client::Scalar, *out); | 
| 469 |     else | 
| 470 |         writeVarUInt(Protocol::Client::Data, *out); | 
| 471 |     writeStringBinary(name, *out); | 
| 472 |  | 
| 473 |     size_t prev_bytes = out->count(); | 
| 474 |  | 
| 475 |     block_out->write(block); | 
| 476 |     maybe_compressed_out->next(); | 
| 477 |     out->next(); | 
| 478 |  | 
| 479 |     if (throttler) | 
| 480 |         throttler->add(out->count() - prev_bytes); | 
| 481 | } | 
| 482 |  | 
| 483 |  | 
| 484 | void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name) | 
| 485 | { | 
| 486 |     /// NOTE 'Throttler' is not used in this method (could use, but it's not important right now). | 
| 487 |  | 
| 488 |     writeVarUInt(Protocol::Client::Data, *out); | 
| 489 |     writeStringBinary(name, *out); | 
| 490 |  | 
| 491 |     if (0 == size) | 
| 492 |         copyData(input, *out); | 
| 493 |     else | 
| 494 |         copyData(input, *out, size); | 
| 495 |     out->next(); | 
| 496 | } | 
| 497 |  | 
| 498 |  | 
| 499 | void Connection::sendScalarsData(Scalars & data) | 
| 500 | { | 
| 501 |     if (data.empty()) | 
| 502 |         return; | 
| 503 |  | 
| 504 |     Stopwatch watch; | 
| 505 |     size_t out_bytes = out ? out->count() : 0; | 
| 506 |     size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0; | 
| 507 |     size_t rows = 0; | 
| 508 |  | 
| 509 |     CurrentMetrics::Increment metric_increment{CurrentMetrics::SendScalars}; | 
| 510 |  | 
| 511 |     for (auto & elem : data) | 
| 512 |     { | 
| 513 |         rows += elem.second.rows(); | 
| 514 |         sendData(elem.second, elem.first, true /* scalar */); | 
| 515 |     } | 
| 516 |  | 
| 517 |     out_bytes = out->count() - out_bytes; | 
| 518 |     maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes; | 
| 519 |     double elapsed = watch.elapsedSeconds(); | 
| 520 |  | 
| 521 |     std::stringstream msg; | 
| 522 |     msg << std::fixed << std::setprecision(3); | 
| 523 |     msg << "Sent data for "  << data.size() << " scalars, total "  << rows << " rows in "  << elapsed << " sec., "  | 
| 524 |         << static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., "  | 
| 525 |         << maybe_compressed_out_bytes / 1048576.0 << " MiB ("  << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)" ; | 
| 526 |  | 
| 527 |     if (compression == Protocol::Compression::Enable) | 
| 528 |         msg << ", compressed "  << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "  | 
| 529 |             << out_bytes / 1048576.0 << " MiB ("  << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)" ; | 
| 530 |     else | 
| 531 |         msg << ", no compression." ; | 
| 532 |  | 
| 533 |     LOG_DEBUG(log_wrapper.get(), msg.rdbuf()); | 
| 534 | } | 
| 535 |  | 
| 536 |  | 
| 537 | void Connection::sendExternalTablesData(ExternalTablesData & data) | 
| 538 | { | 
| 539 |     if (data.empty()) | 
| 540 |     { | 
| 541 |         /// Send empty block, which means end of data transfer. | 
| 542 |         sendData(Block()); | 
| 543 |         return; | 
| 544 |     } | 
| 545 |  | 
| 546 |     Stopwatch watch; | 
| 547 |     size_t out_bytes = out ? out->count() : 0; | 
| 548 |     size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0; | 
| 549 |     size_t rows = 0; | 
| 550 |  | 
| 551 |     CurrentMetrics::Increment metric_increment{CurrentMetrics::SendExternalTables}; | 
| 552 |  | 
| 553 |     for (auto & elem : data) | 
| 554 |     { | 
| 555 |         elem.first->readPrefix(); | 
| 556 |         while (Block block = elem.first->read()) | 
| 557 |         { | 
| 558 |             rows += block.rows(); | 
| 559 |             sendData(block, elem.second); | 
| 560 |         } | 
| 561 |         elem.first->readSuffix(); | 
| 562 |     } | 
| 563 |  | 
| 564 |     /// Send empty block, which means end of data transfer. | 
| 565 |     sendData(Block()); | 
| 566 |  | 
| 567 |     out_bytes = out->count() - out_bytes; | 
| 568 |     maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes; | 
| 569 |     double elapsed = watch.elapsedSeconds(); | 
| 570 |  | 
| 571 |     std::stringstream msg; | 
| 572 |     msg << std::fixed << std::setprecision(3); | 
| 573 |     msg << "Sent data for "  << data.size() << " external tables, total "  << rows << " rows in "  << elapsed << " sec., "  | 
| 574 |         << static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., "  | 
| 575 |         << maybe_compressed_out_bytes / 1048576.0 << " MiB ("  << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)" ; | 
| 576 |  | 
| 577 |     if (compression == Protocol::Compression::Enable) | 
| 578 |         msg << ", compressed "  << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "  | 
| 579 |             << out_bytes / 1048576.0 << " MiB ("  << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)" ; | 
| 580 |     else | 
| 581 |         msg << ", no compression." ; | 
| 582 |  | 
| 583 |     LOG_DEBUG(log_wrapper.get(), msg.rdbuf()); | 
| 584 | } | 
| 585 |  | 
| 586 | std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const | 
| 587 | { | 
| 588 |     return current_resolved_address; | 
| 589 | } | 
| 590 |  | 
| 591 |  | 
| 592 | bool Connection::poll(size_t timeout_microseconds) | 
| 593 | { | 
| 594 |     return static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_microseconds); | 
| 595 | } | 
| 596 |  | 
| 597 |  | 
| 598 | bool Connection::hasReadPendingData() const | 
| 599 | { | 
| 600 |     return last_input_packet_type.has_value() || static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData(); | 
| 601 | } | 
| 602 |  | 
| 603 |  | 
| 604 | std::optional<UInt64> Connection::checkPacket(size_t timeout_microseconds) | 
| 605 | { | 
| 606 |     if (last_input_packet_type.has_value()) | 
| 607 |         return last_input_packet_type; | 
| 608 |  | 
| 609 |     if (hasReadPendingData() || poll(timeout_microseconds)) | 
| 610 |     { | 
| 611 |         // LOG_TRACE(log_wrapper.get(), "Receiving packet type"); | 
| 612 |         UInt64 packet_type; | 
| 613 |         readVarUInt(packet_type, *in); | 
| 614 |  | 
| 615 |         last_input_packet_type.emplace(packet_type); | 
| 616 |         return last_input_packet_type; | 
| 617 |     } | 
| 618 |  | 
| 619 |     return {}; | 
| 620 | } | 
| 621 |  | 
| 622 |  | 
| 623 | Packet Connection::receivePacket() | 
| 624 | { | 
| 625 |     try | 
| 626 |     { | 
| 627 |         Packet res; | 
| 628 |  | 
| 629 |         /// Have we already read packet type? | 
| 630 |         if (last_input_packet_type) | 
| 631 |         { | 
| 632 |             res.type = *last_input_packet_type; | 
| 633 |             last_input_packet_type.reset(); | 
| 634 |         } | 
| 635 |         else | 
| 636 |         { | 
| 637 |             //LOG_TRACE(log_wrapper.get(), "Receiving packet type"); | 
| 638 |             readVarUInt(res.type, *in); | 
| 639 |         } | 
| 640 |  | 
| 641 |         //LOG_TRACE(log_wrapper.get(), "Receiving packet " << res.type << " " << Protocol::Server::toString(res.type)); | 
| 642 |         //std::cerr << "Client got packet: " << Protocol::Server::toString(res.type) << "\n"; | 
| 643 |         switch (res.type) | 
| 644 |         { | 
| 645 |             case Protocol::Server::Data: [[fallthrough]]; | 
| 646 |             case Protocol::Server::Totals: [[fallthrough]]; | 
| 647 |             case Protocol::Server::Extremes: | 
| 648 |                 res.block = receiveData(); | 
| 649 |                 return res; | 
| 650 |  | 
| 651 |             case Protocol::Server::Exception: | 
| 652 |                 res.exception = receiveException(); | 
| 653 |                 return res; | 
| 654 |  | 
| 655 |             case Protocol::Server::Progress: | 
| 656 |                 res.progress = receiveProgress(); | 
| 657 |                 return res; | 
| 658 |  | 
| 659 |             case Protocol::Server::ProfileInfo: | 
| 660 |                 res.profile_info = receiveProfileInfo(); | 
| 661 |                 return res; | 
| 662 |  | 
| 663 |             case Protocol::Server::Log: | 
| 664 |                 res.block = receiveLogData(); | 
| 665 |                 return res; | 
| 666 |  | 
| 667 |             case Protocol::Server::TableColumns: | 
| 668 |                 res.multistring_message = receiveMultistringMessage(res.type); | 
| 669 |                 return res; | 
| 670 |  | 
| 671 |             case Protocol::Server::EndOfStream: | 
| 672 |                 return res; | 
| 673 |  | 
| 674 |             default: | 
| 675 |                 /// In unknown state, disconnect - to not leave unsynchronised connection. | 
| 676 |                 disconnect(); | 
| 677 |                 throw Exception("Unknown packet "  | 
| 678 |                     + toString(res.type) | 
| 679 |                     + " from server "  + getDescription(), ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); | 
| 680 |         } | 
| 681 |     } | 
| 682 |     catch (Exception & e) | 
| 683 |     { | 
| 684 |         /// Add server address to exception message, if need. | 
| 685 |         if (e.code() != ErrorCodes::UNKNOWN_PACKET_FROM_SERVER) | 
| 686 |             e.addMessage("while receiving packet from "  + getDescription()); | 
| 687 |  | 
| 688 |         throw; | 
| 689 |     } | 
| 690 | } | 
| 691 |  | 
| 692 |  | 
| 693 | Block Connection::receiveData() | 
| 694 | { | 
| 695 |     //LOG_TRACE(log_wrapper.get(), "Receiving data"); | 
| 696 |  | 
| 697 |     initBlockInput(); | 
| 698 |     return receiveDataImpl(block_in); | 
| 699 | } | 
| 700 |  | 
| 701 |  | 
| 702 | Block Connection::receiveLogData() | 
| 703 | { | 
| 704 |     initBlockLogsInput(); | 
| 705 |     return receiveDataImpl(block_logs_in); | 
| 706 | } | 
| 707 |  | 
| 708 |  | 
| 709 | Block Connection::receiveDataImpl(BlockInputStreamPtr & stream) | 
| 710 | { | 
| 711 |     String external_table_name; | 
| 712 |     readStringBinary(external_table_name, *in); | 
| 713 |  | 
| 714 |     size_t prev_bytes = in->count(); | 
| 715 |  | 
| 716 |     /// Read one block from network. | 
| 717 |     Block res = stream->read(); | 
| 718 |  | 
| 719 |     if (throttler) | 
| 720 |         throttler->add(in->count() - prev_bytes); | 
| 721 |  | 
| 722 |     return res; | 
| 723 | } | 
| 724 |  | 
| 725 |  | 
| 726 | void Connection::initInputBuffers() | 
| 727 | { | 
| 728 |  | 
| 729 | } | 
| 730 |  | 
| 731 |  | 
| 732 | void Connection::initBlockInput() | 
| 733 | { | 
| 734 |     if (!block_in) | 
| 735 |     { | 
| 736 |         if (!maybe_compressed_in) | 
| 737 |         { | 
| 738 |             if (compression == Protocol::Compression::Enable) | 
| 739 |                 maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in); | 
| 740 |             else | 
| 741 |                 maybe_compressed_in = in; | 
| 742 |         } | 
| 743 |  | 
| 744 |         block_in = std::make_shared<NativeBlockInputStream>(*maybe_compressed_in, server_revision); | 
| 745 |     } | 
| 746 | } | 
| 747 |  | 
| 748 |  | 
| 749 | void Connection::initBlockLogsInput() | 
| 750 | { | 
| 751 |     if (!block_logs_in) | 
| 752 |     { | 
| 753 |         /// Have to return superset of SystemLogsQueue::getSampleBlock() columns | 
| 754 |         block_logs_in = std::make_shared<NativeBlockInputStream>(*in, server_revision); | 
| 755 |     } | 
| 756 | } | 
| 757 |  | 
| 758 |  | 
| 759 | void Connection::setDescription() | 
| 760 | { | 
| 761 |     auto resolved_address = getResolvedAddress(); | 
| 762 |     description = host + ":"  + toString(port); | 
| 763 |  | 
| 764 |     if (resolved_address) | 
| 765 |     { | 
| 766 |         auto ip_address = resolved_address->host().toString(); | 
| 767 |         if (host != ip_address) | 
| 768 |             description += ", "  + ip_address; | 
| 769 |     } | 
| 770 | } | 
| 771 |  | 
| 772 |  | 
| 773 | std::unique_ptr<Exception> Connection::receiveException() | 
| 774 | { | 
| 775 |     //LOG_TRACE(log_wrapper.get(), "Receiving exception"); | 
| 776 |  | 
| 777 |     Exception e; | 
| 778 |     readException(e, *in, "Received from "  + getDescription()); | 
| 779 |     return std::unique_ptr<Exception>{ e.clone() }; | 
| 780 | } | 
| 781 |  | 
| 782 |  | 
| 783 | std::vector<String> Connection::receiveMultistringMessage(UInt64 msg_type) | 
| 784 | { | 
| 785 |     size_t num = Protocol::Server::stringsInMessage(msg_type); | 
| 786 |     std::vector<String> strings(num); | 
| 787 |     for (size_t i = 0; i < num; ++i) | 
| 788 |         readStringBinary(strings[i], *in); | 
| 789 |     return strings; | 
| 790 | } | 
| 791 |  | 
| 792 |  | 
| 793 | Progress Connection::receiveProgress() | 
| 794 | { | 
| 795 |     //LOG_TRACE(log_wrapper.get(), "Receiving progress"); | 
| 796 |  | 
| 797 |     Progress progress; | 
| 798 |     progress.read(*in, server_revision); | 
| 799 |     return progress; | 
| 800 | } | 
| 801 |  | 
| 802 |  | 
| 803 | BlockStreamProfileInfo Connection::receiveProfileInfo() | 
| 804 | { | 
| 805 |     BlockStreamProfileInfo profile_info; | 
| 806 |     profile_info.read(*in); | 
| 807 |     return profile_info; | 
| 808 | } | 
| 809 |  | 
| 810 |  | 
| 811 | void Connection::throwUnexpectedPacket(UInt64 packet_type, const char * expected) const | 
| 812 | { | 
| 813 |     throw NetException( | 
| 814 |             "Unexpected packet from server "  + getDescription() + " (expected "  + expected | 
| 815 |             + ", got "  + String(Protocol::Server::toString(packet_type)) + ")" , | 
| 816 |             ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); | 
| 817 | } | 
| 818 |  | 
| 819 | } | 
| 820 |  |