1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/* stream
10 * ======
11 * Niels Nes
 * A simple interface to streams
13 *
14 * Processing files, streams, and sockets is quite different on Linux
15 * and Windows platforms. To improve portability between both, we advise
16 * to replace the stdio actions with the stream functionality provided
17 * here.
18 *
 * This interface can also be used to open uncompressed, gzip- or
 * bzip2-compressed data files, as well as sockets. Using this
 * interface, one can easily switch between the various underlying
 * storage types.
22 *
23 * buffered streams
24 * ----------------
25 *
26 * The bstream (or buffered_stream) can be used for efficient reading of
27 * a stream. Reading can be done in large chunks and access can be done
28 * in smaller bits, by directly accessing the underlying buffer.
29 *
 * Beware that a flush on a buffered stream emits an empty block to
 * synchronize with the other side, telling it that it has reached the
 * end of the sequence and can close its descriptors.
33 *
34 * bstream functions
35 * -----------------
36 *
 * bstream_create takes a read stream (rs) and an initial chunk size
 * and creates a buffered stream from them. A spare byte is kept at the
 * end of the buffer. bstream_read reads at least the next 'size'
 * bytes. If the not-yet-consumed data (i.e., pos < len) together with
 * the new data does not fit in the current buffer, the buffer is
 * resized; the spare byte is kept.
43 *
44 * tee streams
45 * -----------
46 *
47 * A tee stream is a write stream that duplicates all output to two
48 * write streams of the same type (txt/bin).
49 */
50
51
52#include "monetdb_config.h"
53#include "stream.h"
54#include "stream_socket.h"
55#include "matomic.h"
56
57#include <string.h>
58#include <stddef.h>
59
60#ifdef HAVE_SYS_TYPES_H
61# include <sys/types.h>
62#endif
63#ifdef HAVE_SYS_STAT_H
64# include <sys/stat.h>
65#endif
66#ifdef HAVE_UNISTD_H
67# include <unistd.h>
68#endif
69#ifdef HAVE_NETDB_H
70# include <netinet/in_systm.h>
71# include <netinet/in.h>
72# include <netinet/ip.h>
73# include <netinet/tcp.h>
74# include <netdb.h>
75#endif
76#ifdef HAVE_POLL_H
77#include <poll.h>
78#endif
79
80#ifdef NATIVE_WIN32
81#include <io.h>
82#endif
83#ifdef HAVE_FCNTL_H
84#include <fcntl.h>
85#endif
86#ifdef HAVE_LIBZ
87#include <zlib.h>
88#endif
89#ifdef HAVE_LIBBZ2
90#include <bzlib.h>
91#endif
92#ifdef HAVE_LIBLZMA
93#include <lzma.h>
94#endif
95#ifdef HAVE_LIBSNAPPY
96#include <snappy-c.h>
97#endif
98#ifdef HAVE_LIBLZ4
99#include <lz4.h>
100#include <lz4frame.h>
101#endif
102
103#ifdef HAVE_ICONV
104#ifdef HAVE_ICONV_H
105#include <iconv.h>
106#endif
107#ifdef HAVE_LANGINFO_H
108#include <langinfo.h>
109#endif
110#endif
111
/* socket shutdown() direction constants, for platforms whose headers
 * do not define them */
#ifndef SHUT_RD
#define SHUT_RD 0
#define SHUT_WR 1
#define SHUT_RDWR 2
#endif

/* errno fallbacks for platforms lacking these names */
#ifndef EWOULDBLOCK
#define EWOULDBLOCK EAGAIN
#endif

#ifndef EINTR
#define EINTR EAGAIN
#endif

/* value used for "no socket" when the platform does not define it */
#ifndef INVALID_SOCKET
#define INVALID_SOCKET (-1)
#endif

/* map the POSIX names used below onto the Windows CRT equivalents */
#ifdef NATIVE_WIN32
#define pclose _pclose
#define fileno(fd) _fileno(fd)
#endif

/* text streams opened for reading skip this marker if the data starts
 * with it, and remember that the stream is UTF-8 (see isutf8) */
#define UTF8BOM "\xEF\xBB\xBF" /* UTF-8 encoding of Unicode BOM */
#define UTF8BOMLENGTH 3 /* length of above */
137
/* Byte-reversal helpers for 16-, 32-, 64- (and, with HAVE_HGE,
 * 128-) bit integers.  Presumably used by the binary I/O code when
 * a stream's swapbytes flag is set -- usage is outside this chunk. */
#ifdef _MSC_VER
/* use intrinsic functions on Windows */
#define short_int_SWAP(s) ((int16_t) _byteswap_ushort((uint16_t) (s)))
/* on Windows, long is the same size as int */
#define normal_int_SWAP(i) ((int) _byteswap_ulong((unsigned long) (i)))
#define long_int_SWAP(l) ((int64_t) _byteswap_uint64((unsigned __int64) (l)))
#else
/* portable versions: isolate each byte with a mask (working on the
 * unsigned type to avoid sign extension) and shift it to the mirrored
 * position */
#define short_int_SWAP(s)						\
	((int16_t) (((0x00ff & (uint16_t) (s)) << 8) |			\
		    ((0xff00 & (uint16_t) (s)) >> 8)))

#define normal_int_SWAP(i)						\
	((int) (((((unsigned) 0xff << 0) & (unsigned) (i)) << 24) |	\
		((((unsigned) 0xff << 8) & (unsigned) (i)) << 8) |	\
		((((unsigned) 0xff << 16) & (unsigned) (i)) >> 8) |	\
		((((unsigned) 0xff << 24) & (unsigned) (i)) >> 24)))

#define long_int_SWAP(l)						\
	((int64_t) (((((uint64_t) 0xff << 0) & (uint64_t) (l)) << 56) | \
		    ((((uint64_t) 0xff << 8) & (uint64_t) (l)) << 40) | \
		    ((((uint64_t) 0xff << 16) & (uint64_t) (l)) << 24) | \
		    ((((uint64_t) 0xff << 24) & (uint64_t) (l)) << 8) |	\
		    ((((uint64_t) 0xff << 32) & (uint64_t) (l)) >> 8) |	\
		    ((((uint64_t) 0xff << 40) & (uint64_t) (l)) >> 24) | \
		    ((((uint64_t) 0xff << 48) & (uint64_t) (l)) >> 40) | \
		    ((((uint64_t) 0xff << 56) & (uint64_t) (l)) >> 56)))
#endif

#ifdef HAVE_HGE
/* 128-bit variant; same mask-and-shift scheme on the unsigned type */
#define huge_int_SWAP(h)					\
	((hge) (((((uhge) 0xff << 0) & (uhge) (h)) << 120) |	\
		((((uhge) 0xff << 8) & (uhge) (h)) << 104) |	\
		((((uhge) 0xff << 16) & (uhge) (h)) << 88) |	\
		((((uhge) 0xff << 24) & (uhge) (h)) << 72) |	\
		((((uhge) 0xff << 32) & (uhge) (h)) << 56) |	\
		((((uhge) 0xff << 40) & (uhge) (h)) << 40) |	\
		((((uhge) 0xff << 48) & (uhge) (h)) << 24) |	\
		((((uhge) 0xff << 56) & (uhge) (h)) << 8) |	\
		((((uhge) 0xff << 64) & (uhge) (h)) >> 8) |	\
		((((uhge) 0xff << 72) & (uhge) (h)) >> 24) |	\
		((((uhge) 0xff << 80) & (uhge) (h)) >> 40) |	\
		((((uhge) 0xff << 88) & (uhge) (h)) >> 56) |	\
		((((uhge) 0xff << 96) & (uhge) (h)) >> 72) |	\
		((((uhge) 0xff << 104) & (uhge) (h)) >> 88) |	\
		((((uhge) 0xff << 112) & (uhge) (h)) >> 104) |	\
		((((uhge) 0xff << 120) & (uhge) (h)) >> 120)))
#endif
185
186
/* A stream: common state plus a table of backend operations (file,
 * gzip, bzip2, ...) through which the mnstr_* wrappers dispatch.
 * Some wrappers treat a NULL operation as "nothing to do": flush,
 * fsync, fgetpos, fsetpos, isalive, clrerr and update_timeout. */
struct stream {
	char *name;		/* name of the stream */
	bool swapbytes;		/* whether to swap bytes */
	bool readonly;		/* only reading or only writing */
	bool isutf8;		/* known to be UTF-8 due to BOM */
	bool binary;		/* text/binary */
	unsigned int timeout;	/* timeout in ms */
	bool (*timeout_func)(void); /* callback function: NULL/true -> return */
	/* backend handle: e.g. a FILE * or gzFile for the file and gzip
	 * backends (p); i and s presumably serve descriptor- and
	 * socket-based backends */
	union {
		void *p;
		int i;
		SOCKET s;
	} stream_data;
	int errnr;		/* MNSTR_* error code, MNSTR_NO__ERROR if none */
	ssize_t (*read)(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
	ssize_t (*write)(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt);
	void (*close)(stream *s);	/* release the backend handle */
	void (*clrerr)(stream *s);
	char *(*error)(stream *s);	/* describe errnr */
	void (*destroy)(stream *s);	/* free the stream object */
	int (*flush)(stream *s);
	int (*fsync)(stream *s);
	int (*fgetpos)(stream *restrict s, fpos_t *restrict p);
	int (*fsetpos)(stream *restrict s, fpos_t *restrict p);
	void (*update_timeout)(stream *s);	/* notified by mnstr_settimeout */
	int (*isalive)(stream *s);
};
214
215int
216mnstr_init(void)
217{
218 static ATOMIC_FLAG inited = ATOMIC_FLAG_INIT;
219
220 if (ATOMIC_TAS(&inited))
221 return 0;
222
223#ifdef NATIVE_WIN32
224 {
225 WSADATA w;
226
227 if (WSAStartup(0x0101, &w) != 0)
228 return -1;
229 }
230#endif
231 return 0;
232}
233
234/* #define STREAM_DEBUG 1 */
235/* #define BSTREAM_DEBUG 1 */
236
237#ifdef HAVE__WFOPEN
/* Convert a NUL-terminated UTF-8 string to a wide-character string.
 * The result is freshly allocated (the caller frees it).  Returns NULL
 * if the input is not valid UTF-8 or if allocation fails.  Overlong
 * 2- and 3-byte forms and 4-byte forms outside U+10000..U+10FFFF are
 * rejected.  When wchar_t is 16 bits (SIZEOF_WCHAR_T == 2),
 * supplementary-plane characters become UTF-16 surrogate pairs. */
static wchar_t *
utf8towchar(const char *src)
{
	wchar_t *dest;
	size_t i = 0;
	size_t j = 0;
	uint32_t c;

	/* count how many wchar_t's we need, while also checking for
	 * correctness of the input; a NUL in a continuation position
	 * fails the 10xxxxxx test, so we never read past the end */
	while (src[j]) {
		i++;
		if ((src[j+0] & 0x80) == 0) {
			j += 1;
		} else if ((src[j+0] & 0xE0) == 0xC0
			   && (src[j+1] & 0xC0) == 0x80
			   && (src[j+0] & 0x1E) != 0) {
			/* the last test rejects overlong C0/C1 leads */
			j += 2;
		} else if ((src[j+0] & 0xF0) == 0xE0
			   && (src[j+1] & 0xC0) == 0x80
			   && (src[j+2] & 0xC0) == 0x80
			   && ((src[j+0] & 0x0F) != 0
			       || (src[j+1] & 0x20) != 0)) {
			/* the last test rejects overlong 3-byte forms */
			j += 3;
		} else if ((src[j+0] & 0xF8) == 0xF0
			   && (src[j+1] & 0xC0) == 0x80
			   && (src[j+2] & 0xC0) == 0x80
			   && (src[j+3] & 0xC0) == 0x80) {
			c = (src[j+0] & 0x07) << 18
				| (src[j+1] & 0x3F) << 12
				| (src[j+2] & 0x3F) << 6
				| (src[j+3] & 0x3F);
			if (c < 0x10000
			    || c > 0x10FFFF
			    || (c & 0x1FF800) == 0x00D800)
				return NULL;
#if SIZEOF_WCHAR_T == 2
			i++;	/* needs a surrogate pair */
#endif
			j += 4;
		} else {
			return NULL;
		}
	}
	dest = malloc((i + 1) * sizeof(wchar_t));
	if (dest == NULL)
		return NULL;
	/* go through the source string again, this time we can skip
	 * the correctness tests */
	i = j = 0;
	while (src[j]) {
		if ((src[j+0] & 0x80) == 0) {
			dest[i++] = src[j+0];
			j += 1;
		} else if ((src[j+0] & 0xE0) == 0xC0) {
			dest[i++] = (src[j+0] & 0x1F) << 6
				| (src[j+1] & 0x3F);
			j += 2;
		} else if ((src[j+0] & 0xF0) == 0xE0) {
			dest[i++] = (src[j+0] & 0x0F) << 12
				| (src[j+1] & 0x3F) << 6
				| (src[j+2] & 0x3F);
			j += 3;
		} else if ((src[j+0] & 0xF8) == 0xF0) {
			c = (src[j+0] & 0x07) << 18
				| (src[j+1] & 0x3F) << 12
				| (src[j+2] & 0x3F) << 6
				| (src[j+3] & 0x3F);
#if SIZEOF_WCHAR_T == 2
			/* UTF-16: high surrogate D800..DBFF carries the top
			 * 10 bits of c - 0x10000, low surrogate DC00..DFFF
			 * the bottom 10 bits */
			dest[i++] = 0xD800 | ((c - 0x10000) >> 10);
			dest[i++] = 0xDC00 | (c & 0x3FF);
#else
			dest[i++] = c;
#endif
			j += 4;
		}
	}
	dest[i] = 0;
	return dest;
}
320#else
/* Convert a UTF-8 file name to the code set of the current locale so
 * it can be handed to fopen() and friends.  Returns a freshly
 * allocated string (the caller frees it), or NULL if memory runs out.
 * If the locale already is UTF-8, or conversion is unavailable or
 * fails, a plain copy of the name is returned. */
static char *
cvfilename(const char *filename)
{
#if defined(HAVE_NL_LANGINFO) && defined(HAVE_ICONV)
	char *code_set = nl_langinfo(CODESET);

	if (code_set != NULL && strcmp(code_set, "UTF-8") != 0) {
		/* iconv_open takes (tocode, fromcode): the name arrives in
		 * UTF-8 and must end up in the locale's code set */
		iconv_t cd = iconv_open(code_set, "UTF-8");

		if (cd != (iconv_t) -1) {
			size_t len = strlen(filename);
			/* generous bound: no locale code set needs more
			 * than 4 bytes per input byte */
			size_t size = 4 * len;
			ICONV_CONST char *from = (ICONV_CONST char *) filename;
			char *r = malloc(size + 1);
			char *p = r;

			if (r) {
				/* convert, then emit any closing shift
				 * sequence a stateful encoding may need */
				if (iconv(cd, &from, &len, &p, &size) != (size_t) -1 &&
				    iconv(cd, NULL, NULL, &p, &size) != (size_t) -1) {
					iconv_close(cd);
					*p = 0;
					return r;
				}
				free(r);
			}
			iconv_close(cd);
		}
	}
#endif
	/* couldn't use iconv for whatever reason; alternative is to
	 * use utf8towchar above to convert to a wide character string
	 * (wcs) and convert that to the locale-specific encoding
	 * using wcstombs or wcsrtombs (but preferably only if the
	 * locale's encoding is not UTF-8) */
	return strdup(filename);
}
356#endif
357
358/* Read at most cnt elements of size elmsize from the stream. Returns
359 * the number of elements actually read or < 0 on failure. */
360ssize_t
361mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
362{
363 if (s == NULL || buf == NULL)
364 return -1;
365#ifdef STREAM_DEBUG
366 fprintf(stderr, "read %s %zu %zu\n",
367 s->name ? s->name : "<unnamed>", elmsize, cnt);
368#endif
369 assert(s->readonly);
370 if (s->errnr)
371 return -1;
372 return s->read(s, buf, elmsize, cnt);
373}
374
375/* Read one line (seperated by \n) of at most maxcnt-1 characters from
376 * the stream. Returns the number of characters actually read,
377 * includes the trailing \n; terminated by a NULL byte. */
378ssize_t
379mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt)
380{
381 char *b = buf, *start = buf;
382
383 if (s == NULL || buf == NULL)
384 return -1;
385#ifdef STREAM_DEBUG
386 fprintf(stderr, "readline %s %zu\n",
387 s->name ? s->name : "<unnamed>", maxcnt);
388#endif
389 assert(s->readonly);
390 if (s->errnr)
391 return -1;
392 if (maxcnt == 0)
393 return 0;
394 if (maxcnt == 1) {
395 *start = 0;
396 return 0;
397 }
398 for (;;) {
399 switch (s->read(s, start, 1, 1)) {
400 case 1:
401 /* successfully read a character,
402 * check whether it is the line
403 * separator and whether we have space
404 * left for more */
405 if (*start++ == '\n' || --maxcnt == 1) {
406 *start = 0;
407#if 0
408 if (!s->binary &&
409 start[-1] == '\n' &&
410 start > b + 1 &&
411 start[-2] == '\r') {
412 /* convert CR-LF to just LF */
413 start[-2] = start[-1];
414 *--start = 0;
415 }
416#endif
417 return (ssize_t) (start - b);
418 }
419 break;
420 case -1:
421 /* error: if we didn't read anything yet,
422 * return the error, otherwise return what we
423 * have */
424 if (start == b)
425 return -1;
426 /* fall through */
427 case 0:
428 /* end of file: return what we have */
429 *start = 0;
430 return (ssize_t) (start - b);
431 }
432 }
433}
434
435/* Write cnt elements of size elmsize to the stream. Returns the
436 * number of elements actually written. If elmsize or cnt equals zero,
437 * returns cnt. */
438ssize_t
439mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
440{
441 if (s == NULL || buf == NULL)
442 return -1;
443#ifdef STREAM_DEBUG
444 fprintf(stderr, "write %s %zu %zu\n",
445 s->name ? s->name : "<unnamed>", elmsize, cnt);
446#endif
447 assert(!s->readonly);
448 if (s->errnr)
449 return -1;
450 return s->write(s, buf, elmsize, cnt);
451}
452
453void
454mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void))
455{
456 if (s) {
457 s->timeout = ms;
458 s->timeout_func = func;
459 if (s->update_timeout)
460 s->update_timeout(s);
461 }
462}
463
464void
465mnstr_close(stream *s)
466{
467 if (s) {
468#ifdef STREAM_DEBUG
469 fprintf(stderr, "close %s\n", s->name ? s->name : "<unnamed>");
470#endif
471 s->close(s);
472 }
473}
474
475void
476mnstr_destroy(stream *s)
477{
478 if (s) {
479#ifdef STREAM_DEBUG
480 fprintf(stderr, "destroy %s\n",
481 s->name ? s->name : "<unnamed>");
482#endif
483 s->destroy(s);
484 }
485}
486
487char *
488mnstr_error(stream *s)
489{
490 if (s == NULL)
491 return "Connection terminated";
492 return s->error(s);
493}
494
495/* flush buffer, return 0 on success, non-zero on failure */
496int
497mnstr_flush(stream *s)
498{
499 if (s == NULL)
500 return -1;
501#ifdef STREAM_DEBUG
502 fprintf(stderr, "flush %s\n", s->name ? s->name : "<unnamed>");
503#endif
504 assert(!s->readonly);
505 if (s->errnr)
506 return -1;
507 if (s->flush)
508 return s->flush(s);
509 return 0;
510}
511
512/* sync file to disk, return 0 on success, non-zero on failure */
513int
514mnstr_fsync(stream *s)
515{
516 if (s == NULL)
517 return -1;
518#ifdef STREAM_DEBUG
519 fprintf(stderr, "fsync %s (%d)\n",
520 s->name ? s->name : "<unnamed>", s->errnr);
521#endif
522 assert(!s->readonly);
523 if (s->errnr)
524 return -1;
525 if (s->fsync)
526 return s->fsync(s);
527 return 0;
528}
529
530int
531mnstr_fgetpos(stream *restrict s, fpos_t *restrict p)
532{
533 if (s == NULL || p == NULL)
534 return -1;
535#ifdef STREAM_DEBUG
536 fprintf(stderr, "fgetpos %s\n", s->name ? s->name : "<unnamed>");
537#endif
538 if (s->errnr)
539 return -1;
540 if (s->fgetpos)
541 return s->fgetpos(s, p);
542 return 0;
543}
544
545int
546mnstr_fsetpos(stream *restrict s, fpos_t *restrict p)
547{
548 if (s == NULL)
549 return -1;
550#ifdef STREAM_DEBUG
551 fprintf(stderr, "fsetpos %s\n", s->name ? s->name : "<unnamed>");
552#endif
553 if (s->errnr)
554 return -1;
555 if (s->fsetpos)
556 return s->fsetpos(s, p);
557 return 0;
558}
559
560int
561mnstr_isalive(stream *s)
562{
563 if (s == NULL)
564 return 0;
565 if (s->errnr)
566 return -1;
567 if (s->isalive)
568 return s->isalive(s);
569 return 1;
570}
571
572char *
573mnstr_name(stream *s)
574{
575 if (s == NULL)
576 return "connection terminated";
577 return s->name;
578}
579
580int
581mnstr_errnr(stream *s)
582{
583 if (s == NULL)
584 return MNSTR_READ_ERROR;
585 return s->errnr;
586}
587
588void
589mnstr_clearerr(stream *s)
590{
591 if (s != NULL) {
592 s->errnr = MNSTR_NO__ERROR;
593 if (s->clrerr)
594 s->clrerr(s);
595 }
596}
597
598bool
599mnstr_isbinary(stream *s)
600{
601 if (s == NULL)
602 return false;
603 return s->binary;
604}
605
606bool
607mnstr_get_swapbytes(stream *s)
608{
609 if (s == NULL)
610 return 0;
611 return s->swapbytes;
612}
613
614/* set stream to big-endian/little-endian byte order; the default is
615 * native byte order */
616void
617mnstr_set_bigendian(stream *s, bool bigendian)
618{
619 if (s == NULL)
620 return;
621#ifdef STREAM_DEBUG
622 fprintf(stderr, "mnstr_set_bigendian %s %s\n",
623 s->name ? s->name : "<unnamed>",
624 swapbytes ? "true" : "false");
625#endif
626 assert(s->readonly);
627 s->binary = true;
628#ifdef WORDS_BIGENDIAN
629 s->swapbytes = !bigendian;
630#else
631 s->swapbytes = bigendian;
632#endif
633}
634
635
636void
637close_stream(stream *s)
638{
639 if (s) {
640 if (s->close)
641 s->close(s);
642 if (s->destroy)
643 s->destroy(s);
644 }
645}
646
/* Return the extension of file name `file`: the text after the last
 * '.' in its final path component, or "" if that component has none.
 * The result points into `file`, or at a static empty string.  Only
 * the final component is searched, so "dir.d/data" has no extension
 * (previously "d/data" was returned).  '/' always separates path
 * components; on Windows '\\' does too. */
