1 | #include <Common/config.h> |
2 | |
3 | #include "MySQLHandler.h" |
4 | #include <limits> |
5 | #include <ext/scope_guard.h> |
6 | #include <Columns/ColumnVector.h> |
7 | #include <Common/config_version.h> |
8 | #include <Common/NetException.h> |
9 | #include <Common/OpenSSLHelpers.h> |
10 | #include <Core/MySQLProtocol.h> |
11 | #include <Core/NamesAndTypes.h> |
12 | #include <DataStreams/copyData.h> |
13 | #include <Interpreters/executeQuery.h> |
14 | #include <IO/ReadBufferFromPocoSocket.h> |
15 | #include <IO/ReadBufferFromString.h> |
16 | #include <IO/WriteBufferFromPocoSocket.h> |
17 | #include <Storages/IStorage.h> |
18 | #include <boost/algorithm/string/replace.hpp> |
19 | |
20 | #if USE_POCO_NETSSL |
21 | #include <Poco/Net/SecureStreamSocket.h> |
22 | #include <Poco/Net/SSLManager.h> |
23 | #include <Poco/Crypto/CipherFactory.h> |
24 | #include <Poco/Crypto/RSAKey.h> |
25 | #endif |
26 | |
27 | namespace DB |
28 | { |
29 | |
30 | using namespace MySQLProtocol; |
31 | |
32 | #if USE_POCO_NETSSL |
33 | using Poco::Net::SecureStreamSocket; |
34 | using Poco::Net::SSLManager; |
35 | #endif |
36 | |
37 | namespace ErrorCodes |
38 | { |
39 | extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES; |
40 | extern const int OPENSSL_ERROR; |
41 | extern const int SUPPORT_IS_DISABLED; |
42 | } |
43 | |
44 | MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, |
45 | bool ssl_enabled, size_t connection_id_) |
46 | : Poco::Net::TCPServerConnection(socket_) |
47 | , server(server_) |
48 | , log(&Poco::Logger::get("MySQLHandler" )) |
49 | , connection_context(server.context()) |
50 | , connection_id(connection_id_) |
51 | , auth_plugin(new MySQLProtocol::Authentication::Native41()) |
52 | { |
53 | server_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF; |
54 | if (ssl_enabled) |
55 | server_capability_flags |= CLIENT_SSL; |
56 | } |
57 | |
58 | void MySQLHandler::run() |
59 | { |
60 | connection_context.makeSessionContext(); |
61 | connection_context.setDefaultFormat("MySQLWire" ); |
62 | |
63 | in = std::make_shared<ReadBufferFromPocoSocket>(socket()); |
64 | out = std::make_shared<WriteBufferFromPocoSocket>(socket()); |
65 | packet_sender = std::make_shared<PacketSender>(*in, *out, connection_context.mysql.sequence_id); |
66 | |
67 | try |
68 | { |
69 | Handshake handshake(server_capability_flags, connection_id, VERSION_STRING + String("-" ) + VERSION_NAME, auth_plugin->getName(), auth_plugin->getAuthPluginData()); |
70 | packet_sender->sendPacket<Handshake>(handshake, true); |
71 | |
72 | LOG_TRACE(log, "Sent handshake" ); |
73 | |
74 | HandshakeResponse handshake_response; |
75 | finishHandshake(handshake_response); |
76 | connection_context.mysql.client_capabilities = handshake_response.capability_flags; |
77 | if (handshake_response.max_packet_size) |
78 | connection_context.mysql.max_packet_size = handshake_response.max_packet_size; |
79 | if (!connection_context.mysql.max_packet_size) |
80 | connection_context.mysql.max_packet_size = MAX_PACKET_LENGTH; |
81 | |
82 | /* LOG_TRACE(log, "Capabilities: " << handshake_response.capability_flags |
83 | << "\nmax_packet_size: " |
84 | << handshake_response.max_packet_size |
85 | << "\ncharacter_set: " |
86 | << handshake_response.character_set |
87 | << "\nuser: " |
88 | << handshake_response.username |
89 | << "\nauth_response length: " |
90 | << handshake_response.auth_response.length() |
91 | << "\nauth_response: " |
92 | << handshake_response.auth_response |
93 | << "\ndatabase: " |
94 | << handshake_response.database |
95 | << "\nauth_plugin_name: " |
96 | << handshake_response.auth_plugin_name);*/ |
97 | |
98 | client_capability_flags = handshake_response.capability_flags; |
99 | if (!(client_capability_flags & CLIENT_PROTOCOL_41)) |
100 | throw Exception("Required capability: CLIENT_PROTOCOL_41." , ErrorCodes::MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES); |
101 | |
102 | authenticate(handshake_response.username, handshake_response.auth_plugin_name, handshake_response.auth_response); |
103 | |
104 | try |
105 | { |
106 | if (!handshake_response.database.empty()) |
107 | connection_context.setCurrentDatabase(handshake_response.database); |
108 | connection_context.setCurrentQueryId("" ); |
109 | } |
110 | catch (const Exception & exc) |
111 | { |
112 | log->log(exc); |
113 | packet_sender->sendPacket(ERR_Packet(exc.code(), "00000" , exc.message()), true); |
114 | } |
115 | |
116 | OK_Packet ok_packet(0, handshake_response.capability_flags, 0, 0, 0); |
117 | packet_sender->sendPacket(ok_packet, true); |
118 | |
119 | while (true) |
120 | { |
121 | packet_sender->resetSequenceId(); |
122 | PacketPayloadReadBuffer payload = packet_sender->getPayload(); |
123 | |
124 | char command = 0; |
125 | payload.readStrict(command); |
126 | |
127 | // For commands which are executed without MemoryTracker. |
128 | LimitReadBuffer limited_payload(payload, 10000, true, "too long MySQL packet." ); |
129 | |
130 | LOG_DEBUG(log, "Received command: " << static_cast<int>(static_cast<unsigned char>(command)) << ". Connection id: " << connection_id << "." ); |
131 | try |
132 | { |
133 | switch (command) |
134 | { |
135 | case COM_QUIT: |
136 | return; |
137 | case COM_INIT_DB: |
138 | comInitDB(limited_payload); |
139 | break; |
140 | case COM_QUERY: |
141 | comQuery(payload); |
142 | break; |
143 | case COM_FIELD_LIST: |
144 | comFieldList(limited_payload); |
145 | break; |
146 | case COM_PING: |
147 | comPing(); |
148 | break; |
149 | default: |
150 | throw Exception(Poco::format("Command %d is not implemented." , command), ErrorCodes::NOT_IMPLEMENTED); |
151 | } |
152 | } |
153 | catch (const NetException & exc) |
154 | { |
155 | log->log(exc); |
156 | throw; |
157 | } |
158 | catch (const Exception & exc) |
159 | { |
160 | log->log(exc); |
161 | packet_sender->sendPacket(ERR_Packet(exc.code(), "00000" , exc.message()), true); |
162 | } |
163 | } |
164 | } |
165 | catch (const Poco::Exception & exc) |
166 | { |
167 | log->log(exc); |
168 | } |
169 | } |
170 | |
171 | /** Reads 3 bytes, finds out whether it is SSLRequest or HandshakeResponse packet, starts secure connection, if it is SSLRequest. |
172 | * Reading is performed from socket instead of ReadBuffer to prevent reading part of SSL handshake. |
173 | * If we read it from socket, it will be impossible to start SSL connection using Poco. Size of SSLRequest packet payload is 32 bytes, thus we can read at most 36 bytes. |
174 | */ |
175 | void MySQLHandler::finishHandshake(MySQLProtocol::HandshakeResponse & packet) |
176 | { |
177 | size_t packet_size = PACKET_HEADER_SIZE + SSL_REQUEST_PAYLOAD_SIZE; |
178 | |
179 | /// Buffer for SSLRequest or part of HandshakeResponse. |
180 | char buf[packet_size]; |
181 | size_t pos = 0; |
182 | |
183 | /// Reads at least count and at most packet_size bytes. |
184 | auto read_bytes = [this, &buf, &pos, &packet_size](size_t count) -> void { |
185 | while (pos < count) |
186 | { |
187 | int ret = socket().receiveBytes(buf + pos, packet_size - pos); |
188 | if (ret == 0) |
189 | { |
190 | throw Exception("Cannot read all data. Bytes read: " + std::to_string(pos) + ". Bytes expected: 3." , ErrorCodes::CANNOT_READ_ALL_DATA); |
191 | } |
192 | pos += ret; |
193 | } |
194 | }; |
195 | read_bytes(3); /// We can find out whether it is SSLRequest of HandshakeResponse by first 3 bytes. |
196 | |
197 | size_t payload_size = unalignedLoad<uint32_t>(buf) & 0xFFFFFFu; |
198 | LOG_TRACE(log, "payload size: " << payload_size); |
199 | |
200 | if (payload_size == SSL_REQUEST_PAYLOAD_SIZE) |
201 | { |
202 | finishHandshakeSSL(packet_size, buf, pos, read_bytes, packet); |
203 | } |
204 | else |
205 | { |
206 | /// Reading rest of HandshakeResponse. |
207 | packet_size = PACKET_HEADER_SIZE + payload_size; |
208 | WriteBufferFromOwnString buf_for_handshake_response; |
209 | buf_for_handshake_response.write(buf, pos); |
210 | copyData(*packet_sender->in, buf_for_handshake_response, packet_size - pos); |
211 | ReadBufferFromString payload(buf_for_handshake_response.str()); |
212 | payload.ignore(PACKET_HEADER_SIZE); |
213 | packet.readPayload(payload); |
214 | packet_sender->sequence_id++; |
215 | } |
216 | } |
217 | |
218 | void MySQLHandler::authenticate(const String & user_name, const String & auth_plugin_name, const String & initial_auth_response) |
219 | { |
220 | try |
221 | { |
222 | // For compatibility with JavaScript MySQL client, Native41 authentication plugin is used when possible (if password is specified using double SHA1). Otherwise SHA256 plugin is used. |
223 | auto user = connection_context.getUser(user_name); |
224 | const DB::Authentication::Type user_auth_type = user->authentication.getType(); |
225 | if (user_auth_type != DB::Authentication::DOUBLE_SHA1_PASSWORD && user_auth_type != DB::Authentication::PLAINTEXT_PASSWORD && user_auth_type != DB::Authentication::NO_PASSWORD) |
226 | { |
227 | authPluginSSL(); |
228 | } |
229 | |
230 | std::optional<String> auth_response = auth_plugin_name == auth_plugin->getName() ? std::make_optional<String>(initial_auth_response) : std::nullopt; |
231 | auth_plugin->authenticate(user_name, auth_response, connection_context, packet_sender, secure_connection, socket().peerAddress()); |
232 | } |
233 | catch (const Exception & exc) |
234 | { |
235 | LOG_ERROR(log, "Authentication for user " << user_name << " failed." ); |
236 | packet_sender->sendPacket(ERR_Packet(exc.code(), "00000" , exc.message()), true); |
237 | throw; |
238 | } |
239 | LOG_INFO(log, "Authentication for user " << user_name << " succeeded." ); |
240 | } |
241 | |
242 | void MySQLHandler::comInitDB(ReadBuffer & payload) |
243 | { |
244 | String database; |
245 | readStringUntilEOF(database, payload); |
246 | LOG_DEBUG(log, "Setting current database to " << database); |
247 | connection_context.setCurrentDatabase(database); |
248 | packet_sender->sendPacket(OK_Packet(0, client_capability_flags, 0, 0, 1), true); |
249 | } |
250 | |
251 | void MySQLHandler::comFieldList(ReadBuffer & payload) |
252 | { |
253 | ComFieldList packet; |
254 | packet.readPayload(payload); |
255 | String database = connection_context.getCurrentDatabase(); |
256 | StoragePtr tablePtr = connection_context.getTable(database, packet.table); |
257 | for (const NameAndTypePair & column: tablePtr->getColumns().getAll()) |
258 | { |
259 | ColumnDefinition column_definition( |
260 | database, packet.table, packet.table, column.name, column.name, CharacterSet::binary, 100, ColumnType::MYSQL_TYPE_STRING, 0, 0 |
261 | ); |
262 | packet_sender->sendPacket(column_definition); |
263 | } |
264 | packet_sender->sendPacket(OK_Packet(0xfe, client_capability_flags, 0, 0, 0), true); |
265 | } |
266 | |
267 | void MySQLHandler::comPing() |
268 | { |
269 | packet_sender->sendPacket(OK_Packet(0x0, client_capability_flags, 0, 0, 0), true); |
270 | } |
271 | |
272 | static bool isFederatedServerSetupCommand(const String & query); |
273 | |
274 | void MySQLHandler::comQuery(ReadBuffer & payload) |
275 | { |
276 | String query = String(payload.position(), payload.buffer().end()); |
277 | |
278 | // This is a workaround in order to support adding ClickHouse to MySQL using federated server. |
279 | // As Clickhouse doesn't support these statements, we just send OK packet in response. |
280 | if (isFederatedServerSetupCommand(query)) |
281 | { |
282 | packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true); |
283 | } |
284 | else |
285 | { |
286 | bool with_output = false; |
287 | std::function<void(const String &)> set_content_type = [&with_output](const String &) -> void { |
288 | with_output = true; |
289 | }; |
290 | |
291 | String replacement_query = "select ''" ; |
292 | bool should_replace = false; |
293 | |
294 | // Translate query from MySQL to ClickHouse. |
295 | // This is a temporary workaround until ClickHouse supports the syntax "@@var_name". |
296 | if (query == "select @@version_comment limit 1" ) // MariaDB client starts session with that query |
297 | { |
298 | should_replace = true; |
299 | } |
300 | // This is a workaround in order to support adding ClickHouse to MySQL using federated server. |
301 | if (0 == strncasecmp("SHOW TABLE STATUS LIKE" , query.c_str(), 22)) |
302 | { |
303 | should_replace = true; |
304 | replacement_query = boost::replace_all_copy(query, "SHOW TABLE STATUS LIKE " , show_table_status_replacement_query); |
305 | } |
306 | |
307 | ReadBufferFromString replacement(replacement_query); |
308 | |
309 | Context query_context = connection_context; |
310 | executeQuery(should_replace ? replacement : payload, *out, true, query_context, set_content_type, nullptr); |
311 | |
312 | if (!with_output) |
313 | packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true); |
314 | } |
315 | } |
316 | |
317 | void MySQLHandler::authPluginSSL() |
318 | { |
319 | throw Exception("ClickHouse was built without SSL support. Try specifying password using double SHA1 in users.xml." , ErrorCodes::SUPPORT_IS_DISABLED); |
320 | } |
321 | |
322 | void MySQLHandler::finishHandshakeSSL([[maybe_unused]] size_t packet_size, [[maybe_unused]] char * buf, [[maybe_unused]] size_t pos, [[maybe_unused]] std::function<void(size_t)> read_bytes, [[maybe_unused]] MySQLProtocol::HandshakeResponse & packet) |
323 | { |
324 | throw Exception("Client requested SSL, while it is disabled." , ErrorCodes::SUPPORT_IS_DISABLED); |
325 | } |
326 | |
327 | #if USE_SSL && USE_POCO_NETSSL |
328 | MySQLHandlerSSL::MySQLHandlerSSL(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_) |
329 | : MySQLHandler(server_, socket_, ssl_enabled, connection_id_) |
330 | , public_key(public_key_) |
331 | , private_key(private_key_) |
332 | {} |
333 | |
334 | void MySQLHandlerSSL::authPluginSSL() |
335 | { |
336 | auth_plugin = std::make_unique<MySQLProtocol::Authentication::Sha256Password>(public_key, private_key, log); |
337 | } |
338 | |
339 | void MySQLHandlerSSL::finishHandshakeSSL(size_t packet_size, char * buf, size_t pos, std::function<void(size_t)> read_bytes, MySQLProtocol::HandshakeResponse & packet) |
340 | { |
341 | read_bytes(packet_size); /// Reading rest SSLRequest. |
342 | SSLRequest ssl_request; |
343 | ReadBufferFromMemory payload(buf, pos); |
344 | payload.ignore(PACKET_HEADER_SIZE); |
345 | ssl_request.readPayload(payload); |
346 | connection_context.mysql.client_capabilities = ssl_request.capability_flags; |
347 | connection_context.mysql.max_packet_size = ssl_request.max_packet_size ? ssl_request.max_packet_size : MAX_PACKET_LENGTH; |
348 | secure_connection = true; |
349 | ss = std::make_shared<SecureStreamSocket>(SecureStreamSocket::attach(socket(), SSLManager::instance().defaultServerContext())); |
350 | in = std::make_shared<ReadBufferFromPocoSocket>(*ss); |
351 | out = std::make_shared<WriteBufferFromPocoSocket>(*ss); |
352 | connection_context.mysql.sequence_id = 2; |
353 | packet_sender = std::make_shared<PacketSender>(*in, *out, connection_context.mysql.sequence_id); |
354 | packet_sender->max_packet_size = connection_context.mysql.max_packet_size; |
355 | packet_sender->receivePacket(packet); /// Reading HandshakeResponse from secure socket. |
356 | } |
357 | |
358 | #endif |
359 | |
360 | static bool isFederatedServerSetupCommand(const String & query) |
361 | { |
362 | return 0 == strncasecmp("SET NAMES" , query.c_str(), 9) || 0 == strncasecmp("SET character_set_results" , query.c_str(), 25) |
363 | || 0 == strncasecmp("SET FOREIGN_KEY_CHECKS" , query.c_str(), 22) || 0 == strncasecmp("SET AUTOCOMMIT" , query.c_str(), 14) |
364 | || 0 == strncasecmp("SET SESSION TRANSACTION ISOLATION LEVEL" , query.c_str(), 39); |
365 | } |
366 | |
367 | const String MySQLHandler::show_table_status_replacement_query("SELECT" |
368 | " name AS Name," |
369 | " engine AS Engine," |
370 | " '10' AS Version," |
371 | " 'Dynamic' AS Row_format," |
372 | " 0 AS Rows," |
373 | " 0 AS Avg_row_length," |
374 | " 0 AS Data_length," |
375 | " 0 AS Max_data_length," |
376 | " 0 AS Index_length," |
377 | " 0 AS Data_free," |
378 | " 'NULL' AS Auto_increment," |
379 | " metadata_modification_time AS Create_time," |
380 | " metadata_modification_time AS Update_time," |
381 | " metadata_modification_time AS Check_time," |
382 | " 'utf8_bin' AS Collation," |
383 | " 'NULL' AS Checksum," |
384 | " '' AS Create_options," |
385 | " '' AS Comment" |
386 | " FROM system.tables" |
387 | " WHERE name LIKE " ); |
388 | |
389 | } |
390 | |