1/*
2 * epoll_queue.c
3 *
4 * Copyright (C) 2019 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "epoll_queue.h"
28
29#include <errno.h>
30#include <stdbool.h>
31#include <stdint.h>
32#include <sys/eventfd.h>
33#include <unistd.h>
34
35#include "citrusleaf/alloc.h"
36
37#include "fault.h"
38
39
40//==========================================================
41// Forward declarations.
42//
43
44static void resize_queue(cf_epoll_queue* q);
45static void unwrap_queue(cf_epoll_queue* q);
46
47
48//==========================================================
49// Inlines & macros.
50//
51
// Every macro argument is parenthesized, so any pointer or index expression
// (e.g. &queue, pos + 1) expands with the intended precedence.

// Number of elements currently queued.
#define Q_N_ELES(_q) ((_q)->write_pos - (_q)->read_pos)

// Address of the slot that position _i maps to - positions wrap modulo the
// capacity. The byte offset is computed in size_t so it can't wrap in 32 bits.
#define Q_ELE_PTR(_q, _i) \
	(&(_q)->eles[(size_t)((_i) % (_q)->capacity) * (_q)->ele_sz])

// True when no elements are queued.
#define Q_EMPTY(_q) ((_q)->write_pos == (_q)->read_pos)
55
56
57//==========================================================
58// Public API.
59//
60
61void
62cf_epoll_queue_init(cf_epoll_queue* q, uint32_t ele_sz, uint32_t capacity)
63{
64 q->event_fd = eventfd(0, EFD_NONBLOCK);
65
66 if (q->event_fd < 0) {
67 cf_crash(CF_MISC, "eventfd() failed: %d (%s)", errno,
68 cf_strerror(errno));
69 }
70
71 q->read_pos = 0;
72 q->write_pos = 0;
73
74 q->capacity = capacity;
75 q->ele_sz = ele_sz;
76 q->eles = cf_malloc(capacity * ele_sz);
77}
78
79void
80cf_epoll_queue_destroy(cf_epoll_queue* q)
81{
82 cf_free(q->eles);
83}
84
85void
86cf_epoll_queue_push(cf_epoll_queue* q, const void* ele)
87{
88 if (Q_N_ELES(q) == q->capacity) {
89 resize_queue(q);
90 }
91
92 memcpy(Q_ELE_PTR(q, q->write_pos), ele, q->ele_sz);
93 q->write_pos++;
94
95 unwrap_queue(q);
96
97 if (Q_N_ELES(q) == 1) {
98 uint64_t val = 1;
99
100 if (write(q->event_fd, &val, sizeof(val)) < 0) {
101 cf_crash(CF_MISC, "write() failed: %d (%s)", errno,
102 cf_strerror(errno));
103 }
104 }
105}
106
107bool
108cf_epoll_queue_pop(cf_epoll_queue* q, void* ele)
109{
110 if (Q_EMPTY(q)) {
111 return false;
112 }
113
114 memcpy(ele, Q_ELE_PTR(q, q->read_pos), q->ele_sz);
115 q->read_pos++;
116
117 if (Q_EMPTY(q)) {
118 q->read_pos = 0;
119 q->write_pos = 0;
120
121 uint64_t val;
122
123 if (read(q->event_fd, &val, sizeof(val)) < 0) {
124 cf_crash(CF_MISC, "read() failed: %d (%s)", errno,
125 cf_strerror(errno));
126 }
127 }
128
129 return true;
130}
131
132
133//==========================================================
134// Local helpers.
135//
136
137static void
138resize_queue(cf_epoll_queue* q)
139{
140 uint32_t new_capacity = q->capacity * 2;
141 uint32_t read_ix = q->read_pos % q->capacity;
142
143 if (read_ix == 0) {
144 q->eles = cf_realloc(q->eles, new_capacity * q->ele_sz);
145 }
146 else {
147 uint8_t* new_eles = cf_malloc(new_capacity * q->ele_sz);
148
149 uint32_t end_sz = (q->capacity - read_ix) * q->ele_sz;
150 uint32_t total_sz = q->capacity * q->ele_sz;
151
152 memcpy(new_eles, Q_ELE_PTR(q, q->read_pos), end_sz);
153 memcpy(new_eles + end_sz, q->eles, total_sz - end_sz);
154
155 cf_free(q->eles);
156 q->eles = new_eles;
157 }
158
159 q->read_pos = 0;
160 q->write_pos = q->capacity;
161 q->capacity = new_capacity;
162}
163
164static void
165unwrap_queue(cf_epoll_queue* q)
166{
167 if ((q->write_pos & 0xc0000000) == 0) {
168 return;
169 }
170
171 uint32_t n_eles = Q_N_ELES(q);
172
173 q->read_pos %= q->capacity;
174 q->write_pos = q->read_pos + n_eles;
175}
176