1#pragma once
2
3#include <common/logger_useful.h>
4
5#include <Common/filesystemHelpers.h>
6#include <Core/SortDescription.h>
7#include <Core/SortCursor.h>
8
9#include <DataStreams/IBlockInputStream.h>
10#include <DataStreams/NativeBlockInputStream.h>
11
12#include <IO/ReadBufferFromFile.h>
13#include <Compression/CompressedReadBuffer.h>
14
15
16namespace DB
17{
18
namespace ErrorCodes
{
    /// Declared in this header for the implementation's use; presumably thrown when there is
    /// not enough free disk space for external sorting (NOTE(review): confirm in the .cpp).
    extern const int NOT_ENOUGH_SPACE;
}

/// Only a forward declaration: this header stores it solely through std::unique_ptr.
/// NOTE(review): std::unique_ptr<T> requires T to be complete wherever the owning object is
/// destroyed, so code that destroys MergeSortingBlockInputStream must see the full definition.
struct TemporaryFileStream;
/** MergeSortingBlockInputStream (declared below) merges a stream of blocks, each sorted individually,
  * into a stream of blocks sorted as a whole.
  * If there is too much data to sort in memory, it can use external sorting with temporary files.
  */
28
/** Part of the implementation: merges an array of ready blocks (already read from somewhere).
  * Returns the result of the merge as a stream of blocks, each with at most 'max_merged_block_size' rows.
  */
32class MergeSortingBlocksBlockInputStream : public IBlockInputStream
33{
34public:
35 /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
36 MergeSortingBlocksBlockInputStream(Blocks & blocks_, const SortDescription & description_,
37 size_t max_merged_block_size_, UInt64 limit_ = 0);
38
39 String getName() const override { return "MergeSortingBlocks"; }
40
41 bool isSortedOutput() const override { return true; }
42 const SortDescription & getSortDescription() const override { return description; }
43
44 Block getHeader() const override { return header; }
45
46protected:
47 Block readImpl() override;
48
49private:
50 Blocks & blocks;
51 Block header;
52 SortDescription description;
53 size_t max_merged_block_size;
54 UInt64 limit;
55 size_t total_merged_rows = 0;
56
57 SortCursorImpls cursors;
58
59 bool has_collation = false;
60
61 SortingHeap<SortCursor> queue_without_collation;
62 SortingHeap<SortCursorWithCollation> queue_with_collation;
63
64 /** Two different cursors are supported - with and without Collation.
65 * Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
66 */
67 template <typename TSortingHeap>
68 Block mergeImpl(TSortingHeap & queue);
69};
70
71
72class MergeSortingBlockInputStream : public IBlockInputStream
73{
74public:
75 /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
76 MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
77 size_t max_merged_block_size_, UInt64 limit_,
78 size_t max_bytes_before_remerge_,
79 size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
80 size_t min_free_disk_space_);
81
82 String getName() const override { return "MergeSorting"; }
83
84 bool isSortedOutput() const override { return true; }
85 const SortDescription & getSortDescription() const override { return description; }
86
87 Block getHeader() const override { return header; }
88
89protected:
90 Block readImpl() override;
91
92private:
93 SortDescription description;
94 size_t max_merged_block_size;
95 UInt64 limit;
96
97 size_t max_bytes_before_remerge;
98 size_t max_bytes_before_external_sort;
99 const std::string tmp_path;
100 size_t min_free_disk_space;
101
102 Logger * log = &Logger::get("MergeSortingBlockInputStream");
103
104 Blocks blocks;
105 size_t sum_rows_in_blocks = 0;
106 size_t sum_bytes_in_blocks = 0;
107 std::unique_ptr<IBlockInputStream> impl;
108
109 /// Before operation, will remove constant columns from blocks. And after, place constant columns back.
110 /// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files)
111 /// Save original block structure here.
112 Block header;
113 Block header_without_constants;
114
115 /// Everything below is for external sorting.
116 std::vector<std::unique_ptr<TemporaryFile>> temporary_files;
117 std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
118
119 BlockInputStreams inputs_to_merge;
120
121 /// Merge all accumulated blocks to keep no more than limit rows.
122 void remerge();
123 /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
124 bool remerge_is_useful = true;
125};
126}
127