1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/file_writer.h" |
19 | |
20 | #include <utility> |
21 | #include <vector> |
22 | |
23 | #include "parquet/column_writer.h" |
24 | #include "parquet/schema-internal.h" |
25 | #include "parquet/schema.h" |
26 | #include "parquet/thrift.h" |
27 | #include "parquet/util/memory.h" |
28 | |
29 | using arrow::MemoryPool; |
30 | |
31 | using parquet::schema::GroupNode; |
32 | using parquet::schema::SchemaFlattener; |
33 | |
34 | namespace parquet { |
35 | |
// Four-byte marker ("PAR1") written at the very start and very end of every
// Parquet file.
// FIXME: duplicated from reader-internal.cc; the two copies should be merged.
static constexpr uint8_t PARQUET_MAGIC[] = {'P', 'A', 'R', '1'};
38 | |
39 | // ---------------------------------------------------------------------- |
40 | // RowGroupWriter public API |
41 | |
42 | RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents) |
43 | : contents_(std::move(contents)) {} |
44 | |
45 | void RowGroupWriter::Close() { |
46 | if (contents_) { |
47 | contents_->Close(); |
48 | } |
49 | } |
50 | |
51 | ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); } |
52 | |
53 | ColumnWriter* RowGroupWriter::column(int i) { return contents_->column(i); } |
54 | |
55 | int64_t RowGroupWriter::total_compressed_bytes() const { |
56 | return contents_->total_compressed_bytes(); |
57 | } |
58 | |
59 | int64_t RowGroupWriter::total_bytes_written() const { |
60 | return contents_->total_bytes_written(); |
61 | } |
62 | |
63 | int RowGroupWriter::current_column() { return contents_->current_column(); } |
64 | |
65 | int RowGroupWriter::num_columns() const { return contents_->num_columns(); } |
66 | |
67 | int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); } |
68 | |
69 | inline void ThrowRowsMisMatchError(int col, int64_t prev, int64_t curr) { |
70 | std::stringstream ss; |
71 | ss << "Column " << col << " had " << curr << " while previous column had " << prev; |
72 | throw ParquetException(ss.str()); |
73 | } |
74 | |
75 | // ---------------------------------------------------------------------- |
76 | // RowGroupSerializer |
77 | |
78 | // RowGroupWriter::Contents implementation for the Parquet file specification |
79 | class RowGroupSerializer : public RowGroupWriter::Contents { |
80 | public: |
81 | RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata, |
82 | const WriterProperties* properties, bool buffered_row_group = false) |
83 | : sink_(sink), |
84 | metadata_(metadata), |
85 | properties_(properties), |
86 | total_bytes_written_(0), |
87 | closed_(false), |
88 | current_column_index_(0), |
89 | num_rows_(0), |
90 | buffered_row_group_(buffered_row_group) { |
91 | if (buffered_row_group) { |
92 | InitColumns(); |
93 | } else { |
94 | column_writers_.push_back(nullptr); |
95 | } |
96 | } |
97 | |
98 | int num_columns() const override { return metadata_->num_columns(); } |
99 | |
100 | int64_t num_rows() const override { |
101 | CheckRowsWritten(); |
102 | // CheckRowsWritten ensures num_rows_ is set correctly |
103 | return num_rows_; |
104 | } |
105 | |
106 | ColumnWriter* NextColumn() override { |
107 | if (buffered_row_group_) { |
108 | throw ParquetException( |
109 | "NextColumn() is not supported when a RowGroup is written by size" ); |
110 | } |
111 | |
112 | if (column_writers_[0]) { |
113 | CheckRowsWritten(); |
114 | } |
115 | |
116 | // Throws an error if more columns are being written |
117 | auto col_meta = metadata_->NextColumnChunk(); |
118 | |
119 | if (column_writers_[0]) { |
120 | total_bytes_written_ += column_writers_[0]->Close(); |
121 | } |
122 | |
123 | ++current_column_index_; |
124 | |
125 | const ColumnDescriptor* column_descr = col_meta->descr(); |
126 | std::unique_ptr<PageWriter> = |
127 | PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta, |
128 | properties_->memory_pool()); |
129 | column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_); |
130 | return column_writers_[0].get(); |
131 | } |
132 | |
133 | ColumnWriter* column(int i) override { |
134 | if (!buffered_row_group_) { |
135 | throw ParquetException( |
136 | "column() is only supported when a BufferedRowGroup is being written" ); |
137 | } |
138 | |
139 | if (i >= 0 && i < static_cast<int>(column_writers_.size())) { |
140 | return column_writers_[i].get(); |
141 | } |
142 | return nullptr; |
143 | } |
144 | |
145 | int current_column() const override { return metadata_->current_column(); } |
146 | |
147 | int64_t total_compressed_bytes() const override { |
148 | int64_t total_compressed_bytes = 0; |
149 | for (size_t i = 0; i < column_writers_.size(); i++) { |
150 | if (column_writers_[i]) { |
151 | total_compressed_bytes += column_writers_[i]->total_compressed_bytes(); |
152 | } |
153 | } |
154 | return total_compressed_bytes; |
155 | } |
156 | |
157 | int64_t total_bytes_written() const override { |
158 | int64_t total_bytes_written = 0; |
159 | for (size_t i = 0; i < column_writers_.size(); i++) { |
160 | if (column_writers_[i]) { |
161 | total_bytes_written += column_writers_[i]->total_bytes_written(); |
162 | } |
163 | } |
164 | return total_bytes_written; |
165 | } |
166 | |
167 | void Close() override { |
168 | if (!closed_) { |
169 | closed_ = true; |
170 | CheckRowsWritten(); |
171 | |
172 | for (size_t i = 0; i < column_writers_.size(); i++) { |
173 | if (column_writers_[i]) { |
174 | total_bytes_written_ += column_writers_[i]->Close(); |
175 | column_writers_[i].reset(); |
176 | } |
177 | } |
178 | |
179 | column_writers_.clear(); |
180 | |
181 | // Ensures all columns have been written |
182 | metadata_->set_num_rows(num_rows_); |
183 | metadata_->Finish(total_bytes_written_); |
184 | } |
185 | } |
186 | |
187 | private: |
188 | OutputStream* sink_; |
189 | mutable RowGroupMetaDataBuilder* metadata_; |
190 | const WriterProperties* properties_; |
191 | int64_t total_bytes_written_; |
192 | bool closed_; |
193 | int current_column_index_; |
194 | mutable int64_t num_rows_; |
195 | bool buffered_row_group_; |
196 | |
197 | void CheckRowsWritten() const { |
198 | // verify when only one column is written at a time |
199 | if (!buffered_row_group_ && column_writers_.size() > 0 && column_writers_[0]) { |
200 | int64_t current_col_rows = column_writers_[0]->rows_written(); |
201 | if (num_rows_ == 0) { |
202 | num_rows_ = current_col_rows; |
203 | } else if (num_rows_ != current_col_rows) { |
204 | ThrowRowsMisMatchError(current_column_index_, current_col_rows, num_rows_); |
205 | } |
206 | } else if (buffered_row_group_ && |
207 | column_writers_.size() > 0) { // when buffered_row_group = true |
208 | int64_t current_col_rows = column_writers_[0]->rows_written(); |
209 | for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) { |
210 | int64_t current_col_rows_i = column_writers_[i]->rows_written(); |
211 | if (current_col_rows != current_col_rows_i) { |
212 | ThrowRowsMisMatchError(i, current_col_rows_i, current_col_rows); |
213 | } |
214 | } |
215 | num_rows_ = current_col_rows; |
216 | } |
217 | } |
218 | |
219 | void InitColumns() { |
220 | for (int i = 0; i < num_columns(); i++) { |
221 | auto col_meta = metadata_->NextColumnChunk(); |
222 | const ColumnDescriptor* column_descr = col_meta->descr(); |
223 | std::unique_ptr<PageWriter> = |
224 | PageWriter::Open(sink_, properties_->compression(column_descr->path()), |
225 | col_meta, properties_->memory_pool(), buffered_row_group_); |
226 | column_writers_.push_back( |
227 | ColumnWriter::Make(col_meta, std::move(pager), properties_)); |
228 | } |
229 | } |
230 | |
231 | std::vector<std::shared_ptr<ColumnWriter>> column_writers_; |
232 | }; |
233 | |
234 | // ---------------------------------------------------------------------- |
235 | // FileSerializer |
236 | |
237 | // An implementation of ParquetFileWriter::Contents that deals with the Parquet |
238 | // file structure, Thrift serialization, and other internal matters |
239 | |
240 | class FileSerializer : public ParquetFileWriter::Contents { |
241 | public: |
242 | static std::unique_ptr<ParquetFileWriter::Contents> Open( |
243 | const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema, |
244 | const std::shared_ptr<WriterProperties>& properties, |
245 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
246 | std::unique_ptr<ParquetFileWriter::Contents> result( |
247 | new FileSerializer(sink, schema, properties, key_value_metadata)); |
248 | |
249 | return result; |
250 | } |
251 | |
252 | void Close() override { |
253 | if (is_open_) { |
254 | if (row_group_writer_) { |
255 | num_rows_ += row_group_writer_->num_rows(); |
256 | row_group_writer_->Close(); |
257 | } |
258 | row_group_writer_.reset(); |
259 | |
260 | // Write magic bytes and metadata |
261 | auto metadata = metadata_->Finish(); |
262 | WriteFileMetaData(*metadata, sink_.get()); |
263 | |
264 | sink_->Close(); |
265 | is_open_ = false; |
266 | } |
267 | } |
268 | |
269 | int num_columns() const override { return schema_.num_columns(); } |
270 | |
271 | int num_row_groups() const override { return num_row_groups_; } |
272 | |
273 | int64_t num_rows() const override { return num_rows_; } |
274 | |
275 | const std::shared_ptr<WriterProperties>& properties() const override { |
276 | return properties_; |
277 | } |
278 | |
279 | RowGroupWriter* AppendRowGroup(bool buffered_row_group) { |
280 | if (row_group_writer_) { |
281 | row_group_writer_->Close(); |
282 | } |
283 | num_row_groups_++; |
284 | auto rg_metadata = metadata_->AppendRowGroup(); |
285 | std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer( |
286 | sink_.get(), rg_metadata, properties_.get(), buffered_row_group)); |
287 | row_group_writer_.reset(new RowGroupWriter(std::move(contents))); |
288 | return row_group_writer_.get(); |
289 | } |
290 | |
291 | RowGroupWriter* AppendRowGroup() override { return AppendRowGroup(false); } |
292 | |
293 | RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); } |
294 | |
295 | ~FileSerializer() override { |
296 | try { |
297 | Close(); |
298 | } catch (...) { |
299 | } |
300 | } |
301 | |
302 | private: |
303 | FileSerializer(const std::shared_ptr<OutputStream>& sink, |
304 | const std::shared_ptr<GroupNode>& schema, |
305 | const std::shared_ptr<WriterProperties>& properties, |
306 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) |
307 | : ParquetFileWriter::Contents(schema, key_value_metadata), |
308 | sink_(sink), |
309 | is_open_(true), |
310 | properties_(properties), |
311 | num_row_groups_(0), |
312 | num_rows_(0), |
313 | metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) { |
314 | StartFile(); |
315 | } |
316 | |
317 | std::shared_ptr<OutputStream> sink_; |
318 | bool is_open_; |
319 | const std::shared_ptr<WriterProperties> properties_; |
320 | int num_row_groups_; |
321 | int64_t num_rows_; |
322 | std::unique_ptr<FileMetaDataBuilder> metadata_; |
323 | // Only one of the row group writers is active at a time |
324 | std::unique_ptr<RowGroupWriter> row_group_writer_; |
325 | |
326 | void StartFile() { |
327 | // Parquet files always start with PAR1 |
328 | sink_->Write(PARQUET_MAGIC, 4); |
329 | } |
330 | }; |
331 | |
332 | // ---------------------------------------------------------------------- |
333 | // ParquetFileWriter public API |
334 | |
335 | ParquetFileWriter::ParquetFileWriter() {} |
336 | |
337 | ParquetFileWriter::~ParquetFileWriter() { |
338 | try { |
339 | Close(); |
340 | } catch (...) { |
341 | } |
342 | } |
343 | |
344 | std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( |
345 | const std::shared_ptr<::arrow::io::OutputStream>& sink, |
346 | const std::shared_ptr<GroupNode>& schema, |
347 | const std::shared_ptr<WriterProperties>& properties, |
348 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
349 | return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties, |
350 | key_value_metadata); |
351 | } |
352 | |
353 | std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( |
354 | const std::shared_ptr<OutputStream>& sink, |
355 | const std::shared_ptr<schema::GroupNode>& schema, |
356 | const std::shared_ptr<WriterProperties>& properties, |
357 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
358 | auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata); |
359 | std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter()); |
360 | result->Open(std::move(contents)); |
361 | return result; |
362 | } |
363 | |
364 | void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) { |
365 | // Write MetaData |
366 | uint32_t metadata_len = static_cast<uint32_t>(sink->Tell()); |
367 | |
368 | file_metadata.WriteTo(sink); |
369 | metadata_len = static_cast<uint32_t>(sink->Tell()) - metadata_len; |
370 | |
371 | // Write Footer |
372 | sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4); |
373 | sink->Write(PARQUET_MAGIC, 4); |
374 | } |
375 | |
376 | const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); } |
377 | |
378 | const ColumnDescriptor* ParquetFileWriter::descr(int i) const { |
379 | return contents_->schema()->Column(i); |
380 | } |
381 | |
382 | int ParquetFileWriter::num_columns() const { return contents_->num_columns(); } |
383 | |
384 | int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); } |
385 | |
386 | int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); } |
387 | |
388 | const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata() |
389 | const { |
390 | return contents_->key_value_metadata(); |
391 | } |
392 | |
393 | void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) { |
394 | contents_ = std::move(contents); |
395 | } |
396 | |
397 | void ParquetFileWriter::Close() { |
398 | if (contents_) { |
399 | contents_->Close(); |
400 | contents_.reset(); |
401 | } |
402 | } |
403 | |
404 | RowGroupWriter* ParquetFileWriter::AppendRowGroup() { |
405 | return contents_->AppendRowGroup(); |
406 | } |
407 | |
408 | RowGroupWriter* ParquetFileWriter::AppendBufferedRowGroup() { |
409 | return contents_->AppendBufferedRowGroup(); |
410 | } |
411 | |
412 | RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) { |
413 | return AppendRowGroup(); |
414 | } |
415 | |
416 | const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const { |
417 | return contents_->properties(); |
418 | } |
419 | |
420 | } // namespace parquet |
421 | |