1#pragma once
2#include <Processors/Transforms/SortingTransform.h>
3#include <Core/SortDescription.h>
4#include <Common/filesystemHelpers.h>
5#include <IO/ReadBufferFromFile.h>
6#include <Compression/CompressedReadBuffer.h>
7#include <DataStreams/IBlockInputStream.h>
8#include <DataStreams/NativeBlockInputStream.h>
9
10#include <common/logger_useful.h>
11
12#include <queue>
13
14namespace DB
15{
16
/// Declaration of an error code that is defined in another translation unit.
/// Nothing in this header references it; presumably it is thrown by the
/// out-of-line MergeSortingTransform implementation (TODO confirm).
namespace ErrorCodes
{
    extern const int NOT_ENOUGH_SPACE;
}

/// Forward declaration only; no declaration in this header uses MergeSorter.
class MergeSorter;
22
23class MergeSortingTransform : public SortingTransform
24{
25public:
26 /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
27 MergeSortingTransform(const Block & header,
28 const SortDescription & description_,
29 size_t max_merged_block_size_, UInt64 limit_,
30 size_t max_bytes_before_remerge_,
31 size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
32 size_t min_free_disk_space_);
33
34 String getName() const override { return "MergeSortingTransform"; }
35
36protected:
37 void consume(Chunk chunk) override;
38 void serialize() override;
39 void generate() override;
40
41 Processors expandPipeline() override;
42
43private:
44 size_t max_bytes_before_remerge;
45 size_t max_bytes_before_external_sort;
46 const std::string tmp_path;
47 size_t min_free_disk_space;
48
49 Logger * log = &Logger::get("MergeSortingTransform");
50
51 /// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
52 bool remerge_is_useful = true;
53
54 /// Everything below is for external sorting.
55 std::vector<std::unique_ptr<TemporaryFile>> temporary_files;
56
57 /// Merge all accumulated blocks to keep no more than limit rows.
58 void remerge();
59
60 ProcessorPtr external_merging_sorted;
61};
62
63}
64