1 | #include <Storages/StorageFactory.h> |
2 | #include <Storages/StorageURL.h> |
3 | |
4 | #include <Interpreters/Context.h> |
5 | #include <Interpreters/evaluateConstantExpression.h> |
6 | #include <Parsers/ASTLiteral.h> |
7 | |
8 | #include <IO/ReadHelpers.h> |
9 | #include <IO/ReadWriteBufferFromHTTP.h> |
10 | #include <IO/WriteBufferFromHTTP.h> |
11 | #include <IO/WriteHelpers.h> |
12 | |
13 | #include <Formats/FormatFactory.h> |
14 | |
15 | #include <DataStreams/IBlockOutputStream.h> |
16 | #include <DataStreams/IBlockInputStream.h> |
17 | #include <DataStreams/AddingDefaultsBlockInputStream.h> |
18 | |
19 | #include <Poco/Net/HTTPRequest.h> |
20 | |
21 | |
22 | namespace DB |
23 | { |
/// Error code constants declared for use in this file; they are only declared here (extern).
namespace ErrorCodes
{
    extern const int UNACCEPTABLE_URL;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
29 | |
30 | IStorageURLBase::IStorageURLBase( |
31 | const Poco::URI & uri_, |
32 | const Context & context_, |
33 | const std::string & database_name_, |
34 | const std::string & table_name_, |
35 | const String & format_name_, |
36 | const ColumnsDescription & columns_, |
37 | const ConstraintsDescription & constraints_, |
38 | const String & compression_method_) |
39 | : uri(uri_), context_global(context_), compression_method(compression_method_), format_name(format_name_), table_name(table_name_), database_name(database_name_) |
40 | { |
41 | context_global.getRemoteHostFilter().checkURL(uri); |
42 | setColumns(columns_); |
43 | setConstraints(constraints_); |
44 | } |
45 | |
46 | namespace |
47 | { |
48 | class StorageURLBlockInputStream : public IBlockInputStream |
49 | { |
50 | public: |
51 | StorageURLBlockInputStream(const Poco::URI & uri, |
52 | const std::string & method, |
53 | std::function<void(std::ostream &)> callback, |
54 | const String & format, |
55 | const String & name_, |
56 | const Block & sample_block, |
57 | const Context & context, |
58 | UInt64 max_block_size, |
59 | const ConnectionTimeouts & timeouts, |
60 | const CompressionMethod compression_method) |
61 | : name(name_) |
62 | { |
63 | read_buf = getReadBuffer<ReadWriteBufferFromHTTP>( |
64 | compression_method, |
65 | uri, |
66 | method, |
67 | callback, |
68 | timeouts, |
69 | context.getSettingsRef().max_http_get_redirects, |
70 | Poco::Net::HTTPBasicCredentials{}, |
71 | DBMS_DEFAULT_BUFFER_SIZE, |
72 | ReadWriteBufferFromHTTP::HTTPHeaderEntries{}, |
73 | context.getRemoteHostFilter()); |
74 | |
75 | reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); |
76 | } |
77 | |
78 | String getName() const override |
79 | { |
80 | return name; |
81 | } |
82 | |
83 | Block readImpl() override |
84 | { |
85 | return reader->read(); |
86 | } |
87 | |
88 | Block () const override |
89 | { |
90 | return reader->getHeader(); |
91 | } |
92 | |
93 | void readPrefixImpl() override |
94 | { |
95 | reader->readPrefix(); |
96 | } |
97 | |
98 | void readSuffixImpl() override |
99 | { |
100 | reader->readSuffix(); |
101 | } |
102 | |
103 | private: |
104 | String name; |
105 | std::unique_ptr<ReadBuffer> read_buf; |
106 | BlockInputStreamPtr reader; |
107 | }; |
108 | |
109 | class StorageURLBlockOutputStream : public IBlockOutputStream |
110 | { |
111 | public: |
112 | StorageURLBlockOutputStream(const Poco::URI & uri, |
113 | const String & format, |
114 | const Block & sample_block_, |
115 | const Context & context, |
116 | const ConnectionTimeouts & timeouts, |
117 | const CompressionMethod compression_method) |
118 | : sample_block(sample_block_) |
119 | { |
120 | write_buf = getWriteBuffer<WriteBufferFromHTTP>(compression_method, uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts); |
121 | writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); |
122 | } |
123 | |
124 | Block () const override |
125 | { |
126 | return sample_block; |
127 | } |
128 | |
129 | void write(const Block & block) override |
130 | { |
131 | writer->write(block); |
132 | } |
133 | |
134 | void writePrefix() override |
135 | { |
136 | writer->writePrefix(); |
137 | } |
138 | |
139 | void writeSuffix() override |
140 | { |
141 | writer->writeSuffix(); |
142 | writer->flush(); |
143 | write_buf->finalize(); |
144 | } |
145 | |
146 | private: |
147 | Block sample_block; |
148 | std::unique_ptr<WriteBuffer> write_buf; |
149 | BlockOutputStreamPtr writer; |
150 | }; |
151 | } |
152 | |
153 | |
154 | std::string IStorageURLBase::getReadMethod() const |
155 | { |
156 | return Poco::Net::HTTPRequest::HTTP_GET; |
157 | } |
158 | |
159 | std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIParams(const Names & /*column_names*/, |
160 | const SelectQueryInfo & /*query_info*/, |
161 | const Context & /*context*/, |
162 | QueryProcessingStage::Enum & /*processed_stage*/, |
163 | size_t /*max_block_size*/) const |
164 | { |
165 | return {}; |
166 | } |
167 | |
168 | std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(const Names & /*column_names*/, |
169 | const SelectQueryInfo & /*query_info*/, |
170 | const Context & /*context*/, |
171 | QueryProcessingStage::Enum & /*processed_stage*/, |
172 | size_t /*max_block_size*/) const |
173 | { |
174 | return nullptr; |
175 | } |
176 | |
177 | |
178 | BlockInputStreams IStorageURLBase::read(const Names & column_names, |
179 | const SelectQueryInfo & query_info, |
180 | const Context & context, |
181 | QueryProcessingStage::Enum processed_stage, |
182 | size_t max_block_size, |
183 | unsigned /*num_streams*/) |
184 | { |
185 | auto request_uri = uri; |
186 | auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size); |
187 | for (const auto & [param, value] : params) |
188 | request_uri.addQueryParameter(param, value); |
189 | |
190 | BlockInputStreamPtr block_input = std::make_shared<StorageURLBlockInputStream>(request_uri, |
191 | getReadMethod(), |
192 | getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size), |
193 | format_name, |
194 | getName(), |
195 | getHeaderBlock(column_names), |
196 | context, |
197 | max_block_size, |
198 | ConnectionTimeouts::getHTTPTimeouts(context), |
199 | IStorage::chooseCompressionMethod(request_uri.getPath(), compression_method)); |
200 | |
201 | auto column_defaults = getColumns().getDefaults(); |
202 | if (column_defaults.empty()) |
203 | return {block_input}; |
204 | return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)}; |
205 | } |
206 | |
207 | void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) |
208 | { |
209 | table_name = new_table_name; |
210 | database_name = new_database_name; |
211 | } |
212 | |
213 | BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Context & /*context*/) |
214 | { |
215 | return std::make_shared<StorageURLBlockOutputStream>( |
216 | uri, format_name, getSampleBlock(), context_global, |
217 | ConnectionTimeouts::getHTTPTimeouts(context_global), |
218 | IStorage::chooseCompressionMethod(uri.toString(), compression_method)); |
219 | } |
220 | |
221 | void registerStorageURL(StorageFactory & factory) |
222 | { |
223 | factory.registerStorage("URL" , [](const StorageFactory::Arguments & args) |
224 | { |
225 | ASTs & engine_args = args.engine_args; |
226 | |
227 | if (engine_args.size() != 2 && engine_args.size() != 3) |
228 | throw Exception( |
229 | "Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method." , ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
230 | |
231 | engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context); |
232 | |
233 | String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(); |
234 | Poco::URI uri(url); |
235 | |
236 | engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context); |
237 | |
238 | String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(); |
239 | |
240 | String compression_method; |
241 | if (engine_args.size() == 3) |
242 | { |
243 | engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context); |
244 | compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>(); |
245 | } else compression_method = "auto" ; |
246 | |
247 | return StorageURL::create( |
248 | uri, |
249 | args.database_name, args.table_name, |
250 | format_name, |
251 | args.columns, args.constraints, args.context, |
252 | compression_method); |
253 | }); |
254 | } |
255 | } |
256 | |