1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include <algorithm>
20#include <iostream>
21#include <string.h>
22#include <utility>
23
24#include "ByteRLE.hh"
25#include "orc/Exceptions.hh"
26
27namespace orc {
28
// Shortest run of identical bytes that is emitted as a repeat run.
constexpr int MINIMUM_REPEAT = 3;
// Longest repeat run: its header byte stores (length - MINIMUM_REPEAT),
// which must stay within 0..127.
constexpr int MAXIMUM_REPEAT = MINIMUM_REPEAT + 127;
// Longest literal run: its header byte stores -length, so at most 128.
constexpr int MAX_LITERAL_SIZE = 128;
32
33 ByteRleEncoder::~ByteRleEncoder() {
34 // PASS
35 }
36
37 class ByteRleEncoderImpl : public ByteRleEncoder {
38 public:
39 ByteRleEncoderImpl(std::unique_ptr<BufferedOutputStream> output);
40 virtual ~ByteRleEncoderImpl() override;
41
42 /**
43 * Encode the next batch of values
44 * @param data to be encoded
45 * @param numValues the number of values to be encoded
46 * @param notNull If the pointer is null, all values are read. If the
47 * pointer is not null, positions that are false are skipped.
48 */
49 virtual void add(const char* data, uint64_t numValues,
50 const char* notNull) override;
51
52 /**
53 * Get size of buffer used so far.
54 */
55 virtual uint64_t getBufferSize() const override;
56
57 /**
58 * Flushing underlying BufferedOutputStream
59 */
60 virtual uint64_t flush() override;
61
62 virtual void recordPosition(PositionRecorder* recorder) const override;
63
64 protected:
65 std::unique_ptr<BufferedOutputStream> outputStream;
66 char* literals;
67 int numLiterals;
68 bool repeat;
69 int tailRunLength;
70 int bufferPosition;
71 int bufferLength;
72 char* buffer;
73
74 void writeByte(char c);
75 void writeValues();
76 void write(char c);
77 };
78
79 ByteRleEncoderImpl::ByteRleEncoderImpl(
80 std::unique_ptr<BufferedOutputStream> output)
81 : outputStream(std::move(output)) {
82 literals = new char[MAX_LITERAL_SIZE];
83 numLiterals = 0;
84 tailRunLength = 0;
85 repeat = false;
86 bufferPosition = 0;
87 bufferLength = 0;
88 buffer = nullptr;
89 }
90
91 ByteRleEncoderImpl::~ByteRleEncoderImpl() {
92 // PASS
93 delete [] literals;
94 }
95
96 void ByteRleEncoderImpl::writeByte(char c) {
97 if (bufferPosition == bufferLength) {
98 int addedSize = 0;
99 if (!outputStream->Next(reinterpret_cast<void **>(&buffer), &addedSize)) {
100 throw std::bad_alloc();
101 }
102 bufferPosition = 0;
103 bufferLength = addedSize;
104 }
105 buffer[bufferPosition++] = c;
106 }
107
108 void ByteRleEncoderImpl::add(
109 const char* data,
110 uint64_t numValues,
111 const char* notNull) {
112 for (uint64_t i = 0; i < numValues; ++i) {
113 if (!notNull || notNull[i]) {
114 write(data[i]);
115 }
116 }
117 }
118
119 void ByteRleEncoderImpl::writeValues() {
120 if (numLiterals != 0) {
121 if (repeat) {
122 writeByte(
123 static_cast<char>(numLiterals - static_cast<int>(MINIMUM_REPEAT)));
124 writeByte(literals[0]);
125 } else {
126 writeByte(static_cast<char>(-numLiterals));
127 for (int i = 0; i < numLiterals; ++i) {
128 writeByte(literals[i]);
129 }
130 }
131 repeat = false;
132 tailRunLength = 0;
133 numLiterals = 0;
134 }
135 }
136
137 uint64_t ByteRleEncoderImpl::flush() {
138 writeValues();
139 outputStream->BackUp(bufferLength - bufferPosition);
140 uint64_t dataSize = outputStream->flush();
141 bufferLength = bufferPosition = 0;
142 return dataSize;
143 }
144
145 void ByteRleEncoderImpl::write(char value) {
146 if (numLiterals == 0) {
147 literals[numLiterals++] = value;
148 tailRunLength = 1;
149 } else if (repeat) {
150 if (value == literals[0]) {
151 numLiterals += 1;
152 if (numLiterals == MAXIMUM_REPEAT) {
153 writeValues();
154 }
155 } else {
156 writeValues();
157 literals[numLiterals++] = value;
158 tailRunLength = 1;
159 }
160 } else {
161 if (value == literals[numLiterals - 1]) {
162 tailRunLength += 1;
163 } else {
164 tailRunLength = 1;
165 }
166 if (tailRunLength == MINIMUM_REPEAT) {
167 if (numLiterals + 1 == MINIMUM_REPEAT) {
168 repeat = true;
169 numLiterals += 1;
170 } else {
171 numLiterals -= static_cast<int>(MINIMUM_REPEAT - 1);
172 writeValues();
173 literals[0] = value;
174 repeat = true;
175 numLiterals = MINIMUM_REPEAT;
176 }
177 } else {
178 literals[numLiterals++] = value;
179 if (numLiterals == MAX_LITERAL_SIZE) {
180 writeValues();
181 }
182 }
183 }
184 }
185
186 uint64_t ByteRleEncoderImpl::getBufferSize() const {
187 return outputStream->getSize();
188 }
189
190 void ByteRleEncoderImpl::recordPosition(PositionRecorder *recorder) const {
191 uint64_t flushedSize = outputStream->getSize();
192 uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition);
193 if (outputStream->isCompressed()) {
194 // start of the compression chunk in the stream
195 recorder->add(flushedSize);
196 // number of decompressed bytes that need to be consumed
197 recorder->add(unflushedSize);
198 } else {
199 flushedSize -= static_cast<uint64_t>(bufferLength);
200 // byte offset of the RLE run’s start location
201 recorder->add(flushedSize + unflushedSize);
202 }
203 recorder->add(static_cast<uint64_t>(numLiterals));
204 }
205
206 std::unique_ptr<ByteRleEncoder> createByteRleEncoder
207 (std::unique_ptr<BufferedOutputStream> output) {
208 return std::unique_ptr<ByteRleEncoder>(new ByteRleEncoderImpl
209 (std::move(output)));
210 }
211
212 class BooleanRleEncoderImpl : public ByteRleEncoderImpl {
213 public:
214 BooleanRleEncoderImpl(std::unique_ptr<BufferedOutputStream> output);
215 virtual ~BooleanRleEncoderImpl() override;
216
217 /**
218 * Encode the next batch of values
219 * @param data to be encoded
220 * @param numValues the number of values to be encoded
221 * @param notNull If the pointer is null, all values are read. If the
222 * pointer is not null, positions that are false are skipped.
223 */
224 virtual void add(const char* data, uint64_t numValues,
225 const char* notNull) override;
226
227 /**
228 * Flushing underlying BufferedOutputStream
229 */
230 virtual uint64_t flush() override;
231
232 virtual void recordPosition(PositionRecorder* recorder) const override;
233
234 private:
235 int bitsRemained;
236 char current;
237
238 };
239
240 BooleanRleEncoderImpl::BooleanRleEncoderImpl(
241 std::unique_ptr<BufferedOutputStream> output)
242 : ByteRleEncoderImpl(std::move(output)) {
243 bitsRemained = 8;
244 current = static_cast<char>(0);
245 }
246
247 BooleanRleEncoderImpl::~BooleanRleEncoderImpl() {
248 // PASS
249 }
250
251 void BooleanRleEncoderImpl::add(
252 const char* data,
253 uint64_t numValues,
254 const char* notNull) {
255 for (uint64_t i = 0; i < numValues; ++i) {
256 if (bitsRemained == 0) {
257 write(current);
258 current = static_cast<char>(0);
259 bitsRemained = 8;
260 }
261 if (!notNull || notNull[i]) {
262 if (!data || data[i]) {
263 current =
264 static_cast<char>(current | (0x80 >> (8 - bitsRemained)));
265 }
266 --bitsRemained;
267 }
268 }
269 if (bitsRemained == 0) {
270 write(current);
271 current = static_cast<char>(0);
272 bitsRemained = 8;
273 }
274 }
275
276 uint64_t BooleanRleEncoderImpl::flush() {
277 if (bitsRemained != 8) {
278 write(current);
279 }
280 bitsRemained = 8;
281 current = static_cast<char>(0);
282 return ByteRleEncoderImpl::flush();
283 }
284
285 void BooleanRleEncoderImpl::recordPosition(PositionRecorder* recorder) const {
286 ByteRleEncoderImpl::recordPosition(recorder);
287 recorder->add(static_cast<uint64_t>(8 - bitsRemained));
288 }
289
290 std::unique_ptr<ByteRleEncoder> createBooleanRleEncoder
291 (std::unique_ptr<BufferedOutputStream> output) {
292 BooleanRleEncoderImpl* encoder =
293 new BooleanRleEncoderImpl(std::move(output)) ;
294 return std::unique_ptr<ByteRleEncoder>(
295 reinterpret_cast<ByteRleEncoder*>(encoder));
296 }
297
298 ByteRleDecoder::~ByteRleDecoder() {
299 // PASS
300 }
301
302 class ByteRleDecoderImpl: public ByteRleDecoder {
303 public:
304 ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream> input);
305
306 virtual ~ByteRleDecoderImpl();
307
308 /**
309 * Seek to a particular spot.
310 */
311 virtual void seek(PositionProvider&);
312
313 /**
314 * Seek over a given number of values.
315 */
316 virtual void skip(uint64_t numValues);
317
318 /**
319 * Read a number of values into the batch.
320 */
321 virtual void next(char* data, uint64_t numValues, char* notNull);
322
323 protected:
324 inline void nextBuffer();
325 inline signed char readByte();
326 inline void readHeader();
327
328 std::unique_ptr<SeekableInputStream> inputStream;
329 size_t remainingValues;
330 char value;
331 const char* bufferStart;
332 const char* bufferEnd;
333 bool repeating;
334 };
335
336 void ByteRleDecoderImpl::nextBuffer() {
337 int bufferLength;
338 const void* bufferPointer;
339 bool result = inputStream->Next(&bufferPointer, &bufferLength);
340 if (!result) {
341 throw ParseError("bad read in nextBuffer");
342 }
343 bufferStart = static_cast<const char*>(bufferPointer);
344 bufferEnd = bufferStart + bufferLength;
345 }
346
347 signed char ByteRleDecoderImpl::readByte() {
348 if (bufferStart == bufferEnd) {
349 nextBuffer();
350 }
351 return *(bufferStart++);
352 }
353
354 void ByteRleDecoderImpl::readHeader() {
355 signed char ch = readByte();
356 if (ch < 0) {
357 remainingValues = static_cast<size_t>(-ch);
358 repeating = false;
359 } else {
360 remainingValues = static_cast<size_t>(ch) + MINIMUM_REPEAT;
361 repeating = true;
362 value = readByte();
363 }
364 }
365
366 ByteRleDecoderImpl::ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream>
367 input) {
368 inputStream = std::move(input);
369 repeating = false;
370 remainingValues = 0;
371 value = 0;
372 bufferStart = nullptr;
373 bufferEnd = nullptr;
374 }
375
376 ByteRleDecoderImpl::~ByteRleDecoderImpl() {
377 // PASS
378 }
379
380 void ByteRleDecoderImpl::seek(PositionProvider& location) {
381 // move the input stream
382 inputStream->seek(location);
383 // force a re-read from the stream
384 bufferEnd = bufferStart;
385 // read a new header
386 readHeader();
387 // skip ahead the given number of records
388 ByteRleDecoderImpl::skip(location.next());
389 }
390
391 void ByteRleDecoderImpl::skip(uint64_t numValues) {
392 while (numValues > 0) {
393 if (remainingValues == 0) {
394 readHeader();
395 }
396 size_t count = std::min(static_cast<size_t>(numValues), remainingValues);
397 remainingValues -= count;
398 numValues -= count;
399 // for literals we need to skip over count bytes, which may involve
400 // reading from the underlying stream
401 if (!repeating) {
402 size_t consumedBytes = count;
403 while (consumedBytes > 0) {
404 if (bufferStart == bufferEnd) {
405 nextBuffer();
406 }
407 size_t skipSize = std::min(static_cast<size_t>(consumedBytes),
408 static_cast<size_t>(bufferEnd -
409 bufferStart));
410 bufferStart += skipSize;
411 consumedBytes -= skipSize;
412 }
413 }
414 }
415 }
416
/**
 * Decode values into data.
 * @param data destination; must have room for numValues chars
 * @param numValues number of slots to fill, null slots included
 * @param notNull optional mask; slots where it is zero are treated as null:
 *        they consume no encoded value and their entry in data is not written
 */
