1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19
20#include "Options.hh"
21#include "Reader.hh"
22#include "Statistics.hh"
23#include "StripeStream.hh"
24
25#include "wrap/coded-stream-wrapper.h"
26
27#include <algorithm>
28#include <iostream>
29#include <memory>
30#include <sstream>
31#include <string>
32#include <vector>
33#include <iterator>
34#include <set>
35
36namespace orc {
37
38 const WriterVersionImpl &WriterVersionImpl::VERSION_HIVE_8732() {
39 static const WriterVersionImpl version(WriterVersion_HIVE_8732);
40 return version;
41 }
42
43 uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
44 if (ps.has_compressionblocksize()) {
45 return ps.compressionblocksize();
46 } else {
47 return 256 * 1024;
48 }
49 }
50
51 CompressionKind convertCompressionKind(const proto::PostScript& ps) {
52 if (ps.has_compression()) {
53 return static_cast<CompressionKind>(ps.compression());
54 } else {
55 throw ParseError("Unknown compression type");
56 }
57 }
58
59 std::string ColumnSelector::toDotColumnPath() {
60 if (columns.empty()) {
61 return std::string();
62 }
63 std::ostringstream columnStream;
64 std::copy(columns.begin(), columns.end(),
65 std::ostream_iterator<std::string>(columnStream, "."));
66 std::string columnPath = columnStream.str();
67 return columnPath.substr(0, columnPath.length() - 1);
68 }
69
70
71 void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type) {
72 size_t id = static_cast<size_t>(type.getColumnId());
73 if (!selectedColumns[id]) {
74 selectedColumns[id] = true;
75 for(size_t c = id; c <= type.getMaximumColumnId(); ++c){
76 selectedColumns[c] = true;
77 }
78 }
79 }
80
81 /**
82 * Recurses over a type tree and selects the parents of every selected type.
83 * @return true if any child was selected.
84 */
85 bool ColumnSelector::selectParents(std::vector<bool>& selectedColumns, const Type& type) {
86 size_t id = static_cast<size_t>(type.getColumnId());
87 bool result = selectedColumns[id];
88 for(uint64_t c=0; c < type.getSubtypeCount(); ++c) {
89 result |= selectParents(selectedColumns, *type.getSubtype(c));
90 }
91 selectedColumns[id] = result;
92 return result;
93 }
94
95 /**
96 * Recurses over a type tree and build two maps
97 * map<TypeName, TypeId>, map<TypeId, Type>
98 */
99 void ColumnSelector::buildTypeNameIdMap(const Type* type) {
100 // map<type_id, Type*>
101 idTypeMap[type->getColumnId()] = type;
102
103 if (STRUCT == type->getKind()) {
104 for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
105 const std::string& fieldName = type->getFieldName(i);
106 columns.push_back(fieldName);
107 nameIdMap[toDotColumnPath()] = type->getSubtype(i)->getColumnId();
108 buildTypeNameIdMap(type->getSubtype(i));
109 columns.pop_back();
110 }
111 } else {
112 // other non-primitive type
113 for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
114 buildTypeNameIdMap(type->getSubtype(j));
115 }
116 }
117 }
118
119 void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns,
120 const RowReaderOptions& options) {
121 selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
122 if (contents->schema->getKind() == STRUCT && options.getIndexesSet()) {
123 for(std::list<uint64_t>::const_iterator field = options.getInclude().begin();
124 field != options.getInclude().end(); ++field) {
125 updateSelectedByFieldId(selectedColumns, *field);
126 }
127 } else if (contents->schema->getKind() == STRUCT && options.getNamesSet()) {
128 for(std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
129 field != options.getIncludeNames().end(); ++field) {
130 updateSelectedByName(selectedColumns, *field);
131 }
132 } else if (options.getTypeIdsSet()) {
133 for(std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
134 typeId != options.getInclude().end(); ++typeId) {
135 updateSelectedByTypeId(selectedColumns, *typeId);
136 }
137 } else {
138 // default is to select all columns
139 std::fill(selectedColumns.begin(), selectedColumns.end(), true);
140 }
141 selectParents(selectedColumns, *contents->schema.get());
142 selectedColumns[0] = true; // column 0 is selected by default
143 }
144
145 void ColumnSelector::updateSelectedByFieldId(std::vector<bool>& selectedColumns,
146 uint64_t fieldId) {
147 if (fieldId < contents->schema->getSubtypeCount()) {
148 selectChildren(selectedColumns, *contents->schema->getSubtype(fieldId));
149 } else {
150 std::stringstream buffer;
151 buffer << "Invalid column selected " << fieldId << " out of "
152 << contents->schema->getSubtypeCount();
153 throw ParseError(buffer.str());
154 }
155 }
156
157 void ColumnSelector::updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId) {
158 if (typeId < selectedColumns.size()) {
159 const Type& type = *idTypeMap[typeId];
160 selectChildren(selectedColumns, type);
161 } else {
162 std::stringstream buffer;
163 buffer << "Invalid type id selected " << typeId << " out of "
164 << selectedColumns.size();
165 throw ParseError(buffer.str());
166 }
167 }
168
169 void ColumnSelector::updateSelectedByName(std::vector<bool>& selectedColumns,
170 const std::string& fieldName) {
171 std::map<std::string, uint64_t>::const_iterator ite = nameIdMap.find(fieldName);
172 if (ite != nameIdMap.end()) {
173 updateSelectedByTypeId(selectedColumns, ite->second);
174 } else {
175 throw ParseError("Invalid column selected " + fieldName);
176 }
177 }
178
179 ColumnSelector::ColumnSelector(const FileContents* _contents): contents(_contents) {
180 buildTypeNameIdMap(contents->schema.get());
181 }
182
183 RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
184 const RowReaderOptions& opts
185 ): localTimezone(getLocalTimezone()),
186 contents(_contents),
187 throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
188 forcedScaleOnHive11Decimal(opts.getForcedScaleOnHive11Decimal()),
189 footer(contents->footer.get()),
190 firstRowOfStripe(*contents->pool, 0) {
191 uint64_t numberOfStripes;
192 numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
193 currentStripe = numberOfStripes;
194 lastStripe = 0;
195 currentRowInStripe = 0;
196 rowsInCurrentStripe = 0;
197 uint64_t rowTotal = 0;
198
199 firstRowOfStripe.resize(numberOfStripes);
200 for(size_t i=0; i < numberOfStripes; ++i) {
201 firstRowOfStripe[i] = rowTotal;
202 proto::StripeInformation stripeInfo =
203 footer->stripes(static_cast<int>(i));
204 rowTotal += stripeInfo.numberofrows();
205 bool isStripeInRange = stripeInfo.offset() >= opts.getOffset() &&
206 stripeInfo.offset() < opts.getOffset() + opts.getLength();
207 if (isStripeInRange) {
208 if (i < currentStripe) {
209 currentStripe = i;
210 }
211 if (i >= lastStripe) {
212 lastStripe = i + 1;
213 }
214 }
215 }
216 firstStripe = currentStripe;
217
218 if (currentStripe == 0) {
219 previousRow = (std::numeric_limits<uint64_t>::max)();
220 } else if (currentStripe == numberOfStripes) {
221 previousRow = footer->numberofrows();
222 } else {
223 previousRow = firstRowOfStripe[firstStripe]-1;
224 }
225
226 ColumnSelector column_selector(contents.get());
227 column_selector.updateSelected(selectedColumns, opts);
228 }
229
230 CompressionKind RowReaderImpl::getCompression() const {
231 return contents->compression;
232 }
233
234 uint64_t RowReaderImpl::getCompressionSize() const {
235 return contents->blockSize;
236 }
237
238 const std::vector<bool> RowReaderImpl::getSelectedColumns() const {
239 return selectedColumns;
240 }
241
242 const Type& RowReaderImpl::getSelectedType() const {
243 if (selectedSchema.get() == nullptr) {
244 selectedSchema = buildSelectedType(contents->schema.get(),
245 selectedColumns);
246 }
247 return *(selectedSchema.get());
248 }
249
250 uint64_t RowReaderImpl::getRowNumber() const {
251 return previousRow;
252 }
253
  /**
   * Positions the reader so that the next batch starts at absolute file row
   * `rowNumber`. Seeking outside [firstStripe, lastStripe) leaves the reader
   * exhausted: currentStripe = number of stripes and previousRow = the file's
   * row count.
   */
  void RowReaderImpl::seekToRow(uint64_t rowNumber) {
    // No stripe falls in the reader's range (this includes an empty file):
    // nothing to position, so leave the state untouched.
    if (lastStripe == 0) {
      return;
    }

    // If we are reading only a portion of the file
    // (bounded by firstStripe and lastStripe),
    // seeking before or after the portion of interest should return no data.
    // Implement this by setting previousRow to the number of rows in the file.

    // seeking past lastStripe
    uint64_t num_stripes = static_cast<uint64_t>(footer->stripes_size());
    if ( (lastStripe == num_stripes
            && rowNumber >= footer->numberofrows()) ||
         (lastStripe < num_stripes
            && rowNumber >= firstRowOfStripe[lastStripe]) ) {
      currentStripe = num_stripes;
      previousRow = footer->numberofrows();
      return;
    }

    // Linear scan for the last stripe below lastStripe whose first row is
    // <= rowNumber.
    uint64_t seekToStripe = 0;
    while (seekToStripe+1 < lastStripe &&
           firstRowOfStripe[seekToStripe+1] <= rowNumber) {
      seekToStripe++;
    }

    // seeking before the first stripe
    if (seekToStripe < firstStripe) {
      currentStripe = num_stripes;
      previousRow = footer->numberofrows();
      return;
    }

    currentStripe = seekToStripe;
    currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
    previousRow = rowNumber;
    // Open the target stripe, then discard the rows before rowNumber.
    startNextStripe();
    reader->skip(currentRowInStripe);
  }
