1#include <Core/Defines.h>
2#include <Core/Block.h>
3
4#include <IO/WriteHelpers.h>
5#include <IO/VarInt.h>
6#include <Compression/CompressedWriteBuffer.h>
7
8#include <DataStreams/MarkInCompressedFile.h>
9#include <DataStreams/NativeBlockOutputStream.h>
10
11#include <Common/typeid_cast.h>
12#include <DataTypes/DataTypeLowCardinality.h>
13
14namespace DB
15{
16
/// Error codes referenced in this file. They are only declared here (extern);
/// the definitions are not part of this translation unit.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
21
22
23NativeBlockOutputStream::NativeBlockOutputStream(
24 WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, bool remove_low_cardinality_,
25 WriteBuffer * index_ostr_, size_t initial_size_of_file_)
26 : ostr(ostr_), client_revision(client_revision_), header(header_),
27 index_ostr(index_ostr_), initial_size_of_file(initial_size_of_file_), remove_low_cardinality(remove_low_cardinality_)
28{
29 if (index_ostr)
30 {
31 ostr_concrete = typeid_cast<CompressedWriteBuffer *>(&ostr);
32 if (!ostr_concrete)
33 throw Exception("When need to write index for NativeBlockOutputStream, ostr must be CompressedWriteBuffer.", ErrorCodes::LOGICAL_ERROR);
34 }
35}
36
37
/// Flushes the stream by calling next() on the output buffer `ostr`.
/// NOTE(review): next() presumably hands the bytes buffered so far to the underlying sink —
/// confirm against WriteBuffer. The index buffer is not touched here.
void NativeBlockOutputStream::flush()
{
    ostr.next();
}
42
43
44void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit)
45{
46 /** If there are columns-constants - then we materialize them.
47 * (Since the data type does not know how to serialize / deserialize constants.)
48 */
49 ColumnPtr full_column = column->convertToFullColumnIfConst();
50
51 IDataType::SerializeBinaryBulkSettings settings;
52 settings.getter = [&ostr](IDataType::SubstreamPath) -> WriteBuffer * { return &ostr; };
53 settings.position_independent_encoding = false;
54 settings.low_cardinality_max_dictionary_size = 0;
55
56 IDataType::SerializeBinaryBulkStatePtr state;
57 type.serializeBinaryBulkStatePrefix(settings, state);
58 type.serializeBinaryBulkWithMultipleStreams(*full_column, offset, limit, settings, state);
59 type.serializeBinaryBulkStateSuffix(settings, state);
60}
61
62
63void NativeBlockOutputStream::write(const Block & block)
64{
65 /// Additional information about the block.
66 if (client_revision > 0)
67 block.info.write(ostr);
68
69 block.checkNumberOfRows();
70
71 /// Dimensions
72 size_t columns = block.columns();
73 size_t rows = block.rows();
74
75 writeVarUInt(columns, ostr);
76 writeVarUInt(rows, ostr);
77
78 /** The index has the same structure as the data stream.
79 * But instead of column values, it contains a mark that points to the location in the data file where this part of the column is located.
80 */
81 if (index_ostr)
82 {
83 writeVarUInt(columns, *index_ostr);
84 writeVarUInt(rows, *index_ostr);
85 }
86
87 for (size_t i = 0; i < columns; ++i)
88 {
89 /// For the index.
90 MarkInCompressedFile mark;
91
92 if (index_ostr)
93 {
94 ostr_concrete->next(); /// Finish compressed block.
95 mark.offset_in_compressed_file = initial_size_of_file + ostr_concrete->getCompressedBytes();
96 mark.offset_in_decompressed_block = ostr_concrete->getRemainingBytes();
97 }
98
99 ColumnWithTypeAndName column = block.safeGetByPosition(i);
100
101 /// Send data to old clients without low cardinality type.
102 if (remove_low_cardinality || (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE))
103 {
104 column.column = recursiveRemoveLowCardinality(column.column);
105 column.type = recursiveRemoveLowCardinality(column.type);
106 }
107
108 /// Name
109 writeStringBinary(column.name, ostr);
110
111 /// Type
112 String type_name = column.type->getName();
113
114 /// For compatibility, we will not send explicit timezone parameter in DateTime data type
115 /// to older clients, that cannot understand it.
116 if (client_revision < DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE
117 && startsWith(type_name, "DateTime("))
118 type_name = "DateTime";
119
120 writeStringBinary(type_name, ostr);
121
122 /// Data
123 if (rows) /// Zero items of data is always represented as zero number of bytes.
124 writeData(*column.type, column.column, ostr, 0, 0);
125
126 if (index_ostr)
127 {
128 writeStringBinary(column.name, *index_ostr);
129 writeStringBinary(column.type->getName(), *index_ostr);
130
131 writeBinary(mark.offset_in_compressed_file, *index_ostr);
132 writeBinary(mark.offset_in_decompressed_block, *index_ostr);
133 }
134 }
135}
136
137}
138