1#include <Common/config.h>
2
3#if USE_AWS_S3
4
5#include <IO/WriteBufferFromS3.h>
6#include <IO/WriteHelpers.h>
7
8#include <common/logger_useful.h>
9#include <aws/s3/S3Client.h>
10#include <aws/s3/model/CreateMultipartUploadRequest.h>
11#include <aws/s3/model/UploadPartRequest.h>
12#include <aws/s3/model/CompleteMultipartUploadRequest.h>
13
14#include <utility>
15
16
17namespace DB
18{
19
/// The S3 protocol limits a multipart upload to at most 10000 parts.
/// When that many parts have already been uploaded we only print a warning instead of failing,
/// because a custom S3 implementation may relax the limit; the server decides whether to
/// reject the upload.
constexpr int S3_WARN_MAX_PARTS = 10000;
24
25
/// Error codes used by this translation unit; the values are defined elsewhere.
namespace ErrorCodes
{
    /// Code attached to every exception thrown here when an S3 request does not succeed.
    extern const int S3_ERROR;
}
30
31
32WriteBufferFromS3::WriteBufferFromS3(
33 std::shared_ptr<Aws::S3::S3Client> client_ptr_,
34 const String & bucket_,
35 const String & key_,
36 size_t minimum_upload_part_size_,
37 size_t buffer_size_
38)
39 : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
40 , bucket(bucket_)
41 , key(key_)
42 , client_ptr(std::move(client_ptr_))
43 , minimum_upload_part_size {minimum_upload_part_size_}
44 , temporary_buffer {std::make_unique<WriteBufferFromString>(buffer_string)}
45 , last_part_size {0}
46{
47 initiate();
48}
49
50
51void WriteBufferFromS3::nextImpl()
52{
53 if (!offset())
54 return;
55
56 temporary_buffer->write(working_buffer.begin(), offset());
57
58 last_part_size += offset();
59
60 if (last_part_size > minimum_upload_part_size)
61 {
62 temporary_buffer->finish();
63 writePart(buffer_string);
64 last_part_size = 0;
65 temporary_buffer = std::make_unique<WriteBufferFromString>(buffer_string);
66 }
67}
68
69
70void WriteBufferFromS3::finalize()
71{
72 temporary_buffer->finish();
73 if (!buffer_string.empty())
74 {
75 writePart(buffer_string);
76 }
77
78 complete();
79}
80
81
82WriteBufferFromS3::~WriteBufferFromS3()
83{
84 try
85 {
86 next();
87 }
88 catch (...)
89 {
90 tryLogCurrentException(__PRETTY_FUNCTION__);
91 }
92}
93
94
95void WriteBufferFromS3::initiate()
96{
97 Aws::S3::Model::CreateMultipartUploadRequest req;
98 req.SetBucket(bucket);
99 req.SetKey(key);
100
101 auto outcome = client_ptr->CreateMultipartUpload(req);
102
103 if (outcome.IsSuccess())
104 {
105 upload_id = outcome.GetResult().GetUploadId();
106 LOG_DEBUG(log, "Multipart upload initiated. Upload id = " + upload_id);
107 }
108 else
109 throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
110}
111
112
113void WriteBufferFromS3::writePart(const String & data)
114{
115 if (part_tags.size() == S3_WARN_MAX_PARTS)
116 {
117 // Don't throw exception here by ourselves but leave the decision to take by S3 server.
118 LOG_WARNING(log, "Maximum part number in S3 protocol has reached (too many parts). Server may not accept this whole upload.");
119 }
120
121 Aws::S3::Model::UploadPartRequest req;
122
123 req.SetBucket(bucket);
124 req.SetKey(key);
125 req.SetPartNumber(part_tags.size() + 1);
126 req.SetUploadId(upload_id);
127 req.SetContentLength(data.size());
128 req.SetBody(std::make_shared<Aws::StringStream>(data));
129
130 auto outcome = client_ptr->UploadPart(req);
131
132 if (outcome.IsSuccess())
133 {
134 auto etag = outcome.GetResult().GetETag();
135 part_tags.push_back(etag);
136 LOG_DEBUG(log, "Write part " + std::to_string(part_tags.size()) + " finished. Upload id = " + upload_id + ". Etag = " + etag);
137 }
138 else
139 throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
140}
141
142
143void WriteBufferFromS3::complete()
144{
145 Aws::S3::Model::CompleteMultipartUploadRequest req;
146 req.SetBucket(bucket);
147 req.SetKey(key);
148 req.SetUploadId(upload_id);
149
150 Aws::S3::Model::CompletedMultipartUpload multipart_upload;
151 for (size_t i = 0; i < part_tags.size(); ++i)
152 {
153 Aws::S3::Model::CompletedPart part;
154 multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
155 }
156
157 req.SetMultipartUpload(multipart_upload);
158
159 auto outcome = client_ptr->CompleteMultipartUpload(req);
160
161 if (outcome.IsSuccess())
162 LOG_DEBUG(log, "Multipart upload completed. Upload_id = " + upload_id);
163 else
164 throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
165}
166
167}
168
169#endif
170