1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #ifndef PARQUET_ARROW_WRITER_H |
19 | #define PARQUET_ARROW_WRITER_H |
20 | |
#include <cstdint>
#include <cstring>
#include <memory>

#include "parquet/api/schema.h"
#include "parquet/api/writer.h"

#include "arrow/io/interfaces.h"
#include "arrow/type.h"
28 | |
namespace arrow {

// Forward declarations of Arrow classes used by the Parquet/Arrow writer API.
// ChunkedArray is named by FileWriter::WriteColumnChunk; declaring it here keeps
// this header from depending on some other Arrow header declaring it
// transitively.
class Array;
class ChunkedArray;
class MemoryPool;
class PrimitiveArray;
class Schema;
class Status;
class StringArray;
class Table;
}  // namespace arrow
39 | |
40 | namespace parquet { |
41 | namespace arrow { |
42 | |
43 | class PARQUET_EXPORT ArrowWriterProperties { |
44 | public: |
45 | class Builder { |
46 | public: |
47 | Builder() |
48 | : write_timestamps_as_int96_(false), |
49 | coerce_timestamps_enabled_(false), |
50 | coerce_timestamps_unit_(::arrow::TimeUnit::SECOND), |
51 | truncated_timestamps_allowed_(false) {} |
52 | virtual ~Builder() {} |
53 | |
54 | Builder* disable_deprecated_int96_timestamps() { |
55 | write_timestamps_as_int96_ = false; |
56 | return this; |
57 | } |
58 | |
59 | Builder* enable_deprecated_int96_timestamps() { |
60 | write_timestamps_as_int96_ = true; |
61 | return this; |
62 | } |
63 | |
64 | Builder* coerce_timestamps(::arrow::TimeUnit::type unit) { |
65 | coerce_timestamps_enabled_ = true; |
66 | coerce_timestamps_unit_ = unit; |
67 | return this; |
68 | } |
69 | |
70 | Builder* allow_truncated_timestamps() { |
71 | truncated_timestamps_allowed_ = true; |
72 | return this; |
73 | } |
74 | |
75 | Builder* disallow_truncated_timestamps() { |
76 | truncated_timestamps_allowed_ = false; |
77 | return this; |
78 | } |
79 | |
80 | std::shared_ptr<ArrowWriterProperties> build() { |
81 | return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties( |
82 | write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_, |
83 | truncated_timestamps_allowed_)); |
84 | } |
85 | |
86 | private: |
87 | bool write_timestamps_as_int96_; |
88 | |
89 | bool coerce_timestamps_enabled_; |
90 | ::arrow::TimeUnit::type coerce_timestamps_unit_; |
91 | bool truncated_timestamps_allowed_; |
92 | }; |
93 | |
94 | bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; } |
95 | |
96 | bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; } |
97 | ::arrow::TimeUnit::type coerce_timestamps_unit() const { |
98 | return coerce_timestamps_unit_; |
99 | } |
100 | |
101 | bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; } |
102 | |
103 | private: |
104 | explicit ArrowWriterProperties(bool write_nanos_as_int96, |
105 | bool coerce_timestamps_enabled, |
106 | ::arrow::TimeUnit::type coerce_timestamps_unit, |
107 | bool truncated_timestamps_allowed) |
108 | : write_timestamps_as_int96_(write_nanos_as_int96), |
109 | coerce_timestamps_enabled_(coerce_timestamps_enabled), |
110 | coerce_timestamps_unit_(coerce_timestamps_unit), |
111 | truncated_timestamps_allowed_(truncated_timestamps_allowed) {} |
112 | |
113 | const bool write_timestamps_as_int96_; |
114 | const bool coerce_timestamps_enabled_; |
115 | const ::arrow::TimeUnit::type coerce_timestamps_unit_; |
116 | const bool truncated_timestamps_allowed_; |
117 | }; |
118 | |
/// \brief Return the ArrowWriterProperties used when a caller supplies none.
///
/// Serves as the default argument for the `arrow_properties` parameter of the
/// FileWriter constructor and of both WriteTable overloads in this header.
/// NOTE(review): the concrete option values are chosen in the implementation
/// file; presumably they equal a default-constructed Builder's — confirm.
std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties();
120 | |
/// \brief Writes Arrow data into a Parquet file.
///
/// Whole-table API: WriteTable().
///
/// Iterative API:
///  1. Start a new row group (one column chunk per column) with NewRowGroup().
///  2. Write the data column by column, one whole column chunk per call to
///     WriteColumnChunk().
///
/// NOTE(review): presumably Close() must be called after the last write to
/// finalize the file; this header does not state it — confirm in the
/// implementation.
class PARQUET_EXPORT FileWriter {
 public:
  /// \brief Wrap an already-created low-level ParquetFileWriter.
  ///
  /// \param[in] pool Arrow memory pool for this writer (presumably the one
  ///   reported by memory_pool() — confirm)
  /// \param[in] writer low-level Parquet writer; ownership moves into the
  ///   FileWriter (std::unique_ptr taken by value)
  /// \param[in] arrow_properties Arrow-specific options; defaults to
  ///   default_arrow_writer_properties()
  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
             const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
                 default_arrow_writer_properties());

  /// \brief Create a FileWriter for Arrow `schema` writing to a Parquet OutputStream.
  ///
  /// \param[in] schema Arrow schema of the data that will be written
  /// \param[in] pool Arrow memory pool for the new writer
  /// \param[in] sink destination Parquet OutputStream (shared ownership)
  /// \param[in] properties Parquet-level writer properties
  /// \param[out] writer receives the created FileWriter
  /// \return non-OK Status if the writer could not be created
  ///
  /// NOTE(review): this overload takes no ArrowWriterProperties; presumably it
  /// uses default_arrow_writer_properties() — confirm.
  static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                              const std::shared_ptr<OutputStream>& sink,
                              const std::shared_ptr<WriterProperties>& properties,
                              std::unique_ptr<FileWriter>* writer);

  /// \brief Same as the overload above, with explicit Arrow-specific options.
  ///
  /// \param[in] arrow_properties Arrow-specific writer options
  static ::arrow::Status Open(
      const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
      const std::shared_ptr<OutputStream>& sink,
      const std::shared_ptr<WriterProperties>& properties,
      const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
      std::unique_ptr<FileWriter>* writer);

  /// \brief Create a FileWriter for Arrow `schema` writing to an Arrow
  /// ::arrow::io::OutputStream.
  ///
  /// Parameters match the Parquet-OutputStream overload; only the sink type
  /// differs.
  /// NOTE(review): presumably uses default_arrow_writer_properties() — confirm.
  static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                              const std::shared_ptr<::arrow::io::OutputStream>& sink,
                              const std::shared_ptr<WriterProperties>& properties,
                              std::unique_ptr<FileWriter>* writer);

  /// \brief Arrow ::arrow::io::OutputStream sink with explicit Arrow-specific
  /// options.
  static ::arrow::Status Open(
      const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
      const std::shared_ptr<::arrow::io::OutputStream>& sink,
      const std::shared_ptr<WriterProperties>& properties,
      const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
      std::unique_ptr<FileWriter>* writer);

  /// \brief Write a Table to Parquet.
  ///
  /// \param[in] table the table to write
  /// \param[in] chunk_size NOTE(review): presumably the maximum number of rows
  ///   per row group — confirm
  ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);

  /// \brief Start a new row group (iterative API).
  ///
  /// \param[in] chunk_size NOTE(review): presumably the number of rows the row
  ///   group will hold — confirm
  ::arrow::Status NewRowGroup(int64_t chunk_size);

  /// \brief Write `data` as one whole column chunk of the current row group
  /// (presumably for the next column in order — confirm).
  ::arrow::Status WriteColumnChunk(const ::arrow::Array& data);

  /// \brief Write ColumnChunk in row group using slice of a ChunkedArray
  ///
  /// \param[in] data source chunked array
  /// \param[in] offset start of the slice within `data`
  /// \param[in] size length of the slice (presumably a row count — confirm)
  ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
                                   const int64_t offset, const int64_t size);

  /// \brief Write all of `data` as one column chunk.
  ///
  /// NOTE(review): presumably equivalent to the sliced overload over the whole
  /// array — confirm.
  ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data);

  /// \brief Close the writer.
  ///
  /// NOTE(review): presumably finalizes the file (footer/metadata) — confirm.
  ::arrow::Status Close();

  // Declared here but defined out of line so that Impl can remain an incomplete
  // type in this header (std::unique_ptr<Impl> needs the complete type where
  // its deleter is instantiated).
  virtual ~FileWriter();

  /// \brief The Arrow memory pool used by this writer.
  ::arrow::MemoryPool* memory_pool() const;

 private:
  // Pimpl: the implementation is defined outside this header.
  class PARQUET_NO_EXPORT Impl;
  std::unique_ptr<Impl> impl_;
};
176 | |
/// \brief Write Parquet file metadata only to indicated OutputStream
///
/// \param[in] file_metadata metadata to serialize
/// \param[in] sink destination Parquet OutputStream (borrowed; not owned)
/// \return non-OK Status on failure
PARQUET_EXPORT
::arrow::Status WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink);
180 | |
/// \brief Write Parquet file metadata only to indicated Arrow OutputStream
///
/// \param[in] file_metadata metadata to serialize
/// \param[in] sink destination ::arrow::io::OutputStream (shared ownership)
/// \return non-OK Status on failure
PARQUET_EXPORT
::arrow::Status WriteFileMetaData(const FileMetaData& file_metadata,
                                  const std::shared_ptr<::arrow::io::OutputStream>& sink);
