1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/util/memory.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstdint> |
22 | #include <cstdio> |
23 | #include <string> |
24 | #include <utility> |
25 | |
26 | #include "arrow/status.h" |
27 | #include "arrow/util/bit-util.h" |
28 | |
29 | #include "arrow/util/compression.h" |
30 | #include "arrow/util/logging.h" |
31 | #include "parquet/exception.h" |
32 | #include "parquet/types.h" |
33 | |
34 | using arrow::MemoryPool; |
35 | using arrow::util::Codec; |
36 | |
37 | namespace parquet { |
38 | |
39 | std::unique_ptr<Codec> GetCodecFromArrow(Compression::type codec) { |
40 | std::unique_ptr<Codec> result; |
41 | switch (codec) { |
42 | case Compression::UNCOMPRESSED: |
43 | break; |
44 | case Compression::SNAPPY: |
45 | PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::SNAPPY, &result)); |
46 | break; |
47 | case Compression::GZIP: |
48 | PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::GZIP, &result)); |
49 | break; |
50 | case Compression::LZO: |
51 | PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZO, &result)); |
52 | break; |
53 | case Compression::BROTLI: |
54 | PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::BROTLI, &result)); |
55 | break; |
56 | case Compression::LZ4: |
57 | PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZ4, &result)); |
58 | break; |
59 | case Compression::ZSTD: |
60 | PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::ZSTD, &result)); |
61 | break; |
62 | default: |
63 | break; |
64 | } |
65 | return result; |
66 | } |
67 | |
68 | template <class T> |
69 | Vector<T>::Vector(int64_t size, MemoryPool* pool) |
70 | : buffer_(AllocateBuffer(pool, size * sizeof(T))), size_(size), capacity_(size) { |
71 | if (size > 0) { |
72 | data_ = reinterpret_cast<T*>(buffer_->mutable_data()); |
73 | } else { |
74 | data_ = nullptr; |
75 | } |
76 | } |
77 | |
78 | template <class T> |
79 | void Vector<T>::Reserve(int64_t new_capacity) { |
80 | if (new_capacity > capacity_) { |
81 | PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity * sizeof(T))); |
82 | data_ = reinterpret_cast<T*>(buffer_->mutable_data()); |
83 | capacity_ = new_capacity; |
84 | } |
85 | } |
86 | |
// Set the logical size to new_size, growing (never shrinking) the backing
// storage. Elements beyond the old size are left uninitialized.
template <class T>
void Vector<T>::Resize(int64_t new_size) {
  Reserve(new_size);
  size_ = new_size;
}
92 | |
93 | template <class T> |
94 | void Vector<T>::Assign(int64_t size, const T val) { |
95 | Resize(size); |
96 | for (int64_t i = 0; i < size_; i++) { |
97 | data_[i] = val; |
98 | } |
99 | } |
100 | |
101 | template <class T> |
102 | void Vector<T>::Swap(Vector<T>& v) { |
103 | buffer_.swap(v.buffer_); |
104 | std::swap(size_, v.size_); |
105 | std::swap(capacity_, v.capacity_); |
106 | std::swap(data_, v.data_); |
107 | } |
108 | |
// Explicit instantiations: one Vector specialization per physical Parquet
// value type used elsewhere in the library.
template class Vector<int32_t>;
template class Vector<int64_t>;
template class Vector<bool>;
template class Vector<float>;
template class Vector<double>;
template class Vector<Int96>;
template class Vector<ByteArray>;
template class Vector<FixedLenByteArray>;
117 | |
118 | // ---------------------------------------------------------------------- |
119 | // Arrow IO wrappers |
120 | |
121 | void ArrowFileMethods::Close() { |
122 | // Closing the file is the responsibility of the owner of the handle |
123 | return; |
124 | } |
125 | |
126 | // Return the current position in the output stream relative to the start |
127 | int64_t ArrowFileMethods::Tell() { |
128 | int64_t position = 0; |
129 | PARQUET_THROW_NOT_OK(file_interface()->Tell(&position)); |
130 | return position; |
131 | } |
132 | |
// Wrap a readable Arrow file; shares ownership of the handle but does not
// close it (see ArrowFileMethods::Close).
ArrowInputFile::ArrowInputFile(
    const std::shared_ptr<::arrow::io::ReadableFileInterface>& file)
    : file_(file) {}

