1 | #pragma once |
2 | |
3 | #include <optional> |
4 | #include <mutex> |
5 | #include <Poco/Net/HTTPServerRequest.h> |
6 | #include <Poco/Net/HTTPServerResponse.h> |
7 | #include <Poco/Version.h> |
8 | #include <IO/WriteBuffer.h> |
9 | #include <IO/BufferWithOwnMemory.h> |
10 | #include <IO/WriteBufferFromOStream.h> |
11 | #include <IO/ZlibDeflatingWriteBuffer.h> |
12 | #include <IO/BrotliWriteBuffer.h> |
13 | #include <IO/HTTPCommon.h> |
14 | #include <IO/Progress.h> |
15 | #include <Common/NetException.h> |
16 | #include <Common/Stopwatch.h> |
17 | #include <Common/config.h> |
18 | |
19 | |
/// Forward declaration of the Poco response type that
/// WriteBufferFromHTTPServerResponse keeps a reference to.
namespace Poco::Net
{
class HTTPServerResponse;
}
27 | |
28 | |
29 | namespace DB |
30 | { |
31 | |
32 | /// The difference from WriteBufferFromOStream is that this buffer gets the underlying std::ostream |
33 | /// (using response.send()) only after data is flushed for the first time. This is needed in HTTP |
34 | /// servers to change some HTTP headers (e.g. response code) before any data is sent to the client |
35 | /// (headers can't be changed after response.send() is called). |
36 | /// |
37 | /// In short, it allows delaying the call to response.send(). |
38 | /// |
39 | /// Additionally, supports HTTP response compression (in this case corresponding Content-Encoding |
40 | /// header will be set). |
41 | /// |
42 | /// Also this class write and flush special X-ClickHouse-Progress HTTP headers |
43 | /// if no data was sent at the time of progress notification. |
44 | /// This allows to implement progress bar in HTTP clients. |
45 | class WriteBufferFromHTTPServerResponse : public BufferWithOwnMemory<WriteBuffer> |
46 | { |
47 | private: |
48 | Poco::Net::HTTPServerRequest & request; |
49 | Poco::Net::HTTPServerResponse & response; |
50 | |
51 | bool = false; |
52 | unsigned keep_alive_timeout = 0; |
53 | bool compress = false; |
54 | CompressionMethod compression_method; |
55 | int compression_level = Z_DEFAULT_COMPRESSION; |
56 | |
57 | std::ostream * response_body_ostr = nullptr; |
58 | |
59 | #if defined(POCO_CLICKHOUSE_PATCH) |
60 | std::ostream * = nullptr; |
61 | #endif |
62 | |
63 | std::unique_ptr<WriteBufferFromOStream> out_raw; |
64 | std::optional<ZlibDeflatingWriteBuffer> deflating_buf; |
65 | #if USE_BROTLI |
66 | std::optional<BrotliWriteBuffer> brotli_buf; |
67 | #endif |
68 | |
69 | WriteBuffer * out = nullptr; /// Uncompressed HTTP body is written to this buffer. Points to out_raw or possibly to deflating_buf. |
70 | |
71 | bool = false; |
72 | bool = false; /// If true, you could not add any headers. |
73 | |
74 | Progress accumulated_progress; |
75 | size_t send_progress_interval_ms = 100; |
76 | Stopwatch progress_watch; |
77 | |
78 | std::mutex mutex; /// progress callback could be called from different threads. |
79 | |
80 | |
81 | /// Must be called under locked mutex. |
82 | /// This method send headers, if this was not done already, |
83 | /// but not finish them with \r\n, allowing to send more headers subsequently. |
84 | void (); |
85 | |
86 | // Used for write the header X-ClickHouse-Progress |
87 | void (); |
88 | // Used for write the header X-ClickHouse-Summary |
89 | void (); |
90 | |
91 | /// This method finish headers with \r\n, allowing to start to send body. |
92 | void (); |
93 | |
94 | void nextImpl() override; |
95 | |
96 | public: |
97 | WriteBufferFromHTTPServerResponse( |
98 | Poco::Net::HTTPServerRequest & request_, |
99 | Poco::Net::HTTPServerResponse & response_, |
100 | unsigned keep_alive_timeout_, |
101 | bool compress_ = false, /// If true - set Content-Encoding header and compress the result. |
102 | CompressionMethod compression_method_ = CompressionMethod::Gzip, |
103 | size_t size = DBMS_DEFAULT_BUFFER_SIZE); |
104 | |
105 | /// Writes progess in repeating HTTP headers. |
106 | void onProgress(const Progress & progress); |
107 | |
108 | /// Send at least HTTP headers if no data has been sent yet. |
109 | /// Use after the data has possibly been sent and no error happened (and thus you do not plan |
110 | /// to change response HTTP code. |
111 | /// This method is idempotent. |
112 | void finalize() override; |
113 | |
114 | /// Turn compression on or off. |
115 | /// The setting has any effect only if HTTP headers haven't been sent yet. |
116 | void setCompression(bool enable_compression) |
117 | { |
118 | compress = enable_compression; |
119 | } |
120 | |
121 | /// Set compression level if the compression is turned on. |
122 | /// The setting has any effect only if HTTP headers haven't been sent yet. |
123 | void setCompressionLevel(int level) |
124 | { |
125 | compression_level = level; |
126 | } |
127 | |
128 | /// Turn CORS on or off. |
129 | /// The setting has any effect only if HTTP headers haven't been sent yet. |
130 | void (bool enable_cors) |
131 | { |
132 | add_cors_header = enable_cors; |
133 | } |
134 | |
135 | /// Don't send HTTP headers with progress more frequently. |
136 | void setSendProgressInterval(size_t send_progress_interval_ms_) |
137 | { |
138 | send_progress_interval_ms = send_progress_interval_ms_; |
139 | } |
140 | |
141 | ~WriteBufferFromHTTPServerResponse() override; |
142 | }; |
143 | |
144 | } |
145 | |