#pragma once
#include <Processors/IProcessor.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <Processors/SharedChunk.h>

#include <atomic>
#include <queue>
#include <vector>
8
9namespace DB
10{
11
12class MergingSortedTransform : public IProcessor
13{
14public:
15 MergingSortedTransform(
16 const Block & header,
17 size_t num_inputs,
18 const SortDescription & description_,
19 size_t max_block_size,
20 UInt64 limit = 0,
21 bool quiet = false,
22 bool have_all_inputs = true);
23
24 String getName() const override { return "MergingSortedTransform"; }
25 Status prepare() override;
26 void work() override;
27
28 void addInput();
29 void setHaveAllInputs();
30
31protected:
32
33 class MergedData
34 {
35 public:
36 explicit MergedData(const Block & header)
37 {
38 columns.reserve(header.columns());
39 for (const auto & column : header)
40 columns.emplace_back(column.type->createColumn());
41 }
42
43 void insertRow(const ColumnRawPtrs & raw_columns, size_t row)
44 {
45 size_t num_columns = raw_columns.size();
46 for (size_t i = 0; i < num_columns; ++i)
47 columns[i]->insertFrom(*raw_columns[i], row);
48
49 ++total_merged_rows;
50 ++merged_rows;
51 }
52
53 void insertFromChunk(Chunk && chunk, size_t limit_rows)
54 {
55 if (merged_rows)
56 throw Exception("Cannot insert to MergedData from Chunk because MergedData is not empty.",
57 ErrorCodes::LOGICAL_ERROR);
58
59 auto num_rows = chunk.getNumRows();
60 columns = chunk.mutateColumns();
61 if (limit_rows && num_rows > limit_rows)
62 {
63 num_rows = limit_rows;
64 for (auto & column : columns)
65 column = (*column->cut(0, num_rows)->convertToFullColumnIfConst()).mutate();
66 }
67
68 total_merged_rows += num_rows;
69 merged_rows = num_rows;
70 }
71
72 Chunk pull()
73 {
74 MutableColumns empty_columns;
75 empty_columns.reserve(columns.size());
76
77 for (const auto & column : columns)
78 empty_columns.emplace_back(column->cloneEmpty());
79
80 empty_columns.swap(columns);
81 Chunk chunk(std::move(empty_columns), merged_rows);
82 merged_rows = 0;
83
84 return chunk;
85 }
86
87 UInt64 totalMergedRows() const { return total_merged_rows; }
88 UInt64 mergedRows() const { return merged_rows; }
89
90 private:
91 UInt64 total_merged_rows = 0;
92 UInt64 merged_rows = 0;
93 MutableColumns columns;
94 };
95
96 /// Settings
97 SortDescription description;
98 const size_t max_block_size;
99 UInt64 limit;
100 bool has_collation = false;
101 bool quiet = false;
102
103 std::atomic<bool> have_all_inputs;
104
105 MergedData merged_data;
106
107 /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
108 /// If it is not nullptr then it should be populated during execution
109 WriteBuffer * out_row_sources_buf = nullptr;
110
111 /// Chunks currently being merged.
112 std::vector<SharedChunkPtr> source_chunks;
113
114 using CursorImpls = std::vector<SortCursorImpl>;
115 CursorImpls cursors;
116
117 using Queue = std::priority_queue<SortCursor>;
118 Queue queue_without_collation;
119
120 using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
121 QueueWithCollation queue_with_collation;
122
123private:
124
125 /// Processor state.
126 bool is_initialized = false;
127 bool is_finished = false;
128 bool need_data = false;
129 size_t next_input_to_read = 0;
130
131 template <typename TSortCursor>
132 void merge(std::priority_queue<TSortCursor> & queue);
133
134 void insertFromChunk(size_t source_num);
135
136 void updateCursor(Chunk chunk, size_t source_num)
137 {
138 auto num_rows = chunk.getNumRows();
139 auto columns = chunk.detachColumns();
140 for (auto & column : columns)
141 column = column->convertToFullColumnIfConst();
142
143 chunk.setColumns(std::move(columns), num_rows);
144
145 auto & shared_chunk_ptr = source_chunks[source_num];
146
147 if (!shared_chunk_ptr)
148 {
149 shared_chunk_ptr = new detail::SharedChunk(std::move(chunk));
150 cursors[source_num] = SortCursorImpl(shared_chunk_ptr->getColumns(), description, source_num);
151 has_collation |= cursors[source_num].has_collation;
152 }
153 else
154 {
155 *shared_chunk_ptr = std::move(chunk);
156 cursors[source_num].reset(shared_chunk_ptr->getColumns(), {});
157 }
158
159 shared_chunk_ptr->all_columns = cursors[source_num].all_columns;
160 shared_chunk_ptr->sort_columns = cursors[source_num].sort_columns;
161 }
162
163 void pushToQueue(size_t source_num)
164 {
165 if (has_collation)
166 queue_with_collation.push(SortCursorWithCollation(&cursors[source_num]));
167 else
168 queue_without_collation.push(SortCursor(&cursors[source_num]));
169 }
170
171 template <typename TSortCursor>
172 void initQueue(std::priority_queue<TSortCursor> & queue)
173 {
174 for (auto & cursor : cursors)
175 if (!cursor.empty())
176 queue.push(TSortCursor(&cursor));
177 }
178};
179
180}
181