1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
#include "orc/Int128.hh"
#include "orc/Writer.hh"

#include "ByteRLE.hh"
#include "ColumnWriter.hh"
#include "RLE.hh"
#include "Statistics.hh"
#include "Timezone.hh"

#include <cstring>
27
28namespace orc {
29 StreamsFactory::~StreamsFactory() {
30 //PASS
31 }
32
33 class StreamsFactoryImpl : public StreamsFactory {
34 public:
35 StreamsFactoryImpl(
36 const WriterOptions& writerOptions,
37 OutputStream* outputStream) :
38 options(writerOptions),
39 outStream(outputStream) {
40 }
41
42 virtual std::unique_ptr<BufferedOutputStream>
43 createStream(proto::Stream_Kind kind) const override;
44 private:
45 const WriterOptions& options;
46 OutputStream* outStream;
47 };
48
49 std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream(
50 proto::Stream_Kind) const {
51 // In the future, we can decide compression strategy and modifier
52 // based on stream kind. But for now we just use the setting from
53 // WriterOption
54 return createCompressor(
55 options.getCompression(),
56 outStream,
57 options.getCompressionStrategy(),
58 // BufferedOutputStream initial capacity
59 1 * 1024 * 1024,
60 options.getCompressionBlockSize(),
61 *options.getMemoryPool());
62 }
63
64 std::unique_ptr<StreamsFactory> createStreamsFactory(
65 const WriterOptions& options,
66 OutputStream* outStream) {
67 return std::unique_ptr<StreamsFactory>(
68 new StreamsFactoryImpl(options, outStream));
69 }
70
71 RowIndexPositionRecorder::~RowIndexPositionRecorder() {
72 // PASS
73 }
74
75 ColumnWriter::ColumnWriter(
76 const Type& type,
77 const StreamsFactory& factory,
78 const WriterOptions& options) :
79 columnId(type.getColumnId()),
80 colIndexStatistics(),
81 colStripeStatistics(),
82 colFileStatistics(),
83 enableIndex(options.getEnableIndex()),
84 rowIndex(),
85 rowIndexEntry(),
86 rowIndexPosition(),
87 memPool(*options.getMemoryPool()),
88 indexStream() {
89
90 std::unique_ptr<BufferedOutputStream> presentStream =
91 factory.createStream(proto::Stream_Kind_PRESENT);
92 notNullEncoder = createBooleanRleEncoder(std::move(presentStream));
93
94 colIndexStatistics = createColumnStatistics(type);
95 colStripeStatistics = createColumnStatistics(type);
96 colFileStatistics = createColumnStatistics(type);
97
98 if (enableIndex) {
99 rowIndex = std::unique_ptr<proto::RowIndex>(new proto::RowIndex());
100 rowIndexEntry =
101 std::unique_ptr<proto::RowIndexEntry>(new proto::RowIndexEntry());
102 rowIndexPosition = std::unique_ptr<RowIndexPositionRecorder>(
103 new RowIndexPositionRecorder(*rowIndexEntry));
104 indexStream =
105 factory.createStream(proto::Stream_Kind_ROW_INDEX);
106 }
107 }
108
109 ColumnWriter::~ColumnWriter() {
110 // PASS
111 }
112
113 void ColumnWriter::add(ColumnVectorBatch& batch,
114 uint64_t offset,
115 uint64_t numValues) {
116 notNullEncoder->add(batch.notNull.data() + offset, numValues, nullptr);
117 }
118
119 void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
120 proto::Stream stream;
121 stream.set_kind(proto::Stream_Kind_PRESENT);
122 stream.set_column(static_cast<uint32_t>(columnId));
123 stream.set_length(notNullEncoder->flush());
124 streams.push_back(stream);
125 }
126
127 uint64_t ColumnWriter::getEstimatedSize() const {
128 return notNullEncoder->getBufferSize();
129 }
130
131 void ColumnWriter::getStripeStatistics(
132 std::vector<proto::ColumnStatistics>& stats) const {
133 getProtoBufStatistics(stats, colStripeStatistics.get());
134 }
135
136 void ColumnWriter::mergeStripeStatsIntoFileStats() {
137 colFileStatistics->merge(*colStripeStatistics);
138 colStripeStatistics->reset();
139 }
140
141 void ColumnWriter::mergeRowGroupStatsIntoStripeStats() {
142 colStripeStatistics->merge(*colIndexStatistics);
143 colIndexStatistics->reset();
144 }
145
146 void ColumnWriter::getFileStatistics(
147 std::vector<proto::ColumnStatistics>& stats) const {
148 getProtoBufStatistics(stats, colFileStatistics.get());
149 }
150
151 void ColumnWriter::createRowIndexEntry() {
152 proto::ColumnStatistics *indexStats = rowIndexEntry->mutable_statistics();
153 colIndexStatistics->toProtoBuf(*indexStats);
154
155 *rowIndex->add_entry() = *rowIndexEntry;
156
157 rowIndexEntry->clear_positions();
158 rowIndexEntry->clear_statistics();
159
160 colStripeStatistics->merge(*colIndexStatistics);
161 colIndexStatistics->reset();
162
163 recordPosition();
164 }
165
166 void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
167 // write row index to output stream
168 rowIndex->SerializeToZeroCopyStream(indexStream.get());
169
170 // construct row index stream
171 proto::Stream stream;
172 stream.set_kind(proto::Stream_Kind_ROW_INDEX);
173 stream.set_column(static_cast<uint32_t>(columnId));
174 stream.set_length(indexStream->flush());
175 streams.push_back(stream);
176 }
177
178 void ColumnWriter::recordPosition() const {
179 notNullEncoder->recordPosition(rowIndexPosition.get());
180 }
181
182 void ColumnWriter::reset() {
183 if (enableIndex) {
184 // clear row index
185 rowIndex->clear_entry();
186 rowIndexEntry->clear_positions();
187 rowIndexEntry->clear_statistics();
188
189 // write current positions
190 recordPosition();
191 }
192 }
193
194 class StructColumnWriter : public ColumnWriter {
195 public:
196 StructColumnWriter(
197 const Type& type,
198 const StreamsFactory& factory,
199 const WriterOptions& options);
200 ~StructColumnWriter() override;
201
202 virtual void add(ColumnVectorBatch& rowBatch,
203 uint64_t offset,
204 uint64_t numValues) override;
205
206 virtual void flush(std::vector<proto::Stream>& streams) override;
207
208 virtual uint64_t getEstimatedSize() const override;
209 virtual void getColumnEncoding(
210 std::vector<proto::ColumnEncoding>& encodings) const override;
211
212 virtual void getStripeStatistics(
213 std::vector<proto::ColumnStatistics>& stats) const override;
214
215 virtual void getFileStatistics(
216 std::vector<proto::ColumnStatistics>& stats) const override;
217
218 virtual void mergeStripeStatsIntoFileStats() override;
219
220 virtual void mergeRowGroupStatsIntoStripeStats() override;
221
222 virtual void createRowIndexEntry() override;
223
224 virtual void writeIndex(
225 std::vector<proto::Stream> &streams) const override;
226
227 virtual void reset() override;
228
229 private:
230 std::vector<ColumnWriter *> children;
231 };
232
233 StructColumnWriter::StructColumnWriter(
234 const Type& type,
235 const StreamsFactory& factory,
236 const WriterOptions& options) :
237 ColumnWriter(type, factory, options) {
238 for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
239 const Type& child = *type.getSubtype(i);
240 children.push_back(buildWriter(child, factory, options).release());
241 }
242
243 if (enableIndex) {
244 recordPosition();
245 }
246 }
247
248 StructColumnWriter::~StructColumnWriter() {
249 for (uint32_t i = 0; i < children.size(); ++i) {
250 delete children[i];
251 }
252 }
253
254 void StructColumnWriter::add(
255 ColumnVectorBatch& rowBatch,
256 uint64_t offset,
257 uint64_t numValues) {
258 ColumnWriter::add(rowBatch, offset, numValues);
259 const StructVectorBatch* structBatch =
260 dynamic_cast<const StructVectorBatch *>(&rowBatch);
261 if (structBatch == nullptr) {
262 throw InvalidArgument("Failed to cast to StructVectorBatch");
263 }
264
265 for (uint32_t i = 0; i < children.size(); ++i) {
266 children[i]->add(*structBatch->fields[i], offset, numValues);
267 }
268
269 // update stats
270 bool hasNull = false;
271 if (!structBatch->hasNulls) {
272 colIndexStatistics->increase(numValues);
273 } else {
274 const char* notNull = structBatch->notNull.data() + offset;
275 for (uint64_t i = 0; i < numValues; ++i) {
276 if (notNull[i]) {
277 colIndexStatistics->increase(1);
278 } else if (!hasNull) {
279 hasNull = true;
280 }
281 }
282 }
283 colIndexStatistics->setHasNull(hasNull);
284 }
285
286 void StructColumnWriter::flush(std::vector<proto::Stream>& streams) {
287 ColumnWriter::flush(streams);
288 for (uint32_t i = 0; i < children.size(); ++i) {
289 children[i]->flush(streams);
290 }
291 }
292
293 void StructColumnWriter::writeIndex(
294 std::vector<proto::Stream> &streams) const {
295 ColumnWriter::writeIndex(streams);
296 for (uint32_t i = 0; i < children.size(); ++i) {
297 children[i]->writeIndex(streams);
298 }
299 }
300
301 uint64_t StructColumnWriter::getEstimatedSize() const {
302 uint64_t size = ColumnWriter::getEstimatedSize();
303 for (uint32_t i = 0; i < children.size(); ++i) {
304 size += children[i]->getEstimatedSize();
305 }
306 return size;
307 }
308
309 void StructColumnWriter::getColumnEncoding(
310 std::vector<proto::ColumnEncoding>& encodings) const {
311 proto::ColumnEncoding encoding;
312 encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
313 encoding.set_dictionarysize(0);
314 encodings.push_back(encoding);
315 for (uint32_t i = 0; i < children.size(); ++i) {
316 children[i]->getColumnEncoding(encodings);
317 }
318 }
319
320 void StructColumnWriter::getStripeStatistics(
321 std::vector<proto::ColumnStatistics>& stats) const {
322 ColumnWriter::getStripeStatistics(stats);
323
324 for (uint32_t i = 0; i < children.size(); ++i) {
325 children[i]->getStripeStatistics(stats);
326 }
327 }
328
329 void StructColumnWriter::mergeStripeStatsIntoFileStats() {
330 ColumnWriter::mergeStripeStatsIntoFileStats();
331
332 for (uint32_t i = 0; i < children.size(); ++i) {
333 children[i]->mergeStripeStatsIntoFileStats();
334 }
335 }
336
337 void StructColumnWriter::getFileStatistics(
338 std::vector<proto::ColumnStatistics>& stats) const {
339 ColumnWriter::getFileStatistics(stats);
340
341 for (uint32_t i = 0; i < children.size(); ++i) {
342 children[i]->getFileStatistics(stats);
343 }
344 }
345
346 void StructColumnWriter::mergeRowGroupStatsIntoStripeStats() {
347 ColumnWriter::mergeRowGroupStatsIntoStripeStats();
348
349 for (uint32_t i = 0; i < children.size(); ++i) {
350 children[i]->mergeRowGroupStatsIntoStripeStats();
351 }
352 }
353
354 void StructColumnWriter::createRowIndexEntry() {
355 ColumnWriter::createRowIndexEntry();
356
357 for (uint32_t i = 0; i < children.size(); ++i) {
358 children[i]->createRowIndexEntry();
359 }
360 }
361
362 void StructColumnWriter::reset() {
363 ColumnWriter::reset();
364
365 for (uint32_t i = 0; i < children.size(); ++i) {
366 children[i]->reset();
367 }
368 }
369
370 class IntegerColumnWriter : public ColumnWriter {
371 public:
372 IntegerColumnWriter(
373 const Type& type,
374 const StreamsFactory& factory,
375 const WriterOptions& options);
376
377 virtual void add(ColumnVectorBatch& rowBatch,
378 uint64_t offset,
379 uint64_t numValues) override;
380
381 virtual void flush(std::vector<proto::Stream>& streams) override;
382
383 virtual uint64_t getEstimatedSize() const override;
384
385 virtual void getColumnEncoding(
386 std::vector<proto::ColumnEncoding>& encodings) const override;
387
388 virtual void recordPosition() const override;
389
390 protected:
391 std::unique_ptr<RleEncoder> rleEncoder;
392
393 private:
394 RleVersion rleVersion;
395 };
396
397 IntegerColumnWriter::IntegerColumnWriter(
398 const Type& type,
399 const StreamsFactory& factory,
400 const WriterOptions& options) :
401 ColumnWriter(type, factory, options),
402 rleVersion(RleVersion_1) {
403 std::unique_ptr<BufferedOutputStream> dataStream =
404 factory.createStream(proto::Stream_Kind_DATA);
405 rleEncoder = createRleEncoder(
406 std::move(dataStream),
407 true,
408 rleVersion,
409 memPool);
410
411 if (enableIndex) {
412 recordPosition();
413 }
414 }
415
416 void IntegerColumnWriter::add(
417 ColumnVectorBatch& rowBatch,
418 uint64_t offset,
419 uint64_t numValues) {
420 ColumnWriter::add(rowBatch, offset, numValues);
421
422 const LongVectorBatch* longBatch =
423 dynamic_cast<const LongVectorBatch*>(&rowBatch);
424 if (longBatch == nullptr) {
425 throw InvalidArgument("Failed to cast to LongVectorBatch");
426 }
427
428 const int64_t* data = longBatch->data.data() + offset;
429 const char* notNull = longBatch->hasNulls ?
430 longBatch->notNull.data() + offset : nullptr;
431
432 rleEncoder->add(data, numValues, notNull);
433
434 // update stats
435 IntegerColumnStatisticsImpl* intStats =
436 dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
437 if (intStats == nullptr) {
438 throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
439 }
440
441 bool hasNull = false;
442 for (uint64_t i = 0; i < numValues; ++i) {
443 if (notNull == nullptr || notNull[i]) {
444 intStats->increase(1);
445 intStats->update(data[i], 1);
446 } else if (!hasNull) {
447 hasNull = true;
448 }
449 }
450 intStats->setHasNull(hasNull);
451 }
452
453 void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) {
454 ColumnWriter::flush(streams);
455
456 proto::Stream stream;
457 stream.set_kind(proto::Stream_Kind_DATA);
458 stream.set_column(static_cast<uint32_t>(columnId));
459 stream.set_length(rleEncoder->flush());
460 streams.push_back(stream);
461 }
462
463 uint64_t IntegerColumnWriter::getEstimatedSize() const {
464 uint64_t size = ColumnWriter::getEstimatedSize();
465 size += rleEncoder->getBufferSize();
466 return size;
467 }
468
469 void IntegerColumnWriter::getColumnEncoding(
470 std::vector<proto::ColumnEncoding>& encodings) const {
471 proto::ColumnEncoding encoding;
472 encoding.set_kind(rleVersion == RleVersion_1 ?
473 proto::ColumnEncoding_Kind_DIRECT :
474 proto::ColumnEncoding_Kind_DIRECT_V2);
475 encoding.set_dictionarysize(0);
476 encodings.push_back(encoding);
477 }
478
479 void IntegerColumnWriter::recordPosition() const {
480 ColumnWriter::recordPosition();
481 rleEncoder->recordPosition(rowIndexPosition.get());
482 }
483
484 class ByteColumnWriter : public ColumnWriter {
485 public:
486 ByteColumnWriter(const Type& type,
487 const StreamsFactory& factory,
488 const WriterOptions& options);
489
490 virtual void add(ColumnVectorBatch& rowBatch,
491 uint64_t offset,
492 uint64_t numValues) override;
493
494 virtual void flush(std::vector<proto::Stream>& streams) override;
495
496 virtual uint64_t getEstimatedSize() const override;
497
498 virtual void getColumnEncoding(
499 std::vector<proto::ColumnEncoding>& encodings) const override;
500
501 virtual void recordPosition() const override;
502
503 private:
504 std::unique_ptr<ByteRleEncoder> byteRleEncoder;
505 };
506
507 ByteColumnWriter::ByteColumnWriter(
508 const Type& type,
509 const StreamsFactory& factory,
510 const WriterOptions& options) :
511 ColumnWriter(type, factory, options) {
512 std::unique_ptr<BufferedOutputStream> dataStream =
513 factory.createStream(proto::Stream_Kind_DATA);
514 byteRleEncoder = createByteRleEncoder(std::move(dataStream));
515
516 if (enableIndex) {
517 recordPosition();
518 }
519 }
520
521 void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
522 uint64_t offset,
523 uint64_t numValues) {
524 ColumnWriter::add(rowBatch, offset, numValues);
525
526 LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
527 if (byteBatch == nullptr) {
528 throw InvalidArgument("Failed to cast to LongVectorBatch");
529 }
530
531 int64_t* data = byteBatch->data.data() + offset;
532 const char* notNull = byteBatch->hasNulls ?
533 byteBatch->notNull.data() + offset : nullptr;
534
535 char* byteData = reinterpret_cast<char*>(data);
536 for (uint64_t i = 0; i < numValues; ++i) {
537 byteData[i] = static_cast<char>(data[i]);
538 }
539 byteRleEncoder->add(byteData, numValues, notNull);
540
541 IntegerColumnStatisticsImpl* intStats =
542 dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
543 if (intStats == nullptr) {
544 throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
545 }
546 bool hasNull = false;
547 for (uint64_t i = 0; i < numValues; ++i) {
548 if (notNull == nullptr || notNull[i]) {
549 intStats->increase(1);
550 intStats->update(static_cast<int64_t>(byteData[i]), 1);
551 } else if (!hasNull) {
552 hasNull = true;
553 }
554 }
555 intStats->setHasNull(hasNull);
556 }
557
558 void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
559 ColumnWriter::flush(streams);
560
561 proto::Stream stream;
562 stream.set_kind(proto::Stream_Kind_DATA);
563 stream.set_column(static_cast<uint32_t>(columnId));
564 stream.set_length(byteRleEncoder->flush());
565 streams.push_back(stream);
566 }
567
568 uint64_t ByteColumnWriter::getEstimatedSize() const {
569 uint64_t size = ColumnWriter::getEstimatedSize();
570 size += byteRleEncoder->getBufferSize();
571 return size;
572 }
573
574 void ByteColumnWriter::getColumnEncoding(
575 std::vector<proto::ColumnEncoding>& encodings) const {
576 proto::ColumnEncoding encoding;
577 encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
578 encoding.set_dictionarysize(0);
579 encodings.push_back(encoding);
580 }
581
582 void ByteColumnWriter::recordPosition() const {
583 ColumnWriter::recordPosition();
584 byteRleEncoder->recordPosition(rowIndexPosition.get());
585 }
586
587 class BooleanColumnWriter : public ColumnWriter {
588 public:
589 BooleanColumnWriter(const Type& type,
590 const StreamsFactory& factory,
591 const WriterOptions& options);
592
593 virtual void add(ColumnVectorBatch& rowBatch,
594 uint64_t offset,
595 uint64_t numValues) override;
596
597 virtual void flush(std::vector<proto::Stream>& streams) override;
598
599 virtual uint64_t getEstimatedSize() const override;
600
601 virtual void getColumnEncoding(
602 std::vector<proto::ColumnEncoding>& encodings) const override;
603
604 virtual void recordPosition() const override;
605
606 private:
607 std::unique_ptr<ByteRleEncoder> rleEncoder;
608 };
609
610 BooleanColumnWriter::BooleanColumnWriter(
611 const Type& type,
612 const StreamsFactory& factory,
613 const WriterOptions& options) :
614 ColumnWriter(type, factory, options) {
615 std::unique_ptr<BufferedOutputStream> dataStream =
616 factory.createStream(proto::Stream_Kind_DATA);
617 rleEncoder = createBooleanRleEncoder(std::move(dataStream));
618
619 if (enableIndex) {
620 recordPosition();
621 }
622 }
623
624 void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
625 uint64_t offset,
626 uint64_t numValues) {
627 ColumnWriter::add(rowBatch, offset, numValues);
628
629 LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
630 if (byteBatch == nullptr) {
631 throw InvalidArgument("Failed to cast to LongVectorBatch");
632 }
633 int64_t* data = byteBatch->data.data() + offset;
634 const char* notNull = byteBatch->hasNulls ?
635 byteBatch->notNull.data() + offset : nullptr;
636
637 char* byteData = reinterpret_cast<char*>(data);
638 for (uint64_t i = 0; i < numValues; ++i) {
639 byteData[i] = static_cast<char>(data[i]);
640 }
641 rleEncoder->add(byteData, numValues, notNull);
642
643 BooleanColumnStatisticsImpl* boolStats =
644 dynamic_cast<BooleanColumnStatisticsImpl*>(colIndexStatistics.get());
645 if (boolStats == nullptr) {
646 throw InvalidArgument("Failed to cast to BooleanColumnStatisticsImpl");
647 }
648 bool hasNull = false;
649 for (uint64_t i = 0; i < numValues; ++i) {
650 if (notNull == nullptr || notNull[i]) {
651 boolStats->increase(1);
652 boolStats->update(byteData[i] != 0, 1);
653 } else if (!hasNull) {
654 hasNull = true;
655 }
656 }
657 boolStats->setHasNull(hasNull);
658 }
659
660 void BooleanColumnWriter::flush(std::vector<proto::Stream>& streams) {
661 ColumnWriter::flush(streams);
662
663 proto::Stream stream;
664 stream.set_kind(proto::Stream_Kind_DATA);
665 stream.set_column(static_cast<uint32_t>(columnId));
666 stream.set_length(rleEncoder->flush());
667 streams.push_back(stream);
668 }
669
670 uint64_t BooleanColumnWriter::getEstimatedSize() const {
671 uint64_t size = ColumnWriter::getEstimatedSize();
672 size += rleEncoder->getBufferSize();
673 return size;
674 }
675
676 void BooleanColumnWriter::getColumnEncoding(
677 std::vector<proto::ColumnEncoding>& encodings) const {
678 proto::ColumnEncoding encoding;
679 encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
680 encoding.set_dictionarysize(0);
681 encodings.push_back(encoding);
682 }
683
684 void BooleanColumnWriter::recordPosition() const {
685 ColumnWriter::recordPosition();
686 rleEncoder->recordPosition(rowIndexPosition.get());
687 }
688
689 class DoubleColumnWriter : public ColumnWriter {
690 public:
691 DoubleColumnWriter(const Type& type,
692 const StreamsFactory& factory,
693 const WriterOptions& options,
694 bool isFloat);
695
696 virtual void add(ColumnVectorBatch& rowBatch,
697 uint64_t offset,
698 uint64_t numValues) override;
699
700 virtual void flush(std::vector<proto::Stream>& streams) override;
701
702 virtual uint64_t getEstimatedSize() const override;
703
704 virtual void getColumnEncoding(
705 std::vector<proto::ColumnEncoding>& encodings) const override;
706
707 virtual void recordPosition() const override;
708
709 private:
710 bool isFloat;
711 std::unique_ptr<AppendOnlyBufferedStream> dataStream;
712 DataBuffer<char> buffer;
713 };
714
715 DoubleColumnWriter::DoubleColumnWriter(
716 const Type& type,
717 const StreamsFactory& factory,
718 const WriterOptions& options,
719 bool isFloatType) :
720 ColumnWriter(type, factory, options),
721 isFloat(isFloatType),
722 buffer(*options.getMemoryPool()) {
723 dataStream.reset(new AppendOnlyBufferedStream(
724 factory.createStream(proto::Stream_Kind_DATA)));
725 buffer.resize(isFloat ? 4 : 8);
726
727 if (enableIndex) {
728 recordPosition();
729 }
730 }
731
// Floating point types are stored using IEEE 754 floating point bit layout.
// Float columns use 4 bytes per value and double columns use 8 bytes.
// Bytes are emitted least-significant first, independent of host byte order.
// The bit pattern is copied with memcpy: reading a float through an integer
// pointer (reinterpret_cast) violates strict aliasing and is undefined
// behavior.
template <typename FLOAT_TYPE, typename INTEGER_TYPE>
inline void encodeFloatNum(FLOAT_TYPE input, char* output) {
  static_assert(sizeof(FLOAT_TYPE) == sizeof(INTEGER_TYPE),
                "float and integer types must have the same width");
  INTEGER_TYPE intBits;
  std::memcpy(&intBits, &input, sizeof(intBits));
  // Widen to unsigned so the shifts below never operate on a negative signed
  // value. Sign extension only touches bytes beyond sizeof(INTEGER_TYPE),
  // which are never emitted.
  const uint64_t bits = static_cast<uint64_t>(intBits);
  for (size_t i = 0; i < sizeof(INTEGER_TYPE); ++i) {
    output[i] = static_cast<char>((bits >> (8 * i)) & 0xff);
  }
}
741
742 void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
743 uint64_t offset,
744 uint64_t numValues) {
745 ColumnWriter::add(rowBatch, offset, numValues);
746 const DoubleVectorBatch* dblBatch =
747 dynamic_cast<const DoubleVectorBatch*>(&rowBatch);
748 if (dblBatch == nullptr) {
749 throw InvalidArgument("Failed to cast to DoubleVectorBatch");
750 }
751
752 const double* doubleData = dblBatch->data.data() + offset;
753 const char* notNull = dblBatch->hasNulls ?
754 dblBatch->notNull.data() + offset : nullptr;
755
756 DoubleColumnStatisticsImpl* doubleStats =
757 dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
758 if (doubleStats == nullptr) {
759 throw InvalidArgument("Failed to cast to DoubleColumnStatisticsImpl");
760 }
761
762 size_t bytes = isFloat ? 4 : 8;
763 char* data = buffer.data();
764 bool hasNull = false;
765
766 for (uint64_t i = 0; i < numValues; ++i) {
767 if (!notNull || notNull[i]) {
768 if (isFloat) {
769 encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), data);
770 } else {
771 encodeFloatNum<double, int64_t>(doubleData[i], data);
772 }
773 dataStream->write(data, bytes);
774
775 doubleStats->increase(1);
776 doubleStats->update(doubleData[i]);
777 } else if (!hasNull) {
778 hasNull = true;
779 }
780 }
781 doubleStats->setHasNull(hasNull);
782 }
783
784 void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
785 ColumnWriter::flush(streams);
786
787 proto::Stream stream;
788 stream.set_kind(proto::Stream_Kind_DATA);
789 stream.set_column(static_cast<uint32_t>(columnId));
790 stream.set_length(dataStream->flush());
791 streams.push_back(stream);
792 }
793
794 uint64_t DoubleColumnWriter::getEstimatedSize() const {
795 uint64_t size = ColumnWriter::getEstimatedSize();
796 size += dataStream->getSize();
797 return size;
798 }
799
800 void DoubleColumnWriter::getColumnEncoding(
801 std::vector<proto::ColumnEncoding>& encodings) const {
802 proto::ColumnEncoding encoding;
803 encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
804 encoding.set_dictionarysize(0);
805 encodings.push_back(encoding);
806 }
807
808 void DoubleColumnWriter::recordPosition() const {
809 ColumnWriter::recordPosition();
810 dataStream->recordPosition(rowIndexPosition.get());
811 }
812
813 class StringColumnWriter : public ColumnWriter {
814 public:
815 StringColumnWriter(const Type& type,
816 const StreamsFactory& factory,
817 const WriterOptions& options);
818
819 virtual void add(ColumnVectorBatch& rowBatch,
820 uint64_t offset,
821 uint64_t numValues) override;
822
823 virtual void flush(std::vector<proto::Stream>& streams) override;
824
825 virtual uint64_t getEstimatedSize() const override;
826
827 virtual void getColumnEncoding(
828 std::vector<proto::ColumnEncoding>& encodings) const override;
829
830 virtual void recordPosition() const override;
831
832 protected:
833 std::unique_ptr<RleEncoder> lengthEncoder;
834 std::unique_ptr<AppendOnlyBufferedStream> dataStream;
835 RleVersion rleVersion;
836 };
837
838 StringColumnWriter::StringColumnWriter(
839 const Type& type,
840 const StreamsFactory& factory,
841 const WriterOptions& options) :
842 ColumnWriter(type, factory, options),
843 rleVersion(RleVersion_1) {
844 std::unique_ptr<BufferedOutputStream> lengthStream =
845 factory.createStream(proto::Stream_Kind_LENGTH);
846 lengthEncoder = createRleEncoder(std::move(lengthStream),
847 false,
848 rleVersion,
849 memPool);
850 dataStream.reset(new AppendOnlyBufferedStream(
851 factory.createStream(proto::Stream_Kind_DATA)));
852
853 if (enableIndex) {
854 recordPosition();
855 }
856 }
857
858 void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
859 uint64_t offset,
860 uint64_t numValues) {
861 ColumnWriter::add(rowBatch, offset, numValues);
862 const StringVectorBatch* stringBatch =
863 dynamic_cast<const StringVectorBatch*>(&rowBatch);
864 if (stringBatch == nullptr) {
865 throw InvalidArgument("Failed to cast to StringVectorBatch");
866 }
867
868 char *const * data = stringBatch->data.data() + offset;
869 const int64_t* length = stringBatch->length.data() + offset;
870 const char* notNull = stringBatch->hasNulls ?
871 stringBatch->notNull.data() + offset : nullptr;
872
873 lengthEncoder->add(length, numValues, notNull);
874
875 StringColumnStatisticsImpl* strStats =
876 dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
877 if (strStats == nullptr) {
878 throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
879 }
880 bool hasNull = false;
881 for (uint64_t i = 0; i < numValues; ++i) {
882 if (!notNull || notNull[i]) {
883 dataStream->write(data[i], static_cast<size_t>(length[i]));
884 strStats->update(data[i], static_cast<size_t>(length[i]));
885 strStats->increase(1);
886 } else if (!hasNull) {
887 hasNull = true;
888 }
889 }
890 strStats->setHasNull(hasNull);
891 }
892
893 void StringColumnWriter::flush(std::vector<proto::Stream>& streams) {
894 ColumnWriter::flush(streams);
895
896 proto::Stream length;
897 length.set_kind(proto::Stream_Kind_LENGTH);
898 length.set_column(static_cast<uint32_t>(columnId));
899 length.set_length(lengthEncoder->flush());
900 streams.push_back(length);
901
902 proto::Stream data;
903 data.set_kind(proto::Stream_Kind_DATA);
904 data.set_column(static_cast<uint32_t>(columnId));
905 data.set_length(dataStream->flush());
906 streams.push_back(data);
907 }
908
909 uint64_t StringColumnWriter::getEstimatedSize() const {
910 uint64_t size = ColumnWriter::getEstimatedSize();
911 size += lengthEncoder->getBufferSize();
912 size += dataStream->getSize();
913 return size;
914 }
915
916 void StringColumnWriter::getColumnEncoding(
917 std::vector<proto::ColumnEncoding>& encodings) const {
918 proto::ColumnEncoding encoding;
919 encoding.set_kind(rleVersion == RleVersion_1 ?
920 proto::ColumnEncoding_Kind_DIRECT :
921 proto::ColumnEncoding_Kind_DIRECT_V2);
922 encoding.set_dictionarysize(0);
923 encodings.push_back(encoding);
924 }
925
926 void StringColumnWriter::recordPosition() const {
927 ColumnWriter::recordPosition();
928 dataStream->recordPosition(rowIndexPosition.get());
929 lengthEncoder->recordPosition(rowIndexPosition.get());
930 }
931
932 class CharColumnWriter : public StringColumnWriter {
933 public:
934 CharColumnWriter(const Type& type,
935 const StreamsFactory& factory,
936 const WriterOptions& options) :
937 StringColumnWriter(type, factory, options),
938 fixedLength(type.getMaximumLength()),
939 padBuffer(*options.getMemoryPool(),
940 type.getMaximumLength()) {
941 // PASS
942 }
943
944 virtual void add(ColumnVectorBatch& rowBatch,
945 uint64_t offset,
946 uint64_t numValues) override;
947
948 private:
949 uint64_t fixedLength;
950 DataBuffer<char> padBuffer;
951 };
952
953 void CharColumnWriter::add(ColumnVectorBatch& rowBatch,
954 uint64_t offset,
955 uint64_t numValues) {
956 ColumnWriter::add(rowBatch, offset, numValues);
957 StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
958 if (charsBatch == nullptr) {
959 throw InvalidArgument("Failed to cast to StringVectorBatch");
960 }
961
962 char** data = charsBatch->data.data() + offset;
963 int64_t* length = charsBatch->length.data() + offset;
964 const char* notNull = charsBatch->hasNulls ?
965 charsBatch->notNull.data() + offset : nullptr;
966
967 StringColumnStatisticsImpl* strStats =
968 dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
969 if (strStats == nullptr) {
970 throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
971 }
972 bool hasNull = false;
973
974 for (uint64_t i = 0; i < numValues; ++i) {
975 if (!notNull || notNull[i]) {
976 char *charData = data[i];
977 uint64_t oriLength = static_cast<uint64_t>(length[i]);
978 if (oriLength < fixedLength) {
979 memcpy(padBuffer.data(), data[i], oriLength);
980 memset(padBuffer.data() + oriLength, ' ', fixedLength - oriLength);
981 charData = padBuffer.data();
982 }
983 length[i] = static_cast<int64_t>(fixedLength);
984 dataStream->write(charData, fixedLength);
985
986 strStats->update(charData, fixedLength);
987 strStats->increase(1);
988 } else if (!hasNull) {
989 hasNull = true;
990 }
991 }
992 lengthEncoder->add(length, numValues, notNull);
993 strStats->setHasNull(hasNull);
994 }
995
996 class VarCharColumnWriter : public StringColumnWriter {
997 public:
998 VarCharColumnWriter(const Type& type,
999 const StreamsFactory& factory,
1000 const WriterOptions& options) :
1001 StringColumnWriter(type, factory, options),
1002 maxLength(type.getMaximumLength()) {
1003 // PASS
1004 }
1005
1006 virtual void add(ColumnVectorBatch& rowBatch,
1007 uint64_t offset,
1008 uint64_t numValues) override;
1009
1010 private:
1011 uint64_t maxLength;
1012 };
1013
1014 void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch,
1015 uint64_t offset,
1016 uint64_t numValues) {
1017 ColumnWriter::add(rowBatch, offset, numValues);
1018 StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
1019 if (charsBatch == nullptr) {
1020 throw InvalidArgument("Failed to cast to StringVectorBatch");
1021 }
1022
1023 char* const* data = charsBatch->data.data() + offset;
1024 int64_t* length = charsBatch->length.data() + offset;
1025 const char* notNull = charsBatch->hasNulls ?
1026 charsBatch->notNull.data() + offset : nullptr;
1027
1028 StringColumnStatisticsImpl* strStats =
1029 dynamic_cast<StringColumnStatisticsImpl*>(colIndexStatistics.get());
1030 if (strStats == nullptr) {
1031 throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
1032 }
1033 bool hasNull = false;
1034
1035 for (uint64_t i = 0; i < numValues; ++i) {
1036 if (!notNull || notNull[i]) {
1037 if (length[i] > static_cast<int64_t>(maxLength)) {
1038 length[i] = static_cast<int64_t>(maxLength);
1039 }
1040 dataStream->write(data[i], static_cast<size_t>(length[i]));
1041
1042 strStats->update(data[i], static_cast<size_t>(length[i]));
1043 strStats->increase(1);
1044 } else if (!hasNull) {
1045 hasNull = true;
1046 }
1047 }
1048 lengthEncoder->add(length, numValues, notNull);
1049 strStats->setHasNull(hasNull);
1050 }
1051
1052 class BinaryColumnWriter : public StringColumnWriter {
1053 public:
1054 BinaryColumnWriter(const Type& type,
1055 const StreamsFactory& factory,
1056 const WriterOptions& options) :
1057 StringColumnWriter(type, factory, options) {
1058 // PASS
1059 }
1060
1061 virtual void add(ColumnVectorBatch& rowBatch,
1062 uint64_t offset,
1063 uint64_t numValues) override;
1064 };
1065
1066 void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch,
1067 uint64_t offset,
1068 uint64_t numValues) {
1069 ColumnWriter::add(rowBatch, offset, numValues);
1070
1071 StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
1072 if (binBatch == nullptr) {
1073 throw InvalidArgument("Failed to cast to StringVectorBatch");
1074 }
1075 char** data = binBatch->data.data() + offset;
1076 int64_t* length = binBatch->length.data() + offset;
1077 const char* notNull = binBatch->hasNulls ?
1078 binBatch->notNull.data() + offset : nullptr;
1079
1080 BinaryColumnStatisticsImpl* binStats =
1081 dynamic_cast<BinaryColumnStatisticsImpl*>(colIndexStatistics.get());
1082 if (binStats == nullptr) {
1083 throw InvalidArgument("Failed to cast to BinaryColumnStatisticsImpl");
1084 }
1085
1086 bool hasNull = false;
1087 for (uint64_t i = 0; i < numValues; ++i) {
1088 uint64_t unsignedLength = static_cast<uint64_t>(length[i]);
1089 if (!notNull || notNull[i]) {
1090 dataStream->write(data[i], unsignedLength);
1091
1092 binStats->update(unsignedLength);
1093 binStats->increase(1);
1094 } else if (!hasNull) {
1095 hasNull = true;
1096 }
1097 }
1098 lengthEncoder->add(length, numValues, notNull);
1099 binStats->setHasNull(hasNull);
1100 }
1101
1102 class TimestampColumnWriter : public ColumnWriter {
1103 public:
1104 TimestampColumnWriter(const Type& type,
1105 const StreamsFactory& factory,
1106 const WriterOptions& options);
1107
1108 virtual void add(ColumnVectorBatch& rowBatch,
1109 uint64_t offset,
1110 uint64_t numValues) override;
1111
1112 virtual void flush(std::vector<proto::Stream>& streams) override;
1113
1114 virtual uint64_t getEstimatedSize() const override;
1115
1116 virtual void getColumnEncoding(
1117 std::vector<proto::ColumnEncoding>& encodings) const override;
1118
1119 virtual void recordPosition() const override;
1120
1121 protected:
1122 std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;
1123
1124 private:
1125 RleVersion rleVersion;
1126 const Timezone& timezone;
1127 };
1128
1129 TimestampColumnWriter::TimestampColumnWriter(
1130 const Type& type,
1131 const StreamsFactory& factory,
1132 const WriterOptions& options) :
1133 ColumnWriter(type, factory, options),
1134 rleVersion(RleVersion_1),
1135 timezone(getTimezoneByName("GMT")){
1136 std::unique_ptr<BufferedOutputStream> dataStream =
1137 factory.createStream(proto::Stream_Kind_DATA);
1138 std::unique_ptr<BufferedOutputStream> secondaryStream =
1139 factory.createStream(proto::Stream_Kind_SECONDARY);
1140 secRleEncoder = createRleEncoder(std::move(dataStream),
1141 true,
1142 rleVersion,
1143 memPool);
1144 nanoRleEncoder = createRleEncoder(std::move(secondaryStream),
1145 false,
1146 rleVersion,
1147 memPool);
1148
1149 if (enableIndex) {
1150 recordPosition();
1151 }
1152 }
1153
1154 // Because the number of nanoseconds often has a large number of trailing zeros,
1155 // the number has trailing decimal zero digits removed and the last three bits
1156 // are used to record how many zeros were removed. Thus 1000 nanoseconds would
1157 // be serialized as 0x0b and 100000 would be serialized as 0x0d.
1158 static int64_t formatNano(int64_t nanos) {
1159 if (nanos == 0) {
1160 return 0;
1161 } else if (nanos % 100 != 0) {
1162 return (nanos) << 3;
1163 } else {
1164 nanos /= 100;
1165 int64_t trailingZeros = 1;
1166 while (nanos % 10 == 0 && trailingZeros < 7) {
1167 nanos /= 10;
1168 trailingZeros += 1;
1169 }
1170 return (nanos) << 3 | trailingZeros;
1171 }
1172 }
1173
1174 void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
1175 uint64_t offset,
1176 uint64_t numValues) {
1177 ColumnWriter::add(rowBatch, offset, numValues);
1178 TimestampVectorBatch* tsBatch =
1179 dynamic_cast<TimestampVectorBatch*>(&rowBatch);
1180 if (tsBatch == nullptr) {
1181 throw InvalidArgument("Failed to cast to TimestampVectorBatch");
1182 }
1183
1184 const char* notNull = tsBatch->hasNulls ?
1185 tsBatch->notNull.data() + offset : nullptr;
1186 int64_t *secs = tsBatch->data.data() + offset;
1187 int64_t *nanos = tsBatch->nanoseconds.data() + offset;
1188
1189 TimestampColumnStatisticsImpl* tsStats =
1190 dynamic_cast<TimestampColumnStatisticsImpl*>(colIndexStatistics.get());
1191 if (tsStats == nullptr) {
1192 throw InvalidArgument("Failed to cast to TimestampColumnStatisticsImpl");
1193 }
1194 bool hasNull = false;
1195 for (uint64_t i = 0; i < numValues; ++i) {
1196 if (notNull == nullptr || notNull[i]) {
1197 // TimestampVectorBatch already stores data in UTC
1198 int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
1199 tsStats->increase(1);
1200 tsStats->update(millsUTC);
1201
1202 if (secs[i] < 0 && nanos[i] != 0) {
1203 secs[i] += 1;
1204 }
1205
1206 secs[i] -= timezone.getEpoch();
1207 nanos[i] = formatNano(nanos[i]);
1208 } else if (!hasNull) {
1209 hasNull = true;
1210 }
1211 }
1212 tsStats->setHasNull(hasNull);
1213
1214 secRleEncoder->add(secs, numValues, notNull);
1215 nanoRleEncoder->add(nanos, numValues, notNull);
1216 }
1217
1218 void TimestampColumnWriter::flush(std::vector<proto::Stream>& streams) {
1219 ColumnWriter::flush(streams);
1220
1221 proto::Stream dataStream;
1222 dataStream.set_kind(proto::Stream_Kind_DATA);
1223 dataStream.set_column(static_cast<uint32_t>(columnId));
1224 dataStream.set_length(secRleEncoder->flush());
1225 streams.push_back(dataStream);
1226
1227 proto::Stream secondaryStream;
1228 secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
1229 secondaryStream.set_column(static_cast<uint32_t>(columnId));
1230 secondaryStream.set_length(nanoRleEncoder->flush());
1231 streams.push_back(secondaryStream);
1232 }
1233
1234 uint64_t TimestampColumnWriter::getEstimatedSize() const {
1235 uint64_t size = ColumnWriter::getEstimatedSize();
1236 size += secRleEncoder->getBufferSize();
1237 size += nanoRleEncoder->getBufferSize();
1238 return size;
1239 }
1240
1241 void TimestampColumnWriter::getColumnEncoding(
1242 std::vector<proto::ColumnEncoding>& encodings) const {
1243 proto::ColumnEncoding encoding;
1244 encoding.set_kind(rleVersion == RleVersion_1 ?
1245 proto::ColumnEncoding_Kind_DIRECT :
1246 proto::ColumnEncoding_Kind_DIRECT_V2);
1247 encoding.set_dictionarysize(0);
1248 encodings.push_back(encoding);
1249 }
1250
1251 void TimestampColumnWriter::recordPosition() const {
1252 ColumnWriter::recordPosition();
1253 secRleEncoder->recordPosition(rowIndexPosition.get());
1254 nanoRleEncoder->recordPosition(rowIndexPosition.get());
1255 }
1256
1257 class DateColumnWriter : public IntegerColumnWriter {
1258 public:
1259 DateColumnWriter(const Type& type,
1260 const StreamsFactory& factory,
1261 const WriterOptions& options);
1262
1263 virtual void add(ColumnVectorBatch& rowBatch,
1264 uint64_t offset,
1265 uint64_t numValues) override;
1266 };
1267
1268 DateColumnWriter::DateColumnWriter(
1269 const Type &type,
1270 const StreamsFactory &factory,
1271 const WriterOptions &options) :
1272 IntegerColumnWriter(type, factory, options) {
1273 // PASS
1274 }
1275
1276 void DateColumnWriter::add(ColumnVectorBatch& rowBatch,
1277 uint64_t offset,
1278 uint64_t numValues) {
1279 ColumnWriter::add(rowBatch, offset, numValues);
1280 const LongVectorBatch* longBatch =
1281 dynamic_cast<const LongVectorBatch*>(&rowBatch);
1282 if (longBatch == nullptr) {
1283 throw InvalidArgument("Failed to cast to LongVectorBatch");
1284 }
1285
1286 const int64_t* data = longBatch->data.data() + offset;
1287 const char* notNull = longBatch->hasNulls ?
1288 longBatch->notNull.data() + offset : nullptr;
1289
1290 rleEncoder->add(data, numValues, notNull);
1291
1292 DateColumnStatisticsImpl* dateStats =
1293 dynamic_cast<DateColumnStatisticsImpl*>(colIndexStatistics.get());
1294 if (dateStats == nullptr) {
1295 throw InvalidArgument("Failed to cast to DateColumnStatisticsImpl");
1296 }
1297 bool hasNull = false;
1298 for (uint64_t i = 0; i < numValues; ++i) {
1299 if (!notNull || notNull[i]) {
1300 dateStats->increase(1);
1301 dateStats->update(static_cast<int32_t>(data[i]));
1302 } else if (!hasNull) {
1303 hasNull = true;
1304 }
1305 }
1306 dateStats->setHasNull(hasNull);
1307 }
1308
1309 class Decimal64ColumnWriter : public ColumnWriter {
1310 public:
1311 static const uint32_t MAX_PRECISION_64 = 18;
1312 static const uint32_t MAX_PRECISION_128 = 38;
1313
1314 Decimal64ColumnWriter(const Type& type,
1315 const StreamsFactory& factory,
1316 const WriterOptions& options);
1317
1318 virtual void add(ColumnVectorBatch& rowBatch,
1319 uint64_t offset,
1320 uint64_t numValues) override;
1321
1322 virtual void flush(std::vector<proto::Stream>& streams) override;
1323
1324 virtual uint64_t getEstimatedSize() const override;
1325
1326 virtual void getColumnEncoding(
1327 std::vector<proto::ColumnEncoding>& encodings) const override;
1328
1329 virtual void recordPosition() const override;
1330
1331 protected:
1332 RleVersion rleVersion;
1333 uint64_t precision;
1334 uint64_t scale;
1335 std::unique_ptr<AppendOnlyBufferedStream> valueStream;
1336 std::unique_ptr<RleEncoder> scaleEncoder;
1337
1338 private:
1339 char buffer[10];
1340 };
1341
1342 Decimal64ColumnWriter::Decimal64ColumnWriter(
1343 const Type& type,
1344 const StreamsFactory& factory,
1345 const WriterOptions& options) :
1346 ColumnWriter(type, factory, options),
1347 rleVersion(RleVersion_1),
1348 precision(type.getPrecision()),
1349 scale(type.getScale()) {
1350 valueStream.reset(new AppendOnlyBufferedStream(
1351 factory.createStream(proto::Stream_Kind_DATA)));
1352 std::unique_ptr<BufferedOutputStream> scaleStream =
1353 factory.createStream(proto::Stream_Kind_SECONDARY);
1354 scaleEncoder = createRleEncoder(std::move(scaleStream),
1355 true,
1356 rleVersion,
1357 memPool);
1358
1359 if (enableIndex) {
1360 recordPosition();
1361 }
1362 }
1363
1364 void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch,
1365 uint64_t offset,
1366 uint64_t numValues) {
1367 ColumnWriter::add(rowBatch, offset, numValues);
1368 const Decimal64VectorBatch* decBatch =
1369 dynamic_cast<const Decimal64VectorBatch*>(&rowBatch);
1370 if (decBatch == nullptr) {
1371 throw InvalidArgument("Failed to cast to Decimal64VectorBatch");
1372 }
1373
1374 const char* notNull = decBatch->hasNulls ?
1375 decBatch->notNull.data() + offset : nullptr;
1376 const int64_t* values = decBatch->values.data() + offset;
1377 DecimalColumnStatisticsImpl* decStats =
1378 dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
1379 if (decStats == nullptr) {
1380 throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
1381 }
1382 bool hasNull = false;
1383
1384 for (uint64_t i = 0; i < numValues; ++i) {
1385 if (!notNull || notNull[i]) {
1386 int64_t val = zigZag(values[i]);
1387 char* data = buffer;
1388 while (true) {
1389 if ((val & ~0x7f) == 0) {
1390 *(data++) = (static_cast<char>(val));
1391 break;
1392 } else {
1393 *(data++) = static_cast<char>(0x80 | (val & 0x7f));
1394 // cast val to unsigned so as to force 0-fill right shift
1395 val = (static_cast<uint64_t>(val) >> 7);
1396 }
1397 }
1398 valueStream->write(buffer, static_cast<size_t>(data - buffer));
1399
1400 decStats->increase(1);
1401 decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
1402 } else if (!hasNull) {
1403 hasNull = true;
1404 }
1405 }
1406 std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
1407 scaleEncoder->add(scales.data(), numValues, notNull);
1408
1409 decStats->setHasNull(hasNull);
1410 }
1411
1412 void Decimal64ColumnWriter::flush(std::vector<proto::Stream>& streams) {
1413 ColumnWriter::flush(streams);
1414
1415 proto::Stream dataStream;
1416 dataStream.set_kind(proto::Stream_Kind_DATA);
1417 dataStream.set_column(static_cast<uint32_t>(columnId));
1418 dataStream.set_length(valueStream->flush());
1419 streams.push_back(dataStream);
1420
1421 proto::Stream secondaryStream;
1422 secondaryStream.set_kind(proto::Stream_Kind_SECONDARY);
1423 secondaryStream.set_column(static_cast<uint32_t>(columnId));
1424 secondaryStream.set_length(scaleEncoder->flush());
1425 streams.push_back(secondaryStream);
1426 }
1427
1428 uint64_t Decimal64ColumnWriter::getEstimatedSize() const {
1429 uint64_t size = ColumnWriter::getEstimatedSize();
1430 size += valueStream->getSize();
1431 size += scaleEncoder->getBufferSize();
1432 return size;
1433 }
1434
1435 void Decimal64ColumnWriter::getColumnEncoding(
1436 std::vector<proto::ColumnEncoding>& encodings) const {
1437 proto::ColumnEncoding encoding;
1438 encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
1439 encoding.set_dictionarysize(0);
1440 encodings.push_back(encoding);
1441 }
1442
1443 void Decimal64ColumnWriter::recordPosition() const {
1444 ColumnWriter::recordPosition();
1445 valueStream->recordPosition(rowIndexPosition.get());
1446 scaleEncoder->recordPosition(rowIndexPosition.get());
1447 }
1448
1449 class Decimal128ColumnWriter : public Decimal64ColumnWriter {
1450 public:
1451 Decimal128ColumnWriter(const Type& type,
1452 const StreamsFactory& factory,
1453 const WriterOptions& options);
1454
1455 virtual void add(ColumnVectorBatch& rowBatch,
1456 uint64_t offset,
1457 uint64_t numValues) override;
1458
1459 private:
1460 char buffer[20];
1461 };
1462
1463 Decimal128ColumnWriter::Decimal128ColumnWriter(
1464 const Type& type,
1465 const StreamsFactory& factory,
1466 const WriterOptions& options) :
1467 Decimal64ColumnWriter(type, factory, options) {
1468 // PASS
1469 }
1470
1471 // Zigzag encoding moves the sign bit to the least significant bit using the
1472 // expression (val « 1) ^ (val » 63) and derives its name from the fact that
1473 // positive and negative numbers alternate once encoded.
1474 Int128 zigZagInt128(const Int128& value) {
1475 bool isNegative = value < 0;
1476 Int128 val = value.abs();
1477 val <<= 1;
1478 if (isNegative) {
1479 val -= 1;
1480 }
1481 return val;
1482 }
1483
1484 void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch,
1485 uint64_t offset,
1486 uint64_t numValues) {
1487 ColumnWriter::add(rowBatch, offset, numValues);
1488 const Decimal128VectorBatch* decBatch =
1489 dynamic_cast<const Decimal128VectorBatch*>(&rowBatch);
1490 if (decBatch == nullptr) {
1491 throw InvalidArgument("Failed to cast to Decimal128VectorBatch");
1492 }
1493
1494 const char* notNull = decBatch->hasNulls ?
1495 decBatch->notNull.data() + offset : nullptr;
1496 const Int128* values = decBatch->values.data() + offset;
1497 DecimalColumnStatisticsImpl* decStats =
1498 dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
1499 if (decStats == nullptr) {
1500 throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
1501 }
1502 bool hasNull = false;
1503
1504 // The current encoding of decimal columns stores the integer representation
1505 // of the value as an unbounded length zigzag encoded base 128 varint.
1506 for (uint64_t i = 0; i < numValues; ++i) {
1507 if (!notNull || notNull[i]) {
1508 Int128 val = zigZagInt128(values[i]);
1509 char* data = buffer;
1510 while (true) {
1511 if ((val & ~0x7f) == 0) {
1512 *(data++) = (static_cast<char>(val.getLowBits()));
1513 break;
1514 } else {
1515 *(data++) = static_cast<char>(0x80 | (val.getLowBits() & 0x7f));
1516 val >>= 7;
1517 }
1518 }
1519 valueStream->write(buffer, static_cast<size_t>(data - buffer));
1520
1521 decStats->increase(1);
1522 decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
1523 } else if (!hasNull) {
1524 hasNull = true;
1525 }
1526 }
1527 std::vector<int64_t> scales(numValues, static_cast<int64_t>(scale));
1528 scaleEncoder->add(scales.data(), numValues, notNull);
1529
1530 decStats->setHasNull(hasNull);
1531 }
1532
1533 class ListColumnWriter : public ColumnWriter {
1534 public:
1535 ListColumnWriter(const Type& type,
1536 const StreamsFactory& factory,
1537 const WriterOptions& options);
1538 ~ListColumnWriter() override;
1539
1540 virtual void add(ColumnVectorBatch& rowBatch,
1541 uint64_t offset,
1542 uint64_t numValues) override;
1543
1544 virtual void flush(std::vector<proto::Stream>& streams) override;
1545
1546 virtual uint64_t getEstimatedSize() const override;
1547
1548 virtual void getColumnEncoding(
1549 std::vector<proto::ColumnEncoding>& encodings) const override;
1550
1551 virtual void getStripeStatistics(
1552 std::vector<proto::ColumnStatistics>& stats) const override;
1553
1554 virtual void getFileStatistics(
1555 std::vector<proto::ColumnStatistics>& stats) const override;
1556
1557 virtual void mergeStripeStatsIntoFileStats() override;
1558
1559 virtual void mergeRowGroupStatsIntoStripeStats() override;
1560
1561 virtual void createRowIndexEntry() override;
1562
1563 virtual void writeIndex(
1564 std::vector<proto::Stream> &streams) const override;
1565
1566 virtual void recordPosition() const override;
1567
1568 private:
1569 std::unique_ptr<RleEncoder> lengthEncoder;
1570 RleVersion rleVersion;
1571 std::unique_ptr<ColumnWriter> child;
1572 };
1573
1574 ListColumnWriter::ListColumnWriter(const Type& type,
1575 const StreamsFactory& factory,
1576 const WriterOptions& options) :
1577 ColumnWriter(type, factory, options),
1578 rleVersion(RleVersion_1){
1579
1580 std::unique_ptr<BufferedOutputStream> lengthStream =
1581 factory.createStream(proto::Stream_Kind_LENGTH);
1582 lengthEncoder = createRleEncoder(std::move(lengthStream),
1583 false,
1584 rleVersion,
1585 memPool);
1586
1587 if (type.getSubtypeCount() == 1) {
1588 child = buildWriter(*type.getSubtype(0), factory, options);
1589 }
1590
1591 if (enableIndex) {
1592 recordPosition();
1593 }
1594 }
1595
1596 ListColumnWriter::~ListColumnWriter() {
1597 // PASS
1598 }
1599
1600 void ListColumnWriter::add(ColumnVectorBatch& rowBatch,
1601 uint64_t offset,
1602 uint64_t numValues) {
1603 ColumnWriter::add(rowBatch, offset, numValues);
1604
1605 ListVectorBatch* listBatch = dynamic_cast<ListVectorBatch*>(&rowBatch);
1606 if (listBatch == nullptr) {
1607 throw InvalidArgument("Failed to cast to ListVectorBatch");
1608 }
1609
1610 int64_t* offsets = listBatch->offsets.data() + offset;
1611 const char* notNull = listBatch->hasNulls ?
1612 listBatch->notNull.data() + offset : nullptr;
1613
1614 uint64_t elemOffset = static_cast<uint64_t>(offsets[0]);
1615 uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]);
1616
1617 // translate offsets to lengths
1618 for (uint64_t i = 0; i != numValues; ++i) {
1619 offsets[i] = offsets[i + 1] - offsets[i];
1620 }
1621
1622 // unnecessary to deal with null as elements are packed together
1623 if (child.get()) {
1624 child->add(*listBatch->elements, elemOffset, totalNumValues);
1625 }
1626 lengthEncoder->add(offsets, numValues, notNull);
1627
1628 if (enableIndex) {
1629 bool hasNull = false;
1630 if (!notNull) {
1631 colIndexStatistics->increase(numValues);
1632 } else {
1633 for (uint64_t i = 0; i < numValues; ++i) {
1634 if (notNull[i]) {
1635 colIndexStatistics->increase(1);
1636 } else if (!hasNull) {
1637 hasNull = true;
1638 }
1639 }
1640 }
1641 colIndexStatistics->setHasNull(hasNull);
1642 }
1643 }
1644
1645 void ListColumnWriter::flush(std::vector<proto::Stream>& streams) {
1646 ColumnWriter::flush(streams);
1647
1648 proto::Stream stream;
1649 stream.set_kind(proto::Stream_Kind_LENGTH);
1650 stream.set_column(static_cast<uint32_t>(columnId));
1651 stream.set_length(lengthEncoder->flush());
1652 streams.push_back(stream);
1653
1654 if (child.get()) {
1655 child->flush(streams);
1656 }
1657 }
1658
1659 void ListColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
1660 ColumnWriter::writeIndex(streams);
1661 if (child.get()) {
1662 child->writeIndex(streams);
1663 }
1664 }
1665
1666 uint64_t ListColumnWriter::getEstimatedSize() const {
1667 uint64_t size = ColumnWriter::getEstimatedSize();
1668 if (child.get()) {
1669 size += lengthEncoder->getBufferSize();
1670 size += child->getEstimatedSize();
1671 }
1672 return size;
1673 }
1674
1675 void ListColumnWriter::getColumnEncoding(
1676 std::vector<proto::ColumnEncoding>& encodings) const {
1677 proto::ColumnEncoding encoding;
1678 encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
1679 encoding.set_dictionarysize(0);
1680 encodings.push_back(encoding);
1681 if (child.get()) {
1682 child->getColumnEncoding(encodings);
1683 }
1684 }
1685
1686 void ListColumnWriter::getStripeStatistics(
1687 std::vector<proto::ColumnStatistics>& stats) const {
1688 ColumnWriter::getStripeStatistics(stats);
1689 if (child.get()) {
1690 child->getStripeStatistics(stats);
1691 }
1692 }
1693
1694 void ListColumnWriter::mergeStripeStatsIntoFileStats() {
1695 ColumnWriter::mergeStripeStatsIntoFileStats();
1696 if (child.get()) {
1697 child->mergeStripeStatsIntoFileStats();
1698 }
1699 }
1700
1701 void ListColumnWriter::getFileStatistics(
1702 std::vector<proto::ColumnStatistics>& stats) const {
1703 ColumnWriter::getFileStatistics(stats);
1704 if (child.get()) {
1705 child->getFileStatistics(stats);
1706 }
1707 }
1708
1709 void ListColumnWriter::mergeRowGroupStatsIntoStripeStats() {
1710 ColumnWriter::mergeRowGroupStatsIntoStripeStats();
1711 if (child.get()) {
1712 child->mergeRowGroupStatsIntoStripeStats();
1713 }
1714 }
1715
1716 void ListColumnWriter::createRowIndexEntry() {
1717 ColumnWriter::createRowIndexEntry();
1718 if (child.get()) {
1719 child->createRowIndexEntry();
1720 }
1721 }
1722
1723 void ListColumnWriter::recordPosition() const {
1724 ColumnWriter::recordPosition();
1725 lengthEncoder->recordPosition(rowIndexPosition.get());
1726 }
1727
1728 class MapColumnWriter : public ColumnWriter {
1729 public:
1730 MapColumnWriter(const Type& type,
1731 const StreamsFactory& factory,
1732 const WriterOptions& options);
1733 ~MapColumnWriter() override;
1734
1735 virtual void add(ColumnVectorBatch& rowBatch,
1736 uint64_t offset,
1737 uint64_t numValues) override;
1738
1739 virtual void flush(std::vector<proto::Stream>& streams) override;
1740
1741 virtual uint64_t getEstimatedSize() const override;
1742
1743 virtual void getColumnEncoding(
1744 std::vector<proto::ColumnEncoding>& encodings) const override;
1745
1746 virtual void getStripeStatistics(
1747 std::vector<proto::ColumnStatistics>& stats) const override;
1748
1749 virtual void getFileStatistics(
1750 std::vector<proto::ColumnStatistics>& stats) const override;
1751
1752 virtual void mergeStripeStatsIntoFileStats() override;
1753
1754 virtual void mergeRowGroupStatsIntoStripeStats() override;
1755
1756 virtual void createRowIndexEntry() override;
1757
1758 virtual void writeIndex(
1759 std::vector<proto::Stream> &streams) const override;
1760
1761 virtual void recordPosition() const override;
1762
1763 private:
1764 std::unique_ptr<ColumnWriter> keyWriter;
1765 std::unique_ptr<ColumnWriter> elemWriter;
1766 std::unique_ptr<RleEncoder> lengthEncoder;
1767 RleVersion rleVersion;
1768 };
1769
1770 MapColumnWriter::MapColumnWriter(const Type& type,
1771 const StreamsFactory& factory,
1772 const WriterOptions& options) :
1773 ColumnWriter(type, factory, options),
1774 rleVersion(RleVersion_1){
1775 std::unique_ptr<BufferedOutputStream> lengthStream =
1776 factory.createStream(proto::Stream_Kind_LENGTH);
1777 lengthEncoder = createRleEncoder(std::move(lengthStream),
1778 false,
1779 rleVersion,
1780 memPool);
1781
1782 if (type.getSubtypeCount() > 0) {
1783 keyWriter = buildWriter(*type.getSubtype(0), factory, options);
1784 }
1785
1786 if (type.getSubtypeCount() > 1) {
1787 elemWriter = buildWriter(*type.getSubtype(1), factory, options);
1788 }
1789
1790 if (enableIndex) {
1791 recordPosition();
1792 }
1793 }
1794
1795 MapColumnWriter::~MapColumnWriter() {
1796 // PASS
1797 }
1798
1799 void MapColumnWriter::add(ColumnVectorBatch& rowBatch,
1800 uint64_t offset,
1801 uint64_t numValues) {
1802 ColumnWriter::add(rowBatch, offset, numValues);
1803
1804 MapVectorBatch* mapBatch = dynamic_cast<MapVectorBatch*>(&rowBatch);
1805 if (mapBatch == nullptr) {
1806 throw InvalidArgument("Failed to cast to MapVectorBatch");
1807 }
1808
1809 int64_t* offsets = mapBatch->offsets.data() + offset;
1810 const char* notNull = mapBatch->hasNulls ?
1811 mapBatch->notNull.data() + offset : nullptr;
1812
1813 uint64_t elemOffset = static_cast<uint64_t>(offsets[0]);
1814 uint64_t totalNumValues = static_cast<uint64_t>(offsets[numValues] - offsets[0]);
1815
1816 // translate offsets to lengths
1817 for (uint64_t i = 0; i != numValues; ++i) {
1818 offsets[i] = offsets[i + 1] - offsets[i];
1819 }
1820
1821 lengthEncoder->add(offsets, numValues, notNull);
1822
1823 // unnecessary to deal with null as keys and values are packed together
1824 if (keyWriter.get()) {
1825 keyWriter->add(*mapBatch->keys, elemOffset, totalNumValues);
1826 }
1827 if (elemWriter.get()) {
1828 elemWriter->add(*mapBatch->elements, elemOffset, totalNumValues);
1829 }
1830
1831 if (enableIndex) {
1832 bool hasNull = false;
1833 if (!notNull) {
1834 colIndexStatistics->increase(numValues);
1835 } else {
1836 for (uint64_t i = 0; i < numValues; ++i) {
1837 if (notNull[i]) {
1838 colIndexStatistics->increase(1);
1839 } else if (!hasNull) {
1840 hasNull = true;
1841 }
1842 }
1843 }
1844 colIndexStatistics->setHasNull(hasNull);
1845 }
1846 }
1847
1848 void MapColumnWriter::flush(std::vector<proto::Stream>& streams) {
1849 ColumnWriter::flush(streams);
1850
1851 proto::Stream stream;
1852 stream.set_kind(proto::Stream_Kind_LENGTH);
1853 stream.set_column(static_cast<uint32_t>(columnId));
1854 stream.set_length(lengthEncoder->flush());
1855 streams.push_back(stream);
1856
1857 if (keyWriter.get()) {
1858 keyWriter->flush(streams);
1859 }
1860 if (elemWriter.get()) {
1861 elemWriter->flush(streams);
1862 }
1863 }
1864
1865 void MapColumnWriter::writeIndex(
1866 std::vector<proto::Stream> &streams) const {
1867 ColumnWriter::writeIndex(streams);
1868 if (keyWriter.get()) {
1869 keyWriter->writeIndex(streams);
1870 }
1871 if (elemWriter.get()) {
1872 elemWriter->writeIndex(streams);
1873 }
1874 }
1875
1876 uint64_t MapColumnWriter::getEstimatedSize() const {
1877 uint64_t size = ColumnWriter::getEstimatedSize();
1878 size += lengthEncoder->getBufferSize();
1879 if (keyWriter.get()) {
1880 size += keyWriter->getEstimatedSize();
1881 }
1882 if (elemWriter.get()) {
1883 size += elemWriter->getEstimatedSize();
1884 }
1885 return size;
1886 }
1887
1888 void MapColumnWriter::getColumnEncoding(
1889 std::vector<proto::ColumnEncoding>& encodings) const {
1890 proto::ColumnEncoding encoding;
1891 encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
1892 encoding.set_dictionarysize(0);
1893 encodings.push_back(encoding);
1894 if (keyWriter.get()) {
1895 keyWriter->getColumnEncoding(encodings);
1896 }
1897 if (elemWriter.get()) {
1898 elemWriter->getColumnEncoding(encodings);
1899 }
1900 }
1901
1902 void MapColumnWriter::getStripeStatistics(
1903 std::vector<proto::ColumnStatistics>& stats) const {
1904 ColumnWriter::getStripeStatistics(stats);
1905 if (keyWriter.get()) {
1906 keyWriter->getStripeStatistics(stats);
1907 }
1908 if (elemWriter.get()) {
1909 elemWriter->getStripeStatistics(stats);
1910 }
1911 }
1912
1913 void MapColumnWriter::mergeStripeStatsIntoFileStats() {
1914 ColumnWriter::mergeStripeStatsIntoFileStats();
1915 if (keyWriter.get()) {
1916 keyWriter->mergeStripeStatsIntoFileStats();
1917 }
1918 if (elemWriter.get()) {
1919 elemWriter->mergeStripeStatsIntoFileStats();
1920 }
1921 }
1922
1923 void MapColumnWriter::getFileStatistics(
1924 std::vector<proto::ColumnStatistics>& stats) const {
1925 ColumnWriter::getFileStatistics(stats);
1926 if (keyWriter.get()) {
1927 keyWriter->getFileStatistics(stats);
1928 }
1929 if (elemWriter.get()) {
1930 elemWriter->getFileStatistics(stats);
1931 }
1932 }
1933
1934 void MapColumnWriter::mergeRowGroupStatsIntoStripeStats() {
1935 ColumnWriter::mergeRowGroupStatsIntoStripeStats();
1936 if (keyWriter.get()) {
1937 keyWriter->mergeRowGroupStatsIntoStripeStats();
1938 }
1939 if (elemWriter.get()) {
1940 elemWriter->mergeRowGroupStatsIntoStripeStats();
1941 }
1942 }
1943
1944 void MapColumnWriter::createRowIndexEntry() {
1945 ColumnWriter::createRowIndexEntry();
1946 if (keyWriter.get()) {
1947 keyWriter->createRowIndexEntry();
1948 }
1949 if (elemWriter.get()) {
1950 elemWriter->createRowIndexEntry();
1951 }
1952 }
1953
1954 void MapColumnWriter::recordPosition() const {
1955 ColumnWriter::recordPosition();
1956 lengthEncoder->recordPosition(rowIndexPosition.get());
1957 }
1958
1959 class UnionColumnWriter : public ColumnWriter {
1960 public:
1961 UnionColumnWriter(const Type& type,
1962 const StreamsFactory& factory,
1963 const WriterOptions& options);
1964 ~UnionColumnWriter() override;
1965
1966 virtual void add(ColumnVectorBatch& rowBatch,
1967 uint64_t offset,
1968 uint64_t numValues) override;
1969
1970 virtual void flush(std::vector<proto::Stream>& streams) override;
1971
1972 virtual uint64_t getEstimatedSize() const override;
1973
1974 virtual void getColumnEncoding(
1975 std::vector<proto::ColumnEncoding>& encodings) const override;
1976
1977 virtual void getStripeStatistics(
1978 std::vector<proto::ColumnStatistics>& stats) const override;
1979
1980 virtual void getFileStatistics(
1981 std::vector<proto::ColumnStatistics>& stats) const override;
1982
1983 virtual void mergeStripeStatsIntoFileStats() override;
1984
1985 virtual void mergeRowGroupStatsIntoStripeStats() override;
1986
1987 virtual void createRowIndexEntry() override;
1988
1989 virtual void writeIndex(
1990 std::vector<proto::Stream> &streams) const override;
1991
1992 virtual void recordPosition() const override;
1993
1994 private:
1995 std::unique_ptr<ByteRleEncoder> rleEncoder;
1996 std::vector<ColumnWriter*> children;
1997 };
1998
1999 UnionColumnWriter::UnionColumnWriter(const Type& type,
2000 const StreamsFactory& factory,
2001 const WriterOptions& options) :
2002 ColumnWriter(type, factory, options) {
2003
2004 std::unique_ptr<BufferedOutputStream> dataStream =
2005 factory.createStream(proto::Stream_Kind_DATA);
2006 rleEncoder = createByteRleEncoder(std::move(dataStream));
2007
2008 for (uint64_t i = 0; i != type.getSubtypeCount(); ++i) {
2009 children.push_back(buildWriter(*type.getSubtype(i),
2010 factory,
2011 options).release());
2012 }
2013
2014 if (enableIndex) {
2015 recordPosition();
2016 }
2017 }
2018
2019 UnionColumnWriter::~UnionColumnWriter() {
2020 for (uint32_t i = 0; i < children.size(); ++i) {
2021 delete children[i];
2022 }
2023 }
2024
2025 void UnionColumnWriter::add(ColumnVectorBatch& rowBatch,
2026 uint64_t offset,
2027 uint64_t numValues) {
2028 ColumnWriter::add(rowBatch, offset, numValues);
2029
2030 UnionVectorBatch* unionBatch = dynamic_cast<UnionVectorBatch*>(&rowBatch);
2031 if (unionBatch == nullptr) {
2032 throw InvalidArgument("Failed to cast to UnionVectorBatch");
2033 }
2034
2035 const char* notNull = unionBatch->hasNulls ?
2036 unionBatch->notNull.data() + offset : nullptr;
2037 unsigned char * tags = unionBatch->tags.data() + offset;
2038 uint64_t * offsets = unionBatch->offsets.data() + offset;
2039
2040 std::vector<int64_t> childOffset(children.size(), -1);
2041 std::vector<uint64_t> childLength(children.size(), 0);
2042
2043 for (uint64_t i = 0; i != numValues; ++i) {
2044 if (childOffset[tags[i]] == -1) {
2045 childOffset[tags[i]] = static_cast<int64_t>(offsets[i]);
2046 }
2047 ++childLength[tags[i]];
2048 }
2049
2050 rleEncoder->add(reinterpret_cast<char*>(tags), numValues, notNull);
2051
2052 for (uint32_t i = 0; i < children.size(); ++i) {
2053 if (childLength[i] > 0) {
2054 children[i]->add(*unionBatch->children[i],
2055 static_cast<uint64_t>(childOffset[i]),
2056 childLength[i]);
2057 }
2058 }
2059
2060 // update stats
2061 if (enableIndex) {
2062 bool hasNull = false;
2063 if (!notNull) {
2064 colIndexStatistics->increase(numValues);
2065 } else {
2066 for (uint64_t i = 0; i < numValues; ++i) {
2067 if (notNull[i]) {
2068 colIndexStatistics->increase(1);
2069 } else if (!hasNull) {
2070 hasNull = true;
2071 }
2072 }
2073 }
2074 colIndexStatistics->setHasNull(hasNull);
2075 }
2076 }
2077
2078 void UnionColumnWriter::flush(std::vector<proto::Stream>& streams) {
2079 ColumnWriter::flush(streams);
2080
2081 proto::Stream stream;
2082 stream.set_kind(proto::Stream_Kind_DATA);
2083 stream.set_column(static_cast<uint32_t>(columnId));
2084 stream.set_length(rleEncoder->flush());
2085 streams.push_back(stream);
2086
2087 for (uint32_t i = 0; i < children.size(); ++i) {
2088 children[i]->flush(streams);
2089 }
2090 }
2091
2092 void UnionColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
2093 ColumnWriter::writeIndex(streams);
2094 for (uint32_t i = 0; i < children.size(); ++i) {
2095 children[i]->writeIndex(streams);
2096 }
2097 }
2098
2099 uint64_t UnionColumnWriter::getEstimatedSize() const {
2100 uint64_t size = ColumnWriter::getEstimatedSize();
2101 size += rleEncoder->getBufferSize();
2102 for (uint32_t i = 0; i < children.size(); ++i) {
2103 size += children[i]->getEstimatedSize();
2104 }
2105 return size;
2106 }
2107
2108 void UnionColumnWriter::getColumnEncoding(
2109 std::vector<proto::ColumnEncoding>& encodings) const {
2110 proto::ColumnEncoding encoding;
2111 encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
2112 encoding.set_dictionarysize(0);
2113 encodings.push_back(encoding);
2114 for (uint32_t i = 0; i < children.size(); ++i) {
2115 children[i]->getColumnEncoding(encodings);
2116 }
2117 }
2118
2119 void UnionColumnWriter::getStripeStatistics(
2120 std::vector<proto::ColumnStatistics>& stats) const {
2121 ColumnWriter::getStripeStatistics(stats);
2122 for (uint32_t i = 0; i < children.size(); ++i) {
2123 children[i]->getStripeStatistics(stats);
2124 }
2125 }
2126
2127 void UnionColumnWriter::mergeStripeStatsIntoFileStats() {
2128 ColumnWriter::mergeStripeStatsIntoFileStats();
2129 for (uint32_t i = 0; i < children.size(); ++i) {
2130 children[i]->mergeStripeStatsIntoFileStats();
2131 }
2132 }
2133
2134 void UnionColumnWriter::getFileStatistics(
2135 std::vector<proto::ColumnStatistics>& stats) const {
2136 ColumnWriter::getFileStatistics(stats);
2137 for (uint32_t i = 0; i < children.size(); ++i) {
2138 children[i]->getFileStatistics(stats);
2139 }
2140 }
2141
2142 void UnionColumnWriter::mergeRowGroupStatsIntoStripeStats() {
2143 ColumnWriter::mergeRowGroupStatsIntoStripeStats();
2144 for (uint32_t i = 0; i < children.size(); ++i) {
2145 children[i]->mergeRowGroupStatsIntoStripeStats();
2146 }
2147 }
2148
2149 void UnionColumnWriter::createRowIndexEntry() {
2150 ColumnWriter::createRowIndexEntry();
2151 for (uint32_t i = 0; i < children.size(); ++i) {
2152 children[i]->createRowIndexEntry();
2153 }
2154 }
2155
2156 void UnionColumnWriter::recordPosition() const {
2157 ColumnWriter::recordPosition();
2158 rleEncoder->recordPosition(rowIndexPosition.get());
2159 }
2160
2161 std::unique_ptr<ColumnWriter> buildWriter(
2162 const Type& type,
2163 const StreamsFactory& factory,
2164 const WriterOptions& options) {
2165 switch (static_cast<int64_t>(type.getKind())) {
2166 case STRUCT:
2167 return std::unique_ptr<ColumnWriter>(
2168 new StructColumnWriter(
2169 type,
2170 factory,
2171 options));
2172 case INT:
2173 case LONG:
2174 case SHORT:
2175 return std::unique_ptr<ColumnWriter>(
2176 new IntegerColumnWriter(
2177 type,
2178 factory,
2179 options));
2180 case BYTE:
2181 return std::unique_ptr<ColumnWriter>(
2182 new ByteColumnWriter(
2183 type,
2184 factory,
2185 options));
2186 case BOOLEAN:
2187 return std::unique_ptr<ColumnWriter>(
2188 new BooleanColumnWriter(
2189 type,
2190 factory,
2191 options));
2192 case DOUBLE:
2193 return std::unique_ptr<ColumnWriter>(
2194 new DoubleColumnWriter(
2195 type,
2196 factory,
2197 options,
2198 false));
2199 case FLOAT:
2200 return std::unique_ptr<ColumnWriter>(
2201 new DoubleColumnWriter(
2202 type,
2203 factory,
2204 options,
2205 true));
2206 case BINARY:
2207 return std::unique_ptr<ColumnWriter>(
2208 new BinaryColumnWriter(
2209 type,
2210 factory,
2211 options));
2212 case STRING:
2213 return std::unique_ptr<ColumnWriter>(
2214 new StringColumnWriter(
2215 type,
2216 factory,
2217 options));
2218 case CHAR:
2219 return std::unique_ptr<ColumnWriter>(
2220 new CharColumnWriter(
2221 type,
2222 factory,
2223 options));
2224 case VARCHAR:
2225 return std::unique_ptr<ColumnWriter>(
2226 new VarCharColumnWriter(
2227 type,
2228 factory,
2229 options));
2230 case DATE:
2231 return std::unique_ptr<ColumnWriter>(
2232 new DateColumnWriter(
2233 type,
2234 factory,
2235 options));
2236 case TIMESTAMP:
2237 return std::unique_ptr<ColumnWriter>(
2238 new TimestampColumnWriter(
2239 type,
2240 factory,
2241 options));
2242 case DECIMAL:
2243 if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) {
2244 return std::unique_ptr<ColumnWriter>(
2245 new Decimal64ColumnWriter(
2246 type,
2247 factory,
2248 options));
2249 } else if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_128) {
2250 return std::unique_ptr<ColumnWriter>(
2251 new Decimal128ColumnWriter(
2252 type,
2253 factory,
2254 options));
2255 } else {
2256 throw NotImplementedYet("Decimal precision more than 38 is not "
2257 "supported");
2258 }
2259 case LIST:
2260 return std::unique_ptr<ColumnWriter>(
2261 new ListColumnWriter(
2262 type,
2263 factory,
2264 options));
2265 case MAP:
2266 return std::unique_ptr<ColumnWriter>(
2267 new MapColumnWriter(
2268 type,
2269 factory,
2270 options));
2271 case UNION:
2272 return std::unique_ptr<ColumnWriter>(
2273 new UnionColumnWriter(
2274 type,
2275 factory,
2276 options));
2277 default:
2278 throw NotImplementedYet("Type is not supported yet for creating "
2279 "ColumnWriter.");
2280 }
2281 }
2282}
2283