1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "orc/Exceptions.hh"
20#include "RLE.hh"
21#include "Reader.hh"
22#include "StripeStream.hh"
23
24#include "wrap/coded-stream-wrapper.h"
25
26namespace orc {
27
28 StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader,
29 const proto::StripeFooter& _footer,
30 uint64_t _stripeStart,
31 InputStream& _input,
32 const Timezone& _writerTimezone
33 ): reader(_reader),
34 footer(_footer),
35 stripeStart(_stripeStart),
36 input(_input),
37 writerTimezone(_writerTimezone) {
38 // PASS
39 }
40
41 StripeStreamsImpl::~StripeStreamsImpl() {
42 // PASS
43 }
44
45 StreamInformation::~StreamInformation() {
46 // PASS
47 }
48
49 StripeInformation::~StripeInformation() {
50 // PASS
51 }
52
53
54 StreamInformationImpl::~StreamInformationImpl() {
55 // PASS
56 }
57
58 const std::vector<bool> StripeStreamsImpl::getSelectedColumns() const {
59 return reader.getSelectedColumns();
60 }
61
62 proto::ColumnEncoding StripeStreamsImpl::getEncoding(uint64_t columnId
63 ) const {
64 return footer.columns(static_cast<int>(columnId));
65 }
66
67 const Timezone& StripeStreamsImpl::getWriterTimezone() const {
68 return writerTimezone;
69 }
70
71 std::ostream* StripeStreamsImpl::getErrorStream() const {
72 return reader.getFileContents().errorStream;
73 }
74
75 std::unique_ptr<SeekableInputStream>
76 StripeStreamsImpl::getStream(uint64_t columnId,
77 proto::Stream_Kind kind,
78 bool shouldStream) const {
79 uint64_t offset = stripeStart;
80 MemoryPool *pool = reader.getFileContents().pool;
81 for(int i = 0; i < footer.streams_size(); ++i) {
82 const proto::Stream& stream = footer.streams(i);
83 if (stream.has_kind() &&
84 stream.kind() == kind &&
85 stream.column() == static_cast<uint64_t>(columnId)) {
86 uint64_t myBlock = shouldStream ? input.getNaturalReadSize():
87 stream.length();
88 return createDecompressor(reader.getCompression(),
89 std::unique_ptr<SeekableInputStream>
90 (new SeekableFileInputStream
91 (&input,
92 offset,
93 stream.length(),
94 *pool,
95 myBlock)),
96 reader.getCompressionSize(),
97 *pool);
98 }
99 offset += stream.length();
100 }
101 return std::unique_ptr<SeekableInputStream>();
102 }
103
104 MemoryPool& StripeStreamsImpl::getMemoryPool() const {
105 return *reader.getFileContents().pool;
106 }
107
108 bool StripeStreamsImpl::getThrowOnHive11DecimalOverflow() const {
109 return reader.getThrowOnHive11DecimalOverflow();
110 }
111
112 int32_t StripeStreamsImpl::getForcedScaleOnHive11Decimal() const {
113 return reader.getForcedScaleOnHive11Decimal();
114 }
115
116 void StripeInformationImpl::ensureStripeFooterLoaded() const {
117 if (stripeFooter.get() == nullptr) {
118 std::unique_ptr<SeekableInputStream> pbStream =
119 createDecompressor(compression,
120 std::unique_ptr<SeekableInputStream>
121 (new SeekableFileInputStream(stream,
122 offset +
123 indexLength +
124 dataLength,
125 footerLength,
126 memory)),
127 blockSize,
128 memory);
129 stripeFooter.reset(new proto::StripeFooter());
130 if (!stripeFooter->ParseFromZeroCopyStream(pbStream.get())) {
131 throw ParseError("Failed to parse the stripe footer");
132 }
133 }
134 }
135
136 std::unique_ptr<StreamInformation>
137 StripeInformationImpl::getStreamInformation(uint64_t streamId) const {
138 ensureStripeFooterLoaded();
139 uint64_t streamOffset = offset;
140 for(uint64_t s=0; s < streamId; ++s) {
141 streamOffset += stripeFooter->streams(static_cast<int>(s)).length();
142 }
143 return ORC_UNIQUE_PTR<StreamInformation>
144 (new StreamInformationImpl(streamOffset,
145 stripeFooter->
146 streams(static_cast<int>(streamId))));
147 }
148
149}
150