1 | #pragma once |
2 | |
3 | #include <common/logger_useful.h> |
4 | |
5 | #include <Common/filesystemHelpers.h> |
6 | #include <Core/SortDescription.h> |
7 | #include <Core/SortCursor.h> |
8 | |
9 | #include <DataStreams/IBlockInputStream.h> |
10 | #include <DataStreams/NativeBlockInputStream.h> |
11 | |
12 | #include <IO/ReadBufferFromFile.h> |
13 | #include <Compression/CompressedReadBuffer.h> |
14 | |
15 | |
16 | namespace DB |
17 | { |
18 | |
/// Only forward-declared here: this header uses the type solely through std::unique_ptr.
/// The complete definition is needed wherever those pointers are created or destroyed.
struct TemporaryFileStream;

namespace ErrorCodes
{
    /// Declared here, defined elsewhere. Not referenced in this header itself;
    /// presumably used by the implementation file - verify before removing.
    extern const int NOT_ENOUGH_SPACE;
}
/** MergeSortingBlockInputStream (declared further below):
  * merges a stream of blocks, each of which is sorted separately, into a stream of blocks that is sorted as a whole.
  * If there is too much data to sort, external sorting with temporary files may be used.
  */
28 | |
29 | /** Part of implementation. Merging array of ready (already read from somewhere) blocks. |
30 | * Returns result of merge as stream of blocks, not more than 'max_merged_block_size' rows in each. |
31 | */ |
32 | class MergeSortingBlocksBlockInputStream : public IBlockInputStream |
33 | { |
34 | public: |
35 | /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. |
36 | MergeSortingBlocksBlockInputStream(Blocks & blocks_, const SortDescription & description_, |
37 | size_t max_merged_block_size_, UInt64 limit_ = 0); |
38 | |
39 | String getName() const override { return "MergeSortingBlocks" ; } |
40 | |
41 | bool isSortedOutput() const override { return true; } |
42 | const SortDescription & getSortDescription() const override { return description; } |
43 | |
44 | Block () const override { return header; } |
45 | |
46 | protected: |
47 | Block readImpl() override; |
48 | |
49 | private: |
50 | Blocks & blocks; |
51 | Block ; |
52 | SortDescription description; |
53 | size_t max_merged_block_size; |
54 | UInt64 limit; |
55 | size_t total_merged_rows = 0; |
56 | |
57 | SortCursorImpls cursors; |
58 | |
59 | bool has_collation = false; |
60 | |
61 | SortingHeap<SortCursor> queue_without_collation; |
62 | SortingHeap<SortCursorWithCollation> queue_with_collation; |
63 | |
64 | /** Two different cursors are supported - with and without Collation. |
65 | * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. |
66 | */ |
67 | template <typename TSortingHeap> |
68 | Block mergeImpl(TSortingHeap & queue); |
69 | }; |
70 | |
71 | |
72 | class MergeSortingBlockInputStream : public IBlockInputStream |
73 | { |
74 | public: |
75 | /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. |
76 | MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_, |
77 | size_t max_merged_block_size_, UInt64 limit_, |
78 | size_t max_bytes_before_remerge_, |
79 | size_t max_bytes_before_external_sort_, const std::string & tmp_path_, |
80 | size_t min_free_disk_space_); |
81 | |
82 | String getName() const override { return "MergeSorting" ; } |
83 | |
84 | bool isSortedOutput() const override { return true; } |
85 | const SortDescription & getSortDescription() const override { return description; } |
86 | |
87 | Block () const override { return header; } |
88 | |
89 | protected: |
90 | Block readImpl() override; |
91 | |
92 | private: |
93 | SortDescription description; |
94 | size_t max_merged_block_size; |
95 | UInt64 limit; |
96 | |
97 | size_t max_bytes_before_remerge; |
98 | size_t max_bytes_before_external_sort; |
99 | const std::string tmp_path; |
100 | size_t min_free_disk_space; |
101 | |
102 | Logger * log = &Logger::get("MergeSortingBlockInputStream" ); |
103 | |
104 | Blocks blocks; |
105 | size_t sum_rows_in_blocks = 0; |
106 | size_t sum_bytes_in_blocks = 0; |
107 | std::unique_ptr<IBlockInputStream> impl; |
108 | |
109 | /// Before operation, will remove constant columns from blocks. And after, place constant columns back. |
110 | /// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files) |
111 | /// Save original block structure here. |
112 | Block ; |
113 | Block ; |
114 | |
115 | /// Everything below is for external sorting. |
116 | std::vector<std::unique_ptr<TemporaryFile>> temporary_files; |
117 | std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs; |
118 | |
119 | BlockInputStreams inputs_to_merge; |
120 | |
121 | /// Merge all accumulated blocks to keep no more than limit rows. |
122 | void remerge(); |
123 | /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore. |
124 | bool remerge_is_useful = true; |
125 | }; |
126 | } |
127 | |