| 1 | #include <Common/config.h> |
| 2 | |
| 3 | #if USE_AWS_S3 |
| 4 | |
| 5 | #include <IO/WriteBufferFromS3.h> |
| 6 | #include <IO/WriteHelpers.h> |
| 7 | |
| 8 | #include <common/logger_useful.h> |
| 9 | #include <aws/s3/S3Client.h> |
| 10 | #include <aws/s3/model/CreateMultipartUploadRequest.h> |
| 11 | #include <aws/s3/model/UploadPartRequest.h> |
| 12 | #include <aws/s3/model/CompleteMultipartUploadRequest.h> |
| 13 | |
| 14 | #include <utility> |
| 15 | |
| 16 | |
| 17 | namespace DB |
| 18 | { |
| 19 | |
// S3 protocol does not allow to have multipart upload with more than 10000 parts.
// In case server does not return an error on exceeding that number, we print a warning
// because custom S3 implementation may allow relaxed requirements on that.
const int S3_WARN_MAX_PARTS = 10000;


/// Error codes referenced by this translation unit; defined centrally elsewhere.
namespace ErrorCodes
{
    extern const int S3_ERROR;
}
| 30 | |
| 31 | |
/// Creates a write buffer that streams data to an S3 object via the multipart
/// upload protocol.
///
/// client_ptr_                — configured AWS S3 client (ownership shared).
/// bucket_ / key_             — destination object location.
/// minimum_upload_part_size_  — data is accumulated until a staged part exceeds
///                              this many bytes, then uploaded as one part.
/// buffer_size_               — size of the internal working buffer.
///
/// Immediately calls initiate(), i.e. the CreateMultipartUpload request is sent
/// from the constructor; throws Exception(S3_ERROR) on failure.
WriteBufferFromS3::WriteBufferFromS3(
    std::shared_ptr<Aws::S3::S3Client> client_ptr_,
    const String & bucket_,
    const String & key_,
    size_t minimum_upload_part_size_,
    size_t buffer_size_
)
    : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
    , bucket(bucket_)
    , key(key_)
    , client_ptr(std::move(client_ptr_))
    , minimum_upload_part_size {minimum_upload_part_size_}
    // temporary_buffer appends into buffer_string, which is the staging area
    // for the next multipart-upload part.
    , temporary_buffer {std::make_unique<WriteBufferFromString>(buffer_string)}
    , last_part_size {0}
{
    initiate();
}
| 49 | |
| 50 | |
| 51 | void WriteBufferFromS3::nextImpl() |
| 52 | { |
| 53 | if (!offset()) |
| 54 | return; |
| 55 | |
| 56 | temporary_buffer->write(working_buffer.begin(), offset()); |
| 57 | |
| 58 | last_part_size += offset(); |
| 59 | |
| 60 | if (last_part_size > minimum_upload_part_size) |
| 61 | { |
| 62 | temporary_buffer->finish(); |
| 63 | writePart(buffer_string); |
| 64 | last_part_size = 0; |
| 65 | temporary_buffer = std::make_unique<WriteBufferFromString>(buffer_string); |
| 66 | } |
| 67 | } |
| 68 | |
| 69 | |
/// Uploads any remaining staged data as the final part and completes the
/// multipart upload. Callers must invoke this before destroying the buffer;
/// the destructor by itself does not upload pending data.
void WriteBufferFromS3::finalize()
{
    /// finish() must precede reading buffer_string: it flushes the string
    /// writer so buffer_string holds all staged bytes.
    temporary_buffer->finish();
    if (!buffer_string.empty())
    {
        writePart(buffer_string);
    }

    /// NOTE(review): complete() is sent even when no parts were uploaded at
    /// all (empty object) -- presumably S3 rejects a CompleteMultipartUpload
    /// with an empty part list; verify behavior for zero-byte writes.
    complete();
}
| 80 | |
| 81 | |
WriteBufferFromS3::~WriteBufferFromS3()
{
    try
    {
        /// Flushes the working buffer into the staging buffer only. This does
        /// NOT upload the staged data or complete the multipart upload --
        /// only finalize() does that.
        /// NOTE(review): if finalize() was never called, staged data is
        /// silently dropped and the multipart upload is left dangling on the
        /// server (never completed or aborted) -- confirm all callers call
        /// finalize() before destruction.
        next();
    }
    catch (...)
    {
        /// Destructors must not propagate exceptions; log and swallow.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
| 93 | |
| 94 | |
| 95 | void WriteBufferFromS3::initiate() |
| 96 | { |
| 97 | Aws::S3::Model::CreateMultipartUploadRequest req; |
| 98 | req.SetBucket(bucket); |
| 99 | req.SetKey(key); |
| 100 | |
| 101 | auto outcome = client_ptr->CreateMultipartUpload(req); |
| 102 | |
| 103 | if (outcome.IsSuccess()) |
| 104 | { |
| 105 | upload_id = outcome.GetResult().GetUploadId(); |
| 106 | LOG_DEBUG(log, "Multipart upload initiated. Upload id = " + upload_id); |
| 107 | } |
| 108 | else |
| 109 | throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); |
| 110 | } |
| 111 | |
| 112 | |
| 113 | void WriteBufferFromS3::writePart(const String & data) |
| 114 | { |
| 115 | if (part_tags.size() == S3_WARN_MAX_PARTS) |
| 116 | { |
| 117 | // Don't throw exception here by ourselves but leave the decision to take by S3 server. |
| 118 | LOG_WARNING(log, "Maximum part number in S3 protocol has reached (too many parts). Server may not accept this whole upload." ); |
| 119 | } |
| 120 | |
| 121 | Aws::S3::Model::UploadPartRequest req; |
| 122 | |
| 123 | req.SetBucket(bucket); |
| 124 | req.SetKey(key); |
| 125 | req.SetPartNumber(part_tags.size() + 1); |
| 126 | req.SetUploadId(upload_id); |
| 127 | req.SetContentLength(data.size()); |
| 128 | req.SetBody(std::make_shared<Aws::StringStream>(data)); |
| 129 | |
| 130 | auto outcome = client_ptr->UploadPart(req); |
| 131 | |
| 132 | if (outcome.IsSuccess()) |
| 133 | { |
| 134 | auto etag = outcome.GetResult().GetETag(); |
| 135 | part_tags.push_back(etag); |
| 136 | LOG_DEBUG(log, "Write part " + std::to_string(part_tags.size()) + " finished. Upload id = " + upload_id + ". Etag = " + etag); |
| 137 | } |
| 138 | else |
| 139 | throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); |
| 140 | } |
| 141 | |
| 142 | |
| 143 | void WriteBufferFromS3::complete() |
| 144 | { |
| 145 | Aws::S3::Model::CompleteMultipartUploadRequest req; |
| 146 | req.SetBucket(bucket); |
| 147 | req.SetKey(key); |
| 148 | req.SetUploadId(upload_id); |
| 149 | |
| 150 | Aws::S3::Model::CompletedMultipartUpload multipart_upload; |
| 151 | for (size_t i = 0; i < part_tags.size(); ++i) |
| 152 | { |
| 153 | Aws::S3::Model::CompletedPart part; |
| 154 | multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1)); |
| 155 | } |
| 156 | |
| 157 | req.SetMultipartUpload(multipart_upload); |
| 158 | |
| 159 | auto outcome = client_ptr->CompleteMultipartUpload(req); |
| 160 | |
| 161 | if (outcome.IsSuccess()) |
| 162 | LOG_DEBUG(log, "Multipart upload completed. Upload_id = " + upload_id); |
| 163 | else |
| 164 | throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); |
| 165 | } |
| 166 | |
| 167 | } |
| 168 | |
| 169 | #endif |
| 170 | |