// Non-owning access to the wrapped handle for the common file methods.
::arrow::io::FileInterface* ArrowInputFile::file_interface() { return file_.get(); }
138 | |
139 | int64_t ArrowInputFile::Size() const { |
140 | int64_t size; |
141 | PARQUET_THROW_NOT_OK(file_->GetSize(&size)); |
142 | return size; |
143 | } |
144 | |
145 | // Returns bytes read |
146 | int64_t ArrowInputFile::Read(int64_t nbytes, uint8_t* out) { |
147 | int64_t bytes_read = 0; |
148 | PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out)); |
149 | return bytes_read; |
150 | } |
151 | |
152 | std::shared_ptr<Buffer> ArrowInputFile::Read(int64_t nbytes) { |
153 | std::shared_ptr<Buffer> out; |
154 | PARQUET_THROW_NOT_OK(file_->Read(nbytes, &out)); |
155 | return out; |
156 | } |
157 | |
158 | std::shared_ptr<Buffer> ArrowInputFile::ReadAt(int64_t position, int64_t nbytes) { |
159 | std::shared_ptr<Buffer> out; |
160 | PARQUET_THROW_NOT_OK(file_->ReadAt(position, nbytes, &out)); |
161 | return out; |
162 | } |
163 | |
164 | int64_t ArrowInputFile::ReadAt(int64_t position, int64_t nbytes, uint8_t* out) { |
165 | int64_t bytes_read = 0; |
166 | PARQUET_THROW_NOT_OK(file_->ReadAt(position, nbytes, &bytes_read, out)); |
167 | return bytes_read; |
168 | } |
169 | |
// Wrap an Arrow output stream, sharing ownership of the handle.
// NOTE(review): the shared_ptr parameter is taken by value (const,
// non-reference), so construction copies the control block -- presumably
// intentional, but the signature must match the header declaration, so it
// is left as-is; confirm against the header before changing.
ArrowOutputStream::ArrowOutputStream(
    const std::shared_ptr<::arrow::io::OutputStream> file)
    : file_(file) {}

// Non-owning access to the wrapped handle for the common file methods.
::arrow::io::FileInterface* ArrowOutputStream::file_interface() { return file_.get(); }

// Copy `length` bytes from `data` into the output stream; throws
// ParquetException on write failure.
void ArrowOutputStream::Write(const uint8_t* data, int64_t length) {
  PARQUET_THROW_NOT_OK(file_->Write(data, length));
}
180 | |
181 | // ---------------------------------------------------------------------- |
182 | // InMemoryInputStream |
183 | |
// Stream over an existing in-memory buffer; shares ownership of the buffer.
// len_ is set in the body (not the init list) because it reads buffer_,
// which must already be initialized.
InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>& buffer)
    : buffer_(buffer), offset_(0) {
  len_ = buffer_->size();
}

// Stream over the [start, start + num_bytes) slice of a random-access
// source. The slice is read eagerly; throws if the source returns fewer
// bytes than requested (e.g. a truncated file).
InMemoryInputStream::InMemoryInputStream(RandomAccessSource* source, int64_t start,
                                         int64_t num_bytes)
    : offset_(0) {
  buffer_ = source->ReadAt(start, num_bytes);
  if (buffer_->size() < num_bytes) {
    throw ParquetException("Unable to read column chunk data" );
  }
  len_ = buffer_->size();
}
198 | |
199 | const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) { |
200 | *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_); |
201 | return buffer_->data() + offset_; |
202 | } |
203 | |
204 | const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) { |
205 | const uint8_t* result = Peek(num_to_read, num_bytes); |
206 | offset_ += *num_bytes; |
207 | return result; |
208 | } |
209 | |
210 | void InMemoryInputStream::Advance(int64_t num_bytes) { offset_ += num_bytes; } |
211 | |
212 | // ---------------------------------------------------------------------- |
213 | // In-memory output stream |
214 | |
215 | InMemoryOutputStream::InMemoryOutputStream(MemoryPool* pool, int64_t initial_capacity) |
216 | : size_(0), capacity_(initial_capacity) { |
217 | if (initial_capacity == 0) { |
218 | initial_capacity = kInMemoryDefaultCapacity; |
219 | } |
220 | buffer_ = AllocateBuffer(pool, initial_capacity); |
221 | } |
222 | |
// Nothing to release explicitly: buffer_ is freed by its shared_ptr.
InMemoryOutputStream::~InMemoryOutputStream() {}

