1 | /****************************************************** |
2 | Copyright (c) 2011-2013 Percona LLC and/or its affiliates. |
3 | |
4 | Streaming implementation for XtraBackup. |
5 | |
6 | This program is free software; you can redistribute it and/or modify |
7 | it under the terms of the GNU General Public License as published by |
8 | the Free Software Foundation; version 2 of the License. |
9 | |
10 | This program is distributed in the hope that it will be useful, |
11 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | GNU General Public License for more details. |
14 | |
15 | You should have received a copy of the GNU General Public License |
16 | along with this program; if not, write to the Free Software |
17 | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA |
18 | |
19 | *******************************************************/ |
20 | |
21 | #include <my_global.h> |
22 | #include <my_base.h> |
23 | #include "common.h" |
24 | #include "datasink.h" |
25 | #include "xbstream.h" |
26 | |
27 | typedef struct { |
28 | xb_wstream_t *xbstream; |
29 | ds_file_t *dest_file; |
30 | pthread_mutex_t mutex; |
31 | } ds_stream_ctxt_t; |
32 | |
33 | typedef struct { |
34 | xb_wstream_file_t *xbstream_file; |
35 | ds_stream_ctxt_t *stream_ctxt; |
36 | } ds_stream_file_t; |
37 | |
38 | /*********************************************************************** |
39 | General streaming interface */ |
40 | |
41 | static ds_ctxt_t *xbstream_init(const char *root); |
42 | static ds_file_t *xbstream_open(ds_ctxt_t *ctxt, const char *path, |
43 | MY_STAT *mystat); |
44 | static int xbstream_write(ds_file_t *file, const uchar *buf, size_t len); |
45 | static int xbstream_close(ds_file_t *file); |
46 | static void xbstream_deinit(ds_ctxt_t *ctxt); |
47 | |
48 | datasink_t datasink_xbstream = { |
49 | &xbstream_init, |
50 | &xbstream_open, |
51 | &xbstream_write, |
52 | &xbstream_close, |
53 | &xbstream_deinit |
54 | }; |
55 | |
56 | static |
57 | ssize_t |
58 | my_xbstream_write_callback(xb_wstream_file_t *f __attribute__((unused)), |
59 | void *userdata, const void *buf, size_t len) |
60 | { |
61 | ds_stream_ctxt_t *stream_ctxt; |
62 | |
63 | stream_ctxt = (ds_stream_ctxt_t *) userdata; |
64 | |
65 | xb_ad(stream_ctxt != NULL); |
66 | xb_ad(stream_ctxt->dest_file != NULL); |
67 | |
68 | if (!ds_write(stream_ctxt->dest_file, buf, len)) { |
69 | return len; |
70 | } |
71 | return -1; |
72 | } |
73 | |
74 | static |
75 | ds_ctxt_t * |
76 | xbstream_init(const char *root __attribute__((unused))) |
77 | { |
78 | ds_ctxt_t *ctxt; |
79 | ds_stream_ctxt_t *stream_ctxt; |
80 | xb_wstream_t *xbstream; |
81 | |
82 | ctxt = my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_stream_ctxt_t), |
83 | MYF(MY_FAE)); |
84 | stream_ctxt = (ds_stream_ctxt_t *)(ctxt + 1); |
85 | |
86 | if (pthread_mutex_init(&stream_ctxt->mutex, NULL)) { |
87 | msg("xbstream_init: pthread_mutex_init() failed.\n" ); |
88 | goto err; |
89 | } |
90 | |
91 | xbstream = xb_stream_write_new(); |
92 | if (xbstream == NULL) { |
93 | msg("xb_stream_write_new() failed.\n" ); |
94 | goto err; |
95 | } |
96 | stream_ctxt->xbstream = xbstream; |
97 | stream_ctxt->dest_file = NULL; |
98 | |
99 | ctxt->ptr = stream_ctxt; |
100 | |
101 | return ctxt; |
102 | |
103 | err: |
104 | my_free(ctxt); |
105 | return NULL; |
106 | } |
107 | |
108 | static |
109 | ds_file_t * |
110 | xbstream_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat) |
111 | { |
112 | ds_file_t *file; |
113 | ds_stream_file_t *stream_file; |
114 | ds_stream_ctxt_t *stream_ctxt; |
115 | ds_ctxt_t *dest_ctxt; |
116 | xb_wstream_t *xbstream; |
117 | xb_wstream_file_t *xbstream_file; |
118 | |
119 | |
120 | xb_ad(ctxt->pipe_ctxt != NULL); |
121 | dest_ctxt = ctxt->pipe_ctxt; |
122 | |
123 | stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr; |
124 | |
125 | pthread_mutex_lock(&stream_ctxt->mutex); |
126 | if (stream_ctxt->dest_file == NULL) { |
127 | stream_ctxt->dest_file = ds_open(dest_ctxt, path, mystat); |
128 | if (stream_ctxt->dest_file == NULL) { |
129 | return NULL; |
130 | } |
131 | } |
132 | pthread_mutex_unlock(&stream_ctxt->mutex); |
133 | |
134 | file = (ds_file_t *) my_malloc(sizeof(ds_file_t) + |
135 | sizeof(ds_stream_file_t), |
136 | MYF(MY_FAE)); |
137 | stream_file = (ds_stream_file_t *) (file + 1); |
138 | |
139 | xbstream = stream_ctxt->xbstream; |
140 | |
141 | xbstream_file = xb_stream_write_open(xbstream, path, mystat, |
142 | stream_ctxt, |
143 | my_xbstream_write_callback); |
144 | |
145 | if (xbstream_file == NULL) { |
146 | msg("xb_stream_write_open() failed.\n" ); |
147 | goto err; |
148 | } |
149 | |
150 | stream_file->xbstream_file = xbstream_file; |
151 | stream_file->stream_ctxt = stream_ctxt; |
152 | file->ptr = stream_file; |
153 | file->path = stream_ctxt->dest_file->path; |
154 | |
155 | return file; |
156 | |
157 | err: |
158 | if (stream_ctxt->dest_file) { |
159 | ds_close(stream_ctxt->dest_file); |
160 | stream_ctxt->dest_file = NULL; |
161 | } |
162 | my_free(file); |
163 | |
164 | return NULL; |
165 | } |
166 | |
167 | static |
168 | int |
169 | xbstream_write(ds_file_t *file, const uchar *buf, size_t len) |
170 | { |
171 | ds_stream_file_t *stream_file; |
172 | xb_wstream_file_t *xbstream_file; |
173 | |
174 | |
175 | stream_file = (ds_stream_file_t *) file->ptr; |
176 | |
177 | xbstream_file = stream_file->xbstream_file; |
178 | |
179 | if (xb_stream_write_data(xbstream_file, buf, len)) { |
180 | msg("xb_stream_write_data() failed.\n" ); |
181 | return 1; |
182 | } |
183 | |
184 | return 0; |
185 | } |
186 | |
187 | static |
188 | int |
189 | xbstream_close(ds_file_t *file) |
190 | { |
191 | ds_stream_file_t *stream_file; |
192 | int rc = 0; |
193 | |
194 | stream_file = (ds_stream_file_t *)file->ptr; |
195 | |
196 | rc = xb_stream_write_close(stream_file->xbstream_file); |
197 | |
198 | my_free(file); |
199 | |
200 | return rc; |
201 | } |
202 | |
203 | static |
204 | void |
205 | xbstream_deinit(ds_ctxt_t *ctxt) |
206 | { |
207 | ds_stream_ctxt_t *stream_ctxt; |
208 | |
209 | stream_ctxt = (ds_stream_ctxt_t *) ctxt->ptr; |
210 | |
211 | if (xb_stream_write_done(stream_ctxt->xbstream)) { |
212 | msg("xb_stream_done() failed.\n" ); |
213 | } |
214 | |
215 | if (stream_ctxt->dest_file) { |
216 | ds_close(stream_ctxt->dest_file); |
217 | stream_ctxt->dest_file = NULL; |
218 | } |
219 | |
220 | pthread_mutex_destroy(&stream_ctxt->mutex); |
221 | |
222 | my_free(ctxt); |
223 | } |
224 | |