1#include <Access/MemoryAccessStorage.h>
2#include <ext/scope_guard.h>
3#include <unordered_set>
4
5
6namespace DB
7{
8MemoryAccessStorage::MemoryAccessStorage(const String & storage_name_)
9 : IAccessStorage(storage_name_), shared_ptr_to_this{std::make_shared<const MemoryAccessStorage *>(this)}
10{
11}
12
13
14MemoryAccessStorage::~MemoryAccessStorage() {}
15
16
17std::optional<UUID> MemoryAccessStorage::findImpl(std::type_index type, const String & name) const
18{
19 std::lock_guard lock{mutex};
20 auto it = names.find({name, type});
21 if (it == names.end())
22 return {};
23
24 Entry & entry = *(it->second);
25 return entry.id;
26}
27
28
29std::vector<UUID> MemoryAccessStorage::findAllImpl(std::type_index type) const
30{
31 std::lock_guard lock{mutex};
32 std::vector<UUID> result;
33 result.reserve(entries.size());
34 for (const auto & [id, entry] : entries)
35 if (entry.entity->isTypeOf(type))
36 result.emplace_back(id);
37 return result;
38}
39
40
41bool MemoryAccessStorage::existsImpl(const UUID & id) const
42{
43 std::lock_guard lock{mutex};
44 return entries.count(id);
45}
46
47
48AccessEntityPtr MemoryAccessStorage::readImpl(const UUID & id) const
49{
50 std::lock_guard lock{mutex};
51 auto it = entries.find(id);
52 if (it == entries.end())
53 throwNotFound(id);
54 const Entry & entry = it->second;
55 return entry.entity;
56}
57
58
59String MemoryAccessStorage::readNameImpl(const UUID & id) const
60{
61 return readImpl(id)->getFullName();
62}
63
64
65UUID MemoryAccessStorage::insertImpl(const AccessEntityPtr & new_entity, bool replace_if_exists)
66{
67 Notifications notifications;
68 SCOPE_EXIT({ notify(notifications); });
69
70 UUID id = generateRandomID();
71 std::lock_guard lock{mutex};
72 insertNoLock(generateRandomID(), new_entity, replace_if_exists, notifications);
73 return id;
74}
75
76
77void MemoryAccessStorage::insertNoLock(const UUID & id, const AccessEntityPtr & new_entity, bool replace_if_exists, Notifications & notifications)
78{
79 const String & name = new_entity->getFullName();
80 std::type_index type = new_entity->getType();
81
82 /// Check that we can insert.
83 auto it = entries.find(id);
84 if (it != entries.end())
85 {
86 const auto & existing_entry = it->second;
87 throwIDCollisionCannotInsert(id, type, name, existing_entry.entity->getType(), existing_entry.entity->getFullName());
88 }
89
90 auto it2 = names.find({name, type});
91 if (it2 != names.end())
92 {
93 const auto & existing_entry = *(it2->second);
94 if (replace_if_exists)
95 removeNoLock(existing_entry.id, notifications);
96 else
97 throwNameCollisionCannotInsert(type, name);
98 }
99
100 /// Do insertion.
101 auto & entry = entries[id];
102 entry.id = id;
103 entry.entity = new_entity;
104 names[std::pair{name, type}] = &entry;
105 prepareNotifications(entry, false, notifications);
106}
107
108
109void MemoryAccessStorage::removeImpl(const UUID & id)
110{
111 Notifications notifications;
112 SCOPE_EXIT({ notify(notifications); });
113
114 std::lock_guard lock{mutex};
115 removeNoLock(id, notifications);
116}
117
118
119void MemoryAccessStorage::removeNoLock(const UUID & id, Notifications & notifications)
120{
121 auto it = entries.find(id);
122 if (it == entries.end())
123 throwNotFound(id);
124
125 Entry & entry = it->second;
126 const String & name = entry.entity->getFullName();
127 std::type_index type = entry.entity->getType();
128
129 prepareNotifications(entry, true, notifications);
130
131 /// Do removing.
132 names.erase({name, type});
133 entries.erase(it);
134}
135
136
137void MemoryAccessStorage::updateImpl(const UUID & id, const UpdateFunc & update_func)
138{
139 Notifications notifications;
140 SCOPE_EXIT({ notify(notifications); });
141
142 std::lock_guard lock{mutex};
143 updateNoLock(id, update_func, notifications);
144}
145
146
147void MemoryAccessStorage::updateNoLock(const UUID & id, const UpdateFunc & update_func, Notifications & notifications)
148{
149 auto it = entries.find(id);
150 if (it == entries.end())
151 throwNotFound(id);
152
153 Entry & entry = it->second;
154 auto old_entity = entry.entity;
155 auto new_entity = update_func(old_entity);
156
157 if (*new_entity == *old_entity)
158 return;
159
160 entry.entity = new_entity;
161
162 if (new_entity->getFullName() != old_entity->getFullName())
163 {
164 auto it2 = names.find({new_entity->getFullName(), new_entity->getType()});
165 if (it2 != names.end())
166 throwNameCollisionCannotRename(old_entity->getType(), old_entity->getFullName(), new_entity->getFullName());
167
168 names.erase({old_entity->getFullName(), old_entity->getType()});
169 names[std::pair{new_entity->getFullName(), new_entity->getType()}] = &entry;
170 }
171
172 prepareNotifications(entry, false, notifications);
173}
174
175
176void MemoryAccessStorage::setAll(const std::vector<AccessEntityPtr> & all_entities)
177{
178 std::vector<std::pair<UUID, AccessEntityPtr>> entities_with_ids;
179 entities_with_ids.reserve(all_entities.size());
180 for (const auto & entity : all_entities)
181 entities_with_ids.emplace_back(generateRandomID(), entity);
182 setAll(entities_with_ids);
183}
184
185
186void MemoryAccessStorage::setAll(const std::vector<std::pair<UUID, AccessEntityPtr>> & all_entities)
187{
188 Notifications notifications;
189 SCOPE_EXIT({ notify(notifications); });
190
191 std::lock_guard lock{mutex};
192 setAllNoLock(all_entities, notifications);
193}
194
195
196void MemoryAccessStorage::setAllNoLock(const std::vector<std::pair<UUID, AccessEntityPtr>> & all_entities, Notifications & notifications)
197{
198 /// Get list of the currently used IDs. Later we will remove those of them which are not used anymore.
199 std::unordered_set<UUID> not_used_ids;
200 for (const auto & id_and_entry : entries)
201 not_used_ids.emplace(id_and_entry.first);
202
203 /// Remove conflicting entities.
204 for (const auto & [id, entity] : all_entities)
205 {
206 auto it = entries.find(id);
207 if (it != entries.end())
208 {
209 not_used_ids.erase(id); /// ID is used.
210 Entry & entry = it->second;
211 if (entry.entity->getType() != entity->getType())
212 {
213 removeNoLock(id, notifications);
214 continue;
215 }
216 }
217 auto it2 = names.find({entity->getFullName(), entity->getType()});
218 if (it2 != names.end())
219 {
220 Entry & entry = *(it2->second);
221 if (entry.id != id)
222 removeNoLock(id, notifications);
223 }
224 }
225
226 /// Remove entities which are not used anymore.
227 for (const auto & id : not_used_ids)
228 removeNoLock(id, notifications);
229
230 /// Insert or update entities.
231 for (const auto & [id, entity] : all_entities)
232 {
233 auto it = entries.find(id);
234 if (it != entries.end())
235 {
236 if (*(it->second.entity) != *entity)
237 {
238 const AccessEntityPtr & changed_entity = entity;
239 updateNoLock(id, [&changed_entity](const AccessEntityPtr &) { return changed_entity; }, notifications);
240 }
241 }
242 else
243 insertNoLock(id, entity, false, notifications);
244 }
245}
246
247
248void MemoryAccessStorage::prepareNotifications(const Entry & entry, bool remove, Notifications & notifications) const
249{
250 for (const auto & handler : entry.handlers_by_id)
251 notifications.push_back({handler, entry.id, remove ? nullptr : entry.entity});
252
253 auto range = handlers_by_type.equal_range(entry.entity->getType());
254 for (auto it = range.first; it != range.second; ++it)
255 notifications.push_back({it->second, entry.id, remove ? nullptr : entry.entity});
256}
257
258
259IAccessStorage::SubscriptionPtr MemoryAccessStorage::subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const
260{
261 class SubscriptionImpl : public Subscription
262 {
263 public:
264 SubscriptionImpl(
265 const MemoryAccessStorage & storage_,
266 std::type_index type_,
267 const OnChangedHandler & handler_)
268 : storage_weak(storage_.shared_ptr_to_this)
269 {
270 std::lock_guard lock{storage_.mutex};
271 handler_it = storage_.handlers_by_type.emplace(type_, handler_);
272 }
273
274 ~SubscriptionImpl() override
275 {
276 auto storage = storage_weak.lock();
277 if (storage)
278 {
279 std::lock_guard lock{(*storage)->mutex};
280 (*storage)->handlers_by_type.erase(handler_it);
281 }
282 }
283
284 private:
285 std::weak_ptr<const MemoryAccessStorage *> storage_weak;
286 std::unordered_multimap<std::type_index, OnChangedHandler>::iterator handler_it;
287 };
288
289 return std::make_unique<SubscriptionImpl>(*this, type, handler);
290}
291
292
293IAccessStorage::SubscriptionPtr MemoryAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
294{
295 class SubscriptionImpl : public Subscription
296 {
297 public:
298 SubscriptionImpl(
299 const MemoryAccessStorage & storage_,
300 const UUID & id_,
301 const OnChangedHandler & handler_)
302 : storage_weak(storage_.shared_ptr_to_this),
303 id(id_)
304 {
305 std::lock_guard lock{storage_.mutex};
306 auto it = storage_.entries.find(id);
307 if (it == storage_.entries.end())
308 {
309 storage_weak.reset();
310 return;
311 }
312 const Entry & entry = it->second;
313 handler_it = entry.handlers_by_id.insert(entry.handlers_by_id.end(), handler_);
314 }
315
316 ~SubscriptionImpl() override
317 {
318 auto storage = storage_weak.lock();
319 if (storage)
320 {
321 std::lock_guard lock{(*storage)->mutex};
322 auto it = (*storage)->entries.find(id);
323 if (it != (*storage)->entries.end())
324 {
325 const Entry & entry = it->second;
326 entry.handlers_by_id.erase(handler_it);
327 }
328 }
329 }
330
331 private:
332 std::weak_ptr<const MemoryAccessStorage *> storage_weak;
333 UUID id;
334 std::list<OnChangedHandler>::iterator handler_it;
335 };
336
337 return std::make_unique<SubscriptionImpl>(*this, id, handler);
338}
339
340
341bool MemoryAccessStorage::hasSubscriptionImpl(const UUID & id) const
342{
343 auto it = entries.find(id);
344 if (it != entries.end())
345 {
346 const Entry & entry = it->second;
347 return !entry.handlers_by_id.empty();
348 }
349 return false;
350}
351
352
353bool MemoryAccessStorage::hasSubscriptionImpl(std::type_index type) const
354{
355 auto range = handlers_by_type.equal_range(type);
356 return range.first != range.second;
357}
358}
359