1 | /** |
2 | * Licensed to the Apache Software Foundation (ASF) under one |
3 | * or more contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. The ASF licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | */ |
18 | |
19 | #include "Adaptor.hh" |
20 | #include "Compression.hh" |
21 | #include "RLEv2.hh" |
22 | |
// Smallest run length a SHORT_REPEAT run can describe. The header stores
// (run length - MIN_REPEAT) in its low 3 bits; nextShortRepeats() adds it back.
#define MIN_REPEAT 3
24 | |
25 | namespace orc { |
26 | |
/**
 * 5-bit bit-width codes as they appear in RLEv2 run headers.
 *
 * Widths 1..24 each have their own code, with code == width - 1
 * (ONE == 0 ... TWENTYFOUR == 23). Above 24 only a sparse set of widths can
 * be encoded, and these take codes 24..31.
 */
struct FixedBitSizes {
  enum FBS {
    // Dense range: one code per width.
    ONE = 0, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT,
    NINE, TEN, ELEVEN, TWELVE, THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN,
    SEVENTEEN, EIGHTEEN, NINETEEN, TWENTY, TWENTYONE, TWENTYTWO,
    TWENTYTHREE, TWENTYFOUR,
    // Sparse range: only these wider widths are representable.
    TWENTYSIX = 24,
    TWENTYEIGHT = 25,
    THIRTY = 26,
    THIRTYTWO = 27,
    FORTY = 28,
    FORTYEIGHT = 29,
    FIFTYSIX = 30,
    SIXTYFOUR = 31
  };
};
35 | |
36 | inline uint32_t decodeBitWidth(uint32_t n) { |
37 | if (n <= FixedBitSizes::TWENTYFOUR) { |
38 | return n + 1; |
39 | } else if (n == FixedBitSizes::TWENTYSIX) { |
40 | return 26; |
41 | } else if (n == FixedBitSizes::TWENTYEIGHT) { |
42 | return 28; |
43 | } else if (n == FixedBitSizes::THIRTY) { |
44 | return 30; |
45 | } else if (n == FixedBitSizes::THIRTYTWO) { |
46 | return 32; |
47 | } else if (n == FixedBitSizes::FORTY) { |
48 | return 40; |
49 | } else if (n == FixedBitSizes::FORTYEIGHT) { |
50 | return 48; |
51 | } else if (n == FixedBitSizes::FIFTYSIX) { |
52 | return 56; |
53 | } else { |
54 | return 64; |
55 | } |
56 | } |
57 | |
/**
 * Rounds a bit count up to the nearest width that RLEv2 can encode.
 *
 * 0 becomes 1, and 1..24 are returned unchanged. Larger counts round up to
 * 26, 28, 30, 32, 40, 48 or 56, and anything above 56 becomes 64.
 */
inline uint32_t getClosestFixedBits(uint32_t n) {
  if (n <= 24) {
    // Every width in this range is representable; zero is promoted to one.
    return n == 0 ? 1 : n;
  }

  static const uint32_t kSparseWidths[] = {26, 28, 30, 32, 40, 48, 56};
  for (uint32_t width : kSparseWidths) {
    if (n <= width) {
      return width;
    }
  }
  return 64;
}
83 | |
84 | int64_t RleDecoderV2::readLongBE(uint64_t bsz) { |
85 | int64_t ret = 0, val; |
86 | uint64_t n = bsz; |
87 | while (n > 0) { |
88 | n--; |
89 | val = readByte(); |
90 | ret |= (val << (n * 8)); |
91 | } |
92 | return ret; |
93 | } |
94 | |
95 | inline int64_t RleDecoderV2::readVslong() { |
96 | return unZigZag(readVulong()); |
97 | } |
98 | |
99 | uint64_t RleDecoderV2::readVulong() { |
100 | uint64_t ret = 0, b; |
101 | uint64_t offset = 0; |
102 | do { |
103 | b = readByte(); |
104 | ret |= (0x7f & b) << offset; |
105 | offset += 7; |
106 | } while (b >= 0x80); |
107 | return ret; |
108 | } |
109 | |
110 | RleDecoderV2::RleDecoderV2(std::unique_ptr<SeekableInputStream> input, |
111 | bool _isSigned, MemoryPool& pool |
112 | ): inputStream(std::move(input)), |
113 | isSigned(_isSigned), |
114 | firstByte(0), |
115 | runLength(0), |
116 | runRead(0), |
117 | bufferStart(nullptr), |
118 | bufferEnd(bufferStart), |
119 | deltaBase(0), |
120 | byteSize(0), |
121 | firstValue(0), |
122 | prevValue(0), |
123 | bitSize(0), |
124 | bitsLeft(0), |
125 | curByte(0), |
126 | patchBitSize(0), |
127 | unpackedIdx(0), |
128 | patchIdx(0), |
129 | base(0), |
130 | curGap(0), |
131 | curPatch(0), |
132 | patchMask(0), |
133 | actualGap(0), |
134 | unpacked(pool, 0), |
135 | unpackedPatch(pool, 0) { |
136 | // PASS |
137 | } |
138 | |
139 | void RleDecoderV2::seek(PositionProvider& location) { |
140 | // move the input stream |
141 | inputStream->seek(location); |
142 | // clear state |
143 | bufferEnd = bufferStart = nullptr; |
144 | runRead = runLength = 0; |
145 | // skip ahead the given number of records |
146 | skip(location.next()); |
147 | } |
148 | |
149 | void RleDecoderV2::skip(uint64_t numValues) { |
150 | // simple for now, until perf tests indicate something encoding specific is |
151 | // needed |
152 | const uint64_t N = 64; |
153 | int64_t dummy[N]; |
154 | |
155 | while (numValues) { |
156 | uint64_t nRead = std::min(N, numValues); |
157 | next(dummy, nRead, nullptr); |
158 | numValues -= nRead; |
159 | } |
160 | } |
161 | |
162 | void RleDecoderV2::next(int64_t* const data, |
163 | const uint64_t numValues, |
164 | const char* const notNull) { |
165 | uint64_t nRead = 0; |
166 | |
167 | while (nRead < numValues) { |
168 | // Skip any nulls before attempting to read first byte. |
169 | while (notNull && !notNull[nRead]) { |
170 | if (++nRead == numValues) { |
171 | return; // ended with null values |
172 | } |
173 | } |
174 | |
175 | if (runRead == runLength) { |
176 | resetRun(); |
177 | firstByte = readByte(); |
178 | } |
179 | |
180 | uint64_t offset = nRead, length = numValues - nRead; |
181 | |
182 | EncodingType enc = static_cast<EncodingType> |
183 | ((firstByte >> 6) & 0x03); |
184 | switch(static_cast<int64_t>(enc)) { |
185 | case SHORT_REPEAT: |
186 | nRead += nextShortRepeats(data, offset, length, notNull); |
187 | break; |
188 | case DIRECT: |
189 | nRead += nextDirect(data, offset, length, notNull); |
190 | break; |
191 | case PATCHED_BASE: |
192 | nRead += nextPatched(data, offset, length, notNull); |
193 | break; |
194 | case DELTA: |
195 | nRead += nextDelta(data, offset, length, notNull); |
196 | break; |
197 | default: |
198 | throw ParseError("unknown encoding" ); |
199 | } |
200 | } |
201 | } |
202 | |
203 | uint64_t RleDecoderV2::nextShortRepeats(int64_t* const data, |
204 | uint64_t offset, |
205 | uint64_t numValues, |
206 | const char* const notNull) { |
207 | if (runRead == runLength) { |
208 | // extract the number of fixed bytes |
209 | byteSize = (firstByte >> 3) & 0x07; |
210 | byteSize += 1; |
211 | |
212 | runLength = firstByte & 0x07; |
213 | // run lengths values are stored only after MIN_REPEAT value is met |
214 | runLength += MIN_REPEAT; |
215 | runRead = 0; |
216 | |
217 | // read the repeated value which is store using fixed bytes |
218 | firstValue = readLongBE(byteSize); |
219 | |
220 | if (isSigned) { |
221 | firstValue = unZigZag(static_cast<uint64_t>(firstValue)); |
222 | } |
223 | } |
224 | |
225 | uint64_t nRead = std::min(runLength - runRead, numValues); |
226 | |
227 | if (notNull) { |
228 | for(uint64_t pos = offset; pos < offset + nRead; ++pos) { |
229 | if (notNull[pos]) { |
230 | data[pos] = firstValue; |
231 | ++runRead; |
232 | } |
233 | } |
234 | } else { |
235 | for(uint64_t pos = offset; pos < offset + nRead; ++pos) { |
236 | data[pos] = firstValue; |
237 | ++runRead; |
238 | } |
239 | } |
240 | |
241 | return nRead; |
242 | } |
243 | |
244 | uint64_t RleDecoderV2::nextDirect(int64_t* const data, |
245 | uint64_t offset, |
246 | uint64_t numValues, |
247 | const char* const notNull) { |
248 | if (runRead == runLength) { |
249 | // extract the number of fixed bits |
250 | unsigned char fbo = (firstByte >> 1) & 0x1f; |
251 | bitSize = decodeBitWidth(fbo); |
252 | |
253 | // extract the run length |
254 | runLength = static_cast<uint64_t>(firstByte & 0x01) << 8; |
255 | runLength |= readByte(); |
256 | // runs are one off |
257 | runLength += 1; |
258 | runRead = 0; |
259 | } |
260 | |
261 | uint64_t nRead = std::min(runLength - runRead, numValues); |
262 | |
263 | runRead += readLongs(data, offset, nRead, bitSize, notNull); |
264 | |
265 | if (isSigned) { |
266 | if (notNull) { |
267 | for (uint64_t pos = offset; pos < offset + nRead; ++pos) { |
268 | if (notNull[pos]) { |
269 | data[pos] = unZigZag(static_cast<uint64_t>(data[pos])); |
270 | } |
271 | } |
272 | } else { |
273 | for (uint64_t pos = offset; pos < offset + nRead; ++pos) { |
274 | data[pos] = unZigZag(static_cast<uint64_t>(data[pos])); |
275 | } |
276 | } |
277 | } |
278 | |
279 | return nRead; |
280 | } |
281 | |
/**
 * Fills output slots from a PATCHED_BASE run.
 *
 * On the first call for a run (runRead == runLength) the header is parsed.
 * All runLength packed values are then unpacked into `unpacked`, and the
 * whole patch list into `unpackedPatch`. Later calls resume from
 * unpackedIdx / patchIdx.
 *
 * Header layout as parsed below:
 *   byte 1 (firstByte): bits 1-5 value bit-width code, bit 0 = high bit of
 *                       (run length - 1)
 *   byte 2:             low 8 bits of (run length - 1)
 *   byte 3:             bits 5-7 (base width in bytes - 1),
 *                       bits 0-4 patch bit-width code
 *   byte 4:             bits 5-7 (patch gap width - 1),
 *                       bits 0-4 patch list length
 *   then:               base value, big-endian; its top bit is a sign flag
 *                       (sign-magnitude)
 *
 * Each output value is base + unpacked value. At patched positions, the
 * patch bits (curPatch) are first OR-ed in above the low bitSize bits.
 *
 * Returns the number of output slots covered, nulls included. Null slots
 * are left unwritten and consume no unpacked value.
 * Throws ParseError if the patch list length is 0 or if patch width + gap
 * width exceeds 64.
 */
uint64_t RleDecoderV2::nextPatched(int64_t* const data,
                                   uint64_t offset,
                                   uint64_t numValues,
                                   const char* const notNull) {
  if (runRead == runLength) {
    // extract the number of fixed bits
    unsigned char fbo = (firstByte >> 1) & 0x1f;
    bitSize = decodeBitWidth(fbo);

    // extract the run length
    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
    runLength |= readByte();
    // runs are one off
    runLength += 1;
    runRead = 0;

    // extract the number of bytes occupied by base
    uint64_t thirdByte = readByte();
    byteSize = (thirdByte >> 5) & 0x07;
    // base width is one off
    byteSize += 1;

    // extract patch width
    uint32_t pwo = thirdByte & 0x1f;
    patchBitSize = decodeBitWidth(pwo);

    // read fourth byte and extract patch gap width
    uint64_t fourthByte = readByte();
    uint32_t pgw = (fourthByte >> 5) & 0x07;
    // patch gap width is one off
    pgw += 1;

    // extract the length of the patch list
    size_t pl = fourthByte & 0x1f;
    if (pl == 0) {
      throw ParseError("Corrupt PATCHED_BASE encoded data (pl==0)!" );
    }

    // read the next base width number of bytes to extract base value
    base = readLongBE(byteSize);
    // Mask selecting the most significant bit of the byteSize-byte base.
    // NOTE(review): for byteSize == 8 this shifts 1 into the sign bit of an
    // int64_t, which is undefined behaviour under C++11 (defined from C++14
    // onward). A uint64_t mask would avoid relying on that.
    int64_t mask = (static_cast<int64_t>(1) << ((byteSize * 8) - 1));
    // if mask of base value is 1 then base is negative value else positive
    // (sign-magnitude: clear the flag bit, then negate the magnitude)
    if ((base & mask) != 0) {
      base = base & ~mask;
      base = -base;
    }

    // Unpack the whole run up front; patches are applied lazily below.
    // TODO: something more efficient than resize
    unpacked.resize(runLength);
    unpackedIdx = 0;
    readLongs(unpacked.data(), 0, runLength, bitSize);
    // any remaining bits are thrown out
    resetReadLongs();

    // Each patch-list entry packs a gap and a patch value together and is
    // stored at the closest encodable width >= patchBitSize + pgw.
    // TODO: something more efficient than resize
    unpackedPatch.resize(pl);
    patchIdx = 0;
    // TODO: Skip corrupt?
    //    if ((patchBitSize + pgw) > 64 && !skipCorrupt) {
    if ((patchBitSize + pgw) > 64) {
      throw ParseError("Corrupt PATCHED_BASE encoded data "
                       "(patchBitSize + pgw > 64)!" );
    }
    uint32_t cfb = getClosestFixedBits(patchBitSize + pgw);
    readLongs(unpackedPatch.data(), 0, pl, cfb);
    // any remaining bits are thrown out
    resetReadLongs();

    // apply the patch directly when decoding the packed data
    // patchMask selects the low patchBitSize bits. It is not read in this
    // function; presumably adjustGapAndPatch() (defined outside this view)
    // uses it to split a patch-list entry — TODO confirm.
    patchMask = ((static_cast<int64_t>(1) << patchBitSize) - 1);

    // Presumably loads unpackedPatch[patchIdx] into curGap/curPatch and sets
    // actualGap; the loop below treats actualGap as the index of the next
    // patched value — verify against the header definition.
    adjustGapAndPatch();
  }

  uint64_t nRead = std::min(runLength - runRead, numValues);

  for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
    // skip null positions (they consume no unpacked value)
    if (notNull && !notNull[pos]) {
      continue;
    }
    if (static_cast<int64_t>(unpackedIdx) != actualGap) {
      // no patching required. add base to unpacked value to get final value
      data[pos] = base + unpacked[unpackedIdx];
    } else {
      // extract the patch value: patch bits sit above the bitSize low bits
      // NOTE(review): assumes curPatch << bitSize stays in range; bitSize == 64
      // would be an out-of-range shift.
      int64_t patchedVal = unpacked[unpackedIdx] | (curPatch << bitSize);

      // add base to patched value
      data[pos] = base + patchedVal;

      // increment the patch to point to next entry in patch list
      ++patchIdx;

      if (patchIdx < unpackedPatch.size()) {
        adjustGapAndPatch();

        // next gap is relative to the current gap
        actualGap += unpackedIdx;
      }
    }

    ++runRead;
    ++unpackedIdx;
  }

  return nRead;
}
390 | |
/**
 * Fills output slots from a DELTA run.
 *
 * When starting a new run, the header is parsed:
 *   firstByte bits 1-5: delta bit-width code; 0 means "fixed delta" (no
 *                       packed deltas follow)
 *   firstByte bit 0 and the next byte: (run length - 1), 9 bits
 *   then: first value as a varint (signed or unsigned per isSigned),
 *         then the delta base as a signed varint.
 *
 * Value reconstruction:
 *   - value 0 is firstValue
 *   - bitSize == 0: every later value is previous + deltaBase
 *   - bitSize != 0: value 1 is firstValue + deltaBase. Later values are
 *     previous +/- an unpacked magnitude, subtracted when deltaBase < 0 and
 *     added otherwise.
 *
 * Returns the number of output slots covered, nulls included. Null slots
 * are left unwritten and consume no run value. prevValue carries state
 * across calls within the same run.
 */