// Pointer one past the last written byte, i.e. where the next Write lands.
uint8_t* InMemoryOutputStream::Head() { return buffer_->mutable_data() + size_; }
226 | |
227 | void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) { |
228 | if (size_ + length > capacity_) { |
229 | int64_t new_capacity = capacity_ * 2; |
230 | while (new_capacity < size_ + length) { |
231 | new_capacity *= 2; |
232 | } |
233 | PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity)); |
234 | capacity_ = new_capacity; |
235 | } |
236 | // If length == 0, data may be null |
237 | if (length > 0) { |
238 | memcpy(Head(), data, length); |
239 | size_ += length; |
240 | } |
241 | } |
242 | |
243 | int64_t InMemoryOutputStream::Tell() { return size_; } |
244 | |
245 | std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() { |
246 | PARQUET_THROW_NOT_OK(buffer_->Resize(size_)); |
247 | std::shared_ptr<Buffer> result = buffer_; |
248 | buffer_ = nullptr; |
249 | return result; |
250 | } |
251 | |
252 | // ---------------------------------------------------------------------- |
253 | // BufferedInputStream |
254 | |
// Buffered view over [start, start + num_bytes) of `source`. No bytes are
// fetched until the first Peek()/Read() (lazy).
BufferedInputStream::BufferedInputStream(MemoryPool* pool, int64_t buffer_size,
                                         RandomAccessSource* source, int64_t start,
                                         int64_t num_bytes)
    : source_(source), stream_offset_(start), stream_end_(start + num_bytes) {
  buffer_ = AllocateBuffer(pool, buffer_size);
  buffer_size_ = buffer_->size();
  // Required to force a lazy read: an exhausted-looking buffer makes the
  // first Peek() fall into its refill branch.
  buffer_offset_ = buffer_size_;
}
264 | |
// Return a pointer to up to num_to_peek bytes without consuming them.
// *num_bytes reports how many bytes are actually valid at the returned
// pointer (clamped to what remains before stream_end_). May grow the
// internal buffer and/or refill it from the source.
const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
  *num_bytes = std::min(num_to_peek, stream_end_ - stream_offset_);
  // increase the buffer size if needed
  if (*num_bytes > buffer_size_) {
    PARQUET_THROW_NOT_OK(buffer_->Resize(*num_bytes));
    buffer_size_ = buffer_->size();
    DCHECK(buffer_size_ >= *num_bytes);
  }
  // Read more data when buffer has insufficient left or when resized
  if (*num_bytes > (buffer_size_ - buffer_offset_)) {
    // Clamp the refill so we never read past the end of the stream slice;
    // this can shrink buffer_size_ for the final chunk.
    buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_);
    int64_t bytes_read =
        source_->ReadAt(stream_offset_, buffer_size_, buffer_->mutable_data());
    // A short read here means the underlying source is truncated/corrupt.
    if (bytes_read < *num_bytes) {
      throw ParquetException("Failed reading column data from source" );
    }
    buffer_offset_ = 0;
  }
  return buffer_->data() + buffer_offset_;
}
285 | |
286 | const uint8_t* BufferedInputStream::Read(int64_t num_to_read, int64_t* num_bytes) { |
287 | const uint8_t* result = Peek(num_to_read, num_bytes); |
288 | stream_offset_ += *num_bytes; |
289 | buffer_offset_ += *num_bytes; |
290 | return result; |
291 | } |
292 | |
293 | void BufferedInputStream::Advance(int64_t num_bytes) { |
294 | stream_offset_ += num_bytes; |
295 | buffer_offset_ += num_bytes; |
296 | } |
297 | |
298 | std::shared_ptr<ResizableBuffer> AllocateBuffer(MemoryPool* pool, int64_t size) { |
299 | std::shared_ptr<ResizableBuffer> result; |
300 | PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(pool, size, &result)); |
301 | return result; |
302 | } |
303 | |
304 | } // namespace parquet |
305 | |