1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2017 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29
30#include "rd.h"
31#include "rdbuf.h"
32#include "rdunittest.h"
33#include "rdlog.h"
34#include "rdcrc32.h"
35#include "crc32c.h"
36
37
38static size_t
39rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p);
40
41
42/**
43 * @brief Destroy the segment and free its payload.
44 *
45 * @remark Will NOT unlink from buffer.
46 */
47static void rd_segment_destroy (rd_segment_t *seg) {
48 /* Free payload */
49 if (seg->seg_free && seg->seg_p)
50 seg->seg_free(seg->seg_p);
51
52 if (seg->seg_flags & RD_SEGMENT_F_FREE)
53 rd_free(seg);
54}
55
56/**
57 * @brief Initialize segment with absolute offset, backing memory pointer,
58 * and backing memory size.
59 * @remark The segment is NOT linked.
60 */
61static void rd_segment_init (rd_segment_t *seg, void *mem, size_t size) {
62 memset(seg, 0, sizeof(*seg));
63 seg->seg_p = mem;
64 seg->seg_size = size;
65}
66
67
68/**
69 * @brief Append segment to buffer
70 *
71 * @remark Will set the buffer position to the new \p seg if no existing wpos.
72 * @remark Will set the segment seg_absof to the current length of the buffer.
73 */
74static rd_segment_t *rd_buf_append_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
75 TAILQ_INSERT_TAIL(&rbuf->rbuf_segments, seg, seg_link);
76 rbuf->rbuf_segment_cnt++;
77 seg->seg_absof = rbuf->rbuf_len;
78 rbuf->rbuf_len += seg->seg_of;
79 rbuf->rbuf_size += seg->seg_size;
80
81 /* Update writable position */
82 if (!rbuf->rbuf_wpos)
83 rbuf->rbuf_wpos = seg;
84 else
85 rd_buf_get_writable0(rbuf, NULL, NULL);
86
87 return seg;
88}
89
90
91
92
93/**
94 * @brief Attempt to allocate \p size bytes from the buffers extra buffers.
95 * @returns the allocated pointer which MUST NOT be freed, or NULL if
96 * not enough memory.
97 * @remark the returned pointer is memory-aligned to be safe.
98 */
99static void *extra_alloc (rd_buf_t *rbuf, size_t size) {
100 size_t of = RD_ROUNDUP(rbuf->rbuf_extra_len, 8); /* FIXME: 32-bit */
101 void *p;
102
103 if (of + size > rbuf->rbuf_extra_size)
104 return NULL;
105
106 p = rbuf->rbuf_extra + of; /* Aligned pointer */
107
108 rbuf->rbuf_extra_len = of + size;
109
110 return p;
111}
112
113
114
115/**
116 * @brief Get a pre-allocated segment if available, or allocate a new
117 * segment with the extra amount of \p size bytes allocated for payload.
118 *
119 * Will not append the segment to the buffer.
120 */
121static rd_segment_t *
122rd_buf_alloc_segment0 (rd_buf_t *rbuf, size_t size) {
123 rd_segment_t *seg;
124
125 /* See if there is enough room in the extra buffer for
126 * allocating the segment header and the buffer,
127 * or just the segment header, else fall back to malloc. */
128 if ((seg = extra_alloc(rbuf, sizeof(*seg) + size))) {
129 rd_segment_init(seg, size > 0 ? seg+1 : NULL, size);
130
131 } else if ((seg = extra_alloc(rbuf, sizeof(*seg)))) {
132 rd_segment_init(seg, size > 0 ? rd_malloc(size) : NULL, size);
133 if (size > 0)
134 seg->seg_free = rd_free;
135
136 } else if ((seg = rd_malloc(sizeof(*seg) + size))) {
137 rd_segment_init(seg, size > 0 ? seg+1 : NULL, size);
138 seg->seg_flags |= RD_SEGMENT_F_FREE;
139
140 } else
141 rd_assert(!*"segment allocation failure");
142
143 return seg;
144}
145
146/**
147 * @brief Allocate between \p min_size .. \p max_size of backing memory
148 * and add it as a new segment to the buffer.
149 *
150 * The buffer position is updated to point to the new segment.
151 *
152 * The segment will be over-allocated if permitted by max_size
153 * (max_size == 0 or max_size > min_size).
154 */
155static rd_segment_t *
156rd_buf_alloc_segment (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
157 rd_segment_t *seg;
158
159 /* Over-allocate if allowed. */
160 if (min_size != max_size || max_size == 0)
161 max_size = RD_MAX(sizeof(*seg) * 4,
162 RD_MAX(min_size * 2,
163 rbuf->rbuf_size / 2));
164
165 seg = rd_buf_alloc_segment0(rbuf, max_size);
166
167 rd_buf_append_segment(rbuf, seg);
168
169 return seg;
170}
171
172
173/**
174 * @brief Ensures that \p size bytes will be available
175 * for writing and the position will be updated to point to the
176 * start of this contiguous block.
177 */
178void rd_buf_write_ensure_contig (rd_buf_t *rbuf, size_t size) {
179 rd_segment_t *seg = rbuf->rbuf_wpos;
180
181 if (seg) {
182 void *p;
183 size_t remains = rd_segment_write_remains(seg, &p);
184
185 if (remains >= size)
186 return; /* Existing segment has enough space. */
187
188 /* Future optimization:
189 * If existing segment has enough remaining space to warrant
190 * a split, do it, before allocating a new one. */
191 }
192
193 /* Allocate new segment */
194 rbuf->rbuf_wpos = rd_buf_alloc_segment(rbuf, size, size);
195}
196
197/**
198 * @brief Ensures that at least \p size bytes will be available for
199 * a future write.
200 *
201 * Typically used prior to a call to rd_buf_get_write_iov()
202 */
203void rd_buf_write_ensure (rd_buf_t *rbuf, size_t min_size, size_t max_size) {
204 size_t remains;
205 while ((remains = rd_buf_write_remains(rbuf)) < min_size)
206 rd_buf_alloc_segment(rbuf,
207 min_size - remains,
208 max_size ? max_size - remains : 0);
209}
210
211
212/**
213 * @returns the segment at absolute offset \p absof, or NULL if out of range.
214 *
215 * @remark \p hint is an optional segment where to start looking, such as
216 * the current write or read position.
217 */
218rd_segment_t *
219rd_buf_get_segment_at_offset (const rd_buf_t *rbuf, const rd_segment_t *hint,
220 size_t absof) {
221 const rd_segment_t *seg = hint;
222
223 if (unlikely(absof > rbuf->rbuf_len))
224 return NULL;
225
226 /* Only use current write position if possible and if it helps */
227 if (!seg || absof < seg->seg_absof)
228 seg = TAILQ_FIRST(&rbuf->rbuf_segments);
229
230 do {
231 if (absof >= seg->seg_absof &&
232 absof < seg->seg_absof + seg->seg_of) {
233 rd_dassert(seg->seg_absof <= rd_buf_len(rbuf));
234 return (rd_segment_t *)seg;
235 }
236 } while ((seg = TAILQ_NEXT(seg, seg_link)));
237
238 return NULL;
239}
240
241
242/**
243 * @brief Split segment \p seg at absolute offset \p absof, appending
244 * a new segment after \p seg with its memory pointing to the
245 * memory starting at \p absof.
246 * \p seg 's memory will be shorted to the \p absof.
247 *
248 * The new segment is NOT appended to the buffer.
249 *
250 * @warning MUST ONLY be used on the LAST segment
251 *
252 * @warning if a segment is inserted between these two splitted parts
253 * it is imperative that the later segment's absof is corrected.
254 *
255 * @remark The seg_free callback is retained on the original \p seg
256 * and is not copied to the new segment, but flags are copied.
257 */
258static rd_segment_t *rd_segment_split (rd_buf_t *rbuf, rd_segment_t *seg,
259 size_t absof) {
260 rd_segment_t *newseg;
261 size_t relof;
262
263 rd_assert(seg == rbuf->rbuf_wpos);
264 rd_assert(absof >= seg->seg_absof &&
265 absof <= seg->seg_absof + seg->seg_of);
266
267 relof = absof - seg->seg_absof;
268
269 newseg = rd_buf_alloc_segment0(rbuf, 0);
270
271 /* Add later part of split bytes to new segment */
272 newseg->seg_p = seg->seg_p+relof;
273 newseg->seg_of = seg->seg_of-relof;
274 newseg->seg_size = seg->seg_size-relof;
275 newseg->seg_absof = SIZE_MAX; /* Invalid */
276 newseg->seg_flags |= seg->seg_flags;
277
278 /* Remove earlier part of split bytes from previous segment */
279 seg->seg_of = relof;
280 seg->seg_size = relof;
281
282 /* newseg's length will be added to rbuf_len in append_segment(),
283 * so shave it off here from seg's perspective. */
284 rbuf->rbuf_len -= newseg->seg_of;
285 rbuf->rbuf_size -= newseg->seg_size;
286
287 return newseg;
288}
289
290
291
292
293/**
294 * @brief Unlink and destroy a segment, updating the \p rbuf
295 * with the decrease in length and capacity.
296 */
297static void rd_buf_destroy_segment (rd_buf_t *rbuf, rd_segment_t *seg) {
298 rd_assert(rbuf->rbuf_segment_cnt > 0 &&
299 rbuf->rbuf_len >= seg->seg_of &&
300 rbuf->rbuf_size >= seg->seg_size);
301
302 TAILQ_REMOVE(&rbuf->rbuf_segments, seg, seg_link);
303 rbuf->rbuf_segment_cnt--;
304 rbuf->rbuf_len -= seg->seg_of;
305 rbuf->rbuf_size -= seg->seg_size;
306 rd_dassert(rbuf->rbuf_len <= seg->seg_absof);
307 if (rbuf->rbuf_wpos == seg)
308 rbuf->rbuf_wpos = NULL;
309
310 rd_segment_destroy(seg);
311}
312
313
314/**
315 * @brief Free memory associated with the \p rbuf, but not the rbuf itself.
316 * Segments will be destroyed.
317 */
318void rd_buf_destroy (rd_buf_t *rbuf) {
319 rd_segment_t *seg, *tmp;
320
321#if ENABLE_DEVEL
322 /* FIXME */
323 if (rbuf->rbuf_len > 0 && 0) {
324 size_t overalloc = rbuf->rbuf_size - rbuf->rbuf_len;
325 float fill_grade = (float)rbuf->rbuf_len /
326 (float)rbuf->rbuf_size;
327
328 printf("fill grade: %.2f%% (%zu bytes over-allocated)\n",
329 fill_grade * 100.0f, overalloc);
330 }
331#endif
332
333
334 TAILQ_FOREACH_SAFE(seg, &rbuf->rbuf_segments, seg_link, tmp) {
335 rd_segment_destroy(seg);
336
337 }
338
339 if (rbuf->rbuf_extra)
340 rd_free(rbuf->rbuf_extra);
341}
342
343
344/**
345 * @brief Initialize buffer, pre-allocating \p fixed_seg_cnt segments
346 * where the first segment will have a \p buf_size of backing memory.
347 *
348 * The caller may rearrange the backing memory as it see fits.
349 */
350void rd_buf_init (rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size) {
351 size_t totalloc = 0;
352
353 memset(rbuf, 0, sizeof(*rbuf));
354 TAILQ_INIT(&rbuf->rbuf_segments);
355
356 if (!fixed_seg_cnt) {
357 assert(!buf_size);
358 return;
359 }
360
361 /* Pre-allocate memory for a fixed set of segments that are known
362 * before-hand, to minimize the number of extra allocations
363 * needed for well-known layouts (such as headers, etc) */
364 totalloc += RD_ROUNDUP(sizeof(rd_segment_t), 8) * fixed_seg_cnt;
365
366 /* Pre-allocate extra space for the backing buffer. */
367 totalloc += buf_size;
368
369 rbuf->rbuf_extra_size = totalloc;
370 rbuf->rbuf_extra = rd_malloc(rbuf->rbuf_extra_size);
371}
372
373
374
375
376/**
377 * @brief Convenience writer iterator interface.
378 *
379 * After writing to \p p the caller must update the written length
380 * by calling rd_buf_write(rbuf, NULL, written_length)
381 *
382 * @returns the number of contiguous writable bytes in segment
383 * and sets \p *p to point to the start of the memory region.
384 */
385static size_t
386rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p) {
387 rd_segment_t *seg;
388
389 for (seg = rbuf->rbuf_wpos ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
390 size_t len = rd_segment_write_remains(seg, p);
391
392 /* Even though the write offset hasn't changed we
393 * avoid future segment scans by adjusting the
394 * wpos here to the first writable segment. */
395 rbuf->rbuf_wpos = seg;
396 if (segp)
397 *segp = seg;
398
399 if (unlikely(len == 0))
400 continue;
401
402 /* Also adjust absof if the segment was allocated
403 * before the previous segment's memory was exhausted
404 * and thus now might have a lower absolute offset
405 * than the previos segment's now higher relative offset. */
406 if (seg->seg_of == 0 && seg->seg_absof < rbuf->rbuf_len)
407 seg->seg_absof = rbuf->rbuf_len;
408
409 return len;
410 }
411
412 return 0;
413}
414
415size_t rd_buf_get_writable (rd_buf_t *rbuf, void **p) {
416 rd_segment_t *seg;
417 return rd_buf_get_writable0(rbuf, &seg, p);
418}
419
420
421
422
423/**
424 * @brief Write \p payload of \p size bytes to current position
425 * in buffer. A new segment will be allocated and appended
426 * if needed.
427 *
428 * @returns the write position where payload was written (pre-write).
429 * Returning the pre-positition allows write_update() to later
430 * update the same location, effectively making write()s
431 * also a place-holder mechanism.
432 *
433 * @remark If \p payload is NULL only the write position is updated,
434 * in this mode it is required for the buffer to have enough
435 * memory for the NULL write (as it would otherwise cause
436 * uninitialized memory in any new segments allocated from this
437 * function).
438 */
439size_t rd_buf_write (rd_buf_t *rbuf, const void *payload, size_t size) {
440 size_t remains = size;
441 size_t initial_absof;
442 const char *psrc = (const char *)payload;
443
444 initial_absof = rbuf->rbuf_len;
445
446 /* Ensure enough space by pre-allocating segments. */
447 rd_buf_write_ensure(rbuf, size, 0);
448
449 while (remains > 0) {
450 void *p;
451 rd_segment_t *seg = NULL;
452 size_t segremains = rd_buf_get_writable0(rbuf, &seg, &p);
453 size_t wlen = RD_MIN(remains, segremains);
454
455 rd_dassert(seg == rbuf->rbuf_wpos);
456 rd_dassert(wlen > 0);
457 rd_dassert(seg->seg_p+seg->seg_of <= (char *)p &&
458 (char *)p < seg->seg_p+seg->seg_size);
459
460 if (payload) {
461 memcpy(p, psrc, wlen);
462 psrc += wlen;
463 }
464
465 seg->seg_of += wlen;
466 rbuf->rbuf_len += wlen;
467 remains -= wlen;
468 }
469
470 rd_assert(remains == 0);
471
472 return initial_absof;
473}
474
475
476
477/**
478 * @brief Write \p slice to \p rbuf
479 *
480 * @remark The slice position will be updated.
481 *
482 * @returns the number of bytes witten (always slice length)
483 */
484size_t rd_buf_write_slice (rd_buf_t *rbuf, rd_slice_t *slice) {
485 const void *p;
486 size_t rlen;
487 size_t sum = 0;
488
489 while ((rlen = rd_slice_reader(slice, &p))) {
490 size_t r;
491 r = rd_buf_write(rbuf, p, rlen);
492 rd_dassert(r != 0);
493 sum += r;
494 }
495
496 return sum;
497}
498
499
500
501/**
502 * @brief Write \p payload of \p size at absolute offset \p absof
503 * WITHOUT updating the total buffer length.
504 *
505 * This is used to update a previously written region, such
506 * as updating the header length.
507 *
508 * @returns the number of bytes written, which may be less than \p size
509 * if the update spans multiple segments.
510 */
511static size_t rd_segment_write_update (rd_segment_t *seg, size_t absof,
512 const void *payload, size_t size) {
513 size_t relof;
514 size_t wlen;
515
516 rd_dassert(absof >= seg->seg_absof);
517 relof = absof - seg->seg_absof;
518 rd_assert(relof <= seg->seg_of);
519 wlen = RD_MIN(size, seg->seg_of - relof);
520 rd_dassert(relof + wlen <= seg->seg_of);
521
522 memcpy(seg->seg_p+relof, payload, wlen);
523
524 return wlen;
525}
526
527
528
529/**
530 * @brief Write \p payload of \p size at absolute offset \p absof
531 * WITHOUT updating the total buffer length.
532 *
533 * This is used to update a previously written region, such
534 * as updating the header length.
535 */
536size_t rd_buf_write_update (rd_buf_t *rbuf, size_t absof,
537 const void *payload, size_t size) {
538 rd_segment_t *seg;
539 const char *psrc = (const char *)payload;
540 size_t of;
541
542 /* Find segment for offset */
543 seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
544 rd_assert(seg && *"invalid absolute offset");
545
546 for (of = 0 ; of < size ; seg = TAILQ_NEXT(seg, seg_link)) {
547 rd_assert(seg->seg_absof <= rd_buf_len(rbuf));
548 size_t wlen = rd_segment_write_update(seg, absof+of,
549 psrc+of, size-of);
550 of += wlen;
551 }
552
553 rd_dassert(of == size);
554
555 return of;
556}
557
558
559
560/**
561 * @brief Push reference memory segment to current write position.
562 */
563void rd_buf_push (rd_buf_t *rbuf, const void *payload, size_t size,
564 void (*free_cb)(void *)) {
565 rd_segment_t *prevseg, *seg, *tailseg = NULL;
566
567 if ((prevseg = rbuf->rbuf_wpos) &&
568 rd_segment_write_remains(prevseg, NULL) > 0) {
569 /* If the current segment still has room in it split it
570 * and insert the pushed segment in the middle (below). */
571 tailseg = rd_segment_split(rbuf, prevseg,
572 prevseg->seg_absof +
573 prevseg->seg_of);
574 }
575
576 seg = rd_buf_alloc_segment0(rbuf, 0);
577 seg->seg_p = (char *)payload;
578 seg->seg_size = size;
579 seg->seg_of = size;
580 seg->seg_free = free_cb;
581 seg->seg_flags |= RD_SEGMENT_F_RDONLY;
582
583 rd_buf_append_segment(rbuf, seg);
584
585 if (tailseg)
586 rd_buf_append_segment(rbuf, tailseg);
587}
588
589
590
591
592
593
594
595/**
596 * @brief Do a write-seek, updating the write position to the given
597 * absolute \p absof.
598 *
599 * @warning Any sub-sequent segments will be destroyed.
600 *
601 * @returns -1 if the offset is out of bounds, else 0.
602 */
603int rd_buf_write_seek (rd_buf_t *rbuf, size_t absof) {
604 rd_segment_t *seg, *next;
605 size_t relof;
606
607 seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
608 if (unlikely(!seg))
609 return -1;
610
611 relof = absof - seg->seg_absof;
612 if (unlikely(relof > seg->seg_of))
613 return -1;
614
615 /* Destroy sub-sequent segments in reverse order so that
616 * destroy_segment() length checks are correct.
617 * Will decrement rbuf_len et.al. */
618 for (next = TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head) ;
619 next != seg ; ) {
620 rd_segment_t *this = next;
621 next = TAILQ_PREV(this, rd_segment_head, seg_link);
622 rd_buf_destroy_segment(rbuf, this);
623 }
624
625 /* Update relative write offset */
626 seg->seg_of = relof;
627 rbuf->rbuf_wpos = seg;
628 rbuf->rbuf_len = seg->seg_absof + seg->seg_of;
629
630 rd_assert(rbuf->rbuf_len == absof);
631
632 return 0;
633}
634
635
636/**
637 * @brief Set up the iovecs in \p iovs (of size \p iov_max) with the writable
638 * segments from the buffer's current write position.
639 *
640 * @param iovcntp will be set to the number of populated \p iovs[]
641 * @param size_max limits the total number of bytes made available.
642 * Note: this value may be overshot with the size of one
643 * segment.
644 *
645 * @returns the total number of bytes in the represented segments.
646 *
647 * @remark the write position will NOT be updated.
648 */
649size_t rd_buf_get_write_iov (const rd_buf_t *rbuf,
650 struct iovec *iovs, size_t *iovcntp,
651 size_t iov_max, size_t size_max) {
652 const rd_segment_t *seg;
653 size_t iovcnt = 0;
654 size_t sum = 0;
655
656 for (seg = rbuf->rbuf_wpos ;
657 seg && iovcnt < iov_max && sum < size_max ;
658 seg = TAILQ_NEXT(seg, seg_link)) {
659 size_t len;
660 void *p;
661
662 len = rd_segment_write_remains(seg, &p);
663 if (unlikely(len == 0))
664 continue;
665
666 iovs[iovcnt].iov_base = p;
667 iovs[iovcnt++].iov_len = len;
668
669 sum += len;
670 }
671
672 *iovcntp = iovcnt;
673
674 return sum;
675}
676
677
678
679
680
681
682
683
684
685
686
687/**
688 * @name Slice reader interface
689 *
690 * @{
691 */
692
693/**
694 * @brief Initialize a new slice of \p size bytes starting at \p seg with
695 * relative offset \p rof.
696 *
697 * @returns 0 on success or -1 if there is not at least \p size bytes available
698 * in the buffer.
699 */
700int rd_slice_init_seg (rd_slice_t *slice, const rd_buf_t *rbuf,
701 const rd_segment_t *seg, size_t rof, size_t size) {
702 /* Verify that \p size bytes are indeed available in the buffer. */
703 if (unlikely(rbuf->rbuf_len < (seg->seg_absof + rof + size)))
704 return -1;
705
706 slice->buf = rbuf;
707 slice->seg = seg;
708 slice->rof = rof;
709 slice->start = seg->seg_absof + rof;
710 slice->end = slice->start + size;
711
712 rd_assert(seg->seg_absof+rof >= slice->start &&
713 seg->seg_absof+rof <= slice->end);
714
715 rd_assert(slice->end <= rd_buf_len(rbuf));
716
717 return 0;
718}
719
720/**
721 * @brief Initialize new slice of \p size bytes starting at offset \p absof
722 *
723 * @returns 0 on success or -1 if there is not at least \p size bytes available
724 * in the buffer.
725 */
726int rd_slice_init (rd_slice_t *slice, const rd_buf_t *rbuf,
727 size_t absof, size_t size) {
728 const rd_segment_t *seg = rd_buf_get_segment_at_offset(rbuf, NULL,
729 absof);
730 if (unlikely(!seg))
731 return -1;
732
733 return rd_slice_init_seg(slice, rbuf, seg,
734 absof - seg->seg_absof, size);
735}
736
737/**
738 * @brief Initialize new slice covering the full buffer \p rbuf
739 */
740void rd_slice_init_full (rd_slice_t *slice, const rd_buf_t *rbuf) {
741 int r = rd_slice_init(slice, rbuf, 0, rd_buf_len(rbuf));
742 rd_assert(r == 0);
743}
744
745
746
747/**
748 * @sa rd_slice_reader() rd_slice_peeker()
749 */
750size_t rd_slice_reader0 (rd_slice_t *slice, const void **p, int update_pos) {
751 size_t rof = slice->rof;
752 size_t rlen;
753 const rd_segment_t *seg;
754
755 /* Find segment with non-zero payload */
756 for (seg = slice->seg ;
757 seg && seg->seg_absof+rof < slice->end && seg->seg_of == rof ;
758 seg = TAILQ_NEXT(seg, seg_link))
759 rof = 0;
760
761 if (unlikely(!seg || seg->seg_absof+rof >= slice->end))
762 return 0;
763
764 rd_assert(seg->seg_absof+rof <= slice->end);
765
766
767 *p = (const void *)(seg->seg_p + rof);
768 rlen = RD_MIN(seg->seg_of - rof, rd_slice_remains(slice));
769
770 if (update_pos) {
771 if (slice->seg != seg) {
772 rd_assert(seg->seg_absof + rof >= slice->start &&
773 seg->seg_absof + rof+rlen <= slice->end);
774 slice->seg = seg;
775 slice->rof = rlen;
776 } else {
777 slice->rof += rlen;
778 }
779 }
780
781 return rlen;
782}
783
784
785/**
786 * @brief Convenience reader iterator interface.
787 *
788 * Call repeatedly from while loop until it returns 0.
789 *
790 * @param slice slice to read from, position will be updated.
791 * @param p will be set to the start of \p *rlenp contiguous bytes of memory
792 * @param rlenp will be set to the number of bytes available in \p p
793 *
794 * @returns the number of bytes read, or 0 if slice is empty.
795 */
796size_t rd_slice_reader (rd_slice_t *slice, const void **p) {
797 return rd_slice_reader0(slice, p, 1/*update_pos*/);
798}
799
800/**
801 * @brief Identical to rd_slice_reader() but does NOT update the read position
802 */
803size_t rd_slice_peeker (const rd_slice_t *slice, const void **p) {
804 return rd_slice_reader0((rd_slice_t *)slice, p, 0/*dont update_pos*/);
805}
806
807
808
809
810
811/**
812 * @brief Read \p size bytes from current read position,
813 * advancing the read offset by the number of bytes copied to \p dst.
814 *
815 * If there are less than \p size remaining in the buffer
816 * then 0 is returned and no bytes are copied.
817 *
818 * @returns \p size, or 0 if \p size bytes are not available in buffer.
819 *
820 * @remark This performs a complete read, no partitial reads.
821 *
822 * @remark If \p dst is NULL only the read position is updated.
823 */
824size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size) {
825 size_t remains = size;
826 char *d = (char *)dst; /* Possibly NULL */
827 size_t rlen;
828 const void *p;
829 size_t orig_end = slice->end;
830
831 if (unlikely(rd_slice_remains(slice) < size))
832 return 0;
833
834 /* Temporarily shrink slice to offset + \p size */
835 slice->end = rd_slice_abs_offset(slice) + size;
836
837 while ((rlen = rd_slice_reader(slice, &p))) {
838 rd_dassert(remains >= rlen);
839 if (dst) {
840 memcpy(d, p, rlen);
841 d += rlen;
842 }
843 remains -= rlen;
844 }
845
846 rd_dassert(remains == 0);
847
848 /* Restore original size */
849 slice->end = orig_end;
850
851 return size;
852}
853
854
855/**
856 * @brief Read \p size bytes from absolute slice offset \p offset
857 * and store in \p dst, without updating the slice read position.
858 *
859 * @returns \p size if the offset and size was within the slice, else 0.
860 */
861size_t rd_slice_peek (const rd_slice_t *slice, size_t offset,
862 void *dst, size_t size) {
863 rd_slice_t sub = *slice;
864
865 if (unlikely(rd_slice_seek(&sub, offset) == -1))
866 return 0;
867
868 return rd_slice_read(&sub, dst, size);
869
870}
871
872
873
874/**
875 * @returns a pointer to \p size contiguous bytes at the current read offset.
876 * If there isn't \p size contiguous bytes available NULL will
877 * be returned.
878 *
879 * @remark The read position is updated to point past \p size.
880 */
881const void *rd_slice_ensure_contig (rd_slice_t *slice, size_t size) {
882 void *p;
883
884 if (unlikely(rd_slice_remains(slice) < size ||
885 slice->rof + size > slice->seg->seg_of))
886 return NULL;
887
888 p = slice->seg->seg_p + slice->rof;
889
890 rd_slice_read(slice, NULL, size);
891
892 return p;
893}
894
895
896
897/**
898 * @brief Sets the slice's read position. The offset is the slice offset,
899 * not buffer offset.
900 *
901 * @returns 0 if offset was within range, else -1 in which case the position
902 * is not changed.
903 */
904int rd_slice_seek (rd_slice_t *slice, size_t offset) {
905 const rd_segment_t *seg;
906 size_t absof = slice->start + offset;
907
908 if (unlikely(absof >= slice->end))
909 return -1;
910
911 seg = rd_buf_get_segment_at_offset(slice->buf, slice->seg, absof);
912 rd_assert(seg);
913
914 slice->seg = seg;
915 slice->rof = absof - seg->seg_absof;
916 rd_assert(seg->seg_absof + slice->rof >= slice->start &&
917 seg->seg_absof + slice->rof <= slice->end);
918
919 return 0;
920}
921
922
923/**
924 * @brief Narrow the current slice to \p size, saving
925 * the original slice state info \p save_slice.
926 *
927 * Use rd_slice_widen() to restore the saved slice
928 * with the read count updated from the narrowed slice.
929 *
930 * This is useful for reading a sub-slice of a larger slice
931 * without having to pass the lesser length around.
932 *
933 * @returns 1 if enough underlying slice buffer memory is available, else 0.
934 */
935int rd_slice_narrow (rd_slice_t *slice, rd_slice_t *save_slice, size_t size) {
936 if (unlikely(slice->start + size > slice->end))
937 return 0;
938 *save_slice = *slice;
939 slice->end = slice->start + size;
940 rd_assert(rd_slice_abs_offset(slice) <= slice->end);
941 return 1;
942}
943
944/**
945 * @brief Same as rd_slice_narrow() but using a relative size \p relsize
946 * from the current read position.
947 */
948int rd_slice_narrow_relative (rd_slice_t *slice, rd_slice_t *save_slice,
949 size_t relsize) {
950 return rd_slice_narrow(slice, save_slice,
951 rd_slice_offset(slice) + relsize);
952}
953
954
955/**
956 * @brief Restore the original \p save_slice size from a previous call to
957 * rd_slice_narrow(), while keeping the updated read pointer from
958 * \p slice.
959 */
960void rd_slice_widen (rd_slice_t *slice, const rd_slice_t *save_slice) {
961 slice->end = save_slice->end;
962}
963
964
965/**
966 * @brief Copy the original slice \p orig to \p new_slice and adjust
967 * the new slice length to \p size.
968 *
969 * This is a side-effect free form of rd_slice_narrow() which is not to
970 * be used with rd_slice_widen().
971 *
972 * @returns 1 if enough underlying slice buffer memory is available, else 0.
973 */
974int rd_slice_narrow_copy (const rd_slice_t *orig, rd_slice_t *new_slice,
975 size_t size) {
976 if (unlikely(orig->start + size > orig->end))
977 return 0;
978 *new_slice = *orig;
979 new_slice->end = orig->start + size;
980 rd_assert(rd_slice_abs_offset(new_slice) <= new_slice->end);
981 return 1;
982}
983
984/**
985 * @brief Same as rd_slice_narrow_copy() but with a relative size from
986 * the current read position.
987 */
988int rd_slice_narrow_copy_relative (const rd_slice_t *orig,
989 rd_slice_t *new_slice,
990 size_t relsize) {
991 return rd_slice_narrow_copy(orig, new_slice,
992 rd_slice_offset(orig) + relsize);
993}
994
995
996
997
998
999/**
1000 * @brief Set up the iovec \p iovs (of size \p iov_max) with the readable
1001 * segments from the slice's current read position.
1002 *
1003 * @param iovcntp will be set to the number of populated \p iovs[]
1004 * @param size_max limits the total number of bytes made available.
1005 * Note: this value may be overshot with the size of one
1006 * segment.
1007 *
1008 * @returns the total number of bytes in the represented segments.
1009 *
1010 * @remark will NOT update the read position.
1011 */
1012size_t rd_slice_get_iov (const rd_slice_t *slice,
1013 struct iovec *iovs, size_t *iovcntp,
1014 size_t iov_max, size_t size_max) {
1015 const void *p;
1016 size_t rlen;
1017 size_t iovcnt = 0;
1018 size_t sum = 0;
1019 rd_slice_t copy = *slice; /* Use a copy of the slice so we dont
1020 * update the position for the caller. */
1021
1022 while (sum < size_max && iovcnt < iov_max &&
1023 (rlen = rd_slice_reader(&copy, &p))) {
1024 iovs[iovcnt].iov_base = (void *)p;
1025 iovs[iovcnt++].iov_len = rlen;
1026
1027 sum += rlen;
1028 }
1029
1030 *iovcntp = iovcnt;
1031
1032 return sum;
1033}
1034
1035
1036
1037
1038
1039/**
1040 * @brief CRC32 calculation of slice.
1041 *
1042 * @returns the calculated CRC
1043 *
1044 * @remark the slice's position is updated.
1045 */
1046uint32_t rd_slice_crc32 (rd_slice_t *slice) {
1047 rd_crc32_t crc;
1048 const void *p;
1049 size_t rlen;
1050
1051 crc = rd_crc32_init();
1052
1053 while ((rlen = rd_slice_reader(slice, &p)))
1054 crc = rd_crc32_update(crc, p, rlen);
1055
1056 return (uint32_t)rd_crc32_finalize(crc);
1057}
1058
1059/**
1060 * @brief Compute CRC-32C of segments starting at at buffer position \p absof,
1061 * also supporting the case where the position/offset is not at the
1062 * start of the first segment.
1063 *
1064 * @remark the slice's position is updated.
1065 */
1066uint32_t rd_slice_crc32c (rd_slice_t *slice) {
1067 const void *p;
1068 size_t rlen;
1069 uint32_t crc = 0;
1070
1071 while ((rlen = rd_slice_reader(slice, &p)))
1072 crc = crc32c(crc, (const char *)p, rlen);
1073
1074 return crc;
1075}
1076
1077
1078
1079
1080
1081/**
1082 * @name Debugging dumpers
1083 *
1084 *
1085 */
1086
/**
 * @brief Dump segment metadata (and optionally a hexdump of its
 *        written payload from \p relof onwards) to stderr.
 *
 * @param ind        indentation prefix string.
 * @param relof      relative offset into the segment to start from.
 * @param do_hexdump if non-zero also hexdump the payload.
 */