static const char *
get_extension(const char *file)
{
	const char *base = file;
	const char *p;
	const char *dot;

	for (p = file; *p; p++) {
#ifdef NATIVE_WIN32
		if (*p == '\\')
			base = p + 1;
#endif
		if (*p == '/')
			base = p + 1;
	}
	dot = strrchr(base, '.');
	return dot != NULL ? dot + 1 : "";
}
654
655static void
656destroy(stream *s)
657{
658 if (s->name)
659 free(s->name);
660 free(s);
661}
662
663static char *
664error(stream *s)
665{
666 char buf[128];
667
668 switch (s->errnr) {
669 case MNSTR_OPEN_ERROR:
670 snprintf(buf, sizeof(buf), "error could not open file %.100s\n",
671 s->name);
672 return strdup(buf);
673 case MNSTR_READ_ERROR:
674 snprintf(buf, sizeof(buf), "error reading file %.100s\n",
675 s->name);
676 return strdup(buf);
677 case MNSTR_WRITE_ERROR:
678 snprintf(buf, sizeof(buf), "error writing file %.100s\n",
679 s->name);
680 return strdup(buf);
681 case MNSTR_TIMEOUT:
682 snprintf(buf, sizeof(buf), "timeout on %.100s\n", s->name);
683 return strdup(buf);
684 }
685 return strdup("Unknown error");
686}
687
688static stream *
689create_stream(const char *name)
690{
691 stream *s;
692
693 if (name == NULL)
694 return NULL;
695 if ((s = (stream *) malloc(sizeof(*s))) == NULL)
696 return NULL;
697 *s = (stream) {
698 .swapbytes = false,
699 .readonly = true,
700 .isutf8 = false, /* not known for sure */
701 .binary = false,
702 .name = strdup(name),
703 .errnr = MNSTR_NO__ERROR,
704 .error = error,
705 .destroy = destroy,
706 };
707 if(s->name == NULL) {
708 free(s);
709 return NULL;
710 }
711#ifdef STREAM_DEBUG
712 fprintf(stderr, "create_stream %s -> %p\n",
713 name ? name : "<unnamed>", s);
714#endif
715 return s;
716}
717
718/* ------------------------------------------------------------------ */
719/* streams working on a disk file */
720
721static ssize_t
722file_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
723{
724 FILE *fp = (FILE *) s->stream_data.p;
725 size_t rc = 0;
726
727 if (fp == NULL) {
728 s->errnr = MNSTR_READ_ERROR;
729 return -1;
730 }
731
732 if (elmsize && cnt && !feof(fp)) {
733 if (ferror(fp) ||
734 ((rc = fread(buf, elmsize, cnt, fp)) == 0 && ferror(fp))) {
735 s->errnr = MNSTR_READ_ERROR;
736 return -1;
737 }
738 }
739 return (ssize_t) rc;
740}
741
742static ssize_t
743file_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
744{
745 FILE *fp = (FILE *) s->stream_data.p;
746
747 if (fp == NULL) {
748 s->errnr = MNSTR_WRITE_ERROR;
749 return -1;
750 }
751
752 if (elmsize && cnt) {
753 size_t rc = fwrite(buf, elmsize, cnt, fp);
754
755 if (ferror(fp)) {
756 s->errnr = MNSTR_WRITE_ERROR;
757 return -1;
758 }
759 return (ssize_t) rc;
760 }
761 return (ssize_t) cnt;
762}
763
764static void
765file_close(stream *s)
766{
767 FILE *fp = (FILE *) s->stream_data.p;
768
769 if (fp == NULL)
770 return;
771 if (fp != stdin && fp != stdout && fp != stderr) {
772 if (s->name && *s->name == '|')
773 pclose(fp);
774 else
775 fclose(fp);
776 } else if (!s->readonly)
777 fflush(fp);
778 s->stream_data.p = NULL;
779}
780
781static void
782file_destroy(stream *s)
783{
784 file_close(s);
785 destroy(s);
786}
787
788
789static void
790file_clrerr(stream *s)
791{
792 FILE *fp = (FILE *) s->stream_data.p;
793
794 if (fp)
795 clearerr(fp);
796}
797
798static int
799file_flush(stream *s)
800{
801 FILE *fp = (FILE *) s->stream_data.p;
802
803 if (fp == NULL || (!s->readonly && fflush(fp) < 0)) {
804 s->errnr = MNSTR_WRITE_ERROR;
805 return -1;
806 }
807 return 0;
808}
809
810static int
811file_fsync(stream *s)
812{
813
814 FILE *fp = (FILE *) s->stream_data.p;
815
816 if (fp == NULL ||
817 (!s->readonly
818#ifdef NATIVE_WIN32
819 && _commit(fileno(fp)) < 0
820#else
821#ifdef HAVE_FDATASYNC
822 && fdatasync(fileno(fp)) < 0
823#else
824#ifdef HAVE_FSYNC
825 && fsync(fileno(fp)) < 0
826#endif
827#endif
828#endif
829 )) {
830 s->errnr = MNSTR_WRITE_ERROR;
831 return -1;
832 }
833 return 0;
834}
835
836static int
837file_fgetpos(stream *restrict s, fpos_t *restrict p)
838{
839 FILE *fp = (FILE *) s->stream_data.p;
840
841 if (fp == NULL || p == NULL)
842 return -1;
843 return fgetpos(fp, p) ? -1 : 0;
844}
845
846static int
847file_fsetpos(stream *restrict s, fpos_t *restrict p)
848{
849 FILE *fp = (FILE *) s->stream_data.p;
850
851 if (fp == NULL || p == NULL)
852 return -1;
853 return fsetpos(fp, p) ? -1 : 0;
854}
855
856static stream *
857open_stream(const char *restrict filename, const char *restrict flags)
858{
859 stream *s;
860 FILE *fp;
861 fpos_t pos;
862 char buf[UTF8BOMLENGTH + 1];
863
864 if ((s = create_stream(filename)) == NULL)
865 return NULL;
866#ifdef HAVE__WFOPEN
867 {
868 wchar_t *wfname = utf8towchar(filename);
869 wchar_t *wflags = utf8towchar(flags);
870 if (wfname != NULL && wflags != NULL)
871 fp = _wfopen(wfname, wflags);
872 else
873 fp = NULL;
874 if (wfname)
875 free(wfname);
876 if (wflags)
877 free(wflags);
878 }
879#else
880 {
881 char *fname = cvfilename(filename);
882 if (fname) {
883 fp = fopen(fname, flags);
884 free(fname);
885 } else
886 fp = NULL;
887 }
888#endif
889 if (fp == NULL) {
890 destroy(s);
891 return NULL;
892 }
893 s->read = file_read;
894 s->write = file_write;
895 s->close = file_close;
896 s->destroy = file_destroy;
897 s->clrerr = file_clrerr;
898 s->flush = file_flush;
899 s->fsync = file_fsync;
900 s->fgetpos = file_fgetpos;
901 s->fsetpos = file_fsetpos;
902 s->stream_data.p = (void *) fp;
903 /* if a text file is opened for reading, and it starts with
904 * the UTF-8 encoding of the Unicode Byte Order Mark, skip the
905 * mark, and mark the stream as being a UTF-8 stream */
906 if (flags[0] == 'r' && flags[1] != 'b' && fgetpos(fp, &pos) == 0) {
907 if (file_read(s, buf, 1, UTF8BOMLENGTH) == UTF8BOMLENGTH &&
908 strncmp(buf, UTF8BOM, UTF8BOMLENGTH) == 0)
909 s->isutf8 = true;
910 else if (fsetpos(fp, &pos) != 0) {
911 /* unlikely: we couldn't seek the file back */
912 fclose(fp);
913 destroy(s);
914 return NULL;
915 }
916 }
917 return s;
918}
919
920/* ------------------------------------------------------------------ */
921/* streams working on a gzip-compressed disk file */
922
923#ifdef HAVE_LIBZ
924#if ZLIB_VERNUM < 0x1290
925typedef size_t z_size_t;
926
927/* simplistic version for ancient systems (CentOS 6, Ubuntu Trusty) */
928static z_size_t
929gzfread(void *buf, z_size_t size, z_size_t nitems, gzFile file)
930{
931 unsigned sz = nitems * size > (size_t) 1 << 30 ? 1 << 30 : (unsigned) (nitems * size);
932 int len;
933
934 len = gzread(file, buf, sz);
935 if (len == -1)
936 return 0;
937 return (z_size_t) len / size;
938}
939
940static z_size_t
941gzfwrite(const void *buf, z_size_t size, z_size_t nitems, gzFile file)
942{
943 z_size_t sz = nitems * size;
944
945 while (sz > 0) {
946 unsigned len = sz > ((z_size_t) 1 << 30) ? 1 << 30 : (unsigned) sz;
947 int wlen;
948
949 wlen = gzwrite(file, buf, len);
950 if (wlen <= 0)
951 return 0;
952 buf = (const void *) ((const char *) buf + wlen);
953 sz -= (z_size_t) wlen;
954 }
955 return nitems;
956}
957#endif
958
959static ssize_t
960stream_gzread(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
961{
962 gzFile fp = (gzFile) s->stream_data.p;
963 z_size_t size;
964
965 if (fp == NULL) {
966 s->errnr = MNSTR_READ_ERROR;
967 return -1;
968 }
969
970 if (elmsize == 0 || cnt == 0)
971 return 0;
972
973 size = gzfread(buf, elmsize, cnt, fp);
974 /* when in text mode, convert \r\n line endings to \n */
975 if (!s->binary) {
976 char *p1, *p2, *pe;
977
978 p1 = buf;
979 pe = p1 + size;
980 while (p1 < pe && *p1 != '\r')
981 p1++;
982 p2 = p1;
983 while (p1 < pe) {
984 if (*p1 == '\r' && p1[1] == '\n')
985 size--;
986 else
987 *p2++ = *p1;
988 p1++;
989 }
990 }
991
992 return size == 0 ? -1 : (ssize_t) size;
993}
994
995static ssize_t
996stream_gzwrite(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
997{
998 gzFile fp = (gzFile) s->stream_data.p;
999 z_size_t size;
1000
1001 if (fp == NULL) {
1002 s->errnr = MNSTR_WRITE_ERROR;
1003 return -1;
1004 }
1005
1006 if (elmsize == 0 || cnt == 0)
1007 return 0;
1008
1009 size = gzfwrite(buf, elmsize, cnt, fp);
1010 return size == 0 ? -1 : (ssize_t) size;
1011}
1012
1013static int
1014stream_gzflush(stream *s)
1015{
1016 if (s->stream_data.p == NULL)
1017 return -1;
1018 if (!s->readonly &&
1019 gzflush((gzFile) s->stream_data.p, Z_SYNC_FLUSH) != Z_OK)
1020 return -1;
1021 return 0;
1022}
1023
1024static void
1025stream_gzclose(stream *s)
1026{
1027 stream_gzflush(s);
1028 if (s->stream_data.p)
1029 gzclose((gzFile) s->stream_data.p);
1030 s->stream_data.p = NULL;
1031}
1032
1033static stream *
1034open_gzstream(const char *restrict filename, const char *restrict flags)
1035{
1036 stream *s;
1037 gzFile fp;
1038
1039 if ((s = create_stream(filename)) == NULL)
1040 return NULL;
1041#ifdef HAVE__WFOPEN
1042 {
1043 wchar_t *wfname = utf8towchar(filename);
1044 if (wfname != NULL) {
1045 fp = gzopen_w(wfname, flags);
1046 free(wfname);
1047 } else
1048 fp = NULL;
1049 }
1050#else
1051 {
1052 char *fname = cvfilename(filename);
1053 if (fname) {
1054 fp = gzopen(fname, flags);
1055 free(fname);
1056 } else
1057 fp = NULL;
1058 }
1059#endif
1060 if (fp == NULL) {
1061 destroy(s);
1062 return NULL;
1063 }
1064 s->read = stream_gzread;
1065 s->write = stream_gzwrite;
1066 s->close = stream_gzclose;
1067 s->flush = stream_gzflush;
1068 s->stream_data.p = (void *) fp;
1069 if (flags[0] == 'r' && flags[1] != 'b') {
1070 char buf[UTF8BOMLENGTH];
1071 if (gzread(fp, buf, UTF8BOMLENGTH) == UTF8BOMLENGTH &&
1072 strncmp(buf, UTF8BOM, UTF8BOMLENGTH) == 0) {
1073 s->isutf8 = true;
1074 } else {
1075 gzrewind(fp);
1076 }
1077 }
1078 return s;
1079}
1080
1081static stream *
1082open_gzrstream(const char *filename)
1083{
1084 stream *s;
1085
1086 if ((s = open_gzstream(filename, "rb")) == NULL)
1087 return NULL;
1088 s->binary = true;
1089 return s;
1090}
1091
1092static stream *
1093open_gzwstream(const char *restrict filename, const char *restrict mode)
1094{
1095 stream *s;
1096
1097 if ((s = open_gzstream(filename, mode)) == NULL)
1098 return NULL;
1099 s->readonly = false;
1100 s->binary = true;
1101 return s;
1102}
1103
1104static stream *
1105open_gzrastream(const char *filename)
1106{
1107 stream *s;
1108
1109 if ((s = open_gzstream(filename, "r")) == NULL)
1110 return NULL;
1111 s->binary = false;
1112 return s;
1113}
1114
1115static stream *
1116open_gzwastream(const char *restrict filename, const char *restrict mode)
1117{
1118 stream *s;
1119
1120 if ((s = open_gzstream(filename, mode)) == NULL)
1121 return NULL;
1122 s->readonly = false;
1123 s->binary = false;
1124 return s;
1125}
1126#else
/* without zlib every gzip opener yields NULL; the arguments are
 * ignored */
#define open_gzrstream(filename) NULL
#define open_gzwstream(filename, mode) NULL
#define open_gzrastream(filename) NULL
#define open_gzwastream(filename, mode) NULL
1131#endif
1132
1133/* ------------------------------------------------------------------ */
1134/* streams working on a bzip2-compressed disk file */
1135
1136#ifdef HAVE_LIBBZ2
/* backend state of a bzip2 stream: the libbz2 handle layered on the
 * stdio FILE holding the compressed data */
struct bz {
	BZFILE *b;	/* libbz2 handle; may be NULL after BZ2_bzReadClose */
	FILE *f;	/* underlying compressed file, opened in binary mode */
};
1141
1142static void
1143stream_bzclose(stream *s)
1144{
1145 int err = BZ_OK;
1146
1147 if (s->stream_data.p) {
1148 if (s->readonly)
1149 BZ2_bzReadClose(&err, ((struct bz *) s->stream_data.p)->b);
1150 else
1151 BZ2_bzWriteClose(&err, ((struct bz *) s->stream_data.p)->b, 0, NULL, NULL);
1152 fclose(((struct bz *) s->stream_data.p)->f);
1153 free(s->stream_data.p);
1154 }
1155 s->stream_data.p = NULL;
1156}
1157
1158static ssize_t
1159stream_bzread(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
1160{
1161 size_t size = elmsize * cnt;
1162 int err;
1163 void *punused;
1164 int nunused;
1165 char unused[BZ_MAX_UNUSED];
1166 struct bz *bzp = s->stream_data.p;
1167
1168 if (bzp == NULL) {
1169 s->errnr = MNSTR_READ_ERROR;
1170 return -1;
1171 }
1172 if (size == 0)
1173 return 0;
1174 size = (size_t) BZ2_bzRead(&err, bzp->b, buf, size > ((size_t) 1 << 30) ? 1 << 30 : (int) size);
1175 if (err == BZ_STREAM_END) {
1176 /* end of stream, but not necessarily end of file: get
1177 * unused bits, close stream, and open again with the
1178 * saved unused bits */
1179 BZ2_bzReadGetUnused(&err, bzp->b, &punused, &nunused);
1180 if (err == BZ_OK && (nunused > 0 || !feof(bzp->f))) {
1181 if (nunused > 0)
1182 memcpy(unused, punused, nunused);
1183 BZ2_bzReadClose(&err, bzp->b);
1184 bzp->b = BZ2_bzReadOpen(&err, bzp->f, 0, 0, unused, nunused);
1185 } else {
1186 stream_bzclose(s);
1187 }
1188 }
1189 if (err != BZ_OK) {
1190 s->errnr = MNSTR_READ_ERROR;
1191 return -1;
1192 }
1193 /* when in text mode, convert \r\n line endings to \n */
1194 if (!s->binary) {
1195 char *p1, *p2, *pe;
1196
1197 p1 = buf;
1198 pe = p1 + size;
1199 while (p1 < pe && *p1 != '\r')
1200 p1++;
1201 p2 = p1;
1202 while (p1 < pe) {
1203 if (*p1 == '\r' && p1[1] == '\n')
1204 size--;
1205 else
1206 *p2++ = *p1;
1207 p1++;
1208 }
1209 }
1210 return (ssize_t) (size / elmsize);
1211}
1212
1213static ssize_t
1214stream_bzwrite(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
1215{
1216 size_t size = elmsize * cnt;
1217 int err;
1218 struct bz *bzp = s->stream_data.p;
1219
1220 if (bzp == NULL) {
1221 s->errnr = MNSTR_WRITE_ERROR;
1222 return -1;
1223 }
1224 if (size == 0)
1225 return 0;
1226 while (size > 0) {
1227 int sz = size > (1 << 30) ? 1 << 30 : (int) size;
1228 BZ2_bzWrite(&err, bzp->b, (void *) buf, sz);
1229 if (err != BZ_OK) {
1230 stream_bzclose(s);
1231 s->errnr = MNSTR_WRITE_ERROR;
1232 return -1;
1233 }
1234 size -= (size_t) sz;
1235 }
1236 return (ssize_t) cnt;
1237}
1238
1239static stream *
1240open_bzstream(const char *restrict filename, const char *restrict flags)
1241{
1242 stream *s;
1243 int err;
1244 struct bz *bzp;
1245 char fl[3];
1246
1247 if ((bzp = malloc(sizeof(struct bz))) == NULL)
1248 return NULL;
1249 if ((s = create_stream(filename)) == NULL) {
1250 free(bzp);
1251 return NULL;
1252 }
1253 *bzp = (struct bz) {0};
1254 fl[0] = flags[0]; /* 'r' or 'w' */
1255 fl[1] = 'b'; /* always binary */
1256 fl[2] = '\0';
1257#ifdef HAVE__WFOPEN
1258 {
1259 wchar_t *wfname = utf8towchar(filename);
1260 wchar_t *wflags = utf8towchar(fl);
1261 if (wfname != NULL && wflags != NULL)
1262 bzp->f = _wfopen(wfname, wflags);
1263 else
1264 bzp->f = NULL;
1265 if (wfname)
1266 free(wfname);
1267 if (wflags)
1268 free(wflags);
1269 }
1270#else
1271 {
1272 char *fname = cvfilename(filename);
1273 if (fname) {
1274 bzp->f = fopen(fname, fl);
1275 free(fname);
1276 } else
1277 bzp->f = NULL;
1278 }
1279#endif
1280 if (bzp->f == NULL) {
1281 destroy(s);
1282 free(bzp);
1283 return NULL;
1284 }
1285 s->read = stream_bzread;
1286 s->write = stream_bzwrite;
1287 s->close = stream_bzclose;
1288 s->flush = NULL;
1289 s->stream_data.p = (void *) bzp;
1290 if (flags[0] == 'r' && flags[1] != 'b') {
1291 s->readonly = true;
1292 bzp->b = BZ2_bzReadOpen(&err, bzp->f, 0, 0, NULL, 0);
1293 if (err == BZ_STREAM_END) {
1294 BZ2_bzReadClose(&err, bzp->b);
1295 bzp->b = NULL;
1296 } else {
1297 char buf[UTF8BOMLENGTH];
1298
1299 if (stream_bzread(s, buf, 1, UTF8BOMLENGTH) == UTF8BOMLENGTH &&
1300 strncmp(buf, UTF8BOM, UTF8BOMLENGTH) == 0) {
1301 s->isutf8 = true;
1302 } else if (s->stream_data.p) {
1303 bzp = s->stream_data.p;
1304 BZ2_bzReadClose(&err, bzp->b);
1305 rewind(bzp->f);
1306 bzp->b = BZ2_bzReadOpen(&err, bzp->f, 0, 0, NULL, 0);
1307 }
1308 }
1309 } else if (flags[0] == 'r') {
1310 bzp->b = BZ2_bzReadOpen(&err, bzp->f, 0, 0, NULL, 0);
1311 s->readonly = true;
1312 } else {
1313 bzp->b = BZ2_bzWriteOpen(&err, bzp->f, 9, 0, 30);
1314 s->readonly = false;
1315 }
1316 if (err != BZ_OK) {
1317 stream_bzclose(s);
1318 destroy(s);
1319 return NULL;
1320 }
1321 return s;
1322}
1323
1324static stream *
1325open_bzrstream(const char *filename)
1326{
1327 stream *s;
1328
1329 if ((s = open_bzstream(filename, "rb")) == NULL)
1330 return NULL;
1331 s->binary = true;
1332 return s;
1333}
1334
1335static stream *
1336open_bzwstream(const char *restrict filename, const char *restrict mode)
1337{
1338 stream *s;
1339
1340 if ((s = open_bzstream(filename, mode)) == NULL)
1341 return NULL;
1342 s->readonly = false;
1343 s->binary = true;
1344 return s;
1345}
1346
1347static stream *
1348open_bzrastream(const char *filename)
1349{
1350 stream *s;
1351
1352 if ((s = open_bzstream(filename, "r")) == NULL)
1353 return NULL;
1354 s->binary = false;
1355 return s;
1356}
1357
1358static stream *
1359open_bzwastream(const char *restrict filename, const char *restrict mode)
1360{
1361 stream *s;
1362
1363 if ((s = open_bzstream(filename, mode)) == NULL)
1364 return NULL;
1365 s->readonly = false;
1366 s->binary = false;
1367 return s;
1368}
1369#else
/* Fallbacks used when the bzip2 implementation above is compiled out:
 * every bzip2 opener evaluates to NULL, so opening a .bz2 file fails. */
#define open_bzrstream(filename) NULL
#define open_bzwstream(filename, mode) NULL
#define open_bzrastream(filename) NULL
#define open_bzwastream(filename, mode) NULL
1374#endif
1375
1376/* ------------------------------------------------------------------ */
1377/* streams working on a lzma-compressed disk file */
1378
1379#ifdef HAVE_LIBLZMA
1380#define XZBUFSIZ 64*1024
1381typedef struct xz_stream {
1382 FILE *fp;
1383 lzma_stream strm;
1384 size_t todo;
1385 uint8_t buf[XZBUFSIZ];
1386} xz_stream;
1387
1388static ssize_t
1389stream_xzread(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
1390{
1391 xz_stream *xz = s->stream_data.p;
1392 size_t size = elmsize * cnt, origsize = size, ressize = 0;
1393 uint8_t *outbuf = buf;
1394 lzma_action action = LZMA_RUN;
1395
1396 if (xz == NULL) {
1397 s->errnr = MNSTR_READ_ERROR;
1398 return -1;
1399 }
1400
1401 xz->strm.next_in = xz->buf;
1402 xz->strm.avail_in = xz->todo;
1403 xz->strm.next_out = outbuf;
1404 xz->strm.avail_out = size;
1405 while (size && (xz->strm.avail_in || !feof(xz->fp))) {
1406 lzma_ret ret;
1407 size_t sz = (size > XZBUFSIZ) ? XZBUFSIZ : size;
1408
1409 if (xz->strm.avail_in == 0 &&
1410 (xz->strm.avail_in = fread(xz->buf, 1, sz, xz->fp)) == 0) {
1411 s->errnr = MNSTR_READ_ERROR;
1412 return -1;
1413 }
1414 xz->strm.next_in = xz->buf;
1415 if (feof(xz->fp))
1416 action = LZMA_FINISH;
1417 ret = lzma_code(&xz->strm, action);
1418 if (xz->strm.avail_out == 0 || ret == LZMA_STREAM_END) {
1419 origsize -= xz->strm.avail_out; /* remaining space */
1420 xz->todo = xz->strm.avail_in;
1421 if (xz->todo > 0)
1422 memmove(xz->buf, xz->strm.next_in, xz->todo);
1423 ressize = origsize;
1424 break;
1425 }
1426 if (ret != LZMA_OK) {
1427 s->errnr = MNSTR_READ_ERROR;
1428 return -1;
1429 }
1430 }
1431 if (ressize) {
1432 /* when in text mode, convert \r\n line endings to
1433 * \n */
1434 if (!s->binary) {
1435 char *p1, *p2, *pe;
1436
1437 p1 = buf;
1438 pe = p1 + ressize;
1439 while (p1 < pe && *p1 != '\r')
1440 p1++;
1441 p2 = p1;
1442 while (p1 < pe) {
1443 if (*p1 == '\r' && p1[1] == '\n')
1444 ressize--;
1445 else
1446 *p2++ = *p1;
1447 p1++;
1448 }
1449 }
1450 return (ssize_t) (ressize / elmsize);
1451 }
1452 return 0;
1453}
1454
1455static ssize_t
1456stream_xzwrite(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
1457{
1458 xz_stream *xz = s->stream_data.p;
1459 size_t size = elmsize * cnt;
1460 lzma_action action = LZMA_RUN;
1461
1462 if (xz == NULL) {
1463 s->errnr = MNSTR_WRITE_ERROR;
1464 return -1;
1465 }
1466
1467 xz->strm.next_in = buf;
1468 xz->strm.avail_in = size;
1469 xz->strm.next_out = xz->buf;
1470 xz->strm.avail_out = XZBUFSIZ;
1471
1472 size = 0;
1473 while (xz->strm.avail_in) {
1474 size_t sz = 0, isz = xz->strm.avail_in;
1475
1476 lzma_ret ret = lzma_code(&xz->strm, action);
1477 if (xz->strm.avail_out == 0 || ret != LZMA_OK) {
1478 s->errnr = MNSTR_WRITE_ERROR;
1479 return -1;
1480 }
1481 sz = XZBUFSIZ - xz->strm.avail_out;
1482 if (fwrite(xz->buf, 1, sz, xz->fp) != sz) {
1483 s->errnr = MNSTR_WRITE_ERROR;
1484 return -1;
1485 }
1486 assert(xz->strm.avail_in == 0);
1487 size += isz;
1488 xz->strm.next_out = xz->buf;
1489 xz->strm.avail_out = XZBUFSIZ;
1490 }
1491 if (size)
1492 return (ssize_t) (size / elmsize);
1493 return (ssize_t) cnt;
1494}
1495
1496static void
1497stream_xzclose(stream *s)
1498{
1499 xz_stream *xz = s->stream_data.p;
1500
1501 if (xz) {
1502 if (!s->readonly) {
1503 lzma_ret ret = lzma_code(&xz->strm, LZMA_FINISH);
1504
1505 if (xz->strm.avail_out && ret == LZMA_STREAM_END) {
1506 size_t sz = XZBUFSIZ - xz->strm.avail_out;
1507 if (fwrite(xz->buf, 1, sz, xz->fp) != sz)
1508 s->errnr = MNSTR_WRITE_ERROR;
1509 }
1510 fflush(xz->fp);
1511 }
1512 fclose(xz->fp);
1513 lzma_end(&xz->strm);
1514 free(xz);
1515 }
1516 s->stream_data.p = NULL;
1517}
1518
1519static int
1520stream_xzflush(stream *s)
1521{
1522 xz_stream *xz = s->stream_data.p;
1523
1524 if (xz == NULL)
1525 return -1;
1526 if (!s->readonly && fflush(xz->fp))
1527 return -1;
1528 return 0;
1529}
1530
1531static stream *
1532open_xzstream(const char *restrict filename, const char *restrict flags)
1533{
1534 stream *s;
1535 xz_stream *xz;
1536 uint32_t preset = 0;
1537 char fl[3];
1538
1539 if ((xz = calloc(1, sizeof(struct xz_stream))) == NULL)
1540 return NULL;
1541 if (((flags[0] == 'r' &&
1542 lzma_stream_decoder(&xz->strm, UINT64_MAX, LZMA_CONCATENATED) != LZMA_OK)) ||
1543 (flags[0] == 'w' &&
1544 lzma_easy_encoder(&xz->strm, preset, LZMA_CHECK_CRC64) != LZMA_OK)) {
1545 free(xz);
1546 return NULL;
1547 }
1548 if ((s = create_stream(filename)) == NULL) {
1549 free(xz);
1550 return NULL;
1551 }
1552 fl[0] = flags[0]; /* 'r' or 'w' */
1553 fl[1] = 'b'; /* always binary */
1554 fl[2] = '\0';
1555#ifdef HAVE__WFOPEN
1556 {
1557 wchar_t *wfname = utf8towchar(filename);
1558 wchar_t *wflags = utf8towchar(fl);
1559 if (wfname != NULL)
1560 xz->fp = _wfopen(wfname, wflags);
1561 else
1562 xz->fp = NULL;
1563 if (wfname)
1564 free(wfname);
1565 if (wflags)
1566 free(wflags);
1567 }
1568#else
1569 {
1570 char *fname = cvfilename(filename);
1571 if (fname) {
1572 xz->fp = fopen(fname, fl);
1573 free(fname);
1574 } else
1575 xz->fp = NULL;
1576 }
1577#endif
1578 if (xz->fp == NULL) {
1579 destroy(s);
1580 free(xz);
1581 return NULL;
1582 }
1583 s->read = stream_xzread;
1584 s->write = stream_xzwrite;
1585 s->close = stream_xzclose;
1586 s->flush = stream_xzflush;
1587 s->stream_data.p = (void *) xz;
1588 if (flags[0] == 'r' && flags[1] != 'b') {
1589 char buf[UTF8BOMLENGTH];
1590 if (stream_xzread(s, buf, 1, UTF8BOMLENGTH) == UTF8BOMLENGTH &&
1591 strncmp(buf, UTF8BOM, UTF8BOMLENGTH) == 0) {
1592 s->isutf8 = true;
1593 } else {
1594 lzma_end(&xz->strm);
1595 if (lzma_stream_decoder(&xz->strm, UINT64_MAX, LZMA_CONCATENATED) != LZMA_OK
1596 || fseek (xz->fp, 0L, SEEK_SET) < 0) {
1597 fclose(xz->fp);
1598 free(xz);
1599 destroy(s);
1600 return NULL;
1601 }
1602 xz->todo = 0;
1603 }
1604 }
1605 return s;
1606}
1607
1608static stream *
1609open_xzrstream(const char *filename)
1610{
1611 stream *s;
1612
1613 if ((s = open_xzstream(filename, "rb")) == NULL)
1614 return NULL;
1615 s->binary = true;
1616 return s;
1617}
1618
1619static stream *
1620open_xzwstream(const char *restrict filename, const char *restrict mode)
1621{
1622 stream *s;
1623
1624 if ((s = open_xzstream(filename, mode)) == NULL)
1625 return NULL;
1626 s->readonly = false;
1627 s->binary = true;
1628 return s;
1629}
1630
1631static stream *
1632open_xzrastream(const char *filename)
1633{
1634 stream *s;
1635
1636 if ((s = open_xzstream(filename, "r")) == NULL)
1637 return NULL;
1638 s->binary = false;
1639 return s;
1640}
1641
1642static stream *
1643open_xzwastream(const char *restrict filename, const char *restrict mode)
1644{
1645 stream *s;
1646
1647 if ((s = open_xzstream(filename, mode)) == NULL)
1648 return NULL;
1649 s->readonly = false;
1650 s->binary = false;
1651 return s;
1652}
1653#else
/* Fallbacks used when liblzma support (HAVE_LIBLZMA) is compiled out:
 * every xz opener evaluates to NULL, so opening a .xz file fails. */
#define open_xzrstream(filename) NULL
#define open_xzwstream(filename, mode) NULL
#define open_xzrastream(filename) NULL
#define open_xzwastream(filename, mode) NULL
1658#endif
1659
1660/* ------------------------------------------------------------------ */
1661/* streams working on a lz4-compressed disk file */
1662
1663#ifdef HAVE_LIBLZ4
1664#define LZ4DECOMPBUFSIZ 128*1024
1665typedef struct lz4_stream {
1666 FILE *fp;
1667 size_t total_processing;
1668 size_t ring_buffer_size;
1669 void* ring_buffer;
1670 union {
1671 LZ4F_compressionContext_t comp_context;
1672 LZ4F_decompressionContext_t dec_context;
1673 } context;
1674} lz4_stream;
1675
1676static ssize_t
1677stream_lz4read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
1678{
1679 lz4_stream *lz4 = s->stream_data.p;
1680 size_t size = elmsize * cnt, total_read = 0, total_decompressed, ret, remaining_to_decompress;
1681
1682 if (lz4 == NULL || size <= 0) {
1683 s->errnr = MNSTR_READ_ERROR;
1684 return -1;
1685 }
1686
1687 while (total_read < size) {
1688 if (lz4->total_processing == lz4->ring_buffer_size) {
1689 if(feof(lz4->fp)) {
1690 break;
1691 } else {
1692 lz4->ring_buffer_size = fread(lz4->ring_buffer, 1, LZ4_COMPRESSBOUND(LZ4DECOMPBUFSIZ), lz4->fp);
1693 if (lz4->ring_buffer_size == 0 || ferror(lz4->fp)) {
1694 s->errnr = MNSTR_READ_ERROR;
1695 return -1;
1696 }
1697 lz4->total_processing = 0;
1698 }
1699 }
1700
1701 remaining_to_decompress = size - total_read;
1702 total_decompressed = lz4->ring_buffer_size - lz4->total_processing;
1703 ret = LZ4F_decompress(lz4->context.dec_context, (char*)buf + total_read, &remaining_to_decompress,
1704 (char*)lz4->ring_buffer + lz4->total_processing, &total_decompressed, NULL);
1705 if(LZ4F_isError(ret)) {
1706 s->errnr = MNSTR_WRITE_ERROR;
1707 return -1;
1708 }
1709
1710 lz4->total_processing += total_decompressed;
1711 total_read += remaining_to_decompress;
1712 }
1713
1714 /* when in text mode, convert \r\n line endings to \n */
1715 if (!s->binary) {
1716 char *p1, *p2, *pe;
1717
1718 p1 = buf;
1719 pe = p1 + total_read;
1720 while (p1 < pe && *p1 != '\r')
1721 p1++;
1722 p2 = p1;
1723 while (p1 < pe) {
1724 if (*p1 == '\r' && p1[1] == '\n')
1725 total_read--;
1726 else
1727 *p2++ = *p1;
1728 p1++;
1729 }
1730 }
1731 return (ssize_t) (total_read / elmsize);
1732}
1733
1734static ssize_t
1735stream_lz4write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
1736{
1737 lz4_stream *lz4 = s->stream_data.p;
1738 size_t ret, size = elmsize * cnt, total_written = 0, next_batch, next_attempt, available, real_written;
1739
1740 if (lz4 == NULL || size > LZ4_MAX_INPUT_SIZE || size <= 0) {
1741 s->errnr = MNSTR_WRITE_ERROR;
1742 return -1;
1743 }
1744
1745 while (total_written < size) {
1746 next_batch = size - total_written;
1747 available = lz4->ring_buffer_size - lz4->total_processing;
1748 do {
1749 next_attempt = LZ4F_compressBound(next_batch, NULL); /* lz4->ring_buffer must be at least 65548 bytes */
1750 if(next_attempt > available) {
1751 next_batch >>= 1;
1752 } else {
1753 break;
1754 }
1755 if(next_batch == 0)
1756 break;
1757 } while(1);
1758 assert(next_batch > 0);
1759
1760 ret = LZ4F_compressUpdate(lz4->context.comp_context, ((char*)lz4->ring_buffer) + lz4->total_processing,
1761 available, ((char*)buf) + total_written, next_batch, NULL);
1762 if(LZ4F_isError(ret)) {
1763 s->errnr = MNSTR_WRITE_ERROR;
1764 return -1;
1765 } else {
1766 lz4->total_processing += ret;
1767 }
1768
1769 if(lz4->total_processing == lz4->ring_buffer_size) {
1770 real_written = fwrite((void *)lz4->ring_buffer, 1, lz4->total_processing, lz4->fp);
1771 if (real_written == 0) {
1772 s->errnr = MNSTR_WRITE_ERROR;
1773 return -1;
1774 }
1775 lz4->total_processing = 0;
1776 }
1777 total_written += next_batch;
1778 }
1779
1780 return (ssize_t) (total_written / elmsize);
1781}
1782
1783static void
1784stream_lz4close(stream *s)
1785{
1786 lz4_stream *lz4 = s->stream_data.p;
1787
1788 if (lz4) {
1789 if (!s->readonly) {
1790 size_t ret, real_written;
1791
1792 if (lz4->total_processing > 0 && lz4->total_processing < lz4->ring_buffer_size) { /* compress remaining */
1793 real_written = fwrite(lz4->ring_buffer, 1, lz4->total_processing, lz4->fp);
1794 if (real_written == 0) {
1795 s->errnr = MNSTR_WRITE_ERROR;
1796 return ;
1797 }
1798 lz4->total_processing = 0;
1799 } /* finish compression */
1800 ret = LZ4F_compressEnd(lz4->context.comp_context, lz4->ring_buffer, lz4->ring_buffer_size, NULL);
1801 if(LZ4F_isError(ret)) {
1802 s->errnr = MNSTR_WRITE_ERROR;
1803 return ;
1804 }
1805 assert(ret < LZ4DECOMPBUFSIZ);
1806 lz4->total_processing = ret;
1807
1808 real_written = fwrite(lz4->ring_buffer, 1, lz4->total_processing, lz4->fp);
1809 if (real_written == 0) {
1810 s->errnr = MNSTR_WRITE_ERROR;
1811 return ;
1812 }
1813 lz4->total_processing = 0;
1814
1815 fflush(lz4->fp);
1816 }
1817 if(!s->readonly) {
1818 (void) LZ4F_freeCompressionContext(lz4->context.comp_context);
1819 } else {
1820 (void) LZ4F_freeDecompressionContext(lz4->context.dec_context);
1821 }
1822 fclose(lz4->fp);
1823 free(lz4->ring_buffer);
1824 free(lz4);
1825 }
1826 s->stream_data.p = NULL;
1827}
1828
1829static int
1830stream_lz4flush(stream *s)
1831{
1832 lz4_stream *lz4 = s->stream_data.p;
1833 size_t real_written, ret;
1834
1835 if (lz4 == NULL)
1836 return -1;
1837 if (!s->readonly) {
1838 if (lz4->total_processing > 0 && lz4->total_processing < lz4->ring_buffer_size) { /* compress remaining */
1839 real_written = fwrite(lz4->ring_buffer, 1, lz4->total_processing, lz4->fp);
1840 if (real_written == 0) {
1841 s->errnr = MNSTR_WRITE_ERROR;
1842 return -1;
1843 }
1844 lz4->total_processing = 0;
1845 }
1846 ret = LZ4F_flush(lz4->context.comp_context, lz4->ring_buffer, lz4->ring_buffer_size, NULL); /* flush it */
1847 if(LZ4F_isError(ret)) {
1848 s->errnr = MNSTR_WRITE_ERROR;
1849 return -1;
1850 }
1851 lz4->total_processing = ret;
1852 real_written = fwrite(lz4->ring_buffer, 1, lz4->total_processing, lz4->fp);
1853 if (real_written == 0) {
1854 s->errnr = MNSTR_WRITE_ERROR;
1855 return -1;
1856 }
1857 lz4->total_processing = 0;
1858
1859 if(fflush(lz4->fp))
1860 return -1;
1861 }
1862 return 0;
1863}
1864
1865static stream *
1866open_lz4stream(const char *restrict filename, const char *restrict flags)
1867{
1868 stream *s;
1869 lz4_stream *lz4;
1870 LZ4F_errorCode_t error_code;
1871 char fl[3];
1872 size_t buffer_size = (flags[0] == 'r') ? LZ4_COMPRESSBOUND(LZ4DECOMPBUFSIZ) : LZ4DECOMPBUFSIZ;
1873
1874 if ((lz4 = malloc(sizeof(struct lz4_stream))) == NULL)
1875 return NULL;
1876 *lz4 = (struct lz4_stream) {
1877 .ring_buffer = malloc(buffer_size),
1878 .total_processing = (flags[0] == 'r') ? buffer_size : 0,
1879 .ring_buffer_size = buffer_size,
1880 };
1881 if (lz4->ring_buffer == NULL) {
1882 free(lz4);
1883 return NULL;
1884 }
1885
1886 if(flags[0] == 'w') {
1887 error_code = LZ4F_createCompressionContext(&(lz4->context.comp_context), LZ4F_VERSION);
1888 } else {
1889 error_code = LZ4F_createDecompressionContext(&(lz4->context.dec_context), LZ4F_VERSION);
1890 }
1891 if(LZ4F_isError(error_code)) {
1892 free(lz4->ring_buffer);
1893 free(lz4);
1894 return NULL;
1895 }
1896
1897 if ((s = create_stream(filename)) == NULL) {
1898 if(flags[0] == 'w') {
1899 (void) LZ4F_freeCompressionContext(lz4->context.comp_context);
1900 } else {
1901 (void) LZ4F_freeDecompressionContext(lz4->context.dec_context);
1902 }
1903 free(lz4->ring_buffer);
1904 free(lz4);
1905 return NULL;
1906 }
1907 fl[0] = flags[0]; /* 'r' or 'w' */
1908 fl[1] = 'b'; /* always binary */
1909 fl[2] = '\0';
1910#ifdef HAVE__WFOPEN
1911 {
1912 wchar_t *wfname = utf8towchar(filename);
1913 wchar_t *wflags = utf8towchar(fl);
1914 if (wfname != NULL)
1915 lz4->fp = _wfopen(wfname, wflags);
1916 else
1917 lz4->fp = NULL;
1918 if (wfname)
1919 free(wfname);
1920 if (wflags)
1921 free(wflags);
1922 }
1923#else
1924 {
1925 char *fname = cvfilename(filename);
1926 if (fname) {
1927 lz4->fp = fopen(fname, fl);
1928 free(fname);
1929 } else
1930 lz4->fp = NULL;
1931 }
1932#endif
1933 if (lz4->fp == NULL) {
1934 destroy(s);
1935 if(flags[0] == 'w') {
1936 (void) LZ4F_freeCompressionContext(lz4->context.comp_context);
1937 } else {
1938 (void) LZ4F_freeDecompressionContext(lz4->context.dec_context);
1939 }
1940 free(lz4->ring_buffer);
1941 free(lz4);
1942 return NULL;
1943 }
1944 s->read = stream_lz4read;
1945 s->write = stream_lz4write;
1946 s->close = stream_lz4close;
1947 s->flush = stream_lz4flush;
1948 s->stream_data.p = (void *) lz4;
1949
1950 if(flags[0] == 'w') { /* start compression by writting the headers */
1951 size_t nwritten = LZ4F_compressBegin(lz4->context.comp_context, lz4->ring_buffer, lz4->ring_buffer_size, NULL);
1952 assert(nwritten < LZ4DECOMPBUFSIZ);
1953 if(LZ4F_isError(nwritten)) {
1954 (void) LZ4F_freeCompressionContext(lz4->context.comp_context);
1955 free(lz4->ring_buffer);
1956 free(lz4);
1957 return NULL;
1958 } else {
1959 lz4->total_processing += nwritten;
1960 }
1961 } else if (flags[0] == 'r' && flags[1] != 'b') { /* check for utf-8 encoding */
1962 char buf[UTF8BOMLENGTH];
1963 if (stream_lz4read(s, buf, 1, UTF8BOMLENGTH) == UTF8BOMLENGTH &&
1964 strncmp(buf, UTF8BOM, UTF8BOMLENGTH) == 0) {
1965 s->isutf8 = true;
1966 } else {
1967 rewind(lz4->fp);
1968 lz4->total_processing = buffer_size;
1969 lz4->ring_buffer_size = buffer_size;
1970 LZ4F_resetDecompressionContext(lz4->context.dec_context);
1971 mnstr_clearerr(s);
1972 }
1973 }
1974 return s;
1975}
1976
1977static stream *
1978open_lz4rstream(const char *filename)
1979{
1980 stream *s;
1981
1982 if ((s = open_lz4stream(filename, "rb")) == NULL)
1983 return NULL;
1984 s->binary = true;
1985 return s;
1986}
1987
1988static stream *
1989open_lz4wstream(const char *restrict filename, const char *restrict mode)
1990{
1991 stream *s;
1992
1993 if ((s = open_lz4stream(filename, mode)) == NULL)
1994 return NULL;
1995 s->readonly = false;
1996 s->binary = true;
1997 return s;
1998}
1999
2000static stream *
2001open_lz4rastream(const char *filename)
2002{
2003 stream *s;
2004
2005 if ((s = open_lz4stream(filename, "r")) == NULL)
2006 return NULL;
2007 s->binary = false;
2008 return s;
2009}
2010
2011static stream *
2012open_lz4wastream(const char *restrict filename, const char *restrict mode)
2013{
2014 stream *s;
2015
2016 if ((s = open_lz4stream(filename, mode)) == NULL)
2017 return NULL;
2018 s->readonly = false;
2019 s->binary = false;
2020 return s;
2021}
2022#else
/* Fallbacks used when LZ4 support (HAVE_LIBLZ4) is compiled out: every
 * lz4 opener evaluates to NULL, so opening a .lz4 file fails. */
#define open_lz4rstream(filename) NULL
#define open_lz4wstream(filename, mode) NULL
#define open_lz4rastream(filename) NULL
#define open_lz4wastream(filename, mode) NULL
2027#endif
2028
2029/* ------------------------------------------------------------------ */
2030/* streams working on a disk file, compressed or not */
2031
2032stream *
2033open_rstream(const char *filename)
2034{
2035 stream *s;
2036 const char *ext;
2037
2038 if (filename == NULL)
2039 return NULL;
2040#ifdef STREAM_DEBUG
2041 fprintf(stderr, "open_rstream %s\n", filename);
2042#endif
2043 ext = get_extension(filename);
2044
2045 if (strcmp(ext, "gz") == 0)
2046 return open_gzrstream(filename);
2047 if (strcmp(ext, "bz2") == 0)
2048 return open_bzrstream(filename);
2049 if (strcmp(ext, "xz") == 0)
2050 return open_xzrstream(filename);
2051 if (strcmp(ext, "lz4") == 0)
2052 return open_lz4rstream(filename);
2053
2054 if ((s = open_stream(filename, "rb")) == NULL)
2055 return NULL;
2056 s->binary = true;
2057 return s;
2058}
2059
2060stream *
2061open_wstream(const char *filename)
2062{
2063 stream *s;
2064 const char *ext;
2065
2066 if (filename == NULL)
2067 return NULL;
2068#ifdef STREAM_DEBUG
2069 fprintf(stderr, "open_wstream %s\n", filename);
2070#endif
2071 ext = get_extension(filename);
2072
2073 if (strcmp(ext, "gz") == 0)
2074 return open_gzwstream(filename, "wb");
2075 if (strcmp(ext, "bz2") == 0)
2076 return open_bzwstream(filename, "wb");
2077 if (strcmp(ext, "xz") == 0)
2078 return open_xzwstream(filename, "wb");
2079 if (strcmp(ext, "lz4") == 0)
2080 return open_lz4wstream(filename, "wb");
2081
2082 if ((s = open_stream(filename, "wb")) == NULL)
2083 return NULL;
2084 s->readonly = false;
2085 s->binary = true;
2086 return s;
2087}
2088
2089stream *
2090open_rastream(const char *filename)
2091{
2092 stream *s;
2093 const char *ext;
2094
2095 if (filename == NULL)
2096 return NULL;
2097#ifdef STREAM_DEBUG
2098 fprintf(stderr, "open_rastream %s\n", filename);
2099#endif
2100 ext = get_extension(filename);
2101
2102 if (strcmp(ext, "gz") == 0)
2103 return open_gzrastream(filename);
2104 if (strcmp(ext, "bz2") == 0)
2105 return open_bzrastream(filename);
2106 if (strcmp(ext, "xz") == 0)
2107 return open_xzrastream(filename);
2108 if (strcmp(ext, "lz4") == 0)
2109 return open_lz4rastream(filename);
2110
2111 if ((s = open_stream(filename, "r")) == NULL)
2112 return NULL;
2113 s->binary = false;
2114 return s;
2115}
2116
2117stream *
2118open_wastream(const char *filename)
2119{
2120 stream *s;
2121 const char *ext;
2122
2123 if (filename == NULL)
2124 return NULL;
2125#ifdef STREAM_DEBUG
2126 fprintf(stderr, "open_wastream %s\n", filename);
2127#endif
2128 ext = get_extension(filename);
2129
2130 if (strcmp(ext, "gz") == 0)
2131 return open_gzwastream(filename, "w");
2132 if (strcmp(ext, "bz2") == 0)
2133 return open_bzwastream(filename, "w");
2134 if (strcmp(ext, "xz") == 0)
2135 return open_xzwastream(filename, "w");
2136 if (strcmp(ext, "lz4") == 0)
2137 return open_lz4wastream(filename, "w");
2138
2139 if ((s = open_stream(filename, "w")) == NULL)
2140 return NULL;
2141 s->readonly = false;
2142 s->binary = false;
2143 return s;
2144}
2145
2146/* ------------------------------------------------------------------ */
2147/* streams working on a remote file using cURL */
2148
2149#ifdef HAVE_CURL
2150#include <curl/curl.h>
2151
2152struct curl_data {
2153 CURL *handle;
2154 char *buffer; /* buffer to store incoming data */
2155 size_t maxsize; /* size of allocated buffer */
2156 size_t usesize; /* end of used data */
2157 size_t offset; /* start of unread data */
2158 int running; /* whether still transferring */
2159#ifdef USE_CURL_MULTI
2160 CURLMcode result; /* result of transfer (if !running) */
2161 struct curl_data *next; /* linked list (curl_handles) */
2162#endif
2163};
#ifdef USE_CURL_MULTI
/* Shared multi handle and the list of all open URL streams, used only
 * with USE_CURL_MULTI.  NOTE(review): the "lock access" comments at the
 * use sites are placeholders; no locking is actually performed. */
static CURLM *multi_handle;
static struct curl_data *curl_handles;
#endif

/* Allocation granularity of the receive buffer in write_callback; it
 * is used as a bit mask there, so it must be a power of two. */
#define BLOCK_CURL ((size_t) 1 << 16)
2170
2171/* this function is called by libcurl when there is data for us */
2172static size_t
2173write_callback(char *buffer, size_t size, size_t nitems, void *userp)
2174{
2175 stream *s = (stream *) userp;
2176 struct curl_data *c = (struct curl_data *) s->stream_data.p;
2177
2178 size *= nitems;
2179 if (size == 0) /* unlikely */
2180 return 0;
2181 /* allocate a buffer if we don't have one yet */
2182 if (c->buffer == NULL) {
2183 /* BLOCK_CURL had better be a power of 2! */
2184 c->maxsize = (size + BLOCK_CURL - 1) & ~(BLOCK_CURL - 1);
2185 if ((c->buffer = malloc(c->maxsize)) == NULL)
2186 return 0;
2187 c->usesize = 0;
2188 c->offset = 0;
2189 }
2190#ifndef USE_CURL_MULTI
2191 /* move data if we don't have enough space */
2192 if (c->maxsize - c->usesize < size && c->offset > 0) {
2193 memmove(c->buffer, c->buffer + c->offset, c->usesize - c->offset);
2194 c->usesize -= c->offset;
2195 c->offset = 0;
2196 }
2197#endif
2198 /* allocate more buffer space if we still don't have enough space */
2199 if (c->maxsize - c->usesize < size) {
2200 char *b;
2201 size_t maxsize;
2202
2203 maxsize = (c->usesize + size + BLOCK_CURL - 1) & ~(BLOCK_CURL - 1);
2204 b = realloc(c->buffer, c->maxsize);
2205 if (b == NULL)
2206 return 0; /* indicate failure to library */
2207 c->buffer = b;
2208 c->maxsize = maxsize;
2209 }
2210 /* finally, store the data we received */
2211 memcpy(c->buffer + c->usesize, buffer, size);
2212 c->usesize += size;
2213 return size;
2214}
2215
2216static void
2217curl_destroy(stream *s)
2218{
2219 struct curl_data *c;
2220#ifdef USE_CURL_MULTI
2221 struct curl_data **cp;
2222#endif
2223
2224 if ((c = (struct curl_data *) s->stream_data.p) != NULL) {
2225 s->stream_data.p = NULL;
2226#ifdef USE_CURL_MULTI
2227 /* lock access to curl_handles */
2228 cp = &curl_handles;
2229 while (*cp && *cp != c)
2230 cp = &(*cp)->next;
2231 if (*cp)
2232 *cp = c->next;
2233 /* unlock access to curl_handles */
2234#endif
2235 if (c->handle) {
2236#ifdef USE_CURL_MULTI
2237 curl_multi_remove_handle(mult_handle, c->handle);
2238#endif
2239 curl_easy_cleanup(c->handle);
2240 }
2241 if (c->buffer)
2242 free(c->buffer);
2243 free(c);
2244 }
2245 destroy(s);
2246}
2247
2248static ssize_t
2249curl_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
2250{
2251 struct curl_data *c = (struct curl_data *) s->stream_data.p;
2252 size_t size = cnt * elmsize;
2253
2254 if (c == NULL) {
2255 s->errnr = MNSTR_READ_ERROR;
2256 return -1;
2257 }
2258
2259 if (size == 0)
2260 return 0;
2261 if (c->usesize - c->offset >= elmsize || !c->running) {
2262 /* there is at least one element's worth of data
2263 * available, or we have reached the end: return as
2264 * much as we have, but no more than requested */
2265 if (size > c->usesize - c->offset) {
2266 cnt = (c->usesize - c->offset) / elmsize;
2267 size = cnt * elmsize;
2268 }
2269 memcpy(buf, c->buffer + c->offset, size);
2270 c->offset += size;
2271 if (c->offset == c->usesize)
2272 c->usesize = c->offset = 0;
2273 return (ssize_t) cnt;
2274 }
2275 /* not enough data, we must wait until we get some */
2276#ifndef USE_CURL_MULTI
2277 return 0;
2278#endif
2279}
2280
2281static ssize_t
2282curl_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
2283{
2284 (void) s;
2285 (void) buf;
2286 (void) elmsize;
2287 (void) cnt;
2288 assert(0);
2289 return -1;
2290}
2291
/* Close callback for URL streams.  Does nothing: the curl handle and
 * the receive buffer are released by curl_destroy, which
 * open_urlstream installs as the stream's destroy callback. */
static void
curl_close(stream *s)
{
	(void) s;
}
2297
2298stream *
2299open_urlstream(const char *url)
2300{
2301 stream *s;
2302 struct curl_data *c;
2303#ifdef USE_CURL_MULTI
2304 CURLMsg *msg;
2305#endif
2306
2307 if ((c = malloc(sizeof(*c))) == NULL)
2308 return NULL;
2309 *c = (struct curl_data) {
2310 .running = 1,
2311 };
2312 if ((s = create_stream(url)) == NULL) {
2313 free(c);
2314 return NULL;
2315 }
2316#ifdef USE_CURL_MULTI
2317 /* lock access to curl_handles */
2318 c->next = curl_handles;
2319 curl_handles = c;
2320 /* unlock access to curl_handles */
2321#endif
2322 s->read = curl_read;
2323 s->write = curl_write;
2324 s->close = curl_close;
2325 s->destroy = curl_destroy;
2326 if ((c->handle = curl_easy_init()) == NULL) {
2327 free(c);
2328 destroy(s);
2329 return NULL;
2330 }
2331 s->stream_data.p = (void *) c;
2332 curl_easy_setopt(c->handle, CURLOPT_URL, s->name);
2333 curl_easy_setopt(c->handle, CURLOPT_WRITEDATA, s);
2334 curl_easy_setopt(c->handle, CURLOPT_VERBOSE, 0);
2335 curl_easy_setopt(c->handle, CURLOPT_NOSIGNAL, 1);
2336 curl_easy_setopt(c->handle, CURLOPT_WRITEFUNCTION, write_callback);
2337#ifdef USE_CURL_MULTI
2338 if (multi_handle == NULL)
2339 multi_handle = curl_multi_init();
2340 curl_multi_add_handle(multi_handle, c->handle);
2341 while (curl_multi_perform(multi_handle, NULL) == CURLM_CALL_MULTI_PERFORM)
2342 ;
2343 while ((msg = curl_multi_info_read(multi_handle, NULL)) != NULL) {
2344 struct curl_data *p;
2345 /* lock access to curl_handles */
2346 for (p = curl_handles; p; p = p->next) {
2347 if (p->handle == msg->easy_handle) {
2348 switch (msg->msg) {
2349 case CURLMSG_DONE:
2350 p->running = 0;
2351 p->result = msg->data.result;
2352 curl_multi_remove_handle(multi_handle, p->handle);
2353 curl_easy_cleanup(p->handle);
2354 p->handle = NULL;
2355 break;
2356 default:
2357 break;
2358 }
2359 break;
2360 }
2361 }
2362 /* unlock access to curl_handles */
2363 }
2364#else
2365 if (curl_easy_perform(c->handle) != CURLE_OK) {
2366 curl_destroy(s);
2367 return NULL;
2368 }
2369 curl_easy_cleanup(c->handle);
2370 c->handle = NULL;
2371 c->running = 0;
2372#endif
2373 return s;
2374}
2375
2376#else
2377stream *
2378open_urlstream(const char *url)
2379{
2380 if (url != NULL &&
2381 strncmp(url, "file://", sizeof("file://") - 1) == 0) {
2382 url +=sizeof("file://") - 1;
2383#ifdef _MSC_VER
2384 /* file:///C:/... -- remove third / as well */
2385 if (url[0] == '/' && url[2] == ':')
2386 url++;
2387#endif
2388 return open_rastream(url);
2389 }
2390 return NULL;
2391}
2392#endif /* HAVE_CURL */
2393
2394/* ------------------------------------------------------------------ */
2395/* streams working on a socket */
2396
/* Write callback for socket streams: send CNT elements of ELMSIZE bytes
 * from BUF over the socket in s->stream_data.s.
 *
 * send() (NATIVE_WIN32, at most 64 KiB per call) or write() is called
 * repeatedly until everything is written.  A failed call is retried
 * when:
 * - it was interrupted (EINTR / WSAEINTR), or
 * - it would block (EAGAIN/EWOULDBLOCK / WSAEWOULDBLOCK) on a stream
 *   with a positive timeout whose timeout_func exists and returns false.
 *
 * Returns:
 * - -1 immediately if the stream already has an error;
 * - CNT when there is nothing to write;
 * - the number of complete elements written, as soon as at least one
 *   complete element went out (a later failure is then not reported,
 *   and bytes of a trailing partial element are not counted);
 * - -1 with s->errnr = MNSTR_TIMEOUT (would block on a stream with a
 *   timeout) or MNSTR_WRITE_ERROR if the call failed before a complete
 *   element was written;
 * - 0 otherwise. */
static ssize_t
socket_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
{
	size_t size = elmsize * cnt, res = 0;
#ifdef NATIVE_WIN32
	int nr = 0;
#else
	ssize_t nr = 0;
#endif

	if (s->errnr)
		return -1;

	if (size == 0 || elmsize == 0)
		return (ssize_t) cnt;

	errno = 0;
	/* keep going while data remains and the last call either made
	 * progress or failed in one of the retryable ways */
	while (res < size &&
	       (
#ifdef NATIVE_WIN32
		       /* send works on int, make sure the argument fits */
		       ((nr = send(s->stream_data.s, (const char *) buf + res, (int) min(size - res, 1 << 16), 0)) > 0)
#else
		       ((nr = write(s->stream_data.s, (const char *) buf + res, size - res)) > 0)
#endif
		       || (nr < 0 &&	/* syscall failed */
			   s->timeout > 0 &&	/* potentially timeout */
#ifdef _MSC_VER
			   WSAGetLastError() == WSAEWOULDBLOCK &&
#else
			   (errno == EAGAIN
#if EAGAIN != EWOULDBLOCK
			    || errno == EWOULDBLOCK
#endif
				   ) &&		/* it was! */
#endif
			   s->timeout_func != NULL &&	/* callback function exists */
			   !s->timeout_func())	/* callback says don't stop */
		       ||(nr < 0 &&
#ifdef _MSC_VER
			  WSAGetLastError() == WSAEINTR
#else
			  errno == EINTR
#endif
			       ))	/* interrupted */
		) {
		/* clear the error state so the next iteration's checks
		 * only see errors from the next call */
		errno = 0;
#ifdef _MSC_VER
		WSASetLastError(0);
#endif
		if (nr > 0)
			res += (size_t) nr;
	}
	/* at least one complete element went out: report that count */
	if (res >= elmsize)
		return (ssize_t) (res / elmsize);
	if (nr < 0) {
		/* distinguish a would-block timeout from a hard failure */
		if (s->timeout > 0 &&
#ifdef _MSC_VER
		    WSAGetLastError() == WSAEWOULDBLOCK
#else
		    (errno == EAGAIN
#if EAGAIN != EWOULDBLOCK
		     || errno == EWOULDBLOCK
#endif
			    )
#endif
			)
			s->errnr = MNSTR_TIMEOUT;
		else
			s->errnr = MNSTR_WRITE_ERROR;
		return -1;
	}
	return 0;
}
2471
/* Read callback for socket streams: read up to CNT elements of ELMSIZE
 * bytes from the socket in s->stream_data.s into BUF.
 *
 * When s->timeout is nonzero, the function first waits at most
 * s->timeout milliseconds (poll, or select where poll is unavailable)
 * for the socket to become readable.  On expiry, s->timeout_func (if
 * set) decides between waiting again (returns false) and giving up
 * with MNSTR_TIMEOUT.  A single recv()/read() is then issued.  If
 * ELMSIZE > 1 and only part of an element arrived, the rest is read
 * with further calls (ELMSIZE 1, so the recursion depth is bounded).
 *
 * Returns:
 * - the number of complete elements read;
 * - 0 at end of file or when nothing is requested;
 * - -1 with s->errnr set, or immediately if the stream already has an
 *   error.
 *
 * NOTE(review): unlike socket_write, an EINTR from poll/select/read is
 * reported as MNSTR_READ_ERROR rather than retried; confirm whether
 * callers rely on this to abandon a blocking read on a signal. */
static ssize_t
socket_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
{
#ifdef _MSC_VER
	int nr = 0;
#else
	ssize_t nr = 0;
#endif
	size_t size = elmsize * cnt;

	if (s->errnr)
		return -1;
	if (size == 0)
		return 0;

#ifdef _MSC_VER
	/* recv only takes an int parameter, and read does not accept
	 * sockets */
	if (size > INT_MAX)
		size = elmsize * (INT_MAX / elmsize);
#endif
	for (;;) {
		if (s->timeout) {
			/* wait (bounded) until the socket is readable */
			int ret;
#ifdef HAVE_POLL
			struct pollfd pfd;

			pfd = (struct pollfd) {.fd = s->stream_data.s,
					       .events = POLLIN};

			ret = poll(&pfd, 1, (int) s->timeout);
			if (ret == -1 || (pfd.revents & POLLERR)) {
				s->errnr = MNSTR_READ_ERROR;
				return -1;
			}
#else
			struct timeval tv;
			fd_set fds;

			errno = 0;
#ifdef _MSC_VER
			WSASetLastError(0);
#endif
			FD_ZERO(&fds);
			FD_SET(s->stream_data.s, &fds);
			tv.tv_sec = s->timeout / 1000;
			tv.tv_usec = (s->timeout % 1000) * 1000;
			ret = select(
#ifdef _MSC_VER
				0,	/* ignored on Windows */
#else
				s->stream_data.s + 1,
#endif
				&fds, NULL, NULL, &tv);
			if (ret == SOCKET_ERROR) {
				s->errnr = MNSTR_READ_ERROR;
				return -1;
			}
#endif
			if (ret == 0) {
				/* timed out: the callback decides whether to
				 * give up or wait another period */
				if (s->timeout_func == NULL || s->timeout_func()) {
					s->errnr = MNSTR_TIMEOUT;
					return -1;
				}
				continue;
			}
			assert(ret == 1);
#ifdef HAVE_POLL
			assert(pfd.revents & (POLLIN|POLLHUP));
#else
			assert(FD_ISSET(s->stream_data.s, &fds));
#endif
		}
#ifdef _MSC_VER
		nr = recv(s->stream_data.s, buf, (int) size, 0);
		if (nr == SOCKET_ERROR) {
			s->errnr = MNSTR_READ_ERROR;
			return -1;
		}
#else
		nr = read(s->stream_data.s, buf, size);
		if (nr == -1) {
			s->errnr = MNSTR_READ_ERROR;
			return -1;
		}
#endif
		break;
	}
	if (nr == 0)
		return 0;	/* end of file */
	if (elmsize > 1) {
		while ((size_t) nr % elmsize != 0) {
			/* if elmsize > 1, we really expect that "the
			 * other side" wrote complete items in a
			 * single system call, so we expect to at
			 * least receive complete items, and hence we
			 * continue reading until we did in fact
			 * receive an integral number of complete
			 * items, ignoring any timeouts (but not real
			 * errors) (note that recursion is limited
			 * since we don't propagate the element size
			 * to the recursive call) */
			ssize_t n;
			n = socket_read(s, (char *) buf + nr, 1, size - (size_t) nr);
			if (n < 0) {
				s->errnr = MNSTR_READ_ERROR;
				return -1;
			}
			if (n == 0)	/* unexpected end of file */
				break;
			nr +=
#ifdef _MSC_VER
				(int)
#endif
				n;
		}
	}
	/* a trailing partial element (after an unexpected EOF) is not
	 * counted */
	return nr / (ssize_t) elmsize;
}
2591
2592static void
2593socket_close(stream *s)
2594{
2595 SOCKET fd = s->stream_data.s;
2596
2597 if (fd != INVALID_SOCKET) {
2598 /* Related read/write (in/out, from/to) streams
2599 * share a single socket which is not dup'ed (anymore)
2600 * as Windows' dup doesn't work on sockets;
2601 * hence, only one of the streams must/may close that
2602 * socket; we choose to let the read socket do the
2603 * job, since in mapi.c it may happen that the read
2604 * stream is closed before the write stream was even
2605 * created.
2606 */
2607 if (s->readonly) {
2608#ifdef HAVE_SHUTDOWN
2609 shutdown(fd, SHUT_RDWR);
2610#endif
2611 closesocket(fd);
2612 }
2613 }
2614 s->stream_data.s = INVALID_SOCKET;
2615}
2616
2617static void
2618socket_update_timeout(stream *s)
2619{
2620 SOCKET fd = s->stream_data.s;
2621 struct timeval tv;
2622
2623 if (fd == INVALID_SOCKET)
2624 return;
2625 tv.tv_sec = s->timeout / 1000;
2626 tv.tv_usec = (s->timeout % 1000) * 1000;
2627 /* cast to char * for Windows, no harm on "normal" systems */
2628 if (!s->readonly)
2629 (void) setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, (socklen_t) sizeof(tv));
2630}
2631
2632#ifndef MSG_DONTWAIT
2633#define MSG_DONTWAIT 0
2634#endif
2635
2636static int
2637socket_isalive(stream *s)
2638{
2639 SOCKET fd = s->stream_data.s;
2640#ifdef HAVE_POLL
2641 struct pollfd pfd;
2642 int ret;
2643 pfd = (struct pollfd){.fd = fd};
2644 if ((ret = poll(&pfd, 1, 0)) == 0)
2645 return 1;
2646 if (ret < 0 || pfd.revents & (POLLERR | POLLHUP))
2647 return 0;
2648 assert(0); /* unexpected revents value */
2649 return 0;
2650#else
2651 fd_set fds;
2652 struct timeval t;
2653 char buffer[32];
2654
2655 t.tv_sec = 0;
2656 t.tv_usec = 0;
2657 FD_ZERO(&fds);
2658 FD_SET(fd, &fds);
2659 return select(
2660#ifdef _MSC_VER
2661 0, /* ignored on Windows */
2662#else
2663 fd + 1,
2664#endif
2665 &fds, NULL, NULL, &t) <= 0 ||
2666 recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) != 0;
2667#endif
2668}
2669
2670static stream *
2671socket_open(SOCKET sock, const char *name)
2672{
2673 stream *s;
2674 int domain = 0;
2675
2676 if (sock == INVALID_SOCKET)
2677 return NULL;
2678 if ((s = create_stream(name)) == NULL)
2679 return NULL;
2680 s->read = socket_read;
2681 s->write = socket_write;
2682 s->close = socket_close;
2683 s->stream_data.s = sock;
2684 s->update_timeout = socket_update_timeout;
2685 s->isalive = socket_isalive;
2686
2687 errno = 0;
2688#ifdef _MSC_VER
2689 WSASetLastError(0);
2690#endif
2691#if defined(SO_DOMAIN)
2692 {
2693 socklen_t len = (socklen_t) sizeof(domain);
2694 if (getsockopt(sock, SOL_SOCKET, SO_DOMAIN, (void *) &domain, &len) == SOCKET_ERROR)
2695 domain = AF_INET; /* give it a value if call fails */
2696 }
2697#endif
2698#if defined(SO_KEEPALIVE) && !defined(WIN32)
2699 if (domain != PF_UNIX) { /* not on UNIX sockets */
2700 int opt = 1;
2701 (void) setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *) &opt, sizeof(opt));
2702 }
2703#endif
2704#if defined(IPTOS_THROUGHPUT) && !defined(WIN32)
2705 if (domain != PF_UNIX) { /* not on UNIX sockets */
2706 int tos = IPTOS_THROUGHPUT;
2707
2708 (void) setsockopt(sock, IPPROTO_IP, IP_TOS, (void *) &tos, sizeof(tos));
2709 }
2710#endif
2711#ifdef TCP_NODELAY
2712 if (domain != PF_UNIX) { /* not on UNIX sockets */
2713 int nodelay = 1;
2714
2715 (void) setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *) &nodelay, sizeof(nodelay));
2716 }
2717#endif
2718#ifdef HAVE_FCNTL
2719 {
2720 int fl = fcntl(sock, F_GETFL);
2721
2722 fl &= ~O_NONBLOCK;
2723 if (fcntl(sock, F_SETFL, fl) < 0) {
2724 s->errnr = MNSTR_OPEN_ERROR;
2725 return s;
2726 }
2727 }
2728#endif
2729
2730 return s;
2731}
2732
2733stream *
2734socket_rstream(SOCKET sock, const char *name)
2735{
2736 stream *s = NULL;
2737
2738#ifdef STREAM_DEBUG
2739 fprintf(stderr, "socket_rstream %zd %s\n", (ssize_t) sock, name);
2740#endif
2741 if ((s = socket_open(sock, name)) != NULL)
2742 s->binary = true;
2743 return s;
2744}
2745
2746stream *
2747socket_wstream(SOCKET sock, const char *name)
2748{
2749 stream *s;
2750
2751#ifdef STREAM_DEBUG
2752 fprintf(stderr, "socket_wstream %zd %s\n", (ssize_t) sock, name);
2753#endif
2754 if ((s = socket_open(sock, name)) == NULL)
2755 return NULL;
2756 s->readonly = false;
2757 s->binary = true;
2758 return s;
2759}
2760
2761/* ------------------------------------------------------------------ */
2762/* streams working on an open file pointer */
2763
2764#ifdef _MSC_VER
2765/* special case code for reading from/writing to a Windows console and
2766 * for reading from a Windows pipe
2767 *
2768 * For reading from and writing to the console we can use a wide
2769 * character interface which means that we are independent of the code
2770 * page being used. We can translate the wide characters (which are
2771 * Unicode code points) easily to UTF-8.
2772 *
2773 * Both for reading from the console and from a pipe, we avoid hanging
2774 * (waiting for input) in the read function. Instead, we only call
2775 * the read function when we know there is input available. This is
2776 * to prevent a deadlock situation, especially for reading from pipes,
 * when another thread were to also interact with pipes (as happened in
2778 * the scipy Python module as used in the sql/backends/monet5/pyapi05
2779 * test). */
2780
/* State of a Windows console stream (used by console_read or
 * console_write, never both on the same object). */