185 | |
/// \brief Write a Table to Parquet.
///
/// The table shall only consist of columns of primitive type or of primitive
/// lists.
///
/// \param[in] table the table to write
/// \param[in] pool Arrow memory pool used for the write
/// \param[in] sink destination Parquet OutputStream
/// \param[in] chunk_size NOTE(review): presumably the maximum number of rows per
///   row group — confirm
/// \param[in] properties Parquet writer properties; defaults to
///   default_writer_properties()
/// \param[in] arrow_properties Arrow-specific options; defaults to
///   default_arrow_writer_properties()
/// \return non-OK Status on failure
::arrow::Status PARQUET_EXPORT WriteTable(
    const ::arrow::Table& table, ::arrow::MemoryPool* pool,
    const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
    const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
        default_arrow_writer_properties());
197 | |
/// \brief Write a Table to Parquet through an Arrow ::arrow::io::OutputStream.
///
/// Identical in parameters and defaults to the overload taking a Parquet
/// OutputStream; only the sink type differs.
///
/// \param[in] table the table to write
/// \param[in] pool Arrow memory pool used for the write
/// \param[in] sink destination Arrow output stream
/// \param[in] chunk_size NOTE(review): presumably the maximum number of rows per
///   row group — confirm
/// \param[in] properties Parquet writer properties; defaults to
///   default_writer_properties()
/// \param[in] arrow_properties Arrow-specific options; defaults to
///   default_arrow_writer_properties()
/// \return non-OK Status on failure
::arrow::Status PARQUET_EXPORT WriteTable(
    const ::arrow::Table& table, ::arrow::MemoryPool* pool,
    const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
    const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
        default_arrow_writer_properties());
