1#include <Storages/StorageFile.h>
2#include <Storages/StorageFactory.h>
3
4#include <Interpreters/Context.h>
5#include <Interpreters/evaluateConstantExpression.h>
6
7#include <Parsers/ASTLiteral.h>
8#include <Parsers/ASTIdentifier.h>
9
10#include <IO/ReadBufferFromFile.h>
11#include <IO/ReadHelpers.h>
12#include <IO/WriteBufferFromFile.h>
13#include <IO/WriteHelpers.h>
14
15#include <Formats/FormatFactory.h>
16#include <DataStreams/IBlockInputStream.h>
17#include <DataStreams/IBlockOutputStream.h>
18#include <DataStreams/AddingDefaultsBlockInputStream.h>
19#include <DataStreams/narrowBlockInputStreams.h>
20
21#include <Common/escapeForFileName.h>
22#include <Common/typeid_cast.h>
23#include <Common/parseGlobs.h>
24
25#include <fcntl.h>
26
27#include <Poco/Path.h>
28#include <Poco/File.h>
29
30#include <re2/re2.h>
31#include <filesystem>
32
33namespace fs = std::filesystem;
34
35namespace DB
36{
37
/// Error codes referenced in this file; the definitions live elsewhere.
namespace ErrorCodes
{
    /// Used by registerStorageFile; declared here explicitly rather than relying on a transitive declaration.
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR;
    extern const int CANNOT_SEEK_THROUGH_FILE;
    extern const int DATABASE_ACCESS_DENIED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int UNKNOWN_IDENTIFIER;
    extern const int INCORRECT_FILE_NAME;
    extern const int FILE_DOESNT_EXIST;
    extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
}
49
50namespace
51{
52
53/* Recursive directory listing with matched paths as a result.
54 * Have the same method in StorageHDFS.
55 */
56static std::vector<std::string> listFilesWithRegexpMatching(const std::string & path_for_ls, const std::string & for_match)
57{
58 const size_t first_glob = for_match.find_first_of("*?{");
59
60 const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
61 const std::string suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'
62
63 const size_t next_slash = suffix_with_globs.find('/', 1);
64 re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
65
66 std::vector<std::string> result;
67 const std::string prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs);
68 if (!fs::exists(fs::path(prefix_without_globs.data())))
69 {
70 return result;
71 }
72 const fs::directory_iterator end;
73 for (fs::directory_iterator it(prefix_without_globs); it != end; ++it)
74 {
75 const std::string full_path = it->path().string();
76 const size_t last_slash = full_path.rfind('/');
77 const String file_name = full_path.substr(last_slash);
78 const bool looking_for_directory = next_slash != std::string::npos;
79 /// Condition is_directory means what kind of path is it in current iteration of ls
80 if (!fs::is_directory(it->path()) && !looking_for_directory)
81 {
82 if (re2::RE2::FullMatch(file_name, matcher))
83 {
84 result.push_back(it->path().string());
85 }
86 }
87 else if (fs::is_directory(it->path()) && looking_for_directory)
88 {
89 if (re2::RE2::FullMatch(file_name, matcher))
90 {
91 /// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
92 Strings result_part = listFilesWithRegexpMatching(full_path + "/", suffix_with_globs.substr(next_slash));
93 std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
94 }
95 }
96 }
97 return result;
98}
99
100static std::string getTablePath(const std::string & table_dir_path, const std::string & format_name)
101{
102 return table_dir_path + "/data." + escapeForFileName(format_name);
103}
104
105/// Both db_dir_path and table_path must be converted to absolute paths (in particular, path cannot contain '..').
106static void checkCreationIsAllowed(const Context & context_global, const std::string & db_dir_path, const std::string & table_path)
107{
108 if (context_global.getApplicationType() != Context::ApplicationType::SERVER)
109 return;
110
111 /// "/dev/null" is allowed for perf testing
112 if (!startsWith(table_path, db_dir_path) && table_path != "/dev/null")
113 throw Exception("Part path " + table_path + " is not inside " + db_dir_path, ErrorCodes::DATABASE_ACCESS_DENIED);
114
115 Poco::File table_path_poco_file = Poco::File(table_path);
116 if (table_path_poco_file.exists() && table_path_poco_file.isDirectory())
117 throw Exception("File " + table_path + " must not be a directory", ErrorCodes::INCORRECT_FILE_NAME);
118}
119}
120
121
122StorageFile::StorageFile(int table_fd_, CommonArguments args)
123 : StorageFile(args)
124{
125 if (args.context.getApplicationType() == Context::ApplicationType::SERVER)
126 throw Exception("Using file descriptor as source of storage isn't allowed for server daemons", ErrorCodes::DATABASE_ACCESS_DENIED);
127
128 is_db_table = false;
129 use_table_fd = true;
130 table_fd = table_fd_;
131
132 /// Save initial offset, it will be used for repeating SELECTs
133 /// If FD isn't seekable (lseek returns -1), then the second and subsequent SELECTs will fail.
134 table_fd_init_offset = lseek(table_fd, 0, SEEK_CUR);
135}
136
137StorageFile::StorageFile(const std::string & table_path_, const std::string & user_files_path, CommonArguments args)
138 : StorageFile(args)
139{
140 is_db_table = false;
141 std::string user_files_absolute_path = Poco::Path(user_files_path).makeAbsolute().makeDirectory().toString();
142 Poco::Path poco_path = Poco::Path(table_path_);
143 if (poco_path.isRelative())
144 poco_path = Poco::Path(user_files_absolute_path, poco_path);
145
146 const std::string path = poco_path.absolute().toString();
147 if (path.find_first_of("*?{") == std::string::npos)
148 {
149 paths.push_back(path);
150 }
151 else
152 paths = listFilesWithRegexpMatching("/", path);
153 for (const auto & cur_path : paths)
154 checkCreationIsAllowed(args.context, user_files_absolute_path, cur_path);
155}
156
157StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArguments args)
158 : StorageFile(args)
159{
160 if (relative_table_dir_path.empty())
161 throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
162
163 String table_dir_path = base_path + relative_table_dir_path + "/";
164 Poco::File(table_dir_path).createDirectories();
165 paths = {getTablePath(table_dir_path, format_name)};
166}
167
168StorageFile::StorageFile(CommonArguments args)
169 : table_name(args.table_name), database_name(args.database_name), format_name(args.format_name)
170 , compression_method(args.compression_method), base_path(args.context.getPath())
171{
172 setColumns(args.columns);
173 setConstraints(args.constraints);
174}
175
176class StorageFileBlockInputStream : public IBlockInputStream
177{
178public:
179 StorageFileBlockInputStream(std::shared_ptr<StorageFile> storage_,
180 const Context & context, UInt64 max_block_size,
181 std::string file_path,
182 const CompressionMethod compression_method)
183 : storage(std::move(storage_))
184 {
185 if (storage->use_table_fd)
186 {
187 unique_lock = std::unique_lock(storage->rwlock);
188
189 /// We could use common ReadBuffer and WriteBuffer in storage to leverage cache
190 /// and add ability to seek unseekable files, but cache sync isn't supported.
191
192 if (storage->table_fd_was_used) /// We need seek to initial position
193 {
194 if (storage->table_fd_init_offset < 0)
195 throw Exception("File descriptor isn't seekable, inside " + storage->getName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
196
197 /// ReadBuffer's seek() doesn't make sense, since cache is empty
198 if (lseek(storage->table_fd, storage->table_fd_init_offset, SEEK_SET) < 0)
199 throwFromErrno("Cannot seek file descriptor, inside " + storage->getName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
200 }
201
202 storage->table_fd_was_used = true;
203 read_buf = getReadBuffer<ReadBufferFromFileDescriptor>(compression_method, storage->table_fd);
204 }
205 else
206 {
207 shared_lock = std::shared_lock(storage->rwlock);
208 read_buf = getReadBuffer<ReadBufferFromFile>(compression_method, file_path);
209 }
210
211 reader = FormatFactory::instance().getInput(storage->format_name, *read_buf, storage->getSampleBlock(), context, max_block_size);
212 }
213
214 String getName() const override
215 {
216 return storage->getName();
217 }
218
219 Block readImpl() override
220 {
221 return reader->read();
222 }
223
224 Block getHeader() const override { return reader->getHeader(); }
225
226 void readPrefixImpl() override
227 {
228 reader->readPrefix();
229 }
230
231 void readSuffixImpl() override
232 {
233 reader->readSuffix();
234 }
235
236private:
237 std::shared_ptr<StorageFile> storage;
238 Block sample_block;
239 std::unique_ptr<ReadBuffer> read_buf;
240 BlockInputStreamPtr reader;
241
242 std::shared_lock<std::shared_mutex> shared_lock;
243 std::unique_lock<std::shared_mutex> unique_lock;
244};
245
246
247BlockInputStreams StorageFile::read(
248 const Names & /*column_names*/,
249 const SelectQueryInfo & /*query_info*/,
250 const Context & context,
251 QueryProcessingStage::Enum /*processed_stage*/,
252 size_t max_block_size,
253 unsigned num_streams)
254{
255 const ColumnsDescription & columns_ = getColumns();
256 auto column_defaults = columns_.getDefaults();
257 BlockInputStreams blocks_input;
258 if (use_table_fd) /// need to call ctr BlockInputStream
259 paths = {""}; /// when use fd, paths are empty
260 else
261 {
262 if (paths.size() == 1 && !Poco::File(paths[0]).exists())
263 throw Exception("File " + paths[0] + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
264 }
265 blocks_input.reserve(paths.size());
266 for (const auto & file_path : paths)
267 {
268 BlockInputStreamPtr cur_block = std::make_shared<StorageFileBlockInputStream>(
269 std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, IStorage::chooseCompressionMethod(file_path, compression_method));
270 blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared<AddingDefaultsBlockInputStream>(cur_block, column_defaults, context));
271 }
272 return narrowBlockInputStreams(blocks_input, num_streams);
273}
274
275
276class StorageFileBlockOutputStream : public IBlockOutputStream
277{
278public:
279 explicit StorageFileBlockOutputStream(StorageFile & storage_,
280 const CompressionMethod compression_method,
281 const Context & context)
282 : storage(storage_), lock(storage.rwlock)
283 {
284 if (storage.use_table_fd)
285 {
286 /** NOTE: Using real file binded to FD may be misleading:
287 * SELECT *; INSERT insert_data; SELECT *; last SELECT returns initil_fd_data + insert_data
288 * INSERT data; SELECT *; last SELECT returns only insert_data
289 */
290 storage.table_fd_was_used = true;
291 write_buf = getWriteBuffer<WriteBufferFromFileDescriptor>(compression_method, storage.table_fd);
292 }
293 else
294 {
295 if (storage.paths.size() != 1)
296 throw Exception("Table '" + storage.table_name + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
297 write_buf = getWriteBuffer<WriteBufferFromFile>(compression_method, storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
298 }
299
300 writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, storage.getSampleBlock(), context);
301 }
302
303 Block getHeader() const override { return storage.getSampleBlock(); }
304
305 void write(const Block & block) override
306 {
307 writer->write(block);
308 }
309
310 void writePrefix() override
311 {
312 writer->writePrefix();
313 }
314
315 void writeSuffix() override
316 {
317 writer->writeSuffix();
318 }
319
320 void flush() override
321 {
322 writer->flush();
323 }
324
325private:
326 StorageFile & storage;
327 std::unique_lock<std::shared_mutex> lock;
328 std::unique_ptr<WriteBuffer> write_buf;
329 BlockOutputStreamPtr writer;
330};
331
332BlockOutputStreamPtr StorageFile::write(
333 const ASTPtr & /*query*/,
334 const Context & context)
335{
336 return std::make_shared<StorageFileBlockOutputStream>(*this,
337 IStorage::chooseCompressionMethod(paths[0], compression_method), context);
338}
339
340Strings StorageFile::getDataPaths() const
341{
342 if (paths.empty())
343 throw Exception("Table '" + table_name + "' is in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
344 return paths;
345}
346
347void StorageFile::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
348{
349 if (!is_db_table)
350 throw Exception("Can't rename table '" + table_name + "' binded to user-defined file (or FD)", ErrorCodes::DATABASE_ACCESS_DENIED);
351
352 if (paths.size() != 1)
353 throw Exception("Can't rename table '" + table_name + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
354
355 std::unique_lock<std::shared_mutex> lock(rwlock);
356
357 std::string path_new = getTablePath(base_path + new_path_to_table_data, format_name);
358 Poco::File(Poco::Path(path_new).parent()).createDirectories();
359 Poco::File(paths[0]).renameTo(path_new);
360
361 paths[0] = std::move(path_new);
362 table_name = new_table_name;
363 database_name = new_database_name;
364}
365
366
367void registerStorageFile(StorageFactory & factory)
368{
369 factory.registerStorage("File", [](const StorageFactory::Arguments & args)
370 {
371 ASTs & engine_args = args.engine_args;
372
373 if (!(engine_args.size() >= 1 && engine_args.size() <= 3))
374 throw Exception(
375 "Storage File requires from 1 to 3 arguments: name of used format, source and compression_method.",
376 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
377
378 engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
379 String format_name = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
380
381 String compression_method;
382 StorageFile::CommonArguments common_args{args.database_name, args.table_name, format_name, compression_method,
383 args.columns, args.constraints, args.context};
384
385 if (engine_args.size() == 1) /// Table in database
386 return StorageFile::create(args.relative_data_path, common_args);
387
388 /// Will use FD if engine_args[1] is int literal or identifier with std* name
389 int source_fd = -1;
390 String source_path;
391
392 if (auto opt_name = tryGetIdentifierName(engine_args[1]))
393 {
394 if (*opt_name == "stdin")
395 source_fd = STDIN_FILENO;
396 else if (*opt_name == "stdout")
397 source_fd = STDOUT_FILENO;
398 else if (*opt_name == "stderr")
399 source_fd = STDERR_FILENO;
400 else
401 throw Exception("Unknown identifier '" + *opt_name + "' in second arg of File storage constructor",
402 ErrorCodes::UNKNOWN_IDENTIFIER);
403 }
404 else if (const auto * literal = engine_args[1]->as<ASTLiteral>())
405 {
406 auto type = literal->value.getType();
407 if (type == Field::Types::Int64)
408 source_fd = static_cast<int>(literal->value.get<Int64>());
409 else if (type == Field::Types::UInt64)
410 source_fd = static_cast<int>(literal->value.get<UInt64>());
411 else if (type == Field::Types::String)
412 source_path = literal->value.get<String>();
413 else
414 throw Exception("Second argument must be path or file descriptor", ErrorCodes::BAD_ARGUMENTS);
415 }
416
417 if (engine_args.size() == 3)
418 {
419 engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
420 compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
421 }
422 else
423 compression_method = "auto";
424
425 if (0 <= source_fd) /// File descriptor
426 return StorageFile::create(source_fd, common_args);
427 else /// User's file
428 return StorageFile::create(source_path, args.context.getUserFilesPath(), common_args);
429 });
430}
431
432}
433