#include <iostream>
#include <fstream>
#include <string>
#include <sstream>
#include <cstring>
#include <algorithm>
#include <cassert>
#include <memory>

#include "snappy/snappy.h"

#include "miniparquet.h"

#include <protocol/TCompactProtocol.h>
#include <transport/TBufferTransports.h>

using namespace std;

using namespace parquet;
using namespace parquet::format;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

using namespace miniparquet;
static TCompactProtocolFactoryT<TMemoryBuffer> tproto_factory;

template<class T>
static void thrift_unpack(const uint8_t *buf, uint32_t *len,
		T *deserialized_msg) {
	shared_ptr<TMemoryBuffer> tmem_transport(
			new TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
	shared_ptr<TProtocol> tproto = tproto_factory.getProtocol(tmem_transport);
	try {
		deserialized_msg->read(tproto.get());
	} catch (std::exception &e) {
		std::stringstream ss;
		ss << "Couldn't deserialize thrift: " << e.what() << "\n";
		throw std::runtime_error(ss.str());
	}
	uint32_t bytes_left = tmem_transport->available_read();
	*len = *len - bytes_left;
}

ParquetFile::ParquetFile(std::string filename) {
	initialize(filename);
}

void ParquetFile::initialize(string filename) {
	ByteBuffer buf;
	pfile.open(filename, std::ios::binary);

	buf.resize(4);
	memset(buf.ptr, '\0', 4);
	// check for magic bytes at start of file
	pfile.read(buf.ptr, 4);
	if (strncmp(buf.ptr, "PAR1", 4) != 0) {
		throw runtime_error("File not found or missing magic bytes");
	}

	// check for magic bytes at end of file
	pfile.seekg(-4, ios_base::end);
	pfile.read(buf.ptr, 4);
	if (strncmp(buf.ptr, "PAR1", 4) != 0) {
		throw runtime_error("No magic bytes found at end of file");
	}

	// read four-byte footer length from just before the end magic bytes
	pfile.seekg(-8, ios_base::end);
	pfile.read(buf.ptr, 4);
	int32_t footer_len = *(uint32_t*) buf.ptr;
	if (footer_len == 0) {
		throw runtime_error("Footer length can't be 0");
	}

	// read footer into buffer and de-thrift
	buf.resize(footer_len);
	pfile.seekg(-(footer_len + 8), ios_base::end);
	pfile.read(buf.ptr, footer_len);
	if (!pfile) {
		throw runtime_error("Could not read footer");
	}

	thrift_unpack((const uint8_t*) buf.ptr, (uint32_t*) &footer_len,
			&file_meta_data);

//	file_meta_data.printTo(cerr);
//	cerr << "\n";

	if (file_meta_data.__isset.encryption_algorithm) {
		throw runtime_error("Encrypted Parquet files are not supported");
	}

	// check if we like this schema
	if (file_meta_data.schema.size() < 2) {
		throw runtime_error("Need at least one column in the file");
	}
	if (file_meta_data.schema[0].num_children
			!= (int32_t) (file_meta_data.schema.size() - 1)) {
		throw runtime_error("Only flat tables are supported (no nesting)");
	}

	// TODO assert that the first col is root

	// skip the first column, it's the root and otherwise useless
	for (uint64_t col_idx = 1; col_idx < file_meta_data.schema.size();
			col_idx++) {
		auto &s_ele = file_meta_data.schema[col_idx];

		if (!s_ele.__isset.type || s_ele.num_children > 0) {
			throw runtime_error("Only flat tables are supported (no nesting)");
		}
		// TODO if this is REQUIRED, there are no definition levels in the file, support this
		// if field is REPEATED, no bueno
		if (s_ele.repetition_type != FieldRepetitionType::OPTIONAL) {
			throw runtime_error("Only OPTIONAL fields supported for now");
		}
		// TODO scale? precision? complain if set
		auto col = unique_ptr<ParquetColumn>(new ParquetColumn());
		col->id = col_idx - 1;
		col->name = s_ele.name;
		col->schema_element = &s_ele;
		col->type = s_ele.type;
		columns.push_back(move(col));
	}
	this->nrow = file_meta_data.num_rows;
}
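
// For reference, the on-disk layout initialize() relies on (per the Parquet
// format spec) is:
//
//   PAR1 <column chunk data ...> <thrift-encoded FileMetaData>
//   <4-byte little-endian footer length> PAR1
//
// which is why the code above reads the trailing 8 bytes first to locate and
// size the footer.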

static string type_to_string(Type::type t) {
	std::ostringstream ss;
	ss << t;
	return ss.str();
}

// adapted from the arrow parquet reader
class RleBpDecoder {

public:
	/// Create a decoder object. buffer/buffer_len is the encoded data.
	/// bit_width is the width of each value (before encoding).
	RleBpDecoder(const uint8_t *buffer, uint32_t buffer_len, uint32_t bit_width) :
			buffer(buffer), bit_width_(bit_width), current_value_(0), repeat_count_(
					0), literal_count_(0) {

		if (bit_width >= 64) {
			throw runtime_error("Decode bit width too large");
		}
		byte_encoded_len = ((bit_width_ + 7) / 8);
		// shift in 64 bit to avoid undefined behavior for widths >= 32
		max_val = (1ULL << bit_width_) - 1;
	}

	/// Gets a batch of values. Returns the number of decoded elements.
	template<typename T>
	inline int GetBatch(T *values, uint32_t batch_size) {
		uint32_t values_read = 0;

		while (values_read < batch_size) {
			if (repeat_count_ > 0) {
				int repeat_batch = std::min(batch_size - values_read,
						static_cast<uint32_t>(repeat_count_));
				std::fill(values + values_read,
						values + values_read + repeat_batch,
						static_cast<T>(current_value_));
				repeat_count_ -= repeat_batch;
				values_read += repeat_batch;
			} else if (literal_count_ > 0) {
				uint32_t literal_batch = std::min(batch_size - values_read,
						static_cast<uint32_t>(literal_count_));
				uint32_t actual_read = BitUnpack<T>(values + values_read,
						literal_batch);
				if (literal_batch != actual_read) {
					throw runtime_error("Did not find enough values");
				}
				literal_count_ -= literal_batch;
				values_read += literal_batch;
			} else {
				if (!NextCounts<T>())
					return values_read;
			}
		}
		return values_read;
	}

	template<typename T>
	inline int GetBatchSpaced(uint32_t batch_size, uint32_t null_count,
			const uint8_t *defined, T *out) {
		// DCHECK_GE(bit_width_, 0);
		uint32_t values_read = 0;
		uint32_t remaining_nulls = null_count;

		uint32_t d_off = 0; // defined_offset

		while (values_read < batch_size) {
			bool is_valid = defined[d_off++];

			if (is_valid) {
				if ((repeat_count_ == 0) && (literal_count_ == 0)) {
					if (!NextCounts<T>())
						return values_read;
				}
				if (repeat_count_ > 0) {
					// The current index is already valid, we don't need to check that again
					uint32_t repeat_batch = 1;
					repeat_count_--;

					while (repeat_count_ > 0
							&& (values_read + repeat_batch) < batch_size) {
						if (defined[d_off]) {
							repeat_count_--;
						} else {
							remaining_nulls--;
						}
						repeat_batch++;

						d_off++;
					}
					std::fill(out, out + repeat_batch,
							static_cast<T>(current_value_));
					out += repeat_batch;
					values_read += repeat_batch;
				} else if (literal_count_ > 0) {
					uint32_t literal_batch = std::min(
							batch_size - values_read - remaining_nulls,
							static_cast<uint32_t>(literal_count_));

					// Decode the literals
					constexpr uint32_t kBufferSize = 1024;
					T indices[kBufferSize];
					literal_batch = std::min(literal_batch, kBufferSize);
					auto actual_read = BitUnpack<T>(indices, literal_batch);

					if (actual_read != literal_batch) {
						throw runtime_error("Did not find enough values");
					}

					uint32_t skipped = 0;
					uint32_t literals_read = 1;
					*out++ = indices[0];

					// Read the first bitset to the end
					while (literals_read < literal_batch) {
						if (defined[d_off]) {
							*out = indices[literals_read];
							literals_read++;
						} else {
							skipped++;
						}
						++out;
						d_off++;
					}
					literal_count_ -= literal_batch;
					values_read += literal_batch + skipped;
					remaining_nulls -= skipped;
				}
			} else {
				++out;
				values_read++;
				remaining_nulls--;
			}
		}

		return values_read;
	}

private:
	const uint8_t *buffer;

	ByteBuffer unpack_buf;

	/// Number of bits needed to encode the value. Must be between 0 and 64.
	int bit_width_;
	uint64_t current_value_;
	uint32_t repeat_count_;
	uint32_t literal_count_;
	uint8_t byte_encoded_len;
	uint64_t max_val;

	// this is slow but whatever, calls are rare
	static uint8_t VarintDecode(const uint8_t *source, uint32_t *result_out) {
		uint32_t result = 0;
		uint8_t shift = 0;
		uint8_t len = 0;
		while (true) {
			auto byte = *source++;
			len++;
			result |= (byte & 127) << shift;
			if ((byte & 128) == 0)
				break;
			shift += 7;
			if (shift > 32) {
				throw runtime_error("Varint-decoding found too large number");
			}
		}
		*result_out = result;
		return len;
	}
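
	// Worked example for VarintDecode: the bytes {0x96, 0x01} decode to 150.
	// The first byte contributes 0x96 & 127 = 22 and has its continuation bit
	// set; the second contributes 1 << 7 = 128 and stops the loop, so the
	// result is 128 | 22 = 150 and the returned length is 2.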

	/// Fills literal_count_ and repeat_count_ with the next run's values. Returns
	/// false if there are no more.
	template<typename T>
	bool NextCounts() {
		// Read the next run's indicator int, it could be a literal or repeated run.
		// The int is encoded as a vlq-encoded value.
		uint32_t indicator_value;

		// TODO check in varint decode if we have enough buffer left
		buffer += VarintDecode(buffer, &indicator_value);

		// TODO check a bunch of lengths here against the standard

		// lsb indicates if it is a literal run or repeated run
		bool is_literal = indicator_value & 1;
		if (is_literal) {
			literal_count_ = (indicator_value >> 1) * 8;
		} else {
			repeat_count_ = indicator_value >> 1;
			// (ARROW-4018) this is not big-endian compatible, lol
			current_value_ = 0;
			for (auto i = 0; i < byte_encoded_len; i++) {
				current_value_ |= ((uint64_t) *buffer++) << (i * 8);
			}
			// sanity check
			if (current_value_ > max_val) {
				throw runtime_error(
						"Payload value bigger than allowed. Corrupted file?");
			}
		}
		// TODO complain if we run out of buffer
		return true;
	}
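
	// Example indicator values as consumed above: 0x12 has LSB 0, so it starts
	// a repeated run of 0x12 >> 1 = 9 copies of the value that follows; 0x0B
	// has LSB 1, so it starts a literal run of (0x0B >> 1) * 8 = 40 bit-packed
	// values.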

	// somewhat optimized implementation that avoids unaligned reads

	static const uint32_t BITPACK_MASKS[];
	static const uint8_t BITPACK_DLEN;

	template<typename T>
	uint32_t BitUnpack(T *dest, uint32_t count) {
		assert(bit_width_ < 32);

		int8_t bitpack_pos = 0;
		auto source = buffer;
		auto mask = BITPACK_MASKS[bit_width_];

		for (uint32_t i = 0; i < count; i++) {
			T val = (*source >> bitpack_pos) & mask;
			bitpack_pos += bit_width_;
			while (bitpack_pos > BITPACK_DLEN) {
				val |= (*++source << (BITPACK_DLEN - (bitpack_pos - bit_width_)))
						& mask;
				bitpack_pos -= BITPACK_DLEN;
			}
			dest[i] = val;
		}

		buffer += bit_width_ * count / 8;
		return count;
	}
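
	// Worked example for BitUnpack with bit_width_ == 1: the source byte
	// 0b10110100 unpacks, least significant bit first, into the eight values
	// 0, 0, 1, 0, 1, 1, 0, 1.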

};

const uint32_t RleBpDecoder::BITPACK_MASKS[] = { 0, 1, 3, 7, 15, 31, 63, 127,
		255, 511, 1023, 2047, 4095, 8191, 16383, 32767, 65535, 131071, 262143,
		524287, 1048575, 2097151, 4194303, 8388607, 16777215, 33554431,
		67108863, 134217727, 268435455, 536870911, 1073741823, 2147483647 };

const uint8_t RleBpDecoder::BITPACK_DLEN = 8;
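
// BITPACK_MASKS[i] is (1u << i) - 1, i.e. the lowest i bits set, and
// BITPACK_DLEN is the number of payload bits consumed per source byte.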

class ColumnScan {
public:
	PageHeader page_header;
	bool seen_dict = false;
	const char *page_buf_ptr = nullptr;
	const char *page_buf_end_ptr = nullptr;
	void *dict = nullptr;
	uint64_t dict_size;

	uint64_t page_buf_len = 0;
	uint64_t page_start_row = 0;

	uint8_t *defined_ptr;

	// for FIXED_LEN_BYTE_ARRAY
	int32_t type_len;

	template<class T>
	void fill_dict() {
		auto dict_size = page_header.dictionary_page_header.num_values;
		dict = new Dictionary<T>(dict_size);
		for (int32_t dict_index = 0; dict_index < dict_size; dict_index++) {
			T val;
			memcpy(&val, page_buf_ptr, sizeof(val));
			page_buf_ptr += sizeof(T);

			((Dictionary<T>*) dict)->dict[dict_index] = val;
		}
	}

	void scan_dict_page(ResultColumn &result_col) {
		if (page_header.__isset.data_page_header
				|| !page_header.__isset.dictionary_page_header) {
			throw runtime_error("Dictionary page header mismatch");
		}

		// make sure we like the encoding
		switch (page_header.dictionary_page_header.encoding) {
		case Encoding::PLAIN:
		case Encoding::PLAIN_DICTIONARY: // deprecated
			break;

		default:
			throw runtime_error(
					"Dictionary page has unsupported/invalid encoding");
		}

		if (seen_dict) {
			throw runtime_error("Multiple dictionary pages for column chunk");
		}
		seen_dict = true;
		dict_size = page_header.dictionary_page_header.num_values;

		// initialize dictionaries per type
		switch (result_col.col->type) {
		case Type::BOOLEAN:
			fill_dict<bool>();
			break;
		case Type::INT32:
			fill_dict<int32_t>();
			break;
		case Type::INT64:
			fill_dict<int64_t>();
			break;
		case Type::INT96:
			fill_dict<Int96>();
			break;
		case Type::FLOAT:
			fill_dict<float>();
			break;
		case Type::DOUBLE:
			fill_dict<double>();
			break;
		case Type::BYTE_ARRAY:
			// no dict here, we use the result set string heap directly
		{
			// never going to have more string data than this uncompressed_page_size (lengths use bytes)
			auto string_heap_chunk = std::unique_ptr<char[]>(
					new char[page_header.uncompressed_page_size]);
			result_col.string_heap_chunks.push_back(move(string_heap_chunk));
			auto str_ptr =
					result_col.string_heap_chunks[result_col.string_heap_chunks.size()
							- 1].get();
			dict = new Dictionary<char*>(dict_size);

			for (uint64_t dict_index = 0; dict_index < dict_size; dict_index++) {
				uint32_t str_len;
				memcpy(&str_len, page_buf_ptr, sizeof(str_len));
				page_buf_ptr += sizeof(str_len);

				if (page_buf_ptr + str_len > page_buf_end_ptr) {
					throw runtime_error(
							"Declared string length exceeds payload size");
				}

				((Dictionary<char*>*) dict)->dict[dict_index] = str_ptr;
				// TODO make sure we don't run out of str_ptr
				memcpy(str_ptr, page_buf_ptr, str_len);
				str_ptr[str_len] = '\0'; // terminate
				str_ptr += str_len + 1;
				page_buf_ptr += str_len;
			}

			break;
		}
		default:
			throw runtime_error(
					"Unsupported type for dictionary: "
							+ type_to_string(result_col.col->type));
		}
	}

	void scan_data_page(ResultColumn &result_col) {
		if (!page_header.__isset.data_page_header
				|| page_header.__isset.dictionary_page_header) {
			throw runtime_error("Data page header mismatch");
		}

		if (page_header.__isset.data_page_header_v2) {
			throw runtime_error("Data page v2 unsupported");
		}

		auto num_values = page_header.data_page_header.num_values;

		// we have to decode the definition levels first
		switch (page_header.data_page_header.definition_level_encoding) {
		case Encoding::RLE: {
			// read the length of the definition level payload, always present
			uint32_t def_length;
			memcpy(&def_length, page_buf_ptr, sizeof(def_length));
			page_buf_ptr += sizeof(def_length);

			RleBpDecoder dec((const uint8_t*) page_buf_ptr, def_length, 1);
			dec.GetBatch<uint8_t>(defined_ptr, num_values);

			page_buf_ptr += def_length;
		}
			break;
		default:
			throw runtime_error(
					"Definition levels have unsupported/invalid encoding");
		}

		switch (page_header.data_page_header.encoding) {
		case Encoding::RLE_DICTIONARY:
		case Encoding::PLAIN_DICTIONARY: // deprecated
			scan_data_page_dict(result_col);
			break;

		case Encoding::PLAIN:
			scan_data_page_plain(result_col);
			break;

		default:
			throw runtime_error("Data page has unsupported/invalid encoding");
		}

		defined_ptr += num_values;
		page_start_row += num_values;
	}

	template<class T> void fill_values_plain(ResultColumn &result_col) {
		T *result_arr = (T*) result_col.data.ptr;
		for (int32_t val_offset = 0;
				val_offset < page_header.data_page_header.num_values;
				val_offset++) {

			if (!defined_ptr[val_offset]) {
				continue;
			}

			auto row_idx = page_start_row + val_offset;
			T val;
			memcpy(&val, page_buf_ptr, sizeof(val));
			page_buf_ptr += sizeof(T);
			result_arr[row_idx] = val;
		}
	}

	void scan_data_page_plain(ResultColumn &result_col) {
		// TODO compute null count while getting the def levels already?
		uint32_t null_count = 0;
		for (int32_t i = 0; i < page_header.data_page_header.num_values; i++) {
			if (!defined_ptr[i]) {
				null_count++;
			}
		}

		switch (result_col.col->type) {
		case Type::BOOLEAN: {
			// PLAIN booleans are bit-packed, least significant bit first
			bool *result_arr = (bool*) result_col.data.ptr;
			int byte_pos = 0;
			for (int32_t val_offset = 0;
					val_offset < page_header.data_page_header.num_values;
					val_offset++) {

				if (!defined_ptr[val_offset]) {
					continue;
				}
				auto row_idx = page_start_row + val_offset;
				result_arr[row_idx] = (*page_buf_ptr >> byte_pos) & 1;
				byte_pos++;
				if (byte_pos == 8) {
					byte_pos = 0;
					page_buf_ptr++;
				}
			}

		}
			break;
		case Type::INT32:
			fill_values_plain<int32_t>(result_col);
			break;
		case Type::INT64:
			fill_values_plain<int64_t>(result_col);
			break;
		case Type::INT96:
			fill_values_plain<Int96>(result_col);
			break;
		case Type::FLOAT:
			fill_values_plain<float>(result_col);
			break;
		case Type::DOUBLE:
			fill_values_plain<double>(result_col);
			break;

		case Type::FIXED_LEN_BYTE_ARRAY:
		case Type::BYTE_ARRAY: {
			uint32_t str_len = type_len; // in case of FIXED_LEN_BYTE_ARRAY

			uint64_t shc_len = page_header.uncompressed_page_size;
			if (result_col.col->type == Type::FIXED_LEN_BYTE_ARRAY) {
				shc_len += page_header.data_page_header.num_values; // make space for terminators
			}
			auto string_heap_chunk = std::unique_ptr<char[]>(new char[shc_len]);
			result_col.string_heap_chunks.push_back(move(string_heap_chunk));
			auto str_ptr =
					result_col.string_heap_chunks[result_col.string_heap_chunks.size()
							- 1].get();

			for (int32_t val_offset = 0;
					val_offset < page_header.data_page_header.num_values;
					val_offset++) {

				if (!defined_ptr[val_offset]) {
					continue;
				}

				auto row_idx = page_start_row + val_offset;

				if (result_col.col->type == Type::BYTE_ARRAY) {
					memcpy(&str_len, page_buf_ptr, sizeof(str_len));
					page_buf_ptr += sizeof(str_len);
				}

				if (page_buf_ptr + str_len > page_buf_end_ptr) {
					throw runtime_error(
							"Declared string length exceeds payload size");
				}

				((char**) result_col.data.ptr)[row_idx] = str_ptr;
				// TODO make sure we don't run out of str_ptr too
				memcpy(str_ptr, page_buf_ptr, str_len);
				str_ptr[str_len] = '\0';
				str_ptr += str_len + 1;

				page_buf_ptr += str_len;

			}
		}
			break;

		default:
			throw runtime_error(
					"Unsupported type page_plain "
							+ type_to_string(result_col.col->type));
		}

	}

	template<class T> void fill_values_dict(ResultColumn &result_col,
			uint32_t *offsets) {
		auto result_arr = (T*) result_col.data.ptr;
		for (int32_t val_offset = 0;
				val_offset < page_header.data_page_header.num_values;
				val_offset++) {
			// always unpack because NULLs are also encoded (?)
			auto row_idx = page_start_row + val_offset;

			if (defined_ptr[val_offset]) {
				auto offset = offsets[val_offset];
				result_arr[row_idx] = ((Dictionary<T>*) dict)->get(offset);
			}
		}
	}

	// here we look back into the dicts and emit the values we find if the value is defined, otherwise NULL
	void scan_data_page_dict(ResultColumn &result_col) {
		if (!seen_dict) {
			throw runtime_error("Missing dictionary page");
		}

		auto num_values = page_header.data_page_header.num_values;

		// num_values is int32, hence all dict offsets have to fit in 32 bit
		auto offsets = unique_ptr<uint32_t[]>(new uint32_t[num_values]);

		// the bit width of the dictionary indices is stored as a single byte
		auto enc_length = *((uint8_t*) page_buf_ptr);
		page_buf_ptr += sizeof(uint8_t);

		if (enc_length > 0) {
			RleBpDecoder dec((const uint8_t*) page_buf_ptr, page_buf_len,
					enc_length);

			uint32_t null_count = 0;
			for (int32_t i = 0; i < num_values; i++) {
				if (!defined_ptr[i]) {
					null_count++;
				}
			}
			if (null_count > 0) {
				dec.GetBatchSpaced<uint32_t>(num_values, null_count,
						defined_ptr, offsets.get());
			} else {
				dec.GetBatch<uint32_t>(offsets.get(), num_values);
			}

		} else {
			memset(offsets.get(), 0, num_values * sizeof(uint32_t));
		}

		switch (result_col.col->type) {
		// TODO no bools here? I guess it makes no sense to use a dict for them...

		case Type::INT32:
			fill_values_dict<int32_t>(result_col, offsets.get());
			break;

		case Type::INT64:
			fill_values_dict<int64_t>(result_col, offsets.get());
			break;

		case Type::INT96:
			fill_values_dict<Int96>(result_col, offsets.get());
			break;

		case Type::FLOAT:
			fill_values_dict<float>(result_col, offsets.get());
			break;

		case Type::DOUBLE:
			fill_values_dict<double>(result_col, offsets.get());
			break;

		case Type::BYTE_ARRAY: {
			auto result_arr = (char**) result_col.data.ptr;
			for (int32_t val_offset = 0;
					val_offset < page_header.data_page_header.num_values;
					val_offset++) {
				if (defined_ptr[val_offset]) {
					result_arr[page_start_row + val_offset] =
							((Dictionary<char*>*) dict)->get(
									offsets[val_offset]);
				} else {
					result_arr[page_start_row + val_offset] = nullptr;
				}
			}
			break;
		}
		default:
			throw runtime_error(
					"Unsupported type page_dict "
							+ type_to_string(result_col.col->type));
		}
	}

	// ugly but well
	void cleanup(ResultColumn &result_col) {
		switch (result_col.col->type) {
		case Type::BOOLEAN:
			delete (Dictionary<bool>*) dict;
			break;
		case Type::INT32:
			delete (Dictionary<int32_t>*) dict;
			break;
		case Type::INT64:
			delete (Dictionary<int64_t>*) dict;
			break;
		case Type::INT96:
			delete (Dictionary<Int96>*) dict;
			break;
		case Type::FLOAT:
			delete (Dictionary<float>*) dict;
			break;
		case Type::DOUBLE:
			delete (Dictionary<double>*) dict;
			break;
		case Type::BYTE_ARRAY:
		case Type::FIXED_LEN_BYTE_ARRAY:
			delete (Dictionary<char*>*) dict;
			break;
		default:
			throw runtime_error(
					"Unsupported type for dictionary: "
							+ type_to_string(result_col.col->type));
		}

	}

};

void ParquetFile::scan_column(ScanState &state, ResultColumn &result_col) {
	// we now expect a sequence of data pages in the buffer

	auto &row_group = file_meta_data.row_groups[state.row_group_idx];
	auto &chunk = row_group.columns[result_col.id];

//	chunk.printTo(cerr);
//	cerr << "\n";

	if (chunk.__isset.file_path) {
		throw runtime_error(
				"Only inlined data files are supported (no references)");
	}

	if (chunk.meta_data.path_in_schema.size() != 1) {
		throw runtime_error("Only flat tables are supported (no nesting)");
	}

	// ugh. sometimes there is an extra offset for the dict. sometimes it's wrong.
	auto chunk_start = chunk.meta_data.data_page_offset;
	if (chunk.meta_data.__isset.dictionary_page_offset
			&& chunk.meta_data.dictionary_page_offset >= 4) {
		// this assumes the data pages follow the dict pages directly.
		chunk_start = chunk.meta_data.dictionary_page_offset;
	}
	auto chunk_len = chunk.meta_data.total_compressed_size;

	// read entire chunk into RAM
	pfile.seekg(chunk_start);
	ByteBuffer chunk_buf;
	chunk_buf.resize(chunk_len);

	pfile.read(chunk_buf.ptr, chunk_len);
	if (!pfile) {
		throw runtime_error("Could not read chunk. File corrupt?");
	}

	// now we have whole chunk in buffer, proceed to read pages
	ColumnScan cs;
	auto bytes_to_read = chunk_len;

	// handle fixed len byte arrays, their length lives in schema
	if (result_col.col->type == Type::FIXED_LEN_BYTE_ARRAY) {
		cs.type_len = result_col.col->schema_element->type_length;
	}

	cs.page_start_row = 0;
	cs.defined_ptr = (uint8_t*) result_col.defined.ptr;

	while (bytes_to_read > 0) {
		// the header is clearly not that long, but we have no idea how long it
		// actually is until we have parsed it
		uint32_t page_header_len = (uint32_t) bytes_to_read;

		// this is the only other place where we actually unpack a thrift object
		cs.page_header = PageHeader();
		thrift_unpack((const uint8_t*) chunk_buf.ptr, &page_header_len,
				&cs.page_header);

//		cs.page_header.printTo(cerr);
//		cerr << "\n";

		// compressed_page_size does not include the header size
		chunk_buf.ptr += page_header_len;
		bytes_to_read -= page_header_len;

		auto payload_end_ptr = chunk_buf.ptr
				+ cs.page_header.compressed_page_size;

		ByteBuffer decompressed_buf;

		switch (chunk.meta_data.codec) {
		case CompressionCodec::UNCOMPRESSED:
			cs.page_buf_ptr = chunk_buf.ptr;
			cs.page_buf_len = cs.page_header.compressed_page_size;
			break;

		case CompressionCodec::SNAPPY: {
			size_t decompressed_size;
			snappy::GetUncompressedLength(chunk_buf.ptr,
					cs.page_header.compressed_page_size, &decompressed_size);
			decompressed_buf.resize(decompressed_size + 1);

			auto res = snappy::RawUncompress(chunk_buf.ptr,
					cs.page_header.compressed_page_size, decompressed_buf.ptr);
			if (!res) {
				throw runtime_error("Decompression failure");
			}

			cs.page_buf_ptr = (char*) decompressed_buf.ptr;
			cs.page_buf_len = cs.page_header.uncompressed_page_size;
			break;
		}
		default:
			throw runtime_error(
					"Unsupported compression codec. Try uncompressed or snappy");
		}

		cs.page_buf_end_ptr = cs.page_buf_ptr + cs.page_buf_len;

		switch (cs.page_header.type) {
		case PageType::DICTIONARY_PAGE:
			cs.scan_dict_page(result_col);
			break;

		case PageType::DATA_PAGE: {
			cs.scan_data_page(result_col);
			break;
		}
		case PageType::DATA_PAGE_V2:
			throw runtime_error("v2 data page format is not supported");

		default:
			break; // ignore INDEX page type and any other custom extensions
		}

		chunk_buf.ptr = payload_end_ptr;
		bytes_to_read -= cs.page_header.compressed_page_size;
	}
	cs.cleanup(result_col);
}

void ParquetFile::initialize_column(ResultColumn &col, uint64_t num_rows) {
	col.defined.resize(num_rows, false);
	memset(col.defined.ptr, 0, num_rows);
	col.string_heap_chunks.clear();

	// TODO do some logical type checking here, we don't like map, list, enum, json, bson etc
	switch (col.col->type) {
	case Type::BOOLEAN:
		col.data.resize(sizeof(bool) * num_rows, false);
		break;
	case Type::INT32:
		col.data.resize(sizeof(int32_t) * num_rows, false);
		break;
	case Type::INT64:
		col.data.resize(sizeof(int64_t) * num_rows, false);
		break;
	case Type::INT96:
		col.data.resize(sizeof(Int96) * num_rows, false);
		break;
	case Type::FLOAT:
		col.data.resize(sizeof(float) * num_rows, false);
		break;
	case Type::DOUBLE:
		col.data.resize(sizeof(double) * num_rows, false);
		break;
	case Type::BYTE_ARRAY:
		col.data.resize(sizeof(char*) * num_rows, false);
		break;

	case Type::FIXED_LEN_BYTE_ARRAY: {
		auto s_ele = columns[col.id]->schema_element;

		if (!s_ele->__isset.type_length) {
			throw runtime_error("need a type length for fixed byte array");
		}
		col.data.resize(num_rows * sizeof(char*), false);
		break;
	}

	default:
		throw runtime_error(
				"Unsupported type " + type_to_string(col.col->type));
	}
}

bool ParquetFile::scan(ScanState &s, ResultChunk &result) {
	if (s.row_group_idx >= file_meta_data.row_groups.size()) {
		result.nrows = 0;
		return false;
	}

	auto &row_group = file_meta_data.row_groups[s.row_group_idx];
	result.nrows = row_group.num_rows;

	for (auto &result_col : result.cols) {
		initialize_column(result_col, row_group.num_rows);
		scan_column(s, result_col);
	}

	s.row_group_idx++;
	return true;
}

void ParquetFile::initialize_result(ResultChunk &result) {
	result.nrows = 0;
	result.cols.resize(columns.size());
	for (size_t col_idx = 0; col_idx < columns.size(); col_idx++) {
		//result.cols[col_idx].type = columns[col_idx]->type;
		result.cols[col_idx].col = columns[col_idx].get();
		result.cols[col_idx].id = col_idx;
	}
}
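
// A minimal usage sketch for this reader. The filename is a placeholder, and
// this assumes ScanState starts out with row_group_idx == 0; scan() then
// decodes one row group per call until the file is exhausted:
//
//	ParquetFile f("some_file.parquet");
//	ResultChunk result;
//	ScanState s;
//	f.initialize_result(result);
//	while (f.scan(s, result)) {
//		// result.nrows rows are now decoded; result.cols[i].data.ptr holds
//		// the values and result.cols[i].defined.ptr flags non-NULL entries
//	}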