1/******************************************************
2Copyright (c) 2012 Percona LLC and/or its affiliates.
3
4tmpfile datasink for XtraBackup.
5
6This program is free software; you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
18
19*******************************************************/
20
21/* Do all writes to temporary files first, then pipe them to the specified
22datasink in a serialized way in deinit(). */
23
24#include <my_global.h>
25#include <my_base.h>
26#include "common.h"
27#include "datasink.h"
28
29typedef struct {
30 pthread_mutex_t mutex;
31 LIST *file_list;
32} ds_tmpfile_ctxt_t;
33
34typedef struct {
35 LIST list;
36 File fd;
37 char *orig_path;
38 MY_STAT mystat;
39 ds_file_t *file;
40} ds_tmp_file_t;
41
42static ds_ctxt_t *tmpfile_init(const char *root);
43static ds_file_t *tmpfile_open(ds_ctxt_t *ctxt, const char *path,
44 MY_STAT *mystat);
45static int tmpfile_write(ds_file_t *file, const uchar *buf, size_t len);
46static int tmpfile_close(ds_file_t *file);
47static void tmpfile_deinit(ds_ctxt_t *ctxt);
48
49datasink_t datasink_tmpfile = {
50 &tmpfile_init,
51 &tmpfile_open,
52 &tmpfile_write,
53 &tmpfile_close,
54 &tmpfile_deinit
55};
56
57
58static ds_ctxt_t *
59tmpfile_init(const char *root)
60{
61 ds_ctxt_t *ctxt;
62 ds_tmpfile_ctxt_t *tmpfile_ctxt;
63
64 ctxt = my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_tmpfile_ctxt_t),
65 MYF(MY_FAE));
66 tmpfile_ctxt = (ds_tmpfile_ctxt_t *) (ctxt + 1);
67 tmpfile_ctxt->file_list = NULL;
68 if (pthread_mutex_init(&tmpfile_ctxt->mutex, NULL)) {
69
70 my_free(ctxt);
71 return NULL;
72 }
73
74 ctxt->ptr = tmpfile_ctxt;
75 ctxt->root = my_strdup(root, MYF(MY_FAE));
76
77 return ctxt;
78}
79
80static ds_file_t *
81tmpfile_open(ds_ctxt_t *ctxt, const char *path,
82 MY_STAT *mystat)
83{
84 ds_tmpfile_ctxt_t *tmpfile_ctxt;
85 char tmp_path[FN_REFLEN];
86 ds_tmp_file_t *tmp_file;
87 ds_file_t *file;
88 size_t path_len;
89 File fd;
90
91 /* Create a temporary file in tmpdir. The file will be automatically
92 removed on close. Code copied from mysql_tmpfile(). */
93 fd = create_temp_file(tmp_path,xtrabackup_tmpdir,
94 "xbtemp", O_BINARY | O_SEQUENTIAL,
95 MYF(MY_WME | MY_TEMPORARY));
96
97 if (fd < 0) {
98 return NULL;
99 }
100
101 path_len = strlen(path) + 1; /* terminating '\0' */
102
103 file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
104 sizeof(ds_tmp_file_t) + path_len,
105 MYF(MY_FAE));
106
107 tmp_file = (ds_tmp_file_t *) (file + 1);
108 tmp_file->file = file;
109 memcpy(&tmp_file->mystat, mystat, sizeof(MY_STAT));
110 /* Save a copy of 'path', since it may not be accessible later */
111 tmp_file->orig_path = (char *) tmp_file + sizeof(ds_tmp_file_t);
112
113 tmp_file->fd = fd;
114 memcpy(tmp_file->orig_path, path, path_len);
115
116 /* Store the real temporary file name in file->path */
117 file->path = my_strdup(tmp_path, MYF(MY_FAE));
118 file->ptr = tmp_file;
119
120 /* Store the file object in the list to be piped later */
121 tmpfile_ctxt = (ds_tmpfile_ctxt_t *) ctxt->ptr;
122 tmp_file->list.data = tmp_file;
123
124 pthread_mutex_lock(&tmpfile_ctxt->mutex);
125 tmpfile_ctxt->file_list = list_add(tmpfile_ctxt->file_list,
126 &tmp_file->list);
127 pthread_mutex_unlock(&tmpfile_ctxt->mutex);
128
129 return file;
130}
131
132static int
133tmpfile_write(ds_file_t *file, const uchar *buf, size_t len)
134{
135 File fd = ((ds_tmp_file_t *) file->ptr)->fd;
136
137 if (!my_write(fd, buf, len, MYF(MY_WME | MY_NABP))) {
138 posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
139 return 0;
140 }
141
142 return 1;
143}
144
145static int
146tmpfile_close(ds_file_t *file)
147{
148 /* Do nothing -- we will close (and thus remove) the file after piping
149 it to the destination datasink in tmpfile_deinit(). */
150
151 my_free(file->path);
152
153 return 0;
154}
155
156static void
157tmpfile_deinit(ds_ctxt_t *ctxt)
158{
159 LIST *list;
160 ds_tmpfile_ctxt_t *tmpfile_ctxt;
161 MY_STAT mystat;
162 ds_tmp_file_t *tmp_file;
163 ds_file_t *dst_file;
164 ds_ctxt_t *pipe_ctxt;
165 void *buf = NULL;
166 const size_t buf_size = 10 * 1024 * 1024;
167 size_t bytes;
168 size_t offset;
169
170 pipe_ctxt = ctxt->pipe_ctxt;
171 xb_a(pipe_ctxt != NULL);
172
173 buf = my_malloc(buf_size, MYF(MY_FAE));
174
175 tmpfile_ctxt = (ds_tmpfile_ctxt_t *) ctxt->ptr;
176 list = tmpfile_ctxt->file_list;
177
178 /* Walk the files in the order they have been added */
179 list = list_reverse(list);
180 while (list != NULL) {
181 tmp_file = list->data;
182 /* Stat the file to replace size and mtime on the original
183 * mystat struct */
184 if (my_fstat(tmp_file->fd, &mystat, MYF(0))) {
185 msg("error: my_fstat() failed.\n");
186 exit(EXIT_FAILURE);
187 }
188 tmp_file->mystat.st_size = mystat.st_size;
189 tmp_file->mystat.st_mtime = mystat.st_mtime;
190
191 dst_file = ds_open(pipe_ctxt, tmp_file->orig_path,
192 &tmp_file->mystat);
193 if (dst_file == NULL) {
194 msg("error: could not stream a temporary file to "
195 "'%s'\n", tmp_file->orig_path);
196 exit(EXIT_FAILURE);
197 }
198
199 /* copy to the destination datasink */
200 posix_fadvise(tmp_file->fd, 0, 0, POSIX_FADV_SEQUENTIAL);
201 if (my_seek(tmp_file->fd, 0, SEEK_SET, MYF(0)) ==
202 MY_FILEPOS_ERROR) {
203 msg("error: my_seek() failed for '%s', errno = %d.\n",
204 tmp_file->file->path, my_errno);
205 exit(EXIT_FAILURE);
206 }
207 offset = 0;
208 while ((bytes = my_read(tmp_file->fd, buf, buf_size,
209 MYF(MY_WME))) > 0) {
210 posix_fadvise(tmp_file->fd, offset, buf_size, POSIX_FADV_DONTNEED);
211 offset += buf_size;
212 if (ds_write(dst_file, buf, bytes)) {
213 msg("error: cannot write to stream for '%s'.\n",
214 tmp_file->orig_path);
215 exit(EXIT_FAILURE);
216 }
217 }
218 if (bytes == (size_t) -1) {
219 exit(EXIT_FAILURE);
220 }
221
222 my_close(tmp_file->fd, MYF(MY_WME));
223 ds_close(dst_file);
224
225 list = list_rest(list);
226 my_free(tmp_file->file);
227 }
228
229 pthread_mutex_destroy(&tmpfile_ctxt->mutex);
230
231 my_free(buf);
232 my_free(ctxt->root);
233 my_free(ctxt);
234}
235