1/******************************************************
2Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
3
4Compressing datasink implementation for XtraBackup.
5
6This program is free software; you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
18
19*******************************************************/
20
21#include <my_global.h>
22#include <mysql_version.h>
23#include <my_base.h>
24#include <quicklz.h>
25#include <zlib.h>
26#include "common.h"
27#include "datasink.h"
28
/* Size of a single compression chunk, taken from the
xtrabackup_compress_chunk_size option and converted to size_t. */
#define COMPRESS_CHUNK_SIZE ((size_t) (xtrabackup_compress_chunk_size))

/* Extra bytes added on top of COMPRESS_CHUNK_SIZE when sizing each worker's
output buffer, so that the compressed chunk always fits. */
#define MY_QLZ_COMPRESS_OVERHEAD 400
31
32typedef struct {
33 pthread_t id;
34 uint num;
35 pthread_mutex_t ctrl_mutex;
36 pthread_cond_t ctrl_cond;
37 pthread_mutex_t data_mutex;
38 pthread_cond_t data_cond;
39 my_bool started;
40 my_bool data_avail;
41 my_bool cancelled;
42 const char *from;
43 size_t from_len;
44 char *to;
45 size_t to_len;
46 qlz_state_compress state;
47 ulong adler;
48} comp_thread_ctxt_t;
49
50typedef struct {
51 comp_thread_ctxt_t *threads;
52 uint nthreads;
53} ds_compress_ctxt_t;
54
55typedef struct {
56 ds_file_t *dest_file;
57 ds_compress_ctxt_t *comp_ctxt;
58 size_t bytes_processed;
59} ds_compress_file_t;
60
61/* Compression options */
62extern char *xtrabackup_compress_alg;
63extern uint xtrabackup_compress_threads;
64extern ulonglong xtrabackup_compress_chunk_size;
65
66static ds_ctxt_t *compress_init(const char *root);
67static ds_file_t *compress_open(ds_ctxt_t *ctxt, const char *path,
68 MY_STAT *mystat);
69static int compress_write(ds_file_t *file, const uchar *buf, size_t len);
70static int compress_close(ds_file_t *file);
71static void compress_deinit(ds_ctxt_t *ctxt);
72
73datasink_t datasink_compress = {
74 &compress_init,
75 &compress_open,
76 &compress_write,
77 &compress_close,
78 &compress_deinit
79};
80
81static inline int write_uint32_le(ds_file_t *file, ulong n);
82static inline int write_uint64_le(ds_file_t *file, ulonglong n);
83
84static comp_thread_ctxt_t *create_worker_threads(uint n);
85static void destroy_worker_threads(comp_thread_ctxt_t *threads, uint n);
86static void *compress_worker_thread_func(void *arg);
87
88static
89ds_ctxt_t *
90compress_init(const char *root)
91{
92 ds_ctxt_t *ctxt;
93 ds_compress_ctxt_t *compress_ctxt;
94 comp_thread_ctxt_t *threads;
95
96 /* Create and initialize the worker threads */
97 threads = create_worker_threads(xtrabackup_compress_threads);
98 if (threads == NULL) {
99 msg("compress: failed to create worker threads.\n");
100 return NULL;
101 }
102
103 ctxt = (ds_ctxt_t *) my_malloc(sizeof(ds_ctxt_t) +
104 sizeof(ds_compress_ctxt_t),
105 MYF(MY_FAE));
106
107 compress_ctxt = (ds_compress_ctxt_t *) (ctxt + 1);
108 compress_ctxt->threads = threads;
109 compress_ctxt->nthreads = xtrabackup_compress_threads;
110
111 ctxt->ptr = compress_ctxt;
112 ctxt->root = my_strdup(root, MYF(MY_FAE));
113
114 return ctxt;
115}
116
117static
118ds_file_t *
119compress_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
120{
121 ds_compress_ctxt_t *comp_ctxt;
122 ds_ctxt_t *dest_ctxt;
123 ds_file_t *dest_file;
124 char new_name[FN_REFLEN];
125 size_t name_len;
126 ds_file_t *file;
127 ds_compress_file_t *comp_file;
128
129 xb_ad(ctxt->pipe_ctxt != NULL);
130 dest_ctxt = ctxt->pipe_ctxt;
131
132 comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;
133
134 /* Append the .qp extension to the filename */
135 fn_format(new_name, path, "", ".qp", MYF(MY_APPEND_EXT));
136
137 dest_file = ds_open(dest_ctxt, new_name, mystat);
138 if (dest_file == NULL) {
139 return NULL;
140 }
141
142 /* Write the qpress archive header */
143 if (ds_write(dest_file, "qpress10", 8) ||
144 write_uint64_le(dest_file, COMPRESS_CHUNK_SIZE)) {
145 goto err;
146 }
147
148 /* We are going to create a one-file "flat" (i.e. with no
149 subdirectories) archive. So strip the directory part from the path and
150 remove the '.qp' suffix. */
151 fn_format(new_name, path, "", "", MYF(MY_REPLACE_DIR));
152
153 /* Write the qpress file header */
154 name_len = strlen(new_name);
155 if (ds_write(dest_file, "F", 1) ||
156 write_uint32_le(dest_file, (uint)name_len) ||
157 /* we want to write the terminating \0 as well */
158 ds_write(dest_file, new_name, name_len + 1)) {
159 goto err;
160 }
161
162 file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
163 sizeof(ds_compress_file_t),
164 MYF(MY_FAE));
165 comp_file = (ds_compress_file_t *) (file + 1);
166 comp_file->dest_file = dest_file;
167 comp_file->comp_ctxt = comp_ctxt;
168 comp_file->bytes_processed = 0;
169
170 file->ptr = comp_file;
171 file->path = dest_file->path;
172
173 return file;
174
175err:
176 ds_close(dest_file);
177 return NULL;
178}
179
180static
181int
182compress_write(ds_file_t *file, const uchar *buf, size_t len)
183{
184 ds_compress_file_t *comp_file;
185 ds_compress_ctxt_t *comp_ctxt;
186 comp_thread_ctxt_t *threads;
187 comp_thread_ctxt_t *thd;
188 uint nthreads;
189 uint i;
190 const char *ptr;
191 ds_file_t *dest_file;
192
193 comp_file = (ds_compress_file_t *) file->ptr;
194 comp_ctxt = comp_file->comp_ctxt;
195 dest_file = comp_file->dest_file;
196
197 threads = comp_ctxt->threads;
198 nthreads = comp_ctxt->nthreads;
199
200 ptr = (const char *) buf;
201 while (len > 0) {
202 uint max_thread;
203
204 /* Send data to worker threads for compression */
205 for (i = 0; i < nthreads; i++) {
206 size_t chunk_len;
207
208 thd = threads + i;
209
210 pthread_mutex_lock(&thd->ctrl_mutex);
211
212 chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
213 COMPRESS_CHUNK_SIZE : len;
214 thd->from = ptr;
215 thd->from_len = chunk_len;
216
217 pthread_mutex_lock(&thd->data_mutex);
218 thd->data_avail = TRUE;
219 pthread_cond_signal(&thd->data_cond);
220 pthread_mutex_unlock(&thd->data_mutex);
221
222 len -= chunk_len;
223 if (len == 0) {
224 break;
225 }
226 ptr += chunk_len;
227 }
228
229 max_thread = (i < nthreads) ? i : nthreads - 1;
230
231 /* Reap and stream the compressed data */
232 for (i = 0; i <= max_thread; i++) {
233 thd = threads + i;
234
235 pthread_mutex_lock(&thd->data_mutex);
236 while (thd->data_avail == TRUE) {
237 pthread_cond_wait(&thd->data_cond,
238 &thd->data_mutex);
239 }
240
241 xb_a(threads[i].to_len > 0);
242
243 if (ds_write(dest_file, "NEWBNEWB", 8) ||
244 write_uint64_le(dest_file,
245 comp_file->bytes_processed)) {
246 msg("compress: write to the destination stream "
247 "failed.\n");
248 return 1;
249 }
250
251 comp_file->bytes_processed += threads[i].from_len;
252
253 if (write_uint32_le(dest_file, threads[i].adler) ||
254 ds_write(dest_file, threads[i].to,
255 threads[i].to_len)) {
256 msg("compress: write to the destination stream "
257 "failed.\n");
258 return 1;
259 }
260
261 pthread_mutex_unlock(&threads[i].data_mutex);
262 pthread_mutex_unlock(&threads[i].ctrl_mutex);
263 }
264 }
265
266 return 0;
267}
268
269static
270int
271compress_close(ds_file_t *file)
272{
273 ds_compress_file_t *comp_file;
274 ds_file_t *dest_file;
275 int rc;
276
277 comp_file = (ds_compress_file_t *) file->ptr;
278 dest_file = comp_file->dest_file;
279
280 /* Write the qpress file trailer */
281 ds_write(dest_file, "ENDSENDS", 8);
282
283 /* Supposedly the number of written bytes should be written as a
284 "recovery information" in the file trailer, but in reality qpress
285 always writes 8 zeros here. Let's do the same */
286
287 write_uint64_le(dest_file, 0);
288
289 rc = ds_close(dest_file);
290
291 my_free(file);
292
293 return rc;
294}
295
296static
297void
298compress_deinit(ds_ctxt_t *ctxt)
299{
300 ds_compress_ctxt_t *comp_ctxt;
301
302 xb_ad(ctxt->pipe_ctxt != NULL);
303
304 comp_ctxt = (ds_compress_ctxt_t *) ctxt->ptr;;
305
306 destroy_worker_threads(comp_ctxt->threads, comp_ctxt->nthreads);
307
308 my_free(ctxt->root);
309 my_free(ctxt);
310}
311
312static inline
313int
314write_uint32_le(ds_file_t *file, ulong n)
315{
316 char tmp[4];
317
318 int4store(tmp, n);
319 return ds_write(file, tmp, sizeof(tmp));
320}
321
322static inline
323int
324write_uint64_le(ds_file_t *file, ulonglong n)
325{
326 char tmp[8];
327
328 int8store(tmp, n);
329 return ds_write(file, tmp, sizeof(tmp));
330}
331
332static
333comp_thread_ctxt_t *
334create_worker_threads(uint n)
335{
336 comp_thread_ctxt_t *threads;
337 uint i;
338
339 threads = (comp_thread_ctxt_t *)
340 my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
341
342 for (i = 0; i < n; i++) {
343 comp_thread_ctxt_t *thd = threads + i;
344
345 thd->num = i + 1;
346 thd->started = FALSE;
347 thd->cancelled = FALSE;
348 thd->data_avail = FALSE;
349
350 thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
351 MY_QLZ_COMPRESS_OVERHEAD,
352 MYF(MY_FAE));
353
354 /* Initialize the control mutex and condition var */
355 if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
356 pthread_cond_init(&thd->ctrl_cond, NULL)) {
357 goto err;
358 }
359
360 /* Initialize and data mutex and condition var */
361 if (pthread_mutex_init(&thd->data_mutex, NULL) ||
362 pthread_cond_init(&thd->data_cond, NULL)) {
363 goto err;
364 }
365
366 pthread_mutex_lock(&thd->ctrl_mutex);
367
368 if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
369 thd)) {
370 msg("compress: pthread_create() failed: "
371 "errno = %d\n", errno);
372 goto err;
373 }
374 }
375
376 /* Wait for the threads to start */
377 for (i = 0; i < n; i++) {
378 comp_thread_ctxt_t *thd = threads + i;
379
380 while (thd->started == FALSE)
381 pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
382 pthread_mutex_unlock(&thd->ctrl_mutex);
383 }
384
385 return threads;
386
387err:
388 return NULL;
389}
390
391static
392void
393destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
394{
395 uint i;
396
397 for (i = 0; i < n; i++) {
398 comp_thread_ctxt_t *thd = threads + i;
399
400 pthread_mutex_lock(&thd->data_mutex);
401 threads[i].cancelled = TRUE;
402 pthread_cond_signal(&thd->data_cond);
403 pthread_mutex_unlock(&thd->data_mutex);
404
405 pthread_join(thd->id, NULL);
406
407 pthread_cond_destroy(&thd->data_cond);
408 pthread_mutex_destroy(&thd->data_mutex);
409 pthread_cond_destroy(&thd->ctrl_cond);
410 pthread_mutex_destroy(&thd->ctrl_mutex);
411
412 my_free(thd->to);
413 }
414
415 my_free(threads);
416}
417
418static
419void *
420compress_worker_thread_func(void *arg)
421{
422 comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
423
424 pthread_mutex_lock(&thd->ctrl_mutex);
425
426 pthread_mutex_lock(&thd->data_mutex);
427
428 thd->started = TRUE;
429 pthread_cond_signal(&thd->ctrl_cond);
430
431 pthread_mutex_unlock(&thd->ctrl_mutex);
432
433 while (1) {
434 thd->data_avail = FALSE;
435 pthread_cond_signal(&thd->data_cond);
436
437 while (!thd->data_avail && !thd->cancelled) {
438 pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
439 }
440
441 if (thd->cancelled)
442 break;
443
444 thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
445 &thd->state);
446
447 /* qpress uses 0x00010000 as the initial value, but its own
448 Adler-32 implementation treats the value differently:
449 1. higher order bits are the sum of all bytes in the sequence
450 2. lower order bits are the sum of resulting values at every
451 step.
452 So it's the other way around as compared to zlib's adler32().
453 That's why 0x00000001 is being passed here to be compatible
454 with qpress implementation. */
455
456 thd->adler = adler32(0x00000001, (uchar *) thd->to,
457 (uInt)thd->to_len);
458 }
459
460 pthread_mutex_unlock(&thd->data_mutex);
461
462 return NULL;
463}
464