1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
#include "orc/Int128.hh"

#include "Adaptor.hh"
#include "ByteRLE.hh"
#include "ColumnReader.hh"
#include "orc/Exceptions.hh"
#include "RLE.hh"

#include <math.h>
#include <cstring>
#include <iostream>
#include <limits>
29
30namespace orc {
31
32 StripeStreams::~StripeStreams() {
33 // PASS
34 }
35
36 inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
37 switch (static_cast<int64_t>(kind)) {
38 case proto::ColumnEncoding_Kind_DIRECT:
39 case proto::ColumnEncoding_Kind_DICTIONARY:
40 return RleVersion_1;
41 case proto::ColumnEncoding_Kind_DIRECT_V2:
42 case proto::ColumnEncoding_Kind_DICTIONARY_V2:
43 return RleVersion_2;
44 default:
45 throw ParseError("Unknown encoding in convertRleVersion");
46 }
47 }
48
49 ColumnReader::ColumnReader(const Type& type,
50 StripeStreams& stripe
51 ): columnId(type.getColumnId()),
52 memoryPool(stripe.getMemoryPool()) {
53 std::unique_ptr<SeekableInputStream> stream =
54 stripe.getStream(columnId, proto::Stream_Kind_PRESENT, true);
55 if (stream.get()) {
56 notNullDecoder = createBooleanRleDecoder(std::move(stream));
57 }
58 }
59
60 ColumnReader::~ColumnReader() {
61 // PASS
62 }
63
64 uint64_t ColumnReader::skip(uint64_t numValues) {
65 ByteRleDecoder* decoder = notNullDecoder.get();
66 if (decoder) {
67 // page through the values that we want to skip
68 // and count how many are non-null
69 const size_t MAX_BUFFER_SIZE = 32768;
70 size_t bufferSize = std::min(MAX_BUFFER_SIZE,
71 static_cast<size_t>(numValues));
72 char buffer[MAX_BUFFER_SIZE];
73 uint64_t remaining = numValues;
74 while (remaining > 0) {
75 uint64_t chunkSize =
76 std::min(remaining,
77 static_cast<uint64_t>(bufferSize));
78 decoder->next(buffer, chunkSize, nullptr);
79 remaining -= chunkSize;
80 for(uint64_t i=0; i < chunkSize; ++i) {
81 if (!buffer[i]) {
82 numValues -= 1;
83 }
84 }
85 }
86 }
87 return numValues;
88 }
89
90 void ColumnReader::next(ColumnVectorBatch& rowBatch,
91 uint64_t numValues,
92 char* incomingMask) {
93 if (numValues > rowBatch.capacity) {
94 rowBatch.resize(numValues);
95 }
96 rowBatch.numElements = numValues;
97 ByteRleDecoder* decoder = notNullDecoder.get();
98 if (decoder) {
99 char* notNullArray = rowBatch.notNull.data();
100 decoder->next(notNullArray, numValues, incomingMask);
101 // check to see if there are nulls in this batch
102 for(uint64_t i=0; i < numValues; ++i) {
103 if (!notNullArray[i]) {
104 rowBatch.hasNulls = true;
105 return;
106 }
107 }
108 } else if (incomingMask) {
109 // If we don't have a notNull stream, copy the incomingMask
110 rowBatch.hasNulls = true;
111 memcpy(rowBatch.notNull.data(), incomingMask, numValues);
112 return;
113 }
114 rowBatch.hasNulls = false;
115 }
116
117 /**
118 * Expand an array of bytes in place to the corresponding array of longs.
119 * Has to work backwards so that they data isn't clobbered during the
120 * expansion.
121 * @param buffer the array of chars and array of longs that need to be
122 * expanded
123 * @param numValues the number of bytes to convert to longs
124 */
125 void expandBytesToLongs(int64_t* buffer, uint64_t numValues) {
126 for(size_t i=numValues - 1; i < numValues; --i) {
127 buffer[i] = reinterpret_cast<char *>(buffer)[i];
128 }
129 }
130
131 class BooleanColumnReader: public ColumnReader {
132 private:
133 std::unique_ptr<orc::ByteRleDecoder> rle;
134
135 public:
136 BooleanColumnReader(const Type& type, StripeStreams& stipe);
137 ~BooleanColumnReader() override;
138
139 uint64_t skip(uint64_t numValues) override;
140
141 void next(ColumnVectorBatch& rowBatch,
142 uint64_t numValues,
143 char* notNull) override;
144 };
145
146 BooleanColumnReader::BooleanColumnReader(const Type& type,
147 StripeStreams& stripe
148 ): ColumnReader(type, stripe){
149 std::unique_ptr<SeekableInputStream> stream =
150 stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
151 if (stream == nullptr)
152 throw ParseError("DATA stream not found in Boolean column");
153 rle = createBooleanRleDecoder(std::move(stream));
154 }
155
156 BooleanColumnReader::~BooleanColumnReader() {
157 // PASS
158 }
159
160 uint64_t BooleanColumnReader::skip(uint64_t numValues) {
161 numValues = ColumnReader::skip(numValues);
162 rle->skip(numValues);
163 return numValues;
164 }
165
166 void BooleanColumnReader::next(ColumnVectorBatch& rowBatch,
167 uint64_t numValues,
168 char *notNull) {
169 ColumnReader::next(rowBatch, numValues, notNull);
170 // Since the byte rle places the output in a char* instead of long*,
171 // we cheat here and use the long* and then expand it in a second pass.
172 int64_t *ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
173 rle->next(reinterpret_cast<char*>(ptr),
174 numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
175 expandBytesToLongs(ptr, numValues);
176 }
177
178 class ByteColumnReader: public ColumnReader {
179 private:
180 std::unique_ptr<orc::ByteRleDecoder> rle;
181
182 public:
183 ByteColumnReader(const Type& type, StripeStreams& stipe);
184 ~ByteColumnReader() override;
185
186 uint64_t skip(uint64_t numValues) override;
187
188 void next(ColumnVectorBatch& rowBatch,
189 uint64_t numValues,
190 char* notNull) override;
191 };
192
193 ByteColumnReader::ByteColumnReader(const Type& type,
194 StripeStreams& stripe
195 ): ColumnReader(type, stripe){
196 std::unique_ptr<SeekableInputStream> stream =
197 stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
198 if (stream == nullptr)
199 throw ParseError("DATA stream not found in Byte column");
200 rle = createByteRleDecoder(std::move(stream));
201 }
202
203 ByteColumnReader::~ByteColumnReader() {
204 // PASS
205 }
206
207 uint64_t ByteColumnReader::skip(uint64_t numValues) {
208 numValues = ColumnReader::skip(numValues);
209 rle->skip(numValues);
210 return numValues;
211 }
212
213 void ByteColumnReader::next(ColumnVectorBatch& rowBatch,
214 uint64_t numValues,
215 char *notNull) {
216 ColumnReader::next(rowBatch, numValues, notNull);
217 // Since the byte rle places the output in a char* instead of long*,
218 // we cheat here and use the long* and then expand it in a second pass.
219 int64_t *ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
220 rle->next(reinterpret_cast<char*>(ptr),
221 numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
222 expandBytesToLongs(ptr, numValues);
223 }
224
225 class IntegerColumnReader: public ColumnReader {
226 protected:
227 std::unique_ptr<orc::RleDecoder> rle;
228
229 public:
230 IntegerColumnReader(const Type& type, StripeStreams& stripe);
231 ~IntegerColumnReader() override;
232
233 uint64_t skip(uint64_t numValues) override;
234
235 void next(ColumnVectorBatch& rowBatch,
236 uint64_t numValues,
237 char* notNull) override;
238 };
239
240 IntegerColumnReader::IntegerColumnReader(const Type& type,
241 StripeStreams& stripe
242 ): ColumnReader(type, stripe) {
243 RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
244 std::unique_ptr<SeekableInputStream> stream =
245 stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
246 if (stream == nullptr)
247 throw ParseError("DATA stream not found in Integer column");
248 rle = createRleDecoder(std::move(stream), true, vers, memoryPool);
249 }
250
251 IntegerColumnReader::~IntegerColumnReader() {
252 // PASS
253 }
254
255 uint64_t IntegerColumnReader::skip(uint64_t numValues) {
256 numValues = ColumnReader::skip(numValues);
257 rle->skip(numValues);
258 return numValues;
259 }
260
261 void IntegerColumnReader::next(ColumnVectorBatch& rowBatch,
262 uint64_t numValues,
263 char *notNull) {
264 ColumnReader::next(rowBatch, numValues, notNull);
265 rle->next(dynamic_cast<LongVectorBatch&>(rowBatch).data.data(),
266 numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
267 }
268
269 class TimestampColumnReader: public ColumnReader {
270 private:
271 std::unique_ptr<orc::RleDecoder> secondsRle;
272 std::unique_ptr<orc::RleDecoder> nanoRle;
273 const Timezone& writerTimezone;
274 const int64_t epochOffset;
275
276 public:
277 TimestampColumnReader(const Type& type, StripeStreams& stripe);
278 ~TimestampColumnReader() override;
279
280 uint64_t skip(uint64_t numValues) override;
281
282 void next(ColumnVectorBatch& rowBatch,
283 uint64_t numValues,
284 char* notNull) override;
285 };
286
287
288 TimestampColumnReader::TimestampColumnReader(const Type& type,
289 StripeStreams& stripe
290 ): ColumnReader(type, stripe),
291 writerTimezone(stripe.getWriterTimezone()),
292 epochOffset(writerTimezone.getEpoch()) {
293 RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
294 std::unique_ptr<SeekableInputStream> stream =
295 stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
296 if (stream == nullptr)
297 throw ParseError("DATA stream not found in Timestamp column");
298 secondsRle = createRleDecoder(std::move(stream), true, vers, memoryPool);
299 stream = stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
300 if (stream == nullptr)
301 throw ParseError("SECONDARY stream not found in Timestamp column");
302 nanoRle = createRleDecoder(std::move(stream), false, vers, memoryPool);
303 }
304
305 TimestampColumnReader::~TimestampColumnReader() {
306 // PASS
307 }
308
309 uint64_t TimestampColumnReader::skip(uint64_t numValues) {
310 numValues = ColumnReader::skip(numValues);
311 secondsRle->skip(numValues);
312 nanoRle->skip(numValues);
313 return numValues;
314 }
315
316 void TimestampColumnReader::next(ColumnVectorBatch& rowBatch,
317 uint64_t numValues,
318 char *notNull) {
319 ColumnReader::next(rowBatch, numValues, notNull);
320 notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
321 TimestampVectorBatch& timestampBatch =
322 dynamic_cast<TimestampVectorBatch&>(rowBatch);
323 int64_t *secsBuffer = timestampBatch.data.data();
324 secondsRle->next(secsBuffer, numValues, notNull);
325 int64_t *nanoBuffer = timestampBatch.nanoseconds.data();
326 nanoRle->next(nanoBuffer, numValues, notNull);
327
328 // Construct the values
329 for(uint64_t i=0; i < numValues; i++) {
330 if (notNull == nullptr || notNull[i]) {
331 uint64_t zeros = nanoBuffer[i] & 0x7;
332 nanoBuffer[i] >>= 3;
333 if (zeros != 0) {
334 for(uint64_t j = 0; j <= zeros; ++j) {
335 nanoBuffer[i] *= 10;
336 }
337 }
338 int64_t writerTime = secsBuffer[i] + epochOffset;
339 secsBuffer[i] = writerTimezone.convertToUTC(writerTime);
340 if (secsBuffer[i] < 0 && nanoBuffer[i] != 0) {
341 secsBuffer[i] -= 1;
342 }
343 }
344 }
345 }
346
347 class DoubleColumnReader: public ColumnReader {
348 public:
349 DoubleColumnReader(const Type& type, StripeStreams& stripe);
350 ~DoubleColumnReader() override;
351
352 uint64_t skip(uint64_t numValues) override;
353
354 void next(ColumnVectorBatch& rowBatch,
355 uint64_t numValues,
356 char* notNull) override;
357
358 private:
359 std::unique_ptr<SeekableInputStream> inputStream;
360 TypeKind columnKind;
361 const uint64_t bytesPerValue ;
362 const char *bufferPointer;
363 const char *bufferEnd;
364
365 unsigned char readByte() {
366 if (bufferPointer == bufferEnd) {
367 int length;
368 if (!inputStream->Next
369 (reinterpret_cast<const void**>(&bufferPointer), &length)) {
370 throw ParseError("bad read in DoubleColumnReader::next()");
371 }
372 bufferEnd = bufferPointer + length;
373 }
374 return static_cast<unsigned char>(*(bufferPointer++));
375 }
376
377 double readDouble() {
378 int64_t bits = 0;
379 for (uint64_t i=0; i < 8; i++) {
380 bits |= static_cast<int64_t>(readByte()) << (i*8);
381 }
382 double *result = reinterpret_cast<double*>(&bits);
383 return *result;
384 }
385
386 double readFloat() {
387 int32_t bits = 0;
388 for (uint64_t i=0; i < 4; i++) {
389 bits |= readByte() << (i*8);
390 }
391 float *result = reinterpret_cast<float*>(&bits);
392 return static_cast<double>(*result);
393 }
394 };
395
396 DoubleColumnReader::DoubleColumnReader(const Type& type,
397 StripeStreams& stripe
398 ): ColumnReader(type, stripe),
399 columnKind(type.getKind()),
400 bytesPerValue((type.getKind() ==
401 FLOAT) ? 4 : 8),
402 bufferPointer(nullptr),
403 bufferEnd(nullptr) {
404 inputStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
405 if (inputStream == nullptr)
406 throw ParseError("DATA stream not found in Double column");
407 }
408
409 DoubleColumnReader::~DoubleColumnReader() {
410 // PASS
411 }
412
413 uint64_t DoubleColumnReader::skip(uint64_t numValues) {
414 numValues = ColumnReader::skip(numValues);
415
416 if (static_cast<size_t>(bufferEnd - bufferPointer) >=
417 bytesPerValue * numValues) {
418 bufferPointer += bytesPerValue * numValues;
419 } else {
420 inputStream->Skip(static_cast<int>(bytesPerValue * numValues -
421 static_cast<size_t>(bufferEnd - bufferPointer)));
422 bufferEnd = nullptr;
423 bufferPointer = nullptr;
424 }
425
426 return numValues;
427 }
428
429 void DoubleColumnReader::next(ColumnVectorBatch& rowBatch,
430 uint64_t numValues,
431 char *notNull) {
432 ColumnReader::next(rowBatch, numValues, notNull);
433 // update the notNull from the parent class
434 notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
435 double* outArray = dynamic_cast<DoubleVectorBatch&>(rowBatch).data.data();
436
437 if (columnKind == FLOAT) {
438 if (notNull) {
439 for(size_t i=0; i < numValues; ++i) {
440 if (notNull[i]) {
441 outArray[i] = readFloat();
442 }
443 }
444 } else {
445 for(size_t i=0; i < numValues; ++i) {
446 outArray[i] = readFloat();
447 }
448 }
449 } else {
450 if (notNull) {
451 for(size_t i=0; i < numValues; ++i) {
452 if (notNull[i]) {
453 outArray[i] = readDouble();
454 }
455 }
456 } else {
457 for(size_t i=0; i < numValues; ++i) {
458 outArray[i] = readDouble();
459 }
460 }
461 }
462 }
463
464 void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
465 int64_t posn = 0;
466 while (posn < bufferSize) {
467 const void* chunk;
468 int length;
469 if (!stream->Next(&chunk, &length)) {
470 throw ParseError("bad read in readFully");
471 }
472 if (posn + length > bufferSize) {
473 throw ParseError("Corrupt dictionary blob in StringDictionaryColumn");
474 }
475 memcpy(buffer + posn, chunk, static_cast<size_t>(length));
476 posn += length;
477 }
478 }
479
480 class StringDictionaryColumnReader: public ColumnReader {
481 private:
482 DataBuffer<char> dictionaryBlob;
483 DataBuffer<int64_t> dictionaryOffset;
484 std::unique_ptr<RleDecoder> rle;
485 uint64_t dictionaryCount;
486
487 public:
488 StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
489 ~StringDictionaryColumnReader() override;
490
491 uint64_t skip(uint64_t numValues) override;
492
493 void next(ColumnVectorBatch& rowBatch,
494 uint64_t numValues,
495 char *notNull) override;
496 };
497
498 StringDictionaryColumnReader::StringDictionaryColumnReader
499 (const Type& type,
500 StripeStreams& stripe
501 ): ColumnReader(type, stripe),
502 dictionaryBlob(stripe.getMemoryPool()),
503 dictionaryOffset(stripe.getMemoryPool()) {
504 RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
505 .kind());
506 dictionaryCount = stripe.getEncoding(columnId).dictionarysize();
507 std::unique_ptr<SeekableInputStream> stream =
508 stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
509 if (stream == nullptr)
510 throw ParseError("DATA stream not found in StringDictionaryColumn");
511 rle = createRleDecoder(std::move(stream), false, rleVersion, memoryPool);
512 stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
513 if (dictionaryCount > 0 && stream == nullptr) {
514 throw ParseError("LENGTH stream not found in StringDictionaryColumn");
515 }
516 std::unique_ptr<RleDecoder> lengthDecoder =
517 createRleDecoder(std::move(stream), false, rleVersion, memoryPool);
518 dictionaryOffset.resize(dictionaryCount+1);
519 int64_t* lengthArray = dictionaryOffset.data();
520 lengthDecoder->next(lengthArray + 1, dictionaryCount, nullptr);
521 lengthArray[0] = 0;
522 for (uint64_t i = 1; i < dictionaryCount + 1; ++i) {
523 if (lengthArray[i] < 0)
524 throw ParseError("Negative dictionary entry length");
525 lengthArray[i] += lengthArray[i - 1];
526 }
527 int64_t blobSize = lengthArray[dictionaryCount];
528 dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
529 std::unique_ptr<SeekableInputStream> blobStream =
530 stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
531 if (blobSize > 0 && blobStream == nullptr) {
532 throw ParseError(
533 "DICTIONARY_DATA stream not found in StringDictionaryColumn");
534 }
535 readFully(dictionaryBlob.data(), blobSize, blobStream.get());
536 }
537
538 StringDictionaryColumnReader::~StringDictionaryColumnReader() {
539 // PASS
540 }
541
542 uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
543 numValues = ColumnReader::skip(numValues);
544 rle->skip(numValues);
545 return numValues;
546 }
547
548 void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch,
549 uint64_t numValues,
550 char *notNull) {
551 ColumnReader::next(rowBatch, numValues, notNull);
552 // update the notNull from the parent class
553 notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
554 StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
555 char *blob = dictionaryBlob.data();
556 int64_t *dictionaryOffsets = dictionaryOffset.data();
557 char **outputStarts = byteBatch.data.data();
558 int64_t *outputLengths = byteBatch.length.data();
559 rle->next(outputLengths, numValues, notNull);
560 if (notNull) {
561 for(uint64_t i=0; i < numValues; ++i) {
562 if (notNull[i]) {
563 int64_t entry = outputLengths[i];
564 if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
565 throw ParseError("Entry index out of range in StringDictionaryColumn");
566 }
567 outputStarts[i] = blob + dictionaryOffsets[entry];
568 outputLengths[i] = dictionaryOffsets[entry+1] -
569 dictionaryOffsets[entry];
570 }
571 }
572 } else {
573 for(uint64_t i=0; i < numValues; ++i) {
574 int64_t entry = outputLengths[i];
575 if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
576 throw ParseError("Entry index out of range in StringDictionaryColumn");
577 }
578 outputStarts[i] = blob + dictionaryOffsets[entry];
579 outputLengths[i] = dictionaryOffsets[entry+1] -
580 dictionaryOffsets[entry];
581 }
582 }
583 }
584
585 class StringDirectColumnReader: public ColumnReader {
586 private:
587 DataBuffer<char> blobBuffer;
588 std::unique_ptr<RleDecoder> lengthRle;
589 std::unique_ptr<SeekableInputStream> blobStream;
590 const char *lastBuffer;
591 size_t lastBufferLength;
592
593 /**
594 * Compute the total length of the values.
595 * @param lengths the array of lengths
596 * @param notNull the array of notNull flags
597 * @param numValues the lengths of the arrays
598 * @return the total number of bytes for the non-null values
599 */
600 size_t computeSize(const int64_t *lengths, const char *notNull,
601 uint64_t numValues);
602
603 public:
604 StringDirectColumnReader(const Type& type, StripeStreams& stipe);
605 ~StringDirectColumnReader() override;
606
607 uint64_t skip(uint64_t numValues) override;
608
609 void next(ColumnVectorBatch& rowBatch,
610 uint64_t numValues,
611 char *notNull) override;
612 };
613
614 StringDirectColumnReader::StringDirectColumnReader
615 (const Type& type,
616 StripeStreams& stripe
617 ): ColumnReader(type, stripe),
618 blobBuffer(stripe.getMemoryPool()) {
619 RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
620 .kind());
621 std::unique_ptr<SeekableInputStream> stream =
622 stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
623 if (stream == nullptr)
624 throw ParseError("LENGTH stream not found in StringDirectColumn");
625 lengthRle = createRleDecoder(
626 std::move(stream), false, rleVersion, memoryPool);
627 blobStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
628 if (blobStream == nullptr)
629 throw ParseError("DATA stream not found in StringDirectColumn");
630 lastBuffer = nullptr;
631 lastBufferLength = 0;
632 }
633
634 StringDirectColumnReader::~StringDirectColumnReader() {
635 // PASS
636 }
637
638 uint64_t StringDirectColumnReader::skip(uint64_t numValues) {
639 const size_t BUFFER_SIZE = 1024;
640 numValues = ColumnReader::skip(numValues);
641 int64_t buffer[BUFFER_SIZE];
642 uint64_t done = 0;
643 size_t totalBytes = 0;
644 // read the lengths, so we know haw many bytes to skip
645 while (done < numValues) {
646 uint64_t step = std::min(BUFFER_SIZE,
647 static_cast<size_t>(numValues - done));
648 lengthRle->next(buffer, step, nullptr);
649 totalBytes += computeSize(buffer, nullptr, step);
650 done += step;
651 }
652 if (totalBytes <= lastBufferLength) {
653 // subtract the needed bytes from the ones left over
654 lastBufferLength -= totalBytes;
655 lastBuffer += totalBytes;
656 } else {
657 // move the stream forward after accounting for the buffered bytes
658 totalBytes -= lastBufferLength;
659 blobStream->Skip(static_cast<int>(totalBytes));
660 lastBufferLength = 0;
661 lastBuffer = nullptr;
662 }
663 return numValues;
664 }
665
666 size_t StringDirectColumnReader::computeSize(const int64_t* lengths,
667 const char* notNull,
668 uint64_t numValues) {
669 size_t totalLength = 0;
670 if (notNull) {
671 for(size_t i=0; i < numValues; ++i) {
672 if (notNull[i]) {
673 totalLength += static_cast<size_t>(lengths[i]);
674 }
675 }
676 } else {
677 for(size_t i=0; i < numValues; ++i) {
678 totalLength += static_cast<size_t>(lengths[i]);
679 }
680 }
681 return totalLength;
682 }
683
  /**
   * Read the next numValues rows into rowBatch, which must be a
   * StringVectorBatch.
   *
   * The lengths are decoded first. Start pointers are then assigned in three
   * stages: values whose bytes were copied into blobBuffer, at most one value
   * that straddles blobBuffer and the current stream chunk, and finally
   * values that point straight into the current stream chunk (lastBuffer).
   *
   * @param rowBatch the batch to fill
   * @param numValues the number of rows to read
   * @param notNull optional not-null mask from the enclosing reader
   * @throws ParseError if the blob stream cannot supply the needed bytes
   */
  void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch,
                                      uint64_t numValues,
                                      char *notNull) {
    ColumnReader::next(rowBatch, numValues, notNull);
    // update the notNull from the parent class
    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
    StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
    char **startPtr = byteBatch.data.data();
    int64_t *lengthPtr = byteBatch.length.data();

    // read the length vector
    lengthRle->next(lengthPtr, numValues, notNull);

    // figure out the total length of data we need from the blob stream
    const size_t totalLength = computeSize(lengthPtr, notNull, numValues);

    // Load data from the blob stream into our buffer until we have enough
    // to get the rest directly out of the stream's buffer.
    size_t bytesBuffered = 0;
    blobBuffer.resize(totalLength);
    char *ptr= blobBuffer.data();
    while (bytesBuffered + lastBufferLength < totalLength) {
      // NOTE(review): ptr was taken before this resize; this presumably
      // relies on a resize to a size no larger than totalLength not moving
      // the storage -- confirm against DataBuffer.
      blobBuffer.resize(bytesBuffered + lastBufferLength);
      // keep what is left of the previous chunk, then fetch the next one
      memcpy(ptr + bytesBuffered, lastBuffer, lastBufferLength);
      bytesBuffered += lastBufferLength;
      const void* readBuffer;
      int readLength;
      if (!blobStream->Next(&readBuffer, &readLength)) {
        throw ParseError("failed to read in StringDirectColumnReader.next");
      }
      lastBuffer = static_cast<const char*>(readBuffer);
      lastBufferLength = static_cast<size_t>(readLength);
    }

    // Set up the start pointers for the ones that will come out of the buffer.
    size_t filledSlots = 0;
    size_t usedBytes = 0;
    ptr = blobBuffer.data();
    if (notNull) {
      // null rows are stepped over without consuming any bytes
      while (filledSlots < numValues &&
             (!notNull[filledSlots] ||
              usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
              bytesBuffered)) {
        if (notNull[filledSlots]) {
          startPtr[filledSlots] = ptr + usedBytes;
          usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
        }
        filledSlots += 1;
      }
    } else {
      while (filledSlots < numValues &&
             (usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
              bytesBuffered)) {
        startPtr[filledSlots] = ptr + usedBytes;
        usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
        filledSlots += 1;
      }
    }

    // do we need to complete the last value in the blob buffer?
    if (usedBytes < bytesBuffered) {
      // the value at filledSlots starts in blobBuffer; append its remaining
      // bytes from the current stream chunk so that it is contiguous
      size_t moreBytes = static_cast<size_t>(lengthPtr[filledSlots]) -
        (bytesBuffered - usedBytes);
      blobBuffer.resize(bytesBuffered + moreBytes);
      ptr = blobBuffer.data();
      memcpy(ptr + bytesBuffered, lastBuffer, moreBytes);
      lastBuffer += moreBytes;
      lastBufferLength -= moreBytes;
      startPtr[filledSlots++] = ptr + usedBytes;
    }

    // Finally, set up any remaining entries into the stream buffer
    if (notNull) {
      while (filledSlots < numValues) {
        if (notNull[filledSlots]) {
          // these values point directly at the stream chunk, not at a copy
          startPtr[filledSlots] = const_cast<char*>(lastBuffer);
          lastBuffer += lengthPtr[filledSlots];
          lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
        }
        filledSlots += 1;
      }
    } else {
      while (filledSlots < numValues) {
        startPtr[filledSlots] = const_cast<char*>(lastBuffer);
        lastBuffer += lengthPtr[filledSlots];
        lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
        filledSlots += 1;
      }
    }
  }
