1/*
2 * Copyright 2017-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/executors/SerialExecutor.h>
18
19#include <glog/logging.h>
20
21#include <folly/ExceptionString.h>
22
23namespace folly {
24
25SerialExecutor::SerialExecutor(KeepAlive<Executor> parent)
26 : parent_(std::move(parent)) {}
27
28SerialExecutor::~SerialExecutor() {
29 DCHECK(!keepAliveCounter_);
30}
31
32Executor::KeepAlive<SerialExecutor> SerialExecutor::create(
33 KeepAlive<Executor> parent) {
34 return makeKeepAlive<SerialExecutor>(new SerialExecutor(std::move(parent)));
35}
36
37SerialExecutor::UniquePtr SerialExecutor::createUnique(
38 std::shared_ptr<Executor> parent) {
39 auto executor = new SerialExecutor(getKeepAliveToken(parent.get()));
40 return {executor, Deleter{std::move(parent)}};
41}
42
43bool SerialExecutor::keepAliveAcquire() {
44 auto keepAliveCounter =
45 keepAliveCounter_.fetch_add(1, std::memory_order_relaxed);
46 DCHECK(keepAliveCounter > 0);
47 return true;
48}
49
50void SerialExecutor::keepAliveRelease() {
51 auto keepAliveCounter =
52 keepAliveCounter_.fetch_sub(1, std::memory_order_acq_rel);
53 DCHECK(keepAliveCounter > 0);
54 if (keepAliveCounter == 1) {
55 delete this;
56 }
57}
58
59void SerialExecutor::add(Func func) {
60 queue_.enqueue(std::move(func));
61 parent_->add([keepAlive = getKeepAliveToken(this)] { keepAlive->run(); });
62}
63
64void SerialExecutor::addWithPriority(Func func, int8_t priority) {
65 queue_.enqueue(std::move(func));
66 parent_->addWithPriority(
67 [keepAlive = getKeepAliveToken(this)] { keepAlive->run(); }, priority);
68}
69
70void SerialExecutor::run() {
71 // We want scheduled_ to guard side-effects of completed tasks, so we can't
72 // use std::memory_order_relaxed here.
73 if (scheduled_.fetch_add(1, std::memory_order_acquire) > 0) {
74 return;
75 }
76
77 do {
78 Func func;
79 queue_.dequeue(func);
80
81 try {
82 func();
83 } catch (std::exception const& ex) {
84 LOG(ERROR) << "SerialExecutor: func threw unhandled exception "
85 << folly::exceptionStr(ex);
86 } catch (...) {
87 LOG(ERROR) << "SerialExecutor: func threw unhandled non-exception "
88 "object";
89 }
90
91 // We want scheduled_ to guard side-effects of completed tasks, so we can't
92 // use std::memory_order_relaxed here.
93 } while (scheduled_.fetch_sub(1, std::memory_order_release) > 1);
94}
95
96} // namespace folly
97