1#include <Common/config.h>
2
3#if USE_AWS_S3
4
5#include <IO/S3Common.h>
6#include <Storages/StorageS3.h>
7#include <Interpreters/evaluateConstantExpression.h>
8#include <TableFunctions/TableFunctionFactory.h>
9#include <TableFunctions/TableFunctionS3.h>
10#include <TableFunctions/parseColumnsListForTableFunction.h>
11#include <Parsers/ASTLiteral.h>
12#include "registerTableFunctions.h"
13
14namespace DB
15{
16
17namespace ErrorCodes
18{
19 extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
20}
21
22StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
23{
24 /// Parse args
25 ASTs & args_func = ast_function->children;
26
27 if (args_func.size() != 1)
28 throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::LOGICAL_ERROR);
29
30 ASTs & args = args_func.at(0)->children;
31
32 if (args.size() < 3 || args.size() > 6)
33 throw Exception("Table function '" + getName() + "' requires 3 to 6 arguments: url, [access_key_id, secret_access_key,] format, structure and [compression_method].",
34 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
35
36 for (size_t i = 0; i < args.size(); ++i)
37 args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
38
39 String filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
40 String format;
41 String structure;
42 String access_key_id;
43 String secret_access_key;
44
45 if (args.size() < 5)
46 {
47 format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
48 structure = args[2]->as<ASTLiteral &>().value.safeGet<String>();
49 }
50 else
51 {
52 access_key_id = args[1]->as<ASTLiteral &>().value.safeGet<String>();
53 secret_access_key = args[2]->as<ASTLiteral &>().value.safeGet<String>();
54 format = args[3]->as<ASTLiteral &>().value.safeGet<String>();
55 structure = args[4]->as<ASTLiteral &>().value.safeGet<String>();
56 }
57
58 String compression_method;
59 if (args.size() == 4 || args.size() == 6)
60 compression_method = args.back()->as<ASTLiteral &>().value.safeGet<String>();
61 else
62 compression_method = "auto";
63
64 ColumnsDescription columns = parseColumnsListFromString(structure, context);
65
66 /// Create table
67 StoragePtr storage = getStorage(filename, access_key_id, secret_access_key, format, columns, const_cast<Context &>(context), table_name, compression_method);
68
69 storage->startup();
70
71 return storage;
72}
73
74StoragePtr TableFunctionS3::getStorage(
75 const String & source,
76 const String & access_key_id,
77 const String & secret_access_key,
78 const String & format,
79 const ColumnsDescription & columns,
80 Context & global_context,
81 const std::string & table_name,
82 const String & compression_method) const
83{
84 Poco::URI uri (source);
85 S3::URI s3_uri (uri);
86
87 UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size;
88 return StorageS3::create(s3_uri, access_key_id, secret_access_key, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context, compression_method);
89}
90
91void registerTableFunctionS3(TableFunctionFactory & factory)
92{
93 factory.registerFunction<TableFunctionS3>();
94}
95
96}
97
98#endif
99