1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "orc/Common.hh"
20#include "orc/OrcFile.hh"
21
22#include "ColumnWriter.hh"
23#include "Timezone.hh"
24
25#include <memory>
26
27namespace orc {
28
29 struct WriterOptionsPrivate {
30 uint64_t stripeSize;
31 uint64_t compressionBlockSize;
32 uint64_t rowIndexStride;
33 CompressionKind compression;
34 CompressionStrategy compressionStrategy;
35 MemoryPool* memoryPool;
36 double paddingTolerance;
37 std::ostream* errorStream;
38 FileVersion fileVersion;
39 double dictionaryKeySizeThreshold;
40 bool enableIndex;
41
42 WriterOptionsPrivate() :
43 fileVersion(0, 11) { // default to Hive_0_11
44 stripeSize = 64 * 1024 * 1024; // 64M
45 compressionBlockSize = 64 * 1024; // 64K
46 rowIndexStride = 10000;
47 compression = CompressionKind_ZLIB;
48 compressionStrategy = CompressionStrategy_SPEED;
49 memoryPool = getDefaultPool();
50 paddingTolerance = 0.0;
51 errorStream = &std::cerr;
52 dictionaryKeySizeThreshold = 0.0;
53 enableIndex = true;
54 }
55 };
56
57 WriterOptions::WriterOptions():
58 privateBits(std::unique_ptr<WriterOptionsPrivate>
59 (new WriterOptionsPrivate())) {
60 // PASS
61 }
62
63 WriterOptions::WriterOptions(const WriterOptions& rhs):
64 privateBits(std::unique_ptr<WriterOptionsPrivate>
65 (new WriterOptionsPrivate(*(rhs.privateBits.get())))) {
66 // PASS
67 }
68
69 WriterOptions::WriterOptions(WriterOptions& rhs) {
70 // swap privateBits with rhs
71 WriterOptionsPrivate* l = privateBits.release();
72 privateBits.reset(rhs.privateBits.release());
73 rhs.privateBits.reset(l);
74 }
75
76 WriterOptions& WriterOptions::operator=(const WriterOptions& rhs) {
77 if (this != &rhs) {
78 privateBits.reset(new WriterOptionsPrivate(*(rhs.privateBits.get())));
79 }
80 return *this;
81 }
82
83 WriterOptions::~WriterOptions() {
84 // PASS
85 }
86
87 WriterOptions& WriterOptions::setStripeSize(uint64_t size) {
88 privateBits->stripeSize = size;
89 return *this;
90 }
91
92 uint64_t WriterOptions::getStripeSize() const {
93 return privateBits->stripeSize;
94 }
95
96 WriterOptions& WriterOptions::setCompressionBlockSize(uint64_t size) {
97 privateBits->compressionBlockSize = size;
98 return *this;
99 }
100
101 uint64_t WriterOptions::getCompressionBlockSize() const {
102 return privateBits->compressionBlockSize;
103 }
104
105 WriterOptions& WriterOptions::setRowIndexStride(uint64_t stride) {
106 privateBits->rowIndexStride = stride;
107 privateBits->enableIndex = (stride != 0);
108 return *this;
109 }
110
111 uint64_t WriterOptions::getRowIndexStride() const {
112 return privateBits->rowIndexStride;
113 }
114
115 WriterOptions& WriterOptions::setDictionaryKeySizeThreshold(double val) {
116 privateBits->dictionaryKeySizeThreshold = val;
117 return *this;
118 }
119
120 double WriterOptions::getDictionaryKeySizeThreshold() const {
121 return privateBits->dictionaryKeySizeThreshold;
122 }
123
124 WriterOptions& WriterOptions::setFileVersion(const FileVersion& version) {
125 // Only Hive_0_11 version is supported currently
126 if (version.getMajor() == 0 && version.getMinor() == 11) {
127 privateBits->fileVersion = version;
128 return *this;
129 }
130 throw std::logic_error("Unpoorted file version specified.");
131 }
132
133 FileVersion WriterOptions::getFileVersion() const {
134 return privateBits->fileVersion;
135 }
136
137 WriterOptions& WriterOptions::setCompression(CompressionKind comp) {
138 privateBits->compression = comp;
139 return *this;
140 }
141
142 CompressionKind WriterOptions::getCompression() const {
143 return privateBits->compression;
144 }
145
146 WriterOptions& WriterOptions::setCompressionStrategy(
147 CompressionStrategy strategy) {
148 privateBits->compressionStrategy = strategy;
149 return *this;
150 }
151
152 CompressionStrategy WriterOptions::getCompressionStrategy() const {
153 return privateBits->compressionStrategy;
154 }
155
156 WriterOptions& WriterOptions::setPaddingTolerance(double tolerance) {
157 privateBits->paddingTolerance = tolerance;
158 return *this;
159 }
160
161 double WriterOptions::getPaddingTolerance() const {
162 return privateBits->paddingTolerance;
163 }
164
165 WriterOptions& WriterOptions::setMemoryPool(MemoryPool* memoryPool) {
166 privateBits->memoryPool = memoryPool;
167 return *this;
168 }
169
170 MemoryPool* WriterOptions::getMemoryPool() const {
171 return privateBits->memoryPool;
172 }
173
174 WriterOptions& WriterOptions::setErrorStream(std::ostream& errStream) {
175 privateBits->errorStream = &errStream;
176 return *this;
177 }
178
179 std::ostream* WriterOptions::getErrorStream() const {
180 return privateBits->errorStream;
181 }
182
183 bool WriterOptions::getEnableIndex() const {
184 return privateBits->enableIndex;
185 }
186
187 Writer::~Writer() {
188 // PASS
189 }
190
191 class WriterImpl : public Writer {
192 private:
193 std::unique_ptr<ColumnWriter> columnWriter;
194 std::unique_ptr<BufferedOutputStream> compressionStream;
195 std::unique_ptr<BufferedOutputStream> bufferedStream;
196 std::unique_ptr<StreamsFactory> streamsFactory;
197 OutputStream* outStream;
198 WriterOptions options;
199 const Type& type;
200 uint64_t stripeRows, totalRows, indexRows;
201 uint64_t currentOffset;
202 proto::Footer fileFooter;
203 proto::PostScript postScript;
204 proto::StripeInformation stripeInfo;
205 proto::Metadata metadata;
206
207 static const char* magicId;
208 static const WriterId writerId;
209
210 public:
211 WriterImpl(
212 const Type& type,
213 OutputStream* stream,
214 const WriterOptions& options);
215
216 std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size)
217 const override;
218
219 void add(ColumnVectorBatch& rowsToAdd) override;
220
221 void close() override;
222
223 private:
224 void init();
225 void initStripe();
226 void writeStripe();
227 void writeMetadata();
228 void writeFileFooter();
229 void writePostscript();
230 void buildFooterType(const Type& t, proto::Footer& footer, uint32_t& index);
231 static proto::CompressionKind convertCompressionKind(
232 const CompressionKind& kind);
233 };
234
235 const char * WriterImpl::magicId = "ORC";
236
237 const WriterId WriterImpl::writerId = WriterId::ORC_CPP_WRITER;
238
239 WriterImpl::WriterImpl(
240 const Type& t,
241 OutputStream* stream,
242 const WriterOptions& opts) :
243 outStream(stream),
244 options(opts),
245 type(t) {
246 streamsFactory = createStreamsFactory(options, outStream);
247 columnWriter = buildWriter(type, *streamsFactory, options);
248 stripeRows = totalRows = indexRows = 0;
249 currentOffset = 0;
250
251 // compression stream for stripe footer, file footer and metadata
252 compressionStream = createCompressor(
253 options.getCompression(),
254 outStream,
255 options.getCompressionStrategy(),
256 1 * 1024 * 1024, // buffer capacity: 1M
257 options.getCompressionBlockSize(),
258 *options.getMemoryPool());
259
260 // uncompressed stream for post script
261 bufferedStream.reset(new BufferedOutputStream(
262 *options.getMemoryPool(),
263 outStream,
264 1024, // buffer capacity: 1024 bytes
265 options.getCompressionBlockSize()));
266
267 init();
268 }
269
270 std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(uint64_t size)
271 const {
272 return type.createRowBatch(size, *options.getMemoryPool());
273 }
274
275 void WriterImpl::add(ColumnVectorBatch& rowsToAdd) {
276 if (options.getEnableIndex()) {
277 uint64_t pos = 0;
278 uint64_t chunkSize = 0;
279 uint64_t rowIndexStride = options.getRowIndexStride();
280 while (pos < rowsToAdd.numElements) {
281 chunkSize = std::min(rowsToAdd.numElements - pos,
282 rowIndexStride - indexRows);
283 columnWriter->add(rowsToAdd, pos, chunkSize);
284
285 pos += chunkSize;
286 indexRows += chunkSize;
287 stripeRows += chunkSize;
288
289 if (indexRows >= rowIndexStride) {
290 columnWriter->createRowIndexEntry();
291 indexRows = 0;
292 }
293 }
294 } else {
295 stripeRows += rowsToAdd.numElements;
296 columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements);
297 }
298
299 if (columnWriter->getEstimatedSize() >= options.getStripeSize()) {
300 writeStripe();
301 }
302 }
303
304 void WriterImpl::close() {
305 if (stripeRows > 0) {
306 writeStripe();
307 }
308 writeMetadata();
309 writeFileFooter();
310 writePostscript();
311 outStream->close();
312 }
313
314 void WriterImpl::init() {
315 // Write file header
316 outStream->write(WriterImpl::magicId, strlen(WriterImpl::magicId));
317 currentOffset += strlen(WriterImpl::magicId);
318
319 // Initialize file footer
320 fileFooter.set_headerlength(currentOffset);
321 fileFooter.set_contentlength(0);
322 fileFooter.set_numberofrows(0);
323 fileFooter.set_rowindexstride(
324 static_cast<uint32_t>(options.getRowIndexStride()));
325 fileFooter.set_writer(writerId);
326
327 uint32_t index = 0;
328 buildFooterType(type, fileFooter, index);
329
330 // Initialize post script
331 postScript.set_footerlength(0);
332 postScript.set_compression(
333 WriterImpl::convertCompressionKind(options.getCompression()));
334 postScript.set_compressionblocksize(options.getCompressionBlockSize());
335
336 postScript.add_version(options.getFileVersion().getMajor());
337 postScript.add_version(options.getFileVersion().getMinor());
338
339 postScript.set_writerversion(WriterVersion_ORC_135);
340 postScript.set_magic("ORC");
341
342 // Initialize first stripe
343 initStripe();
344 }
345
346 void WriterImpl::initStripe() {
347 stripeInfo.set_offset(currentOffset);
348 stripeInfo.set_indexlength(0);
349 stripeInfo.set_datalength(0);
350 stripeInfo.set_footerlength(0);
351 stripeInfo.set_numberofrows(0);
352
353 stripeRows = indexRows = 0;
354 }
355
356 void WriterImpl::writeStripe() {
357 if (options.getEnableIndex() && indexRows != 0) {
358 columnWriter->createRowIndexEntry();
359 indexRows = 0;
360 } else {
361 columnWriter->mergeRowGroupStatsIntoStripeStats();
362 }
363
364 std::vector<proto::Stream> streams;
365 // write ROW_INDEX streams
366 if (options.getEnableIndex()) {
367 columnWriter->writeIndex(streams);
368 }
369 // write streams like PRESENT, DATA, etc.
370 columnWriter->flush(streams);
371
372 // generate and write stripe footer
373 proto::StripeFooter stripeFooter;
374 for (uint32_t i = 0; i < streams.size(); ++i) {
375 *stripeFooter.add_streams() = streams[i];
376 }
377
378 std::vector<proto::ColumnEncoding> encodings;
379 columnWriter->getColumnEncoding(encodings);
380
381 for (uint32_t i = 0; i < encodings.size(); ++i) {
382 *stripeFooter.add_columns() = encodings[i];
383 }
384
385 // use GMT to guarantee TimestampVectorBatch from reader can write
386 // same wall clock time
387 stripeFooter.set_writertimezone("GMT");
388
389 // add stripe statistics to metadata
390 proto::StripeStatistics* stripeStats = metadata.add_stripestats();
391 std::vector<proto::ColumnStatistics> colStats;
392 columnWriter->getStripeStatistics(colStats);
393 for (uint32_t i = 0; i != colStats.size(); ++i) {
394 *stripeStats->add_colstats() = colStats[i];
395 }
396 // merge stripe stats into file stats and clear stripe stats
397 columnWriter->mergeStripeStatsIntoFileStats();
398
399 if (!stripeFooter.SerializeToZeroCopyStream(compressionStream.get())) {
400 throw std::logic_error("Failed to write stripe footer.");
401 }
402 uint64_t footerLength = compressionStream->flush();
403
404 // calculate data length and index length
405 uint64_t dataLength = 0;
406 uint64_t indexLength = 0;
407 for (uint32_t i = 0; i < streams.size(); ++i) {
408 if (streams[i].kind() == proto::Stream_Kind_ROW_INDEX) {
409 indexLength += streams[i].length();
410 } else {
411 dataLength += streams[i].length();
412 }
413 }
414
415 // update stripe info
416 stripeInfo.set_indexlength(indexLength);
417 stripeInfo.set_datalength(dataLength);
418 stripeInfo.set_footerlength(footerLength);
419 stripeInfo.set_numberofrows(stripeRows);
420
421 *fileFooter.add_stripes() = stripeInfo;
422
423 currentOffset = currentOffset + indexLength + dataLength + footerLength;
424 totalRows += stripeRows;
425
426 columnWriter->reset();
427
428 initStripe();
429 }
430
431 void WriterImpl::writeMetadata() {
432 if (!metadata.SerializeToZeroCopyStream(compressionStream.get())) {
433 throw std::logic_error("Failed to write metadata.");
434 }
435 postScript.set_metadatalength(compressionStream.get()->flush());
436 }
437
438 void WriterImpl::writeFileFooter() {
439 fileFooter.set_contentlength(currentOffset - fileFooter.headerlength());
440 fileFooter.set_numberofrows(totalRows);
441
442 // update file statistics
443 std::vector<proto::ColumnStatistics> colStats;
444 columnWriter->getFileStatistics(colStats);
445 for (uint32_t i = 0; i != colStats.size(); ++i) {
446 *fileFooter.add_statistics() = colStats[i];
447 }
448
449 if (!fileFooter.SerializeToZeroCopyStream(compressionStream.get())) {
450 throw std::logic_error("Failed to write file footer.");
451 }
452 postScript.set_footerlength(compressionStream->flush());
453 }
454
455 void WriterImpl::writePostscript() {
456 if (!postScript.SerializeToZeroCopyStream(bufferedStream.get())) {
457 throw std::logic_error("Failed to write post script.");
458 }
459 unsigned char psLength =
460 static_cast<unsigned char>(bufferedStream->flush());
461 outStream->write(&psLength, sizeof(unsigned char));
462 }
463
464 void WriterImpl::buildFooterType(
465 const Type& t,
466 proto::Footer& footer,
467 uint32_t & index) {
468 proto::Type protoType;
469 protoType.set_maximumlength(static_cast<uint32_t>(t.getMaximumLength()));
470 protoType.set_precision(static_cast<uint32_t>(t.getPrecision()));
471 protoType.set_scale(static_cast<uint32_t>(t.getScale()));
472
473 switch (t.getKind()) {
474 case BOOLEAN: {
475 protoType.set_kind(proto::Type_Kind_BOOLEAN);
476 break;
477 }
478 case BYTE: {
479 protoType.set_kind(proto::Type_Kind_BYTE);
480 break;
481 }
482 case SHORT: {
483 protoType.set_kind(proto::Type_Kind_SHORT);
484 break;
485 }
486 case INT: {
487 protoType.set_kind(proto::Type_Kind_INT);
488 break;
489 }
490 case LONG: {
491 protoType.set_kind(proto::Type_Kind_LONG);
492 break;
493 }
494 case FLOAT: {
495 protoType.set_kind(proto::Type_Kind_FLOAT);
496 break;
497 }
498 case DOUBLE: {
499 protoType.set_kind(proto::Type_Kind_DOUBLE);
500 break;
501 }
502 case STRING: {
503 protoType.set_kind(proto::Type_Kind_STRING);
504 break;
505 }
506 case BINARY: {
507 protoType.set_kind(proto::Type_Kind_BINARY);
508 break;
509 }
510 case TIMESTAMP: {
511 protoType.set_kind(proto::Type_Kind_TIMESTAMP);
512 break;
513 }
514 case LIST: {
515 protoType.set_kind(proto::Type_Kind_LIST);
516 break;
517 }
518 case MAP: {
519 protoType.set_kind(proto::Type_Kind_MAP);
520 break;
521 }
522 case STRUCT: {
523 protoType.set_kind(proto::Type_Kind_STRUCT);
524 break;
525 }
526 case UNION: {
527 protoType.set_kind(proto::Type_Kind_UNION);
528 break;
529 }
530 case DECIMAL: {
531 protoType.set_kind(proto::Type_Kind_DECIMAL);
532 break;
533 }
534 case DATE: {
535 protoType.set_kind(proto::Type_Kind_DATE);
536 break;
537 }
538 case VARCHAR: {
539 protoType.set_kind(proto::Type_Kind_VARCHAR);
540 break;
541 }
542 case CHAR: {
543 protoType.set_kind(proto::Type_Kind_CHAR);
544 break;
545 }
546 default:
547 throw std::logic_error("Unknown type.");
548 }
549
550 int pos = static_cast<int>(index);
551 *footer.add_types() = protoType;
552
553 for (uint64_t i = 0; i < t.getSubtypeCount(); ++i) {
554 if (t.getKind() != LIST && t.getKind() != MAP && t.getKind() != UNION) {
555 footer.mutable_types(pos)->add_fieldnames(t.getFieldName(i));
556 }
557 footer.mutable_types(pos)->add_subtypes(++index);
558 buildFooterType(*t.getSubtype(i), footer, index);
559 }
560 }
561
562 proto::CompressionKind WriterImpl::convertCompressionKind(
563 const CompressionKind& kind) {
564 return static_cast<proto::CompressionKind>(kind);
565 }
566
567 std::unique_ptr<Writer> createWriter(
568 const Type& type,
569 OutputStream* stream,
570 const WriterOptions& options) {
571 return std::unique_ptr<Writer>(
572 new WriterImpl(
573 type,
574 stream,
575 options));
576 }
577
578}
579
580