1/* Copyright (C) 2006-2008 MySQL AB
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
15
16#include "../maria_def.h"
17#include <stdio.h>
18#include <errno.h>
19#include <tap.h>
20#include "../trnman.h"
21
/* Test helpers defined outside this file; main() uses them for set-up/clean-up */
extern my_bool maria_log_remove(const char *testdir);
extern char *create_tmpdir(const char *progname);

#ifndef DBUG_OFF
/* DBUG control string; main() assigns it and applies it only when argc > 1 */
static const char *default_dbug_option;
#endif

/* Size in bytes handed to init_pagecache() in main() */
#define PCACHE_SIZE (1024*1024*10)

/* Log file size handed to translog_init_with_table(): 1 GiB + 512 MiB */
#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512)
/*#define LOG_FLAGS TRANSLOG_SECTOR_PROTECTION | TRANSLOG_PAGE_CRC */
/* Flags handed to translog_init_with_table() */
#define LOG_FLAGS 0
/*#define LONG_BUFFER_SIZE (1024L*1024L*1024L + 1024L*1024L*512)*/

/*
  Two build-time configurations:
    MULTIFLUSH_TEST defined - small records, many iterations, flusher threads
    otherwise               - large records, few iterations, no flushers

  LONG_BUFFER_SZ    unscaled upper bound for the variable record length
  MIN_REC_LENGTH    lower bound for the variable record length
  SHOW_DIVIDER      not referenced in the code visible in this file
  ITERATIONS        number of record pairs written by each writer thread
  FLUSH_ITERATIONS  number of translog_flush() calls per flusher thread
  WRITERS/FLUSHERS  number of threads of each kind started by main()
*/
#ifdef MULTIFLUSH_TEST

#define LONG_BUFFER_SZ (16384L)
#define MIN_REC_LENGTH 10
#define SHOW_DIVIDER 20
#define ITERATIONS 10000
#define FLUSH_ITERATIONS 1000
#define WRITERS 2
#define FLUSHERS 10

#else

#define LONG_BUFFER_SZ (512L*1024L*1024L)
#define MIN_REC_LENGTH 30
#define SHOW_DIVIDER 10
#define ITERATIONS 3
#define FLUSH_ITERATIONS 0
#define WRITERS 3
#define FLUSHERS 0

#endif

/*
  Effective buffer size: LONG_BUFFER_SZ divided by 16 when skip_big_tests is
  non-zero. skip_big_tests is not declared in this file (presumably it comes
  from <tap.h>), so this macro is evaluated at run time, not compile time.
*/
#define LONG_BUFFER_SIZE (LONG_BUFFER_SZ >> (skip_big_tests ? 4 : 0))

/* Threads of each kind that main() still has to start */
static uint number_of_writers= WRITERS;
static uint number_of_flushers= FLUSHERS;

/*
  thread_count is the number of started threads that have not finished yet.
  It is changed under LOCK_thread_count; every finishing thread signals
  COND_thread_count so that main() can re-check it.
*/
static pthread_cond_t COND_thread_count;
static pthread_mutex_t LOCK_thread_count;
static uint thread_count;

/*
  Filled by writer(), indexed [writer number][iteration]:
    lens   length of the variable-length record
    lsns1  LSN of the fixed-length record
    lsns2  LSN of the variable-length record
*/
static ulong lens[WRITERS][ITERATIONS];
static LSN lsns1[WRITERS][ITERATIONS];
static LSN lsns2[WRITERS][ITERATIONS];
/* Pattern buffer (byte i == i & 0xFF) allocated and filled by main() */
static uchar *long_buffer;