void ByteRleDecoderImpl::next(char* data, uint64_t numValues,
                              char* notNull) {
  uint64_t position = 0;
  // skip over null values
  while (notNull && position < numValues && !notNull[position]) {
    position += 1;
  }
  while (position < numValues) {
    // if we are out of values, read more
    if (remainingValues == 0) {
      readHeader();
    }
    // how many do we read out of this block?
    // count covers slots (null or not); only non-null slots consume values,
    // so at most count values are taken from the current run.
    size_t count = std::min(static_cast<size_t>(numValues - position),
                            remainingValues);
    uint64_t consumed = 0;
    if (repeating) {
      if (notNull) {
        for(uint64_t i=0; i < count; ++i) {
          if (notNull[position + i]) {
            data[position + i] = value;
            consumed += 1;
          }
        }
      } else {
        memset(data + position, value, count);
        consumed = count;
      }
    } else {
      if (notNull) {
        for(uint64_t i=0; i < count; ++i) {
          if (notNull[position + i]) {
            data[position + i] = readByte();
            consumed += 1;
          }
        }
      } else {
        // no nulls: copy literal bytes straight from the input buffers,
        // refilling whenever the current buffer is used up
        uint64_t i = 0;
        while (i < count) {
          if (bufferStart == bufferEnd) {
            nextBuffer();
          }
          uint64_t copyBytes =
            std::min(static_cast<uint64_t>(count - i),
                     static_cast<uint64_t>(bufferEnd - bufferStart));
          memcpy(data + position + i, bufferStart, copyBytes);
          bufferStart += copyBytes;
          i += copyBytes;
        }
        consumed = count;
      }
    }
    remainingValues -= consumed;
    position += count;
    // skip over any null values
    while (notNull && position < numValues && !notNull[position]) {
      position += 1;
    }
  }
}
477
478 std::unique_ptr<ByteRleDecoder> createByteRleDecoder
479 (std::unique_ptr<SeekableInputStream> input) {
480 return std::unique_ptr<ByteRleDecoder>(new ByteRleDecoderImpl
481 (std::move(input)));
482 }
483
484 class BooleanRleDecoderImpl: public ByteRleDecoderImpl {
485 public:
486 BooleanRleDecoderImpl(std::unique_ptr<SeekableInputStream> input);
487
488 virtual ~BooleanRleDecoderImpl();
489
490 /**
491 * Seek to a particular spot.
492 */
493 virtual void seek(PositionProvider&);
494
495 /**
496 * Seek over a given number of values.
497 */
498 virtual void skip(uint64_t numValues);
499
500 /**
501 * Read a number of values into the batch.
502 */
503 virtual void next(char* data, uint64_t numValues, char* notNull);
504
505 protected:
506 size_t remainingBits;
507 char lastByte;
508 };
509
510 BooleanRleDecoderImpl::BooleanRleDecoderImpl
511 (std::unique_ptr<SeekableInputStream> input
512 ): ByteRleDecoderImpl(std::move(input)) {
513 remainingBits = 0;
514 lastByte = 0;
515 }
516
517 BooleanRleDecoderImpl::~BooleanRleDecoderImpl() {
518 // PASS
519 }
520
521 void BooleanRleDecoderImpl::seek(PositionProvider& location) {
522 ByteRleDecoderImpl::seek(location);
523 uint64_t consumed = location.next();
524 if (consumed > 8) {
525 throw ParseError("bad position");
526 }
527 if (consumed != 0) {
528 remainingBits = 8 - consumed;
529 ByteRleDecoderImpl::next(&lastByte, 1, nullptr);
530 }
531 }
532
533 void BooleanRleDecoderImpl::skip(uint64_t numValues) {
534 if (numValues <= remainingBits) {
535 remainingBits -= numValues;
536 } else {
537 numValues -= remainingBits;
538 uint64_t bytesSkipped = numValues / 8;
539 ByteRleDecoderImpl::skip(bytesSkipped);
540 ByteRleDecoderImpl::next(&lastByte, 1, nullptr);
541 remainingBits = 8 - (numValues % 8);
542 }
543 }
544
/**
 * Decode booleans into data, one 0/1 value per char, taking bits from the
 * packed bytes most significant bit first.
 * @param data destination; must have room for numValues chars. The tail of
 *        the array is also used as scratch space for the packed bytes before
 *        they are expanded in place.
 * @param numValues number of slots to fill, null slots included
 * @param notNull optional mask; slots where it is zero are set to 0 and do
 *        not consume a bit
 */
