1 | /* |
2 | * epoll_queue.c |
3 | * |
4 | * Copyright (C) 2019 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "epoll_queue.h" |
28 | |
29 | #include <errno.h> |
30 | #include <stdbool.h> |
31 | #include <stdint.h> |
32 | #include <sys/eventfd.h> |
33 | #include <unistd.h> |
34 | |
35 | #include "citrusleaf/alloc.h" |
36 | |
37 | #include "fault.h" |
38 | |
39 | |
40 | //========================================================== |
41 | // Forward declarations. |
42 | // |
43 | |
// Doubles the element array's capacity and repacks the queued elements so
// that they start at index 0.
static void resize_queue(cf_epoll_queue* q);
// Once bit 30 or 31 of write_pos is set, reduces read_pos modulo the capacity
// and re-derives write_pos, keeping the element count unchanged.
static void unwrap_queue(cf_epoll_queue* q);
46 | |
47 | |
48 | //========================================================== |
49 | // Inlines & macros. |
50 | // |
51 | |
// All arguments are fully parenthesized so that the macros expand correctly
// for any expression argument (e.g. `&queue`, `pos + 1`).

// Number of elements currently held: distance between write and read position.
#define Q_N_ELES(_q) ((_q)->write_pos - (_q)->read_pos)
// Address of the element slot for position _i - the position is reduced modulo
// the capacity, then scaled by the element size to index the byte array.
#define Q_ELE_PTR(_q, _i) \
	(&(_q)->eles[((_i) % (_q)->capacity) * (_q)->ele_sz])
// True when the read position has caught up with the write position.
#define Q_EMPTY(_q) ((_q)->write_pos == (_q)->read_pos)
55 | |
56 | |
57 | //========================================================== |
58 | // Public API. |
59 | // |
60 | |
61 | void |
62 | cf_epoll_queue_init(cf_epoll_queue* q, uint32_t ele_sz, uint32_t capacity) |
63 | { |
64 | q->event_fd = eventfd(0, EFD_NONBLOCK); |
65 | |
66 | if (q->event_fd < 0) { |
67 | cf_crash(CF_MISC, "eventfd() failed: %d (%s)" , errno, |
68 | cf_strerror(errno)); |
69 | } |
70 | |
71 | q->read_pos = 0; |
72 | q->write_pos = 0; |
73 | |
74 | q->capacity = capacity; |
75 | q->ele_sz = ele_sz; |
76 | q->eles = cf_malloc(capacity * ele_sz); |
77 | } |
78 | |
79 | void |
80 | cf_epoll_queue_destroy(cf_epoll_queue* q) |
81 | { |
82 | cf_free(q->eles); |
83 | } |
84 | |
85 | void |
86 | cf_epoll_queue_push(cf_epoll_queue* q, const void* ele) |
87 | { |
88 | if (Q_N_ELES(q) == q->capacity) { |
89 | resize_queue(q); |
90 | } |
91 | |
92 | memcpy(Q_ELE_PTR(q, q->write_pos), ele, q->ele_sz); |
93 | q->write_pos++; |
94 | |
95 | unwrap_queue(q); |
96 | |
97 | if (Q_N_ELES(q) == 1) { |
98 | uint64_t val = 1; |
99 | |
100 | if (write(q->event_fd, &val, sizeof(val)) < 0) { |
101 | cf_crash(CF_MISC, "write() failed: %d (%s)" , errno, |
102 | cf_strerror(errno)); |
103 | } |
104 | } |
105 | } |
106 | |
107 | bool |
108 | cf_epoll_queue_pop(cf_epoll_queue* q, void* ele) |
109 | { |
110 | if (Q_EMPTY(q)) { |
111 | return false; |
112 | } |
113 | |
114 | memcpy(ele, Q_ELE_PTR(q, q->read_pos), q->ele_sz); |
115 | q->read_pos++; |
116 | |
117 | if (Q_EMPTY(q)) { |
118 | q->read_pos = 0; |
119 | q->write_pos = 0; |
120 | |
121 | uint64_t val; |
122 | |
123 | if (read(q->event_fd, &val, sizeof(val)) < 0) { |
124 | cf_crash(CF_MISC, "read() failed: %d (%s)" , errno, |
125 | cf_strerror(errno)); |
126 | } |
127 | } |
128 | |
129 | return true; |
130 | } |
131 | |
132 | |
133 | //========================================================== |
134 | // Local helpers. |
135 | // |
136 | |
137 | static void |
138 | resize_queue(cf_epoll_queue* q) |
139 | { |
140 | uint32_t new_capacity = q->capacity * 2; |
141 | uint32_t read_ix = q->read_pos % q->capacity; |
142 | |
143 | if (read_ix == 0) { |
144 | q->eles = cf_realloc(q->eles, new_capacity * q->ele_sz); |
145 | } |
146 | else { |
147 | uint8_t* new_eles = cf_malloc(new_capacity * q->ele_sz); |
148 | |
149 | uint32_t end_sz = (q->capacity - read_ix) * q->ele_sz; |
150 | uint32_t total_sz = q->capacity * q->ele_sz; |
151 | |
152 | memcpy(new_eles, Q_ELE_PTR(q, q->read_pos), end_sz); |
153 | memcpy(new_eles + end_sz, q->eles, total_sz - end_sz); |
154 | |
155 | cf_free(q->eles); |
156 | q->eles = new_eles; |
157 | } |
158 | |
159 | q->read_pos = 0; |
160 | q->write_pos = q->capacity; |
161 | q->capacity = new_capacity; |
162 | } |
163 | |
164 | static void |
165 | unwrap_queue(cf_epoll_queue* q) |
166 | { |
167 | if ((q->write_pos & 0xc0000000) == 0) { |
168 | return; |
169 | } |
170 | |
171 | uint32_t n_eles = Q_N_ELES(q); |
172 | |
173 | q->read_pos %= q->capacity; |
174 | q->write_pos = q->read_pos + n_eles; |
175 | } |
176 | |