1/******************************************************
2Copyright (c) 2011-2017 Percona LLC and/or its affiliates.
3
4The xbstream format writer implementation.
5
6This program is free software; you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
18
19*******************************************************/
20
21#include <my_global.h>
22#include <my_base.h>
23#include <zlib.h>
24#include "common.h"
25#include "xbstream.h"
26#include "crc_glue.h"
27
/* Size in bytes of the staging buffer embedded in every file handle.
A write that is smaller than the space still free in that buffer is copied
into it and later emitted together with its neighbours as a single chunk;
anything else is sent as a chunk of its own. */
#define XB_STREAM_MIN_CHUNK_SIZE (10 * 1024 * 1024)
30
/* Writer-side stream context. Every file handle created by
xb_stream_write_open() keeps a pointer to one of these. */
struct xb_wstream_struct {
	/* Held by xb_stream_write_chunk() and xb_stream_write_eof() while a
	chunk is passed to the write callback */
	pthread_mutex_t mutex;
};
34
/* State of one file being written into a stream. Instances are created by
xb_stream_write_open() and released by xb_stream_write_close(). */
struct xb_wstream_file_struct {
	/* Stream context this file belongs to */
	xb_wstream_t *stream;
	/* NUL-terminated file name; points just past this struct, into the
	same allocation */
	char *path;
	/* strlen() of the path, i.e. without the terminator */
	size_t path_len;
	/* Staging buffer for small writes */
	char chunk[XB_STREAM_MIN_CHUNK_SIZE];
	/* Next unused byte in chunk[] */
	char *chunk_ptr;
	/* Number of unused bytes left in chunk[] */
	size_t chunk_free;
	/* Payload bytes emitted so far; stored in each payload chunk header
	as that chunk's offset */
	my_off_t offset;
	/* Opaque pointer passed to the write callback */
	void *userdata;
	/* Callback that receives the chunk headers and payloads */
	xb_stream_write_callback *write;
};
46
/* File-local helpers, defined further down in this file. Each returns 0 on
success and 1 on failure. */
static int xb_stream_flush(xb_wstream_file_t *file);
static int xb_stream_write_chunk(xb_wstream_file_t *file,
	const void *buf, size_t len);
