1#include <Functions/IFunctionImpl.h>
2#include <Functions/FunctionFactory.h>
3#include <Columns/ColumnsNumber.h>
4#include <DataTypes/DataTypesNumber.h>
5#include <atomic>
6
7
8namespace DB
9{
10
11/** Incremental number of row within all blocks passed to this function. */
12class FunctionRowNumberInAllBlocks : public IFunction
13{
14private:
15 std::atomic<size_t> rows{0};
16
17public:
18 static constexpr auto name = "rowNumberInAllBlocks";
19 static FunctionPtr create(const Context &)
20 {
21 return std::make_shared<FunctionRowNumberInAllBlocks>();
22 }
23
24 /// Get the name of the function.
25 String getName() const override
26 {
27 return name;
28 }
29
30 bool isStateful() const override
31 {
32 return true;
33 }
34
35 size_t getNumberOfArguments() const override
36 {
37 return 0;
38 }
39
40 bool isDeterministic() const override { return false; }
41
42 bool isDeterministicInScopeOfQuery() const override
43 {
44 return false;
45 }
46
47 DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
48 {
49 return std::make_shared<DataTypeUInt64>();
50 }
51
52 void executeImplDryRun(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override
53 {
54 auto column = ColumnUInt64::create(input_rows_count);
55 block.getByPosition(result).column = std::move(column);
56 }
57
58 void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override
59 {
60 size_t current_row_number = rows.fetch_add(input_rows_count);
61
62 auto column = ColumnUInt64::create();
63 auto & data = column->getData();
64 data.resize(input_rows_count);
65 for (size_t i = 0; i < input_rows_count; ++i)
66 data[i] = current_row_number + i;
67
68 block.getByPosition(result).column = std::move(column);
69 }
70};
71
72
73void registerFunctionRowNumberInAllBlocks(FunctionFactory & factory)
74{
75 factory.registerFunction<FunctionRowNumberInAllBlocks>();
76}
77
78}
79