1#pragma once
2
3#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
4#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
5#include <IO/WriteBufferFromFile.h>
6#include <Compression/CompressedWriteBuffer.h>
7#include <IO/HashingWriteBuffer.h>
8#include <Storages/MergeTree/MergeTreeData.h>
9#include <DataStreams/IBlockOutputStream.h>
10
11
12namespace DB
13{
14
/// Common base for merged-block output streams (derives from IBlockOutputStream).
/// This header only declares the interface: a per-stream bundle of write buffers
/// (ColumnStream), helpers for writing columns / granules / marks, and the state
/// used for skip indices. All definitions live outside this file.
15class IMergedBlockOutputStream : public IBlockOutputStream
16{
17public:
    /// Constructor. The definition is not visible here; presumably each `*_` argument
    /// initializes the member of the same name declared below.
    /// `index_granularity_info_` is optional and defaults to nullptr.
18 IMergedBlockOutputStream(
19 MergeTreeData & storage_,
20 const String & part_path_,
21 size_t min_compress_block_size_,
22 size_t max_compress_block_size_,
23 CompressionCodecPtr default_codec_,
24 size_t aio_threshold_,
25 bool blocks_are_granules_size_,
26 const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
27 const MergeTreeIndexGranularity & index_granularity_,
28 const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);
29
    /// Set of strings passed by reference (as `offset_columns`) to the write helpers below.
    /// Presumably records which offset columns have already been written — confirm in the definitions.
30 using WrittenOffsetColumns = std::set<std::string>;
31
32protected:
    /// Shorthands for the bulk serialization state pointer of IDataType and a vector of them.
33 using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
34 using SerializationStates = std::vector<SerializationState>;
35
    /// Bundle of write buffers for one stream: a data chain and a marks chain.
    /// NOTE: C++ initializes members in declaration order, so the order of the buffer
    /// members below must not be changed without checking the constructor definition.
36 struct ColumnStream
37 {
38 ColumnStream(
39 const String & escaped_column_name_,
40 const String & data_path_,
41 const std::string & data_file_extension_,
42 const std::string & marks_path_,
43 const std::string & marks_file_extension_,
44 const CompressionCodecPtr & compression_codec_,
45 size_t max_compress_block_size_,
46 size_t estimated_size_,
47 size_t aio_threshold_);
48
49 String escaped_column_name;
50 std::string data_file_extension;
51 std::string marks_file_extension;
52
53 /// Data chain: compressed -> compressed_buf -> plain_hashing -> plain_file
        /// (the members are declared in the reverse of that order: innermost first).
54 std::unique_ptr<WriteBufferFromFileBase> plain_file;
55 HashingWriteBuffer plain_hashing;
56 CompressedWriteBuffer compressed_buf;
57 HashingWriteBuffer compressed;
58
59 /// Marks chain: marks -> marks_file
60 WriteBufferFromFile marks_file;
61 HashingWriteBuffer marks;
62
        /// Defined elsewhere; presumably completes writing through the buffers above.
63 void finalize();
64
        /// Defined elsewhere; presumably syncs the underlying files — confirm.
65 void sync();
66
        /// Takes `checksums` by non-const reference, so it is expected to modify it;
        /// presumably adds entries for this stream's data and marks files.
67 void addToChecksums(MergeTreeData::DataPart::Checksums & checksums);
68 };
69
    /// Owned ColumnStream objects keyed by a String (presumably the stream name — confirm).
70 using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;
71
    /// Defined elsewhere; presumably creates the entries of `column_streams` for one column.
72 void addStreams(const String & path, const String & name, const IDataType & type,
73 const CompressionCodecPtr & codec, size_t estimated_size, bool skip_offsets);
74
75
    /// Builds the IDataType::OutputStreamGetter used when serializing column `name`.
76 IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
77
78 /// Write data of one column.
79 /// Returns how many marks were written and
80 /// how many rows were written for the last mark.
81 std::pair<size_t, size_t> writeColumn(
82 const String & name,
83 const IDataType & type,
84 const IColumn & column,
85 WrittenOffsetColumns & offset_columns,
86 bool skip_offsets,
87 IDataType::SerializeBinaryBulkStatePtr & serialization_state,
88 size_t from_mark
89 );
90
91 /// Write a single granule of one column (the rows between two marks).
    /// The meaning of the size_t return value is not stated here — see the definition.
92 size_t writeSingleGranule(
93 const String & name,
94 const IDataType & type,
95 const IColumn & column,
96 WrittenOffsetColumns & offset_columns,
97 bool skip_offsets,
98 IDataType::SerializeBinaryBulkStatePtr & serialization_state,
99 IDataType::SerializeBinaryBulkSettings & serialize_settings,
100 size_t from_row,
101 size_t number_of_rows,
102 bool write_marks);
103
104 /// Write a mark for the column.
105 void writeSingleMark(
106 const String & name,
107 const IDataType & type,
108 WrittenOffsetColumns & offset_columns,
109 bool skip_offsets,
110 size_t number_of_rows,
111 DB::IDataType::SubstreamPath & path);
112
113 /// Compute the index granularity for `block` and store it in `index_granularity`.
114 void fillIndexGranularity(const Block & block);
115
116 /// Write the final mark at the end of the column.
    /// NOTE(review): `column_type` is taken by const value, which copies the pointer object;
    /// changing it to a reference would require updating the out-of-line definition too.
117 void writeFinalMark(
118 const std::string & column_name,
119 const DataTypePtr column_type,
120 WrittenOffsetColumns & offset_columns,
121 bool skip_offsets,
122 DB::IDataType::SubstreamPath & path);
123
    /// Skip-index helpers (defined elsewhere). Judging by the names: set up the skip-index
    /// state, feed `rows` rows of the given columns into it, and finish serialization while
    /// updating `checksums` — confirm against the definitions.
124 void initSkipIndices();
125 void calculateAndSerializeSkipIndices(const ColumnsWithTypeAndName & skip_indexes_columns, size_t rows);
126 void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums);
    /// NOTE(review): the access specifier below is redundant — the section opened above is
    /// already `protected`. It only separates member functions from data members visually.
127protected:
128 MergeTreeData & storage;
129
130 SerializationStates serialization_states;
131 String part_path;
132
133 ColumnStreams column_streams;
134
135 /// The offset to the first row of the block for which you want to write the index.
136 size_t index_offset = 0;
137
138 size_t min_compress_block_size;
139 size_t max_compress_block_size;
140
141 size_t aio_threshold;
142
143 size_t current_mark = 0;
144
145 /// Number of the data mark from which skip indices have to start
146 /// aggregation, i.e. it is a data mark number, not a skip-index mark number.
147 size_t skip_index_data_mark = 0;
148
    /// Const members: they can only be set in the constructor's initializer list.
149 const bool can_use_adaptive_granularity;
150 const std::string marks_file_extension;
151 const bool blocks_are_granules_size;
152
    /// Held by value, while the constructor receives `index_granularity_` by const reference
    /// (so presumably a copy is made); fillIndexGranularity() stores its result here.
153 MergeTreeIndexGranularity index_granularity;
154
155 const bool compute_granularity;
156 CompressionCodecPtr codec;
157
    /// Skip-index state; presumably the vectors below are parallel, one element per skip index — confirm.
158 std::vector<MergeTreeIndexPtr> skip_indices;
159 std::vector<std::unique_ptr<ColumnStream>> skip_indices_streams;
160 MergeTreeIndexAggregators skip_indices_aggregators;
161 std::vector<size_t> skip_index_filling;
162
    /// Presumably controls whether writeFinalMark() is used — confirm in the definitions.
163 const bool with_final_mark;
164};
165
166}
167