1#include "ActionLocksManager.h"
2#include <Interpreters/Context.h>
3#include <Databases/IDatabase.h>
4#include <Storages/IStorage.h>
5
6
7namespace DB
8{
9
10namespace ActionLocks
11{
12 extern const StorageActionBlockType PartsMerge = 1;
13 extern const StorageActionBlockType PartsFetch = 2;
14 extern const StorageActionBlockType PartsSend = 3;
15 extern const StorageActionBlockType ReplicationQueue = 4;
16 extern const StorageActionBlockType DistributedSend = 5;
17 extern const StorageActionBlockType PartsTTLMerge = 6;
18 extern const StorageActionBlockType PartsMove = 7;
19}
20
21
22template <typename F>
23inline void forEachTable(Context & context, F && f)
24{
25 for (auto & elem : context.getDatabases())
26 for (auto iterator = elem.second->getTablesIterator(context); iterator->isValid(); iterator->next())
27 f(iterator->table());
28
29}
30
31void ActionLocksManager::add(StorageActionBlockType action_type)
32{
33 forEachTable(global_context, [&] (const StoragePtr & table)
34 {
35 ActionLock action_lock = table->getActionLock(action_type);
36
37 if (!action_lock.expired())
38 {
39 std::lock_guard lock(mutex);
40 storage_locks[table.get()][action_type] = std::move(action_lock);
41 }
42 });
43}
44
45void ActionLocksManager::add(const String & database_name, const String & table_name, StorageActionBlockType action_type)
46{
47 if (auto table = global_context.tryGetTable(database_name, table_name))
48 {
49 ActionLock action_lock = table->getActionLock(action_type);
50
51 if (!action_lock.expired())
52 {
53 std::lock_guard lock(mutex);
54 storage_locks[table.get()][action_type] = std::move(action_lock);
55 }
56 }
57}
58
59void ActionLocksManager::remove(StorageActionBlockType action_type)
60{
61 std::lock_guard lock(mutex);
62
63 for (auto & storage_elem : storage_locks)
64 storage_elem.second.erase(action_type);
65}
66
67void ActionLocksManager::remove(const String & database_name, const String & table_name, StorageActionBlockType action_type)
68{
69 if (auto table = global_context.tryGetTable(database_name, table_name))
70 {
71 std::lock_guard lock(mutex);
72
73 if (storage_locks.count(table.get()))
74 storage_locks[table.get()].erase(action_type);
75 }
76}
77
78void ActionLocksManager::cleanExpired()
79{
80 std::lock_guard lock(mutex);
81
82 for (auto it_storage = storage_locks.begin(); it_storage != storage_locks.end();)
83 {
84 auto & locks = it_storage->second;
85 for (auto it_lock = locks.begin(); it_lock != locks.end();)
86 {
87 if (it_lock->second.expired())
88 it_lock = locks.erase(it_lock);
89 else
90 ++it_lock;
91 }
92
93 if (locks.empty())
94 it_storage = storage_locks.erase(it_storage);
95 else
96 ++it_storage;
97 }
98}
99
100}
101