1#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
2#include <Common/ZooKeeper/KeeperException.h>
3#include <common/logger_useful.h>
4
5
6namespace DB
7{
8
9namespace ErrorCodes
10{
11 extern const int LOGICAL_ERROR;
12}
13
14EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(
15 const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, Coordination::Requests * precheck_ops)
16 : zookeeper(&zookeeper_), path_prefix(path_prefix_)
17{
18 /// The /abandonable_lock- name is for backward compatibility.
19 String holder_path_prefix = temp_path + "/abandonable_lock-";
20
21 /// Let's create an secondary ephemeral node.
22 if (!precheck_ops || precheck_ops->empty())
23 {
24 holder_path = zookeeper->create(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential);
25 }
26 else
27 {
28 precheck_ops->emplace_back(zkutil::makeCreateRequest(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential));
29 Coordination::Responses op_results = zookeeper->multi(*precheck_ops);
30 holder_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
31 }
32
33 /// Write the path to the secondary node in the main node.
34 path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::EphemeralSequential);
35
36 if (path.size() <= path_prefix.size())
37 throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
38}
39
40void EphemeralLockInZooKeeper::unlock()
41{
42 Coordination::Requests ops;
43 getUnlockOps(ops);
44 zookeeper->multi(ops);
45 holder_path = "";
46}
47
48void EphemeralLockInZooKeeper::getUnlockOps(Coordination::Requests & ops)
49{
50 checkCreated();
51 ops.emplace_back(zkutil::makeRemoveRequest(path, -1));
52 ops.emplace_back(zkutil::makeRemoveRequest(holder_path, -1));
53}
54
55EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper()
56{
57 if (!isCreated())
58 return;
59
60 try
61 {
62 unlock();
63 }
64 catch (...)
65 {
66 tryLogCurrentException("~EphemeralLockInZooKeeper");
67 }
68}
69
70
71EphemeralLocksInAllPartitions::EphemeralLocksInAllPartitions(
72 const String & block_numbers_path, const String & path_prefix, const String & temp_path,
73 zkutil::ZooKeeper & zookeeper_)
74 : zookeeper(zookeeper_)
75{
76 std::vector<String> holders;
77 while (true)
78 {
79 Coordination::Stat partitions_stat;
80 Strings partitions = zookeeper.getChildren(block_numbers_path, &partitions_stat);
81
82 if (holders.size() < partitions.size())
83 {
84 std::vector<std::future<Coordination::CreateResponse>> holder_futures;
85 for (size_t i = 0; i < partitions.size() - holders.size(); ++i)
86 {
87 String path = temp_path + "/abandonable_lock-";
88 holder_futures.push_back(zookeeper.asyncCreate(path, {}, zkutil::CreateMode::EphemeralSequential));
89 }
90 for (auto & future : holder_futures)
91 {
92 auto resp = future.get();
93 holders.push_back(resp.path_created);
94 }
95 }
96
97 Coordination::Requests lock_ops;
98 for (size_t i = 0; i < partitions.size(); ++i)
99 {
100 String partition_path_prefix = block_numbers_path + "/" + partitions[i] + "/" + path_prefix;
101 lock_ops.push_back(zkutil::makeCreateRequest(
102 partition_path_prefix, holders[i], zkutil::CreateMode::EphemeralSequential));
103 }
104 lock_ops.push_back(zkutil::makeCheckRequest(block_numbers_path, partitions_stat.version));
105
106 Coordination::Responses lock_responses;
107 int rc = zookeeper.tryMulti(lock_ops, lock_responses);
108 if (rc == Coordination::ZBADVERSION)
109 {
110 LOG_TRACE(&Logger::get("EphemeralLocksInAllPartitions"),
111 "Someone has inserted a block in a new partition while we were creating locks. Retry.");
112 continue;
113 }
114 else if (rc != Coordination::ZOK)
115 throw Coordination::Exception(rc);
116
117 for (size_t i = 0; i < partitions.size(); ++i)
118 {
119 size_t prefix_size = block_numbers_path.size() + 1 + partitions[i].size() + 1 + path_prefix.size();
120 const String & path = dynamic_cast<const Coordination::CreateResponse &>(*lock_responses[i]).path_created;
121 if (path.size() <= prefix_size)
122 throw Exception("Logical error: name of the sequential node is shorter than prefix.",
123 ErrorCodes::LOGICAL_ERROR);
124
125 UInt64 number = parse<UInt64>(path.c_str() + prefix_size, path.size() - prefix_size);
126 locks.push_back(LockInfo{path, holders[i], partitions[i], number});
127 }
128
129 return;
130 }
131}
132
133void EphemeralLocksInAllPartitions::unlock()
134{
135 std::vector<zkutil::ZooKeeper::FutureMulti> futures;
136 for (const auto & lock : locks)
137 {
138 Coordination::Requests unlock_ops;
139 unlock_ops.emplace_back(zkutil::makeRemoveRequest(lock.path, -1));
140 unlock_ops.emplace_back(zkutil::makeRemoveRequest(lock.holder_path, -1));
141 futures.push_back(zookeeper.asyncMulti(unlock_ops));
142 }
143
144 for (auto & future : futures)
145 future.get();
146
147 locks.clear();
148}
149
150EphemeralLocksInAllPartitions::~EphemeralLocksInAllPartitions()
151{
152 try
153 {
154 unlock();
155 }
156 catch (...)
157 {
158 tryLogCurrentException("~EphemeralLocksInAllPartitions");
159 }
160}
161
162}
163