1#include <Common/config.h>
2
3#if USE_AWS_S3
4
#include <IO/S3Common.h>
#include <IO/WriteBufferFromString.h>

#include <cstdarg>
#include <cstdio>
#include <regex>
#include <aws/s3/S3Client.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/utils/logging/LogSystemInterface.h>
#include <aws/core/utils/logging/LogMacros.h>
#include <common/logger_useful.h>
14
15
16namespace
17{
18const std::pair<LogsLevel, Message::Priority> & convertLogLevel(Aws::Utils::Logging::LogLevel log_level)
19{
20 static const std::unordered_map<Aws::Utils::Logging::LogLevel, std::pair<LogsLevel, Message::Priority>> mapping = {
21 {Aws::Utils::Logging::LogLevel::Off, {LogsLevel::none, Message::PRIO_FATAL}},
22 {Aws::Utils::Logging::LogLevel::Fatal, {LogsLevel::error, Message::PRIO_FATAL}},
23 {Aws::Utils::Logging::LogLevel::Error, {LogsLevel::error, Message::PRIO_ERROR}},
24 {Aws::Utils::Logging::LogLevel::Warn, {LogsLevel::warning, Message::PRIO_WARNING}},
25 {Aws::Utils::Logging::LogLevel::Info, {LogsLevel::information, Message::PRIO_INFORMATION}},
26 {Aws::Utils::Logging::LogLevel::Debug, {LogsLevel::debug, Message::PRIO_DEBUG}},
27 {Aws::Utils::Logging::LogLevel::Trace, {LogsLevel::trace, Message::PRIO_TRACE}},
28 };
29 return mapping.at(log_level);
30}
31
32class AWSLogger : public Aws::Utils::Logging::LogSystemInterface
33{
34public:
35 ~AWSLogger() final = default;
36
37 Aws::Utils::Logging::LogLevel GetLogLevel() const final { return Aws::Utils::Logging::LogLevel::Trace; }
38
39 void Log(Aws::Utils::Logging::LogLevel log_level, const char * tag, const char * format_str, ...) final
40 {
41 auto & [level, prio] = convertLogLevel(log_level);
42 LOG_SIMPLE(log, std::string(tag) + ": " + format_str, level, prio);
43 }
44
45 void LogStream(Aws::Utils::Logging::LogLevel log_level, const char * tag, const Aws::OStringStream & message_stream) final
46 {
47 auto & [level, prio] = convertLogLevel(log_level);
48 LOG_SIMPLE(log, std::string(tag) + ": " + message_stream.str(), level, prio);
49 }
50
51 void Flush() final {}
52
53private:
54 Poco::Logger * log = &Poco::Logger::get("AWSClient");
55};
56}
57
58namespace DB
59{
60
61namespace ErrorCodes
62{
63 extern const int BAD_ARGUMENTS;
64}
65
66namespace S3
67{
68 ClientFactory::ClientFactory()
69 {
70 aws_options = Aws::SDKOptions {};
71 Aws::InitAPI(aws_options);
72 Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSLogger>());
73 }
74
75 ClientFactory::~ClientFactory()
76 {
77 Aws::Utils::Logging::ShutdownAWSLogging();
78 Aws::ShutdownAPI(aws_options);
79 }
80
81 ClientFactory & ClientFactory::instance()
82 {
83 static ClientFactory ret;
84 return ret;
85 }
86
87 std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
88 const String & endpoint,
89 const String & access_key_id,
90 const String & secret_access_key)
91 {
92 Aws::Client::ClientConfiguration cfg;
93 if (!endpoint.empty())
94 cfg.endpointOverride = endpoint;
95
96 auto cred_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(access_key_id,
97 secret_access_key);
98
99 return std::make_shared<Aws::S3::S3Client>(
100 std::move(cred_provider), // Credentials provider.
101 std::move(cfg), // Client configuration.
102 Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, // Sign policy.
103 endpoint.empty() // Use virtual addressing only if endpoint is not specified.
104 );
105 }
106
107
108 URI::URI(Poco::URI & uri_)
109 {
110 static const std::regex BUCKET_KEY_PATTERN("([^/]+)/(.*)");
111
112 uri = uri_;
113
114 // s3://*
115 if (uri.getScheme() == "s3" || uri.getScheme() == "S3")
116 {
117 bucket = uri.getAuthority();
118 if (bucket.empty())
119 throw Exception ("Invalid S3 URI: no bucket: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
120
121 const auto & path = uri.getPath();
122 // s3://bucket or s3://bucket/
123 if (path.length() <= 1)
124 throw Exception ("Invalid S3 URI: no key: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
125
126 key = path.substr(1);
127 return;
128 }
129
130 if (uri.getHost().empty())
131 throw Exception("Invalid S3 URI: no host: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
132
133 endpoint = uri.getScheme() + "://" + uri.getAuthority();
134
135 // Parse bucket and key from path.
136 std::smatch match;
137 std::regex_search(uri.getPath(), match, BUCKET_KEY_PATTERN);
138 if (!match.empty())
139 {
140 bucket = match.str(1);
141 if (bucket.empty())
142 throw Exception ("Invalid S3 URI: no bucket: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
143
144 key = match.str(2);
145 if (key.empty())
146 throw Exception ("Invalid S3 URI: no key: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
147 }
148 else
149 throw Exception("Invalid S3 URI: no bucket or key: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
150 }
151}
152
153}
154
155#endif
156