1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "Adaptor.hh"
20#include "Compression.hh"
21#include "RLEv2.hh"
22
// Minimum length of a SHORT_REPEAT run. The 3-bit run-length field in a
// SHORT_REPEAT header stores (run length - MIN_REPEAT).
#define MIN_REPEAT 3
24
25namespace orc {
26
// 5-bit width codes that appear in RLEv2 headers (see decodeBitWidth).
// Codes 0..23 denote the widths 1..24 directly. Codes 24..31 denote the
// sparse widths 26, 28, 30, 32, 40, 48, 56 and 64.
struct FixedBitSizes {
  enum FBS {
    // codes 0..7  -> widths 1..8
    ONE = 0, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT,
    // codes 8..15 -> widths 9..16
    NINE = 8, TEN, ELEVEN, TWELVE, THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN,
    // codes 16..23 -> widths 17..24
    SEVENTEEN = 16, EIGHTEEN, NINETEEN, TWENTY,
    TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR,
    // codes 24..31 -> sparse widths
    TWENTYSIX = 24, TWENTYEIGHT, THIRTY, THIRTYTWO,
    FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR
  };
};
35
36inline uint32_t decodeBitWidth(uint32_t n) {
37 if (n <= FixedBitSizes::TWENTYFOUR) {
38 return n + 1;
39 } else if (n == FixedBitSizes::TWENTYSIX) {
40 return 26;
41 } else if (n == FixedBitSizes::TWENTYEIGHT) {
42 return 28;
43 } else if (n == FixedBitSizes::THIRTY) {
44 return 30;
45 } else if (n == FixedBitSizes::THIRTYTWO) {
46 return 32;
47 } else if (n == FixedBitSizes::FORTY) {
48 return 40;
49 } else if (n == FixedBitSizes::FORTYEIGHT) {
50 return 48;
51 } else if (n == FixedBitSizes::FIFTYSIX) {
52 return 56;
53 } else {
54 return 64;
55 }
56}
57
// Rounds a required bit count up to the nearest width that an RLEv2 header
// can express. 0 becomes 1, counts 1..24 are kept as-is, and larger counts
// snap up to the next of 26, 28, 30, 32, 40, 48, 56. Anything above 56
// becomes 64.
inline uint32_t getClosestFixedBits(uint32_t n) {
  if (n <= 24) {
    return n == 0 ? 1 : n;
  }

  static constexpr uint32_t kSparseWidths[] = {26, 28, 30, 32, 40, 48, 56};
  for (const uint32_t width : kSparseWidths) {
    if (n <= width) {
      return width;
    }
  }
  return 64;
}
83
84int64_t RleDecoderV2::readLongBE(uint64_t bsz) {
85 int64_t ret = 0, val;
86 uint64_t n = bsz;
87 while (n > 0) {
88 n--;
89 val = readByte();
90 ret |= (val << (n * 8));
91 }
92 return ret;
93}
94
95inline int64_t RleDecoderV2::readVslong() {
96 return unZigZag(readVulong());
97}
98
99uint64_t RleDecoderV2::readVulong() {
100 uint64_t ret = 0, b;
101 uint64_t offset = 0;
102 do {
103 b = readByte();
104 ret |= (0x7f & b) << offset;
105 offset += 7;
106 } while (b >= 0x80);
107 return ret;
108}
109
110RleDecoderV2::RleDecoderV2(std::unique_ptr<SeekableInputStream> input,
111 bool _isSigned, MemoryPool& pool
112 ): inputStream(std::move(input)),
113 isSigned(_isSigned),
114 firstByte(0),
115 runLength(0),
116 runRead(0),
117 bufferStart(nullptr),
118 bufferEnd(bufferStart),
119 deltaBase(0),
120 byteSize(0),
121 firstValue(0),
122 prevValue(0),
123 bitSize(0),
124 bitsLeft(0),
125 curByte(0),
126 patchBitSize(0),
127 unpackedIdx(0),
128 patchIdx(0),
129 base(0),
130 curGap(0),
131 curPatch(0),
132 patchMask(0),
133 actualGap(0),
134 unpacked(pool, 0),
135 unpackedPatch(pool, 0) {
136 // PASS
137}
138
139void RleDecoderV2::seek(PositionProvider& location) {
140 // move the input stream
141 inputStream->seek(location);
142 // clear state
143 bufferEnd = bufferStart = nullptr;
144 runRead = runLength = 0;
145 // skip ahead the given number of records
146 skip(location.next());
147}
148
149void RleDecoderV2::skip(uint64_t numValues) {
150 // simple for now, until perf tests indicate something encoding specific is
151 // needed
152 const uint64_t N = 64;
153 int64_t dummy[N];
154
155 while (numValues) {
156 uint64_t nRead = std::min(N, numValues);
157 next(dummy, nRead, nullptr);
158 numValues -= nRead;
159 }
160}
161
162void RleDecoderV2::next(int64_t* const data,
163 const uint64_t numValues,
164 const char* const notNull) {
165 uint64_t nRead = 0;
166
167 while (nRead < numValues) {
168 // Skip any nulls before attempting to read first byte.
169 while (notNull && !notNull[nRead]) {
170 if (++nRead == numValues) {
171 return; // ended with null values
172 }
173 }
174
175 if (runRead == runLength) {
176 resetRun();
177 firstByte = readByte();
178 }
179
180 uint64_t offset = nRead, length = numValues - nRead;
181
182 EncodingType enc = static_cast<EncodingType>
183 ((firstByte >> 6) & 0x03);
184 switch(static_cast<int64_t>(enc)) {
185 case SHORT_REPEAT:
186 nRead += nextShortRepeats(data, offset, length, notNull);
187 break;
188 case DIRECT:
189 nRead += nextDirect(data, offset, length, notNull);
190 break;
191 case PATCHED_BASE:
192 nRead += nextPatched(data, offset, length, notNull);
193 break;
194 case DELTA:
195 nRead += nextDelta(data, offset, length, notNull);
196 break;
197 default:
198 throw ParseError("unknown encoding");
199 }
200 }
201}
202
203uint64_t RleDecoderV2::nextShortRepeats(int64_t* const data,
204 uint64_t offset,
205 uint64_t numValues,
206 const char* const notNull) {
207 if (runRead == runLength) {
208 // extract the number of fixed bytes
209 byteSize = (firstByte >> 3) & 0x07;
210 byteSize += 1;
211
212 runLength = firstByte & 0x07;
213 // run lengths values are stored only after MIN_REPEAT value is met
214 runLength += MIN_REPEAT;
215 runRead = 0;
216
217 // read the repeated value which is store using fixed bytes
218 firstValue = readLongBE(byteSize);
219
220 if (isSigned) {
221 firstValue = unZigZag(static_cast<uint64_t>(firstValue));
222 }
223 }
224
225 uint64_t nRead = std::min(runLength - runRead, numValues);
226
227 if (notNull) {
228 for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
229 if (notNull[pos]) {
230 data[pos] = firstValue;
231 ++runRead;
232 }
233 }
234 } else {
235 for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
236 data[pos] = firstValue;
237 ++runRead;
238 }
239 }
240
241 return nRead;
242}
243
244uint64_t RleDecoderV2::nextDirect(int64_t* const data,
245 uint64_t offset,
246 uint64_t numValues,
247 const char* const notNull) {
248 if (runRead == runLength) {
249 // extract the number of fixed bits
250 unsigned char fbo = (firstByte >> 1) & 0x1f;
251 bitSize = decodeBitWidth(fbo);
252
253 // extract the run length
254 runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
255 runLength |= readByte();
256 // runs are one off
257 runLength += 1;
258 runRead = 0;
259 }
260
261 uint64_t nRead = std::min(runLength - runRead, numValues);
262
263 runRead += readLongs(data, offset, nRead, bitSize, notNull);
264
265 if (isSigned) {
266 if (notNull) {
267 for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
268 if (notNull[pos]) {
269 data[pos] = unZigZag(static_cast<uint64_t>(data[pos]));
270 }
271 }
272 } else {
273 for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
274 data[pos] = unZigZag(static_cast<uint64_t>(data[pos]));
275 }
276 }
277 }
278
279 return nRead;
280}
281
// Decodes slots [offset, offset + n) from a PATCHED_BASE run, where
// n = min(values left in the run, numValues).
//
// On the first call for a run (runRead == runLength), the whole run is read
// eagerly:
//   byte 1 : bits 5..1 value-width code, bit 0 high bit of (run length - 1)
//   byte 2 : low 8 bits of (run length - 1)
//   byte 3 : bits 7..5 (base byte width - 1), bits 4..0 patch-width code
//   byte 4 : bits 7..5 (patch gap width - 1), bits 4..0 patch list length
//   then   : the base value (big-endian, sign-magnitude, byteSize bytes),
//            runLength values bit-packed at bitSize bits (into `unpacked`),
//            and `pl` patch entries packed at
//            getClosestFixedBits(patchBitSize + pgw) bits (into `unpackedPatch`).
// Later calls only copy values out of those buffers.
//
// Each non-null slot gets base + unpacked value. At patched positions, the
// current patch is additionally ORed in above the low bitSize bits. Null
// slots are left untouched and consume no value.
// Returns n, the number of slots covered (null slots included).
// Throws ParseError if the patch list length is 0 or if
// patchBitSize + pgw > 64.
uint64_t RleDecoderV2::nextPatched(int64_t* const data,
                                   uint64_t offset,
                                   uint64_t numValues,
                                   const char* const notNull) {
  if (runRead == runLength) {
    // extract the number of fixed bits
    unsigned char fbo = (firstByte >> 1) & 0x1f;
    bitSize = decodeBitWidth(fbo);

    // extract the run length
    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
    runLength |= readByte();
    // runs are one off
    runLength += 1;
    runRead = 0;

    // extract the number of bytes occupied by base
    uint64_t thirdByte = readByte();
    byteSize = (thirdByte >> 5) & 0x07;
    // base width is one off
    byteSize += 1;

    // extract patch width
    uint32_t pwo = thirdByte & 0x1f;
    patchBitSize = decodeBitWidth(pwo);

    // read fourth byte and extract patch gap width
    uint64_t fourthByte = readByte();
    uint32_t pgw = (fourthByte >> 5) & 0x07;
    // patch gap width is one off
    pgw += 1;

    // extract the length of the patch list
    size_t pl = fourthByte & 0x1f;
    if (pl == 0) {
      throw ParseError("Corrupt PATCHED_BASE encoded data (pl==0)!");
    }

    // read the next base width number of bytes to extract base value
    base = readLongBE(byteSize);
    // The base is sign-magnitude: the most significant bit of its byteSize
    // bytes is the sign flag, and the remaining bits are the magnitude.
    int64_t mask = (static_cast<int64_t>(1) << ((byteSize * 8) - 1));
    // if mask of base value is 1 then base is negative value else positive
    if ((base & mask) != 0) {
      base = base & ~mask;
      base = -base;
    }

    // Unpack the whole run up front so later calls can index into it.
    // TODO: something more efficient than resize
    unpacked.resize(runLength);
    unpackedIdx = 0;
    readLongs(unpacked.data(), 0, runLength, bitSize);
    // any remaining bits are thrown out
    resetReadLongs();

    // Each patch entry packs a gap (high pgw bits) above a patch value
    // (low patchBitSize bits), which is why their sum is bounded by 64.
    // TODO: something more efficient than resize
    unpackedPatch.resize(pl);
    patchIdx = 0;
    // TODO: Skip corrupt?
    //    if ((patchBitSize + pgw) > 64 && !skipCorrupt) {
    if ((patchBitSize + pgw) > 64) {
      throw ParseError("Corrupt PATCHED_BASE encoded data "
                       "(patchBitSize + pgw > 64)!");
    }
    uint32_t cfb = getClosestFixedBits(patchBitSize + pgw);
    readLongs(unpackedPatch.data(), 0, pl, cfb);
    // any remaining bits are thrown out
    resetReadLongs();

    // apply the patch directly when decoding the packed data
    // (patchBitSize <= 63 here because pgw >= 1 and the check above passed,
    // so this shift is well-defined)
    patchMask = ((static_cast<int64_t>(1) << patchBitSize) - 1);

    // adjustGapAndPatch() is not defined in this file. Presumably it loads
    // curPatch and the gap to the next patched position (actualGap) from
    // unpackedPatch[patchIdx]; confirm in RLEv2.hh.
    adjustGapAndPatch();
  }

  uint64_t nRead = std::min(runLength - runRead, numValues);

  for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
    // skip null positions
    if (notNull && !notNull[pos]) {
      continue;
    }
    // unpackedIdx == actualGap marks the next position that needs a patch.
    if (static_cast<int64_t>(unpackedIdx) != actualGap) {
      // no patching required. add base to unpacked value to get final value
      data[pos] = base + unpacked[unpackedIdx];
    } else {
      // extract the patch value
      // NOTE(review): bitSize + patchBitSize is not checked against 64 here;
      // a corrupt header could shift bits out of curPatch (or shift by 64).
      // Confirm whether a guard is needed.
      int64_t patchedVal = unpacked[unpackedIdx] | (curPatch << bitSize);

      // add base to patched value
      data[pos] = base + patchedVal;

      // increment the patch to point to next entry in patch list
      ++patchIdx;

      // After the last patch, actualGap keeps its old value. unpackedIdx has
      // already passed it, so no later position matches again.
      if (patchIdx < unpackedPatch.size()) {
        adjustGapAndPatch();

        // next gap is relative to the current gap
        actualGap += unpackedIdx;
      }
    }

    ++runRead;
    ++unpackedIdx;
  }

  return nRead;
}
390
// Decodes slots [offset, offset + n) from a DELTA run, where
// n = min(values left in the run, numValues).
//
// On the first call for a run, the header is interpreted as follows:
// bits 5..1 of the first byte are the delta-width code, where 0 means every
// delta equals deltaBase. The low bit of the first byte together with the
// second byte holds (run length - 1). The first value follows as a varint
// (signed or unsigned per isSigned), then deltaBase as a signed varint.
//
// Value 0 of the run is firstValue.
//  - Fixed-delta runs (bitSize == 0): each later value is prev + deltaBase.
//  - Otherwise: value 1 is firstValue + deltaBase, and the remaining values
//    use bit-packed deltas. These are subtracted from the previous value when
//    deltaBase < 0 and added otherwise.
// Null slots are skipped and consume no value. `pos` is a single cursor
// shared by all phases below. The runRead checks decide which leading values
// of the run still have to be emitted when a run spans several calls.
// Returns n, the number of slots covered (null slots included).
uint64_t RleDecoderV2::nextDelta(int64_t* const data,
                                 uint64_t offset,
                                 uint64_t numValues,
                                 const char* const notNull) {
  if (runRead == runLength) {
    // extract the number of fixed bits
    unsigned char fbo = (firstByte >> 1) & 0x1f;
    if (fbo != 0) {
      bitSize = decodeBitWidth(fbo);
    } else {
      // width code 0: no packed deltas follow, every delta is deltaBase
      bitSize = 0;
    }

    // extract the run length
    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
    runLength |= readByte();
    ++runLength; // account for first value
    runRead = deltaBase = 0;

    // read the first value stored as vint
    if (isSigned) {
      firstValue = static_cast<int64_t>(readVslong());
    } else {
      firstValue = static_cast<int64_t>(readVulong());
    }

    prevValue = firstValue;

    // read the fixed delta value stored as vint (deltas can be negative even
    // if all numbers are positive)
    deltaBase = static_cast<int64_t>(readVslong());
  }

  uint64_t nRead = std::min(runLength - runRead, numValues);

  uint64_t pos = offset;
  // advance pos to the first non-null slot (or to the end of the range)
  for ( ; pos < offset + nRead; ++pos) {
    // skip null positions
    if (!notNull || notNull[pos]) break;
  }
  // value 0 of the run has not been emitted yet
  if (runRead == 0 && pos < offset + nRead) {
    data[pos++] = firstValue;
    ++runRead;
  }

  if (bitSize == 0) {
    // add fixed deltas to adjacent values
    for ( ; pos < offset + nRead; ++pos) {
      // skip null positions
      if (notNull && !notNull[pos]) {
        continue;
      }
      prevValue = data[pos] = prevValue + deltaBase;
      ++runRead;
    }
  } else {
    // advance pos to the next non-null slot (or to the end of the range)
    for ( ; pos < offset + nRead; ++pos) {
      // skip null positions
      if (!notNull || notNull[pos]) break;
    }
    // value 1 of the run has not been emitted yet
    if (runRead < 2 && pos < offset + nRead) {
      // add delta base and first value
      prevValue = data[pos++] = firstValue + deltaBase;
      ++runRead;
    }

    // write the unpacked values, add it to previous value and store final
    // value to result buffer. if the delta base value is negative then it
    // is a decreasing sequence else an increasing sequence
    // (readLongs first stores the raw packed deltas into data[pos..], which
    // the loops below then turn into running values in place)
    uint64_t remaining = (offset + nRead) - pos;
    runRead += readLongs(data, pos, remaining, bitSize, notNull);

    if (deltaBase < 0) {
      for ( ; pos < offset + nRead; ++pos) {
        // skip null positions
        if (notNull && !notNull[pos]) {
          continue;
        }
        prevValue = data[pos] = prevValue - data[pos];
      }
    } else {
      for ( ; pos < offset + nRead; ++pos) {
        // skip null positions
        if (notNull && !notNull[pos]) {
          continue;
        }
        prevValue = data[pos] = prevValue + data[pos];
      }
    }
  }
  return nRead;
}
483
484} // namespace orc
485