static void rd_segment_dump (const rd_segment_t *seg, const char *ind,
                             size_t relof, int do_hexdump) {
        fprintf(stderr,
                "%s((rd_segment_t *)%p): "
                "p %p, of %"PRIusz", "
                "absof %"PRIusz", size %"PRIusz", free %p, flags 0x%x\n",
                ind, seg, seg->seg_p, seg->seg_of,
                seg->seg_absof, seg->seg_size, seg->seg_free, seg->seg_flags);
        rd_assert(relof <= seg->seg_of);
        if (do_hexdump)
                rd_hexdump(stderr, "segment",
                           seg->seg_p+relof, seg->seg_of-relof);
}
1100
/**
 * @brief Dump the buffer's state, write position, and all linked
 *        segments to stderr.
 *
 * @param do_hexdump if non-zero also hexdump each segment's payload.
 */
void rd_buf_dump (const rd_buf_t *rbuf, int do_hexdump) {
        const rd_segment_t *seg;

        fprintf(stderr,
                "((rd_buf_t *)%p):\n"
                " len %"PRIusz" size %"PRIusz
                ", %"PRIusz"/%"PRIusz" extra memory used\n",
                rbuf, rbuf->rbuf_len, rbuf->rbuf_size,
                rbuf->rbuf_extra_len, rbuf->rbuf_extra_size);

        if (rbuf->rbuf_wpos) {
                fprintf(stderr, " wpos:\n");
                rd_segment_dump(rbuf->rbuf_wpos, "  ", 0, 0);
        }

        if (rbuf->rbuf_segment_cnt > 0) {
                size_t segcnt = 0;

                fprintf(stderr, " %"PRIusz" linked segments:\n",
                        rbuf->rbuf_segment_cnt);
                TAILQ_FOREACH(seg, &rbuf->rbuf_segments, seg_link) {
                        rd_segment_dump(seg, "  ", 0, do_hexdump);
                        /* Guard against a corrupt/looping segment list. */
                        rd_assert(++segcnt <= rbuf->rbuf_segment_cnt);
                }
        }
}
1127
/**
 * @brief Dump the slice's state and all segments from its current
 *        read position onwards to stderr.
 *
 * @param do_hexdump if non-zero also hexdump each segment's payload.
 */
