1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifndef PARQUET_ARROW_WRITER_H
19#define PARQUET_ARROW_WRITER_H
20
#include <cstring>
#include <memory>

#include "parquet/api/schema.h"
#include "parquet/api/writer.h"

#include "arrow/io/interfaces.h"
#include "arrow/type.h"
28
// Forward declarations of the Arrow types that appear in this header, so the
// declarations below do not need the full Arrow class definitions.
namespace arrow {

class Array;
// ChunkedArray is named by the FileWriter::WriteColumnChunk overloads; declare
// it here like the other types instead of relying on a transitive include.
class ChunkedArray;
class MemoryPool;
class PrimitiveArray;
class Schema;
class Status;
class StringArray;
class Table;
}  // namespace arrow
39
40namespace parquet {
41namespace arrow {
42
43class PARQUET_EXPORT ArrowWriterProperties {
44 public:
45 class Builder {
46 public:
47 Builder()
48 : write_timestamps_as_int96_(false),
49 coerce_timestamps_enabled_(false),
50 coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
51 truncated_timestamps_allowed_(false) {}
52 virtual ~Builder() {}
53
54 Builder* disable_deprecated_int96_timestamps() {
55 write_timestamps_as_int96_ = false;
56 return this;
57 }
58
59 Builder* enable_deprecated_int96_timestamps() {
60 write_timestamps_as_int96_ = true;
61 return this;
62 }
63
64 Builder* coerce_timestamps(::arrow::TimeUnit::type unit) {
65 coerce_timestamps_enabled_ = true;
66 coerce_timestamps_unit_ = unit;
67 return this;
68 }
69
70 Builder* allow_truncated_timestamps() {
71 truncated_timestamps_allowed_ = true;
72 return this;
73 }
74
75 Builder* disallow_truncated_timestamps() {
76 truncated_timestamps_allowed_ = false;
77 return this;
78 }
79
80 std::shared_ptr<ArrowWriterProperties> build() {
81 return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
82 write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
83 truncated_timestamps_allowed_));
84 }
85
86 private:
87 bool write_timestamps_as_int96_;
88
89 bool coerce_timestamps_enabled_;
90 ::arrow::TimeUnit::type coerce_timestamps_unit_;
91 bool truncated_timestamps_allowed_;
92 };
93
94 bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
95
96 bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
97 ::arrow::TimeUnit::type coerce_timestamps_unit() const {
98 return coerce_timestamps_unit_;
99 }
100
101 bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; }
102
103 private:
104 explicit ArrowWriterProperties(bool write_nanos_as_int96,
105 bool coerce_timestamps_enabled,
106 ::arrow::TimeUnit::type coerce_timestamps_unit,
107 bool truncated_timestamps_allowed)
108 : write_timestamps_as_int96_(write_nanos_as_int96),
109 coerce_timestamps_enabled_(coerce_timestamps_enabled),
110 coerce_timestamps_unit_(coerce_timestamps_unit),
111 truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
112
113 const bool write_timestamps_as_int96_;
114 const bool coerce_timestamps_enabled_;
115 const ::arrow::TimeUnit::type coerce_timestamps_unit_;
116 const bool truncated_timestamps_allowed_;
117};
118
119std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties();
120
121/**
122 * Iterative API:
123 * Start a new RowGroup/Chunk with NewRowGroup
124 * Write column-by-column the whole column chunk
125 */
126class PARQUET_EXPORT FileWriter {
127 public:
128 FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
129 const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
130 default_arrow_writer_properties());
131
132 static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
133 const std::shared_ptr<OutputStream>& sink,
134 const std::shared_ptr<WriterProperties>& properties,
135 std::unique_ptr<FileWriter>* writer);
136
137 static ::arrow::Status Open(
138 const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
139 const std::shared_ptr<OutputStream>& sink,
140 const std::shared_ptr<WriterProperties>& properties,
141 const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
142 std::unique_ptr<FileWriter>* writer);
143
144 static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
145 const std::shared_ptr<::arrow::io::OutputStream>& sink,
146 const std::shared_ptr<WriterProperties>& properties,
147 std::unique_ptr<FileWriter>* writer);
148
149 static ::arrow::Status Open(
150 const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
151 const std::shared_ptr<::arrow::io::OutputStream>& sink,
152 const std::shared_ptr<WriterProperties>& properties,
153 const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
154 std::unique_ptr<FileWriter>* writer);
155
156 /// \brief Write a Table to Parquet.
157 ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);
158
159 ::arrow::Status NewRowGroup(int64_t chunk_size);
160 ::arrow::Status WriteColumnChunk(const ::arrow::Array& data);
161
162 /// \brief Write ColumnChunk in row group using slice of a ChunkedArray
163 ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
164 const int64_t offset, const int64_t size);
165 ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data);
166 ::arrow::Status Close();
167
168 virtual ~FileWriter();
169
170 ::arrow::MemoryPool* memory_pool() const;
171
172 private:
173 class PARQUET_NO_EXPORT Impl;
174 std::unique_ptr<Impl> impl_;
175};
176
177/// \brief Write Parquet file metadata only to indicated OutputStream
178PARQUET_EXPORT
179::arrow::Status WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink);
180
181/// \brief Write Parquet file metadata only to indicated Arrow OutputStream
182PARQUET_EXPORT
183::arrow::Status WriteFileMetaData(const FileMetaData& file_metadata,
184 const std::shared_ptr<::arrow::io::OutputStream>& sink);
185
186/**
187 * Write a Table to Parquet.
188 *
189 * The table shall only consist of columns of primitive type or of primitive lists.
190 */
191::arrow::Status PARQUET_EXPORT WriteTable(
192 const ::arrow::Table& table, ::arrow::MemoryPool* pool,
193 const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
194 const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
195 const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
196 default_arrow_writer_properties());
197
198::arrow::Status PARQUET_EXPORT WriteTable(
199 const ::arrow::Table& table, ::arrow::MemoryPool* pool,
200 const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
201 const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
202 const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
203 default_arrow_writer_properties());
204
205namespace internal {
206
207/**
208 * Timestamp conversion constants
209 */
210constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
211
212template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
213inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
214 int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
215 (*impala_timestamp).value[2] = (uint32_t)julian_days;
216
217 int64_t last_day_units = time % UnitPerDay;
218 int64_t* impala_last_day_nanos = reinterpret_cast<int64_t*>(impala_timestamp);
219 *impala_last_day_nanos = last_day_units * NanosecondsPerUnit;
220}
221
222constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
223
224inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
225 ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
226 impala_timestamp);
227}
228
229constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
230
231inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
232 Int96* impala_timestamp) {
233 ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
234 milliseconds, impala_timestamp);
235}
236
237constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
238
239inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
240 Int96* impala_timestamp) {
241 ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
242 microseconds, impala_timestamp);
243}
244
245constexpr int64_t kNanosecondsInNanos = INT64_C(1);
246
247inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
248 Int96* impala_timestamp) {
249 ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
250 nanoseconds, impala_timestamp);
251}
252
253} // namespace internal
254
255} // namespace arrow
256
257} // namespace parquet
258
259#endif // PARQUET_ARROW_WRITER_H
260