1 | #pragma once |
2 | |
3 | #include <functional> |
4 | #include <Core/Types.h> |
5 | #include <IO/ConnectionTimeouts.h> |
6 | #include <IO/HTTPCommon.h> |
7 | #include <IO/ReadBuffer.h> |
8 | #include <IO/ReadBufferFromIStream.h> |
9 | #include <Poco/Any.h> |
10 | #include <Poco/Net/HTTPBasicCredentials.h> |
11 | #include <Poco/Net/HTTPClientSession.h> |
12 | #include <Poco/Net/HTTPRequest.h> |
13 | #include <Poco/Net/HTTPResponse.h> |
14 | #include <Poco/URI.h> |
15 | #include <Poco/Version.h> |
16 | #include <Common/DNSResolver.h> |
17 | #include <Common/config.h> |
18 | #include <common/logger_useful.h> |
19 | #include <Poco/URIStreamFactory.h> |
20 | |
21 | |
/// Default timeout values for HTTP read buffers (judging by the names; neither macro is
/// referenced elsewhere in this header). NOTE(review): units are not visible here —
/// presumably seconds; confirm at the use sites.
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
24 | |
25 | namespace DB |
26 | { |
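/** Helpers for performing an HTTP request (the explicitly given method, otherwise POST when a
  * request-body callback is supplied and GET when it is not) and reading the response body
  * through a ReadBuffer, following redirects along the way.
  */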
29 | |
/// Error code raised by UpdatableSessionBase::updateSession() when the redirect limit is exceeded.
namespace ErrorCodes { extern const int TOO_MANY_REDIRECTS; }
34 | |
35 | template <typename SessionPtr> |
36 | class UpdatableSessionBase |
37 | { |
38 | protected: |
39 | SessionPtr session; |
40 | UInt64 redirects { 0 }; |
41 | Poco::URI initial_uri; |
42 | const ConnectionTimeouts & timeouts; |
43 | SettingUInt64 max_redirects; |
44 | |
45 | public: |
46 | virtual void buildNewSession(const Poco::URI & uri) = 0; |
47 | |
48 | explicit UpdatableSessionBase(const Poco::URI uri, |
49 | const ConnectionTimeouts & timeouts_, |
50 | SettingUInt64 max_redirects_) |
51 | : initial_uri { uri } |
52 | , timeouts { timeouts_ } |
53 | , max_redirects { max_redirects_ } |
54 | { |
55 | } |
56 | |
57 | SessionPtr getSession() |
58 | { |
59 | return session; |
60 | } |
61 | |
62 | void updateSession(const Poco::URI & uri) |
63 | { |
64 | ++redirects; |
65 | if (redirects <= max_redirects) |
66 | { |
67 | buildNewSession(uri); |
68 | } |
69 | else |
70 | { |
71 | std::stringstream error_message; |
72 | error_message << "Too many redirects while trying to access " << initial_uri.toString(); |
73 | |
74 | throw Exception(error_message.str(), ErrorCodes::TOO_MANY_REDIRECTS); |
75 | } |
76 | } |
77 | |
78 | virtual ~UpdatableSessionBase() |
79 | { |
80 | } |
81 | }; |
82 | |
83 | |
84 | namespace detail |
85 | { |
86 | template <typename UpdatableSessionPtr> |
87 | class ReadWriteBufferFromHTTPBase : public ReadBuffer |
88 | { |
89 | public: |
90 | using = std::tuple<std::string, std::string>; |
91 | using = std::vector<HTTPHeaderEntry>; |
92 | |
93 | protected: |
94 | Poco::URI uri; |
95 | std::string method; |
96 | |
97 | UpdatableSessionPtr session; |
98 | std::istream * istr; /// owned by session |
99 | std::unique_ptr<ReadBuffer> impl; |
100 | std::function<void(std::ostream &)> out_stream_callback; |
101 | const Poco::Net::HTTPBasicCredentials & credentials; |
102 | std::vector<Poco::Net::HTTPCookie> cookies; |
103 | HTTPHeaderEntries ; |
104 | RemoteHostFilter remote_host_filter; |
105 | |
106 | std::istream * call(const Poco::URI uri_, Poco::Net::HTTPResponse & response) |
107 | { |
108 | // With empty path poco will send "POST HTTP/1.1" its bug. |
109 | if (uri.getPath().empty()) |
110 | uri.setPath("/" ); |
111 | |
112 | Poco::Net::HTTPRequest request(method, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); |
113 | request.setHost(uri_.getHost()); // use original, not resolved host name in header |
114 | |
115 | if (out_stream_callback) |
116 | request.setChunkedTransferEncoding(true); |
117 | |
118 | for (auto & : http_header_entries) |
119 | { |
120 | request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry)); |
121 | } |
122 | |
123 | if (!credentials.getUsername().empty()) |
124 | credentials.authenticate(request); |
125 | |
126 | LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP" )), "Sending request to " << uri.toString()); |
127 | |
128 | auto sess = session->getSession(); |
129 | |
130 | try |
131 | { |
132 | auto & stream_out = sess->sendRequest(request); |
133 | |
134 | if (out_stream_callback) |
135 | out_stream_callback(stream_out); |
136 | |
137 | istr = receiveResponse(*sess, request, response, true); |
138 | response.getCookies(cookies); |
139 | |
140 | return istr; |
141 | |
142 | } |
143 | catch (const Poco::Exception & e) |
144 | { |
145 | /// We use session data storage as storage for exception text |
146 | /// Depend on it we can deduce to reconnect session or reresolve session host |
147 | sess->attachSessionData(e.message()); |
148 | throw; |
149 | } |
150 | } |
151 | |
152 | public: |
153 | using OutStreamCallback = std::function<void(std::ostream &)>; |
154 | |
155 | explicit ReadWriteBufferFromHTTPBase(UpdatableSessionPtr session_, |
156 | Poco::URI uri_, |
157 | const std::string & method_ = {}, |
158 | OutStreamCallback out_stream_callback_ = {}, |
159 | const Poco::Net::HTTPBasicCredentials & credentials_ = {}, |
160 | size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, |
161 | HTTPHeaderEntries = {}, |
162 | const RemoteHostFilter & remote_host_filter_ = {}) |
163 | : ReadBuffer(nullptr, 0) |
164 | , uri {uri_} |
165 | , method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} |
166 | , session {session_} |
167 | , out_stream_callback {out_stream_callback_} |
168 | , credentials {credentials_} |
169 | , http_header_entries {http_header_entries_} |
170 | , remote_host_filter {remote_host_filter_} |
171 | { |
172 | Poco::Net::HTTPResponse response; |
173 | |
174 | istr = call(uri, response); |
175 | |
176 | while (isRedirect(response.getStatus())) |
177 | { |
178 | Poco::URI uri_redirect(response.get("Location" )); |
179 | remote_host_filter.checkURL(uri_redirect); |
180 | |
181 | session->updateSession(uri_redirect); |
182 | |
183 | istr = call(uri_redirect,response); |
184 | } |
185 | |
186 | try |
187 | { |
188 | impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_); |
189 | } |
190 | catch (const Poco::Exception & e) |
191 | { |
192 | /// We use session data storage as storage for exception text |
193 | /// Depend on it we can deduce to reconnect session or reresolve session host |
194 | auto sess = session->getSession(); |
195 | sess->attachSessionData(e.message()); |
196 | throw; |
197 | } |
198 | } |
199 | |
200 | bool nextImpl() override |
201 | { |
202 | if (!impl->next()) |
203 | return false; |
204 | internal_buffer = impl->buffer(); |
205 | working_buffer = internal_buffer; |
206 | return true; |
207 | } |
208 | |
209 | std::string getResponseCookie(const std::string & name, const std::string & def) const |
210 | { |
211 | for (const auto & cookie : cookies) |
212 | if (cookie.getName() == name) |
213 | return cookie.getValue(); |
214 | return def; |
215 | } |
216 | }; |
217 | } |
218 | |
219 | class UpdatableSession : public UpdatableSessionBase<HTTPSessionPtr> |
220 | { |
221 | using Parent = UpdatableSessionBase<HTTPSessionPtr>; |
222 | |
223 | public: |
224 | explicit UpdatableSession(const Poco::URI uri, |
225 | const ConnectionTimeouts & timeouts_, |
226 | const SettingUInt64 max_redirects_) |
227 | : Parent(uri, timeouts_, max_redirects_) |
228 | { |
229 | session = makeHTTPSession(initial_uri, timeouts); |
230 | } |
231 | |
232 | void buildNewSession(const Poco::URI & uri) override |
233 | { |
234 | session = makeHTTPSession(uri, timeouts); |
235 | } |
236 | }; |
237 | |
238 | class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>> |
239 | { |
240 | using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>; |
241 | |
242 | public: |
243 | explicit ReadWriteBufferFromHTTP(Poco::URI uri_, |
244 | const std::string & method_ = {}, |
245 | OutStreamCallback out_stream_callback_ = {}, |
246 | const ConnectionTimeouts & timeouts = {}, |
247 | const SettingUInt64 max_redirects = 0, |
248 | const Poco::Net::HTTPBasicCredentials & credentials_ = {}, |
249 | size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, |
250 | const HTTPHeaderEntries & = {}, |
251 | const RemoteHostFilter & remote_host_filter_ = {}) |
252 | : Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects), uri_, method_, out_stream_callback_, credentials_, buffer_size_, http_header_entries_, remote_host_filter_) |
253 | { |
254 | } |
255 | }; |
256 | |
257 | class UpdatablePooledSession : public UpdatableSessionBase<PooledHTTPSessionPtr> |
258 | { |
259 | using Parent = UpdatableSessionBase<PooledHTTPSessionPtr>; |
260 | |
261 | private: |
262 | size_t per_endpoint_pool_size; |
263 | |
264 | public: |
265 | explicit UpdatablePooledSession(const Poco::URI uri, |
266 | const ConnectionTimeouts & timeouts_, |
267 | const SettingUInt64 max_redirects_, |
268 | size_t per_endpoint_pool_size_) |
269 | : Parent(uri, timeouts_, max_redirects_) |
270 | , per_endpoint_pool_size { per_endpoint_pool_size_ } |
271 | { |
272 | session = makePooledHTTPSession(initial_uri, timeouts, per_endpoint_pool_size); |
273 | } |
274 | |
275 | void buildNewSession(const Poco::URI & uri) override |
276 | { |
277 | session = makePooledHTTPSession(uri, timeouts, per_endpoint_pool_size); |
278 | } |
279 | }; |
280 | |
281 | class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>> |
282 | { |
283 | using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatablePooledSession>>; |
284 | |
285 | public: |
286 | explicit PooledReadWriteBufferFromHTTP(Poco::URI uri_, |
287 | const std::string & method_ = {}, |
288 | OutStreamCallback out_stream_callback_ = {}, |
289 | const ConnectionTimeouts & timeouts_ = {}, |
290 | const Poco::Net::HTTPBasicCredentials & credentials_ = {}, |
291 | size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, |
292 | const SettingUInt64 max_redirects = 0, |
293 | size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT) |
294 | : Parent(std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint), |
295 | uri_, |
296 | method_, |
297 | out_stream_callback_, |
298 | credentials_, |
299 | buffer_size_) |
300 | { |
301 | } |
302 | }; |
303 | |
304 | } |
305 | |
306 | |