void rd_slice_dump (const rd_slice_t *slice, int do_hexdump) {
        const rd_segment_t *seg;
        size_t relof;

        fprintf(stderr,
                "((rd_slice_t *)%p):\n"
                "  buf %p (len %"PRIusz"), seg %p (absof %"PRIusz"), "
                "rof %"PRIusz", start %"PRIusz", end %"PRIusz", size %"PRIusz
                ", offset %"PRIusz"\n",
                slice, slice->buf, rd_buf_len(slice->buf),
                slice->seg, slice->seg ? slice->seg->seg_absof : 0,
                slice->rof, slice->start, slice->end,
                rd_slice_size(slice), rd_slice_offset(slice));
        /* Only the first segment starts at the slice's read offset;
         * subsequent segments are dumped from their beginning. */
        relof = slice->rof;

        for (seg = slice->seg ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
                rd_segment_dump(seg, " ", relof, do_hexdump);
                relof = 0;
        }
}
1148
1149
1150/**
1151 * @name Unit-tests
1152 *
1153 *
1154 *
1155 */
1156
1157
1158/**
1159 * @brief Basic write+read test
1160 */
1161static int do_unittest_write_read (void) {
1162 rd_buf_t b;
1163 char ones[1024];
1164 char twos[1024];
1165 char threes[1024];
1166 char fiftyfives[100]; /* 0x55 indicates "untouched" memory */
1167 char buf[1024*3];
1168 rd_slice_t slice;
1169 size_t r, pos;
1170
1171 memset(ones, 0x1, sizeof(ones));
1172 memset(twos, 0x2, sizeof(twos));
1173 memset(threes, 0x3, sizeof(threes));
1174 memset(fiftyfives, 0x55, sizeof(fiftyfives));
1175 memset(buf, 0x55, sizeof(buf));
1176
1177 rd_buf_init(&b, 2, 1000);
1178
1179 /*
1180 * Verify write
1181 */
1182 r = rd_buf_write(&b, ones, 200);
1183 RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r);
1184 pos = rd_buf_write_pos(&b);
1185 RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos);
1186
1187 r = rd_buf_write(&b, twos, 800);
1188 RD_UT_ASSERT(pos == 200, "write() returned position %"PRIusz, r);
1189 pos = rd_buf_write_pos(&b);
1190 RD_UT_ASSERT(pos == 200+800, "pos() returned position %"PRIusz, pos);
1191
1192 /* Buffer grows here */
1193 r = rd_buf_write(&b, threes, 1);
1194 RD_UT_ASSERT(pos == 200+800,
1195 "write() returned position %"PRIusz, r);
1196 pos = rd_buf_write_pos(&b);
1197 RD_UT_ASSERT(pos == 200+800+1, "pos() returned position %"PRIusz, pos);
1198
1199 /*
1200 * Verify read
1201 */
1202 /* Get full slice. */
1203 rd_slice_init_full(&slice, &b);
1204
1205 r = rd_slice_read(&slice, buf, 200+800+2);
1206 RD_UT_ASSERT(r == 0,
1207 "read() > remaining should have failed, gave %"PRIusz, r);
1208 r = rd_slice_read(&slice, buf, 200+800+1);
1209 RD_UT_ASSERT(r == 200+800+1,
1210 "read() returned %"PRIusz" (%"PRIusz" remains)",
1211 r, rd_slice_remains(&slice));
1212
1213 RD_UT_ASSERT(!memcmp(buf, ones, 200), "verify ones");
1214 RD_UT_ASSERT(!memcmp(buf+200, twos, 800), "verify twos");
1215 RD_UT_ASSERT(!memcmp(buf+200+800, threes, 1), "verify threes");
1216 RD_UT_ASSERT(!memcmp(buf+200+800+1, fiftyfives, 100), "verify 55s");
1217
1218 rd_buf_destroy(&b);
1219
1220 RD_UT_PASS();
1221}
1222
1223
1224/**
1225 * @brief Helper read verifier, not a unit-test itself.
1226 */
1227#define do_unittest_read_verify(b,absof,len,verify) do { \
1228 int __fail = do_unittest_read_verify0(b,absof,len,verify); \
1229 RD_UT_ASSERT(!__fail, \
1230 "read_verify(absof=%"PRIusz",len=%"PRIusz") " \
1231 "failed", (size_t)absof, (size_t)len); \
1232 } while (0)
1233
1234static int
1235do_unittest_read_verify0 (const rd_buf_t *b, size_t absof, size_t len,
1236 const char *verify) {
1237 rd_slice_t slice, sub;
1238 char buf[1024];
1239 size_t half;
1240 size_t r;
1241 int i;
1242
1243 rd_assert(sizeof(buf) >= len);
1244
1245 /* Get reader slice */
1246 i = rd_slice_init(&slice, b, absof, len);
1247 RD_UT_ASSERT(i == 0, "slice_init() failed: %d", i);
1248
1249 r = rd_slice_read(&slice, buf, len);
1250 RD_UT_ASSERT(r == len,
1251 "read() returned %"PRIusz" expected %"PRIusz
1252 " (%"PRIusz" remains)",
1253 r, len, rd_slice_remains(&slice));
1254
1255 RD_UT_ASSERT(!memcmp(buf, verify, len), "verify");
1256
1257 r = rd_slice_offset(&slice);
1258 RD_UT_ASSERT(r == len, "offset() returned %"PRIusz", not %"PRIusz,
1259 r, len);
1260
1261 half = len / 2;
1262 i = rd_slice_seek(&slice, half);
1263 RD_UT_ASSERT(i == 0, "seek(%"PRIusz") returned %d", half, i);
1264 r = rd_slice_offset(&slice);
1265 RD_UT_ASSERT(r == half, "offset() returned %"PRIusz", not %"PRIusz,
1266 r, half);
1267
1268 /* Get a sub-slice covering the later half. */
1269 sub = rd_slice_pos(&slice);
1270 r = rd_slice_offset(&sub);
1271 RD_UT_ASSERT(r == 0, "sub: offset() returned %"PRIusz", not %"PRIusz,
1272 r, (size_t)0);
1273 r = rd_slice_size(&sub);
1274 RD_UT_ASSERT(r == half, "sub: size() returned %"PRIusz", not %"PRIusz,
1275 r, half);
1276 r = rd_slice_remains(&sub);
1277 RD_UT_ASSERT(r == half,
1278 "sub: remains() returned %"PRIusz", not %"PRIusz,
1279 r, half);
1280
1281 /* Read half */
1282 r = rd_slice_read(&sub, buf, half);
1283 RD_UT_ASSERT(r == half,
1284 "sub read() returned %"PRIusz" expected %"PRIusz
1285 " (%"PRIusz" remains)",
1286 r, len, rd_slice_remains(&sub));
1287
1288 RD_UT_ASSERT(!memcmp(buf, verify, len), "verify");
1289
1290 r = rd_slice_offset(&sub);
1291 RD_UT_ASSERT(r == rd_slice_size(&sub),
1292 "sub offset() returned %"PRIusz", not %"PRIusz,
1293 r, rd_slice_size(&sub));
1294 r = rd_slice_remains(&sub);
1295 RD_UT_ASSERT(r == 0,
1296 "sub: remains() returned %"PRIusz", not %"PRIusz,
1297 r, (size_t)0);
1298
1299 return 0;
1300}
1301
1302
1303/**
1304 * @brief write_seek() and split() test
1305 */
1306static int do_unittest_write_split_seek (void) {
1307 rd_buf_t b;
1308 char ones[1024];
1309 char twos[1024];
1310 char threes[1024];
1311 char fiftyfives[100]; /* 0x55 indicates "untouched" memory */
1312 char buf[1024*3];
1313 size_t r, pos;
1314 rd_segment_t *seg, *newseg;
1315
1316 memset(ones, 0x1, sizeof(ones));
1317 memset(twos, 0x2, sizeof(twos));
1318 memset(threes, 0x3, sizeof(threes));
1319 memset(fiftyfives, 0x55, sizeof(fiftyfives));
1320 memset(buf, 0x55, sizeof(buf));
1321
1322 rd_buf_init(&b, 0, 0);
1323
1324 /*
1325 * Verify write
1326 */
1327 r = rd_buf_write(&b, ones, 400);
1328 RD_UT_ASSERT(r == 0, "write() returned position %"PRIusz, r);
1329 pos = rd_buf_write_pos(&b);
1330 RD_UT_ASSERT(pos == 400, "pos() returned position %"PRIusz, pos);
1331
1332 do_unittest_read_verify(&b, 0, 400, ones);
1333
1334 /*
1335 * Seek and re-write
1336 */
1337 r = rd_buf_write_seek(&b, 200);
1338 RD_UT_ASSERT(r == 0, "seek() failed");
1339 pos = rd_buf_write_pos(&b);
1340 RD_UT_ASSERT(pos == 200, "pos() returned position %"PRIusz, pos);
1341
1342 r = rd_buf_write(&b, twos, 100);
1343 RD_UT_ASSERT(pos == 200, "write() returned position %"PRIusz, r);
1344 pos = rd_buf_write_pos(&b);
1345 RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);
1346
1347 do_unittest_read_verify(&b, 0, 200, ones);
1348 do_unittest_read_verify(&b, 200, 100, twos);
1349
1350 /* Make sure read() did not modify the write position. */
1351 pos = rd_buf_write_pos(&b);
1352 RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);
1353
1354 /* Split buffer, write position is now at split where writes
1355 * are not allowed (mid buffer). */
1356 seg = rd_buf_get_segment_at_offset(&b, NULL, 50);
1357 RD_UT_ASSERT(seg->seg_of != 0, "assumed mid-segment");
1358 newseg = rd_segment_split(&b, seg, 50);
1359 rd_buf_append_segment(&b, newseg);
1360 seg = rd_buf_get_segment_at_offset(&b, NULL, 50);
1361 RD_UT_ASSERT(seg != NULL, "seg");
1362 RD_UT_ASSERT(seg == newseg, "newseg %p, seg %p", newseg, seg);
1363 RD_UT_ASSERT(seg->seg_of > 0,
1364 "assumed beginning of segment, got %"PRIusz, seg->seg_of);
1365
1366 pos = rd_buf_write_pos(&b);
1367 RD_UT_ASSERT(pos == 200+100, "pos() returned position %"PRIusz, pos);
1368
1369 /* Re-verify that nothing changed */
1370 do_unittest_read_verify(&b, 0, 200, ones);
1371 do_unittest_read_verify(&b, 200, 100, twos);
1372
1373 /* Do a write seek at buffer boundary, sub-sequent buffers should
1374 * be destroyed. */
1375 r = rd_buf_write_seek(&b, 50);
1376 RD_UT_ASSERT(r == 0, "seek() failed");
1377 do_unittest_read_verify(&b, 0, 50, ones);
1378
1379 rd_buf_destroy(&b);
1380
1381 RD_UT_PASS();
1382}
1383
1384/**
1385 * @brief Unittest to verify payload is correctly written and read.
1386 * Each written u32 word is the running CRC of the word count.
1387 */
1388static int do_unittest_write_read_payload_correctness (void) {
1389 uint32_t crc;
1390 uint32_t write_crc, read_crc;
1391 const int seed = 12345;
1392 rd_buf_t b;
1393 const size_t max_cnt = 20000;
1394 rd_slice_t slice;
1395 size_t r;
1396 size_t i;
1397 int pass;
1398
1399 crc = rd_crc32_init();
1400 crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed));
1401
1402 rd_buf_init(&b, 0, 0);
1403 for (i = 0 ; i < max_cnt ; i++) {
1404 crc = rd_crc32_update(crc, (void *)&i, sizeof(i));
1405 rd_buf_write(&b, &crc, sizeof(crc));
1406 }
1407
1408 write_crc = rd_crc32_finalize(crc);
1409
1410 r = rd_buf_len(&b);
1411 RD_UT_ASSERT(r == max_cnt * sizeof(crc),
1412 "expected length %"PRIusz", not %"PRIusz,
1413 r, max_cnt * sizeof(crc));
1414
1415 /*
1416 * Now verify the contents with a reader.
1417 */
1418 rd_slice_init_full(&slice, &b);
1419
1420 r = rd_slice_remains(&slice);
1421 RD_UT_ASSERT(r == rd_buf_len(&b),
1422 "slice remains %"PRIusz", should be %"PRIusz,
1423 r, rd_buf_len(&b));
1424
1425 for (pass = 0 ; pass < 2 ; pass++) {
1426 /* Two passes:
1427 * - pass 1: using peek()
1428 * - pass 2: using read()
1429 */
1430 const char *pass_str = pass == 0 ? "peek":"read";
1431
1432 crc = rd_crc32_init();
1433 crc = rd_crc32_update(crc, (void *)&seed, sizeof(seed));
1434
1435 for (i = 0 ; i < max_cnt ; i++) {
1436 uint32_t buf_crc;
1437
1438 crc = rd_crc32_update(crc, (void *)&i, sizeof(&i));
1439
1440 if (pass == 0)
1441 r = rd_slice_peek(&slice, i * sizeof(buf_crc),
1442 &buf_crc, sizeof(buf_crc));
1443 else
1444 r = rd_slice_read(&slice, &buf_crc,
1445 sizeof(buf_crc));
1446 RD_UT_ASSERT(r == sizeof(buf_crc),
1447 "%s() at #%"PRIusz" failed: "
1448 "r is %"PRIusz" not %"PRIusz,
1449 pass_str, i, r, sizeof(buf_crc));
1450 RD_UT_ASSERT(buf_crc == crc,
1451 "%s: invalid crc at #%"PRIusz
1452 ": expected %"PRIu32", read %"PRIu32,
1453 pass_str, i, crc, buf_crc);
1454 }
1455
1456 read_crc = rd_crc32_finalize(crc);
1457
1458 RD_UT_ASSERT(read_crc == write_crc,
1459 "%s: finalized read crc %"PRIu32
1460 " != write crc %"PRIu32,
1461 pass_str, read_crc, write_crc);
1462
1463 }
1464
1465 r = rd_slice_remains(&slice);
1466 RD_UT_ASSERT(r == 0,
1467 "slice remains %"PRIusz", should be %"PRIusz,
1468 r, (size_t)0);
1469
1470 rd_buf_destroy(&b);
1471
1472 RD_UT_PASS();
1473}
1474
/**
 * @brief Helper iovec verifier wrapper, not a unit-test itself;
 *        fails the current unit-test if do_unittest_iov_verify0()
 *        reports a failure.
 */