static int xb_stream_write_eof(xb_wstream_file_t *file);
51
52static
53ssize_t
54xb_stream_default_write_callback(xb_wstream_file_t *file __attribute__((unused)),
55 void *userdata __attribute__((unused)),
56 const void *buf, size_t len)
57{
58 if (my_write(my_fileno(stdout), buf, len, MYF(MY_WME | MY_NABP)))
59 return -1;
60 return len;
61}
62
63xb_wstream_t *
64xb_stream_write_new(void)
65{
66 xb_wstream_t *stream;
67
68 stream = (xb_wstream_t *) my_malloc(sizeof(xb_wstream_t), MYF(MY_FAE));
69 pthread_mutex_init(&stream->mutex, NULL);
70
71 return stream;;
72}
73
74xb_wstream_file_t *
75xb_stream_write_open(xb_wstream_t *stream, const char *path,
76 MY_STAT *mystat __attribute__((unused)),
77 void *userdata,
78 xb_stream_write_callback *onwrite)
79{
80 xb_wstream_file_t *file;
81 size_t path_len;
82
83 path_len = strlen(path);
84
85 if (path_len > FN_REFLEN) {
86 msg("xb_stream_write_open(): file path is too long.\n");
87 return NULL;
88 }
89
90 file = (xb_wstream_file_t *) my_malloc(sizeof(xb_wstream_file_t) +
91 path_len + 1, MYF(MY_FAE));
92
93 file->path = (char *) (file + 1);
94#ifdef _WIN32
95 /* Normalize path on Windows, so we can restore elsewhere.*/
96 {
97 int i;
98 for (i = 0; ; i++) {
99 file->path[i] = (path[i] == '\\') ? '/' : path[i];
100 if (!path[i])
101 break;
102 }
103 }
104#else
105 memcpy(file->path, path, path_len + 1);
106#endif
107 file->path_len = path_len;
108
109 file->stream = stream;
110 file->offset = 0;
111 file->chunk_ptr = file->chunk;
112 file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
113 if (onwrite) {
114#ifdef __WIN__
115 setmode(fileno(stdout), _O_BINARY);
116#endif
117 file->userdata = userdata;
118 file->write = onwrite;
119 } else {
120 file->userdata = NULL;
121 file->write = xb_stream_default_write_callback;
122 }
123
124 return file;
125}
126
127int
128xb_stream_write_data(xb_wstream_file_t *file, const void *buf, size_t len)
129{
130 if (len < file->chunk_free) {
131 memcpy(file->chunk_ptr, buf, len);
132 file->chunk_ptr += len;
133 file->chunk_free -= len;
134
135 return 0;
136 }
137
138 if (xb_stream_flush(file))
139 return 1;
140
141 return xb_stream_write_chunk(file, buf, len);
142}
143
144int
145xb_stream_write_close(xb_wstream_file_t *file)
146{
147 if (xb_stream_flush(file) ||
148 xb_stream_write_eof(file)) {
149 my_free(file);
150 return 1;
151 }
152
153 my_free(file);
154
155 return 0;
156}
157
158int
159xb_stream_write_done(xb_wstream_t *stream)
160{
161 pthread_mutex_destroy(&stream->mutex);
162
163 my_free(stream);
164
165 return 0;
166}
167
168static
169int
170xb_stream_flush(xb_wstream_file_t *file)
171{
172 if (file->chunk_ptr == file->chunk) {
173 return 0;
174 }
175
176 if (xb_stream_write_chunk(file, file->chunk,
177 file->chunk_ptr - file->chunk)) {
178 return 1;
179 }
180
181 file->chunk_ptr = file->chunk;
182 file->chunk_free = XB_STREAM_MIN_CHUNK_SIZE;
183
184 return 0;
185}
186
187static
188int
189xb_stream_write_chunk(xb_wstream_file_t *file, const void *buf, size_t len)
190{
191 /* Chunk magic + flags + chunk type + path_len + path + len + offset +
192 checksum */
193 uchar tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
194 FN_REFLEN + 8 + 8 + 4];
195 uchar *ptr;
196 xb_wstream_t *stream = file->stream;
197 ulong checksum;
198
199 /* Write xbstream header */
200 ptr = tmpbuf;
201
202 /* Chunk magic */
203 memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
204 ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
205
206 *ptr++ = 0; /* Chunk flags */
207
208 *ptr++ = (uchar) XB_CHUNK_TYPE_PAYLOAD; /* Chunk type */
209
210 int4store(ptr, file->path_len); /* Path length */
211 ptr += 4;
212
213 memcpy(ptr, file->path, file->path_len); /* Path */
214 ptr += file->path_len;
215
216 int8store(ptr, len); /* Payload length */
217 ptr += 8;
218
219 checksum = crc32_iso3309(0, buf, (uint)len); /* checksum */
220
221 pthread_mutex_lock(&stream->mutex);
222
223 int8store(ptr, file->offset); /* Payload offset */
224 ptr += 8;
225
226 int4store(ptr, checksum);
227 ptr += 4;
228
229 xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
230
231 if (file->write(file, file->userdata, tmpbuf, ptr-tmpbuf) == -1)
232 goto err;
233
234
235 if (file->write(file, file->userdata, buf, len) == -1) /* Payload */
236 goto err;
237
238 file->offset+= len;
239
240 pthread_mutex_unlock(&stream->mutex);
241
242 return 0;
243
244err:
245
246 pthread_mutex_unlock(&stream->mutex);
247
248 return 1;
249}
250
251static
252int
253xb_stream_write_eof(xb_wstream_file_t *file)
254{
255 /* Chunk magic + flags + chunk type + path_len + path */
256 uchar tmpbuf[sizeof(XB_STREAM_CHUNK_MAGIC) - 1 + 1 + 1 + 4 +
257 FN_REFLEN];
258 uchar *ptr;
259 xb_wstream_t *stream = file->stream;
260
261 pthread_mutex_lock(&stream->mutex);
262
263 /* Write xbstream header */
264 ptr = tmpbuf;
265
266 /* Chunk magic */
267 memcpy(ptr, XB_STREAM_CHUNK_MAGIC, sizeof(XB_STREAM_CHUNK_MAGIC) - 1);
268 ptr += sizeof(XB_STREAM_CHUNK_MAGIC) - 1;
269
270 *ptr++ = 0; /* Chunk flags */
271
272 *ptr++ = (uchar) XB_CHUNK_TYPE_EOF; /* Chunk type */
273
274 int4store(ptr, file->path_len); /* Path length */
275 ptr += 4;
276
277 memcpy(ptr, file->path, file->path_len); /* Path */
278 ptr += file->path_len;
279
280 xb_ad(ptr <= tmpbuf + sizeof(tmpbuf));
281
282 if (file->write(file, file->userdata, tmpbuf,
283 (ulonglong) (ptr - tmpbuf)) == -1)
284 goto err;
285
286 pthread_mutex_unlock(&stream->mutex);
287
288 return 0;
289err:
290
291 pthread_mutex_unlock(&stream->mutex);
292
293 return 1;
294}
295