1 | #include "ParquetBlockInputFormat.h" |
2 | #if USE_PARQUET |
3 | |
4 | #include <Formats/FormatFactory.h> |
5 | #include <IO/BufferBase.h> |
6 | #include <IO/ReadBufferFromMemory.h> |
7 | #include <IO/WriteBufferFromString.h> |
8 | #include <IO/WriteHelpers.h> |
9 | #include <IO/copyData.h> |
10 | #include <arrow/api.h> |
11 | #include <parquet/arrow/reader.h> |
12 | #include <parquet/file_reader.h> |
13 | #include "ArrowColumnToCHColumn.h" |
14 | |
15 | namespace DB |
16 | { |
17 | |
18 | ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block ) |
19 | : IInputFormat(std::move(header_), in_) |
20 | { |
21 | } |
22 | |
23 | Chunk ParquetBlockInputFormat::generate() |
24 | { |
25 | Chunk res; |
26 | auto & = getPort().getHeader(); |
27 | |
28 | if (!in.eof()) |
29 | { |
30 | /* |
31 | First we load whole stream into string (its very bad and limiting .parquet file size to half? of RAM) |
32 | Then producing blocks for every row_group (dont load big .parquet files with one row_group - it can eat x10+ RAM from .parquet file size) |
33 | */ |
34 | |
35 | if (row_group_current < row_group_total) |
36 | throw Exception{"Got new data, but data from previous chunks was not read " + |
37 | std::to_string(row_group_current) + "/" + std::to_string(row_group_total), |
38 | ErrorCodes::CANNOT_READ_ALL_DATA}; |
39 | |
40 | file_data.clear(); |
41 | { |
42 | WriteBufferFromString file_buffer(file_data); |
43 | copyData(in, file_buffer); |
44 | } |
45 | |
46 | buffer = std::make_unique<arrow::Buffer>(file_data); |
47 | // TODO: maybe use parquet::RandomAccessSource? |
48 | auto status = parquet::arrow::FileReader::Make( |
49 | ::arrow::default_memory_pool(), |
50 | parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer)), |
51 | &file_reader); |
52 | |
53 | row_group_total = file_reader->num_row_groups(); |
54 | row_group_current = 0; |
55 | } |
56 | //DUMP(row_group_current, row_group_total); |
57 | if (row_group_current >= row_group_total) |
58 | return res; |
59 | |
60 | // TODO: also catch a ParquetException thrown by filereader? |
61 | //arrow::Status read_status = filereader.ReadTable(&table); |
62 | std::shared_ptr<arrow::Table> table; |
63 | arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, &table); |
64 | |
65 | ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "Parquet" ); |
66 | return res; |
67 | } |
68 | |
69 | void ParquetBlockInputFormat::resetParser() |
70 | { |
71 | IInputFormat::resetParser(); |
72 | |
73 | file_reader.reset(); |
74 | file_data.clear(); |
75 | buffer.reset(); |
76 | row_group_total = 0; |
77 | row_group_current = 0; |
78 | } |
79 | |
80 | void registerInputFormatProcessorParquet(FormatFactory &factory) |
81 | { |
82 | factory.registerInputFormatProcessor( |
83 | "Parquet" , |
84 | [](ReadBuffer &buf, |
85 | const Block &sample, |
86 | const RowInputFormatParams &, |
87 | const FormatSettings & /* settings */) |
88 | { |
89 | return std::make_shared<ParquetBlockInputFormat>(buf, sample); |
90 | }); |
91 | } |
92 | |
93 | } |
94 | |
95 | #else |
96 | |
namespace DB
{

class FormatFactory;

/// Stub compiled when USE_PARQUET is off: registers nothing.
void registerInputFormatProcessorParquet(FormatFactory &) {}

}
104 | |
105 | #endif |
106 | |