struct console {
	HANDLE h;		/* console handle passed to
				 * ReadConsoleW/WriteConsoleW */
	DWORD len;		/* number of valid WCHARs in wbuf */
	DWORD rd;		/* console_read: index of the next WCHAR in
				 * wbuf to convert; console_write: receives
				 * the count reported by WriteConsoleW */
	unsigned char i;	/* console_read: progress through the UTF-8
				 * sequence currently being emitted;
				 * console_write: continuation bytes still
				 * expected from the next call */
	uint32_t ch;		/* console_write: code point bits decoded so
				 * far from an incomplete sequence */
	WCHAR wbuf[8192];	/* UTF-16 staging buffer */
};
2789
/* Read up to elmsize * cnt bytes of UTF-8 text from a Windows console.
 *
 * Console input is fetched with ReadConsoleW into c->wbuf (8192
 * WCHARs) and converted to UTF-8 as the caller's buffer allows.
 * Carriage returns are dropped, and a BOM (U+FEFF) at the start of a
 * freshly read batch is skipped.  A batch that starts with control-Z
 * (26) is treated as end of file.  c->i tracks how far the current
 * multi-byte sequence has been emitted, so that a sequence can be
 * continued in the next call when buf fills up in the middle of it.
 *
 * Returns the number of complete elmsize items produced, 0 at end of
 * file, or -1 with s->errnr set to MNSTR_READ_ERROR.
 *
 * NOTE(review): a lone low surrogate reached with c->i == 0 matches
 * neither of its inner conditions, so neither c->rd nor n changes and
 * the loop appears not to terminate.  Also, if a surrogate pair is
 * split across two ReadConsoleW batches, the pending third UTF-8 byte
 * is stored at *p without being counted, and the next call ORs into
 * whatever the new buf holds.  Both only affect malformed or unusually
 * split input -- confirm. */
