1#pragma once
2
3#include <common/logger_useful.h>
4
5#include <DataStreams/MergingSortedBlockInputStream.h>
6#include <DataStreams/ColumnGathererStream.h>
7
8
9namespace DB
10{
11
/** Merges several sorted streams into one.
  * For each group of consecutive rows with identical values of the primary key (the columns by which the data is sorted),
  * keeps at most one row with `sign_column = -1` ("negative row")
  * and at most one row with `sign_column = 1` ("positive row").
  * That is, it collapses the records of a change log.
  *
  * If the numbers of positive and negative rows are equal and the last row is positive, the first negative and the last positive rows are written.
  * If the numbers of positive and negative rows are equal and the last row is negative, nothing is written.
  * If positive rows outnumber negative rows by one, only the last positive row is written.
  * If negative rows outnumber positive rows by one, only the first negative row is written.
  * Any other combination is a logical error.
  */
24class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
25{
26public:
27 CollapsingSortedBlockInputStream(
28 BlockInputStreams inputs_, const SortDescription & description_,
29 const String & sign_column, size_t max_block_size_,
30 WriteBuffer * out_row_sources_buf_ = nullptr, bool average_block_sizes_ = false)
31 : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes_)
32 {
33 sign_column_number = header.getPositionByName(sign_column);
34 }
35
36 String getName() const override { return "CollapsingSorted"; }
37
38protected:
39 /// Can return 1 more records than max_block_size.
40 Block readImpl() override;
41
42private:
43 size_t sign_column_number;
44
45 Logger * log = &Logger::get("CollapsingSortedBlockInputStream");
46
47 /// Read is finished.
48 bool finished = false;
49
50 SharedBlockRowRef current_key; /// The current primary key.
51 SharedBlockRowRef next_key; /// The primary key of the next row.
52
53 SharedBlockRowRef first_negative; /// The first negative row for the current primary key.
54 SharedBlockRowRef last_positive; /// The last positive row for the current primary key.
55 SharedBlockRowRef last_negative; /// Last negative row. It is only stored if there is not one row is written to output.
56
57 size_t count_positive = 0; /// The number of positive rows for the current primary key.
58 size_t count_negative = 0; /// The number of negative rows for the current primary key.
59 bool last_is_positive = false; /// true if the last row for the current primary key is positive.
60
61 size_t count_incorrect_data = 0; /// To prevent too many error messages from writing to the log.
62
63 size_t blocks_written = 0;
64
65 /// Fields specific for VERTICAL merge algorithm.
66 /// Row numbers are relative to the start of current primary key.
67 size_t current_pos = 0; /// Current row number
68 size_t first_negative_pos = 0; /// Row number of first_negative
69 size_t last_positive_pos = 0; /// Row number of last_positive
70 size_t last_negative_pos = 0; /// Row number of last_negative
71 PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
72
73 /** We support two different cursors - with Collation and without.
74 * Templates are used instead of polymorphic SortCursors and calls to virtual functions.
75 */
76 void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
77
78 /// Output to result rows for the current primary key.
79 void insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition);
80
81 void reportIncorrectData();
82};
83
84}
85