| 1 | /** |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, software |
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | * See the License for the specific language governing permissions and |
| 16 | * limitations under the License. |
| 17 | */ |
| 18 | |
| 19 | #include "orc/Common.hh" |
| 20 | #include "orc/OrcFile.hh" |
| 21 | |
| 22 | #include "ColumnWriter.hh" |
| 23 | #include "Timezone.hh" |
| 24 | |
| 25 | #include <memory> |
| 26 | |
| 27 | namespace orc { |
| 28 | |
| 29 | struct WriterOptionsPrivate { |
| 30 | uint64_t stripeSize; |
| 31 | uint64_t compressionBlockSize; |
| 32 | uint64_t rowIndexStride; |
| 33 | CompressionKind compression; |
| 34 | CompressionStrategy compressionStrategy; |
| 35 | MemoryPool* memoryPool; |
| 36 | double paddingTolerance; |
| 37 | std::ostream* errorStream; |
| 38 | FileVersion fileVersion; |
| 39 | double dictionaryKeySizeThreshold; |
| 40 | bool enableIndex; |
| 41 | |
| 42 | WriterOptionsPrivate() : |
| 43 | fileVersion(0, 11) { // default to Hive_0_11 |
| 44 | stripeSize = 64 * 1024 * 1024; // 64M |
| 45 | compressionBlockSize = 64 * 1024; // 64K |
| 46 | rowIndexStride = 10000; |
| 47 | compression = CompressionKind_ZLIB; |
| 48 | compressionStrategy = CompressionStrategy_SPEED; |
| 49 | memoryPool = getDefaultPool(); |
| 50 | paddingTolerance = 0.0; |
| 51 | errorStream = &std::cerr; |
| 52 | dictionaryKeySizeThreshold = 0.0; |
| 53 | enableIndex = true; |
| 54 | } |
| 55 | }; |
| 56 | |
| 57 | WriterOptions::WriterOptions(): |
| 58 | privateBits(std::unique_ptr<WriterOptionsPrivate> |
| 59 | (new WriterOptionsPrivate())) { |
| 60 | // PASS |
| 61 | } |
| 62 | |
| 63 | WriterOptions::WriterOptions(const WriterOptions& rhs): |
| 64 | privateBits(std::unique_ptr<WriterOptionsPrivate> |
| 65 | (new WriterOptionsPrivate(*(rhs.privateBits.get())))) { |
| 66 | // PASS |
| 67 | } |
| 68 | |
| 69 | WriterOptions::WriterOptions(WriterOptions& rhs) { |
| 70 | // swap privateBits with rhs |
| 71 | WriterOptionsPrivate* l = privateBits.release(); |
| 72 | privateBits.reset(rhs.privateBits.release()); |
| 73 | rhs.privateBits.reset(l); |
| 74 | } |
| 75 | |
| 76 | WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) { |
| 77 | if (this != &rhs) { |
| 78 | privateBits.reset(new WriterOptionsPrivate(*(rhs.privateBits.get()))); |
| 79 | } |
| 80 | return *this; |
| 81 | } |
| 82 | |
| 83 | WriterOptions::~WriterOptions() { |
| 84 | // PASS |
| 85 | } |
| 86 | |
| 87 | WriterOptions& WriterOptions::setStripeSize(uint64_t size) { |
| 88 | privateBits->stripeSize = size; |
| 89 | return *this; |
| 90 | } |
| 91 | |
| 92 | uint64_t WriterOptions::getStripeSize() const { |
| 93 | return privateBits->stripeSize; |
| 94 | } |
| 95 | |
| 96 | WriterOptions& WriterOptions::setCompressionBlockSize(uint64_t size) { |
| 97 | privateBits->compressionBlockSize = size; |
| 98 | return *this; |
| 99 | } |
| 100 | |
| 101 | uint64_t WriterOptions::getCompressionBlockSize() const { |
| 102 | return privateBits->compressionBlockSize; |
| 103 | } |
| 104 | |
| 105 | WriterOptions& WriterOptions::setRowIndexStride(uint64_t stride) { |
| 106 | privateBits->rowIndexStride = stride; |
| 107 | privateBits->enableIndex = (stride != 0); |
| 108 | return *this; |
| 109 | } |
| 110 | |
| 111 | uint64_t WriterOptions::getRowIndexStride() const { |
| 112 | return privateBits->rowIndexStride; |
| 113 | } |
| 114 | |
| 115 | WriterOptions& WriterOptions::setDictionaryKeySizeThreshold(double val) { |
| 116 | privateBits->dictionaryKeySizeThreshold = val; |
| 117 | return *this; |
| 118 | } |
| 119 | |
| 120 | double WriterOptions::getDictionaryKeySizeThreshold() const { |
| 121 | return privateBits->dictionaryKeySizeThreshold; |
| 122 | } |
| 123 | |
| 124 | WriterOptions& WriterOptions::setFileVersion(const FileVersion& version) { |
| 125 | // Only Hive_0_11 version is supported currently |
| 126 | if (version.getMajor() == 0 && version.getMinor() == 11) { |
| 127 | privateBits->fileVersion = version; |
| 128 | return *this; |
| 129 | } |
| 130 | throw std::logic_error("Unpoorted file version specified." ); |
| 131 | } |
| 132 | |
| 133 | FileVersion WriterOptions::getFileVersion() const { |
| 134 | return privateBits->fileVersion; |
| 135 | } |
| 136 | |
| 137 | WriterOptions& WriterOptions::setCompression(CompressionKind comp) { |
| 138 | privateBits->compression = comp; |
| 139 | return *this; |
| 140 | } |
| 141 | |
| 142 | CompressionKind WriterOptions::getCompression() const { |
| 143 | return privateBits->compression; |
| 144 | } |
| 145 | |
| 146 | WriterOptions& WriterOptions::setCompressionStrategy( |
| 147 | CompressionStrategy strategy) { |
| 148 | privateBits->compressionStrategy = strategy; |
| 149 | return *this; |
| 150 | } |
| 151 | |
| 152 | CompressionStrategy WriterOptions::getCompressionStrategy() const { |
| 153 | return privateBits->compressionStrategy; |
| 154 | } |
| 155 | |
| 156 | WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) { |
| 157 | privateBits->paddingTolerance = tolerance; |
| 158 | return *this; |
| 159 | } |
| 160 | |
| 161 | double WriterOptions::getPaddingTolerance() const { |
| 162 | return privateBits->paddingTolerance; |
| 163 | } |
| 164 | |
| 165 | WriterOptions& WriterOptions::setMemoryPool(MemoryPool* memoryPool) { |
| 166 | privateBits->memoryPool = memoryPool; |
| 167 | return *this; |
| 168 | } |
| 169 | |
| 170 | MemoryPool* WriterOptions::getMemoryPool() const { |
| 171 | return privateBits->memoryPool; |
| 172 | } |
| 173 | |
| 174 | WriterOptions& WriterOptions::setErrorStream(std::ostream& errStream) { |
| 175 | privateBits->errorStream = &errStream; |
| 176 | return *this; |
| 177 | } |
| 178 | |
| 179 | std::ostream* WriterOptions::getErrorStream() const { |
| 180 | return privateBits->errorStream; |
| 181 | } |
| 182 | |
| 183 | bool WriterOptions::getEnableIndex() const { |
| 184 | return privateBits->enableIndex; |
| 185 | } |
| 186 | |
| 187 | Writer::~Writer() { |
| 188 | // PASS |
| 189 | } |
| 190 | |
| 191 | class WriterImpl : public Writer { |
| 192 | private: |
| 193 | std::unique_ptr<ColumnWriter> columnWriter; |
| 194 | std::unique_ptr<BufferedOutputStream> compressionStream; |
| 195 | std::unique_ptr<BufferedOutputStream> bufferedStream; |
| 196 | std::unique_ptr<StreamsFactory> streamsFactory; |
| 197 | OutputStream* outStream; |
| 198 | WriterOptions options; |
| 199 | const Type& type; |
| 200 | uint64_t stripeRows, totalRows, indexRows; |
| 201 | uint64_t currentOffset; |
| 202 | proto::Footer ; |
| 203 | proto::PostScript postScript; |
| 204 | proto::StripeInformation stripeInfo; |
| 205 | proto::Metadata metadata; |
| 206 | |
| 207 | static const char* magicId; |
| 208 | static const WriterId writerId; |
| 209 | |
| 210 | public: |
| 211 | WriterImpl( |
| 212 | const Type& type, |
| 213 | OutputStream* stream, |
| 214 | const WriterOptions& options); |
| 215 | |
| 216 | std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size) |
| 217 | const override; |
| 218 | |
| 219 | void add(ColumnVectorBatch& rowsToAdd) override; |
| 220 | |
| 221 | void close() override; |
| 222 | |
| 223 | private: |
| 224 | void init(); |
| 225 | void initStripe(); |
| 226 | void writeStripe(); |
| 227 | void writeMetadata(); |
| 228 | void writeFileFooter(); |
| 229 | void writePostscript(); |
| 230 | void buildFooterType(const Type& t, proto::Footer& , uint32_t& index); |
| 231 | static proto::CompressionKind convertCompressionKind( |
| 232 | const CompressionKind& kind); |
| 233 | }; |
| 234 | |
| 235 | const char * WriterImpl::magicId = "ORC" ; |
| 236 | |
| 237 | const WriterId WriterImpl::writerId = WriterId::ORC_CPP_WRITER; |
| 238 | |
| 239 | WriterImpl::WriterImpl( |
| 240 | const Type& t, |
| 241 | OutputStream* stream, |
| 242 | const WriterOptions& opts) : |
| 243 | outStream(stream), |
| 244 | options(opts), |
| 245 | type(t) { |
| 246 | streamsFactory = createStreamsFactory(options, outStream); |
| 247 | columnWriter = buildWriter(type, *streamsFactory, options); |
| 248 | stripeRows = totalRows = indexRows = 0; |
| 249 | currentOffset = 0; |
| 250 | |
| 251 | // compression stream for stripe footer, file footer and metadata |
| 252 | compressionStream = createCompressor( |
| 253 | options.getCompression(), |
| 254 | outStream, |
| 255 | options.getCompressionStrategy(), |
| 256 | 1 * 1024 * 1024, // buffer capacity: 1M |
| 257 | options.getCompressionBlockSize(), |
| 258 | *options.getMemoryPool()); |
| 259 | |
| 260 | // uncompressed stream for post script |
| 261 | bufferedStream.reset(new BufferedOutputStream( |
| 262 | *options.getMemoryPool(), |
| 263 | outStream, |
| 264 | 1024, // buffer capacity: 1024 bytes |
| 265 | options.getCompressionBlockSize())); |
| 266 | |
| 267 | init(); |
| 268 | } |
| 269 | |
| 270 | std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(uint64_t size) |
| 271 | const { |
| 272 | return type.createRowBatch(size, *options.getMemoryPool()); |
| 273 | } |
| 274 | |
| 275 | void WriterImpl::add(ColumnVectorBatch& rowsToAdd) { |
| 276 | if (options.getEnableIndex()) { |
| 277 | uint64_t pos = 0; |
| 278 | uint64_t chunkSize = 0; |
| 279 | uint64_t rowIndexStride = options.getRowIndexStride(); |
| 280 | while (pos < rowsToAdd.numElements) { |
| 281 | chunkSize = std::min(rowsToAdd.numElements - pos, |
| 282 | rowIndexStride - indexRows); |
| 283 | columnWriter->add(rowsToAdd, pos, chunkSize); |
| 284 | |
| 285 | pos += chunkSize; |
| 286 | indexRows += chunkSize; |
| 287 | stripeRows += chunkSize; |
| 288 | |
| 289 | if (indexRows >= rowIndexStride) { |
| 290 | columnWriter->createRowIndexEntry(); |
| 291 | indexRows = 0; |
| 292 | } |
| 293 | } |
| 294 | } else { |
| 295 | stripeRows += rowsToAdd.numElements; |
| 296 | columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements); |
| 297 | } |
| 298 | |
| 299 | if (columnWriter->getEstimatedSize() >= options.getStripeSize()) { |
| 300 | writeStripe(); |
| 301 | } |
| 302 | } |
| 303 | |
| 304 | void WriterImpl::close() { |
| 305 | if (stripeRows > 0) { |
| 306 | writeStripe(); |
| 307 | } |
| 308 | writeMetadata(); |
| 309 | writeFileFooter(); |
| 310 | writePostscript(); |
| 311 | outStream->close(); |
| 312 | } |
| 313 | |
| 314 | void WriterImpl::init() { |
| 315 | // Write file header |
| 316 | outStream->write(WriterImpl::magicId, strlen(WriterImpl::magicId)); |
| 317 | currentOffset += strlen(WriterImpl::magicId); |
| 318 | |
| 319 | // Initialize file footer |
| 320 | fileFooter.set_headerlength(currentOffset); |
| 321 | fileFooter.set_contentlength(0); |
| 322 | fileFooter.set_numberofrows(0); |
| 323 | fileFooter.set_rowindexstride( |
| 324 | static_cast<uint32_t>(options.getRowIndexStride())); |
| 325 | fileFooter.set_writer(writerId); |
| 326 | |
| 327 | uint32_t index = 0; |
| 328 | buildFooterType(type, fileFooter, index); |
| 329 | |
| 330 | // Initialize post script |
| 331 | postScript.set_footerlength(0); |
| 332 | postScript.set_compression( |
| 333 | WriterImpl::convertCompressionKind(options.getCompression())); |
| 334 | postScript.set_compressionblocksize(options.getCompressionBlockSize()); |
| 335 | |
| 336 | postScript.add_version(options.getFileVersion().getMajor()); |
| 337 | postScript.add_version(options.getFileVersion().getMinor()); |
| 338 | |
| 339 | postScript.set_writerversion(WriterVersion_ORC_135); |
| 340 | postScript.set_magic("ORC" ); |
| 341 | |
| 342 | // Initialize first stripe |
| 343 | initStripe(); |
| 344 | } |
| 345 | |
| 346 | void WriterImpl::initStripe() { |
| 347 | stripeInfo.set_offset(currentOffset); |
| 348 | stripeInfo.set_indexlength(0); |
| 349 | stripeInfo.set_datalength(0); |
| 350 | stripeInfo.set_footerlength(0); |
| 351 | stripeInfo.set_numberofrows(0); |
| 352 | |
| 353 | stripeRows = indexRows = 0; |
| 354 | } |
| 355 | |
| 356 | void WriterImpl::writeStripe() { |
| 357 | if (options.getEnableIndex() && indexRows != 0) { |
| 358 | columnWriter->createRowIndexEntry(); |
| 359 | indexRows = 0; |
| 360 | } else { |
| 361 | columnWriter->mergeRowGroupStatsIntoStripeStats(); |
| 362 | } |
| 363 | |
| 364 | std::vector<proto::Stream> streams; |
| 365 | // write ROW_INDEX streams |
| 366 | if (options.getEnableIndex()) { |
| 367 | columnWriter->writeIndex(streams); |
| 368 | } |
| 369 | // write streams like PRESENT, DATA, etc. |
| 370 | columnWriter->flush(streams); |
| 371 | |
| 372 | // generate and write stripe footer |
| 373 | proto::StripeFooter ; |
| 374 | for (uint32_t i = 0; i < streams.size(); ++i) { |
| 375 | *stripeFooter.add_streams() = streams[i]; |
| 376 | } |
| 377 | |
| 378 | std::vector<proto::ColumnEncoding> encodings; |
| 379 | columnWriter->getColumnEncoding(encodings); |
| 380 | |
| 381 | for (uint32_t i = 0; i < encodings.size(); ++i) { |
| 382 | *stripeFooter.add_columns() = encodings[i]; |
| 383 | } |
| 384 | |
| 385 | // use GMT to guarantee TimestampVectorBatch from reader can write |
| 386 | // same wall clock time |
| 387 | stripeFooter.set_writertimezone("GMT" ); |
| 388 | |
| 389 | // add stripe statistics to metadata |
| 390 | proto::StripeStatistics* stripeStats = metadata.add_stripestats(); |
| 391 | std::vector<proto::ColumnStatistics> colStats; |
| 392 | columnWriter->getStripeStatistics(colStats); |
| 393 | for (uint32_t i = 0; i != colStats.size(); ++i) { |
| 394 | *stripeStats->add_colstats() = colStats[i]; |
| 395 | } |
| 396 | // merge stripe stats into file stats and clear stripe stats |
| 397 | columnWriter->mergeStripeStatsIntoFileStats(); |
| 398 | |
| 399 | if (!stripeFooter.SerializeToZeroCopyStream(compressionStream.get())) { |
| 400 | throw std::logic_error("Failed to write stripe footer." ); |
| 401 | } |
| 402 | uint64_t = compressionStream->flush(); |
| 403 | |
| 404 | // calculate data length and index length |
| 405 | uint64_t dataLength = 0; |
| 406 | uint64_t indexLength = 0; |
| 407 | for (uint32_t i = 0; i < streams.size(); ++i) { |
| 408 | if (streams[i].kind() == proto::Stream_Kind_ROW_INDEX) { |
| 409 | indexLength += streams[i].length(); |
| 410 | } else { |
| 411 | dataLength += streams[i].length(); |
| 412 | } |
| 413 | } |
| 414 | |
| 415 | // update stripe info |
| 416 | stripeInfo.set_indexlength(indexLength); |
| 417 | stripeInfo.set_datalength(dataLength); |
| 418 | stripeInfo.set_footerlength(footerLength); |
| 419 | stripeInfo.set_numberofrows(stripeRows); |
| 420 | |
| 421 | *fileFooter.add_stripes() = stripeInfo; |
| 422 | |
| 423 | currentOffset = currentOffset + indexLength + dataLength + footerLength; |
| 424 | totalRows += stripeRows; |
| 425 | |
| 426 | columnWriter->reset(); |
| 427 | |
| 428 | initStripe(); |
| 429 | } |
| 430 | |
| 431 | void WriterImpl::writeMetadata() { |
| 432 | if (!metadata.SerializeToZeroCopyStream(compressionStream.get())) { |
| 433 | throw std::logic_error("Failed to write metadata." ); |
| 434 | } |
| 435 | postScript.set_metadatalength(compressionStream.get()->flush()); |
| 436 | } |
| 437 | |
| 438 | void WriterImpl::() { |
| 439 | fileFooter.set_contentlength(currentOffset - fileFooter.headerlength()); |
| 440 | fileFooter.set_numberofrows(totalRows); |
| 441 | |
| 442 | // update file statistics |
| 443 | std::vector<proto::ColumnStatistics> colStats; |
| 444 | columnWriter->getFileStatistics(colStats); |
| 445 | for (uint32_t i = 0; i != colStats.size(); ++i) { |
| 446 | *fileFooter.add_statistics() = colStats[i]; |
| 447 | } |
| 448 | |
| 449 | if (!fileFooter.SerializeToZeroCopyStream(compressionStream.get())) { |
| 450 | throw std::logic_error("Failed to write file footer." ); |
| 451 | } |
| 452 | postScript.set_footerlength(compressionStream->flush()); |
| 453 | } |
| 454 | |
| 455 | void WriterImpl::writePostscript() { |
| 456 | if (!postScript.SerializeToZeroCopyStream(bufferedStream.get())) { |
| 457 | throw std::logic_error("Failed to write post script." ); |
| 458 | } |
| 459 | unsigned char psLength = |
| 460 | static_cast<unsigned char>(bufferedStream->flush()); |
| 461 | outStream->write(&psLength, sizeof(unsigned char)); |
| 462 | } |
| 463 | |
| 464 | void WriterImpl::( |
| 465 | const Type& t, |
| 466 | proto::Footer& , |
| 467 | uint32_t & index) { |
| 468 | proto::Type protoType; |
| 469 | protoType.set_maximumlength(static_cast<uint32_t>(t.getMaximumLength())); |
| 470 | protoType.set_precision(static_cast<uint32_t>(t.getPrecision())); |
| 471 | protoType.set_scale(static_cast<uint32_t>(t.getScale())); |
| 472 | |
| 473 | switch (t.getKind()) { |
| 474 | case BOOLEAN: { |
| 475 | protoType.set_kind(proto::Type_Kind_BOOLEAN); |
| 476 | break; |
| 477 | } |
| 478 | case BYTE: { |
| 479 | protoType.set_kind(proto::Type_Kind_BYTE); |
| 480 | break; |
| 481 | } |
| 482 | case SHORT: { |
| 483 | protoType.set_kind(proto::Type_Kind_SHORT); |
| 484 | break; |
| 485 | } |
| 486 | case INT: { |
| 487 | protoType.set_kind(proto::Type_Kind_INT); |
| 488 | break; |
| 489 | } |
| 490 | case LONG: { |
| 491 | protoType.set_kind(proto::Type_Kind_LONG); |
| 492 | break; |
| 493 | } |
| 494 | case FLOAT: { |
| 495 | protoType.set_kind(proto::Type_Kind_FLOAT); |
| 496 | break; |
| 497 | } |
| 498 | case DOUBLE: { |
| 499 | protoType.set_kind(proto::Type_Kind_DOUBLE); |
| 500 | break; |
| 501 | } |
| 502 | case STRING: { |
| 503 | protoType.set_kind(proto::Type_Kind_STRING); |
| 504 | break; |
| 505 | } |
| 506 | case BINARY: { |
| 507 | protoType.set_kind(proto::Type_Kind_BINARY); |
| 508 | break; |
| 509 | } |
| 510 | case TIMESTAMP: { |
| 511 | protoType.set_kind(proto::Type_Kind_TIMESTAMP); |
| 512 | break; |
| 513 | } |
| 514 | case LIST: { |
| 515 | protoType.set_kind(proto::Type_Kind_LIST); |
| 516 | break; |
| 517 | } |
| 518 | case MAP: { |
| 519 | protoType.set_kind(proto::Type_Kind_MAP); |
| 520 | break; |
| 521 | } |
| 522 | case STRUCT: { |
| 523 | protoType.set_kind(proto::Type_Kind_STRUCT); |
| 524 | break; |
| 525 | } |
| 526 | case UNION: { |
| 527 | protoType.set_kind(proto::Type_Kind_UNION); |
| 528 | break; |
| 529 | } |
| 530 | case DECIMAL: { |
| 531 | protoType.set_kind(proto::Type_Kind_DECIMAL); |
| 532 | break; |
| 533 | } |
| 534 | case DATE: { |
| 535 | protoType.set_kind(proto::Type_Kind_DATE); |
| 536 | break; |
| 537 | } |
| 538 | case VARCHAR: { |
| 539 | protoType.set_kind(proto::Type_Kind_VARCHAR); |
| 540 | break; |
| 541 | } |
| 542 | case CHAR: { |
| 543 | protoType.set_kind(proto::Type_Kind_CHAR); |
| 544 | break; |
| 545 | } |
| 546 | default: |
| 547 | throw std::logic_error("Unknown type." ); |
| 548 | } |
| 549 | |
| 550 | int pos = static_cast<int>(index); |
| 551 | *footer.add_types() = protoType; |
| 552 | |
| 553 | for (uint64_t i = 0; i < t.getSubtypeCount(); ++i) { |
| 554 | if (t.getKind() != LIST && t.getKind() != MAP && t.getKind() != UNION) { |
| 555 | footer.mutable_types(pos)->add_fieldnames(t.getFieldName(i)); |
| 556 | } |
| 557 | footer.mutable_types(pos)->add_subtypes(++index); |
| 558 | buildFooterType(*t.getSubtype(i), footer, index); |
| 559 | } |
| 560 | } |
| 561 | |
| 562 | proto::CompressionKind WriterImpl::convertCompressionKind( |
| 563 | const CompressionKind& kind) { |
| 564 | return static_cast<proto::CompressionKind>(kind); |
| 565 | } |
| 566 | |
| 567 | std::unique_ptr<Writer> createWriter( |
| 568 | const Type& type, |
| 569 | OutputStream* stream, |
| 570 | const WriterOptions& options) { |
| 571 | return std::unique_ptr<Writer>( |
| 572 | new WriterImpl( |
| 573 | type, |
| 574 | stream, |
| 575 | options)); |
| 576 | } |
| 577 | |
| 578 | } |
| 579 | |
| 580 | |