static ssize_t
console_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
{
	struct console *c = s->stream_data.p;
	size_t n = elmsize * cnt;
	unsigned char *p = buf;

	if (c == NULL) {
		s->errnr = MNSTR_READ_ERROR;
		return -1;
	}
	if (n == 0)
		return 0;
	if (c->rd == c->len) {
		/* everything converted: wait for and fetch a new batch
		 * of console input (with INFINITE the wait presumably
		 * never returns WAIT_TIMEOUT, so the loop body is not
		 * repeated) */
		while (WaitForSingleObject(c->h, INFINITE) == WAIT_TIMEOUT)
			;
		if (!ReadConsoleW(c->h, c->wbuf, 8192, &c->len, NULL)) {
			s->errnr = MNSTR_READ_ERROR;
			return -1;
		}
		c->rd = 0;
		if (c->len > 0 && c->wbuf[0] == 26) { /* control-Z */
			c->len = 0;
			return 0;
		}
		if (c->len > 0 && c->wbuf[0] == 0xFEFF)
			c->rd++;	/* skip BOM */
	}
	while (n > 0 && c->rd < c->len) {
		if (c->wbuf[c->rd] == L'\r') {
			/* skip CR */
			c->rd++;
		} else if (c->wbuf[c->rd] <= 0x7F) {
			/* old-fashioned ASCII */
			*p++ = (unsigned char) c->wbuf[c->rd++];
			n--;
		} else if (c->wbuf[c->rd] <= 0x7FF) {
			/* two-byte UTF-8 sequence */
			if (c->i == 0) {
				*p++ = 0xC0 | (c->wbuf[c->rd] >> 6);
				c->i = 1;
				n--;
			}
			if (c->i == 1 && n > 0) {
				*p++ = 0x80 | (c->wbuf[c->rd++] & 0x3F);
				c->i = 0;
				n--;
			}
		} else if ((c->wbuf[c->rd] & 0xFC00) == 0xD800) {
			/* high surrogate */
			/* Unicode code points U+10000 and
			 * higher cannot be represented in two
			 * bytes in UTF-16.  Instead they are
			 * represented in four bytes using so
			 * called high and low surrogates.
			 * 00000000000uuuuuxxxxyyyyyyzzzzzz
			 * 110110wwwwxxxxyy 110111yyyyzzzzzz
			 * -> 11110uuu 10uuxxxx 10yyyyyy 10zzzzzz
			 * where uuuuu = wwww + 1 */
			if (c->i == 0) {
				*p++ = 0xF0 | (((c->wbuf[c->rd] & 0x03C0) + 0x0040) >> 8);
				c->i = 1;
				n--;
			}
			if (c->i == 1 && n > 0) {
				*p++ = 0x80 | ((((c->wbuf[c->rd] & 0x03FC) + 0x0040) >> 2) & 0x3F);
				c->i = 2;
				n--;
			}
			if (c->i == 2 && n > 0) {
				/* the third byte is only partially known
				 * (its low bits come from the low
				 * surrogate), so p and n are not advanced
				 * yet */
				*p = 0x80 | ((c->wbuf[c->rd++] & 0x0003) << 4);
				c->i = 3;
			}
		} else if ((c->wbuf[c->rd] & 0xFC00) == 0xDC00) {
			/* low surrogate */
			if (c->i == 3) {
				/* complete the third byte started above */
				*p++ |= (c->wbuf[c->rd] & 0x03C0) >> 6;
				c->i = 4;
				n--;
			}
			if (c->i == 4 && n > 0) {
				*p++ = 0x80 | (c->wbuf[c->rd++] & 0x3F);
				c->i = 0;
				n--;
			}
		} else {
			/* three-byte UTF-8 sequence (rest of the BMP) */
			if (c->i == 0) {
				*p++ = 0xE0 | (c->wbuf[c->rd] >> 12);
				c->i = 1;
				n--;
			}
			if (c->i == 1 && n > 0) {
				*p++ = 0x80 | ((c->wbuf[c->rd] >> 6) & 0x3F);
				c->i = 2;
				n--;
			}
			if (c->i == 2 && n > 0) {
				*p++ = 0x80 | (c->wbuf[c->rd++] & 0x3F);
				c->i = 0;
				n--;
			}
		}
	}
	return (ssize_t) ((p - (unsigned char *) buf) / elmsize);
}
2894
/* Write elmsize * cnt bytes of UTF-8 text to a Windows console.
 *
 * The UTF-8 input is decoded and re-encoded as UTF-16 into c->wbuf.
 * Code points above U+FFFF become surrogate pairs, and every '\n' is
 * preceded by '\r'.  The buffer is passed to WriteConsoleW whenever it
 * is nearly full and once more at the end of the call.  A multi-byte
 * sequence cut off at the end of buf is remembered in c->ch (bits
 * decoded so far) and c->i (continuation bytes still expected), and is
 * completed at the start of the next call.
 *
 * The following cause -1 with s->errnr set to MNSTR_WRITE_ERROR:
 * invalid lead bytes, missing continuation bytes, code points above
 * U+10FFFF, encoded surrogates, and WriteConsoleW failures.  Otherwise
 * the call returns the number of elmsize items consumed.
 *
 * NOTE(review): overlong encodings (e.g. C0 80) are not rejected, and
 * the count WriteConsoleW stores in c->rd is not checked for short
 * writes. */
