1 | #include <Access/MemoryAccessStorage.h> |
2 | #include <ext/scope_guard.h> |
3 | #include <unordered_set> |
4 | |
5 | |
6 | namespace DB |
7 | { |
8 | MemoryAccessStorage::MemoryAccessStorage(const String & storage_name_) |
9 | : IAccessStorage(storage_name_), shared_ptr_to_this{std::make_shared<const MemoryAccessStorage *>(this)} |
10 | { |
11 | } |
12 | |
13 | |
14 | MemoryAccessStorage::~MemoryAccessStorage() {} |
15 | |
16 | |
17 | std::optional<UUID> MemoryAccessStorage::findImpl(std::type_index type, const String & name) const |
18 | { |
19 | std::lock_guard lock{mutex}; |
20 | auto it = names.find({name, type}); |
21 | if (it == names.end()) |
22 | return {}; |
23 | |
24 | Entry & entry = *(it->second); |
25 | return entry.id; |
26 | } |
27 | |
28 | |
29 | std::vector<UUID> MemoryAccessStorage::findAllImpl(std::type_index type) const |
30 | { |
31 | std::lock_guard lock{mutex}; |
32 | std::vector<UUID> result; |
33 | result.reserve(entries.size()); |
34 | for (const auto & [id, entry] : entries) |
35 | if (entry.entity->isTypeOf(type)) |
36 | result.emplace_back(id); |
37 | return result; |
38 | } |
39 | |
40 | |
41 | bool MemoryAccessStorage::existsImpl(const UUID & id) const |
42 | { |
43 | std::lock_guard lock{mutex}; |
44 | return entries.count(id); |
45 | } |
46 | |
47 | |
48 | AccessEntityPtr MemoryAccessStorage::readImpl(const UUID & id) const |
49 | { |
50 | std::lock_guard lock{mutex}; |
51 | auto it = entries.find(id); |
52 | if (it == entries.end()) |
53 | throwNotFound(id); |
54 | const Entry & entry = it->second; |
55 | return entry.entity; |
56 | } |
57 | |
58 | |
59 | String MemoryAccessStorage::readNameImpl(const UUID & id) const |
60 | { |
61 | return readImpl(id)->getFullName(); |
62 | } |
63 | |
64 | |
65 | UUID MemoryAccessStorage::insertImpl(const AccessEntityPtr & new_entity, bool replace_if_exists) |
66 | { |
67 | Notifications notifications; |
68 | SCOPE_EXIT({ notify(notifications); }); |
69 | |
70 | UUID id = generateRandomID(); |
71 | std::lock_guard lock{mutex}; |
72 | insertNoLock(generateRandomID(), new_entity, replace_if_exists, notifications); |
73 | return id; |
74 | } |
75 | |
76 | |
77 | void MemoryAccessStorage::insertNoLock(const UUID & id, const AccessEntityPtr & new_entity, bool replace_if_exists, Notifications & notifications) |
78 | { |
79 | const String & name = new_entity->getFullName(); |
80 | std::type_index type = new_entity->getType(); |
81 | |
82 | /// Check that we can insert. |
83 | auto it = entries.find(id); |
84 | if (it != entries.end()) |
85 | { |
86 | const auto & existing_entry = it->second; |
87 | throwIDCollisionCannotInsert(id, type, name, existing_entry.entity->getType(), existing_entry.entity->getFullName()); |
88 | } |
89 | |
90 | auto it2 = names.find({name, type}); |
91 | if (it2 != names.end()) |
92 | { |
93 | const auto & existing_entry = *(it2->second); |
94 | if (replace_if_exists) |
95 | removeNoLock(existing_entry.id, notifications); |
96 | else |
97 | throwNameCollisionCannotInsert(type, name); |
98 | } |
99 | |
100 | /// Do insertion. |
101 | auto & entry = entries[id]; |
102 | entry.id = id; |
103 | entry.entity = new_entity; |
104 | names[std::pair{name, type}] = &entry; |
105 | prepareNotifications(entry, false, notifications); |
106 | } |
107 | |
108 | |
109 | void MemoryAccessStorage::removeImpl(const UUID & id) |
110 | { |
111 | Notifications notifications; |
112 | SCOPE_EXIT({ notify(notifications); }); |
113 | |
114 | std::lock_guard lock{mutex}; |
115 | removeNoLock(id, notifications); |
116 | } |
117 | |
118 | |
119 | void MemoryAccessStorage::removeNoLock(const UUID & id, Notifications & notifications) |
120 | { |
121 | auto it = entries.find(id); |
122 | if (it == entries.end()) |
123 | throwNotFound(id); |
124 | |
125 | Entry & entry = it->second; |
126 | const String & name = entry.entity->getFullName(); |
127 | std::type_index type = entry.entity->getType(); |
128 | |
129 | prepareNotifications(entry, true, notifications); |
130 | |
131 | /// Do removing. |
132 | names.erase({name, type}); |
133 | entries.erase(it); |
134 | } |
135 | |
136 | |
137 | void MemoryAccessStorage::updateImpl(const UUID & id, const UpdateFunc & update_func) |
138 | { |
139 | Notifications notifications; |
140 | SCOPE_EXIT({ notify(notifications); }); |
141 | |
142 | std::lock_guard lock{mutex}; |
143 | updateNoLock(id, update_func, notifications); |
144 | } |
145 | |
146 | |
147 | void MemoryAccessStorage::updateNoLock(const UUID & id, const UpdateFunc & update_func, Notifications & notifications) |
148 | { |
149 | auto it = entries.find(id); |
150 | if (it == entries.end()) |
151 | throwNotFound(id); |
152 | |
153 | Entry & entry = it->second; |
154 | auto old_entity = entry.entity; |
155 | auto new_entity = update_func(old_entity); |
156 | |
157 | if (*new_entity == *old_entity) |
158 | return; |
159 | |
160 | entry.entity = new_entity; |
161 | |
162 | if (new_entity->getFullName() != old_entity->getFullName()) |
163 | { |
164 | auto it2 = names.find({new_entity->getFullName(), new_entity->getType()}); |
165 | if (it2 != names.end()) |
166 | throwNameCollisionCannotRename(old_entity->getType(), old_entity->getFullName(), new_entity->getFullName()); |
167 | |
168 | names.erase({old_entity->getFullName(), old_entity->getType()}); |
169 | names[std::pair{new_entity->getFullName(), new_entity->getType()}] = &entry; |
170 | } |
171 | |
172 | prepareNotifications(entry, false, notifications); |
173 | } |
174 | |
175 | |
176 | void MemoryAccessStorage::setAll(const std::vector<AccessEntityPtr> & all_entities) |
177 | { |
178 | std::vector<std::pair<UUID, AccessEntityPtr>> entities_with_ids; |
179 | entities_with_ids.reserve(all_entities.size()); |
180 | for (const auto & entity : all_entities) |
181 | entities_with_ids.emplace_back(generateRandomID(), entity); |
182 | setAll(entities_with_ids); |
183 | } |
184 | |
185 | |
186 | void MemoryAccessStorage::setAll(const std::vector<std::pair<UUID, AccessEntityPtr>> & all_entities) |
187 | { |
188 | Notifications notifications; |
189 | SCOPE_EXIT({ notify(notifications); }); |
190 | |
191 | std::lock_guard lock{mutex}; |
192 | setAllNoLock(all_entities, notifications); |
193 | } |
194 | |
195 | |
196 | void MemoryAccessStorage::setAllNoLock(const std::vector<std::pair<UUID, AccessEntityPtr>> & all_entities, Notifications & notifications) |
197 | { |
198 | /// Get list of the currently used IDs. Later we will remove those of them which are not used anymore. |
199 | std::unordered_set<UUID> not_used_ids; |
200 | for (const auto & id_and_entry : entries) |
201 | not_used_ids.emplace(id_and_entry.first); |
202 | |
203 | /// Remove conflicting entities. |
204 | for (const auto & [id, entity] : all_entities) |
205 | { |
206 | auto it = entries.find(id); |
207 | if (it != entries.end()) |
208 | { |
209 | not_used_ids.erase(id); /// ID is used. |
210 | Entry & entry = it->second; |
211 | if (entry.entity->getType() != entity->getType()) |
212 | { |
213 | removeNoLock(id, notifications); |
214 | continue; |
215 | } |
216 | } |
217 | auto it2 = names.find({entity->getFullName(), entity->getType()}); |
218 | if (it2 != names.end()) |
219 | { |
220 | Entry & entry = *(it2->second); |
221 | if (entry.id != id) |
222 | removeNoLock(id, notifications); |
223 | } |
224 | } |
225 | |
226 | /// Remove entities which are not used anymore. |
227 | for (const auto & id : not_used_ids) |
228 | removeNoLock(id, notifications); |
229 | |
230 | /// Insert or update entities. |
231 | for (const auto & [id, entity] : all_entities) |
232 | { |
233 | auto it = entries.find(id); |
234 | if (it != entries.end()) |
235 | { |
236 | if (*(it->second.entity) != *entity) |
237 | { |
238 | const AccessEntityPtr & changed_entity = entity; |
239 | updateNoLock(id, [&changed_entity](const AccessEntityPtr &) { return changed_entity; }, notifications); |
240 | } |
241 | } |
242 | else |
243 | insertNoLock(id, entity, false, notifications); |
244 | } |
245 | } |
246 | |
247 | |
248 | void MemoryAccessStorage::prepareNotifications(const Entry & entry, bool remove, Notifications & notifications) const |
249 | { |
250 | for (const auto & handler : entry.handlers_by_id) |
251 | notifications.push_back({handler, entry.id, remove ? nullptr : entry.entity}); |
252 | |
253 | auto range = handlers_by_type.equal_range(entry.entity->getType()); |
254 | for (auto it = range.first; it != range.second; ++it) |
255 | notifications.push_back({it->second, entry.id, remove ? nullptr : entry.entity}); |
256 | } |
257 | |
258 | |
259 | IAccessStorage::SubscriptionPtr MemoryAccessStorage::subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const |
260 | { |
261 | class SubscriptionImpl : public Subscription |
262 | { |
263 | public: |
264 | SubscriptionImpl( |
265 | const MemoryAccessStorage & storage_, |
266 | std::type_index type_, |
267 | const OnChangedHandler & handler_) |
268 | : storage_weak(storage_.shared_ptr_to_this) |
269 | { |
270 | std::lock_guard lock{storage_.mutex}; |
271 | handler_it = storage_.handlers_by_type.emplace(type_, handler_); |
272 | } |
273 | |
274 | ~SubscriptionImpl() override |
275 | { |
276 | auto storage = storage_weak.lock(); |
277 | if (storage) |
278 | { |
279 | std::lock_guard lock{(*storage)->mutex}; |
280 | (*storage)->handlers_by_type.erase(handler_it); |
281 | } |
282 | } |
283 | |
284 | private: |
285 | std::weak_ptr<const MemoryAccessStorage *> storage_weak; |
286 | std::unordered_multimap<std::type_index, OnChangedHandler>::iterator handler_it; |
287 | }; |
288 | |
289 | return std::make_unique<SubscriptionImpl>(*this, type, handler); |
290 | } |
291 | |
292 | |
293 | IAccessStorage::SubscriptionPtr MemoryAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const |
294 | { |
295 | class SubscriptionImpl : public Subscription |
296 | { |
297 | public: |
298 | SubscriptionImpl( |
299 | const MemoryAccessStorage & storage_, |
300 | const UUID & id_, |
301 | const OnChangedHandler & handler_) |
302 | : storage_weak(storage_.shared_ptr_to_this), |
303 | id(id_) |
304 | { |
305 | std::lock_guard lock{storage_.mutex}; |
306 | auto it = storage_.entries.find(id); |
307 | if (it == storage_.entries.end()) |
308 | { |
309 | storage_weak.reset(); |
310 | return; |
311 | } |
312 | const Entry & entry = it->second; |
313 | handler_it = entry.handlers_by_id.insert(entry.handlers_by_id.end(), handler_); |
314 | } |
315 | |
316 | ~SubscriptionImpl() override |
317 | { |
318 | auto storage = storage_weak.lock(); |
319 | if (storage) |
320 | { |
321 | std::lock_guard lock{(*storage)->mutex}; |
322 | auto it = (*storage)->entries.find(id); |
323 | if (it != (*storage)->entries.end()) |
324 | { |
325 | const Entry & entry = it->second; |
326 | entry.handlers_by_id.erase(handler_it); |
327 | } |
328 | } |
329 | } |
330 | |
331 | private: |
332 | std::weak_ptr<const MemoryAccessStorage *> storage_weak; |
333 | UUID id; |
334 | std::list<OnChangedHandler>::iterator handler_it; |
335 | }; |
336 | |
337 | return std::make_unique<SubscriptionImpl>(*this, id, handler); |
338 | } |
339 | |
340 | |
341 | bool MemoryAccessStorage::hasSubscriptionImpl(const UUID & id) const |
342 | { |
343 | auto it = entries.find(id); |
344 | if (it != entries.end()) |
345 | { |
346 | const Entry & entry = it->second; |
347 | return !entry.handlers_by_id.empty(); |
348 | } |
349 | return false; |
350 | } |
351 | |
352 | |
353 | bool MemoryAccessStorage::hasSubscriptionImpl(std::type_index type) const |
354 | { |
355 | auto range = handlers_by_type.equal_range(type); |
356 | return range.first != range.second; |
357 | } |
358 | } |
359 | |