1#include <iomanip>
2#include <ext/scope_guard.h>
3#include <Poco/Net/NetException.h>
4#include <Common/ClickHouseRevision.h>
5#include <Common/CurrentThread.h>
6#include <Common/Stopwatch.h>
7#include <Common/NetException.h>
8#include <Common/setThreadName.h>
9#include <Common/config_version.h>
10#include <IO/Progress.h>
11#include <Compression/CompressedReadBuffer.h>
12#include <Compression/CompressedWriteBuffer.h>
13#include <IO/ReadBufferFromPocoSocket.h>
14#include <IO/WriteBufferFromPocoSocket.h>
15#include <IO/ReadHelpers.h>
16#include <IO/WriteHelpers.h>
17#include <IO/copyData.h>
18#include <DataStreams/AsynchronousBlockInputStream.h>
19#include <DataStreams/NativeBlockInputStream.h>
20#include <DataStreams/NativeBlockOutputStream.h>
21#include <Interpreters/executeQuery.h>
22#include <Interpreters/TablesStatus.h>
23#include <Interpreters/InternalTextLogsQueue.h>
24#include <Storages/StorageMemory.h>
25#include <Storages/StorageReplicatedMergeTree.h>
26#include <Core/ExternalTable.h>
27#include <Storages/ColumnDefault.h>
28#include <DataTypes/DataTypeLowCardinality.h>
29#include <Compression/CompressionFactory.h>
30#include <common/logger_useful.h>
31
32#include <Processors/Formats/LazyOutputFormat.h>
33
34#include "TCPHandler.h"
35
36
37namespace DB
38{
39
/// Error codes used in this translation unit.
/// ATTEMPT_TO_READ_AFTER_EOF and LOGICAL_ERROR are referenced by the code below as well; they are declared
/// here explicitly so this file does not depend on some other header declaring them.
namespace ErrorCodes
{
    extern const int ATTEMPT_TO_READ_AFTER_EOF;
    extern const int CLIENT_HAS_CONNECTED_TO_WRONG_PORT;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_DATABASE;
    extern const int UNKNOWN_EXCEPTION;
    extern const int UNKNOWN_PACKET_FROM_CLIENT;
    extern const int POCO_EXCEPTION;
    extern const int STD_EXCEPTION;
    extern const int SOCKET_TIMEOUT;
    extern const int UNEXPECTED_PACKET_FROM_CLIENT;
}
51
52
/// Serves one client TCP connection.
/// First it sets up socket buffers and timeouts and receives the Hello packet (which authenticates the user).
/// Next it switches to the requested default database, if any, and sends the server Hello.
/// Then it loops: wait for a packet, execute the query, and report any failure to the client.
/// The loop ends when the server is shutting down, the client disconnects, the connection stays idle
/// longer than `idle_connection_timeout`, or a network error occurs.
/// Exceptions from receiveHello() other than "wrong port" / EOF are sent to the client (best effort)
/// and rethrown; an UNKNOWN_PACKET_FROM_CLIENT error during a query is rethrown as well.
void TCPHandler::runImpl()
{
    setThreadName("TCPHandler");
    ThreadStatus thread_status;

    connection_context = server.context();
    connection_context.makeSessionContext();

    /// These timeouts can be changed after receiving query.

    auto global_receive_timeout = connection_context.getSettingsRef().receive_timeout;
    auto global_send_timeout = connection_context.getSettingsRef().send_timeout;

    socket().setReceiveTimeout(global_receive_timeout);
    socket().setSendTimeout(global_send_timeout);
    socket().setNoDelay(true);

    in = std::make_shared<ReadBufferFromPocoSocket>(socket());
    out = std::make_shared<WriteBufferFromPocoSocket>(socket());

    if (in->eof())
    {
        LOG_WARNING(log, "Client has not sent any data.");
        return;
    }

    /// User will be authenticated here. It will also set settings from user profile into connection_context.
    try
    {
        receiveHello();
    }
    catch (const Exception & e) /// Typical for an incorrect username, password, or address.
    {
        /// receiveHello() has already written a plain HTTP response in this case; just close the connection.
        if (e.code() == ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT)
        {
            LOG_DEBUG(log, "Client has connected to wrong port.");
            return;
        }

        if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
        {
            LOG_WARNING(log, "Client has gone away.");
            return;
        }

        try
        {
            /// We try to send error information to the client.
            sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
        }
        catch (...) {}

        throw;
    }

    /// When connecting, the default database can be specified.
    if (!default_database.empty())
    {
        if (!connection_context.isDatabaseExist(default_database))
        {
            Exception e("Database " + backQuote(default_database) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
            LOG_ERROR(log, "Code: " << e.code() << ", e.displayText() = " << e.displayText()
                << ", Stack trace:\n\n" << e.getStackTrace().toString());
            sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
            return;
        }

        connection_context.setCurrentDatabase(default_database);
    }

    /// Snapshot of the connection-level settings (after the user profile was applied by receiveHello()).
    Settings connection_settings = connection_context.getSettings();

    sendHello();

    connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });

    /// One iteration per received packet (a query, or a Ping/Cancel/TablesStatusRequest handled inside receivePacket()).
    while (1)
    {
        /// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
        {
            Stopwatch idle_time;
            while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(
                std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000))
            {
                if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout)
                {
                    LOG_TRACE(log, "Closing idle connection");
                    return;
                }
            }
        }

        /// If we need to shut down, or client disconnects.
        if (server.isCancelled() || in->eof())
            break;

        /// Set context of request.
        query_context = connection_context;

        Stopwatch watch;
        state.reset();

        /// Initialized later.
        std::optional<CurrentThread::QueryScope> query_scope;

        /** An exception during the execution of request (it must be sent over the network to the client).
          * The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
          */
        std::unique_ptr<Exception> exception;
        bool network_error = false;

        /// Updated from the per-query settings once the query packet has been received.
        bool send_exception_with_stack_trace = connection_context.getSettingsRef().calculate_text_stack_trace;

        try
        {
            /// If a user passed query-local timeouts, reset socket to initial state at the end of the query
            SCOPE_EXIT({state.timeout_setter.reset();});

            /** If Query - process it. If Ping or Cancel - go back to the beginning.
              * There may come settings for a separate query that modify `query_context`.
              * Note: `continue` skips the end-of-iteration cleanup below (including query_context.reset());
              * only the SCOPE_EXIT above runs on that path.
              */
            if (!receivePacket())
                continue;

            query_scope.emplace(*query_context);

            send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;

            /// Should we send internal logs to client?
            const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
            if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
                && client_logs_level != LogsLevel::none)
            {
                state.logs_queue = std::make_shared<InternalTextLogsQueue>();
                state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
                CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
            }

            /// Reads the client's external (temporary) tables when the query context asks for them.
            query_context->setExternalTablesInitializer([&connection_settings, this] (Context & context)
            {
                if (&context != &*query_context)
                    throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);

                /// Get blocks of temporary tables
                readData(connection_settings);

                /// Reset the input stream, as we received an empty block while receiving external table data.
                /// So, the stream has been marked as cancelled and we can't read from it anymore.
                state.block_in.reset();
                state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.

                state.temporary_tables_read = true;
            });

            /// Send structure of columns to client for function input()
            query_context->setInputInitializer([this] (Context & context, const StoragePtr & input_storage)
            {
                if (&context != &query_context.value())
                    throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);

                state.need_receive_data_for_input = true;

                /// Send ColumnsDescription for input storage.
                if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
                    && query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
                {
                    sendTableColumns(input_storage->getColumns());
                }

                /// Send block to the client - input storage structure.
                state.input_header = input_storage->getSampleBlock();
                sendData(state.input_header);
            });

            /// Supplies input() with the next block received from the client; an empty Block means end of data.
            query_context->setInputBlocksReaderCallback([&connection_settings, this] (Context & context) -> Block
            {
                if (&context != &query_context.value())
                    throw Exception("Unexpected context in InputBlocksReader", ErrorCodes::LOGICAL_ERROR);

                size_t poll_interval;
                int receive_timeout;
                std::tie(poll_interval, receive_timeout) = getReadTimeouts(connection_settings);
                if (!readDataNext(poll_interval, receive_timeout))
                {
                    state.block_in.reset();
                    state.maybe_compressed_in.reset();
                    return Block();
                }
                return state.block_for_input;
            });

            customizeContext(*query_context);

            bool may_have_embedded_data = client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;
            /// Processing Query
            state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);

            /// An output stream means the query consumes data sent by the client (e.g. INSERT).
            if (state.io.out)
                state.need_receive_data_for_insert = true;

            after_check_cancelled.restart();
            after_send_progress.restart();

            /// Does the request require receive data from client?
            if (state.need_receive_data_for_insert)
                processInsertQuery(connection_settings);
            else if (state.need_receive_data_for_input)
            {
                /// It is special case for input(), all works for reading data from client will be done in callbacks.
                /// state.io.in is NullAndDoCopyBlockInputStream so read it once.
                state.io.in->read();
                state.io.onFinish();
            }
            else if (state.io.pipeline.initialized())
                processOrdinaryQueryWithProcessors(query_context->getSettingsRef().max_threads);
            else
                processOrdinaryQuery();

            /// Do it before sending end of stream, to have a chance to show log message in client.
            query_scope->logPeakMemoryUsage();

            sendLogs();
            sendEndOfStream();

            query_scope.reset();
            state.reset();
        }
        catch (const Exception & e)
        {
            state.io.onException();
            exception.reset(e.clone());

            /// Not reported to the client: the exception propagates out of runImpl and ends the connection.
            if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
                throw;

            /// If a timeout occurred, try to inform client about it and close the session
            if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
                network_error = true;
        }
        catch (const Poco::Net::NetException & e)
        {
            /** We can get here if there was an error during connection to the client,
              * or in connection with a remote server that was used to process the request.
              * It is not possible to distinguish between these two cases.
              * Although in one of them, we have to send exception to the client, but in the other - we can not.
              * We will try to send exception to the client in any case - see below.
              */
            state.io.onException();
            exception = std::make_unique<Exception>(e.displayText(), ErrorCodes::POCO_EXCEPTION);
        }
        catch (const Poco::Exception & e)
        {
            state.io.onException();
            exception = std::make_unique<Exception>(e.displayText(), ErrorCodes::POCO_EXCEPTION);
        }
        catch (const std::exception & e)
        {
            state.io.onException();
            exception = std::make_unique<Exception>(e.what(), ErrorCodes::STD_EXCEPTION);
        }
        catch (...)
        {
            state.io.onException();
            exception = std::make_unique<Exception>("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
        }

        /// Report the failure (if any) to the client; failing to do so means the connection is unusable.
        try
        {
            if (exception)
            {
                try
                {
                    /// Try to send logs to client, but it could be risky too
                    /// Assume that we can't break output here
                    sendLogs();
                }
                catch (...)
                {
                    tryLogCurrentException(log, "Can't send logs to client");
                }

                sendException(*exception, send_exception_with_stack_trace);
            }
        }
        catch (...)
        {
            /** Could not send exception information to the client. */
            network_error = true;
            LOG_WARNING(log, "Client has gone away.");
        }

        /// If the query failed before its external tables were read, read them now
        /// (presumably so that their data packets are consumed before the next query).
        try
        {
            if (exception && !state.temporary_tables_read)
                query_context->initializeExternalTablesIfSet();
        }
        catch (...)
        {
            network_error = true;
            LOG_WARNING(log, "Can't read external tables after query failure.");
        }


        try
        {
            query_scope.reset();
            state.reset();
        }
        catch (...)
        {
            /** During the processing of request, there was an exception that we caught and possibly sent to client.
              * When destroying the request pipeline execution there was a second exception.
              * For example, a pipeline could run in multiple threads, and an exception could occur in each of them.
              * Ignore it.
              */
        }

        watch.stop();

        LOG_INFO(log, std::fixed << std::setprecision(3)
            << "Processed in " << watch.elapsedSeconds() << " sec.");

        /// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.
        query_context.reset();

        if (network_error)
            break;
    }
}
382
383
384bool TCPHandler::readDataNext(const size_t & poll_interval, const int & receive_timeout)
385{
386 Stopwatch watch(CLOCK_MONOTONIC_COARSE);
387
388 /// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
389 while (true)
390 {
391 if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(poll_interval))
392 break;
393
394 /// Do we need to shut down?
395 if (server.isCancelled())
396 return false;
397
398 /** Have we waited for data for too long?
399 * If we periodically poll, the receive_timeout of the socket itself does not work.
400 * Therefore, an additional check is added.
401 */
402 double elapsed = watch.elapsedSeconds();
403 if (elapsed > receive_timeout)
404 {
405 std::stringstream ss;
406 ss << "Timeout exceeded while receiving data from client.";
407 ss << " Waited for " << static_cast<size_t>(elapsed) << " seconds,";
408 ss << " timeout is " << receive_timeout << " seconds.";
409
410 throw Exception(ss.str(), ErrorCodes::SOCKET_TIMEOUT);
411 }
412 }
413
414 /// If client disconnected.
415 if (in->eof())
416 return false;
417
418 /// We accept and process data. And if they are over, then we leave.
419 if (!receivePacket())
420 return false;
421
422 sendLogs();
423 return true;
424}
425
426
427std::tuple<size_t, int> TCPHandler::getReadTimeouts(const Settings & connection_settings)
428{
429 const auto receive_timeout = query_context->getSettingsRef().receive_timeout.value;
430
431 /// Poll interval should not be greater than receive_timeout
432 const size_t default_poll_interval = connection_settings.poll_interval * 1000000;
433 size_t current_poll_interval = static_cast<size_t>(receive_timeout.totalMicroseconds());
434 constexpr size_t min_poll_interval = 5000; // 5 ms
435 size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
436
437 return std::make_tuple(poll_interval, receive_timeout.totalSeconds());
438}
439
440
441void TCPHandler::readData(const Settings & connection_settings)
442{
443 size_t poll_interval;
444 int receive_timeout;
445
446 std::tie(poll_interval, receive_timeout) = getReadTimeouts(connection_settings);
447 sendLogs();
448
449 while (true)
450 if (!readDataNext(poll_interval, receive_timeout))
451 return;
452}
453
454
455void TCPHandler::processInsertQuery(const Settings & connection_settings)
456{
457 /** Made above the rest of the lines, so that in case of `writePrefix` function throws an exception,
458 * client receive exception before sending data.
459 */
460 state.io.out->writePrefix();
461
462 /// Send ColumnsDescription for insertion table
463 if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
464 {
465 const auto & db_and_table = query_context->getInsertionTable();
466 if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
467 {
468 if (!db_and_table.second.empty())
469 sendTableColumns(query_context->getTable(db_and_table.first, db_and_table.second)->getColumns());
470 }
471 }
472
473 /// Send block to the client - table structure.
474 sendData(state.io.out->getHeader());
475
476 readData(connection_settings);
477 state.io.out->writeSuffix();
478 state.io.onFinish();
479}
480
481
482void TCPHandler::processOrdinaryQuery()
483{
484 /// Pull query execution result, if exists, and send it to network.
485 if (state.io.in)
486 {
487 /// This allows the client to prepare output format
488 if (Block header = state.io.in->getHeader())
489 sendData(header);
490
491 /// Use of async mode here enables reporting progress and monitoring client cancelling the query
492 AsynchronousBlockInputStream async_in(state.io.in);
493
494 async_in.readPrefix();
495 while (true)
496 {
497 if (isQueryCancelled())
498 {
499 async_in.cancel(false);
500 break;
501 }
502
503 if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
504 {
505 /// Some time passed and there is a progress.
506 after_send_progress.restart();
507 sendProgress();
508 }
509
510 sendLogs();
511
512 if (async_in.poll(query_context->getSettingsRef().interactive_delay / 1000))
513 {
514 const auto block = async_in.read();
515 if (!block)
516 break;
517
518 if (!state.io.null_format)
519 sendData(block);
520 }
521 }
522 async_in.readSuffix();
523
524 /** When the data has run out, we send the profiling data and totals up to the terminating empty block,
525 * so that this information can be used in the suffix output of stream.
526 * If the request has been interrupted, then sendTotals and other methods should not be called,
527 * because we have not read all the data.
528 */
529 if (!isQueryCancelled())
530 {
531 sendTotals(state.io.in->getTotals());
532 sendExtremes(state.io.in->getExtremes());
533 sendProfileInfo(state.io.in->getProfileInfo());
534 sendProgress();
535 }
536
537 sendData({});
538 }
539
540 state.io.onFinish();
541}
542
543
/// Executes a query built as a processors pipeline and streams its result to the client.
/// The pipeline is executed on a separate thread (a ThreadPool of size 1) attached to the current thread group.
/// Meanwhile this thread pulls result blocks from a LazyOutputFormat, sending progress and server logs in
/// between, and cancels the executor if the client cancels the query.
/// When data runs out and the query was not cancelled, totals, extremes, profile info, progress and logs
/// are sent before the terminating empty block.
/// @param num_threads  thread count for the executor; replaced by the pipeline's own limit if that is non-zero.
void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
{
    auto & pipeline = state.io.pipeline;

    if (pipeline.getMaxThreads())
        num_threads = pipeline.getMaxThreads();

    /// Send header-block, to allow client to prepare output format for data to send.
    {
        auto & header = pipeline.getHeader();

        if (header)
            sendData(header);
    }

    /// The pipeline's output goes into lazy_format; the loop below pulls blocks from it.
    auto lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
    pipeline.setOutput(lazy_format);

    {
        auto thread_group = CurrentThread::getGroup();
        ThreadPool pool(1);
        auto executor = pipeline.execute();
        /// Set by the executing thread when executor->execute() throws; checked by the loop below.
        std::atomic_bool exception = false;

        pool.scheduleOrThrowOnError([&]()
        {
            /// ThreadStatus thread_status;

            if (thread_group)
                CurrentThread::attachTo(thread_group);

            SCOPE_EXIT(
                if (thread_group)
                    CurrentThread::detachQueryIfNotDetached();
            );

            CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
            setThreadName("QueryPipelineEx");

            try
            {
                executor->execute(num_threads);
            }
            catch (...)
            {
                exception = true;
                throw;
            }
        });

        /// Wait in case of exception. Delete pipeline to release memory.
        /// Runs when this scope is left, either normally or by an exception from the loop below.
        SCOPE_EXIT(
            /// Clear queue in case if somebody is waiting lazy_format to push.
            lazy_format->finish();
            lazy_format->clearQueue();

            try
            {
                pool.wait();
            }
            catch (...)
            {
                /// If exception was thrown during pipeline execution, skip it while processing other exception.
            }

            pipeline = QueryPipeline()
        );

        /// Each outer iteration sends one block; an empty block ends the result and the loop.
        while (true)
        {
            Block block;

            /// Wait for the next block, sending progress and logs and watching for cancellation meanwhile.
            while (true)
            {
                if (isQueryCancelled())
                {
                    /// A packet was received requesting to stop execution of the request.
                    executor->cancel();

                    break;
                }
                else
                {
                    if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
                    {
                        /// Some time passed and there is a progress.
                        after_send_progress.restart();
                        sendProgress();
                    }

                    sendLogs();

                    if ((block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000)))
                        break;

                    if (lazy_format->isFinished())
                        break;

                    /// The executing thread failed; pool.wait() presumably rethrows its exception here.
                    if (exception)
                    {
                        pool.wait();
                        break;
                    }
                }
            }

            /** If data has run out, we will send the profiling data and total values to
              * the last zero block to be able to use
              * this information in the suffix output of stream.
              * If the request was interrupted, then `sendTotals` and other methods could not be called,
              * because we have not read all the data yet,
              * and there could be ongoing calculations in other threads at the same time.
              */
            if (!block && !isQueryCancelled())
            {
                pool.wait();
                pipeline.finalize();

                sendTotals(lazy_format->getTotals());
                sendExtremes(lazy_format->getExtremes());
                sendProfileInfo(lazy_format->getProfileInfo());
                sendProgress();
                sendLogs();
            }

            sendData(block);
            if (!block)
                break;
        }
    }

    state.io.onFinish();
}
677
678
679void TCPHandler::processTablesStatusRequest()
680{
681 TablesStatusRequest request;
682 request.read(*in, client_revision);
683
684 TablesStatusResponse response;
685 for (const QualifiedTableName & table_name: request.tables)
686 {
687 StoragePtr table = connection_context.tryGetTable(table_name.database, table_name.table);
688 if (!table)
689 continue;
690
691 TableStatus status;
692 if (auto * replicated_table = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
693 {
694 status.is_replicated = true;
695 status.absolute_delay = replicated_table->getAbsoluteDelay();
696 }
697 else
698 status.is_replicated = false;
699
700 response.table_states_by_id.emplace(table_name, std::move(status));
701 }
702
703 writeVarUInt(Protocol::Server::TablesStatusResponse, *out);
704 response.write(*out, client_revision);
705}
706
707void TCPHandler::receiveUnexpectedTablesStatusRequest()
708{
709 TablesStatusRequest skip_request;
710 skip_request.read(*in, client_revision);
711
712 throw NetException("Unexpected packet TablesStatusRequest received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
713}
714
715void TCPHandler::sendProfileInfo(const BlockStreamProfileInfo & info)
716{
717 writeVarUInt(Protocol::Server::ProfileInfo, *out);
718 info.write(*out);
719 out->next();
720}
721
722
723void TCPHandler::sendTotals(const Block & totals)
724{
725 if (totals)
726 {
727 initBlockOutput(totals);
728
729 writeVarUInt(Protocol::Server::Totals, *out);
730 writeStringBinary("", *out);
731
732 state.block_out->write(totals);
733 state.maybe_compressed_out->next();
734 out->next();
735 }
736}
737
738
739void TCPHandler::sendExtremes(const Block & extremes)
740{
741 if (extremes)
742 {
743 initBlockOutput(extremes);
744
745 writeVarUInt(Protocol::Server::Extremes, *out);
746 writeStringBinary("", *out);
747
748 state.block_out->write(extremes);
749 state.maybe_compressed_out->next();
750 out->next();
751 }
752}
753
754
755void TCPHandler::receiveHello()
756{
757 /// Receive `hello` packet.
758 UInt64 packet_type = 0;
759 String user = "default";
760 String password;
761
762 readVarUInt(packet_type, *in);
763 if (packet_type != Protocol::Client::Hello)
764 {
765 /** If you accidentally accessed the HTTP protocol for a port destined for an internal TCP protocol,
766 * Then instead of the packet type, there will be G (GET) or P (POST), in most cases.
767 */
768 if (packet_type == 'G' || packet_type == 'P')
769 {
770 writeString("HTTP/1.0 400 Bad Request\r\n\r\n"
771 "Port " + server.config().getString("tcp_port") + " is for clickhouse-client program.\r\n"
772 "You must use port " + server.config().getString("http_port") + " for HTTP.\r\n",
773 *out);
774
775 throw Exception("Client has connected to wrong port", ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT);
776 }
777 else
778 throw NetException("Unexpected packet from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
779 }
780
781 readStringBinary(client_name, *in);
782 readVarUInt(client_version_major, *in);
783 readVarUInt(client_version_minor, *in);
784 // NOTE For backward compatibility of the protocol, client cannot send its version_patch.
785 readVarUInt(client_revision, *in);
786 readStringBinary(default_database, *in);
787 readStringBinary(user, *in);
788 readStringBinary(password, *in);
789
790 LOG_DEBUG(log, "Connected " << client_name
791 << " version " << client_version_major
792 << "." << client_version_minor
793 << "." << client_version_patch
794 << ", revision: " << client_revision
795 << (!default_database.empty() ? ", database: " + default_database : "")
796 << (!user.empty() ? ", user: " + user : "")
797 << ".");
798
799 connection_context.setUser(user, password, socket().peerAddress(), "");
800}
801
802
803void TCPHandler::receiveUnexpectedHello()
804{
805 UInt64 skip_uint_64;
806 String skip_string;
807
808 readStringBinary(skip_string, *in);
809 readVarUInt(skip_uint_64, *in);
810 readVarUInt(skip_uint_64, *in);
811 readVarUInt(skip_uint_64, *in);
812 readStringBinary(skip_string, *in);
813 readStringBinary(skip_string, *in);
814 readStringBinary(skip_string, *in);
815
816 throw NetException("Unexpected packet Hello received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
817}
818
819
820void TCPHandler::sendHello()
821{
822 writeVarUInt(Protocol::Server::Hello, *out);
823 writeStringBinary(DBMS_NAME, *out);
824 writeVarUInt(DBMS_VERSION_MAJOR, *out);
825 writeVarUInt(DBMS_VERSION_MINOR, *out);
826 writeVarUInt(ClickHouseRevision::get(), *out);
827 if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
828 writeStringBinary(DateLUT::instance().getTimeZone(), *out);
829 if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)
830 writeStringBinary(server_display_name, *out);
831 if (client_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
832 writeVarUInt(DBMS_VERSION_PATCH, *out);
833 out->next();
834}
835
836
837bool TCPHandler::receivePacket()
838{
839 UInt64 packet_type = 0;
840 readVarUInt(packet_type, *in);
841
842// std::cerr << "Server got packet: " << Protocol::Client::toString(packet_type) << "\n";
843
844 switch (packet_type)
845 {
846 case Protocol::Client::Query:
847 if (!state.empty())
848 receiveUnexpectedQuery();
849 receiveQuery();
850 return true;
851
852 case Protocol::Client::Data:
853 case Protocol::Client::Scalar:
854 if (state.empty())
855 receiveUnexpectedData();
856 return receiveData(packet_type == Protocol::Client::Scalar);
857
858 case Protocol::Client::Ping:
859 writeVarUInt(Protocol::Server::Pong, *out);
860 out->next();
861 return false;
862
863 case Protocol::Client::Cancel:
864 return false;
865
866 case Protocol::Client::Hello:
867 receiveUnexpectedHello();
868
869 case Protocol::Client::TablesStatusRequest:
870 if (!state.empty())
871 receiveUnexpectedTablesStatusRequest();
872 processTablesStatusRequest();
873 out->next();
874 return false;
875
876 default:
877 throw Exception("Unknown packet " + toString(packet_type) + " from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
878 }
879}
880
881
882void TCPHandler::receiveQuery()
883{
884 UInt64 stage = 0;
885 UInt64 compression = 0;
886
887 state.is_empty = false;
888 readStringBinary(state.query_id, *in);
889
890 query_context->setCurrentQueryId(state.query_id);
891
892 /// Client info
893 {
894 ClientInfo & client_info = query_context->getClientInfo();
895 if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
896 client_info.read(*in, client_revision);
897
898 /// For better support of old clients, that does not send ClientInfo.
899 if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY)
900 {
901 client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
902 client_info.client_name = client_name;
903 client_info.client_version_major = client_version_major;
904 client_info.client_version_minor = client_version_minor;
905 client_info.client_version_patch = client_version_patch;
906 client_info.client_revision = client_revision;
907 }
908
909 /// Set fields, that are known apriori.
910 client_info.interface = ClientInfo::Interface::TCP;
911
912 if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
913 {
914 /// 'Current' fields was set at receiveHello.
915 client_info.initial_user = client_info.current_user;
916 client_info.initial_query_id = client_info.current_query_id;
917 client_info.initial_address = client_info.current_address;
918 }
919 }
920
921 /// Per query settings.
922 Settings custom_settings{};
923 auto settings_format = (client_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsBinaryFormat::STRINGS
924 : SettingsBinaryFormat::OLD;
925 custom_settings.deserialize(*in, settings_format);
926 auto settings_changes = custom_settings.changes();
927 query_context->checkSettingsConstraints(settings_changes);
928 query_context->applySettingsChanges(settings_changes);
929
930 Settings & settings = query_context->getSettingsRef();
931
932 /// Sync timeouts on client and server during current query to avoid dangling queries on server
933 /// NOTE: We use settings.send_timeout for the receive timeout and vice versa (change arguments ordering in TimeoutSetter),
934 /// because settings.send_timeout is client-side setting which has opposite meaning on the server side.
935 /// NOTE: these settings are applied only for current connection (not for distributed tables' connections)
936 state.timeout_setter = std::make_unique<TimeoutSetter>(socket(), settings.receive_timeout, settings.send_timeout);
937
938 readVarUInt(stage, *in);
939 state.stage = QueryProcessingStage::Enum(stage);
940
941 readVarUInt(compression, *in);
942 state.compression = static_cast<Protocol::Compression>(compression);
943
944 readStringBinary(state.query, *in);
945}
946
947void TCPHandler::receiveUnexpectedQuery()
948{
949 UInt64 skip_uint_64;
950 String skip_string;
951
952 readStringBinary(skip_string, *in);
953
954 ClientInfo & skip_client_info = query_context->getClientInfo();
955 if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
956 skip_client_info.read(*in, client_revision);
957
958 Settings & skip_settings = query_context->getSettingsRef();
959 auto settings_format = (client_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsBinaryFormat::STRINGS
960 : SettingsBinaryFormat::OLD;
961 skip_settings.deserialize(*in, settings_format);
962
963 readVarUInt(skip_uint_64, *in);
964 readVarUInt(skip_uint_64, *in);
965 readStringBinary(skip_string, *in);
966
967 throw NetException("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
968}
969
970bool TCPHandler::receiveData(bool scalar)
971{
972 initBlockInput();
973
974 /// The name of the temporary table for writing data, default to empty string
975 String name;
976 readStringBinary(name, *in);
977
978 /// Read one block from the network and write it down
979 Block block = state.block_in->read();
980
981 if (block)
982 {
983 if (scalar)
984 query_context->addScalar(name, block);
985 else
986 {
987 /// If there is an insert request, then the data should be written directly to `state.io.out`.
988 /// Otherwise, we write the blocks in the temporary `external_table_name` table.
989 if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
990 {
991 StoragePtr storage;
992 /// If such a table does not exist, create it.
993 if (!(storage = query_context->tryGetExternalTable(name)))
994 {
995 NamesAndTypesList columns = block.getNamesAndTypesList();
996 storage = StorageMemory::create("_external", name, ColumnsDescription{columns}, ConstraintsDescription{});
997 storage->startup();
998 query_context->addExternalTable(name, storage);
999 }
1000 /// The data will be written directly to the table.
1001 state.io.out = storage->write(ASTPtr(), *query_context);
1002 }
1003 if (state.need_receive_data_for_input)
1004 state.block_for_input = block;
1005 else
1006 state.io.out->write(block);
1007 }
1008 return true;
1009 }
1010 else
1011 return false;
1012}
1013
1014void TCPHandler::receiveUnexpectedData()
1015{
1016 String skip_external_table_name;
1017 readStringBinary(skip_external_table_name, *in);
1018
1019 std::shared_ptr<ReadBuffer> maybe_compressed_in;
1020
1021 if (last_block_in.compression == Protocol::Compression::Enable)
1022 maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
1023 else
1024 maybe_compressed_in = in;
1025
1026 auto skip_block_in = std::make_shared<NativeBlockInputStream>(
1027 *maybe_compressed_in,
1028 last_block_in.header,
1029 client_revision);
1030
1031 Block skip_block = skip_block_in->read();
1032 throw NetException("Unexpected packet Data received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
1033}
1034
1035void TCPHandler::initBlockInput()
1036{
1037 if (!state.block_in)
1038 {
1039 if (state.compression == Protocol::Compression::Enable)
1040 state.maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
1041 else
1042 state.maybe_compressed_in = in;
1043
1044 Block header;
1045 if (state.io.out)
1046 header = state.io.out->getHeader();
1047 else if (state.need_receive_data_for_input)
1048 header = state.input_header;
1049
1050 last_block_in.header = header;
1051 last_block_in.compression = state.compression;
1052
1053 state.block_in = std::make_shared<NativeBlockInputStream>(
1054 *state.maybe_compressed_in,
1055 header,
1056 client_revision);
1057 }
1058}
1059
1060
1061void TCPHandler::initBlockOutput(const Block & block)
1062{
1063 if (!state.block_out)
1064 {
1065 if (!state.maybe_compressed_out)
1066 {
1067 std::string method = Poco::toUpper(query_context->getSettingsRef().network_compression_method.toString());
1068 std::optional<int> level;
1069 if (method == "ZSTD")
1070 level = query_context->getSettingsRef().network_zstd_compression_level;
1071
1072 if (state.compression == Protocol::Compression::Enable)
1073 state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
1074 *out, CompressionCodecFactory::instance().get(method, level));
1075 else
1076 state.maybe_compressed_out = out;
1077 }
1078
1079 state.block_out = std::make_shared<NativeBlockOutputStream>(
1080 *state.maybe_compressed_out,
1081 client_revision,
1082 block.cloneEmpty(),
1083 !connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
1084 }
1085}
1086
1087void TCPHandler::initLogsBlockOutput(const Block & block)
1088{
1089 if (!state.logs_block_out)
1090 {
1091 /// Use uncompressed stream since log blocks usually contain only one row
1092 state.logs_block_out = std::make_shared<NativeBlockOutputStream>(
1093 *out,
1094 client_revision,
1095 block.cloneEmpty(),
1096 !connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
1097 }
1098}
1099
1100
1101bool TCPHandler::isQueryCancelled()
1102{
1103 if (state.is_cancelled || state.sent_all_data)
1104 return true;
1105
1106 if (after_check_cancelled.elapsed() / 1000 < query_context->getSettingsRef().interactive_delay)
1107 return false;
1108
1109 after_check_cancelled.restart();
1110
1111 /// During request execution the only packet that can come from the client is stopping the query.
1112 if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(0))
1113 {
1114 UInt64 packet_type = 0;
1115 readVarUInt(packet_type, *in);
1116
1117 switch (packet_type)
1118 {
1119 case Protocol::Client::Cancel:
1120 if (state.empty())
1121 throw NetException("Unexpected packet Cancel received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
1122 LOG_INFO(log, "Query was cancelled.");
1123 state.is_cancelled = true;
1124 return true;
1125
1126 default:
1127 throw NetException("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
1128 }
1129 }
1130
1131 return false;
1132}
1133
1134
1135void TCPHandler::sendData(const Block & block)
1136{
1137 initBlockOutput(block);
1138
1139 writeVarUInt(Protocol::Server::Data, *out);
1140 /// Send external table name (empty name is the main table)
1141 writeStringBinary("", *out);
1142
1143 state.block_out->write(block);
1144 state.maybe_compressed_out->next();
1145 out->next();
1146}
1147
1148
1149void TCPHandler::sendLogData(const Block & block)
1150{
1151 initLogsBlockOutput(block);
1152
1153 writeVarUInt(Protocol::Server::Log, *out);
1154 /// Send log tag (empty tag is the default tag)
1155 writeStringBinary("", *out);
1156
1157 state.logs_block_out->write(block);
1158 out->next();
1159}
1160
1161void TCPHandler::sendTableColumns(const ColumnsDescription & columns)
1162{
1163 writeVarUInt(Protocol::Server::TableColumns, *out);
1164
1165 /// Send external table name (empty name is the main table)
1166 writeStringBinary("", *out);
1167 writeStringBinary(columns.toString(), *out);
1168
1169 out->next();
1170}
1171
1172void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
1173{
1174 writeVarUInt(Protocol::Server::Exception, *out);
1175 writeException(e, *out, with_stack_trace);
1176 out->next();
1177}
1178
1179
1180void TCPHandler::sendEndOfStream()
1181{
1182 state.sent_all_data = true;
1183 writeVarUInt(Protocol::Server::EndOfStream, *out);
1184 out->next();
1185}
1186
1187
1188void TCPHandler::updateProgress(const Progress & value)
1189{
1190 state.progress.incrementPiecewiseAtomically(value);
1191}
1192
1193
1194void TCPHandler::sendProgress()
1195{
1196 writeVarUInt(Protocol::Server::Progress, *out);
1197 auto increment = state.progress.fetchAndResetPiecewiseAtomically();
1198 increment.write(*out, client_revision);
1199 out->next();
1200}
1201
1202
1203void TCPHandler::sendLogs()
1204{
1205 if (!state.logs_queue)
1206 return;
1207
1208 MutableColumns logs_columns;
1209 MutableColumns curr_logs_columns;
1210 size_t rows = 0;
1211
1212 for (; state.logs_queue->tryPop(curr_logs_columns); ++rows)
1213 {
1214 if (rows == 0)
1215 {
1216 logs_columns = std::move(curr_logs_columns);
1217 }
1218 else
1219 {
1220 for (size_t j = 0; j < logs_columns.size(); ++j)
1221 logs_columns[j]->insertRangeFrom(*curr_logs_columns[j], 0, curr_logs_columns[j]->size());
1222 }
1223 }
1224
1225 if (rows > 0)
1226 {
1227 Block block = InternalTextLogsQueue::getSampleBlock();
1228 block.setColumns(std::move(logs_columns));
1229 sendLogData(block);
1230 }
1231}
1232
1233
1234void TCPHandler::run()
1235{
1236 try
1237 {
1238 runImpl();
1239
1240 LOG_INFO(log, "Done processing connection.");
1241 }
1242 catch (Poco::Exception & e)
1243 {
1244 /// Timeout - not an error.
1245 if (!strcmp(e.what(), "Timeout"))
1246 {
1247 LOG_DEBUG(log, "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
1248 << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what());
1249 }
1250 else
1251 throw;
1252 }
1253}
1254
1255}
1256