1 | #include "ZooKeeperNodeCache.h" |
2 | |
3 | namespace zkutil |
4 | { |
5 | |
6 | ZooKeeperNodeCache::ZooKeeperNodeCache(GetZooKeeper get_zookeeper_) |
7 | : get_zookeeper(std::move(get_zookeeper_)) |
8 | , context(std::make_shared<Context>()) |
9 | { |
10 | } |
11 | |
12 | ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, EventPtr watch_event) |
13 | { |
14 | Coordination::WatchCallback watch_callback; |
15 | if (watch_event) |
16 | watch_callback = [watch_event](const Coordination::WatchResponse &) { watch_event->set(); }; |
17 | |
18 | return get(path, watch_callback); |
19 | } |
20 | |
21 | ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Coordination::WatchCallback caller_watch_callback) |
22 | { |
23 | std::unordered_set<std::string> invalidated_paths; |
24 | { |
25 | std::lock_guard lock(context->mutex); |
26 | |
27 | if (context->all_paths_invalidated) |
28 | { |
29 | /// Possibly, there was a previous session and it has expired. Clear the cache. |
30 | path_to_cached_znode.clear(); |
31 | context->all_paths_invalidated = false; |
32 | } |
33 | |
34 | invalidated_paths.swap(context->invalidated_paths); |
35 | } |
36 | |
37 | zkutil::ZooKeeperPtr zookeeper = get_zookeeper(); |
38 | if (!zookeeper) |
39 | throw DB::Exception("Could not get znode: '" + path + "'. ZooKeeper not configured." , DB::ErrorCodes::NO_ZOOKEEPER); |
40 | |
41 | for (const auto & invalidated_path : invalidated_paths) |
42 | path_to_cached_znode.erase(invalidated_path); |
43 | |
44 | auto cache_it = path_to_cached_znode.find(path); |
45 | if (cache_it != path_to_cached_znode.end()) |
46 | return cache_it->second; |
47 | |
48 | std::weak_ptr<Context> weak_context(context); |
49 | auto watch_callback = [weak_context, caller_watch_callback](const Coordination::WatchResponse & response) |
50 | { |
51 | if (!(response.type != Coordination::SESSION || response.state == Coordination::EXPIRED_SESSION)) |
52 | return; |
53 | |
54 | auto owned_context = weak_context.lock(); |
55 | if (!owned_context) |
56 | return; |
57 | |
58 | bool changed = false; |
59 | { |
60 | std::lock_guard lock(owned_context->mutex); |
61 | |
62 | if (response.type != Coordination::SESSION) |
63 | changed = owned_context->invalidated_paths.emplace(response.path).second; |
64 | else if (response.state == Coordination::EXPIRED_SESSION) |
65 | { |
66 | owned_context->invalidated_paths.clear(); |
67 | owned_context->all_paths_invalidated = true; |
68 | changed = true; |
69 | } |
70 | } |
71 | if (changed && caller_watch_callback) |
72 | caller_watch_callback(response); |
73 | }; |
74 | |
75 | ZNode result; |
76 | |
77 | result.exists = zookeeper->tryGetWatch(path, result.contents, &result.stat, watch_callback); |
78 | if (result.exists) |
79 | { |
80 | path_to_cached_znode.emplace(path, result); |
81 | return result; |
82 | } |
83 | |
84 | /// Node doesn't exist. We must set a watch on node creation (because it wasn't set by tryGetWatch). |
85 | |
86 | result.exists = zookeeper->existsWatch(path, &result.stat, watch_callback); |
87 | if (!result.exists) |
88 | { |
89 | path_to_cached_znode.emplace(path, result); |
90 | return result; |
91 | } |
92 | |
93 | /// Node was created between the two previous calls, try again. Watch is already set. |
94 | |
95 | result.exists = zookeeper->tryGet(path, result.contents, &result.stat); |
96 | path_to_cached_znode.emplace(path, result); |
97 | return result; |
98 | } |
99 | |
100 | } |
101 | |