1#include <random>
2#include <Common/thread_local_rng.h>
3#include <DataStreams/ConcatBlockInputStream.h>
4#include "narrowBlockInputStreams.h"
5
6
7namespace DB
8{
9
10BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t width)
11{
12 size_t size = inputs.size();
13 if (size <= width)
14 return inputs;
15
16 std::vector<BlockInputStreams> partitions(width);
17
18 using Distribution = std::vector<size_t>;
19 Distribution distribution(size);
20
21 for (size_t i = 0; i < size; ++i)
22 distribution[i] = i % width;
23
24 std::shuffle(distribution.begin(), distribution.end(), thread_local_rng);
25
26 for (size_t i = 0; i < size; ++i)
27 partitions[distribution[i]].push_back(inputs[i]);
28
29 BlockInputStreams res(width);
30 for (size_t i = 0; i < width; ++i)
31 res[i] = std::make_shared<ConcatBlockInputStream>(partitions[i]);
32
33 return res;
34}
35
36}
37