#include <iostream>
#include <fstream>
#include <string>
#include <sstream>
#include <cmath>
#include <cstring>   // memcpy, memset, strncmp
#include <cstdint>
#include <cassert>
#include <algorithm> // std::min, std::fill
#include <memory>    // std::unique_ptr, std::shared_ptr

#include "snappy/snappy.h"

#include "miniparquet.h"

#include <protocol/TCompactProtocol.h>
#include <transport/TBufferTransports.h>

using namespace std;

using namespace parquet;
using namespace parquet::format;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

using namespace miniparquet;

static TCompactProtocolFactoryT<TMemoryBuffer> tproto_factory;

template<class T>
static void thrift_unpack(const uint8_t *buf, uint32_t *len,
		T *deserialized_msg) {
	shared_ptr<TMemoryBuffer> tmem_transport(
			new TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
	shared_ptr<TProtocol> tproto = tproto_factory.getProtocol(tmem_transport);
	try {
		deserialized_msg->read(tproto.get());
	} catch (std::exception &e) {
		std::stringstream ss;
		ss << "Couldn't deserialize thrift: " << e.what() << "\n";
		throw std::runtime_error(ss.str());
	}
	uint32_t bytes_left = tmem_transport->available_read();
	*len = *len - bytes_left;
}
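
// Note: *len is in/out. On entry it bounds the readable buffer; on return it
// holds the number of bytes the thrift object actually consumed, e.g.:
//
//   uint32_t len = remaining_bytes;          // upper bound going in
//   thrift_unpack(buf, &len, &page_header);  // len is now the header's size
//   buf += len;                              // skip past the header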

ParquetFile::ParquetFile(std::string filename) {
	initialize(filename);
}

void ParquetFile::initialize(string filename) {
	ByteBuffer buf;
	pfile.open(filename, std::ios::binary);
	if (!pfile) {
		throw runtime_error("Could not open file");
	}

	buf.resize(4);
	memset(buf.ptr, '\0', 4);
	// check for magic bytes at start of file
	pfile.read(buf.ptr, 4);
	if (strncmp(buf.ptr, "PAR1", 4) != 0) {
		throw runtime_error("No magic bytes found at start of file");
	}

	// check for magic bytes at end of file
	pfile.seekg(-4, ios_base::end);
	pfile.read(buf.ptr, 4);
	if (strncmp(buf.ptr, "PAR1", 4) != 0) {
		throw runtime_error("No magic bytes found at end of file");
	}

	// read the four-byte little-endian footer length from just before the end magic bytes
	pfile.seekg(-8, ios_base::end);
	pfile.read(buf.ptr, 4);
	uint32_t footer_len;
	memcpy(&footer_len, buf.ptr, sizeof(footer_len));
	if (footer_len == 0) {
		throw runtime_error("Footer length can't be 0");
	}
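
	// A Parquet file ends with: <footer thrift> <4-byte LE footer_len> "PAR1",
	// so the footer starts footer_len + 8 bytes before the end of the file.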

	// read footer into buffer and de-thrift
	buf.resize(footer_len);
	pfile.seekg(-(static_cast<int64_t>(footer_len) + 8), ios_base::end);
	pfile.read(buf.ptr, footer_len);
	if (!pfile) {
		throw runtime_error("Could not read footer");
	}

	thrift_unpack((const uint8_t*) buf.ptr, &footer_len, &file_meta_data);

//	file_meta_data.printTo(cerr);
//	cerr << "\n";

	if (file_meta_data.__isset.encryption_algorithm) {
		throw runtime_error("Encrypted Parquet files are not supported");
	}

	// check if we like this schema
	if (file_meta_data.schema.size() < 2) {
		throw runtime_error("Need at least one column in the file");
	}
	if (file_meta_data.schema[0].num_children
			!= (int32_t) (file_meta_data.schema.size() - 1)) {
		throw runtime_error("Only flat tables are supported (no nesting)");
	}

	// TODO assert that the first col is root

	// skip the first column, it's the root and otherwise useless
	for (uint64_t col_idx = 1; col_idx < file_meta_data.schema.size();
			col_idx++) {
		auto &s_ele = file_meta_data.schema[col_idx];

		if (!s_ele.__isset.type || s_ele.num_children > 0) {
			throw runtime_error("Only flat tables are supported (no nesting)");
		}
		// TODO if this is REQUIRED, there are no definition levels in the file, support this
		// if the field is REPEATED, no bueno
		if (s_ele.repetition_type != FieldRepetitionType::OPTIONAL) {
			throw runtime_error("Only OPTIONAL fields are supported for now");
		}
		// TODO scale? precision? complain if set
		auto col = unique_ptr<ParquetColumn>(new ParquetColumn());
		col->id = col_idx - 1;
		col->name = s_ele.name;
		col->schema_element = &s_ele;
		col->type = s_ele.type;
		columns.push_back(move(col));
	}
	this->nrow = file_meta_data.num_rows;
}

static string type_to_string(Type::type t) {
	std::ostringstream ss;
	ss << t;
	return ss.str();
}

// adapted from arrow parquet reader
class RleBpDecoder {

public:
	/// Create a decoder object. buffer/buffer_len is the encoded data.
	/// bit_width is the width of each value (before encoding).
	RleBpDecoder(const uint8_t *buffer, uint32_t buffer_len, uint32_t bit_width) :
			buffer(buffer), bit_width_(bit_width), current_value_(0), repeat_count_(
					0), literal_count_(0) {

		if (bit_width >= 64) {
			throw runtime_error("Decode bit width too large");
		}
		byte_encoded_len = ((bit_width_ + 7) / 8);
		// shift as unsigned to avoid signed overflow for bit width 31
		max_val = (bit_width_ >= 32) ? UINT32_MAX : ((1u << bit_width_) - 1);
	}
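
	// The RLE/bit-packing hybrid encoding is a sequence of runs, each starting
	// with a varint header whose LSB selects the run type:
	//   header = (count << 1) | 0  -> repeated run: a single value stored in
	//                                 ceil(bit_width / 8) little-endian bytes,
	//                                 repeated `count` times
	//   header = (count << 1) | 1  -> bit-packed run of `count * 8` values
	// e.g. with bit_width = 3, the bytes {0x08, 0x05} decode to a repeated run
	// of four values of 5 (header 0x08 -> count 4, payload byte 0x05).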

	/// Gets a batch of values. Returns the number of decoded elements.
	template<typename T>
	inline uint32_t GetBatch(T *values, uint32_t batch_size) {
		uint32_t values_read = 0;

		while (values_read < batch_size) {
			if (repeat_count_ > 0) {
				uint32_t repeat_batch = std::min(batch_size - values_read,
						repeat_count_);
				std::fill(values + values_read,
						values + values_read + repeat_batch,
						static_cast<T>(current_value_));
				repeat_count_ -= repeat_batch;
				values_read += repeat_batch;
			} else if (literal_count_ > 0) {
				uint32_t literal_batch = std::min(batch_size - values_read,
						literal_count_);
				uint32_t actual_read = BitUnpack<T>(values + values_read,
						literal_batch);
				if (literal_batch != actual_read) {
					throw runtime_error("Did not find enough values");
				}
				literal_count_ -= literal_batch;
				values_read += literal_batch;
			} else {
				if (!NextCounts<T>())
					return values_read;
			}
		}
		return values_read;
	}

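	/// Like GetBatch, but spreads the decoded values over `batch_size` output
	/// slots according to the `defined` bitmap: slots whose entry is 0 are
	/// left untouched and do not consume a decoded value.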
	template<typename T>
	inline uint32_t GetBatchSpaced(uint32_t batch_size, uint32_t null_count,
			const uint8_t *defined, T *out) {
		// DCHECK_GE(bit_width_, 0);
		uint32_t values_read = 0;
		uint32_t remaining_nulls = null_count;

		uint32_t d_off = 0; // defined_offset

		while (values_read < batch_size) {
			bool is_valid = defined[d_off++];

			if (is_valid) {
				if ((repeat_count_ == 0) && (literal_count_ == 0)) {
					if (!NextCounts<T>())
						return values_read;
				}
				if (repeat_count_ > 0) {
					// The current index is already valid, we don't need to check that again
					uint32_t repeat_batch = 1;
					repeat_count_--;

					while (repeat_count_ > 0
							&& (values_read + repeat_batch) < batch_size) {
						if (defined[d_off]) {
							repeat_count_--;
						} else {
							remaining_nulls--;
						}
						repeat_batch++;

						d_off++;
					}
					std::fill(out, out + repeat_batch,
							static_cast<T>(current_value_));
					out += repeat_batch;
					values_read += repeat_batch;
				} else if (literal_count_ > 0) {
					uint32_t literal_batch = std::min(
							batch_size - values_read - remaining_nulls,
							literal_count_);

					// Decode the literals
					constexpr uint32_t kBufferSize = 1024;
					T indices[kBufferSize];
					literal_batch = std::min(literal_batch, kBufferSize);
					auto actual_read = BitUnpack<T>(indices, literal_batch);

					if (actual_read != literal_batch) {
						throw runtime_error("Did not find enough values");
					}

					uint32_t skipped = 0;
					uint32_t literals_read = 1;
					*out++ = indices[0];

					// Read the first bitset to the end
					while (literals_read < literal_batch) {
						if (defined[d_off]) {
							*out = indices[literals_read];
							literals_read++;
						} else {
							skipped++;
						}
						++out;
						d_off++;
					}
					literal_count_ -= literal_batch;
					values_read += literal_batch + skipped;
					remaining_nulls -= skipped;
				}
			} else {
				++out;
				values_read++;
				remaining_nulls--;
			}
		}

		return values_read;
	}

private:
	const uint8_t *buffer;

	ByteBuffer unpack_buf;

	/// Number of bits needed to encode the value. Must be between 0 and 64.
	int bit_width_;
	uint64_t current_value_;
	uint32_t repeat_count_;
	uint32_t literal_count_;
	uint8_t byte_encoded_len;
	uint32_t max_val;

	// this is slow but whatever, calls are rare
	static uint8_t VarintDecode(const uint8_t *source, uint32_t *result_out) {
		uint32_t result = 0;
		uint8_t shift = 0;
		uint8_t len = 0;
		while (true) {
			auto byte = *source++;
			len++;
			// widen before shifting so the shift can't overflow a plain int
			result |= static_cast<uint32_t>(byte & 127) << shift;
			if ((byte & 128) == 0)
				break;
			shift += 7;
			if (shift > 32) {
				throw runtime_error("Varint-decoding found too large number");
			}
		}
		*result_out = result;
		return len;
	}
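
	// e.g. {0x96, 0x01} decodes to 150: the first byte contributes
	// 0x96 & 127 = 22 (continuation bit set), the second contributes
	// 1 << 7 = 128, and 22 + 128 = 150.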

	/// Fills literal_count_ and repeat_count_ with next values. Returns false if there
	/// are no more.
	template<typename T>
	bool NextCounts() {
		// Read the next run's indicator int, it could be a literal or repeated run.
		// The int is encoded as a vlq-encoded value.
		uint32_t indicator_value;

		// TODO check in varint decode if we have enough buffer left
		buffer += VarintDecode(buffer, &indicator_value);

		// TODO check a bunch of lengths here against the standard

		// lsb indicates if it is a literal run or repeated run
		bool is_literal = indicator_value & 1;
		if (is_literal) {
			literal_count_ = (indicator_value >> 1) * 8;
		} else {
			repeat_count_ = indicator_value >> 1;
			// (ARROW-4018) this is not big-endian compatible, lol
			current_value_ = 0;
			for (auto i = 0; i < byte_encoded_len; i++) {
				// widen before shifting; i * 8 can reach 56 for wide values
				current_value_ |= static_cast<uint64_t>(*buffer++) << (i * 8);
			}
			// sanity check
			if (current_value_ > max_val) {
				throw runtime_error(
						"Payload value bigger than allowed. Corrupted file?");
			}
		}
		// TODO complain if we run out of buffer
		return true;
	}

	// somewhat optimized implementation that avoids unaligned reads

	static const uint32_t BITPACK_MASKS[];
	static const uint8_t BITPACK_DLEN;

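	// Values are bit-packed LSB first, e.g. (from the Parquet format spec)
	// the values 0..7 at bit_width 3 pack into the three bytes
	// {0b10001000, 0b11000110, 0b11111010}.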
	template<typename T>
	uint32_t BitUnpack(T *dest, uint32_t count) {
		assert(bit_width_ < 32);

		int8_t bitpack_pos = 0;
		auto source = buffer;
		auto mask = BITPACK_MASKS[bit_width_];

		for (uint32_t i = 0; i < count; i++) {
			T val = (*source >> bitpack_pos) & mask;
			bitpack_pos += bit_width_;
			while (bitpack_pos > BITPACK_DLEN) {
				val |= (*++source << (BITPACK_DLEN - (bitpack_pos - bit_width_)))
						& mask;
				bitpack_pos -= BITPACK_DLEN;
			}
			dest[i] = val;
		}

		// bit-packed runs hold a multiple of 8 values, so this advance is
		// normally exact; a partial trailing read would lose the sub-byte position
		buffer += bit_width_ * count / 8;
		return count;
	}

};

const uint32_t RleBpDecoder::BITPACK_MASKS[] = { 0, 1, 3, 7, 15, 31, 63, 127,
		255, 511, 1023, 2047, 4095, 8191, 16383, 32767, 65535, 131071, 262143,
		524287, 1048575, 2097151, 4194303, 8388607, 16777215, 33554431,
		67108863, 134217727, 268435455, 536870911, 1073741823, 2147483647 };

const uint8_t RleBpDecoder::BITPACK_DLEN = 8;

class ColumnScan {
public:
	PageHeader page_header;
	bool seen_dict = false;
	const char *page_buf_ptr = nullptr;
	const char *page_buf_end_ptr = nullptr;
	void *dict = nullptr;
	uint64_t dict_size = 0;

	uint64_t page_buf_len = 0;
	uint64_t page_start_row = 0;

	uint8_t *defined_ptr;

	// for FIXED_LEN_BYTE_ARRAY
	int32_t type_len = 0;

	template<class T>
	void fill_dict() {
		auto dict_size = page_header.dictionary_page_header.num_values;
		dict = new Dictionary<T>(dict_size);
		for (int32_t dict_index = 0; dict_index < dict_size; dict_index++) {
			T val;
			memcpy(&val, page_buf_ptr, sizeof(val));
			page_buf_ptr += sizeof(T);

			((Dictionary<T>*) dict)->dict[dict_index] = val;
		}
	}

	void scan_dict_page(ResultColumn &result_col) {
		if (page_header.__isset.data_page_header
				|| !page_header.__isset.dictionary_page_header) {
			throw runtime_error("Dictionary page header mismatch");
		}

		// make sure we like the encoding
		switch (page_header.dictionary_page_header.encoding) {
		case Encoding::PLAIN:
		case Encoding::PLAIN_DICTIONARY: // deprecated
			break;

		default:
			throw runtime_error(
					"Dictionary page has unsupported/invalid encoding");
		}

		if (seen_dict) {
			throw runtime_error("Multiple dictionary pages for column chunk");
		}
		seen_dict = true;
		dict_size = page_header.dictionary_page_header.num_values;

		// initialize dictionaries per type
		switch (result_col.col->type) {
		case Type::BOOLEAN:
			fill_dict<bool>();
			break;
		case Type::INT32:
			fill_dict<int32_t>();
			break;
		case Type::INT64:
			fill_dict<int64_t>();
			break;
		case Type::INT96:
			fill_dict<Int96>();
			break;
		case Type::FLOAT:
			fill_dict<float>();
			break;
		case Type::DOUBLE:
			fill_dict<double>();
			break;
		case Type::BYTE_ARRAY:
			// no dict here, we use the result set string heap directly
		{
			// never going to have more string data than this uncompressed_page_size (lengths use bytes)
			auto string_heap_chunk = std::unique_ptr<char[]>(
					new char[page_header.uncompressed_page_size]);
			result_col.string_heap_chunks.push_back(move(string_heap_chunk));
			auto str_ptr =
					result_col.string_heap_chunks[result_col.string_heap_chunks.size()
							- 1].get();
			dict = new Dictionary<char*>(dict_size);

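			// PLAIN-encoded BYTE_ARRAY values are laid out back to back as
			// <4-byte LE length><bytes>, e.g. "abc" is stored as
			// 03 00 00 00 61 62 63; each value is copied into the string heap
			// and NUL-terminated there.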
			for (uint64_t dict_index = 0; dict_index < dict_size; dict_index++) {
				uint32_t str_len;
				memcpy(&str_len, page_buf_ptr, sizeof(str_len));
				page_buf_ptr += sizeof(str_len);

				if (page_buf_ptr + str_len > page_buf_end_ptr) {
					throw runtime_error(
							"Declared string length exceeds payload size");
				}

				((Dictionary<char*>*) dict)->dict[dict_index] = str_ptr;
				// TODO make sure we dont run out of str_ptr
				memcpy(str_ptr, page_buf_ptr, str_len);
				str_ptr[str_len] = '\0'; // terminate
				str_ptr += str_len + 1;
				page_buf_ptr += str_len;
			}

			break;
		}
		default:
			throw runtime_error(
					"Unsupported type for dictionary: "
							+ type_to_string(result_col.col->type));
		}
	}

	void scan_data_page(ResultColumn &result_col) {
		if (!page_header.__isset.data_page_header
				|| page_header.__isset.dictionary_page_header) {
			throw runtime_error("Data page header mismatch");
		}

		if (page_header.__isset.data_page_header_v2) {
			throw runtime_error("Data page v2 unsupported");
		}

		auto num_values = page_header.data_page_header.num_values;

		// we first have to decode the definition levels
		switch (page_header.data_page_header.definition_level_encoding) {
		case Encoding::RLE: {
			// the RLE definition-level payload is always prefixed with its byte length
			uint32_t def_length;
			memcpy(&def_length, page_buf_ptr, sizeof(def_length));
			page_buf_ptr += sizeof(def_length);

			RleBpDecoder dec((const uint8_t*) page_buf_ptr, def_length, 1);
			dec.GetBatch<uint8_t>(defined_ptr, num_values);

			page_buf_ptr += def_length;
		}
			break;
		default:
			throw runtime_error(
					"Definition levels have unsupported/invalid encoding");
		}
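
		// For a flat schema of OPTIONAL fields the max definition level is 1,
		// so the levels are RLE-encoded single bits: defined_ptr[i] == 1 means
		// row i has a value, 0 means it is NULL.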

		switch (page_header.data_page_header.encoding) {
		case Encoding::RLE_DICTIONARY:
		case Encoding::PLAIN_DICTIONARY: // deprecated
			scan_data_page_dict(result_col);
			break;

		case Encoding::PLAIN:
			scan_data_page_plain(result_col);
			break;

		default:
			throw runtime_error("Data page has unsupported/invalid encoding");
		}

		defined_ptr += num_values;
		page_start_row += num_values;
	}

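	// PLAIN encoding for fixed-width types is just the raw little-endian
	// values back to back; NULL rows occupy no space in the page payload.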
	template<class T> void fill_values_plain(ResultColumn &result_col) {
		T *result_arr = (T*) result_col.data.ptr;
		for (int32_t val_offset = 0;
				val_offset < page_header.data_page_header.num_values;
				val_offset++) {

			if (!defined_ptr[val_offset]) {
				continue;
			}

			auto row_idx = page_start_row + val_offset;
			T val;
			memcpy(&val, page_buf_ptr, sizeof(val));
			page_buf_ptr += sizeof(T);
			result_arr[row_idx] = val;
		}
	}

	void scan_data_page_plain(ResultColumn &result_col) {
		// TODO compute null count while getting the def levels already?
		uint32_t null_count = 0;
		for (int32_t i = 0; i < page_header.data_page_header.num_values; i++) {
			if (!defined_ptr[i]) {
				null_count++;
			}
		}

		switch (result_col.col->type) {
		case Type::BOOLEAN: {
			// BOOLEAN values are bit-packed, LSB first
			bool *result_arr = (bool*) result_col.data.ptr;
			int bit_pos = 0;
			for (int32_t val_offset = 0;
					val_offset < page_header.data_page_header.num_values;
					val_offset++) {

				if (!defined_ptr[val_offset]) {
					continue;
				}
				auto row_idx = page_start_row + val_offset;
				result_arr[row_idx] = (*page_buf_ptr >> bit_pos) & 1;
				bit_pos++;
				if (bit_pos == 8) {
					bit_pos = 0;
					page_buf_ptr++;
				}
			}

		}
			break;
		case Type::INT32:
			fill_values_plain<int32_t>(result_col);
			break;
		case Type::INT64:
			fill_values_plain<int64_t>(result_col);
			break;
		case Type::INT96:
			fill_values_plain<Int96>(result_col);
			break;
		case Type::FLOAT:
			fill_values_plain<float>(result_col);
			break;
		case Type::DOUBLE:
			fill_values_plain<double>(result_col);
			break;

		case Type::FIXED_LEN_BYTE_ARRAY:
		case Type::BYTE_ARRAY: {
			uint32_t str_len = type_len; // in case of FIXED_LEN_BYTE_ARRAY

			uint64_t shc_len = page_header.uncompressed_page_size;
			if (result_col.col->type == Type::FIXED_LEN_BYTE_ARRAY) {
				shc_len += page_header.data_page_header.num_values; // make space for terminators
			}
			auto string_heap_chunk = std::unique_ptr<char[]>(new char[shc_len]);
			result_col.string_heap_chunks.push_back(move(string_heap_chunk));
			auto str_ptr =
					result_col.string_heap_chunks[result_col.string_heap_chunks.size()
							- 1].get();

			for (int32_t val_offset = 0;
					val_offset < page_header.data_page_header.num_values;
					val_offset++) {

				if (!defined_ptr[val_offset]) {
					continue;
				}

				auto row_idx = page_start_row + val_offset;

				if (result_col.col->type == Type::BYTE_ARRAY) {
					memcpy(&str_len, page_buf_ptr, sizeof(str_len));
					page_buf_ptr += sizeof(str_len);
				}

				if (page_buf_ptr + str_len > page_buf_end_ptr) {
					throw runtime_error(
							"Declared string length exceeds payload size");
				}

				((char**) result_col.data.ptr)[row_idx] = str_ptr;
				// TODO make sure we dont run out of str_ptr too
				memcpy(str_ptr, page_buf_ptr, str_len);
				str_ptr[str_len] = '\0';
				str_ptr += str_len + 1;

				page_buf_ptr += str_len;
			}
		}
			break;

		default:
			throw runtime_error(
					"Unsupported type page_plain "
							+ type_to_string(result_col.col->type));
		}

	}

	template<class T> void fill_values_dict(ResultColumn &result_col,
			uint32_t *offsets) {
		auto result_arr = (T*) result_col.data.ptr;
		for (int32_t val_offset = 0;
				val_offset < page_header.data_page_header.num_values;
				val_offset++) {
			// always unpack because NULLs are also encoded (?)
			auto row_idx = page_start_row + val_offset;

			if (defined_ptr[val_offset]) {
				auto offset = offsets[val_offset];
				result_arr[row_idx] = ((Dictionary<T>*) dict)->get(offset);
			}
		}
	}

	// here we look back into the dict and emit the value we find there if the
	// entry is defined, otherwise NULL
	void scan_data_page_dict(ResultColumn &result_col) {
		if (!seen_dict) {
			throw runtime_error("Missing dictionary page");
		}

		auto num_values = page_header.data_page_header.num_values;

		// num_values is int32, hence all dict offsets have to fit in 32 bit
		auto offsets = unique_ptr<uint32_t[]>(new uint32_t[num_values]);

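		// A dictionary-encoded data page payload is a single byte giving the
		// bit width of the dictionary indices, followed by the indices
		// themselves in RLE/bit-packed hybrid form.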
		// the bit width of the dict offsets is stored in a single leading byte
		auto bit_width = *((uint8_t*) page_buf_ptr);
		page_buf_ptr += sizeof(uint8_t);

		if (bit_width > 0) {
			RleBpDecoder dec((const uint8_t*) page_buf_ptr, page_buf_len,
					bit_width);

			uint32_t null_count = 0;
			for (int32_t i = 0; i < num_values; i++) {
				if (!defined_ptr[i]) {
					null_count++;
				}
			}
			if (null_count > 0) {
				dec.GetBatchSpaced<uint32_t>(num_values, null_count,
						defined_ptr, offsets.get());
			} else {
				dec.GetBatch<uint32_t>(offsets.get(), num_values);
			}

		} else {
			// bit width 0: every value refers to dictionary entry 0
			memset(offsets.get(), 0, num_values * sizeof(uint32_t));
		}

		switch (result_col.col->type) {
		// TODO no bools here? I guess it makes no sense to use a dict for those...

		case Type::INT32:
			fill_values_dict<int32_t>(result_col, offsets.get());
			break;

		case Type::INT64:
			fill_values_dict<int64_t>(result_col, offsets.get());
			break;

		case Type::INT96:
			fill_values_dict<Int96>(result_col, offsets.get());
			break;

		case Type::FLOAT:
			fill_values_dict<float>(result_col, offsets.get());
			break;

		case Type::DOUBLE:
			fill_values_dict<double>(result_col, offsets.get());
			break;

		case Type::BYTE_ARRAY: {
			auto result_arr = (char**) result_col.data.ptr;
			for (int32_t val_offset = 0;
					val_offset < page_header.data_page_header.num_values;
					val_offset++) {
				if (defined_ptr[val_offset]) {
					result_arr[page_start_row + val_offset] =
							((Dictionary<char*>*) dict)->get(
									offsets[val_offset]);
				} else {
					result_arr[page_start_row + val_offset] = nullptr;
				}
			}
			break;
		}
		default:
			throw runtime_error(
					"Unsupported type page_dict "
							+ type_to_string(result_col.col->type));
		}
	}

	// ugly but well
	void cleanup(ResultColumn &result_col) {
		switch (result_col.col->type) {
		case Type::BOOLEAN:
			delete (Dictionary<bool>*) dict;
			break;
		case Type::INT32:
			delete (Dictionary<int32_t>*) dict;
			break;
		case Type::INT64:
			delete (Dictionary<int64_t>*) dict;
			break;
		case Type::INT96:
			delete (Dictionary<Int96>*) dict;
			break;
		case Type::FLOAT:
			delete (Dictionary<float>*) dict;
			break;
		case Type::DOUBLE:
			delete (Dictionary<double>*) dict;
			break;
		case Type::BYTE_ARRAY:
		case Type::FIXED_LEN_BYTE_ARRAY:
			delete (Dictionary<char*>*) dict;
			break;
		default:
			throw runtime_error(
					"Unsupported type for dictionary: "
							+ type_to_string(result_col.col->type));
		}

	}

};

void ParquetFile::scan_column(ScanState &state, ResultColumn &result_col) {
	// we now expect a sequence of data pages in the buffer

	auto &row_group = file_meta_data.row_groups[state.row_group_idx];
	auto &chunk = row_group.columns[result_col.id];

//	chunk.printTo(cerr);
//	cerr << "\n";

	if (chunk.__isset.file_path) {
		throw runtime_error(
				"Only inlined data files are supported (no references)");
	}

	if (chunk.meta_data.path_in_schema.size() != 1) {
		throw runtime_error("Only flat tables are supported (no nesting)");
	}

	// ugh. sometimes there is an extra offset for the dict. sometimes it's wrong.
	auto chunk_start = chunk.meta_data.data_page_offset;
	if (chunk.meta_data.__isset.dictionary_page_offset
			&& chunk.meta_data.dictionary_page_offset >= 4) {
		// this assumes the data pages follow the dict pages directly.
		chunk_start = chunk.meta_data.dictionary_page_offset;
	}
	auto chunk_len = chunk.meta_data.total_compressed_size;

	// read entire chunk into RAM
	pfile.seekg(chunk_start);
	ByteBuffer chunk_buf;
	chunk_buf.resize(chunk_len);

	pfile.read(chunk_buf.ptr, chunk_len);
	if (!pfile) {
		throw runtime_error("Could not read chunk. File corrupt?");
	}

	// now we have the whole chunk in the buffer, proceed to read pages
	ColumnScan cs;
	auto bytes_to_read = chunk_len;

	// handle fixed len byte arrays, their length lives in the schema
	if (result_col.col->type == Type::FIXED_LEN_BYTE_ARRAY) {
		cs.type_len = result_col.col->schema_element->type_length;
	}

	cs.page_start_row = 0;
	cs.defined_ptr = (uint8_t*) result_col.defined.ptr;

	while (bytes_to_read > 0) {
		// the header is clearly not this long, but we don't know its actual
		// size, so let thrift read up to the rest of the chunk
		uint32_t page_header_len = (uint32_t) bytes_to_read;

		// this is the only other place where we actually unpack a thrift object
		cs.page_header = PageHeader();
		thrift_unpack((const uint8_t*) chunk_buf.ptr, &page_header_len,
				&cs.page_header);
//
//		cs.page_header.printTo(cerr);
//		cerr << "\n";

		// compressed_page_size does not include the header size
		chunk_buf.ptr += page_header_len;
		bytes_to_read -= page_header_len;

		auto payload_end_ptr = chunk_buf.ptr
				+ cs.page_header.compressed_page_size;

		ByteBuffer decompressed_buf;

		switch (chunk.meta_data.codec) {
		case CompressionCodec::UNCOMPRESSED:
			cs.page_buf_ptr = chunk_buf.ptr;
			cs.page_buf_len = cs.page_header.compressed_page_size;

			break;
		case CompressionCodec::SNAPPY: {
			size_t decompressed_size;
			snappy::GetUncompressedLength(chunk_buf.ptr,
					cs.page_header.compressed_page_size, &decompressed_size);
			decompressed_buf.resize(decompressed_size + 1);

			auto res = snappy::RawUncompress(chunk_buf.ptr,
					cs.page_header.compressed_page_size, decompressed_buf.ptr);
			if (!res) {
				throw runtime_error("Decompression failure");
			}

			cs.page_buf_ptr = (char*) decompressed_buf.ptr;
			cs.page_buf_len = cs.page_header.uncompressed_page_size;

			break;
		}
		default:
			throw runtime_error(
					"Unsupported compression codec. Try uncompressed or snappy");
		}

		cs.page_buf_end_ptr = cs.page_buf_ptr + cs.page_buf_len;

		switch (cs.page_header.type) {
		case PageType::DICTIONARY_PAGE:
			cs.scan_dict_page(result_col);
			break;

		case PageType::DATA_PAGE: {
			cs.scan_data_page(result_col);
			break;
		}
		case PageType::DATA_PAGE_V2:
			throw runtime_error("v2 data page format is not supported");

		default:
			break; // ignore INDEX page type and any other custom extensions
		}

		chunk_buf.ptr = payload_end_ptr;
		bytes_to_read -= cs.page_header.compressed_page_size;
	}
	cs.cleanup(result_col);
}

void ParquetFile::initialize_column(ResultColumn &col, uint64_t num_rows) {
	col.defined.resize(num_rows, false);
	memset(col.defined.ptr, 0, num_rows);
	col.string_heap_chunks.clear();

	// TODO do some logical type checking here, we don't like map, list, enum, json, bson etc.

	switch (col.col->type) {
	case Type::BOOLEAN:
		col.data.resize(sizeof(bool) * num_rows, false);
		break;
	case Type::INT32:
		col.data.resize(sizeof(int32_t) * num_rows, false);
		break;
	case Type::INT64:
		col.data.resize(sizeof(int64_t) * num_rows, false);
		break;
	case Type::INT96:
		col.data.resize(sizeof(Int96) * num_rows, false);
		break;
	case Type::FLOAT:
		col.data.resize(sizeof(float) * num_rows, false);
		break;
	case Type::DOUBLE:
		col.data.resize(sizeof(double) * num_rows, false);
		break;
	case Type::BYTE_ARRAY:
		col.data.resize(sizeof(char*) * num_rows, false);
		break;

	case Type::FIXED_LEN_BYTE_ARRAY: {
		auto s_ele = columns[col.id]->schema_element;

		if (!s_ele->__isset.type_length) {
			throw runtime_error("Need a type length for FIXED_LEN_BYTE_ARRAY");
		}
		col.data.resize(num_rows * sizeof(char*), false);
		break;
	}

	default:
		throw runtime_error(
				"Unsupported type " + type_to_string(col.col->type));
	}
}

bool ParquetFile::scan(ScanState &s, ResultChunk &result) {
	if (s.row_group_idx >= file_meta_data.row_groups.size()) {
		result.nrows = 0;
		return false;
	}

	auto &row_group = file_meta_data.row_groups[s.row_group_idx];
	result.nrows = row_group.num_rows;

	for (auto &result_col : result.cols) {
		initialize_column(result_col, row_group.num_rows);
		scan_column(s, result_col);
	}

	s.row_group_idx++;
	return true;
}

void ParquetFile::initialize_result(ResultChunk &result) {
	result.nrows = 0;
	result.cols.resize(columns.size());
	for (size_t col_idx = 0; col_idx < columns.size(); col_idx++) {
		//result.cols[col_idx].type = columns[col_idx]->type;
		result.cols[col_idx].col = columns[col_idx].get();

		result.cols[col_idx].id = col_idx;
	}
}
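
/* A minimal usage sketch (hypothetical driver code, not part of this file):
 *
 *   ParquetFile f("example.parquet");   // parses the footer, checks schema
 *   ResultChunk result;
 *   ScanState state;
 *   f.initialize_result(result);
 *   while (f.scan(state, result)) {     // one iteration per row group
 *     // result.cols[i].data.ptr holds result.nrows decoded values;
 *     // result.cols[i].defined.ptr flags which of them are non-NULL
 *   }
 */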