1 | #include "InterserverIOHTTPHandler.h" |
2 | |
3 | #include <Poco/Net/HTTPBasicCredentials.h> |
4 | #include <Poco/Net/HTTPServerRequest.h> |
5 | #include <Poco/Net/HTTPServerResponse.h> |
6 | #include <common/logger_useful.h> |
7 | #include <Common/HTMLForm.h> |
8 | #include <Common/setThreadName.h> |
9 | #include <Compression/CompressedWriteBuffer.h> |
10 | #include <IO/ReadBufferFromIStream.h> |
11 | #include <IO/WriteBufferFromHTTPServerResponse.h> |
12 | #include <Interpreters/InterserverIOHandler.h> |
13 | #include "IServer.h" |
14 | |
15 | namespace DB |
16 | { |
17 | |
/// Error codes referenced by this file. They are only declared here (`extern`);
/// their definitions live outside this translation unit.
namespace ErrorCodes
{
    /// Compared against in handleRequest() to decide whether a caught exception is a real error.
    extern const int ABORTED;
    /// Compared against in handleRequest(); an exception with this code makes the handler return immediately.
    extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
}
23 | |
24 | std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(Poco::Net::HTTPServerRequest & request) const |
25 | { |
26 | const auto & config = server.config(); |
27 | |
28 | if (config.has("interserver_http_credentials.user" )) |
29 | { |
30 | if (!request.hasCredentials()) |
31 | return {"Server requires HTTP Basic authentification, but client doesn't provide it" , false}; |
32 | String scheme, info; |
33 | request.getCredentials(scheme, info); |
34 | |
35 | if (scheme != "Basic" ) |
36 | return {"Server requires HTTP Basic authentification but client provides another method" , false}; |
37 | |
38 | String user = config.getString("interserver_http_credentials.user" ); |
39 | String password = config.getString("interserver_http_credentials.password" , "" ); |
40 | |
41 | Poco::Net::HTTPBasicCredentials credentials(info); |
42 | if (std::make_pair(user, password) != std::make_pair(credentials.getUsername(), credentials.getPassword())) |
43 | return {"Incorrect user or password in HTTP Basic authentification" , false}; |
44 | } |
45 | else if (request.hasCredentials()) |
46 | { |
47 | return {"Client requires HTTP Basic authentification, but server doesn't provide it" , false}; |
48 | } |
49 | return {"" , true}; |
50 | } |
51 | |
52 | void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output) |
53 | { |
54 | HTMLForm params(request); |
55 | |
56 | LOG_TRACE(log, "Request URI: " << request.getURI()); |
57 | |
58 | String endpoint_name = params.get("endpoint" ); |
59 | bool compress = params.get("compress" ) == "true" ; |
60 | |
61 | ReadBufferFromIStream body(request.stream()); |
62 | |
63 | auto endpoint = server.context().getInterserverIOHandler().getEndpoint(endpoint_name); |
64 | |
65 | if (compress) |
66 | { |
67 | CompressedWriteBuffer compressed_out(*used_output.out.get()); |
68 | endpoint->processQuery(params, body, compressed_out, response); |
69 | } |
70 | else |
71 | { |
72 | endpoint->processQuery(params, body, *used_output.out.get(), response); |
73 | } |
74 | } |
75 | |
76 | |
77 | void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) |
78 | { |
79 | setThreadName("IntersrvHandler" ); |
80 | |
81 | /// In order to work keep-alive. |
82 | if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1) |
83 | response.setChunkedTransferEncoding(true); |
84 | |
85 | Output used_output; |
86 | const auto & config = server.config(); |
87 | unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout" , 10); |
88 | used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(request, response, keep_alive_timeout); |
89 | |
90 | try |
91 | { |
92 | if (auto [message, success] = checkAuthentication(request); success) |
93 | { |
94 | processQuery(request, response, used_output); |
95 | LOG_INFO(log, "Done processing query" ); |
96 | } |
97 | else |
98 | { |
99 | response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED); |
100 | if (!response.sent()) |
101 | writeString(message, *used_output.out); |
102 | LOG_WARNING(log, "Query processing failed request: '" << request.getURI() << "' authentification failed" ); |
103 | } |
104 | } |
105 | catch (Exception & e) |
106 | { |
107 | if (e.code() == ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES) |
108 | return; |
109 | |
110 | response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); |
111 | |
112 | /// Sending to remote server was cancelled due to server shutdown or drop table. |
113 | bool is_real_error = e.code() != ErrorCodes::ABORTED; |
114 | |
115 | std::string message = getCurrentExceptionMessage(is_real_error); |
116 | if (!response.sent()) |
117 | writeString(message, *used_output.out); |
118 | |
119 | if (is_real_error) |
120 | LOG_ERROR(log, message); |
121 | else |
122 | LOG_INFO(log, message); |
123 | } |
124 | catch (...) |
125 | { |
126 | response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); |
127 | std::string message = getCurrentExceptionMessage(false); |
128 | if (!response.sent()) |
129 | writeString(message, *used_output.out); |
130 | |
131 | LOG_ERROR(log, message); |
132 | } |
133 | } |
134 | |
135 | |
136 | } |
137 | |