static ssize_t
console_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
{
	struct console *c = s->stream_data.p;
	size_t n = elmsize * cnt;
	const unsigned char *p = buf;
	uint32_t ch;
	int x;

	if (c == NULL) {
		s->errnr = MNSTR_WRITE_ERROR;
		return -1;
	}
	if (n == 0)
		return 0;

	c->len = 0;
	if (c->i > 0) {
		/* finish the sequence left incomplete by the previous
		 * call */
		while (c->i > 0 && n > 0) {
			if ((*p & 0xC0) != 0x80) {
				s->errnr = MNSTR_WRITE_ERROR;
				return -1;
			}
			c->ch <<= 6;
			c->ch |= *p & 0x3F;
			p++;
			n--;
			c->i--;
		}
		if (c->i > 0) {
			;	/* still incomplete: wait for more input */
		} else if (c->ch > 0x10FFFF || (c->ch & 0xFFFFF800) == 0xD800) {
			s->errnr = MNSTR_WRITE_ERROR;
			return -1;
		} else if (c->ch > 0xFFFF) {
			c->wbuf[c->len++] = 0xD800 | ((c->ch >> 10) - (1 << 6));
			c->wbuf[c->len++] = 0xDC00 | (c->ch & 0x03FF);
		} else {
			c->wbuf[c->len++] = c->ch;
		}
	}
	while (n > 0) {
		/* each iteration appends at most two WCHARs, so flush
		 * while there is still room for them */
		if (c->len >= 8191) {
			if (!WriteConsoleW(c->h, c->wbuf, c->len, &c->rd, NULL)) {
				s->errnr = MNSTR_WRITE_ERROR;
				return -1;
			}
			c->len = 0;
		}
		if ((*p & 0x80) == 0) {
			/* ASCII; newline becomes CR LF */
			if (*p == '\n')
				c->wbuf[c->len++] = L'\r';
			c->wbuf[c->len++] = *p++;
			n--;
			x = 0;
			continue;
		} else if ((*p & 0xE0) == 0xC0) {
			x = 1;
			ch = *p & 0x1F;
		} else if ((*p & 0xF0) == 0xE0) {
			x = 2;
			ch = *p & 0x0F;
		} else if ((*p & 0xF8) == 0xF0) {
			x = 3;
			ch = *p & 0x07;
		} else {
			/* continuation byte or invalid lead byte */
			s->errnr = MNSTR_WRITE_ERROR;
			return -1;
		}
		p++;
		n--;
		while (x > 0 && n > 0) {
			if ((*p & 0xC0) != 0x80) {
				s->errnr = MNSTR_WRITE_ERROR;
				return -1;
			}
			ch <<= 6;
			ch |= *p & 0x3F;
			p++;
			n--;
			x--;
		}
		if (x > 0) {
			/* input ended mid-sequence: remember the state */
			c->ch = ch;
			c->i = x;
		} else if (ch > 0x10FFFF || (ch & 0xFFFFF800) == 0xD800) {
			s->errnr = MNSTR_WRITE_ERROR;
			return -1;
		} else if (ch > 0xFFFF) {
			c->wbuf[c->len++] = 0xD800 | ((ch >> 10) - (1 << 6));
			c->wbuf[c->len++] = 0xDC00 | (ch & 0x03FF);
		} else {
			c->wbuf[c->len++] = ch;
		}
	}
	if (c->len > 0) {
		if (!WriteConsoleW(c->h, c->wbuf, c->len, &c->rd, NULL)) {
			s->errnr = MNSTR_WRITE_ERROR;
			return -1;
		}
		c->len = 0;
	}
	return (ssize_t) ((p - (const unsigned char *) buf) / elmsize);
}
2999
3000static ssize_t
3001pipe_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
3002{
3003 HANDLE h = s->stream_data.p;
3004 size_t n;
3005 unsigned char *p;
3006 DWORD nread;
3007
3008 if (h == NULL) {
3009 s->errnr = MNSTR_READ_ERROR;
3010 return -1;
3011 }
3012 if (elmsize == 0 || cnt == 0)
3013 return 0;
3014 tailrecurse:
3015 n = elmsize * cnt;
3016 p = buf;
3017
3018 for (;;) {
3019 DWORD ret = PeekNamedPipe(h, NULL, 0, NULL, &nread, NULL);
3020 if (ret == 0) {
3021 if (GetLastError() == ERROR_BROKEN_PIPE)
3022 return 0;
3023 s->errnr = MNSTR_READ_ERROR;
3024 return -1;
3025 }
3026 if (nread > 0)
3027 break;
3028 Sleep(100);
3029 }
3030 if ((size_t) nread < n)
3031 n = (size_t) nread;
3032 if (!ReadFile(h, buf, (DWORD) n, &nread, NULL)) {
3033 s->errnr = MNSTR_READ_ERROR;
3034 return -1;
3035 }
3036 /* when in text mode, convert \r\n line endings to \n */
3037 if (!s->binary && nread > 0) {
3038 char *p1, *p2, *pe;
3039
3040 p1 = buf;
3041 pe = p1 + nread;
3042 while (p1 < pe && *p1 != '\r')
3043 p1++;
3044 p2 = p1;
3045 while (p1 < pe) {
3046 if (*p1 == '\r' /*&& p1[1] == '\n'*/)
3047 nread--;
3048 else
3049 *p2++ = *p1;
3050 p1++;
3051 }
3052 if (nread == 0) {
3053 /* try again after removing \r and ending up
3054 * with nothing */
3055 goto tailrecurse;
3056 }
3057 }
3058 return nread / elmsize;
3059}
3060
3061static void
3062console_destroy(stream *s)
3063{
3064 if (s->stream_data.p)
3065 free(s->stream_data.p);
3066 destroy(s);
3067}
3068#endif
3069
3070static stream *
3071file_stream(const char *name)
3072{
3073 stream *s;
3074
3075 if ((s = create_stream(name)) == NULL)
3076 return NULL;
3077 s->read = file_read;
3078 s->write = file_write;
3079 s->close = file_close;
3080 s->destroy = file_destroy;
3081 s->flush = file_flush;
3082 s->fsync = file_fsync;
3083 s->fgetpos = file_fgetpos;
3084 s->fsetpos = file_fsetpos;
3085 return s;
3086}
3087
3088stream *
3089file_rstream(FILE *restrict fp, const char *restrict name)
3090{
3091 stream *s;
3092
3093 if (fp == NULL)
3094 return NULL;
3095#ifdef STREAM_DEBUG
3096 fprintf(stderr, "file_rstream %s\n", name);
3097#endif
3098 if ((s = file_stream(name)) == NULL)
3099 return NULL;
3100 s->binary = true;
3101 s->stream_data.p = (void *) fp;
3102 return s;
3103}
3104
3105stream *
3106file_wstream(FILE *restrict fp, const char *restrict name)
3107{
3108 stream *s;
3109
3110 if (fp == NULL)
3111 return NULL;
3112#ifdef STREAM_DEBUG
3113 fprintf(stderr, "file_wstream %s\n", name);
3114#endif
3115 if ((s = file_stream(name)) == NULL)
3116 return NULL;
3117 s->readonly = false;
3118 s->binary = true;
3119 s->stream_data.p = (void *) fp;
3120 return s;
3121}
3122
3123stream *
3124file_rastream(FILE *restrict fp, const char *restrict name)
3125{
3126 stream *s;
3127 fpos_t pos;
3128 char buf[UTF8BOMLENGTH + 1];
3129 struct stat stb;
3130
3131 if (fp == NULL)
3132 return NULL;
3133#ifdef STREAM_DEBUG
3134 fprintf(stderr, "file_rastream %s\n", name);
3135#endif
3136 if ((s = file_stream(name)) == NULL)
3137 return NULL;
3138 s->binary = false;
3139 s->stream_data.p = (void *) fp;
3140 if (fstat(fileno(fp), &stb) == 0 &&
3141 S_ISREG(stb.st_mode) &&
3142 fgetpos(fp, &pos) == 0) {
3143 if (file_read(s, buf, 1, UTF8BOMLENGTH) == UTF8BOMLENGTH &&
3144 strncmp(buf, UTF8BOM, UTF8BOMLENGTH) == 0) {
3145 s->isutf8 = true;
3146 return s;
3147 }
3148 if (fsetpos(fp, &pos) != 0) {
3149 /* unlikely: we couldn't seek the file back */
3150 destroy(s);
3151 return NULL;
3152 }
3153 }
3154#ifdef _MSC_VER
3155 if (fp == stdin) {
3156 HANDLE h = GetStdHandle(STD_INPUT_HANDLE);
3157
3158 switch (GetFileType(h)) {
3159 case FILE_TYPE_PIPE:
3160 s->stream_data.p = h;
3161 s->read = pipe_read;
3162 s->write = NULL;
3163 s->destroy = destroy;
3164 s->close = NULL;
3165 s->flush = NULL;
3166 s->fsync = NULL;
3167 s->fgetpos = NULL;
3168 s->fsetpos = NULL;
3169 break;
3170 case FILE_TYPE_CHAR: {
3171 struct console *c = malloc(sizeof(struct console));
3172 if (c == NULL) {
3173 destroy(s);
3174 return NULL;
3175 }
3176 s->stream_data.p = c;
3177 *c = (struct console) {
3178 .h = h,
3179 };
3180 s->read = console_read;
3181 s->write = NULL;
3182 s->destroy = console_destroy;
3183 s->close = NULL;
3184 s->flush = NULL;
3185 s->fsync = NULL;
3186 s->fgetpos = NULL;
3187 s->fsetpos = NULL;
3188 s->isutf8 = true;
3189 break;
3190 }
3191 }
3192 }
3193#endif
3194 return s;
3195}
3196
3197stream *
3198file_wastream(FILE *restrict fp, const char *restrict name)
3199{
3200 stream *s;
3201
3202 if (fp == NULL)
3203 return NULL;
3204#ifdef STREAM_DEBUG
3205 fprintf(stderr, "file_wastream %s\n", name);
3206#endif
3207 if ((s = file_stream(name)) == NULL)
3208 return NULL;
3209 s->readonly = false;
3210 s->binary = false;
3211#ifdef _MSC_VER
3212 if ((fileno(fp) == 1 || fileno(fp) == 2) && isatty(fileno(fp))) {
3213 struct console *c = malloc(sizeof(struct console));
3214 if (c == NULL) {
3215 destroy(s);
3216 return NULL;
3217 }
3218 s->stream_data.p = c;
3219 *c = (struct console) {
3220 .h = GetStdHandle(STD_OUTPUT_HANDLE),
3221 };
3222 s->read = NULL;
3223 s->write = console_write;
3224 s->destroy = console_destroy;
3225 s->close = NULL;
3226 s->flush = NULL;
3227 s->fsync = NULL;
3228 s->fgetpos = NULL;
3229 s->fsetpos = NULL;
3230 s->isutf8 = true;
3231 return s;
3232 }
3233#endif
3234 s->stream_data.p = (void *) fp;
3235 return s;
3236}
3237
3238/* some lower-level access functions */
3239FILE *
3240getFile(stream *s)
3241{
3242#ifdef _MSC_VER
3243 if (s->read == console_read)
3244 return stdin;
3245 if (s->write == console_write)
3246 return stdout;
3247#endif
3248 if (s->read != file_read)
3249 return NULL;
3250 return (FILE *) s->stream_data.p;
3251}
3252
3253int
3254getFileNo(stream *s)
3255{
3256 FILE *f;
3257
3258 f = getFile(s);
3259 if (f == NULL)
3260 return -1;
3261 return fileno(f);
3262}
3263
3264size_t
3265getFileSize(stream *s)
3266{
3267 struct stat stb;
3268 int fd = getFileNo(s);
3269
3270 if (fd >= 0 && fstat(fd, &stb) == 0)
3271 return (size_t) stb.st_size;
3272 return 0; /* unknown */
3273}
3274
3275/* ------------------------------------------------------------------ */
3276/* streams working on a substream, converting character sets using iconv */
3277
3278#ifdef HAVE_ICONV
3279
/* State of an iconv character-set conversion stream layered on top of
 * another stream. */
struct icstream {
	iconv_t cd;		/* conversion descriptor from iconv_open */
	stream *s;		/* underlying stream */
	char buffer[BUFSIZ];	/* reading: input bytes not yet converted;
				 * writing: output staging area, and the
				 * leftover incomplete input between calls */
	size_t buflen;		/* number of bytes kept in buffer between
				 * calls */
	bool eof;		/* reading: underlying stream reached end of
				 * file */
};
3287
/* Convert elmsize * cnt bytes from buf with iconv and write the result
 * to the underlying stream ic->s.
 *
 * Output is produced and written in chunks of sizeof(ic->buffer)
 * bytes.  An incomplete multibyte sequence at the end of the input
 * (EINVAL) is saved in ic->buffer/ic->buflen and prepended to the data
 * of the next call, using a temporary malloc'ed copy (bf).
 *
 * Returns cnt on success (also when input was held back), or -1 with
 * s->errnr set to MNSTR_WRITE_ERROR on invalid input, allocation
 * failure or a failing write.  After a failure during conversion,
 * previously saved leftover bytes are not restored (ic->buflen was
 * already reset). */
static ssize_t
ic_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
{
	struct icstream *ic = (struct icstream *) s->stream_data.p;
	ICONV_CONST char *inbuf;
	size_t inbytesleft = elmsize * cnt;
	char *bf = NULL;

	if (ic == NULL)
		goto bailout;

	/* if unconverted data from a previous call remains, add it to
	 * the start of the new data, using temporary space */
	if (ic->buflen > 0) {
		bf = malloc(ic->buflen + inbytesleft);
		if (bf == NULL) {
			/* cannot allocate memory */
			goto bailout;
		}
		memcpy(bf, ic->buffer, ic->buflen);
		memcpy(bf + ic->buflen, buf, inbytesleft);
		buf = bf;
		inbytesleft += ic->buflen;
		ic->buflen = 0;
	}
	inbuf = (ICONV_CONST char *) buf;
	while (inbytesleft > 0) {
		/* ic->buffer receives the converted output of each
		 * chunk */
		char *outbuf = ic->buffer;
		size_t outbytesleft = sizeof(ic->buffer);

		if (iconv(ic->cd, &inbuf, &inbytesleft, &outbuf, &outbytesleft) == (size_t) -1) {
			switch (errno) {
			case EILSEQ:
				/* invalid multibyte sequence encountered */
				goto bailout;
			case EINVAL:
				/* incomplete multibyte sequence
				 * encountered: flush what has been
				 * converted */
				if (outbytesleft < sizeof(ic->buffer) &&
				    mnstr_write(ic->s, ic->buffer, 1, sizeof(ic->buffer) - outbytesleft) < 0) {
					goto bailout;
				}
				/* remember what hasn't been converted */
				if (inbytesleft > sizeof(ic->buffer)) {
					/* ridiculously long multibyte
					 * sequence, so return
					 * error */
					goto bailout;
				}
				memcpy(ic->buffer, inbuf, inbytesleft);
				ic->buflen = inbytesleft;
				if (bf)
					free(bf);
				return (ssize_t) cnt;
			case E2BIG:
				/* not enough space in output buffer:
				 * write this chunk and continue */
				break;
			default:
				/* cannot happen (according to manual) */
				goto bailout;
			}
		}
		if (mnstr_write(ic->s, ic->buffer, 1, sizeof(ic->buffer) - outbytesleft) < 0) {
			goto bailout;
		}
	}
	if (bf)
		free(bf);
	return (ssize_t) cnt;

  bailout:
	s->errnr = MNSTR_WRITE_ERROR;
	if (bf)
		free(bf);
	return -1;
}
3365
/* Read up to cnt items of elmsize bytes, converting the input of the
 * underlying stream ic->s with iconv.
 *
 * The underlying stream is read one byte at a time into ic->buffer,
 * and after each byte iconv converts whatever is pending.  An
 * incomplete multibyte sequence (EINVAL) stays in ic->buffer for the
 * next iteration or call.  Reading stops when the caller's buffer is
 * full (or iconv reports E2BIG) or at end of input.
 *
 * ic->eof is set when the underlying stream reports end of file.  It
 * stays set if this call returns data, so that the next call reports 0
 * without reading, and it is cleared whenever 0 is returned.
 *
 * Returns the number of complete elmsize items produced, 0 at end of
 * file, or -1 with s->errnr set on error.  End of file in the middle
 * of a multibyte sequence is an error. */
