1 | #pragma once |
2 | |
3 | #include <mutex> |
4 | #include <condition_variable> |
5 | #include <Poco/Timespan.h> |
6 | #include <boost/noncopyable.hpp> |
7 | |
8 | #include <common/logger_useful.h> |
9 | #include <Common/Exception.h> |
10 | |
11 | |
/// Error code used by PoolBase::Entry::getPool(). Only declared here (extern); the definition is not in this header.
namespace DB::ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
19 | |
20 | /** A class from which you can inherit and get a pool of something. Used for database connection pools. |
21 | * Descendant class must provide a method for creating a new object to place in the pool. |
22 | */ |
23 | |
24 | template <typename TObject> |
25 | class PoolBase : private boost::noncopyable |
26 | { |
27 | public: |
28 | using Object = TObject; |
29 | using ObjectPtr = std::shared_ptr<Object>; |
30 | using Ptr = std::shared_ptr<PoolBase<TObject>>; |
31 | |
32 | private: |
33 | |
34 | /** The object with the flag, whether it is currently used. */ |
35 | struct PooledObject |
36 | { |
37 | PooledObject(ObjectPtr object_, PoolBase & pool_) |
38 | : object(object_), pool(pool_) |
39 | { |
40 | } |
41 | |
42 | ObjectPtr object; |
43 | bool in_use = false; |
44 | PoolBase & pool; |
45 | }; |
46 | |
47 | using Objects = std::vector<std::shared_ptr<PooledObject>>; |
48 | |
49 | /** The helper, which sets the flag for using the object, and in the destructor - removes, |
50 | * and also notifies the event using condvar. |
51 | */ |
52 | struct PoolEntryHelper |
53 | { |
54 | PoolEntryHelper(PooledObject & data_) : data(data_) { data.in_use = true; } |
55 | ~PoolEntryHelper() |
56 | { |
57 | std::unique_lock lock(data.pool.mutex); |
58 | data.in_use = false; |
59 | data.pool.available.notify_one(); |
60 | } |
61 | |
62 | PooledObject & data; |
63 | }; |
64 | |
65 | public: |
66 | /** What is given to the user. */ |
67 | class Entry |
68 | { |
69 | public: |
70 | friend class PoolBase<Object>; |
71 | |
72 | Entry() {} /// For deferred initialization. |
73 | |
74 | /** The `Entry` object protects the resource from being used by another thread. |
75 | * The following methods are forbidden for `rvalue`, so you can not write a similar to |
76 | * |
77 | * auto q = pool.Get()->query("SELECT .."); // Oops, after this line Entry was destroyed |
78 | * q.execute (); // Someone else can use this Connection |
79 | */ |
80 | Object * operator->() && = delete; |
81 | const Object * operator->() const && = delete; |
82 | Object & operator*() && = delete; |
83 | const Object & operator*() const && = delete; |
84 | |
85 | Object * operator->() & { return &*data->data.object; } |
86 | const Object * operator->() const & { return &*data->data.object; } |
87 | Object & operator*() & { return *data->data.object; } |
88 | const Object & operator*() const & { return *data->data.object; } |
89 | |
90 | bool isNull() const { return data == nullptr; } |
91 | |
92 | PoolBase * getPool() const |
93 | { |
94 | if (!data) |
95 | throw DB::Exception("Attempt to get pool from uninitialized entry" , DB::ErrorCodes::LOGICAL_ERROR); |
96 | return &data->data.pool; |
97 | } |
98 | |
99 | private: |
100 | std::shared_ptr<PoolEntryHelper> data; |
101 | |
102 | Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) {} |
103 | }; |
104 | |
105 | virtual ~PoolBase() {} |
106 | |
107 | /** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */ |
108 | Entry get(Poco::Timespan::TimeDiff timeout) |
109 | { |
110 | std::unique_lock lock(mutex); |
111 | |
112 | while (true) |
113 | { |
114 | for (auto & item : items) |
115 | if (!item->in_use) |
116 | return Entry(*item); |
117 | |
118 | if (items.size() < max_items) |
119 | { |
120 | ObjectPtr object = allocObject(); |
121 | items.emplace_back(std::make_shared<PooledObject>(object, *this)); |
122 | return Entry(*items.back()); |
123 | } |
124 | |
125 | LOG_INFO(log, "No free connections in pool. Waiting." ); |
126 | |
127 | if (timeout < 0) |
128 | available.wait(lock); |
129 | else |
130 | available.wait_for(lock, std::chrono::microseconds(timeout)); |
131 | } |
132 | } |
133 | |
134 | void reserve(size_t count) |
135 | { |
136 | std::lock_guard lock(mutex); |
137 | |
138 | while (items.size() < count) |
139 | items.emplace_back(std::make_shared<PooledObject>(allocObject(), *this)); |
140 | } |
141 | |
142 | private: |
143 | /** The maximum size of the pool. */ |
144 | unsigned max_items; |
145 | |
146 | /** Pool. */ |
147 | Objects items; |
148 | |
149 | /** Lock to access the pool. */ |
150 | std::mutex mutex; |
151 | std::condition_variable available; |
152 | |
153 | protected: |
154 | |
155 | Logger * log; |
156 | |
157 | PoolBase(unsigned max_items_, Logger * log_) |
158 | : max_items(max_items_), log(log_) |
159 | { |
160 | items.reserve(max_items); |
161 | } |
162 | |
163 | /** Creates a new object to put into the pool. */ |
164 | virtual ObjectPtr allocObject() = 0; |
165 | }; |
166 | |
167 | |