1#include <Storages/StorageFactory.h>
2#include <Storages/StorageURL.h>
3
4#include <Interpreters/Context.h>
5#include <Interpreters/evaluateConstantExpression.h>
6#include <Parsers/ASTLiteral.h>
7
8#include <IO/ReadHelpers.h>
9#include <IO/ReadWriteBufferFromHTTP.h>
10#include <IO/WriteBufferFromHTTP.h>
11#include <IO/WriteHelpers.h>
12
13#include <Formats/FormatFactory.h>
14
15#include <DataStreams/IBlockOutputStream.h>
16#include <DataStreams/IBlockInputStream.h>
17#include <DataStreams/AddingDefaultsBlockInputStream.h>
18
19#include <Poco/Net/HTTPRequest.h>
20
21
22namespace DB
23{
24namespace ErrorCodes
25{
26 extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
27 extern const int UNACCEPTABLE_URL;
28}
29
30IStorageURLBase::IStorageURLBase(
31 const Poco::URI & uri_,
32 const Context & context_,
33 const std::string & database_name_,
34 const std::string & table_name_,
35 const String & format_name_,
36 const ColumnsDescription & columns_,
37 const ConstraintsDescription & constraints_,
38 const String & compression_method_)
39 : uri(uri_), context_global(context_), compression_method(compression_method_), format_name(format_name_), table_name(table_name_), database_name(database_name_)
40{
41 context_global.getRemoteHostFilter().checkURL(uri);
42 setColumns(columns_);
43 setConstraints(constraints_);
44}
45
46namespace
47{
48 class StorageURLBlockInputStream : public IBlockInputStream
49 {
50 public:
51 StorageURLBlockInputStream(const Poco::URI & uri,
52 const std::string & method,
53 std::function<void(std::ostream &)> callback,
54 const String & format,
55 const String & name_,
56 const Block & sample_block,
57 const Context & context,
58 UInt64 max_block_size,
59 const ConnectionTimeouts & timeouts,
60 const CompressionMethod compression_method)
61 : name(name_)
62 {
63 read_buf = getReadBuffer<ReadWriteBufferFromHTTP>(
64 compression_method,
65 uri,
66 method,
67 callback,
68 timeouts,
69 context.getSettingsRef().max_http_get_redirects,
70 Poco::Net::HTTPBasicCredentials{},
71 DBMS_DEFAULT_BUFFER_SIZE,
72 ReadWriteBufferFromHTTP::HTTPHeaderEntries{},
73 context.getRemoteHostFilter());
74
75 reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
76 }
77
78 String getName() const override
79 {
80 return name;
81 }
82
83 Block readImpl() override
84 {
85 return reader->read();
86 }
87
88 Block getHeader() const override
89 {
90 return reader->getHeader();
91 }
92
93 void readPrefixImpl() override
94 {
95 reader->readPrefix();
96 }
97
98 void readSuffixImpl() override
99 {
100 reader->readSuffix();
101 }
102
103 private:
104 String name;
105 std::unique_ptr<ReadBuffer> read_buf;
106 BlockInputStreamPtr reader;
107 };
108
109 class StorageURLBlockOutputStream : public IBlockOutputStream
110 {
111 public:
112 StorageURLBlockOutputStream(const Poco::URI & uri,
113 const String & format,
114 const Block & sample_block_,
115 const Context & context,
116 const ConnectionTimeouts & timeouts,
117 const CompressionMethod compression_method)
118 : sample_block(sample_block_)
119 {
120 write_buf = getWriteBuffer<WriteBufferFromHTTP>(compression_method, uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts);
121 writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
122 }
123
124 Block getHeader() const override
125 {
126 return sample_block;
127 }
128
129 void write(const Block & block) override
130 {
131 writer->write(block);
132 }
133
134 void writePrefix() override
135 {
136 writer->writePrefix();
137 }
138
139 void writeSuffix() override
140 {
141 writer->writeSuffix();
142 writer->flush();
143 write_buf->finalize();
144 }
145
146 private:
147 Block sample_block;
148 std::unique_ptr<WriteBuffer> write_buf;
149 BlockOutputStreamPtr writer;
150 };
151}
152
153
154std::string IStorageURLBase::getReadMethod() const
155{
156 return Poco::Net::HTTPRequest::HTTP_GET;
157}
158
159std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIParams(const Names & /*column_names*/,
160 const SelectQueryInfo & /*query_info*/,
161 const Context & /*context*/,
162 QueryProcessingStage::Enum & /*processed_stage*/,
163 size_t /*max_block_size*/) const
164{
165 return {};
166}
167
168std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(const Names & /*column_names*/,
169 const SelectQueryInfo & /*query_info*/,
170 const Context & /*context*/,
171 QueryProcessingStage::Enum & /*processed_stage*/,
172 size_t /*max_block_size*/) const
173{
174 return nullptr;
175}
176
177
178BlockInputStreams IStorageURLBase::read(const Names & column_names,
179 const SelectQueryInfo & query_info,
180 const Context & context,
181 QueryProcessingStage::Enum processed_stage,
182 size_t max_block_size,
183 unsigned /*num_streams*/)
184{
185 auto request_uri = uri;
186 auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size);
187 for (const auto & [param, value] : params)
188 request_uri.addQueryParameter(param, value);
189
190 BlockInputStreamPtr block_input = std::make_shared<StorageURLBlockInputStream>(request_uri,
191 getReadMethod(),
192 getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size),
193 format_name,
194 getName(),
195 getHeaderBlock(column_names),
196 context,
197 max_block_size,
198 ConnectionTimeouts::getHTTPTimeouts(context),
199 IStorage::chooseCompressionMethod(request_uri.getPath(), compression_method));
200
201 auto column_defaults = getColumns().getDefaults();
202 if (column_defaults.empty())
203 return {block_input};
204 return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
205}
206
207void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
208{
209 table_name = new_table_name;
210 database_name = new_database_name;
211}
212
213BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Context & /*context*/)
214{
215 return std::make_shared<StorageURLBlockOutputStream>(
216 uri, format_name, getSampleBlock(), context_global,
217 ConnectionTimeouts::getHTTPTimeouts(context_global),
218 IStorage::chooseCompressionMethod(uri.toString(), compression_method));
219}
220
221void registerStorageURL(StorageFactory & factory)
222{
223 factory.registerStorage("URL", [](const StorageFactory::Arguments & args)
224 {
225 ASTs & engine_args = args.engine_args;
226
227 if (engine_args.size() != 2 && engine_args.size() != 3)
228 throw Exception(
229 "Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
230
231 engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
232
233 String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
234 Poco::URI uri(url);
235
236 engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
237
238 String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
239
240 String compression_method;
241 if (engine_args.size() == 3)
242 {
243 engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
244 compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
245 } else compression_method = "auto";
246
247 return StorageURL::create(
248 uri,
249 args.database_name, args.table_name,
250 format_name,
251 args.columns, args.constraints, args.context,
252 compression_method);
253 });
254}
255}
256