| 1 | /* |
| 2 | * Copyright 2008-2018 Aerospike, Inc. |
| 3 | * |
| 4 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
| 5 | * license agreements. |
| 6 | * |
| 7 | * Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| 8 | * use this file except in compliance with the License. You may obtain a copy of |
| 9 | * the License at http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 13 | * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 14 | * License for the specific language governing permissions and limitations under |
| 15 | * the License. |
| 16 | */ |
| 17 | #include <citrusleaf/cf_queue_priority.h> |
| 18 | #include <citrusleaf/cf_clock.h> |
| 19 | #include <citrusleaf/alloc.h> |
| 20 | #include <string.h> |
| 21 | |
| 22 | /****************************************************************************** |
| 23 | * FUNCTIONS |
| 24 | ******************************************************************************/ |
| 25 | |
| 26 | cf_queue_priority *cf_queue_priority_create(size_t element_sz, bool threadsafe) |
| 27 | { |
| 28 | cf_queue_priority *q = (cf_queue_priority*)cf_malloc(sizeof(cf_queue_priority)); |
| 29 | |
| 30 | if (! q) { |
| 31 | return NULL; |
| 32 | } |
| 33 | |
| 34 | q->threadsafe = threadsafe; |
| 35 | |
| 36 | if (! (q->low_q = cf_queue_create(element_sz, false))) { |
| 37 | goto Fail1; |
| 38 | } |
| 39 | |
| 40 | if (! (q->medium_q = cf_queue_create(element_sz, false))) { |
| 41 | goto Fail2; |
| 42 | } |
| 43 | |
| 44 | if (! (q->high_q = cf_queue_create(element_sz, false))) { |
| 45 | goto Fail3; |
| 46 | } |
| 47 | |
| 48 | if (! threadsafe) { |
| 49 | return q; |
| 50 | } |
| 51 | |
| 52 | if (0 != pthread_mutex_init(&q->LOCK, NULL)) { |
| 53 | goto Fail4; |
| 54 | } |
| 55 | |
| 56 | if (0 != pthread_cond_init(&q->CV, NULL)) { |
| 57 | goto Fail5; |
| 58 | } |
| 59 | |
| 60 | return q; |
| 61 | |
| 62 | Fail5: |
| 63 | pthread_mutex_destroy(&q->LOCK); |
| 64 | Fail4: |
| 65 | cf_queue_destroy(q->high_q); |
| 66 | Fail3: |
| 67 | cf_queue_destroy(q->medium_q); |
| 68 | Fail2: |
| 69 | cf_queue_destroy(q->low_q); |
| 70 | Fail1: |
| 71 | cf_free(q); |
| 72 | |
| 73 | return NULL; |
| 74 | } |
| 75 | |
| 76 | void cf_queue_priority_destroy(cf_queue_priority *q) |
| 77 | { |
| 78 | cf_queue_destroy(q->high_q); |
| 79 | cf_queue_destroy(q->medium_q); |
| 80 | cf_queue_destroy(q->low_q); |
| 81 | |
| 82 | if (q->threadsafe) { |
| 83 | pthread_mutex_destroy(&q->LOCK); |
| 84 | pthread_cond_destroy(&q->CV); |
| 85 | } |
| 86 | |
| 87 | cf_free(q); |
| 88 | } |
| 89 | |
| 90 | static inline void cf_queue_priority_lock(cf_queue_priority *q) |
| 91 | { |
| 92 | if (q->threadsafe) { |
| 93 | pthread_mutex_lock(&q->LOCK); |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | static inline void cf_queue_priority_unlock(cf_queue_priority *q) |
| 98 | { |
| 99 | if (q->threadsafe) { |
| 100 | pthread_mutex_unlock(&q->LOCK); |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | int cf_queue_priority_push(cf_queue_priority *q, const void *ptr, int pri) |
| 105 | { |
| 106 | cf_queue_priority_lock(q); |
| 107 | |
| 108 | int rv = CF_QUEUE_ERR; |
| 109 | |
| 110 | if (pri == CF_QUEUE_PRIORITY_HIGH) { |
| 111 | rv = cf_queue_push(q->high_q, ptr); |
| 112 | } |
| 113 | else if (pri == CF_QUEUE_PRIORITY_MEDIUM) { |
| 114 | rv = cf_queue_push(q->medium_q, ptr); |
| 115 | } |
| 116 | else if (pri == CF_QUEUE_PRIORITY_LOW) { |
| 117 | rv = cf_queue_push(q->low_q, ptr); |
| 118 | } |
| 119 | |
| 120 | if (rv == 0 && q->threadsafe) { |
| 121 | pthread_cond_signal(&q->CV); |
| 122 | } |
| 123 | |
| 124 | cf_queue_priority_unlock(q); |
| 125 | return rv; |
| 126 | } |
| 127 | |
| 128 | int cf_queue_priority_pop(cf_queue_priority *q, void *buf, int ms_wait) |
| 129 | { |
| 130 | cf_queue_priority_lock(q); |
| 131 | |
| 132 | struct timespec tp; |
| 133 | |
| 134 | if (ms_wait > 0) { |
| 135 | cf_set_wait_timespec(ms_wait, &tp); |
| 136 | } |
| 137 | |
| 138 | if (q->threadsafe) { |
| 139 | while (CF_Q_PRI_EMPTY(q)) { |
| 140 | if (CF_QUEUE_FOREVER == ms_wait) { |
| 141 | pthread_cond_wait(&q->CV, &q->LOCK); |
| 142 | } |
| 143 | else if (CF_QUEUE_NOWAIT == ms_wait) { |
| 144 | pthread_mutex_unlock(&q->LOCK); |
| 145 | return CF_QUEUE_EMPTY; |
| 146 | } |
| 147 | else { |
| 148 | pthread_cond_timedwait(&q->CV, &q->LOCK, &tp); |
| 149 | |
| 150 | if (CF_Q_PRI_EMPTY(q)) { |
| 151 | pthread_mutex_unlock(&q->LOCK); |
| 152 | return CF_QUEUE_EMPTY; |
| 153 | } |
| 154 | } |
| 155 | } |
| 156 | } |
| 157 | |
| 158 | int rv = CF_QUEUE_EMPTY; |
| 159 | |
| 160 | if (CF_Q_SZ(q->high_q)) { |
| 161 | rv = cf_queue_pop(q->high_q, buf, 0); |
| 162 | } |
| 163 | else if (CF_Q_SZ(q->medium_q)) { |
| 164 | rv = cf_queue_pop(q->medium_q, buf, 0); |
| 165 | } |
| 166 | else if (CF_Q_SZ(q->low_q)) { |
| 167 | rv = cf_queue_pop(q->low_q, buf, 0); |
| 168 | } |
| 169 | |
| 170 | cf_queue_priority_unlock(q); |
| 171 | return rv; |
| 172 | } |
| 173 | |
| 174 | int cf_queue_priority_sz(cf_queue_priority *q) |
| 175 | { |
| 176 | int rv = 0; |
| 177 | |
| 178 | cf_queue_priority_lock(q); |
| 179 | rv += cf_queue_sz(q->high_q); |
| 180 | rv += cf_queue_sz(q->medium_q); |
| 181 | rv += cf_queue_sz(q->low_q); |
| 182 | cf_queue_priority_unlock(q); |
| 183 | |
| 184 | return rv; |
| 185 | } |
| 186 | |
| 187 | /** |
| 188 | * Use this function to find an element to pop from the queue using a reduce |
| 189 | * callback function. Have the callback function return -1 when you want to pop |
| 190 | * the element and stop reducing. If you have not popped an element, |
| 191 | * CF_QUEUE_NOMATCH is returned. |
| 192 | */ |
| 193 | int cf_queue_priority_reduce_pop(cf_queue_priority *priority_q, void *buf, cf_queue_reduce_fn cb, void *udata) |
| 194 | { |
| 195 | cf_queue_priority_lock(priority_q); |
| 196 | |
| 197 | cf_queue *queues[3]; |
| 198 | |
| 199 | queues[0] = priority_q->high_q; |
| 200 | queues[1] = priority_q->medium_q; |
| 201 | queues[2] = priority_q->low_q; |
| 202 | |
| 203 | cf_queue *q; |
| 204 | |
| 205 | for (int q_itr = 0; q_itr < 3; q_itr++) { |
| 206 | q = queues[q_itr]; |
| 207 | |
| 208 | if (CF_Q_SZ(q) == 0) { |
| 209 | continue; |
| 210 | } |
| 211 | |
| 212 | for (uint32_t i = q->read_offset; i < q->write_offset; i++) { |
| 213 | int rv = cb(CF_Q_ELEM_PTR(q, i), udata); |
| 214 | |
| 215 | if (rv == 0) { |
| 216 | continue; |
| 217 | } |
| 218 | |
| 219 | if (rv == -1) { |
| 220 | // Found an element, so copy to 'buf', delete from q, and return. |
| 221 | memcpy(buf, CF_Q_ELEM_PTR(q, i), q->element_sz); |
| 222 | cf_queue_delete_offset(q, i); |
| 223 | |
| 224 | cf_queue_priority_unlock(priority_q); |
| 225 | return CF_QUEUE_OK; |
| 226 | } |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | cf_queue_priority_unlock(priority_q); |
| 231 | return CF_QUEUE_NOMATCH; |
| 232 | } |
| 233 | |
| 234 | // |
| 235 | // This assumes the element we're looking for is unique! Returns |
| 236 | // CF_QUEUE_NOMATCH if the element is not found or not moved. |
| 237 | // |
| 238 | int cf_queue_priority_change(cf_queue_priority *priority_q, const void *ptr, int new_pri) |
| 239 | { |
| 240 | cf_queue_priority_lock(priority_q); |
| 241 | |
| 242 | cf_queue *queues[3]; |
| 243 | |
| 244 | queues[0] = priority_q->high_q; |
| 245 | queues[1] = priority_q->medium_q; |
| 246 | queues[2] = priority_q->low_q; |
| 247 | |
| 248 | int dest_q_itr = CF_QUEUE_PRIORITY_HIGH - new_pri; |
| 249 | cf_queue *q; |
| 250 | |
| 251 | for (int q_itr = 0; q_itr < 3; q_itr++) { |
| 252 | q = queues[q_itr]; |
| 253 | |
| 254 | if (q_itr == dest_q_itr || CF_Q_SZ(q) == 0) { |
| 255 | continue; |
| 256 | } |
| 257 | |
| 258 | for (uint32_t i = q->read_offset; i < q->write_offset; i++) { |
| 259 | if (memcmp(CF_Q_ELEM_PTR(q, i), ptr, q->element_sz) == 0) { |
| 260 | // Move it to the queue with desired priority. |
| 261 | cf_queue_delete_offset(q, i); |
| 262 | cf_queue_push(queues[dest_q_itr], ptr); |
| 263 | |
| 264 | cf_queue_priority_unlock(priority_q); |
| 265 | return CF_QUEUE_OK; |
| 266 | } |
| 267 | } |
| 268 | } |
| 269 | |
| 270 | cf_queue_priority_unlock(priority_q); |
| 271 | return CF_QUEUE_NOMATCH; |
| 272 | } |
| 273 | |
| 274 | // |
| 275 | // Reduce the inner queues whose priorities are different to 'new_pri'. If the |
| 276 | // callback returns -1, move that element to the inner queue whose priority is |
| 277 | // 'new_pri' and return CF_QUEUE_OK. Returns CF_QUEUE_NOMATCH if callback never |
| 278 | // triggers a move. |
| 279 | // |
| 280 | int cf_queue_priority_reduce_change(cf_queue_priority *priority_q, int new_pri, cf_queue_reduce_fn cb, void *udata) |
| 281 | { |
| 282 | cf_queue_priority_lock(priority_q); |
| 283 | |
| 284 | cf_queue *queues[3]; |
| 285 | |
| 286 | queues[0] = priority_q->high_q; |
| 287 | queues[1] = priority_q->medium_q; |
| 288 | queues[2] = priority_q->low_q; |
| 289 | |
| 290 | int dest_q_itr = CF_QUEUE_PRIORITY_HIGH - new_pri; |
| 291 | cf_queue *q; |
| 292 | |
| 293 | for (int q_itr = 0; q_itr < 3; q_itr++) { |
| 294 | q = queues[q_itr]; |
| 295 | |
| 296 | if (q_itr == dest_q_itr || CF_Q_SZ(q) == 0) { |
| 297 | continue; |
| 298 | } |
| 299 | |
| 300 | for (uint32_t i = q->read_offset; i < q->write_offset; i++) { |
| 301 | int rv = cb(CF_Q_ELEM_PTR(q, i), udata); |
| 302 | |
| 303 | if (rv == 0) { |
| 304 | continue; |
| 305 | } |
| 306 | |
| 307 | if (rv == -1) { |
| 308 | // Found it - move to desired queue and return. |
| 309 | uint8_t* buf = alloca(q->element_sz); |
| 310 | |
| 311 | memcpy(buf, CF_Q_ELEM_PTR(q, i), q->element_sz); |
| 312 | cf_queue_delete_offset(q, i); |
| 313 | cf_queue_push(queues[dest_q_itr], buf); |
| 314 | |
| 315 | cf_queue_priority_unlock(priority_q); |
| 316 | return CF_QUEUE_OK; |
| 317 | } |
| 318 | } |
| 319 | } |
| 320 | |
| 321 | cf_queue_priority_unlock(priority_q); |
| 322 | return CF_QUEUE_NOMATCH; |
| 323 | } |
| 324 | |