204 | |
205 | namespace internal { |
206 | |
207 | /** |
208 | * Timestamp conversion constants |
209 | */ |
210 | constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588); |
211 | |
212 | template <int64_t UnitPerDay, int64_t NanosecondsPerUnit> |
213 | inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) { |
214 | int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays; |
215 | (*impala_timestamp).value[2] = (uint32_t)julian_days; |
216 | |
217 | int64_t last_day_units = time % UnitPerDay; |
218 | int64_t* impala_last_day_nanos = reinterpret_cast<int64_t*>(impala_timestamp); |
219 | *impala_last_day_nanos = last_day_units * NanosecondsPerUnit; |
220 | } |
221 | |
222 | constexpr int64_t kSecondsInNanos = INT64_C(1000000000); |
223 | |
224 | inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) { |
225 | ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds, |
226 | impala_timestamp); |
227 | } |
228 | |
229 | constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000); |
230 | |
231 | inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds, |
232 | Int96* impala_timestamp) { |
233 | ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>( |
234 | milliseconds, impala_timestamp); |
235 | } |
236 | |
237 | constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000); |
238 | |
239 | inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds, |
240 | Int96* impala_timestamp) { |
241 | ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>( |
242 | microseconds, impala_timestamp); |
243 | } |
244 | |
245 | constexpr int64_t kNanosecondsInNanos = INT64_C(1); |
246 | |
247 | inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds, |
248 | Int96* impala_timestamp) { |
249 | ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>( |
250 | nanoseconds, impala_timestamp); |
251 | } |
252 | |
253 | } // namespace internal |
254 | |
255 | } // namespace arrow |
256 | |
257 | } // namespace parquet |
258 | |
259 | #endif // PARQUET_ARROW_WRITER_H |
260 | |