/*
 * Copyright 2008-2018 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
#include <citrusleaf/cf_queue.h>
#include <citrusleaf/cf_clock.h>
#include <citrusleaf/alloc.h>
#include <string.h>

/******************************************************************************
 * FUNCTIONS
 ******************************************************************************/

bool
cf_queue_init(cf_queue *q, size_t element_sz, uint32_t capacity,
		bool threadsafe)
{
	q->alloc_sz = capacity;
	q->write_offset = q->read_offset = 0;
	q->element_sz = element_sz;
	q->threadsafe = threadsafe;
	q->free_struct = false;

	q->elements = (uint8_t*)cf_malloc(capacity * element_sz);

	if (! q->elements) {
		return false;
	}

	if (! q->threadsafe) {
		return true;
	}

	if (0 != pthread_mutex_init(&q->LOCK, NULL)) {
		cf_free(q->elements);
		return false;
	}

	if (0 != pthread_cond_init(&q->CV, NULL)) {
		pthread_mutex_destroy(&q->LOCK);
		cf_free(q->elements);
		return false;
	}

	return true;
}

cf_queue *
cf_queue_create(size_t element_sz, bool threadsafe)
{
	cf_queue *q = (cf_queue*)cf_malloc(sizeof(cf_queue));

	if (! q) {
		return NULL;
	}

	if (! cf_queue_init(q, element_sz, CF_QUEUE_ALLOCSZ, threadsafe)) {
		cf_free(q);
		return NULL;
	}

	q->free_struct = true;

	return q;
}
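
//
// Example usage (illustrative sketch only): create a threadsafe queue of
// ints, push a value, pop it back without waiting, then destroy the queue.
// CF_QUEUE_NOWAIT and CF_QUEUE_OK are the constants from cf_queue.h used
// throughout this file.
//
//	cf_queue *q = cf_queue_create(sizeof(int), true);
//	int in = 7;
//	cf_queue_push(q, &in);
//
//	int out;
//	if (CF_QUEUE_OK == cf_queue_pop(q, &out, CF_QUEUE_NOWAIT)) {
//		// out is now 7.
//	}
//
//	cf_queue_destroy(q);
//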

void
cf_queue_destroy(cf_queue *q)
{
	if (q->threadsafe) {
		pthread_cond_destroy(&q->CV);
		pthread_mutex_destroy(&q->LOCK);
	}

	cf_free(q->elements);

	if (q->free_struct) {
		memset(q, 0, sizeof(cf_queue));
		cf_free(q);
	}
}

static inline void
cf_queue_lock(cf_queue *q)
{
	if (q->threadsafe) {
		pthread_mutex_lock(&q->LOCK);
	}
}

static inline void
cf_queue_unlock(cf_queue *q)
{
	if (q->threadsafe) {
		pthread_mutex_unlock(&q->LOCK);
	}
}

int
cf_queue_sz(cf_queue *q)
{
	cf_queue_lock(q);
	int rv = CF_Q_SZ(q);
	cf_queue_unlock(q);

	return rv;
}

//
// Internal function. Call with the lock held, passing the new (larger) size.
// This function only works on full queues.
//
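// For example (an illustrative walk-through): with alloc_sz 4, element_sz 1,
// read_offset 6 and write_offset 10, the physical buffer holds [C, D, A, B]
// (logical order A, B, C, D - read index 6 % 4 == 2). Growing to 8 copies the
// unwrapped tail [A, B] first, then the wrapped part [C, D], leaving
// [A, B, C, D, -, -, -, -] with read_offset 0 and write_offset 4.
//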
static int
cf_queue_resize(cf_queue *q, uint32_t new_sz)
{
	// Check if queue is not full.
	if (CF_Q_SZ(q) != q->alloc_sz) {
		return CF_QUEUE_ERR;
	}

	// The rare case where the queue is not fragmented, and realloc makes sense
	// and none of the offsets need to move.
	if (0 == q->read_offset % q->alloc_sz) {
		// Use a temporary pointer so the original block isn't lost if
		// cf_realloc() fails.
		uint8_t *elements =
				(uint8_t*)cf_realloc(q->elements, new_sz * q->element_sz);

		if (! elements) {
			return CF_QUEUE_ERR;
		}

		q->elements = elements;
		q->read_offset = 0;
		q->write_offset = q->alloc_sz;
	}
	else {
		uint8_t *newq = (uint8_t*)cf_malloc(new_sz * q->element_sz);

		if (! newq) {
			return CF_QUEUE_ERR;
		}

		// end_sz is the number of used bytes in the old queue from the read
		// point to the end of the buffer.
		size_t end_sz =
				(q->alloc_sz - (q->read_offset % q->alloc_sz)) * q->element_sz;

		memcpy(&newq[0], CF_Q_ELEM_PTR(q, q->read_offset), end_sz);
		memcpy(&newq[end_sz], &q->elements[0],
				(q->alloc_sz * q->element_sz) - end_sz);

		cf_free(q->elements);
		q->elements = newq;

		q->write_offset = q->alloc_sz;
		q->read_offset = 0;
	}

	q->alloc_sz = new_sz;

	return CF_QUEUE_OK;
}

//
// We have to guard against wrap-around, so call this occasionally. We really
// expect this will never get called, however it can be a symptom of a queue
// getting really, really deep.
//
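// For example (an illustrative walk-through): the offsets only ever grow, so
// once write_offset has either of its top two bits set (0x40000000 and up),
// both offsets are reduced modulo alloc_sz - e.g. with alloc_sz 64,
// read_offset 0x40000010 and write_offset 0x40000018 become 0x10 and 0x18,
// preserving the queue depth of 8.
//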
static inline void
cf_queue_unwrap(cf_queue *q)
{
	if ((q->write_offset & 0xC0000000) != 0) {
		int sz = CF_Q_SZ(q);

		q->read_offset %= q->alloc_sz;
		q->write_offset = q->read_offset + sz;
	}
}

int
cf_queue_push(cf_queue *q, const void *ptr)
{
	cf_queue_lock(q);

	// Check queue length.
	if (CF_Q_SZ(q) == q->alloc_sz) {
		if (0 != cf_queue_resize(q, q->alloc_sz * 2)) {
			cf_queue_unlock(q);
			return CF_QUEUE_ERR;
		}
	}

	// TODO - if queues are power of 2, this can be a shift.
	memcpy(CF_Q_ELEM_PTR(q, q->write_offset), ptr, q->element_sz);
	q->write_offset++;
	cf_queue_unwrap(q);

	if (q->threadsafe) {
		pthread_cond_signal(&q->CV);
	}

	cf_queue_unlock(q);
	return CF_QUEUE_OK;
}

//
// Push element on the queue only if size < limit.
//
bool
cf_queue_push_limit(cf_queue *q, const void *ptr, uint32_t limit)
{
	cf_queue_lock(q);

	uint32_t size = CF_Q_SZ(q);

	if (size >= limit) {
		cf_queue_unlock(q);
		return false;
	}

	if (size == q->alloc_sz) {
		if (0 != cf_queue_resize(q, q->alloc_sz * 2)) {
			cf_queue_unlock(q);
			return false;
		}
	}

	// TODO - if queues are power of 2, this can be a shift.
	memcpy(CF_Q_ELEM_PTR(q, q->write_offset), ptr, q->element_sz);
	q->write_offset++;
	cf_queue_unwrap(q);

	if (q->threadsafe) {
		pthread_cond_signal(&q->CV);
	}

	cf_queue_unlock(q);
	return true;
}

//
// Same as cf_queue_push() except it's a no-op if element is already queued.
//
int
cf_queue_push_unique(cf_queue *q, const void *ptr)
{
	cf_queue_lock(q);

	// Check if element is already queued.
	if (CF_Q_SZ(q) != 0) {
		for (uint32_t i = q->read_offset; i < q->write_offset; i++) {
			if (0 == memcmp(CF_Q_ELEM_PTR(q, i), ptr, q->element_sz)) {
				// Element is already queued.
				// TODO - return 0 if all callers regard this as normal?
				cf_queue_unlock(q);
				return -2;
			}
		}
	}

	if (CF_Q_SZ(q) == q->alloc_sz) {
		if (0 != cf_queue_resize(q, q->alloc_sz * 2)) {
			cf_queue_unlock(q);
			return CF_QUEUE_ERR;
		}
	}

	// TODO - if queues are power of 2, this can be a shift.
	memcpy(CF_Q_ELEM_PTR(q, q->write_offset), ptr, q->element_sz);
	q->write_offset++;
	cf_queue_unwrap(q);

	if (q->threadsafe) {
		pthread_cond_signal(&q->CV);
	}

	cf_queue_unlock(q);
	return CF_QUEUE_OK;
}

//
// Push to the front of the queue.
//
int
cf_queue_push_head(cf_queue *q, const void *ptr)
{
	cf_queue_lock(q);

	if (CF_Q_SZ(q) == q->alloc_sz) {
		if (0 != cf_queue_resize(q, q->alloc_sz * 2)) {
			cf_queue_unlock(q);
			return CF_QUEUE_ERR;
		}
	}

	if (q->read_offset == 0) {
		q->read_offset += q->alloc_sz;
		q->write_offset += q->alloc_sz;
	}

	q->read_offset--;
	memcpy(CF_Q_ELEM_PTR(q, q->read_offset), ptr, q->element_sz);

	cf_queue_unwrap(q);

	if (q->threadsafe) {
		pthread_cond_signal(&q->CV);
	}

	cf_queue_unlock(q);
	return CF_QUEUE_OK;
}

//
// If ms_wait < 0, wait forever.
// If ms_wait = 0, don't wait at all.
// If ms_wait > 0, wait that number of milliseconds.
//
int
cf_queue_pop(cf_queue *q, void *buf, int ms_wait)
{
	struct timespec tp;

	if (ms_wait > 0) {
		cf_set_wait_timespec(ms_wait, &tp);
	}

	cf_queue_lock(q);

	if (q->threadsafe) {

		// Note that we have to use a while() loop. The pthread_cond_signal()
		// documentation says that AT LEAST ONE waiting thread will be awakened.
		// If more than one is awakened, the first will get the popped element,
		// and the others will find the queue empty and go back to waiting.

		while (CF_Q_EMPTY(q)) {
			if (CF_QUEUE_FOREVER == ms_wait) {
				pthread_cond_wait(&q->CV, &q->LOCK);
			}
			else if (CF_QUEUE_NOWAIT == ms_wait) {
				pthread_mutex_unlock(&q->LOCK);
				return CF_QUEUE_EMPTY;
			}
			else {
				pthread_cond_timedwait(&q->CV, &q->LOCK, &tp);

				if (CF_Q_EMPTY(q)) {
					pthread_mutex_unlock(&q->LOCK);
					return CF_QUEUE_EMPTY;
				}
			}
		}
	}
	else if (CF_Q_EMPTY(q)) {
		return CF_QUEUE_EMPTY;
	}

	memcpy(buf, CF_Q_ELEM_PTR(q, q->read_offset), q->element_sz);
	q->read_offset++;

	// This probably keeps the cache fresher because the queue is fully empty.
	if (q->read_offset == q->write_offset) {
		q->read_offset = q->write_offset = 0;
	}

	cf_queue_unlock(q);
	return CF_QUEUE_OK;
}

void
cf_queue_delete_offset(cf_queue *q, uint32_t index)
{
	index %= q->alloc_sz;

	uint32_t r_index = q->read_offset % q->alloc_sz;
	uint32_t w_index = q->write_offset % q->alloc_sz;

	// Assumes index is validated!

	// If we're deleting the one at the head, just increase the read offset.
	if (index == r_index) {
		q->read_offset++;
		return;
	}

	// If we're deleting the tail, just decrease the write offset.
	if (w_index && (index == w_index - 1)) {
		q->write_offset--;
		return;
	}

	if (index > r_index) {
		// The memory copy is overlapping, so must use memmove().
		memmove(&q->elements[(r_index + 1) * q->element_sz],
				&q->elements[r_index * q->element_sz],
				(index - r_index) * q->element_sz);
		q->read_offset++;
		return;
	}

	if (index < w_index) {
		// The memory copy is overlapping, so must use memmove().
		memmove(&q->elements[index * q->element_sz],
				&q->elements[(index + 1) * q->element_sz],
				(w_index - index - 1) * q->element_sz);
		q->write_offset--;
	}
}

//
// Iterate over all queue members, calling the callback.
//
int
cf_queue_reduce(cf_queue *q, cf_queue_reduce_fn cb, void *udata)
{
	cf_queue_lock(q);

	if (CF_Q_SZ(q) != 0) {
		for (uint32_t i = q->read_offset; i < q->write_offset; i++) {
			int rv = cb(CF_Q_ELEM_PTR(q, i), udata);

			if (rv == 0) {
				continue;
			}

			if (rv == -1) {
				// Found what it was looking for, stop reducing.
				break;
			}

			if (rv == -2) {
				// Delete, and stop reducing.
				cf_queue_delete_offset(q, i);
				break;
			}
		}
	}

	cf_queue_unlock(q);
	return CF_QUEUE_OK;
}
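
//
// Example reduce callback (illustrative sketch - the hypothetical
// find_value_cb assumes an int queue, and the signature follows how cb() is
// invoked above): return 0 to keep scanning, -1 to stop at a match, -2 to
// delete the match and stop.
//
//	static int
//	find_value_cb(void *element, void *udata)
//	{
//		int target = *(int*)udata;
//
//		return *(int*)element == target ? -1 : 0;
//	}
//
//	// Elsewhere:
//	// int target = 42;
//	// cf_queue_reduce(q, find_value_cb, &target);
//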

//
// Iterate over all queue members, calling the callback. Pop an element (or
// not) based on the callback return value.
//
int
cf_queue_reduce_pop(cf_queue *q, void *buf, int ms_wait, cf_queue_reduce_fn cb,
		void *udata)
{
	struct timespec tp;

	if (ms_wait > 0) {
		cf_set_wait_timespec(ms_wait, &tp);
	}

	cf_queue_lock(q);

	if (q->threadsafe) {

		// Note that we have to use a while() loop. The pthread_cond_signal()
		// documentation says that AT LEAST ONE waiting thread will be awakened.
		// If more than one is awakened, the first will get the popped element,
		// and the others will find the queue empty and go back to waiting.

		while (CF_Q_EMPTY(q)) {
			if (CF_QUEUE_FOREVER == ms_wait) {
				pthread_cond_wait(&q->CV, &q->LOCK);
			}
			else if (CF_QUEUE_NOWAIT == ms_wait) {
				pthread_mutex_unlock(&q->LOCK);
				return CF_QUEUE_EMPTY;
			}
			else {
				pthread_cond_timedwait(&q->CV, &q->LOCK, &tp);

				if (CF_Q_EMPTY(q)) {
					pthread_mutex_unlock(&q->LOCK);
					return CF_QUEUE_EMPTY;
				}
			}
		}
	}
	else if (CF_Q_EMPTY(q)) {
		return CF_QUEUE_EMPTY;
	}

	uint32_t best_index = q->read_offset;

	for (uint32_t i = q->read_offset; i < q->write_offset; i++) {
		int rv = cb(CF_Q_ELEM_PTR(q, i), udata);

		if (rv == 0) {
			continue;
		}

		if (rv == -1) {
			// Found what it was looking for, so break.
			best_index = i;
			break;
		}

		if (rv == -2) {
			// Found a new candidate, but keep looking for a better one.
			best_index = i;
		}
	}

	memcpy(buf, CF_Q_ELEM_PTR(q, best_index), q->element_sz);
	cf_queue_delete_offset(q, best_index);

	cf_queue_unlock(q);

	return CF_QUEUE_OK;
}

//
// Iterate over all queue members starting from the tail, calling the callback.
//
int
cf_queue_reduce_reverse(cf_queue *q, cf_queue_reduce_fn cb, void *udata)
{
	cf_queue_lock(q);

	if (CF_Q_SZ(q) != 0) {
		for (int i = (int)q->write_offset - 1; i >= (int)q->read_offset; i--) {
			int rv = cb(CF_Q_ELEM_PTR(q, i), udata);

			if (rv == 0) {
				continue;
			}

			if (rv == -1) {
				// Found what it was looking for, stop reducing.
				break;
			}

			if (rv == -2) {
				// Delete, and stop reducing.
				cf_queue_delete_offset(q, i);
				break;
			}
		}
	}

	cf_queue_unlock(q);
	return CF_QUEUE_OK;
}

//
// Delete element(s) from the queue. Pass 'only_one' as true if there can be
// only one element with this value on the queue.
//
int
cf_queue_delete(cf_queue *q, const void *ptr, bool only_one)
{
	cf_queue_lock(q);

	bool found = false;

	if (CF_Q_SZ(q) != 0) {
		for (uint32_t i = q->read_offset; i < q->write_offset; i++) {
			int rv = 0;

			// If ptr is null, rv is always 0 and we delete all elements.
			if (ptr) {
				rv = memcmp(CF_Q_ELEM_PTR(q, i), ptr, q->element_sz);
			}

			if (rv == 0) {
				cf_queue_delete_offset(q, i);
				found = true;

				if (only_one) {
					break;
				}
			}
		}
	}

	cf_queue_unlock(q);
	return found ? CF_QUEUE_OK : CF_QUEUE_EMPTY;
}

int
cf_queue_delete_all(cf_queue *q)
{
	return cf_queue_delete(q, NULL, false);
}