1#pragma once
2#include <Processors/IProcessor.h>
3#include <Core/SortDescription.h>
4#include <Core/SortCursor.h>
5#include <DataStreams/IBlockInputStream.h>
6#include <Processors/ISource.h>
7#include <queue>
8
9
10namespace DB
11{
12
13/** Part of implementation. Merging array of ready (already read from somewhere) chunks.
14 * Returns result of merge as stream of chunks, not more than 'max_merged_block_size' rows in each.
15 */
16class MergeSorter
17{
18public:
19 MergeSorter(Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_);
20
21 Chunk read();
22
23private:
24 Chunks chunks;
25 SortDescription description;
26 size_t max_merged_block_size;
27 UInt64 limit;
28 size_t total_merged_rows = 0;
29
30 using CursorImpls = std::vector<SortCursorImpl>;
31 CursorImpls cursors;
32
33 bool has_collation = false;
34
35 std::priority_queue<SortCursor> queue_without_collation;
36 std::priority_queue<SortCursorWithCollation> queue_with_collation;
37
38 /** Two different cursors are supported - with and without Collation.
39 * Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
40 */
41 template <typename TSortCursor>
42 Chunk mergeImpl(std::priority_queue<TSortCursor> & queue);
43};
44
45
46class MergeSorterSource : public ISource
47{
48public:
49 MergeSorterSource(Block header, Chunks chunks, SortDescription & description, size_t max_merged_block_size, UInt64 limit)
50 : ISource(std::move(header)), merge_sorter(std::move(chunks), description, max_merged_block_size, limit) {}
51
52 String getName() const override { return "MergeSorterSource"; }
53
54protected:
55 Chunk generate() override { return merge_sorter.read(); }
56
57private:
58 MergeSorter merge_sorter;
59};
60
61/** Base class for sorting.
62 * Currently there are two implementations: MergeSortingTransform and FinishSortingTransform.
63 */
64class SortingTransform : public IProcessor
65{
66public:
67 /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
68 SortingTransform(const Block & header,
69 const SortDescription & description_,
70 size_t max_merged_block_size_, UInt64 limit_);
71
72 ~SortingTransform() override;
73
74protected:
75 Status prepare() override final;
76 void work() override final;
77
78 virtual void consume(Chunk chunk) = 0;
79 virtual void generate() = 0;
80 virtual void serialize();
81
82 SortDescription description;
83 size_t max_merged_block_size;
84 UInt64 limit;
85
86 size_t sum_rows_in_blocks = 0;
87 size_t sum_bytes_in_blocks = 0;
88
89 /// Before operation, will remove constant columns from blocks. And after, place constant columns back.
90 /// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files)
91 /// Save original block structure here.
92 Block header_without_constants;
93 /// Columns which were constant in header and we need to remove from chunks.
94 std::vector<bool> const_columns_to_remove;
95
96 void removeConstColumns(Chunk & chunk);
97 void enrichChunkWithConstants(Chunk & chunk);
98
99 enum class Stage
100 {
101 Consume = 0,
102 Generate,
103 Serialize,
104 };
105
106 Stage stage = Stage::Consume;
107
108 bool generated_prefix = false;
109 Chunk current_chunk;
110 Chunk generated_chunk;
111 Chunks chunks;
112
113 std::unique_ptr<MergeSorter> merge_sorter;
114 Processors processors;
115
116private:
117 Status prepareConsume();
118 Status prepareSerialize();
119 Status prepareGenerate();
120};
121
122}
123