774
775 class StructColumnReader: public ColumnReader {
776 private:
777 std::vector<ColumnReader*> children;
778
779 public:
780 StructColumnReader(const Type& type, StripeStreams& stipe);
781 ~StructColumnReader() override;
782
783 uint64_t skip(uint64_t numValues) override;
784
785 void next(ColumnVectorBatch& rowBatch,
786 uint64_t numValues,
787 char *notNull) override;
788 };
789
790 StructColumnReader::StructColumnReader(const Type& type,
791 StripeStreams& stripe
792 ): ColumnReader(type, stripe) {
793 // count the number of selected sub-columns
794 const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
795 switch (static_cast<int64_t>(stripe.getEncoding(columnId).kind())) {
796 case proto::ColumnEncoding_Kind_DIRECT:
797 for(unsigned int i=0; i < type.getSubtypeCount(); ++i) {
798 const Type& child = *type.getSubtype(i);
799 if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
800 children.push_back(buildReader(child, stripe).release());
801 }
802 }
803 break;
804 case proto::ColumnEncoding_Kind_DIRECT_V2:
805 case proto::ColumnEncoding_Kind_DICTIONARY:
806 case proto::ColumnEncoding_Kind_DICTIONARY_V2:
807 default:
808 throw ParseError("Unknown encoding for StructColumnReader");
809 }
810 }
811
812 StructColumnReader::~StructColumnReader() {
813 for (size_t i=0; i<children.size(); i++) {
814 delete children[i];
815 }
816 }
817
818 uint64_t StructColumnReader::skip(uint64_t numValues) {
819 numValues = ColumnReader::skip(numValues);
820 for(std::vector<ColumnReader*>::iterator ptr=children.begin(); ptr != children.end(); ++ptr) {
821 (*ptr)->skip(numValues);
822 }
823 return numValues;
824 }
825
826 void StructColumnReader::next(ColumnVectorBatch& rowBatch,
827 uint64_t numValues,
828 char *notNull) {
829 ColumnReader::next(rowBatch, numValues, notNull);
830 uint64_t i=0;
831 notNull = rowBatch.hasNulls? rowBatch.notNull.data() : nullptr;
832 for(std::vector<ColumnReader*>::iterator ptr=children.begin();
833 ptr != children.end(); ++ptr, ++i) {
834 (*ptr)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
835 numValues, notNull);
836 }
837 }
838
839 class ListColumnReader: public ColumnReader {
840 private:
841 std::unique_ptr<ColumnReader> child;
842 std::unique_ptr<RleDecoder> rle;
843
844 public:
845 ListColumnReader(const Type& type, StripeStreams& stipe);
846 ~ListColumnReader() override;
847
848 uint64_t skip(uint64_t numValues) override;
849
850 void next(ColumnVectorBatch& rowBatch,
851 uint64_t numValues,
852 char *notNull) override;
853 };
854
855 ListColumnReader::ListColumnReader(const Type& type,
856 StripeStreams& stripe
857 ): ColumnReader(type, stripe) {
858 // count the number of selected sub-columns
859 const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
860 RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
861 std::unique_ptr<SeekableInputStream> stream =
862 stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
863 if (stream == nullptr)
864 throw ParseError("LENGTH stream not found in List column");
865 rle = createRleDecoder(std::move(stream), false, vers, memoryPool);
866 const Type& childType = *type.getSubtype(0);
867 if (selectedColumns[static_cast<uint64_t>(childType.getColumnId())]) {
868 child = buildReader(childType, stripe);
869 }
870 }
871
872 ListColumnReader::~ListColumnReader() {
873 // PASS
874 }
875
876 uint64_t ListColumnReader::skip(uint64_t numValues) {
877 numValues = ColumnReader::skip(numValues);
878 ColumnReader *childReader = child.get();
879 if (childReader) {
880 const uint64_t BUFFER_SIZE = 1024;
881 int64_t buffer[BUFFER_SIZE];
882 uint64_t childrenElements = 0;
883 uint64_t lengthsRead = 0;
884 while (lengthsRead < numValues) {
885 uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
886 rle->next(buffer, chunk, nullptr);
887 for(size_t i=0; i < chunk; ++i) {
888 childrenElements += static_cast<size_t>(buffer[i]);
889 }
890 lengthsRead += chunk;
891 }
892 childReader->skip(childrenElements);
893 } else {
894 rle->skip(numValues);
895 }
896 return numValues;
897 }
898
899 void ListColumnReader::next(ColumnVectorBatch& rowBatch,
900 uint64_t numValues,
901 char *notNull) {
902 ColumnReader::next(rowBatch, numValues, notNull);
903 ListVectorBatch &listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
904 int64_t* offsets = listBatch.offsets.data();
905 notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
906 rle->next(offsets, numValues, notNull);
907 uint64_t totalChildren = 0;
908 if (notNull) {
909 for(size_t i=0; i < numValues; ++i) {
910 if (notNull[i]) {
911 uint64_t tmp = static_cast<uint64_t>(offsets[i]);
912 offsets[i] = static_cast<int64_t>(totalChildren);
913 totalChildren += tmp;
914 } else {
915 offsets[i] = static_cast<int64_t>(totalChildren);
916 }
917 }
918 } else {
919 for(size_t i=0; i < numValues; ++i) {
920 uint64_t tmp = static_cast<uint64_t>(offsets[i]);
921 offsets[i] = static_cast<int64_t>(totalChildren);
922 totalChildren += tmp;
923 }
924 }
925 offsets[numValues] = static_cast<int64_t>(totalChildren);
926 ColumnReader *childReader = child.get();
927 if (childReader) {
928 childReader->next(*(listBatch.elements.get()), totalChildren, nullptr);
929 }
930 }
931
932 class MapColumnReader: public ColumnReader {
933 private:
934 std::unique_ptr<ColumnReader> keyReader;
935 std::unique_ptr<ColumnReader> elementReader;
936 std::unique_ptr<RleDecoder> rle;
937
938 public:
939 MapColumnReader(const Type& type, StripeStreams& stipe);
940 ~MapColumnReader() override;
941
942 uint64_t skip(uint64_t numValues) override;
943
944 void next(ColumnVectorBatch& rowBatch,
945 uint64_t numValues,
946 char *notNull) override;
947 };
948
949 MapColumnReader::MapColumnReader(const Type& type,
950 StripeStreams& stripe
951 ): ColumnReader(type, stripe) {
952 // Determine if the key and/or value columns are selected
953 const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
954 RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
955 std::unique_ptr<SeekableInputStream> stream =
956 stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
957 if (stream == nullptr)
958 throw ParseError("LENGTH stream not found in Map column");
959 rle = createRleDecoder(std::move(stream), false, vers, memoryPool);
960 const Type& keyType = *type.getSubtype(0);
961 if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
962 keyReader = buildReader(keyType, stripe);
963 }
964 const Type& elementType = *type.getSubtype(1);
965 if (selectedColumns[static_cast<uint64_t>(elementType.getColumnId())]) {
966 elementReader = buildReader(elementType, stripe);
967 }
968 }
969
970 MapColumnReader::~MapColumnReader() {
971 // PASS
972 }
973
974 uint64_t MapColumnReader::skip(uint64_t numValues) {
975 numValues = ColumnReader::skip(numValues);
976 ColumnReader *rawKeyReader = keyReader.get();
977 ColumnReader *rawElementReader = elementReader.get();
978 if (rawKeyReader || rawElementReader) {
979 const uint64_t BUFFER_SIZE = 1024;
980 int64_t buffer[BUFFER_SIZE];
981 uint64_t childrenElements = 0;
982 uint64_t lengthsRead = 0;
983 while (lengthsRead < numValues) {
984 uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
985 rle->next(buffer, chunk, nullptr);
986 for(size_t i=0; i < chunk; ++i) {
987 childrenElements += static_cast<size_t>(buffer[i]);
988 }
989 lengthsRead += chunk;
990 }
991 if (rawKeyReader) {
992 rawKeyReader->skip(childrenElements);
993 }
994 if (rawElementReader) {
995 rawElementReader->skip(childrenElements);
996 }
997 } else {
998 rle->skip(numValues);
999 }
1000 return numValues;
1001 }
1002
1003 void MapColumnReader::next(ColumnVectorBatch& rowBatch,
1004 uint64_t numValues,
1005 char *notNull) {
1006 ColumnReader::next(rowBatch, numValues, notNull);
1007 MapVectorBatch &mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
1008 int64_t* offsets = mapBatch.offsets.data();
1009 notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr;
1010 rle->next(offsets, numValues, notNull);
1011 uint64_t totalChildren = 0;
1012 if (notNull) {
1013 for(size_t i=0; i < numValues; ++i) {
1014 if (notNull[i]) {
1015 uint64_t tmp = static_cast<uint64_t>(offsets[i]);
1016 offsets[i] = static_cast<int64_t>(totalChildren);
1017 totalChildren += tmp;
1018 } else {
1019 offsets[i] = static_cast<int64_t>(totalChildren);
1020 }
1021 }
1022 } else {
1023 for(size_t i=0; i < numValues; ++i) {
1024 uint64_t tmp = static_cast<uint64_t>(offsets[i]);
1025 offsets[i] = static_cast<int64_t>(totalChildren);
1026 totalChildren += tmp;
1027 }
1028 }
1029 offsets[numValues] = static_cast<int64_t>(totalChildren);
1030 ColumnReader *rawKeyReader = keyReader.get();
1031 if (rawKeyReader) {
1032 rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr);
1033 }
1034 ColumnReader *rawElementReader = elementReader.get();
1035 if (rawElementReader) {
1036 rawElementReader->next(*(mapBatch.elements.get()), totalChildren, nullptr);
1037 }
1038 }
1039
1040 class UnionColumnReader: public ColumnReader {
1041 private:
1042 std::unique_ptr<ByteRleDecoder> rle;
1043 std::vector<ColumnReader*> childrenReader;
1044 std::vector<int64_t> childrenCounts;
1045 uint64_t numChildren;
1046
1047 public:
1048 UnionColumnReader(const Type& type, StripeStreams& stipe);
1049 ~UnionColumnReader() override;
1050
1051 uint64_t skip(uint64_t numValues) override;
1052
1053 void next(ColumnVectorBatch& rowBatch,
1054 uint64_t numValues,
1055 char *notNull) override;
1056 };
1057
1058 UnionColumnReader::UnionColumnReader(const Type& type,
1059 StripeStreams& stripe
1060 ): ColumnReader(type, stripe) {
1061 numChildren = type.getSubtypeCount();
1062 childrenReader.resize(numChildren);
1063 childrenCounts.resize(numChildren);
1064
1065 std::unique_ptr<SeekableInputStream> stream =
1066 stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
1067 if (stream == nullptr)
1068 throw ParseError("LENGTH stream not found in Union column");
1069 rle = createByteRleDecoder(std::move(stream));
1070 // figure out which types are selected
1071 const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
1072 for(unsigned int i=0; i < numChildren; ++i) {
1073 const Type &child = *type.getSubtype(i);
1074 if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
1075 childrenReader[i] = buildReader(child, stripe).release();
1076 }
1077 }
1078 }
1079
1080 UnionColumnReader::~UnionColumnReader() {
1081 for(std::vector<ColumnReader*>::iterator itr = childrenReader.begin();
1082 itr != childrenReader.end(); ++itr) {
1083 delete *itr;
1084 }
1085 }
1086
1087 uint64_t UnionColumnReader::skip(uint64_t numValues) {
1088 numValues = ColumnReader::skip(numValues);
1089 const uint64_t BUFFER_SIZE = 1024;
1090 char buffer[BUFFER_SIZE];
1091 uint64_t lengthsRead = 0;
1092 int64_t *counts = childrenCounts.data();
1093 memset(counts, 0, sizeof(int64_t) * numChildren);
1094 while (lengthsRead < numValues) {
1095 uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
1096 rle->next(buffer, chunk, nullptr);
1097 for(size_t i=0; i < chunk; ++i) {
1098 counts[static_cast<size_t>(buffer[i])] += 1;
1099 }
1100 lengthsRead += chunk;
1101 }
1102 for(size_t i=0; i < numChildren; ++i) {
1103 if (counts[i] != 0 && childrenReader[i] != nullptr) {
1104 childrenReader[i]->skip(static_cast<uint64_t>(counts[i]));
1105 }
1106 }
1107 return numValues;
1108 }
1109
1110 void UnionColumnReader::next(ColumnVectorBatch& rowBatch,
1111 uint64_t numValues,
1112 char *notNull) {
1113 ColumnReader::next(rowBatch, numValues, notNull);
1114 UnionVectorBatch &unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
1115 uint64_t* offsets = unionBatch.offsets.data();
1116 int64_t* counts = childrenCounts.data();
1117 memset(counts, 0, sizeof(int64_t) * numChildren);
1118 unsigned char* tags = unionBatch.tags.data();
1119 notNull = unionBatch.hasNulls ? unionBatch.notNull.data() : nullptr;
1120 rle->next(reinterpret_cast<char *>(tags), numValues, notNull);
1121 // set the offsets for each row
1122 if (notNull) {
1123 for(size_t i=0; i < numValues; ++i) {
1124 if (notNull[i]) {
1125 offsets[i] =
1126 static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
1127 }
1128 }
1129 } else {
1130 for(size_t i=0; i < numValues; ++i) {
1131 offsets[i] =
1132 static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
1133 }
1134 }
1135 // read the right number of each child column
1136 for(size_t i=0; i < numChildren; ++i) {
1137 if (childrenReader[i] != nullptr) {
1138 childrenReader[i]->next(*(unionBatch.children[i]),
1139 static_cast<uint64_t>(counts[i]), nullptr);
1140 }
1141 }
1142 }
1143
1144 /**
1145 * Destructively convert the number from zigzag encoding to the
1146 * natural signed representation.
1147 */
1148 void unZigZagInt128(Int128& value) {
1149 bool needsNegate = value.getLowBits() & 1;
1150 value >>= 1;
1151 if (needsNegate) {
1152 value.negate();
1153 value -= 1;
1154 }
1155 }
1156
1157 class Decimal64ColumnReader: public ColumnReader {
1158 public:
1159 static const uint32_t MAX_PRECISION_64 = 18;
1160 static const uint32_t MAX_PRECISION_128 = 38;
1161 static const int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1];
1162
1163 protected:
1164 std::unique_ptr<SeekableInputStream> valueStream;
1165 int32_t precision;
1166 int32_t scale;
1167 const char* buffer;
1168 const char* bufferEnd;
1169
1170 std::unique_ptr<RleDecoder> scaleDecoder;
1171
1172 /**
1173 * Read the valueStream for more bytes.
1174 */
1175 void readBuffer() {
1176 while (buffer == bufferEnd) {
1177 int length;
1178 if (!valueStream->Next(reinterpret_cast<const void**>(&buffer),
1179 &length)) {
1180 throw ParseError("Read past end of stream in Decimal64ColumnReader "+
1181 valueStream->getName());
1182 }
1183 bufferEnd = buffer + length;
1184 }
1185 }
1186
1187 void readInt64(int64_t& value, int32_t currentScale) {
1188 value = 0;
1189 size_t offset = 0;
1190 while (true) {
1191 readBuffer();
1192 unsigned char ch = static_cast<unsigned char>(*(buffer++));
1193 value |= static_cast<uint64_t>(ch & 0x7f) << offset;
1194 offset += 7;
1195 if (!(ch & 0x80)) {
1196 break;
1197 }
1198 }
1199 value = unZigZag(static_cast<uint64_t>(value));
1200 if (scale > currentScale &&
1201 static_cast<uint64_t>(scale - currentScale) <= MAX_PRECISION_64) {
1202 value *= POWERS_OF_TEN[scale - currentScale];
1203 } else if (scale < currentScale &&
1204 static_cast<uint64_t>(currentScale - scale) <= MAX_PRECISION_64) {
1205 value /= POWERS_OF_TEN[currentScale - scale];
1206 } else if (scale != currentScale) {
1207 throw ParseError("Decimal scale out of range");
1208 }
1209 }
1210
1211 public:
1212 Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
1213 ~Decimal64ColumnReader() override;
1214
1215 uint64_t skip(uint64_t numValues) override;
1216
1217 void next(ColumnVectorBatch& rowBatch,
1218 uint64_t numValues,
1219 char *notNull) override;
1220 };
1221 const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
1222 const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
1223 const int64_t Decimal64ColumnReader::POWERS_OF_TEN[MAX_PRECISION_64 + 1]=
1224 {1,
1225 10,
1226 100,
1227 1000,
1228 10000,
1229 100000,
1230 1000000,
1231 10000000,
1232 100000000,
1233 1000000000,
1234 10000000000,
1235 100000000000,
1236 1000000000000,
1237 10000000000000,
1238 100000000000000,
1239 1000000000000000,
1240 10000000000000000,
1241 100000000000000000,
1242 1000000000000000000};
1243
1244 Decimal64ColumnReader::Decimal64ColumnReader(const Type& type,
1245 StripeStreams& stripe
1246 ): ColumnReader(type, stripe) {
1247 scale = static_cast<int32_t>(type.getScale());
1248 precision = static_cast<int32_t>(type.getPrecision());
1249 valueStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
1250 if (valueStream == nullptr)
1251 throw ParseError("DATA stream not found in Decimal64Column");
1252 buffer = nullptr;
1253 bufferEnd = nullptr;
1254 RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
1255 std::unique_ptr<SeekableInputStream> stream =
1256 stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
1257 if (stream == nullptr)
1258 throw ParseError("SECONDARY stream not found in Decimal64Column");
1259 scaleDecoder = createRleDecoder(std::move(stream), true, vers, memoryPool);
1260 }
1261
1262 Decimal64ColumnReader::~Decimal64ColumnReader() {
1263 // PASS
1264 }
1265
1266 uint64_t Decimal64ColumnReader::skip(uint64_t numValues) {
1267 numValues = ColumnReader::skip(numValues);
1268 uint64_t skipped = 0;
1269 while (skipped < numValues) {
1270 readBuffer();
1271 if (!(0x80 & *(buffer++))) {
1272 skipped += 1;
1273 }
1274 }
1275 scaleDecoder->skip(numValues);
1276 return numValues;
1277 }
1278
1279 void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch,
1280 uint64_t numValues,
1281 char *notNull) {
1282 ColumnReader::next(rowBatch, numValues, notNull);
1283 notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
1284 Decimal64VectorBatch &batch =
1285 dynamic_cast<Decimal64VectorBatch&>(rowBatch);
1286 int64_t* values = batch.values.data();
1287 // read the next group of scales
1288 int64_t* scaleBuffer = batch.readScales.data();
1289 scaleDecoder->next(scaleBuffer, numValues, notNull);
1290 batch.precision = precision;
1291 batch.scale = scale;
1292 if (notNull) {
1293 for(size_t i=0; i < numValues; ++i) {
1294 if (notNull[i]) {
1295 readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
1296 }
1297 }
1298 } else {
1299 for(size_t i=0; i < numValues; ++i) {
1300 readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
1301 }
1302 }
1303 }
1304
1305 void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) {
1306 if (scale > currentScale) {
1307 while(scale > currentScale) {
1308 uint32_t scaleAdjust =
1309 std::min(Decimal64ColumnReader::MAX_PRECISION_64,
1310 scale - currentScale);
1311 value *= Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust];
1312 currentScale += scaleAdjust;
1313 }
1314 } else if (scale < currentScale) {
1315 Int128 remainder;
1316 while(currentScale > scale) {
1317 uint32_t scaleAdjust =
1318 std::min(Decimal64ColumnReader::MAX_PRECISION_64,
1319 currentScale - scale);
1320 value = value.divide(Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust],
1321 remainder);
1322 currentScale -= scaleAdjust;
1323 }
1324 }
1325 }
1326
1327 class Decimal128ColumnReader: public Decimal64ColumnReader {
1328 public:
1329 Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
1330 ~Decimal128ColumnReader() override;
1331
1332 void next(ColumnVectorBatch& rowBatch,
1333 uint64_t numValues,
1334 char *notNull) override;
1335
1336 private:
1337 void readInt128(Int128& value, int32_t currentScale) {
1338 value = 0;
1339 Int128 work;
1340 uint32_t offset = 0;
1341 while (true) {
1342 readBuffer();
1343 unsigned char ch = static_cast<unsigned char>(*(buffer++));
1344 work = ch & 0x7f;
1345 work <<= offset;
1346 value |= work;
1347 offset += 7;
1348 if (!(ch & 0x80)) {
1349 break;
1350 }
1351 }
1352 unZigZagInt128(value);
1353 scaleInt128(value, static_cast<uint32_t>(scale),
1354 static_cast<uint32_t>(currentScale));
1355 }
1356 };
1357
1358 Decimal128ColumnReader::Decimal128ColumnReader
1359 (const Type& type,
1360 StripeStreams& stripe
1361 ): Decimal64ColumnReader(type, stripe) {
1362 // PASS
1363 }
1364
1365 Decimal128ColumnReader::~Decimal128ColumnReader() {
1366 // PASS
1367 }
1368
1369 void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch,
1370 uint64_t numValues,
1371 char *notNull) {
1372 ColumnReader::next(rowBatch, numValues, notNull);
1373 notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
1374 Decimal128VectorBatch &batch =
1375 dynamic_cast<Decimal128VectorBatch&>(rowBatch);
1376 Int128* values = batch.values.data();
1377 // read the next group of scales
1378 int64_t* scaleBuffer = batch.readScales.data();
1379 scaleDecoder->next(scaleBuffer, numValues, notNull);
1380 batch.precision = precision;
1381 batch.scale = scale;
1382 if (notNull) {
1383 for(size_t i=0; i < numValues; ++i) {
1384 if (notNull[i]) {
1385 readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
1386 }
1387 }
1388 } else {
1389 for(size_t i=0; i < numValues; ++i) {
1390 readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
1391 }
1392 }
1393 }
1394
1395 class DecimalHive11ColumnReader: public Decimal64ColumnReader {
1396 private:
1397 bool throwOnOverflow;
1398 std::ostream* errorStream;
1399
1400 /**
1401 * Read an Int128 from the stream and correct it to the desired scale.
1402 */
1403 bool readInt128(Int128& value, int32_t currentScale) {
1404 // -/+ 99999999999999999999999999999999999999
1405 static const Int128 MIN_VALUE(-0x4b3b4ca85a86c47b, 0xf675ddc000000001);
1406 static const Int128 MAX_VALUE( 0x4b3b4ca85a86c47a, 0x098a223fffffffff);
1407
1408 value = 0;
1409 Int128 work;
1410 uint32_t offset = 0;
1411 bool result = true;
1412 while (true) {
1413 readBuffer();
1414 unsigned char ch = static_cast<unsigned char>(*(buffer++));
1415 work = ch & 0x7f;
1416 // If we have read more than 128 bits, we flag the error, but keep
1417 // reading bytes so the stream isn't thrown off.
1418 if (offset > 128 || (offset == 126 && work > 3)) {
1419 result = false;
1420 }
1421 work <<= offset;
1422 value |= work;
1423 offset += 7;
1424 if (!(ch & 0x80)) {
1425 break;
1426 }
1427 }
1428
1429 if (!result) {
1430 return result;
1431 }
1432 unZigZagInt128(value);
1433 scaleInt128(value, static_cast<uint32_t>(scale),
1434 static_cast<uint32_t>(currentScale));
1435 return value >= MIN_VALUE && value <= MAX_VALUE;
1436 }
1437
1438 public:
1439 DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
1440 ~DecimalHive11ColumnReader() override;
1441
1442 void next(ColumnVectorBatch& rowBatch,
1443 uint64_t numValues,
1444 char *notNull) override;
1445 };
1446
1447 DecimalHive11ColumnReader::DecimalHive11ColumnReader
1448 (const Type& type,
1449 StripeStreams& stripe
1450 ): Decimal64ColumnReader(type, stripe) {
1451 scale = stripe.getForcedScaleOnHive11Decimal();
1452 throwOnOverflow = stripe.getThrowOnHive11DecimalOverflow();
1453 errorStream = stripe.getErrorStream();
1454 }
1455
1456 DecimalHive11ColumnReader::~DecimalHive11ColumnReader() {
1457 // PASS
1458 }
1459
1460 void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch,
1461 uint64_t numValues,
1462 char *notNull) {
1463 ColumnReader::next(rowBatch, numValues, notNull);
1464 notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
1465 Decimal128VectorBatch &batch =
1466 dynamic_cast<Decimal128VectorBatch&>(rowBatch);
1467 Int128* values = batch.values.data();
1468 // read the next group of scales
1469 int64_t* scaleBuffer = batch.readScales.data();
1470
1471 scaleDecoder->next(scaleBuffer, numValues, notNull);
1472
1473 batch.precision = precision;
1474 batch.scale = scale;
1475 if (notNull) {
1476 for(size_t i=0; i < numValues; ++i) {
1477 if (notNull[i]) {
1478 if (!readInt128(values[i],
1479 static_cast<int32_t>(scaleBuffer[i]))) {
1480 if (throwOnOverflow) {
1481 throw ParseError("Hive 0.11 decimal was more than 38 digits.");
1482 } else {
1483 *errorStream << "Warning: "
1484 << "Hive 0.11 decimal with more than 38 digits "
1485 << "replaced by NULL.\n";
1486 notNull[i] = false;
1487 }
1488 }
1489 }
1490 }
1491 } else {
1492 for(size_t i=0; i < numValues; ++i) {
1493 if (!readInt128(values[i],
1494 static_cast<int32_t>(scaleBuffer[i]))) {
1495 if (throwOnOverflow) {
1496 throw ParseError("Hive 0.11 decimal was more than 38 digits.");
1497 } else {
1498 *errorStream << "Warning: "
1499 << "Hive 0.11 decimal with more than 38 digits "
1500 << "replaced by NULL.\n";
1501 batch.hasNulls = true;
1502 batch.notNull[i] = false;
1503 }
1504 }
1505 }
1506 }
1507 }
1508
1509 /**
1510 * Create a reader for the given stripe.
1511 */
1512 std::unique_ptr<ColumnReader> buildReader(const Type& type,
1513 StripeStreams& stripe) {
1514 switch (static_cast<int64_t>(type.getKind())) {
1515 case DATE:
1516 case INT:
1517 case LONG:
1518 case SHORT:
1519 return std::unique_ptr<ColumnReader>(
1520 new IntegerColumnReader(type, stripe));
1521 case BINARY:
1522 case CHAR:
1523 case STRING:
1524 case VARCHAR:
1525 switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())){
1526 case proto::ColumnEncoding_Kind_DICTIONARY:
1527 case proto::ColumnEncoding_Kind_DICTIONARY_V2:
1528 return std::unique_ptr<ColumnReader>(
1529 new StringDictionaryColumnReader(type, stripe));
1530 case proto::ColumnEncoding_Kind_DIRECT:
1531 case proto::ColumnEncoding_Kind_DIRECT_V2:
1532 return std::unique_ptr<ColumnReader>(
1533 new StringDirectColumnReader(type, stripe));
1534 default:
1535 throw NotImplementedYet("buildReader unhandled string encoding");
1536 }
1537
1538 case BOOLEAN:
1539 return std::unique_ptr<ColumnReader>(
1540 new BooleanColumnReader(type, stripe));
1541
1542 case BYTE:
1543 return std::unique_ptr<ColumnReader>(
1544 new ByteColumnReader(type, stripe));
1545
1546 case LIST:
1547 return std::unique_ptr<ColumnReader>(
1548 new ListColumnReader(type, stripe));
1549
1550 case MAP:
1551 return std::unique_ptr<ColumnReader>(
1552 new MapColumnReader(type, stripe));
1553
1554 case UNION:
1555 return std::unique_ptr<ColumnReader>(
1556 new UnionColumnReader(type, stripe));
1557
1558 case STRUCT:
1559 return std::unique_ptr<ColumnReader>(
1560 new StructColumnReader(type, stripe));
1561
1562 case FLOAT:
1563 case DOUBLE:
1564 return std::unique_ptr<ColumnReader>(
1565 new DoubleColumnReader(type, stripe));
1566
1567 case TIMESTAMP:
1568 return std::unique_ptr<ColumnReader>
1569 (new TimestampColumnReader(type, stripe));
1570
1571 case DECIMAL:
1572 // is this a Hive 0.11 or 0.12 file?
1573 if (type.getPrecision() == 0) {
1574 return std::unique_ptr<ColumnReader>
1575 (new DecimalHive11ColumnReader(type, stripe));
1576
1577 // can we represent the values using int64_t?
1578 } else if (type.getPrecision() <=
1579 Decimal64ColumnReader::MAX_PRECISION_64) {
1580 return std::unique_ptr<ColumnReader>
1581 (new Decimal64ColumnReader(type, stripe));
1582
1583 // otherwise we use the Int128 implementation
1584 } else {
1585 return std::unique_ptr<ColumnReader>
1586 (new Decimal128ColumnReader(type, stripe));
1587 }
1588
1589 default:
1590 throw NotImplementedYet("buildReader unhandled type");
1591 }
1592 }
1593
1594}
1595