| 1 | #pragma once |
| 2 | |
| 3 | #include <common/logger_useful.h> |
| 4 | |
| 5 | #include <Common/filesystemHelpers.h> |
| 6 | #include <Core/SortDescription.h> |
| 7 | #include <Core/SortCursor.h> |
| 8 | |
| 9 | #include <DataStreams/IBlockInputStream.h> |
| 10 | #include <DataStreams/NativeBlockInputStream.h> |
| 11 | |
| 12 | #include <IO/ReadBufferFromFile.h> |
| 13 | #include <Compression/CompressedReadBuffer.h> |
| 14 | |
| 15 | |
| 16 | namespace DB |
| 17 | { |
| 18 | |
| 19 | struct TemporaryFileStream; |
| 20 | |
| 21 | namespace ErrorCodes |
| 22 | { |
| 23 | extern const int NOT_ENOUGH_SPACE; |
| 24 | } |
| 25 | /** MergeSortingBlockInputStream (defined below) merges a stream of blocks, each sorted separately, into a stream of blocks sorted as a whole. |
| 26 | * If there is too much data to sort, it can fall back to external sorting using temporary files. |
| 27 | */ |
| 28 | |
| 29 | /** Part of the implementation. Merges an array of ready blocks (already read from somewhere), each expected to be sorted by 'description'. |
| 30 | * Returns the result of the merge as a stream of blocks, with no more than 'max_merged_block_size' rows in each. |
| 31 | */ |
| 32 | class MergeSortingBlocksBlockInputStream : public IBlockInputStream |
| 33 | { |
| 34 | public: |
| 35 | /// 'blocks_' is stored by reference (not copied), so it must outlive this stream. limit - if not 0, allowed to return just the first 'limit' rows in sorted order. |
| 36 | MergeSortingBlocksBlockInputStream(Blocks & blocks_, const SortDescription & description_, |
| 37 | size_t max_merged_block_size_, UInt64 limit_ = 0); |
| 38 | |
| 39 | String getName() const override { return "MergeSortingBlocks"; } |
| 40 | |
| 41 | bool isSortedOutput() const override { return true; } |
| 42 | const SortDescription & getSortDescription() const override { return description; } |
| 43 | /// Returns the stored 'header' block (the structure of this stream). |
| 44 | Block getHeader() const override { return header; } |
| 45 | |
| 46 | protected: |
| 47 | Block readImpl() override; |
| 48 | |
| 49 | private: |
| 50 | Blocks & blocks; /// Non-owning reference to the caller's blocks. |
| 51 | Block header; |
| 52 | SortDescription description; |
| 53 | size_t max_merged_block_size; |
| 54 | UInt64 limit; |
| 55 | size_t total_merged_rows = 0; |
| 56 | |
| 57 | SortCursorImpls cursors; |
| 58 | |
| 59 | bool has_collation = false; |
| 60 | /// Presumably only one of the two queues below is used, selected by 'has_collation'. |
| 61 | SortingHeap<SortCursor> queue_without_collation; |
| 62 | SortingHeap<SortCursorWithCollation> queue_with_collation; |
| 63 | |
| 64 | /** Two different cursors are supported - with and without Collation. |
| 65 | * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. |
| 66 | */ |
| 67 | template <typename TSortingHeap> |
| 68 | Block mergeImpl(TSortingHeap & queue); |
| 69 | }; |
| 70 | |
| 71 | |
| 72 | class MergeSortingBlockInputStream : public IBlockInputStream |
| 73 | { |
| 74 | public: |
| 75 | /// limit - if not 0, allowed to return just the first 'limit' rows in sorted order. max_bytes_before_external_sort_, tmp_path_ and min_free_disk_space_ presumably configure the external (temporary-file) sorting path. |
| 76 | MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_, |
| 77 | size_t max_merged_block_size_, UInt64 limit_, |
| 78 | size_t max_bytes_before_remerge_, |
| 79 | size_t max_bytes_before_external_sort_, const std::string & tmp_path_, |
| 80 | size_t min_free_disk_space_); |
| 81 | |
| 82 | String getName() const override { return "MergeSorting"; } |
| 83 | |
| 84 | bool isSortedOutput() const override { return true; } |
| 85 | const SortDescription & getSortDescription() const override { return description; } |
| 86 | /// Returns the stored 'header' block (the original block structure). |
| 87 | Block getHeader() const override { return header; } |
| 88 | |
| 89 | protected: |
| 90 | Block readImpl() override; |
| 91 | |
| 92 | private: |
| 93 | SortDescription description; |
| 94 | size_t max_merged_block_size; |
| 95 | UInt64 limit; |
| 96 | |
| 97 | size_t max_bytes_before_remerge; |
| 98 | size_t max_bytes_before_external_sort; |
| 99 | const std::string tmp_path; |
| 100 | size_t min_free_disk_space; |
| 101 | |
| 102 | Logger * log = &Logger::get("MergeSortingBlockInputStream"); |
| 103 | /// Blocks accumulated so far (see remerge()) together with their total row and byte counts. |
| 104 | Blocks blocks; |
| 105 | size_t sum_rows_in_blocks = 0; |
| 106 | size_t sum_bytes_in_blocks = 0; |
| 107 | std::unique_ptr<IBlockInputStream> impl; |
| 108 | |
| 109 | /// Before operation, will remove constant columns from blocks. And after, place constant columns back. |
| 110 | /// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files) |
| 111 | /// Save original block structure here. |
| 112 | Block header; |
| 113 | Block header_without_constants; /// Same structure as 'header', but with constant columns removed. |
| 114 | |
| 115 | /// Everything below is for external sorting. |
| 116 | std::vector<std::unique_ptr<TemporaryFile>> temporary_files; |
| 117 | std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs; |
| 118 | |
| 119 | BlockInputStreams inputs_to_merge; |
| 120 | |
| 121 | /// Merge all accumulated blocks to keep no more than limit rows. |
| 122 | void remerge(); |
| 123 | /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore. |
| 124 | bool remerge_is_useful = true; |
| 125 | }; |
| 126 | } |
| 127 | |