1 | /** |
2 | * Licensed to the Apache Software Foundation (ASF) under one |
3 | * or more contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. The ASF licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | */ |
18 | |
#include "orc/Int128.hh"

#include "Adaptor.hh"
#include "ByteRLE.hh"
#include "ColumnReader.hh"
#include "orc/Exceptions.hh"
#include "RLE.hh"

#include <math.h>
#include <string.h>
#include <iostream>
#include <limits>
29 | |
30 | namespace orc { |
31 | |
32 | StripeStreams::~StripeStreams() { |
33 | // PASS |
34 | } |
35 | |
36 | inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) { |
37 | switch (static_cast<int64_t>(kind)) { |
38 | case proto::ColumnEncoding_Kind_DIRECT: |
39 | case proto::ColumnEncoding_Kind_DICTIONARY: |
40 | return RleVersion_1; |
41 | case proto::ColumnEncoding_Kind_DIRECT_V2: |
42 | case proto::ColumnEncoding_Kind_DICTIONARY_V2: |
43 | return RleVersion_2; |
44 | default: |
45 | throw ParseError("Unknown encoding in convertRleVersion" ); |
46 | } |
47 | } |
48 | |
49 | ColumnReader::ColumnReader(const Type& type, |
50 | StripeStreams& stripe |
51 | ): columnId(type.getColumnId()), |
52 | memoryPool(stripe.getMemoryPool()) { |
53 | std::unique_ptr<SeekableInputStream> stream = |
54 | stripe.getStream(columnId, proto::Stream_Kind_PRESENT, true); |
55 | if (stream.get()) { |
56 | notNullDecoder = createBooleanRleDecoder(std::move(stream)); |
57 | } |
58 | } |
59 | |
60 | ColumnReader::~ColumnReader() { |
61 | // PASS |
62 | } |
63 | |
64 | uint64_t ColumnReader::skip(uint64_t numValues) { |
65 | ByteRleDecoder* decoder = notNullDecoder.get(); |
66 | if (decoder) { |
67 | // page through the values that we want to skip |
68 | // and count how many are non-null |
69 | const size_t MAX_BUFFER_SIZE = 32768; |
70 | size_t bufferSize = std::min(MAX_BUFFER_SIZE, |
71 | static_cast<size_t>(numValues)); |
72 | char buffer[MAX_BUFFER_SIZE]; |
73 | uint64_t remaining = numValues; |
74 | while (remaining > 0) { |
75 | uint64_t chunkSize = |
76 | std::min(remaining, |
77 | static_cast<uint64_t>(bufferSize)); |
78 | decoder->next(buffer, chunkSize, nullptr); |
79 | remaining -= chunkSize; |
80 | for(uint64_t i=0; i < chunkSize; ++i) { |
81 | if (!buffer[i]) { |
82 | numValues -= 1; |
83 | } |
84 | } |
85 | } |
86 | } |
87 | return numValues; |
88 | } |
89 | |
90 | void ColumnReader::next(ColumnVectorBatch& rowBatch, |
91 | uint64_t numValues, |
92 | char* incomingMask) { |
93 | if (numValues > rowBatch.capacity) { |
94 | rowBatch.resize(numValues); |
95 | } |
96 | rowBatch.numElements = numValues; |
97 | ByteRleDecoder* decoder = notNullDecoder.get(); |
98 | if (decoder) { |
99 | char* notNullArray = rowBatch.notNull.data(); |
100 | decoder->next(notNullArray, numValues, incomingMask); |
101 | // check to see if there are nulls in this batch |
102 | for(uint64_t i=0; i < numValues; ++i) { |
103 | if (!notNullArray[i]) { |
104 | rowBatch.hasNulls = true; |
105 | return; |
106 | } |
107 | } |
108 | } else if (incomingMask) { |
109 | // If we don't have a notNull stream, copy the incomingMask |
110 | rowBatch.hasNulls = true; |
111 | memcpy(rowBatch.notNull.data(), incomingMask, numValues); |
112 | return; |
113 | } |
114 | rowBatch.hasNulls = false; |
115 | } |
116 | |
/**
 * Expand an array of bytes in place to the corresponding array of longs.
 * The first numValues bytes of buffer are each sign-extended to an int64_t
 * and stored in buffer[0..numValues-1]. The conversion runs from the last
 * element to the first so that the bytes still to be read are not
 * clobbered by the wider values being written.
 *
 * Each byte is explicitly treated as a signed 8-bit value, so the result
 * does not depend on whether plain char is signed on the target platform.
 *
 * @param buffer the array of chars and array of longs that need to be
 *        expanded; must have room for numValues int64_t values
 * @param numValues the number of bytes to convert to longs
 */
