1 | /* |
2 | * Copyright 2008-2018 Aerospike, Inc. |
3 | * |
4 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
5 | * license agreements. |
6 | * |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); you may not |
8 | * use this file except in compliance with the License. You may obtain a copy of |
9 | * the License at http://www.apache.org/licenses/LICENSE-2.0 |
10 | * |
11 | * Unless required by applicable law or agreed to in writing, software |
12 | * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
13 | * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
14 | * License for the specific language governing permissions and limitations under |
15 | * the License. |
16 | */ |
17 | #include <citrusleaf/cf_queue_priority.h> |
18 | #include <citrusleaf/cf_clock.h> |
19 | #include <citrusleaf/alloc.h> |
20 | #include <string.h> |
21 | |
22 | /****************************************************************************** |
23 | * FUNCTIONS |
24 | ******************************************************************************/ |
25 | |
26 | cf_queue_priority *cf_queue_priority_create(size_t element_sz, bool threadsafe) |
27 | { |
28 | cf_queue_priority *q = (cf_queue_priority*)cf_malloc(sizeof(cf_queue_priority)); |
29 | |
30 | if (! q) { |
31 | return NULL; |
32 | } |
33 | |
34 | q->threadsafe = threadsafe; |
35 | |
36 | if (! (q->low_q = cf_queue_create(element_sz, false))) { |
37 | goto Fail1; |
38 | } |
39 | |
40 | if (! (q->medium_q = cf_queue_create(element_sz, false))) { |
41 | goto Fail2; |
42 | } |
43 | |
44 | if (! (q->high_q = cf_queue_create(element_sz, false))) { |
45 | goto Fail3; |
46 | } |
47 | |
48 | if (! threadsafe) { |
49 | return q; |
50 | } |
51 | |
52 | if (0 != pthread_mutex_init(&q->LOCK, NULL)) { |
53 | goto Fail4; |
54 | } |
55 | |
56 | if (0 != pthread_cond_init(&q->CV, NULL)) { |
57 | goto Fail5; |
58 | } |
59 | |
60 | return q; |
61 | |
62 | Fail5: |
63 | pthread_mutex_destroy(&q->LOCK); |
64 | Fail4: |
65 | cf_queue_destroy(q->high_q); |
66 | Fail3: |
67 | cf_queue_destroy(q->medium_q); |
68 | Fail2: |
69 | cf_queue_destroy(q->low_q); |
70 | Fail1: |
71 | cf_free(q); |
72 | |
73 | return NULL; |
74 | } |
75 | |
76 | void cf_queue_priority_destroy(cf_queue_priority *q) |
77 | { |
78 | cf_queue_destroy(q->high_q); |
79 | cf_queue_destroy(q->medium_q); |
80 | cf_queue_destroy(q->low_q); |
81 | |
82 | if (q->threadsafe) { |
83 | pthread_mutex_destroy(&q->LOCK); |
84 | pthread_cond_destroy(&q->CV); |
85 | } |
86 | |
87 | cf_free(q); |
88 | } |
89 | |
90 | static inline void cf_queue_priority_lock(cf_queue_priority *q) |
91 | { |
92 | if (q->threadsafe) { |
93 | pthread_mutex_lock(&q->LOCK); |
94 | } |
95 | } |
96 | |
97 | static inline void cf_queue_priority_unlock(cf_queue_priority *q) |
98 | { |
99 | if (q->threadsafe) { |
100 | pthread_mutex_unlock(&q->LOCK); |
101 | } |
102 | } |
103 | |
104 | int cf_queue_priority_push(cf_queue_priority *q, const void *ptr, int pri) |
105 | { |
106 | cf_queue_priority_lock(q); |
107 | |
108 | int rv = CF_QUEUE_ERR; |
109 | |
110 | if (pri == CF_QUEUE_PRIORITY_HIGH) { |
111 | rv = cf_queue_push(q->high_q, ptr); |
112 | } |
113 | else if (pri == CF_QUEUE_PRIORITY_MEDIUM) { |
114 | rv = cf_queue_push(q->medium_q, ptr); |
115 | } |
116 | else if (pri == CF_QUEUE_PRIORITY_LOW) { |
117 | rv = cf_queue_push(q->low_q, ptr); |
118 | } |
119 | |
120 | if (rv == 0 && q->threadsafe) { |
121 | pthread_cond_signal(&q->CV); |
122 | } |
123 | |
124 | cf_queue_priority_unlock(q); |
125 | return rv; |
126 | } |
127 | |
128 | int cf_queue_priority_pop(cf_queue_priority *q, void *buf, int ms_wait) |
129 | { |
130 | cf_queue_priority_lock(q); |
131 | |
132 | struct timespec tp; |
133 | |
134 | if (ms_wait > 0) { |
135 | cf_set_wait_timespec(ms_wait, &tp); |
136 | } |
137 | |
138 | if (q->threadsafe) { |
139 | while (CF_Q_PRI_EMPTY(q)) { |
140 | if (CF_QUEUE_FOREVER == ms_wait) { |
141 | pthread_cond_wait(&q->CV, &q->LOCK); |
142 | } |
143 | else if (CF_QUEUE_NOWAIT == ms_wait) { |
144 | pthread_mutex_unlock(&q->LOCK); |
145 | return CF_QUEUE_EMPTY; |
146 | } |
147 | else { |
148 | pthread_cond_timedwait(&q->CV, &q->LOCK, &tp); |
149 | |
150 | if (CF_Q_PRI_EMPTY(q)) { |
151 | pthread_mutex_unlock(&q->LOCK); |
152 | return CF_QUEUE_EMPTY; |
153 | } |
154 | } |
155 | } |
156 | } |
157 | |
158 | int rv = CF_QUEUE_EMPTY; |
159 | |
160 | if (CF_Q_SZ(q->high_q)) { |
161 | rv = cf_queue_pop(q->high_q, buf, 0); |
162 | } |
163 | else if (CF_Q_SZ(q->medium_q)) { |
164 | rv = cf_queue_pop(q->medium_q, buf, 0); |
165 | } |
166 | else if (CF_Q_SZ(q->low_q)) { |
167 | rv = cf_queue_pop(q->low_q, buf, 0); |
168 | } |
169 | |
170 | cf_queue_priority_unlock(q); |
171 | return rv; |
172 | } |
173 | |
174 | int cf_queue_priority_sz(cf_queue_priority *q) |
175 | { |
176 | int rv = 0; |
177 | |
178 | cf_queue_priority_lock(q); |
179 | rv += cf_queue_sz(q->high_q); |
180 | rv += cf_queue_sz(q->medium_q); |
181 | rv += cf_queue_sz(q->low_q); |
182 | cf_queue_priority_unlock(q); |
183 | |
184 | return rv; |
185 | } |
186 | |
187 | /** |
188 | * Use this function to find an element to pop from the queue using a reduce |
189 | * callback function. Have the callback function return -1 when you want to pop |
190 | * the element and stop reducing. If you have not popped an element, |
191 | * CF_QUEUE_NOMATCH is returned. |
192 | */ |
193 | int cf_queue_priority_reduce_pop(cf_queue_priority *priority_q, void *buf, cf_queue_reduce_fn cb, void *udata) |
194 | { |
195 | cf_queue_priority_lock(priority_q); |
196 | |
197 | cf_queue *queues[3]; |
198 | |
199 | queues[0] = priority_q->high_q; |
200 | queues[1] = priority_q->medium_q; |
201 | queues[2] = priority_q->low_q; |
202 | |
203 | cf_queue *q; |
204 | |
205 | for (int q_itr = 0; q_itr < 3; q_itr++) { |
206 | q = queues[q_itr]; |
207 | |
208 | if (CF_Q_SZ(q) == 0) { |
209 | continue; |
210 | } |
211 | |
212 | for (uint32_t i = q->read_offset; i < q->write_offset; i++) { |
213 | int rv = cb(CF_Q_ELEM_PTR(q, i), udata); |
214 | |
215 | if (rv == 0) { |
216 | continue; |
217 | } |
218 | |
219 | if (rv == -1) { |
220 | // Found an element, so copy to 'buf', delete from q, and return. |
221 | memcpy(buf, CF_Q_ELEM_PTR(q, i), q->element_sz); |
222 | cf_queue_delete_offset(q, i); |
223 | |
224 | cf_queue_priority_unlock(priority_q); |
225 | return CF_QUEUE_OK; |
226 | } |
227 | } |
228 | } |
229 | |
230 | cf_queue_priority_unlock(priority_q); |
231 | return CF_QUEUE_NOMATCH; |
232 | } |
233 | |
234 | // |
235 | // This assumes the element we're looking for is unique! Returns |
236 | // CF_QUEUE_NOMATCH if the element is not found or not moved. |
237 | // |
238 | int cf_queue_priority_change(cf_queue_priority *priority_q, const void *ptr, int new_pri) |
239 | { |
240 | cf_queue_priority_lock(priority_q); |
241 | |
242 | cf_queue *queues[3]; |
243 | |
244 | queues[0] = priority_q->high_q; |
245 | queues[1] = priority_q->medium_q; |
246 | queues[2] = priority_q->low_q; |
247 | |
248 | int dest_q_itr = CF_QUEUE_PRIORITY_HIGH - new_pri; |
249 | cf_queue *q; |
250 | |
251 | for (int q_itr = 0; q_itr < 3; q_itr++) { |
252 | q = queues[q_itr]; |
253 | |
254 | if (q_itr == dest_q_itr || CF_Q_SZ(q) == 0) { |
255 | continue; |
256 | } |
257 | |
258 | for (uint32_t i = q->read_offset; i < q->write_offset; i++) { |
259 | if (memcmp(CF_Q_ELEM_PTR(q, i), ptr, q->element_sz) == 0) { |
260 | // Move it to the queue with desired priority. |
261 | cf_queue_delete_offset(q, i); |
262 | cf_queue_push(queues[dest_q_itr], ptr); |
263 | |
264 | cf_queue_priority_unlock(priority_q); |
265 | return CF_QUEUE_OK; |
266 | } |
267 | } |
268 | } |
269 | |
270 | cf_queue_priority_unlock(priority_q); |
271 | return CF_QUEUE_NOMATCH; |
272 | } |
273 | |
274 | // |
275 | // Reduce the inner queues whose priorities are different to 'new_pri'. If the |
276 | // callback returns -1, move that element to the inner queue whose priority is |
277 | // 'new_pri' and return CF_QUEUE_OK. Returns CF_QUEUE_NOMATCH if callback never |
278 | // triggers a move. |
279 | // |
280 | int cf_queue_priority_reduce_change(cf_queue_priority *priority_q, int new_pri, cf_queue_reduce_fn cb, void *udata) |
281 | { |
282 | cf_queue_priority_lock(priority_q); |
283 | |
284 | cf_queue *queues[3]; |
285 | |
286 | queues[0] = priority_q->high_q; |
287 | queues[1] = priority_q->medium_q; |
288 | queues[2] = priority_q->low_q; |
289 | |
290 | int dest_q_itr = CF_QUEUE_PRIORITY_HIGH - new_pri; |
291 | cf_queue *q; |
292 | |
293 | for (int q_itr = 0; q_itr < 3; q_itr++) { |
294 | q = queues[q_itr]; |
295 | |
296 | if (q_itr == dest_q_itr || CF_Q_SZ(q) == 0) { |
297 | continue; |
298 | } |
299 | |
300 | for (uint32_t i = q->read_offset; i < q->write_offset; i++) { |
301 | int rv = cb(CF_Q_ELEM_PTR(q, i), udata); |
302 | |
303 | if (rv == 0) { |
304 | continue; |
305 | } |
306 | |
307 | if (rv == -1) { |
308 | // Found it - move to desired queue and return. |
309 | uint8_t* buf = alloca(q->element_sz); |
310 | |
311 | memcpy(buf, CF_Q_ELEM_PTR(q, i), q->element_sz); |
312 | cf_queue_delete_offset(q, i); |
313 | cf_queue_push(queues[dest_q_itr], buf); |
314 | |
315 | cf_queue_priority_unlock(priority_q); |
316 | return CF_QUEUE_OK; |
317 | } |
318 | } |
319 | } |
320 | |
321 | cf_queue_priority_unlock(priority_q); |
322 | return CF_QUEUE_NOMATCH; |
323 | } |
324 | |