1#include "InterserverIOHTTPHandler.h"
2
3#include <Poco/Net/HTTPBasicCredentials.h>
4#include <Poco/Net/HTTPServerRequest.h>
5#include <Poco/Net/HTTPServerResponse.h>
6#include <common/logger_useful.h>
7#include <Common/HTMLForm.h>
8#include <Common/setThreadName.h>
9#include <Compression/CompressedWriteBuffer.h>
10#include <IO/ReadBufferFromIStream.h>
11#include <IO/WriteBufferFromHTTPServerResponse.h>
12#include <Interpreters/InterserverIOHandler.h>
13#include "IServer.h"
14
15namespace DB
16{
17
18namespace ErrorCodes
19{
20 extern const int ABORTED;
21 extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
22}
23
24std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(Poco::Net::HTTPServerRequest & request) const
25{
26 const auto & config = server.config();
27
28 if (config.has("interserver_http_credentials.user"))
29 {
30 if (!request.hasCredentials())
31 return {"Server requires HTTP Basic authentification, but client doesn't provide it", false};
32 String scheme, info;
33 request.getCredentials(scheme, info);
34
35 if (scheme != "Basic")
36 return {"Server requires HTTP Basic authentification but client provides another method", false};
37
38 String user = config.getString("interserver_http_credentials.user");
39 String password = config.getString("interserver_http_credentials.password", "");
40
41 Poco::Net::HTTPBasicCredentials credentials(info);
42 if (std::make_pair(user, password) != std::make_pair(credentials.getUsername(), credentials.getPassword()))
43 return {"Incorrect user or password in HTTP Basic authentification", false};
44 }
45 else if (request.hasCredentials())
46 {
47 return {"Client requires HTTP Basic authentification, but server doesn't provide it", false};
48 }
49 return {"", true};
50}
51
52void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output)
53{
54 HTMLForm params(request);
55
56 LOG_TRACE(log, "Request URI: " << request.getURI());
57
58 String endpoint_name = params.get("endpoint");
59 bool compress = params.get("compress") == "true";
60
61 ReadBufferFromIStream body(request.stream());
62
63 auto endpoint = server.context().getInterserverIOHandler().getEndpoint(endpoint_name);
64
65 if (compress)
66 {
67 CompressedWriteBuffer compressed_out(*used_output.out.get());
68 endpoint->processQuery(params, body, compressed_out, response);
69 }
70 else
71 {
72 endpoint->processQuery(params, body, *used_output.out.get(), response);
73 }
74}
75
76
77void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
78{
79 setThreadName("IntersrvHandler");
80
81 /// In order to work keep-alive.
82 if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1)
83 response.setChunkedTransferEncoding(true);
84
85 Output used_output;
86 const auto & config = server.config();
87 unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout", 10);
88 used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(request, response, keep_alive_timeout);
89
90 try
91 {
92 if (auto [message, success] = checkAuthentication(request); success)
93 {
94 processQuery(request, response, used_output);
95 LOG_INFO(log, "Done processing query");
96 }
97 else
98 {
99 response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED);
100 if (!response.sent())
101 writeString(message, *used_output.out);
102 LOG_WARNING(log, "Query processing failed request: '" << request.getURI() << "' authentification failed");
103 }
104 }
105 catch (Exception & e)
106 {
107 if (e.code() == ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES)
108 return;
109
110 response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
111
112 /// Sending to remote server was cancelled due to server shutdown or drop table.
113 bool is_real_error = e.code() != ErrorCodes::ABORTED;
114
115 std::string message = getCurrentExceptionMessage(is_real_error);
116 if (!response.sent())
117 writeString(message, *used_output.out);
118
119 if (is_real_error)
120 LOG_ERROR(log, message);
121 else
122 LOG_INFO(log, message);
123 }
124 catch (...)
125 {
126 response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
127 std::string message = getCurrentExceptionMessage(false);
128 if (!response.sent())
129 writeString(message, *used_output.out);
130
131 LOG_ERROR(log, message);
132 }
133}
134
135
136}
137