1#pragma once
2
3#include <map>
4#include <mutex>
5#include <condition_variable>
6#include <memory>
7#include <chrono>
8#include <Common/CurrentMetrics.h>
9#include <Common/Stopwatch.h>
10
11
12namespace CurrentMetrics
13{
14 extern const Metric QueryPreempted;
15}
16
17
18namespace DB
19{
20
21/** Implements query priorities in very primitive way.
22 * Allows to freeze query execution if at least one query of higher priority is executed.
23 *
24 * Priority value is integer, smaller means higher priority.
25 *
26 * Priority 0 is special - queries with that priority is always executed,
27 * not depends on other queries and not affect other queries.
28 * Thus 0 means - don't use priorities.
29 *
30 * NOTE Possibilities for improvement:
31 * - implement limit on maximum number of running queries with same priority.
32 */
33class QueryPriorities
34{
35public:
36 using Priority = int;
37
38private:
39 friend struct Handle;
40
41 using Count = int;
42
43 /// Number of currently running queries for each priority.
44 using Container = std::map<Priority, Count>;
45
46 std::mutex mutex;
47 std::condition_variable condvar;
48 Container container;
49
50
51 /** If there are higher priority queries - sleep until they are finish or timeout happens.
52 */
53 template <typename Duration>
54 void waitIfNeed(Priority priority, Duration timeout)
55 {
56 if (0 == priority)
57 return;
58
59 std::unique_lock lock(mutex);
60
61 /// Is there at least one more priority query?
62 bool found = false;
63 for (const auto & value : container)
64 {
65 if (value.first >= priority)
66 break;
67
68 if (value.second > 0)
69 {
70 found = true;
71 break;
72 }
73 }
74
75 if (!found)
76 return;
77
78 CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryPreempted};
79
80 /// Spurious wakeups are Ok. We allow to wait less than requested.
81 condvar.wait_for(lock, timeout);
82 }
83
84public:
85 struct HandleImpl
86 {
87 private:
88 QueryPriorities & parent;
89 QueryPriorities::Container::value_type & value;
90
91 public:
92 HandleImpl(QueryPriorities & parent_, QueryPriorities::Container::value_type & value_)
93 : parent(parent_), value(value_) {}
94
95 ~HandleImpl()
96 {
97 {
98 std::lock_guard lock(parent.mutex);
99 --value.second;
100 }
101 parent.condvar.notify_all();
102 }
103
104 template <typename Duration>
105 void waitIfNeed(Duration timeout)
106 {
107 parent.waitIfNeed(value.first, timeout);
108 }
109 };
110
111 using Handle = std::shared_ptr<HandleImpl>;
112
113 /** Register query with specified priority.
114 * Returns an object that remove record in destructor.
115 */
116 Handle insert(Priority priority)
117 {
118 if (0 == priority)
119 return {};
120
121 std::lock_guard lock(mutex);
122 auto it = container.emplace(priority, 0).first;
123 ++it->second;
124 return std::make_shared<HandleImpl>(*this, *it);
125 }
126};
127
128}
129