#define do_unittest_iov_verify(...) do {                 \
        int __fail = do_unittest_iov_verify0(__VA_ARGS__);      \
        RD_UT_ASSERT(!__fail, "iov_verify() failed");           \
        } while (0)
1479static int do_unittest_iov_verify0 (rd_buf_t *b,
1480 size_t exp_iovcnt, size_t exp_totsize) {
1481 #define MY_IOV_MAX 16
1482 struct iovec iov[MY_IOV_MAX];
1483 size_t iovcnt;
1484 size_t i;
1485 size_t totsize, sum;
1486
1487 rd_assert(exp_iovcnt <= MY_IOV_MAX);
1488
1489 totsize = rd_buf_get_write_iov(b, iov, &iovcnt, MY_IOV_MAX, exp_totsize);
1490 RD_UT_ASSERT(totsize >= exp_totsize,
1491 "iov total size %"PRIusz" expected >= %"PRIusz,
1492 totsize, exp_totsize);
1493 RD_UT_ASSERT(iovcnt >= exp_iovcnt && iovcnt <= MY_IOV_MAX,
1494 "iovcnt %"PRIusz
1495 ", expected %"PRIusz" < x <= MY_IOV_MAX",
1496 iovcnt, exp_iovcnt);
1497
1498 sum = 0;
1499 for (i = 0 ; i < iovcnt ; i++) {
1500 RD_UT_ASSERT(iov[i].iov_base,
1501 "iov #%"PRIusz" iov_base not set", i);
1502 RD_UT_ASSERT(iov[i].iov_len,
1503 "iov #%"PRIusz" iov_len %"PRIusz" out of range",
1504 i, iov[i].iov_len);
1505 sum += iov[i].iov_len;
1506 RD_UT_ASSERT(sum <= totsize, "sum %"PRIusz" > totsize %"PRIusz,
1507 sum, totsize);
1508 }
1509
1510 RD_UT_ASSERT(sum == totsize,
1511 "sum %"PRIusz" != totsize %"PRIusz,
1512 sum, totsize);
1513
1514 return 0;
1515}
1516
1517
1518/**
1519 * @brief Verify that buffer to iovec conversion works.
1520 */
1521static int do_unittest_write_iov (void) {
1522 rd_buf_t b;
1523
1524 rd_buf_init(&b, 0, 0);
1525 rd_buf_write_ensure(&b, 100, 100);
1526
1527 do_unittest_iov_verify(&b, 1, 100);
1528
1529 /* Add a secondary buffer */
1530 rd_buf_write_ensure(&b, 30000, 0);
1531
1532 do_unittest_iov_verify(&b, 2, 100+30000);
1533
1534
1535 rd_buf_destroy(&b);
1536
1537 RD_UT_PASS();
1538}
1539
1540
/**
 * @brief Run all rdbuf unit-tests.
 *
 * @returns the number of failed tests.
 */
int unittest_rdbuf (void) {
        int fails;

        /* Run each test in turn, accumulating failures. */
        fails  = do_unittest_write_read();
        fails += do_unittest_write_split_seek();
        fails += do_unittest_write_read_payload_correctness();
        fails += do_unittest_write_iov();

        return fails;
}
1551