1#pragma once
2
3#include <optional>
4#include <mutex>
5#include <Poco/Net/HTTPServerRequest.h>
6#include <Poco/Net/HTTPServerResponse.h>
7#include <Poco/Version.h>
8#include <IO/WriteBuffer.h>
9#include <IO/BufferWithOwnMemory.h>
10#include <IO/WriteBufferFromOStream.h>
11#include <IO/ZlibDeflatingWriteBuffer.h>
12#include <IO/BrotliWriteBuffer.h>
13#include <IO/HTTPCommon.h>
14#include <IO/Progress.h>
15#include <Common/NetException.h>
16#include <Common/Stopwatch.h>
17#include <Common/config.h>
18
19
20namespace Poco
21{
22 namespace Net
23 {
24 class HTTPServerResponse;
25 }
26}
27
28
29namespace DB
30{
31
32/// The difference from WriteBufferFromOStream is that this buffer gets the underlying std::ostream
33/// (using response.send()) only after data is flushed for the first time. This is needed in HTTP
34/// servers to change some HTTP headers (e.g. response code) before any data is sent to the client
35/// (headers can't be changed after response.send() is called).
36///
37/// In short, it allows delaying the call to response.send().
38///
39/// Additionally, supports HTTP response compression (in this case corresponding Content-Encoding
40/// header will be set).
41///
42/// Also this class write and flush special X-ClickHouse-Progress HTTP headers
43/// if no data was sent at the time of progress notification.
44/// This allows to implement progress bar in HTTP clients.
45class WriteBufferFromHTTPServerResponse : public BufferWithOwnMemory<WriteBuffer>
46{
47private:
48 Poco::Net::HTTPServerRequest & request;
49 Poco::Net::HTTPServerResponse & response;
50
51 bool add_cors_header = false;
52 unsigned keep_alive_timeout = 0;
53 bool compress = false;
54 CompressionMethod compression_method;
55 int compression_level = Z_DEFAULT_COMPRESSION;
56
57 std::ostream * response_body_ostr = nullptr;
58
59#if defined(POCO_CLICKHOUSE_PATCH)
60 std::ostream * response_header_ostr = nullptr;
61#endif
62
63 std::unique_ptr<WriteBufferFromOStream> out_raw;
64 std::optional<ZlibDeflatingWriteBuffer> deflating_buf;
65#if USE_BROTLI
66 std::optional<BrotliWriteBuffer> brotli_buf;
67#endif
68
69 WriteBuffer * out = nullptr; /// Uncompressed HTTP body is written to this buffer. Points to out_raw or possibly to deflating_buf.
70
71 bool headers_started_sending = false;
72 bool headers_finished_sending = false; /// If true, you could not add any headers.
73
74 Progress accumulated_progress;
75 size_t send_progress_interval_ms = 100;
76 Stopwatch progress_watch;
77
78 std::mutex mutex; /// progress callback could be called from different threads.
79
80
81 /// Must be called under locked mutex.
82 /// This method send headers, if this was not done already,
83 /// but not finish them with \r\n, allowing to send more headers subsequently.
84 void startSendHeaders();
85
86 // Used for write the header X-ClickHouse-Progress
87 void writeHeaderProgress();
88 // Used for write the header X-ClickHouse-Summary
89 void writeHeaderSummary();
90
91 /// This method finish headers with \r\n, allowing to start to send body.
92 void finishSendHeaders();
93
94 void nextImpl() override;
95
96public:
97 WriteBufferFromHTTPServerResponse(
98 Poco::Net::HTTPServerRequest & request_,
99 Poco::Net::HTTPServerResponse & response_,
100 unsigned keep_alive_timeout_,
101 bool compress_ = false, /// If true - set Content-Encoding header and compress the result.
102 CompressionMethod compression_method_ = CompressionMethod::Gzip,
103 size_t size = DBMS_DEFAULT_BUFFER_SIZE);
104
105 /// Writes progess in repeating HTTP headers.
106 void onProgress(const Progress & progress);
107
108 /// Send at least HTTP headers if no data has been sent yet.
109 /// Use after the data has possibly been sent and no error happened (and thus you do not plan
110 /// to change response HTTP code.
111 /// This method is idempotent.
112 void finalize() override;
113
114 /// Turn compression on or off.
115 /// The setting has any effect only if HTTP headers haven't been sent yet.
116 void setCompression(bool enable_compression)
117 {
118 compress = enable_compression;
119 }
120
121 /// Set compression level if the compression is turned on.
122 /// The setting has any effect only if HTTP headers haven't been sent yet.
123 void setCompressionLevel(int level)
124 {
125 compression_level = level;
126 }
127
128 /// Turn CORS on or off.
129 /// The setting has any effect only if HTTP headers haven't been sent yet.
130 void addHeaderCORS(bool enable_cors)
131 {
132 add_cors_header = enable_cors;
133 }
134
135 /// Don't send HTTP headers with progress more frequently.
136 void setSendProgressInterval(size_t send_progress_interval_ms_)
137 {
138 send_progress_interval_ms = send_progress_interval_ms_;
139 }
140
141 ~WriteBufferFromHTTPServerResponse() override;
142};
143
144}
145