void BooleanRleDecoderImpl::next(char* data, uint64_t numValues,
                                 char* notNull) {
  // next spot to fill in
  uint64_t position = 0;

  // use up any remaining bits
  if (notNull) {
    while(remainingBits > 0 && position < numValues) {
      if (notNull[position]) {
        remainingBits -= 1;
        data[position] = (static_cast<unsigned char>(lastByte) >>
                          remainingBits) & 0x1;
      } else {
        data[position] = 0;
      }
      position += 1;
    }
  } else {
    while(remainingBits > 0 && position < numValues) {
      remainingBits -= 1;
      data[position++] = (static_cast<unsigned char>(lastByte) >>
                          remainingBits) & 0x1;
    }
  }

  // count the number of nonNulls remaining
  uint64_t nonNulls = numValues - position;
  if (notNull) {
    for(uint64_t i=position; i < numValues; ++i) {
      if (!notNull[i]) {
        nonNulls -= 1;
      }
    }
  }

  // fill in the remaining values
  if (nonNulls == 0) {
    // every remaining slot is null: no bits are needed
    while (position < numValues) {
      data[position++] = 0;
    }
  } else if (position < numValues) {
    // read the new bytes into the array
    // (bytesRead = ceil(nonNulls / 8) <= nonNulls <= numValues - position,
    // so the packed bytes fit in data[position, numValues))
    uint64_t bytesRead = (nonNulls + 7) / 8;
    ByteRleDecoderImpl::next(data + position, bytesRead, nullptr);
    // keep the last packed byte: its unused low bits serve the next call
    lastByte = data[position + bytesRead - 1];
    remainingBits = bytesRead * 8 - nonNulls;
    // expand the array backwards so that we don't clobber the data
    // bitsLeft is the 1-based index (from the MSB of the first packed byte)
    // of the next bit to place; it counts down from nonNulls to 1.
    uint64_t bitsLeft = bytesRead * 8 - remainingBits;
    if (notNull) {
      for(int64_t i=static_cast<int64_t>(numValues) - 1;
          i >= static_cast<int64_t>(position); --i) {
        if (notNull[i]) {
          // bit (bitsLeft - 1) sits in packed byte (bitsLeft - 1) / 8; the
          // unsigned negation mod 8 equals its right shift, 7 - (bitsLeft - 1) % 8
          uint64_t shiftPosn = (-bitsLeft) % 8;
          data[i] = (data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1;
          bitsLeft -= 1;
        } else {
          data[i] = 0;
        }
      }
    } else {
      for(int64_t i=static_cast<int64_t>(numValues) - 1;
          i >= static_cast<int64_t>(position); --i, --bitsLeft) {
        uint64_t shiftPosn = (-bitsLeft) % 8;
        data[i] = (data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1;
      }
    }
  }
}
613
614 std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder
615 (std::unique_ptr<SeekableInputStream> input) {
616 BooleanRleDecoderImpl* decoder =
617 new BooleanRleDecoderImpl(std::move(input));
618 return std::unique_ptr<ByteRleDecoder>(
619 reinterpret_cast<ByteRleDecoder*>(decoder));
620 }
621}
622