1/*
2 * Copyright 2008-2018 Aerospike, Inc.
3 *
4 * Portions may be licensed to Aerospike, Inc. under one or more contributor
5 * license agreements.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17#pragma once
18
19#include <aerospike/as_std.h>
20#include <pthread.h>
21
22#ifdef __cplusplus
23extern "C" {
24#endif
25
26/******************************************************************************
27 * CONSTANTS
28 ******************************************************************************/
29
30#ifndef CF_QUEUE_ALLOCSZ
31#define CF_QUEUE_ALLOCSZ 64
32#endif
33
34#define CF_QUEUE_OK 0
35#define CF_QUEUE_ERR -1
36#define CF_QUEUE_EMPTY -2
37#define CF_QUEUE_NOMATCH -3 // used in reduce_pop methods
38
39// mswait < 0 wait forever
40// mswait == 0 wait not at all
41// mswait > 0 wait that number of ms
42#define CF_QUEUE_FOREVER -1
43#define CF_QUEUE_NOWAIT 0
44#define CF_QUEUE_WAIT_1SEC 1000
45#define CF_QUEUE_WAIT_30SEC 30000
46
47/******************************************************************************
48 * TYPES
49 ******************************************************************************/
50
51typedef int (*cf_queue_reduce_fn) (void *buf, void *udata);
52
53/**
54 * cf_queue
55 */
56typedef struct cf_queue_s {
57 /**
58 * Private data - please use API.
59 */
60 bool threadsafe; // if false, no mutex lock
61 bool free_struct; // free struct cf_queue in addition to elements
62 unsigned int alloc_sz; // number of elements currently allocated
63 unsigned int read_offset; // offset (in elements) of head
64 unsigned int write_offset; // offset (in elements) past tail
65 size_t element_sz; // number of bytes in an element
66 pthread_mutex_t LOCK; // the mutex lock
67 pthread_cond_t CV; // the condvar
68 uint8_t * elements; // the block of queue elements
69} cf_queue;
70
71/******************************************************************************
72 * FUNCTIONS
73 ******************************************************************************/
74
75bool cf_queue_init(cf_queue* q, size_t element_sz, uint32_t capacity, bool threadsafe);
76
77cf_queue *cf_queue_create(size_t element_sz, bool threadsafe);
78
79void cf_queue_destroy(cf_queue *q);
80
81/**
82 * Get the number of elements currently in the queue.
83 */
84int cf_queue_sz(cf_queue *q);
85
86/**
87 * Push to the tail of the queue.
88 */
89int cf_queue_push(cf_queue *q, const void *ptr);
90
91/**
92 * Push element on the queue only if size < limit.
93 */
94bool cf_queue_push_limit(cf_queue *q, const void *ptr, uint32_t limit);
95
96/**
97 * Same as cf_queue_push() except it's a no-op if element is already queued.
98 */
99int cf_queue_push_unique(cf_queue *q, const void *ptr);
100
101/**
102 * Push to the front of the queue.
103 */
104int cf_queue_push_head(cf_queue *q, const void *ptr);
105
106/**
107 * Pops from the head of the queue.
108 */
109int cf_queue_pop(cf_queue *q, void *buf, int ms_wait);
110
111/**
112 * Run the entire queue, calling the callback, with the lock held.
113 *
114 * return -2 from the callback to delete an element and stop iterating
115 * return -1 from the callback to stop iterating
116 * return 0 from the callback to keep iterating
117 */
118int cf_queue_reduce(cf_queue *q, cf_queue_reduce_fn cb, void *udata);
119
120/**
121 * Find best element to pop from the queue via a reduce callback function.
122 *
123 * return 0 from the callback to keep iterating
124 * return -1 from the callback to pop and stop iterating
125 * return -2 from the callback if the element is the best to pop so far, but you
126 * want to keep looking
127 */
128int cf_queue_reduce_pop(cf_queue *q, void *buf, int ms_wait, cf_queue_reduce_fn cb, void *udata);
129
130/**
131 * Same as cf_queue_reduce() but run the queue from the tail.
132 */
133int cf_queue_reduce_reverse(cf_queue *q, cf_queue_reduce_fn cb, void *udata);
134
135/**
136 * The most common reason to want to 'reduce' is delete. If 'buf' is NULL, this
137 * will delete all. If 'only_one' is true, only the first occurrence matching
138 * 'buf' will be deleted. (Do this if you know there can only be one occurrence
139 * on the queue.)
140 */
141int cf_queue_delete(cf_queue *q, const void *ptr, bool only_one);
142
143/**
144 * Delete all items in queue.
145 */
146int cf_queue_delete_all(cf_queue *q);
147
148void cf_queue_delete_offset(cf_queue *q, uint32_t index);
149
150/******************************************************************************
151 * MACROS
152 ******************************************************************************/
153
154#define CF_Q_SZ(__q) (__q->write_offset - __q->read_offset)
155
156#define CF_Q_EMPTY(__q) (__q->write_offset == __q->read_offset)
157
158#define CF_Q_ELEM_PTR(__q, __i) (&__q->elements[(__i % __q->alloc_sz) * __q->element_sz])
159
160/******************************************************************************/
161
162#ifdef __cplusplus
163} // end extern "C"
164#endif
165