1/******************************************************
2Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
3
4The xbstream utility: serialize/deserialize files in the XBSTREAM format.
5
6This program is free software; you can redistribute it and/or modify
7it under the terms of the GNU General Public License as published by
8the Free Software Foundation; version 2 of the License.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program; if not, write to the Free Software
17Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
18
19*******************************************************/
20
21#include <my_global.h>
22#include <my_base.h>
23#include <my_getopt.h>
24#include <hash.h>
25#include <my_pthread.h>
26#include "common.h"
27#include "xbstream.h"
28#include "datasink.h"
29#include "crc_glue.h"
30
31#define XBSTREAM_VERSION "1.0"
32#define XBSTREAM_BUFFER_SIZE (10 * 1024 * 1024UL)
33
34#define START_FILE_HASH_SIZE 16
35
/* Operating mode of the utility, selected on the command line. */
typedef enum {
	RUN_MODE_NONE = 0,	/* no mode option seen yet */
	RUN_MODE_CREATE = 1,	/* selected by option 'c' */
	RUN_MODE_EXTRACT = 2	/* selected by option 'x' */
} run_mode_t;
41
/* Need the following definitions to avoid linking with ds_*.o and their link
dependencies. They are only defined here, never initialised or referenced
anywhere else in this file. */
datasink_t datasink_archive;
datasink_t datasink_xbstream;
datasink_t datasink_compress;
datasink_t datasink_tmpfile;
datasink_t datasink_buffer;
49
/* Command-line state. */

