1 | #include <Common/config.h> |
2 | |
3 | #if USE_AWS_S3 |
4 | |
5 | #include <IO/S3Common.h> |
6 | #include <Storages/StorageS3.h> |
7 | #include <Interpreters/evaluateConstantExpression.h> |
8 | #include <TableFunctions/TableFunctionFactory.h> |
9 | #include <TableFunctions/TableFunctionS3.h> |
10 | #include <TableFunctions/parseColumnsListForTableFunction.h> |
11 | #include <Parsers/ASTLiteral.h> |
12 | #include "registerTableFunctions.h" |
13 | |
14 | namespace DB |
15 | { |
16 | |
/// Error codes referenced by this translation unit.
/// Both are declared explicitly so the file does not depend on another header
/// happening to declare LOGICAL_ERROR (it is thrown in TableFunctionS3::executeImpl).
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
21 | |
22 | StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const |
23 | { |
24 | /// Parse args |
25 | ASTs & args_func = ast_function->children; |
26 | |
27 | if (args_func.size() != 1) |
28 | throw Exception("Table function '" + getName() + "' must have arguments." , ErrorCodes::LOGICAL_ERROR); |
29 | |
30 | ASTs & args = args_func.at(0)->children; |
31 | |
32 | if (args.size() < 3 || args.size() > 6) |
33 | throw Exception("Table function '" + getName() + "' requires 3 to 6 arguments: url, [access_key_id, secret_access_key,] format, structure and [compression_method]." , |
34 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
35 | |
36 | for (size_t i = 0; i < args.size(); ++i) |
37 | args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context); |
38 | |
39 | String filename = args[0]->as<ASTLiteral &>().value.safeGet<String>(); |
40 | String format; |
41 | String structure; |
42 | String access_key_id; |
43 | String secret_access_key; |
44 | |
45 | if (args.size() < 5) |
46 | { |
47 | format = args[1]->as<ASTLiteral &>().value.safeGet<String>(); |
48 | structure = args[2]->as<ASTLiteral &>().value.safeGet<String>(); |
49 | } |
50 | else |
51 | { |
52 | access_key_id = args[1]->as<ASTLiteral &>().value.safeGet<String>(); |
53 | secret_access_key = args[2]->as<ASTLiteral &>().value.safeGet<String>(); |
54 | format = args[3]->as<ASTLiteral &>().value.safeGet<String>(); |
55 | structure = args[4]->as<ASTLiteral &>().value.safeGet<String>(); |
56 | } |
57 | |
58 | String compression_method; |
59 | if (args.size() == 4 || args.size() == 6) |
60 | compression_method = args.back()->as<ASTLiteral &>().value.safeGet<String>(); |
61 | else |
62 | compression_method = "auto" ; |
63 | |
64 | ColumnsDescription columns = parseColumnsListFromString(structure, context); |
65 | |
66 | /// Create table |
67 | StoragePtr storage = getStorage(filename, access_key_id, secret_access_key, format, columns, const_cast<Context &>(context), table_name, compression_method); |
68 | |
69 | storage->startup(); |
70 | |
71 | return storage; |
72 | } |
73 | |
74 | StoragePtr TableFunctionS3::getStorage( |
75 | const String & source, |
76 | const String & access_key_id, |
77 | const String & secret_access_key, |
78 | const String & format, |
79 | const ColumnsDescription & columns, |
80 | Context & global_context, |
81 | const std::string & table_name, |
82 | const String & compression_method) const |
83 | { |
84 | Poco::URI uri (source); |
85 | S3::URI s3_uri (uri); |
86 | |
87 | UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size; |
88 | return StorageS3::create(s3_uri, access_key_id, secret_access_key, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context, compression_method); |
89 | } |
90 | |
/// Registers TableFunctionS3 with the given table function factory.
/// NOTE(review): the name it is registered under is not visible in this file — presumably
/// it comes from the TableFunctionS3 declaration; confirm in TableFunctionS3.h.
void registerTableFunctionS3(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionS3>();
}
95 | |
96 | } |
97 | |
98 | #endif |
99 | |