1#include <iomanip>
2
3#include <Poco/Net/NetException.h>
4#include <Core/Defines.h>
5#include <Compression/CompressedReadBuffer.h>
6#include <Compression/CompressedWriteBuffer.h>
7#include <IO/ReadBufferFromPocoSocket.h>
8#include <IO/WriteBufferFromPocoSocket.h>
9#include <IO/ReadHelpers.h>
10#include <IO/WriteHelpers.h>
11#include <IO/copyData.h>
12#include <DataStreams/NativeBlockInputStream.h>
13#include <DataStreams/NativeBlockOutputStream.h>
14#include <Client/Connection.h>
15#include <Client/TimeoutSetter.h>
16#include <Common/ClickHouseRevision.h>
17#include <Common/Exception.h>
18#include <Common/NetException.h>
19#include <Common/CurrentMetrics.h>
20#include <Common/DNSResolver.h>
21#include <Common/StringUtils/StringUtils.h>
22#include <Common/config_version.h>
23#include <Interpreters/ClientInfo.h>
24#include <Compression/CompressionFactory.h>
25
26#include <Common/config.h>
27#if USE_POCO_NETSSL
28#include <Poco/Net/SecureStreamSocket.h>
29#endif
30
31namespace CurrentMetrics
32{
33 extern const Metric SendScalars;
34 extern const Metric SendExternalTables;
35}
36
37namespace DB
38{
39
40namespace ErrorCodes
41{
42 extern const int NETWORK_ERROR;
43 extern const int SOCKET_TIMEOUT;
44 extern const int SERVER_REVISION_IS_TOO_OLD;
45 extern const int UNEXPECTED_PACKET_FROM_SERVER;
46 extern const int UNKNOWN_PACKET_FROM_SERVER;
47 extern const int SUPPORT_IS_DISABLED;
48 extern const int BAD_ARGUMENTS;
49}
50
51
52void Connection::connect(const ConnectionTimeouts & timeouts)
53{
54 try
55 {
56 if (connected)
57 disconnect();
58
59 LOG_TRACE(log_wrapper.get(), "Connecting. Database: " << (default_database.empty() ? "(not specified)" : default_database) << ". User: " << user
60 << (static_cast<bool>(secure) ? ". Secure" : "") << (static_cast<bool>(compression) ? "" : ". Uncompressed"));
61
62 if (static_cast<bool>(secure))
63 {
64#if USE_POCO_NETSSL
65 socket = std::make_unique<Poco::Net::SecureStreamSocket>();
66#else
67 throw Exception{"tcp_secure protocol is disabled because poco library was built without NetSSL support.", ErrorCodes::SUPPORT_IS_DISABLED};
68#endif
69 }
70 else
71 {
72 socket = std::make_unique<Poco::Net::StreamSocket>();
73 }
74
75 current_resolved_address = DNSResolver::instance().resolveAddress(host, port);
76
77 socket->connect(*current_resolved_address, timeouts.connection_timeout);
78 socket->setReceiveTimeout(timeouts.receive_timeout);
79 socket->setSendTimeout(timeouts.send_timeout);
80 socket->setNoDelay(true);
81 if (timeouts.tcp_keep_alive_timeout.totalSeconds())
82 {
83 socket->setKeepAlive(true);
84 socket->setOption(IPPROTO_TCP,
85#if defined(TCP_KEEPALIVE)
86 TCP_KEEPALIVE
87#else
88 TCP_KEEPIDLE // __APPLE__
89#endif
90 , timeouts.tcp_keep_alive_timeout);
91 }
92
93 in = std::make_shared<ReadBufferFromPocoSocket>(*socket);
94 out = std::make_shared<WriteBufferFromPocoSocket>(*socket);
95
96 connected = true;
97
98 sendHello();
99 receiveHello();
100
101 LOG_TRACE(log_wrapper.get(), "Connected to " << server_name
102 << " server version " << server_version_major
103 << "." << server_version_minor
104 << "." << server_version_patch
105 << ".");
106 }
107 catch (Poco::Net::NetException & e)
108 {
109 disconnect();
110
111 /// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
112 throw NetException(e.displayText() + " (" + getDescription() + ")", ErrorCodes::NETWORK_ERROR);
113 }
114 catch (Poco::TimeoutException & e)
115 {
116 disconnect();
117
118 /// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
119 throw NetException(e.displayText() + " (" + getDescription() + ")", ErrorCodes::SOCKET_TIMEOUT);
120 }
121}
122
123
124void Connection::disconnect()
125{
126 //LOG_TRACE(log_wrapper.get(), "Disconnecting");
127
128 in = nullptr;
129 last_input_packet_type.reset();
130 out = nullptr; // can write to socket
131 if (socket)
132 socket->close();
133 socket = nullptr;
134 connected = false;
135}
136
137
138void Connection::sendHello()
139{
140 /** Disallow control characters in user controlled parameters
141 * to mitigate the possibility of SSRF.
142 * The user may do server side requests with 'remote' table function.
143 * Malicious user with full r/w access to ClickHouse
144 * may use 'remote' table function to forge requests
145 * to another services in the network other than ClickHouse (examples: SMTP).
146 * Limiting number of possible characters in user-controlled part of handshake
147 * will mitigate this possibility but doesn't solve it completely.
148 */
149 auto has_control_character = [](const std::string & s)
150 {
151 for (auto c : s)
152 if (isControlASCII(c))
153 return true;
154 return false;
155 };
156
157 if (has_control_character(default_database)
158 || has_control_character(user)
159 || has_control_character(password))
160 throw Exception("Parameters 'default_database', 'user' and 'password' must not contain ASCII control characters", ErrorCodes::BAD_ARGUMENTS);
161
162 writeVarUInt(Protocol::Client::Hello, *out);
163 writeStringBinary((DBMS_NAME " ") + client_name, *out);
164 writeVarUInt(DBMS_VERSION_MAJOR, *out);
165 writeVarUInt(DBMS_VERSION_MINOR, *out);
166 // NOTE For backward compatibility of the protocol, client cannot send its version_patch.
167 writeVarUInt(ClickHouseRevision::get(), *out);
168 writeStringBinary(default_database, *out);
169 writeStringBinary(user, *out);
170 writeStringBinary(password, *out);
171
172 out->next();
173}
174
175
176void Connection::receiveHello()
177{
178 //LOG_TRACE(log_wrapper.get(), "Receiving hello");
179
180 /// Receive hello packet.
181 UInt64 packet_type = 0;
182
183 readVarUInt(packet_type, *in);
184 if (packet_type == Protocol::Server::Hello)
185 {
186 readStringBinary(server_name, *in);
187 readVarUInt(server_version_major, *in);
188 readVarUInt(server_version_minor, *in);
189 readVarUInt(server_revision, *in);
190 if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
191 readStringBinary(server_timezone, *in);
192 if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)
193 readStringBinary(server_display_name, *in);
194 if (server_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
195 readVarUInt(server_version_patch, *in);
196 else
197 server_version_patch = server_revision;
198 }
199 else if (packet_type == Protocol::Server::Exception)
200 receiveException()->rethrow();
201 else
202 {
203 /// Close connection, to not stay in unsynchronised state.
204 disconnect();
205 throwUnexpectedPacket(packet_type, "Hello or Exception");
206 }
207}
208
209void Connection::setDefaultDatabase(const String & database)
210{
211 default_database = database;
212}
213
214const String & Connection::getDefaultDatabase() const
215{
216 return default_database;
217}
218
219const String & Connection::getDescription() const
220{
221 return description;
222}
223
224const String & Connection::getHost() const
225{
226 return host;
227}
228
229UInt16 Connection::getPort() const
230{
231 return port;
232}
233
234void Connection::getServerVersion(const ConnectionTimeouts & timeouts,
235 String & name,
236 UInt64 & version_major,
237 UInt64 & version_minor,
238 UInt64 & version_patch,
239 UInt64 & revision)
240{
241 if (!connected)
242 connect(timeouts);
243
244 name = server_name;
245 version_major = server_version_major;
246 version_minor = server_version_minor;
247 version_patch = server_version_patch;
248 revision = server_revision;
249}
250
251UInt64 Connection::getServerRevision(const ConnectionTimeouts & timeouts)
252{
253 if (!connected)
254 connect(timeouts);
255
256 return server_revision;
257}
258
259const String & Connection::getServerTimezone(const ConnectionTimeouts & timeouts)
260{
261 if (!connected)
262 connect(timeouts);
263
264 return server_timezone;
265}
266
267const String & Connection::getServerDisplayName(const ConnectionTimeouts & timeouts)
268{
269 if (!connected)
270 connect(timeouts);
271
272 return server_display_name;
273}
274
275void Connection::forceConnected(const ConnectionTimeouts & timeouts)
276{
277 if (!connected)
278 {
279 connect(timeouts);
280 }
281 else if (!ping())
282 {
283 LOG_TRACE(log_wrapper.get(), "Connection was closed, will reconnect.");
284 connect(timeouts);
285 }
286}
287
288bool Connection::ping()
289{
290 // LOG_TRACE(log_wrapper.get(), "Ping");
291
292 TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
293 try
294 {
295 UInt64 pong = 0;
296 writeVarUInt(Protocol::Client::Ping, *out);
297 out->next();
298
299 if (in->eof())
300 return false;
301
302 readVarUInt(pong, *in);
303
304 /// Could receive late packets with progress. TODO: Maybe possible to fix.
305 while (pong == Protocol::Server::Progress)
306 {
307 receiveProgress();
308
309 if (in->eof())
310 return false;
311
312 readVarUInt(pong, *in);
313 }
314
315 if (pong != Protocol::Server::Pong)
316 throwUnexpectedPacket(pong, "Pong");
317 }
318 catch (const Poco::Exception & e)
319 {
320 LOG_TRACE(log_wrapper.get(), e.displayText());
321 return false;
322 }
323
324 return true;
325}
326
327TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & timeouts,
328 const TablesStatusRequest & request)
329{
330 if (!connected)
331 connect(timeouts);
332
333 TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
334
335 writeVarUInt(Protocol::Client::TablesStatusRequest, *out);
336 request.write(*out, server_revision);
337 out->next();
338
339 UInt64 response_type = 0;
340 readVarUInt(response_type, *in);
341
342 if (response_type == Protocol::Server::Exception)
343 receiveException()->rethrow();
344 else if (response_type != Protocol::Server::TablesStatusResponse)
345 throwUnexpectedPacket(response_type, "TablesStatusResponse");
346
347 TablesStatusResponse response;
348 response.read(*in, server_revision);
349 return response;
350}
351
352
353void Connection::sendQuery(
354 const ConnectionTimeouts & timeouts,
355 const String & query,
356 const String & query_id_,
357 UInt64 stage,
358 const Settings * settings,
359 const ClientInfo * client_info,
360 bool with_pending_data)
361{
362 if (!connected)
363 connect(timeouts);
364
365 TimeoutSetter timeout_setter(*socket, timeouts.send_timeout, timeouts.receive_timeout, true);
366
367 if (settings)
368 {
369 std::optional<int> level;
370 std::string method = Poco::toUpper(settings->network_compression_method.toString());
371
372 /// Bad custom logic
373 if (method == "ZSTD")
374 level = settings->network_zstd_compression_level;
375
376 compression_codec = CompressionCodecFactory::instance().get(method, level);
377 }
378 else
379 compression_codec = CompressionCodecFactory::instance().getDefaultCodec();
380
381 query_id = query_id_;
382
383 //LOG_TRACE(log_wrapper.get(), "Sending query");
384
385 writeVarUInt(Protocol::Client::Query, *out);
386 writeStringBinary(query_id, *out);
387
388 /// Client info.
389 if (server_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
390 {
391 ClientInfo client_info_to_send;
392
393 if (!client_info || client_info->empty())
394 {
395 /// No client info passed - means this query initiated by me.
396 client_info_to_send.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
397 client_info_to_send.fillOSUserHostNameAndVersionInfo();
398 client_info_to_send.client_name = (DBMS_NAME " ") + client_name;
399 }
400 else
401 {
402 /// This query is initiated by another query.
403 client_info_to_send = *client_info;
404 client_info_to_send.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
405 }
406
407 client_info_to_send.write(*out, server_revision);
408 }
409
410 /// Per query settings.
411 if (settings)
412 {
413 auto settings_format = (server_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsBinaryFormat::STRINGS
414 : SettingsBinaryFormat::OLD;
415 settings->serialize(*out, settings_format);
416 }
417 else
418 writeStringBinary("" /* empty string is a marker of the end of settings */, *out);
419
420 writeVarUInt(stage, *out);
421 writeVarUInt(static_cast<bool>(compression), *out);
422
423 writeStringBinary(query, *out);
424
425 maybe_compressed_in.reset();
426 maybe_compressed_out.reset();
427 block_in.reset();
428 block_logs_in.reset();
429 block_out.reset();
430
431 /// Send empty block which means end of data.
432 if (!with_pending_data)
433 {
434 sendData(Block());
435 out->next();
436 }
437}
438
439
440void Connection::sendCancel()
441{
442 /// If we already disconnected.
443 if (!out)
444 return;
445
446 //LOG_TRACE(log_wrapper.get(), "Sending cancel");
447
448 writeVarUInt(Protocol::Client::Cancel, *out);
449 out->next();
450}
451
452
453void Connection::sendData(const Block & block, const String & name, bool scalar)
454{
455 //LOG_TRACE(log_wrapper.get(), "Sending data");
456
457 if (!block_out)
458 {
459 if (compression == Protocol::Compression::Enable)
460 maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_codec);
461 else
462 maybe_compressed_out = out;
463
464 block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty());
465 }
466
467 if (scalar)
468 writeVarUInt(Protocol::Client::Scalar, *out);
469 else
470 writeVarUInt(Protocol::Client::Data, *out);
471 writeStringBinary(name, *out);
472
473 size_t prev_bytes = out->count();
474
475 block_out->write(block);
476 maybe_compressed_out->next();
477 out->next();
478
479 if (throttler)
480 throttler->add(out->count() - prev_bytes);
481}
482
483
484void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name)
485{
486 /// NOTE 'Throttler' is not used in this method (could use, but it's not important right now).
487
488 writeVarUInt(Protocol::Client::Data, *out);
489 writeStringBinary(name, *out);
490
491 if (0 == size)
492 copyData(input, *out);
493 else
494 copyData(input, *out, size);
495 out->next();
496}
497
498
499void Connection::sendScalarsData(Scalars & data)
500{
501 if (data.empty())
502 return;
503
504 Stopwatch watch;
505 size_t out_bytes = out ? out->count() : 0;
506 size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0;
507 size_t rows = 0;
508
509 CurrentMetrics::Increment metric_increment{CurrentMetrics::SendScalars};
510
511 for (auto & elem : data)
512 {
513 rows += elem.second.rows();
514 sendData(elem.second, elem.first, true /* scalar */);
515 }
516
517 out_bytes = out->count() - out_bytes;
518 maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes;
519 double elapsed = watch.elapsedSeconds();
520
521 std::stringstream msg;
522 msg << std::fixed << std::setprecision(3);
523 msg << "Sent data for " << data.size() << " scalars, total " << rows << " rows in " << elapsed << " sec., "
524 << static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., "
525 << maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
526
527 if (compression == Protocol::Compression::Enable)
528 msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "
529 << out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
530 else
531 msg << ", no compression.";
532
533 LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
534}
535
536
537void Connection::sendExternalTablesData(ExternalTablesData & data)
538{
539 if (data.empty())
540 {
541 /// Send empty block, which means end of data transfer.
542 sendData(Block());
543 return;
544 }
545
546 Stopwatch watch;
547 size_t out_bytes = out ? out->count() : 0;
548 size_t maybe_compressed_out_bytes = maybe_compressed_out ? maybe_compressed_out->count() : 0;
549 size_t rows = 0;
550
551 CurrentMetrics::Increment metric_increment{CurrentMetrics::SendExternalTables};
552
553 for (auto & elem : data)
554 {
555 elem.first->readPrefix();
556 while (Block block = elem.first->read())
557 {
558 rows += block.rows();
559 sendData(block, elem.second);
560 }
561 elem.first->readSuffix();
562 }
563
564 /// Send empty block, which means end of data transfer.
565 sendData(Block());
566
567 out_bytes = out->count() - out_bytes;
568 maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes;
569 double elapsed = watch.elapsedSeconds();
570
571 std::stringstream msg;
572 msg << std::fixed << std::setprecision(3);
573 msg << "Sent data for " << data.size() << " external tables, total " << rows << " rows in " << elapsed << " sec., "
574 << static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., "
575 << maybe_compressed_out_bytes / 1048576.0 << " MiB (" << maybe_compressed_out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
576
577 if (compression == Protocol::Compression::Enable)
578 msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "
579 << out_bytes / 1048576.0 << " MiB (" << out_bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.)";
580 else
581 msg << ", no compression.";
582
583 LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
584}
585
586std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const
587{
588 return current_resolved_address;
589}
590
591
592bool Connection::poll(size_t timeout_microseconds)
593{
594 return static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_microseconds);
595}
596
597
598bool Connection::hasReadPendingData() const
599{
600 return last_input_packet_type.has_value() || static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
601}
602
603
604std::optional<UInt64> Connection::checkPacket(size_t timeout_microseconds)
605{
606 if (last_input_packet_type.has_value())
607 return last_input_packet_type;
608
609 if (hasReadPendingData() || poll(timeout_microseconds))
610 {
611 // LOG_TRACE(log_wrapper.get(), "Receiving packet type");
612 UInt64 packet_type;
613 readVarUInt(packet_type, *in);
614
615 last_input_packet_type.emplace(packet_type);
616 return last_input_packet_type;
617 }
618
619 return {};
620}
621
622
623Packet Connection::receivePacket()
624{
625 try
626 {
627 Packet res;
628
629 /// Have we already read packet type?
630 if (last_input_packet_type)
631 {
632 res.type = *last_input_packet_type;
633 last_input_packet_type.reset();
634 }
635 else
636 {
637 //LOG_TRACE(log_wrapper.get(), "Receiving packet type");
638 readVarUInt(res.type, *in);
639 }
640
641 //LOG_TRACE(log_wrapper.get(), "Receiving packet " << res.type << " " << Protocol::Server::toString(res.type));
642 //std::cerr << "Client got packet: " << Protocol::Server::toString(res.type) << "\n";
643 switch (res.type)
644 {
645 case Protocol::Server::Data: [[fallthrough]];
646 case Protocol::Server::Totals: [[fallthrough]];
647 case Protocol::Server::Extremes:
648 res.block = receiveData();
649 return res;
650
651 case Protocol::Server::Exception:
652 res.exception = receiveException();
653 return res;
654
655 case Protocol::Server::Progress:
656 res.progress = receiveProgress();
657 return res;
658
659 case Protocol::Server::ProfileInfo:
660 res.profile_info = receiveProfileInfo();
661 return res;
662
663 case Protocol::Server::Log:
664 res.block = receiveLogData();
665 return res;
666
667 case Protocol::Server::TableColumns:
668 res.multistring_message = receiveMultistringMessage(res.type);
669 return res;
670
671 case Protocol::Server::EndOfStream:
672 return res;
673
674 default:
675 /// In unknown state, disconnect - to not leave unsynchronised connection.
676 disconnect();
677 throw Exception("Unknown packet "
678 + toString(res.type)
679 + " from server " + getDescription(), ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
680 }
681 }
682 catch (Exception & e)
683 {
684 /// Add server address to exception message, if need.
685 if (e.code() != ErrorCodes::UNKNOWN_PACKET_FROM_SERVER)
686 e.addMessage("while receiving packet from " + getDescription());
687
688 throw;
689 }
690}
691
692
693Block Connection::receiveData()
694{
695 //LOG_TRACE(log_wrapper.get(), "Receiving data");
696
697 initBlockInput();
698 return receiveDataImpl(block_in);
699}
700
701
702Block Connection::receiveLogData()
703{
704 initBlockLogsInput();
705 return receiveDataImpl(block_logs_in);
706}
707
708
709Block Connection::receiveDataImpl(BlockInputStreamPtr & stream)
710{
711 String external_table_name;
712 readStringBinary(external_table_name, *in);
713
714 size_t prev_bytes = in->count();
715
716 /// Read one block from network.
717 Block res = stream->read();
718
719 if (throttler)
720 throttler->add(in->count() - prev_bytes);
721
722 return res;
723}
724
725
726void Connection::initInputBuffers()
727{
728
729}
730
731
732void Connection::initBlockInput()
733{
734 if (!block_in)
735 {
736 if (!maybe_compressed_in)
737 {
738 if (compression == Protocol::Compression::Enable)
739 maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
740 else
741 maybe_compressed_in = in;
742 }
743
744 block_in = std::make_shared<NativeBlockInputStream>(*maybe_compressed_in, server_revision);
745 }
746}
747
748
749void Connection::initBlockLogsInput()
750{
751 if (!block_logs_in)
752 {
753 /// Have to return superset of SystemLogsQueue::getSampleBlock() columns
754 block_logs_in = std::make_shared<NativeBlockInputStream>(*in, server_revision);
755 }
756}
757
758
759void Connection::setDescription()
760{
761 auto resolved_address = getResolvedAddress();
762 description = host + ":" + toString(port);
763
764 if (resolved_address)
765 {
766 auto ip_address = resolved_address->host().toString();
767 if (host != ip_address)
768 description += ", " + ip_address;
769 }
770}
771
772
773std::unique_ptr<Exception> Connection::receiveException()
774{
775 //LOG_TRACE(log_wrapper.get(), "Receiving exception");
776
777 Exception e;
778 readException(e, *in, "Received from " + getDescription());
779 return std::unique_ptr<Exception>{ e.clone() };
780}
781
782
783std::vector<String> Connection::receiveMultistringMessage(UInt64 msg_type)
784{
785 size_t num = Protocol::Server::stringsInMessage(msg_type);
786 std::vector<String> strings(num);
787 for (size_t i = 0; i < num; ++i)
788 readStringBinary(strings[i], *in);
789 return strings;
790}
791
792
793Progress Connection::receiveProgress()
794{
795 //LOG_TRACE(log_wrapper.get(), "Receiving progress");
796
797 Progress progress;
798 progress.read(*in, server_revision);
799 return progress;
800}
801
802
803BlockStreamProfileInfo Connection::receiveProfileInfo()
804{
805 BlockStreamProfileInfo profile_info;
806 profile_info.read(*in);
807 return profile_info;
808}
809
810
811void Connection::throwUnexpectedPacket(UInt64 packet_type, const char * expected) const
812{
813 throw NetException(
814 "Unexpected packet from server " + getDescription() + " (expected " + expected
815 + ", got " + String(Protocol::Server::toString(packet_type)) + ")",
816 ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
817}
818
819}
820