#include <Common/config.h>

#if USE_AWS_S3

#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>

#include <common/logger_useful.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>

#include <utility>


namespace DB
{

// The S3 protocol does not allow a multipart upload to have more than 10000 parts.
// If the server does not return an error on exceeding that number, we print a warning,
// because a custom S3 implementation may have relaxed requirements on it.
const int S3_WARN_MAX_PARTS = 10000;


namespace ErrorCodes
{
    extern const int S3_ERROR;
}


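/// Written data is accumulated in an in-memory buffer and sent to S3 in parts of at
/// least minimum_upload_part_size bytes. The multipart upload is initiated on
/// construction; finalize() must be called to flush the remainder and complete it.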
WriteBufferFromS3::WriteBufferFromS3(
    std::shared_ptr<Aws::S3::S3Client> client_ptr_,
    const String & bucket_,
    const String & key_,
    size_t minimum_upload_part_size_,
    size_t buffer_size_
)
    : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
    , bucket(bucket_)
    , key(key_)
    , client_ptr(std::move(client_ptr_))
    , minimum_upload_part_size(minimum_upload_part_size_)
    , temporary_buffer(std::make_unique<WriteBufferFromString>(buffer_string))
    , last_part_size(0)
{
    initiate();
}


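/// Appends the working buffer to the temporary in-memory buffer. Once the accumulated
/// size exceeds minimum_upload_part_size, the buffered data is uploaded as one part
/// and the buffer is reset.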
void WriteBufferFromS3::nextImpl()
{
    if (!offset())
        return;

    temporary_buffer->write(working_buffer.begin(), offset());

    last_part_size += offset();

    if (last_part_size > minimum_upload_part_size)
    {
        temporary_buffer->finish();
        writePart(buffer_string);
        last_part_size = 0;
        temporary_buffer = std::make_unique<WriteBufferFromString>(buffer_string);
    }
}


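/// Flushes the remaining buffered data as the final part (only the last part of a
/// multipart upload may be smaller than the minimum part size) and completes the upload.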
void WriteBufferFromS3::finalize()
{
    /// Push whatever is still pending in the working buffer into temporary_buffer,
    /// otherwise bytes written since the last next() call would be lost.
    next();

    temporary_buffer->finish();
    if (!buffer_string.empty())
    {
        writePart(buffer_string);
    }

    complete();
}


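/// Only flushes the working buffer; it does not complete the multipart upload, so
/// finalize() must have been called before destruction. Exceptions are logged
/// instead of propagated, since throwing from a destructor is unsafe.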
WriteBufferFromS3::~WriteBufferFromS3()
{
    try
    {
        next();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}


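/// Sends a CreateMultipartUpload request and remembers the returned upload id;
/// it must accompany every subsequent UploadPart and CompleteMultipartUpload request.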
void WriteBufferFromS3::initiate()
{
    Aws::S3::Model::CreateMultipartUploadRequest req;
    req.SetBucket(bucket);
    req.SetKey(key);

    auto outcome = client_ptr->CreateMultipartUpload(req);

    if (outcome.IsSuccess())
    {
        upload_id = outcome.GetResult().GetUploadId();
        LOG_DEBUG(log, "Multipart upload initiated. Upload id = " + upload_id);
    }
    else
        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}


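/// Uploads one part with an UploadPart request. Part numbers are 1-based, and the
/// ETag returned for each part is saved, because completing the upload requires
/// the full list of (part number, ETag) pairs.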
void WriteBufferFromS3::writePart(const String & data)
{
    if (part_tags.size() == S3_WARN_MAX_PARTS)
    {
        // Don't throw an exception here ourselves; leave that decision to the S3 server.
        LOG_WARNING(log, "Maximum part number in S3 protocol has been reached (too many parts). Server may not accept this whole upload.");
    }

    Aws::S3::Model::UploadPartRequest req;

    req.SetBucket(bucket);
    req.SetKey(key);
    req.SetPartNumber(part_tags.size() + 1);
    req.SetUploadId(upload_id);
    req.SetContentLength(data.size());
    req.SetBody(std::make_shared<Aws::StringStream>(data));

    auto outcome = client_ptr->UploadPart(req);

    if (outcome.IsSuccess())
    {
        auto etag = outcome.GetResult().GetETag();
        part_tags.push_back(etag);
        LOG_DEBUG(log, "Write part " + std::to_string(part_tags.size()) + " finished. Upload id = " + upload_id + ". Etag = " + etag);
    }
    else
        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}


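/// Sends a CompleteMultipartUploadRequest listing every uploaded part with its ETag.
/// Note that S3 rejects a completion request with an empty part list, so at least
/// one part must have been uploaded.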
void WriteBufferFromS3::complete()
{
    Aws::S3::Model::CompleteMultipartUploadRequest req;
    req.SetBucket(bucket);
    req.SetKey(key);
    req.SetUploadId(upload_id);

    Aws::S3::Model::CompletedMultipartUpload multipart_upload;
    for (size_t i = 0; i < part_tags.size(); ++i)
    {
        Aws::S3::Model::CompletedPart part;
        multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
    }

    req.SetMultipartUpload(multipart_upload);

    auto outcome = client_ptr->CompleteMultipartUpload(req);

    if (outcome.IsSuccess())
        LOG_DEBUG(log, "Multipart upload completed. Upload id = " + upload_id);
    else
        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}

}

#endif