1#pragma once
2
3#include <Poco/Net/TCPServerConnection.h>
4
5#include <Common/getFQDNOrHostName.h>
6#include <Common/CurrentMetrics.h>
7#include <Common/Stopwatch.h>
8#include <Core/Protocol.h>
9#include <Core/QueryProcessingStage.h>
10#include <IO/Progress.h>
11#include <DataStreams/BlockIO.h>
12#include <Interpreters/InternalTextLogsQueue.h>
13#include <Client/TimeoutSetter.h>
14
15#include "IServer.h"
16
17
18namespace CurrentMetrics
19{
20 extern const Metric TCPConnection;
21}
22
23namespace Poco { class Logger; }
24
25namespace DB
26{
27
28class ColumnsDescription;
29
30/// State of query processing.
31struct QueryState
32{
33 /// Identifier of the query.
34 String query_id;
35
36 QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
37 Protocol::Compression compression = Protocol::Compression::Disable;
38
39 /// A queue with internal logs that will be passed to client. It must be
40 /// destroyed after input/output blocks, because they may contain other
41 /// threads that use this queue.
42 InternalTextLogsQueuePtr logs_queue;
43 BlockOutputStreamPtr logs_block_out;
44
45 /// From where to read data for INSERT.
46 std::shared_ptr<ReadBuffer> maybe_compressed_in;
47 BlockInputStreamPtr block_in;
48
49 /// Where to write result data.
50 std::shared_ptr<WriteBuffer> maybe_compressed_out;
51 BlockOutputStreamPtr block_out;
52
53 /// Query text.
54 String query;
55 /// Streams of blocks, that are processing the query.
56 BlockIO io;
57
58 /// Is request cancelled
59 bool is_cancelled = false;
60 /// empty or not
61 bool is_empty = true;
62 /// Data was sent.
63 bool sent_all_data = false;
64 /// Request requires data from the client (INSERT, but not INSERT SELECT).
65 bool need_receive_data_for_insert = false;
66 /// Temporary tables read
67 bool temporary_tables_read = false;
68
69 /// Request requires data from client for function input()
70 bool need_receive_data_for_input = false;
71 /// temporary place for incoming data block for input()
72 Block block_for_input;
73 /// sample block from StorageInput
74 Block input_header;
75
76 /// To output progress, the difference after the previous sending of progress.
77 Progress progress;
78
79 /// Timeouts setter for current query
80 std::unique_ptr<TimeoutSetter> timeout_setter;
81
82 void reset()
83 {
84 *this = QueryState();
85 }
86
87 bool empty()
88 {
89 return is_empty;
90 }
91};
92
93
94struct LastBlockInputParameters
95{
96 Protocol::Compression compression = Protocol::Compression::Disable;
97 Block header;
98};
99
100
101class TCPHandler : public Poco::Net::TCPServerConnection
102{
103public:
104 TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
105 : Poco::Net::TCPServerConnection(socket_)
106 , server(server_)
107 , log(&Poco::Logger::get("TCPHandler"))
108 , connection_context(server.context())
109 , query_context(server.context())
110 {
111 server_display_name = server.config().getString("display_name", getFQDNOrHostName());
112 }
113
114 void run();
115
116 /// This method is called right before the query execution.
117 virtual void customizeContext(DB::Context & /*context*/) {}
118
119private:
120 IServer & server;
121 Poco::Logger * log;
122
123 String client_name;
124 UInt64 client_version_major = 0;
125 UInt64 client_version_minor = 0;
126 UInt64 client_version_patch = 0;
127 UInt64 client_revision = 0;
128
129 Context connection_context;
130 std::optional<Context> query_context;
131
132 /// Streams for reading/writing from/to client connection socket.
133 std::shared_ptr<ReadBuffer> in;
134 std::shared_ptr<WriteBuffer> out;
135
136 /// Time after the last check to stop the request and send the progress.
137 Stopwatch after_check_cancelled;
138 Stopwatch after_send_progress;
139
140 String default_database;
141
142 /// At the moment, only one ongoing query in the connection is supported at a time.
143 QueryState state;
144
145 /// Last block input parameters are saved to be able to receive unexpected data packet sent after exception.
146 LastBlockInputParameters last_block_in;
147
148 CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection};
149
150 /// It is the name of the server that will be sent to the client.
151 String server_display_name;
152
153 void runImpl();
154
155 void receiveHello();
156 bool receivePacket();
157 void receiveQuery();
158 bool receiveData(bool scalar);
159 bool readDataNext(const size_t & poll_interval, const int & receive_timeout);
160 void readData(const Settings & global_settings);
161 std::tuple<size_t, int> getReadTimeouts(const Settings & global_settings);
162
163 [[noreturn]] void receiveUnexpectedData();
164 [[noreturn]] void receiveUnexpectedQuery();
165 [[noreturn]] void receiveUnexpectedHello();
166 [[noreturn]] void receiveUnexpectedTablesStatusRequest();
167
168 /// Process INSERT query
169 void processInsertQuery(const Settings & global_settings);
170
171 /// Process a request that does not require the receiving of data blocks from the client
172 void processOrdinaryQuery();
173
174 void processOrdinaryQueryWithProcessors(size_t num_threads);
175
176 void processTablesStatusRequest();
177
178 void sendHello();
179 void sendData(const Block & block); /// Write a block to the network.
180 void sendLogData(const Block & block);
181 void sendTableColumns(const ColumnsDescription & columns);
182 void sendException(const Exception & e, bool with_stack_trace);
183 void sendProgress();
184 void sendLogs();
185 void sendEndOfStream();
186 void sendProfileInfo(const BlockStreamProfileInfo & info);
187 void sendTotals(const Block & totals);
188 void sendExtremes(const Block & extremes);
189
190 /// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
191 void initBlockInput();
192 void initBlockOutput(const Block & block);
193 void initLogsBlockOutput(const Block & block);
194
195 bool isQueryCancelled();
196
197 /// This function is called from different threads.
198 void updateProgress(const Progress & value);
199};
200
201}
202