1 | /** |
2 | * Licensed to the Apache Software Foundation (ASF) under one |
3 | * or more contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. The ASF licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | */ |
18 | |
19 | #include "Adaptor.hh" |
20 | #include "Compression.hh" |
21 | #include "orc/Exceptions.hh" |
22 | #include "RLEv1.hh" |
23 | |
24 | #include <algorithm> |
25 | |
26 | namespace orc { |
27 | |
// A tail run of this many values switches the encoder to a repeat run; it is
// also the bias subtracted from a repeat run's length in its header byte.
const int MINIMUM_REPEAT = 3;
// The encoder flushes a repeat run as soon as it reaches this length.
const int MAXIMUM_REPEAT = MINIMUM_REPEAT + 127;

// Selects the low seven bits carried by each byte of a base-128 varint.
const int64_t BASE_128_MASK = 0x7f;

// Inclusive bounds on the delta between consecutive values of a repeat run
// (the delta is written to the stream as a single byte).
const int MAX_DELTA = 127;
const int MIN_DELTA = -128;
// Capacity of the encoder's literal buffer; the encoder flushes a literal run
// when it holds this many values.
const int MAX_LITERAL_SIZE = 128;
36 | |
37 | RleEncoderV1::RleEncoderV1( |
38 | std::unique_ptr<BufferedOutputStream> outStream, |
39 | bool hasSigned): |
40 | outputStream(std::move(outStream)) { |
41 | isSigned = hasSigned; |
42 | literals = new int64_t[MAX_LITERAL_SIZE]; |
43 | numLiterals = 0; |
44 | delta = 0; |
45 | repeat = false; |
46 | tailRunLength = 0; |
47 | bufferPosition = 0; |
48 | bufferLength = 0; |
49 | buffer = nullptr; |
50 | } |
51 | |
/**
 * Releases the literal buffer allocated by the constructor.
 */
RleEncoderV1::~RleEncoderV1() {
  delete [] literals;
}
55 | |
56 | void RleEncoderV1::add(const int64_t* data, uint64_t numValues, |
57 | const char* notNull) { |
58 | for (uint64_t i = 0; i < numValues; ++i) { |
59 | if (!notNull || notNull[i]) { |
60 | write(data[i]); |
61 | } |
62 | } |
63 | } |
64 | |
65 | void RleEncoderV1::writeByte(char c) { |
66 | if (bufferPosition == bufferLength) { |
67 | int addedSize = 0; |
68 | if (!outputStream->Next(reinterpret_cast<void **>(&buffer), &addedSize)) { |
69 | throw std::bad_alloc(); |
70 | } |
71 | bufferPosition = 0; |
72 | bufferLength = addedSize; |
73 | } |
74 | buffer[bufferPosition++] = c; |
75 | } |
76 | |
77 | void RleEncoderV1::writeValues() { |
78 | if (numLiterals != 0) { |
79 | if (repeat) { |
80 | writeByte(static_cast<char> |
81 | (static_cast<uint64_t>(numLiterals) - MINIMUM_REPEAT)); |
82 | writeByte(static_cast<char>(delta)); |
83 | if (isSigned) { |
84 | writeVslong(literals[0]); |
85 | } else { |
86 | writeVulong(literals[0]); |
87 | } |
88 | } else { |
89 | writeByte(static_cast<char>(-numLiterals)); |
90 | for(int i=0; i < numLiterals; ++i) { |
91 | if (isSigned) { |
92 | writeVslong(literals[i]); |
93 | } else { |
94 | writeVulong(literals[i]); |
95 | } |
96 | } |
97 | } |
98 | repeat = false; |
99 | numLiterals = 0; |
100 | tailRunLength = 0; |
101 | } |
102 | } |
103 | |
104 | uint64_t RleEncoderV1::flush() { |
105 | writeValues(); |
106 | outputStream->BackUp(bufferLength - bufferPosition); |
107 | uint64_t dataSize = outputStream->flush(); |
108 | bufferLength = bufferPosition = 0; |
109 | return dataSize; |
110 | } |
111 | |
112 | void RleEncoderV1::write(int64_t value) { |
113 | if (numLiterals == 0) { |
114 | literals[numLiterals++] = value; |
115 | tailRunLength = 1; |
116 | } else if (repeat) { |
117 | if (value == literals[0] + delta * numLiterals) { |
118 | numLiterals += 1; |
119 | if (numLiterals == MAXIMUM_REPEAT) { |
120 | writeValues(); |
121 | } |
122 | } else { |
123 | writeValues(); |
124 | literals[numLiterals++] = value; |
125 | tailRunLength = 1; |
126 | } |
127 | } else { |
128 | if (tailRunLength == 1) { |
129 | delta = value - literals[numLiterals - 1]; |
130 | if (delta < MIN_DELTA || delta > MAX_DELTA) { |
131 | tailRunLength = 1; |
132 | } else { |
133 | tailRunLength = 2; |
134 | } |
135 | } else if (value == literals[numLiterals - 1] + delta) { |
136 | tailRunLength += 1; |
137 | } else { |
138 | delta = value - literals[numLiterals - 1]; |
139 | if (delta < MIN_DELTA || delta > MAX_DELTA) { |
140 | tailRunLength = 1; |
141 | } else { |
142 | tailRunLength = 2; |
143 | } |
144 | } |
145 | if (tailRunLength == MINIMUM_REPEAT) { |
146 | if (numLiterals + 1 == MINIMUM_REPEAT) { |
147 | repeat = true; |
148 | numLiterals += 1; |
149 | } else { |
150 | numLiterals -= static_cast<int>(MINIMUM_REPEAT - 1); |
151 | int64_t base = literals[numLiterals]; |
152 | writeValues(); |
153 | literals[0] = base; |
154 | repeat = true; |
155 | numLiterals = MINIMUM_REPEAT; |
156 | } |
157 | } else { |
158 | literals[numLiterals++] = value; |
159 | if (numLiterals == MAX_LITERAL_SIZE) { |
160 | writeValues(); |
161 | } |
162 | } |
163 | } |
164 | } |
165 | |
166 | void RleEncoderV1::writeVslong(int64_t val) { |
167 | writeVulong((val << 1) ^ (val >> 63)); |
168 | } |
169 | |
170 | void RleEncoderV1::writeVulong(int64_t val) { |
171 | while (true) { |
172 | if ((val & ~BASE_128_MASK) == 0) { |
173 | writeByte(static_cast<char>(val)); |
174 | return; |
175 | } else { |
176 | writeByte(static_cast<char>(0x80 | (val & BASE_128_MASK))); |
177 | // cast val to unsigned so as to force 0-fill right shift |
178 | val = (static_cast<uint64_t>(val) >> 7); |
179 | } |
180 | } |
181 | } |
182 | |
183 | void RleEncoderV1::recordPosition(PositionRecorder* recorder) const { |
184 | uint64_t flushedSize = outputStream->getSize(); |
185 | uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition); |
186 | if (outputStream->isCompressed()) { |
187 | recorder->add(flushedSize); |
188 | recorder->add(unflushedSize); |
189 | } else { |
190 | flushedSize -= static_cast<uint64_t>(bufferLength); |
191 | recorder->add(flushedSize + unflushedSize); |
192 | } |
193 | recorder->add(static_cast<uint64_t>(numLiterals)); |
194 | } |
195 | |
196 | signed char RleDecoderV1::readByte() { |
197 | if (bufferStart == bufferEnd) { |
198 | int bufferLength; |
199 | const void* bufferPointer; |
200 | if (!inputStream->Next(&bufferPointer, &bufferLength)) { |
201 | throw ParseError("bad read in readByte" ); |
202 | } |
203 | bufferStart = static_cast<const char*>(bufferPointer); |
204 | bufferEnd = bufferStart + bufferLength; |
205 | } |
206 | return *(bufferStart++); |
207 | } |
208 | |
209 | uint64_t RleDecoderV1::readLong() { |
210 | uint64_t result = 0; |
211 | int64_t offset = 0; |
212 | signed char ch = readByte(); |
213 | if (ch >= 0) { |
214 | result = static_cast<uint64_t>(ch); |
215 | } else { |
216 | result = static_cast<uint64_t>(ch) & BASE_128_MASK; |
217 | while ((ch = readByte()) < 0) { |
218 | offset += 7; |
219 | result |= (static_cast<uint64_t>(ch) & BASE_128_MASK) << offset; |
220 | } |
221 | result |= static_cast<uint64_t>(ch) << (offset + 7); |
222 | } |
223 | return result; |
224 | } |
225 | |
226 | void RleDecoderV1::skipLongs(uint64_t numValues) { |
227 | while (numValues > 0) { |
228 | if (readByte() >= 0) { |
229 | --numValues; |
230 | } |
231 | } |
232 | } |
233 | |
234 | void RleDecoderV1::() { |
235 | signed char ch = readByte(); |
236 | if (ch < 0) { |
237 | remainingValues = static_cast<uint64_t>(-ch); |
238 | repeating = false; |
239 | } else { |
240 | remainingValues = static_cast<uint64_t>(ch) + MINIMUM_REPEAT; |
241 | repeating = true; |
242 | delta = readByte(); |
243 | value = isSigned |
244 | ? unZigZag(readLong()) |
245 | : static_cast<int64_t>(readLong()); |
246 | } |
247 | } |
248 | |
249 | RleDecoderV1::RleDecoderV1(std::unique_ptr<SeekableInputStream> input, |
250 | bool hasSigned) |
251 | : inputStream(std::move(input)), |
252 | isSigned(hasSigned), |
253 | remainingValues(0), |
254 | value(0), |
255 | bufferStart(nullptr), |
256 | bufferEnd(bufferStart), |
257 | delta(0), |
258 | repeating(false) { |
259 | } |
260 | |
261 | void RleDecoderV1::seek(PositionProvider& location) { |
262 | // move the input stream |
263 | inputStream->seek(location); |
264 | // force a re-read from the stream |
265 | bufferEnd = bufferStart; |
266 | // read a new header |
267 | readHeader(); |
268 | // skip ahead the given number of records |
269 | skip(location.next()); |
270 | } |
271 | |
272 | void RleDecoderV1::skip(uint64_t numValues) { |
273 | while (numValues > 0) { |
274 | if (remainingValues == 0) { |
275 | readHeader(); |
276 | } |
277 | uint64_t count = std::min(numValues, remainingValues); |
278 | remainingValues -= count; |
279 | numValues -= count; |
280 | if (repeating) { |
281 | value += delta * static_cast<int64_t>(count); |
282 | } else { |
283 | skipLongs(count); |
284 | } |
285 | } |
286 | } |
287 | |
288 | void RleDecoderV1::next(int64_t* const data, |
289 | const uint64_t numValues, |
290 | const char* const notNull) { |
291 | uint64_t position = 0; |
292 | // skipNulls() |
293 | if (notNull) { |
294 | // Skip over null values. |
295 | while (position < numValues && !notNull[position]) { |
296 | ++position; |
297 | } |
298 | } |
299 | while (position < numValues) { |
300 | // If we are out of values, read more. |
301 | if (remainingValues == 0) { |
302 | readHeader(); |
303 | } |
304 | // How many do we read out of this block? |
305 | uint64_t count = std::min(numValues - position, remainingValues); |
306 | uint64_t consumed = 0; |
307 | if (repeating) { |
308 | if (notNull) { |
309 | for (uint64_t i = 0; i < count; ++i) { |
310 | if (notNull[position + i]) { |
311 | data[position + i] = value + static_cast<int64_t>(consumed) * delta; |
312 | consumed += 1; |
313 | } |
314 | } |
315 | } else { |
316 | for (uint64_t i = 0; i < count; ++i) { |
317 | data[position + i] = value + static_cast<int64_t>(i) * delta; |
318 | } |
319 | consumed = count; |
320 | } |
321 | value += static_cast<int64_t>(consumed) * delta; |
322 | } else { |
323 | if (notNull) { |
324 | for (uint64_t i = 0 ; i < count; ++i) { |
325 | if (notNull[position + i]) { |
326 | data[position + i] = isSigned |
327 | ? unZigZag(readLong()) |
328 | : static_cast<int64_t>(readLong()); |
329 | ++consumed; |
330 | } |
331 | } |
332 | } else { |
333 | if (isSigned) { |
334 | for (uint64_t i = 0; i < count; ++i) { |
335 | data[position + i] = unZigZag(readLong()); |
336 | } |
337 | } else { |
338 | for (uint64_t i = 0; i < count; ++i) { |
339 | data[position + i] = static_cast<int64_t>(readLong()); |
340 | } |
341 | } |
342 | consumed = count; |
343 | } |
344 | } |
345 | remainingValues -= consumed; |
346 | position += count; |
347 | |
348 | // skipNulls() |
349 | if (notNull) { |
350 | // Skip over null values. |
351 | while (position < numValues && !notNull[position]) { |
352 | ++position; |
353 | } |
354 | } |
355 | } |
356 | } |
357 | |
358 | } // namespace orc |
359 | |