| 1 | #include "HTTPHandler.h" |
| 2 | |
| 3 | #include <chrono> |
| 4 | #include <iomanip> |
| 5 | #include <Poco/File.h> |
| 6 | #include <Poco/Net/HTTPBasicCredentials.h> |
| 7 | #include <Poco/Net/HTTPServerRequest.h> |
| 8 | #include <Poco/Net/HTTPServerRequestImpl.h> |
| 9 | #include <Poco/Net/HTTPServerResponse.h> |
| 10 | #include <Poco/Net/NetException.h> |
| 11 | #include <ext/scope_guard.h> |
| 12 | #include <Core/ExternalTable.h> |
| 13 | #include <Common/StringUtils/StringUtils.h> |
| 14 | #include <Common/escapeForFileName.h> |
| 15 | #include <Common/getFQDNOrHostName.h> |
| 16 | #include <Common/CurrentThread.h> |
| 17 | #include <Common/setThreadName.h> |
| 18 | #include <Common/config.h> |
| 19 | #include <Common/SettingsChanges.h> |
| 20 | #include <Compression/CompressedReadBuffer.h> |
| 21 | #include <Compression/CompressedWriteBuffer.h> |
| 22 | #include <IO/ReadBufferFromIStream.h> |
| 23 | #include <IO/ZlibInflatingReadBuffer.h> |
| 24 | #include <IO/BrotliReadBuffer.h> |
| 25 | #include <IO/ReadBufferFromString.h> |
| 26 | #include <IO/WriteBufferFromString.h> |
| 27 | #include <IO/WriteBufferFromHTTPServerResponse.h> |
| 28 | #include <IO/WriteBufferFromFile.h> |
| 29 | #include <IO/WriteHelpers.h> |
| 30 | #include <IO/copyData.h> |
| 31 | #include <IO/ConcatReadBuffer.h> |
| 32 | #include <IO/CascadeWriteBuffer.h> |
| 33 | #include <IO/MemoryReadWriteBuffer.h> |
| 34 | #include <IO/WriteBufferFromTemporaryFile.h> |
| 35 | #include <DataStreams/IBlockInputStream.h> |
| 36 | #include <Interpreters/executeQuery.h> |
| 37 | #include <Common/typeid_cast.h> |
| 38 | #include <Poco/Net/HTTPStream.h> |
| 39 | |
| 40 | namespace DB |
| 41 | { |
| 42 | |
/// Error codes referenced by the HTTP handler.
/// These are declarations only (`extern`); the values are defined elsewhere in the project.
namespace ErrorCodes
{
    /// Thrown by the output-buffer plumbing in this file when an internal invariant is violated.
    /// Declared explicitly so this file does not depend on another header happening to declare it.
    extern const int LOGICAL_ERROR;

    extern const int READONLY;
    extern const int UNKNOWN_COMPRESSION_METHOD;

    /// Failures while parsing or opening input.
    extern const int CANNOT_PARSE_TEXT;
    extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
    extern const int CANNOT_PARSE_QUOTED_STRING;
    extern const int CANNOT_PARSE_DATE;
    extern const int CANNOT_PARSE_DATETIME;
    extern const int CANNOT_PARSE_NUMBER;
    extern const int CANNOT_OPEN_FILE;

    /// Problems with the query AST.
    extern const int UNKNOWN_ELEMENT_IN_AST;
    extern const int UNKNOWN_TYPE_OF_AST_NODE;
    extern const int TOO_DEEP_AST;
    extern const int TOO_BIG_AST;
    extern const int UNEXPECTED_AST_STRUCTURE;

    extern const int SYNTAX_ERROR;

    extern const int INCORRECT_DATA;
    extern const int TYPE_MISMATCH;

    /// References to entities the server does not know about.
    extern const int UNKNOWN_TABLE;
    extern const int UNKNOWN_FUNCTION;
    extern const int UNKNOWN_IDENTIFIER;
    extern const int UNKNOWN_TYPE;
    extern const int UNKNOWN_STORAGE;
    extern const int UNKNOWN_DATABASE;
    extern const int UNKNOWN_SETTING;
    extern const int UNKNOWN_DIRECTION_OF_SORTING;
    extern const int UNKNOWN_AGGREGATE_FUNCTION;
    extern const int UNKNOWN_FORMAT;
    extern const int UNKNOWN_DATABASE_ENGINE;
    extern const int UNKNOWN_TYPE_OF_QUERY;

    extern const int QUERY_IS_TOO_LARGE;

    extern const int NOT_IMPLEMENTED;
    extern const int SOCKET_TIMEOUT;

    /// Authentication failures.
    extern const int UNKNOWN_USER;
    extern const int WRONG_PASSWORD;
    extern const int REQUIRED_PASSWORD;

    extern const int INVALID_SESSION_TIMEOUT;
    extern const int HTTP_LENGTH_REQUIRED;
}
| 92 | |
| 93 | |
| 94 | static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int exception_code) |
| 95 | { |
| 96 | using namespace Poco::Net; |
| 97 | |
| 98 | if (exception_code == ErrorCodes::REQUIRED_PASSWORD) |
| 99 | return HTTPResponse::HTTP_UNAUTHORIZED; |
| 100 | else if (exception_code == ErrorCodes::CANNOT_PARSE_TEXT || |
| 101 | exception_code == ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE || |
| 102 | exception_code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING || |
| 103 | exception_code == ErrorCodes::CANNOT_PARSE_DATE || |
| 104 | exception_code == ErrorCodes::CANNOT_PARSE_DATETIME || |
| 105 | exception_code == ErrorCodes::CANNOT_PARSE_NUMBER || |
| 106 | |
| 107 | exception_code == ErrorCodes::UNKNOWN_ELEMENT_IN_AST || |
| 108 | exception_code == ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE || |
| 109 | exception_code == ErrorCodes::TOO_DEEP_AST || |
| 110 | exception_code == ErrorCodes::TOO_BIG_AST || |
| 111 | exception_code == ErrorCodes::UNEXPECTED_AST_STRUCTURE || |
| 112 | |
| 113 | exception_code == ErrorCodes::SYNTAX_ERROR || |
| 114 | |
| 115 | exception_code == ErrorCodes::INCORRECT_DATA || |
| 116 | exception_code == ErrorCodes::TYPE_MISMATCH) |
| 117 | return HTTPResponse::HTTP_BAD_REQUEST; |
| 118 | else if (exception_code == ErrorCodes::UNKNOWN_TABLE || |
| 119 | exception_code == ErrorCodes::UNKNOWN_FUNCTION || |
| 120 | exception_code == ErrorCodes::UNKNOWN_IDENTIFIER || |
| 121 | exception_code == ErrorCodes::UNKNOWN_TYPE || |
| 122 | exception_code == ErrorCodes::UNKNOWN_STORAGE || |
| 123 | exception_code == ErrorCodes::UNKNOWN_DATABASE || |
| 124 | exception_code == ErrorCodes::UNKNOWN_SETTING || |
| 125 | exception_code == ErrorCodes::UNKNOWN_DIRECTION_OF_SORTING || |
| 126 | exception_code == ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION || |
| 127 | exception_code == ErrorCodes::UNKNOWN_FORMAT || |
| 128 | exception_code == ErrorCodes::UNKNOWN_DATABASE_ENGINE || |
| 129 | |
| 130 | exception_code == ErrorCodes::UNKNOWN_TYPE_OF_QUERY) |
| 131 | return HTTPResponse::HTTP_NOT_FOUND; |
| 132 | else if (exception_code == ErrorCodes::QUERY_IS_TOO_LARGE) |
| 133 | return HTTPResponse::HTTP_REQUESTENTITYTOOLARGE; |
| 134 | else if (exception_code == ErrorCodes::NOT_IMPLEMENTED) |
| 135 | return HTTPResponse::HTTP_NOT_IMPLEMENTED; |
| 136 | else if (exception_code == ErrorCodes::SOCKET_TIMEOUT || |
| 137 | exception_code == ErrorCodes::CANNOT_OPEN_FILE) |
| 138 | return HTTPResponse::HTTP_SERVICE_UNAVAILABLE; |
| 139 | else if (exception_code == ErrorCodes::HTTP_LENGTH_REQUIRED) |
| 140 | return HTTPResponse::HTTP_LENGTH_REQUIRED; |
| 141 | |
| 142 | return HTTPResponse::HTTP_INTERNAL_SERVER_ERROR; |
| 143 | } |
| 144 | |
| 145 | |
| 146 | static std::chrono::steady_clock::duration parseSessionTimeout( |
| 147 | const Poco::Util::AbstractConfiguration & config, |
| 148 | const HTMLForm & params) |
| 149 | { |
| 150 | unsigned session_timeout = config.getInt("default_session_timeout" , 60); |
| 151 | |
| 152 | if (params.has("session_timeout" )) |
| 153 | { |
| 154 | unsigned max_session_timeout = config.getUInt("max_session_timeout" , 3600); |
| 155 | std::string session_timeout_str = params.get("session_timeout" ); |
| 156 | |
| 157 | ReadBufferFromString buf(session_timeout_str); |
| 158 | if (!tryReadIntText(session_timeout, buf) || !buf.eof()) |
| 159 | throw Exception("Invalid session timeout: '" + session_timeout_str + "'" , ErrorCodes::INVALID_SESSION_TIMEOUT); |
| 160 | |
| 161 | if (session_timeout > max_session_timeout) |
| 162 | throw Exception("Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout) |
| 163 | + ". Maximum session timeout could be modified in configuration file." , |
| 164 | ErrorCodes::INVALID_SESSION_TIMEOUT); |
| 165 | } |
| 166 | |
| 167 | return std::chrono::seconds(session_timeout); |
| 168 | } |
| 169 | |
| 170 | |
| 171 | void HTTPHandler::pushDelayedResults(Output & used_output) |
| 172 | { |
| 173 | std::vector<WriteBufferPtr> write_buffers; |
| 174 | std::vector<ReadBufferPtr> read_buffers; |
| 175 | std::vector<ReadBuffer *> read_buffers_raw_ptr; |
| 176 | |
| 177 | auto cascade_buffer = typeid_cast<CascadeWriteBuffer *>(used_output.out_maybe_delayed_and_compressed.get()); |
| 178 | if (!cascade_buffer) |
| 179 | throw Exception("Expected CascadeWriteBuffer" , ErrorCodes::LOGICAL_ERROR); |
| 180 | |
| 181 | cascade_buffer->getResultBuffers(write_buffers); |
| 182 | |
| 183 | if (write_buffers.empty()) |
| 184 | throw Exception("At least one buffer is expected to overwrite result into HTTP response" , ErrorCodes::LOGICAL_ERROR); |
| 185 | |
| 186 | for (auto & write_buf : write_buffers) |
| 187 | { |
| 188 | IReadableWriteBuffer * write_buf_concrete; |
| 189 | ReadBufferPtr reread_buf; |
| 190 | |
| 191 | if (write_buf |
| 192 | && (write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get())) |
| 193 | && (reread_buf = write_buf_concrete->tryGetReadBuffer())) |
| 194 | { |
| 195 | read_buffers.emplace_back(reread_buf); |
| 196 | read_buffers_raw_ptr.emplace_back(reread_buf.get()); |
| 197 | } |
| 198 | } |
| 199 | |
| 200 | ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr); |
| 201 | copyData(concat_read_buffer, *used_output.out_maybe_compressed); |
| 202 | } |
| 203 | |
| 204 | |
| 205 | HTTPHandler::HTTPHandler(IServer & server_) |
| 206 | : server(server_) |
| 207 | , log(&Logger::get("HTTPHandler" )) |
| 208 | { |
| 209 | server_display_name = server.config().getString("display_name" , getFQDNOrHostName()); |
| 210 | } |
| 211 | |
| 212 | |
| 213 | void HTTPHandler::processQuery( |
| 214 | Poco::Net::HTTPServerRequest & request, |
| 215 | HTMLForm & params, |
| 216 | Poco::Net::HTTPServerResponse & response, |
| 217 | Output & used_output) |
| 218 | { |
| 219 | Context context = server.context(); |
| 220 | |
| 221 | CurrentThread::QueryScope query_scope(context); |
| 222 | |
| 223 | LOG_TRACE(log, "Request URI: " << request.getURI()); |
| 224 | |
| 225 | std::istream & istr = request.stream(); |
| 226 | |
| 227 | /// Part of the query can be passed in the 'query' parameter and the rest in the request body |
| 228 | /// (http method need not necessarily be POST). In this case the entire query consists of the |
| 229 | /// contents of the 'query' parameter, a line break and the request body. |
| 230 | std::string query_param = params.get("query" , "" ); |
| 231 | if (!query_param.empty()) |
| 232 | query_param += '\n'; |
| 233 | |
| 234 | /// The user and password can be passed by headers (similar to X-Auth-*), |
| 235 | /// which is used by load balancers to pass authentication information. |
| 236 | std::string user = request.get("X-ClickHouse-User" , "" ); |
| 237 | std::string password = request.get("X-ClickHouse-Key" , "" ); |
| 238 | std::string quota_key = request.get("X-ClickHouse-Quota" , "" ); |
| 239 | |
| 240 | if (user.empty() && password.empty() && quota_key.empty()) |
| 241 | { |
| 242 | /// User name and password can be passed using query parameters |
| 243 | /// or using HTTP Basic auth (both methods are insecure). |
| 244 | if (request.hasCredentials()) |
| 245 | { |
| 246 | Poco::Net::HTTPBasicCredentials credentials(request); |
| 247 | |
| 248 | user = credentials.getUsername(); |
| 249 | password = credentials.getPassword(); |
| 250 | } |
| 251 | else |
| 252 | { |
| 253 | user = params.get("user" , "default" ); |
| 254 | password = params.get("password" , "" ); |
| 255 | } |
| 256 | |
| 257 | quota_key = params.get("quota_key" , "" ); |
| 258 | } |
| 259 | else |
| 260 | { |
| 261 | /// It is prohibited to mix different authorization schemes. |
| 262 | if (request.hasCredentials() |
| 263 | || params.has("user" ) |
| 264 | || params.has("password" ) |
| 265 | || params.has("quota_key" )) |
| 266 | { |
| 267 | throw Exception("Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods simultaneously" , ErrorCodes::REQUIRED_PASSWORD); |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | std::string query_id = params.get("query_id" , "" ); |
| 272 | context.setUser(user, password, request.clientAddress(), quota_key); |
| 273 | context.setCurrentQueryId(query_id); |
| 274 | |
| 275 | /// The user could specify session identifier and session timeout. |
| 276 | /// It allows to modify settings, create temporary tables and reuse them in subsequent requests. |
| 277 | |
| 278 | std::shared_ptr<Context> session; |
| 279 | String session_id; |
| 280 | std::chrono::steady_clock::duration session_timeout; |
| 281 | bool session_is_set = params.has("session_id" ); |
| 282 | const auto & config = server.config(); |
| 283 | |
| 284 | if (session_is_set) |
| 285 | { |
| 286 | session_id = params.get("session_id" ); |
| 287 | session_timeout = parseSessionTimeout(config, params); |
| 288 | std::string session_check = params.get("session_check" , "" ); |
| 289 | |
| 290 | session = context.acquireSession(session_id, session_timeout, session_check == "1" ); |
| 291 | |
| 292 | context = *session; |
| 293 | context.setSessionContext(*session); |
| 294 | } |
| 295 | |
| 296 | SCOPE_EXIT({ |
| 297 | if (session_is_set) |
| 298 | session->releaseSession(session_id, session_timeout); |
| 299 | }); |
| 300 | |
| 301 | /// The client can pass a HTTP header indicating supported compression method (gzip or deflate). |
| 302 | String http_response_compression_methods = request.get("Accept-Encoding" , "" ); |
| 303 | bool client_supports_http_compression = false; |
| 304 | CompressionMethod http_response_compression_method {}; |
| 305 | |
| 306 | if (!http_response_compression_methods.empty()) |
| 307 | { |
| 308 | /// Both gzip and deflate are supported. If the client supports both, gzip is preferred. |
| 309 | /// NOTE parsing of the list of methods is slightly incorrect. |
| 310 | if (std::string::npos != http_response_compression_methods.find("gzip" )) |
| 311 | { |
| 312 | client_supports_http_compression = true; |
| 313 | http_response_compression_method = CompressionMethod::Gzip; |
| 314 | } |
| 315 | else if (std::string::npos != http_response_compression_methods.find("deflate" )) |
| 316 | { |
| 317 | client_supports_http_compression = true; |
| 318 | http_response_compression_method = CompressionMethod::Zlib; |
| 319 | } |
| 320 | #if USE_BROTLI |
| 321 | else if (http_response_compression_methods == "br" ) |
| 322 | { |
| 323 | client_supports_http_compression = true; |
| 324 | http_response_compression_method = CompressionMethod::Brotli; |
| 325 | } |
| 326 | #endif |
| 327 | } |
| 328 | |
| 329 | /// Client can pass a 'compress' flag in the query string. In this case the query result is |
| 330 | /// compressed using internal algorithm. This is not reflected in HTTP headers. |
| 331 | bool internal_compression = params.getParsed<bool>("compress" , false); |
| 332 | |
| 333 | /// At least, we should postpone sending of first buffer_size result bytes |
| 334 | size_t buffer_size_total = std::max( |
| 335 | params.getParsed<size_t>("buffer_size" , DBMS_DEFAULT_BUFFER_SIZE), static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE)); |
| 336 | |
| 337 | /// If it is specified, the whole result will be buffered. |
| 338 | /// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file. |
| 339 | bool buffer_until_eof = params.getParsed<bool>("wait_end_of_query" , false); |
| 340 | |
| 341 | size_t buffer_size_http = DBMS_DEFAULT_BUFFER_SIZE; |
| 342 | size_t buffer_size_memory = (buffer_size_total > buffer_size_http) ? buffer_size_total : 0; |
| 343 | |
| 344 | unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout" , 10); |
| 345 | |
| 346 | used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>( |
| 347 | request, response, keep_alive_timeout, |
| 348 | client_supports_http_compression, http_response_compression_method, buffer_size_http); |
| 349 | if (internal_compression) |
| 350 | used_output.out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*used_output.out); |
| 351 | else |
| 352 | used_output.out_maybe_compressed = used_output.out; |
| 353 | |
| 354 | if (buffer_size_memory > 0 || buffer_until_eof) |
| 355 | { |
| 356 | CascadeWriteBuffer::WriteBufferPtrs cascade_buffer1; |
| 357 | CascadeWriteBuffer::WriteBufferConstructors cascade_buffer2; |
| 358 | |
| 359 | if (buffer_size_memory > 0) |
| 360 | cascade_buffer1.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory)); |
| 361 | |
| 362 | if (buffer_until_eof) |
| 363 | { |
| 364 | std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/" ; |
| 365 | |
| 366 | auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &) |
| 367 | { |
| 368 | return WriteBufferFromTemporaryFile::create(tmp_path_template); |
| 369 | }; |
| 370 | |
| 371 | cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer)); |
| 372 | } |
| 373 | else |
| 374 | { |
| 375 | auto push_memory_buffer_and_continue = [next_buffer = used_output.out_maybe_compressed] (const WriteBufferPtr & prev_buf) |
| 376 | { |
| 377 | auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get()); |
| 378 | if (!prev_memory_buffer) |
| 379 | throw Exception("Expected MemoryWriteBuffer" , ErrorCodes::LOGICAL_ERROR); |
| 380 | |
| 381 | auto rdbuf = prev_memory_buffer->tryGetReadBuffer(); |
| 382 | copyData(*rdbuf , *next_buffer); |
| 383 | |
| 384 | return next_buffer; |
| 385 | }; |
| 386 | |
| 387 | cascade_buffer2.emplace_back(push_memory_buffer_and_continue); |
| 388 | } |
| 389 | |
| 390 | used_output.out_maybe_delayed_and_compressed = std::make_shared<CascadeWriteBuffer>( |
| 391 | std::move(cascade_buffer1), std::move(cascade_buffer2)); |
| 392 | } |
| 393 | else |
| 394 | { |
| 395 | used_output.out_maybe_delayed_and_compressed = used_output.out_maybe_compressed; |
| 396 | } |
| 397 | |
| 398 | std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query_param); |
| 399 | |
| 400 | std::unique_ptr<ReadBuffer> in_post_raw = std::make_unique<ReadBufferFromIStream>(istr); |
| 401 | |
| 402 | /// Request body can be compressed using algorithm specified in the Content-Encoding header. |
| 403 | std::unique_ptr<ReadBuffer> in_post; |
| 404 | String http_request_compression_method_str = request.get("Content-Encoding" , "" ); |
| 405 | if (!http_request_compression_method_str.empty()) |
| 406 | { |
| 407 | if (http_request_compression_method_str == "gzip" ) |
| 408 | { |
| 409 | in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Gzip); |
| 410 | } |
| 411 | else if (http_request_compression_method_str == "deflate" ) |
| 412 | { |
| 413 | in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Zlib); |
| 414 | } |
| 415 | #if USE_BROTLI |
| 416 | else if (http_request_compression_method_str == "br" ) |
| 417 | { |
| 418 | in_post = std::make_unique<BrotliReadBuffer>(std::move(in_post_raw)); |
| 419 | } |
| 420 | #endif |
| 421 | else |
| 422 | { |
| 423 | throw Exception("Unknown Content-Encoding of HTTP request: " + http_request_compression_method_str, |
| 424 | ErrorCodes::UNKNOWN_COMPRESSION_METHOD); |
| 425 | } |
| 426 | } |
| 427 | else |
| 428 | in_post = std::move(in_post_raw); |
| 429 | |
| 430 | /// The data can also be compressed using incompatible internal algorithm. This is indicated by |
| 431 | /// 'decompress' query parameter. |
| 432 | std::unique_ptr<ReadBuffer> in_post_maybe_compressed; |
| 433 | bool in_post_compressed = false; |
| 434 | if (params.getParsed<bool>("decompress" , false)) |
| 435 | { |
| 436 | in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post); |
| 437 | in_post_compressed = true; |
| 438 | } |
| 439 | else |
| 440 | in_post_maybe_compressed = std::move(in_post); |
| 441 | |
| 442 | std::unique_ptr<ReadBuffer> in; |
| 443 | |
| 444 | static const NameSet reserved_param_names{"query" , "compress" , "decompress" , "user" , "password" , "quota_key" , "query_id" , "stacktrace" , |
| 445 | "buffer_size" , "wait_end_of_query" , "session_id" , "session_timeout" , "session_check" }; |
| 446 | |
| 447 | Names reserved_param_suffixes; |
| 448 | |
| 449 | auto param_could_be_skipped = [&] (const String & name) |
| 450 | { |
| 451 | if (reserved_param_names.count(name)) |
| 452 | return true; |
| 453 | |
| 454 | for (const String & suffix : reserved_param_suffixes) |
| 455 | { |
| 456 | if (endsWith(name, suffix)) |
| 457 | return true; |
| 458 | } |
| 459 | |
| 460 | return false; |
| 461 | }; |
| 462 | |
| 463 | /// Settings can be overridden in the query. |
| 464 | /// Some parameters (database, default_format, everything used in the code above) do not |
| 465 | /// belong to the Settings class. |
| 466 | |
| 467 | /// 'readonly' setting values mean: |
| 468 | /// readonly = 0 - any query is allowed, client can change any setting. |
| 469 | /// readonly = 1 - only readonly queries are allowed, client can't change settings. |
| 470 | /// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'. |
| 471 | |
| 472 | /// In theory if initially readonly = 0, the client can change any setting and then set readonly |
| 473 | /// to some other value. |
| 474 | auto & settings = context.getSettingsRef(); |
| 475 | |
| 476 | /// Only readonly queries are allowed for HTTP GET requests. |
| 477 | if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET) |
| 478 | { |
| 479 | if (settings.readonly == 0) |
| 480 | settings.readonly = 2; |
| 481 | } |
| 482 | |
| 483 | bool has_external_data = startsWith(request.getContentType().data(), "multipart/form-data" ); |
| 484 | |
| 485 | if (has_external_data) |
| 486 | { |
| 487 | /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters. |
| 488 | reserved_param_suffixes.reserve(3); |
| 489 | /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings. |
| 490 | reserved_param_suffixes.emplace_back("_format" ); |
| 491 | reserved_param_suffixes.emplace_back("_types" ); |
| 492 | reserved_param_suffixes.emplace_back("_structure" ); |
| 493 | } |
| 494 | |
| 495 | SettingsChanges settings_changes; |
| 496 | for (const auto & [key, value] : params) |
| 497 | { |
| 498 | if (key == "database" ) |
| 499 | { |
| 500 | context.setCurrentDatabase(value); |
| 501 | } |
| 502 | else if (key == "default_format" ) |
| 503 | { |
| 504 | context.setDefaultFormat(value); |
| 505 | } |
| 506 | else if (param_could_be_skipped(key)) |
| 507 | { |
| 508 | } |
| 509 | else if (startsWith(key, "param_" )) |
| 510 | { |
| 511 | /// Save name and values of substitution in dictionary. |
| 512 | const String parameter_name = key.substr(strlen("param_" )); |
| 513 | context.setQueryParameter(parameter_name, value); |
| 514 | } |
| 515 | else |
| 516 | { |
| 517 | /// All other query parameters are treated as settings. |
| 518 | settings_changes.push_back({key, value}); |
| 519 | } |
| 520 | } |
| 521 | |
| 522 | /// For external data we also want settings |
| 523 | context.checkSettingsConstraints(settings_changes); |
| 524 | context.applySettingsChanges(settings_changes); |
| 525 | |
| 526 | /// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope. |
| 527 | std::string full_query; |
| 528 | |
| 529 | /// Support for "external data for query processing". |
| 530 | if (has_external_data) |
| 531 | { |
| 532 | ExternalTablesHandler handler(context, params); |
| 533 | params.load(request, istr, handler); |
| 534 | |
| 535 | /// Params are of both form params POST and uri (GET params) |
| 536 | for (const auto & it : params) |
| 537 | if (it.first == "query" ) |
| 538 | full_query += it.second; |
| 539 | |
| 540 | in = std::make_unique<ReadBufferFromString>(full_query); |
| 541 | } |
| 542 | else |
| 543 | in = std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed); |
| 544 | |
| 545 | |
| 546 | /// HTTP response compression is turned on only if the client signalled that they support it |
| 547 | /// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on. |
| 548 | used_output.out->setCompression(client_supports_http_compression && settings.enable_http_compression); |
| 549 | if (client_supports_http_compression) |
| 550 | used_output.out->setCompressionLevel(settings.http_zlib_compression_level); |
| 551 | |
| 552 | used_output.out->setSendProgressInterval(settings.http_headers_progress_interval_ms); |
| 553 | |
| 554 | /// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on, |
| 555 | /// checksums of client data compressed with internal algorithm are not checked. |
| 556 | if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress) |
| 557 | static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming(); |
| 558 | |
| 559 | /// Add CORS header if 'add_http_cors_header' setting is turned on and the client passed |
| 560 | /// Origin header. |
| 561 | used_output.out->addHeaderCORS(settings.add_http_cors_header && !request.get("Origin" , "" ).empty()); |
| 562 | |
| 563 | ClientInfo & client_info = context.getClientInfo(); |
| 564 | client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; |
| 565 | client_info.interface = ClientInfo::Interface::HTTP; |
| 566 | |
| 567 | /// Query sent through HTTP interface is initial. |
| 568 | client_info.initial_user = client_info.current_user; |
| 569 | client_info.initial_query_id = client_info.current_query_id; |
| 570 | client_info.initial_address = client_info.current_address; |
| 571 | |
| 572 | ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN; |
| 573 | if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET) |
| 574 | http_method = ClientInfo::HTTPMethod::GET; |
| 575 | else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST) |
| 576 | http_method = ClientInfo::HTTPMethod::POST; |
| 577 | |
| 578 | client_info.http_method = http_method; |
| 579 | client_info.http_user_agent = request.get("User-Agent" , "" ); |
| 580 | |
| 581 | auto appendCallback = [&context] (ProgressCallback callback) |
| 582 | { |
| 583 | auto prev = context.getProgressCallback(); |
| 584 | |
| 585 | context.setProgressCallback([prev, callback] (const Progress & progress) |
| 586 | { |
| 587 | if (prev) |
| 588 | prev(progress); |
| 589 | |
| 590 | callback(progress); |
| 591 | }); |
| 592 | }; |
| 593 | |
| 594 | /// While still no data has been sent, we will report about query execution progress by sending HTTP headers. |
| 595 | if (settings.send_progress_in_http_headers) |
| 596 | appendCallback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); }); |
| 597 | |
| 598 | if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close) |
| 599 | { |
| 600 | Poco::Net::StreamSocket & socket = dynamic_cast<Poco::Net::HTTPServerRequestImpl &>(request).socket(); |
| 601 | |
| 602 | appendCallback([&context, &socket](const Progress &) |
| 603 | { |
| 604 | /// Assume that at the point this method is called no one is reading data from the socket any more. |
| 605 | /// True for read-only queries. |
| 606 | try |
| 607 | { |
| 608 | char b; |
| 609 | int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK); |
| 610 | if (status == 0) |
| 611 | context.killCurrentQuery(); |
| 612 | } |
| 613 | catch (Poco::TimeoutException &) |
| 614 | { |
| 615 | } |
| 616 | catch (...) |
| 617 | { |
| 618 | context.killCurrentQuery(); |
| 619 | } |
| 620 | }); |
| 621 | } |
| 622 | |
| 623 | customizeContext(context); |
| 624 | |
| 625 | executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context, |
| 626 | [&response] (const String & content_type) { response.setContentType(content_type); }, |
| 627 | [&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id" , current_query_id); }); |
| 628 | |
| 629 | if (used_output.hasDelayed()) |
| 630 | { |
| 631 | /// TODO: set Content-Length if possible |
| 632 | pushDelayedResults(used_output); |
| 633 | } |
| 634 | |
| 635 | /// Send HTTP headers with code 200 if no exception happened and the data is still not sent to |
| 636 | /// the client. |
| 637 | used_output.out->finalize(); |
| 638 | } |
| 639 | |
| 640 | void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_code, |
| 641 | Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, |
| 642 | Output & used_output) |
| 643 | { |
| 644 | try |
| 645 | { |
| 646 | /// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body |
| 647 | /// to avoid reading part of the current request body in the next request. |
| 648 | if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST |
| 649 | && response.getKeepAlive() |
| 650 | && !request.stream().eof() |
| 651 | && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED) |
| 652 | { |
| 653 | request.stream().ignore(std::numeric_limits<std::streamsize>::max()); |
| 654 | } |
| 655 | |
| 656 | bool auth_fail = exception_code == ErrorCodes::UNKNOWN_USER || |
| 657 | exception_code == ErrorCodes::WRONG_PASSWORD || |
| 658 | exception_code == ErrorCodes::REQUIRED_PASSWORD; |
| 659 | |
| 660 | if (auth_fail) |
| 661 | { |
| 662 | response.requireAuthentication("ClickHouse server HTTP API" ); |
| 663 | } |
| 664 | else |
| 665 | { |
| 666 | response.setStatusAndReason(exceptionCodeToHTTPStatus(exception_code)); |
| 667 | } |
| 668 | |
| 669 | if (!response.sent() && !used_output.out_maybe_compressed) |
| 670 | { |
| 671 | /// If nothing was sent yet and we don't even know if we must compress the response. |
| 672 | response.send() << s << std::endl; |
| 673 | } |
| 674 | else if (used_output.out_maybe_compressed) |
| 675 | { |
| 676 | /// Destroy CascadeBuffer to actualize buffers' positions and reset extra references |
| 677 | if (used_output.hasDelayed()) |
| 678 | used_output.out_maybe_delayed_and_compressed.reset(); |
| 679 | |
| 680 | /// Send the error message into already used (and possibly compressed) stream. |
| 681 | /// Note that the error message will possibly be sent after some data. |
| 682 | /// Also HTTP code 200 could have already been sent. |
| 683 | |
| 684 | /// If buffer has data, and that data wasn't sent yet, then no need to send that data |
| 685 | bool data_sent = used_output.out->count() != used_output.out->offset(); |
| 686 | |
| 687 | if (!data_sent) |
| 688 | { |
| 689 | used_output.out_maybe_compressed->position() = used_output.out_maybe_compressed->buffer().begin(); |
| 690 | used_output.out->position() = used_output.out->buffer().begin(); |
| 691 | } |
| 692 | |
| 693 | writeString(s, *used_output.out_maybe_compressed); |
| 694 | writeChar('\n', *used_output.out_maybe_compressed); |
| 695 | |
| 696 | used_output.out_maybe_compressed->next(); |
| 697 | used_output.out->next(); |
| 698 | used_output.out->finalize(); |
| 699 | } |
| 700 | } |
| 701 | catch (...) |
| 702 | { |
| 703 | tryLogCurrentException(log, "Cannot send exception to client" ); |
| 704 | } |
| 705 | } |
| 706 | |
| 707 | |
| 708 | void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) |
| 709 | { |
| 710 | setThreadName("HTTPHandler" ); |
| 711 | ThreadStatus thread_status; |
| 712 | |
| 713 | Output used_output; |
| 714 | |
| 715 | /// In case of exception, send stack trace to client. |
| 716 | bool with_stacktrace = false; |
| 717 | |
| 718 | try |
| 719 | { |
| 720 | response.setContentType("text/plain; charset=UTF-8" ); |
| 721 | response.set("X-ClickHouse-Server-Display-Name" , server_display_name); |
| 722 | /// For keep-alive to work. |
| 723 | if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1) |
| 724 | response.setChunkedTransferEncoding(true); |
| 725 | |
| 726 | HTMLForm params(request); |
| 727 | with_stacktrace = params.getParsed<bool>("stacktrace" , false); |
| 728 | |
| 729 | /// Workaround. Poco does not detect 411 Length Required case. |
| 730 | if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() && |
| 731 | !request.hasContentLength()) |
| 732 | { |
| 733 | throw Exception("There is neither Transfer-Encoding header nor Content-Length header" , ErrorCodes::HTTP_LENGTH_REQUIRED); |
| 734 | } |
| 735 | |
| 736 | processQuery(request, params, response, used_output); |
| 737 | LOG_INFO(log, "Done processing query" ); |
| 738 | } |
| 739 | catch (...) |
| 740 | { |
| 741 | tryLogCurrentException(log); |
| 742 | |
| 743 | /** If exception is received from remote server, then stack trace is embedded in message. |
| 744 | * If exception is thrown on local server, then stack trace is in separate field. |
| 745 | */ |
| 746 | std::string exception_message = getCurrentExceptionMessage(with_stacktrace, true); |
| 747 | int exception_code = getCurrentExceptionCode(); |
| 748 | |
| 749 | trySendExceptionToClient(exception_message, exception_code, request, response, used_output); |
| 750 | } |
| 751 | } |
| 752 | |
| 753 | |
| 754 | } |
| 755 | |