1#include <Common/Exception.h>
2#include <Columns/ColumnsNumber.h>
3#include <DataTypes/DataTypesNumber.h>
4#include <DataStreams/IBlockInputStream.h>
5#include <DataStreams/LimitBlockInputStream.h>
6#include <Storages/System/StorageSystemNumbers.h>
7
8
9namespace DB
10{
11
12namespace
13{
14
15class NumbersBlockInputStream : public IBlockInputStream
16{
17public:
18 NumbersBlockInputStream(UInt64 block_size_, UInt64 offset_, UInt64 step_)
19 : block_size(block_size_), next(offset_), step(step_) {}
20
21 String getName() const override { return "Numbers"; }
22
23 Block getHeader() const override
24 {
25 return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
26 }
27
28protected:
29 Block readImpl() override
30 {
31 auto column = ColumnUInt64::create(block_size);
32 ColumnUInt64::Container & vec = column->getData();
33
34 size_t curr = next; /// The local variable for some reason works faster (>20%) than member of class.
35 UInt64 * pos = vec.data(); /// This also accelerates the code.
36 UInt64 * end = &vec[block_size];
37 while (pos < end)
38 *pos++ = curr++;
39
40 next += step;
41 return { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeUInt64>(), "number") };
42 }
43private:
44 UInt64 block_size;
45 UInt64 next;
46 UInt64 step;
47};
48
49
50struct NumbersMultiThreadedState
51{
52 std::atomic<UInt64> counter;
53 explicit NumbersMultiThreadedState(UInt64 offset) : counter(offset) {}
54};
55
56using NumbersMultiThreadedStatePtr = std::shared_ptr<NumbersMultiThreadedState>;
57
58class NumbersMultiThreadedBlockInputStream : public IBlockInputStream
59{
60public:
61 NumbersMultiThreadedBlockInputStream(NumbersMultiThreadedStatePtr state_, UInt64 block_size_, UInt64 max_counter_)
62 : state(std::move(state_)), block_size(block_size_), max_counter(max_counter_) {}
63
64 String getName() const override { return "NumbersMt"; }
65
66 Block getHeader() const override
67 {
68 return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
69 }
70
71protected:
72 Block readImpl() override
73 {
74 if (block_size == 0)
75 return {};
76
77 UInt64 curr = state->counter.fetch_add(block_size, std::memory_order_acquire);
78
79 if (curr >= max_counter)
80 return {};
81
82 if (curr + block_size > max_counter)
83 block_size = max_counter - curr;
84
85 auto column = ColumnUInt64::create(block_size);
86 ColumnUInt64::Container & vec = column->getData();
87
88 UInt64 * pos = vec.data();
89 UInt64 * end = &vec[block_size];
90 while (pos < end)
91 *pos++ = curr++;
92
93 return { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeUInt64>(), "number") };
94 }
95
96private:
97 NumbersMultiThreadedStatePtr state;
98
99 UInt64 block_size;
100 UInt64 max_counter;
101};
102
103}
104
105
106StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_, UInt64 offset_, bool even_distribution_)
107 : name(name_), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_)
108{
109 setColumns(ColumnsDescription({{"number", std::make_shared<DataTypeUInt64>()}}));
110}
111
112BlockInputStreams StorageSystemNumbers::read(
113 const Names & column_names,
114 const SelectQueryInfo &,
115 const Context & /*context*/,
116 QueryProcessingStage::Enum /*processed_stage*/,
117 size_t max_block_size,
118 unsigned num_streams)
119{
120 check(column_names);
121
122 if (limit && *limit < max_block_size)
123 {
124 max_block_size = static_cast<size_t>(*limit);
125 multithreaded = false;
126 }
127
128 if (!multithreaded)
129 num_streams = 1;
130
131 BlockInputStreams res(num_streams);
132
133 if (num_streams > 1 && !even_distribution && *limit)
134 {
135 auto state = std::make_shared<NumbersMultiThreadedState>(offset);
136 UInt64 max_counter = offset + *limit;
137
138 for (size_t i = 0; i < num_streams; ++i)
139 res[i] = std::make_shared<NumbersMultiThreadedBlockInputStream>(state, max_block_size, max_counter);
140
141 return res;
142 }
143
144 for (size_t i = 0; i < num_streams; ++i)
145 {
146 res[i] = std::make_shared<NumbersBlockInputStream>(max_block_size, offset + i * max_block_size, num_streams * max_block_size);
147
148 if (limit) /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
149 res[i] = std::make_shared<LimitBlockInputStream>(res[i], *limit * (i + 1) / num_streams - *limit * i / num_streams, 0, false, true);
150 }
151
152 return res;
153}
154
155}
156