1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
#include "orc/Exceptions.hh"
#include "InputStream.hh"

#include <algorithm>
#include <iomanip>
#include <limits>
24
25namespace orc {
26
/**
 * Write a hexadecimal dump of a byte buffer to a stream.
 *
 * Each output line starts with the offset of its first byte (hexadecimal,
 * zero padded to 7 digits) followed by up to 24 bytes, each printed as a
 * space and two zero-padded hexadecimal digits. Every line ends with "\n".
 * An empty buffer produces no output.
 *
 * The stream's format flags and fill character are restored to the values
 * they had on entry, so callers do not inherit the hex base or the '0'
 * padding used while dumping.
 *
 * @param out the stream to write the dump to
 * @param buffer the bytes to dump
 * @param length the number of bytes in buffer
 */
void printBuffer(std::ostream& out,
                 const char *buffer,
                 uint64_t length) {
  const uint64_t width = 24;

  // Remember the caller's formatting so it can be put back afterwards.
  const std::ios_base::fmtflags savedFlags = out.flags();
  const char savedFill = out.fill();

  out << std::hex << std::setfill('0');
  for (uint64_t offset = 0; offset < length; offset += width) {
    out << std::setw(7) << offset;
    // length - offset cannot wrap because offset < length holds here.
    const uint64_t rowBytes = std::min(width, length - offset);
    for (uint64_t byte = 0; byte < rowBytes; ++byte) {
      // Mask to 8 bits so a negative char does not sign-extend.
      out << " " << std::setw(2)
          << static_cast<uint64_t>(0xff & buffer[offset + byte]);
    }
    out << "\n";
  }

  out.flags(savedFlags);
  out.fill(savedFill);
}
44
45 PositionProvider::PositionProvider(const std::list<uint64_t>& posns) {
46 position = posns.begin();
47 }
48
49 uint64_t PositionProvider::next() {
50 uint64_t result = *position;
51 ++position;
52 return result;
53 }
54
55 SeekableInputStream::~SeekableInputStream() {
56 // PASS
57 }
58
59 SeekableArrayInputStream::~SeekableArrayInputStream() {
60 // PASS
61 }
62
63 SeekableArrayInputStream::SeekableArrayInputStream
64 (const unsigned char* values,
65 uint64_t size,
66 uint64_t blkSize
67 ): data(reinterpret_cast<const char*>(values)) {
68 length = size;
69 position = 0;
70 blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
71 }
72
73 SeekableArrayInputStream::SeekableArrayInputStream(const char* values,
74 uint64_t size,
75 uint64_t blkSize
76 ): data(values) {
77 length = size;
78 position = 0;
79 blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
80 }
81
82 bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
83 uint64_t currentSize = std::min(length - position, blockSize);
84 if (currentSize > 0) {
85 *buffer = data + position;
86 *size = static_cast<int>(currentSize);
87 position += currentSize;
88 return true;
89 }
90 *size = 0;
91 return false;
92 }
93
94 void SeekableArrayInputStream::BackUp(int count) {
95 if (count >= 0) {
96 uint64_t unsignedCount = static_cast<uint64_t>(count);
97 if (unsignedCount <= blockSize && unsignedCount <= position) {
98 position -= unsignedCount;
99 } else {
100 throw std::logic_error("Can't backup that much!");
101 }
102 }
103 }
104
105 bool SeekableArrayInputStream::Skip(int count) {
106 if (count >= 0) {
107 uint64_t unsignedCount = static_cast<uint64_t>(count);
108 if (unsignedCount + position <= length) {
109 position += unsignedCount;
110 return true;
111 } else {
112 position = length;
113 }
114 }
115 return false;
116 }
117
118 google::protobuf::int64 SeekableArrayInputStream::ByteCount() const {
119 return static_cast<google::protobuf::int64>(position);
120 }
121
122 void SeekableArrayInputStream::seek(PositionProvider& seekPosition) {
123 position = seekPosition.next();
124 }
125
126 std::string SeekableArrayInputStream::getName() const {
127 std::ostringstream result;
128 result << "SeekableArrayInputStream " << position << " of " << length;
129 return result.str();
130 }
131
/**
 * Choose the block size for a stream of the given length.
 *
 * @param request the requested block size; 0 selects the 256 KiB default
 * @param length the total number of bytes in the stream
 * @return the requested (or default) block size, capped at length
 */
static uint64_t computeBlock(uint64_t request, uint64_t length) {
  const uint64_t defaultBlock = 256 * 1024;
  const uint64_t wanted = (request != 0) ? request : defaultBlock;
  return wanted < length ? wanted : length;
}
135
136 SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
137 uint64_t offset,
138 uint64_t byteCount,
139 MemoryPool& _pool,
140 uint64_t _blockSize
141 ):pool(_pool),
142 input(stream),
143 start(offset),
144 length(byteCount),
145 blockSize(computeBlock
146 (_blockSize,
147 length)) {
148
149 position = 0;
150 buffer.reset(new DataBuffer<char>(pool));
151 pushBack = 0;
152 }
153
154 SeekableFileInputStream::~SeekableFileInputStream() {
155 // PASS
156 }
157
158 bool SeekableFileInputStream::Next(const void** data, int*size) {
159 uint64_t bytesRead;
160 if (pushBack != 0) {
161 *data = buffer->data() + (buffer->size() - pushBack);
162 bytesRead = pushBack;
163 } else {
164 bytesRead = std::min(length - position, blockSize);
165 buffer->resize(bytesRead);
166 if (bytesRead > 0) {
167 input->read(buffer->data(), bytesRead, start+position);
168 *data = static_cast<void*>(buffer->data());
169 }
170 }
171 position += bytesRead;
172 pushBack = 0;
173 *size = static_cast<int>(bytesRead);
174 return bytesRead != 0;
175 }
176
177 void SeekableFileInputStream::BackUp(int signedCount) {
178 if (signedCount < 0) {
179 throw std::logic_error("can't backup negative distances");
180 }
181 uint64_t count = static_cast<uint64_t>(signedCount);
182 if (pushBack > 0) {
183 throw std::logic_error("can't backup unless we just called Next");
184 }
185 if (count > blockSize || count > position) {
186 throw std::logic_error("can't backup that far");
187 }
188 pushBack = static_cast<uint64_t>(count);
189 position -= pushBack;
190 }
191
192 bool SeekableFileInputStream::Skip(int signedCount) {
193 if (signedCount < 0) {
194 return false;
195 }
196 uint64_t count = static_cast<uint64_t>(signedCount);
197 position = std::min(position + count, length);
198 pushBack = 0;
199 return position < length;
200 }
201
202 int64_t SeekableFileInputStream::ByteCount() const {
203 return static_cast<int64_t>(position);
204 }
205
206 void SeekableFileInputStream::seek(PositionProvider& location) {
207 position = location.next();
208 if (position > length) {
209 position = length;
210 throw std::logic_error("seek too far");
211 }
212 pushBack = 0;
213 }
214
215 std::string SeekableFileInputStream::getName() const {
216 std::ostringstream result;
217 result << input->getName() << " from " << start << " for "
218 << length;
219 return result.str();
220 }
221
222}
223