1 | #pragma once |
2 | |
3 | #include <common/logger_useful.h> |
4 | |
5 | #include <DataStreams/MergingSortedBlockInputStream.h> |
6 | #include <DataStreams/ColumnGathererStream.h> |
7 | |
8 | |
9 | namespace DB |
10 | { |
11 | |
12 | /** Merges several sorted streams into one. |
13 | * For each group of consecutive identical values of the primary key (the columns by which the data is sorted), |
14 | * keeps row with max `version` value. |
15 | */ |
16 | class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream |
17 | { |
18 | public: |
19 | ReplacingSortedBlockInputStream( |
20 | const BlockInputStreams & inputs_, const SortDescription & description_, |
21 | const String & version_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr, |
22 | bool average_block_sizes_ = false) |
23 | : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes_) |
24 | { |
25 | if (!version_column.empty()) |
26 | version_column_number = header.getPositionByName(version_column); |
27 | } |
28 | |
29 | String getName() const override { return "ReplacingSorted" ; } |
30 | |
31 | protected: |
32 | /// Can return 1 more records than max_block_size. |
33 | Block readImpl() override; |
34 | |
35 | private: |
36 | ssize_t version_column_number = -1; |
37 | |
38 | Logger * log = &Logger::get("ReplacingSortedBlockInputStream" ); |
39 | |
40 | /// All data has been read. |
41 | bool finished = false; |
42 | |
43 | /// Primary key of current row. |
44 | SharedBlockRowRef current_key; |
45 | /// Primary key of next row. |
46 | SharedBlockRowRef next_key; |
47 | /// Last row with maximum version for current primary key. |
48 | SharedBlockRowRef selected_row; |
49 | /// The position (into current_row_sources) of the row with the highest version. |
50 | size_t max_pos = 0; |
51 | |
52 | /// Sources of rows with the current primary key. |
53 | PODArray<RowSourcePart> current_row_sources; |
54 | |
55 | void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue); |
56 | |
57 | /// Output into result the rows for current primary key. |
58 | void insertRow(MutableColumns & merged_columns); |
59 | }; |
60 | |
61 | } |
62 | |