/* Selected run mode; has static storage, so it starts out as RUN_MODE_NONE
and is assigned once by set_run_mode(). */
static run_mode_t opt_mode;
/* Target of the "directory" option; main() changes into it when non-NULL. */
static char * opt_directory = NULL;
/* Target of the "verbose" option; when set, processed file paths are
printed through msg(). */
static my_bool opt_verbose = 0;
/* Target of the "parallel" option; passed to mode_extract() as the number
of worker threads. */
static int opt_parallel = 1;
54
/* Option table handed to handle_options(), my_print_help() and
my_cleanup_options(). The second member of each entry is the option id
that get_one_option() receives. The table ends with an all-zero entry.
NOTE(review): the meaning of the remaining positional members is defined by
struct my_option in my_getopt.h, which is not visible here. */
static struct my_option my_long_options[] =
{
	{"help", '?', "Display this help and exit.",
	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
	{"create", 'c', "Stream the specified files to the standard output.",
	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
	{"extract", 'x', "Extract to disk files from the stream on the "
	 "standard input.",
	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
	{"directory", 'C', "Change the current directory to the specified one "
	 "before streaming or extracting.", &opt_directory, &opt_directory, 0,
	 GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
	{"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose,
	 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
	/* presumably 1, 1, INT_MAX are default, minimum and maximum --
	confirm against my_getopt.h */
	{"parallel", 'p', "Number of worker threads for reading / writing.",
	 &opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG,
	 1, 1, INT_MAX, 0, 0, 0},

	/* terminating entry */
	{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
75
/* State shared by all extraction worker threads; one instance is built in
mode_extract() and passed to every extract_worker_thread_func(). */
typedef struct {
	HASH *filehash;		/* file_entry_t objects currently open,
				keyed by path (see get_file_entry_key()) */
	xb_rstream_t *stream;	/* input stream the workers read chunks from */
	ds_ctxt_t *ds_ctxt;	/* datasink context used to open output files */
	pthread_mutex_t *mutex;	/* held by a worker while it reads a chunk
				and while it touches filehash */
} extract_ctxt_t;
82
/* Bookkeeping for one output file that is being extracted. */
typedef struct {
	char *path;		/* copy of the file path made with my_strndup();
				released in file_entry_free() */
	uint pathlen;		/* length of path; used as the hash key length */
	my_off_t offset;	/* number of bytes written so far; the next
				chunk must start at exactly this offset */
	ds_file_t *file;	/* output file returned by ds_open() */
	pthread_mutex_t mutex;	/* held by a worker while it validates and
				writes a chunk for this file */
} file_entry_t;
90
/* Forward declarations of functions defined further down in this file. */
static int get_options(int *argc, char ***argv);
static int mode_create(int argc, char **argv);
static int mode_extract(int n_threads, int argc, char **argv);
static my_bool get_one_option(int optid, const struct my_option *opt,
			      char *argument);
96
97int
98main(int argc, char **argv)
99{
100 MY_INIT(argv[0]);
101
102 crc_init();
103
104 if (get_options(&argc, &argv)) {
105 goto err;
106 }
107
108 if (opt_mode == RUN_MODE_NONE) {
109 msg("%s: either -c or -x must be specified.\n", my_progname);
110 goto err;
111 }
112
113 /* Change the current directory if -C is specified */
114 if (opt_directory && my_setwd(opt_directory, MYF(MY_WME))) {
115 goto err;
116 }
117
118 if (opt_mode == RUN_MODE_CREATE && mode_create(argc, argv)) {
119 goto err;
120 } else if (opt_mode == RUN_MODE_EXTRACT &&
121 mode_extract(opt_parallel, argc, argv)) {
122 goto err;
123 }
124
125 my_cleanup_options(my_long_options);
126
127 my_end(0);
128
129 return EXIT_SUCCESS;
130err:
131 my_cleanup_options(my_long_options);
132
133 my_end(0);
134
135 exit(EXIT_FAILURE);
136}
137
138static
139int
140get_options(int *argc, char ***argv)
141{
142 int ho_error;
143
144 if ((ho_error= handle_options(argc, argv, my_long_options,
145 get_one_option))) {
146 exit(EXIT_FAILURE);
147 }
148
149 return 0;
150}
151
152static
153void
154print_version(void)
155{
156 printf("%s Ver %s for %s (%s)\n", my_progname, XBSTREAM_VERSION,
157 SYSTEM_TYPE, MACHINE_TYPE);
158}
159
160static
161void
162usage(void)
163{
164 print_version();
165 puts("Copyright (C) 2011-2013 Percona LLC and/or its affiliates.");
166 puts("This software comes with ABSOLUTELY NO WARRANTY. "
167 "This is free software,\nand you are welcome to modify and "
168 "redistribute it under the GPL license.\n");
169
170 puts("Serialize/deserialize files in the XBSTREAM format.\n");
171
172 puts("Usage: ");
173 printf(" %s -c [OPTIONS...] FILES... # stream specified files to "
174 "standard output.\n", my_progname);
175 printf(" %s -x [OPTIONS...] # extract files from the stream"
176 "on the standard input.\n", my_progname);
177
178 puts("\nOptions:");
179 my_print_help(my_long_options);
180}
181
182static
183int
184set_run_mode(run_mode_t mode)
185{
186 if (opt_mode != RUN_MODE_NONE) {
187 msg("%s: can't set specify both -c and -x.\n", my_progname);
188 return 1;
189 }
190
191 opt_mode = mode;
192
193 return 0;
194}
195
196static
197my_bool
198get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
199 char *argument __attribute__((unused)))
200{
201 switch (optid) {
202 case 'c':
203 if (set_run_mode(RUN_MODE_CREATE)) {
204 return TRUE;
205 }
206 break;
207 case 'x':
208 if (set_run_mode(RUN_MODE_EXTRACT)) {
209 return TRUE;
210 }
211 break;
212 case '?':
213 usage();
214 exit(0);
215 }
216
217 return FALSE;
218}
219
220static
221int
222stream_one_file(File file, xb_wstream_file_t *xbfile)
223{
224 uchar *buf;
225 ssize_t bytes;
226 my_off_t offset;
227
228 posix_fadvise(file, 0, 0, POSIX_FADV_SEQUENTIAL);
229 offset = my_tell(file, MYF(MY_WME));
230
231 buf = (uchar*)(my_malloc(XBSTREAM_BUFFER_SIZE, MYF(MY_FAE)));
232
233 while ((bytes = (ssize_t)my_read(file, buf, XBSTREAM_BUFFER_SIZE,
234 MYF(MY_WME))) > 0) {
235 if (xb_stream_write_data(xbfile, buf, bytes)) {
236 msg("%s: xb_stream_write_data() failed.\n",
237 my_progname);
238 my_free(buf);
239 return 1;
240 }
241 posix_fadvise(file, offset, XBSTREAM_BUFFER_SIZE,
242 POSIX_FADV_DONTNEED);
243 offset += XBSTREAM_BUFFER_SIZE;
244
245 }
246
247 my_free(buf);
248
249 if (bytes < 0) {
250 return 1;
251 }
252
253 return 0;
254}
255
256static
257int
258mode_create(int argc, char **argv)
259{
260 int i;
261 MY_STAT mystat;
262 xb_wstream_t *stream;
263
264 if (argc < 1) {
265 msg("%s: no files are specified.\n", my_progname);
266 return 1;
267 }
268
269 stream = xb_stream_write_new();
270 if (stream == NULL) {
271 msg("%s: xb_stream_write_new() failed.\n", my_progname);
272 return 1;
273 }
274
275 for (i = 0; i < argc; i++) {
276 char *filepath = argv[i];
277 File src_file;
278 xb_wstream_file_t *file;
279
280 if (my_stat(filepath, &mystat, MYF(MY_WME)) == NULL) {
281 goto err;
282 }
283 if (!MY_S_ISREG(mystat.st_mode)) {
284 msg("%s: %s is not a regular file, exiting.\n",
285 my_progname, filepath);
286 goto err;
287 }
288
289 if ((src_file = my_open(filepath, O_RDONLY, MYF(MY_WME))) < 0) {
290 msg("%s: failed to open %s.\n", my_progname, filepath);
291 goto err;
292 }
293
294 file = xb_stream_write_open(stream, filepath, &mystat, NULL, NULL);
295 if (file == NULL) {
296 goto err;
297 }
298
299 if (opt_verbose) {
300 msg("%s\n", filepath);
301 }
302
303 if (stream_one_file(src_file, file) ||
304 xb_stream_write_close(file) ||
305 my_close(src_file, MYF(MY_WME))) {
306 goto err;
307 }
308 }
309
310 xb_stream_write_done(stream);
311
312 return 0;
313err:
314 xb_stream_write_done(stream);
315
316 return 1;
317}
318
319static
320file_entry_t *
321file_entry_new(extract_ctxt_t *ctxt, const char *path, uint pathlen)
322{
323 file_entry_t *entry;
324 ds_file_t *file;
325
326 entry = (file_entry_t *) my_malloc(sizeof(file_entry_t),
327 MYF(MY_WME | MY_ZEROFILL));
328 if (entry == NULL) {
329 return NULL;
330 }
331
332 entry->path = my_strndup(path, pathlen, MYF(MY_WME));
333 if (entry->path == NULL) {
334 goto err;
335 }
336 entry->pathlen = pathlen;
337
338 file = ds_open(ctxt->ds_ctxt, path, NULL);
339
340 if (file == NULL) {
341 msg("%s: failed to create file.\n", my_progname);
342 goto err;
343 }
344
345 if (opt_verbose) {
346 msg("%s\n", entry->path);
347 }
348
349 entry->file = file;
350
351 pthread_mutex_init(&entry->mutex, NULL);
352
353 return entry;
354
355err:
356 if (entry->path != NULL) {
357 my_free(entry->path);
358 }
359 my_free(entry);
360
361 return NULL;
362}
363
364static
365uchar *
366get_file_entry_key(file_entry_t *entry, size_t *length,
367 my_bool not_used __attribute__((unused)))
368{
369 *length = entry->pathlen;
370 return (uchar *) entry->path;
371}
372
373static
374void
375file_entry_free(file_entry_t *entry)
376{
377 pthread_mutex_destroy(&entry->mutex);
378 ds_close(entry->file);
379 my_free(entry->path);
380 my_free(entry);
381}
382
383static
384void *
385extract_worker_thread_func(void *arg)
386{
387 xb_rstream_chunk_t chunk;
388 file_entry_t *entry;
389 xb_rstream_result_t res;
390
391 extract_ctxt_t *ctxt = (extract_ctxt_t *) arg;
392
393 my_thread_init();
394
395 memset(&chunk, 0, sizeof(chunk));
396
397 while (1) {
398
399 pthread_mutex_lock(ctxt->mutex);
400 res = xb_stream_read_chunk(ctxt->stream, &chunk);
401
402 if (res != XB_STREAM_READ_CHUNK) {
403 pthread_mutex_unlock(ctxt->mutex);
404 break;
405 }
406
407 /* If unknown type and ignorable flag is set, skip this chunk */
408 if (chunk.type == XB_CHUNK_TYPE_UNKNOWN && \
409 !(chunk.flags & XB_STREAM_FLAG_IGNORABLE)) {
410 pthread_mutex_unlock(ctxt->mutex);
411 continue;
412 }
413
414 /* See if we already have this file open */
415 entry = (file_entry_t *) my_hash_search(ctxt->filehash,
416 (uchar *) chunk.path,
417 chunk.pathlen);
418
419 if (entry == NULL) {
420 entry = file_entry_new(ctxt,
421 chunk.path,
422 chunk.pathlen);
423 if (entry == NULL) {
424 pthread_mutex_unlock(ctxt->mutex);
425 break;
426 }
427 if (my_hash_insert(ctxt->filehash, (uchar *) entry)) {
428 msg("%s: my_hash_insert() failed.\n",
429 my_progname);
430 pthread_mutex_unlock(ctxt->mutex);
431 break;
432 }
433 }
434
435 pthread_mutex_lock(&entry->mutex);
436
437 pthread_mutex_unlock(ctxt->mutex);
438
439 res = xb_stream_validate_checksum(&chunk);
440
441 if (res != XB_STREAM_READ_CHUNK) {
442 pthread_mutex_unlock(&entry->mutex);
443 break;
444 }
445
446 if (chunk.type == XB_CHUNK_TYPE_EOF) {
447 pthread_mutex_lock(ctxt->mutex);
448 pthread_mutex_unlock(&entry->mutex);
449 my_hash_delete(ctxt->filehash, (uchar *) entry);
450 pthread_mutex_unlock(ctxt->mutex);
451
452 continue;
453 }
454
455 if (entry->offset != chunk.offset) {
456 msg("%s: out-of-order chunk: real offset = 0x%llx, "
457 "expected offset = 0x%llx\n", my_progname,
458 chunk.offset, entry->offset);
459 pthread_mutex_unlock(&entry->mutex);
460 res = XB_STREAM_READ_ERROR;
461 break;
462 }
463
464 if (ds_write(entry->file, chunk.data, chunk.length)) {
465 msg("%s: my_write() failed.\n", my_progname);
466 pthread_mutex_unlock(&entry->mutex);
467 res = XB_STREAM_READ_ERROR;
468 break;
469 }
470
471 entry->offset += chunk.length;
472
473 pthread_mutex_unlock(&entry->mutex);
474 }
475
476 if (chunk.data)
477 my_free(chunk.data);
478
479 my_thread_end();
480
481 return (void *)(res);
482}
483
484
485static
486int
487mode_extract(int n_threads, int argc __attribute__((unused)),
488 char **argv __attribute__((unused)))
489{
490 xb_rstream_t *stream = NULL;
491 HASH filehash;
492 ds_ctxt_t *ds_ctxt = NULL;
493 extract_ctxt_t ctxt;
494 int i;
495 pthread_t *tids = NULL;
496 void **retvals = NULL;
497 pthread_mutex_t mutex;
498 int ret = 0;
499
500 if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE,
501 0, 0, (my_hash_get_key) get_file_entry_key,
502 (my_hash_free_key) file_entry_free, MYF(0))) {
503 msg("%s: failed to initialize file hash.\n", my_progname);
504 return 1;
505 }
506
507 if (pthread_mutex_init(&mutex, NULL)) {
508 msg("%s: failed to initialize mutex.\n", my_progname);
509 my_hash_free(&filehash);
510 return 1;
511 }
512
513 /* If --directory is specified, it is already set as CWD by now. */
514 ds_ctxt = ds_create(".", DS_TYPE_LOCAL);
515 if (ds_ctxt == NULL) {
516 ret = 1;
517 goto exit;
518 }
519
520
521 stream = xb_stream_read_new();
522 if (stream == NULL) {
523 msg("%s: xb_stream_read_new() failed.\n", my_progname);
524 pthread_mutex_destroy(&mutex);
525 ret = 1;
526 goto exit;
527 }
528
529 ctxt.stream = stream;
530 ctxt.filehash = &filehash;
531 ctxt.ds_ctxt = ds_ctxt;
532 ctxt.mutex = &mutex;
533
534 tids = calloc(n_threads, sizeof(pthread_t));
535 retvals = calloc(n_threads, sizeof(void*));
536
537 for (i = 0; i < n_threads; i++)
538 pthread_create(tids + i, NULL, extract_worker_thread_func,
539 &ctxt);
540
541 for (i = 0; i < n_threads; i++)
542 pthread_join(tids[i], retvals + i);
543
544 for (i = 0; i < n_threads; i++) {
545 if ((size_t)retvals[i] == XB_STREAM_READ_ERROR) {
546 ret = 1;
547 goto exit;
548 }
549 }
550
551exit:
552 pthread_mutex_destroy(&mutex);
553
554 free(tids);
555 free(retvals);
556
557 my_hash_free(&filehash);
558 if (ds_ctxt != NULL) {
559 ds_destroy(ds_ctxt);
560 }
561 xb_stream_read_done(stream);
562
563 return ret;
564}
565