// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/util/memory.h"

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <string>
#include <utility>

#include "arrow/status.h"
#include "arrow/util/bit-util.h"

#include "arrow/util/compression.h"
#include "arrow/util/logging.h"
#include "parquet/exception.h"
#include "parquet/types.h"

using arrow::MemoryPool;
using arrow::util::Codec;

namespace parquet {

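// Create an Arrow compression codec for the given Parquet compression type.
// Returns nullptr for UNCOMPRESSED (and for unrecognized enum values); a
// failure to create the codec (e.g. support not compiled into Arrow) is
// surfaced as a ParquetException.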
std::unique_ptr<Codec> GetCodecFromArrow(Compression::type codec) {
  std::unique_ptr<Codec> result;
  switch (codec) {
    case Compression::UNCOMPRESSED:
      break;
    case Compression::SNAPPY:
      PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::SNAPPY, &result));
      break;
    case Compression::GZIP:
      PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::GZIP, &result));
      break;
    case Compression::LZO:
      PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZO, &result));
      break;
    case Compression::BROTLI:
      PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::BROTLI, &result));
      break;
    case Compression::LZ4:
      PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZ4, &result));
      break;
    case Compression::ZSTD:
      PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::ZSTD, &result));
      break;
    default:
      break;
  }
  return result;
}

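// Vector<T> is a minimal std::vector-like container whose element storage
// lives in a resizable Arrow buffer, so allocations are tracked by the
// supplied MemoryPool.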
template <class T>
Vector<T>::Vector(int64_t size, MemoryPool* pool)
    : buffer_(AllocateBuffer(pool, size * sizeof(T))), size_(size), capacity_(size) {
  if (size > 0) {
    data_ = reinterpret_cast<T*>(buffer_->mutable_data());
  } else {
    data_ = nullptr;
  }
}

template <class T>
void Vector<T>::Reserve(int64_t new_capacity) {
  if (new_capacity > capacity_) {
    PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity * sizeof(T)));
    data_ = reinterpret_cast<T*>(buffer_->mutable_data());
    capacity_ = new_capacity;
  }
}

template <class T>
void Vector<T>::Resize(int64_t new_size) {
  Reserve(new_size);
  size_ = new_size;
}

template <class T>
void Vector<T>::Assign(int64_t size, const T val) {
  Resize(size);
  for (int64_t i = 0; i < size_; i++) {
    data_[i] = val;
  }
}

template <class T>
void Vector<T>::Swap(Vector<T>& v) {
  buffer_.swap(v.buffer_);
  std::swap(size_, v.size_);
  std::swap(capacity_, v.capacity_);
  std::swap(data_, v.data_);
}

template class Vector<int32_t>;
template class Vector<int64_t>;
template class Vector<bool>;
template class Vector<float>;
template class Vector<double>;
template class Vector<Int96>;
template class Vector<ByteArray>;
template class Vector<FixedLenByteArray>;

// ----------------------------------------------------------------------
// Arrow IO wrappers

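// Thin adapters that expose ::arrow::io file objects through the Parquet
// input/output interfaces, converting ::arrow::Status errors into
// ParquetException via PARQUET_THROW_NOT_OK.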
void ArrowFileMethods::Close() {
  // Closing the file is the responsibility of the owner of the handle
  return;
}

// Return the current position in the file relative to the start
int64_t ArrowFileMethods::Tell() {
  int64_t position = 0;
  PARQUET_THROW_NOT_OK(file_interface()->Tell(&position));
  return position;
}

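// ArrowInputFile wraps an ::arrow::io::ReadableFileInterface and forwards
// Size/Read/ReadAt to it.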
ArrowInputFile::ArrowInputFile(
    const std::shared_ptr<::arrow::io::ReadableFileInterface>& file)
    : file_(file) {}

::arrow::io::FileInterface* ArrowInputFile::file_interface() { return file_.get(); }

int64_t ArrowInputFile::Size() const {
  int64_t size;
  PARQUET_THROW_NOT_OK(file_->GetSize(&size));
  return size;
}

// Returns bytes read
int64_t ArrowInputFile::Read(int64_t nbytes, uint8_t* out) {
  int64_t bytes_read = 0;
  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
  return bytes_read;
}

std::shared_ptr<Buffer> ArrowInputFile::Read(int64_t nbytes) {
  std::shared_ptr<Buffer> out;
  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &out));
  return out;
}

std::shared_ptr<Buffer> ArrowInputFile::ReadAt(int64_t position, int64_t nbytes) {
  std::shared_ptr<Buffer> out;
  PARQUET_THROW_NOT_OK(file_->ReadAt(position, nbytes, &out));
  return out;
}

int64_t ArrowInputFile::ReadAt(int64_t position, int64_t nbytes, uint8_t* out) {
  int64_t bytes_read = 0;
  PARQUET_THROW_NOT_OK(file_->ReadAt(position, nbytes, &bytes_read, out));
  return bytes_read;
}

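// ArrowOutputStream wraps an ::arrow::io::OutputStream for writing.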
ArrowOutputStream::ArrowOutputStream(
    const std::shared_ptr<::arrow::io::OutputStream> file)
    : file_(file) {}

::arrow::io::FileInterface* ArrowOutputStream::file_interface() { return file_.get(); }

// Copy bytes into the output stream
void ArrowOutputStream::Write(const uint8_t* data, int64_t length) {
  PARQUET_THROW_NOT_OK(file_->Write(data, length));
}

// ----------------------------------------------------------------------
// InMemoryInputStream

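// InMemoryInputStream reads from a buffer that is fully resident in memory.
// The single-argument constructor wraps an existing Buffer; the other
// constructor eagerly reads num_bytes starting at the given offset from a
// RandomAccessSource and fails if fewer bytes are available.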
InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>& buffer)
    : buffer_(buffer), offset_(0) {
  len_ = buffer_->size();
}

InMemoryInputStream::InMemoryInputStream(RandomAccessSource* source, int64_t start,
                                         int64_t num_bytes)
    : offset_(0) {
  buffer_ = source->ReadAt(start, num_bytes);
  if (buffer_->size() < num_bytes) {
    throw ParquetException("Unable to read column chunk data");
  }
  len_ = buffer_->size();
}

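// Peek returns a pointer to the next *num_bytes available bytes without
// consuming them; Read returns the same pointer and advances the offset.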
const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
  *num_bytes = std::min(num_to_peek, len_ - offset_);
  return buffer_->data() + offset_;
}

const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
  const uint8_t* result = Peek(num_to_read, num_bytes);
  offset_ += *num_bytes;
  return result;
}

void InMemoryInputStream::Advance(int64_t num_bytes) { offset_ += num_bytes; }

// ----------------------------------------------------------------------
// In-memory output stream

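// InMemoryOutputStream accumulates writes into a pool-allocated buffer that
// grows by doubling. A rough usage sketch (`bytes` and `nbytes` are
// placeholders for caller-provided data):
//
//   InMemoryOutputStream sink(::arrow::default_memory_pool(), 1024);
//   sink.Write(bytes, nbytes);
//   std::shared_ptr<Buffer> contents = sink.GetBuffer();  // finalizes the stream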
InMemoryOutputStream::InMemoryOutputStream(MemoryPool* pool, int64_t initial_capacity)
    : size_(0), capacity_(initial_capacity) {
  if (initial_capacity == 0) {
    // Fall back to the default capacity; keep capacity_ in sync so the
    // doubling logic in Write() starts from a nonzero value.
    capacity_ = kInMemoryDefaultCapacity;
  }
  buffer_ = AllocateBuffer(pool, capacity_);
}

InMemoryOutputStream::~InMemoryOutputStream() {}

uint8_t* InMemoryOutputStream::Head() { return buffer_->mutable_data() + size_; }

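// Grow the buffer geometrically (doubling) until the pending write fits, then
// append the new bytes at the current end of the stream.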
void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) {
  if (size_ + length > capacity_) {
    int64_t new_capacity = capacity_ * 2;
    while (new_capacity < size_ + length) {
      new_capacity *= 2;
    }
    PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity));
    capacity_ = new_capacity;
  }
  // If length == 0, data may be null
  if (length > 0) {
    memcpy(Head(), data, length);
    size_ += length;
  }
}

int64_t InMemoryOutputStream::Tell() { return size_; }

std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() {
  PARQUET_THROW_NOT_OK(buffer_->Resize(size_));
  std::shared_ptr<Buffer> result = buffer_;
  buffer_ = nullptr;
  return result;
}

// ----------------------------------------------------------------------
// BufferedInputStream

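// BufferedInputStream serves reads over the byte range
// [start, start + num_bytes) of a RandomAccessSource through a scratch buffer
// that is refilled lazily as Peek/Read requests exhaust it. A rough usage
// sketch (`source` and `chunk_length` are placeholders):
//
//   BufferedInputStream stream(::arrow::default_memory_pool(), 4096, source,
//                              /*start=*/0, /*num_bytes=*/chunk_length);
//   int64_t available = 0;
//   const uint8_t* data = stream.Read(128, &available);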
BufferedInputStream::BufferedInputStream(MemoryPool* pool, int64_t buffer_size,
                                         RandomAccessSource* source, int64_t start,
                                         int64_t num_bytes)
    : source_(source), stream_offset_(start), stream_end_(start + num_bytes) {
  buffer_ = AllocateBuffer(pool, buffer_size);
  buffer_size_ = buffer_->size();
  // Required to force a lazy read
  buffer_offset_ = buffer_size_;
}

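// Return a pointer to up to num_to_peek bytes without consuming them, resizing
// the scratch buffer and refilling it from the source when the buffered bytes
// are insufficient.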
const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
  *num_bytes = std::min(num_to_peek, stream_end_ - stream_offset_);
  // Increase the buffer size if needed
  if (*num_bytes > buffer_size_) {
    PARQUET_THROW_NOT_OK(buffer_->Resize(*num_bytes));
    buffer_size_ = buffer_->size();
    DCHECK(buffer_size_ >= *num_bytes);
  }
  // Read more data when the buffer does not have enough bytes left, or was
  // just resized
  if (*num_bytes > (buffer_size_ - buffer_offset_)) {
    buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_);
    int64_t bytes_read =
        source_->ReadAt(stream_offset_, buffer_size_, buffer_->mutable_data());
    if (bytes_read < *num_bytes) {
      throw ParquetException("Failed reading column data from source");
    }
    buffer_offset_ = 0;
  }
  return buffer_->data() + buffer_offset_;
}

const uint8_t* BufferedInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
  const uint8_t* result = Peek(num_to_read, num_bytes);
  stream_offset_ += *num_bytes;
  buffer_offset_ += *num_bytes;
  return result;
}

void BufferedInputStream::Advance(int64_t num_bytes) {
  stream_offset_ += num_bytes;
  buffer_offset_ += num_bytes;
}

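// Allocate a resizable buffer of the requested size from the given pool,
// throwing ParquetException if the allocation fails.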
std::shared_ptr<ResizableBuffer> AllocateBuffer(MemoryPool* pool, int64_t size) {
  std::shared_ptr<ResizableBuffer> result;
  PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(pool, size, &result));
  return result;
}

}  // namespace parquet