void expandBytesToLongs(int64_t* buffer, uint64_t numValues) {
  const char* bytes = reinterpret_cast<const char*>(buffer);
  for (uint64_t remaining = numValues; remaining > 0; --remaining) {
    const uint64_t index = remaining - 1;
    buffer[index] = static_cast<signed char>(bytes[index]);
  }
}
130 | |
131 | class BooleanColumnReader: public ColumnReader { |
132 | private: |
133 | std::unique_ptr<orc::ByteRleDecoder> rle; |
134 | |
135 | public: |
136 | BooleanColumnReader(const Type& type, StripeStreams& stipe); |
137 | ~BooleanColumnReader() override; |
138 | |
139 | uint64_t skip(uint64_t numValues) override; |
140 | |
141 | void next(ColumnVectorBatch& rowBatch, |
142 | uint64_t numValues, |
143 | char* notNull) override; |
144 | }; |
145 | |
146 | BooleanColumnReader::BooleanColumnReader(const Type& type, |
147 | StripeStreams& stripe |
148 | ): ColumnReader(type, stripe){ |
149 | std::unique_ptr<SeekableInputStream> stream = |
150 | stripe.getStream(columnId, proto::Stream_Kind_DATA, true); |
151 | if (stream == nullptr) |
152 | throw ParseError("DATA stream not found in Boolean column" ); |
153 | rle = createBooleanRleDecoder(std::move(stream)); |
154 | } |
155 | |
156 | BooleanColumnReader::~BooleanColumnReader() { |
157 | // PASS |
158 | } |
159 | |
160 | uint64_t BooleanColumnReader::skip(uint64_t numValues) { |
161 | numValues = ColumnReader::skip(numValues); |
162 | rle->skip(numValues); |
163 | return numValues; |
164 | } |
165 | |
166 | void BooleanColumnReader::next(ColumnVectorBatch& rowBatch, |
167 | uint64_t numValues, |
168 | char *notNull) { |
169 | ColumnReader::next(rowBatch, numValues, notNull); |
170 | // Since the byte rle places the output in a char* instead of long*, |
171 | // we cheat here and use the long* and then expand it in a second pass. |
172 | int64_t *ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data(); |
173 | rle->next(reinterpret_cast<char*>(ptr), |
174 | numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr); |
175 | expandBytesToLongs(ptr, numValues); |
176 | } |
177 | |
178 | class ByteColumnReader: public ColumnReader { |
179 | private: |
180 | std::unique_ptr<orc::ByteRleDecoder> rle; |
181 | |
182 | public: |
183 | ByteColumnReader(const Type& type, StripeStreams& stipe); |
184 | ~ByteColumnReader() override; |
185 | |
186 | uint64_t skip(uint64_t numValues) override; |
187 | |
188 | void next(ColumnVectorBatch& rowBatch, |
189 | uint64_t numValues, |
190 | char* notNull) override; |
191 | }; |
192 | |
193 | ByteColumnReader::ByteColumnReader(const Type& type, |
194 | StripeStreams& stripe |
195 | ): ColumnReader(type, stripe){ |
196 | std::unique_ptr<SeekableInputStream> stream = |
197 | stripe.getStream(columnId, proto::Stream_Kind_DATA, true); |
198 | if (stream == nullptr) |
199 | throw ParseError("DATA stream not found in Byte column" ); |
200 | rle = createByteRleDecoder(std::move(stream)); |
201 | } |
202 | |
203 | ByteColumnReader::~ByteColumnReader() { |
204 | // PASS |
205 | } |
206 | |
207 | uint64_t ByteColumnReader::skip(uint64_t numValues) { |
208 | numValues = ColumnReader::skip(numValues); |
209 | rle->skip(numValues); |
210 | return numValues; |
211 | } |
212 | |
213 | void ByteColumnReader::next(ColumnVectorBatch& rowBatch, |
214 | uint64_t numValues, |
215 | char *notNull) { |
216 | ColumnReader::next(rowBatch, numValues, notNull); |
217 | // Since the byte rle places the output in a char* instead of long*, |
218 | // we cheat here and use the long* and then expand it in a second pass. |
219 | int64_t *ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data(); |
220 | rle->next(reinterpret_cast<char*>(ptr), |
221 | numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr); |
222 | expandBytesToLongs(ptr, numValues); |
223 | } |
224 | |
225 | class IntegerColumnReader: public ColumnReader { |
226 | protected: |
227 | std::unique_ptr<orc::RleDecoder> rle; |
228 | |
229 | public: |
230 | IntegerColumnReader(const Type& type, StripeStreams& stripe); |
231 | ~IntegerColumnReader() override; |
232 | |
233 | uint64_t skip(uint64_t numValues) override; |
234 | |
235 | void next(ColumnVectorBatch& rowBatch, |
236 | uint64_t numValues, |
237 | char* notNull) override; |
238 | }; |
239 | |
240 | IntegerColumnReader::IntegerColumnReader(const Type& type, |
241 | StripeStreams& stripe |
242 | ): ColumnReader(type, stripe) { |
243 | RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); |
244 | std::unique_ptr<SeekableInputStream> stream = |
245 | stripe.getStream(columnId, proto::Stream_Kind_DATA, true); |
246 | if (stream == nullptr) |
247 | throw ParseError("DATA stream not found in Integer column" ); |
248 | rle = createRleDecoder(std::move(stream), true, vers, memoryPool); |
249 | } |
250 | |
251 | IntegerColumnReader::~IntegerColumnReader() { |
252 | // PASS |
253 | } |
254 | |
255 | uint64_t IntegerColumnReader::skip(uint64_t numValues) { |
256 | numValues = ColumnReader::skip(numValues); |
257 | rle->skip(numValues); |
258 | return numValues; |
259 | } |
260 | |
261 | void IntegerColumnReader::next(ColumnVectorBatch& rowBatch, |
262 | uint64_t numValues, |
263 | char *notNull) { |
264 | ColumnReader::next(rowBatch, numValues, notNull); |
265 | rle->next(dynamic_cast<LongVectorBatch&>(rowBatch).data.data(), |
266 | numValues, rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr); |
267 | } |
268 | |
269 | class TimestampColumnReader: public ColumnReader { |
270 | private: |
271 | std::unique_ptr<orc::RleDecoder> secondsRle; |
272 | std::unique_ptr<orc::RleDecoder> nanoRle; |
273 | const Timezone& writerTimezone; |
274 | const int64_t epochOffset; |
275 | |
276 | public: |
277 | TimestampColumnReader(const Type& type, StripeStreams& stripe); |
278 | ~TimestampColumnReader() override; |
279 | |
280 | uint64_t skip(uint64_t numValues) override; |
281 | |
282 | void next(ColumnVectorBatch& rowBatch, |
283 | uint64_t numValues, |
284 | char* notNull) override; |
285 | }; |
286 | |
287 | |
288 | TimestampColumnReader::TimestampColumnReader(const Type& type, |
289 | StripeStreams& stripe |
290 | ): ColumnReader(type, stripe), |
291 | writerTimezone(stripe.getWriterTimezone()), |
292 | epochOffset(writerTimezone.getEpoch()) { |
293 | RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); |
294 | std::unique_ptr<SeekableInputStream> stream = |
295 | stripe.getStream(columnId, proto::Stream_Kind_DATA, true); |
296 | if (stream == nullptr) |
297 | throw ParseError("DATA stream not found in Timestamp column" ); |
298 | secondsRle = createRleDecoder(std::move(stream), true, vers, memoryPool); |
299 | stream = stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true); |
300 | if (stream == nullptr) |
301 | throw ParseError("SECONDARY stream not found in Timestamp column" ); |
302 | nanoRle = createRleDecoder(std::move(stream), false, vers, memoryPool); |
303 | } |
304 | |
305 | TimestampColumnReader::~TimestampColumnReader() { |
306 | // PASS |
307 | } |
308 | |
309 | uint64_t TimestampColumnReader::skip(uint64_t numValues) { |
310 | numValues = ColumnReader::skip(numValues); |
311 | secondsRle->skip(numValues); |
312 | nanoRle->skip(numValues); |
313 | return numValues; |
314 | } |
315 | |
316 | void TimestampColumnReader::next(ColumnVectorBatch& rowBatch, |
317 | uint64_t numValues, |
318 | char *notNull) { |
319 | ColumnReader::next(rowBatch, numValues, notNull); |
320 | notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; |
321 | TimestampVectorBatch& timestampBatch = |
322 | dynamic_cast<TimestampVectorBatch&>(rowBatch); |
323 | int64_t *secsBuffer = timestampBatch.data.data(); |
324 | secondsRle->next(secsBuffer, numValues, notNull); |
325 | int64_t *nanoBuffer = timestampBatch.nanoseconds.data(); |
326 | nanoRle->next(nanoBuffer, numValues, notNull); |
327 | |
328 | // Construct the values |
329 | for(uint64_t i=0; i < numValues; i++) { |
330 | if (notNull == nullptr || notNull[i]) { |
331 | uint64_t zeros = nanoBuffer[i] & 0x7; |
332 | nanoBuffer[i] >>= 3; |
333 | if (zeros != 0) { |
334 | for(uint64_t j = 0; j <= zeros; ++j) { |
335 | nanoBuffer[i] *= 10; |
336 | } |
337 | } |
338 | int64_t writerTime = secsBuffer[i] + epochOffset; |
339 | secsBuffer[i] = writerTimezone.convertToUTC(writerTime); |
340 | if (secsBuffer[i] < 0 && nanoBuffer[i] != 0) { |
341 | secsBuffer[i] -= 1; |
342 | } |
343 | } |
344 | } |
345 | } |
346 | |
347 | class DoubleColumnReader: public ColumnReader { |
348 | public: |
349 | DoubleColumnReader(const Type& type, StripeStreams& stripe); |
350 | ~DoubleColumnReader() override; |
351 | |
352 | uint64_t skip(uint64_t numValues) override; |
353 | |
354 | void next(ColumnVectorBatch& rowBatch, |
355 | uint64_t numValues, |
356 | char* notNull) override; |
357 | |
358 | private: |
359 | std::unique_ptr<SeekableInputStream> inputStream; |
360 | TypeKind columnKind; |
361 | const uint64_t bytesPerValue ; |
362 | const char *bufferPointer; |
363 | const char *bufferEnd; |
364 | |
365 | unsigned char readByte() { |
366 | if (bufferPointer == bufferEnd) { |
367 | int length; |
368 | if (!inputStream->Next |
369 | (reinterpret_cast<const void**>(&bufferPointer), &length)) { |
370 | throw ParseError("bad read in DoubleColumnReader::next()" ); |
371 | } |
372 | bufferEnd = bufferPointer + length; |
373 | } |
374 | return static_cast<unsigned char>(*(bufferPointer++)); |
375 | } |
376 | |
377 | double readDouble() { |
378 | int64_t bits = 0; |
379 | for (uint64_t i=0; i < 8; i++) { |
380 | bits |= static_cast<int64_t>(readByte()) << (i*8); |
381 | } |
382 | double *result = reinterpret_cast<double*>(&bits); |
383 | return *result; |
384 | } |
385 | |
386 | double readFloat() { |
387 | int32_t bits = 0; |
388 | for (uint64_t i=0; i < 4; i++) { |
389 | bits |= readByte() << (i*8); |
390 | } |
391 | float *result = reinterpret_cast<float*>(&bits); |
392 | return static_cast<double>(*result); |
393 | } |
394 | }; |
395 | |
396 | DoubleColumnReader::DoubleColumnReader(const Type& type, |
397 | StripeStreams& stripe |
398 | ): ColumnReader(type, stripe), |
399 | columnKind(type.getKind()), |
400 | bytesPerValue((type.getKind() == |
401 | FLOAT) ? 4 : 8), |
402 | bufferPointer(nullptr), |
403 | bufferEnd(nullptr) { |
404 | inputStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true); |
405 | if (inputStream == nullptr) |
406 | throw ParseError("DATA stream not found in Double column" ); |
407 | } |
408 | |
409 | DoubleColumnReader::~DoubleColumnReader() { |
410 | // PASS |
411 | } |
412 | |
413 | uint64_t DoubleColumnReader::skip(uint64_t numValues) { |
414 | numValues = ColumnReader::skip(numValues); |
415 | |
416 | if (static_cast<size_t>(bufferEnd - bufferPointer) >= |
417 | bytesPerValue * numValues) { |
418 | bufferPointer += bytesPerValue * numValues; |
419 | } else { |
420 | inputStream->Skip(static_cast<int>(bytesPerValue * numValues - |
421 | static_cast<size_t>(bufferEnd - bufferPointer))); |
422 | bufferEnd = nullptr; |
423 | bufferPointer = nullptr; |
424 | } |
425 | |
426 | return numValues; |
427 | } |
428 | |
429 | void DoubleColumnReader::next(ColumnVectorBatch& rowBatch, |
430 | uint64_t numValues, |
431 | char *notNull) { |
432 | ColumnReader::next(rowBatch, numValues, notNull); |
433 | // update the notNull from the parent class |
434 | notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; |
435 | double* outArray = dynamic_cast<DoubleVectorBatch&>(rowBatch).data.data(); |
436 | |
437 | if (columnKind == FLOAT) { |
438 | if (notNull) { |
439 | for(size_t i=0; i < numValues; ++i) { |
440 | if (notNull[i]) { |
441 | outArray[i] = readFloat(); |
442 | } |
443 | } |
444 | } else { |
445 | for(size_t i=0; i < numValues; ++i) { |
446 | outArray[i] = readFloat(); |
447 | } |
448 | } |
449 | } else { |
450 | if (notNull) { |
451 | for(size_t i=0; i < numValues; ++i) { |
452 | if (notNull[i]) { |
453 | outArray[i] = readDouble(); |
454 | } |
455 | } |
456 | } else { |
457 | for(size_t i=0; i < numValues; ++i) { |
458 | outArray[i] = readDouble(); |
459 | } |
460 | } |
461 | } |
462 | } |
463 | |
464 | void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) { |
465 | int64_t posn = 0; |
466 | while (posn < bufferSize) { |
467 | const void* chunk; |
468 | int length; |
469 | if (!stream->Next(&chunk, &length)) { |
470 | throw ParseError("bad read in readFully" ); |
471 | } |
472 | if (posn + length > bufferSize) { |
473 | throw ParseError("Corrupt dictionary blob in StringDictionaryColumn" ); |
474 | } |
475 | memcpy(buffer + posn, chunk, static_cast<size_t>(length)); |
476 | posn += length; |
477 | } |
478 | } |
479 | |
480 | class StringDictionaryColumnReader: public ColumnReader { |
481 | private: |
482 | DataBuffer<char> dictionaryBlob; |
483 | DataBuffer<int64_t> dictionaryOffset; |
484 | std::unique_ptr<RleDecoder> rle; |
485 | uint64_t dictionaryCount; |
486 | |
487 | public: |
488 | StringDictionaryColumnReader(const Type& type, StripeStreams& stipe); |
489 | ~StringDictionaryColumnReader() override; |
490 | |
491 | uint64_t skip(uint64_t numValues) override; |
492 | |
493 | void next(ColumnVectorBatch& rowBatch, |
494 | uint64_t numValues, |
495 | char *notNull) override; |
496 | }; |
497 | |
498 | StringDictionaryColumnReader::StringDictionaryColumnReader |
499 | (const Type& type, |
500 | StripeStreams& stripe |
501 | ): ColumnReader(type, stripe), |
502 | dictionaryBlob(stripe.getMemoryPool()), |
503 | dictionaryOffset(stripe.getMemoryPool()) { |
504 | RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId) |
505 | .kind()); |
506 | dictionaryCount = stripe.getEncoding(columnId).dictionarysize(); |
507 | std::unique_ptr<SeekableInputStream> stream = |
508 | stripe.getStream(columnId, proto::Stream_Kind_DATA, true); |
509 | if (stream == nullptr) |
510 | throw ParseError("DATA stream not found in StringDictionaryColumn" ); |
511 | rle = createRleDecoder(std::move(stream), false, rleVersion, memoryPool); |
512 | stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false); |
513 | if (dictionaryCount > 0 && stream == nullptr) { |
514 | throw ParseError("LENGTH stream not found in StringDictionaryColumn" ); |
515 | } |
516 | std::unique_ptr<RleDecoder> lengthDecoder = |
517 | createRleDecoder(std::move(stream), false, rleVersion, memoryPool); |
518 | dictionaryOffset.resize(dictionaryCount+1); |
519 | int64_t* lengthArray = dictionaryOffset.data(); |
520 | lengthDecoder->next(lengthArray + 1, dictionaryCount, nullptr); |
521 | lengthArray[0] = 0; |
522 | for (uint64_t i = 1; i < dictionaryCount + 1; ++i) { |
523 | if (lengthArray[i] < 0) |
524 | throw ParseError("Negative dictionary entry length" ); |
525 | lengthArray[i] += lengthArray[i - 1]; |
526 | } |
527 | int64_t blobSize = lengthArray[dictionaryCount]; |
528 | dictionaryBlob.resize(static_cast<uint64_t>(blobSize)); |
529 | std::unique_ptr<SeekableInputStream> blobStream = |
530 | stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false); |
531 | if (blobSize > 0 && blobStream == nullptr) { |
532 | throw ParseError( |
533 | "DICTIONARY_DATA stream not found in StringDictionaryColumn" ); |
534 | } |
535 | readFully(dictionaryBlob.data(), blobSize, blobStream.get()); |
536 | } |
537 | |
538 | StringDictionaryColumnReader::~StringDictionaryColumnReader() { |
539 | // PASS |
540 | } |
541 | |
542 | uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) { |
543 | numValues = ColumnReader::skip(numValues); |
544 | rle->skip(numValues); |
545 | return numValues; |
546 | } |
547 | |
548 | void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch, |
549 | uint64_t numValues, |
550 | char *notNull) { |
551 | ColumnReader::next(rowBatch, numValues, notNull); |
552 | // update the notNull from the parent class |
553 | notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; |
554 | StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch); |
555 | char *blob = dictionaryBlob.data(); |
556 | int64_t *dictionaryOffsets = dictionaryOffset.data(); |
557 | char **outputStarts = byteBatch.data.data(); |
558 | int64_t *outputLengths = byteBatch.length.data(); |
559 | rle->next(outputLengths, numValues, notNull); |
560 | if (notNull) { |
561 | for(uint64_t i=0; i < numValues; ++i) { |
562 | if (notNull[i]) { |
563 | int64_t entry = outputLengths[i]; |
564 | if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) { |
565 | throw ParseError("Entry index out of range in StringDictionaryColumn" ); |
566 | } |
567 | outputStarts[i] = blob + dictionaryOffsets[entry]; |
568 | outputLengths[i] = dictionaryOffsets[entry+1] - |
569 | dictionaryOffsets[entry]; |
570 | } |
571 | } |
572 | } else { |
573 | for(uint64_t i=0; i < numValues; ++i) { |
574 | int64_t entry = outputLengths[i]; |
575 | if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) { |
576 | throw ParseError("Entry index out of range in StringDictionaryColumn" ); |
577 | } |
578 | outputStarts[i] = blob + dictionaryOffsets[entry]; |
579 | outputLengths[i] = dictionaryOffsets[entry+1] - |
580 | dictionaryOffsets[entry]; |
581 | } |
582 | } |
583 | } |
584 | |
585 | class StringDirectColumnReader: public ColumnReader { |
586 | private: |
587 | DataBuffer<char> blobBuffer; |
588 | std::unique_ptr<RleDecoder> lengthRle; |
589 | std::unique_ptr<SeekableInputStream> blobStream; |
590 | const char *lastBuffer; |
591 | size_t lastBufferLength; |
592 | |
593 | /** |
594 | * Compute the total length of the values. |
595 | * @param lengths the array of lengths |
596 | * @param notNull the array of notNull flags |
597 | * @param numValues the lengths of the arrays |
598 | * @return the total number of bytes for the non-null values |
599 | */ |
600 | size_t computeSize(const int64_t *lengths, const char *notNull, |
601 | uint64_t numValues); |
602 | |
603 | public: |
604 | StringDirectColumnReader(const Type& type, StripeStreams& stipe); |
605 | ~StringDirectColumnReader() override; |
606 | |
607 | uint64_t skip(uint64_t numValues) override; |
608 | |
609 | void next(ColumnVectorBatch& rowBatch, |
610 | uint64_t numValues, |
611 | char *notNull) override; |
612 | }; |
613 | |
614 | StringDirectColumnReader::StringDirectColumnReader |
615 | (const Type& type, |
616 | StripeStreams& stripe |
617 | ): ColumnReader(type, stripe), |
618 | blobBuffer(stripe.getMemoryPool()) { |
619 | RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId) |
620 | .kind()); |
621 | std::unique_ptr<SeekableInputStream> stream = |
622 | stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true); |
623 | if (stream == nullptr) |
624 | throw ParseError("LENGTH stream not found in StringDirectColumn" ); |
625 | lengthRle = createRleDecoder( |
626 | std::move(stream), false, rleVersion, memoryPool); |
627 | blobStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true); |
628 | if (blobStream == nullptr) |
629 | throw ParseError("DATA stream not found in StringDirectColumn" ); |
630 | lastBuffer = nullptr; |
631 | lastBufferLength = 0; |
632 | } |
633 | |
634 | StringDirectColumnReader::~StringDirectColumnReader() { |
635 | // PASS |
636 | } |
637 | |
638 | uint64_t StringDirectColumnReader::skip(uint64_t numValues) { |
639 | const size_t BUFFER_SIZE = 1024; |
640 | numValues = ColumnReader::skip(numValues); |
641 | int64_t buffer[BUFFER_SIZE]; |
642 | uint64_t done = 0; |
643 | size_t totalBytes = 0; |
644 | // read the lengths, so we know haw many bytes to skip |
645 | while (done < numValues) { |
646 | uint64_t step = std::min(BUFFER_SIZE, |
647 | static_cast<size_t>(numValues - done)); |
648 | lengthRle->next(buffer, step, nullptr); |
649 | totalBytes += computeSize(buffer, nullptr, step); |
650 | done += step; |
651 | } |
652 | if (totalBytes <= lastBufferLength) { |
653 | // subtract the needed bytes from the ones left over |
654 | lastBufferLength -= totalBytes; |
655 | lastBuffer += totalBytes; |
656 | } else { |
657 | // move the stream forward after accounting for the buffered bytes |
658 | totalBytes -= lastBufferLength; |
659 | blobStream->Skip(static_cast<int>(totalBytes)); |
660 | lastBufferLength = 0; |
661 | lastBuffer = nullptr; |
662 | } |
663 | return numValues; |
664 | } |
665 | |
666 | size_t StringDirectColumnReader::computeSize(const int64_t* lengths, |
667 | const char* notNull, |
668 | uint64_t numValues) { |
669 | size_t totalLength = 0; |
670 | if (notNull) { |
671 | for(size_t i=0; i < numValues; ++i) { |
672 | if (notNull[i]) { |
673 | totalLength += static_cast<size_t>(lengths[i]); |
674 | } |
675 | } |
676 | } else { |
677 | for(size_t i=0; i < numValues; ++i) { |
678 | totalLength += static_cast<size_t>(lengths[i]); |
679 | } |
680 | } |
681 | return totalLength; |
682 | } |
683 | |
/**
 * Read numValues rows into rowBatch, which must be a StringVectorBatch
 * (the dynamic_cast throws otherwise).
 *
 * The value lengths are decoded first. Stream chunks are then copied into
 * blobBuffer until the bytes still needed fit inside the chunk currently
 * held in lastBuffer. Values that lie completely in the copied bytes point
 * into blobBuffer, a value straddling the boundary is completed in
 * blobBuffer, and all remaining values point directly into lastBuffer.
 *
 * @param rowBatch the batch to fill
 * @param numValues the number of rows to read
 * @param notNull optional incoming not-null flags; may be nullptr
 * @throws ParseError if the blob stream cannot supply the required bytes
 */
