| 1 | #include <Common/config.h> |
| 2 | |
| 3 | #include "MySQLHandler.h" |
| 4 | #include <limits> |
| 5 | #include <ext/scope_guard.h> |
| 6 | #include <Columns/ColumnVector.h> |
| 7 | #include <Common/config_version.h> |
| 8 | #include <Common/NetException.h> |
| 9 | #include <Common/OpenSSLHelpers.h> |
| 10 | #include <Core/MySQLProtocol.h> |
| 11 | #include <Core/NamesAndTypes.h> |
| 12 | #include <DataStreams/copyData.h> |
| 13 | #include <Interpreters/executeQuery.h> |
| 14 | #include <IO/ReadBufferFromPocoSocket.h> |
| 15 | #include <IO/ReadBufferFromString.h> |
| 16 | #include <IO/WriteBufferFromPocoSocket.h> |
| 17 | #include <Storages/IStorage.h> |
| 18 | #include <boost/algorithm/string/replace.hpp> |
| 19 | |
| 20 | #if USE_POCO_NETSSL |
| 21 | #include <Poco/Net/SecureStreamSocket.h> |
| 22 | #include <Poco/Net/SSLManager.h> |
| 23 | #include <Poco/Crypto/CipherFactory.h> |
| 24 | #include <Poco/Crypto/RSAKey.h> |
| 25 | #endif |
| 26 | |
| 27 | namespace DB |
| 28 | { |
| 29 | |
| 30 | using namespace MySQLProtocol; |
| 31 | |
| 32 | #if USE_POCO_NETSSL |
| 33 | using Poco::Net::SecureStreamSocket; |
| 34 | using Poco::Net::SSLManager; |
| 35 | #endif |
| 36 | |
| 37 | namespace ErrorCodes |
| 38 | { |
| 39 | extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES; |
| 40 | extern const int OPENSSL_ERROR; |
| 41 | extern const int SUPPORT_IS_DISABLED; |
| 42 | } |
| 43 | |
| 44 | MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, |
| 45 | bool ssl_enabled, size_t connection_id_) |
| 46 | : Poco::Net::TCPServerConnection(socket_) |
| 47 | , server(server_) |
| 48 | , log(&Poco::Logger::get("MySQLHandler" )) |
| 49 | , connection_context(server.context()) |
| 50 | , connection_id(connection_id_) |
| 51 | , auth_plugin(new MySQLProtocol::Authentication::Native41()) |
| 52 | { |
| 53 | server_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF; |
| 54 | if (ssl_enabled) |
| 55 | server_capability_flags |= CLIENT_SSL; |
| 56 | } |
| 57 | |
| 58 | void MySQLHandler::run() |
| 59 | { |
| 60 | connection_context.makeSessionContext(); |
| 61 | connection_context.setDefaultFormat("MySQLWire" ); |
| 62 | |
| 63 | in = std::make_shared<ReadBufferFromPocoSocket>(socket()); |
| 64 | out = std::make_shared<WriteBufferFromPocoSocket>(socket()); |
| 65 | packet_sender = std::make_shared<PacketSender>(*in, *out, connection_context.mysql.sequence_id); |
| 66 | |
| 67 | try |
| 68 | { |
| 69 | Handshake handshake(server_capability_flags, connection_id, VERSION_STRING + String("-" ) + VERSION_NAME, auth_plugin->getName(), auth_plugin->getAuthPluginData()); |
| 70 | packet_sender->sendPacket<Handshake>(handshake, true); |
| 71 | |
| 72 | LOG_TRACE(log, "Sent handshake" ); |
| 73 | |
| 74 | HandshakeResponse handshake_response; |
| 75 | finishHandshake(handshake_response); |
| 76 | connection_context.mysql.client_capabilities = handshake_response.capability_flags; |
| 77 | if (handshake_response.max_packet_size) |
| 78 | connection_context.mysql.max_packet_size = handshake_response.max_packet_size; |
| 79 | if (!connection_context.mysql.max_packet_size) |
| 80 | connection_context.mysql.max_packet_size = MAX_PACKET_LENGTH; |
| 81 | |
| 82 | /* LOG_TRACE(log, "Capabilities: " << handshake_response.capability_flags |
| 83 | << "\nmax_packet_size: " |
| 84 | << handshake_response.max_packet_size |
| 85 | << "\ncharacter_set: " |
| 86 | << handshake_response.character_set |
| 87 | << "\nuser: " |
| 88 | << handshake_response.username |
| 89 | << "\nauth_response length: " |
| 90 | << handshake_response.auth_response.length() |
| 91 | << "\nauth_response: " |
| 92 | << handshake_response.auth_response |
| 93 | << "\ndatabase: " |
| 94 | << handshake_response.database |
| 95 | << "\nauth_plugin_name: " |
| 96 | << handshake_response.auth_plugin_name);*/ |
| 97 | |
| 98 | client_capability_flags = handshake_response.capability_flags; |
| 99 | if (!(client_capability_flags & CLIENT_PROTOCOL_41)) |
| 100 | throw Exception("Required capability: CLIENT_PROTOCOL_41." , ErrorCodes::MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES); |
| 101 | |
| 102 | authenticate(handshake_response.username, handshake_response.auth_plugin_name, handshake_response.auth_response); |
| 103 | |
| 104 | try |
| 105 | { |
| 106 | if (!handshake_response.database.empty()) |
| 107 | connection_context.setCurrentDatabase(handshake_response.database); |
| 108 | connection_context.setCurrentQueryId("" ); |
| 109 | } |
| 110 | catch (const Exception & exc) |
| 111 | { |
| 112 | log->log(exc); |
| 113 | packet_sender->sendPacket(ERR_Packet(exc.code(), "00000" , exc.message()), true); |
| 114 | } |
| 115 | |
| 116 | OK_Packet ok_packet(0, handshake_response.capability_flags, 0, 0, 0); |
| 117 | packet_sender->sendPacket(ok_packet, true); |
| 118 | |
| 119 | while (true) |
| 120 | { |
| 121 | packet_sender->resetSequenceId(); |
| 122 | PacketPayloadReadBuffer payload = packet_sender->getPayload(); |
| 123 | |
| 124 | char command = 0; |
| 125 | payload.readStrict(command); |
| 126 | |
| 127 | // For commands which are executed without MemoryTracker. |
| 128 | LimitReadBuffer limited_payload(payload, 10000, true, "too long MySQL packet." ); |
| 129 | |
| 130 | LOG_DEBUG(log, "Received command: " << static_cast<int>(static_cast<unsigned char>(command)) << ". Connection id: " << connection_id << "." ); |
| 131 | try |
| 132 | { |
| 133 | switch (command) |
| 134 | { |
| 135 | case COM_QUIT: |
| 136 | return; |
| 137 | case COM_INIT_DB: |
| 138 | comInitDB(limited_payload); |
| 139 | break; |
| 140 | case COM_QUERY: |
| 141 | comQuery(payload); |
| 142 | break; |
| 143 | case COM_FIELD_LIST: |
| 144 | comFieldList(limited_payload); |
| 145 | break; |
| 146 | case COM_PING: |
| 147 | comPing(); |
| 148 | break; |
| 149 | default: |
| 150 | throw Exception(Poco::format("Command %d is not implemented." , command), ErrorCodes::NOT_IMPLEMENTED); |
| 151 | } |
| 152 | } |
| 153 | catch (const NetException & exc) |
| 154 | { |
| 155 | log->log(exc); |
| 156 | throw; |
| 157 | } |
| 158 | catch (const Exception & exc) |
| 159 | { |
| 160 | log->log(exc); |
| 161 | packet_sender->sendPacket(ERR_Packet(exc.code(), "00000" , exc.message()), true); |
| 162 | } |
| 163 | } |
| 164 | } |
| 165 | catch (const Poco::Exception & exc) |
| 166 | { |
| 167 | log->log(exc); |
| 168 | } |
| 169 | } |
| 170 | |
| 171 | /** Reads 3 bytes, finds out whether it is SSLRequest or HandshakeResponse packet, starts secure connection, if it is SSLRequest. |
| 172 | * Reading is performed from socket instead of ReadBuffer to prevent reading part of SSL handshake. |
| 173 | * If we read it from socket, it will be impossible to start SSL connection using Poco. Size of SSLRequest packet payload is 32 bytes, thus we can read at most 36 bytes. |
| 174 | */ |
| 175 | void MySQLHandler::finishHandshake(MySQLProtocol::HandshakeResponse & packet) |
| 176 | { |
| 177 | size_t packet_size = PACKET_HEADER_SIZE + SSL_REQUEST_PAYLOAD_SIZE; |
| 178 | |
| 179 | /// Buffer for SSLRequest or part of HandshakeResponse. |
| 180 | char buf[packet_size]; |
| 181 | size_t pos = 0; |
| 182 | |
| 183 | /// Reads at least count and at most packet_size bytes. |
| 184 | auto read_bytes = [this, &buf, &pos, &packet_size](size_t count) -> void { |
| 185 | while (pos < count) |
| 186 | { |
| 187 | int ret = socket().receiveBytes(buf + pos, packet_size - pos); |
| 188 | if (ret == 0) |
| 189 | { |
| 190 | throw Exception("Cannot read all data. Bytes read: " + std::to_string(pos) + ". Bytes expected: 3." , ErrorCodes::CANNOT_READ_ALL_DATA); |
| 191 | } |
| 192 | pos += ret; |
| 193 | } |
| 194 | }; |
| 195 | read_bytes(3); /// We can find out whether it is SSLRequest of HandshakeResponse by first 3 bytes. |
| 196 | |
| 197 | size_t payload_size = unalignedLoad<uint32_t>(buf) & 0xFFFFFFu; |
| 198 | LOG_TRACE(log, "payload size: " << payload_size); |
| 199 | |
| 200 | if (payload_size == SSL_REQUEST_PAYLOAD_SIZE) |
| 201 | { |
| 202 | finishHandshakeSSL(packet_size, buf, pos, read_bytes, packet); |
| 203 | } |
| 204 | else |
| 205 | { |
| 206 | /// Reading rest of HandshakeResponse. |
| 207 | packet_size = PACKET_HEADER_SIZE + payload_size; |
| 208 | WriteBufferFromOwnString buf_for_handshake_response; |
| 209 | buf_for_handshake_response.write(buf, pos); |
| 210 | copyData(*packet_sender->in, buf_for_handshake_response, packet_size - pos); |
| 211 | ReadBufferFromString payload(buf_for_handshake_response.str()); |
| 212 | payload.ignore(PACKET_HEADER_SIZE); |
| 213 | packet.readPayload(payload); |
| 214 | packet_sender->sequence_id++; |
| 215 | } |
| 216 | } |
| 217 | |
| 218 | void MySQLHandler::authenticate(const String & user_name, const String & auth_plugin_name, const String & initial_auth_response) |
| 219 | { |
| 220 | try |
| 221 | { |
| 222 | // For compatibility with JavaScript MySQL client, Native41 authentication plugin is used when possible (if password is specified using double SHA1). Otherwise SHA256 plugin is used. |
| 223 | auto user = connection_context.getUser(user_name); |
| 224 | const DB::Authentication::Type user_auth_type = user->authentication.getType(); |
| 225 | if (user_auth_type != DB::Authentication::DOUBLE_SHA1_PASSWORD && user_auth_type != DB::Authentication::PLAINTEXT_PASSWORD && user_auth_type != DB::Authentication::NO_PASSWORD) |
| 226 | { |
| 227 | authPluginSSL(); |
| 228 | } |
| 229 | |
| 230 | std::optional<String> auth_response = auth_plugin_name == auth_plugin->getName() ? std::make_optional<String>(initial_auth_response) : std::nullopt; |
| 231 | auth_plugin->authenticate(user_name, auth_response, connection_context, packet_sender, secure_connection, socket().peerAddress()); |
| 232 | } |
| 233 | catch (const Exception & exc) |
| 234 | { |
| 235 | LOG_ERROR(log, "Authentication for user " << user_name << " failed." ); |
| 236 | packet_sender->sendPacket(ERR_Packet(exc.code(), "00000" , exc.message()), true); |
| 237 | throw; |
| 238 | } |
| 239 | LOG_INFO(log, "Authentication for user " << user_name << " succeeded." ); |
| 240 | } |
| 241 | |
| 242 | void MySQLHandler::comInitDB(ReadBuffer & payload) |
| 243 | { |
| 244 | String database; |
| 245 | readStringUntilEOF(database, payload); |
| 246 | LOG_DEBUG(log, "Setting current database to " << database); |
| 247 | connection_context.setCurrentDatabase(database); |
| 248 | packet_sender->sendPacket(OK_Packet(0, client_capability_flags, 0, 0, 1), true); |
| 249 | } |
| 250 | |
| 251 | void MySQLHandler::comFieldList(ReadBuffer & payload) |
| 252 | { |
| 253 | ComFieldList packet; |
| 254 | packet.readPayload(payload); |
| 255 | String database = connection_context.getCurrentDatabase(); |
| 256 | StoragePtr tablePtr = connection_context.getTable(database, packet.table); |
| 257 | for (const NameAndTypePair & column: tablePtr->getColumns().getAll()) |
| 258 | { |
| 259 | ColumnDefinition column_definition( |
| 260 | database, packet.table, packet.table, column.name, column.name, CharacterSet::binary, 100, ColumnType::MYSQL_TYPE_STRING, 0, 0 |
| 261 | ); |
| 262 | packet_sender->sendPacket(column_definition); |
| 263 | } |
| 264 | packet_sender->sendPacket(OK_Packet(0xfe, client_capability_flags, 0, 0, 0), true); |
| 265 | } |
| 266 | |
| 267 | void MySQLHandler::comPing() |
| 268 | { |
| 269 | packet_sender->sendPacket(OK_Packet(0x0, client_capability_flags, 0, 0, 0), true); |
| 270 | } |
| 271 | |
| 272 | static bool isFederatedServerSetupCommand(const String & query); |
| 273 | |
| 274 | void MySQLHandler::comQuery(ReadBuffer & payload) |
| 275 | { |
| 276 | String query = String(payload.position(), payload.buffer().end()); |
| 277 | |
| 278 | // This is a workaround in order to support adding ClickHouse to MySQL using federated server. |
| 279 | // As Clickhouse doesn't support these statements, we just send OK packet in response. |
| 280 | if (isFederatedServerSetupCommand(query)) |
| 281 | { |
| 282 | packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true); |
| 283 | } |
| 284 | else |
| 285 | { |
| 286 | bool with_output = false; |
| 287 | std::function<void(const String &)> set_content_type = [&with_output](const String &) -> void { |
| 288 | with_output = true; |
| 289 | }; |
| 290 | |
| 291 | String replacement_query = "select ''" ; |
| 292 | bool should_replace = false; |
| 293 | |
| 294 | // Translate query from MySQL to ClickHouse. |
| 295 | // This is a temporary workaround until ClickHouse supports the syntax "@@var_name". |
| 296 | if (query == "select @@version_comment limit 1" ) // MariaDB client starts session with that query |
| 297 | { |
| 298 | should_replace = true; |
| 299 | } |
| 300 | // This is a workaround in order to support adding ClickHouse to MySQL using federated server. |
| 301 | if (0 == strncasecmp("SHOW TABLE STATUS LIKE" , query.c_str(), 22)) |
| 302 | { |
| 303 | should_replace = true; |
| 304 | replacement_query = boost::replace_all_copy(query, "SHOW TABLE STATUS LIKE " , show_table_status_replacement_query); |
| 305 | } |
| 306 | |
| 307 | ReadBufferFromString replacement(replacement_query); |
| 308 | |
| 309 | Context query_context = connection_context; |
| 310 | executeQuery(should_replace ? replacement : payload, *out, true, query_context, set_content_type, nullptr); |
| 311 | |
| 312 | if (!with_output) |
| 313 | packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true); |
| 314 | } |
| 315 | } |
| 316 | |
| 317 | void MySQLHandler::authPluginSSL() |
| 318 | { |
| 319 | throw Exception("ClickHouse was built without SSL support. Try specifying password using double SHA1 in users.xml." , ErrorCodes::SUPPORT_IS_DISABLED); |
| 320 | } |
| 321 | |
| 322 | void MySQLHandler::finishHandshakeSSL([[maybe_unused]] size_t packet_size, [[maybe_unused]] char * buf, [[maybe_unused]] size_t pos, [[maybe_unused]] std::function<void(size_t)> read_bytes, [[maybe_unused]] MySQLProtocol::HandshakeResponse & packet) |
| 323 | { |
| 324 | throw Exception("Client requested SSL, while it is disabled." , ErrorCodes::SUPPORT_IS_DISABLED); |
| 325 | } |
| 326 | |
| 327 | #if USE_SSL && USE_POCO_NETSSL |
| 328 | MySQLHandlerSSL::MySQLHandlerSSL(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_) |
| 329 | : MySQLHandler(server_, socket_, ssl_enabled, connection_id_) |
| 330 | , public_key(public_key_) |
| 331 | , private_key(private_key_) |
| 332 | {} |
| 333 | |
| 334 | void MySQLHandlerSSL::authPluginSSL() |
| 335 | { |
| 336 | auth_plugin = std::make_unique<MySQLProtocol::Authentication::Sha256Password>(public_key, private_key, log); |
| 337 | } |
| 338 | |
| 339 | void MySQLHandlerSSL::finishHandshakeSSL(size_t packet_size, char * buf, size_t pos, std::function<void(size_t)> read_bytes, MySQLProtocol::HandshakeResponse & packet) |
| 340 | { |
| 341 | read_bytes(packet_size); /// Reading rest SSLRequest. |
| 342 | SSLRequest ssl_request; |
| 343 | ReadBufferFromMemory payload(buf, pos); |
| 344 | payload.ignore(PACKET_HEADER_SIZE); |
| 345 | ssl_request.readPayload(payload); |
| 346 | connection_context.mysql.client_capabilities = ssl_request.capability_flags; |
| 347 | connection_context.mysql.max_packet_size = ssl_request.max_packet_size ? ssl_request.max_packet_size : MAX_PACKET_LENGTH; |
| 348 | secure_connection = true; |
| 349 | ss = std::make_shared<SecureStreamSocket>(SecureStreamSocket::attach(socket(), SSLManager::instance().defaultServerContext())); |
| 350 | in = std::make_shared<ReadBufferFromPocoSocket>(*ss); |
| 351 | out = std::make_shared<WriteBufferFromPocoSocket>(*ss); |
| 352 | connection_context.mysql.sequence_id = 2; |
| 353 | packet_sender = std::make_shared<PacketSender>(*in, *out, connection_context.mysql.sequence_id); |
| 354 | packet_sender->max_packet_size = connection_context.mysql.max_packet_size; |
| 355 | packet_sender->receivePacket(packet); /// Reading HandshakeResponse from secure socket. |
| 356 | } |
| 357 | |
| 358 | #endif |
| 359 | |
| 360 | static bool isFederatedServerSetupCommand(const String & query) |
| 361 | { |
| 362 | return 0 == strncasecmp("SET NAMES" , query.c_str(), 9) || 0 == strncasecmp("SET character_set_results" , query.c_str(), 25) |
| 363 | || 0 == strncasecmp("SET FOREIGN_KEY_CHECKS" , query.c_str(), 22) || 0 == strncasecmp("SET AUTOCOMMIT" , query.c_str(), 14) |
| 364 | || 0 == strncasecmp("SET SESSION TRANSACTION ISOLATION LEVEL" , query.c_str(), 39); |
| 365 | } |
| 366 | |
| 367 | const String MySQLHandler::show_table_status_replacement_query("SELECT" |
| 368 | " name AS Name," |
| 369 | " engine AS Engine," |
| 370 | " '10' AS Version," |
| 371 | " 'Dynamic' AS Row_format," |
| 372 | " 0 AS Rows," |
| 373 | " 0 AS Avg_row_length," |
| 374 | " 0 AS Data_length," |
| 375 | " 0 AS Max_data_length," |
| 376 | " 0 AS Index_length," |
| 377 | " 0 AS Data_free," |
| 378 | " 'NULL' AS Auto_increment," |
| 379 | " metadata_modification_time AS Create_time," |
| 380 | " metadata_modification_time AS Update_time," |
| 381 | " metadata_modification_time AS Check_time," |
| 382 | " 'utf8_bin' AS Collation," |
| 383 | " 'NULL' AS Checksum," |
| 384 | " '' AS Create_options," |
| 385 | " '' AS Comment" |
| 386 | " FROM system.tables" |
| 387 | " WHERE name LIKE " ); |
| 388 | |
| 389 | } |
| 390 | |