1#pragma once
2
3#include <common/logger_useful.h>
4
5#include <DataStreams/MergingSortedBlockInputStream.h>
6#include <DataStreams/ColumnGathererStream.h>
7
8
9namespace DB
10{
11
12/** Merges several sorted streams into one.
13 * For each group of consecutive identical values of the primary key (the columns by which the data is sorted),
14 * keeps row with max `version` value.
15 */
16class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream
17{
18public:
19 ReplacingSortedBlockInputStream(
20 const BlockInputStreams & inputs_, const SortDescription & description_,
21 const String & version_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr,
22 bool average_block_sizes_ = false)
23 : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes_)
24 {
25 if (!version_column.empty())
26 version_column_number = header.getPositionByName(version_column);
27 }
28
29 String getName() const override { return "ReplacingSorted"; }
30
31protected:
32 /// Can return 1 more records than max_block_size.
33 Block readImpl() override;
34
35private:
36 ssize_t version_column_number = -1;
37
38 Logger * log = &Logger::get("ReplacingSortedBlockInputStream");
39
40 /// All data has been read.
41 bool finished = false;
42
43 /// Primary key of current row.
44 SharedBlockRowRef current_key;
45 /// Primary key of next row.
46 SharedBlockRowRef next_key;
47 /// Last row with maximum version for current primary key.
48 SharedBlockRowRef selected_row;
49 /// The position (into current_row_sources) of the row with the highest version.
50 size_t max_pos = 0;
51
52 /// Sources of rows with the current primary key.
53 PODArray<RowSourcePart> current_row_sources;
54
55 void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
56
57 /// Output into result the rows for current primary key.
58 void insertRow(MutableColumns & merged_columns);
59};
60
61}
62