#include <Common/config.h>

#if USE_HDFS

#include <Storages/StorageFactory.h>
#include <Storages/StorageHDFS.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromHDFS.h>
#include <IO/WriteBufferFromHDFS.h>
#include <IO/WriteHelpers.h>
#include <IO/HDFSCommon.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>

#include <Common/parseGlobs.h>
#include <Poco/URI.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <hdfs/hdfs.h>

namespace DB
{
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int NOT_IMPLEMENTED;
    extern const int BAD_ARGUMENTS;
}

StorageHDFS::StorageHDFS(const String & uri_,
    const std::string & database_name_,
    const std::string & table_name_,
    const String & format_name_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    Context & context_,
    const String & compression_method_ = "")
    : uri(uri_)
    , format_name(format_name_)
    , table_name(table_name_)
    , database_name(database_name_)
    , context(context_)
    , compression_method(compression_method_)
{
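    /// Reject table URIs that are not allowed by the server's remote host filter.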
    context.getRemoteHostFilter().checkURL(Poco::URI(uri));
    setColumns(columns_);
    setConstraints(constraints_);
}

namespace
{

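/// Reads a single HDFS file: wraps a (possibly decompressing) ReadBufferFromHDFS
/// into an input stream of the requested format and forwards all calls to it.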
class HDFSBlockInputStream : public IBlockInputStream
{
public:
    HDFSBlockInputStream(const String & uri,
        const String & format,
        const Block & sample_block,
        const Context & context,
        UInt64 max_block_size,
        const CompressionMethod compression_method)
    {
        auto read_buf = getReadBuffer<ReadBufferFromHDFS>(compression_method, uri);

        auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
        reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
    }

    String getName() const override
    {
        return "HDFS";
    }

    Block readImpl() override
    {
        return reader->read();
    }

    Block getHeader() const override
    {
        return reader->getHeader();
    }

    void readPrefixImpl() override
    {
        reader->readPrefix();
    }

    void readSuffixImpl() override
    {
        reader->readSuffix();
    }

private:
    BlockInputStreamPtr reader;
};

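/// Writes blocks to a single HDFS file: formats each block into a (possibly compressing)
/// WriteBufferFromHDFS; writeSuffix() flushes the format output and syncs the file.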
class HDFSBlockOutputStream : public IBlockOutputStream
{
public:
    HDFSBlockOutputStream(const String & uri,
        const String & format,
        const Block & sample_block_,
        const Context & context,
        const CompressionMethod compression_method)
        : sample_block(sample_block_)
    {
        write_buf = getWriteBuffer<WriteBufferFromHDFS>(compression_method, uri);
        writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
    }

    Block getHeader() const override
    {
        return sample_block;
    }

    void write(const Block & block) override
    {
        writer->write(block);
    }

    void writePrefix() override
    {
        writer->writePrefix();
    }

    void writeSuffix() override
    {
        writer->writeSuffix();
        writer->flush();
        write_buf->sync();
    }

private:
    Block sample_block;
    std::unique_ptr<WriteBuffer> write_buf;
    BlockOutputStreamPtr writer;
};

/* Recursive directory listing with matched paths as a result.
 * The same method is used in StorageFile.
 */
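/// For example (assuming the usual glob semantics of makeRegexpPatternFromGlobs,
/// where '*' and '?' do not cross '/' and '{a,b}' lists alternatives):
/// for_match = "/some_dir/*/part-?.csv" matches /some_dir/x/part-1.csv
/// and /some_dir/y/part-2.csv, but not /some_dir/x/y/part-1.csv.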
Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match)
{
    const size_t first_glob = for_match.find_first_of("*?{");

    const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
    const String suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begins with '/'
    const String prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs); /// ends with '/'

    const size_t next_slash = suffix_with_globs.find('/', 1);
    re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
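    /// The matcher covers only the current path component (up to the next '/');
    /// deeper components are handled by the recursive call below.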

    HDFSFileInfo ls;
    ls.file_info = hdfsListDirectory(fs.get(), prefix_without_globs.data(), &ls.length);
    Strings result;
    for (int i = 0; i < ls.length; ++i)
    {
        const String full_path = String(ls.file_info[i].mName);
        const size_t last_slash = full_path.rfind('/');
        const String file_name = full_path.substr(last_slash);
        const bool looking_for_directory = next_slash != std::string::npos;
        const bool is_directory = ls.file_info[i].mKind == 'D';
        /// The kind of the current file_info entry determines whether it is matched as a file or as a directory at this level.
        if (!is_directory && !looking_for_directory)
        {
            if (re2::RE2::FullMatch(file_name, matcher))
            {
                result.push_back(String(ls.file_info[i].mName));
            }
        }
        else if (is_directory && looking_for_directory)
        {
            if (re2::RE2::FullMatch(file_name, matcher))
            {
                Strings result_part = LSWithRegexpMatching(full_path + "/", fs, suffix_with_globs.substr(next_slash));
                /// Recursion depth is limited by the pattern: '*' matches a single level, a two-level path needs '*/*', so no additional check is required.
                std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
            }
        }
    }

    return result;
}

}


BlockInputStreams StorageHDFS::read(
    const Names & /*column_names*/,
    const SelectQueryInfo & /*query_info*/,
    const Context & context_,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned num_streams)
{
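    /// Split the URI into its cluster part and path part. For example,
    /// "hdfs://namenode:9000/some_dir/*.csv" splits into uri_without_path = "hdfs://namenode:9000"
    /// and path_from_uri = "/some_dir/*.csv".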
    const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
    const String path_from_uri = uri.substr(begin_of_path);
    const String uri_without_path = uri.substr(0, begin_of_path);

    HDFSBuilderPtr builder = createHDFSBuilder(uri_without_path + "/");
    HDFSFSPtr fs = createHDFSFS(builder.get());

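    /// Expand globs into the list of matching files and open one input stream per file;
    /// narrowBlockInputStreams() then squeezes them into at most num_streams streams.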
    const Strings res_paths = LSWithRegexpMatching("/", fs, path_from_uri);
    BlockInputStreams result;
    for (const auto & res_path : res_paths)
    {
        result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, format_name, getSampleBlock(), context_,
            max_block_size, IStorage::chooseCompressionMethod(res_path, compression_method)));
    }

    return narrowBlockInputStreams(result, num_streams);
}

void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
    table_name = new_table_name;
    database_name = new_database_name;
}

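/// INSERT writes to the table's URI as-is; glob patterns are expanded only on read.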
BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
    return std::make_shared<HDFSBlockOutputStream>(uri,
        format_name,
        getSampleBlock(),
        context,
        IStorage::chooseCompressionMethod(uri, compression_method));
}

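/// Example table definition (url, format and an optional compression method):
///     CREATE TABLE hdfs_table (name String, value UInt32)
///         ENGINE = HDFS('hdfs://namenode:9000/some_dir/file.csv', 'CSV', 'gzip')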
void registerStorageHDFS(StorageFactory & factory)
{
    factory.registerStorage("HDFS", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        if (engine_args.size() != 2 && engine_args.size() != 3)
            throw Exception(
                "Storage HDFS requires 2 or 3 arguments: url, name of used format and optional compression method.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);

        String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();

        engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);

        String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();

        String compression_method;
        if (engine_args.size() == 3)
        {
            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
            compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
        }
        else
            compression_method = "auto";

        return StorageHDFS::create(url, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context, compression_method);
    });
}

}

#endif