1 | #include <Poco/Version.h> |
2 | #include <Poco/Net/HTTPServerResponse.h> |
3 | #include <IO/WriteBufferFromHTTPServerResponse.h> |
4 | #include <IO/WriteBufferFromString.h> |
5 | #include <IO/HTTPCommon.h> |
6 | #include <IO/Progress.h> |
7 | #include <Common/Exception.h> |
8 | #include <Common/NetException.h> |
9 | #include <Common/Stopwatch.h> |
10 | #include <Common/config.h> |
11 | |
12 | |
13 | namespace DB |
14 | { |
15 | |
/// Error codes used in this file. Only declared here (extern); the definition is provided elsewhere.
namespace ErrorCodes
{
    /// Thrown by nextImpl() when an unknown compression method is requested.
    extern const int LOGICAL_ERROR;
}
20 | |
21 | |
22 | void WriteBufferFromHTTPServerResponse::() |
23 | { |
24 | if (!headers_started_sending) |
25 | { |
26 | headers_started_sending = true; |
27 | |
28 | if (add_cors_header) |
29 | response.set("Access-Control-Allow-Origin" , "*" ); |
30 | |
31 | setResponseDefaultHeaders(response, keep_alive_timeout); |
32 | |
33 | #if defined(POCO_CLICKHOUSE_PATCH) |
34 | if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD) |
35 | std::tie(response_header_ostr, response_body_ostr) = response.beginSend(); |
36 | #endif |
37 | } |
38 | } |
39 | |
40 | void WriteBufferFromHTTPServerResponse::() |
41 | { |
42 | #if defined(POCO_CLICKHOUSE_PATCH) |
43 | if (headers_finished_sending) |
44 | return; |
45 | |
46 | WriteBufferFromOwnString progress_string_writer; |
47 | accumulated_progress.writeJSON(progress_string_writer); |
48 | |
49 | if (response_header_ostr) |
50 | *response_header_ostr << "X-ClickHouse-Summary: " << progress_string_writer.str() << "\r\n" << std::flush; |
51 | #endif |
52 | } |
53 | |
54 | void WriteBufferFromHTTPServerResponse::() |
55 | { |
56 | #if defined(POCO_CLICKHOUSE_PATCH) |
57 | if (headers_finished_sending) |
58 | return; |
59 | |
60 | WriteBufferFromOwnString progress_string_writer; |
61 | accumulated_progress.writeJSON(progress_string_writer); |
62 | |
63 | if (response_header_ostr) |
64 | *response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" << std::flush; |
65 | #endif |
66 | } |
67 | |
68 | void WriteBufferFromHTTPServerResponse::() |
69 | { |
70 | if (!headers_finished_sending) |
71 | { |
72 | writeHeaderSummary(); |
73 | headers_finished_sending = true; |
74 | |
75 | if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD) |
76 | { |
77 | #if defined(POCO_CLICKHOUSE_PATCH) |
78 | /// Send end of headers delimiter. |
79 | if (response_header_ostr) |
80 | *response_header_ostr << "\r\n" << std::flush; |
81 | #else |
82 | /// Newline autosent by response.send() |
83 | /// if nothing to send in body: |
84 | if (!response_body_ostr) |
85 | response_body_ostr = &(response.send()); |
86 | #endif |
87 | } |
88 | else |
89 | { |
90 | if (!response_body_ostr) |
91 | response_body_ostr = &(response.send()); |
92 | } |
93 | } |
94 | } |
95 | |
96 | |
97 | void WriteBufferFromHTTPServerResponse::nextImpl() |
98 | { |
99 | { |
100 | std::lock_guard lock(mutex); |
101 | |
102 | startSendHeaders(); |
103 | |
104 | if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD) |
105 | { |
106 | if (compress) |
107 | { |
108 | if (compression_method == CompressionMethod::Gzip) |
109 | { |
110 | #if defined(POCO_CLICKHOUSE_PATCH) |
111 | *response_header_ostr << "Content-Encoding: gzip\r\n" ; |
112 | #else |
113 | response.set("Content-Encoding" , "gzip" ); |
114 | response_body_ostr = &(response.send()); |
115 | #endif |
116 | out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr); |
117 | deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin()); |
118 | out = &*deflating_buf; |
119 | } |
120 | else if (compression_method == CompressionMethod::Zlib) |
121 | { |
122 | #if defined(POCO_CLICKHOUSE_PATCH) |
123 | *response_header_ostr << "Content-Encoding: deflate\r\n" ; |
124 | #else |
125 | response.set("Content-Encoding" , "deflate" ); |
126 | response_body_ostr = &(response.send()); |
127 | #endif |
128 | out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr); |
129 | deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin()); |
130 | out = &*deflating_buf; |
131 | } |
132 | #if USE_BROTLI |
133 | else if (compression_method == CompressionMethod::Brotli) |
134 | { |
135 | #if defined(POCO_CLICKHOUSE_PATCH) |
136 | *response_header_ostr << "Content-Encoding: br\r\n" ; |
137 | #else |
138 | response.set("Content-Encoding" , "br" ); |
139 | response_body_ostr = &(response.send()); |
140 | #endif |
141 | out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr); |
142 | brotli_buf.emplace(*out_raw, compression_level, working_buffer.size(), working_buffer.begin()); |
143 | out = &*brotli_buf; |
144 | } |
145 | #endif |
146 | |
147 | else |
148 | throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse" , |
149 | ErrorCodes::LOGICAL_ERROR); |
150 | /// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy. |
151 | } |
152 | else |
153 | { |
154 | #if !defined(POCO_CLICKHOUSE_PATCH) |
155 | response_body_ostr = &(response.send()); |
156 | #endif |
157 | |
158 | out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr, working_buffer.size(), working_buffer.begin()); |
159 | out = &*out_raw; |
160 | } |
161 | } |
162 | |
163 | finishSendHeaders(); |
164 | |
165 | } |
166 | |
167 | if (out) |
168 | { |
169 | out->position() = position(); |
170 | out->next(); |
171 | } |
172 | } |
173 | |
174 | |
/// Stores the request/response references and the output settings; nothing is sent from the constructor.
///
/// request_             - incoming request; its method is checked to treat HEAD requests specially.
/// response_            - response object the headers and the body are sent through.
/// keep_alive_timeout_  - forwarded to setResponseDefaultHeaders() when headers are started.
/// compress_            - whether nextImpl() wraps the body in a compressing buffer.
/// compression_method_  - compression method used when compress_ is true.
/// size                 - passed to the BufferWithOwnMemory base (presumably the working buffer size - confirm against the base class).
WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
    Poco::Net::HTTPServerRequest & request_,
    Poco::Net::HTTPServerResponse & response_,
    unsigned keep_alive_timeout_,
    bool compress_,
    CompressionMethod compression_method_,
    size_t size)
    : BufferWithOwnMemory<WriteBuffer>(size)
    , request(request_)
    , response(response_)
    , keep_alive_timeout(keep_alive_timeout_)
    , compress(compress_)
    , compression_method(compression_method_)
{
}
190 | |
191 | |
192 | void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress) |
193 | { |
194 | std::lock_guard lock(mutex); |
195 | |
196 | /// Cannot add new headers if body was started to send. |
197 | if (headers_finished_sending) |
198 | return; |
199 | |
200 | accumulated_progress.incrementPiecewiseAtomically(progress); |
201 | |
202 | if (progress_watch.elapsed() >= send_progress_interval_ms * 1000000) |
203 | { |
204 | progress_watch.restart(); |
205 | |
206 | /// Send all common headers before our special progress headers. |
207 | startSendHeaders(); |
208 | writeHeaderProgress(); |
209 | } |
210 | } |
211 | |
212 | |
213 | void WriteBufferFromHTTPServerResponse::finalize() |
214 | { |
215 | if (offset()) |
216 | { |
217 | next(); |
218 | } |
219 | else |
220 | { |
221 | /// If no remaining data, just send headers. |
222 | std::lock_guard lock(mutex); |
223 | startSendHeaders(); |
224 | finishSendHeaders(); |
225 | } |
226 | } |
227 | |
228 | |
/// Finalizes the response on destruction.
/// Any exception thrown by finalize() is caught and logged here so that it does not leave the destructor.
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
    try
    {
        finalize();
    }
    catch (...)
    {
        /// Log under the destructor's name instead of propagating.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
240 | |
241 | } |
242 | |