| 1 | /* |
| 2 | * epoll_queue.c |
| 3 | * |
| 4 | * Copyright (C) 2019 Aerospike, Inc. |
| 5 | * |
| 6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
| 7 | * license agreements. |
| 8 | * |
| 9 | * This program is free software: you can redistribute it and/or modify it under |
| 10 | * the terms of the GNU Affero General Public License as published by the Free |
| 11 | * Software Foundation, either version 3 of the License, or (at your option) any |
| 12 | * later version. |
| 13 | * |
| 14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
| 15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
| 17 | * details. |
| 18 | * |
| 19 | * You should have received a copy of the GNU Affero General Public License |
| 20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
| 21 | */ |
| 22 | |
| 23 | //========================================================== |
| 24 | // Includes. |
| 25 | // |
| 26 | |
| 27 | #include "epoll_queue.h" |
| 28 | |
| 29 | #include <errno.h> |
| 30 | #include <stdbool.h> |
| 31 | #include <stdint.h> |
| 32 | #include <sys/eventfd.h> |
| 33 | #include <unistd.h> |
| 34 | |
| 35 | #include "citrusleaf/alloc.h" |
| 36 | |
| 37 | #include "fault.h" |
| 38 | |
| 39 | |
| 40 | //========================================================== |
| 41 | // Forward declarations. |
| 42 | // |
| 43 | |
| 44 | static void resize_queue(cf_epoll_queue* q); |
| 45 | static void unwrap_queue(cf_epoll_queue* q); |
| 46 | |
| 47 | |
| 48 | //========================================================== |
| 49 | // Inlines & macros. |
| 50 | // |
| 51 | |
| 52 | #define Q_N_ELES(_q) (_q->write_pos - _q->read_pos) |
| 53 | #define Q_ELE_PTR(_q, _i) (&_q->eles[(_i % _q->capacity) * _q->ele_sz]) |
| 54 | #define Q_EMPTY(_q) (_q->write_pos == _q->read_pos) |
| 55 | |
| 56 | |
| 57 | //========================================================== |
| 58 | // Public API. |
| 59 | // |
| 60 | |
| 61 | void |
| 62 | cf_epoll_queue_init(cf_epoll_queue* q, uint32_t ele_sz, uint32_t capacity) |
| 63 | { |
| 64 | q->event_fd = eventfd(0, EFD_NONBLOCK); |
| 65 | |
| 66 | if (q->event_fd < 0) { |
| 67 | cf_crash(CF_MISC, "eventfd() failed: %d (%s)" , errno, |
| 68 | cf_strerror(errno)); |
| 69 | } |
| 70 | |
| 71 | q->read_pos = 0; |
| 72 | q->write_pos = 0; |
| 73 | |
| 74 | q->capacity = capacity; |
| 75 | q->ele_sz = ele_sz; |
| 76 | q->eles = cf_malloc(capacity * ele_sz); |
| 77 | } |
| 78 | |
| 79 | void |
| 80 | cf_epoll_queue_destroy(cf_epoll_queue* q) |
| 81 | { |
| 82 | cf_free(q->eles); |
| 83 | } |
| 84 | |
| 85 | void |
| 86 | cf_epoll_queue_push(cf_epoll_queue* q, const void* ele) |
| 87 | { |
| 88 | if (Q_N_ELES(q) == q->capacity) { |
| 89 | resize_queue(q); |
| 90 | } |
| 91 | |
| 92 | memcpy(Q_ELE_PTR(q, q->write_pos), ele, q->ele_sz); |
| 93 | q->write_pos++; |
| 94 | |
| 95 | unwrap_queue(q); |
| 96 | |
| 97 | if (Q_N_ELES(q) == 1) { |
| 98 | uint64_t val = 1; |
| 99 | |
| 100 | if (write(q->event_fd, &val, sizeof(val)) < 0) { |
| 101 | cf_crash(CF_MISC, "write() failed: %d (%s)" , errno, |
| 102 | cf_strerror(errno)); |
| 103 | } |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | bool |
| 108 | cf_epoll_queue_pop(cf_epoll_queue* q, void* ele) |
| 109 | { |
| 110 | if (Q_EMPTY(q)) { |
| 111 | return false; |
| 112 | } |
| 113 | |
| 114 | memcpy(ele, Q_ELE_PTR(q, q->read_pos), q->ele_sz); |
| 115 | q->read_pos++; |
| 116 | |
| 117 | if (Q_EMPTY(q)) { |
| 118 | q->read_pos = 0; |
| 119 | q->write_pos = 0; |
| 120 | |
| 121 | uint64_t val; |
| 122 | |
| 123 | if (read(q->event_fd, &val, sizeof(val)) < 0) { |
| 124 | cf_crash(CF_MISC, "read() failed: %d (%s)" , errno, |
| 125 | cf_strerror(errno)); |
| 126 | } |
| 127 | } |
| 128 | |
| 129 | return true; |
| 130 | } |
| 131 | |
| 132 | |
| 133 | //========================================================== |
| 134 | // Local helpers. |
| 135 | // |
| 136 | |
| 137 | static void |
| 138 | resize_queue(cf_epoll_queue* q) |
| 139 | { |
| 140 | uint32_t new_capacity = q->capacity * 2; |
| 141 | uint32_t read_ix = q->read_pos % q->capacity; |
| 142 | |
| 143 | if (read_ix == 0) { |
| 144 | q->eles = cf_realloc(q->eles, new_capacity * q->ele_sz); |
| 145 | } |
| 146 | else { |
| 147 | uint8_t* new_eles = cf_malloc(new_capacity * q->ele_sz); |
| 148 | |
| 149 | uint32_t end_sz = (q->capacity - read_ix) * q->ele_sz; |
| 150 | uint32_t total_sz = q->capacity * q->ele_sz; |
| 151 | |
| 152 | memcpy(new_eles, Q_ELE_PTR(q, q->read_pos), end_sz); |
| 153 | memcpy(new_eles + end_sz, q->eles, total_sz - end_sz); |
| 154 | |
| 155 | cf_free(q->eles); |
| 156 | q->eles = new_eles; |
| 157 | } |
| 158 | |
| 159 | q->read_pos = 0; |
| 160 | q->write_pos = q->capacity; |
| 161 | q->capacity = new_capacity; |
| 162 | } |
| 163 | |
| 164 | static void |
| 165 | unwrap_queue(cf_epoll_queue* q) |
| 166 | { |
| 167 | if ((q->write_pos & 0xc0000000) == 0) { |
| 168 | return; |
| 169 | } |
| 170 | |
| 171 | uint32_t n_eles = Q_N_ELES(q); |
| 172 | |
| 173 | q->read_pos %= q->capacity; |
| 174 | q->write_pos = q->read_pos + n_eles; |
| 175 | } |
| 176 | |