1#include <DataStreams/ReplacingSortedBlockInputStream.h>
2#include <Columns/ColumnsNumber.h>
3#include <common/logger_useful.h>
4
5
6namespace DB
7{
8
9namespace ErrorCodes
10{
11 extern const int LOGICAL_ERROR;
12}
13
14
15void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns)
16{
17 if (out_row_sources_buf)
18 {
19 /// true flag value means "skip row"
20 current_row_sources[max_pos].setSkipFlag(false);
21
22 out_row_sources_buf->write(reinterpret_cast<const char *>(current_row_sources.data()),
23 current_row_sources.size() * sizeof(RowSourcePart));
24 current_row_sources.resize(0);
25 }
26
27 for (size_t i = 0; i < num_columns; ++i)
28 merged_columns[i]->insertFrom(*(*selected_row.columns)[i], selected_row.row_num);
29}
30
31
32Block ReplacingSortedBlockInputStream::readImpl()
33{
34 if (finished)
35 return Block();
36
37 MutableColumns merged_columns;
38 init(merged_columns);
39
40 if (has_collation)
41 throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
42
43 if (merged_columns.empty())
44 return Block();
45
46 merge(merged_columns, queue_without_collation);
47 return header.cloneWithColumns(std::move(merged_columns));
48}
49
50
51void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
52{
53 MergeStopCondition stop_condition(average_block_sizes, max_block_size);
54 /// Take the rows in needed order and put them into `merged_columns` until rows no more than `max_block_size`
55 while (!queue.empty())
56 {
57 SortCursor current = queue.top();
58 size_t current_block_granularity = current->rows;
59
60 if (current_key.empty())
61 setPrimaryKeyRef(current_key, current);
62
63 setPrimaryKeyRef(next_key, current);
64
65 bool key_differs = next_key != current_key;
66
67 /// if there are enough rows and the last one is calculated completely
68 if (key_differs && stop_condition.checkStop())
69 return;
70
71 queue.pop();
72
73 if (key_differs)
74 {
75 /// Write the data for the previous primary key.
76 insertRow(merged_columns);
77 stop_condition.addRowWithGranularity(current_block_granularity);
78 selected_row.reset();
79 current_key.swap(next_key);
80 }
81
82 /// Initially, skip all rows. Unskip last on insert.
83 size_t current_pos = current_row_sources.size();
84 if (out_row_sources_buf)
85 current_row_sources.emplace_back(current.impl->order, true);
86
87 /// A non-strict comparison, since we select the last row for the same version values.
88 if (version_column_number == -1
89 || selected_row.empty()
90 || current->all_columns[version_column_number]->compareAt(
91 current->pos, selected_row.row_num,
92 *(*selected_row.columns)[version_column_number],
93 /* nan_direction_hint = */ 1) >= 0)
94 {
95 max_pos = current_pos;
96 setRowRef(selected_row, current);
97 }
98
99 if (!current->isLast())
100 {
101 current->next();
102 queue.push(current);
103 }
104 else
105 {
106 /// We get the next block from the corresponding source, if there is one.
107 fetchNextBlock(current, queue);
108 }
109 }
110
111 /// We will write the data for the last primary key.
112 if (!selected_row.empty())
113 insertRow(merged_columns);
114
115 finished = true;
116}
117
118}
119