1#include <Poco/Version.h>
2#include <Poco/Net/HTTPServerResponse.h>
3#include <IO/WriteBufferFromHTTPServerResponse.h>
4#include <IO/WriteBufferFromString.h>
5#include <IO/HTTPCommon.h>
6#include <IO/Progress.h>
7#include <Common/Exception.h>
8#include <Common/NetException.h>
9#include <Common/Stopwatch.h>
10#include <Common/config.h>
11
12
13namespace DB
14{
15
16namespace ErrorCodes
17{
18 extern const int LOGICAL_ERROR;
19}
20
21
22void WriteBufferFromHTTPServerResponse::startSendHeaders()
23{
24 if (!headers_started_sending)
25 {
26 headers_started_sending = true;
27
28 if (add_cors_header)
29 response.set("Access-Control-Allow-Origin", "*");
30
31 setResponseDefaultHeaders(response, keep_alive_timeout);
32
33#if defined(POCO_CLICKHOUSE_PATCH)
34 if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
35 std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
36#endif
37 }
38}
39
40void WriteBufferFromHTTPServerResponse::writeHeaderSummary()
41{
42#if defined(POCO_CLICKHOUSE_PATCH)
43 if (headers_finished_sending)
44 return;
45
46 WriteBufferFromOwnString progress_string_writer;
47 accumulated_progress.writeJSON(progress_string_writer);
48
49 if (response_header_ostr)
50 *response_header_ostr << "X-ClickHouse-Summary: " << progress_string_writer.str() << "\r\n" << std::flush;
51#endif
52}
53
54void WriteBufferFromHTTPServerResponse::writeHeaderProgress()
55{
56#if defined(POCO_CLICKHOUSE_PATCH)
57 if (headers_finished_sending)
58 return;
59
60 WriteBufferFromOwnString progress_string_writer;
61 accumulated_progress.writeJSON(progress_string_writer);
62
63 if (response_header_ostr)
64 *response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" << std::flush;
65#endif
66}
67
68void WriteBufferFromHTTPServerResponse::finishSendHeaders()
69{
70 if (!headers_finished_sending)
71 {
72 writeHeaderSummary();
73 headers_finished_sending = true;
74
75 if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
76 {
77#if defined(POCO_CLICKHOUSE_PATCH)
78 /// Send end of headers delimiter.
79 if (response_header_ostr)
80 *response_header_ostr << "\r\n" << std::flush;
81#else
82 /// Newline autosent by response.send()
83 /// if nothing to send in body:
84 if (!response_body_ostr)
85 response_body_ostr = &(response.send());
86#endif
87 }
88 else
89 {
90 if (!response_body_ostr)
91 response_body_ostr = &(response.send());
92 }
93 }
94}
95
96
97void WriteBufferFromHTTPServerResponse::nextImpl()
98{
99 {
100 std::lock_guard lock(mutex);
101
102 startSendHeaders();
103
104 if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
105 {
106 if (compress)
107 {
108 if (compression_method == CompressionMethod::Gzip)
109 {
110#if defined(POCO_CLICKHOUSE_PATCH)
111 *response_header_ostr << "Content-Encoding: gzip\r\n";
112#else
113 response.set("Content-Encoding", "gzip");
114 response_body_ostr = &(response.send());
115#endif
116 out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
117 deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
118 out = &*deflating_buf;
119 }
120 else if (compression_method == CompressionMethod::Zlib)
121 {
122#if defined(POCO_CLICKHOUSE_PATCH)
123 *response_header_ostr << "Content-Encoding: deflate\r\n";
124#else
125 response.set("Content-Encoding", "deflate");
126 response_body_ostr = &(response.send());
127#endif
128 out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
129 deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
130 out = &*deflating_buf;
131 }
132#if USE_BROTLI
133 else if (compression_method == CompressionMethod::Brotli)
134 {
135#if defined(POCO_CLICKHOUSE_PATCH)
136 *response_header_ostr << "Content-Encoding: br\r\n";
137#else
138 response.set("Content-Encoding", "br");
139 response_body_ostr = &(response.send());
140#endif
141 out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
142 brotli_buf.emplace(*out_raw, compression_level, working_buffer.size(), working_buffer.begin());
143 out = &*brotli_buf;
144 }
145#endif
146
147 else
148 throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
149 ErrorCodes::LOGICAL_ERROR);
150 /// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
151 }
152 else
153 {
154#if !defined(POCO_CLICKHOUSE_PATCH)
155 response_body_ostr = &(response.send());
156#endif
157
158 out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr, working_buffer.size(), working_buffer.begin());
159 out = &*out_raw;
160 }
161 }
162
163 finishSendHeaders();
164
165 }
166
167 if (out)
168 {
169 out->position() = position();
170 out->next();
171 }
172}
173
174
175WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
176 Poco::Net::HTTPServerRequest & request_,
177 Poco::Net::HTTPServerResponse & response_,
178 unsigned keep_alive_timeout_,
179 bool compress_,
180 CompressionMethod compression_method_,
181 size_t size)
182 : BufferWithOwnMemory<WriteBuffer>(size)
183 , request(request_)
184 , response(response_)
185 , keep_alive_timeout(keep_alive_timeout_)
186 , compress(compress_)
187 , compression_method(compression_method_)
188{
189}
190
191
192void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress)
193{
194 std::lock_guard lock(mutex);
195
196 /// Cannot add new headers if body was started to send.
197 if (headers_finished_sending)
198 return;
199
200 accumulated_progress.incrementPiecewiseAtomically(progress);
201
202 if (progress_watch.elapsed() >= send_progress_interval_ms * 1000000)
203 {
204 progress_watch.restart();
205
206 /// Send all common headers before our special progress headers.
207 startSendHeaders();
208 writeHeaderProgress();
209 }
210}
211
212
213void WriteBufferFromHTTPServerResponse::finalize()
214{
215 if (offset())
216 {
217 next();
218 }
219 else
220 {
221 /// If no remaining data, just send headers.
222 std::lock_guard lock(mutex);
223 startSendHeaders();
224 finishSendHeaders();
225 }
226}
227
228
229WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
230{
231 try
232 {
233 finalize();
234 }
235 catch (...)
236 {
237 tryLogCurrentException(__PRETTY_FUNCTION__);
238 }
239}
240
241}
242