1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #ifndef PARQUET_UTIL_MEMORY_H |
19 | #define PARQUET_UTIL_MEMORY_H |
20 | |
21 | #include <atomic> |
22 | #include <cstdint> |
23 | #include <cstdlib> |
24 | #include <cstring> |
25 | #include <memory> |
26 | #include <string> |
27 | #include <vector> |
28 | |
29 | #include "arrow/buffer.h" |
30 | #include "arrow/io/interfaces.h" |
31 | #include "arrow/io/memory.h" |
32 | #include "arrow/memory_pool.h" |
33 | |
34 | #include "parquet/exception.h" |
35 | #include "parquet/types.h" |
36 | #include "parquet/util/macros.h" |
37 | #include "parquet/util/visibility.h" |
38 | |
// Forward declaration only: this header names arrow::util::Codec solely as the
// pointee of a std::unique_ptr in a function declaration, so the complete type
// is not required here.
namespace arrow {
namespace util {
class Codec;
}  // namespace util
}  // namespace arrow
46 | |
47 | namespace parquet { |
48 | |
49 | PARQUET_EXPORT |
50 | std::unique_ptr<::arrow::util::Codec> GetCodecFromArrow(Compression::type codec); |
51 | |
// Default `initial_capacity` used by the InMemoryOutputStream constructor.
// A namespace-scope constexpr already has internal linkage, so no `static`.
constexpr int64_t kInMemoryDefaultCapacity = 1024;
53 | |
54 | using Buffer = ::arrow::Buffer; |
55 | using MutableBuffer = ::arrow::MutableBuffer; |
56 | using ResizableBuffer = ::arrow::ResizableBuffer; |
57 | using ResizableBuffer = ::arrow::ResizableBuffer; |
58 | |
59 | template <class T> |
60 | class PARQUET_EXPORT Vector { |
61 | public: |
62 | explicit Vector(int64_t size, ::arrow::MemoryPool* pool); |
63 | void Resize(int64_t new_size); |
64 | void Reserve(int64_t new_capacity); |
65 | void Assign(int64_t size, const T val); |
66 | void Swap(Vector<T>& v); |
67 | inline T& operator[](int64_t i) const { return data_[i]; } |
68 | |
69 | T* data() { return data_; } |
70 | const T* data() const { return data_; } |
71 | |
72 | private: |
73 | std::shared_ptr<ResizableBuffer> buffer_; |
74 | int64_t size_; |
75 | int64_t capacity_; |
76 | T* data_; |
77 | |
78 | PARQUET_DISALLOW_COPY_AND_ASSIGN(Vector); |
79 | }; |
80 | |
81 | // File input and output interfaces that translate arrow::Status to exceptions |
82 | |
83 | class PARQUET_EXPORT FileInterface { |
84 | public: |
85 | virtual ~FileInterface() = default; |
86 | |
87 | // Close the file |
88 | virtual void Close() = 0; |
89 | |
90 | // Return the current position in the file relative to the start |
91 | virtual int64_t Tell() = 0; |
92 | }; |
93 | |
94 | /// It is the responsibility of implementations to mind threadsafety of shared |
95 | /// resources |
96 | class PARQUET_EXPORT RandomAccessSource : virtual public FileInterface { |
97 | public: |
98 | virtual ~RandomAccessSource() = default; |
99 | |
100 | virtual int64_t Size() const = 0; |
101 | |
102 | // Returns bytes read |
103 | virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0; |
104 | |
105 | virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0; |
106 | |
107 | virtual std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) = 0; |
108 | |
109 | /// Returns bytes read |
110 | virtual int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) = 0; |
111 | }; |
112 | |
113 | class PARQUET_EXPORT OutputStream : virtual public FileInterface { |
114 | public: |
115 | virtual ~OutputStream() = default; |
116 | |
117 | // Copy bytes into the output stream |
118 | virtual void Write(const uint8_t* data, int64_t length) = 0; |
119 | }; |
120 | |
121 | class PARQUET_EXPORT ArrowFileMethods : virtual public FileInterface { |
122 | public: |
123 | // No-op. Closing the file is the responsibility of the owner of the handle |
124 | void Close() override; |
125 | |
126 | int64_t Tell() override; |
127 | |
128 | protected: |
129 | virtual ::arrow::io::FileInterface* file_interface() = 0; |
130 | }; |
131 | |
// MSVC reports C4250 ("inherits ... via dominance") for the diamond formed when
// ArrowFileMethods and RandomAccessSource/OutputStream both derive virtually
// from FileInterface. Silence it for the adapter classes that follow; the
// saved warning state is restored after them.
#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable : 4250)
#endif
137 | |
138 | /// This interface depends on the threadsafety of the underlying Arrow file interface |
139 | class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAccessSource { |
140 | public: |
141 | explicit ArrowInputFile( |
142 | const std::shared_ptr<::arrow::io::ReadableFileInterface>& file); |
143 | |
144 | int64_t Size() const override; |
145 | |
146 | // Returns bytes read |
147 | int64_t Read(int64_t nbytes, uint8_t* out) override; |
148 | |
149 | std::shared_ptr<Buffer> Read(int64_t nbytes) override; |
150 | |
151 | std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) override; |
152 | |
153 | /// Returns bytes read |
154 | int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) override; |
155 | |
156 | std::shared_ptr<::arrow::io::ReadableFileInterface> file() const { return file_; } |
157 | |
158 | // Diamond inheritance |
159 | using ArrowFileMethods::Close; |
160 | using ArrowFileMethods::Tell; |
161 | |
162 | private: |
163 | ::arrow::io::FileInterface* file_interface() override; |
164 | std::shared_ptr<::arrow::io::ReadableFileInterface> file_; |
165 | }; |
166 | |
167 | class PARQUET_EXPORT ArrowOutputStream : public ArrowFileMethods, public OutputStream { |
168 | public: |
169 | explicit ArrowOutputStream(const std::shared_ptr<::arrow::io::OutputStream> file); |
170 | |
171 | // Copy bytes into the output stream |
172 | void Write(const uint8_t* data, int64_t length) override; |
173 | |
174 | std::shared_ptr<::arrow::io::OutputStream> file() { return file_; } |
175 | |
176 | // Diamond inheritance |
177 | using ArrowFileMethods::Close; |
178 | using ArrowFileMethods::Tell; |
179 | |
180 | private: |
181 | ::arrow::io::FileInterface* file_interface() override; |
182 | std::shared_ptr<::arrow::io::OutputStream> file_; |
183 | }; |
184 | |
// Restore the MSVC warning state saved before the Arrow adapter classes.
#if defined(_MSC_VER)
#pragma warning(pop)
#endif
189 | |
190 | class PARQUET_EXPORT InMemoryOutputStream : public OutputStream { |
191 | public: |
192 | explicit InMemoryOutputStream( |
193 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), |
194 | int64_t initial_capacity = kInMemoryDefaultCapacity); |
195 | |
196 | virtual ~InMemoryOutputStream(); |
197 | |
198 | // Close is currently a no-op with the in-memory stream |
199 | virtual void Close() {} |
200 | |
201 | virtual int64_t Tell(); |
202 | |
203 | virtual void Write(const uint8_t* data, int64_t length); |
204 | |
205 | // Clears the stream |
206 | void Clear() { size_ = 0; } |
207 | |
208 | // Get pointer to the underlying buffer |
209 | const Buffer& GetBufferRef() const { return *buffer_; } |
210 | |
211 | // Return complete stream as Buffer |
212 | std::shared_ptr<Buffer> GetBuffer(); |
213 | |
214 | private: |
215 | // Mutable pointer to the current write position in the stream |
216 | uint8_t* Head(); |
217 | |
218 | std::shared_ptr<ResizableBuffer> buffer_; |
219 | int64_t size_; |
220 | int64_t capacity_; |
221 | |
222 | PARQUET_DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream); |
223 | }; |
224 | |
225 | // ---------------------------------------------------------------------- |
226 | // Streaming input interfaces |
227 | |
228 | // Interface for the column reader to get the bytes. The interface is a stream |
229 | // interface, meaning the bytes in order and once a byte is read, it does not |
230 | // need to be read again. |
231 | class PARQUET_EXPORT InputStream { |
232 | public: |
233 | // Returns the next 'num_to_peek' without advancing the current position. |
234 | // *num_bytes will contain the number of bytes returned which can only be |
235 | // less than num_to_peek at end of stream cases. |
236 | // Since the position is not advanced, calls to this function are idempotent. |
237 | // The buffer returned to the caller is still owned by the input stream and must |
238 | // stay valid until the next call to Peek() or Read(). |
239 | virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0; |
240 | |
241 | // Identical to Peek(), except the current position in the stream is advanced by |
242 | // *num_bytes. |
243 | virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0; |
244 | |
245 | // Advance the stream without reading |
246 | virtual void Advance(int64_t num_bytes) = 0; |
247 | |
248 | virtual ~InputStream() {} |
249 | |
250 | protected: |
251 | InputStream() {} |
252 | }; |
253 | |
254 | // Implementation of an InputStream when all the bytes are in memory. |
255 | class PARQUET_EXPORT InMemoryInputStream : public InputStream { |
256 | public: |
257 | InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t end); |
258 | explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer); |
259 | virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes); |
260 | virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes); |
261 | |
262 | virtual void Advance(int64_t num_bytes); |
263 | |
264 | private: |
265 | std::shared_ptr<Buffer> buffer_; |
266 | int64_t len_; |
267 | int64_t offset_; |
268 | }; |
269 | |
270 | // Implementation of an InputStream when only some of the bytes are in memory. |
271 | class PARQUET_EXPORT BufferedInputStream : public InputStream { |
272 | public: |
273 | BufferedInputStream(::arrow::MemoryPool* pool, int64_t buffer_size, |
274 | RandomAccessSource* source, int64_t start, int64_t end); |
275 | virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes); |
276 | virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes); |
277 | |
278 | virtual void Advance(int64_t num_bytes); |
279 | |
280 | private: |
281 | std::shared_ptr<ResizableBuffer> buffer_; |
282 | RandomAccessSource* source_; |
283 | int64_t stream_offset_; |
284 | int64_t stream_end_; |
285 | int64_t buffer_offset_; |
286 | int64_t buffer_size_; |
287 | }; |
288 | |
289 | std::shared_ptr<ResizableBuffer> PARQUET_EXPORT AllocateBuffer( |
290 | ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), int64_t size = 0); |
291 | |
292 | } // namespace parquet |
293 | |
294 | #endif // PARQUET_UTIL_MEMORY_H |
295 | |