1#include <algorithm>
2
3#include <DataStreams/LimitBlockInputStream.h>
4
5
6namespace DB
7{
8
9/// gets pointers to all columns of block, which were used for ORDER BY
10static ColumnRawPtrs extractSortColumns(const Block & block, const SortDescription & description)
11{
12 size_t size = description.size();
13 ColumnRawPtrs res;
14 res.reserve(size);
15
16 for (size_t i = 0; i < size; ++i)
17 {
18 const IColumn * column = !description[i].column_name.empty()
19 ? block.getByName(description[i].column_name).column.get()
20 : block.safeGetByPosition(description[i].column_number).column.get();
21 res.emplace_back(column);
22 }
23
24 return res;
25}
26
27
28LimitBlockInputStream::LimitBlockInputStream(
29 const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_,
30 bool use_limit_as_total_rows_approx, bool with_ties_, const SortDescription & description_)
31 : limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_), with_ties(with_ties_)
32 , description(description_)
33{
34 if (use_limit_as_total_rows_approx)
35 {
36 addTotalRowsApprox(static_cast<size_t>(limit));
37 }
38
39 children.push_back(input);
40}
41
42Block LimitBlockInputStream::readImpl()
43{
44 Block res;
45 UInt64 rows = 0;
46
47 /// pos >= offset + limit and all rows in the end of previous block were equal
48 /// to row at 'limit' position. So we check current block.
49 if (!ties_row_ref.empty() && pos >= offset + limit)
50 {
51 res = children.back()->read();
52 rows = res.rows();
53
54 if (!res)
55 return res;
56
57 SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
58 ptr->sort_columns = extractSortColumns(*ptr, description);
59
60 UInt64 len;
61 for (len = 0; len < rows; ++len)
62 {
63 SharedBlockRowRef current_row;
64 current_row.set(ptr, &ptr->sort_columns, len);
65
66 if (current_row != ties_row_ref)
67 {
68 ties_row_ref.reset();
69 break;
70 }
71 }
72
73 if (len < rows)
74 {
75 for (size_t i = 0; i < ptr->columns(); ++i)
76 ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len);
77 }
78
79 return *ptr;
80 }
81
82 if (pos >= offset + limit)
83 {
84 if (!always_read_till_end)
85 return res;
86 else
87 {
88 while (children.back()->read())
89 ;
90 return res;
91 }
92 }
93
94 do
95 {
96 res = children.back()->read();
97 if (!res)
98 return res;
99 rows = res.rows();
100 pos += rows;
101 } while (pos <= offset);
102
103 SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
104 if (with_ties)
105 ptr->sort_columns = extractSortColumns(*ptr, description);
106
107 /// give away the whole block
108 if (pos >= offset + rows && pos <= offset + limit)
109 {
110 /// Save rowref for last row, because probalbly next block begins with the same row.
111 if (with_ties && pos == offset + limit)
112 ties_row_ref.set(ptr, &ptr->sort_columns, rows - 1);
113 return *ptr;
114 }
115
116 /// give away a piece of the block
117 UInt64 start = std::max(
118 static_cast<Int64>(0),
119 static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));
120
121 UInt64 length = std::min(
122 static_cast<Int64>(limit), std::min(
123 static_cast<Int64>(pos) - static_cast<Int64>(offset),
124 static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
125
126
127 /// check if other rows in current block equals to last one in limit
128 if (with_ties)
129 {
130 ties_row_ref.set(ptr, &ptr->sort_columns, start + length - 1);
131
132 for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i)
133 {
134 SharedBlockRowRef current_row;
135 current_row.set(ptr, &ptr->sort_columns, i);
136 if (current_row == ties_row_ref)
137 ++length;
138 else
139 {
140 ties_row_ref.reset();
141 break;
142 }
143 }
144 }
145
146 if (length == rows)
147 return *ptr;
148
149 for (size_t i = 0; i < ptr->columns(); ++i)
150 ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length);
151
152 // TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed.
153 // It's crucial for streaming engines like Kafka.
154
155 return *ptr;
156}
157
158}
159