| 1 | #pragma once | 
|---|
| 2 |  | 
|---|
| 3 | #include <common/logger_useful.h> | 
|---|
| 4 |  | 
|---|
| 5 | #include <Common/filesystemHelpers.h> | 
|---|
| 6 | #include <Core/SortDescription.h> | 
|---|
| 7 | #include <Core/SortCursor.h> | 
|---|
| 8 |  | 
|---|
| 9 | #include <DataStreams/IBlockInputStream.h> | 
|---|
| 10 | #include <DataStreams/NativeBlockInputStream.h> | 
|---|
| 11 |  | 
|---|
| 12 | #include <IO/ReadBufferFromFile.h> | 
|---|
| 13 | #include <Compression/CompressedReadBuffer.h> | 
|---|
| 14 |  | 
|---|
| 15 |  | 
|---|
| 16 | namespace DB | 
|---|
| 17 | { | 
|---|
| 18 |  | 
|---|
namespace ErrorCodes
{
    /// Not referenced by the declarations in this header; presumably used by the
    /// implementation file (e.g. when temporary-file space runs out) — TODO confirm.
    extern const int NOT_ENOUGH_SPACE;
}

/// Forward declaration only: this header refers to TemporaryFileStream solely through
/// std::unique_ptr (MergeSortingBlockInputStream::temporary_inputs).
struct TemporaryFileStream;
|---|
/** Merges a stream of blocks, each sorted separately, into a stream of blocks sorted as a whole.
  * If there is too much data to sort in memory, external sorting with temporary files can be used.
  */
|---|
| 28 |  | 
|---|
| 29 | /** Part of implementation. Merging array of ready (already read from somewhere) blocks. | 
|---|
| 30 | * Returns result of merge as stream of blocks, not more than 'max_merged_block_size' rows in each. | 
|---|
| 31 | */ | 
|---|
| 32 | class MergeSortingBlocksBlockInputStream : public IBlockInputStream | 
|---|
| 33 | { | 
|---|
| 34 | public: | 
|---|
| 35 | /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. | 
|---|
| 36 | MergeSortingBlocksBlockInputStream(Blocks & blocks_, const SortDescription & description_, | 
|---|
| 37 | size_t max_merged_block_size_, UInt64 limit_ = 0); | 
|---|
| 38 |  | 
|---|
| 39 | String getName() const override { return "MergeSortingBlocks"; } | 
|---|
| 40 |  | 
|---|
| 41 | bool isSortedOutput() const override { return true; } | 
|---|
| 42 | const SortDescription & getSortDescription() const override { return description; } | 
|---|
| 43 |  | 
|---|
| 44 | Block () const override { return header; } | 
|---|
| 45 |  | 
|---|
| 46 | protected: | 
|---|
| 47 | Block readImpl() override; | 
|---|
| 48 |  | 
|---|
| 49 | private: | 
|---|
| 50 | Blocks & blocks; | 
|---|
| 51 | Block ; | 
|---|
| 52 | SortDescription description; | 
|---|
| 53 | size_t max_merged_block_size; | 
|---|
| 54 | UInt64 limit; | 
|---|
| 55 | size_t total_merged_rows = 0; | 
|---|
| 56 |  | 
|---|
| 57 | SortCursorImpls cursors; | 
|---|
| 58 |  | 
|---|
| 59 | bool has_collation = false; | 
|---|
| 60 |  | 
|---|
| 61 | SortingHeap<SortCursor> queue_without_collation; | 
|---|
| 62 | SortingHeap<SortCursorWithCollation> queue_with_collation; | 
|---|
| 63 |  | 
|---|
| 64 | /** Two different cursors are supported - with and without Collation. | 
|---|
| 65 | *  Templates are used (instead of virtual functions in SortCursor) for zero-overhead. | 
|---|
| 66 | */ | 
|---|
| 67 | template <typename TSortingHeap> | 
|---|
| 68 | Block mergeImpl(TSortingHeap & queue); | 
|---|
| 69 | }; | 
|---|
| 70 |  | 
|---|
| 71 |  | 
|---|
| 72 | class MergeSortingBlockInputStream : public IBlockInputStream | 
|---|
| 73 | { | 
|---|
| 74 | public: | 
|---|
| 75 | /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. | 
|---|
| 76 | MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_, | 
|---|
| 77 | size_t max_merged_block_size_, UInt64 limit_, | 
|---|
| 78 | size_t max_bytes_before_remerge_, | 
|---|
| 79 | size_t max_bytes_before_external_sort_, const std::string & tmp_path_, | 
|---|
| 80 | size_t min_free_disk_space_); | 
|---|
| 81 |  | 
|---|
| 82 | String getName() const override { return "MergeSorting"; } | 
|---|
| 83 |  | 
|---|
| 84 | bool isSortedOutput() const override { return true; } | 
|---|
| 85 | const SortDescription & getSortDescription() const override { return description; } | 
|---|
| 86 |  | 
|---|
| 87 | Block () const override { return header; } | 
|---|
| 88 |  | 
|---|
| 89 | protected: | 
|---|
| 90 | Block readImpl() override; | 
|---|
| 91 |  | 
|---|
| 92 | private: | 
|---|
| 93 | SortDescription description; | 
|---|
| 94 | size_t max_merged_block_size; | 
|---|
| 95 | UInt64 limit; | 
|---|
| 96 |  | 
|---|
| 97 | size_t max_bytes_before_remerge; | 
|---|
| 98 | size_t max_bytes_before_external_sort; | 
|---|
| 99 | const std::string tmp_path; | 
|---|
| 100 | size_t min_free_disk_space; | 
|---|
| 101 |  | 
|---|
| 102 | Logger * log = &Logger::get( "MergeSortingBlockInputStream"); | 
|---|
| 103 |  | 
|---|
| 104 | Blocks blocks; | 
|---|
| 105 | size_t sum_rows_in_blocks = 0; | 
|---|
| 106 | size_t sum_bytes_in_blocks = 0; | 
|---|
| 107 | std::unique_ptr<IBlockInputStream> impl; | 
|---|
| 108 |  | 
|---|
| 109 | /// Before operation, will remove constant columns from blocks. And after, place constant columns back. | 
|---|
| 110 | /// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files) | 
|---|
| 111 | /// Save original block structure here. | 
|---|
| 112 | Block ; | 
|---|
| 113 | Block ; | 
|---|
| 114 |  | 
|---|
| 115 | /// Everything below is for external sorting. | 
|---|
| 116 | std::vector<std::unique_ptr<TemporaryFile>> temporary_files; | 
|---|
| 117 | std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs; | 
|---|
| 118 |  | 
|---|
| 119 | BlockInputStreams inputs_to_merge; | 
|---|
| 120 |  | 
|---|
| 121 | /// Merge all accumulated blocks to keep no more than limit rows. | 
|---|
| 122 | void remerge(); | 
|---|
| 123 | /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore. | 
|---|
| 124 | bool remerge_is_useful = true; | 
|---|
| 125 | }; | 
|---|
| 126 | } | 
|---|
| 127 |  | 
|---|