/*
 * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */


/* ====== Dependencies ======= */
#include <stddef.h>           /* size_t */
#include "pool.h"
#include "zstd_internal.h"    /* ZSTD_malloc, ZSTD_free */

/* ====== Compiler specifics ====== */
#if defined(_MSC_VER)
#  pragma warning(disable : 4204)   /* disable: C4204: non-constant aggregate initializer */
#endif


#ifdef ZSTD_MULTITHREAD

#include "threading.h"   /* pthread adaptation */

/* A job is a function and an opaque argument */
typedef struct POOL_job_s {
    POOL_function function;
    void *opaque;
} POOL_job;

struct POOL_ctx_s {
    ZSTD_customMem customMem;
    /* Keep track of the threads */
    ZSTD_pthread_t *threads;
    size_t numThreads;

    /* The queue is a circular buffer */
    POOL_job *queue;
    size_t queueHead;
    size_t queueTail;
    size_t queueSize;

    /* The number of threads working on jobs */
    size_t numThreadsBusy;
    /* Indicates if the queue is empty */
    int queueEmpty;

    /* The mutex protects the queue */
    ZSTD_pthread_mutex_t queueMutex;
    /* Condition variable for pushers to wait on when the queue is full */
    ZSTD_pthread_cond_t queuePushCond;
    /* Condition variable for poppers to wait on when the queue is empty */
    ZSTD_pthread_cond_t queuePopCond;
    /* Indicates if the queue is shutting down */
    int shutdown;
};
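
/* Illustration (informative comment, not used by the code) : the ring buffer keeps
 * one slot unused to tell "full" from "empty". With an intended queueSize of 2,
 * ctx->queueSize == 3 slots are allocated, and the queue is full once
 * (queueTail + 1) % queueSize == queueHead, i.e. when 2 jobs are waiting. */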

/* POOL_thread() :
   Work thread for the thread pool.
   Waits for jobs and executes them.
   @return : NULL if opaque is NULL ; otherwise returns opaque itself once the pool shuts down.
*/
static void* POOL_thread(void* opaque) {
    POOL_ctx* const ctx = (POOL_ctx*)opaque;
    if (!ctx) { return NULL; }
    for (;;) {
        /* Lock the mutex and wait for a non-empty queue or until shutdown */
        ZSTD_pthread_mutex_lock(&ctx->queueMutex);

        while (ctx->queueEmpty && !ctx->shutdown) {
            ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
        }
        /* Queue still empty after waking => the pool is shutting down : exit the thread */
        if (ctx->queueEmpty) {
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
            return opaque;
        }
        /* Pop a job off the queue */
        {   POOL_job const job = ctx->queue[ctx->queueHead];
            ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
            ctx->numThreadsBusy++;
            ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
            /* Unlock the mutex, signal a pusher, and run the job */
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
            ZSTD_pthread_cond_signal(&ctx->queuePushCond);

            job.function(job.opaque);

            /* If the intended queue size was 0 (queueSize == 1), a pusher waits for a
             * free worker rather than for queue space, so signal only once the job
             * has completed and this thread is idle again. */
            if (ctx->queueSize == 1) {
                ZSTD_pthread_mutex_lock(&ctx->queueMutex);
                ctx->numThreadsBusy--;
                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
                ZSTD_pthread_cond_signal(&ctx->queuePushCond);
        }   }
    }  /* for (;;) */
    /* Unreachable */
}

POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
    return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}

POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
    POOL_ctx* ctx;
    /* Check the parameters */
    if (!numThreads) { return NULL; }
    /* Allocate the context and zero initialize */
    ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
    if (!ctx) { return NULL; }
    /* Initialize the job queue.
     * It needs one extra space since one space is wasted to differentiate empty
     * and full queues.
     */
    ctx->queueSize = queueSize + 1;
    ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
    ctx->queueHead = 0;
    ctx->queueTail = 0;
    ctx->numThreadsBusy = 0;
    ctx->queueEmpty = 1;
    (void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
    (void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
    (void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
    ctx->shutdown = 0;
    /* Allocate space for the thread handles */
    ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
    ctx->numThreads = 0;
    ctx->customMem = customMem;
    /* Check for errors */
    if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
    /* Initialize the threads */
    {   size_t i;
        for (i = 0; i < numThreads; ++i) {
            if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
                ctx->numThreads = i;
                POOL_free(ctx);
                return NULL;
        }   }
        ctx->numThreads = numThreads;
    }
    return ctx;
}
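
/* Usage sketch (illustrative only ; compressOne and work are hypothetical names,
 * the POOL_* declarations come from pool.h) :
 *
 *     static void compressOne(void* opaque);      // matches POOL_function : void (*)(void*)
 *
 *     POOL_ctx* const pool = POOL_create(4, 8);   // 4 worker threads, queue of up to 8 pending jobs
 *     if (pool != NULL) {
 *         POOL_add(pool, compressOne, &work);     // blocks while the queue is full
 *         POOL_free(pool);                        // joins all threads, then releases the pool
 *     }
 */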

/*! POOL_join() :
    Shut down the queue, wake any sleeping threads, and join all of the threads.
*/
static void POOL_join(POOL_ctx* ctx) {
    /* Shut down the queue */
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    ctx->shutdown = 1;
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    /* Wake up sleeping threads */
    ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
    ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
    /* Join all of the threads */
    {   size_t i;
        for (i = 0; i < ctx->numThreads; ++i) {
            ZSTD_pthread_join(ctx->threads[i], NULL);
    }   }
}

void POOL_free(POOL_ctx *ctx) {
    if (!ctx) { return; }
    POOL_join(ctx);
    ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
    ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
    ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
    ZSTD_free(ctx->queue, ctx->customMem);
    ZSTD_free(ctx->threads, ctx->customMem);
    ZSTD_free(ctx, ctx->customMem);
}

size_t POOL_sizeof(POOL_ctx *ctx) {
    if (ctx==NULL) return 0;  /* supports sizeof NULL */
    return sizeof(*ctx)
         + ctx->queueSize * sizeof(POOL_job)
         + ctx->numThreads * sizeof(ZSTD_pthread_t);
}

/**
 * Returns 1 if the queue is full and 0 otherwise.
 *
 * If queueSize is 1 (the pool was created with an intended queueSize of 0),
 * the queue is considered full unless a worker thread is free and no job is
 * already waiting : a job is only accepted when it can start immediately.
 */
static int isQueueFull(POOL_ctx const* ctx) {
    if (ctx->queueSize > 1) {
        return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
    } else {
        return ctx->numThreadsBusy == ctx->numThreads ||
               !ctx->queueEmpty;
    }
}


static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{
    POOL_job const job = {function, opaque};
    assert(ctx != NULL);
    if (ctx->shutdown) return;

    ctx->queueEmpty = 0;
    ctx->queue[ctx->queueTail] = job;
    ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
    ZSTD_pthread_cond_signal(&ctx->queuePopCond);
}

void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    /* Wait until there is space in the queue for the new job */
    while (isQueueFull(ctx) && (!ctx->shutdown)) {
        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}


int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    if (isQueueFull(ctx)) {
        ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
        return 0;
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    return 1;
}
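
/* Illustration (hypothetical caller code) : POOL_add() blocks while the queue is
 * full, whereas POOL_tryAdd() returns 0 instead of blocking, letting the caller
 * decide what to do, e.g. run the job on its own thread :
 *
 *     if (!POOL_tryAdd(pool, compressOne, &work)) {
 *         compressOne(&work);   // pool saturated : run synchronously instead
 *     }
 */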


#else  /* ZSTD_MULTITHREAD not defined */

/* ========================== */
/* No multi-threading support */
/* ========================== */


/* No data is needed, but an empty struct is not standard C,
 * and malloc() of size 0 might return NULL : keep a dummy member. */
struct POOL_ctx_s {
    int dummy;
};
static POOL_ctx g_ctx;

POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
    return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}

POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
    (void)numThreads;
    (void)queueSize;
    (void)customMem;
    return &g_ctx;
}

void POOL_free(POOL_ctx* ctx) {
    assert(!ctx || ctx == &g_ctx);
    (void)ctx;
}

void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    function(opaque);
}

int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    function(opaque);
    return 1;
}

size_t POOL_sizeof(POOL_ctx* ctx) {
    if (ctx==NULL) return 0;  /* supports sizeof NULL */
    assert(ctx == &g_ctx);
    return sizeof(*ctx);
}

#endif  /* ZSTD_MULTITHREAD */