295
296 const FileContents& RowReaderImpl::getFileContents() const {
297 return *contents;
298 }
299
300 bool RowReaderImpl::getThrowOnHive11DecimalOverflow() const {
301 return throwOnHive11DecimalOverflow;
302 }
303
304 int32_t RowReaderImpl::getForcedScaleOnHive11Decimal() const {
305 return forcedScaleOnHive11Decimal;
306 }
307
308 proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
309 const FileContents& contents) {
310 uint64_t stripeFooterStart = info.offset() + info.indexlength() +
311 info.datalength();
312 uint64_t stripeFooterLength = info.footerlength();
313 std::unique_ptr<SeekableInputStream> pbStream =
314 createDecompressor(contents.compression,
315 std::unique_ptr<SeekableInputStream>
316 (new SeekableFileInputStream(contents.stream.get(),
317 stripeFooterStart,
318 stripeFooterLength,
319 *contents.pool)),
320 contents.blockSize,
321 *contents.pool);
322 proto::StripeFooter result;
323 if (!result.ParseFromZeroCopyStream(pbStream.get())) {
324 throw ParseError(std::string("bad StripeFooter from ") +
325 pbStream->getName());
326 }
327 return result;
328 }
329
330 ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> _contents,
331 const ReaderOptions& opts,
332 uint64_t _fileLength,
333 uint64_t _postscriptLength
334 ): contents(std::move(_contents)),
335 options(opts),
336 fileLength(_fileLength),
337 postscriptLength(_postscriptLength),
338 footer(contents->footer.get()) {
339 isMetadataLoaded = false;
340 checkOrcVersion();
341 numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
342 contents->schema = REDUNDANT_MOVE(convertType(footer->types(0), *footer));
343 contents->blockSize = getCompressionBlockSize(*contents->postscript);
344 contents->compression= convertCompressionKind(*contents->postscript);
345 }
346
347 std::string ReaderImpl::getSerializedFileTail() const {
348 proto::FileTail tail;
349 proto::PostScript *mutable_ps = tail.mutable_postscript();
350 mutable_ps->CopyFrom(*contents->postscript);
351 proto::Footer *mutableFooter = tail.mutable_footer();
352 mutableFooter->CopyFrom(*footer);
353 tail.set_filelength(fileLength);
354 tail.set_postscriptlength(postscriptLength);
355 std::string result;
356 if (!tail.SerializeToString(&result)) {
357 throw ParseError("Failed to serialize file tail");
358 }
359 return result;
360 }
361
362 const ReaderOptions& ReaderImpl::getReaderOptions() const {
363 return options;
364 }
365
366 CompressionKind ReaderImpl::getCompression() const {
367 return contents->compression;
368 }
369
370 uint64_t ReaderImpl::getCompressionSize() const {
371 return contents->blockSize;
372 }
373
374 uint64_t ReaderImpl::getNumberOfStripes() const {
375 return numberOfStripes;
376 }
377
378 uint64_t ReaderImpl::getNumberOfStripeStatistics() const {
379 if (!isMetadataLoaded) {
380 readMetadata();
381 }
382 return metadata.get() == nullptr ? 0 :
383 static_cast<uint64_t>(metadata->stripestats_size());
384 }
385
386 std::unique_ptr<StripeInformation>
387 ReaderImpl::getStripe(uint64_t stripeIndex) const {
388 if (stripeIndex > getNumberOfStripes()) {
389 throw std::logic_error("stripe index out of range");
390 }
391 proto::StripeInformation stripeInfo =
392 footer->stripes(static_cast<int>(stripeIndex));
393
394 return std::unique_ptr<StripeInformation>
395 (new StripeInformationImpl
396 (stripeInfo.offset(),
397 stripeInfo.indexlength(),
398 stripeInfo.datalength(),
399 stripeInfo.footerlength(),
400 stripeInfo.numberofrows(),
401 contents->stream.get(),
402 *contents->pool,
403 contents->compression,
404 contents->blockSize));
405 }
406
407 FileVersion ReaderImpl::getFormatVersion() const {
408 if (contents->postscript->version_size() != 2) {
409 throw std::logic_error("Unrecognized file version.");
410 }
411 return FileVersion(
412 contents->postscript->version(0),
413 contents->postscript->version(1));
414 }
415
416 uint64_t ReaderImpl::getNumberOfRows() const {
417 return footer->numberofrows();
418 }
419
420 WriterId ReaderImpl::getWriterId() const {
421 if (footer->has_writer()) {
422 uint32_t id = footer->writer();
423 if (id > WriterId::PRESTO_WRITER) {
424 return WriterId::UNKNOWN_WRITER;
425 } else {
426 return static_cast<WriterId>(id);
427 }
428 }
429 return WriterId::ORC_JAVA_WRITER;
430 }
431
432 uint32_t ReaderImpl::getWriterIdValue() const {
433 if (footer->has_writer()) {
434 return footer->writer();
435 } else {
436 return WriterId::ORC_JAVA_WRITER;
437 }
438 }
439
440 WriterVersion ReaderImpl::getWriterVersion() const {
441 if (!contents->postscript->has_writerversion()) {
442 return WriterVersion_ORIGINAL;
443 }
444 return static_cast<WriterVersion>(contents->postscript->writerversion());
445 }
446
447 uint64_t ReaderImpl::getContentLength() const {
448 return footer->contentlength();
449 }
450
451 uint64_t ReaderImpl::getStripeStatisticsLength() const {
452 return contents->postscript->metadatalength();
453 }
454
455 uint64_t ReaderImpl::getFileFooterLength() const {
456 return contents->postscript->footerlength();
457 }
458
459 uint64_t ReaderImpl::getFilePostscriptLength() const {
460 return postscriptLength;
461 }
462
463 uint64_t ReaderImpl::getFileLength() const {
464 return fileLength;
465 }
466
467 uint64_t ReaderImpl::getRowIndexStride() const {
468 return footer->rowindexstride();
469 }
470
471 const std::string& ReaderImpl::getStreamName() const {
472 return contents->stream->getName();
473 }
474
475 std::list<std::string> ReaderImpl::getMetadataKeys() const {
476 std::list<std::string> result;
477 for(int i=0; i < footer->metadata_size(); ++i) {
478 result.push_back(footer->metadata(i).name());
479 }
480 return result;
481 }
482
483 std::string ReaderImpl::getMetadataValue(const std::string& key) const {
484 for(int i=0; i < footer->metadata_size(); ++i) {
485 if (footer->metadata(i).name() == key) {
486 return footer->metadata(i).value();
487 }
488 }
489 throw std::range_error("key not found");
490 }
491
492 void ReaderImpl::getRowIndexStatistics(
493 uint64_t stripeOffset, const proto::StripeFooter& currentStripeFooter,
494 std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const {
495 int num_streams = currentStripeFooter.streams_size();
496 uint64_t offset = stripeOffset;
497 for (int i = 0; i < num_streams; i++) {
498 const proto::Stream& stream = currentStripeFooter.streams(i);
499 uint64_t length = static_cast<uint64_t>(stream.length());
500 if (static_cast<StreamKind>(stream.kind()) == StreamKind::StreamKind_ROW_INDEX) {
501 std::unique_ptr<SeekableInputStream> pbStream =
502 createDecompressor(contents->compression,
503 std::unique_ptr<SeekableInputStream>
504 (new SeekableFileInputStream(contents->stream.get(),
505 offset,
506 length,
507 *contents->pool)),
508 contents->blockSize,
509 *(contents->pool));
510
511 proto::RowIndex rowIndex;
512 if (!rowIndex.ParseFromZeroCopyStream(pbStream.get())) {
513 throw ParseError("Failed to parse RowIndex from stripe footer");
514 }
515 int num_entries = rowIndex.entry_size();
516 size_t column = static_cast<size_t>(stream.column());
517 for (int j = 0; j < num_entries; j++) {
518 const proto::RowIndexEntry& entry = rowIndex.entry(j);
519 (*indexStats)[column].push_back(entry.statistics());
520 }
521 }
522 offset += length;
523 }
524 }
525
526 bool ReaderImpl::hasMetadataValue(const std::string& key) const {
527 for(int i=0; i < footer->metadata_size(); ++i) {
528 if (footer->metadata(i).name() == key) {
529 return true;
530 }
531 }
532 return false;
533 }
534
535 const Type& ReaderImpl::getType() const {
536 return *(contents->schema.get());
537 }
538
539 std::unique_ptr<StripeStatistics>
540 ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const {
541 if (!isMetadataLoaded) {
542 readMetadata();
543 }
544 if (metadata.get() == nullptr) {
545 throw std::logic_error("No stripe statistics in file");
546 }
547 size_t num_cols = static_cast<size_t>(
548 metadata->stripestats(
549 static_cast<int>(stripeIndex)).colstats_size());
550 std::vector<std::vector<proto::ColumnStatistics> > indexStats(num_cols);
551
552 proto::StripeInformation currentStripeInfo =
553 footer->stripes(static_cast<int>(stripeIndex));
554 proto::StripeFooter currentStripeFooter =
555 getStripeFooter(currentStripeInfo, *contents.get());
556
557 getRowIndexStatistics(currentStripeInfo.offset(), currentStripeFooter, &indexStats);
558
559 const Timezone& writerTZ =
560 currentStripeFooter.has_writertimezone() ?
561 getTimezoneByName(currentStripeFooter.writertimezone()) :
562 getLocalTimezone();
563 StatContext statContext(hasCorrectStatistics(), &writerTZ);
564 return std::unique_ptr<StripeStatistics>
565 (new StripeStatisticsImpl(metadata->stripestats(static_cast<int>(stripeIndex)),
566 indexStats, statContext));
567 }
568
569 std::unique_ptr<Statistics> ReaderImpl::getStatistics() const {
570 StatContext statContext(hasCorrectStatistics());
571 return std::unique_ptr<Statistics>
572 (new StatisticsImpl(*footer, statContext));
573 }
574
575 std::unique_ptr<ColumnStatistics>
576 ReaderImpl::getColumnStatistics(uint32_t index) const {
577 if (index >= static_cast<uint64_t>(footer->statistics_size())) {
578 throw std::logic_error("column index out of range");
579 }
580 proto::ColumnStatistics col =
581 footer->statistics(static_cast<int32_t>(index));
582
583 StatContext statContext(hasCorrectStatistics());
584 return std::unique_ptr<ColumnStatistics> (convertColumnStatistics(col, statContext));
585 }
586
587 void ReaderImpl::readMetadata() const {
588 uint64_t metadataSize = contents->postscript->metadatalength();
589 uint64_t metadataStart = fileLength - metadataSize
590 - contents->postscript->footerlength() - postscriptLength - 1;
591 if (metadataSize != 0) {
592 std::unique_ptr<SeekableInputStream> pbStream =
593 createDecompressor(contents->compression,
594 std::unique_ptr<SeekableInputStream>
595 (new SeekableFileInputStream(contents->stream.get(),
596 metadataStart,
597 metadataSize,
598 *contents->pool)),
599 contents->blockSize,
600 *contents->pool);
601 metadata.reset(new proto::Metadata());
602 if (!metadata->ParseFromZeroCopyStream(pbStream.get())) {
603 throw ParseError("Failed to parse the metadata");
604 }
605 }
606 isMetadataLoaded = true;
607 }
608
609 bool ReaderImpl::hasCorrectStatistics() const {
610 return !WriterVersionImpl::VERSION_HIVE_8732().compareGT(getWriterVersion());
611 }
612
613 void ReaderImpl::checkOrcVersion() {
614 FileVersion version = getFormatVersion();
615 if (version != FileVersion(0, 11) && version != FileVersion(0, 12)) {
616 *(options.getErrorStream())
617 << "Warning: ORC file " << contents->stream->getName()
618 << " was written in an unknown format version "
619 << version.toString() << "\n";
620 }
621 }
622
623 std::unique_ptr<RowReader> ReaderImpl::createRowReader() const {
624 RowReaderOptions defaultOpts;
625 return createRowReader(defaultOpts);
626 }
627
628 std::unique_ptr<RowReader> ReaderImpl::createRowReader(
629 const RowReaderOptions& opts) const {
630 return std::unique_ptr<RowReader>(new RowReaderImpl(contents, opts));
631 }
632
633 uint64_t maxStreamsForType(const proto::Type& type) {
634 switch (static_cast<int64_t>(type.kind())) {
635 case proto::Type_Kind_STRUCT:
636 return 1;
637 case proto::Type_Kind_INT:
638 case proto::Type_Kind_LONG:
639 case proto::Type_Kind_SHORT:
640 case proto::Type_Kind_FLOAT:
641 case proto::Type_Kind_DOUBLE:
642 case proto::Type_Kind_BOOLEAN:
643 case proto::Type_Kind_BYTE:
644 case proto::Type_Kind_DATE:
645 case proto::Type_Kind_LIST:
646 case proto::Type_Kind_MAP:
647 case proto::Type_Kind_UNION:
648 return 2;
649 case proto::Type_Kind_BINARY:
650 case proto::Type_Kind_DECIMAL:
651 case proto::Type_Kind_TIMESTAMP:
652 return 3;
653 case proto::Type_Kind_CHAR:
654 case proto::Type_Kind_STRING:
655 case proto::Type_Kind_VARCHAR:
656 return 4;
657 default:
658 return 0;
659 }
660 }
661
662 uint64_t ReaderImpl::getMemoryUse(int stripeIx) {
663 std::vector<bool> selectedColumns;
664 selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), true);
665 return getMemoryUse(stripeIx, selectedColumns);
666 }
667
668 uint64_t ReaderImpl::getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx) {
669 std::vector<bool> selectedColumns;
670 selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
671 ColumnSelector column_selector(contents.get());
672 if (contents->schema->getKind() == STRUCT && include.begin() != include.end()) {
673 for(std::list<uint64_t>::const_iterator field = include.begin();
674 field != include.end(); ++field) {
675 column_selector.updateSelectedByFieldId(selectedColumns, *field);
676 }
677 } else {
678 // default is to select all columns
679 std::fill(selectedColumns.begin(), selectedColumns.end(), true);
680 }
681 column_selector.selectParents(selectedColumns, *contents->schema.get());
682 selectedColumns[0] = true; // column 0 is selected by default
683 return getMemoryUse(stripeIx, selectedColumns);
684 }
685
686 uint64_t ReaderImpl::getMemoryUseByName(const std::list<std::string>& names, int stripeIx) {
687 std::vector<bool> selectedColumns;
688 selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
689 ColumnSelector column_selector(contents.get());
690 if (contents->schema->getKind() == STRUCT && names.begin() != names.end()) {
691 for(std::list<std::string>::const_iterator field = names.begin();
692 field != names.end(); ++field) {
693 column_selector.updateSelectedByName(selectedColumns, *field);
694 }
695 } else {
696 // default is to select all columns
697 std::fill(selectedColumns.begin(), selectedColumns.end(), true);
698 }
699 column_selector.selectParents(selectedColumns, *contents->schema.get());
700 selectedColumns[0] = true; // column 0 is selected by default
701 return getMemoryUse(stripeIx, selectedColumns);
702 }
703
704 uint64_t ReaderImpl::getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx) {
705 std::vector<bool> selectedColumns;
706 selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
707 ColumnSelector column_selector(contents.get());
708 if (include.begin() != include.end()) {
709 for(std::list<uint64_t>::const_iterator field = include.begin();
710 field != include.end(); ++field) {
711 column_selector.updateSelectedByTypeId(selectedColumns, *field);
712 }
713 } else {
714 // default is to select all columns
715 std::fill(selectedColumns.begin(), selectedColumns.end(), true);
716 }
717 column_selector.selectParents(selectedColumns, *contents->schema.get());
718 selectedColumns[0] = true; // column 0 is selected by default
719 return getMemoryUse(stripeIx, selectedColumns);
720 }
721
  /**
   * Estimates the bytes needed to read the selected columns.
   * @param stripeIx stripe to size the estimate on; a negative or out-of-range
   *        value makes the estimate use the stripe with the largest data length.
   * @param selectedColumns selection mask indexed by type id.
   * @return base read-buffer estimate plus decompression buffers.
   */
  uint64_t ReaderImpl::getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns) {
    uint64_t maxDataLength = 0;

    // Data length of the requested stripe, or the maximum over all stripes.
    if (stripeIx >= 0 && stripeIx < footer->stripes_size()) {
      uint64_t stripe = footer->stripes(stripeIx).datalength();
      if (maxDataLength < stripe) {
        maxDataLength = stripe;
      }
    } else {
      for (int i=0; i < footer->stripes_size(); i++) {
        uint64_t stripe = footer->stripes(i).datalength();
        if (maxDataLength < stripe) {
          maxDataLength = stripe;
        }
      }
    }

    // Count streams of selected columns. The loop stops early once a
    // string-like column is found, because nSelectedStreams is only used
    // when there is none.
    bool hasStringColumn = false;
    uint64_t nSelectedStreams = 0;
    for (int i=0; !hasStringColumn && i < footer->types_size(); i++) {
      if (selectedColumns[static_cast<size_t>(i)]) {
        const proto::Type& type = footer->types(i);
        nSelectedStreams += maxStreamsForType(type) ;
        switch (static_cast<int64_t>(type.kind())) {
          case proto::Type_Kind_CHAR:
          case proto::Type_Kind_STRING:
          case proto::Type_Kind_VARCHAR:
          case proto::Type_Kind_BINARY: {
            hasStringColumn = true;
            break;
          }
          default: {
            break;
          }
        }
      }
    }

    /* If a string column is read, use stripe datalength as a memory estimate
     * because we don't know the dictionary size. Multiply by 2 because
     * a string column requires two buffers:
     * in the input stream and in the seekable input stream.
     * If no string column is read, estimate from the number of streams.
     */
    uint64_t memory = hasStringColumn ? 2 * maxDataLength :
        std::min(uint64_t(maxDataLength),
                 nSelectedStreams * contents->stream->getNaturalReadSize());

    // Do we need even more memory to read the footer or the metadata?
    if (memory < contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS) {
      memory = contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS;
    }
    if (memory < contents->postscript->metadatalength()) {
      memory = contents->postscript->metadatalength();
    }

    // Account for firstRowOfStripe.
    memory += static_cast<uint64_t>(footer->stripes_size()) * sizeof(uint64_t);

    // Decompressors need buffers for each stream
    uint64_t decompressorMemory = 0;
    if (contents->compression != CompressionKind_NONE) {
      for (int i=0; i < footer->types_size(); i++) {
        if (selectedColumns[static_cast<size_t>(i)]) {
          const proto::Type& type = footer->types(i);
          decompressorMemory += maxStreamsForType(type) * contents->blockSize;
        }
      }
      if (contents->compression == CompressionKind_SNAPPY) {
        decompressorMemory *= 2;  // Snappy decompressor uses a second buffer
      }
    }

    return memory + decompressorMemory ;
  }
