1#include <Common/config.h>
2
3#include "MySQLHandler.h"
4#include <limits>
5#include <ext/scope_guard.h>
6#include <Columns/ColumnVector.h>
7#include <Common/config_version.h>
8#include <Common/NetException.h>
9#include <Common/OpenSSLHelpers.h>
10#include <Core/MySQLProtocol.h>
11#include <Core/NamesAndTypes.h>
12#include <DataStreams/copyData.h>
13#include <Interpreters/executeQuery.h>
14#include <IO/ReadBufferFromPocoSocket.h>
15#include <IO/ReadBufferFromString.h>
16#include <IO/WriteBufferFromPocoSocket.h>
17#include <Storages/IStorage.h>
18#include <boost/algorithm/string/replace.hpp>
19
20#if USE_POCO_NETSSL
21#include <Poco/Net/SecureStreamSocket.h>
22#include <Poco/Net/SSLManager.h>
23#include <Poco/Crypto/CipherFactory.h>
24#include <Poco/Crypto/RSAKey.h>
25#endif
26
27namespace DB
28{
29
30using namespace MySQLProtocol;
31
32#if USE_POCO_NETSSL
33using Poco::Net::SecureStreamSocket;
34using Poco::Net::SSLManager;
35#endif
36
37namespace ErrorCodes
38{
39 extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES;
40 extern const int OPENSSL_ERROR;
41 extern const int SUPPORT_IS_DISABLED;
42}
43
44MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_,
45 bool ssl_enabled, size_t connection_id_)
46 : Poco::Net::TCPServerConnection(socket_)
47 , server(server_)
48 , log(&Poco::Logger::get("MySQLHandler"))
49 , connection_context(server.context())
50 , connection_id(connection_id_)
51 , auth_plugin(new MySQLProtocol::Authentication::Native41())
52{
53 server_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF;
54 if (ssl_enabled)
55 server_capability_flags |= CLIENT_SSL;
56}
57
58void MySQLHandler::run()
59{
60 connection_context.makeSessionContext();
61 connection_context.setDefaultFormat("MySQLWire");
62
63 in = std::make_shared<ReadBufferFromPocoSocket>(socket());
64 out = std::make_shared<WriteBufferFromPocoSocket>(socket());
65 packet_sender = std::make_shared<PacketSender>(*in, *out, connection_context.mysql.sequence_id);
66
67 try
68 {
69 Handshake handshake(server_capability_flags, connection_id, VERSION_STRING + String("-") + VERSION_NAME, auth_plugin->getName(), auth_plugin->getAuthPluginData());
70 packet_sender->sendPacket<Handshake>(handshake, true);
71
72 LOG_TRACE(log, "Sent handshake");
73
74 HandshakeResponse handshake_response;
75 finishHandshake(handshake_response);
76 connection_context.mysql.client_capabilities = handshake_response.capability_flags;
77 if (handshake_response.max_packet_size)
78 connection_context.mysql.max_packet_size = handshake_response.max_packet_size;
79 if (!connection_context.mysql.max_packet_size)
80 connection_context.mysql.max_packet_size = MAX_PACKET_LENGTH;
81
82/* LOG_TRACE(log, "Capabilities: " << handshake_response.capability_flags
83 << "\nmax_packet_size: "
84 << handshake_response.max_packet_size
85 << "\ncharacter_set: "
86 << handshake_response.character_set
87 << "\nuser: "
88 << handshake_response.username
89 << "\nauth_response length: "
90 << handshake_response.auth_response.length()
91 << "\nauth_response: "
92 << handshake_response.auth_response
93 << "\ndatabase: "
94 << handshake_response.database
95 << "\nauth_plugin_name: "
96 << handshake_response.auth_plugin_name);*/
97
98 client_capability_flags = handshake_response.capability_flags;
99 if (!(client_capability_flags & CLIENT_PROTOCOL_41))
100 throw Exception("Required capability: CLIENT_PROTOCOL_41.", ErrorCodes::MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES);
101
102 authenticate(handshake_response.username, handshake_response.auth_plugin_name, handshake_response.auth_response);
103
104 try
105 {
106 if (!handshake_response.database.empty())
107 connection_context.setCurrentDatabase(handshake_response.database);
108 connection_context.setCurrentQueryId("");
109 }
110 catch (const Exception & exc)
111 {
112 log->log(exc);
113 packet_sender->sendPacket(ERR_Packet(exc.code(), "00000", exc.message()), true);
114 }
115
116 OK_Packet ok_packet(0, handshake_response.capability_flags, 0, 0, 0);
117 packet_sender->sendPacket(ok_packet, true);
118
119 while (true)
120 {
121 packet_sender->resetSequenceId();
122 PacketPayloadReadBuffer payload = packet_sender->getPayload();
123
124 char command = 0;
125 payload.readStrict(command);
126
127 // For commands which are executed without MemoryTracker.
128 LimitReadBuffer limited_payload(payload, 10000, true, "too long MySQL packet.");
129
130 LOG_DEBUG(log, "Received command: " << static_cast<int>(static_cast<unsigned char>(command)) << ". Connection id: " << connection_id << ".");
131 try
132 {
133 switch (command)
134 {
135 case COM_QUIT:
136 return;
137 case COM_INIT_DB:
138 comInitDB(limited_payload);
139 break;
140 case COM_QUERY:
141 comQuery(payload);
142 break;
143 case COM_FIELD_LIST:
144 comFieldList(limited_payload);
145 break;
146 case COM_PING:
147 comPing();
148 break;
149 default:
150 throw Exception(Poco::format("Command %d is not implemented.", command), ErrorCodes::NOT_IMPLEMENTED);
151 }
152 }
153 catch (const NetException & exc)
154 {
155 log->log(exc);
156 throw;
157 }
158 catch (const Exception & exc)
159 {
160 log->log(exc);
161 packet_sender->sendPacket(ERR_Packet(exc.code(), "00000", exc.message()), true);
162 }
163 }
164 }
165 catch (const Poco::Exception & exc)
166 {
167 log->log(exc);
168 }
169}
170
171/** Reads 3 bytes, finds out whether it is SSLRequest or HandshakeResponse packet, starts secure connection, if it is SSLRequest.
172 * Reading is performed from socket instead of ReadBuffer to prevent reading part of SSL handshake.
173 * If we read it from socket, it will be impossible to start SSL connection using Poco. Size of SSLRequest packet payload is 32 bytes, thus we can read at most 36 bytes.
174 */
175void MySQLHandler::finishHandshake(MySQLProtocol::HandshakeResponse & packet)
176{
177 size_t packet_size = PACKET_HEADER_SIZE + SSL_REQUEST_PAYLOAD_SIZE;
178
179 /// Buffer for SSLRequest or part of HandshakeResponse.
180 char buf[packet_size];
181 size_t pos = 0;
182
183 /// Reads at least count and at most packet_size bytes.
184 auto read_bytes = [this, &buf, &pos, &packet_size](size_t count) -> void {
185 while (pos < count)
186 {
187 int ret = socket().receiveBytes(buf + pos, packet_size - pos);
188 if (ret == 0)
189 {
190 throw Exception("Cannot read all data. Bytes read: " + std::to_string(pos) + ". Bytes expected: 3.", ErrorCodes::CANNOT_READ_ALL_DATA);
191 }
192 pos += ret;
193 }
194 };
195 read_bytes(3); /// We can find out whether it is SSLRequest of HandshakeResponse by first 3 bytes.
196
197 size_t payload_size = unalignedLoad<uint32_t>(buf) & 0xFFFFFFu;
198 LOG_TRACE(log, "payload size: " << payload_size);
199
200 if (payload_size == SSL_REQUEST_PAYLOAD_SIZE)
201 {
202 finishHandshakeSSL(packet_size, buf, pos, read_bytes, packet);
203 }
204 else
205 {
206 /// Reading rest of HandshakeResponse.
207 packet_size = PACKET_HEADER_SIZE + payload_size;
208 WriteBufferFromOwnString buf_for_handshake_response;
209 buf_for_handshake_response.write(buf, pos);
210 copyData(*packet_sender->in, buf_for_handshake_response, packet_size - pos);
211 ReadBufferFromString payload(buf_for_handshake_response.str());
212 payload.ignore(PACKET_HEADER_SIZE);
213 packet.readPayload(payload);
214 packet_sender->sequence_id++;
215 }
216}
217
218void MySQLHandler::authenticate(const String & user_name, const String & auth_plugin_name, const String & initial_auth_response)
219{
220 try
221 {
222 // For compatibility with JavaScript MySQL client, Native41 authentication plugin is used when possible (if password is specified using double SHA1). Otherwise SHA256 plugin is used.
223 auto user = connection_context.getUser(user_name);
224 const DB::Authentication::Type user_auth_type = user->authentication.getType();
225 if (user_auth_type != DB::Authentication::DOUBLE_SHA1_PASSWORD && user_auth_type != DB::Authentication::PLAINTEXT_PASSWORD && user_auth_type != DB::Authentication::NO_PASSWORD)
226 {
227 authPluginSSL();
228 }
229
230 std::optional<String> auth_response = auth_plugin_name == auth_plugin->getName() ? std::make_optional<String>(initial_auth_response) : std::nullopt;
231 auth_plugin->authenticate(user_name, auth_response, connection_context, packet_sender, secure_connection, socket().peerAddress());
232 }
233 catch (const Exception & exc)
234 {
235 LOG_ERROR(log, "Authentication for user " << user_name << " failed.");
236 packet_sender->sendPacket(ERR_Packet(exc.code(), "00000", exc.message()), true);
237 throw;
238 }
239 LOG_INFO(log, "Authentication for user " << user_name << " succeeded.");
240}
241
242void MySQLHandler::comInitDB(ReadBuffer & payload)
243{
244 String database;
245 readStringUntilEOF(database, payload);
246 LOG_DEBUG(log, "Setting current database to " << database);
247 connection_context.setCurrentDatabase(database);
248 packet_sender->sendPacket(OK_Packet(0, client_capability_flags, 0, 0, 1), true);
249}
250
251void MySQLHandler::comFieldList(ReadBuffer & payload)
252{
253 ComFieldList packet;
254 packet.readPayload(payload);
255 String database = connection_context.getCurrentDatabase();
256 StoragePtr tablePtr = connection_context.getTable(database, packet.table);
257 for (const NameAndTypePair & column: tablePtr->getColumns().getAll())
258 {
259 ColumnDefinition column_definition(
260 database, packet.table, packet.table, column.name, column.name, CharacterSet::binary, 100, ColumnType::MYSQL_TYPE_STRING, 0, 0
261 );
262 packet_sender->sendPacket(column_definition);
263 }
264 packet_sender->sendPacket(OK_Packet(0xfe, client_capability_flags, 0, 0, 0), true);
265}
266
267void MySQLHandler::comPing()
268{
269 packet_sender->sendPacket(OK_Packet(0x0, client_capability_flags, 0, 0, 0), true);
270}
271
272static bool isFederatedServerSetupCommand(const String & query);
273
274void MySQLHandler::comQuery(ReadBuffer & payload)
275{
276 String query = String(payload.position(), payload.buffer().end());
277
278 // This is a workaround in order to support adding ClickHouse to MySQL using federated server.
279 // As Clickhouse doesn't support these statements, we just send OK packet in response.
280 if (isFederatedServerSetupCommand(query))
281 {
282 packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true);
283 }
284 else
285 {
286 bool with_output = false;
287 std::function<void(const String &)> set_content_type = [&with_output](const String &) -> void {
288 with_output = true;
289 };
290
291 String replacement_query = "select ''";
292 bool should_replace = false;
293
294 // Translate query from MySQL to ClickHouse.
295 // This is a temporary workaround until ClickHouse supports the syntax "@@var_name".
296 if (query == "select @@version_comment limit 1") // MariaDB client starts session with that query
297 {
298 should_replace = true;
299 }
300 // This is a workaround in order to support adding ClickHouse to MySQL using federated server.
301 if (0 == strncasecmp("SHOW TABLE STATUS LIKE", query.c_str(), 22))
302 {
303 should_replace = true;
304 replacement_query = boost::replace_all_copy(query, "SHOW TABLE STATUS LIKE ", show_table_status_replacement_query);
305 }
306
307 ReadBufferFromString replacement(replacement_query);
308
309 Context query_context = connection_context;
310 executeQuery(should_replace ? replacement : payload, *out, true, query_context, set_content_type, nullptr);
311
312 if (!with_output)
313 packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true);
314 }
315}
316
317void MySQLHandler::authPluginSSL()
318{
319 throw Exception("ClickHouse was built without SSL support. Try specifying password using double SHA1 in users.xml.", ErrorCodes::SUPPORT_IS_DISABLED);
320}
321
322void MySQLHandler::finishHandshakeSSL([[maybe_unused]] size_t packet_size, [[maybe_unused]] char * buf, [[maybe_unused]] size_t pos, [[maybe_unused]] std::function<void(size_t)> read_bytes, [[maybe_unused]] MySQLProtocol::HandshakeResponse & packet)
323{
324 throw Exception("Client requested SSL, while it is disabled.", ErrorCodes::SUPPORT_IS_DISABLED);
325}
326
327#if USE_SSL && USE_POCO_NETSSL
328MySQLHandlerSSL::MySQLHandlerSSL(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_)
329 : MySQLHandler(server_, socket_, ssl_enabled, connection_id_)
330 , public_key(public_key_)
331 , private_key(private_key_)
332{}
333
334void MySQLHandlerSSL::authPluginSSL()
335{
336 auth_plugin = std::make_unique<MySQLProtocol::Authentication::Sha256Password>(public_key, private_key, log);
337}
338
339void MySQLHandlerSSL::finishHandshakeSSL(size_t packet_size, char * buf, size_t pos, std::function<void(size_t)> read_bytes, MySQLProtocol::HandshakeResponse & packet)
340{
341 read_bytes(packet_size); /// Reading rest SSLRequest.
342 SSLRequest ssl_request;
343 ReadBufferFromMemory payload(buf, pos);
344 payload.ignore(PACKET_HEADER_SIZE);
345 ssl_request.readPayload(payload);
346 connection_context.mysql.client_capabilities = ssl_request.capability_flags;
347 connection_context.mysql.max_packet_size = ssl_request.max_packet_size ? ssl_request.max_packet_size : MAX_PACKET_LENGTH;
348 secure_connection = true;
349 ss = std::make_shared<SecureStreamSocket>(SecureStreamSocket::attach(socket(), SSLManager::instance().defaultServerContext()));
350 in = std::make_shared<ReadBufferFromPocoSocket>(*ss);
351 out = std::make_shared<WriteBufferFromPocoSocket>(*ss);
352 connection_context.mysql.sequence_id = 2;
353 packet_sender = std::make_shared<PacketSender>(*in, *out, connection_context.mysql.sequence_id);
354 packet_sender->max_packet_size = connection_context.mysql.max_packet_size;
355 packet_sender->receivePacket(packet); /// Reading HandshakeResponse from secure socket.
356}
357
358#endif
359
360static bool isFederatedServerSetupCommand(const String & query)
361{
362 return 0 == strncasecmp("SET NAMES", query.c_str(), 9) || 0 == strncasecmp("SET character_set_results", query.c_str(), 25)
363 || 0 == strncasecmp("SET FOREIGN_KEY_CHECKS", query.c_str(), 22) || 0 == strncasecmp("SET AUTOCOMMIT", query.c_str(), 14)
364 || 0 == strncasecmp("SET SESSION TRANSACTION ISOLATION LEVEL", query.c_str(), 39);
365}
366
367const String MySQLHandler::show_table_status_replacement_query("SELECT"
368 " name AS Name,"
369 " engine AS Engine,"
370 " '10' AS Version,"
371 " 'Dynamic' AS Row_format,"
372 " 0 AS Rows,"
373 " 0 AS Avg_row_length,"
374 " 0 AS Data_length,"
375 " 0 AS Max_data_length,"
376 " 0 AS Index_length,"
377 " 0 AS Data_free,"
378 " 'NULL' AS Auto_increment,"
379 " metadata_modification_time AS Create_time,"
380 " metadata_modification_time AS Update_time,"
381 " metadata_modification_time AS Check_time,"
382 " 'utf8_bin' AS Collation,"
383 " 'NULL' AS Checksum,"
384 " '' AS Create_options,"
385 " '' AS Comment"
386 " FROM system.tables"
387 " WHERE name LIKE ");
388
389}
390