1#include <Processors/Transforms/LimitByTransform.h>
2#include <Common/PODArray.h>
3#include <Common/SipHash.h>
4
5namespace DB
6{
7
8LimitByTransform::LimitByTransform(const Block & header, size_t group_length_, size_t group_offset_, const Names & columns)
9 : ISimpleTransform(header, header, true)
10 , group_length(group_length_)
11 , group_offset(group_offset_)
12{
13 key_positions.reserve(columns.size());
14
15 for (const auto & name : columns)
16 {
17 auto position = header.getPositionByName(name);
18 auto & column = header.getByPosition(position).column;
19
20 /// Ignore all constant columns.
21 if (!(column && isColumnConst(*column)))
22 key_positions.emplace_back(position);
23 }
24}
25
26void LimitByTransform::transform(Chunk & chunk)
27{
28 size_t num_rows = chunk.getNumRows();
29 auto columns = chunk.detachColumns();
30
31 IColumn::Filter filter(num_rows);
32 size_t inserted_count = 0;
33
34 for (size_t row = 0; row < num_rows; ++row)
35 {
36 UInt128 key(0, 0);
37 SipHash hash;
38
39 for (auto position : key_positions)
40 columns[position]->updateHashWithValue(row, hash);
41
42 hash.get128(key.low, key.high);
43
44 auto count = keys_counts[key]++;
45 if (count >= group_offset && count < group_length + group_offset)
46 {
47 inserted_count++;
48 filter[row] = 1;
49 }
50 else
51 filter[row] = 0;
52 }
53
54 /// Just go to the next block if there isn't any new records in the current one.
55 if (!inserted_count)
56 /// SimpleTransform will skip it.
57 return;
58
59 if (inserted_count < num_rows)
60 {
61 for (auto & column : columns)
62 if (isColumnConst(*column))
63 column = column->cut(0, inserted_count);
64 else
65 column = column->filter(filter, inserted_count);
66 }
67
68 chunk.setColumns(std::move(columns), inserted_count);
69}
70
71}
72