1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/file_writer.h"
19
20#include <utility>
21#include <vector>
22
23#include "parquet/column_writer.h"
24#include "parquet/schema-internal.h"
25#include "parquet/schema.h"
26#include "parquet/thrift.h"
27#include "parquet/util/memory.h"
28
29using arrow::MemoryPool;
30
31using parquet::schema::GroupNode;
32using parquet::schema::SchemaFlattener;
33
34namespace parquet {
35
// FIXME: copied from reader-internal.cc
// Four-byte magic marker, ASCII "PAR1" (0x50 'P', 0x41 'A', 0x52 'R', 0x31 '1').
static constexpr uint8_t PARQUET_MAGIC[4] = {0x50, 0x41, 0x52, 0x31};
38
39// ----------------------------------------------------------------------
40// RowGroupWriter public API
41
42RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
43 : contents_(std::move(contents)) {}
44
45void RowGroupWriter::Close() {
46 if (contents_) {
47 contents_->Close();
48 }
49}
50
51ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
52
53ColumnWriter* RowGroupWriter::column(int i) { return contents_->column(i); }
54
55int64_t RowGroupWriter::total_compressed_bytes() const {
56 return contents_->total_compressed_bytes();
57}
58
59int64_t RowGroupWriter::total_bytes_written() const {
60 return contents_->total_bytes_written();
61}
62
63int RowGroupWriter::current_column() { return contents_->current_column(); }
64
65int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
66
67int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
68
69inline void ThrowRowsMisMatchError(int col, int64_t prev, int64_t curr) {
70 std::stringstream ss;
71 ss << "Column " << col << " had " << curr << " while previous column had " << prev;
72 throw ParquetException(ss.str());
73}
74
75// ----------------------------------------------------------------------
76// RowGroupSerializer
77
78// RowGroupWriter::Contents implementation for the Parquet file specification
79class RowGroupSerializer : public RowGroupWriter::Contents {
80 public:
81 RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
82 const WriterProperties* properties, bool buffered_row_group = false)
83 : sink_(sink),
84 metadata_(metadata),
85 properties_(properties),
86 total_bytes_written_(0),
87 closed_(false),
88 current_column_index_(0),
89 num_rows_(0),
90 buffered_row_group_(buffered_row_group) {
91 if (buffered_row_group) {
92 InitColumns();
93 } else {
94 column_writers_.push_back(nullptr);
95 }
96 }
97
98 int num_columns() const override { return metadata_->num_columns(); }
99
100 int64_t num_rows() const override {
101 CheckRowsWritten();
102 // CheckRowsWritten ensures num_rows_ is set correctly
103 return num_rows_;
104 }
105
106 ColumnWriter* NextColumn() override {
107 if (buffered_row_group_) {
108 throw ParquetException(
109 "NextColumn() is not supported when a RowGroup is written by size");
110 }
111
112 if (column_writers_[0]) {
113 CheckRowsWritten();
114 }
115
116 // Throws an error if more columns are being written
117 auto col_meta = metadata_->NextColumnChunk();
118
119 if (column_writers_[0]) {
120 total_bytes_written_ += column_writers_[0]->Close();
121 }
122
123 ++current_column_index_;
124
125 const ColumnDescriptor* column_descr = col_meta->descr();
126 std::unique_ptr<PageWriter> pager =
127 PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta,
128 properties_->memory_pool());
129 column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_);
130 return column_writers_[0].get();
131 }
132
133 ColumnWriter* column(int i) override {
134 if (!buffered_row_group_) {
135 throw ParquetException(
136 "column() is only supported when a BufferedRowGroup is being written");
137 }
138
139 if (i >= 0 && i < static_cast<int>(column_writers_.size())) {
140 return column_writers_[i].get();
141 }
142 return nullptr;
143 }
144
145 int current_column() const override { return metadata_->current_column(); }
146
147 int64_t total_compressed_bytes() const override {
148 int64_t total_compressed_bytes = 0;
149 for (size_t i = 0; i < column_writers_.size(); i++) {
150 if (column_writers_[i]) {
151 total_compressed_bytes += column_writers_[i]->total_compressed_bytes();
152 }
153 }
154 return total_compressed_bytes;
155 }
156
157 int64_t total_bytes_written() const override {
158 int64_t total_bytes_written = 0;
159 for (size_t i = 0; i < column_writers_.size(); i++) {
160 if (column_writers_[i]) {
161 total_bytes_written += column_writers_[i]->total_bytes_written();
162 }
163 }
164 return total_bytes_written;
165 }
166
167 void Close() override {
168 if (!closed_) {
169 closed_ = true;
170 CheckRowsWritten();
171
172 for (size_t i = 0; i < column_writers_.size(); i++) {
173 if (column_writers_[i]) {
174 total_bytes_written_ += column_writers_[i]->Close();
175 column_writers_[i].reset();
176 }
177 }
178
179 column_writers_.clear();
180
181 // Ensures all columns have been written
182 metadata_->set_num_rows(num_rows_);
183 metadata_->Finish(total_bytes_written_);
184 }
185 }
186
187 private:
188 OutputStream* sink_;
189 mutable RowGroupMetaDataBuilder* metadata_;
190 const WriterProperties* properties_;
191 int64_t total_bytes_written_;
192 bool closed_;
193 int current_column_index_;
194 mutable int64_t num_rows_;
195 bool buffered_row_group_;
196
197 void CheckRowsWritten() const {
198 // verify when only one column is written at a time
199 if (!buffered_row_group_ && column_writers_.size() > 0 && column_writers_[0]) {
200 int64_t current_col_rows = column_writers_[0]->rows_written();
201 if (num_rows_ == 0) {
202 num_rows_ = current_col_rows;
203 } else if (num_rows_ != current_col_rows) {
204 ThrowRowsMisMatchError(current_column_index_, current_col_rows, num_rows_);
205 }
206 } else if (buffered_row_group_ &&
207 column_writers_.size() > 0) { // when buffered_row_group = true
208 int64_t current_col_rows = column_writers_[0]->rows_written();
209 for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) {
210 int64_t current_col_rows_i = column_writers_[i]->rows_written();
211 if (current_col_rows != current_col_rows_i) {
212 ThrowRowsMisMatchError(i, current_col_rows_i, current_col_rows);
213 }
214 }
215 num_rows_ = current_col_rows;
216 }
217 }
218
219 void InitColumns() {
220 for (int i = 0; i < num_columns(); i++) {
221 auto col_meta = metadata_->NextColumnChunk();
222 const ColumnDescriptor* column_descr = col_meta->descr();
223 std::unique_ptr<PageWriter> pager =
224 PageWriter::Open(sink_, properties_->compression(column_descr->path()),
225 col_meta, properties_->memory_pool(), buffered_row_group_);
226 column_writers_.push_back(
227 ColumnWriter::Make(col_meta, std::move(pager), properties_));
228 }
229 }
230
231 std::vector<std::shared_ptr<ColumnWriter>> column_writers_;
232};
233
234// ----------------------------------------------------------------------
235// FileSerializer
236
237// An implementation of ParquetFileWriter::Contents that deals with the Parquet
238// file structure, Thrift serialization, and other internal matters
239
240class FileSerializer : public ParquetFileWriter::Contents {
241 public:
242 static std::unique_ptr<ParquetFileWriter::Contents> Open(
243 const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
244 const std::shared_ptr<WriterProperties>& properties,
245 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
246 std::unique_ptr<ParquetFileWriter::Contents> result(
247 new FileSerializer(sink, schema, properties, key_value_metadata));
248
249 return result;
250 }
251
252 void Close() override {
253 if (is_open_) {
254 if (row_group_writer_) {
255 num_rows_ += row_group_writer_->num_rows();
256 row_group_writer_->Close();
257 }
258 row_group_writer_.reset();
259
260 // Write magic bytes and metadata
261 auto metadata = metadata_->Finish();
262 WriteFileMetaData(*metadata, sink_.get());
263
264 sink_->Close();
265 is_open_ = false;
266 }
267 }
268
269 int num_columns() const override { return schema_.num_columns(); }
270
271 int num_row_groups() const override { return num_row_groups_; }
272
273 int64_t num_rows() const override { return num_rows_; }
274
275 const std::shared_ptr<WriterProperties>& properties() const override {
276 return properties_;
277 }
278
279 RowGroupWriter* AppendRowGroup(bool buffered_row_group) {
280 if (row_group_writer_) {
281 row_group_writer_->Close();
282 }
283 num_row_groups_++;
284 auto rg_metadata = metadata_->AppendRowGroup();
285 std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
286 sink_.get(), rg_metadata, properties_.get(), buffered_row_group));
287 row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
288 return row_group_writer_.get();
289 }
290
291 RowGroupWriter* AppendRowGroup() override { return AppendRowGroup(false); }
292
293 RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); }
294
295 ~FileSerializer() override {
296 try {
297 Close();
298 } catch (...) {
299 }
300 }
301
302 private:
303 FileSerializer(const std::shared_ptr<OutputStream>& sink,
304 const std::shared_ptr<GroupNode>& schema,
305 const std::shared_ptr<WriterProperties>& properties,
306 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
307 : ParquetFileWriter::Contents(schema, key_value_metadata),
308 sink_(sink),
309 is_open_(true),
310 properties_(properties),
311 num_row_groups_(0),
312 num_rows_(0),
313 metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
314 StartFile();
315 }
316
317 std::shared_ptr<OutputStream> sink_;
318 bool is_open_;
319 const std::shared_ptr<WriterProperties> properties_;
320 int num_row_groups_;
321 int64_t num_rows_;
322 std::unique_ptr<FileMetaDataBuilder> metadata_;
323 // Only one of the row group writers is active at a time
324 std::unique_ptr<RowGroupWriter> row_group_writer_;
325
326 void StartFile() {
327 // Parquet files always start with PAR1
328 sink_->Write(PARQUET_MAGIC, 4);
329 }
330};
331
332// ----------------------------------------------------------------------
333// ParquetFileWriter public API
334
335ParquetFileWriter::ParquetFileWriter() {}
336
337ParquetFileWriter::~ParquetFileWriter() {
338 try {
339 Close();
340 } catch (...) {
341 }
342}
343
344std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
345 const std::shared_ptr<::arrow::io::OutputStream>& sink,
346 const std::shared_ptr<GroupNode>& schema,
347 const std::shared_ptr<WriterProperties>& properties,
348 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
349 return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
350 key_value_metadata);
351}
352
353std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
354 const std::shared_ptr<OutputStream>& sink,
355 const std::shared_ptr<schema::GroupNode>& schema,
356 const std::shared_ptr<WriterProperties>& properties,
357 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
358 auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
359 std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
360 result->Open(std::move(contents));
361 return result;
362}
363
364void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) {
365 // Write MetaData
366 uint32_t metadata_len = static_cast<uint32_t>(sink->Tell());
367
368 file_metadata.WriteTo(sink);
369 metadata_len = static_cast<uint32_t>(sink->Tell()) - metadata_len;
370
371 // Write Footer
372 sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
373 sink->Write(PARQUET_MAGIC, 4);
374}
375
376const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
377
378const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
379 return contents_->schema()->Column(i);
380}
381
382int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
383
384int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
385
386int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
387
388const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
389 const {
390 return contents_->key_value_metadata();
391}
392
393void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
394 contents_ = std::move(contents);
395}
396
397void ParquetFileWriter::Close() {
398 if (contents_) {
399 contents_->Close();
400 contents_.reset();
401 }
402}
403
404RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
405 return contents_->AppendRowGroup();
406}
407
408RowGroupWriter* ParquetFileWriter::AppendBufferedRowGroup() {
409 return contents_->AppendBufferedRowGroup();
410}
411
412RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
413 return AppendRowGroup();
414}
415
416const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
417 return contents_->properties();
418}
419
420} // namespace parquet
421