static ssize_t
ic_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
{
	struct icstream *ic = (struct icstream *) s->stream_data.p;
	ICONV_CONST char *inbuf;
	size_t inbytesleft;
	char *outbuf;
	size_t outbytesleft;

	if (ic == NULL) {
		s->errnr = MNSTR_READ_ERROR;
		return -1;
	}
	inbuf = ic->buffer;
	inbytesleft = ic->buflen;
	outbuf = (char *) buf;
	outbytesleft = elmsize * cnt;
	if (outbytesleft == 0)
		return 0;
	while (outbytesleft > 0 && !ic->eof) {
		if (ic->buflen == sizeof(ic->buffer)) {
			/* ridiculously long multibyte sequence, return error */
			s->errnr = MNSTR_READ_ERROR;
			return -1;
		}

		/* fetch one more input byte, appended after the bytes
		 * still pending in ic->buffer */
		switch (mnstr_read(ic->s, ic->buffer + ic->buflen, 1, 1)) {
		case 1:
			/* expected: read one byte */
			ic->buflen++;
			inbytesleft++;
			break;
		case 0:
			/* end of file */
			ic->eof = true;
			if (ic->buflen > 0) {
				/* incomplete input */
				s->errnr = MNSTR_READ_ERROR;
				return -1;
			}
			/* with NULL input, iconv resets the conversion
			 * state and writes any closing sequence */
			if (iconv(ic->cd, NULL, NULL, &outbuf, &outbytesleft) == (size_t) -1) {
				/* some error occurred */
				s->errnr = MNSTR_READ_ERROR;
				return -1;
			}
			goto exit_func; /* double break */
		default:
			/* error */
			s->errnr = ic->s->errnr;
			return -1;
		}
		if (iconv(ic->cd, &inbuf, &inbytesleft, &outbuf, &outbytesleft) == (size_t) -1) {
			switch (errno) {
			case EILSEQ:
				/* invalid multibyte sequence encountered */
				s->errnr = MNSTR_READ_ERROR;
				return -1;
			case EINVAL:
				/* incomplete multibyte sequence encountered */
				break;
			case E2BIG:
				/* not enough space in output buffer,
				 * return what we have, saving what's in
				 * the buffer */
				goto exit_func;
			default:
				/* cannot happen (according to manual) */
				s->errnr = MNSTR_READ_ERROR;
				return -1;
			}
		}
		if (inbytesleft == 0) {
			/* converted complete buffer */
			inbuf = ic->buffer;
			ic->buflen = 0;
		}
	}
  exit_func:
	/* keep the unconverted bytes at the start of ic->buffer */
	if (inbuf > ic->buffer)
		memmove(ic->buffer, inbuf, inbytesleft);
	ic->buflen = inbytesleft;
	if (outbytesleft == elmsize * cnt) {
		/* if we're returning data, we must pass on EOF on the
		 * next call (i.e. keep ic->eof set), otherwise we
		 * must clear it so that the next call will cause the
		 * underlying stream to be read again */
		ic->eof = false;
	}
	return (ssize_t) ((elmsize * cnt - outbytesleft) / elmsize);
}
3456
3457static int
3458ic_flush(stream *s)
3459{
3460 struct icstream *ic = (struct icstream *) s->stream_data.p;
3461 char *outbuf;
3462 size_t outbytesleft;
3463
3464 if (ic == NULL)
3465 return -1;
3466 outbuf = ic->buffer;
3467 outbytesleft = sizeof(ic->buffer);
3468 /* if unconverted data from a previous call remains, it was an
3469 * incomplete multibyte sequence, so an error */
3470 if (ic->buflen > 0 ||
3471 iconv(ic->cd, NULL, NULL, &outbuf, &outbytesleft) == (size_t) -1 ||
3472 (outbytesleft < sizeof(ic->buffer) &&
3473 mnstr_write(ic->s, ic->buffer, 1, sizeof(ic->buffer) - outbytesleft) < 0)) {
3474 s->errnr = MNSTR_WRITE_ERROR;
3475 return -1;
3476 }
3477 return mnstr_flush(ic->s);
3478}
3479
3480static void
3481ic_close(stream *s)
3482{
3483 struct icstream *ic = (struct icstream *) s->stream_data.p;
3484
3485 if (ic) {
3486 if (!s->readonly)
3487 ic_flush(s);
3488 iconv_close(ic->cd);
3489 close_stream(ic->s);
3490 free(s->stream_data.p);
3491 s->stream_data.p = NULL;
3492 }
3493}
3494
3495static void
3496ic_destroy(stream *s)
3497{
3498 ic_close(s);
3499 destroy(s);
3500}
3501
3502static void
3503ic_update_timeout(stream *s)
3504{
3505 struct icstream *ic = (struct icstream *) s->stream_data.p;
3506
3507 if (ic && ic->s) {
3508 ic->s->timeout = s->timeout;
3509 ic->s->timeout_func = s->timeout_func;
3510 if (ic->s->update_timeout)
3511 ic->s->update_timeout(ic->s);
3512 }
3513}
3514
3515static int
3516ic_isalive(stream *s)
3517{
3518 struct icstream *ic = (struct icstream *) s->stream_data.p;
3519
3520 if (ic && ic->s) {
3521 if (ic->s->isalive)
3522 return ic->s->isalive(ic->s);
3523 return 1;
3524 }
3525 return 0;
3526}
3527
3528static void
3529ic_clrerr(stream *s)
3530{
3531 if (s->stream_data.p)
3532 mnstr_clearerr(((struct icstream *) s->stream_data.p)->s);
3533}
3534
3535static stream *
3536ic_open(iconv_t cd, stream *restrict ss, const char *restrict name)
3537{
3538 stream *s;
3539 struct icstream *ic;
3540
3541 if (ss->isutf8)
3542 return ss;
3543 if ((s = create_stream(name)) == NULL)
3544 return NULL;
3545 s->read = ic_read;
3546 s->write = ic_write;
3547 s->close = ic_close;
3548 s->destroy = ic_destroy;
3549 s->clrerr = ic_clrerr;
3550 s->flush = ic_flush;
3551 s->update_timeout = ic_update_timeout;
3552 s->isalive = ic_isalive;
3553 ic = malloc(sizeof(struct icstream));
3554 if (ic == NULL) {
3555 mnstr_destroy(s);
3556 return NULL;
3557 }
3558 s->stream_data.p = ic;
3559 *ic = (struct icstream) {
3560 .cd = cd,
3561 .s = ss,
3562 .buflen = 0,
3563 .eof = false,
3564 };
3565 return s;
3566}
3567
3568stream *
3569iconv_rstream(stream *restrict ss, const char *restrict charset, const char *restrict name)
3570{
3571 stream *s;
3572 iconv_t cd;
3573
3574 if (ss == NULL || charset == NULL || name == NULL)
3575 return NULL;
3576#ifdef STREAM_DEBUG
3577 fprintf(stderr, "iconv_rstream %s %s\n", charset, name);
3578#endif
3579 if (ss->isutf8)
3580 return ss;
3581 cd = iconv_open("utf-8", charset);
3582 if (cd == (iconv_t) -1)
3583 return NULL;
3584 s = ic_open(cd, ss, name);
3585 if (s == NULL) {
3586 iconv_close(cd);
3587 return NULL;
3588 }
3589 s->readonly = true;
3590 s->isutf8 = true;
3591 return s;
3592}
3593
3594stream *
3595iconv_wstream(stream *restrict ss, const char *restrict charset, const char *restrict name)
3596{
3597 stream *s;
3598 iconv_t cd;
3599
3600 if (ss == NULL || charset == NULL || name == NULL)
3601 return NULL;
3602#ifdef STREAM_DEBUG
3603 fprintf(stderr, "iconv_wstream %s %s\n", charset, name);
3604#endif
3605 if (ss->isutf8)
3606 return ss;
3607 cd = iconv_open(charset, "utf-8");
3608 if (cd == (iconv_t) -1)
3609 return NULL;
3610 s = ic_open(cd, ss, name);
3611 if (s == NULL) {
3612 iconv_close(cd);
3613 return NULL;
3614 }
3615 s->readonly = false;
3616 return s;
3617}
3618
3619#else
3620stream *
3621iconv_rstream(stream *restrict ss, const char *restrict charset, const char *restrict name)
3622{
3623 if (ss == NULL || charset == NULL || name == NULL)
3624 return NULL;
3625 if (ss->isutf8 ||
3626 strcmp(charset, "utf-8") == 0 ||
3627 strcmp(charset, "UTF-8") == 0 ||
3628 strcmp(charset, "UTF8") == 0)
3629 return ss;
3630
3631 return NULL;
3632}
3633
3634stream *
3635iconv_wstream(stream *restrict ss, const char *restrict charset, const char *restrict name)
3636{
3637 if (ss == NULL || charset == NULL || name == NULL)
3638 return NULL;
3639 if (ss->isutf8 ||
3640 strcmp(charset, "utf-8") == 0 ||
3641 strcmp(charset, "UTF-8") == 0 ||
3642 strcmp(charset, "UTF8") == 0)
3643 return ss;
3644
3645 return NULL;
3646}
3647#endif /* HAVE_ICONV */
3648
3649/* ------------------------------------------------------------------ */
3650
3651void
3652buffer_init(buffer *restrict b, char *restrict buf, size_t size)
3653{
3654 if (b == NULL || buf == NULL)
3655 return;
3656 b->pos = 0;
3657 b->buf = buf;
3658 b->len = size;
3659}
3660
3661buffer *
3662buffer_create(size_t size)
3663{
3664 buffer *b;
3665
3666 if ((b = malloc(sizeof(*b))) == NULL)
3667 return NULL;
3668 *b = (buffer) {
3669 .buf = malloc(size),
3670 .len = size,
3671 };
3672 if (b->buf == NULL) {
3673 free(b);
3674 return NULL;
3675 }
3676 return b;
3677}
3678
3679char *
3680buffer_get_buf(buffer *b)
3681{
3682 char *r;
3683
3684 if (b == NULL)
3685 return NULL;
3686 if (b->pos == b->len) {
3687 if ((r = realloc(b->buf, b->len + 1)) == NULL) {
3688 /* keep b->buf in tact */
3689 return NULL;
3690 }
3691 b->buf = r;
3692 }
3693 r = b->buf;
3694 r[b->pos] = '\0';
3695 b->buf = malloc(b->len);
3696 if (b->buf == NULL) {
3697 free(b);
3698 free(r);
3699 return NULL;
3700 }
3701 b->len = b->buf ? b->len : 0;
3702 b->pos = 0;
3703 return r;
3704}
3705
3706void
3707buffer_destroy(buffer *b)
3708{
3709 if (b == NULL)
3710 return;
3711 if (b->buf)
3712 free(b->buf);
3713 free(b);
3714}
3715
3716buffer *
3717mnstr_get_buffer(stream *s)
3718{
3719 if (s == NULL)
3720 return NULL;
3721 return (buffer *) s->stream_data.p;
3722}
3723
3724static ssize_t
3725buffer_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
3726{
3727 size_t size = elmsize * cnt;
3728 buffer *b;
3729
3730 b = (buffer *) s->stream_data.p;
3731 assert(b);
3732 if (size && b && b->pos + size <= b->len) {
3733 memcpy(buf, b->buf + b->pos, size);
3734 b->pos += size;
3735 return (ssize_t) (size / elmsize);
3736 }
3737 return 0;
3738}
3739
3740static ssize_t
3741buffer_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
3742{
3743 size_t size = elmsize * cnt;
3744 buffer *b;
3745
3746 b = (buffer *) s->stream_data.p;
3747 assert(b);
3748 if (b == NULL) {
3749 s->errnr = MNSTR_WRITE_ERROR;
3750 return -1;
3751 }
3752 if (b->pos + size > b->len) {
3753 char *p;
3754 size_t ns = b->pos + size + 8192;
3755
3756 if ((p = realloc(b->buf, ns)) == NULL) {
3757 s->errnr = MNSTR_WRITE_ERROR;
3758 return -1;
3759 }
3760 b->buf = p;
3761 b->len = ns;
3762 }
3763 memcpy(b->buf + b->pos, buf, size);
3764 b->pos += size;
3765 return (ssize_t) cnt;
3766}
3767
3768static void
3769buffer_close(stream *s)
3770{
3771 (void) s;
3772}
3773
3774static int
3775buffer_flush(stream *s)
3776{
3777 buffer *b;
3778
3779 b = (buffer *) s->stream_data.p;
3780 assert(b);
3781 if (b == NULL)
3782 return -1;
3783 b->pos = 0;
3784 return 0;
3785}
3786
3787stream *
3788buffer_rastream(buffer *restrict b, const char *restrict name)
3789{
3790 stream *s;
3791
3792 if (b == NULL || name == NULL)
3793 return NULL;
3794#ifdef STREAM_DEBUG
3795 fprintf(stderr, "buffer_rastream %s\n", name);
3796#endif
3797 if ((s = create_stream(name)) == NULL)
3798 return NULL;
3799 s->binary = false;
3800 s->read = buffer_read;
3801 s->write = buffer_write;
3802 s->close = buffer_close;
3803 s->flush = buffer_flush;
3804 s->stream_data.p = (void *) b;
3805 return s;
3806}
3807
3808stream *
3809buffer_wastream(buffer *restrict b, const char *restrict name)
3810{
3811 stream *s;
3812
3813 if (b == NULL || name == NULL)
3814 return NULL;
3815#ifdef STREAM_DEBUG
3816 fprintf(stderr, "buffer_wastream %s\n", name);
3817#endif
3818 if ((s = create_stream(name)) == NULL)
3819 return NULL;
3820 s->readonly = false;
3821 s->binary = false;
3822 s->read = buffer_read;
3823 s->write = buffer_write;
3824 s->close = buffer_close;
3825 s->flush = buffer_flush;
3826 s->stream_data.p = (void *) b;
3827 return s;
3828}
3829
3830
3831
3832/* ------------------------------------------------------------------ */
3833
3834/* A buffered stream consists of a sequence of blocks. Each block
3835 * consists of a count followed by the data in the block. A flush is
3836 * indicated by an empty block (i.e. just a count of 0).
3837 */
typedef struct bs {
	stream *s;		/* underlying stream */
	unsigned nr;		/* how far we got in buf */
	unsigned itotal;	/* amount available in current read block */
	size_t blks;		/* read/written blocks (possibly partial) */
	size_t bytes;		/* read/written bytes */
	char buf[BLOCK];	/* the buffered data; each block is sent
				 * preceded by a two-byte (short) size
				 * header -- presumably BLOCK already
				 * accounts for that header, confirm
				 * against its definition */
} bs;
3847
3848static bs *
3849bs_create(stream *s)
3850{
3851 /* should be a binary stream */
3852 bs *ns;
3853
3854 if ((ns = malloc(sizeof(*ns))) == NULL)
3855 return NULL;
3856 *ns = (bs) {
3857 .s = s,
3858 };
3859 return ns;
3860}
3861
/* Collect data until the internal buffer is filled, then write the
 * filled buffer to the underlying stream as one block: a 16-bit count
 * (length << 1, low-order bit clear because this is not a flush)
 * followed by the data.
 * Struct field usage:
 * s - the underlying stream;
 * buf - the buffer in which data is collected;
 * nr - how much of buf is already filled (if nr == sizeof(buf) the
 *      data is written to the underlying stream, so upon entry nr <
 *      sizeof(buf));
 * itotal - unused.
 *
 * Returns cnt on success.  On a write failure returns -1, sets
 * ss->errnr to MNSTR_WRITE_ERROR and discards the buffered block.
 */
static ssize_t
bs_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt)
{
	bs *s;
	size_t todo = cnt * elmsize;
	uint16_t blksize;

	s = (bs *) ss->stream_data.p;
	if (s == NULL)
		return -1;
	assert(!ss->readonly);
	assert(s->nr < sizeof(s->buf));
	while (todo > 0) {
		size_t n = sizeof(s->buf) - s->nr;

		if (todo < n)
			n = todo;
		memcpy(s->buf + s->nr, buf, n);
		s->nr += (unsigned) n;
		todo -= n;
		buf = ((const char *) buf + n);
		if (s->nr == sizeof(s->buf)) {
			/* block is full, write it to the stream */
#ifdef BSTREAM_DEBUG
			{
				unsigned i;

				fprintf(stderr, "W %s %u \"", ss->name, s->nr);
				for (i = 0; i < s->nr; i++)
					if (' ' <= s->buf[i] && s->buf[i] < 127)
						putc(s->buf[i], stderr);
					else
						fprintf(stderr, "\\%03o", s->buf[i]);
				fprintf(stderr, "\"\n");
			}
#endif
			/* since the block is at max BLOCK (8K) - 2 size we can
			 * store it in a two byte integer */
			blksize = (uint16_t) s->nr;
			s->bytes += s->nr;
			/* the last bit tells whether a flush is in
			 * there, it's not at this moment, so shift it
			 * to the left */
			blksize <<= 1;
			if (!mnstr_writeSht(s->s, (int16_t) blksize) ||
			    s->s->write(s->s, s->buf, 1, s->nr) != (ssize_t) s->nr) {
				ss->errnr = MNSTR_WRITE_ERROR;
				s->nr = 0;	/* data is lost due to error */
				return -1;
			}
			s->blks++;
			s->nr = 0;
		}
	}
	return (ssize_t) cnt;
}
3928
/* Write whatever is left in the internal buffer (possibly nothing) to
 * the underlying stream as a final block: a 16-bit count of
 * (nr << 1) | 1 followed by the nr pending bytes.  The set low-order
 * bit tells the receiver that the data was flushed; the count is sent
 * even when nr == 0.
 *
 * Returns 0 on success (and for read-only streams in release builds),
 * -1 on failure, in which case ss->errnr is set to MNSTR_WRITE_ERROR
 * and the buffered data is discarded.
 */
static int
bs_flush(stream *ss)
{
	uint16_t blksize;
	bs *s;

	s = (bs *) ss->stream_data.p;
	if (s == NULL)
		return -1;
	assert(!ss->readonly);
	assert(s->nr < sizeof(s->buf));
	if (!ss->readonly) {
		/* flush the rest of buffer (if s->nr > 0), then set the
		 * last bit to 1 to indicate user-instigated flush */
#ifdef BSTREAM_DEBUG
		if (s->nr > 0) {
			unsigned i;

			fprintf(stderr, "W %s %u \"", ss->name, s->nr);
			for (i = 0; i < s->nr; i++)
				if (' ' <= s->buf[i] && s->buf[i] < 127)
					putc(s->buf[i], stderr);
				else
					fprintf(stderr, "\\%03o", s->buf[i]);
			fprintf(stderr, "\"\n");
			fprintf(stderr, "W %s 0\n", ss->name);
		}
#endif
		blksize = (uint16_t) (s->nr << 1);
		s->bytes += s->nr;
		/* indicate that this is the last buffer of a block by
		 * setting the low-order bit */
		blksize |= 1;
		/* always send the count (even for an empty block): the
		 * protocol needs it to signal the flush */
		if ((!mnstr_writeSht(s->s, (int16_t) blksize) ||
		     (s->nr > 0 &&
		      s->s->write(s->s, s->buf, 1, s->nr) != (ssize_t) s->nr))) {
			ss->errnr = MNSTR_WRITE_ERROR;
			s->nr = 0;	/* data is lost due to error */
			return -1;
		}
		s->blks++;
		s->nr = 0;
	}
	return 0;
}
3980
/* Read buffered data and return the number of items read. At the
 * flush boundary we will return 0 to indicate the end of a block.
 *
 * Wire format (as produced by bs_write/bs_flush): each block is a
 * 16-bit count holding (payload length << 1) | final-flag, followed by
 * the payload.  Counts above (BLOCK << 1 | 1) are rejected.
 *
 * Structure field usage:
 * s - the underlying stream;
 * buf - not used (data is read straight into the caller's buffer);
 * itotal - the amount of data in the current block that hasn't yet
 *          been read;
 * nr - indicates whether the flush marker has to be returned.
 *
 * Returns the number of complete elements of elmsize bytes read, 0 at
 * a flush boundary or at end of input, and -1 on error (ss->errnr set).
 * NOTE(review): if end of input is hit after data was already copied
 * in this call, 0 is returned and that data is not reported.
 */
static ssize_t
bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
{
	bs *s;
	size_t todo = cnt * elmsize;
	size_t n;

	s = (bs *) ss->stream_data.p;
	if (s == NULL)
		return -1;
	assert(ss->readonly);
	assert(s->nr <= 1);

	if (s->itotal == 0) {
		int16_t blksize = 0;

		if (s->nr) {
			/* We read the closing block but hadn't
			 * returned that yet. Return it now, and note
			 * that we did by setting s->nr to 0. */
			assert(s->nr == 1);
			s->nr = 0;
			return 0;
		}

		assert(s->nr == 0);

		/* There is nothing more to read in the current block,
		 * so read the count for the next block */
		switch (mnstr_readSht(s->s, &blksize)) {
		case -1:
			ss->errnr = s->s->errnr;
			return -1;
		case 0:
			return 0;
		case 1:
			break;
		}
		/* reject counts larger than a full, final block */
		if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
			ss->errnr = MNSTR_READ_ERROR;
			return -1;
		}
#ifdef BSTREAM_DEBUG
		fprintf(stderr, "RC size: %u, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
		fprintf(stderr, "RC %s %u\n", ss->name, (uint16_t) blksize);
#endif
		s->itotal = (uint16_t) blksize >> 1;	/* amount readable */
		/* store whether this was the last block or not */
		s->nr = (uint16_t) blksize & 1;
		s->bytes += s->itotal;
		s->blks++;
	}

	/* Fill the caller's buffer. */
	cnt = 0;		/* count how much we put into the buffer */
	while (todo > 0) {
		/* there is more data waiting in the current block, so
		 * read it */
		n = todo < s->itotal ? todo : s->itotal;
		while (n > 0) {
			ssize_t m = s->s->read(s->s, buf, 1, n);

			if (m <= 0) {
				ss->errnr = s->s->errnr;
				return -1;
			}
#ifdef BSTREAM_DEBUG
			{
				ssize_t i;

				fprintf(stderr, "RD %s %zd \"", ss->name, m);
				for (i = 0; i < m; i++)
					if (' ' <= ((char *) buf)[i] &&
					    ((char *) buf)[i] < 127)
						putc(((char *) buf)[i], stderr);
					else
						fprintf(stderr, "\\%03o", ((char *) buf)[i]);
				fprintf(stderr, "\"\n");
			}
#endif
			buf = (void *) ((char *) buf + m);
			cnt += (size_t) m;
			n -= (size_t) m;
			s->itotal -= (unsigned) m;
			todo -= (size_t) m;
		}

		if (s->itotal == 0) {
			int16_t blksize = 0;

			/* The current block has been completely read,
			 * so read the count for the next block, only
			 * if the previous was not the last one */
			if (s->nr)
				break;
			switch (mnstr_readSht(s->s, &blksize)) {
			case -1:
				ss->errnr = s->s->errnr;
				return -1;
			case 0:
				return 0;
			case 1:
				break;
			}
			/* reject counts larger than a full, final block */
			if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
				ss->errnr = MNSTR_READ_ERROR;
				return -1;
			}
#ifdef BSTREAM_DEBUG
			fprintf(stderr, "RC size: %d, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
			fprintf(stderr, "RC %s %d\n", ss->name, s->nr);
			fprintf(stderr, "RC %s %d\n", ss->name, blksize);
#endif
			s->itotal = (uint16_t) blksize >> 1;	/* amount readable */
			/* store whether this was the last block or not */
			s->nr = (uint16_t) blksize & 1;
			s->bytes += s->itotal;
			s->blks++;
		}
	}
	/* if we got an empty block with the end-of-sequence marker
	 * set (low-order bit) we must only return an empty read once,
	 * so we must squash the flag that we still have to return an
	 * empty read */
	if (todo > 0 && cnt == 0)
		s->nr = 0;
	return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
}
4119
4120static void
4121bs_update_timeout(stream *ss)
4122{
4123 bs *s;
4124
4125 if ((s = ss->stream_data.p) != NULL && s->s) {
4126 s->s->timeout = ss->timeout;
4127 s->s->timeout_func = ss->timeout_func;
4128 if (s->s->update_timeout)
4129 s->s->update_timeout(s->s);
4130 }
4131}
4132
4133static int
4134bs_isalive(stream *ss)
4135{
4136 struct bs *s;
4137
4138 if ((s = ss->stream_data.p) != NULL && s->s) {
4139 if (s->s->isalive)
4140 return s->s->isalive(s->s);
4141 return 1;
4142 }
4143 return 0;
4144}
4145
4146static void
4147bs_close(stream *ss)
4148{
4149 bs *s;
4150
4151 s = (bs *) ss->stream_data.p;
4152 assert(s);
4153 if (s == NULL)
4154 return;
4155 if (!ss->readonly && s->nr > 0)
4156 bs_flush(ss);
4157 if (s->s)
4158 s->s->close(s->s);
4159}
4160
4161static void
4162bs_destroy(stream *ss)
4163{
4164 bs *s;
4165
4166 s = (bs *) ss->stream_data.p;
4167 assert(s);
4168 if (s) {
4169 if (s->s)
4170 s->s->destroy(s->s);
4171 free(s);
4172 }
4173 destroy(ss);
4174}
4175
4176static void
4177bs_clrerr(stream *s)
4178{
4179 if (s->stream_data.p)
4180 mnstr_clearerr(((bs *) s->stream_data.p)->s);
4181}
4182
/* Return the stream wrapped by the block stream s.
 * NOTE(review): isa_block_stream() also accepts block_stream2 (bs2)
 * streams; the cast to bs still yields the wrapped stream there only
 * because bs and bs2 both declare `stream *s` as their first member. */
stream *
bs_stream(stream *s)
{
	assert(isa_block_stream(s));
	return ((bs *) s->stream_data.p)->s;
}
4189
4190stream *
4191block_stream(stream *s)
4192{
4193 stream *ns;
4194 bs *b;
4195
4196 if (s == NULL)
4197 return NULL;
4198#ifdef STREAM_DEBUG
4199 fprintf(stderr, "block_stream %s\n", s->name ? s->name : "<unnamed>");
4200#endif
4201 if ((ns = create_stream(s->name)) == NULL)
4202 return NULL;
4203 if ((b = bs_create(s)) == NULL) {
4204 destroy(ns);
4205 return NULL;
4206 }
4207 /* blocksizes have a fixed little endian byteorder */
4208#ifdef WORDS_BIGENDIAN
4209 s->swapbytes = true;
4210#endif
4211 ns->binary = s->binary;
4212 ns->readonly = s->readonly;
4213 ns->close = bs_close;
4214 ns->clrerr = bs_clrerr;
4215 ns->destroy = bs_destroy;
4216 ns->flush = bs_flush;
4217 ns->read = bs_read;
4218 ns->write = bs_write;
4219 ns->update_timeout = bs_update_timeout;
4220 ns->isalive = bs_isalive;
4221 ns->stream_data.p = (void *) b;
4222
4223 return ns;
4224}
4225
/* State of a block_stream2 stream: like bs, but with a configurable
 * buffer size, 64-bit block headers and optional compression. */
