1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/file_writer.h" |
19 | |
20 | #include <cstddef> |
21 | #include <ostream> |
22 | #include <utility> |
23 | #include <vector> |
24 | |
25 | #include "parquet/column_writer.h" |
26 | #include "parquet/deprecated_io.h" |
27 | #include "parquet/exception.h" |
28 | #include "parquet/platform.h" |
29 | #include "parquet/schema.h" |
30 | #include "parquet/types.h" |
31 | |
32 | using arrow::MemoryPool; |
33 | |
34 | using parquet::schema::GroupNode; |
35 | |
36 | namespace parquet { |
37 | |
// The 4-byte "PAR1" marker written at the start of a file and again after the
// footer.
// FIXME: duplicated from reader-internal.cc
static constexpr uint8_t PARQUET_MAGIC[4]{'P', 'A', 'R', '1'};
40 | |
41 | // ---------------------------------------------------------------------- |
42 | // RowGroupWriter public API |
43 | |
44 | RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents) |
45 | : contents_(std::move(contents)) {} |
46 | |
47 | void RowGroupWriter::Close() { |
48 | if (contents_) { |
49 | contents_->Close(); |
50 | } |
51 | } |
52 | |
53 | ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); } |
54 | |
55 | ColumnWriter* RowGroupWriter::column(int i) { return contents_->column(i); } |
56 | |
57 | int64_t RowGroupWriter::total_compressed_bytes() const { |
58 | return contents_->total_compressed_bytes(); |
59 | } |
60 | |
61 | int64_t RowGroupWriter::total_bytes_written() const { |
62 | return contents_->total_bytes_written(); |
63 | } |
64 | |
65 | int RowGroupWriter::current_column() { return contents_->current_column(); } |
66 | |
67 | int RowGroupWriter::num_columns() const { return contents_->num_columns(); } |
68 | |
69 | int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); } |
70 | |
71 | inline void ThrowRowsMisMatchError(int col, int64_t prev, int64_t curr) { |
72 | std::stringstream ss; |
73 | ss << "Column " << col << " had " << curr << " while previous column had " << prev; |
74 | throw ParquetException(ss.str()); |
75 | } |
76 | |
77 | // ---------------------------------------------------------------------- |
78 | // RowGroupSerializer |
79 | |
80 | // RowGroupWriter::Contents implementation for the Parquet file specification |
81 | class RowGroupSerializer : public RowGroupWriter::Contents { |
82 | public: |
83 | RowGroupSerializer(const std::shared_ptr<ArrowOutputStream>& sink, |
84 | RowGroupMetaDataBuilder* metadata, |
85 | const WriterProperties* properties, bool buffered_row_group = false) |
86 | : sink_(sink), |
87 | metadata_(metadata), |
88 | properties_(properties), |
89 | total_bytes_written_(0), |
90 | closed_(false), |
91 | next_column_index_(0), |
92 | num_rows_(0), |
93 | buffered_row_group_(buffered_row_group) { |
94 | if (buffered_row_group) { |
95 | InitColumns(); |
96 | } else { |
97 | column_writers_.push_back(nullptr); |
98 | } |
99 | } |
100 | |
101 | int num_columns() const override { return metadata_->num_columns(); } |
102 | |
103 | int64_t num_rows() const override { |
104 | CheckRowsWritten(); |
105 | // CheckRowsWritten ensures num_rows_ is set correctly |
106 | return num_rows_; |
107 | } |
108 | |
109 | ColumnWriter* NextColumn() override { |
110 | if (buffered_row_group_) { |
111 | throw ParquetException( |
112 | "NextColumn() is not supported when a RowGroup is written by size" ); |
113 | } |
114 | |
115 | if (column_writers_[0]) { |
116 | CheckRowsWritten(); |
117 | } |
118 | |
119 | // Throws an error if more columns are being written |
120 | auto col_meta = metadata_->NextColumnChunk(); |
121 | |
122 | if (column_writers_[0]) { |
123 | total_bytes_written_ += column_writers_[0]->Close(); |
124 | } |
125 | |
126 | ++next_column_index_; |
127 | |
128 | const auto& path = col_meta->descr()->path(); |
129 | std::unique_ptr<PageWriter> = PageWriter::Open( |
130 | sink_, properties_->compression(path), properties_->compression_level(path), |
131 | col_meta, properties_->memory_pool()); |
132 | column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_); |
133 | return column_writers_[0].get(); |
134 | } |
135 | |
136 | ColumnWriter* column(int i) override { |
137 | if (!buffered_row_group_) { |
138 | throw ParquetException( |
139 | "column() is only supported when a BufferedRowGroup is being written" ); |
140 | } |
141 | |
142 | if (i >= 0 && i < static_cast<int>(column_writers_.size())) { |
143 | return column_writers_[i].get(); |
144 | } |
145 | return nullptr; |
146 | } |
147 | |
148 | int current_column() const override { return metadata_->current_column(); } |
149 | |
150 | int64_t total_compressed_bytes() const override { |
151 | int64_t total_compressed_bytes = 0; |
152 | for (size_t i = 0; i < column_writers_.size(); i++) { |
153 | if (column_writers_[i]) { |
154 | total_compressed_bytes += column_writers_[i]->total_compressed_bytes(); |
155 | } |
156 | } |
157 | return total_compressed_bytes; |
158 | } |
159 | |
160 | int64_t total_bytes_written() const override { |
161 | int64_t total_bytes_written = 0; |
162 | for (size_t i = 0; i < column_writers_.size(); i++) { |
163 | if (column_writers_[i]) { |
164 | total_bytes_written += column_writers_[i]->total_bytes_written(); |
165 | } |
166 | } |
167 | return total_bytes_written; |
168 | } |
169 | |
170 | void Close() override { |
171 | if (!closed_) { |
172 | closed_ = true; |
173 | CheckRowsWritten(); |
174 | |
175 | for (size_t i = 0; i < column_writers_.size(); i++) { |
176 | if (column_writers_[i]) { |
177 | total_bytes_written_ += column_writers_[i]->Close(); |
178 | column_writers_[i].reset(); |
179 | } |
180 | } |
181 | |
182 | column_writers_.clear(); |
183 | |
184 | // Ensures all columns have been written |
185 | metadata_->set_num_rows(num_rows_); |
186 | metadata_->Finish(total_bytes_written_); |
187 | } |
188 | } |
189 | |
190 | private: |
191 | std::shared_ptr<ArrowOutputStream> sink_; |
192 | mutable RowGroupMetaDataBuilder* metadata_; |
193 | const WriterProperties* properties_; |
194 | int64_t total_bytes_written_; |
195 | bool closed_; |
196 | int next_column_index_; |
197 | mutable int64_t num_rows_; |
198 | bool buffered_row_group_; |
199 | |
200 | void CheckRowsWritten() const { |
201 | // verify when only one column is written at a time |
202 | if (!buffered_row_group_ && column_writers_.size() > 0 && column_writers_[0]) { |
203 | int64_t current_col_rows = column_writers_[0]->rows_written(); |
204 | if (num_rows_ == 0) { |
205 | num_rows_ = current_col_rows; |
206 | } else if (num_rows_ != current_col_rows) { |
207 | ThrowRowsMisMatchError(next_column_index_, current_col_rows, num_rows_); |
208 | } |
209 | } else if (buffered_row_group_ && |
210 | column_writers_.size() > 0) { // when buffered_row_group = true |
211 | int64_t current_col_rows = column_writers_[0]->rows_written(); |
212 | for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) { |
213 | int64_t current_col_rows_i = column_writers_[i]->rows_written(); |
214 | if (current_col_rows != current_col_rows_i) { |
215 | ThrowRowsMisMatchError(i, current_col_rows_i, current_col_rows); |
216 | } |
217 | } |
218 | num_rows_ = current_col_rows; |
219 | } |
220 | } |
221 | |
222 | void InitColumns() { |
223 | for (int i = 0; i < num_columns(); i++) { |
224 | auto col_meta = metadata_->NextColumnChunk(); |
225 | const auto& path = col_meta->descr()->path(); |
226 | std::unique_ptr<PageWriter> = PageWriter::Open( |
227 | sink_, properties_->compression(path), properties_->compression_level(path), |
228 | col_meta, properties_->memory_pool(), buffered_row_group_); |
229 | column_writers_.push_back( |
230 | ColumnWriter::Make(col_meta, std::move(pager), properties_)); |
231 | } |
232 | } |
233 | |
234 | std::vector<std::shared_ptr<ColumnWriter>> column_writers_; |
235 | }; |
236 | |
237 | // ---------------------------------------------------------------------- |
238 | // FileSerializer |
239 | |
240 | // An implementation of ParquetFileWriter::Contents that deals with the Parquet |
241 | // file structure, Thrift serialization, and other internal matters |
242 | |
243 | class FileSerializer : public ParquetFileWriter::Contents { |
244 | public: |
245 | static std::unique_ptr<ParquetFileWriter::Contents> Open( |
246 | const std::shared_ptr<ArrowOutputStream>& sink, |
247 | const std::shared_ptr<GroupNode>& schema, |
248 | const std::shared_ptr<WriterProperties>& properties, |
249 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
250 | std::unique_ptr<ParquetFileWriter::Contents> result( |
251 | new FileSerializer(sink, schema, properties, key_value_metadata)); |
252 | |
253 | return result; |
254 | } |
255 | |
256 | void Close() override { |
257 | if (is_open_) { |
258 | // If any functions here raise an exception, we set is_open_ to be false |
259 | // so that this does not get called again (possibly causing segfault) |
260 | is_open_ = false; |
261 | if (row_group_writer_) { |
262 | num_rows_ += row_group_writer_->num_rows(); |
263 | row_group_writer_->Close(); |
264 | } |
265 | row_group_writer_.reset(); |
266 | |
267 | // Write magic bytes and metadata |
268 | file_metadata_ = metadata_->Finish(); |
269 | WriteFileMetaData(*file_metadata_, sink_.get()); |
270 | } |
271 | } |
272 | |
273 | int num_columns() const override { return schema_.num_columns(); } |
274 | |
275 | int num_row_groups() const override { return num_row_groups_; } |
276 | |
277 | int64_t num_rows() const override { return num_rows_; } |
278 | |
279 | const std::shared_ptr<WriterProperties>& properties() const override { |
280 | return properties_; |
281 | } |
282 | |
283 | RowGroupWriter* AppendRowGroup(bool buffered_row_group) { |
284 | if (row_group_writer_) { |
285 | row_group_writer_->Close(); |
286 | } |
287 | num_row_groups_++; |
288 | auto rg_metadata = metadata_->AppendRowGroup(); |
289 | std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer( |
290 | sink_, rg_metadata, properties_.get(), buffered_row_group)); |
291 | row_group_writer_.reset(new RowGroupWriter(std::move(contents))); |
292 | return row_group_writer_.get(); |
293 | } |
294 | |
295 | RowGroupWriter* AppendRowGroup() override { return AppendRowGroup(false); } |
296 | |
297 | RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); } |
298 | |
299 | ~FileSerializer() override { |
300 | try { |
301 | Close(); |
302 | } catch (...) { |
303 | } |
304 | } |
305 | |
306 | private: |
307 | FileSerializer(const std::shared_ptr<ArrowOutputStream>& sink, |
308 | const std::shared_ptr<GroupNode>& schema, |
309 | const std::shared_ptr<WriterProperties>& properties, |
310 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) |
311 | : ParquetFileWriter::Contents(schema, key_value_metadata), |
312 | sink_(sink), |
313 | is_open_(true), |
314 | properties_(properties), |
315 | num_row_groups_(0), |
316 | num_rows_(0), |
317 | metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) { |
318 | StartFile(); |
319 | } |
320 | |
321 | std::shared_ptr<ArrowOutputStream> sink_; |
322 | bool is_open_; |
323 | const std::shared_ptr<WriterProperties> properties_; |
324 | int num_row_groups_; |
325 | int64_t num_rows_; |
326 | std::unique_ptr<FileMetaDataBuilder> metadata_; |
327 | // Only one of the row group writers is active at a time |
328 | std::unique_ptr<RowGroupWriter> row_group_writer_; |
329 | |
330 | void StartFile() { |
331 | // Parquet files always start with PAR1 |
332 | PARQUET_THROW_NOT_OK(sink_->Write(PARQUET_MAGIC, 4)); |
333 | } |
334 | }; |
335 | |
336 | // ---------------------------------------------------------------------- |
337 | // ParquetFileWriter public API |
338 | |
339 | ParquetFileWriter::ParquetFileWriter() {} |
340 | |
341 | ParquetFileWriter::~ParquetFileWriter() { |
342 | try { |
343 | Close(); |
344 | } catch (...) { |
345 | } |
346 | } |
347 | |
348 | std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( |
349 | const std::shared_ptr<::arrow::io::OutputStream>& sink, |
350 | const std::shared_ptr<GroupNode>& schema, |
351 | const std::shared_ptr<WriterProperties>& properties, |
352 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
353 | auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata); |
354 | std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter()); |
355 | result->Open(std::move(contents)); |
356 | return result; |
357 | } |
358 | |
359 | std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( |
360 | const std::shared_ptr<OutputStream>& sink, |
361 | const std::shared_ptr<schema::GroupNode>& schema, |
362 | const std::shared_ptr<WriterProperties>& properties, |
363 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
364 | return Open(std::make_shared<ParquetOutputWrapper>(sink), schema, properties, |
365 | key_value_metadata); |
366 | } |
367 | |
368 | void WriteFileMetaData(const FileMetaData& file_metadata, ArrowOutputStream* sink) { |
369 | int64_t position = -1; |
370 | PARQUET_THROW_NOT_OK(sink->Tell(&position)); |
371 | |
372 | // Write MetaData |
373 | uint32_t metadata_len = static_cast<uint32_t>(position); |
374 | |
375 | file_metadata.WriteTo(sink); |
376 | PARQUET_THROW_NOT_OK(sink->Tell(&position)); |
377 | metadata_len = static_cast<uint32_t>(position) - metadata_len; |
378 | |
379 | // Write Footer |
380 | PARQUET_THROW_NOT_OK(sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4)); |
381 | PARQUET_THROW_NOT_OK(sink->Write(PARQUET_MAGIC, 4)); |
382 | } |
383 | |
384 | void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) { |
385 | ParquetOutputWrapper wrapper(sink); |
386 | return WriteFileMetaData(file_metadata, &wrapper); |
387 | } |
388 | |
389 | void WriteMetaDataFile(const FileMetaData& file_metadata, ArrowOutputStream* sink) { |
390 | PARQUET_THROW_NOT_OK(sink->Write(PARQUET_MAGIC, 4)); |
391 | return WriteFileMetaData(file_metadata, sink); |
392 | } |
393 | |
394 | const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); } |
395 | |
396 | const ColumnDescriptor* ParquetFileWriter::descr(int i) const { |
397 | return contents_->schema()->Column(i); |
398 | } |
399 | |
400 | int ParquetFileWriter::num_columns() const { return contents_->num_columns(); } |
401 | |
402 | int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); } |
403 | |
404 | int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); } |
405 | |
406 | const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata() |
407 | const { |
408 | return contents_->key_value_metadata(); |
409 | } |
410 | |
411 | const std::shared_ptr<FileMetaData> ParquetFileWriter::metadata() const { |
412 | return file_metadata_; |
413 | } |
414 | |
415 | void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) { |
416 | contents_ = std::move(contents); |
417 | } |
418 | |
419 | void ParquetFileWriter::Close() { |
420 | if (contents_) { |
421 | contents_->Close(); |
422 | file_metadata_ = contents_->metadata(); |
423 | contents_.reset(); |
424 | } |
425 | } |
426 | |
427 | RowGroupWriter* ParquetFileWriter::AppendRowGroup() { |
428 | return contents_->AppendRowGroup(); |
429 | } |
430 | |
431 | RowGroupWriter* ParquetFileWriter::AppendBufferedRowGroup() { |
432 | return contents_->AppendBufferedRowGroup(); |
433 | } |
434 | |
435 | RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) { |
436 | return AppendRowGroup(); |
437 | } |
438 | |
439 | const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const { |
440 | return contents_->properties(); |
441 | } |
442 | |
443 | } // namespace parquet |
444 | |