1#pragma once
2
3#include <common/logger_useful.h>
4
5#include <Poco/Net/StreamSocket.h>
6
7#include <Common/Throttler.h>
8
9#include <Core/Block.h>
10#include <Core/Defines.h>
11#include <IO/Progress.h>
12#include <Core/Protocol.h>
13#include <Core/QueryProcessingStage.h>
14
15#include <DataStreams/IBlockStream_fwd.h>
16#include <DataStreams/BlockStreamProfileInfo.h>
17
18#include <IO/ConnectionTimeouts.h>
19
20#include <Core/Settings.h>
21#include <Interpreters/TablesStatus.h>
22
23#include <Compression/ICompressionCodec.h>
24
25#include <atomic>
26#include <optional>
27
28
29namespace DB
30{
31
32class ClientInfo;
33
34/// The stream of blocks reading from the table and its name
35using ExternalTableData = std::pair<BlockInputStreamPtr, std::string>;
36/// Vector of pairs describing tables
37using ExternalTablesData = std::vector<ExternalTableData>;
38
39class Connection;
40
41using ConnectionPtr = std::shared_ptr<Connection>;
42using Connections = std::vector<ConnectionPtr>;
43
44
45/// Packet that could be received from server.
46struct Packet
47{
48 UInt64 type;
49
50 Block block;
51 std::unique_ptr<Exception> exception;
52 std::vector<String> multistring_message;
53 Progress progress;
54 BlockStreamProfileInfo profile_info;
55
56 Packet() : type(Protocol::Server::Hello) {}
57};
58
59
60/** Connection with database server, to use by client.
61 * How to use - see Core/Protocol.h
62 * (Implementation of server end - see Server/TCPHandler.h)
63 *
64 * As 'default_database' empty string could be passed
65 * - in that case, server will use it's own default database.
66 */
67class Connection : private boost::noncopyable
68{
69 friend class MultiplexedConnections;
70
71public:
72 Connection(const String & host_, UInt16 port_,
73 const String & default_database_,
74 const String & user_, const String & password_,
75 const String & client_name_ = "client",
76 Protocol::Compression compression_ = Protocol::Compression::Enable,
77 Protocol::Secure secure_ = Protocol::Secure::Disable,
78 Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
79 :
80 host(host_), port(port_), default_database(default_database_),
81 user(user_), password(password_),
82 client_name(client_name_),
83 compression(compression_),
84 secure(secure_),
85 sync_request_timeout(sync_request_timeout_),
86 log_wrapper(*this)
87 {
88 /// Don't connect immediately, only on first need.
89
90 if (user.empty())
91 user = "default";
92
93 setDescription();
94 }
95
96 virtual ~Connection() {}
97
98 /// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
99 void setThrottler(const ThrottlerPtr & throttler_)
100 {
101 throttler = throttler_;
102 }
103
104
105 /// Change default database. Changes will take effect on next reconnect.
106 void setDefaultDatabase(const String & database);
107
108 void getServerVersion(const ConnectionTimeouts & timeouts,
109 String & name,
110 UInt64 & version_major,
111 UInt64 & version_minor,
112 UInt64 & version_patch,
113 UInt64 & revision);
114 UInt64 getServerRevision(const ConnectionTimeouts & timeouts);
115
116 const String & getServerTimezone(const ConnectionTimeouts & timeouts);
117 const String & getServerDisplayName(const ConnectionTimeouts & timeouts);
118
119 /// For log and exception messages.
120 const String & getDescription() const;
121 const String & getHost() const;
122 UInt16 getPort() const;
123 const String & getDefaultDatabase() const;
124
125 /// If last flag is true, you need to call sendExternalTablesData after.
126 void sendQuery(
127 const ConnectionTimeouts & timeouts,
128 const String & query,
129 const String & query_id_ = "",
130 UInt64 stage = QueryProcessingStage::Complete,
131 const Settings * settings = nullptr,
132 const ClientInfo * client_info = nullptr,
133 bool with_pending_data = false);
134
135 void sendCancel();
136 /// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
137 void sendData(const Block & block, const String & name = "", bool scalar = false);
138 /// Send all scalars.
139 void sendScalarsData(Scalars & data);
140 /// Send all contents of external (temporary) tables.
141 void sendExternalTablesData(ExternalTablesData & data);
142
143 /// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
144 /// You could pass size of serialized/compressed block.
145 void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "");
146
147 /// Check, if has data to read.
148 bool poll(size_t timeout_microseconds = 0);
149
150 /// Check, if has data in read buffer.
151 bool hasReadPendingData() const;
152
153 /// Checks if there is input data in connection and reads packet ID.
154 std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0);
155
156 /// Receive packet from server.
157 Packet receivePacket();
158
159 /// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception.
160 void forceConnected(const ConnectionTimeouts & timeouts);
161
162 TablesStatusResponse getTablesStatus(const ConnectionTimeouts & timeouts,
163 const TablesStatusRequest & request);
164
165 /** Disconnect.
166 * This may be used, if connection is left in unsynchronised state
167 * (when someone continues to wait for something) after an exception.
168 */
169 void disconnect();
170
171 size_t outBytesCount() const { return out ? out->count() : 0; }
172 size_t inBytesCount() const { return in ? in->count() : 0; }
173
174private:
175 String host;
176 UInt16 port;
177 String default_database;
178 String user;
179 String password;
180
181 /// Address is resolved during the first connection (or the following reconnects)
182 /// Use it only for logging purposes
183 std::optional<Poco::Net::SocketAddress> current_resolved_address;
184
185 /// For messages in log and in exceptions.
186 String description;
187 void setDescription();
188
189 /// Returns resolved address if it was resolved.
190 std::optional<Poco::Net::SocketAddress> getResolvedAddress() const;
191
192 String client_name;
193
194 bool connected = false;
195
196 String server_name;
197 UInt64 server_version_major = 0;
198 UInt64 server_version_minor = 0;
199 UInt64 server_version_patch = 0;
200 UInt64 server_revision = 0;
201 String server_timezone;
202 String server_display_name;
203
204 std::unique_ptr<Poco::Net::StreamSocket> socket;
205 std::shared_ptr<ReadBuffer> in;
206 std::shared_ptr<WriteBuffer> out;
207 std::optional<UInt64> last_input_packet_type;
208
209 String query_id;
210 Protocol::Compression compression; /// Enable data compression for communication.
211 Protocol::Secure secure; /// Enable data encryption for communication.
212
213 /// What compression settings to use while sending data for INSERT queries and external tables.
214 CompressionCodecPtr compression_codec;
215
216 /** If not nullptr, used to limit network traffic.
217 * Only traffic for transferring blocks is accounted. Other packets don't.
218 */
219 ThrottlerPtr throttler;
220
221 Poco::Timespan sync_request_timeout;
222
223 /// From where to read query execution result.
224 std::shared_ptr<ReadBuffer> maybe_compressed_in;
225 BlockInputStreamPtr block_in;
226 BlockInputStreamPtr block_logs_in;
227
228 /// Where to write data for INSERT.
229 std::shared_ptr<WriteBuffer> maybe_compressed_out;
230 BlockOutputStreamPtr block_out;
231
232 /// Logger is created lazily, for avoid to run DNS request in constructor.
233 class LoggerWrapper
234 {
235 public:
236 LoggerWrapper(Connection & parent_)
237 : log(nullptr), parent(parent_)
238 {
239 }
240
241 Logger * get()
242 {
243 if (!log)
244 log = &Logger::get("Connection (" + parent.getDescription() + ")");
245
246 return log;
247 }
248
249 private:
250 std::atomic<Logger *> log;
251 Connection & parent;
252 };
253
254 LoggerWrapper log_wrapper;
255
256 void connect(const ConnectionTimeouts & timeouts);
257 void sendHello();
258 void receiveHello();
259 bool ping();
260
261 Block receiveData();
262 Block receiveLogData();
263 Block receiveDataImpl(BlockInputStreamPtr & stream);
264
265 std::vector<String> receiveMultistringMessage(UInt64 msg_type);
266 std::unique_ptr<Exception> receiveException();
267 Progress receiveProgress();
268 BlockStreamProfileInfo receiveProfileInfo();
269
270 void initInputBuffers();
271 void initBlockInput();
272 void initBlockLogsInput();
273
274 [[noreturn]] void throwUnexpectedPacket(UInt64 packet_type, const char * expected) const;
275};
276
277}
278