797
798 void RowReaderImpl::startNextStripe() {
799 reader.reset(); // ColumnReaders use lots of memory; free old memory first
800 currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
801 currentStripeFooter = getStripeFooter(currentStripeInfo, *contents.get());
802 rowsInCurrentStripe = currentStripeInfo.numberofrows();
803 const Timezone& writerTimezone =
804 currentStripeFooter.has_writertimezone() ?
805 getTimezoneByName(currentStripeFooter.writertimezone()) :
806 localTimezone;
807 StripeStreamsImpl stripeStreams(*this, currentStripeFooter,
808 currentStripeInfo.offset(),
809 *(contents->stream.get()),
810 writerTimezone);
811 reader = buildReader(*contents->schema.get(), stripeStreams);
812 }
813
  /**
   * Reads the next batch: at most data.capacity rows, never crossing a stripe
   * boundary.
   * @return false (with data.numElements = 0) once currentStripe reaches
   *         lastStripe; otherwise whether any rows were read.
   */
  bool RowReaderImpl::next(ColumnVectorBatch& data) {
    if (currentStripe >= lastStripe) {
      data.numElements = 0;
      // Exhausted: report the row just past the last stripe in range.
      if (lastStripe > 0) {
        previousRow = firstRowOfStripe[lastStripe - 1] +
          footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
      } else {
        previousRow = 0;
      }
      return false;
    }
    // Row 0 of a stripe means the stripe has not been opened yet.
    if (currentRowInStripe == 0) {
      startNextStripe();
    }
    uint64_t rowsToRead =
      std::min(static_cast<uint64_t>(data.capacity),
               rowsInCurrentStripe - currentRowInStripe);
    data.numElements = rowsToRead;
    reader->next(data, rowsToRead, nullptr);
    // update row number: previousRow is the file row of this batch's first row
    previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
    currentRowInStripe += rowsToRead;
    // Advance to the next stripe once this one is fully consumed.
    if (currentRowInStripe >= rowsInCurrentStripe) {
      currentStripe += 1;
      currentRowInStripe = 0;
    }
    return rowsToRead != 0;
  }
