1#pragma once
2
3#include <mutex>
4#include <condition_variable>
5#include <Poco/Timespan.h>
6#include <boost/noncopyable.hpp>
7
8#include <common/logger_useful.h>
9#include <Common/Exception.h>
10
11
12namespace DB
13{
14 namespace ErrorCodes
15 {
16 extern const int LOGICAL_ERROR;
17 }
18}
19
20/** A class from which you can inherit and get a pool of something. Used for database connection pools.
21 * Descendant class must provide a method for creating a new object to place in the pool.
22 */
23
24template <typename TObject>
25class PoolBase : private boost::noncopyable
26{
27public:
28 using Object = TObject;
29 using ObjectPtr = std::shared_ptr<Object>;
30 using Ptr = std::shared_ptr<PoolBase<TObject>>;
31
32private:
33
34 /** The object with the flag, whether it is currently used. */
35 struct PooledObject
36 {
37 PooledObject(ObjectPtr object_, PoolBase & pool_)
38 : object(object_), pool(pool_)
39 {
40 }
41
42 ObjectPtr object;
43 bool in_use = false;
44 PoolBase & pool;
45 };
46
47 using Objects = std::vector<std::shared_ptr<PooledObject>>;
48
49 /** The helper, which sets the flag for using the object, and in the destructor - removes,
50 * and also notifies the event using condvar.
51 */
52 struct PoolEntryHelper
53 {
54 PoolEntryHelper(PooledObject & data_) : data(data_) { data.in_use = true; }
55 ~PoolEntryHelper()
56 {
57 std::unique_lock lock(data.pool.mutex);
58 data.in_use = false;
59 data.pool.available.notify_one();
60 }
61
62 PooledObject & data;
63 };
64
65public:
66 /** What is given to the user. */
67 class Entry
68 {
69 public:
70 friend class PoolBase<Object>;
71
72 Entry() {} /// For deferred initialization.
73
74 /** The `Entry` object protects the resource from being used by another thread.
75 * The following methods are forbidden for `rvalue`, so you can not write a similar to
76 *
77 * auto q = pool.Get()->query("SELECT .."); // Oops, after this line Entry was destroyed
78 * q.execute (); // Someone else can use this Connection
79 */
80 Object * operator->() && = delete;
81 const Object * operator->() const && = delete;
82 Object & operator*() && = delete;
83 const Object & operator*() const && = delete;
84
85 Object * operator->() & { return &*data->data.object; }
86 const Object * operator->() const & { return &*data->data.object; }
87 Object & operator*() & { return *data->data.object; }
88 const Object & operator*() const & { return *data->data.object; }
89
90 bool isNull() const { return data == nullptr; }
91
92 PoolBase * getPool() const
93 {
94 if (!data)
95 throw DB::Exception("Attempt to get pool from uninitialized entry", DB::ErrorCodes::LOGICAL_ERROR);
96 return &data->data.pool;
97 }
98
99 private:
100 std::shared_ptr<PoolEntryHelper> data;
101
102 Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) {}
103 };
104
105 virtual ~PoolBase() {}
106
107 /** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */
108 Entry get(Poco::Timespan::TimeDiff timeout)
109 {
110 std::unique_lock lock(mutex);
111
112 while (true)
113 {
114 for (auto & item : items)
115 if (!item->in_use)
116 return Entry(*item);
117
118 if (items.size() < max_items)
119 {
120 ObjectPtr object = allocObject();
121 items.emplace_back(std::make_shared<PooledObject>(object, *this));
122 return Entry(*items.back());
123 }
124
125 LOG_INFO(log, "No free connections in pool. Waiting.");
126
127 if (timeout < 0)
128 available.wait(lock);
129 else
130 available.wait_for(lock, std::chrono::microseconds(timeout));
131 }
132 }
133
134 void reserve(size_t count)
135 {
136 std::lock_guard lock(mutex);
137
138 while (items.size() < count)
139 items.emplace_back(std::make_shared<PooledObject>(allocObject(), *this));
140 }
141
142private:
143 /** The maximum size of the pool. */
144 unsigned max_items;
145
146 /** Pool. */
147 Objects items;
148
149 /** Lock to access the pool. */
150 std::mutex mutex;
151 std::condition_variable available;
152
153protected:
154
155 Logger * log;
156
157 PoolBase(unsigned max_items_, Logger * log_)
158 : max_items(max_items_), log(log_)
159 {
160 items.reserve(max_items);
161 }
162
163 /** Creates a new object to put into the pool. */
164 virtual ObjectPtr allocObject() = 0;
165};
166
167