1#pragma once
2
3namespace DB
4{
5
6/// Simple struct which stores threads with numbers [0 .. num_threads - 1].
7/// Allows to push and pop specified thread, or pop any thread if has.
8/// Oll operations (except init) are O(1). No memory allocations after init happen.
9struct ThreadsQueue
10{
11 void init(size_t num_threads)
12 {
13 stack_size = 0;
14 stack.clear();
15 thread_pos_in_stack.clear();
16
17 stack.reserve(num_threads);
18 thread_pos_in_stack.reserve(num_threads);
19
20 for (size_t thread = 0; thread < num_threads; ++thread)
21 {
22 stack.emplace_back(thread);
23 thread_pos_in_stack.emplace_back(thread);
24 }
25 }
26
27 bool has(size_t thread) const { return thread_pos_in_stack[thread] < stack_size; }
28 size_t size() const { return stack_size; }
29 bool empty() const { return stack_size == 0; }
30
31 void push(size_t thread)
32 {
33 if (unlikely(has(thread)))
34 throw Exception("Can't push thread because it is already in threads queue.", ErrorCodes::LOGICAL_ERROR);
35
36 swap_threads(thread, stack[stack_size]);
37 ++stack_size;
38 }
39
40 void pop(size_t thread)
41 {
42 if (unlikely(!has(thread)))
43 throw Exception("Can't pop thread because it is not in threads queue.", ErrorCodes::LOGICAL_ERROR);
44
45 --stack_size;
46 swap_threads(thread, stack[stack_size]);
47 }
48
49 size_t pop_any()
50 {
51 if (unlikely(stack_size == 0))
52 throw Exception("Can't pop from empty queue.", ErrorCodes::LOGICAL_ERROR);
53
54 --stack_size;
55 return stack[stack_size];
56 }
57
58private:
59 std::vector<size_t> stack;
60 std::vector<size_t> thread_pos_in_stack;
61 size_t stack_size = 0;
62
63 void swap_threads(size_t first, size_t second)
64 {
65 std::swap(thread_pos_in_stack[first], thread_pos_in_stack[second]);
66 std::swap(stack[thread_pos_in_stack[first]], stack[thread_pos_in_stack[second]]);
67 }
68};
69
70}
71