| 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | #include "parquet/file_writer.h" |
| 19 | |
| 20 | #include <cstddef> |
| 21 | #include <ostream> |
| 22 | #include <utility> |
| 23 | #include <vector> |
| 24 | |
| 25 | #include "parquet/column_writer.h" |
| 26 | #include "parquet/deprecated_io.h" |
| 27 | #include "parquet/exception.h" |
| 28 | #include "parquet/platform.h" |
| 29 | #include "parquet/schema.h" |
| 30 | #include "parquet/types.h" |
| 31 | |
| 32 | using arrow::MemoryPool; |
| 33 | |
| 34 | using parquet::schema::GroupNode; |
| 35 | |
| 36 | namespace parquet { |
| 37 | |
// Four-byte marker written at the start of a file and again after the footer
// length by the writers in this file.
// FIXME: copied from reader-internal.cc
static constexpr uint8_t PARQUET_MAGIC[4]{'P', 'A', 'R', '1'};
| 40 | |
| 41 | // ---------------------------------------------------------------------- |
| 42 | // RowGroupWriter public API |
| 43 | |
| 44 | RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents) |
| 45 | : contents_(std::move(contents)) {} |
| 46 | |
| 47 | void RowGroupWriter::Close() { |
| 48 | if (contents_) { |
| 49 | contents_->Close(); |
| 50 | } |
| 51 | } |
| 52 | |
| 53 | ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); } |
| 54 | |
| 55 | ColumnWriter* RowGroupWriter::column(int i) { return contents_->column(i); } |
| 56 | |
| 57 | int64_t RowGroupWriter::total_compressed_bytes() const { |
| 58 | return contents_->total_compressed_bytes(); |
| 59 | } |
| 60 | |
| 61 | int64_t RowGroupWriter::total_bytes_written() const { |
| 62 | return contents_->total_bytes_written(); |
| 63 | } |
| 64 | |
| 65 | int RowGroupWriter::current_column() { return contents_->current_column(); } |
| 66 | |
| 67 | int RowGroupWriter::num_columns() const { return contents_->num_columns(); } |
| 68 | |
| 69 | int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); } |
| 70 | |
| 71 | inline void ThrowRowsMisMatchError(int col, int64_t prev, int64_t curr) { |
| 72 | std::stringstream ss; |
| 73 | ss << "Column " << col << " had " << curr << " while previous column had " << prev; |
| 74 | throw ParquetException(ss.str()); |
| 75 | } |
| 76 | |
| 77 | // ---------------------------------------------------------------------- |
| 78 | // RowGroupSerializer |
| 79 | |
| 80 | // RowGroupWriter::Contents implementation for the Parquet file specification |
| 81 | class RowGroupSerializer : public RowGroupWriter::Contents { |
| 82 | public: |
| 83 | RowGroupSerializer(const std::shared_ptr<ArrowOutputStream>& sink, |
| 84 | RowGroupMetaDataBuilder* metadata, |
| 85 | const WriterProperties* properties, bool buffered_row_group = false) |
| 86 | : sink_(sink), |
| 87 | metadata_(metadata), |
| 88 | properties_(properties), |
| 89 | total_bytes_written_(0), |
| 90 | closed_(false), |
| 91 | next_column_index_(0), |
| 92 | num_rows_(0), |
| 93 | buffered_row_group_(buffered_row_group) { |
| 94 | if (buffered_row_group) { |
| 95 | InitColumns(); |
| 96 | } else { |
| 97 | column_writers_.push_back(nullptr); |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | int num_columns() const override { return metadata_->num_columns(); } |
| 102 | |
| 103 | int64_t num_rows() const override { |
| 104 | CheckRowsWritten(); |
| 105 | // CheckRowsWritten ensures num_rows_ is set correctly |
| 106 | return num_rows_; |
| 107 | } |
| 108 | |
| 109 | ColumnWriter* NextColumn() override { |
| 110 | if (buffered_row_group_) { |
| 111 | throw ParquetException( |
| 112 | "NextColumn() is not supported when a RowGroup is written by size" ); |
| 113 | } |
| 114 | |
| 115 | if (column_writers_[0]) { |
| 116 | CheckRowsWritten(); |
| 117 | } |
| 118 | |
| 119 | // Throws an error if more columns are being written |
| 120 | auto col_meta = metadata_->NextColumnChunk(); |
| 121 | |
| 122 | if (column_writers_[0]) { |
| 123 | total_bytes_written_ += column_writers_[0]->Close(); |
| 124 | } |
| 125 | |
| 126 | ++next_column_index_; |
| 127 | |
| 128 | const auto& path = col_meta->descr()->path(); |
| 129 | std::unique_ptr<PageWriter> = PageWriter::Open( |
| 130 | sink_, properties_->compression(path), properties_->compression_level(path), |
| 131 | col_meta, properties_->memory_pool()); |
| 132 | column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_); |
| 133 | return column_writers_[0].get(); |
| 134 | } |
| 135 | |
| 136 | ColumnWriter* column(int i) override { |
| 137 | if (!buffered_row_group_) { |
| 138 | throw ParquetException( |
| 139 | "column() is only supported when a BufferedRowGroup is being written" ); |
| 140 | } |
| 141 | |
| 142 | if (i >= 0 && i < static_cast<int>(column_writers_.size())) { |
| 143 | return column_writers_[i].get(); |
| 144 | } |
| 145 | return nullptr; |
| 146 | } |
| 147 | |
| 148 | int current_column() const override { return metadata_->current_column(); } |
| 149 | |
| 150 | int64_t total_compressed_bytes() const override { |
| 151 | int64_t total_compressed_bytes = 0; |
| 152 | for (size_t i = 0; i < column_writers_.size(); i++) { |
| 153 | if (column_writers_[i]) { |
| 154 | total_compressed_bytes += column_writers_[i]->total_compressed_bytes(); |
| 155 | } |
| 156 | } |
| 157 | return total_compressed_bytes; |
| 158 | } |
| 159 | |
| 160 | int64_t total_bytes_written() const override { |
| 161 | int64_t total_bytes_written = 0; |
| 162 | for (size_t i = 0; i < column_writers_.size(); i++) { |
| 163 | if (column_writers_[i]) { |
| 164 | total_bytes_written += column_writers_[i]->total_bytes_written(); |
| 165 | } |
| 166 | } |
| 167 | return total_bytes_written; |
| 168 | } |
| 169 | |
| 170 | void Close() override { |
| 171 | if (!closed_) { |
| 172 | closed_ = true; |
| 173 | CheckRowsWritten(); |
| 174 | |
| 175 | for (size_t i = 0; i < column_writers_.size(); i++) { |
| 176 | if (column_writers_[i]) { |
| 177 | total_bytes_written_ += column_writers_[i]->Close(); |
| 178 | column_writers_[i].reset(); |
| 179 | } |
| 180 | } |
| 181 | |
| 182 | column_writers_.clear(); |
| 183 | |
| 184 | // Ensures all columns have been written |
| 185 | metadata_->set_num_rows(num_rows_); |
| 186 | metadata_->Finish(total_bytes_written_); |
| 187 | } |
| 188 | } |
| 189 | |
| 190 | private: |
| 191 | std::shared_ptr<ArrowOutputStream> sink_; |
| 192 | mutable RowGroupMetaDataBuilder* metadata_; |
| 193 | const WriterProperties* properties_; |
| 194 | int64_t total_bytes_written_; |
| 195 | bool closed_; |
| 196 | int next_column_index_; |
| 197 | mutable int64_t num_rows_; |
| 198 | bool buffered_row_group_; |
| 199 | |
| 200 | void CheckRowsWritten() const { |
| 201 | // verify when only one column is written at a time |
| 202 | if (!buffered_row_group_ && column_writers_.size() > 0 && column_writers_[0]) { |
| 203 | int64_t current_col_rows = column_writers_[0]->rows_written(); |
| 204 | if (num_rows_ == 0) { |
| 205 | num_rows_ = current_col_rows; |
| 206 | } else if (num_rows_ != current_col_rows) { |
| 207 | ThrowRowsMisMatchError(next_column_index_, current_col_rows, num_rows_); |
| 208 | } |
| 209 | } else if (buffered_row_group_ && |
| 210 | column_writers_.size() > 0) { // when buffered_row_group = true |
| 211 | int64_t current_col_rows = column_writers_[0]->rows_written(); |
| 212 | for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) { |
| 213 | int64_t current_col_rows_i = column_writers_[i]->rows_written(); |
| 214 | if (current_col_rows != current_col_rows_i) { |
| 215 | ThrowRowsMisMatchError(i, current_col_rows_i, current_col_rows); |
| 216 | } |
| 217 | } |
| 218 | num_rows_ = current_col_rows; |
| 219 | } |
| 220 | } |
| 221 | |
| 222 | void InitColumns() { |
| 223 | for (int i = 0; i < num_columns(); i++) { |
| 224 | auto col_meta = metadata_->NextColumnChunk(); |
| 225 | const auto& path = col_meta->descr()->path(); |
| 226 | std::unique_ptr<PageWriter> = PageWriter::Open( |
| 227 | sink_, properties_->compression(path), properties_->compression_level(path), |
| 228 | col_meta, properties_->memory_pool(), buffered_row_group_); |
| 229 | column_writers_.push_back( |
| 230 | ColumnWriter::Make(col_meta, std::move(pager), properties_)); |
| 231 | } |
| 232 | } |
| 233 | |
| 234 | std::vector<std::shared_ptr<ColumnWriter>> column_writers_; |
| 235 | }; |
| 236 | |
| 237 | // ---------------------------------------------------------------------- |
| 238 | // FileSerializer |
| 239 | |
| 240 | // An implementation of ParquetFileWriter::Contents that deals with the Parquet |
| 241 | // file structure, Thrift serialization, and other internal matters |
| 242 | |
| 243 | class FileSerializer : public ParquetFileWriter::Contents { |
| 244 | public: |
| 245 | static std::unique_ptr<ParquetFileWriter::Contents> Open( |
| 246 | const std::shared_ptr<ArrowOutputStream>& sink, |
| 247 | const std::shared_ptr<GroupNode>& schema, |
| 248 | const std::shared_ptr<WriterProperties>& properties, |
| 249 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
| 250 | std::unique_ptr<ParquetFileWriter::Contents> result( |
| 251 | new FileSerializer(sink, schema, properties, key_value_metadata)); |
| 252 | |
| 253 | return result; |
| 254 | } |
| 255 | |
| 256 | void Close() override { |
| 257 | if (is_open_) { |
| 258 | // If any functions here raise an exception, we set is_open_ to be false |
| 259 | // so that this does not get called again (possibly causing segfault) |
| 260 | is_open_ = false; |
| 261 | if (row_group_writer_) { |
| 262 | num_rows_ += row_group_writer_->num_rows(); |
| 263 | row_group_writer_->Close(); |
| 264 | } |
| 265 | row_group_writer_.reset(); |
| 266 | |
| 267 | // Write magic bytes and metadata |
| 268 | file_metadata_ = metadata_->Finish(); |
| 269 | WriteFileMetaData(*file_metadata_, sink_.get()); |
| 270 | } |
| 271 | } |
| 272 | |
| 273 | int num_columns() const override { return schema_.num_columns(); } |
| 274 | |
| 275 | int num_row_groups() const override { return num_row_groups_; } |
| 276 | |
| 277 | int64_t num_rows() const override { return num_rows_; } |
| 278 | |
| 279 | const std::shared_ptr<WriterProperties>& properties() const override { |
| 280 | return properties_; |
| 281 | } |
| 282 | |
| 283 | RowGroupWriter* AppendRowGroup(bool buffered_row_group) { |
| 284 | if (row_group_writer_) { |
| 285 | row_group_writer_->Close(); |
| 286 | } |
| 287 | num_row_groups_++; |
| 288 | auto rg_metadata = metadata_->AppendRowGroup(); |
| 289 | std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer( |
| 290 | sink_, rg_metadata, properties_.get(), buffered_row_group)); |
| 291 | row_group_writer_.reset(new RowGroupWriter(std::move(contents))); |
| 292 | return row_group_writer_.get(); |
| 293 | } |
| 294 | |
| 295 | RowGroupWriter* AppendRowGroup() override { return AppendRowGroup(false); } |
| 296 | |
| 297 | RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); } |
| 298 | |
| 299 | ~FileSerializer() override { |
| 300 | try { |
| 301 | Close(); |
| 302 | } catch (...) { |
| 303 | } |
| 304 | } |
| 305 | |
| 306 | private: |
| 307 | FileSerializer(const std::shared_ptr<ArrowOutputStream>& sink, |
| 308 | const std::shared_ptr<GroupNode>& schema, |
| 309 | const std::shared_ptr<WriterProperties>& properties, |
| 310 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) |
| 311 | : ParquetFileWriter::Contents(schema, key_value_metadata), |
| 312 | sink_(sink), |
| 313 | is_open_(true), |
| 314 | properties_(properties), |
| 315 | num_row_groups_(0), |
| 316 | num_rows_(0), |
| 317 | metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) { |
| 318 | StartFile(); |
| 319 | } |
| 320 | |
| 321 | std::shared_ptr<ArrowOutputStream> sink_; |
| 322 | bool is_open_; |
| 323 | const std::shared_ptr<WriterProperties> properties_; |
| 324 | int num_row_groups_; |
| 325 | int64_t num_rows_; |
| 326 | std::unique_ptr<FileMetaDataBuilder> metadata_; |
| 327 | // Only one of the row group writers is active at a time |
| 328 | std::unique_ptr<RowGroupWriter> row_group_writer_; |
| 329 | |
| 330 | void StartFile() { |
| 331 | // Parquet files always start with PAR1 |
| 332 | PARQUET_THROW_NOT_OK(sink_->Write(PARQUET_MAGIC, 4)); |
| 333 | } |
| 334 | }; |
| 335 | |
| 336 | // ---------------------------------------------------------------------- |
| 337 | // ParquetFileWriter public API |
| 338 | |
| 339 | ParquetFileWriter::ParquetFileWriter() {} |
| 340 | |
| 341 | ParquetFileWriter::~ParquetFileWriter() { |
| 342 | try { |
| 343 | Close(); |
| 344 | } catch (...) { |
| 345 | } |
| 346 | } |
| 347 | |
| 348 | std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( |
| 349 | const std::shared_ptr<::arrow::io::OutputStream>& sink, |
| 350 | const std::shared_ptr<GroupNode>& schema, |
| 351 | const std::shared_ptr<WriterProperties>& properties, |
| 352 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
| 353 | auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata); |
| 354 | std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter()); |
| 355 | result->Open(std::move(contents)); |
| 356 | return result; |
| 357 | } |
| 358 | |
| 359 | std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open( |
| 360 | const std::shared_ptr<OutputStream>& sink, |
| 361 | const std::shared_ptr<schema::GroupNode>& schema, |
| 362 | const std::shared_ptr<WriterProperties>& properties, |
| 363 | const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) { |
| 364 | return Open(std::make_shared<ParquetOutputWrapper>(sink), schema, properties, |
| 365 | key_value_metadata); |
| 366 | } |
| 367 | |
| 368 | void WriteFileMetaData(const FileMetaData& file_metadata, ArrowOutputStream* sink) { |
| 369 | int64_t position = -1; |
| 370 | PARQUET_THROW_NOT_OK(sink->Tell(&position)); |
| 371 | |
| 372 | // Write MetaData |
| 373 | uint32_t metadata_len = static_cast<uint32_t>(position); |
| 374 | |
| 375 | file_metadata.WriteTo(sink); |
| 376 | PARQUET_THROW_NOT_OK(sink->Tell(&position)); |
| 377 | metadata_len = static_cast<uint32_t>(position) - metadata_len; |
| 378 | |
| 379 | // Write Footer |
| 380 | PARQUET_THROW_NOT_OK(sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4)); |
| 381 | PARQUET_THROW_NOT_OK(sink->Write(PARQUET_MAGIC, 4)); |
| 382 | } |
| 383 | |
| 384 | void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) { |
| 385 | ParquetOutputWrapper wrapper(sink); |
| 386 | return WriteFileMetaData(file_metadata, &wrapper); |
| 387 | } |
| 388 | |
| 389 | void WriteMetaDataFile(const FileMetaData& file_metadata, ArrowOutputStream* sink) { |
| 390 | PARQUET_THROW_NOT_OK(sink->Write(PARQUET_MAGIC, 4)); |
| 391 | return WriteFileMetaData(file_metadata, sink); |
| 392 | } |
| 393 | |
| 394 | const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); } |
| 395 | |
| 396 | const ColumnDescriptor* ParquetFileWriter::descr(int i) const { |
| 397 | return contents_->schema()->Column(i); |
| 398 | } |
| 399 | |
| 400 | int ParquetFileWriter::num_columns() const { return contents_->num_columns(); } |
| 401 | |
| 402 | int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); } |
| 403 | |
| 404 | int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); } |
| 405 | |
| 406 | const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata() |
| 407 | const { |
| 408 | return contents_->key_value_metadata(); |
| 409 | } |
| 410 | |
| 411 | const std::shared_ptr<FileMetaData> ParquetFileWriter::metadata() const { |
| 412 | return file_metadata_; |
| 413 | } |
| 414 | |
| 415 | void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) { |
| 416 | contents_ = std::move(contents); |
| 417 | } |
| 418 | |
| 419 | void ParquetFileWriter::Close() { |
| 420 | if (contents_) { |
| 421 | contents_->Close(); |
| 422 | file_metadata_ = contents_->metadata(); |
| 423 | contents_.reset(); |
| 424 | } |
| 425 | } |
| 426 | |
| 427 | RowGroupWriter* ParquetFileWriter::AppendRowGroup() { |
| 428 | return contents_->AppendRowGroup(); |
| 429 | } |
| 430 | |
| 431 | RowGroupWriter* ParquetFileWriter::AppendBufferedRowGroup() { |
| 432 | return contents_->AppendBufferedRowGroup(); |
| 433 | } |
| 434 | |
| 435 | RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) { |
| 436 | return AppendRowGroup(); |
| 437 | } |
| 438 | |
| 439 | const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const { |
| 440 | return contents_->properties(); |
| 441 | } |
| 442 | |
| 443 | } // namespace parquet |
| 444 | |