void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch,
                                    uint64_t numValues,
                                    char *notNull) {
  ColumnReader::next(rowBatch, numValues, notNull);
  // update the notNull from the parent class
  notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
  StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
  char **startPtr = byteBatch.data.data();
  int64_t *lengthPtr = byteBatch.length.data();

  // read the length vector
  lengthRle->next(lengthPtr, numValues, notNull);

  // figure out the total length of data we need from the blob stream
  const size_t totalLength = computeSize(lengthPtr, notNull, numValues);

  // Load data from the blob stream into our buffer until we have enough
  // to get the rest directly out of the stream's buffer.
  size_t bytesBuffered = 0;
  // NOTE(review): ptr is captured once here and reused across the resize
  // calls in the loop below; this presumably relies on DataBuffer::resize
  // not reallocating for sizes below totalLength -- confirm.
  blobBuffer.resize(totalLength);
  char *ptr= blobBuffer.data();
  while (bytesBuffered + lastBufferLength < totalLength) {
    // the current chunk is not enough: move all of it into blobBuffer and
    // fetch the next chunk from the stream
    blobBuffer.resize(bytesBuffered + lastBufferLength);
    memcpy(ptr + bytesBuffered, lastBuffer, lastBufferLength);
    bytesBuffered += lastBufferLength;
    const void* readBuffer;
    int readLength;
    if (!blobStream->Next(&readBuffer, &readLength)) {
      throw ParseError("failed to read in StringDirectColumnReader.next" );
    }
    lastBuffer = static_cast<const char*>(readBuffer);
    lastBufferLength = static_cast<size_t>(readLength);
  }

  // Set up the start pointers for the ones that will come out of the buffer.
  size_t filledSlots = 0;
  size_t usedBytes = 0;
  ptr = blobBuffer.data();
  if (notNull) {
    // null rows are stepped over without consuming bytes; the loop stops at
    // the first non-null value that does not fit in the copied bytes
    while (filledSlots < numValues &&
           (!notNull[filledSlots] ||
            usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
            bytesBuffered)) {
      if (notNull[filledSlots]) {
        startPtr[filledSlots] = ptr + usedBytes;
        usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
      }
      filledSlots += 1;
    }
  } else {
    while (filledSlots < numValues &&
           (usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
            bytesBuffered)) {
      startPtr[filledSlots] = ptr + usedBytes;
      usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
      filledSlots += 1;
    }
  }

  // do we need to complete the last value in the blob buffer?
  if (usedBytes < bytesBuffered) {
    // the value at filledSlots starts in blobBuffer and ends in lastBuffer;
    // append its missing tail so that it is contiguous in blobBuffer
    size_t moreBytes = static_cast<size_t>(lengthPtr[filledSlots]) -
      (bytesBuffered - usedBytes);
    blobBuffer.resize(bytesBuffered + moreBytes);
    ptr = blobBuffer.data();
    memcpy(ptr + bytesBuffered, lastBuffer, moreBytes);
    lastBuffer += moreBytes;
    lastBufferLength -= moreBytes;
    startPtr[filledSlots++] = ptr + usedBytes;
  }

  // Finally, set up any remaining entries into the stream buffer
  if (notNull) {
    while (filledSlots < numValues) {
      if (notNull[filledSlots]) {
        // const_cast: the batch stores char* while the chunk is const
        startPtr[filledSlots] = const_cast<char*>(lastBuffer);
        lastBuffer += lengthPtr[filledSlots];
        lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
      }
      filledSlots += 1;
    }
  } else {
    while (filledSlots < numValues) {
      startPtr[filledSlots] = const_cast<char*>(lastBuffer);
      lastBuffer += lengthPtr[filledSlots];
      lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
      filledSlots += 1;
    }
  }
}
774 | |
775 | class StructColumnReader: public ColumnReader { |
776 | private: |
777 | std::vector<ColumnReader*> children; |
778 | |
779 | public: |
780 | StructColumnReader(const Type& type, StripeStreams& stipe); |
781 | ~StructColumnReader() override; |
782 | |
783 | uint64_t skip(uint64_t numValues) override; |
784 | |
785 | void next(ColumnVectorBatch& rowBatch, |
786 | uint64_t numValues, |
787 | char *notNull) override; |
788 | }; |
789 | |
790 | StructColumnReader::StructColumnReader(const Type& type, |
791 | StripeStreams& stripe |
792 | ): ColumnReader(type, stripe) { |
793 | // count the number of selected sub-columns |
794 | const std::vector<bool> selectedColumns = stripe.getSelectedColumns(); |
795 | switch (static_cast<int64_t>(stripe.getEncoding(columnId).kind())) { |
796 | case proto::ColumnEncoding_Kind_DIRECT: |
797 | for(unsigned int i=0; i < type.getSubtypeCount(); ++i) { |
798 | const Type& child = *type.getSubtype(i); |
799 | if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) { |
800 | children.push_back(buildReader(child, stripe).release()); |
801 | } |
802 | } |
803 | break; |
804 | case proto::ColumnEncoding_Kind_DIRECT_V2: |
805 | case proto::ColumnEncoding_Kind_DICTIONARY: |
806 | case proto::ColumnEncoding_Kind_DICTIONARY_V2: |
807 | default: |
808 | throw ParseError("Unknown encoding for StructColumnReader" ); |
809 | } |
810 | } |
811 | |
812 | StructColumnReader::~StructColumnReader() { |
813 | for (size_t i=0; i<children.size(); i++) { |
814 | delete children[i]; |
815 | } |
816 | } |
817 | |
818 | uint64_t StructColumnReader::skip(uint64_t numValues) { |
819 | numValues = ColumnReader::skip(numValues); |
820 | for(std::vector<ColumnReader*>::iterator ptr=children.begin(); ptr != children.end(); ++ptr) { |
821 | (*ptr)->skip(numValues); |
822 | } |
823 | return numValues; |
824 | } |
825 | |
826 | void StructColumnReader::next(ColumnVectorBatch& rowBatch, |
827 | uint64_t numValues, |
828 | char *notNull) { |
829 | ColumnReader::next(rowBatch, numValues, notNull); |
830 | uint64_t i=0; |
831 | notNull = rowBatch.hasNulls? rowBatch.notNull.data() : nullptr; |
832 | for(std::vector<ColumnReader*>::iterator ptr=children.begin(); |
833 | ptr != children.end(); ++ptr, ++i) { |
834 | (*ptr)->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]), |
835 | numValues, notNull); |
836 | } |
837 | } |
838 | |
839 | class ListColumnReader: public ColumnReader { |
840 | private: |
841 | std::unique_ptr<ColumnReader> child; |
842 | std::unique_ptr<RleDecoder> rle; |
843 | |
844 | public: |
845 | ListColumnReader(const Type& type, StripeStreams& stipe); |
846 | ~ListColumnReader() override; |
847 | |
848 | uint64_t skip(uint64_t numValues) override; |
849 | |
850 | void next(ColumnVectorBatch& rowBatch, |
851 | uint64_t numValues, |
852 | char *notNull) override; |
853 | }; |
854 | |
855 | ListColumnReader::ListColumnReader(const Type& type, |
856 | StripeStreams& stripe |
857 | ): ColumnReader(type, stripe) { |
858 | // count the number of selected sub-columns |
859 | const std::vector<bool> selectedColumns = stripe.getSelectedColumns(); |
860 | RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); |
861 | std::unique_ptr<SeekableInputStream> stream = |
862 | stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true); |
863 | if (stream == nullptr) |
864 | throw ParseError("LENGTH stream not found in List column" ); |
865 | rle = createRleDecoder(std::move(stream), false, vers, memoryPool); |
866 | const Type& childType = *type.getSubtype(0); |
867 | if (selectedColumns[static_cast<uint64_t>(childType.getColumnId())]) { |
868 | child = buildReader(childType, stripe); |
869 | } |
870 | } |
871 | |
872 | ListColumnReader::~ListColumnReader() { |
873 | // PASS |
874 | } |
875 | |
876 | uint64_t ListColumnReader::skip(uint64_t numValues) { |
877 | numValues = ColumnReader::skip(numValues); |
878 | ColumnReader *childReader = child.get(); |
879 | if (childReader) { |
880 | const uint64_t BUFFER_SIZE = 1024; |
881 | int64_t buffer[BUFFER_SIZE]; |
882 | uint64_t childrenElements = 0; |
883 | uint64_t lengthsRead = 0; |
884 | while (lengthsRead < numValues) { |
885 | uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE); |
886 | rle->next(buffer, chunk, nullptr); |
887 | for(size_t i=0; i < chunk; ++i) { |
888 | childrenElements += static_cast<size_t>(buffer[i]); |
889 | } |
890 | lengthsRead += chunk; |
891 | } |
892 | childReader->skip(childrenElements); |
893 | } else { |
894 | rle->skip(numValues); |
895 | } |
896 | return numValues; |
897 | } |
898 | |
899 | void ListColumnReader::next(ColumnVectorBatch& rowBatch, |
900 | uint64_t numValues, |
901 | char *notNull) { |
902 | ColumnReader::next(rowBatch, numValues, notNull); |
903 | ListVectorBatch &listBatch = dynamic_cast<ListVectorBatch&>(rowBatch); |
904 | int64_t* offsets = listBatch.offsets.data(); |
905 | notNull = listBatch.hasNulls ? listBatch.notNull.data() : nullptr; |
906 | rle->next(offsets, numValues, notNull); |
907 | uint64_t totalChildren = 0; |
908 | if (notNull) { |
909 | for(size_t i=0; i < numValues; ++i) { |
910 | if (notNull[i]) { |
911 | uint64_t tmp = static_cast<uint64_t>(offsets[i]); |
912 | offsets[i] = static_cast<int64_t>(totalChildren); |
913 | totalChildren += tmp; |
914 | } else { |
915 | offsets[i] = static_cast<int64_t>(totalChildren); |
916 | } |
917 | } |
918 | } else { |
919 | for(size_t i=0; i < numValues; ++i) { |
920 | uint64_t tmp = static_cast<uint64_t>(offsets[i]); |
921 | offsets[i] = static_cast<int64_t>(totalChildren); |
922 | totalChildren += tmp; |
923 | } |
924 | } |
925 | offsets[numValues] = static_cast<int64_t>(totalChildren); |
926 | ColumnReader *childReader = child.get(); |
927 | if (childReader) { |
928 | childReader->next(*(listBatch.elements.get()), totalChildren, nullptr); |
929 | } |
930 | } |
931 | |
932 | class MapColumnReader: public ColumnReader { |
933 | private: |
934 | std::unique_ptr<ColumnReader> keyReader; |
935 | std::unique_ptr<ColumnReader> elementReader; |
936 | std::unique_ptr<RleDecoder> rle; |
937 | |
938 | public: |
939 | MapColumnReader(const Type& type, StripeStreams& stipe); |
940 | ~MapColumnReader() override; |
941 | |
942 | uint64_t skip(uint64_t numValues) override; |
943 | |
944 | void next(ColumnVectorBatch& rowBatch, |
945 | uint64_t numValues, |
946 | char *notNull) override; |
947 | }; |
948 | |
949 | MapColumnReader::MapColumnReader(const Type& type, |
950 | StripeStreams& stripe |
951 | ): ColumnReader(type, stripe) { |
952 | // Determine if the key and/or value columns are selected |
953 | const std::vector<bool> selectedColumns = stripe.getSelectedColumns(); |
954 | RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); |
955 | std::unique_ptr<SeekableInputStream> stream = |
956 | stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true); |
957 | if (stream == nullptr) |
958 | throw ParseError("LENGTH stream not found in Map column" ); |
959 | rle = createRleDecoder(std::move(stream), false, vers, memoryPool); |
960 | const Type& keyType = *type.getSubtype(0); |
961 | if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) { |
962 | keyReader = buildReader(keyType, stripe); |
963 | } |
964 | const Type& elementType = *type.getSubtype(1); |
965 | if (selectedColumns[static_cast<uint64_t>(elementType.getColumnId())]) { |
966 | elementReader = buildReader(elementType, stripe); |
967 | } |
968 | } |
969 | |
970 | MapColumnReader::~MapColumnReader() { |
971 | // PASS |
972 | } |
973 | |
974 | uint64_t MapColumnReader::skip(uint64_t numValues) { |
975 | numValues = ColumnReader::skip(numValues); |
976 | ColumnReader *rawKeyReader = keyReader.get(); |
977 | ColumnReader *rawElementReader = elementReader.get(); |
978 | if (rawKeyReader || rawElementReader) { |
979 | const uint64_t BUFFER_SIZE = 1024; |
980 | int64_t buffer[BUFFER_SIZE]; |
981 | uint64_t childrenElements = 0; |
982 | uint64_t lengthsRead = 0; |
983 | while (lengthsRead < numValues) { |
984 | uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE); |
985 | rle->next(buffer, chunk, nullptr); |
986 | for(size_t i=0; i < chunk; ++i) { |
987 | childrenElements += static_cast<size_t>(buffer[i]); |
988 | } |
989 | lengthsRead += chunk; |
990 | } |
991 | if (rawKeyReader) { |
992 | rawKeyReader->skip(childrenElements); |
993 | } |
994 | if (rawElementReader) { |
995 | rawElementReader->skip(childrenElements); |
996 | } |
997 | } else { |
998 | rle->skip(numValues); |
999 | } |
1000 | return numValues; |
1001 | } |
1002 | |
1003 | void MapColumnReader::next(ColumnVectorBatch& rowBatch, |
1004 | uint64_t numValues, |
1005 | char *notNull) { |
1006 | ColumnReader::next(rowBatch, numValues, notNull); |
1007 | MapVectorBatch &mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch); |
1008 | int64_t* offsets = mapBatch.offsets.data(); |
1009 | notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : nullptr; |
1010 | rle->next(offsets, numValues, notNull); |
1011 | uint64_t totalChildren = 0; |
1012 | if (notNull) { |
1013 | for(size_t i=0; i < numValues; ++i) { |
1014 | if (notNull[i]) { |
1015 | uint64_t tmp = static_cast<uint64_t>(offsets[i]); |
1016 | offsets[i] = static_cast<int64_t>(totalChildren); |
1017 | totalChildren += tmp; |
1018 | } else { |
1019 | offsets[i] = static_cast<int64_t>(totalChildren); |
1020 | } |
1021 | } |
1022 | } else { |
1023 | for(size_t i=0; i < numValues; ++i) { |
1024 | uint64_t tmp = static_cast<uint64_t>(offsets[i]); |
1025 | offsets[i] = static_cast<int64_t>(totalChildren); |
1026 | totalChildren += tmp; |
1027 | } |
1028 | } |
1029 | offsets[numValues] = static_cast<int64_t>(totalChildren); |
1030 | ColumnReader *rawKeyReader = keyReader.get(); |
1031 | if (rawKeyReader) { |
1032 | rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, nullptr); |
1033 | } |
1034 | ColumnReader *rawElementReader = elementReader.get(); |
1035 | if (rawElementReader) { |
1036 | rawElementReader->next(*(mapBatch.elements.get()), totalChildren, nullptr); |
1037 | } |
1038 | } |
1039 | |
1040 | class UnionColumnReader: public ColumnReader { |
1041 | private: |
1042 | std::unique_ptr<ByteRleDecoder> rle; |
1043 | std::vector<ColumnReader*> childrenReader; |
1044 | std::vector<int64_t> childrenCounts; |
1045 | uint64_t numChildren; |
1046 | |
1047 | public: |
1048 | UnionColumnReader(const Type& type, StripeStreams& stipe); |
1049 | ~UnionColumnReader() override; |
1050 | |
1051 | uint64_t skip(uint64_t numValues) override; |
1052 | |
1053 | void next(ColumnVectorBatch& rowBatch, |
1054 | uint64_t numValues, |
1055 | char *notNull) override; |
1056 | }; |
1057 | |
1058 | UnionColumnReader::UnionColumnReader(const Type& type, |
1059 | StripeStreams& stripe |
1060 | ): ColumnReader(type, stripe) { |
1061 | numChildren = type.getSubtypeCount(); |
1062 | childrenReader.resize(numChildren); |
1063 | childrenCounts.resize(numChildren); |
1064 | |
1065 | std::unique_ptr<SeekableInputStream> stream = |
1066 | stripe.getStream(columnId, proto::Stream_Kind_DATA, true); |
1067 | if (stream == nullptr) |
1068 | throw ParseError("LENGTH stream not found in Union column" ); |
1069 | rle = createByteRleDecoder(std::move(stream)); |
1070 | // figure out which types are selected |
1071 | const std::vector<bool> selectedColumns = stripe.getSelectedColumns(); |
1072 | for(unsigned int i=0; i < numChildren; ++i) { |
1073 | const Type &child = *type.getSubtype(i); |
1074 | if (selectedColumns[static_cast<size_t>(child.getColumnId())]) { |
1075 | childrenReader[i] = buildReader(child, stripe).release(); |
1076 | } |
1077 | } |
1078 | } |
1079 | |
1080 | UnionColumnReader::~UnionColumnReader() { |
1081 | for(std::vector<ColumnReader*>::iterator itr = childrenReader.begin(); |
1082 | itr != childrenReader.end(); ++itr) { |
1083 | delete *itr; |
1084 | } |
1085 | } |
1086 | |
1087 | uint64_t UnionColumnReader::skip(uint64_t numValues) { |
1088 | numValues = ColumnReader::skip(numValues); |
1089 | const uint64_t BUFFER_SIZE = 1024; |
1090 | char buffer[BUFFER_SIZE]; |
1091 | uint64_t lengthsRead = 0; |
1092 | int64_t *counts = childrenCounts.data(); |
1093 | memset(counts, 0, sizeof(int64_t) * numChildren); |
1094 | while (lengthsRead < numValues) { |
1095 | uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE); |
1096 | rle->next(buffer, chunk, nullptr); |
1097 | for(size_t i=0; i < chunk; ++i) { |
1098 | counts[static_cast<size_t>(buffer[i])] += 1; |
1099 | } |
1100 | lengthsRead += chunk; |
1101 | } |
1102 | for(size_t i=0; i < numChildren; ++i) { |
1103 | if (counts[i] != 0 && childrenReader[i] != nullptr) { |
1104 | childrenReader[i]->skip(static_cast<uint64_t>(counts[i])); |
1105 | } |
1106 | } |
1107 | return numValues; |
1108 | } |
1109 | |
1110 | void UnionColumnReader::next(ColumnVectorBatch& rowBatch, |
1111 | uint64_t numValues, |
1112 | char *notNull) { |
1113 | ColumnReader::next(rowBatch, numValues, notNull); |
1114 | UnionVectorBatch &unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch); |
1115 | uint64_t* offsets = unionBatch.offsets.data(); |
1116 | int64_t* counts = childrenCounts.data(); |
1117 | memset(counts, 0, sizeof(int64_t) * numChildren); |
1118 | unsigned char* tags = unionBatch.tags.data(); |
1119 | notNull = unionBatch.hasNulls ? unionBatch.notNull.data() : nullptr; |
1120 | rle->next(reinterpret_cast<char *>(tags), numValues, notNull); |
1121 | // set the offsets for each row |
1122 | if (notNull) { |
1123 | for(size_t i=0; i < numValues; ++i) { |
1124 | if (notNull[i]) { |
1125 | offsets[i] = |
1126 | static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++); |
1127 | } |
1128 | } |
1129 | } else { |
1130 | for(size_t i=0; i < numValues; ++i) { |
1131 | offsets[i] = |
1132 | static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++); |
1133 | } |
1134 | } |
1135 | // read the right number of each child column |
1136 | for(size_t i=0; i < numChildren; ++i) { |
1137 | if (childrenReader[i] != nullptr) { |
1138 | childrenReader[i]->next(*(unionBatch.children[i]), |
1139 | static_cast<uint64_t>(counts[i]), nullptr); |
1140 | } |
1141 | } |
1142 | } |
1143 | |
1144 | /** |
1145 | * Destructively convert the number from zigzag encoding to the |
1146 | * natural signed representation. |
1147 | */ |
1148 | void unZigZagInt128(Int128& value) { |
1149 | bool needsNegate = value.getLowBits() & 1; |
1150 | value >>= 1; |
1151 | if (needsNegate) { |
1152 | value.negate(); |
1153 | value -= 1; |
1154 | } |
1155 | } |
1156 | |
1157 | class Decimal64ColumnReader: public ColumnReader { |
1158 | public: |
1159 | static const uint32_t MAX_PRECISION_64 = 18; |
1160 | static const uint32_t MAX_PRECISION_128 = 38; |
1161 | static const int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1]; |
1162 | |
1163 | protected: |
1164 | std::unique_ptr<SeekableInputStream> valueStream; |
1165 | int32_t precision; |
1166 | int32_t scale; |
1167 | const char* buffer; |
1168 | const char* bufferEnd; |
1169 | |
1170 | std::unique_ptr<RleDecoder> scaleDecoder; |
1171 | |
1172 | /** |
1173 | * Read the valueStream for more bytes. |
1174 | */ |
1175 | void readBuffer() { |
1176 | while (buffer == bufferEnd) { |
1177 | int length; |
1178 | if (!valueStream->Next(reinterpret_cast<const void**>(&buffer), |
1179 | &length)) { |
1180 | throw ParseError("Read past end of stream in Decimal64ColumnReader " + |
1181 | valueStream->getName()); |
1182 | } |
1183 | bufferEnd = buffer + length; |
1184 | } |
1185 | } |
1186 | |
1187 | void readInt64(int64_t& value, int32_t currentScale) { |
1188 | value = 0; |
1189 | size_t offset = 0; |
1190 | while (true) { |
1191 | readBuffer(); |
1192 | unsigned char ch = static_cast<unsigned char>(*(buffer++)); |
1193 | value |= static_cast<uint64_t>(ch & 0x7f) << offset; |
1194 | offset += 7; |
1195 | if (!(ch & 0x80)) { |
1196 | break; |
1197 | } |
1198 | } |
1199 | value = unZigZag(static_cast<uint64_t>(value)); |
1200 | if (scale > currentScale && |
1201 | static_cast<uint64_t>(scale - currentScale) <= MAX_PRECISION_64) { |
1202 | value *= POWERS_OF_TEN[scale - currentScale]; |
1203 | } else if (scale < currentScale && |
1204 | static_cast<uint64_t>(currentScale - scale) <= MAX_PRECISION_64) { |
1205 | value /= POWERS_OF_TEN[currentScale - scale]; |
1206 | } else if (scale != currentScale) { |
1207 | throw ParseError("Decimal scale out of range" ); |
1208 | } |
1209 | } |
1210 | |
1211 | public: |
1212 | Decimal64ColumnReader(const Type& type, StripeStreams& stipe); |
1213 | ~Decimal64ColumnReader() override; |
1214 | |
1215 | uint64_t skip(uint64_t numValues) override; |
1216 | |
1217 | void next(ColumnVectorBatch& rowBatch, |
1218 | uint64_t numValues, |
1219 | char *notNull) override; |
1220 | }; |
1221 | const uint32_t Decimal64ColumnReader::MAX_PRECISION_64; |
1222 | const uint32_t Decimal64ColumnReader::MAX_PRECISION_128; |
1223 | const int64_t Decimal64ColumnReader::POWERS_OF_TEN[MAX_PRECISION_64 + 1]= |
1224 | {1, |
1225 | 10, |
1226 | 100, |
1227 | 1000, |
1228 | 10000, |
1229 | 100000, |
1230 | 1000000, |
1231 | 10000000, |
1232 | 100000000, |
1233 | 1000000000, |
1234 | 10000000000, |
1235 | 100000000000, |
1236 | 1000000000000, |
1237 | 10000000000000, |
1238 | 100000000000000, |
1239 | 1000000000000000, |
1240 | 10000000000000000, |
1241 | 100000000000000000, |
1242 | 1000000000000000000}; |
1243 | |
1244 | Decimal64ColumnReader::Decimal64ColumnReader(const Type& type, |
1245 | StripeStreams& stripe |
1246 | ): ColumnReader(type, stripe) { |
1247 | scale = static_cast<int32_t>(type.getScale()); |
1248 | precision = static_cast<int32_t>(type.getPrecision()); |
1249 | valueStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true); |
1250 | if (valueStream == nullptr) |
1251 | throw ParseError("DATA stream not found in Decimal64Column" ); |
1252 | buffer = nullptr; |
1253 | bufferEnd = nullptr; |
1254 | RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind()); |
1255 | std::unique_ptr<SeekableInputStream> stream = |
1256 | stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true); |
1257 | if (stream == nullptr) |
1258 | throw ParseError("SECONDARY stream not found in Decimal64Column" ); |
1259 | scaleDecoder = createRleDecoder(std::move(stream), true, vers, memoryPool); |
1260 | } |
1261 | |
1262 | Decimal64ColumnReader::~Decimal64ColumnReader() { |
1263 | // PASS |
1264 | } |
1265 | |
1266 | uint64_t Decimal64ColumnReader::skip(uint64_t numValues) { |
1267 | numValues = ColumnReader::skip(numValues); |
1268 | uint64_t skipped = 0; |
1269 | while (skipped < numValues) { |
1270 | readBuffer(); |
1271 | if (!(0x80 & *(buffer++))) { |
1272 | skipped += 1; |
1273 | } |
1274 | } |
1275 | scaleDecoder->skip(numValues); |
1276 | return numValues; |
1277 | } |
1278 | |
1279 | void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch, |
1280 | uint64_t numValues, |
1281 | char *notNull) { |
1282 | ColumnReader::next(rowBatch, numValues, notNull); |
1283 | notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; |
1284 | Decimal64VectorBatch &batch = |
1285 | dynamic_cast<Decimal64VectorBatch&>(rowBatch); |
1286 | int64_t* values = batch.values.data(); |
1287 | // read the next group of scales |
1288 | int64_t* scaleBuffer = batch.readScales.data(); |
1289 | scaleDecoder->next(scaleBuffer, numValues, notNull); |
1290 | batch.precision = precision; |
1291 | batch.scale = scale; |
1292 | if (notNull) { |
1293 | for(size_t i=0; i < numValues; ++i) { |
1294 | if (notNull[i]) { |
1295 | readInt64(values[i], static_cast<int32_t>(scaleBuffer[i])); |
1296 | } |
1297 | } |
1298 | } else { |
1299 | for(size_t i=0; i < numValues; ++i) { |
1300 | readInt64(values[i], static_cast<int32_t>(scaleBuffer[i])); |
1301 | } |
1302 | } |
1303 | } |
1304 | |
1305 | void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) { |
1306 | if (scale > currentScale) { |
1307 | while(scale > currentScale) { |
1308 | uint32_t scaleAdjust = |
1309 | std::min(Decimal64ColumnReader::MAX_PRECISION_64, |
1310 | scale - currentScale); |
1311 | value *= Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust]; |
1312 | currentScale += scaleAdjust; |
1313 | } |
1314 | } else if (scale < currentScale) { |
1315 | Int128 remainder; |
1316 | while(currentScale > scale) { |
1317 | uint32_t scaleAdjust = |
1318 | std::min(Decimal64ColumnReader::MAX_PRECISION_64, |
1319 | currentScale - scale); |
1320 | value = value.divide(Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust], |
1321 | remainder); |
1322 | currentScale -= scaleAdjust; |
1323 | } |
1324 | } |
1325 | } |
1326 | |
1327 | class Decimal128ColumnReader: public Decimal64ColumnReader { |
1328 | public: |
1329 | Decimal128ColumnReader(const Type& type, StripeStreams& stipe); |
1330 | ~Decimal128ColumnReader() override; |
1331 | |
1332 | void next(ColumnVectorBatch& rowBatch, |
1333 | uint64_t numValues, |
1334 | char *notNull) override; |
1335 | |
1336 | private: |
1337 | void readInt128(Int128& value, int32_t currentScale) { |
1338 | value = 0; |
1339 | Int128 work; |
1340 | uint32_t offset = 0; |
1341 | while (true) { |
1342 | readBuffer(); |
1343 | unsigned char ch = static_cast<unsigned char>(*(buffer++)); |
1344 | work = ch & 0x7f; |
1345 | work <<= offset; |
1346 | value |= work; |
1347 | offset += 7; |
1348 | if (!(ch & 0x80)) { |
1349 | break; |
1350 | } |
1351 | } |
1352 | unZigZagInt128(value); |
1353 | scaleInt128(value, static_cast<uint32_t>(scale), |
1354 | static_cast<uint32_t>(currentScale)); |
1355 | } |
1356 | }; |
1357 | |
1358 | Decimal128ColumnReader::Decimal128ColumnReader |
1359 | (const Type& type, |
1360 | StripeStreams& stripe |
1361 | ): Decimal64ColumnReader(type, stripe) { |
1362 | // PASS |
1363 | } |
1364 | |
1365 | Decimal128ColumnReader::~Decimal128ColumnReader() { |
1366 | // PASS |
1367 | } |
1368 | |
1369 | void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch, |
1370 | uint64_t numValues, |
1371 | char *notNull) { |
1372 | ColumnReader::next(rowBatch, numValues, notNull); |
1373 | notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; |
1374 | Decimal128VectorBatch &batch = |
1375 | dynamic_cast<Decimal128VectorBatch&>(rowBatch); |
1376 | Int128* values = batch.values.data(); |
1377 | // read the next group of scales |
1378 | int64_t* scaleBuffer = batch.readScales.data(); |
1379 | scaleDecoder->next(scaleBuffer, numValues, notNull); |
1380 | batch.precision = precision; |
1381 | batch.scale = scale; |
1382 | if (notNull) { |
1383 | for(size_t i=0; i < numValues; ++i) { |
1384 | if (notNull[i]) { |
1385 | readInt128(values[i], static_cast<int32_t>(scaleBuffer[i])); |
1386 | } |
1387 | } |
1388 | } else { |
1389 | for(size_t i=0; i < numValues; ++i) { |
1390 | readInt128(values[i], static_cast<int32_t>(scaleBuffer[i])); |
1391 | } |
1392 | } |
1393 | } |
1394 | |
1395 | class DecimalHive11ColumnReader: public Decimal64ColumnReader { |
1396 | private: |
1397 | bool throwOnOverflow; |
1398 | std::ostream* errorStream; |
1399 | |
1400 | /** |
1401 | * Read an Int128 from the stream and correct it to the desired scale. |
1402 | */ |
1403 | bool readInt128(Int128& value, int32_t currentScale) { |
1404 | // -/+ 99999999999999999999999999999999999999 |
1405 | static const Int128 MIN_VALUE(-0x4b3b4ca85a86c47b, 0xf675ddc000000001); |
1406 | static const Int128 MAX_VALUE( 0x4b3b4ca85a86c47a, 0x098a223fffffffff); |
1407 | |
1408 | value = 0; |
1409 | Int128 work; |
1410 | uint32_t offset = 0; |
1411 | bool result = true; |
1412 | while (true) { |
1413 | readBuffer(); |
1414 | unsigned char ch = static_cast<unsigned char>(*(buffer++)); |
1415 | work = ch & 0x7f; |
1416 | // If we have read more than 128 bits, we flag the error, but keep |
1417 | // reading bytes so the stream isn't thrown off. |
1418 | if (offset > 128 || (offset == 126 && work > 3)) { |
1419 | result = false; |
1420 | } |
1421 | work <<= offset; |
1422 | value |= work; |
1423 | offset += 7; |
1424 | if (!(ch & 0x80)) { |
1425 | break; |
1426 | } |
1427 | } |
1428 | |
1429 | if (!result) { |
1430 | return result; |
1431 | } |
1432 | unZigZagInt128(value); |
1433 | scaleInt128(value, static_cast<uint32_t>(scale), |
1434 | static_cast<uint32_t>(currentScale)); |
1435 | return value >= MIN_VALUE && value <= MAX_VALUE; |
1436 | } |
1437 | |
1438 | public: |
1439 | DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe); |
1440 | ~DecimalHive11ColumnReader() override; |
1441 | |
1442 | void next(ColumnVectorBatch& rowBatch, |
1443 | uint64_t numValues, |
1444 | char *notNull) override; |
1445 | }; |
1446 | |
1447 | DecimalHive11ColumnReader::DecimalHive11ColumnReader |
1448 | (const Type& type, |
1449 | StripeStreams& stripe |
1450 | ): Decimal64ColumnReader(type, stripe) { |
1451 | scale = stripe.getForcedScaleOnHive11Decimal(); |
1452 | throwOnOverflow = stripe.getThrowOnHive11DecimalOverflow(); |
1453 | errorStream = stripe.getErrorStream(); |
1454 | } |
1455 | |
1456 | DecimalHive11ColumnReader::~DecimalHive11ColumnReader() { |
1457 | // PASS |
1458 | } |
1459 | |
1460 | void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch, |
1461 | uint64_t numValues, |
1462 | char *notNull) { |
1463 | ColumnReader::next(rowBatch, numValues, notNull); |
1464 | notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr; |
1465 | Decimal128VectorBatch &batch = |
1466 | dynamic_cast<Decimal128VectorBatch&>(rowBatch); |
1467 | Int128* values = batch.values.data(); |
1468 | // read the next group of scales |
1469 | int64_t* scaleBuffer = batch.readScales.data(); |
1470 | |
1471 | scaleDecoder->next(scaleBuffer, numValues, notNull); |
1472 | |
1473 | batch.precision = precision; |
1474 | batch.scale = scale; |
1475 | if (notNull) { |
1476 | for(size_t i=0; i < numValues; ++i) { |
1477 | if (notNull[i]) { |
1478 | if (!readInt128(values[i], |
1479 | static_cast<int32_t>(scaleBuffer[i]))) { |
1480 | if (throwOnOverflow) { |
1481 | throw ParseError("Hive 0.11 decimal was more than 38 digits." ); |
1482 | } else { |
1483 | *errorStream << "Warning: " |
1484 | << "Hive 0.11 decimal with more than 38 digits " |
1485 | << "replaced by NULL.\n" ; |
1486 | notNull[i] = false; |
1487 | } |
1488 | } |
1489 | } |
1490 | } |
1491 | } else { |
1492 | for(size_t i=0; i < numValues; ++i) { |
1493 | if (!readInt128(values[i], |
1494 | static_cast<int32_t>(scaleBuffer[i]))) { |
1495 | if (throwOnOverflow) { |
1496 | throw ParseError("Hive 0.11 decimal was more than 38 digits." ); |
1497 | } else { |
1498 | *errorStream << "Warning: " |
1499 | << "Hive 0.11 decimal with more than 38 digits " |
1500 | << "replaced by NULL.\n" ; |
1501 | batch.hasNulls = true; |
1502 | batch.notNull[i] = false; |
1503 | } |
1504 | } |
1505 | } |
1506 | } |
1507 | } |
1508 | |
1509 | /** |
1510 | * Create a reader for the given stripe. |
1511 | */ |
1512 | std::unique_ptr<ColumnReader> buildReader(const Type& type, |
1513 | StripeStreams& stripe) { |
1514 | switch (static_cast<int64_t>(type.getKind())) { |
1515 | case DATE: |
1516 | case INT: |
1517 | case LONG: |
1518 | case SHORT: |
1519 | return std::unique_ptr<ColumnReader>( |
1520 | new IntegerColumnReader(type, stripe)); |
1521 | case BINARY: |
1522 | case CHAR: |
1523 | case STRING: |
1524 | case VARCHAR: |
1525 | switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())){ |
1526 | case proto::ColumnEncoding_Kind_DICTIONARY: |
1527 | case proto::ColumnEncoding_Kind_DICTIONARY_V2: |
1528 | return std::unique_ptr<ColumnReader>( |
1529 | new StringDictionaryColumnReader(type, stripe)); |
1530 | case proto::ColumnEncoding_Kind_DIRECT: |
1531 | case proto::ColumnEncoding_Kind_DIRECT_V2: |
1532 | return std::unique_ptr<ColumnReader>( |
1533 | new StringDirectColumnReader(type, stripe)); |
1534 | default: |
1535 | throw NotImplementedYet("buildReader unhandled string encoding" ); |
1536 | } |
1537 | |
1538 | case BOOLEAN: |
1539 | return std::unique_ptr<ColumnReader>( |
1540 | new BooleanColumnReader(type, stripe)); |
1541 | |
1542 | case BYTE: |
1543 | return std::unique_ptr<ColumnReader>( |
1544 | new ByteColumnReader(type, stripe)); |
1545 | |
1546 | case LIST: |
1547 | return std::unique_ptr<ColumnReader>( |
1548 | new ListColumnReader(type, stripe)); |
1549 | |
1550 | case MAP: |
1551 | return std::unique_ptr<ColumnReader>( |
1552 | new MapColumnReader(type, stripe)); |
1553 | |
1554 | case UNION: |
1555 | return std::unique_ptr<ColumnReader>( |
1556 | new UnionColumnReader(type, stripe)); |
1557 | |
1558 | case STRUCT: |
1559 | return std::unique_ptr<ColumnReader>( |
1560 | new StructColumnReader(type, stripe)); |
1561 | |
1562 | case FLOAT: |
1563 | case DOUBLE: |
1564 | return std::unique_ptr<ColumnReader>( |
1565 | new DoubleColumnReader(type, stripe)); |
1566 | |
1567 | case TIMESTAMP: |
1568 | return std::unique_ptr<ColumnReader> |
1569 | (new TimestampColumnReader(type, stripe)); |
1570 | |
1571 | case DECIMAL: |
1572 | // is this a Hive 0.11 or 0.12 file? |
1573 | if (type.getPrecision() == 0) { |
1574 | return std::unique_ptr<ColumnReader> |
1575 | (new DecimalHive11ColumnReader(type, stripe)); |
1576 | |
1577 | // can we represent the values using int64_t? |
1578 | } else if (type.getPrecision() <= |
1579 | Decimal64ColumnReader::MAX_PRECISION_64) { |
1580 | return std::unique_ptr<ColumnReader> |
1581 | (new Decimal64ColumnReader(type, stripe)); |
1582 | |
1583 | // otherwise we use the Int128 implementation |
1584 | } else { |
1585 | return std::unique_ptr<ColumnReader> |
1586 | (new Decimal128ColumnReader(type, stripe)); |
1587 | } |
1588 | |
1589 | default: |
1590 | throw NotImplementedYet("buildReader unhandled type" ); |
1591 | } |
1592 | } |
1593 | |
1594 | } |
1595 | |