1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/file_writer.h"
19
#include <cstddef>
#include <cstdint>
#include <limits>
#include <ostream>
#include <sstream>
#include <utility>
#include <vector>
24
25#include "parquet/column_writer.h"
26#include "parquet/deprecated_io.h"
27#include "parquet/exception.h"
28#include "parquet/platform.h"
29#include "parquet/schema.h"
30#include "parquet/types.h"
31
32using arrow::MemoryPool;
33
34using parquet::schema::GroupNode;
35
36namespace parquet {
37
// Four-byte magic marker ("PAR1" in ASCII). This writer emits it at the very
// start of the file (StartFile) and again after the footer length.
// FIXME: copied from reader-internal.cc
static constexpr uint8_t PARQUET_MAGIC[4] = {0x50, 0x41, 0x52, 0x31};  // 'P','A','R','1'
40
41// ----------------------------------------------------------------------
42// RowGroupWriter public API
43
44RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
45 : contents_(std::move(contents)) {}
46
47void RowGroupWriter::Close() {
48 if (contents_) {
49 contents_->Close();
50 }
51}
52
53ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
54
55ColumnWriter* RowGroupWriter::column(int i) { return contents_->column(i); }
56
57int64_t RowGroupWriter::total_compressed_bytes() const {
58 return contents_->total_compressed_bytes();
59}
60
61int64_t RowGroupWriter::total_bytes_written() const {
62 return contents_->total_bytes_written();
63}
64
65int RowGroupWriter::current_column() { return contents_->current_column(); }
66
67int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
68
69int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
70
71inline void ThrowRowsMisMatchError(int col, int64_t prev, int64_t curr) {
72 std::stringstream ss;
73 ss << "Column " << col << " had " << curr << " while previous column had " << prev;
74 throw ParquetException(ss.str());
75}
76
77// ----------------------------------------------------------------------
78// RowGroupSerializer
79
80// RowGroupWriter::Contents implementation for the Parquet file specification
81class RowGroupSerializer : public RowGroupWriter::Contents {
82 public:
83 RowGroupSerializer(const std::shared_ptr<ArrowOutputStream>& sink,
84 RowGroupMetaDataBuilder* metadata,
85 const WriterProperties* properties, bool buffered_row_group = false)
86 : sink_(sink),
87 metadata_(metadata),
88 properties_(properties),
89 total_bytes_written_(0),
90 closed_(false),
91 next_column_index_(0),
92 num_rows_(0),
93 buffered_row_group_(buffered_row_group) {
94 if (buffered_row_group) {
95 InitColumns();
96 } else {
97 column_writers_.push_back(nullptr);
98 }
99 }
100
101 int num_columns() const override { return metadata_->num_columns(); }
102
103 int64_t num_rows() const override {
104 CheckRowsWritten();
105 // CheckRowsWritten ensures num_rows_ is set correctly
106 return num_rows_;
107 }
108
109 ColumnWriter* NextColumn() override {
110 if (buffered_row_group_) {
111 throw ParquetException(
112 "NextColumn() is not supported when a RowGroup is written by size");
113 }
114
115 if (column_writers_[0]) {
116 CheckRowsWritten();
117 }
118
119 // Throws an error if more columns are being written
120 auto col_meta = metadata_->NextColumnChunk();
121
122 if (column_writers_[0]) {
123 total_bytes_written_ += column_writers_[0]->Close();
124 }
125
126 ++next_column_index_;
127
128 const auto& path = col_meta->descr()->path();
129 std::unique_ptr<PageWriter> pager = PageWriter::Open(
130 sink_, properties_->compression(path), properties_->compression_level(path),
131 col_meta, properties_->memory_pool());
132 column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_);
133 return column_writers_[0].get();
134 }
135
136 ColumnWriter* column(int i) override {
137 if (!buffered_row_group_) {
138 throw ParquetException(
139 "column() is only supported when a BufferedRowGroup is being written");
140 }
141
142 if (i >= 0 && i < static_cast<int>(column_writers_.size())) {
143 return column_writers_[i].get();
144 }
145 return nullptr;
146 }
147
148 int current_column() const override { return metadata_->current_column(); }
149
150 int64_t total_compressed_bytes() const override {
151 int64_t total_compressed_bytes = 0;
152 for (size_t i = 0; i < column_writers_.size(); i++) {
153 if (column_writers_[i]) {
154 total_compressed_bytes += column_writers_[i]->total_compressed_bytes();
155 }
156 }
157 return total_compressed_bytes;
158 }
159
160 int64_t total_bytes_written() const override {
161 int64_t total_bytes_written = 0;
162 for (size_t i = 0; i < column_writers_.size(); i++) {
163 if (column_writers_[i]) {
164 total_bytes_written += column_writers_[i]->total_bytes_written();
165 }
166 }
167 return total_bytes_written;
168 }
169
170 void Close() override {
171 if (!closed_) {
172 closed_ = true;
173 CheckRowsWritten();
174
175 for (size_t i = 0; i < column_writers_.size(); i++) {
176 if (column_writers_[i]) {
177 total_bytes_written_ += column_writers_[i]->Close();
178 column_writers_[i].reset();
179 }
180 }
181
182 column_writers_.clear();
183
184 // Ensures all columns have been written
185 metadata_->set_num_rows(num_rows_);
186 metadata_->Finish(total_bytes_written_);
187 }
188 }
189
190 private:
191 std::shared_ptr<ArrowOutputStream> sink_;
192 mutable RowGroupMetaDataBuilder* metadata_;
193 const WriterProperties* properties_;
194 int64_t total_bytes_written_;
195 bool closed_;
196 int next_column_index_;
197 mutable int64_t num_rows_;
198 bool buffered_row_group_;
199
200 void CheckRowsWritten() const {
201 // verify when only one column is written at a time
202 if (!buffered_row_group_ && column_writers_.size() > 0 && column_writers_[0]) {
203 int64_t current_col_rows = column_writers_[0]->rows_written();
204 if (num_rows_ == 0) {
205 num_rows_ = current_col_rows;
206 } else if (num_rows_ != current_col_rows) {
207 ThrowRowsMisMatchError(next_column_index_, current_col_rows, num_rows_);
208 }
209 } else if (buffered_row_group_ &&
210 column_writers_.size() > 0) { // when buffered_row_group = true
211 int64_t current_col_rows = column_writers_[0]->rows_written();
212 for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) {
213 int64_t current_col_rows_i = column_writers_[i]->rows_written();
214 if (current_col_rows != current_col_rows_i) {
215 ThrowRowsMisMatchError(i, current_col_rows_i, current_col_rows);
216 }
217 }
218 num_rows_ = current_col_rows;
219 }
220 }
221
222 void InitColumns() {
223 for (int i = 0; i < num_columns(); i++) {
224 auto col_meta = metadata_->NextColumnChunk();
225 const auto& path = col_meta->descr()->path();
226 std::unique_ptr<PageWriter> pager = PageWriter::Open(
227 sink_, properties_->compression(path), properties_->compression_level(path),
228 col_meta, properties_->memory_pool(), buffered_row_group_);
229 column_writers_.push_back(
230 ColumnWriter::Make(col_meta, std::move(pager), properties_));
231 }
232 }
233
234 std::vector<std::shared_ptr<ColumnWriter>> column_writers_;
235};
236
237// ----------------------------------------------------------------------
238// FileSerializer
239
240// An implementation of ParquetFileWriter::Contents that deals with the Parquet
241// file structure, Thrift serialization, and other internal matters
242
243class FileSerializer : public ParquetFileWriter::Contents {
244 public:
245 static std::unique_ptr<ParquetFileWriter::Contents> Open(
246 const std::shared_ptr<ArrowOutputStream>& sink,
247 const std::shared_ptr<GroupNode>& schema,
248 const std::shared_ptr<WriterProperties>& properties,
249 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
250 std::unique_ptr<ParquetFileWriter::Contents> result(
251 new FileSerializer(sink, schema, properties, key_value_metadata));
252
253 return result;
254 }
255
256 void Close() override {
257 if (is_open_) {
258 // If any functions here raise an exception, we set is_open_ to be false
259 // so that this does not get called again (possibly causing segfault)
260 is_open_ = false;
261 if (row_group_writer_) {
262 num_rows_ += row_group_writer_->num_rows();
263 row_group_writer_->Close();
264 }
265 row_group_writer_.reset();
266
267 // Write magic bytes and metadata
268 file_metadata_ = metadata_->Finish();
269 WriteFileMetaData(*file_metadata_, sink_.get());
270 }
271 }
272
273 int num_columns() const override { return schema_.num_columns(); }
274
275 int num_row_groups() const override { return num_row_groups_; }
276
277 int64_t num_rows() const override { return num_rows_; }
278
279 const std::shared_ptr<WriterProperties>& properties() const override {
280 return properties_;
281 }
282
283 RowGroupWriter* AppendRowGroup(bool buffered_row_group) {
284 if (row_group_writer_) {
285 row_group_writer_->Close();
286 }
287 num_row_groups_++;
288 auto rg_metadata = metadata_->AppendRowGroup();
289 std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
290 sink_, rg_metadata, properties_.get(), buffered_row_group));
291 row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
292 return row_group_writer_.get();
293 }
294
295 RowGroupWriter* AppendRowGroup() override { return AppendRowGroup(false); }
296
297 RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); }
298
299 ~FileSerializer() override {
300 try {
301 Close();
302 } catch (...) {
303 }
304 }
305
306 private:
307 FileSerializer(const std::shared_ptr<ArrowOutputStream>& sink,
308 const std::shared_ptr<GroupNode>& schema,
309 const std::shared_ptr<WriterProperties>& properties,
310 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
311 : ParquetFileWriter::Contents(schema, key_value_metadata),
312 sink_(sink),
313 is_open_(true),
314 properties_(properties),
315 num_row_groups_(0),
316 num_rows_(0),
317 metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
318 StartFile();
319 }
320
321 std::shared_ptr<ArrowOutputStream> sink_;
322 bool is_open_;
323 const std::shared_ptr<WriterProperties> properties_;
324 int num_row_groups_;
325 int64_t num_rows_;
326 std::unique_ptr<FileMetaDataBuilder> metadata_;
327 // Only one of the row group writers is active at a time
328 std::unique_ptr<RowGroupWriter> row_group_writer_;
329
330 void StartFile() {
331 // Parquet files always start with PAR1
332 PARQUET_THROW_NOT_OK(sink_->Write(PARQUET_MAGIC, 4));
333 }
334};
335
336// ----------------------------------------------------------------------
337// ParquetFileWriter public API
338
339ParquetFileWriter::ParquetFileWriter() {}
340
341ParquetFileWriter::~ParquetFileWriter() {
342 try {
343 Close();
344 } catch (...) {
345 }
346}
347
348std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
349 const std::shared_ptr<::arrow::io::OutputStream>& sink,
350 const std::shared_ptr<GroupNode>& schema,
351 const std::shared_ptr<WriterProperties>& properties,
352 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
353 auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
354 std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
355 result->Open(std::move(contents));
356 return result;
357}
358
359std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
360 const std::shared_ptr<OutputStream>& sink,
361 const std::shared_ptr<schema::GroupNode>& schema,
362 const std::shared_ptr<WriterProperties>& properties,
363 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
364 return Open(std::make_shared<ParquetOutputWrapper>(sink), schema, properties,
365 key_value_metadata);
366}
367
368void WriteFileMetaData(const FileMetaData& file_metadata, ArrowOutputStream* sink) {
369 int64_t position = -1;
370 PARQUET_THROW_NOT_OK(sink->Tell(&position));
371
372 // Write MetaData
373 uint32_t metadata_len = static_cast<uint32_t>(position);
374
375 file_metadata.WriteTo(sink);
376 PARQUET_THROW_NOT_OK(sink->Tell(&position));
377 metadata_len = static_cast<uint32_t>(position) - metadata_len;
378
379 // Write Footer
380 PARQUET_THROW_NOT_OK(sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4));
381 PARQUET_THROW_NOT_OK(sink->Write(PARQUET_MAGIC, 4));
382}
383
384void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) {
385 ParquetOutputWrapper wrapper(sink);
386 return WriteFileMetaData(file_metadata, &wrapper);
387}
388
389void WriteMetaDataFile(const FileMetaData& file_metadata, ArrowOutputStream* sink) {
390 PARQUET_THROW_NOT_OK(sink->Write(PARQUET_MAGIC, 4));
391 return WriteFileMetaData(file_metadata, sink);
392}
393
394const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
395
396const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
397 return contents_->schema()->Column(i);
398}
399
400int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
401
402int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
403
404int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
405
406const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
407 const {
408 return contents_->key_value_metadata();
409}
410
411const std::shared_ptr<FileMetaData> ParquetFileWriter::metadata() const {
412 return file_metadata_;
413}
414
415void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
416 contents_ = std::move(contents);
417}
418
419void ParquetFileWriter::Close() {
420 if (contents_) {
421 contents_->Close();
422 file_metadata_ = contents_->metadata();
423 contents_.reset();
424 }
425}
426
427RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
428 return contents_->AppendRowGroup();
429}
430
431RowGroupWriter* ParquetFileWriter::AppendBufferedRowGroup() {
432 return contents_->AppendBufferedRowGroup();
433}
434
435RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
436 return AppendRowGroup();
437}
438
439const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
440 return contents_->properties();
441}
442
443} // namespace parquet
444