1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
/*
 * Niels Nes
 * A simple interface to IO streams.
 * All file IO is tunneled through the stream library, which guarantees
 * cross-platform capabilities. Several protocols are provided; e.g. it
 * can be used to open uncompressed, gzipped, or bzip2-compressed data
 * files. It encapsulates the corresponding library managed in
 * common/stream.
 */
17
18#include "monetdb_config.h"
19#include "streams.h"
20#include "mal_exception.h"
21
22str mnstr_open_rstreamwrap(Stream *S, str *filename)
23{
24 stream *s;
25
26 if ((s = open_rstream(*filename)) == NULL || mnstr_errnr(s)) {
27 int errnr = errno;
28 if (s)
29 close_stream(s);
30 throw(IO, "streams.open", "could not open file '%s': %s",
31 *filename, strerror(errnr));
32 } else {
33 *(stream**)S = s;
34 }
35
36 return MAL_SUCCEED;
37}
38str mnstr_open_wstreamwrap(Stream *S, str *filename)
39{
40 stream *s;
41
42 if ((s = open_wstream(*filename)) == NULL || mnstr_errnr(s)) {
43 int errnr = errno;
44 if (s)
45 close_stream(s);
46 throw(IO, "streams.open", "could not open file '%s': %s",
47 *filename, strerror(errnr));
48 } else {
49 *(stream**)S = s;
50 }
51
52 return MAL_SUCCEED;
53}
54
55str mnstr_open_rastreamwrap(Stream *S, str *filename)
56{
57 stream *s;
58
59 if ((s = open_rastream(*filename)) == NULL || mnstr_errnr(s)) {
60 int errnr = errno;
61 if (s)
62 close_stream(s);
63 throw(IO, "streams.open", "could not open file '%s': %s",
64 *filename, strerror(errnr));
65 } else {
66 *(stream**)S = s;
67 }
68
69 return MAL_SUCCEED;
70}
71
72str mnstr_open_wastreamwrap(Stream *S, str *filename)
73{
74 stream *s;
75
76 if ((s = open_wastream(*filename)) == NULL || mnstr_errnr(s)) {
77 int errnr = errno;
78 if (s)
79 close_stream(s);
80 throw(IO, "streams.open", "could not open file '%s': %s",
81 *filename, strerror(errnr));
82 } else {
83 *(stream**)S = s;
84 }
85
86 return MAL_SUCCEED;
87}
88
89str
90mnstr_write_stringwrap(void *ret, Stream *S, str *data)
91{
92 stream *s = *(stream **)S;
93 (void)ret;
94
95 if (mnstr_write(s, *data, 1, strlen(*data)) < 0)
96 throw(IO, "streams.writeStr", "failed to write string");
97
98 return MAL_SUCCEED;
99}
100
101str
102mnstr_writeIntwrap(void *ret, Stream *S, int *data)
103{
104 stream *s = *(stream **)S;
105 (void)ret;
106
107 if (!mnstr_writeInt(s, *data))
108 throw(IO, "streams.writeInt", "failed to write int");
109
110 return MAL_SUCCEED;
111}
112
113str
114mnstr_readIntwrap(int *ret, Stream *S)
115{
116 stream *s = *(stream **)S;
117
118 if (mnstr_readInt(s, ret) != 1)
119 throw(IO, "streams.readInt", "failed to read int");
120
121 return MAL_SUCCEED;
122}
123
124#define CHUNK (64 * 1024)
125str
126mnstr_read_stringwrap(str *res, Stream *S)
127{
128 stream *s = *(stream **)S;
129 ssize_t len = 0;
130 size_t size = CHUNK + 1;
131 char *buf = GDKmalloc(size), *start = buf, *tmp;
132
133 if( buf == NULL)
134 throw(MAL,"mnstr_read_stringwrap", SQLSTATE(HY001) MAL_MALLOC_FAIL);
135 while ((len = mnstr_read(s, start, 1, CHUNK)) > 0) {
136 size += len;
137 tmp = GDKrealloc(buf, size);
138 if (tmp == NULL) {
139 GDKfree(buf);
140 throw(MAL,"mnstr_read_stringwrap", SQLSTATE(HY001) MAL_MALLOC_FAIL);
141 }
142 buf = tmp;
143 start = buf + size - CHUNK - 1;
144
145 *start = '\0';
146 }
147 if (len < 0)
148 throw(IO, "streams.readStr", "failed to read string");
149 start += len;
150 *start = '\0';
151 *res = buf;
152
153 return MAL_SUCCEED;
154}
155
156str
157mnstr_flush_streamwrap(void *ret, Stream *S)
158{
159 stream *s = *(stream **)S;
160 (void)ret;
161
162 if (mnstr_flush(s))
163 throw(IO, "streams.flush", "failed to flush stream");
164
165 return MAL_SUCCEED;
166}
167
168str
169mnstr_close_streamwrap(void *ret, Stream *S)
170{
171 (void)ret;
172
173 close_stream(*(stream **)S);
174
175 return MAL_SUCCEED;
176}
177
178str
179open_block_streamwrap(Stream *S, Stream *is)
180{
181 if ((*(stream **)S = block_stream(*(stream **)is)) == NULL)
182 throw(IO, "bstreams.open", "failed to open block stream");
183
184 return MAL_SUCCEED;
185}
186
187str
188bstream_create_wrapwrap(Bstream *Bs, Stream *S, int *bufsize)
189{
190 if ((*(bstream **)Bs = bstream_create(*(stream **)S, (size_t)*bufsize)) == NULL)
191 throw(IO, "bstreams.create", "failed to create block stream");
192
193 return MAL_SUCCEED;
194}
195
196str
197bstream_destroy_wrapwrap(void *ret, Bstream *BS)
198{
199 (void)ret;
200
201 bstream_destroy(*(bstream **)BS);
202
203 return MAL_SUCCEED;
204}
205
206str
207bstream_read_wrapwrap(int *res, Bstream *BS, int *size)
208{
209 *res = (int)bstream_read(*(bstream **)BS, (size_t)*size);
210
211 return MAL_SUCCEED;
212}
213