1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef PARQUET_UTIL_MEMORY_H
19#define PARQUET_UTIL_MEMORY_H
20
21#include <atomic>
22#include <cstdint>
23#include <cstdlib>
24#include <cstring>
25#include <memory>
26#include <string>
27#include <vector>
28
29#include "arrow/buffer.h"
30#include "arrow/io/interfaces.h"
31#include "arrow/io/memory.h"
32#include "arrow/memory_pool.h"
33
34#include "parquet/exception.h"
35#include "parquet/types.h"
36#include "parquet/util/macros.h"
37#include "parquet/util/visibility.h"
38
// Forward declaration: lets this header name ::arrow::util::Codec (e.g. as the
// pointee of a std::unique_ptr return type) without its full definition.
namespace arrow {
namespace util {
class Codec;
}  // namespace util
}  // namespace arrow
46
47namespace parquet {
48
49PARQUET_EXPORT
50std::unique_ptr<::arrow::util::Codec> GetCodecFromArrow(Compression::type codec);
51
constexpr int64_t kInMemoryDefaultCapacity = 1024;  // default InMemoryOutputStream initial_capacity
53
54using Buffer = ::arrow::Buffer;
55using MutableBuffer = ::arrow::MutableBuffer;
56using ResizableBuffer = ::arrow::ResizableBuffer;
57using ResizableBuffer = ::arrow::ResizableBuffer;
58
59template <class T>
60class PARQUET_EXPORT Vector {
61 public:
62 explicit Vector(int64_t size, ::arrow::MemoryPool* pool);
63 void Resize(int64_t new_size);
64 void Reserve(int64_t new_capacity);
65 void Assign(int64_t size, const T val);
66 void Swap(Vector<T>& v);
67 inline T& operator[](int64_t i) const { return data_[i]; }
68
69 T* data() { return data_; }
70 const T* data() const { return data_; }
71
72 private:
73 std::shared_ptr<ResizableBuffer> buffer_;
74 int64_t size_;
75 int64_t capacity_;
76 T* data_;
77
78 PARQUET_DISALLOW_COPY_AND_ASSIGN(Vector);
79};
80
81// File input and output interfaces that translate arrow::Status to exceptions
82
83class PARQUET_EXPORT FileInterface {
84 public:
85 virtual ~FileInterface() = default;
86
87 // Close the file
88 virtual void Close() = 0;
89
90 // Return the current position in the file relative to the start
91 virtual int64_t Tell() = 0;
92};
93
94/// It is the responsibility of implementations to mind threadsafety of shared
95/// resources
96class PARQUET_EXPORT RandomAccessSource : virtual public FileInterface {
97 public:
98 virtual ~RandomAccessSource() = default;
99
100 virtual int64_t Size() const = 0;
101
102 // Returns bytes read
103 virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
104
105 virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;
106
107 virtual std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) = 0;
108
109 /// Returns bytes read
110 virtual int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) = 0;
111};
112
113class PARQUET_EXPORT OutputStream : virtual public FileInterface {
114 public:
115 virtual ~OutputStream() = default;
116
117 // Copy bytes into the output stream
118 virtual void Write(const uint8_t* data, int64_t length) = 0;
119};
120
121class PARQUET_EXPORT ArrowFileMethods : virtual public FileInterface {
122 public:
123 // No-op. Closing the file is the responsibility of the owner of the handle
124 void Close() override;
125
126 int64_t Tell() override;
127
128 protected:
129 virtual ::arrow::io::FileInterface* file_interface() = 0;
130};
131
132// Suppress C4250 warning caused by diamond inheritance
133#ifdef _MSC_VER
134#pragma warning(push)
135#pragma warning(disable : 4250)
136#endif
137
138/// This interface depends on the threadsafety of the underlying Arrow file interface
139class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAccessSource {
140 public:
141 explicit ArrowInputFile(
142 const std::shared_ptr<::arrow::io::ReadableFileInterface>& file);
143
144 int64_t Size() const override;
145
146 // Returns bytes read
147 int64_t Read(int64_t nbytes, uint8_t* out) override;
148
149 std::shared_ptr<Buffer> Read(int64_t nbytes) override;
150
151 std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) override;
152
153 /// Returns bytes read
154 int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) override;
155
156 std::shared_ptr<::arrow::io::ReadableFileInterface> file() const { return file_; }
157
158 // Diamond inheritance
159 using ArrowFileMethods::Close;
160 using ArrowFileMethods::Tell;
161
162 private:
163 ::arrow::io::FileInterface* file_interface() override;
164 std::shared_ptr<::arrow::io::ReadableFileInterface> file_;
165};
166
167class PARQUET_EXPORT ArrowOutputStream : public ArrowFileMethods, public OutputStream {
168 public:
169 explicit ArrowOutputStream(const std::shared_ptr<::arrow::io::OutputStream> file);
170
171 // Copy bytes into the output stream
172 void Write(const uint8_t* data, int64_t length) override;
173
174 std::shared_ptr<::arrow::io::OutputStream> file() { return file_; }
175
176 // Diamond inheritance
177 using ArrowFileMethods::Close;
178 using ArrowFileMethods::Tell;
179
180 private:
181 ::arrow::io::FileInterface* file_interface() override;
182 std::shared_ptr<::arrow::io::OutputStream> file_;
183};
184
185// Pop C4250 pragma
186#ifdef _MSC_VER
187#pragma warning(pop)
188#endif
189
190class PARQUET_EXPORT InMemoryOutputStream : public OutputStream {
191 public:
192 explicit InMemoryOutputStream(
193 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
194 int64_t initial_capacity = kInMemoryDefaultCapacity);
195
196 virtual ~InMemoryOutputStream();
197
198 // Close is currently a no-op with the in-memory stream
199 virtual void Close() {}
200
201 virtual int64_t Tell();
202
203 virtual void Write(const uint8_t* data, int64_t length);
204
205 // Clears the stream
206 void Clear() { size_ = 0; }
207
208 // Get pointer to the underlying buffer
209 const Buffer& GetBufferRef() const { return *buffer_; }
210
211 // Return complete stream as Buffer
212 std::shared_ptr<Buffer> GetBuffer();
213
214 private:
215 // Mutable pointer to the current write position in the stream
216 uint8_t* Head();
217
218 std::shared_ptr<ResizableBuffer> buffer_;
219 int64_t size_;
220 int64_t capacity_;
221
222 PARQUET_DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
223};
224
225// ----------------------------------------------------------------------
226// Streaming input interfaces
227
228// Interface for the column reader to get the bytes. The interface is a stream
229// interface, meaning the bytes in order and once a byte is read, it does not
230// need to be read again.
231class PARQUET_EXPORT InputStream {
232 public:
233 // Returns the next 'num_to_peek' without advancing the current position.
234 // *num_bytes will contain the number of bytes returned which can only be
235 // less than num_to_peek at end of stream cases.
236 // Since the position is not advanced, calls to this function are idempotent.
237 // The buffer returned to the caller is still owned by the input stream and must
238 // stay valid until the next call to Peek() or Read().
239 virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0;
240
241 // Identical to Peek(), except the current position in the stream is advanced by
242 // *num_bytes.
243 virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;
244
245 // Advance the stream without reading
246 virtual void Advance(int64_t num_bytes) = 0;
247
248 virtual ~InputStream() {}
249
250 protected:
251 InputStream() {}
252};
253
254// Implementation of an InputStream when all the bytes are in memory.
255class PARQUET_EXPORT InMemoryInputStream : public InputStream {
256 public:
257 InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t end);
258 explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer);
259 virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
260 virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
261
262 virtual void Advance(int64_t num_bytes);
263
264 private:
265 std::shared_ptr<Buffer> buffer_;
266 int64_t len_;
267 int64_t offset_;
268};
269
270// Implementation of an InputStream when only some of the bytes are in memory.
271class PARQUET_EXPORT BufferedInputStream : public InputStream {
272 public:
273 BufferedInputStream(::arrow::MemoryPool* pool, int64_t buffer_size,
274 RandomAccessSource* source, int64_t start, int64_t end);
275 virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
276 virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
277
278 virtual void Advance(int64_t num_bytes);
279
280 private:
281 std::shared_ptr<ResizableBuffer> buffer_;
282 RandomAccessSource* source_;
283 int64_t stream_offset_;
284 int64_t stream_end_;
285 int64_t buffer_offset_;
286 int64_t buffer_size_;
287};
288
289std::shared_ptr<ResizableBuffer> PARQUET_EXPORT AllocateBuffer(
290 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), int64_t size = 0);
291
292} // namespace parquet
293
294#endif // PARQUET_UTIL_MEMORY_H
295