1// SPDX-License-Identifier: Apache-2.0
2// ----------------------------------------------------------------------------
3// Copyright 2011-2022 Arm Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License"); you may not
6// use this file except in compliance with the License. You may obtain a copy
7// of the License at:
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14// License for the specific language governing permissions and limitations
15// under the License.
16// ----------------------------------------------------------------------------
17
18/**
19 * @brief Functions and data declarations for the outer context.
20 *
21 * The outer context includes thread-pool management, which is slower to
22 * compile due to increased use of C++ stdlib. The inner context used in the
23 * majority of the codec library does not include this.
24 */
25
26#ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED
27#define ASTCENC_INTERNAL_ENTRY_INCLUDED
28
29#include <atomic>
30#include <condition_variable>
31#include <functional>
32#include <mutex>
33
34#include "astcenc_internal.h"
35
36/* ============================================================================
37 Parallel execution control
38============================================================================ */
39
40/**
41 * @brief A simple counter-based manager for parallel task execution.
42 *
43 * The task processing execution consists of:
44 *
45 * * A single-threaded init stage.
46 * * A multi-threaded processing stage.
47 * * A condition variable so threads can wait for processing completion.
48 *
 * The init stage will be executed by the first thread to arrive in the critical section; there is
 * no dedicated main thread in the thread pool.
51 *
 * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand
 * basis. Threads may therefore each execute a different number of tasks, depending on their
 * processing complexity. The task queue and the task tickets are just counters; the caller must map
 * these integers to an actual processing partition in a specific problem domain.
56 *
57 * The exit wait condition is needed to ensure processing has finished before a worker thread can
58 * progress to the next stage of the pipeline. Specifically a worker may exit the processing stage
59 * because there are no new tasks to assign to it while other worker threads are still processing.
 * Calling @c wait() will ensure that all other workers have finished before the thread can proceed.
61 *
62 * The basic usage model:
63 *
64 * // --------- From single-threaded code ---------
65 *
66 * // Reset the tracker state
67 * manager->reset()
68 *
69 * // --------- From multi-threaded code ---------
70 *
71 * // Run the stage init; only first thread actually runs the lambda
72 * manager->init(<lambda>)
73 *
 *     // Declared before the loop so the while() condition can see it
 *     uint task_count = 0;
 *     do
 *     {
 *         // Request a task assignment
 *         uint base_index = manager->get_task_assignment(<granule>, task_count);
 *
 *         // Process any tasks we were given (task_count <= granule size)
 *         if (task_count)
 *         {
 *             // Run the user task processing code for N tasks here
 *             ...
 *
 *             // Flag these tasks as complete
 *             manager->complete_task_assignment(task_count);
 *         }
 *     } while (task_count);
90 *
91 * // Wait for all threads to complete tasks before progressing
92 * manager->wait()
93 *
94 * // Run the stage term; only first thread actually runs the lambda
95 * manager->term(<lambda>)
96 */
97class ParallelManager
98{
99private:
100 /** @brief Lock used for critical section and condition synchronization. */
101 std::mutex m_lock;
102
103 /** @brief True if the stage init() step has been executed. */
104 bool m_init_done;
105
106 /** @brief True if the stage term() step has been executed. */
107 bool m_term_done;
108
109 /** @brief Condition variable for tracking stage processing completion. */
110 std::condition_variable m_complete;
111
112 /** @brief Number of tasks started, but not necessarily finished. */
113 std::atomic<unsigned int> m_start_count;
114
115 /** @brief Number of tasks finished. */
116 unsigned int m_done_count;
117
118 /** @brief Number of tasks that need to be processed. */
119 unsigned int m_task_count;
120
121public:
122 /** @brief Create a new ParallelManager. */
123 ParallelManager()
124 {
125 reset();
126 }
127
128 /**
129 * @brief Reset the tracker for a new processing batch.
130 *
131 * This must be called from single-threaded code before starting the multi-threaded processing
132 * operations.
133 */
134 void reset()
135 {
136 m_init_done = false;
137 m_term_done = false;
138 m_start_count = 0;
139 m_done_count = 0;
140 m_task_count = 0;
141 }
142
143 /**
144 * @brief Trigger the pipeline stage init step.
145 *
146 * This can be called from multi-threaded code. The first thread to hit this will process the
147 * initialization. Other threads will block and wait for it to complete.
148 *
149 * @param init_func Callable which executes the stage initialization. It must return the
150 * total number of tasks in the stage.
151 */
152 void init(std::function<unsigned int(void)> init_func)
153 {
154 std::lock_guard<std::mutex> lck(m_lock);
155 if (!m_init_done)
156 {
157 m_task_count = init_func();
158 m_init_done = true;
159 }
160 }
161
162 /**
163 * @brief Trigger the pipeline stage init step.
164 *
165 * This can be called from multi-threaded code. The first thread to hit this will process the
166 * initialization. Other threads will block and wait for it to complete.
167 *
168 * @param task_count Total number of tasks needing processing.
169 */
170 void init(unsigned int task_count)
171 {
172 std::lock_guard<std::mutex> lck(m_lock);
173 if (!m_init_done)
174 {
175 m_task_count = task_count;
176 m_init_done = true;
177 }
178 }
179
180 /**
181 * @brief Request a task assignment.
182 *
183 * Assign up to @c granule tasks to the caller for processing.
184 *
185 * @param granule Maximum number of tasks that can be assigned.
186 * @param[out] count Actual number of tasks assigned, or zero if no tasks were assigned.
187 *
188 * @return Task index of the first assigned task; assigned tasks increment from this.
189 */
190 unsigned int get_task_assignment(unsigned int granule, unsigned int& count)
191 {
192 unsigned int base = m_start_count.fetch_add(granule, std::memory_order_relaxed);
193 if (base >= m_task_count)
194 {
195 count = 0;
196 return 0;
197 }
198
199 count = astc::min(m_task_count - base, granule);
200 return base;
201 }
202
203 /**
204 * @brief Complete a task assignment.
205 *
206 * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this
207 * completes the processing of the stage.
208 *
209 * @param count The number of completed tasks.
210 */
211 void complete_task_assignment(unsigned int count)
212 {
213 // Note: m_done_count cannot use an atomic without the mutex; this has a race between the
214 // update here and the wait() for other threads
215 std::unique_lock<std::mutex> lck(m_lock);
216 this->m_done_count += count;
217 if (m_done_count == m_task_count)
218 {
219 lck.unlock();
220 m_complete.notify_all();
221 }
222 }
223
224 /**
225 * @brief Wait for stage processing to complete.
226 */
227 void wait()
228 {
229 std::unique_lock<std::mutex> lck(m_lock);
230 m_complete.wait(lck, [this]{ return m_done_count == m_task_count; });
231 }
232
233 /**
234 * @brief Trigger the pipeline stage term step.
235 *
236 * This can be called from multi-threaded code. The first thread to hit this will process the
237 * work pool termination. Caller must have called @c wait() prior to calling this function to
238 * ensure that processing is complete.
239 *
240 * @param term_func Callable which executes the stage termination.
241 */
242 void term(std::function<void(void)> term_func)
243 {
244 std::lock_guard<std::mutex> lck(m_lock);
245 if (!m_term_done)
246 {
247 term_func();
248 m_term_done = true;
249 }
250 }
251};
252
253/**
254 * @brief The astcenc compression context.
255 */
256struct astcenc_context
257{
258 /** @brief The context internal state. */
259 astcenc_contexti context;
260
261#if !defined(ASTCENC_DECOMPRESS_ONLY)
262 /** @brief The parallel manager for averages computation. */
263 ParallelManager manage_avg;
264
265 /** @brief The parallel manager for compression. */
266 ParallelManager manage_compress;
267#endif
268
269 /** @brief The parallel manager for decompression. */
270 ParallelManager manage_decompress;
271};
272
273#endif
274