1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "Adaptor.hh"
20#include "Compression.hh"
21#include "orc/Exceptions.hh"
22#include "RLEv1.hh"
23
24#include <algorithm>
25
26namespace orc {
27
// Shortest run that is written as a repeat group. It is also the bias
// removed from the run length before it is stored in the header byte.
constexpr int MINIMUM_REPEAT = 3;
// Longest run a single repeat group can hold: the biased length has to
// stay a non-negative signed byte (0..127).
constexpr int MAXIMUM_REPEAT = MINIMUM_REPEAT + 127;

// Selects the seven payload bits of one base-128 varint byte.
constexpr int64_t BASE_128_MASK = 0x7f;

// Bounds of the per-step delta of a run; the delta is written as one
// signed byte.
constexpr int MAX_DELTA = 127;
constexpr int MIN_DELTA = -128;
// Capacity of the pending-values array and therefore the largest literal
// group; the group header is the negated count in one signed byte.
constexpr int MAX_LITERAL_SIZE = 128;
36
37RleEncoderV1::RleEncoderV1(
38 std::unique_ptr<BufferedOutputStream> outStream,
39 bool hasSigned):
40 outputStream(std::move(outStream)) {
41 isSigned = hasSigned;
42 literals = new int64_t[MAX_LITERAL_SIZE];
43 numLiterals = 0;
44 delta = 0;
45 repeat = false;
46 tailRunLength = 0;
47 bufferPosition = 0;
48 bufferLength = 0;
49 buffer = nullptr;
50}
51
52RleEncoderV1::~RleEncoderV1() {
53 delete [] literals;
54}
55
56void RleEncoderV1::add(const int64_t* data, uint64_t numValues,
57 const char* notNull) {
58 for (uint64_t i = 0; i < numValues; ++i) {
59 if (!notNull || notNull[i]) {
60 write(data[i]);
61 }
62 }
63}
64
65void RleEncoderV1::writeByte(char c) {
66 if (bufferPosition == bufferLength) {
67 int addedSize = 0;
68 if (!outputStream->Next(reinterpret_cast<void **>(&buffer), &addedSize)) {
69 throw std::bad_alloc();
70 }
71 bufferPosition = 0;
72 bufferLength = addedSize;
73 }
74 buffer[bufferPosition++] = c;
75}
76
77void RleEncoderV1::writeValues() {
78 if (numLiterals != 0) {
79 if (repeat) {
80 writeByte(static_cast<char>
81 (static_cast<uint64_t>(numLiterals) - MINIMUM_REPEAT));
82 writeByte(static_cast<char>(delta));
83 if (isSigned) {
84 writeVslong(literals[0]);
85 } else {
86 writeVulong(literals[0]);
87 }
88 } else {
89 writeByte(static_cast<char>(-numLiterals));
90 for(int i=0; i < numLiterals; ++i) {
91 if (isSigned) {
92 writeVslong(literals[i]);
93 } else {
94 writeVulong(literals[i]);
95 }
96 }
97 }
98 repeat = false;
99 numLiterals = 0;
100 tailRunLength = 0;
101 }
102}
103
104uint64_t RleEncoderV1::flush() {
105 writeValues();
106 outputStream->BackUp(bufferLength - bufferPosition);
107 uint64_t dataSize = outputStream->flush();
108 bufferLength = bufferPosition = 0;
109 return dataSize;
110}
111
112void RleEncoderV1::write(int64_t value) {
113 if (numLiterals == 0) {
114 literals[numLiterals++] = value;
115 tailRunLength = 1;
116 } else if (repeat) {
117 if (value == literals[0] + delta * numLiterals) {
118 numLiterals += 1;
119 if (numLiterals == MAXIMUM_REPEAT) {
120 writeValues();
121 }
122 } else {
123 writeValues();
124 literals[numLiterals++] = value;
125 tailRunLength = 1;
126 }
127 } else {
128 if (tailRunLength == 1) {
129 delta = value - literals[numLiterals - 1];
130 if (delta < MIN_DELTA || delta > MAX_DELTA) {
131 tailRunLength = 1;
132 } else {
133 tailRunLength = 2;
134 }
135 } else if (value == literals[numLiterals - 1] + delta) {
136 tailRunLength += 1;
137 } else {
138 delta = value - literals[numLiterals - 1];
139 if (delta < MIN_DELTA || delta > MAX_DELTA) {
140 tailRunLength = 1;
141 } else {
142 tailRunLength = 2;
143 }
144 }
145 if (tailRunLength == MINIMUM_REPEAT) {
146 if (numLiterals + 1 == MINIMUM_REPEAT) {
147 repeat = true;
148 numLiterals += 1;
149 } else {
150 numLiterals -= static_cast<int>(MINIMUM_REPEAT - 1);
151 int64_t base = literals[numLiterals];
152 writeValues();
153 literals[0] = base;
154 repeat = true;
155 numLiterals = MINIMUM_REPEAT;
156 }
157 } else {
158 literals[numLiterals++] = value;
159 if (numLiterals == MAX_LITERAL_SIZE) {
160 writeValues();
161 }
162 }
163 }
164}
165
166void RleEncoderV1::writeVslong(int64_t val) {
167 writeVulong((val << 1) ^ (val >> 63));
168}
169
170void RleEncoderV1::writeVulong(int64_t val) {
171 while (true) {
172 if ((val & ~BASE_128_MASK) == 0) {
173 writeByte(static_cast<char>(val));
174 return;
175 } else {
176 writeByte(static_cast<char>(0x80 | (val & BASE_128_MASK)));
177 // cast val to unsigned so as to force 0-fill right shift
178 val = (static_cast<uint64_t>(val) >> 7);
179 }
180 }
181}
182
183void RleEncoderV1::recordPosition(PositionRecorder* recorder) const {
184 uint64_t flushedSize = outputStream->getSize();
185 uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition);
186 if (outputStream->isCompressed()) {
187 recorder->add(flushedSize);
188 recorder->add(unflushedSize);
189 } else {
190 flushedSize -= static_cast<uint64_t>(bufferLength);
191 recorder->add(flushedSize + unflushedSize);
192 }
193 recorder->add(static_cast<uint64_t>(numLiterals));
194}
195
196signed char RleDecoderV1::readByte() {
197 if (bufferStart == bufferEnd) {
198 int bufferLength;
199 const void* bufferPointer;
200 if (!inputStream->Next(&bufferPointer, &bufferLength)) {
201 throw ParseError("bad read in readByte");
202 }
203 bufferStart = static_cast<const char*>(bufferPointer);
204 bufferEnd = bufferStart + bufferLength;
205 }
206 return *(bufferStart++);
207}
208
209uint64_t RleDecoderV1::readLong() {
210 uint64_t result = 0;
211 int64_t offset = 0;
212 signed char ch = readByte();
213 if (ch >= 0) {
214 result = static_cast<uint64_t>(ch);
215 } else {
216 result = static_cast<uint64_t>(ch) & BASE_128_MASK;
217 while ((ch = readByte()) < 0) {
218 offset += 7;
219 result |= (static_cast<uint64_t>(ch) & BASE_128_MASK) << offset;
220 }
221 result |= static_cast<uint64_t>(ch) << (offset + 7);
222 }
223 return result;
224}
225
226void RleDecoderV1::skipLongs(uint64_t numValues) {
227 while (numValues > 0) {
228 if (readByte() >= 0) {
229 --numValues;
230 }
231 }
232}
233
234void RleDecoderV1::readHeader() {
235 signed char ch = readByte();
236 if (ch < 0) {
237 remainingValues = static_cast<uint64_t>(-ch);
238 repeating = false;
239 } else {
240 remainingValues = static_cast<uint64_t>(ch) + MINIMUM_REPEAT;
241 repeating = true;
242 delta = readByte();
243 value = isSigned
244 ? unZigZag(readLong())
245 : static_cast<int64_t>(readLong());
246 }
247}
248
249RleDecoderV1::RleDecoderV1(std::unique_ptr<SeekableInputStream> input,
250 bool hasSigned)
251 : inputStream(std::move(input)),
252 isSigned(hasSigned),
253 remainingValues(0),
254 value(0),
255 bufferStart(nullptr),
256 bufferEnd(bufferStart),
257 delta(0),
258 repeating(false) {
259}
260
261void RleDecoderV1::seek(PositionProvider& location) {
262 // move the input stream
263 inputStream->seek(location);
264 // force a re-read from the stream
265 bufferEnd = bufferStart;
266 // read a new header
267 readHeader();
268 // skip ahead the given number of records
269 skip(location.next());
270}
271
272void RleDecoderV1::skip(uint64_t numValues) {
273 while (numValues > 0) {
274 if (remainingValues == 0) {
275 readHeader();
276 }
277 uint64_t count = std::min(numValues, remainingValues);
278 remainingValues -= count;
279 numValues -= count;
280 if (repeating) {
281 value += delta * static_cast<int64_t>(count);
282 } else {
283 skipLongs(count);
284 }
285 }
286}
287
288void RleDecoderV1::next(int64_t* const data,
289 const uint64_t numValues,
290 const char* const notNull) {
291 uint64_t position = 0;
292 // skipNulls()
293 if (notNull) {
294 // Skip over null values.
295 while (position < numValues && !notNull[position]) {
296 ++position;
297 }
298 }
299 while (position < numValues) {
300 // If we are out of values, read more.
301 if (remainingValues == 0) {
302 readHeader();
303 }
304 // How many do we read out of this block?
305 uint64_t count = std::min(numValues - position, remainingValues);
306 uint64_t consumed = 0;
307 if (repeating) {
308 if (notNull) {
309 for (uint64_t i = 0; i < count; ++i) {
310 if (notNull[position + i]) {
311 data[position + i] = value + static_cast<int64_t>(consumed) * delta;
312 consumed += 1;
313 }
314 }
315 } else {
316 for (uint64_t i = 0; i < count; ++i) {
317 data[position + i] = value + static_cast<int64_t>(i) * delta;
318 }
319 consumed = count;
320 }
321 value += static_cast<int64_t>(consumed) * delta;
322 } else {
323 if (notNull) {
324 for (uint64_t i = 0 ; i < count; ++i) {
325 if (notNull[position + i]) {
326 data[position + i] = isSigned
327 ? unZigZag(readLong())
328 : static_cast<int64_t>(readLong());
329 ++consumed;
330 }
331 }
332 } else {
333 if (isSigned) {
334 for (uint64_t i = 0; i < count; ++i) {
335 data[position + i] = unZigZag(readLong());
336 }
337 } else {
338 for (uint64_t i = 0; i < count; ++i) {
339 data[position + i] = static_cast<int64_t>(readLong());
340 }
341 }
342 consumed = count;
343 }
344 }
345 remainingValues -= consumed;
346 position += count;
347
348 // skipNulls()
349 if (notNull) {
350 // Skip over null values.
351 while (position < numValues && !notNull[position]) {
352 ++position;
353 }
354 }
355 }
356}
357
358} // namespace orc
359