1/*
2 * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15
16#include <aws/core/utils/threading/Executor.h>
17#include <aws/core/utils/threading/ThreadTask.h>
18#include <thread>
19#include <cassert>
20
21static const char* POOLED_CLASS_TAG = "PooledThreadExecutor";
22
23using namespace Aws::Utils::Threading;
24
25bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx)
26{
27 auto main = [fx, this] {
28 fx();
29 Detach(std::this_thread::get_id());
30 };
31
32 State expected;
33 do
34 {
35 expected = State::Free;
36 if(m_state.compare_exchange_strong(expected, State::Locked))
37 {
38 std::thread t(main);
39 const auto id = t.get_id(); // copy the id before we std::move the thread
40 m_threads.emplace(id, std::move(t));
41 m_state = State::Free;
42 return true;
43 }
44 }
45 while(expected != State::Shutdown);
46 return false;
47}
48
49void DefaultExecutor::Detach(std::thread::id id)
50{
51 State expected;
52 do
53 {
54 expected = State::Free;
55 if(m_state.compare_exchange_strong(expected, State::Locked))
56 {
57 auto it = m_threads.find(id);
58 assert(it != m_threads.end());
59 it->second.detach();
60 m_threads.erase(it);
61 m_state = State::Free;
62 return;
63 }
64 }
65 while(expected != State::Shutdown);
66}
67
68DefaultExecutor::~DefaultExecutor()
69{
70 auto expected = State::Free;
71 while(!m_state.compare_exchange_strong(expected, State::Shutdown))
72 {
73 //spin while currently detaching threads finish
74 assert(expected == State::Locked);
75 expected = State::Free;
76 }
77
78 auto it = m_threads.begin();
79 while(!m_threads.empty())
80 {
81 it->second.join();
82 it = m_threads.erase(it);
83 }
84}
85
86PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) :
87 m_sync(0, poolSize), m_poolSize(poolSize), m_overflowPolicy(overflowPolicy)
88{
89 for (size_t index = 0; index < m_poolSize; ++index)
90 {
91 m_threadTaskHandles.push_back(Aws::New<ThreadTask>(POOLED_CLASS_TAG, *this));
92 }
93}
94
95PooledThreadExecutor::~PooledThreadExecutor()
96{
97 for(auto threadTask : m_threadTaskHandles)
98 {
99 threadTask->StopProcessingWork();
100 }
101
102 m_sync.ReleaseAll();
103
104 for (auto threadTask : m_threadTaskHandles)
105 {
106 Aws::Delete(threadTask);
107 }
108
109 while(m_tasks.size() > 0)
110 {
111 std::function<void()>* fn = m_tasks.front();
112 m_tasks.pop();
113
114 if(fn)
115 {
116 Aws::Delete(fn);
117 }
118 }
119
120}
121
122bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn)
123{
124 //avoid the need to do copies inside the lock. Instead lets do a pointer push.
125 std::function<void()>* fnCpy = Aws::New<std::function<void()>>(POOLED_CLASS_TAG, std::forward<std::function<void()>>(fn));
126
127 {
128 std::lock_guard<std::mutex> locker(m_queueLock);
129
130 if (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_tasks.size() >= m_poolSize)
131 {
132 Aws::Delete(fnCpy);
133 return false;
134 }
135
136 m_tasks.push(fnCpy);
137 }
138
139 m_sync.Release();
140
141 return true;
142}
143
144std::function<void()>* PooledThreadExecutor::PopTask()
145{
146 std::lock_guard<std::mutex> locker(m_queueLock);
147
148 if (m_tasks.size() > 0)
149 {
150 std::function<void()>* fn = m_tasks.front();
151 if (fn)
152 {
153 m_tasks.pop();
154 return fn;
155 }
156 }
157
158 return nullptr;
159}
160
161bool PooledThreadExecutor::HasTasks()
162{
163 std::lock_guard<std::mutex> locker(m_queueLock);
164 return m_tasks.size() > 0;
165}
166