1#include "HTTPHandler.h"
2
3#include <chrono>
4#include <iomanip>
5#include <Poco/File.h>
6#include <Poco/Net/HTTPBasicCredentials.h>
7#include <Poco/Net/HTTPServerRequest.h>
8#include <Poco/Net/HTTPServerRequestImpl.h>
9#include <Poco/Net/HTTPServerResponse.h>
10#include <Poco/Net/NetException.h>
11#include <ext/scope_guard.h>
12#include <Core/ExternalTable.h>
13#include <Common/StringUtils/StringUtils.h>
14#include <Common/escapeForFileName.h>
15#include <Common/getFQDNOrHostName.h>
16#include <Common/CurrentThread.h>
17#include <Common/setThreadName.h>
18#include <Common/config.h>
19#include <Common/SettingsChanges.h>
20#include <Compression/CompressedReadBuffer.h>
21#include <Compression/CompressedWriteBuffer.h>
22#include <IO/ReadBufferFromIStream.h>
23#include <IO/ZlibInflatingReadBuffer.h>
24#include <IO/BrotliReadBuffer.h>
25#include <IO/ReadBufferFromString.h>
26#include <IO/WriteBufferFromString.h>
27#include <IO/WriteBufferFromHTTPServerResponse.h>
28#include <IO/WriteBufferFromFile.h>
29#include <IO/WriteHelpers.h>
30#include <IO/copyData.h>
31#include <IO/ConcatReadBuffer.h>
32#include <IO/CascadeWriteBuffer.h>
33#include <IO/MemoryReadWriteBuffer.h>
34#include <IO/WriteBufferFromTemporaryFile.h>
35#include <DataStreams/IBlockInputStream.h>
36#include <Interpreters/executeQuery.h>
37#include <Common/typeid_cast.h>
38#include <Poco/Net/HTTPStream.h>
39
40namespace DB
41{
42
/// Error codes referenced in this translation unit. The values are defined elsewhere;
/// every code used below is declared here explicitly so that the file does not depend
/// on declarations leaking out of unrelated headers.
namespace ErrorCodes
{
    /// Used for the internal consistency checks on the output buffer chain.
    extern const int LOGICAL_ERROR;

    extern const int READONLY;
    extern const int UNKNOWN_COMPRESSION_METHOD;

    extern const int CANNOT_PARSE_TEXT;
    extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
    extern const int CANNOT_PARSE_QUOTED_STRING;
    extern const int CANNOT_PARSE_DATE;
    extern const int CANNOT_PARSE_DATETIME;
    extern const int CANNOT_PARSE_NUMBER;
    extern const int CANNOT_OPEN_FILE;

    extern const int UNKNOWN_ELEMENT_IN_AST;
    extern const int UNKNOWN_TYPE_OF_AST_NODE;
    extern const int TOO_DEEP_AST;
    extern const int TOO_BIG_AST;
    extern const int UNEXPECTED_AST_STRUCTURE;

    extern const int SYNTAX_ERROR;

    extern const int INCORRECT_DATA;
    extern const int TYPE_MISMATCH;

    extern const int UNKNOWN_TABLE;
    extern const int UNKNOWN_FUNCTION;
    extern const int UNKNOWN_IDENTIFIER;
    extern const int UNKNOWN_TYPE;
    extern const int UNKNOWN_STORAGE;
    extern const int UNKNOWN_DATABASE;
    extern const int UNKNOWN_SETTING;
    extern const int UNKNOWN_DIRECTION_OF_SORTING;
    extern const int UNKNOWN_AGGREGATE_FUNCTION;
    extern const int UNKNOWN_FORMAT;
    extern const int UNKNOWN_DATABASE_ENGINE;
    extern const int UNKNOWN_TYPE_OF_QUERY;

    extern const int QUERY_IS_TOO_LARGE;

    extern const int NOT_IMPLEMENTED;
    extern const int SOCKET_TIMEOUT;

    extern const int UNKNOWN_USER;
    extern const int WRONG_PASSWORD;
    extern const int REQUIRED_PASSWORD;

    extern const int INVALID_SESSION_TIMEOUT;
    extern const int HTTP_LENGTH_REQUIRED;
}
92
93
94static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int exception_code)
95{
96 using namespace Poco::Net;
97
98 if (exception_code == ErrorCodes::REQUIRED_PASSWORD)
99 return HTTPResponse::HTTP_UNAUTHORIZED;
100 else if (exception_code == ErrorCodes::CANNOT_PARSE_TEXT ||
101 exception_code == ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE ||
102 exception_code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING ||
103 exception_code == ErrorCodes::CANNOT_PARSE_DATE ||
104 exception_code == ErrorCodes::CANNOT_PARSE_DATETIME ||
105 exception_code == ErrorCodes::CANNOT_PARSE_NUMBER ||
106
107 exception_code == ErrorCodes::UNKNOWN_ELEMENT_IN_AST ||
108 exception_code == ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE ||
109 exception_code == ErrorCodes::TOO_DEEP_AST ||
110 exception_code == ErrorCodes::TOO_BIG_AST ||
111 exception_code == ErrorCodes::UNEXPECTED_AST_STRUCTURE ||
112
113 exception_code == ErrorCodes::SYNTAX_ERROR ||
114
115 exception_code == ErrorCodes::INCORRECT_DATA ||
116 exception_code == ErrorCodes::TYPE_MISMATCH)
117 return HTTPResponse::HTTP_BAD_REQUEST;
118 else if (exception_code == ErrorCodes::UNKNOWN_TABLE ||
119 exception_code == ErrorCodes::UNKNOWN_FUNCTION ||
120 exception_code == ErrorCodes::UNKNOWN_IDENTIFIER ||
121 exception_code == ErrorCodes::UNKNOWN_TYPE ||
122 exception_code == ErrorCodes::UNKNOWN_STORAGE ||
123 exception_code == ErrorCodes::UNKNOWN_DATABASE ||
124 exception_code == ErrorCodes::UNKNOWN_SETTING ||
125 exception_code == ErrorCodes::UNKNOWN_DIRECTION_OF_SORTING ||
126 exception_code == ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION ||
127 exception_code == ErrorCodes::UNKNOWN_FORMAT ||
128 exception_code == ErrorCodes::UNKNOWN_DATABASE_ENGINE ||
129
130 exception_code == ErrorCodes::UNKNOWN_TYPE_OF_QUERY)
131 return HTTPResponse::HTTP_NOT_FOUND;
132 else if (exception_code == ErrorCodes::QUERY_IS_TOO_LARGE)
133 return HTTPResponse::HTTP_REQUESTENTITYTOOLARGE;
134 else if (exception_code == ErrorCodes::NOT_IMPLEMENTED)
135 return HTTPResponse::HTTP_NOT_IMPLEMENTED;
136 else if (exception_code == ErrorCodes::SOCKET_TIMEOUT ||
137 exception_code == ErrorCodes::CANNOT_OPEN_FILE)
138 return HTTPResponse::HTTP_SERVICE_UNAVAILABLE;
139 else if (exception_code == ErrorCodes::HTTP_LENGTH_REQUIRED)
140 return HTTPResponse::HTTP_LENGTH_REQUIRED;
141
142 return HTTPResponse::HTTP_INTERNAL_SERVER_ERROR;
143}
144
145
146static std::chrono::steady_clock::duration parseSessionTimeout(
147 const Poco::Util::AbstractConfiguration & config,
148 const HTMLForm & params)
149{
150 unsigned session_timeout = config.getInt("default_session_timeout", 60);
151
152 if (params.has("session_timeout"))
153 {
154 unsigned max_session_timeout = config.getUInt("max_session_timeout", 3600);
155 std::string session_timeout_str = params.get("session_timeout");
156
157 ReadBufferFromString buf(session_timeout_str);
158 if (!tryReadIntText(session_timeout, buf) || !buf.eof())
159 throw Exception("Invalid session timeout: '" + session_timeout_str + "'", ErrorCodes::INVALID_SESSION_TIMEOUT);
160
161 if (session_timeout > max_session_timeout)
162 throw Exception("Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout)
163 + ". Maximum session timeout could be modified in configuration file.",
164 ErrorCodes::INVALID_SESSION_TIMEOUT);
165 }
166
167 return std::chrono::seconds(session_timeout);
168}
169
170
171void HTTPHandler::pushDelayedResults(Output & used_output)
172{
173 std::vector<WriteBufferPtr> write_buffers;
174 std::vector<ReadBufferPtr> read_buffers;
175 std::vector<ReadBuffer *> read_buffers_raw_ptr;
176
177 auto cascade_buffer = typeid_cast<CascadeWriteBuffer *>(used_output.out_maybe_delayed_and_compressed.get());
178 if (!cascade_buffer)
179 throw Exception("Expected CascadeWriteBuffer", ErrorCodes::LOGICAL_ERROR);
180
181 cascade_buffer->getResultBuffers(write_buffers);
182
183 if (write_buffers.empty())
184 throw Exception("At least one buffer is expected to overwrite result into HTTP response", ErrorCodes::LOGICAL_ERROR);
185
186 for (auto & write_buf : write_buffers)
187 {
188 IReadableWriteBuffer * write_buf_concrete;
189 ReadBufferPtr reread_buf;
190
191 if (write_buf
192 && (write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get()))
193 && (reread_buf = write_buf_concrete->tryGetReadBuffer()))
194 {
195 read_buffers.emplace_back(reread_buf);
196 read_buffers_raw_ptr.emplace_back(reread_buf.get());
197 }
198 }
199
200 ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr);
201 copyData(concat_read_buffer, *used_output.out_maybe_compressed);
202}
203
204
205HTTPHandler::HTTPHandler(IServer & server_)
206 : server(server_)
207 , log(&Logger::get("HTTPHandler"))
208{
209 server_display_name = server.config().getString("display_name", getFQDNOrHostName());
210}
211
212
213void HTTPHandler::processQuery(
214 Poco::Net::HTTPServerRequest & request,
215 HTMLForm & params,
216 Poco::Net::HTTPServerResponse & response,
217 Output & used_output)
218{
219 Context context = server.context();
220
221 CurrentThread::QueryScope query_scope(context);
222
223 LOG_TRACE(log, "Request URI: " << request.getURI());
224
225 std::istream & istr = request.stream();
226
227 /// Part of the query can be passed in the 'query' parameter and the rest in the request body
228 /// (http method need not necessarily be POST). In this case the entire query consists of the
229 /// contents of the 'query' parameter, a line break and the request body.
230 std::string query_param = params.get("query", "");
231 if (!query_param.empty())
232 query_param += '\n';
233
234 /// The user and password can be passed by headers (similar to X-Auth-*),
235 /// which is used by load balancers to pass authentication information.
236 std::string user = request.get("X-ClickHouse-User", "");
237 std::string password = request.get("X-ClickHouse-Key", "");
238 std::string quota_key = request.get("X-ClickHouse-Quota", "");
239
240 if (user.empty() && password.empty() && quota_key.empty())
241 {
242 /// User name and password can be passed using query parameters
243 /// or using HTTP Basic auth (both methods are insecure).
244 if (request.hasCredentials())
245 {
246 Poco::Net::HTTPBasicCredentials credentials(request);
247
248 user = credentials.getUsername();
249 password = credentials.getPassword();
250 }
251 else
252 {
253 user = params.get("user", "default");
254 password = params.get("password", "");
255 }
256
257 quota_key = params.get("quota_key", "");
258 }
259 else
260 {
261 /// It is prohibited to mix different authorization schemes.
262 if (request.hasCredentials()
263 || params.has("user")
264 || params.has("password")
265 || params.has("quota_key"))
266 {
267 throw Exception("Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods simultaneously", ErrorCodes::REQUIRED_PASSWORD);
268 }
269 }
270
271 std::string query_id = params.get("query_id", "");
272 context.setUser(user, password, request.clientAddress(), quota_key);
273 context.setCurrentQueryId(query_id);
274
275 /// The user could specify session identifier and session timeout.
276 /// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
277
278 std::shared_ptr<Context> session;
279 String session_id;
280 std::chrono::steady_clock::duration session_timeout;
281 bool session_is_set = params.has("session_id");
282 const auto & config = server.config();
283
284 if (session_is_set)
285 {
286 session_id = params.get("session_id");
287 session_timeout = parseSessionTimeout(config, params);
288 std::string session_check = params.get("session_check", "");
289
290 session = context.acquireSession(session_id, session_timeout, session_check == "1");
291
292 context = *session;
293 context.setSessionContext(*session);
294 }
295
296 SCOPE_EXIT({
297 if (session_is_set)
298 session->releaseSession(session_id, session_timeout);
299 });
300
301 /// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
302 String http_response_compression_methods = request.get("Accept-Encoding", "");
303 bool client_supports_http_compression = false;
304 CompressionMethod http_response_compression_method {};
305
306 if (!http_response_compression_methods.empty())
307 {
308 /// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
309 /// NOTE parsing of the list of methods is slightly incorrect.
310 if (std::string::npos != http_response_compression_methods.find("gzip"))
311 {
312 client_supports_http_compression = true;
313 http_response_compression_method = CompressionMethod::Gzip;
314 }
315 else if (std::string::npos != http_response_compression_methods.find("deflate"))
316 {
317 client_supports_http_compression = true;
318 http_response_compression_method = CompressionMethod::Zlib;
319 }
320#if USE_BROTLI
321 else if (http_response_compression_methods == "br")
322 {
323 client_supports_http_compression = true;
324 http_response_compression_method = CompressionMethod::Brotli;
325 }
326#endif
327 }
328
329 /// Client can pass a 'compress' flag in the query string. In this case the query result is
330 /// compressed using internal algorithm. This is not reflected in HTTP headers.
331 bool internal_compression = params.getParsed<bool>("compress", false);
332
333 /// At least, we should postpone sending of first buffer_size result bytes
334 size_t buffer_size_total = std::max(
335 params.getParsed<size_t>("buffer_size", DBMS_DEFAULT_BUFFER_SIZE), static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE));
336
337 /// If it is specified, the whole result will be buffered.
338 /// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file.
339 bool buffer_until_eof = params.getParsed<bool>("wait_end_of_query", false);
340
341 size_t buffer_size_http = DBMS_DEFAULT_BUFFER_SIZE;
342 size_t buffer_size_memory = (buffer_size_total > buffer_size_http) ? buffer_size_total : 0;
343
344 unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout", 10);
345
346 used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(
347 request, response, keep_alive_timeout,
348 client_supports_http_compression, http_response_compression_method, buffer_size_http);
349 if (internal_compression)
350 used_output.out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*used_output.out);
351 else
352 used_output.out_maybe_compressed = used_output.out;
353
354 if (buffer_size_memory > 0 || buffer_until_eof)
355 {
356 CascadeWriteBuffer::WriteBufferPtrs cascade_buffer1;
357 CascadeWriteBuffer::WriteBufferConstructors cascade_buffer2;
358
359 if (buffer_size_memory > 0)
360 cascade_buffer1.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory));
361
362 if (buffer_until_eof)
363 {
364 std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/";
365
366 auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
367 {
368 return WriteBufferFromTemporaryFile::create(tmp_path_template);
369 };
370
371 cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer));
372 }
373 else
374 {
375 auto push_memory_buffer_and_continue = [next_buffer = used_output.out_maybe_compressed] (const WriteBufferPtr & prev_buf)
376 {
377 auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
378 if (!prev_memory_buffer)
379 throw Exception("Expected MemoryWriteBuffer", ErrorCodes::LOGICAL_ERROR);
380
381 auto rdbuf = prev_memory_buffer->tryGetReadBuffer();
382 copyData(*rdbuf , *next_buffer);
383
384 return next_buffer;
385 };
386
387 cascade_buffer2.emplace_back(push_memory_buffer_and_continue);
388 }
389
390 used_output.out_maybe_delayed_and_compressed = std::make_shared<CascadeWriteBuffer>(
391 std::move(cascade_buffer1), std::move(cascade_buffer2));
392 }
393 else
394 {
395 used_output.out_maybe_delayed_and_compressed = used_output.out_maybe_compressed;
396 }
397
398 std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query_param);
399
400 std::unique_ptr<ReadBuffer> in_post_raw = std::make_unique<ReadBufferFromIStream>(istr);
401
402 /// Request body can be compressed using algorithm specified in the Content-Encoding header.
403 std::unique_ptr<ReadBuffer> in_post;
404 String http_request_compression_method_str = request.get("Content-Encoding", "");
405 if (!http_request_compression_method_str.empty())
406 {
407 if (http_request_compression_method_str == "gzip")
408 {
409 in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Gzip);
410 }
411 else if (http_request_compression_method_str == "deflate")
412 {
413 in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Zlib);
414 }
415#if USE_BROTLI
416 else if (http_request_compression_method_str == "br")
417 {
418 in_post = std::make_unique<BrotliReadBuffer>(std::move(in_post_raw));
419 }
420#endif
421 else
422 {
423 throw Exception("Unknown Content-Encoding of HTTP request: " + http_request_compression_method_str,
424 ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
425 }
426 }
427 else
428 in_post = std::move(in_post_raw);
429
430 /// The data can also be compressed using incompatible internal algorithm. This is indicated by
431 /// 'decompress' query parameter.
432 std::unique_ptr<ReadBuffer> in_post_maybe_compressed;
433 bool in_post_compressed = false;
434 if (params.getParsed<bool>("decompress", false))
435 {
436 in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post);
437 in_post_compressed = true;
438 }
439 else
440 in_post_maybe_compressed = std::move(in_post);
441
442 std::unique_ptr<ReadBuffer> in;
443
444 static const NameSet reserved_param_names{"query", "compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
445 "buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check"};
446
447 Names reserved_param_suffixes;
448
449 auto param_could_be_skipped = [&] (const String & name)
450 {
451 if (reserved_param_names.count(name))
452 return true;
453
454 for (const String & suffix : reserved_param_suffixes)
455 {
456 if (endsWith(name, suffix))
457 return true;
458 }
459
460 return false;
461 };
462
463 /// Settings can be overridden in the query.
464 /// Some parameters (database, default_format, everything used in the code above) do not
465 /// belong to the Settings class.
466
467 /// 'readonly' setting values mean:
468 /// readonly = 0 - any query is allowed, client can change any setting.
469 /// readonly = 1 - only readonly queries are allowed, client can't change settings.
470 /// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'.
471
472 /// In theory if initially readonly = 0, the client can change any setting and then set readonly
473 /// to some other value.
474 auto & settings = context.getSettingsRef();
475
476 /// Only readonly queries are allowed for HTTP GET requests.
477 if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET)
478 {
479 if (settings.readonly == 0)
480 settings.readonly = 2;
481 }
482
483 bool has_external_data = startsWith(request.getContentType().data(), "multipart/form-data");
484
485 if (has_external_data)
486 {
487 /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
488 reserved_param_suffixes.reserve(3);
489 /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
490 reserved_param_suffixes.emplace_back("_format");
491 reserved_param_suffixes.emplace_back("_types");
492 reserved_param_suffixes.emplace_back("_structure");
493 }
494
495 SettingsChanges settings_changes;
496 for (const auto & [key, value] : params)
497 {
498 if (key == "database")
499 {
500 context.setCurrentDatabase(value);
501 }
502 else if (key == "default_format")
503 {
504 context.setDefaultFormat(value);
505 }
506 else if (param_could_be_skipped(key))
507 {
508 }
509 else if (startsWith(key, "param_"))
510 {
511 /// Save name and values of substitution in dictionary.
512 const String parameter_name = key.substr(strlen("param_"));
513 context.setQueryParameter(parameter_name, value);
514 }
515 else
516 {
517 /// All other query parameters are treated as settings.
518 settings_changes.push_back({key, value});
519 }
520 }
521
522 /// For external data we also want settings
523 context.checkSettingsConstraints(settings_changes);
524 context.applySettingsChanges(settings_changes);
525
526 /// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope.
527 std::string full_query;
528
529 /// Support for "external data for query processing".
530 if (has_external_data)
531 {
532 ExternalTablesHandler handler(context, params);
533 params.load(request, istr, handler);
534
535 /// Params are of both form params POST and uri (GET params)
536 for (const auto & it : params)
537 if (it.first == "query")
538 full_query += it.second;
539
540 in = std::make_unique<ReadBufferFromString>(full_query);
541 }
542 else
543 in = std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
544
545
546 /// HTTP response compression is turned on only if the client signalled that they support it
547 /// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
548 used_output.out->setCompression(client_supports_http_compression && settings.enable_http_compression);
549 if (client_supports_http_compression)
550 used_output.out->setCompressionLevel(settings.http_zlib_compression_level);
551
552 used_output.out->setSendProgressInterval(settings.http_headers_progress_interval_ms);
553
554 /// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
555 /// checksums of client data compressed with internal algorithm are not checked.
556 if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
557 static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming();
558
559 /// Add CORS header if 'add_http_cors_header' setting is turned on and the client passed
560 /// Origin header.
561 used_output.out->addHeaderCORS(settings.add_http_cors_header && !request.get("Origin", "").empty());
562
563 ClientInfo & client_info = context.getClientInfo();
564 client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
565 client_info.interface = ClientInfo::Interface::HTTP;
566
567 /// Query sent through HTTP interface is initial.
568 client_info.initial_user = client_info.current_user;
569 client_info.initial_query_id = client_info.current_query_id;
570 client_info.initial_address = client_info.current_address;
571
572 ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN;
573 if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET)
574 http_method = ClientInfo::HTTPMethod::GET;
575 else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST)
576 http_method = ClientInfo::HTTPMethod::POST;
577
578 client_info.http_method = http_method;
579 client_info.http_user_agent = request.get("User-Agent", "");
580
581 auto appendCallback = [&context] (ProgressCallback callback)
582 {
583 auto prev = context.getProgressCallback();
584
585 context.setProgressCallback([prev, callback] (const Progress & progress)
586 {
587 if (prev)
588 prev(progress);
589
590 callback(progress);
591 });
592 };
593
594 /// While still no data has been sent, we will report about query execution progress by sending HTTP headers.
595 if (settings.send_progress_in_http_headers)
596 appendCallback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); });
597
598 if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close)
599 {
600 Poco::Net::StreamSocket & socket = dynamic_cast<Poco::Net::HTTPServerRequestImpl &>(request).socket();
601
602 appendCallback([&context, &socket](const Progress &)
603 {
604 /// Assume that at the point this method is called no one is reading data from the socket any more.
605 /// True for read-only queries.
606 try
607 {
608 char b;
609 int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK);
610 if (status == 0)
611 context.killCurrentQuery();
612 }
613 catch (Poco::TimeoutException &)
614 {
615 }
616 catch (...)
617 {
618 context.killCurrentQuery();
619 }
620 });
621 }
622
623 customizeContext(context);
624
625 executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
626 [&response] (const String & content_type) { response.setContentType(content_type); },
627 [&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); });
628
629 if (used_output.hasDelayed())
630 {
631 /// TODO: set Content-Length if possible
632 pushDelayedResults(used_output);
633 }
634
635 /// Send HTTP headers with code 200 if no exception happened and the data is still not sent to
636 /// the client.
637 used_output.out->finalize();
638}
639
640void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_code,
641 Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
642 Output & used_output)
643{
644 try
645 {
646 /// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body
647 /// to avoid reading part of the current request body in the next request.
648 if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST
649 && response.getKeepAlive()
650 && !request.stream().eof()
651 && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
652 {
653 request.stream().ignore(std::numeric_limits<std::streamsize>::max());
654 }
655
656 bool auth_fail = exception_code == ErrorCodes::UNKNOWN_USER ||
657 exception_code == ErrorCodes::WRONG_PASSWORD ||
658 exception_code == ErrorCodes::REQUIRED_PASSWORD;
659
660 if (auth_fail)
661 {
662 response.requireAuthentication("ClickHouse server HTTP API");
663 }
664 else
665 {
666 response.setStatusAndReason(exceptionCodeToHTTPStatus(exception_code));
667 }
668
669 if (!response.sent() && !used_output.out_maybe_compressed)
670 {
671 /// If nothing was sent yet and we don't even know if we must compress the response.
672 response.send() << s << std::endl;
673 }
674 else if (used_output.out_maybe_compressed)
675 {
676 /// Destroy CascadeBuffer to actualize buffers' positions and reset extra references
677 if (used_output.hasDelayed())
678 used_output.out_maybe_delayed_and_compressed.reset();
679
680 /// Send the error message into already used (and possibly compressed) stream.
681 /// Note that the error message will possibly be sent after some data.
682 /// Also HTTP code 200 could have already been sent.
683
684 /// If buffer has data, and that data wasn't sent yet, then no need to send that data
685 bool data_sent = used_output.out->count() != used_output.out->offset();
686
687 if (!data_sent)
688 {
689 used_output.out_maybe_compressed->position() = used_output.out_maybe_compressed->buffer().begin();
690 used_output.out->position() = used_output.out->buffer().begin();
691 }
692
693 writeString(s, *used_output.out_maybe_compressed);
694 writeChar('\n', *used_output.out_maybe_compressed);
695
696 used_output.out_maybe_compressed->next();
697 used_output.out->next();
698 used_output.out->finalize();
699 }
700 }
701 catch (...)
702 {
703 tryLogCurrentException(log, "Cannot send exception to client");
704 }
705}
706
707
708void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
709{
710 setThreadName("HTTPHandler");
711 ThreadStatus thread_status;
712
713 Output used_output;
714
715 /// In case of exception, send stack trace to client.
716 bool with_stacktrace = false;
717
718 try
719 {
720 response.setContentType("text/plain; charset=UTF-8");
721 response.set("X-ClickHouse-Server-Display-Name", server_display_name);
722 /// For keep-alive to work.
723 if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1)
724 response.setChunkedTransferEncoding(true);
725
726 HTMLForm params(request);
727 with_stacktrace = params.getParsed<bool>("stacktrace", false);
728
729 /// Workaround. Poco does not detect 411 Length Required case.
730 if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() &&
731 !request.hasContentLength())
732 {
733 throw Exception("There is neither Transfer-Encoding header nor Content-Length header", ErrorCodes::HTTP_LENGTH_REQUIRED);
734 }
735
736 processQuery(request, params, response, used_output);
737 LOG_INFO(log, "Done processing query");
738 }
739 catch (...)
740 {
741 tryLogCurrentException(log);
742
743 /** If exception is received from remote server, then stack trace is embedded in message.
744 * If exception is thrown on local server, then stack trace is in separate field.
745 */
746 std::string exception_message = getCurrentExceptionMessage(with_stacktrace, true);
747 int exception_code = getCurrentExceptionCode();
748
749 trySendExceptionToClient(exception_message, exception_code, request, response, used_output);
750 }
751}
752
753
754}
755