| 1 | #include <Storages/StorageFactory.h> |
| 2 | #include <Storages/StorageURL.h> |
| 3 | |
| 4 | #include <Interpreters/Context.h> |
| 5 | #include <Interpreters/evaluateConstantExpression.h> |
| 6 | #include <Parsers/ASTLiteral.h> |
| 7 | |
| 8 | #include <IO/ReadHelpers.h> |
| 9 | #include <IO/ReadWriteBufferFromHTTP.h> |
| 10 | #include <IO/WriteBufferFromHTTP.h> |
| 11 | #include <IO/WriteHelpers.h> |
| 12 | |
| 13 | #include <Formats/FormatFactory.h> |
| 14 | |
| 15 | #include <DataStreams/IBlockOutputStream.h> |
| 16 | #include <DataStreams/IBlockInputStream.h> |
| 17 | #include <DataStreams/AddingDefaultsBlockInputStream.h> |
| 18 | |
| 19 | #include <Poco/Net/HTTPRequest.h> |
| 20 | |
| 21 | |
| 22 | namespace DB |
| 23 | { |
/// Error codes referenced from this translation unit. They are only declared here (`extern`).
namespace ErrorCodes
{
    /// Used by registerStorageURL when the engine receives a wrong number of arguments.
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    /// Declared, but not referenced by the code visible in this file.
    extern const int UNACCEPTABLE_URL;
}
| 29 | |
| 30 | IStorageURLBase::IStorageURLBase( |
| 31 | const Poco::URI & uri_, |
| 32 | const Context & context_, |
| 33 | const std::string & database_name_, |
| 34 | const std::string & table_name_, |
| 35 | const String & format_name_, |
| 36 | const ColumnsDescription & columns_, |
| 37 | const ConstraintsDescription & constraints_, |
| 38 | const String & compression_method_) |
| 39 | : uri(uri_), context_global(context_), compression_method(compression_method_), format_name(format_name_), table_name(table_name_), database_name(database_name_) |
| 40 | { |
| 41 | context_global.getRemoteHostFilter().checkURL(uri); |
| 42 | setColumns(columns_); |
| 43 | setConstraints(constraints_); |
| 44 | } |
| 45 | |
| 46 | namespace |
| 47 | { |
| 48 | class StorageURLBlockInputStream : public IBlockInputStream |
| 49 | { |
| 50 | public: |
| 51 | StorageURLBlockInputStream(const Poco::URI & uri, |
| 52 | const std::string & method, |
| 53 | std::function<void(std::ostream &)> callback, |
| 54 | const String & format, |
| 55 | const String & name_, |
| 56 | const Block & sample_block, |
| 57 | const Context & context, |
| 58 | UInt64 max_block_size, |
| 59 | const ConnectionTimeouts & timeouts, |
| 60 | const CompressionMethod compression_method) |
| 61 | : name(name_) |
| 62 | { |
| 63 | read_buf = getReadBuffer<ReadWriteBufferFromHTTP>( |
| 64 | compression_method, |
| 65 | uri, |
| 66 | method, |
| 67 | callback, |
| 68 | timeouts, |
| 69 | context.getSettingsRef().max_http_get_redirects, |
| 70 | Poco::Net::HTTPBasicCredentials{}, |
| 71 | DBMS_DEFAULT_BUFFER_SIZE, |
| 72 | ReadWriteBufferFromHTTP::HTTPHeaderEntries{}, |
| 73 | context.getRemoteHostFilter()); |
| 74 | |
| 75 | reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); |
| 76 | } |
| 77 | |
| 78 | String getName() const override |
| 79 | { |
| 80 | return name; |
| 81 | } |
| 82 | |
| 83 | Block readImpl() override |
| 84 | { |
| 85 | return reader->read(); |
| 86 | } |
| 87 | |
| 88 | Block () const override |
| 89 | { |
| 90 | return reader->getHeader(); |
| 91 | } |
| 92 | |
| 93 | void readPrefixImpl() override |
| 94 | { |
| 95 | reader->readPrefix(); |
| 96 | } |
| 97 | |
| 98 | void readSuffixImpl() override |
| 99 | { |
| 100 | reader->readSuffix(); |
| 101 | } |
| 102 | |
| 103 | private: |
| 104 | String name; |
| 105 | std::unique_ptr<ReadBuffer> read_buf; |
| 106 | BlockInputStreamPtr reader; |
| 107 | }; |
| 108 | |
| 109 | class StorageURLBlockOutputStream : public IBlockOutputStream |
| 110 | { |
| 111 | public: |
| 112 | StorageURLBlockOutputStream(const Poco::URI & uri, |
| 113 | const String & format, |
| 114 | const Block & sample_block_, |
| 115 | const Context & context, |
| 116 | const ConnectionTimeouts & timeouts, |
| 117 | const CompressionMethod compression_method) |
| 118 | : sample_block(sample_block_) |
| 119 | { |
| 120 | write_buf = getWriteBuffer<WriteBufferFromHTTP>(compression_method, uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts); |
| 121 | writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context); |
| 122 | } |
| 123 | |
| 124 | Block () const override |
| 125 | { |
| 126 | return sample_block; |
| 127 | } |
| 128 | |
| 129 | void write(const Block & block) override |
| 130 | { |
| 131 | writer->write(block); |
| 132 | } |
| 133 | |
| 134 | void writePrefix() override |
| 135 | { |
| 136 | writer->writePrefix(); |
| 137 | } |
| 138 | |
| 139 | void writeSuffix() override |
| 140 | { |
| 141 | writer->writeSuffix(); |
| 142 | writer->flush(); |
| 143 | write_buf->finalize(); |
| 144 | } |
| 145 | |
| 146 | private: |
| 147 | Block sample_block; |
| 148 | std::unique_ptr<WriteBuffer> write_buf; |
| 149 | BlockOutputStreamPtr writer; |
| 150 | }; |
| 151 | } |
| 152 | |
| 153 | |
| 154 | std::string IStorageURLBase::getReadMethod() const |
| 155 | { |
| 156 | return Poco::Net::HTTPRequest::HTTP_GET; |
| 157 | } |
| 158 | |
| 159 | std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIParams(const Names & /*column_names*/, |
| 160 | const SelectQueryInfo & /*query_info*/, |
| 161 | const Context & /*context*/, |
| 162 | QueryProcessingStage::Enum & /*processed_stage*/, |
| 163 | size_t /*max_block_size*/) const |
| 164 | { |
| 165 | return {}; |
| 166 | } |
| 167 | |
| 168 | std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(const Names & /*column_names*/, |
| 169 | const SelectQueryInfo & /*query_info*/, |
| 170 | const Context & /*context*/, |
| 171 | QueryProcessingStage::Enum & /*processed_stage*/, |
| 172 | size_t /*max_block_size*/) const |
| 173 | { |
| 174 | return nullptr; |
| 175 | } |
| 176 | |
| 177 | |
| 178 | BlockInputStreams IStorageURLBase::read(const Names & column_names, |
| 179 | const SelectQueryInfo & query_info, |
| 180 | const Context & context, |
| 181 | QueryProcessingStage::Enum processed_stage, |
| 182 | size_t max_block_size, |
| 183 | unsigned /*num_streams*/) |
| 184 | { |
| 185 | auto request_uri = uri; |
| 186 | auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size); |
| 187 | for (const auto & [param, value] : params) |
| 188 | request_uri.addQueryParameter(param, value); |
| 189 | |
| 190 | BlockInputStreamPtr block_input = std::make_shared<StorageURLBlockInputStream>(request_uri, |
| 191 | getReadMethod(), |
| 192 | getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size), |
| 193 | format_name, |
| 194 | getName(), |
| 195 | getHeaderBlock(column_names), |
| 196 | context, |
| 197 | max_block_size, |
| 198 | ConnectionTimeouts::getHTTPTimeouts(context), |
| 199 | IStorage::chooseCompressionMethod(request_uri.getPath(), compression_method)); |
| 200 | |
| 201 | auto column_defaults = getColumns().getDefaults(); |
| 202 | if (column_defaults.empty()) |
| 203 | return {block_input}; |
| 204 | return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)}; |
| 205 | } |
| 206 | |
| 207 | void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) |
| 208 | { |
| 209 | table_name = new_table_name; |
| 210 | database_name = new_database_name; |
| 211 | } |
| 212 | |
| 213 | BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Context & /*context*/) |
| 214 | { |
| 215 | return std::make_shared<StorageURLBlockOutputStream>( |
| 216 | uri, format_name, getSampleBlock(), context_global, |
| 217 | ConnectionTimeouts::getHTTPTimeouts(context_global), |
| 218 | IStorage::chooseCompressionMethod(uri.toString(), compression_method)); |
| 219 | } |
| 220 | |
| 221 | void registerStorageURL(StorageFactory & factory) |
| 222 | { |
| 223 | factory.registerStorage("URL" , [](const StorageFactory::Arguments & args) |
| 224 | { |
| 225 | ASTs & engine_args = args.engine_args; |
| 226 | |
| 227 | if (engine_args.size() != 2 && engine_args.size() != 3) |
| 228 | throw Exception( |
| 229 | "Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method." , ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
| 230 | |
| 231 | engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context); |
| 232 | |
| 233 | String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(); |
| 234 | Poco::URI uri(url); |
| 235 | |
| 236 | engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context); |
| 237 | |
| 238 | String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(); |
| 239 | |
| 240 | String compression_method; |
| 241 | if (engine_args.size() == 3) |
| 242 | { |
| 243 | engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context); |
| 244 | compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>(); |
| 245 | } else compression_method = "auto" ; |
| 246 | |
| 247 | return StorageURL::create( |
| 248 | uri, |
| 249 | args.database_name, args.table_name, |
| 250 | format_name, |
| 251 | args.columns, args.constraints, args.context, |
| 252 | compression_method); |
| 253 | }); |
| 254 | } |
| 255 | } |
| 256 | |