1#include <IO/ReadBuffer.h>
2#include <IO/ReadHelpers.h>
3#include <Processors/Formats/Impl/BinaryRowInputFormat.h>
4#include <Formats/FormatFactory.h>
5
6
7namespace DB
8{
9
10BinaryRowInputFormat::BinaryRowInputFormat(ReadBuffer & in_, Block header, Params params_, bool with_names_, bool with_types_)
11 : IRowInputFormat(std::move(header), in_, params_), with_names(with_names_), with_types(with_types_)
12{
13}
14
15
16bool BinaryRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
17{
18 if (in.eof())
19 return false;
20
21 size_t num_columns = columns.size();
22 for (size_t i = 0; i < num_columns; ++i)
23 getPort().getHeader().getByPosition(i).type->deserializeBinary(*columns[i], in);
24
25 return true;
26}
27
28
29void BinaryRowInputFormat::readPrefix()
30{
31 /// NOTE The header is completely ignored. This can be easily improved.
32
33 UInt64 columns = 0;
34 String tmp;
35
36 if (with_names || with_types)
37 {
38 readVarUInt(columns, in);
39 }
40
41 if (with_names)
42 {
43 for (size_t i = 0; i < columns; ++i)
44 {
45 readStringBinary(tmp, in);
46 }
47 }
48
49 if (with_types)
50 {
51 for (size_t i = 0; i < columns; ++i)
52 {
53 readStringBinary(tmp, in);
54 }
55 }
56}
57
58
59void registerInputFormatProcessorRowBinary(FormatFactory & factory)
60{
61 factory.registerInputFormatProcessor("RowBinary", [](
62 ReadBuffer & buf,
63 const Block & sample,
64 const IRowInputFormat::Params & params,
65 const FormatSettings &)
66 {
67 return std::make_shared<BinaryRowInputFormat>(buf, sample, params, false, false);
68 });
69
70 factory.registerInputFormatProcessor("RowBinaryWithNamesAndTypes", [](
71 ReadBuffer & buf,
72 const Block & sample,
73 const IRowInputFormat::Params & params,
74 const FormatSettings &)
75 {
76 return std::make_shared<BinaryRowInputFormat>(buf, sample, params, true, true);
77 });
78}
79
80}
81