1/*
2 * Copyright 2018-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <future>
20
21#include <glog/logging.h>
22
23#include <folly/Executor.h>
24#include <folly/synchronization/Baton.h>
25
26namespace folly {
27
28/// An Executor accepts units of work with add(), which should be
29/// threadsafe.
30class DefaultKeepAliveExecutor : public virtual Executor {
31 public:
32 DefaultKeepAliveExecutor() : Executor() {}
33
34 virtual ~DefaultKeepAliveExecutor() {
35 DCHECK(!keepAlive_);
36 }
37
38 folly::Executor::KeepAlive<> weakRef() {
39 return WeakRef::create(controlBlock_, this);
40 }
41
42 protected:
43 void joinKeepAlive() {
44 DCHECK(keepAlive_);
45 keepAlive_.reset();
46 keepAliveReleaseBaton_.wait();
47 }
48
49 private:
50 struct ControlBlock {
51 std::atomic<ssize_t> keepAliveCount_{1};
52 };
53
54 class WeakRef : public Executor {
55 public:
56 static folly::Executor::KeepAlive<> create(
57 std::shared_ptr<ControlBlock> controlBlock,
58 Executor* executor) {
59 return makeKeepAlive(new WeakRef(std::move(controlBlock), executor));
60 }
61
62 void add(Func f) override {
63 if (auto executor = lock()) {
64 executor->add(std::move(f));
65 }
66 }
67
68 void addWithPriority(Func f, int8_t priority) override {
69 if (auto executor = lock()) {
70 executor->addWithPriority(std::move(f), priority);
71 }
72 }
73
74 virtual uint8_t getNumPriorities() const override {
75 return numPriorities_;
76 }
77
78 private:
79 WeakRef(std::shared_ptr<ControlBlock> controlBlock, Executor* executor)
80 : controlBlock_(std::move(controlBlock)),
81 executor_(executor),
82 numPriorities_(executor->getNumPriorities()) {}
83
84 bool keepAliveAcquire() override {
85 auto keepAliveCount =
86 keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
87 // We should never increment from 0
88 DCHECK(keepAliveCount > 0);
89 return true;
90 }
91
92 void keepAliveRelease() override {
93 auto keepAliveCount =
94 keepAliveCount_.fetch_sub(1, std::memory_order_acq_rel);
95 DCHECK(keepAliveCount >= 1);
96
97 if (keepAliveCount == 1) {
98 delete this;
99 }
100 }
101
102 folly::Executor::KeepAlive<> lock() {
103 auto controlBlock =
104 controlBlock_->keepAliveCount_.load(std::memory_order_relaxed);
105 do {
106 if (controlBlock == 0) {
107 return {};
108 }
109 } while (!controlBlock_->keepAliveCount_.compare_exchange_weak(
110 controlBlock,
111 controlBlock + 1,
112 std::memory_order_release,
113 std::memory_order_relaxed));
114
115 return makeKeepAlive(executor_);
116 }
117
118 std::atomic<size_t> keepAliveCount_{1};
119
120 std::shared_ptr<ControlBlock> controlBlock_;
121 Executor* executor_;
122
123 uint8_t numPriorities_;
124 };
125
126 bool keepAliveAcquire() override {
127 auto keepAliveCount =
128 controlBlock_->keepAliveCount_.fetch_add(1, std::memory_order_relaxed);
129 // We should never increment from 0
130 DCHECK(keepAliveCount > 0);
131 return true;
132 }
133
134 void keepAliveRelease() override {
135 auto keepAliveCount =
136 controlBlock_->keepAliveCount_.fetch_sub(1, std::memory_order_acquire);
137 DCHECK(keepAliveCount >= 1);
138
139 if (keepAliveCount == 1) {
140 keepAliveReleaseBaton_.post(); // std::memory_order_release
141 }
142 }
143
144 std::shared_ptr<ControlBlock> controlBlock_{std::make_shared<ControlBlock>()};
145 Baton<> keepAliveReleaseBaton_;
146 KeepAlive<DefaultKeepAliveExecutor> keepAlive_{
147 makeKeepAlive<DefaultKeepAliveExecutor>(this)};
148};
149
150} // namespace folly
151