1/*
2 * Copyright 2008-2018 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#include <citrusleaf/cf_queue_priority.h>
18#include <citrusleaf/cf_clock.h>
19#include <citrusleaf/alloc.h>
20#include <string.h>
21
22/******************************************************************************
23 * FUNCTIONS
24 ******************************************************************************/
25
26cf_queue_priority *cf_queue_priority_create(size_t element_sz, bool threadsafe)
27{
28 cf_queue_priority *q = (cf_queue_priority*)cf_malloc(sizeof(cf_queue_priority));
29
30 if (! q) {
31 return NULL;
32 }
33
34 q->threadsafe = threadsafe;
35
36 if (! (q->low_q = cf_queue_create(element_sz, false))) {
37 goto Fail1;
38 }
39
40 if (! (q->medium_q = cf_queue_create(element_sz, false))) {
41 goto Fail2;
42 }
43
44 if (! (q->high_q = cf_queue_create(element_sz, false))) {
45 goto Fail3;
46 }
47
48 if (! threadsafe) {
49 return q;
50 }
51
52 if (0 != pthread_mutex_init(&q->LOCK, NULL)) {
53 goto Fail4;
54 }
55
56 if (0 != pthread_cond_init(&q->CV, NULL)) {
57 goto Fail5;
58 }
59
60 return q;
61
62Fail5:
63 pthread_mutex_destroy(&q->LOCK);
64Fail4:
65 cf_queue_destroy(q->high_q);
66Fail3:
67 cf_queue_destroy(q->medium_q);
68Fail2:
69 cf_queue_destroy(q->low_q);
70Fail1:
71 cf_free(q);
72
73 return NULL;
74}
75
76void cf_queue_priority_destroy(cf_queue_priority *q)
77{
78 cf_queue_destroy(q->high_q);
79 cf_queue_destroy(q->medium_q);
80 cf_queue_destroy(q->low_q);
81
82 if (q->threadsafe) {
83 pthread_mutex_destroy(&q->LOCK);
84 pthread_cond_destroy(&q->CV);
85 }
86
87 cf_free(q);
88}
89
90static inline void cf_queue_priority_lock(cf_queue_priority *q)
91{
92 if (q->threadsafe) {
93 pthread_mutex_lock(&q->LOCK);
94 }
95}
96
97static inline void cf_queue_priority_unlock(cf_queue_priority *q)
98{
99 if (q->threadsafe) {
100 pthread_mutex_unlock(&q->LOCK);
101 }
102}
103
104int cf_queue_priority_push(cf_queue_priority *q, const void *ptr, int pri)
105{
106 cf_queue_priority_lock(q);
107
108 int rv = CF_QUEUE_ERR;
109
110 if (pri == CF_QUEUE_PRIORITY_HIGH) {
111 rv = cf_queue_push(q->high_q, ptr);
112 }
113 else if (pri == CF_QUEUE_PRIORITY_MEDIUM) {
114 rv = cf_queue_push(q->medium_q, ptr);
115 }
116 else if (pri == CF_QUEUE_PRIORITY_LOW) {
117 rv = cf_queue_push(q->low_q, ptr);
118 }
119
120 if (rv == 0 && q->threadsafe) {
121 pthread_cond_signal(&q->CV);
122 }
123
124 cf_queue_priority_unlock(q);
125 return rv;
126}
127
128int cf_queue_priority_pop(cf_queue_priority *q, void *buf, int ms_wait)
129{
130 cf_queue_priority_lock(q);
131
132 struct timespec tp;
133
134 if (ms_wait > 0) {
135 cf_set_wait_timespec(ms_wait, &tp);
136 }
137
138 if (q->threadsafe) {
139 while (CF_Q_PRI_EMPTY(q)) {
140 if (CF_QUEUE_FOREVER == ms_wait) {
141 pthread_cond_wait(&q->CV, &q->LOCK);
142 }
143 else if (CF_QUEUE_NOWAIT == ms_wait) {
144 pthread_mutex_unlock(&q->LOCK);
145 return CF_QUEUE_EMPTY;
146 }
147 else {
148 pthread_cond_timedwait(&q->CV, &q->LOCK, &tp);
149
150 if (CF_Q_PRI_EMPTY(q)) {
151 pthread_mutex_unlock(&q->LOCK);
152 return CF_QUEUE_EMPTY;
153 }
154 }
155 }
156 }
157
158 int rv = CF_QUEUE_EMPTY;
159
160 if (CF_Q_SZ(q->high_q)) {
161 rv = cf_queue_pop(q->high_q, buf, 0);
162 }
163 else if (CF_Q_SZ(q->medium_q)) {
164 rv = cf_queue_pop(q->medium_q, buf, 0);
165 }
166 else if (CF_Q_SZ(q->low_q)) {
167 rv = cf_queue_pop(q->low_q, buf, 0);
168 }
169
170 cf_queue_priority_unlock(q);
171 return rv;
172}
173
174int cf_queue_priority_sz(cf_queue_priority *q)
175{
176 int rv = 0;
177
178 cf_queue_priority_lock(q);
179 rv += cf_queue_sz(q->high_q);
180 rv += cf_queue_sz(q->medium_q);
181 rv += cf_queue_sz(q->low_q);
182 cf_queue_priority_unlock(q);
183
184 return rv;
185}
186
187/**
188 * Use this function to find an element to pop from the queue using a reduce
189 * callback function. Have the callback function return -1 when you want to pop
190 * the element and stop reducing. If you have not popped an element,
191 * CF_QUEUE_NOMATCH is returned.
192 */
193int cf_queue_priority_reduce_pop(cf_queue_priority *priority_q, void *buf, cf_queue_reduce_fn cb, void *udata)
194{
195 cf_queue_priority_lock(priority_q);
196
197 cf_queue *queues[3];
198
199 queues[0] = priority_q->high_q;
200 queues[1] = priority_q->medium_q;
201 queues[2] = priority_q->low_q;
202
203 cf_queue *q;
204
205 for (int q_itr = 0; q_itr < 3; q_itr++) {
206 q = queues[q_itr];
207
208 if (CF_Q_SZ(q) == 0) {
209 continue;
210 }
211
212 for (uint32_t i = q->read_offset; i < q->write_offset; i++) {
213 int rv = cb(CF_Q_ELEM_PTR(q, i), udata);
214
215 if (rv == 0) {
216 continue;
217 }
218
219 if (rv == -1) {
220 // Found an element, so copy to 'buf', delete from q, and return.
221 memcpy(buf, CF_Q_ELEM_PTR(q, i), q->element_sz);
222 cf_queue_delete_offset(q, i);
223
224 cf_queue_priority_unlock(priority_q);
225 return CF_QUEUE_OK;
226 }
227 }
228 }
229
230 cf_queue_priority_unlock(priority_q);
231 return CF_QUEUE_NOMATCH;
232}
233
234//
235// This assumes the element we're looking for is unique! Returns
236// CF_QUEUE_NOMATCH if the element is not found or not moved.
237//
238int cf_queue_priority_change(cf_queue_priority *priority_q, const void *ptr, int new_pri)
239{
240 cf_queue_priority_lock(priority_q);
241
242 cf_queue *queues[3];
243
244 queues[0] = priority_q->high_q;
245 queues[1] = priority_q->medium_q;
246 queues[2] = priority_q->low_q;
247
248 int dest_q_itr = CF_QUEUE_PRIORITY_HIGH - new_pri;
249 cf_queue *q;
250
251 for (int q_itr = 0; q_itr < 3; q_itr++) {
252 q = queues[q_itr];
253
254 if (q_itr == dest_q_itr || CF_Q_SZ(q) == 0) {
255 continue;
256 }
257
258 for (uint32_t i = q->read_offset; i < q->write_offset; i++) {
259 if (memcmp(CF_Q_ELEM_PTR(q, i), ptr, q->element_sz) == 0) {
260 // Move it to the queue with desired priority.
261 cf_queue_delete_offset(q, i);
262 cf_queue_push(queues[dest_q_itr], ptr);
263
264 cf_queue_priority_unlock(priority_q);
265 return CF_QUEUE_OK;
266 }
267 }
268 }
269
270 cf_queue_priority_unlock(priority_q);
271 return CF_QUEUE_NOMATCH;
272}
273
274//
275// Reduce the inner queues whose priorities are different to 'new_pri'. If the
276// callback returns -1, move that element to the inner queue whose priority is
277// 'new_pri' and return CF_QUEUE_OK. Returns CF_QUEUE_NOMATCH if callback never
278// triggers a move.
279//
280int cf_queue_priority_reduce_change(cf_queue_priority *priority_q, int new_pri, cf_queue_reduce_fn cb, void *udata)
281{
282 cf_queue_priority_lock(priority_q);
283
284 cf_queue *queues[3];
285
286 queues[0] = priority_q->high_q;
287 queues[1] = priority_q->medium_q;
288 queues[2] = priority_q->low_q;
289
290 int dest_q_itr = CF_QUEUE_PRIORITY_HIGH - new_pri;
291 cf_queue *q;
292
293 for (int q_itr = 0; q_itr < 3; q_itr++) {
294 q = queues[q_itr];
295
296 if (q_itr == dest_q_itr || CF_Q_SZ(q) == 0) {
297 continue;
298 }
299
300 for (uint32_t i = q->read_offset; i < q->write_offset; i++) {
301 int rv = cb(CF_Q_ELEM_PTR(q, i), udata);
302
303 if (rv == 0) {
304 continue;
305 }
306
307 if (rv == -1) {
308 // Found it - move to desired queue and return.
309 uint8_t* buf = alloca(q->element_sz);
310
311 memcpy(buf, CF_Q_ELEM_PTR(q, i), q->element_sz);
312 cf_queue_delete_offset(q, i);
313 cf_queue_push(queues[dest_q_itr], buf);
314
315 cf_queue_priority_unlock(priority_q);
316 return CF_QUEUE_OK;
317 }
318 }
319 }
320
321 cf_queue_priority_unlock(priority_q);
322 return CF_QUEUE_NOMATCH;
323}
324