uint64_t RleDecoderV2::nextDelta(int64_t* const data,
                                 uint64_t offset,
                                 uint64_t numValues,
                                 const char* const notNull) {
  if (runRead == runLength) {
    // extract the number of fixed bits (0 = fixed-delta run)
    unsigned char fbo = (firstByte >> 1) & 0x1f;
    if (fbo != 0) {
      bitSize = decodeBitWidth(fbo);
    } else {
      bitSize = 0;
    }

    // extract the run length
    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
    runLength |= readByte();
    ++runLength; // account for first value
    runRead = deltaBase = 0;

    // read the first value stored as vint
    if (isSigned) {
      firstValue = static_cast<int64_t>(readVslong());
    } else {
      firstValue = static_cast<int64_t>(readVulong());
    }

    prevValue = firstValue;

    // read the fixed delta value stored as vint (deltas can be negative even
    // if all numbers are positive)
    deltaBase = static_cast<int64_t>(readVslong());
  }

  uint64_t nRead = std::min(runLength - runRead, numValues);

  // Advance pos to the first non-null slot (or the end of the range).
  uint64_t pos = offset;
  for ( ; pos < offset + nRead; ++pos) {
    // skip null positions
    if (!notNull || notNull[pos]) break;
  }
  // Emit the run's first value if this call starts the run.
  if (runRead == 0 && pos < offset + nRead) {
    data[pos++] = firstValue;
    ++runRead;
  }

  if (bitSize == 0) {
    // add fixed deltas to adjacent values
    for ( ; pos < offset + nRead; ++pos) {
      // skip null positions
      if (notNull && !notNull[pos]) {
        continue;
      }
      prevValue = data[pos] = prevValue + deltaBase;
      ++runRead;
    }
  } else {
    // Advance pos to the next non-null slot.
    for ( ; pos < offset + nRead; ++pos) {
      // skip null positions
      if (!notNull || notNull[pos]) break;
    }
    // The second value of the run is firstValue + deltaBase (not packed).
    if (runRead < 2 && pos < offset + nRead) {
      // add delta base and first value
      prevValue = data[pos++] = firstValue + deltaBase;
      ++runRead;
    }

    // write the unpacked values, add it to previous value and store final
    // value to result buffer. if the delta base value is negative then it
    // is a decreasing sequence else an increasing sequence
    // readLongs() writes raw magnitudes in place; the loops below turn them
    // into running values.
    uint64_t remaining = (offset + nRead) - pos;
    runRead += readLongs(data, pos, remaining, bitSize, notNull);

    if (deltaBase < 0) {
      for ( ; pos < offset + nRead; ++pos) {
        // skip null positions
        if (notNull && !notNull[pos]) {
          continue;
        }
        prevValue = data[pos] = prevValue - data[pos];
      }
    } else {
      for ( ; pos < offset + nRead; ++pos) {
        // skip null positions
        if (notNull && !notNull[pos]) {
          continue;
        }
        prevValue = data[pos] = prevValue + data[pos];
      }
    }
  }
  return nRead;
}
483 | |
484 | } // namespace orc |
485 | |