1/******************************************************
2Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
3
4Streaming implementation for XtraBackup.
5
6This program is free software; you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
18
19*******************************************************/
20
21#include <my_global.h>
22#include <my_base.h>
23#include "common.h"
24#include "datasink.h"
25#include "xbstream.h"
26
/* State shared by all files opened through one xbstream datasink context.
It is stored right behind the ds_ctxt_t in the allocation made by
xbstream_init(). */
typedef struct {
	/* xbstream writer obtained from xb_stream_write_new() */
	xb_wstream_t *xbstream;
	/* Destination file; NULL until the first xbstream_open() opens it
	on the pipe context, closed again in xbstream_deinit() */
	ds_file_t *dest_file;
	/* Held by xbstream_open() while dest_file is checked and opened */
	pthread_mutex_t mutex;
} ds_stream_ctxt_t;
32
/* Per-file state, stored right behind the ds_file_t in the allocation made
by xbstream_open(). */
typedef struct {
	/* Handle returned by xb_stream_write_open() for this file */
	xb_wstream_file_t *xbstream_file;
	/* Back pointer to the owning datasink's shared state */
	ds_stream_ctxt_t *stream_ctxt;
} ds_stream_file_t;
37
/***********************************************************************
General streaming interface.
Forward declarations of the handlers that are collected in the
datasink_xbstream table below; their definitions follow later in this
file. */

static ds_ctxt_t *xbstream_init(const char *root);
static ds_file_t *xbstream_open(ds_ctxt_t *ctxt, const char *path,
	MY_STAT *mystat);
static int xbstream_write(ds_file_t *file, const uchar *buf, size_t len);
static int xbstream_close(ds_file_t *file);
static void xbstream_deinit(ds_ctxt_t *ctxt);
47
48datasink_t datasink_xbstream = {
49 &xbstream_init,
50 &xbstream_open,
51 &xbstream_write,
52 &xbstream_close,
53 &xbstream_deinit
54};
55
56static
57ssize_t
58my_xbstream_write_callback(xb_wstream_file_t *f __attribute__((unused)),
59 void *userdata, const void *buf, size_t len)
60{
61 ds_stream_ctxt_t *stream_ctxt;
62
63 stream_ctxt = (ds_stream_ctxt_t *) userdata;
64
65 xb_ad(stream_ctxt != NULL);
66 xb_ad(stream_ctxt->dest_file != NULL);
67
68 if (!ds_write(stream_ctxt->dest_file, buf, len)) {
69 return len;
70 }
71 return -1;
72}
73
74static
75ds_ctxt_t *
76xbstream_init(const char *root __attribute__((unused)))
77{
78 ds_ctxt_t *ctxt;
79 ds_stream_ctxt_t *stream_ctxt;
80 xb_wstream_t *xbstream;
81
82 ctxt = my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_stream_ctxt_t),
83 MYF(MY_FAE));
84 stream_ctxt = (ds_stream_ctxt_t *)(ctxt + 1);
85
86 if (pthread_mutex_init(&stream_ctxt->mutex, NULL)) {
87 msg("xbstream_init: pthread_mutex_init() failed.\n");
88 goto err;
89 }
90
91 xbstream = xb_stream_write_new();
92 if (xbstream == NULL) {
93 msg("xb_stream_write_new() failed.\n");
94 goto err;
95 }
96 stream_ctxt->xbstream = xbstream;
97 stream_ctxt->dest_file = NULL;
98
99 ctxt->ptr = stream_ctxt;
100
101 return ctxt;
102
103err:
104 my_free(ctxt);
105 return NULL;
106}
107
108static
109ds_file_t *
110xbstream_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
111{
112 ds_file_t *file;
113 ds_stream_file_t *stream_file;
114 ds_stream_ctxt_t *stream_ctxt;
115 ds_ctxt_t *dest_ctxt;
116 xb_wstream_t *xbstream;
117 xb_wstream_file_t *xbstream_file;
118
119
120 xb_ad(ctxt->pipe_ctxt != NULL);
121 dest_ctxt = ctxt->pipe_ctxt;
122
123 stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
124
125 pthread_mutex_lock(&stream_ctxt->mutex);
126 if (stream_ctxt->dest_file == NULL) {
127 stream_ctxt->dest_file = ds_open(dest_ctxt, path, mystat);
128 if (stream_ctxt->dest_file == NULL) {
129 return NULL;
130 }
131 }
132 pthread_mutex_unlock(&stream_ctxt->mutex);
133
134 file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
135 sizeof(ds_stream_file_t),
136 MYF(MY_FAE));
137 stream_file = (ds_stream_file_t *) (file + 1);
138
139 xbstream = stream_ctxt->xbstream;
140
141 xbstream_file = xb_stream_write_open(xbstream, path, mystat,
142 stream_ctxt,
143 my_xbstream_write_callback);
144
145 if (xbstream_file == NULL) {
146 msg("xb_stream_write_open() failed.\n");
147 goto err;
148 }
149
150 stream_file->xbstream_file = xbstream_file;
151 stream_file->stream_ctxt = stream_ctxt;
152 file->ptr = stream_file;
153 file->path = stream_ctxt->dest_file->path;
154
155 return file;
156
157err:
158 if (stream_ctxt->dest_file) {
159 ds_close(stream_ctxt->dest_file);
160 stream_ctxt->dest_file = NULL;
161 }
162 my_free(file);
163
164 return NULL;
165}
166
167static
168int
169xbstream_write(ds_file_t *file, const uchar *buf, size_t len)
170{
171 ds_stream_file_t *stream_file;
172 xb_wstream_file_t *xbstream_file;
173
174
175 stream_file = (ds_stream_file_t *) file->ptr;
176
177 xbstream_file = stream_file->xbstream_file;
178
179 if (xb_stream_write_data(xbstream_file, buf, len)) {
180 msg("xb_stream_write_data() failed.\n");
181 return 1;
182 }
183
184 return 0;
185}
186
187static
188int
189xbstream_close(ds_file_t *file)
190{
191 ds_stream_file_t *stream_file;
192 int rc = 0;
193
194 stream_file = (ds_stream_file_t *)file->ptr;
195
196 rc = xb_stream_write_close(stream_file->xbstream_file);
197
198 my_free(file);
199
200 return rc;
201}
202
203static
204void
205xbstream_deinit(ds_ctxt_t *ctxt)
206{
207 ds_stream_ctxt_t *stream_ctxt;
208
209 stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr;
210
211 if (xb_stream_write_done(stream_ctxt->xbstream)) {
212 msg("xb_stream_done() failed.\n");
213 }
214
215 if (stream_ctxt->dest_file) {
216 ds_close(stream_ctxt->dest_file);
217 stream_ctxt->dest_file = NULL;
218 }
219
220 pthread_mutex_destroy(&stream_ctxt->mutex);
221
222 my_free(ctxt);
223}
224