typedef struct bs2 {
	stream *s;		/* underlying stream */
	size_t nr;		/* write side: how far we got in buf;
				 * read side: 1 if the current block was
				 * the final (flushed) one */
	size_t itotal;		/* read side: amount of data left in buf */
	size_t bufsiz;		/* allocated size of buf */
	size_t readpos;		/* read side: offset in buf of next byte */
	compression_method comp;	/* compression method, or COMPRESSION_NONE */
	char *compbuf;		/* holds compressed block data; NULL when
				 * no compression is used */
	size_t compbufsiz;	/* allocated size of compbuf */
	char *buf;		/* uncompressed block data */
} bs2;
4237
4238
4239static ssize_t
4240compress_stream_data(bs2 *s)
4241{
4242 assert(s->comp != COMPRESSION_NONE);
4243 if (s->comp == COMPRESSION_SNAPPY) {
4244#ifdef HAVE_LIBSNAPPY
4245 size_t compressed_length = s->compbufsiz;
4246 snappy_status ret;
4247 if ((ret = snappy_compress(s->buf, s->nr, s->compbuf, &compressed_length)) != SNAPPY_OK) {
4248 s->s->errnr = (int) ret;
4249 return -1;
4250 }
4251 return compressed_length;
4252#else
4253 assert(0);
4254 return -1;
4255#endif
4256 } else if (s->comp == COMPRESSION_LZ4) {
4257#ifdef HAVE_LIBLZ4
4258 int compressed_length = (int) s->compbufsiz;
4259 if ((compressed_length = LZ4_compress_fast(s->buf, s->compbuf, s->nr, compressed_length, 1)) == 0) {
4260 s->s->errnr = -1;
4261 return -1;
4262 }
4263 return compressed_length;
4264#else
4265 assert(0);
4266 return -1;
4267#endif
4268 }
4269 return -1;
4270}
4271
4272
4273static ssize_t
4274decompress_stream_data(bs2 *s)
4275{
4276 assert(s->comp != COMPRESSION_NONE);
4277 if (s->comp == COMPRESSION_SNAPPY) {
4278#ifdef HAVE_LIBSNAPPY
4279 snappy_status ret;
4280 size_t uncompressed_length = s->bufsiz;
4281 if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
4282 s->s->errnr = (int) ret;
4283 return -1;
4284 }
4285 return (ssize_t) uncompressed_length;
4286#else
4287 assert(0);
4288 return -1;
4289#endif
4290 } else if (s->comp == COMPRESSION_LZ4) {
4291#ifdef HAVE_LIBLZ4
4292 int uncompressed_length = (int) s->bufsiz;
4293 if ((uncompressed_length = LZ4_decompress_safe(s->compbuf, s->buf, s->itotal, uncompressed_length)) <= 0) {
4294 s->s->errnr = uncompressed_length;
4295 return -1;
4296 }
4297 return uncompressed_length;
4298#else
4299 assert(0);
4300 return -1;
4301#endif
4302 }
4303 return -1;
4304}
4305
4306static ssize_t
4307compression_size_bound(bs2 *s)
4308{
4309 if (s->comp == COMPRESSION_NONE) {
4310 return 0;
4311 } else if (s->comp == COMPRESSION_SNAPPY) {
4312#ifndef HAVE_LIBSNAPPY
4313 return -1;
4314#else
4315 return snappy_max_compressed_length(s->bufsiz);
4316#endif
4317 } else if (s->comp == COMPRESSION_LZ4) {
4318#ifndef HAVE_LIBLZ4
4319 return -1;
4320#else
4321 return LZ4_compressBound(s->bufsiz);
4322#endif
4323 }
4324 return -1;
4325}
4326
4327static bs2 *
4328bs2_create(stream *s, size_t bufsiz, compression_method comp)
4329{
4330 /* should be a binary stream */
4331 bs2 *ns;
4332 ssize_t compress_bound = 0;
4333
4334 if ((ns = malloc(sizeof(*ns))) == NULL)
4335 return NULL;
4336 *ns = (bs2) {
4337 .buf = malloc(bufsiz),
4338 .s = s,
4339 .bufsiz = bufsiz,
4340 .comp = comp,
4341 };
4342 if (ns->buf == NULL) {
4343 free(ns);
4344 return NULL;
4345 }
4346
4347 compress_bound = compression_size_bound(ns);
4348 if (compress_bound > 0) {
4349 ns->compbufsiz = (size_t) compress_bound;
4350 ns->compbuf = malloc(ns->compbufsiz);
4351 if (!ns->compbuf) {
4352 free(ns->buf);
4353 free(ns);
4354 return NULL;
4355 }
4356 } else if (compress_bound < 0) {
4357 free(ns->buf);
4358 free(ns);
4359 return NULL;
4360 }
4361 return ns;
4362}
4363
4364/* Collect data until the internal buffer is filled, then write the
4365 * filled buffer to the underlying stream.
4366 * Struct field usage:
4367 * s - the underlying stream;
4368 * buf - the buffer in which data is collected;
4369 * nr - how much of buf is already filled (if nr == sizeof(buf) the
4370 * data is written to the underlying stream, so upon entry nr <
4371 * sizeof(buf));
4372 * itotal - unused.
4373 */
4374static ssize_t
4375bs2_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt)
4376{
4377 bs2 *s;
4378 size_t todo = cnt * elmsize;
4379 int64_t blksize;
4380 char *writebuf;
4381 size_t writelen;
4382
4383 s = (bs2 *) ss->stream_data.p;
4384 if (s == NULL)
4385 return -1;
4386 assert(!ss->readonly);
4387 assert(s->nr < s->bufsiz);
4388 while (todo > 0) {
4389 size_t n = s->bufsiz - s->nr;
4390
4391 if (todo < n)
4392 n = todo;
4393 memcpy(s->buf + s->nr, buf, n);
4394 s->nr += n;
4395 todo -= n;
4396 buf = ((const char *) buf + n);
4397 /* block is full, write it to the stream */
4398 if (s->nr == s->bufsiz) {
4399
4400#ifdef BSTREAM_DEBUG
4401 {
4402 size_t i;
4403
4404 fprintf(stderr, "W %s %lu \"", ss->name, s->nr);
4405 for (i = 0; i < s->nr; i++)
4406 if (' ' <= s->buf[i] && s->buf[i] < 127)
4407 putc(s->buf[i], stderr);
4408 else
4409 fprintf(stderr, "\\%03o", s->buf[i]);
4410 fprintf(stderr, "\"\n");
4411 }
4412#endif
4413
4414 writelen = s->nr;
4415 blksize = (int64_t) s->nr;
4416 writebuf = s->buf;
4417
4418 if (s->comp != COMPRESSION_NONE) {
4419 ssize_t compressed_length = compress_stream_data(s);
4420 if (compressed_length < 0) {
4421 return -1;
4422 }
4423 writebuf = s->compbuf;
4424 blksize = (int64_t) compressed_length;
4425 writelen = (size_t) compressed_length;
4426 }
4427
4428
4429 /* the last bit tells whether a flush is in there, it's not
4430 * at this moment, so shift it to the left */
4431 blksize <<= 1;
4432 if (!mnstr_writeLng(s->s, blksize) ||
4433 s->s->write(s->s, writebuf, 1, writelen) != (ssize_t) writelen) {
4434 ss->errnr = MNSTR_WRITE_ERROR;
4435 return -1;
4436 }
4437 s->nr = 0;
4438 }
4439 }
4440 return (ssize_t) cnt;
4441}
4442
4443/* If the internal buffer is partially filled, write it to the
4444 * underlying stream. Then in any case write an empty buffer to the
4445 * underlying stream to indicate to the receiver that the data was
4446 * flushed.
4447 */
4448static int
4449bs2_flush(stream *ss)
4450{
4451 int64_t blksize;
4452 bs2 *s;
4453 char *writebuf;
4454 size_t writelen;
4455
4456 s = (bs2 *) ss->stream_data.p;
4457 if (s == NULL)
4458 return -1;
4459 assert(!ss->readonly);
4460 assert(s->nr < s->bufsiz);
4461 if (!ss->readonly) {
4462 /* flush the rest of buffer (if s->nr > 0), then set the
4463 * last bit to 1 to to indicate user-instigated flush */
4464#ifdef BSTREAM_DEBUG
4465 if (s->nr > 0) {
4466 size_t i;
4467
4468 fprintf(stderr, "W %s %lu \"", ss->name, s->nr);
4469 for (i = 0; i < s->nr; i++)
4470 if (' ' <= s->buf[i] && s->buf[i] < 127)
4471 putc(s->buf[i], stderr);
4472 else
4473 fprintf(stderr, "\\%03o", s->buf[i]);
4474 fprintf(stderr, "\"\n");
4475 fprintf(stderr, "W %s 0\n", ss->name);
4476 }
4477#endif
4478
4479 writelen = s->nr;
4480 blksize = (int64_t) s->nr;
4481 writebuf = s->buf;
4482
4483 if (s->nr > 0 && s->comp != COMPRESSION_NONE) {
4484 ssize_t compressed_length = compress_stream_data(s);
4485 if (compressed_length < 0) {
4486 return -1;
4487 }
4488 writebuf = s->compbuf;
4489 blksize = (int64_t) compressed_length;
4490 writelen = (size_t) compressed_length;
4491 }
4492
4493 /* indicate that this is the last buffer of a block by
4494 * setting the low-order bit */
4495 blksize <<= 1;
4496 blksize |= 1;
4497 /* always flush (even empty blocks) needed for the protocol) */
4498
4499 if ((!mnstr_writeLng(s->s, blksize) ||
4500 (s->nr > 0 &&
4501 s->s->write(s->s, writebuf, 1, writelen) != (ssize_t) writelen))) {
4502 ss->errnr = MNSTR_WRITE_ERROR;
4503 return -1;
4504 }
4505 s->nr = 0;
4506 }
4507 return 0;
4508}
4509
4510/* Read buffered data and return the number of items read. At the
4511 * flush boundary we will return 0 to indicate the end of a block.
4512 *
4513 * Structure field usage:
4514 * s - the underlying stream;
4515 * buf - not used;
4516 * itotal - the amount of data in the current block that hasn't yet
4517 * been read;
4518 * nr - indicates whether the flush marker has to be returned.
4519 */
4520static ssize_t
4521bs2_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
4522{
4523 bs2 *s;
4524 size_t todo = cnt * elmsize;
4525 size_t n;
4526
4527 s = (bs2 *) ss->stream_data.p;
4528 if (s == NULL)
4529 return -1;
4530 assert(ss->readonly);
4531 assert(s->nr <= 1);
4532
4533 if (s->itotal == 0) {
4534 int64_t blksize = 0;
4535
4536 if (s->nr) {
4537 /* We read the closing block but hadn't
4538 * returned that yet. Return it now, and note
4539 * that we did by setting s->nr to 0. */
4540 assert(s->nr == 1);
4541 s->nr = 0;
4542 return 0;
4543 }
4544
4545 assert(s->nr == 0);
4546
4547 /* There is nothing more to read in the current block,
4548 * so read the count for the next block */
4549 switch (mnstr_readLng(s->s, &blksize)) {
4550 case -1:
4551 ss->errnr = s->s->errnr;
4552 return -1;
4553 case 0:
4554 return 0;
4555 case 1:
4556 break;
4557 }
4558 if (blksize < 0) {
4559 ss->errnr = MNSTR_READ_ERROR;
4560 return -1;
4561 }
4562#ifdef BSTREAM_DEBUG
4563 fprintf(stderr, "R1 '%s' length: %lld, final: %s\n", ss->name, blksize >> 1, blksize & 1 ? "true" : "false");
4564#endif
4565 s->itotal = (size_t) (blksize >> 1); /* amount readable */
4566 /* store whether this was the last block or not */
4567 s->nr = blksize & 1;
4568
4569 if (s->itotal > 0) {
4570 /* read everything into the comp buf */
4571 ssize_t uncompressed_length = (ssize_t) s->bufsiz;
4572 size_t m = 0;
4573 char *buf = s->buf;
4574
4575 if (s->comp != COMPRESSION_NONE) {
4576 buf = s->compbuf;
4577 }
4578
4579 while (m < s->itotal) {
4580 ssize_t bytes_read = 0;
4581 bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m);
4582 if (bytes_read <= 0) {
4583 ss->errnr = s->s->errnr;
4584 return -1;
4585 }
4586 m += (size_t) bytes_read;
4587 }
4588 if (s->comp != COMPRESSION_NONE) {
4589 uncompressed_length = decompress_stream_data(s);
4590 if (uncompressed_length < 0) {
4591 ss->errnr = (int) uncompressed_length;
4592 return -1;
4593 }
4594 } else {
4595 uncompressed_length = (ssize_t) m;
4596 }
4597 s->itotal = (size_t) uncompressed_length;
4598 s->readpos = 0;
4599 }
4600 }
4601
4602 /* Fill the caller's buffer. */
4603 cnt = 0; /* count how much we put into the buffer */
4604 while (todo > 0) {
4605 /* there is more data waiting in the current block, so
4606 * read it */
4607 n = todo < s->itotal ? todo : s->itotal;
4608
4609 memcpy(buf, s->buf + s->readpos, n);
4610 buf = (void *) ((char *) buf + n);
4611 cnt += n;
4612 todo -= n;
4613 s->readpos += n;
4614 s->itotal -= n;
4615
4616 if (s->itotal == 0) {
4617 int64_t blksize = 0;
4618
4619 /* The current block has been completely read,
4620 * so read the count for the next block, only
4621 * if the previous was not the last one */
4622 if (s->nr)
4623 break;
4624 switch (mnstr_readLng(s->s, &blksize)) {
4625 case -1:
4626 ss->errnr = s->s->errnr;
4627 return -1;
4628 case 0:
4629 return 0;
4630 case 1:
4631 break;
4632 }
4633 if (blksize < 0) {
4634 ss->errnr = MNSTR_READ_ERROR;
4635 return -1;
4636 }
4637#ifdef BSTREAM_DEBUG
4638 fprintf(stderr, "R3 '%s' length: %lld, final: %s\n", ss->name, blksize >> 1, blksize & 1 ? "true" : "false");
4639#endif
4640
4641
4642 s->itotal = (size_t) (blksize >> 1); /* amount readable */
4643 /* store whether this was the last block or not */
4644 s->nr = blksize & 1;
4645
4646 if (s->itotal > 0) {
4647 /* read everything into the comp buf */
4648 ssize_t uncompressed_length = (ssize_t) s->bufsiz;
4649 size_t m = 0;
4650 char *buf = s->buf;
4651
4652 if (s->comp != COMPRESSION_NONE) {
4653 buf = s->compbuf;
4654 }
4655
4656 while (m < s->itotal) {
4657 ssize_t bytes_read = 0;
4658 bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m);
4659 if (bytes_read <= 0) {
4660 ss->errnr = s->s->errnr;
4661 return -1;
4662 }
4663 m += (size_t) bytes_read;
4664 }
4665 if (s->comp != COMPRESSION_NONE) {
4666 uncompressed_length = decompress_stream_data(s);
4667 if (uncompressed_length < 0) {
4668 ss->errnr = (int) uncompressed_length;
4669 return -1;
4670 }
4671 } else {
4672 uncompressed_length = (ssize_t) m;
4673 }
4674 s->itotal = (size_t) uncompressed_length;
4675 s->readpos = 0;
4676 }
4677 }
4678 }
4679 /* if we got an empty block with the end-of-sequence marker
4680 * set (low-order bit) we must only return an empty read once,
4681 * so we must squash the flag that we still have to return an
4682 * empty read */
4683 if (todo > 0 && cnt == 0)
4684 s->nr = 0;
4685 return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
4686}
4687
4688
4689
4690static void
4691bs2_resetbuf(stream *ss)
4692{
4693 bs2 *s = (bs2 *) ss->stream_data.p;
4694 assert(ss->read == bs2_read);
4695 s->itotal = 0;
4696 s->nr = 0;
4697 s->readpos = 0;
4698}
4699
4700int
4701bs2_resizebuf(stream *ss, size_t bufsiz)
4702{
4703 ssize_t compress_bound;
4704 bs2 *s = (bs2 *) ss->stream_data.p;
4705 assert(ss->read == bs2_read);
4706
4707 if (s->buf)
4708 free(s->buf);
4709 if (s->compbuf)
4710 free(s->compbuf);
4711
4712 s->bufsiz = 0;
4713 s->buf = NULL;
4714 s->compbuf = NULL;
4715
4716 if ((s->buf = malloc(bufsiz)) == NULL) {
4717 return -1;
4718 }
4719 s->bufsiz = bufsiz;
4720 compress_bound = compression_size_bound(s);
4721 if (compress_bound > 0) {
4722 s->compbufsiz = (size_t) compress_bound;
4723 s->compbuf = malloc(s->compbufsiz);
4724 if (!s->compbuf) {
4725 free(s->buf);
4726 s->buf = NULL;
4727 return -1;
4728 }
4729 }
4730 bs2_resetbuf(ss);
4731 return 0;
4732}
4733
4734buffer
4735bs2_buffer(stream *ss)
4736{
4737 bs2 *s = (bs2 *) ss->stream_data.p;
4738 buffer b;
4739 assert(ss->read == bs2_read);
4740 b.buf = s->buf;
4741 b.pos = s->nr;
4742 b.len = s->itotal;
4743 return b;
4744}
4745
4746void
4747bs2_setpos(stream *ss, size_t pos)
4748{
4749 bs2 *s = (bs2 *) ss->stream_data.p;
4750 assert(pos < s->bufsiz);
4751 s->nr = pos;
4752}
4753
4754bool
4755isa_block_stream(stream *s)
4756{
4757 assert(s != NULL);
4758 return s &&
4759 ((s->read == bs_read ||
4760 s->write == bs_write) ||
4761 (s->read == bs2_read ||
4762 s->write == bs2_write));
4763}
4764
4765static void
4766bs2_close(stream *ss)
4767{
4768 bs2 *s;
4769
4770 s = (bs2 *) ss->stream_data.p;
4771 assert(s);
4772 if (s == NULL)
4773 return;
4774 if (!ss->readonly && s->nr > 0)
4775 bs2_flush(ss);
4776 assert(s->s);
4777 if (s->s)
4778 s->s->close(s->s);
4779}
4780
4781static void
4782bs2_destroy(stream *ss)
4783{
4784 bs2 *s;
4785
4786 s = (bs2 *) ss->stream_data.p;
4787 assert(s);
4788 if (s) {
4789 assert(s->s);
4790 if (s->s)
4791 s->s->destroy(s->s);
4792 if (s->buf)
4793 free(s->buf);
4794 if (s->compbuf)
4795 free(s->compbuf);
4796 free(s);
4797 }
4798 destroy(ss);
4799}
4800
4801static void
4802bs2_update_timeout(stream *ss)
4803{
4804 bs2 *s;
4805
4806 if ((s = ss->stream_data.p) != NULL && s->s) {
4807 s->s->timeout = ss->timeout;
4808 s->s->timeout_func = ss->timeout_func;
4809 if (s->s->update_timeout)
4810 s->s->update_timeout(s->s);
4811 }
4812}
4813
4814static int
4815bs2_isalive(stream *ss)
4816{
4817 struct bs2 *s;
4818
4819 if ((s = ss->stream_data.p) != NULL && s->s) {
4820 if (s->s->isalive)
4821 return s->s->isalive(s->s);
4822 return 1;
4823 }
4824 return 0;
4825}
4826
/* Create a block stream with a bufsiz-byte block buffer, 64-bit block
 * headers and optional compression (comp) on top of s.
 * If s is itself a block_stream (bs) instance, the stream it wraps is
 * used instead.  The old bs shell is destroyed once the new stream has
 * been set up; data still buffered in that shell is not flushed here.
 * Returns NULL if s is NULL or on allocation/setup failure; s is left
 * untouched in that case.
 */
stream *
block_stream2(stream *s, size_t bufsiz, compression_method comp)
{
	stream *ns;
	stream *os = NULL;
	bs2 *b;

	if (s == NULL)
		return NULL;
	if (s->read == bs_read || s->write == bs_write) {
		/* if passed in a block_stream instance, extract the
		 * underlying stream */
		os = s;
		s = ((bs *) s->stream_data.p)->s;
	}

#ifdef STREAM_DEBUG
	fprintf(stderr, "block_stream2 %s\n", s->name ? s->name : "<unnamed>");
#endif
	if ((ns = create_stream(s->name)) == NULL)
		return NULL;
	if ((b = bs2_create(s, bufsiz, comp)) == NULL) {
		destroy(ns);
		return NULL;
	}
	/* blocksizes have a fixed little endian byteorder */
#ifdef WORDS_BIGENDIAN
	s->swapbytes = true;
#endif
	ns->binary = s->binary;
	ns->readonly = s->readonly;
	ns->close = bs2_close;
	/* bs_clrerr only reads the leading `stream *s` member, which bs2
	 * shares with bs */
	ns->clrerr = bs_clrerr;
	ns->destroy = bs2_destroy;
	ns->flush = bs2_flush;
	ns->read = bs2_read;
	ns->write = bs2_write;
	ns->update_timeout = bs2_update_timeout;
	ns->isalive = bs2_isalive;
	ns->stream_data.p = (void *) b;

	if (os != NULL) {
		/* we extracted the underlying stream, destroy the old
		 * shell (detach it first so bs_destroy leaves s alone) */
		((bs *) os->stream_data.p)->s = NULL;
		bs_destroy(os);
	}

	return ns;
}
4877
4878
/* Read up to cnt elements of elmsize bytes from the block stream s into
 * buf, then do one zero-length read to consume the flush marker
 * ("prompt") that follows the data.
 * Returns the number of elements read, or -1 on NULL arguments, on a
 * read error, or if x ends up positive.
 * NOTE(review): with elmsize == 0 and cnt == 0 bs_read copies nothing,
 * so x presumably stays 0 and the `x > 0` test never fires -- confirm.
 * The assert admits only block_stream (bs) streams, not block_stream2.
 */
ssize_t
mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
{
	ssize_t len = 0;
	char x = 0;

	if (s == NULL || buf == NULL)
		return -1;
	assert(s->read == bs_read || s->write == bs_write);
	if ((len = mnstr_read(s, buf, elmsize, cnt)) < 0 ||
	    mnstr_read(s, &x, 0, 0) < 0 /* read prompt */  ||
	    x > 0)
		return -1;
	return len;
}
4894
4895
4896int
4897mnstr_readChr(stream *restrict s, char *restrict val)
4898{
4899 if (s == NULL || val == NULL)
4900 return -1;
4901 return (int) s->read(s, (void *) val, sizeof(*val), 1);
4902}
4903
4904int
4905mnstr_writeChr(stream *s, char val)
4906{
4907 if (s == NULL || s->errnr)
4908 return 0;
4909 return s->write(s, (void *) &val, sizeof(val), 1) == 1;
4910}
4911
4912int
4913mnstr_readBte(stream *restrict s, int8_t *restrict val)
4914{
4915 if (s == NULL || val == NULL)
4916 return -1;
4917 return (int) s->read(s, (void *) val, sizeof(*val), 1);
4918}
4919
4920int
4921mnstr_writeBte(stream *s, int8_t val)
4922{
4923 if (s == NULL || s->errnr)
4924 return 0;
4925 return s->write(s, (void *) &val, sizeof(val), 1) == 1;
4926}
4927
4928int
4929mnstr_readSht(stream *restrict s, int16_t *restrict val)
4930{
4931 if (s == NULL || val == NULL)
4932 return 0;
4933 assert(s->binary);
4934 switch (s->read(s, val, sizeof(*val), 1)) {
4935 case 1:
4936 if (s->swapbytes)
4937 *val = short_int_SWAP(*val);
4938 return 1;
4939 case 0:
4940 return 0;
4941 default: /* -1 */
4942 return -1;
4943 }
4944}
4945
4946int
4947mnstr_writeSht(stream *s, int16_t val)
4948{
4949 if (s == NULL || s->errnr)
4950 return 0;
4951 assert(s->binary);
4952 if (s->swapbytes)
4953 val = short_int_SWAP(val);
4954 return s->write(s, &val, sizeof(val), 1) == 1;
4955}
4956
4957int
4958mnstr_readInt(stream *restrict s, int *restrict val)
4959{
4960 if (s == NULL || val == NULL)
4961 return 0;
4962 assert(s->binary);
4963 switch (s->read(s, val, sizeof(*val), 1)) {
4964 case 1:
4965 if (s->swapbytes)
4966 *val = normal_int_SWAP(*val);
4967 return 1;
4968 case 0:
4969 return 0;
4970 default: /* -1 */
4971 return -1;
4972 }
4973}
4974
4975int
4976mnstr_writeInt(stream *s, int val)
4977{
4978 if (s == NULL || s->errnr)
4979 return 0;
4980 assert(s->binary);
4981 if (s->swapbytes)
4982 val = normal_int_SWAP(val);
4983 return s->write(s, &val, sizeof(val), 1) == 1;
4984}
4985
4986int
4987mnstr_writeStr(stream *restrict s, const char *restrict val)
4988{
4989 if (s == NULL || s->errnr)
4990 return 0;
4991 return s->write(s, (void *) val, strlen(val), (size_t) 1) == 1;
4992}
4993
4994int
4995mnstr_readStr(stream *restrict s, char *restrict val)
4996{
4997 if (s == NULL || s->errnr)
4998 return 0;
4999 do {
5000 if (mnstr_readChr(s, val) != 1) {
5001 return -1;
5002 }
5003 val++;
5004 } while (*(val - 1) != '\0');
5005 return 1;
5006}
5007
5008
5009int
5010mnstr_readLng(stream *restrict s, int64_t *restrict val)
5011{
5012 if (s == NULL || val == NULL)
5013 return 0;
5014 assert(s->binary);
5015 switch (s->read(s, val, sizeof(*val), 1)) {
5016 case 1:
5017 if (s->swapbytes)
5018 *val = long_int_SWAP(*val);
5019 return 1;
5020 case 0:
5021 return 0;
5022 default: /* -1 */
5023 return -1;
5024 }
5025}
5026
5027int
5028mnstr_writeLng(stream *s, int64_t val)
5029{
5030 if (s == NULL || s->errnr)
5031 return 0;
5032 assert(s->binary);
5033 if (s->swapbytes)
5034 val = long_int_SWAP(val);
5035 return s->write(s, &val, sizeof(val), 1) == 1;
5036}
5037
5038int
5039mnstr_writeFlt(stream *s, float val)
5040{
5041 if (s == NULL || s->errnr)
5042 return 0;
5043 assert(s->binary);
5044 return s->write(s, &val, sizeof(val), 1) == 1;
5045}
5046
5047int
5048mnstr_writeDbl(stream *s, double val)
5049{
5050 if (s == NULL || s->errnr)
5051 return 0;
5052 assert(s->binary);
5053 return s->write(s, &val, sizeof(val), 1) == 1;
5054}
5055
5056
#ifdef HAVE_HGE
/* Read a 128-bit integer from the binary stream s into *val, swapping
 * bytes when s->swapbytes is set.  Returns 1 on success, 0 at end of
 * input or for NULL arguments, and -1 on error. */
