1 | #include "ActionLocksManager.h" |
---|---|
2 | #include <Interpreters/Context.h> |
3 | #include <Databases/IDatabase.h> |
4 | #include <Storages/IStorage.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
10 | namespace ActionLocks |
11 | { |
12 | extern const StorageActionBlockType PartsMerge = 1; |
13 | extern const StorageActionBlockType PartsFetch = 2; |
14 | extern const StorageActionBlockType PartsSend = 3; |
15 | extern const StorageActionBlockType ReplicationQueue = 4; |
16 | extern const StorageActionBlockType DistributedSend = 5; |
17 | extern const StorageActionBlockType PartsTTLMerge = 6; |
18 | extern const StorageActionBlockType PartsMove = 7; |
19 | } |
20 | |
21 | |
22 | template <typename F> |
23 | inline void forEachTable(Context & context, F && f) |
24 | { |
25 | for (auto & elem : context.getDatabases()) |
26 | for (auto iterator = elem.second->getTablesIterator(context); iterator->isValid(); iterator->next()) |
27 | f(iterator->table()); |
28 | |
29 | } |
30 | |
31 | void ActionLocksManager::add(StorageActionBlockType action_type) |
32 | { |
33 | forEachTable(global_context, [&] (const StoragePtr & table) |
34 | { |
35 | ActionLock action_lock = table->getActionLock(action_type); |
36 | |
37 | if (!action_lock.expired()) |
38 | { |
39 | std::lock_guard lock(mutex); |
40 | storage_locks[table.get()][action_type] = std::move(action_lock); |
41 | } |
42 | }); |
43 | } |
44 | |
45 | void ActionLocksManager::add(const String & database_name, const String & table_name, StorageActionBlockType action_type) |
46 | { |
47 | if (auto table = global_context.tryGetTable(database_name, table_name)) |
48 | { |
49 | ActionLock action_lock = table->getActionLock(action_type); |
50 | |
51 | if (!action_lock.expired()) |
52 | { |
53 | std::lock_guard lock(mutex); |
54 | storage_locks[table.get()][action_type] = std::move(action_lock); |
55 | } |
56 | } |
57 | } |
58 | |
59 | void ActionLocksManager::remove(StorageActionBlockType action_type) |
60 | { |
61 | std::lock_guard lock(mutex); |
62 | |
63 | for (auto & storage_elem : storage_locks) |
64 | storage_elem.second.erase(action_type); |
65 | } |
66 | |
67 | void ActionLocksManager::remove(const String & database_name, const String & table_name, StorageActionBlockType action_type) |
68 | { |
69 | if (auto table = global_context.tryGetTable(database_name, table_name)) |
70 | { |
71 | std::lock_guard lock(mutex); |
72 | |
73 | if (storage_locks.count(table.get())) |
74 | storage_locks[table.get()].erase(action_type); |
75 | } |
76 | } |
77 | |
78 | void ActionLocksManager::cleanExpired() |
79 | { |
80 | std::lock_guard lock(mutex); |
81 | |
82 | for (auto it_storage = storage_locks.begin(); it_storage != storage_locks.end();) |
83 | { |
84 | auto & locks = it_storage->second; |
85 | for (auto it_lock = locks.begin(); it_lock != locks.end();) |
86 | { |
87 | if (it_lock->second.expired()) |
88 | it_lock = locks.erase(it_lock); |
89 | else |
90 | ++it_lock; |
91 | } |
92 | |
93 | if (locks.empty()) |
94 | it_storage = storage_locks.erase(it_storage); |
95 | else |
96 | ++it_storage; |
97 | } |
98 | } |
99 | |
100 | } |
101 |