/*
  LSN of the most recently written variable record. Writers store it and
  flushers read it without taking any lock; the dirty access is deliberate
  for the purposes of this test.
*/
static LSN last_lsn;
74
75/*
76 Get pseudo-random length of the field in
77 limits [MIN_REC_LENGTH..LONG_BUFFER_SIZE]
78
79 SYNOPSIS
80 get_len()
81
82 RETURN
83 length - length >= 0 length <= LONG_BUFFER_SIZE
84*/
85
86static uint32 get_len()
87{
88 return MIN_REC_LENGTH +
89 (uint32)(((ulonglong)rand())*
90 (LONG_BUFFER_SIZE - MIN_REC_LENGTH - 1)/RAND_MAX);
91}
92
93
94/*
95 Check that the buffer filled correctly
96
97 SYNOPSIS
98 check_content()
99 ptr Pointer to the buffer
100 length length of the buffer
101
102 RETURN
103 0 - OK
104 1 - Error
105*/
106
107static my_bool check_content(uchar *ptr, ulong length)
108{
109 ulong i;
110 for (i= 0; i < length; i++)
111 {
112 if (((uchar)ptr[i]) != (i & 0xFF))
113 {
114 fprintf(stderr, "Byte # %lu is %x instead of %x",
115 i, (uint) ptr[i], (uint) (i & 0xFF));
116 return 1;
117 }
118 }
119 return 0;
120}
121
122
123/*
124 Read whole record content, and check content (put with offset)
125
126 SYNOPSIS
127 read_and_check_content()
128 rec The record header buffer
129 buffer The buffer to read the record in
130 skip Skip this number of bytes ot the record content
131
132 RETURN
133 0 - OK
134 1 - Error
135*/
136
137
138static my_bool read_and_check_content(TRANSLOG_HEADER_BUFFER *rec,
139 uchar *buffer, uint skip)
140{
141 int res= 0;
142 translog_size_t len;
143
144 if ((len= translog_read_record(rec->lsn, 0, rec->record_length,
145 buffer, NULL)) != rec->record_length)
146 {
147 fprintf(stderr, "Requested %lu byte, read %lu\n",
148 (ulong) rec->record_length, (ulong) len);
149 res= 1;
150 }
151 res|= check_content(buffer + skip, rec->record_length - skip);
152 return(res);
153}
154
155void writer(int num)
156{
157 LSN lsn;
158 TRN trn;
159 uchar long_tr_id[6];
160 uint i;
161
162 trn.short_id= num;
163 trn.first_undo_lsn= TRANSACTION_LOGGED_LONG_ID;
164 for (i= 0; i < ITERATIONS; i++)
165 {
166 uint len= get_len();
167 LEX_CUSTRING parts[TRANSLOG_INTERNAL_PARTS + 1];
168 lens[num][i]= len;
169
170 int2store(long_tr_id, num);
171 int4store(long_tr_id + 2, i);
172 parts[TRANSLOG_INTERNAL_PARTS + 0].str= long_tr_id;
173 parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6;
174 if (translog_write_record(&lsn,
175 LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
176 &trn, NULL, 6, TRANSLOG_INTERNAL_PARTS + 1,
177 parts, NULL, NULL))
178 {
179 fprintf(stderr, "Can't write LOGREC_FIXED_RECORD_0LSN_EXAMPLE record #%lu "
180 "thread %i\n", (ulong) i, num);
181 translog_destroy();
182 pthread_mutex_lock(&LOCK_thread_count);
183 ok(0, "write records");
184 pthread_mutex_unlock(&LOCK_thread_count);
185 return;
186 }
187 lsns1[num][i]= lsn;
188 parts[TRANSLOG_INTERNAL_PARTS + 0].str= long_buffer;
189 parts[TRANSLOG_INTERNAL_PARTS + 0].length= len;
190 if (translog_write_record(&lsn,
191 LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE,
192 &trn, NULL,
193 len, TRANSLOG_INTERNAL_PARTS + 1,
194 parts, NULL, NULL))
195 {
196 fprintf(stderr, "Can't write variable record #%lu\n", (ulong) i);
197 translog_destroy();
198 pthread_mutex_lock(&LOCK_thread_count);
199 ok(0, "write records");
200 pthread_mutex_unlock(&LOCK_thread_count);
201 return;
202 }
203 lsns2[num][i]= lsn;
204 last_lsn= lsn;
205 pthread_mutex_lock(&LOCK_thread_count);
206 ok(1, "write records");
207 pthread_mutex_unlock(&LOCK_thread_count);
208 }
209 return;
210}
211
212
213static void *test_thread_writer(void *arg)
214{
215 int param= *((int*) arg);
216
217 my_thread_init();
218
219 writer(param);
220
221 pthread_mutex_lock(&LOCK_thread_count);
222 thread_count--;
223 ok(1, "writer finished"); /* just to show progress */
224 pthread_cond_signal(&COND_thread_count); /* Tell main we are
225 ready */
226 pthread_mutex_unlock(&LOCK_thread_count);
227 free((uchar*) arg);
228 my_thread_end();
229 return(0);
230}
231
232
233static void *test_thread_flusher(void *arg)
234{
235 int param= *((int*) arg);
236 int i;
237
238 my_thread_init();
239
240 for(i= 0; i < FLUSH_ITERATIONS; i++)
241 {
242 translog_flush(last_lsn);
243 pthread_mutex_lock(&LOCK_thread_count);
244 ok(1, "-- flush %d", param);
245 pthread_mutex_unlock(&LOCK_thread_count);
246 }
247
248 pthread_mutex_lock(&LOCK_thread_count);
249 thread_count--;
250 ok(1, "flusher finished"); /* just to show progress */
251 pthread_cond_signal(&COND_thread_count); /* Tell main we are
252 ready */
253 pthread_mutex_unlock(&LOCK_thread_count);
254 free((uchar*) arg);
255 my_thread_end();
256 return(0);
257}
258
259
260int main(int argc __attribute__((unused)),
261 char **argv __attribute__ ((unused)))
262{
263 uint32 i;
264 PAGECACHE pagecache;
265 LSN first_lsn;
266 TRANSLOG_HEADER_BUFFER rec;
267 struct st_translog_scanner_data scanner;
268 pthread_t tid;
269 pthread_attr_t thr_attr;
270 int *param, error;
271 int rc;
272 MY_INIT(argv[0]);
273
274 // plan read MYTAP_CONFIG so skip_big_tests will be set before using
275 plan(WRITERS + FLUSHERS +
276 ITERATIONS * WRITERS * 3 + FLUSH_ITERATIONS * FLUSHERS );
277 /* We don't need to do physical syncs in this test */
278 my_disable_sync= 1;
279
280 bzero(&pagecache, sizeof(pagecache));
281 maria_data_root= create_tmpdir(argv[0]);
282 if (maria_log_remove(0))
283 exit(1);
284
285 long_buffer= malloc(LONG_BUFFER_SIZE + 7 * 2 + 2);
286 if (long_buffer == 0)
287 {
288 fprintf(stderr, "End of memory\n");
289 exit(1);
290 }
291 for (i= 0; i < (uint32)(LONG_BUFFER_SIZE + 7 * 2 + 2); i++)
292 long_buffer[i]= (i & 0xFF);
293
294#ifndef DBUG_OFF
295#if defined(__WIN__)
296 default_dbug_option= "d:t:i:O,\\ma_test_loghandler.trace";
297#else
298 default_dbug_option= "d:t:i:o,/tmp/ma_test_loghandler.trace";
299#endif
300 if (argc > 1)
301 {
302 DBUG_SET(default_dbug_option);
303 DBUG_SET_INITIAL(default_dbug_option);
304 }
305#endif
306
307
308 if ((error= pthread_cond_init(&COND_thread_count, NULL)))
309 {
310 fprintf(stderr, "COND_thread_count: %d from pthread_cond_init "
311 "(errno: %d)\n", error, errno);
312 exit(1);
313 }
314 if ((error= pthread_mutex_init(&LOCK_thread_count, MY_MUTEX_INIT_FAST)))
315 {
316 fprintf(stderr, "LOCK_thread_count: %d from pthread_cond_init "
317 "(errno: %d)\n", error, errno);
318 exit(1);
319 }
320 if ((error= pthread_attr_init(&thr_attr)))
321 {
322 fprintf(stderr, "Got error: %d from pthread_attr_init "
323 "(errno: %d)\n", error, errno);
324 exit(1);
325 }
326 if ((error= pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED)))
327 {
328 fprintf(stderr,
329 "Got error: %d from pthread_attr_setdetachstate (errno: %d)\n",
330 error, errno);
331 exit(1);
332 }
333
334#ifdef HAVE_THR_SETCONCURRENCY
335 thr_setconcurrency(2);
336#endif
337
338 if (ma_control_file_open(TRUE, TRUE))
339 {
340 fprintf(stderr, "Can't init control file (%d)\n", errno);
341 exit(1);
342 }
343 if (init_pagecache(&pagecache, PCACHE_SIZE, 0, 0,
344 TRANSLOG_PAGE_SIZE, 0, 0) == 0)
345 {
346 fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
347 exit(1);
348 }
349 if (translog_init_with_table(maria_data_root, LOG_FILE_SIZE, 50112, 0, &pagecache,
350 LOG_FLAGS, 0, &translog_example_table_init,
351 0))
352 {
353 fprintf(stderr, "Can't init loghandler (%d)\n", errno);
354 exit(1);
355 }
356 /* Suppressing of automatic record writing */
357 dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID;
358
359 srand(122334817L);
360 {
361 LEX_CUSTRING parts[TRANSLOG_INTERNAL_PARTS + 1];
362 uchar long_tr_id[6]=
363 {
364 0x11, 0x22, 0x33, 0x44, 0x55, 0x66
365 };
366
367 parts[TRANSLOG_INTERNAL_PARTS + 0].str= long_tr_id;
368 parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6;
369 dummy_transaction_object.first_undo_lsn= TRANSACTION_LOGGED_LONG_ID;
370 if (translog_write_record(&first_lsn,
371 LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
372 &dummy_transaction_object, NULL, 6,
373 TRANSLOG_INTERNAL_PARTS + 1,
374 parts, NULL, NULL))
375 {
376 fprintf(stderr, "Can't write the first record\n");
377 translog_destroy();
378 exit(1);
379 }
380 }
381
382
383 pthread_mutex_lock(&LOCK_thread_count);
384 while (number_of_writers != 0 || number_of_flushers != 0)
385 {
386 if (number_of_writers)
387 {
388 param= (int*) malloc(sizeof(int));
389 *param= number_of_writers - 1;
390 if ((error= pthread_create(&tid, &thr_attr, test_thread_writer,
391 (void*) param)))
392 {
393 fprintf(stderr, "Got error: %d from pthread_create (errno: %d)\n",
394 error, errno);
395 exit(1);
396 }
397 thread_count++;
398 number_of_writers--;
399 }
400 if (number_of_flushers)
401 {
402 param= (int*) malloc(sizeof(int));
403 *param= number_of_flushers - 1;
404 if ((error= pthread_create(&tid, &thr_attr, test_thread_flusher,
405 (void*) param)))
406 {
407 fprintf(stderr, "Got error: %d from pthread_create (errno: %d)\n",
408 error, errno);
409 exit(1);
410 }
411 thread_count++;
412 number_of_flushers--;
413 }
414 }
415 pthread_mutex_unlock(&LOCK_thread_count);
416
417 pthread_attr_destroy(&thr_attr);
418
419 /* wait finishing */
420 pthread_mutex_lock(&LOCK_thread_count);
421 while (thread_count)
422 {
423 if ((error= pthread_cond_wait(&COND_thread_count, &LOCK_thread_count)))
424 fprintf(stderr, "COND_thread_count: %d from pthread_cond_wait\n", error);
425 }
426 pthread_mutex_unlock(&LOCK_thread_count);
427
428 /* Find last LSN and flush up to it (all our log) */
429 {
430 LSN max= 0;
431 for (i= 0; i < WRITERS; i++)
432 {
433 if (cmp_translog_addr(lsns2[i][ITERATIONS - 1], max) > 0)
434 max= lsns2[i][ITERATIONS - 1];
435 }
436 translog_flush(max);
437 }
438
439 rc= 1;
440
441 {
442 uint indeces[WRITERS];
443 uint index, stage;
444 int len;
445 bzero(indeces, sizeof(uint) * WRITERS);
446
447 bzero(indeces, sizeof(indeces));
448
449 if (translog_scanner_init(first_lsn, 1, &scanner, 0))
450 {
451 fprintf(stderr, "scanner init failed\n");
452 goto err;
453 }
454 for (i= 0;; i++)
455 {
456 len= translog_read_next_record_header(&scanner, &rec);
457
458 if (len == RECHEADER_READ_ERROR)
459 {
460 fprintf(stderr, "1-%d translog_read_next_record_header failed (%d)\n",
461 i, errno);
462 translog_free_record_header(&rec);
463 goto err;
464 }
465 if (len == RECHEADER_READ_EOF)
466 {
467 if (i != WRITERS * ITERATIONS * 2)
468 {
469 fprintf(stderr, "EOL met at iteration %u instead of %u\n",
470 i, ITERATIONS * WRITERS * 2);
471 translog_free_record_header(&rec);
472 goto err;
473 }
474 break;
475 }
476 index= indeces[rec.short_trid] / 2;
477 stage= indeces[rec.short_trid] % 2;
478 if (stage == 0)
479 {
480 if (rec.type !=LOGREC_FIXED_RECORD_0LSN_EXAMPLE ||
481 rec.record_length != 6 ||
482 uint2korr(rec.header) != rec.short_trid ||
483 index != uint4korr(rec.header + 2) ||
484 cmp_translog_addr(lsns1[rec.short_trid][index], rec.lsn) != 0)
485 {
486 fprintf(stderr, "Incorrect LOGREC_FIXED_RECORD_0LSN_EXAMPLE "
487 "data read(%d)\n"
488 "type %u, strid %u %u, len %u, i: %u %u, "
489 "lsn" LSN_FMT " " LSN_FMT "\n",
490 i, (uint) rec.type,
491 (uint) rec.short_trid, (uint) uint2korr(rec.header),
492 (uint) rec.record_length,
493 (uint) index, (uint) uint4korr(rec.header + 2),
494 LSN_IN_PARTS(rec.lsn),
495 LSN_IN_PARTS(lsns1[rec.short_trid][index]));
496 translog_free_record_header(&rec);
497 goto err;
498 }
499 }
500 else
501 {
502 if (rec.type != LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE ||
503 len != 9 ||
504 rec.record_length != lens[rec.short_trid][index] ||
505 cmp_translog_addr(lsns2[rec.short_trid][index], rec.lsn) != 0 ||
506 check_content(rec.header, (uint)len))
507 {
508 fprintf(stderr,
509 "Incorrect LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE "
510 "data read(%d) "
511 "thread: %d, iteration %d, stage %d\n"
512 "type %u (%d), len %d, length %lu %lu (%d) "
513 "lsn" LSN_FMT " " LSN_FMT "\n",
514 i, (uint) rec.short_trid, index, stage,
515 (uint) rec.type, (rec.type !=
516 LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE),
517 len,
518 (ulong) rec.record_length, lens[rec.short_trid][index],
519 (rec.record_length != lens[rec.short_trid][index]),
520 LSN_IN_PARTS(rec.lsn),
521 LSN_IN_PARTS(lsns2[rec.short_trid][index]));
522 translog_free_record_header(&rec);
523 goto err;
524 }
525 if (read_and_check_content(&rec, long_buffer, 0))
526 {
527 fprintf(stderr,
528 "Incorrect LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE "
529 "in whole rec read lsn" LSN_FMT "\n",
530 LSN_IN_PARTS(rec.lsn));
531 translog_free_record_header(&rec);
532 goto err;
533 }
534 }
535 ok(1, "record read");
536 translog_free_record_header(&rec);
537 indeces[rec.short_trid]++;
538 }
539 }
540
541 rc= 0;
542err:
543 if (rc)
544 ok(0, "record read");
545 translog_destroy();
546 end_pagecache(&pagecache, 1);
547 ma_control_file_end();
548 if (maria_log_remove(maria_data_root))
549 exit(1);
550
551 my_uuid_end();
552 my_free_open_file_info();
553 my_end(0);
554 return(exit_status());
555}
556
557#include "../ma_check_standalone.h"
558