1#pragma once
2
3#include <DataStreams/IBlockInputStream.h>
4#include <IO/ReadBuffer.h>
5#include <Common/PODArray.h>
6
7
8namespace Poco { class Logger; }
9
10
11namespace DB
12{
13
14
/// One-byte record telling where a merged row was taken from:
/// the low 7 bits hold the number of the source part, the highest bit holds a boolean flag.
struct RowSourcePart
{
    UInt8 data = 0;

    RowSourcePart() = default;

    /// Packs both fields at once. Only the low 7 bits of source_num are kept.
    RowSourcePart(size_t source_num, bool skip_flag = false)
    {
        static_assert(sizeof(*this) == 1, "Size of RowSourcePart is too big due to compiler settings");
        const UInt8 number_bits = static_cast<UInt8>(static_cast<UInt8>(source_num) & MASK_NUMBER);
        data = skip_flag ? static_cast<UInt8>(number_bits | MASK_FLAG) : number_bits;
    }

    /// Number of the source part (0 .. MAX_PARTS).
    size_t getSourceNum() const
    {
        return static_cast<size_t>(data & MASK_NUMBER);
    }

    /// In the CollapsingMergeTree case the flag means "skip this row".
    bool getSkipFlag() const
    {
        return (data & MASK_FLAG) == MASK_FLAG;
    }

    /// Replaces the source number and leaves the flag bit untouched.
    void setSourceNum(size_t source_num)
    {
        const UInt8 kept_flag = static_cast<UInt8>(data & MASK_FLAG);
        const UInt8 number_bits = static_cast<UInt8>(static_cast<UInt8>(source_num) & MASK_NUMBER);
        data = static_cast<UInt8>(kept_flag | number_bits);
    }

    /// Sets or clears the flag bit and leaves the source number untouched.
    void setSkipFlag(bool flag)
    {
        if (flag)
            data |= MASK_FLAG;
        else
            data &= static_cast<UInt8>(~MASK_FLAG);
    }

    static constexpr size_t MAX_PARTS = 0x7F;
    static constexpr UInt8 MASK_NUMBER = 0x7F;
    static constexpr UInt8 MASK_FLAG = 0x80;
};
48
/// Array of RowSourcePart records.
/// NOTE(review): presumably one record per merged row, in output order — confirm against the code that fills it.
using MergedRowSources = PODArray<RowSourcePart>;
50
51
52/** Gather single stream from multiple streams according to streams mask.
53 * Stream mask maps row number to index of source stream.
54 * Streams should contain exactly one column.
55 */
56class ColumnGathererStream : public IBlockInputStream
57{
58public:
59 ColumnGathererStream(
60 const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
61 size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
62
63 String getName() const override { return "ColumnGatherer"; }
64
65 Block readImpl() override;
66
67 void readSuffixImpl() override;
68
69 Block getHeader() const override { return children.at(0)->getHeader(); }
70
71 /// for use in implementations of IColumn::gather()
72 template <typename Column>
73 void gather(Column & column_res);
74
75private:
76 /// Cache required fields
77 struct Source
78 {
79 const IColumn * column = nullptr;
80 size_t pos = 0;
81 size_t size = 0;
82 Block block;
83
84 void update(const String & name)
85 {
86 column = block.getByName(name).column.get();
87 size = block.rows();
88 pos = 0;
89 }
90 };
91
92 void fetchNewBlock(Source & source, size_t source_num);
93
94 String column_name;
95 ColumnWithTypeAndName column;
96
97 std::vector<Source> sources;
98 ReadBuffer & row_sources_buf;
99
100 size_t block_preferred_size;
101
102 Source * source_to_fully_copy = nullptr;
103 Block output_block;
104
105 Poco::Logger * log;
106};
107
108template <typename Column>
109void ColumnGathererStream::gather(Column & column_res)
110{
111 if (source_to_fully_copy) /// Was set on a previous iteration
112 {
113 output_block.getByPosition(0).column = source_to_fully_copy->block.getByName(column_name).column;
114 source_to_fully_copy->pos = source_to_fully_copy->size;
115 source_to_fully_copy = nullptr;
116 return;
117 }
118
119 row_sources_buf.nextIfAtEnd();
120 RowSourcePart * row_source_pos = reinterpret_cast<RowSourcePart *>(row_sources_buf.position());
121 RowSourcePart * row_sources_end = reinterpret_cast<RowSourcePart *>(row_sources_buf.buffer().end());
122
123 size_t cur_block_preferred_size = std::min(static_cast<size_t>(row_sources_end - row_source_pos), block_preferred_size);
124 column_res.reserve(cur_block_preferred_size);
125
126 size_t cur_size = 0;
127
128 while (row_source_pos < row_sources_end && cur_size < cur_block_preferred_size)
129 {
130 RowSourcePart row_source = *row_source_pos;
131 size_t source_num = row_source.getSourceNum();
132 Source & source = sources[source_num];
133 bool source_skip = row_source.getSkipFlag();
134 ++row_source_pos;
135
136 if (source.pos >= source.size) /// Fetch new block from source_num part
137 {
138 fetchNewBlock(source, source_num);
139 }
140
141 /// Consecutive optimization. TODO: precompute lengths
142 size_t len = 1;
143 size_t max_len = std::min(static_cast<size_t>(row_sources_end - row_source_pos), source.size - source.pos); // interval should be in the same block
144 while (len < max_len && row_source_pos->data == row_source.data)
145 {
146 ++len;
147 ++row_source_pos;
148 }
149
150 row_sources_buf.position() = reinterpret_cast<char *>(row_source_pos);
151
152 if (!source_skip)
153 {
154 /// Whole block could be produced via copying pointer from current block
155 if (source.pos == 0 && source.size == len)
156 {
157 /// If current block already contains data, return it.
158 /// Whole column from current source will be returned on next read() iteration.
159 if (cur_size > 0)
160 {
161 source_to_fully_copy = &source;
162 return;
163 }
164
165 output_block.getByPosition(0).column = source.block.getByName(column_name).column;
166 source.pos += len;
167 return;
168 }
169 else if (len == 1)
170 column_res.insertFrom(*source.column, source.pos);
171 else
172 column_res.insertRangeFrom(*source.column, source.pos, len);
173
174 cur_size += len;
175 }
176
177 source.pos += len;
178 }
179}
180
181}
182