#include <Common/config.h>

#if USE_AWS_S3

#include <IO/S3Common.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>

#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>

#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>

#include <Formats/FormatFactory.h>

#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>

#include <aws/s3/S3Client.h>


namespace DB
{
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}


namespace
{
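    /// Reads a single S3 object through a (possibly decompressing) ReadBufferFromS3
    /// and parses it with the requested input format, one block at a time.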
    class StorageS3BlockInputStream : public IBlockInputStream
    {
    public:
        StorageS3BlockInputStream(
            const String & format,
            const String & name_,
            const Block & sample_block,
            const Context & context,
            UInt64 max_block_size,
            const CompressionMethod compression_method,
            const std::shared_ptr<Aws::S3::S3Client> & client,
            const String & bucket,
            const String & key)
            : name(name_)
        {
            read_buf = getReadBuffer<ReadBufferFromS3>(compression_method, client, bucket, key);
            reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
        }

        String getName() const override
        {
            return name;
        }

        Block readImpl() override
        {
            return reader->read();
        }

        Block getHeader() const override
        {
            return reader->getHeader();
        }

        void readPrefixImpl() override
        {
            reader->readPrefix();
        }

        void readSuffixImpl() override
        {
            reader->readSuffix();
        }

    private:
        String name;
        std::unique_ptr<ReadBuffer> read_buf;
        BlockInputStreamPtr reader;
    };
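    /// Formats blocks with the requested output format and streams the result into
    /// a single S3 object through a (possibly compressing) WriteBufferFromS3;
    /// min_upload_part_size controls the size of multipart upload parts.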
    class StorageS3BlockOutputStream : public IBlockOutputStream
    {
    public:
        StorageS3BlockOutputStream(
            const String & format,
            UInt64 min_upload_part_size,
            const Block & sample_block_,
            const Context & context,
            const CompressionMethod compression_method,
            const std::shared_ptr<Aws::S3::S3Client> & client,
            const String & bucket,
            const String & key)
            : sample_block(sample_block_)
        {
            write_buf = getWriteBuffer<WriteBufferFromS3>(compression_method, client, bucket, key, min_upload_part_size);
            writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
        }

        Block getHeader() const override
        {
            return sample_block;
        }

        void write(const Block & block) override
        {
            writer->write(block);
        }

        void writePrefix() override
        {
            writer->writePrefix();
        }

        void writeSuffix() override
        {
            writer->writeSuffix();
            writer->flush();
            /// finalize() flushes the last part and completes the S3 upload.
            write_buf->finalize();
        }

    private:
        Block sample_block;
        std::unique_ptr<WriteBuffer> write_buf;
        BlockOutputStreamPtr writer;
    };
}
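/// Constructs the S3 client for the given endpoint and credentials, and checks
/// the URL against the remote host filter before accepting the table definition.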
StorageS3::StorageS3(const S3::URI & uri_,
    const String & access_key_id_,
    const String & secret_access_key_,
    const std::string & database_name_,
    const std::string & table_name_,
    const String & format_name_,
    UInt64 min_upload_part_size_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    Context & context_,
    const String & compression_method_ = "")
    : IStorage(columns_)
    , uri(uri_)
    , context_global(context_)
    , format_name(format_name_)
    , database_name(database_name_)
    , table_name(table_name_)
    , min_upload_part_size(min_upload_part_size_)
    , compression_method(compression_method_)
    , client(S3::ClientFactory::instance().create(uri_.endpoint, access_key_id_, secret_access_key_))
{
    context_global.getRemoteHostFilter().checkURL(uri_.uri);
    setColumns(columns_);
    setConstraints(constraints_);
}
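/// SELECT reads the object as one stream; if the table has columns with DEFAULT
/// expressions, the stream is wrapped so that missing values are filled in.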
BlockInputStreams StorageS3::read(
    const Names & column_names,
    const SelectQueryInfo & /*query_info*/,
    const Context & context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned /*num_streams*/)
{
    BlockInputStreamPtr block_input = std::make_shared<StorageS3BlockInputStream>(
        format_name,
        getName(),
        getHeaderBlock(column_names),
        context,
        max_block_size,
        IStorage::chooseCompressionMethod(uri.endpoint, compression_method),
        client,
        uri.bucket,
        uri.key);

    auto column_defaults = getColumns().getDefaults();
    if (column_defaults.empty())
        return {block_input};
    return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
}
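/// Rename only updates the table's metadata; the data stays at the same S3 key.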
void StorageS3::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
    table_name = new_table_name;
    database_name = new_database_name;
}
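/// Each INSERT uploads a fresh object at uri.key, replacing any previous contents.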
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
    return std::make_shared<StorageS3BlockOutputStream>(
        format_name, min_upload_part_size, getSampleBlock(), context_global,
        IStorage::chooseCompressionMethod(uri.endpoint, compression_method),
        client, uri.bucket, uri.key);
}
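/// Registers the S3 table engine, used as e.g.
///     CREATE TABLE t (x UInt32) ENGINE = S3('https://endpoint/bucket/key', 'CSV')
/// Credentials and compression method are optional positional arguments.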
void registerStorageS3(StorageFactory & factory)
{
    factory.registerStorage("S3", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        if (engine_args.size() < 2 || engine_args.size() > 5)
            throw Exception(
                "Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        for (size_t i = 0; i < engine_args.size(); ++i)
            engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);

        String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
        Poco::URI uri(url);
        S3::URI s3_uri(uri);

        String access_key_id;
        String secret_access_key;
        if (engine_args.size() >= 4)
        {
            access_key_id = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
            secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
        }

        UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;

        /// An odd argument count means the trailing argument is the compression method
        /// and the format name precedes it; otherwise the format name comes last.
        String compression_method;
        String format_name;
        if (engine_args.size() == 3 || engine_args.size() == 5)
        {
            compression_method = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
            format_name = engine_args[engine_args.size() - 2]->as<ASTLiteral &>().value.safeGet<String>();
        }
        else
        {
            compression_method = "auto";
            format_name = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
        }

        return StorageS3::create(
            s3_uri, access_key_id, secret_access_key, args.database_name, args.table_name,
            format_name, min_upload_part_size, args.columns, args.constraints, args.context, compression_method);
    });
}

}

#endif