1#pragma once
2
3#include <functional>
4#include <Core/Types.h>
5#include <IO/ConnectionTimeouts.h>
6#include <IO/HTTPCommon.h>
7#include <IO/ReadBuffer.h>
8#include <IO/ReadBufferFromIStream.h>
9#include <Poco/Any.h>
10#include <Poco/Net/HTTPBasicCredentials.h>
11#include <Poco/Net/HTTPClientSession.h>
12#include <Poco/Net/HTTPRequest.h>
13#include <Poco/Net/HTTPResponse.h>
14#include <Poco/URI.h>
15#include <Poco/Version.h>
16#include <Common/DNSResolver.h>
17#include <Common/config.h>
18#include <common/logger_useful.h>
19#include <Poco/URIStreamFactory.h>
20
21
22#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
23#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
24
25namespace DB
26{
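/** Perform an HTTP request (GET by default, POST when an output stream callback is given,
  * or an explicitly specified method) and provide the response body to read.
  */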
29
30namespace ErrorCodes
31{
32 extern const int TOO_MANY_REDIRECTS;
33}
34
35template <typename SessionPtr>
36class UpdatableSessionBase
37{
38protected:
39 SessionPtr session;
40 UInt64 redirects { 0 };
41 Poco::URI initial_uri;
42 const ConnectionTimeouts & timeouts;
43 SettingUInt64 max_redirects;
44
45public:
46 virtual void buildNewSession(const Poco::URI & uri) = 0;
47
48 explicit UpdatableSessionBase(const Poco::URI uri,
49 const ConnectionTimeouts & timeouts_,
50 SettingUInt64 max_redirects_)
51 : initial_uri { uri }
52 , timeouts { timeouts_ }
53 , max_redirects { max_redirects_ }
54 {
55 }
56
57 SessionPtr getSession()
58 {
59 return session;
60 }
61
62 void updateSession(const Poco::URI & uri)
63 {
64 ++redirects;
65 if (redirects <= max_redirects)
66 {
67 buildNewSession(uri);
68 }
69 else
70 {
71 std::stringstream error_message;
72 error_message << "Too many redirects while trying to access " << initial_uri.toString();
73
74 throw Exception(error_message.str(), ErrorCodes::TOO_MANY_REDIRECTS);
75 }
76 }
77
78 virtual ~UpdatableSessionBase()
79 {
80 }
81};
82
83
84namespace detail
85{
86 template <typename UpdatableSessionPtr>
87 class ReadWriteBufferFromHTTPBase : public ReadBuffer
88 {
89 public:
90 using HTTPHeaderEntry = std::tuple<std::string, std::string>;
91 using HTTPHeaderEntries = std::vector<HTTPHeaderEntry>;
92
93 protected:
94 Poco::URI uri;
95 std::string method;
96
97 UpdatableSessionPtr session;
98 std::istream * istr; /// owned by session
99 std::unique_ptr<ReadBuffer> impl;
100 std::function<void(std::ostream &)> out_stream_callback;
101 const Poco::Net::HTTPBasicCredentials & credentials;
102 std::vector<Poco::Net::HTTPCookie> cookies;
103 HTTPHeaderEntries http_header_entries;
104 RemoteHostFilter remote_host_filter;
105
106 std::istream * call(const Poco::URI uri_, Poco::Net::HTTPResponse & response)
107 {
108 // With empty path poco will send "POST HTTP/1.1" its bug.
109 if (uri.getPath().empty())
110 uri.setPath("/");
111
112 Poco::Net::HTTPRequest request(method, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
113 request.setHost(uri_.getHost()); // use original, not resolved host name in header
114
115 if (out_stream_callback)
116 request.setChunkedTransferEncoding(true);
117
118 for (auto & http_header_entry: http_header_entries)
119 {
120 request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
121 }
122
123 if (!credentials.getUsername().empty())
124 credentials.authenticate(request);
125
126 LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());
127
128 auto sess = session->getSession();
129
130 try
131 {
132 auto & stream_out = sess->sendRequest(request);
133
134 if (out_stream_callback)
135 out_stream_callback(stream_out);
136
137 istr = receiveResponse(*sess, request, response, true);
138 response.getCookies(cookies);
139
140 return istr;
141
142 }
143 catch (const Poco::Exception & e)
144 {
145 /// We use session data storage as storage for exception text
146 /// Depend on it we can deduce to reconnect session or reresolve session host
147 sess->attachSessionData(e.message());
148 throw;
149 }
150 }
151
152 public:
153 using OutStreamCallback = std::function<void(std::ostream &)>;
154
155 explicit ReadWriteBufferFromHTTPBase(UpdatableSessionPtr session_,
156 Poco::URI uri_,
157 const std::string & method_ = {},
158 OutStreamCallback out_stream_callback_ = {},
159 const Poco::Net::HTTPBasicCredentials & credentials_ = {},
160 size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
161 HTTPHeaderEntries http_header_entries_ = {},
162 const RemoteHostFilter & remote_host_filter_ = {})
163 : ReadBuffer(nullptr, 0)
164 , uri {uri_}
165 , method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
166 , session {session_}
167 , out_stream_callback {out_stream_callback_}
168 , credentials {credentials_}
169 , http_header_entries {http_header_entries_}
170 , remote_host_filter {remote_host_filter_}
171 {
172 Poco::Net::HTTPResponse response;
173
174 istr = call(uri, response);
175
176 while (isRedirect(response.getStatus()))
177 {
178 Poco::URI uri_redirect(response.get("Location"));
179 remote_host_filter.checkURL(uri_redirect);
180
181 session->updateSession(uri_redirect);
182
183 istr = call(uri_redirect,response);
184 }
185
186 try
187 {
188 impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
189 }
190 catch (const Poco::Exception & e)
191 {
192 /// We use session data storage as storage for exception text
193 /// Depend on it we can deduce to reconnect session or reresolve session host
194 auto sess = session->getSession();
195 sess->attachSessionData(e.message());
196 throw;
197 }
198 }
199
200 bool nextImpl() override
201 {
202 if (!impl->next())
203 return false;
204 internal_buffer = impl->buffer();
205 working_buffer = internal_buffer;
206 return true;
207 }
208
209 std::string getResponseCookie(const std::string & name, const std::string & def) const
210 {
211 for (const auto & cookie : cookies)
212 if (cookie.getName() == name)
213 return cookie.getValue();
214 return def;
215 }
216 };
217}
218
219class UpdatableSession : public UpdatableSessionBase<HTTPSessionPtr>
220{
221 using Parent = UpdatableSessionBase<HTTPSessionPtr>;
222
223public:
224 explicit UpdatableSession(const Poco::URI uri,
225 const ConnectionTimeouts & timeouts_,
226 const SettingUInt64 max_redirects_)
227 : Parent(uri, timeouts_, max_redirects_)
228 {
229 session = makeHTTPSession(initial_uri, timeouts);
230 }
231
232 void buildNewSession(const Poco::URI & uri) override
233 {
234 session = makeHTTPSession(uri, timeouts);
235 }
236};
237
238class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>
239{
240 using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>;
241
242public:
243 explicit ReadWriteBufferFromHTTP(Poco::URI uri_,
244 const std::string & method_ = {},
245 OutStreamCallback out_stream_callback_ = {},
246 const ConnectionTimeouts & timeouts = {},
247 const SettingUInt64 max_redirects = 0,
248 const Poco::Net::HTTPBasicCredentials & credentials_ = {},
249 size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
250 const HTTPHeaderEntries & http_header_entries_ = {},
251 const RemoteHostFilter & remote_host_filter_ = {})
252 : Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects), uri_, method_, out_stream_callback_, credentials_, buffer_size_, http_header_entries_, remote_host_filter_)
253 {
254 }
255};
256
257class UpdatablePooledSession : public UpdatableSessionBase<PooledHTTPSessionPtr>
258{
259 using Parent = UpdatableSessionBase<PooledHTTPSessionPtr>;
260
261private:
262 size_t per_endpoint_pool_size;
263
264public:
265 explicit UpdatablePooledSession(const Poco::URI uri,
266 const ConnectionTimeouts & timeouts_,
267 const SettingUInt64 max_redirects_,
268 size_t per_endpoint_pool_size_)
269 : Parent(uri, timeouts_, max_redirects_)
270 , per_endpoint_pool_size { per_endpoint_pool_size_ }
271 {
272 session = makePooledHTTPSession(initial_uri, timeouts, per_endpoint_pool_size);
273 }
274
275 void buildNewSession(const Poco::URI & uri) override
276 {
277 session = makePooledHTTPSession(uri, timeouts, per_endpoint_pool_size);
278 }
279};
280
281class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>>
282{
283 using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>>;
284
285public:
286 explicit PooledReadWriteBufferFromHTTP(Poco::URI uri_,
287 const std::string & method_ = {},
288 OutStreamCallback out_stream_callback_ = {},
289 const ConnectionTimeouts & timeouts_ = {},
290 const Poco::Net::HTTPBasicCredentials & credentials_ = {},
291 size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
292 const SettingUInt64 max_redirects = 0,
293 size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT)
294 : Parent(std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint),
295 uri_,
296 method_,
297 out_stream_callback_,
298 credentials_,
299 buffer_size_)
300 {
301 }
302};
303
304}
305
306