1#include <Common/config.h>
2
3#if USE_HDFS
4
5#include <Storages/StorageFactory.h>
6#include <Storages/StorageHDFS.h>
7#include <Interpreters/Context.h>
8#include <Interpreters/evaluateConstantExpression.h>
9#include <Parsers/ASTLiteral.h>
10#include <IO/ReadHelpers.h>
11#include <IO/ReadBufferFromHDFS.h>
12#include <IO/WriteBufferFromHDFS.h>
13#include <IO/WriteHelpers.h>
14#include <IO/HDFSCommon.h>
15#include <Formats/FormatFactory.h>
16#include <DataStreams/IBlockOutputStream.h>
17#include <DataStreams/UnionBlockInputStream.h>
18#include <DataStreams/OwningBlockInputStream.h>
19#include <DataStreams/IBlockInputStream.h>
20#include <DataStreams/narrowBlockInputStreams.h>
21
22#include <Common/parseGlobs.h>
23#include <Poco/URI.h>
24#include <re2/re2.h>
25#include <re2/stringpiece.h>
26#include <hdfs/hdfs.h>
27
28namespace DB
29{
30namespace ErrorCodes
31{
32 extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
33 extern const int NOT_IMPLEMENTED;
34 extern const int BAD_ARGUMENTS;
35}
36
37StorageHDFS::StorageHDFS(const String & uri_,
38 const std::string & database_name_,
39 const std::string & table_name_,
40 const String & format_name_,
41 const ColumnsDescription & columns_,
42 const ConstraintsDescription & constraints_,
43 Context & context_,
44 const String & compression_method_ = "")
45 : uri(uri_)
46 , format_name(format_name_)
47 , table_name(table_name_)
48 , database_name(database_name_)
49 , context(context_)
50 , compression_method(compression_method_)
51{
52 context.getRemoteHostFilter().checkURL(Poco::URI(uri));
53 setColumns(columns_);
54 setConstraints(constraints_);
55}
56
57namespace
58{
59
60class HDFSBlockInputStream : public IBlockInputStream
61{
62public:
63 HDFSBlockInputStream(const String & uri,
64 const String & format,
65 const Block & sample_block,
66 const Context & context,
67 UInt64 max_block_size,
68 const CompressionMethod compression_method)
69 {
70 auto read_buf = getReadBuffer<ReadBufferFromHDFS>(compression_method, uri);
71
72 auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
73 reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
74 }
75
76 String getName() const override
77 {
78 return "HDFS";
79 }
80
81 Block readImpl() override
82 {
83 return reader->read();
84 }
85
86 Block getHeader() const override
87 {
88 return reader->getHeader();
89 }
90
91 void readPrefixImpl() override
92 {
93 reader->readPrefix();
94 }
95
96 void readSuffixImpl() override
97 {
98 reader->readSuffix();
99 }
100
101private:
102 BlockInputStreamPtr reader;
103};
104
105class HDFSBlockOutputStream : public IBlockOutputStream
106{
107public:
108 HDFSBlockOutputStream(const String & uri,
109 const String & format,
110 const Block & sample_block_,
111 const Context & context,
112 const CompressionMethod compression_method)
113 : sample_block(sample_block_)
114 {
115 write_buf = getWriteBuffer<WriteBufferFromHDFS>(compression_method, uri);
116 writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
117 }
118
119 Block getHeader() const override
120 {
121 return sample_block;
122 }
123
124 void write(const Block & block) override
125 {
126 writer->write(block);
127 }
128
129 void writePrefix() override
130 {
131 writer->writePrefix();
132 }
133
134 void writeSuffix() override
135 {
136 writer->writeSuffix();
137 writer->flush();
138 write_buf->sync();
139 }
140
141private:
142 Block sample_block;
143 std::unique_ptr<WriteBuffer> write_buf;
144 BlockOutputStreamPtr writer;
145};
146
147/* Recursive directory listing with matched paths as a result.
148 * Have the same method in StorageFile.
149 */
150Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match)
151{
152 const size_t first_glob = for_match.find_first_of("*?{");
153
154 const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
155 const String suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'
156 const String prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs); /// ends with '/'
157
158 const size_t next_slash = suffix_with_globs.find('/', 1);
159 re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
160
161 HDFSFileInfo ls;
162 ls.file_info = hdfsListDirectory(fs.get(), prefix_without_globs.data(), &ls.length);
163 Strings result;
164 for (int i = 0; i < ls.length; ++i)
165 {
166 const String full_path = String(ls.file_info[i].mName);
167 const size_t last_slash = full_path.rfind('/');
168 const String file_name = full_path.substr(last_slash);
169 const bool looking_for_directory = next_slash != std::string::npos;
170 const bool is_directory = ls.file_info[i].mKind == 'D';
171 /// Condition with type of current file_info means what kind of path is it in current iteration of ls
172 if (!is_directory && !looking_for_directory)
173 {
174 if (re2::RE2::FullMatch(file_name, matcher))
175 {
176 result.push_back(String(ls.file_info[i].mName));
177 }
178 }
179 else if (is_directory && looking_for_directory)
180 {
181 if (re2::RE2::FullMatch(file_name, matcher))
182 {
183 Strings result_part = LSWithRegexpMatching(full_path + "/", fs, suffix_with_globs.substr(next_slash));
184 /// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
185 std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
186 }
187 }
188 }
189
190 return result;
191}
192
193}
194
195
196BlockInputStreams StorageHDFS::read(
197 const Names & /*column_names*/,
198 const SelectQueryInfo & /*query_info*/,
199 const Context & context_,
200 QueryProcessingStage::Enum /*processed_stage*/,
201 size_t max_block_size,
202 unsigned num_streams)
203{
204 const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
205 const String path_from_uri = uri.substr(begin_of_path);
206 const String uri_without_path = uri.substr(0, begin_of_path);
207
208 HDFSBuilderPtr builder = createHDFSBuilder(uri_without_path + "/");
209 HDFSFSPtr fs = createHDFSFS(builder.get());
210
211 const Strings res_paths = LSWithRegexpMatching("/", fs, path_from_uri);
212 BlockInputStreams result;
213 for (const auto & res_path : res_paths)
214 {
215 result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, format_name, getSampleBlock(), context_,
216 max_block_size, IStorage::chooseCompressionMethod(res_path, compression_method)));
217 }
218
219 return narrowBlockInputStreams(result, num_streams);
220}
221
222void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
223{
224 table_name = new_table_name;
225 database_name = new_database_name;
226}
227
228BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const Context & /*context*/)
229{
230 return std::make_shared<HDFSBlockOutputStream>(uri,
231 format_name,
232 getSampleBlock(),
233 context,
234 IStorage::chooseCompressionMethod(uri, compression_method));
235}
236
237void registerStorageHDFS(StorageFactory & factory)
238{
239 factory.registerStorage("HDFS", [](const StorageFactory::Arguments & args)
240 {
241 ASTs & engine_args = args.engine_args;
242
243 if (engine_args.size() != 2 && engine_args.size() != 3)
244 throw Exception(
245 "Storage HDFS requires 2 or 3 arguments: url, name of used format and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
246
247 engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
248
249 String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
250
251 engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
252
253 String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
254
255 String compression_method;
256 if (engine_args.size() == 3)
257 {
258 engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
259 compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
260 } else compression_method = "auto";
261
262 return StorageHDFS::create(url, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context, compression_method);
263 });
264}
265
266}
267
268#endif
269