1#include <DataStreams/LimitByBlockInputStream.h>
2#include <Common/PODArray.h>
3#include <Common/SipHash.h>
4
5
6namespace DB
7{
8
9LimitByBlockInputStream::LimitByBlockInputStream(const BlockInputStreamPtr & input,
10 size_t group_length_, size_t group_offset_, const Names & columns)
11 : columns_names(columns)
12 , group_length(group_length_)
13 , group_offset(group_offset_)
14{
15 children.push_back(input);
16}
17
18Block LimitByBlockInputStream::readImpl()
19{
20 /// Execute until end of stream or until
21 /// a block with some new records will be gotten.
22 while (true)
23 {
24 Block block = children[0]->read();
25 if (!block)
26 return Block();
27
28 const ColumnRawPtrs column_ptrs(getKeyColumns(block));
29 const size_t rows = block.rows();
30 IColumn::Filter filter(rows);
31 size_t inserted_count = 0;
32
33 for (size_t i = 0; i < rows; ++i)
34 {
35 UInt128 key;
36 SipHash hash;
37
38 for (auto & column : column_ptrs)
39 column->updateHashWithValue(i, hash);
40
41 hash.get128(key.low, key.high);
42
43 auto count = keys_counts[key]++;
44 if (count >= group_offset && count < group_length + group_offset)
45 {
46 inserted_count++;
47 filter[i] = 1;
48 }
49 else
50 filter[i] = 0;
51 }
52
53 /// Just go to the next block if there isn't any new records in the current one.
54 if (!inserted_count)
55 continue;
56
57 size_t all_columns = block.columns();
58 for (size_t i = 0; i < all_columns; ++i)
59 block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, inserted_count);
60
61 return block;
62 }
63}
64
65ColumnRawPtrs LimitByBlockInputStream::getKeyColumns(Block & block) const
66{
67 ColumnRawPtrs column_ptrs;
68 column_ptrs.reserve(columns_names.size());
69
70 for (const auto & name : columns_names)
71 {
72 auto & column = block.getByName(name).column;
73
74 /// Ignore all constant columns.
75 if (!isColumnConst(*column))
76 column_ptrs.emplace_back(column.get());
77 }
78
79 return column_ptrs;
80}
81
82}
83