1#include "ZooKeeperNodeCache.h"
2
3namespace zkutil
4{
5
6ZooKeeperNodeCache::ZooKeeperNodeCache(GetZooKeeper get_zookeeper_)
7 : get_zookeeper(std::move(get_zookeeper_))
8 , context(std::make_shared<Context>())
9{
10}
11
12ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, EventPtr watch_event)
13{
14 Coordination::WatchCallback watch_callback;
15 if (watch_event)
16 watch_callback = [watch_event](const Coordination::WatchResponse &) { watch_event->set(); };
17
18 return get(path, watch_callback);
19}
20
21ZooKeeperNodeCache::ZNode ZooKeeperNodeCache::get(const std::string & path, Coordination::WatchCallback caller_watch_callback)
22{
23 std::unordered_set<std::string> invalidated_paths;
24 {
25 std::lock_guard lock(context->mutex);
26
27 if (context->all_paths_invalidated)
28 {
29 /// Possibly, there was a previous session and it has expired. Clear the cache.
30 path_to_cached_znode.clear();
31 context->all_paths_invalidated = false;
32 }
33
34 invalidated_paths.swap(context->invalidated_paths);
35 }
36
37 zkutil::ZooKeeperPtr zookeeper = get_zookeeper();
38 if (!zookeeper)
39 throw DB::Exception("Could not get znode: '" + path + "'. ZooKeeper not configured.", DB::ErrorCodes::NO_ZOOKEEPER);
40
41 for (const auto & invalidated_path : invalidated_paths)
42 path_to_cached_znode.erase(invalidated_path);
43
44 auto cache_it = path_to_cached_znode.find(path);
45 if (cache_it != path_to_cached_znode.end())
46 return cache_it->second;
47
48 std::weak_ptr<Context> weak_context(context);
49 auto watch_callback = [weak_context, caller_watch_callback](const Coordination::WatchResponse & response)
50 {
51 if (!(response.type != Coordination::SESSION || response.state == Coordination::EXPIRED_SESSION))
52 return;
53
54 auto owned_context = weak_context.lock();
55 if (!owned_context)
56 return;
57
58 bool changed = false;
59 {
60 std::lock_guard lock(owned_context->mutex);
61
62 if (response.type != Coordination::SESSION)
63 changed = owned_context->invalidated_paths.emplace(response.path).second;
64 else if (response.state == Coordination::EXPIRED_SESSION)
65 {
66 owned_context->invalidated_paths.clear();
67 owned_context->all_paths_invalidated = true;
68 changed = true;
69 }
70 }
71 if (changed && caller_watch_callback)
72 caller_watch_callback(response);
73 };
74
75 ZNode result;
76
77 result.exists = zookeeper->tryGetWatch(path, result.contents, &result.stat, watch_callback);
78 if (result.exists)
79 {
80 path_to_cached_znode.emplace(path, result);
81 return result;
82 }
83
84 /// Node doesn't exist. We must set a watch on node creation (because it wasn't set by tryGetWatch).
85
86 result.exists = zookeeper->existsWatch(path, &result.stat, watch_callback);
87 if (!result.exists)
88 {
89 path_to_cached_znode.emplace(path, result);
90 return result;
91 }
92
93 /// Node was created between the two previous calls, try again. Watch is already set.
94
95 result.exists = zookeeper->tryGet(path, result.contents, &result.stat);
96 path_to_cached_znode.emplace(path, result);
97 return result;
98}
99
100}
101