1 | #include "HTTPDictionarySource.h" |
2 | #include <DataStreams/IBlockOutputStream.h> |
3 | #include <DataStreams/OwningBlockInputStream.h> |
4 | #include <IO/ConnectionTimeouts.h> |
5 | #include <IO/ReadWriteBufferFromHTTP.h> |
6 | #include <IO/WriteBufferFromOStream.h> |
7 | #include <IO/WriteBufferFromString.h> |
8 | #include <IO/WriteHelpers.h> |
9 | #include <Interpreters/Context.h> |
10 | #include <Poco/Net/HTTPRequest.h> |
11 | #include <common/logger_useful.h> |
12 | #include "DictionarySourceFactory.h" |
13 | #include "DictionarySourceHelpers.h" |
14 | #include "DictionaryStructure.h" |
15 | #include "registerDictionaries.h" |
16 | |
17 | |
18 | namespace DB |
19 | { |
20 | static const UInt64 max_block_size = 8192; |
21 | |
22 | |
23 | HTTPDictionarySource::HTTPDictionarySource( |
24 | const DictionaryStructure & dict_struct_, |
25 | const Poco::Util::AbstractConfiguration & config, |
26 | const std::string & config_prefix, |
27 | Block & sample_block_, |
28 | const Context & context_, |
29 | bool check_config) |
30 | : log(&Logger::get("HTTPDictionarySource" )) |
31 | , update_time{std::chrono::system_clock::from_time_t(0)} |
32 | , dict_struct{dict_struct_} |
33 | , url{config.getString(config_prefix + ".url" , "" )} |
34 | , update_field{config.getString(config_prefix + ".update_field" , "" )} |
35 | , format{config.getString(config_prefix + ".format" )} |
36 | , sample_block{sample_block_} |
37 | , context(context_) |
38 | , timeouts(ConnectionTimeouts::getHTTPTimeouts(context)) |
39 | { |
40 | |
41 | if (check_config) |
42 | context.getRemoteHostFilter().checkURL(Poco::URI(url)); |
43 | |
44 | const auto & credentials_prefix = config_prefix + ".credentials" ; |
45 | |
46 | if (config.has(credentials_prefix)) |
47 | { |
48 | credentials.setUsername(config.getString(credentials_prefix + ".user" , "" )); |
49 | credentials.setPassword(config.getString(credentials_prefix + ".password" , "" )); |
50 | } |
51 | |
52 | const auto & = config_prefix + ".headers" ; |
53 | |
54 | if (config.has(headers_prefix)) |
55 | { |
56 | Poco::Util::AbstractConfiguration::Keys config_keys; |
57 | config.keys(headers_prefix, config_keys); |
58 | |
59 | header_entries.reserve(config_keys.size()); |
60 | for (const auto & key : config_keys) |
61 | { |
62 | const auto = config.getString(headers_prefix + "." + key + ".name" , "" ); |
63 | const auto = config.getString(headers_prefix + "." + key + ".value" , "" ); |
64 | header_entries.emplace_back(std::make_tuple(header_key, header_value)); |
65 | } |
66 | } |
67 | } |
68 | |
69 | HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other) |
70 | : log(&Logger::get("HTTPDictionarySource" )) |
71 | , update_time{other.update_time} |
72 | , dict_struct{other.dict_struct} |
73 | , url{other.url} |
74 | , header_entries{other.header_entries} |
75 | , update_field{other.update_field} |
76 | , format{other.format} |
77 | , sample_block{other.sample_block} |
78 | , context(other.context) |
79 | , timeouts(ConnectionTimeouts::getHTTPTimeouts(context)) |
80 | { |
81 | credentials.setUsername(other.credentials.getUsername()); |
82 | credentials.setPassword(other.credentials.getPassword()); |
83 | } |
84 | |
85 | void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri) |
86 | { |
87 | if (update_time != std::chrono::system_clock::from_time_t(0)) |
88 | { |
89 | auto tmp_time = update_time; |
90 | update_time = std::chrono::system_clock::now(); |
91 | time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1; |
92 | WriteBufferFromOwnString out; |
93 | writeDateTimeText(hr_time, out); |
94 | uri.addQueryParameter(update_field, out.str()); |
95 | } |
96 | else |
97 | { |
98 | update_time = std::chrono::system_clock::now(); |
99 | } |
100 | } |
101 | |
102 | BlockInputStreamPtr HTTPDictionarySource::loadAll() |
103 | { |
104 | LOG_TRACE(log, "loadAll " + toString()); |
105 | Poco::URI uri(url); |
106 | auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>( |
107 | uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts, |
108 | 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); |
109 | auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); |
110 | return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr)); |
111 | } |
112 | |
113 | BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll() |
114 | { |
115 | Poco::URI uri(url); |
116 | getUpdateFieldAndDate(uri); |
117 | LOG_TRACE(log, "loadUpdatedAll " + uri.toString()); |
118 | auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>( |
119 | uri, Poco::Net::HTTPRequest::HTTP_GET, ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts, |
120 | 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); |
121 | auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); |
122 | return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr)); |
123 | } |
124 | |
125 | BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids) |
126 | { |
127 | LOG_TRACE(log, "loadIds " << toString() << " size = " << ids.size()); |
128 | |
129 | ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr) |
130 | { |
131 | WriteBufferFromOStream out_buffer(ostr); |
132 | auto output_stream = context.getOutputFormat(format, out_buffer, sample_block); |
133 | formatIDs(output_stream, ids); |
134 | }; |
135 | |
136 | Poco::URI uri(url); |
137 | auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>( |
138 | uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts, |
139 | 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); |
140 | auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); |
141 | return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr)); |
142 | } |
143 | |
144 | BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) |
145 | { |
146 | LOG_TRACE(log, "loadKeys " << toString() << " size = " << requested_rows.size()); |
147 | |
148 | ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr) |
149 | { |
150 | WriteBufferFromOStream out_buffer(ostr); |
151 | auto output_stream = context.getOutputFormat(format, out_buffer, sample_block); |
152 | formatKeys(dict_struct, output_stream, key_columns, requested_rows); |
153 | }; |
154 | |
155 | Poco::URI uri(url); |
156 | auto in_ptr = std::make_unique<ReadWriteBufferFromHTTP>( |
157 | uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback, timeouts, |
158 | 0, credentials, DBMS_DEFAULT_BUFFER_SIZE, header_entries); |
159 | auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); |
160 | return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr)); |
161 | } |
162 | |
163 | bool HTTPDictionarySource::isModified() const |
164 | { |
165 | return true; |
166 | } |
167 | |
168 | bool HTTPDictionarySource::supportsSelectiveLoad() const |
169 | { |
170 | return true; |
171 | } |
172 | |
173 | bool HTTPDictionarySource::hasUpdateField() const |
174 | { |
175 | return !update_field.empty(); |
176 | } |
177 | |
178 | DictionarySourcePtr HTTPDictionarySource::clone() const |
179 | { |
180 | return std::make_unique<HTTPDictionarySource>(*this); |
181 | } |
182 | |
183 | std::string HTTPDictionarySource::toString() const |
184 | { |
185 | Poco::URI uri(url); |
186 | return uri.toString(); |
187 | } |
188 | |
189 | void registerDictionarySourceHTTP(DictionarySourceFactory & factory) |
190 | { |
191 | auto createTableSource = [=](const DictionaryStructure & dict_struct, |
192 | const Poco::Util::AbstractConfiguration & config, |
193 | const std::string & config_prefix, |
194 | Block & sample_block, |
195 | const Context & context, |
196 | bool check_config) -> DictionarySourcePtr |
197 | { |
198 | if (dict_struct.has_expressions) |
199 | throw Exception{"Dictionary source of type `http` does not support attribute expressions" , ErrorCodes::LOGICAL_ERROR}; |
200 | |
201 | return std::make_unique<HTTPDictionarySource>( |
202 | dict_struct, config, config_prefix + ".http" , |
203 | sample_block, context, check_config); |
204 | }; |
205 | factory.registerSource("http" , createTableSource); |
206 | } |
207 | |
208 | } |
209 | |