1#include "ORCBlockInputFormat.h"
2#if USE_ORC
3
4#include <Formats/FormatFactory.h>
5#include <IO/BufferBase.h>
6#include <IO/ReadBufferFromMemory.h>
7#include <IO/WriteBufferFromString.h>
8#include <IO/WriteHelpers.h>
9#include <IO/copyData.h>
10#include <arrow/io/memory.h>
11#include "ArrowColumnToCHColumn.h"
12
13namespace DB
14{
15 ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer &in_, Block header_) : IInputFormat(std::move(header_), in_)
16 {
17 }
18
19 Chunk ORCBlockInputFormat::generate()
20 {
21 Chunk res;
22
23 auto &header = getPort().getHeader();
24
25 if (!in.eof())
26 {
27 if (row_group_current < row_group_total)
28 throw Exception{"Got new data, but data from previous chunks was not read " +
29 std::to_string(row_group_current) + "/" + std::to_string(row_group_total),
30 ErrorCodes::CANNOT_READ_ALL_DATA};
31
32 file_data.clear();
33 {
34 WriteBufferFromString file_buffer(file_data);
35 copyData(in, file_buffer);
36 }
37
38 std::unique_ptr<arrow::Buffer> local_buffer = std::make_unique<arrow::Buffer>(file_data);
39
40
41 std::shared_ptr<arrow::io::RandomAccessFile> in_stream(new arrow::io::BufferReader(*local_buffer));
42
43 bool ok = arrow::adapters::orc::ORCFileReader::Open(in_stream, arrow::default_memory_pool(),
44 &file_reader).ok();
45 if (!ok)
46 return res;
47
48 row_group_total = file_reader->NumberOfRows();
49 row_group_current = 0;
50
51 } else
52 return res;
53
54 if (row_group_current >= row_group_total)
55 return res;
56 std::shared_ptr<arrow::Table> table;
57
58 arrow::Status read_status = file_reader->Read(&table);
59
60 ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "ORC");
61
62 return res;
63 }
64
65 void ORCBlockInputFormat::resetParser()
66 {
67 IInputFormat::resetParser();
68
69 file_reader.reset();
70 file_data.clear();
71 row_group_total = 0;
72 row_group_current = 0;
73 }
74
75 void registerInputFormatProcessorORC(FormatFactory &factory)
76 {
77 factory.registerInputFormatProcessor(
78 "ORC",
79 [](ReadBuffer &buf,
80 const Block &sample,
81 const RowInputFormatParams &,
82 const FormatSettings & /* settings */)
83 {
84 return std::make_shared<ORCBlockInputFormat>(buf, sample);
85 });
86 }
87
88}
89#else
90
namespace DB
{
class FormatFactory;

/// Fallback used when the build has no ORC support (USE_ORC is off):
/// registers nothing.
void registerInputFormatProcessorORC(FormatFactory & /* factory */)
{
}
}
98
99#endif
100