1/* Copyright (C) 2013 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along
13 with this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
15
16#include "mariadb.h"
17#include "wsrep_binlog.h"
18#include "wsrep_priv.h"
19#include "log.h"
20#include "log_event.h"
21#include "wsrep_applier.h"
22
23extern handlerton *binlog_hton;
24/*
25 Write the contents of a cache to a memory buffer.
26
27 This function quite the same as MYSQL_BIN_LOG::write_cache(),
28 with the exception that here we write in buffer instead of log file.
29 */
30int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
31{
32 *buf= NULL;
33 *buf_len= 0;
34 my_off_t const saved_pos(my_b_tell(cache));
35 DBUG_ENTER("wsrep_write_cache_buf");
36
37 if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
38 {
39 WSREP_ERROR("failed to initialize io-cache");
40 DBUG_RETURN(ER_ERROR_ON_WRITE);
41 }
42
43 uint length = my_b_bytes_in_cache(cache);
44 if (unlikely(0 == length)) length = my_b_fill(cache);
45
46 size_t total_length = 0;
47
48 if (likely(length > 0)) do
49 {
50 total_length += length;
51 /*
52 Bail out if buffer grows too large.
53 A temporary fix to avoid allocating indefinitely large buffer,
54 not a real limit on a writeset size which includes other things
55 like header and keys.
56 */
57 if (total_length > wsrep_max_ws_size)
58 {
59 WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
60 wsrep_max_ws_size, total_length);
61 goto error;
62 }
63 uchar* tmp = (uchar *)my_realloc(*buf, total_length,
64 MYF(MY_ALLOW_ZERO_PTR));
65 if (!tmp)
66 {
67 WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
68 *buf_len, length);
69 goto error;
70 }
71 *buf = tmp;
72
73 memcpy(*buf + *buf_len, cache->read_pos, length);
74 *buf_len = total_length;
75
76 if (cache->file < 0)
77 {
78 cache->read_pos= cache->read_end;
79 break;
80 }
81 } while ((length = my_b_fill(cache)));
82
83 if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
84 {
85 WSREP_WARN("failed to initialize io-cache");
86 goto cleanup;
87 }
88
89 DBUG_RETURN(0);
90
91error:
92 if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
93 {
94 WSREP_WARN("failed to initialize io-cache");
95 }
96cleanup:
97 my_free(*buf);
98 *buf= NULL;
99 *buf_len= 0;
100 DBUG_RETURN(ER_ERROR_ON_WRITE);
101}
102
103#define STACK_SIZE 4096 /* 4K - for buffer preallocated on the stack:
104 * many transactions would fit in there
105 * so there is no need to reach for the heap */
106
107/* Returns minimum multiple of HEAP_PAGE_SIZE that is >= length */
108static inline size_t
109heap_size(size_t length)
110{
111 return (length + HEAP_PAGE_SIZE - 1)/HEAP_PAGE_SIZE*HEAP_PAGE_SIZE;
112}
113
114/* append data to writeset */
115static inline wsrep_status_t
116wsrep_append_data(wsrep_t* const wsrep,
117 wsrep_ws_handle_t* const ws,
118 const void* const data,
119 size_t const len)
120{
121 struct wsrep_buf const buff = { data, len };
122 wsrep_status_t const rc(wsrep->append_data(wsrep, ws, &buff, 1,
123 WSREP_DATA_ORDERED, true));
124 DBUG_DUMP("buff", (uchar*) data, len);
125 if (rc != WSREP_OK)
126 {
127 WSREP_WARN("append_data() returned %d", rc);
128 }
129
130 return rc;
131}
132
133/*
134 Write the contents of a cache to wsrep provider.
135
136 This function quite the same as MYSQL_BIN_LOG::write_cache(),
137 with the exception that here we write in buffer instead of log file.
138
139 This version reads all of cache into single buffer and then appends to a
140 writeset at once.
141 */
142static int wsrep_write_cache_once(wsrep_t* const wsrep,
143 THD* const thd,
144 IO_CACHE* const cache,
145 size_t* const len)
146{
147 my_off_t const saved_pos(my_b_tell(cache));
148 DBUG_ENTER("wsrep_write_cache_once");
149
150 if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
151 {
152 WSREP_ERROR("failed to initialize io-cache");
153 DBUG_RETURN(ER_ERROR_ON_WRITE);
154 }
155
156 int err(WSREP_OK);
157
158 size_t total_length(0);
159 uchar stack_buf[STACK_SIZE]; /* to avoid dynamic allocations for few data*/
160 uchar* heap_buf(NULL);
161 uchar* buf(stack_buf);
162 size_t allocated(sizeof(stack_buf));
163 size_t used(0);
164
165 uint length(my_b_bytes_in_cache(cache));
166 if (unlikely(0 == length)) length = my_b_fill(cache);
167
168 if (likely(length > 0)) do
169 {
170 total_length += length;
171 /*
172 Bail out if buffer grows too large.
173 A temporary fix to avoid allocating indefinitely large buffer,
174 not a real limit on a writeset size which includes other things
175 like header and keys.
176 */
177 if (unlikely(total_length > wsrep_max_ws_size))
178 {
179 WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
180 wsrep_max_ws_size, total_length);
181 err = WSREP_TRX_SIZE_EXCEEDED;
182 goto cleanup;
183 }
184
185 if (total_length > allocated)
186 {
187 size_t const new_size(heap_size(total_length));
188 uchar* tmp = (uchar *)my_realloc(heap_buf, new_size,
189 MYF(MY_ALLOW_ZERO_PTR));
190 if (!tmp)
191 {
192 WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
193 allocated, length);
194 err = WSREP_TRX_SIZE_EXCEEDED;
195 goto cleanup;
196 }
197
198 heap_buf = tmp;
199 buf = heap_buf;
200 allocated = new_size;
201
202 if (used <= STACK_SIZE && used > 0) // there's data in stack_buf
203 {
204 DBUG_ASSERT(buf == stack_buf);
205 memcpy(heap_buf, stack_buf, used);
206 }
207 }
208
209 memcpy(buf + used, cache->read_pos, length);
210 used = total_length;
211 if (cache->file < 0)
212 {
213 cache->read_pos= cache->read_end;
214 break;
215 }
216 } while ((length = my_b_fill(cache)));
217
218 if (used > 0)
219 err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used);
220
221 if (WSREP_OK == err) *len = total_length;
222
223cleanup:
224 if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
225 {
226 WSREP_ERROR("failed to reinitialize io-cache");
227 }
228
229 if (unlikely(WSREP_OK != err))
230 {
231 wsrep_dump_rbr_buf_with_header(thd, buf, used);
232 }
233
234 my_free(heap_buf);
235 DBUG_RETURN(err);
236}
237
238/*
239 Write the contents of a cache to wsrep provider.
240
241 This function quite the same as MYSQL_BIN_LOG::write_cache(),
242 with the exception that here we write in buffer instead of log file.
243
244 This version uses incremental data appending as it reads it from cache.
245 */
246static int wsrep_write_cache_inc(wsrep_t* const wsrep,
247 THD* const thd,
248 IO_CACHE* const cache,
249 size_t* const len)
250{
251 my_off_t const saved_pos(my_b_tell(cache));
252 DBUG_ENTER("wsrep_write_cache_inc");
253
254 if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
255 {
256 WSREP_ERROR("failed to initialize io-cache");
257 DBUG_RETURN(WSREP_TRX_ERROR);
258 }
259
260 int err(WSREP_OK);
261
262 size_t total_length(0);
263
264 uint length(my_b_bytes_in_cache(cache));
265 if (unlikely(0 == length)) length = my_b_fill(cache);
266
267 if (likely(length > 0)) do
268 {
269 total_length += length;
270 /* bail out if buffer grows too large
271 not a real limit on a writeset size which includes other things
272 like header and keys.
273 */
274 if (unlikely(total_length > wsrep_max_ws_size))
275 {
276 WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
277 wsrep_max_ws_size, total_length);
278 err = WSREP_TRX_SIZE_EXCEEDED;
279 goto cleanup;
280 }
281
282 if(WSREP_OK != (err=wsrep_append_data(wsrep, &thd->wsrep_ws_handle,
283 cache->read_pos, length)))
284 goto cleanup;
285
286 if (cache->file < 0)
287 {
288 cache->read_pos= cache->read_end;
289 break;
290 }
291 } while ((length = my_b_fill(cache)));
292
293 if (WSREP_OK == err) *len = total_length;
294
295cleanup:
296 if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
297 {
298 WSREP_ERROR("failed to reinitialize io-cache");
299 }
300
301 DBUG_RETURN(err);
302}
303
304/*
305 Write the contents of a cache to wsrep provider.
306
307 This function quite the same as MYSQL_BIN_LOG::write_cache(),
308 with the exception that here we write in buffer instead of log file.
309 */
310int wsrep_write_cache(wsrep_t* const wsrep,
311 THD* const thd,
312 IO_CACHE* const cache,
313 size_t* const len)
314{
315 if (wsrep_incremental_data_collection) {
316 return wsrep_write_cache_inc(wsrep, thd, cache, len);
317 }
318 else {
319 return wsrep_write_cache_once(wsrep, thd, cache, len);
320 }
321}
322
323void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
324{
325 char filename[PATH_MAX]= {0};
326 int len= snprintf(filename, PATH_MAX, "%s/GRA_%lld_%lld.log",
327 wsrep_data_home_dir, (longlong) thd->thread_id,
328 (longlong) wsrep_thd_trx_seqno(thd));
329 if (len >= PATH_MAX)
330 {
331 WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
332 return;
333 }
334
335 FILE *of= fopen(filename, "wb");
336
337 if (of)
338 {
339 if (fwrite(rbr_buf, buf_len, 1, of) == 0)
340 WSREP_ERROR("Failed to write buffer of length %llu to '%s'",
341 (unsigned long long)buf_len, filename);
342
343 fclose(of);
344 }
345 else
346 {
347 WSREP_ERROR("Failed to open file '%s': %d (%s)",
348 filename, errno, strerror(errno));
349 }
350}
351
352/*
353 wsrep exploits binlog's caches even if binlogging itself is not
354 activated. In such case connection close needs calling
355 actual binlog's method.
356 Todo: split binlog hton from its caches to use ones by wsrep
357 without referring to binlog's stuff.
358*/
359int wsrep_binlog_close_connection(THD* thd)
360{
361 DBUG_ENTER("wsrep_binlog_close_connection");
362 if (thd_get_ha_data(thd, binlog_hton) != NULL)
363 binlog_hton->close_connection (binlog_hton, thd);
364 DBUG_RETURN(0);
365}
366
367int wsrep_binlog_savepoint_set(THD *thd, void *sv)
368{
369 if (!wsrep_emulate_bin_log) return 0;
370 int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv);
371 return rcode;
372}
373
374int wsrep_binlog_savepoint_rollback(THD *thd, void *sv)
375{
376 if (!wsrep_emulate_bin_log) return 0;
377 int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv);
378 return rcode;
379}
380
#if 0
/*
  Compiled out. Streams the contents of an IO_CACHE straight into
  "<wsrep_data_home_dir>/GRA_<thread_id>_<trx seqno>.log" instead of
  copying it into a memory buffer first. The cache is switched back to
  WRITE_CACHE at its saved position afterwards.
*/
void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache)
{
  char filename[PATH_MAX]= {0};
  int const len= snprintf(filename, PATH_MAX, "%s/GRA_%lld_%lld.log",
                          wsrep_data_home_dir, (longlong) thd->thread_id,
                          (longlong) wsrep_thd_trx_seqno(thd));
  if (len >= PATH_MAX)
  {
    WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
    return;
  }

  /* Rewind the cache for reading, remembering the write position. */
  my_off_t const saved_pos(my_b_tell(cache));
  if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
  {
    WSREP_ERROR("failed to initialize io-cache");
    return;
  }

  FILE* const of= fopen(filename, "wb");
  if (!of)
  {
    WSREP_ERROR("Failed to open file '%s': %d (%s)",
                filename, errno, strerror(errno));
  }
  else
  {
    bool write_failed= false;
    size_t chunk= my_b_bytes_in_cache(cache);
    if (unlikely(chunk == 0)) chunk= my_b_fill(cache);

    while (chunk > 0)
    {
      if (my_fwrite(of, cache->read_pos, chunk,
                    MYF(MY_WME | MY_NABP)) == (size_t) -1)
      {
        WSREP_ERROR("Failed to write file '%s'", filename);
        write_failed= true;
        break;
      }
      /* Purely in-memory cache: everything was in the first chunk. */
      if (cache->file < 0)
      {
        cache->read_pos= cache->read_end;
        break;
      }
      chunk= my_b_fill(cache);
    }

    if (!write_failed && cache->error == -1)
      WSREP_ERROR("RBR inconsistent");
  }

  /* Hand the cache back to the writer at its original position. */
  if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
    WSREP_ERROR("failed to reinitialize io-cache");

  if (of) fclose(of);
}
#endif
443
444void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
445{
446 thd->binlog_flush_pending_rows_event(stmt_end);
447}
448
449/* Dump replication buffer along with header to a file. */
450void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
451 size_t buf_len)
452{
453 DBUG_ENTER("wsrep_dump_rbr_buf_with_header");
454
455 char filename[PATH_MAX]= {0};
456 File file;
457 IO_CACHE cache;
458 Log_event_writer writer(&cache, 0);
459 Format_description_log_event *ev= 0;
460
461 int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%lld_%lld_v2.log",
462 wsrep_data_home_dir, (longlong) thd->thread_id,
463 (long long) wsrep_thd_trx_seqno(thd));
464
465 if (len >= PATH_MAX)
466 {
467 WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
468 DBUG_VOID_RETURN;
469 }
470
471 if ((file= mysql_file_open(key_file_wsrep_gra_log, filename,
472 O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0)
473 {
474 WSREP_ERROR("Failed to open file '%s' : %d (%s)",
475 filename, errno, strerror(errno));
476 goto cleanup1;
477 }
478
479 if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP)))
480 {
481 mysql_file_close(file, MYF(MY_WME));
482 goto cleanup2;
483 }
484
485 if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE))
486 {
487 goto cleanup2;
488 }
489
490 /*
491 Instantiate an FDLE object for non-wsrep threads (to be written
492 to the dump file).
493 */
494 ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) :
495 (new Format_description_log_event(4));
496
497 if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
498 flush_io_cache(&cache))
499 {
500 WSREP_ERROR("Failed to write to '%s'.", filename);
501 goto cleanup2;
502 }
503
504cleanup2:
505 end_io_cache(&cache);
506
507cleanup1:
508 mysql_file_close(file, MYF(MY_WME));
509
510 if (!thd->wsrep_applier) delete ev;
511
512 DBUG_VOID_RETURN;
513}
514
515