1 | /* |
2 | * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"). |
5 | * You may not use this file except in compliance with the License. |
6 | * A copy of the License is located at |
7 | * |
8 | * http://aws.amazon.com/apache2.0 |
9 | * |
10 | * or in the "license" file accompanying this file. This file is distributed |
11 | * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either |
12 | * express or implied. See the License for the specific language governing |
13 | * permissions and limitations under the License. |
14 | */ |
15 | |
16 | #include <aws/core/utils/threading/Executor.h> |
17 | #include <aws/core/utils/threading/ThreadTask.h> |
18 | #include <thread> |
19 | #include <cassert> |
20 | |
21 | static const char* POOLED_CLASS_TAG = "PooledThreadExecutor" ; |
22 | |
23 | using namespace Aws::Utils::Threading; |
24 | |
25 | bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx) |
26 | { |
27 | auto main = [fx, this] { |
28 | fx(); |
29 | Detach(std::this_thread::get_id()); |
30 | }; |
31 | |
32 | State expected; |
33 | do |
34 | { |
35 | expected = State::Free; |
36 | if(m_state.compare_exchange_strong(expected, State::Locked)) |
37 | { |
38 | std::thread t(main); |
39 | const auto id = t.get_id(); // copy the id before we std::move the thread |
40 | m_threads.emplace(id, std::move(t)); |
41 | m_state = State::Free; |
42 | return true; |
43 | } |
44 | } |
45 | while(expected != State::Shutdown); |
46 | return false; |
47 | } |
48 | |
49 | void DefaultExecutor::Detach(std::thread::id id) |
50 | { |
51 | State expected; |
52 | do |
53 | { |
54 | expected = State::Free; |
55 | if(m_state.compare_exchange_strong(expected, State::Locked)) |
56 | { |
57 | auto it = m_threads.find(id); |
58 | assert(it != m_threads.end()); |
59 | it->second.detach(); |
60 | m_threads.erase(it); |
61 | m_state = State::Free; |
62 | return; |
63 | } |
64 | } |
65 | while(expected != State::Shutdown); |
66 | } |
67 | |
68 | DefaultExecutor::~DefaultExecutor() |
69 | { |
70 | auto expected = State::Free; |
71 | while(!m_state.compare_exchange_strong(expected, State::Shutdown)) |
72 | { |
73 | //spin while currently detaching threads finish |
74 | assert(expected == State::Locked); |
75 | expected = State::Free; |
76 | } |
77 | |
78 | auto it = m_threads.begin(); |
79 | while(!m_threads.empty()) |
80 | { |
81 | it->second.join(); |
82 | it = m_threads.erase(it); |
83 | } |
84 | } |
85 | |
86 | PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) : |
87 | m_sync(0, poolSize), m_poolSize(poolSize), m_overflowPolicy(overflowPolicy) |
88 | { |
89 | for (size_t index = 0; index < m_poolSize; ++index) |
90 | { |
91 | m_threadTaskHandles.push_back(Aws::New<ThreadTask>(POOLED_CLASS_TAG, *this)); |
92 | } |
93 | } |
94 | |
95 | PooledThreadExecutor::~PooledThreadExecutor() |
96 | { |
97 | for(auto threadTask : m_threadTaskHandles) |
98 | { |
99 | threadTask->StopProcessingWork(); |
100 | } |
101 | |
102 | m_sync.ReleaseAll(); |
103 | |
104 | for (auto threadTask : m_threadTaskHandles) |
105 | { |
106 | Aws::Delete(threadTask); |
107 | } |
108 | |
109 | while(m_tasks.size() > 0) |
110 | { |
111 | std::function<void()>* fn = m_tasks.front(); |
112 | m_tasks.pop(); |
113 | |
114 | if(fn) |
115 | { |
116 | Aws::Delete(fn); |
117 | } |
118 | } |
119 | |
120 | } |
121 | |
122 | bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn) |
123 | { |
124 | //avoid the need to do copies inside the lock. Instead lets do a pointer push. |
125 | std::function<void()>* fnCpy = Aws::New<std::function<void()>>(POOLED_CLASS_TAG, std::forward<std::function<void()>>(fn)); |
126 | |
127 | { |
128 | std::lock_guard<std::mutex> locker(m_queueLock); |
129 | |
130 | if (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_tasks.size() >= m_poolSize) |
131 | { |
132 | Aws::Delete(fnCpy); |
133 | return false; |
134 | } |
135 | |
136 | m_tasks.push(fnCpy); |
137 | } |
138 | |
139 | m_sync.Release(); |
140 | |
141 | return true; |
142 | } |
143 | |
144 | std::function<void()>* PooledThreadExecutor::PopTask() |
145 | { |
146 | std::lock_guard<std::mutex> locker(m_queueLock); |
147 | |
148 | if (m_tasks.size() > 0) |
149 | { |
150 | std::function<void()>* fn = m_tasks.front(); |
151 | if (fn) |
152 | { |
153 | m_tasks.pop(); |
154 | return fn; |
155 | } |
156 | } |
157 | |
158 | return nullptr; |
159 | } |
160 | |
161 | bool PooledThreadExecutor::HasTasks() |
162 | { |
163 | std::lock_guard<std::mutex> locker(m_queueLock); |
164 | return m_tasks.size() > 0; |
165 | } |
166 | |