#include <algorithm>
#include <random>
#include <Common/thread_local_rng.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include "narrowBlockInputStreams.h"


7 | namespace DB |
8 | { |
9 | |
10 | BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width) |
11 | { |
12 | size_t size = inputs.size(); |
13 | if (size <= width) |
14 | return inputs; |
15 | |
16 | std::vector<BlockInputStreams> partitions(width); |
17 | |
18 | using Distribution = std::vector<size_t>; |
19 | Distribution distribution(size); |
20 | |
21 | for (size_t i = 0; i < size; ++i) |
22 | distribution[i] = i % width; |
23 | |
24 | std::shuffle(distribution.begin(), distribution.end(), thread_local_rng); |
25 | |
26 | for (size_t i = 0; i < size; ++i) |
27 | partitions[distribution[i]].push_back(inputs[i]); |
28 | |
29 | BlockInputStreams res(width); |
30 | for (size_t i = 0; i < width; ++i) |
31 | res[i] = std::make_shared<ConcatBlockInputStream>(partitions[i]); |
32 | |
33 | return res; |
34 | } |
35 | |
36 | } |
37 |