842
843 std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch
844 (uint64_t capacity) const {
845 return getSelectedType().createRowBatch(capacity, *contents->pool);
846 }
847
848 void ensureOrcFooter(InputStream* stream,
849 DataBuffer<char> *buffer,
850 uint64_t postscriptLength) {
851
852 const std::string MAGIC("ORC");
853 const uint64_t magicLength = MAGIC.length();
854 const char * const bufferStart = buffer->data();
855 const uint64_t bufferLength = buffer->size();
856
857 if (postscriptLength < magicLength || bufferLength < magicLength) {
858 throw ParseError("Invalid ORC postscript length");
859 }
860 const char* magicStart = bufferStart + bufferLength - 1 - magicLength;
861
862 // Look for the magic string at the end of the postscript.
863 if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) {
864 // If there is no magic string at the end, check the beginning.
865 // Only files written by Hive 0.11.0 don't have the tail ORC string.
866 char *frontBuffer = new char[magicLength];
867 stream->read(frontBuffer, magicLength, 0);
868 bool foundMatch = memcmp(frontBuffer, MAGIC.c_str(), magicLength) == 0;
869 delete[] frontBuffer;
870 if (!foundMatch) {
871 throw ParseError("Not an ORC file");
872 }
873 }
874 }
875
876 /**
877 * Read the file's postscript from the given buffer.
878 * @param stream the file stream
879 * @param buffer the buffer with the tail of the file.
880 * @param postscriptSize the length of postscript in bytes
881 */
882 std::unique_ptr<proto::PostScript> readPostscript(InputStream *stream,
883 DataBuffer<char> *buffer,
884 uint64_t postscriptSize) {
885 char *ptr = buffer->data();
886 uint64_t readSize = buffer->size();
887
888 ensureOrcFooter(stream, buffer, postscriptSize);
889
890 std::unique_ptr<proto::PostScript> postscript =
891 std::unique_ptr<proto::PostScript>(new proto::PostScript());
892 if (!postscript->ParseFromArray(ptr + readSize - 1 - postscriptSize,
893 static_cast<int>(postscriptSize))) {
894 throw ParseError("Failed to parse the postscript from " +
895 stream->getName());
896 }
897 return REDUNDANT_MOVE(postscript);
898 }
899
900 /**
901 * Check that indices in the type tree are valid, so we won't crash
902 * when we convert the proto::Types to TypeImpls.
903 */
904 void checkProtoTypeIds(const proto::Footer &footer) {
905 std::stringstream msg;
906 int maxId = footer.types_size();
907 for (int i = 0; i < maxId; ++i) {
908 const proto::Type& type = footer.types(i);
909 for (int j = 0; j < type.subtypes_size(); ++j) {
910 int subTypeId = static_cast<int>(type.subtypes(j));
911 if (subTypeId <= i) {
912 msg << "Footer is corrupt: malformed link from type " << i << " to "
913 << subTypeId;
914 throw ParseError(msg.str());
915 }
916 if (subTypeId >= maxId) {
917 msg << "Footer is corrupt: types(" << subTypeId << ") not exists";
918 throw ParseError(msg.str());
919 }
920 if (j > 0 && static_cast<int>(type.subtypes(j - 1)) >= subTypeId) {
921 msg << "Footer is corrupt: subType(" << (j-1) << ") >= subType(" << j
922 << ") in types(" << i << "). (" << type.subtypes(j - 1) << " >= "
923 << subTypeId << ")";
924 throw ParseError(msg.str());
925 }
926 }
927 }
928 }
929
930 /**
931 * Parse the footer from the given buffer.
932 * @param stream the file's stream
933 * @param buffer the buffer to parse the footer from
934 * @param footerOffset the offset within the buffer that contains the footer
935 * @param ps the file's postscript
936 * @param memoryPool the memory pool to use
937 */
938 std::unique_ptr<proto::Footer> readFooter(InputStream* stream,
939 DataBuffer<char> *&buffer,
940 uint64_t footerOffset,
941 const proto::PostScript& ps,
942 MemoryPool& memoryPool) {
943 char *footerPtr = buffer->data() + footerOffset;
944
945 std::unique_ptr<SeekableInputStream> pbStream =
946 createDecompressor(convertCompressionKind(ps),
947 std::unique_ptr<SeekableInputStream>
948 (new SeekableArrayInputStream(footerPtr,
949 ps.footerlength())),
950 getCompressionBlockSize(ps),
951 memoryPool);
952
953 std::unique_ptr<proto::Footer> footer =
954 std::unique_ptr<proto::Footer>(new proto::Footer());
955 if (!footer->ParseFromZeroCopyStream(pbStream.get())) {
956 throw ParseError("Failed to parse the footer from " +
957 stream->getName());
958 }
959
960 checkProtoTypeIds(*footer);
961 return REDUNDANT_MOVE(footer);
962 }
963
964 std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream,
965 const ReaderOptions& options) {
966 std::shared_ptr<FileContents> contents = std::shared_ptr<FileContents>(new FileContents());
967 contents->pool = options.getMemoryPool();
968 contents->errorStream = options.getErrorStream();
969 std::string serializedFooter = options.getSerializedFileTail();
970 uint64_t fileLength;
971 uint64_t postscriptLength;
972 if (serializedFooter.length() != 0) {
973 // Parse the file tail from the serialized one.
974 proto::FileTail tail;
975 if (!tail.ParseFromString(serializedFooter)) {
976 throw ParseError("Failed to parse the file tail from string");
977 }
978 contents->postscript.reset(new proto::PostScript(tail.postscript()));
979 contents->footer.reset(new proto::Footer(tail.footer()));
980 fileLength = tail.filelength();
981 postscriptLength = tail.postscriptlength();
982 } else {
983 // figure out the size of the file using the option or filesystem
984 fileLength = std::min(options.getTailLocation(),
985 static_cast<uint64_t>(stream->getLength()));
986
987 //read last bytes into buffer to get PostScript
988 uint64_t readSize = std::min(fileLength, DIRECTORY_SIZE_GUESS);
989 if (readSize < 4) {
990 throw ParseError("File size too small");
991 }
992 DataBuffer<char> *buffer = new DataBuffer<char>(*contents->pool, readSize);
993 stream->read(buffer->data(), readSize, fileLength - readSize);
994
995 postscriptLength = buffer->data()[readSize - 1] & 0xff;
996 contents->postscript = REDUNDANT_MOVE(readPostscript(stream.get(),
997 buffer, postscriptLength));
998 uint64_t footerSize = contents->postscript->footerlength();
999 uint64_t tailSize = 1 + postscriptLength + footerSize;
1000 uint64_t footerOffset;
1001
1002 if (tailSize > readSize) {
1003 buffer->resize(footerSize);
1004 stream->read(buffer->data(), footerSize, fileLength - tailSize);
1005 footerOffset = 0;
1006 } else {
1007 footerOffset = readSize - tailSize;
1008 }
1009
1010 contents->footer = REDUNDANT_MOVE(readFooter(stream.get(), buffer,
1011 footerOffset, *contents->postscript, *contents->pool));
1012 delete buffer;
1013 }
1014 contents->stream = std::move(stream);
1015 return std::unique_ptr<Reader>(new ReaderImpl(std::move(contents),
1016 options,
1017 fileLength,
1018 postscriptLength));
1019 }
1020
1021 RowReader::~RowReader() {
1022 // PASS
1023 }
1024
1025 Reader::~Reader() {
1026 // PASS
1027 }
1028
1029 InputStream::~InputStream() {
1030 // PASS
1031 };
1032
1033
1034
1035}// namespace
1036