1 | #include "HTTPHandler.h" |
2 | |
3 | #include <chrono> |
4 | #include <iomanip> |
5 | #include <Poco/File.h> |
6 | #include <Poco/Net/HTTPBasicCredentials.h> |
7 | #include <Poco/Net/HTTPServerRequest.h> |
8 | #include <Poco/Net/HTTPServerRequestImpl.h> |
9 | #include <Poco/Net/HTTPServerResponse.h> |
10 | #include <Poco/Net/NetException.h> |
11 | #include <ext/scope_guard.h> |
12 | #include <Core/ExternalTable.h> |
13 | #include <Common/StringUtils/StringUtils.h> |
14 | #include <Common/escapeForFileName.h> |
15 | #include <Common/getFQDNOrHostName.h> |
16 | #include <Common/CurrentThread.h> |
17 | #include <Common/setThreadName.h> |
18 | #include <Common/config.h> |
19 | #include <Common/SettingsChanges.h> |
20 | #include <Compression/CompressedReadBuffer.h> |
21 | #include <Compression/CompressedWriteBuffer.h> |
22 | #include <IO/ReadBufferFromIStream.h> |
23 | #include <IO/ZlibInflatingReadBuffer.h> |
24 | #include <IO/BrotliReadBuffer.h> |
25 | #include <IO/ReadBufferFromString.h> |
26 | #include <IO/WriteBufferFromString.h> |
27 | #include <IO/WriteBufferFromHTTPServerResponse.h> |
28 | #include <IO/WriteBufferFromFile.h> |
29 | #include <IO/WriteHelpers.h> |
30 | #include <IO/copyData.h> |
31 | #include <IO/ConcatReadBuffer.h> |
32 | #include <IO/CascadeWriteBuffer.h> |
33 | #include <IO/MemoryReadWriteBuffer.h> |
34 | #include <IO/WriteBufferFromTemporaryFile.h> |
35 | #include <DataStreams/IBlockInputStream.h> |
36 | #include <Interpreters/executeQuery.h> |
37 | #include <Common/typeid_cast.h> |
38 | #include <Poco/Net/HTTPStream.h> |
39 | |
40 | namespace DB |
41 | { |
42 | |
43 | namespace ErrorCodes |
44 | { |
45 | extern const int READONLY; |
46 | extern const int UNKNOWN_COMPRESSION_METHOD; |
47 | |
48 | extern const int CANNOT_PARSE_TEXT; |
49 | extern const int CANNOT_PARSE_ESCAPE_SEQUENCE; |
50 | extern const int CANNOT_PARSE_QUOTED_STRING; |
51 | extern const int CANNOT_PARSE_DATE; |
52 | extern const int CANNOT_PARSE_DATETIME; |
53 | extern const int CANNOT_PARSE_NUMBER; |
54 | extern const int CANNOT_OPEN_FILE; |
55 | |
56 | extern const int UNKNOWN_ELEMENT_IN_AST; |
57 | extern const int UNKNOWN_TYPE_OF_AST_NODE; |
58 | extern const int TOO_DEEP_AST; |
59 | extern const int TOO_BIG_AST; |
60 | extern const int UNEXPECTED_AST_STRUCTURE; |
61 | |
62 | extern const int SYNTAX_ERROR; |
63 | |
64 | extern const int INCORRECT_DATA; |
65 | extern const int TYPE_MISMATCH; |
66 | |
67 | extern const int UNKNOWN_TABLE; |
68 | extern const int UNKNOWN_FUNCTION; |
69 | extern const int UNKNOWN_IDENTIFIER; |
70 | extern const int UNKNOWN_TYPE; |
71 | extern const int UNKNOWN_STORAGE; |
72 | extern const int UNKNOWN_DATABASE; |
73 | extern const int UNKNOWN_SETTING; |
74 | extern const int UNKNOWN_DIRECTION_OF_SORTING; |
75 | extern const int UNKNOWN_AGGREGATE_FUNCTION; |
76 | extern const int UNKNOWN_FORMAT; |
77 | extern const int UNKNOWN_DATABASE_ENGINE; |
78 | extern const int UNKNOWN_TYPE_OF_QUERY; |
79 | |
80 | extern const int QUERY_IS_TOO_LARGE; |
81 | |
82 | extern const int NOT_IMPLEMENTED; |
83 | extern const int SOCKET_TIMEOUT; |
84 | |
85 | extern const int UNKNOWN_USER; |
86 | extern const int WRONG_PASSWORD; |
87 | extern const int REQUIRED_PASSWORD; |
88 | |
89 | extern const int INVALID_SESSION_TIMEOUT; |
90 | extern const int HTTP_LENGTH_REQUIRED; |
91 | } |
92 | |
93 | |
94 | static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int exception_code) |
95 | { |
96 | using namespace Poco::Net; |
97 | |
98 | if (exception_code == ErrorCodes::REQUIRED_PASSWORD) |
99 | return HTTPResponse::HTTP_UNAUTHORIZED; |
100 | else if (exception_code == ErrorCodes::CANNOT_PARSE_TEXT || |
101 | exception_code == ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE || |
102 | exception_code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING || |
103 | exception_code == ErrorCodes::CANNOT_PARSE_DATE || |
104 | exception_code == ErrorCodes::CANNOT_PARSE_DATETIME || |
105 | exception_code == ErrorCodes::CANNOT_PARSE_NUMBER || |
106 | |
107 | exception_code == ErrorCodes::UNKNOWN_ELEMENT_IN_AST || |
108 | exception_code == ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE || |
109 | exception_code == ErrorCodes::TOO_DEEP_AST || |
110 | exception_code == ErrorCodes::TOO_BIG_AST || |
111 | exception_code == ErrorCodes::UNEXPECTED_AST_STRUCTURE || |
112 | |
113 | exception_code == ErrorCodes::SYNTAX_ERROR || |
114 | |
115 | exception_code == ErrorCodes::INCORRECT_DATA || |
116 | exception_code == ErrorCodes::TYPE_MISMATCH) |
117 | return HTTPResponse::HTTP_BAD_REQUEST; |
118 | else if (exception_code == ErrorCodes::UNKNOWN_TABLE || |
119 | exception_code == ErrorCodes::UNKNOWN_FUNCTION || |
120 | exception_code == ErrorCodes::UNKNOWN_IDENTIFIER || |
121 | exception_code == ErrorCodes::UNKNOWN_TYPE || |
122 | exception_code == ErrorCodes::UNKNOWN_STORAGE || |
123 | exception_code == ErrorCodes::UNKNOWN_DATABASE || |
124 | exception_code == ErrorCodes::UNKNOWN_SETTING || |
125 | exception_code == ErrorCodes::UNKNOWN_DIRECTION_OF_SORTING || |
126 | exception_code == ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION || |
127 | exception_code == ErrorCodes::UNKNOWN_FORMAT || |
128 | exception_code == ErrorCodes::UNKNOWN_DATABASE_ENGINE || |
129 | |
130 | exception_code == ErrorCodes::UNKNOWN_TYPE_OF_QUERY) |
131 | return HTTPResponse::HTTP_NOT_FOUND; |
132 | else if (exception_code == ErrorCodes::QUERY_IS_TOO_LARGE) |
133 | return HTTPResponse::HTTP_REQUESTENTITYTOOLARGE; |
134 | else if (exception_code == ErrorCodes::NOT_IMPLEMENTED) |
135 | return HTTPResponse::HTTP_NOT_IMPLEMENTED; |
136 | else if (exception_code == ErrorCodes::SOCKET_TIMEOUT || |
137 | exception_code == ErrorCodes::CANNOT_OPEN_FILE) |
138 | return HTTPResponse::HTTP_SERVICE_UNAVAILABLE; |
139 | else if (exception_code == ErrorCodes::HTTP_LENGTH_REQUIRED) |
140 | return HTTPResponse::HTTP_LENGTH_REQUIRED; |
141 | |
142 | return HTTPResponse::HTTP_INTERNAL_SERVER_ERROR; |
143 | } |
144 | |
145 | |
146 | static std::chrono::steady_clock::duration parseSessionTimeout( |
147 | const Poco::Util::AbstractConfiguration & config, |
148 | const HTMLForm & params) |
149 | { |
150 | unsigned session_timeout = config.getInt("default_session_timeout" , 60); |
151 | |
152 | if (params.has("session_timeout" )) |
153 | { |
154 | unsigned max_session_timeout = config.getUInt("max_session_timeout" , 3600); |
155 | std::string session_timeout_str = params.get("session_timeout" ); |
156 | |
157 | ReadBufferFromString buf(session_timeout_str); |
158 | if (!tryReadIntText(session_timeout, buf) || !buf.eof()) |
159 | throw Exception("Invalid session timeout: '" + session_timeout_str + "'" , ErrorCodes::INVALID_SESSION_TIMEOUT); |
160 | |
161 | if (session_timeout > max_session_timeout) |
162 | throw Exception("Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout) |
163 | + ". Maximum session timeout could be modified in configuration file." , |
164 | ErrorCodes::INVALID_SESSION_TIMEOUT); |
165 | } |
166 | |
167 | return std::chrono::seconds(session_timeout); |
168 | } |
169 | |
170 | |
171 | void HTTPHandler::pushDelayedResults(Output & used_output) |
172 | { |
173 | std::vector<WriteBufferPtr> write_buffers; |
174 | std::vector<ReadBufferPtr> read_buffers; |
175 | std::vector<ReadBuffer *> read_buffers_raw_ptr; |
176 | |
177 | auto cascade_buffer = typeid_cast<CascadeWriteBuffer *>(used_output.out_maybe_delayed_and_compressed.get()); |
178 | if (!cascade_buffer) |
179 | throw Exception("Expected CascadeWriteBuffer" , ErrorCodes::LOGICAL_ERROR); |
180 | |
181 | cascade_buffer->getResultBuffers(write_buffers); |
182 | |
183 | if (write_buffers.empty()) |
184 | throw Exception("At least one buffer is expected to overwrite result into HTTP response" , ErrorCodes::LOGICAL_ERROR); |
185 | |
186 | for (auto & write_buf : write_buffers) |
187 | { |
188 | IReadableWriteBuffer * write_buf_concrete; |
189 | ReadBufferPtr reread_buf; |
190 | |
191 | if (write_buf |
192 | && (write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get())) |
193 | && (reread_buf = write_buf_concrete->tryGetReadBuffer())) |
194 | { |
195 | read_buffers.emplace_back(reread_buf); |
196 | read_buffers_raw_ptr.emplace_back(reread_buf.get()); |
197 | } |
198 | } |
199 | |
200 | ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr); |
201 | copyData(concat_read_buffer, *used_output.out_maybe_compressed); |
202 | } |
203 | |
204 | |
205 | HTTPHandler::HTTPHandler(IServer & server_) |
206 | : server(server_) |
207 | , log(&Logger::get("HTTPHandler" )) |
208 | { |
209 | server_display_name = server.config().getString("display_name" , getFQDNOrHostName()); |
210 | } |
211 | |
212 | |
213 | void HTTPHandler::processQuery( |
214 | Poco::Net::HTTPServerRequest & request, |
215 | HTMLForm & params, |
216 | Poco::Net::HTTPServerResponse & response, |
217 | Output & used_output) |
218 | { |
219 | Context context = server.context(); |
220 | |
221 | CurrentThread::QueryScope query_scope(context); |
222 | |
223 | LOG_TRACE(log, "Request URI: " << request.getURI()); |
224 | |
225 | std::istream & istr = request.stream(); |
226 | |
227 | /// Part of the query can be passed in the 'query' parameter and the rest in the request body |
228 | /// (http method need not necessarily be POST). In this case the entire query consists of the |
229 | /// contents of the 'query' parameter, a line break and the request body. |
230 | std::string query_param = params.get("query" , "" ); |
231 | if (!query_param.empty()) |
232 | query_param += '\n'; |
233 | |
234 | /// The user and password can be passed by headers (similar to X-Auth-*), |
235 | /// which is used by load balancers to pass authentication information. |
236 | std::string user = request.get("X-ClickHouse-User" , "" ); |
237 | std::string password = request.get("X-ClickHouse-Key" , "" ); |
238 | std::string quota_key = request.get("X-ClickHouse-Quota" , "" ); |
239 | |
240 | if (user.empty() && password.empty() && quota_key.empty()) |
241 | { |
242 | /// User name and password can be passed using query parameters |
243 | /// or using HTTP Basic auth (both methods are insecure). |
244 | if (request.hasCredentials()) |
245 | { |
246 | Poco::Net::HTTPBasicCredentials credentials(request); |
247 | |
248 | user = credentials.getUsername(); |
249 | password = credentials.getPassword(); |
250 | } |
251 | else |
252 | { |
253 | user = params.get("user" , "default" ); |
254 | password = params.get("password" , "" ); |
255 | } |
256 | |
257 | quota_key = params.get("quota_key" , "" ); |
258 | } |
259 | else |
260 | { |
261 | /// It is prohibited to mix different authorization schemes. |
262 | if (request.hasCredentials() |
263 | || params.has("user" ) |
264 | || params.has("password" ) |
265 | || params.has("quota_key" )) |
266 | { |
267 | throw Exception("Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods simultaneously" , ErrorCodes::REQUIRED_PASSWORD); |
268 | } |
269 | } |
270 | |
271 | std::string query_id = params.get("query_id" , "" ); |
272 | context.setUser(user, password, request.clientAddress(), quota_key); |
273 | context.setCurrentQueryId(query_id); |
274 | |
275 | /// The user could specify session identifier and session timeout. |
276 | /// It allows to modify settings, create temporary tables and reuse them in subsequent requests. |
277 | |
278 | std::shared_ptr<Context> session; |
279 | String session_id; |
280 | std::chrono::steady_clock::duration session_timeout; |
281 | bool session_is_set = params.has("session_id" ); |
282 | const auto & config = server.config(); |
283 | |
284 | if (session_is_set) |
285 | { |
286 | session_id = params.get("session_id" ); |
287 | session_timeout = parseSessionTimeout(config, params); |
288 | std::string session_check = params.get("session_check" , "" ); |
289 | |
290 | session = context.acquireSession(session_id, session_timeout, session_check == "1" ); |
291 | |
292 | context = *session; |
293 | context.setSessionContext(*session); |
294 | } |
295 | |
296 | SCOPE_EXIT({ |
297 | if (session_is_set) |
298 | session->releaseSession(session_id, session_timeout); |
299 | }); |
300 | |
301 | /// The client can pass a HTTP header indicating supported compression method (gzip or deflate). |
302 | String http_response_compression_methods = request.get("Accept-Encoding" , "" ); |
303 | bool client_supports_http_compression = false; |
304 | CompressionMethod http_response_compression_method {}; |
305 | |
306 | if (!http_response_compression_methods.empty()) |
307 | { |
308 | /// Both gzip and deflate are supported. If the client supports both, gzip is preferred. |
309 | /// NOTE parsing of the list of methods is slightly incorrect. |
310 | if (std::string::npos != http_response_compression_methods.find("gzip" )) |
311 | { |
312 | client_supports_http_compression = true; |
313 | http_response_compression_method = CompressionMethod::Gzip; |
314 | } |
315 | else if (std::string::npos != http_response_compression_methods.find("deflate" )) |
316 | { |
317 | client_supports_http_compression = true; |
318 | http_response_compression_method = CompressionMethod::Zlib; |
319 | } |
320 | #if USE_BROTLI |
321 | else if (http_response_compression_methods == "br" ) |
322 | { |
323 | client_supports_http_compression = true; |
324 | http_response_compression_method = CompressionMethod::Brotli; |
325 | } |
326 | #endif |
327 | } |
328 | |
329 | /// Client can pass a 'compress' flag in the query string. In this case the query result is |
330 | /// compressed using internal algorithm. This is not reflected in HTTP headers. |
331 | bool internal_compression = params.getParsed<bool>("compress" , false); |
332 | |
333 | /// At least, we should postpone sending of first buffer_size result bytes |
334 | size_t buffer_size_total = std::max( |
335 | params.getParsed<size_t>("buffer_size" , DBMS_DEFAULT_BUFFER_SIZE), static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE)); |
336 | |
337 | /// If it is specified, the whole result will be buffered. |
338 | /// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file. |
339 | bool buffer_until_eof = params.getParsed<bool>("wait_end_of_query" , false); |
340 | |
341 | size_t buffer_size_http = DBMS_DEFAULT_BUFFER_SIZE; |
342 | size_t buffer_size_memory = (buffer_size_total > buffer_size_http) ? buffer_size_total : 0; |
343 | |
344 | unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout" , 10); |
345 | |
346 | used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>( |
347 | request, response, keep_alive_timeout, |
348 | client_supports_http_compression, http_response_compression_method, buffer_size_http); |
349 | if (internal_compression) |
350 | used_output.out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*used_output.out); |
351 | else |
352 | used_output.out_maybe_compressed = used_output.out; |
353 | |
354 | if (buffer_size_memory > 0 || buffer_until_eof) |
355 | { |
356 | CascadeWriteBuffer::WriteBufferPtrs cascade_buffer1; |
357 | CascadeWriteBuffer::WriteBufferConstructors cascade_buffer2; |
358 | |
359 | if (buffer_size_memory > 0) |
360 | cascade_buffer1.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory)); |
361 | |
362 | if (buffer_until_eof) |
363 | { |
364 | std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/" ; |
365 | |
366 | auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &) |
367 | { |
368 | return WriteBufferFromTemporaryFile::create(tmp_path_template); |
369 | }; |
370 | |
371 | cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer)); |
372 | } |
373 | else |
374 | { |
375 | auto push_memory_buffer_and_continue = [next_buffer = used_output.out_maybe_compressed] (const WriteBufferPtr & prev_buf) |
376 | { |
377 | auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get()); |
378 | if (!prev_memory_buffer) |
379 | throw Exception("Expected MemoryWriteBuffer" , ErrorCodes::LOGICAL_ERROR); |
380 | |
381 | auto rdbuf = prev_memory_buffer->tryGetReadBuffer(); |
382 | copyData(*rdbuf , *next_buffer); |
383 | |
384 | return next_buffer; |
385 | }; |
386 | |
387 | cascade_buffer2.emplace_back(push_memory_buffer_and_continue); |
388 | } |
389 | |
390 | used_output.out_maybe_delayed_and_compressed = std::make_shared<CascadeWriteBuffer>( |
391 | std::move(cascade_buffer1), std::move(cascade_buffer2)); |
392 | } |
393 | else |
394 | { |
395 | used_output.out_maybe_delayed_and_compressed = used_output.out_maybe_compressed; |
396 | } |
397 | |
398 | std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query_param); |
399 | |
400 | std::unique_ptr<ReadBuffer> in_post_raw = std::make_unique<ReadBufferFromIStream>(istr); |
401 | |
402 | /// Request body can be compressed using algorithm specified in the Content-Encoding header. |
403 | std::unique_ptr<ReadBuffer> in_post; |
404 | String http_request_compression_method_str = request.get("Content-Encoding" , "" ); |
405 | if (!http_request_compression_method_str.empty()) |
406 | { |
407 | if (http_request_compression_method_str == "gzip" ) |
408 | { |
409 | in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Gzip); |
410 | } |
411 | else if (http_request_compression_method_str == "deflate" ) |
412 | { |
413 | in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Zlib); |
414 | } |
415 | #if USE_BROTLI |
416 | else if (http_request_compression_method_str == "br" ) |
417 | { |
418 | in_post = std::make_unique<BrotliReadBuffer>(std::move(in_post_raw)); |
419 | } |
420 | #endif |
421 | else |
422 | { |
423 | throw Exception("Unknown Content-Encoding of HTTP request: " + http_request_compression_method_str, |
424 | ErrorCodes::UNKNOWN_COMPRESSION_METHOD); |
425 | } |
426 | } |
427 | else |
428 | in_post = std::move(in_post_raw); |
429 | |
430 | /// The data can also be compressed using incompatible internal algorithm. This is indicated by |
431 | /// 'decompress' query parameter. |
432 | std::unique_ptr<ReadBuffer> in_post_maybe_compressed; |
433 | bool in_post_compressed = false; |
434 | if (params.getParsed<bool>("decompress" , false)) |
435 | { |
436 | in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post); |
437 | in_post_compressed = true; |
438 | } |
439 | else |
440 | in_post_maybe_compressed = std::move(in_post); |
441 | |
442 | std::unique_ptr<ReadBuffer> in; |
443 | |
444 | static const NameSet reserved_param_names{"query" , "compress" , "decompress" , "user" , "password" , "quota_key" , "query_id" , "stacktrace" , |
445 | "buffer_size" , "wait_end_of_query" , "session_id" , "session_timeout" , "session_check" }; |
446 | |
447 | Names reserved_param_suffixes; |
448 | |
449 | auto param_could_be_skipped = [&] (const String & name) |
450 | { |
451 | if (reserved_param_names.count(name)) |
452 | return true; |
453 | |
454 | for (const String & suffix : reserved_param_suffixes) |
455 | { |
456 | if (endsWith(name, suffix)) |
457 | return true; |
458 | } |
459 | |
460 | return false; |
461 | }; |
462 | |
463 | /// Settings can be overridden in the query. |
464 | /// Some parameters (database, default_format, everything used in the code above) do not |
465 | /// belong to the Settings class. |
466 | |
467 | /// 'readonly' setting values mean: |
468 | /// readonly = 0 - any query is allowed, client can change any setting. |
469 | /// readonly = 1 - only readonly queries are allowed, client can't change settings. |
470 | /// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'. |
471 | |
472 | /// In theory if initially readonly = 0, the client can change any setting and then set readonly |
473 | /// to some other value. |
474 | auto & settings = context.getSettingsRef(); |
475 | |
476 | /// Only readonly queries are allowed for HTTP GET requests. |
477 | if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET) |
478 | { |
479 | if (settings.readonly == 0) |
480 | settings.readonly = 2; |
481 | } |
482 | |
483 | bool has_external_data = startsWith(request.getContentType().data(), "multipart/form-data" ); |
484 | |
485 | if (has_external_data) |
486 | { |
487 | /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters. |
488 | reserved_param_suffixes.reserve(3); |
489 | /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings. |
490 | reserved_param_suffixes.emplace_back("_format" ); |
491 | reserved_param_suffixes.emplace_back("_types" ); |
492 | reserved_param_suffixes.emplace_back("_structure" ); |
493 | } |
494 | |
495 | SettingsChanges settings_changes; |
496 | for (const auto & [key, value] : params) |
497 | { |
498 | if (key == "database" ) |
499 | { |
500 | context.setCurrentDatabase(value); |
501 | } |
502 | else if (key == "default_format" ) |
503 | { |
504 | context.setDefaultFormat(value); |
505 | } |
506 | else if (param_could_be_skipped(key)) |
507 | { |
508 | } |
509 | else if (startsWith(key, "param_" )) |
510 | { |
511 | /// Save name and values of substitution in dictionary. |
512 | const String parameter_name = key.substr(strlen("param_" )); |
513 | context.setQueryParameter(parameter_name, value); |
514 | } |
515 | else |
516 | { |
517 | /// All other query parameters are treated as settings. |
518 | settings_changes.push_back({key, value}); |
519 | } |
520 | } |
521 | |
522 | /// For external data we also want settings |
523 | context.checkSettingsConstraints(settings_changes); |
524 | context.applySettingsChanges(settings_changes); |
525 | |
526 | /// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope. |
527 | std::string full_query; |
528 | |
529 | /// Support for "external data for query processing". |
530 | if (has_external_data) |
531 | { |
532 | ExternalTablesHandler handler(context, params); |
533 | params.load(request, istr, handler); |
534 | |
535 | /// Params are of both form params POST and uri (GET params) |
536 | for (const auto & it : params) |
537 | if (it.first == "query" ) |
538 | full_query += it.second; |
539 | |
540 | in = std::make_unique<ReadBufferFromString>(full_query); |
541 | } |
542 | else |
543 | in = std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed); |
544 | |
545 | |
546 | /// HTTP response compression is turned on only if the client signalled that they support it |
547 | /// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on. |
548 | used_output.out->setCompression(client_supports_http_compression && settings.enable_http_compression); |
549 | if (client_supports_http_compression) |
550 | used_output.out->setCompressionLevel(settings.http_zlib_compression_level); |
551 | |
552 | used_output.out->setSendProgressInterval(settings.http_headers_progress_interval_ms); |
553 | |
554 | /// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on, |
555 | /// checksums of client data compressed with internal algorithm are not checked. |
556 | if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress) |
557 | static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming(); |
558 | |
559 | /// Add CORS header if 'add_http_cors_header' setting is turned on and the client passed |
560 | /// Origin header. |
561 | used_output.out->addHeaderCORS(settings.add_http_cors_header && !request.get("Origin" , "" ).empty()); |
562 | |
563 | ClientInfo & client_info = context.getClientInfo(); |
564 | client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; |
565 | client_info.interface = ClientInfo::Interface::HTTP; |
566 | |
567 | /// Query sent through HTTP interface is initial. |
568 | client_info.initial_user = client_info.current_user; |
569 | client_info.initial_query_id = client_info.current_query_id; |
570 | client_info.initial_address = client_info.current_address; |
571 | |
572 | ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN; |
573 | if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET) |
574 | http_method = ClientInfo::HTTPMethod::GET; |
575 | else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST) |
576 | http_method = ClientInfo::HTTPMethod::POST; |
577 | |
578 | client_info.http_method = http_method; |
579 | client_info.http_user_agent = request.get("User-Agent" , "" ); |
580 | |
581 | auto appendCallback = [&context] (ProgressCallback callback) |
582 | { |
583 | auto prev = context.getProgressCallback(); |
584 | |
585 | context.setProgressCallback([prev, callback] (const Progress & progress) |
586 | { |
587 | if (prev) |
588 | prev(progress); |
589 | |
590 | callback(progress); |
591 | }); |
592 | }; |
593 | |
594 | /// While still no data has been sent, we will report about query execution progress by sending HTTP headers. |
595 | if (settings.send_progress_in_http_headers) |
596 | appendCallback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); }); |
597 | |
598 | if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close) |
599 | { |
600 | Poco::Net::StreamSocket & socket = dynamic_cast<Poco::Net::HTTPServerRequestImpl &>(request).socket(); |
601 | |
602 | appendCallback([&context, &socket](const Progress &) |
603 | { |
604 | /// Assume that at the point this method is called no one is reading data from the socket any more. |
605 | /// True for read-only queries. |
606 | try |
607 | { |
608 | char b; |
609 | int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK); |
610 | if (status == 0) |
611 | context.killCurrentQuery(); |
612 | } |
613 | catch (Poco::TimeoutException &) |
614 | { |
615 | } |
616 | catch (...) |
617 | { |
618 | context.killCurrentQuery(); |
619 | } |
620 | }); |
621 | } |
622 | |
623 | customizeContext(context); |
624 | |
625 | executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context, |
626 | [&response] (const String & content_type) { response.setContentType(content_type); }, |
627 | [&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id" , current_query_id); }); |
628 | |
629 | if (used_output.hasDelayed()) |
630 | { |
631 | /// TODO: set Content-Length if possible |
632 | pushDelayedResults(used_output); |
633 | } |
634 | |
635 | /// Send HTTP headers with code 200 if no exception happened and the data is still not sent to |
636 | /// the client. |
637 | used_output.out->finalize(); |
638 | } |
639 | |
640 | void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_code, |
641 | Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, |
642 | Output & used_output) |
643 | { |
644 | try |
645 | { |
646 | /// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body |
647 | /// to avoid reading part of the current request body in the next request. |
648 | if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST |
649 | && response.getKeepAlive() |
650 | && !request.stream().eof() |
651 | && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED) |
652 | { |
653 | request.stream().ignore(std::numeric_limits<std::streamsize>::max()); |
654 | } |
655 | |
656 | bool auth_fail = exception_code == ErrorCodes::UNKNOWN_USER || |
657 | exception_code == ErrorCodes::WRONG_PASSWORD || |
658 | exception_code == ErrorCodes::REQUIRED_PASSWORD; |
659 | |
660 | if (auth_fail) |
661 | { |
662 | response.requireAuthentication("ClickHouse server HTTP API" ); |
663 | } |
664 | else |
665 | { |
666 | response.setStatusAndReason(exceptionCodeToHTTPStatus(exception_code)); |
667 | } |
668 | |
669 | if (!response.sent() && !used_output.out_maybe_compressed) |
670 | { |
671 | /// If nothing was sent yet and we don't even know if we must compress the response. |
672 | response.send() << s << std::endl; |
673 | } |
674 | else if (used_output.out_maybe_compressed) |
675 | { |
676 | /// Destroy CascadeBuffer to actualize buffers' positions and reset extra references |
677 | if (used_output.hasDelayed()) |
678 | used_output.out_maybe_delayed_and_compressed.reset(); |
679 | |
680 | /// Send the error message into already used (and possibly compressed) stream. |
681 | /// Note that the error message will possibly be sent after some data. |
682 | /// Also HTTP code 200 could have already been sent. |
683 | |
684 | /// If buffer has data, and that data wasn't sent yet, then no need to send that data |
685 | bool data_sent = used_output.out->count() != used_output.out->offset(); |
686 | |
687 | if (!data_sent) |
688 | { |
689 | used_output.out_maybe_compressed->position() = used_output.out_maybe_compressed->buffer().begin(); |
690 | used_output.out->position() = used_output.out->buffer().begin(); |
691 | } |
692 | |
693 | writeString(s, *used_output.out_maybe_compressed); |
694 | writeChar('\n', *used_output.out_maybe_compressed); |
695 | |
696 | used_output.out_maybe_compressed->next(); |
697 | used_output.out->next(); |
698 | used_output.out->finalize(); |
699 | } |
700 | } |
701 | catch (...) |
702 | { |
703 | tryLogCurrentException(log, "Cannot send exception to client" ); |
704 | } |
705 | } |
706 | |
707 | |
708 | void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) |
709 | { |
710 | setThreadName("HTTPHandler" ); |
711 | ThreadStatus thread_status; |
712 | |
713 | Output used_output; |
714 | |
715 | /// In case of exception, send stack trace to client. |
716 | bool with_stacktrace = false; |
717 | |
718 | try |
719 | { |
720 | response.setContentType("text/plain; charset=UTF-8" ); |
721 | response.set("X-ClickHouse-Server-Display-Name" , server_display_name); |
722 | /// For keep-alive to work. |
723 | if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1) |
724 | response.setChunkedTransferEncoding(true); |
725 | |
726 | HTMLForm params(request); |
727 | with_stacktrace = params.getParsed<bool>("stacktrace" , false); |
728 | |
729 | /// Workaround. Poco does not detect 411 Length Required case. |
730 | if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() && |
731 | !request.hasContentLength()) |
732 | { |
733 | throw Exception("There is neither Transfer-Encoding header nor Content-Length header" , ErrorCodes::HTTP_LENGTH_REQUIRED); |
734 | } |
735 | |
736 | processQuery(request, params, response, used_output); |
737 | LOG_INFO(log, "Done processing query" ); |
738 | } |
739 | catch (...) |
740 | { |
741 | tryLogCurrentException(log); |
742 | |
743 | /** If exception is received from remote server, then stack trace is embedded in message. |
744 | * If exception is thrown on local server, then stack trace is in separate field. |
745 | */ |
746 | std::string exception_message = getCurrentExceptionMessage(with_stacktrace, true); |
747 | int exception_code = getCurrentExceptionCode(); |
748 | |
749 | trySendExceptionToClient(exception_message, exception_code, request, response, used_output); |
750 | } |
751 | } |
752 | |
753 | |
754 | } |
755 | |