1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef _RDBUF_H
30#define _RDBUF_H
31
32#ifndef _MSC_VER
33/* for struct iovec */
34#include <sys/socket.h>
35#include <sys/types.h>
36#endif
37
38#include "rdsysqueue.h"
39
40
41/**
42 * @name Generic byte buffers
43 *
44 * @{
45 *
46 * A buffer is a list of segments, each segment having a memory pointer,
47 * write offset, and capacity.
48 *
49 * The main buffer and segment structure is tailored for append-writing
50 * or append-pushing foreign memory.
51 *
52 * Updates of previously written memory regions are possible through the
53 * use of write_update() that takes an absolute offset.
54 *
55 * The write position is part of the buffer and segment structures, while
56 * read is a separate object (rd_slice_t) that does not affect the buffer.
57 */
58
59
/**
 * @brief Buffer segment: one contiguous memory region belonging to a buffer.
 *
 * Segments are chained together through \c seg_link.
 */
typedef struct rd_segment_s {
        /** rbuf_segments link */
        TAILQ_ENTRY(rd_segment_s) seg_link;

        /** Backing-store memory */
        char *seg_p;

        /** Current relative write-position
         *  (length of payload in this segment) */
        size_t seg_of;

        /** Allocated size of seg_p */
        size_t seg_size;

        /** Absolute offset of this segment's beginning
         *  in the grand rd_buf_t */
        size_t seg_absof;

        /** Optional free function for seg_p */
        void (*seg_free) (void *p);

        /** Segment flags: bitmask of the RD_SEGMENT_F_.. values below */
        int seg_flags;
#define RD_SEGMENT_F_RDONLY 0x1 /**< Read-only segment */
#define RD_SEGMENT_F_FREE   0x2 /**< Free segment on destroy,
                                 *   e.g., when it is not a fixed segment. */
} rd_segment_t;
77
78
79
80
/** List head type for rd_segment_t elements linked through seg_link. */
TAILQ_HEAD(rd_segment_head, rd_segment_s);
82
83/**
84 * @brief Buffer, containing a list of segments.
85 */
86typedef struct rd_buf_s {
87 struct rd_segment_head rbuf_segments; /**< TAILQ list of segments */
88 size_t rbuf_segment_cnt; /**< Number of segments */
89
90 rd_segment_t *rbuf_wpos; /**< Current write position seg */
91 size_t rbuf_len; /**< Current (written) length */
92 size_t rbuf_size; /**< Total allocated size of
93 * all segments. */
94
95 char *rbuf_extra; /* Extra memory allocated for
96 * use by segment structs,
97 * buffer memory, etc. */
98 size_t rbuf_extra_len; /* Current extra memory used */
99 size_t rbuf_extra_size; /* Total size of extra memory */
100} rd_buf_t;
101
102
103
104/**
105 * @brief A read-only slice of a buffer.
106 */
107typedef struct rd_slice_s {
108 const rd_buf_t *buf; /**< Pointer to buffer */
109 const rd_segment_t *seg; /**< Current read position segment.
110 * Will point to NULL when end of
111 * slice is reached. */
112 size_t rof; /**< Relative read offset in segment */
113 size_t start; /**< Slice start offset in buffer */
114 size_t end; /**< Slice end offset in buffer+1 */
115} rd_slice_t;
116
117
118
119/**
120 * @returns the current write position (absolute offset)
121 */
122static RD_INLINE RD_UNUSED size_t rd_buf_write_pos (const rd_buf_t *rbuf) {
123 const rd_segment_t *seg = rbuf->rbuf_wpos;
124
125 if (unlikely(!seg)) {
126#if ENABLE_DEVEL
127 rd_assert(rbuf->rbuf_len == 0);
128#endif
129 return 0;
130 }
131#if ENABLE_DEVEL
132 rd_assert(seg->seg_absof + seg->seg_of == rbuf->rbuf_len);
133#endif
134 return seg->seg_absof + seg->seg_of;
135}
136
137
138/**
139 * @returns the number of bytes available for writing (before growing).
140 */
141static RD_INLINE RD_UNUSED size_t rd_buf_write_remains (const rd_buf_t *rbuf) {
142 return rbuf->rbuf_size - rbuf->rbuf_len;
143}
144
145
146
147
148/**
149 * @returns the number of bytes remaining to write to the given segment,
150 * and sets the \p *p pointer (unless NULL) to the start of
151 * the contiguous memory.
152 */
153static RD_INLINE RD_UNUSED size_t
154rd_segment_write_remains (const rd_segment_t *seg, void **p) {
155 if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY)))
156 return 0;
157 if (p)
158 *p = (void *)(seg->seg_p + seg->seg_of);
159 return seg->seg_size - seg->seg_of;
160}
161
162
163
164/**
165 * @returns the last segment for the buffer.
166 */
167static RD_INLINE RD_UNUSED rd_segment_t *rd_buf_last (const rd_buf_t *rbuf) {
168 return TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head);
169}
170
171
172/**
173 * @returns the total written buffer length
174 */
175static RD_INLINE RD_UNUSED size_t rd_buf_len (const rd_buf_t *rbuf) {
176 return rbuf->rbuf_len;
177}
178
179
180int rd_buf_write_seek (rd_buf_t *rbuf, size_t absof);
181
182
183size_t rd_buf_write (rd_buf_t *rbuf, const void *payload, size_t size);
184size_t rd_buf_write_slice (rd_buf_t *rbuf, rd_slice_t *slice);
185size_t rd_buf_write_update (rd_buf_t *rbuf, size_t absof,
186 const void *payload, size_t size);
187void rd_buf_push (rd_buf_t *rbuf, const void *payload, size_t size,
188 void (*free_cb)(void *));
189
190
191size_t rd_buf_get_writable (rd_buf_t *rbuf, void **p);
192
193void rd_buf_write_ensure_contig (rd_buf_t *rbuf, size_t size);
194
195void rd_buf_write_ensure (rd_buf_t *rbuf, size_t min_size, size_t max_size);
196
197size_t rd_buf_get_write_iov (const rd_buf_t *rbuf,
198 struct iovec *iovs, size_t *iovcntp,
199 size_t iov_max, size_t size_max);
200
201void rd_buf_init (rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size);
202
203void rd_buf_destroy (rd_buf_t *rbuf);
204
205void rd_buf_dump (const rd_buf_t *rbuf, int do_hexdump);
206
207int unittest_rdbuf (void);
208
209
210/**@}*/
211
212
213
214
/**
 * @name Buffer read operates on slices of an rd_buf_t and does not
 *       modify the underlying rd_buf_t itself.
 *
 * @warning A slice will not be valid/safe after the buffer or
 *          segments have been modified by a buf write operation
 *          (write, update, write_seek, etc).
 * @{
 */
