1/******************************************************
2Copyright (c) 2012-2013 Percona LLC and/or its affiliates.
3
4buffer datasink for XtraBackup.
5
6This program is free software; you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
18
19*******************************************************/
20
/* Performs buffered output to a destination datasink set with ds_set_pipe().
Writes to the destination datasink are guaranteed to be no smaller than the
configured buffer size (DS_DEFAULT_BUFFER_SIZE by default); the only
exception is the last write for a file. */
25
26#include <my_global.h>
27#include <my_base.h>
28#include "ds_buffer.h"
29#include "common.h"
30#include "datasink.h"
31
/* Default capacity, in bytes, of each file's staging buffer (64 KiB).
buffer_init() installs this value in every new context; it stays in effect
until ds_buffer_set_size() replaces it. */
#define DS_DEFAULT_BUFFER_SIZE (64 * 1024)
33
34typedef struct {
35 ds_file_t *dst_file;
36 char *buf;
37 size_t pos;
38 size_t size;
39} ds_buffer_file_t;
40
/* Per-context settings of the buffer datasink. */
typedef struct {
	/* Staging buffer capacity, in bytes, given to each file opened on
	this context. */
	size_t	buffer_size;
} ds_buffer_ctxt_t;
44
45static ds_ctxt_t *buffer_init(const char *root);
46static ds_file_t *buffer_open(ds_ctxt_t *ctxt, const char *path,
47 MY_STAT *mystat);
48static int buffer_write(ds_file_t *file, const uchar *buf, size_t len);
49static int buffer_close(ds_file_t *file);
50static void buffer_deinit(ds_ctxt_t *ctxt);
51
52datasink_t datasink_buffer = {
53 &buffer_init,
54 &buffer_open,
55 &buffer_write,
56 &buffer_close,
57 &buffer_deinit
58};
59
60/* Change the default buffer size */
61void ds_buffer_set_size(ds_ctxt_t *ctxt, size_t size)
62{
63 ds_buffer_ctxt_t *buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
64
65 buffer_ctxt->buffer_size = size;
66}
67
68static ds_ctxt_t *
69buffer_init(const char *root)
70{
71 ds_ctxt_t *ctxt;
72 ds_buffer_ctxt_t *buffer_ctxt;
73
74 ctxt = my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_buffer_ctxt_t),
75 MYF(MY_FAE));
76 buffer_ctxt = (ds_buffer_ctxt_t *) (ctxt + 1);
77 buffer_ctxt->buffer_size = DS_DEFAULT_BUFFER_SIZE;
78
79 ctxt->ptr = buffer_ctxt;
80 ctxt->root = my_strdup(root, MYF(MY_FAE));
81
82 return ctxt;
83}
84
85static ds_file_t *
86buffer_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
87{
88 ds_buffer_ctxt_t *buffer_ctxt;
89 ds_ctxt_t *pipe_ctxt;
90 ds_file_t *dst_file;
91 ds_file_t *file;
92 ds_buffer_file_t *buffer_file;
93
94 pipe_ctxt = ctxt->pipe_ctxt;
95 xb_a(pipe_ctxt != NULL);
96
97 dst_file = ds_open(pipe_ctxt, path, mystat);
98 if (dst_file == NULL) {
99 exit(EXIT_FAILURE);
100 }
101
102 buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
103
104 file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
105 sizeof(ds_buffer_file_t) +
106 buffer_ctxt->buffer_size,
107 MYF(MY_FAE));
108
109 buffer_file = (ds_buffer_file_t *) (file + 1);
110 buffer_file->dst_file = dst_file;
111 buffer_file->buf = (char *) (buffer_file + 1);
112 buffer_file->size = buffer_ctxt->buffer_size;
113 buffer_file->pos = 0;
114
115 file->path = dst_file->path;
116 file->ptr = buffer_file;
117
118 return file;
119}
120
121static int
122buffer_write(ds_file_t *file, const uchar *buf, size_t len)
123{
124 ds_buffer_file_t *buffer_file;
125
126 buffer_file = (ds_buffer_file_t *) file->ptr;
127
128 while (len > 0) {
129 if (buffer_file->pos + len > buffer_file->size) {
130 if (buffer_file->pos > 0) {
131 size_t bytes;
132
133 bytes = buffer_file->size - buffer_file->pos;
134 memcpy(buffer_file->buf + buffer_file->pos, buf,
135 bytes);
136
137 if (ds_write(buffer_file->dst_file,
138 buffer_file->buf,
139 buffer_file->size)) {
140 return 1;
141 }
142
143 buffer_file->pos = 0;
144
145 buf += bytes;
146 len -= bytes;
147 } else {
148 /* We don't have any buffered bytes, just write
149 the entire source buffer */
150 if (ds_write(buffer_file->dst_file, buf, len)) {
151 return 1;
152 }
153 break;
154 }
155 } else {
156 memcpy(buffer_file->buf + buffer_file->pos, buf, len);
157 buffer_file->pos += len;
158 break;
159 }
160 }
161
162 return 0;
163}
164
165static int
166buffer_close(ds_file_t *file)
167{
168 ds_buffer_file_t *buffer_file;
169 int ret;
170
171 buffer_file = (ds_buffer_file_t *) file->ptr;
172 if (buffer_file->pos > 0) {
173 ds_write(buffer_file->dst_file, buffer_file->buf,
174 buffer_file->pos);
175 }
176
177 ret = ds_close(buffer_file->dst_file);
178
179 my_free(file);
180
181 return ret;
182}
183
184static void
185buffer_deinit(ds_ctxt_t *ctxt)
186{
187 my_free(ctxt->root);
188 my_free(ctxt);
189}
190