int
mnstr_readHge(stream *restrict s, hge *restrict val)
{
	ssize_t rc;

	if (s == NULL || val == NULL)
		return 0;
	assert(s->binary);
	rc = s->read(s, val, sizeof(*val), 1);
	if (rc != 1)
		return rc == 0 ? 0 : -1;
	if (s->swapbytes)
		*val = huge_int_SWAP(*val);
	return 1;
}

/* Write a 128-bit integer to the binary stream s, swapping bytes when
 * s->swapbytes is set.  Returns 1 on success, 0 otherwise. */
int
mnstr_writeHge(stream *s, hge val)
{
	hge wire = val;

	if (s == NULL || s->errnr)
		return 0;
	assert(s->binary);
	if (s->swapbytes)
		wire = huge_int_SWAP(wire);
	return s->write(s, &wire, sizeof(wire), 1) == 1;
}
#endif
5087
5088int
5089mnstr_readBteArray(stream *restrict s, int8_t *restrict val, size_t cnt)
5090{
5091 if (s == NULL || val == NULL)
5092 return 0;
5093
5094 if (s->read(s, (void *) val, sizeof(*val), cnt) < (ssize_t) cnt) {
5095 if (s->errnr == MNSTR_NO__ERROR)
5096 s->errnr = MNSTR_READ_ERROR;
5097 return 0;
5098 }
5099
5100 return 1;
5101}
5102
5103int
5104mnstr_writeBteArray(stream *restrict s, const int8_t *restrict val, size_t cnt)
5105{
5106 if (s == NULL || s->errnr || val == NULL)
5107 return 0;
5108 return s->write(s, val, sizeof(*val), cnt) == (ssize_t) cnt;
5109}
5110
5111int
5112mnstr_readShtArray(stream *restrict s, int16_t *restrict val, size_t cnt)
5113{
5114 if (s == NULL || val == NULL)
5115 return 0;
5116 assert(s->binary);
5117 if (s->read(s, val, sizeof(*val), cnt) < (ssize_t) cnt) {
5118 if (s->errnr == MNSTR_NO__ERROR)
5119 s->errnr = MNSTR_READ_ERROR;
5120 return 0;
5121 }
5122 if (s->swapbytes) {
5123 for (size_t i = 0; i < cnt; i++, val++)
5124 *val = short_int_SWAP(*val);
5125 }
5126 return 1;
5127}
5128
5129int
5130mnstr_writeShtArray(stream *restrict s, const int16_t *restrict val, size_t cnt)
5131{
5132 if (s == NULL || s->errnr || val == NULL)
5133 return 0;
5134 assert(s->binary);
5135 if (s->swapbytes) {
5136 for (size_t i = 0; i < cnt; i++)
5137 if (!mnstr_writeSht(s, val[i]))
5138 return 0;
5139 return 1;
5140 }
5141 return s->write(s, val, sizeof(*val), cnt) == (ssize_t) cnt;
5142}
5143
5144int
5145mnstr_readIntArray(stream *restrict s, int *restrict val, size_t cnt)
5146{
5147 if (s == NULL || val == NULL)
5148 return 0;
5149 assert(s->binary);
5150 if (s->read(s, val, sizeof(*val), cnt) < (ssize_t) cnt) {
5151 if (s->errnr == MNSTR_NO__ERROR)
5152 s->errnr = MNSTR_READ_ERROR;
5153 return 0;
5154 }
5155 if (s->swapbytes) {
5156 for (size_t i = 0; i < cnt; i++, val++)
5157 *val = normal_int_SWAP(*val);
5158 }
5159 return 1;
5160}
5161
5162int
5163mnstr_writeIntArray(stream *restrict s, const int *restrict val, size_t cnt)
5164{
5165 if (s == NULL || s->errnr || val == NULL)
5166 return 0;
5167 assert(s->binary);
5168 if (s->swapbytes) {
5169 for (size_t i = 0; i < cnt; i++)
5170 if (!mnstr_writeInt(s, val[i]))
5171 return 0;
5172 return 1;
5173 }
5174 return s->write(s, val, sizeof(*val), cnt) == (ssize_t) cnt;
5175}
5176
5177int
5178mnstr_readLngArray(stream *restrict s, int64_t *restrict val, size_t cnt)
5179{
5180 if (s == NULL || val == NULL)
5181 return 0;
5182 assert(s->binary);
5183 if (s->read(s, val, sizeof(*val), cnt) < (ssize_t) cnt) {
5184 if (s->errnr == MNSTR_NO__ERROR)
5185 s->errnr = MNSTR_READ_ERROR;
5186 return 0;
5187 }
5188 if (s->swapbytes) {
5189 for (size_t i = 0; i < cnt; i++, val++)
5190 *val = long_int_SWAP(*val);
5191 }
5192 return 1;
5193}
5194
5195int
5196mnstr_writeLngArray(stream *restrict s, const int64_t *restrict val, size_t cnt)
5197{
5198 if (s == NULL || s->errnr || val == NULL)
5199 return 0;
5200 assert(s->binary);
5201 if (s->swapbytes) {
5202 for (size_t i = 0; i < cnt; i++)
5203 if (!mnstr_writeLng(s, val[i]))
5204 return 0;
5205 return 1;
5206 }
5207 return s->write(s, val, sizeof(*val), cnt) == (ssize_t) cnt;
5208}
5209
5210#ifdef HAVE_HGE
5211int
5212mnstr_readHgeArray(stream *restrict s, hge *restrict val, size_t cnt)
5213{
5214 if (s == NULL || val == NULL)
5215 return 0;
5216 assert(s->binary);
5217 if (s->read(s, val, sizeof(*val), cnt) < (ssize_t) cnt) {
5218 if (s->errnr == MNSTR_NO__ERROR)
5219 s->errnr = MNSTR_READ_ERROR;
5220 return 0;
5221 }
5222 if (s->swapbytes) {
5223 for (size_t i = 0; i < cnt; i++, val++)
5224 *val = huge_int_SWAP(*val);
5225 }
5226 return 1;
5227}
5228
5229int
5230mnstr_writeHgeArray(stream *restrict s, const hge *restrict val, size_t cnt)
5231{
5232 if (s == NULL || s->errnr || val == NULL)
5233 return 0;
5234 assert(s->binary);
5235 if (s->swapbytes) {
5236 for (size_t i = 0; i < cnt; i++)
5237 if (!mnstr_writeHge(s, val[i]))
5238 return 0;
5239 return 1;
5240 }
5241 return s->write(s, val, sizeof(*val), cnt) == (ssize_t) cnt;
5242}
5243#endif
5244
5245int
5246mnstr_printf(stream *restrict s, const char *restrict format, ...)
5247{
5248 char buf[512], *bf = buf;
5249 int i = 0;
5250 size_t bfsz = sizeof(buf);
5251 va_list ap;
5252
5253 if (s == NULL || s->errnr)
5254 return -1;
5255
5256 va_start(ap, format);
5257 i = vsnprintf(bf, bfsz, format, ap);
5258 va_end(ap);
5259 while (i < 0 || (size_t) i >= bfsz) {
5260 if (i >= 0) /* glibc 2.1 */
5261 bfsz = (size_t) i + 1; /* precisely what is needed */
5262 else /* glibc 2.0 */
5263 bfsz *= 2; /* twice the old size */
5264 if (bf != buf)
5265 free(bf);
5266 bf = malloc(bfsz);
5267 if (bf == NULL) {
5268 s->errnr = MNSTR_WRITE_ERROR;
5269 return -1;
5270 }
5271 va_start(ap, format);
5272 i = vsnprintf(bf, bfsz, format, ap);
5273 va_end(ap);
5274 }
5275 s->write(s, (void *) bf, (size_t) i, (size_t) 1);
5276 if (bf != buf)
5277 free(bf);
5278 return s->errnr ? -1 : i;
5279}
5280
5281
5282/* ------------------------------------------------------------------ */
5283
5284bstream *
5285bstream_create(stream *s, size_t size)
5286{
5287 bstream *b;
5288
5289 if (s == NULL)
5290 return NULL;
5291 if ((b = malloc(sizeof(*b))) == NULL)
5292 return NULL;
5293 *b = (bstream) {
5294 .mode = size,
5295 .s = s,
5296 .eof = false,
5297 };
5298 if (size == 0)
5299 size = BUFSIZ;
5300 b->buf = malloc(size + 1 + 1);
5301 if (b->buf == NULL) {
5302 free(b);
5303 return NULL;
5304 }
5305 b->size = size;
5306 return b;
5307}
5308
5309ssize_t
5310bstream_read(bstream *s, size_t size)
5311{
5312 ssize_t rd, rd1 = 0;
5313
5314 if (s == NULL)
5315 return -1;
5316
5317 if (s->eof)
5318 return 0;
5319
5320 assert(s->buf != NULL);
5321
5322 if (s->pos > 0) {
5323 if (s->pos < s->len) {
5324 /* move all data and end of string marker */
5325 memmove(s->buf, s->buf + s->pos, s->len - s->pos + 1);
5326 s->len -= s->pos;
5327 } else
5328 s->len = 0;
5329 s->pos = 0;
5330 }
5331
5332 if (s->len == s->size) {
5333 size_t sz = size > 8192 ? 8192 : size;
5334 char tmpbuf[8192];
5335
5336 /* before we realloc more space, see if there is a need */
5337 if ((rd1 = s->s->read(s->s, tmpbuf, 1, sz)) == 0) {
5338 s->eof = true;
5339 return 0;
5340 }
5341 if (rd1 < 0)
5342 return rd1;
5343 char *p;
5344 size_t ns = s->size + size;
5345 if ((p = realloc(s->buf, ns + 1)) == NULL) {
5346 return -1;
5347 }
5348 s->size = ns;
5349 s->buf = p;
5350 memcpy(s->buf + s->len, tmpbuf, rd1);
5351 s->len += rd1;
5352 size -= rd1;
5353 if (size == 0)
5354 return rd1;
5355 }
5356
5357 if (s->len + size > s->size)
5358 size = s->size - s->len;
5359
5360 rd = s->s->read(s->s, s->buf + s->len, 1, size);
5361
5362 if (rd < 0)
5363 return rd;
5364
5365 if (rd == 0) {
5366 s->eof = true;
5367 return rd1;
5368 }
5369 s->len += (size_t) rd;
5370 s->buf[s->len] = 0; /* fill in the spare with EOS */
5371 return rd + rd1;
5372}
5373
/* Chunk size used by bstream_readline: the largest fgets request it
 * makes per call and part of its buffer-growth step.  Uses POSIX's
 * _POSIX2_LINE_MAX when the platform defines it, 2048 otherwise. */
#ifdef _POSIX2_LINE_MAX
#define STREAM_LINE_MAX _POSIX2_LINE_MAX
#else
#define STREAM_LINE_MAX 2048
#endif
5379
5380static ssize_t
5381bstream_readline(bstream *s)
5382{
5383 size_t size = STREAM_LINE_MAX;
5384 size_t rd;
5385
5386 if (s->eof)
5387 return 0;
5388
5389 if (s->pos > 0 && s->len + size >= s->size) {
5390 if (s->pos < s->len) {
5391 /* move all data and end of string marker */
5392 memmove(s->buf, s->buf + s->pos, s->len - s->pos + 1);
5393 s->len -= s->pos;
5394 } else
5395 s->len = 0;
5396 s->pos = 0;
5397 }
5398
5399 assert(s->buf != NULL);
5400 if (s->len == s->size) {
5401 char *p;
5402 size_t ns = s->size + size + 8192;
5403 if ((p = realloc(s->buf, ns + 1)) == NULL) {
5404 return -1;
5405 }
5406 s->size = ns;
5407 s->buf = p;
5408 }
5409
5410 if (size > s->size - s->len)
5411 size = s->size - s->len;
5412
5413 if (fgets(s->buf + s->len, (int) size, s->s->stream_data.p) == NULL)
5414 return -1;
5415
5416 rd = strlen(s->buf + s->len);
5417
5418 if (rd == 0) {
5419 s->eof = true;
5420 return 0;
5421 }
5422 s->len += rd;
5423 s->buf[s->len] = 0; /* fill in the spare with EOS */
5424 return (ssize_t) rd;
5425}
5426
5427
5428ssize_t
5429bstream_next(bstream *s)
5430{
5431 if (s == NULL)
5432 return -1;
5433 if (s->mode > 0) {
5434 return bstream_read(s, s->mode);
5435 } else if (s->s->read == file_read) {
5436 return bstream_readline(s);
5437 } else {
5438 size_t sz = 0;
5439 ssize_t rd;
5440
5441 while ((rd = bstream_read(s, 1)) == 1 &&
5442 s->buf[s->pos + sz] != '\n') {
5443 sz++; /* sz += rd, but rd == 1 */
5444 }
5445 if (rd < 0)
5446 return rd;
5447 return (ssize_t) sz;
5448 }
5449}
5450
5451void
5452bstream_destroy(bstream *s)
5453{
5454 if (s) {
5455 if (s->s) {
5456 s->s->close(s->s);
5457 s->s->destroy(s->s);
5458 }
5459 if (s->buf)
5460 free(s->buf);
5461 free(s);
5462 }
5463}
5464
5465/* ------------------------------------------------------------------ */
5466/* callback stream
5467 *
5468 * read-only stream which calls a user-provided callback function in
5469 * order to get more data to be returned to the reader */
5470
/* State stored in stream_data.p of a callback stream: the caller's
 * opaque pointer and the callbacks that receive it. */
struct cbstream {
	void *private;		/* passed as first argument to every callback */
	void (*destroy)(void *);	/* may be NULL; invoked by cb_destroy */
	void (*close)(void *);	/* may be NULL; invoked by cb_close */
	ssize_t (*read)(void *, void *, size_t, size_t);	/* invoked unconditionally by cb_read */
};
5477
5478static void
5479cb_destroy(stream *s)
5480{
5481 struct cbstream *cb = s->stream_data.p;
5482
5483 if (cb->destroy)
5484 cb->destroy(cb->private);
5485 free(cb);
5486 s->stream_data.p = NULL;
5487 destroy(s);
5488}
5489
5490static void
5491cb_close(stream *s)
5492{
5493 struct cbstream *cb = s->stream_data.p;
5494
5495 if (cb->close)
5496 cb->close(cb->private);
5497}
5498
5499static ssize_t
5500cb_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
5501{
5502 struct cbstream *cb = s->stream_data.p;
5503
5504 return cb->read(cb->private, buf, elmsize, cnt);
5505}
5506
5507stream *
5508callback_stream(void *restrict private,
5509 ssize_t (*read)(void *restrict private, void *restrict buf, size_t elmsize, size_t cnt),
5510 void (*close)(void *private),
5511 void (*destroy)(void *private),
5512 const char *restrict name)
5513{
5514 stream *s;
5515 struct cbstream *cb;
5516
5517 s = create_stream(name);
5518 if (s == NULL)
5519 return NULL;
5520 cb = malloc(sizeof(struct cbstream));
5521 if (cb == NULL) {
5522 destroy(s);
5523 return NULL;
5524 }
5525 *cb = (struct cbstream) {
5526 .private = private,
5527 .destroy = destroy,
5528 .read = read,
5529 .close = close,
5530 };
5531 s->stream_data.p = cb;
5532 s->read = cb_read;
5533 s->destroy = cb_destroy;
5534 s->close = cb_close;
5535 return s;
5536}
5537
5538static ssize_t
5539stream_blackhole_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
5540{
5541 (void) s;
5542 (void) buf;
5543 (void) elmsize;
5544 return (ssize_t) cnt;
5545}
5546
5547static void
5548stream_blackhole_close(stream *s)
5549{
5550 (void) s;
5551 /* no resources to close */
5552}
5553
5554stream *
5555stream_blackhole_create(void)
5556{
5557 stream *s;
5558 if ((s = create_stream("blackhole")) == NULL) {
5559 return NULL;
5560 }
5561
5562 s->read = NULL;
5563 s->write = stream_blackhole_write;
5564 s->close = stream_blackhole_close;
5565 s->flush = NULL;
5566 s->readonly = false;
5567 return s;
5568}
5569
5570
/* fixed-width format streams
 *
 * A read-only stream that reads fixed-width lines from an underlying
 * stream.  It trims a filler character from both ends of each field and
 * emits STREAM_FWF_FIELD_SEP / STREAM_FWF_RECORD_SEP delimited records.
 * STREAM_FWF_NAME is the name given to the stream by stream_fwf_create. */
#define STREAM_FWF_NAME "fwf_ftw"
5573
/* Per-stream state of a fixed-width-format stream (stream_data.p). */
typedef struct {
	stream *s;		/* underlying stream the fixed-width lines are read from */
	bool eof;		/* set once a read returns fewer than line_len bytes */
	/* config */
	size_t num_fields;	/* number of entries in widths */
	size_t *widths;		/* byte width of each field; freed in stream_fwf_close */
	char filler;		/* padding character trimmed from both ends of a field */
	/* state */
	size_t line_len;	/* sum of widths: bytes consumed per input line */
	char *in_buf;		/* one raw input line (line_len bytes) */
	char *out_buf;		/* converted record awaiting delivery */
	size_t out_buf_start;	/* offset of the first undelivered byte in out_buf */
	size_t out_buf_remaining;	/* number of undelivered bytes in out_buf */
} stream_fwf_data;
5588
5589
5590static ssize_t
5591stream_fwf_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
5592{
5593 stream_fwf_data *fsd;
5594 size_t to_write = cnt;
5595 size_t buf_written = 0;
5596 char nl_buf;
5597
5598 fsd = (stream_fwf_data *) s->stream_data.p;
5599 if (fsd == NULL || elmsize != 1) {
5600 return -1;
5601 }
5602 if (fsd->eof)
5603 return 0;
5604
5605 while (to_write > 0) {
5606 /* input conversion */
5607 if (fsd->out_buf_remaining == 0) { /* need to convert next line */
5608 size_t field_idx, in_buf_pos = 0, out_buf_pos = 0;
5609 ssize_t actually_read = fsd->s->read(fsd->s, fsd->in_buf, 1, fsd->line_len);
5610 if (actually_read < (ssize_t) fsd->line_len) { /* incomplete last line */
5611 if (actually_read < 0) {
5612 return actually_read; /* this is an error */
5613 }
5614 fsd->eof = true;
5615 return (ssize_t) buf_written; /* skip last line */
5616 }
5617 /* consume to next newline */
5618 while (fsd->s->read(fsd->s, &nl_buf, 1, 1) == 1 &&
5619 nl_buf != '\n')
5620 ;
5621
5622 for (field_idx = 0; field_idx < fsd->num_fields; field_idx++) {
5623 char *val_start, *val_end;
5624 val_start = fsd->in_buf + in_buf_pos;
5625 in_buf_pos += fsd->widths[field_idx];
5626 val_end = fsd->in_buf + in_buf_pos - 1;
5627 while (*val_start == fsd->filler)
5628 val_start++;
5629 while (*val_end == fsd->filler)
5630 val_end--;
5631 while (val_start <= val_end) {
5632 if (*val_start == STREAM_FWF_FIELD_SEP) {
5633 fsd->out_buf[out_buf_pos++] = STREAM_FWF_ESCAPE;
5634 }
5635 fsd->out_buf[out_buf_pos++] = *val_start++;
5636 }
5637 fsd->out_buf[out_buf_pos++] = STREAM_FWF_FIELD_SEP;
5638 }
5639 fsd->out_buf[out_buf_pos++] = STREAM_FWF_RECORD_SEP;
5640 fsd->out_buf_remaining = out_buf_pos;
5641 fsd->out_buf_start = 0;
5642 }
5643 /* now we know something is in output_buf so deliver it */
5644 if (fsd->out_buf_remaining <= to_write) {
5645 memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, fsd->out_buf_remaining);
5646 to_write -= fsd->out_buf_remaining;
5647 buf_written += fsd->out_buf_remaining;
5648 fsd->out_buf_remaining = 0;
5649 } else {
5650 memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, to_write);
5651 fsd->out_buf_start += to_write;
5652 fsd->out_buf_remaining -= to_write;
5653 buf_written += to_write;
5654 to_write = 0;
5655 }
5656 }
5657 return (ssize_t) buf_written;
5658}
5659
5660
5661static void
5662stream_fwf_close(stream *s)
5663{
5664 stream_fwf_data *fsd = (stream_fwf_data *) s->stream_data.p;
5665
5666 if (fsd != NULL) {
5667 stream_fwf_data *fsd = (stream_fwf_data *) s->stream_data.p;
5668 close_stream(fsd->s);
5669 free(fsd->widths);
5670 free(fsd->in_buf);
5671 free(fsd->out_buf);
5672 free(fsd);
5673 s->stream_data.p = NULL;
5674 }
5675}
5676
5677static void
5678stream_fwf_destroy(stream *s)
5679{
5680 stream_fwf_close(s);
5681 destroy(s);
5682}
5683
5684stream *
5685stream_fwf_create(stream *restrict s, size_t num_fields, size_t *restrict widths, char filler)
5686{
5687 stream *ns;
5688 stream_fwf_data *fsd = malloc(sizeof(stream_fwf_data));
5689
5690 if (fsd == NULL) {
5691 return NULL;
5692 }
5693 *fsd = (stream_fwf_data) {
5694 .s = s,
5695 .num_fields = num_fields,
5696 .widths = widths,
5697 .filler = filler,
5698 .line_len = 0,
5699 .eof = false,
5700 };
5701 for (size_t i = 0; i < num_fields; i++) {
5702 fsd->line_len += widths[i];
5703 }
5704 fsd->in_buf = malloc(fsd->line_len);
5705 if (fsd->in_buf == NULL) {
5706 close_stream(fsd->s);
5707 free(fsd);
5708 return NULL;
5709 }
5710 fsd->out_buf = malloc(fsd->line_len * 3);
5711 if (fsd->out_buf == NULL) {
5712 close_stream(fsd->s);
5713 free(fsd->in_buf);
5714 free(fsd);
5715 return NULL;
5716 }
5717 if ((ns = create_stream(STREAM_FWF_NAME)) == NULL) {
5718 close_stream(fsd->s);
5719 free(fsd->in_buf);
5720 free(fsd->out_buf);
5721 free(fsd);
5722 return NULL;
5723 }
5724 ns->read = stream_fwf_read;
5725 ns->close = stream_fwf_close;
5726 ns->destroy = stream_fwf_destroy;
5727 ns->write = NULL;
5728 ns->flush = NULL;
5729 ns->readonly = true;
5730 ns->stream_data.p = fsd;
5731 return ns;
5732}
5733