1#include "ParquetBlockInputFormat.h"
2#if USE_PARQUET
3
4#include <Formats/FormatFactory.h>
5#include <IO/BufferBase.h>
6#include <IO/ReadBufferFromMemory.h>
7#include <IO/WriteBufferFromString.h>
8#include <IO/WriteHelpers.h>
9#include <IO/copyData.h>
10#include <arrow/api.h>
11#include <parquet/arrow/reader.h>
12#include <parquet/file_reader.h>
13#include "ArrowColumnToCHColumn.h"
14
15namespace DB
16{
17
18 ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_)
19 : IInputFormat(std::move(header_), in_)
20 {
21 }
22
23 Chunk ParquetBlockInputFormat::generate()
24 {
25 Chunk res;
26 auto &header = getPort().getHeader();
27
28 if (!in.eof())
29 {
30 /*
31 First we load whole stream into string (its very bad and limiting .parquet file size to half? of RAM)
32 Then producing blocks for every row_group (dont load big .parquet files with one row_group - it can eat x10+ RAM from .parquet file size)
33 */
34
35 if (row_group_current < row_group_total)
36 throw Exception{"Got new data, but data from previous chunks was not read " +
37 std::to_string(row_group_current) + "/" + std::to_string(row_group_total),
38 ErrorCodes::CANNOT_READ_ALL_DATA};
39
40 file_data.clear();
41 {
42 WriteBufferFromString file_buffer(file_data);
43 copyData(in, file_buffer);
44 }
45
46 buffer = std::make_unique<arrow::Buffer>(file_data);
47 // TODO: maybe use parquet::RandomAccessSource?
48 auto status = parquet::arrow::FileReader::Make(
49 ::arrow::default_memory_pool(),
50 parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer)),
51 &file_reader);
52
53 row_group_total = file_reader->num_row_groups();
54 row_group_current = 0;
55 }
56 //DUMP(row_group_current, row_group_total);
57 if (row_group_current >= row_group_total)
58 return res;
59
60 // TODO: also catch a ParquetException thrown by filereader?
61 //arrow::Status read_status = filereader.ReadTable(&table);
62 std::shared_ptr<arrow::Table> table;
63 arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, &table);
64
65 ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "Parquet");
66 return res;
67 }
68
69 void ParquetBlockInputFormat::resetParser()
70 {
71 IInputFormat::resetParser();
72
73 file_reader.reset();
74 file_data.clear();
75 buffer.reset();
76 row_group_total = 0;
77 row_group_current = 0;
78 }
79
80 void registerInputFormatProcessorParquet(FormatFactory &factory)
81 {
82 factory.registerInputFormatProcessor(
83 "Parquet",
84 [](ReadBuffer &buf,
85 const Block &sample,
86 const RowInputFormatParams &,
87 const FormatSettings & /* settings */)
88 {
89 return std::make_shared<ParquetBlockInputFormat>(buf, sample);
90 });
91 }
92
93}
94
95#else
96
namespace DB
{
class FormatFactory;

/// Build without Parquet support (USE_PARQUET disabled): registering the format does nothing.
void registerInputFormatProcessorParquet(FormatFactory & /* factory */) {}
}
104
105#endif
106