1 | #include "ORCBlockInputFormat.h" |
2 | #if USE_ORC |
3 | |
4 | #include <Formats/FormatFactory.h> |
5 | #include <IO/BufferBase.h> |
6 | #include <IO/ReadBufferFromMemory.h> |
7 | #include <IO/WriteBufferFromString.h> |
8 | #include <IO/WriteHelpers.h> |
9 | #include <IO/copyData.h> |
10 | #include <arrow/io/memory.h> |
11 | #include "ArrowColumnToCHColumn.h" |
12 | |
13 | namespace DB |
14 | { |
15 | ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer &in_, Block ) : IInputFormat(std::move(header_), in_) |
16 | { |
17 | } |
18 | |
19 | Chunk ORCBlockInputFormat::generate() |
20 | { |
21 | Chunk res; |
22 | |
23 | auto & = getPort().getHeader(); |
24 | |
25 | if (!in.eof()) |
26 | { |
27 | if (row_group_current < row_group_total) |
28 | throw Exception{"Got new data, but data from previous chunks was not read " + |
29 | std::to_string(row_group_current) + "/" + std::to_string(row_group_total), |
30 | ErrorCodes::CANNOT_READ_ALL_DATA}; |
31 | |
32 | file_data.clear(); |
33 | { |
34 | WriteBufferFromString file_buffer(file_data); |
35 | copyData(in, file_buffer); |
36 | } |
37 | |
38 | std::unique_ptr<arrow::Buffer> local_buffer = std::make_unique<arrow::Buffer>(file_data); |
39 | |
40 | |
41 | std::shared_ptr<arrow::io::RandomAccessFile> in_stream(new arrow::io::BufferReader(*local_buffer)); |
42 | |
43 | bool ok = arrow::adapters::orc::ORCFileReader::Open(in_stream, arrow::default_memory_pool(), |
44 | &file_reader).ok(); |
45 | if (!ok) |
46 | return res; |
47 | |
48 | row_group_total = file_reader->NumberOfRows(); |
49 | row_group_current = 0; |
50 | |
51 | } else |
52 | return res; |
53 | |
54 | if (row_group_current >= row_group_total) |
55 | return res; |
56 | std::shared_ptr<arrow::Table> table; |
57 | |
58 | arrow::Status read_status = file_reader->Read(&table); |
59 | |
60 | ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "ORC" ); |
61 | |
62 | return res; |
63 | } |
64 | |
65 | void ORCBlockInputFormat::resetParser() |
66 | { |
67 | IInputFormat::resetParser(); |
68 | |
69 | file_reader.reset(); |
70 | file_data.clear(); |
71 | row_group_total = 0; |
72 | row_group_current = 0; |
73 | } |
74 | |
75 | void registerInputFormatProcessorORC(FormatFactory &factory) |
76 | { |
77 | factory.registerInputFormatProcessor( |
78 | "ORC" , |
79 | [](ReadBuffer &buf, |
80 | const Block &sample, |
81 | const RowInputFormatParams &, |
82 | const FormatSettings & /* settings */) |
83 | { |
84 | return std::make_shared<ORCBlockInputFormat>(buf, sample); |
85 | }); |
86 | } |
87 | |
88 | } |
89 | #else |
90 | |
namespace DB
{

class FormatFactory;

/// Fallback used when ORC support is compiled out (the `#else` branch of `USE_ORC`):
/// there is nothing to register, so this does nothing.
void registerInputFormatProcessorORC(FormatFactory & /* factory */) {}

}
98 | |
99 | #endif |
100 | |