1 | #pragma once |
2 | |
3 | #include <common/logger_useful.h> |
4 | |
5 | #include <Poco/Net/StreamSocket.h> |
6 | |
7 | #include <Common/Throttler.h> |
8 | |
9 | #include <Core/Block.h> |
10 | #include <Core/Defines.h> |
11 | #include <IO/Progress.h> |
12 | #include <Core/Protocol.h> |
13 | #include <Core/QueryProcessingStage.h> |
14 | |
15 | #include <DataStreams/IBlockStream_fwd.h> |
16 | #include <DataStreams/BlockStreamProfileInfo.h> |
17 | |
18 | #include <IO/ConnectionTimeouts.h> |
19 | |
20 | #include <Core/Settings.h> |
21 | #include <Interpreters/TablesStatus.h> |
22 | |
23 | #include <Compression/ICompressionCodec.h> |
24 | |
25 | #include <atomic> |
26 | #include <optional> |
27 | |
28 | |
29 | namespace DB |
30 | { |
31 | |
32 | class ClientInfo; |
33 | |
34 | /// The stream of blocks reading from the table and its name |
35 | using ExternalTableData = std::pair<BlockInputStreamPtr, std::string>; |
36 | /// Vector of pairs describing tables |
37 | using ExternalTablesData = std::vector<ExternalTableData>; |
38 | |
39 | class Connection; |
40 | |
41 | using ConnectionPtr = std::shared_ptr<Connection>; |
42 | using Connections = std::vector<ConnectionPtr>; |
43 | |
44 | |
45 | /// Packet that could be received from server. |
46 | struct Packet |
47 | { |
48 | UInt64 type; |
49 | |
50 | Block block; |
51 | std::unique_ptr<Exception> exception; |
52 | std::vector<String> multistring_message; |
53 | Progress progress; |
54 | BlockStreamProfileInfo profile_info; |
55 | |
56 | Packet() : type(Protocol::Server::Hello) {} |
57 | }; |
58 | |
59 | |
60 | /** Connection with database server, to use by client. |
61 | * How to use - see Core/Protocol.h |
62 | * (Implementation of server end - see Server/TCPHandler.h) |
63 | * |
64 | * As 'default_database' empty string could be passed |
65 | * - in that case, server will use it's own default database. |
66 | */ |
67 | class Connection : private boost::noncopyable |
68 | { |
69 | friend class MultiplexedConnections; |
70 | |
71 | public: |
72 | Connection(const String & host_, UInt16 port_, |
73 | const String & default_database_, |
74 | const String & user_, const String & password_, |
75 | const String & client_name_ = "client" , |
76 | Protocol::Compression compression_ = Protocol::Compression::Enable, |
77 | Protocol::Secure secure_ = Protocol::Secure::Disable, |
78 | Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0)) |
79 | : |
80 | host(host_), port(port_), default_database(default_database_), |
81 | user(user_), password(password_), |
82 | client_name(client_name_), |
83 | compression(compression_), |
84 | secure(secure_), |
85 | sync_request_timeout(sync_request_timeout_), |
86 | log_wrapper(*this) |
87 | { |
88 | /// Don't connect immediately, only on first need. |
89 | |
90 | if (user.empty()) |
91 | user = "default" ; |
92 | |
93 | setDescription(); |
94 | } |
95 | |
96 | virtual ~Connection() {} |
97 | |
98 | /// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic. |
99 | void setThrottler(const ThrottlerPtr & throttler_) |
100 | { |
101 | throttler = throttler_; |
102 | } |
103 | |
104 | |
105 | /// Change default database. Changes will take effect on next reconnect. |
106 | void setDefaultDatabase(const String & database); |
107 | |
108 | void getServerVersion(const ConnectionTimeouts & timeouts, |
109 | String & name, |
110 | UInt64 & version_major, |
111 | UInt64 & version_minor, |
112 | UInt64 & version_patch, |
113 | UInt64 & revision); |
114 | UInt64 getServerRevision(const ConnectionTimeouts & timeouts); |
115 | |
116 | const String & getServerTimezone(const ConnectionTimeouts & timeouts); |
117 | const String & getServerDisplayName(const ConnectionTimeouts & timeouts); |
118 | |
119 | /// For log and exception messages. |
120 | const String & getDescription() const; |
121 | const String & getHost() const; |
122 | UInt16 getPort() const; |
123 | const String & getDefaultDatabase() const; |
124 | |
125 | /// If last flag is true, you need to call sendExternalTablesData after. |
126 | void sendQuery( |
127 | const ConnectionTimeouts & timeouts, |
128 | const String & query, |
129 | const String & query_id_ = "" , |
130 | UInt64 stage = QueryProcessingStage::Complete, |
131 | const Settings * settings = nullptr, |
132 | const ClientInfo * client_info = nullptr, |
133 | bool with_pending_data = false); |
134 | |
135 | void sendCancel(); |
136 | /// Send block of data; if name is specified, server will write it to external (temporary) table of that name. |
137 | void sendData(const Block & block, const String & name = "" , bool scalar = false); |
138 | /// Send all scalars. |
139 | void sendScalarsData(Scalars & data); |
140 | /// Send all contents of external (temporary) tables. |
141 | void sendExternalTablesData(ExternalTablesData & data); |
142 | |
143 | /// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'. |
144 | /// You could pass size of serialized/compressed block. |
145 | void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "" ); |
146 | |
147 | /// Check, if has data to read. |
148 | bool poll(size_t timeout_microseconds = 0); |
149 | |
150 | /// Check, if has data in read buffer. |
151 | bool hasReadPendingData() const; |
152 | |
153 | /// Checks if there is input data in connection and reads packet ID. |
154 | std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0); |
155 | |
156 | /// Receive packet from server. |
157 | Packet receivePacket(); |
158 | |
159 | /// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception. |
160 | void forceConnected(const ConnectionTimeouts & timeouts); |
161 | |
162 | TablesStatusResponse getTablesStatus(const ConnectionTimeouts & timeouts, |
163 | const TablesStatusRequest & request); |
164 | |
165 | /** Disconnect. |
166 | * This may be used, if connection is left in unsynchronised state |
167 | * (when someone continues to wait for something) after an exception. |
168 | */ |
169 | void disconnect(); |
170 | |
171 | size_t outBytesCount() const { return out ? out->count() : 0; } |
172 | size_t inBytesCount() const { return in ? in->count() : 0; } |
173 | |
174 | private: |
175 | String host; |
176 | UInt16 port; |
177 | String default_database; |
178 | String user; |
179 | String password; |
180 | |
181 | /// Address is resolved during the first connection (or the following reconnects) |
182 | /// Use it only for logging purposes |
183 | std::optional<Poco::Net::SocketAddress> current_resolved_address; |
184 | |
185 | /// For messages in log and in exceptions. |
186 | String description; |
187 | void setDescription(); |
188 | |
189 | /// Returns resolved address if it was resolved. |
190 | std::optional<Poco::Net::SocketAddress> getResolvedAddress() const; |
191 | |
192 | String client_name; |
193 | |
194 | bool connected = false; |
195 | |
196 | String server_name; |
197 | UInt64 server_version_major = 0; |
198 | UInt64 server_version_minor = 0; |
199 | UInt64 server_version_patch = 0; |
200 | UInt64 server_revision = 0; |
201 | String server_timezone; |
202 | String server_display_name; |
203 | |
204 | std::unique_ptr<Poco::Net::StreamSocket> socket; |
205 | std::shared_ptr<ReadBuffer> in; |
206 | std::shared_ptr<WriteBuffer> out; |
207 | std::optional<UInt64> last_input_packet_type; |
208 | |
209 | String query_id; |
210 | Protocol::Compression compression; /// Enable data compression for communication. |
211 | Protocol::Secure secure; /// Enable data encryption for communication. |
212 | |
213 | /// What compression settings to use while sending data for INSERT queries and external tables. |
214 | CompressionCodecPtr compression_codec; |
215 | |
216 | /** If not nullptr, used to limit network traffic. |
217 | * Only traffic for transferring blocks is accounted. Other packets don't. |
218 | */ |
219 | ThrottlerPtr throttler; |
220 | |
221 | Poco::Timespan sync_request_timeout; |
222 | |
223 | /// From where to read query execution result. |
224 | std::shared_ptr<ReadBuffer> maybe_compressed_in; |
225 | BlockInputStreamPtr block_in; |
226 | BlockInputStreamPtr block_logs_in; |
227 | |
228 | /// Where to write data for INSERT. |
229 | std::shared_ptr<WriteBuffer> maybe_compressed_out; |
230 | BlockOutputStreamPtr block_out; |
231 | |
232 | /// Logger is created lazily, for avoid to run DNS request in constructor. |
233 | class LoggerWrapper |
234 | { |
235 | public: |
236 | LoggerWrapper(Connection & parent_) |
237 | : log(nullptr), parent(parent_) |
238 | { |
239 | } |
240 | |
241 | Logger * get() |
242 | { |
243 | if (!log) |
244 | log = &Logger::get("Connection (" + parent.getDescription() + ")" ); |
245 | |
246 | return log; |
247 | } |
248 | |
249 | private: |
250 | std::atomic<Logger *> log; |
251 | Connection & parent; |
252 | }; |
253 | |
254 | LoggerWrapper log_wrapper; |
255 | |
256 | void connect(const ConnectionTimeouts & timeouts); |
257 | void sendHello(); |
258 | void receiveHello(); |
259 | bool ping(); |
260 | |
261 | Block receiveData(); |
262 | Block receiveLogData(); |
263 | Block receiveDataImpl(BlockInputStreamPtr & stream); |
264 | |
265 | std::vector<String> receiveMultistringMessage(UInt64 msg_type); |
266 | std::unique_ptr<Exception> receiveException(); |
267 | Progress receiveProgress(); |
268 | BlockStreamProfileInfo receiveProfileInfo(); |
269 | |
270 | void initInputBuffers(); |
271 | void initBlockInput(); |
272 | void initBlockLogsInput(); |
273 | |
274 | [[noreturn]] void throwUnexpectedPacket(UInt64 packet_type, const char * expected) const; |
275 | }; |
276 | |
277 | } |
278 | |