224
225
/**
 * @returns the remaining length in the slice: the slice end offset
 *          minus the current absolute read position.
 *
 * @remark Macro: \p slice is evaluated more than once.
 */
#define rd_slice_remains(slice)                                 \
        ((slice)->end - rd_slice_abs_offset((slice)))
230
/**
 * @returns the total size of the slice (end offset minus start offset),
 *          regardless of current read position.
 *
 * @remark Macro: \p slice is evaluated more than once.
 */
#define rd_slice_size(slice)                                    \
        ((slice)->end - (slice)->start)
235
236/**
237 * @returns the read position in the slice as a new slice.
238 */
239static RD_INLINE RD_UNUSED rd_slice_t rd_slice_pos (const rd_slice_t *slice) {
240 rd_slice_t newslice = *slice;
241
242 if (!slice->seg)
243 return newslice;
244
245 newslice.start = slice->seg->seg_absof + slice->rof;
246
247 return newslice;
248}
249
250/**
251 * @returns the read position as an absolute buffer byte offset.
252 * @remark this is the buffer offset, not the slice's local offset.
253 */
254static RD_INLINE RD_UNUSED size_t
255rd_slice_abs_offset (const rd_slice_t *slice) {
256 if (unlikely(!slice->seg)) /* reader has reached the end */
257 return slice->end;
258
259 return slice->seg->seg_absof + slice->rof;
260}
261
262/**
263 * @returns the read position as a byte offset.
264 * @remark this is the slice-local offset, not the backing buffer's offset.
265 */
266static RD_INLINE RD_UNUSED size_t rd_slice_offset (const rd_slice_t *slice) {
267 if (unlikely(!slice->seg)) /* reader has reached the end */
268 return rd_slice_size(slice);
269
270 return (slice->seg->seg_absof + slice->rof) - slice->start;
271}
272
273
274
275
276int rd_slice_init_seg (rd_slice_t *slice, const rd_buf_t *rbuf,
277 const rd_segment_t *seg, size_t rof, size_t size);
278int rd_slice_init (rd_slice_t *slice, const rd_buf_t *rbuf,
279 size_t absof, size_t size);
280void rd_slice_init_full (rd_slice_t *slice, const rd_buf_t *rbuf);
281
282size_t rd_slice_reader (rd_slice_t *slice, const void **p);
283size_t rd_slice_peeker (const rd_slice_t *slice, const void **p);
284
285size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size);
286size_t rd_slice_peek (const rd_slice_t *slice, size_t offset,
287 void *dst, size_t size);
288
289size_t rd_slice_read_varint (rd_slice_t *slice, size_t *nump);
290
291const void *rd_slice_ensure_contig (rd_slice_t *slice, size_t size);
292
293int rd_slice_seek (rd_slice_t *slice, size_t offset);
294
295size_t rd_slice_get_iov (const rd_slice_t *slice,
296 struct iovec *iovs, size_t *iovcntp,
297 size_t iov_max, size_t size_max);
298
299
300uint32_t rd_slice_crc32 (rd_slice_t *slice);
301uint32_t rd_slice_crc32c (rd_slice_t *slice);
302
303
304int rd_slice_narrow (rd_slice_t *slice, rd_slice_t *save_slice, size_t size)
305 RD_WARN_UNUSED_RESULT;
306int rd_slice_narrow_relative (rd_slice_t *slice, rd_slice_t *save_slice,
307 size_t relsize)
308 RD_WARN_UNUSED_RESULT;
309void rd_slice_widen (rd_slice_t *slice, const rd_slice_t *save_slice);
310int rd_slice_narrow_copy (const rd_slice_t *orig, rd_slice_t *new_slice,
311 size_t size)
312 RD_WARN_UNUSED_RESULT;
313int rd_slice_narrow_copy_relative (const rd_slice_t *orig,
314 rd_slice_t *new_slice,
315 size_t relsize)
316 RD_WARN_UNUSED_RESULT;
317
318void rd_slice_dump (const rd_slice_t *slice, int do_hexdump);
319
320
321/**@}*/
322
323
324
325#endif /* _RDBUF_H */
326