1 | /* Copyright (C) 2013 Codership Oy <info@codership.com> |
2 | |
3 | This program is free software; you can redistribute it and/or modify |
4 | it under the terms of the GNU General Public License as published by |
5 | the Free Software Foundation; version 2 of the License. |
6 | |
7 | This program is distributed in the hope that it will be useful, |
8 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | GNU General Public License for more details. |
11 | |
12 | You should have received a copy of the GNU General Public License along |
13 | with this program; if not, write to the Free Software Foundation, Inc., |
14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ |
15 | |
16 | #include "mariadb.h" |
17 | #include "wsrep_binlog.h" |
18 | #include "wsrep_priv.h" |
19 | #include "log.h" |
20 | #include "log_event.h" |
21 | #include "wsrep_applier.h" |
22 | |
23 | extern handlerton *binlog_hton; |
24 | /* |
25 | Write the contents of a cache to a memory buffer. |
26 | |
27 | This function quite the same as MYSQL_BIN_LOG::write_cache(), |
28 | with the exception that here we write in buffer instead of log file. |
29 | */ |
30 | int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len) |
31 | { |
32 | *buf= NULL; |
33 | *buf_len= 0; |
34 | my_off_t const saved_pos(my_b_tell(cache)); |
35 | DBUG_ENTER("wsrep_write_cache_buf" ); |
36 | |
37 | if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) |
38 | { |
39 | WSREP_ERROR("failed to initialize io-cache" ); |
40 | DBUG_RETURN(ER_ERROR_ON_WRITE); |
41 | } |
42 | |
43 | uint length = my_b_bytes_in_cache(cache); |
44 | if (unlikely(0 == length)) length = my_b_fill(cache); |
45 | |
46 | size_t total_length = 0; |
47 | |
48 | if (likely(length > 0)) do |
49 | { |
50 | total_length += length; |
51 | /* |
52 | Bail out if buffer grows too large. |
53 | A temporary fix to avoid allocating indefinitely large buffer, |
54 | not a real limit on a writeset size which includes other things |
55 | like header and keys. |
56 | */ |
57 | if (total_length > wsrep_max_ws_size) |
58 | { |
59 | WSREP_WARN("transaction size limit (%lu) exceeded: %zu" , |
60 | wsrep_max_ws_size, total_length); |
61 | goto error; |
62 | } |
63 | uchar* tmp = (uchar *)my_realloc(*buf, total_length, |
64 | MYF(MY_ALLOW_ZERO_PTR)); |
65 | if (!tmp) |
66 | { |
67 | WSREP_ERROR("could not (re)allocate buffer: %zu + %u" , |
68 | *buf_len, length); |
69 | goto error; |
70 | } |
71 | *buf = tmp; |
72 | |
73 | memcpy(*buf + *buf_len, cache->read_pos, length); |
74 | *buf_len = total_length; |
75 | |
76 | if (cache->file < 0) |
77 | { |
78 | cache->read_pos= cache->read_end; |
79 | break; |
80 | } |
81 | } while ((length = my_b_fill(cache))); |
82 | |
83 | if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) |
84 | { |
85 | WSREP_WARN("failed to initialize io-cache" ); |
86 | goto cleanup; |
87 | } |
88 | |
89 | DBUG_RETURN(0); |
90 | |
91 | error: |
92 | if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) |
93 | { |
94 | WSREP_WARN("failed to initialize io-cache" ); |
95 | } |
96 | cleanup: |
97 | my_free(*buf); |
98 | *buf= NULL; |
99 | *buf_len= 0; |
100 | DBUG_RETURN(ER_ERROR_ON_WRITE); |
101 | } |
102 | |
/* 4K - size of the buffer preallocated on the stack: many transactions
 * would fit in there, so there is no need to reach for the heap. */
#define STACK_SIZE 4096
106 | |
107 | /* Returns minimum multiple of HEAP_PAGE_SIZE that is >= length */ |
108 | static inline size_t |
109 | heap_size(size_t length) |
110 | { |
111 | return (length + HEAP_PAGE_SIZE - 1)/HEAP_PAGE_SIZE*HEAP_PAGE_SIZE; |
112 | } |
113 | |
114 | /* append data to writeset */ |
115 | static inline wsrep_status_t |
116 | wsrep_append_data(wsrep_t* const wsrep, |
117 | wsrep_ws_handle_t* const ws, |
118 | const void* const data, |
119 | size_t const len) |
120 | { |
121 | struct wsrep_buf const buff = { data, len }; |
122 | wsrep_status_t const rc(wsrep->append_data(wsrep, ws, &buff, 1, |
123 | WSREP_DATA_ORDERED, true)); |
124 | DBUG_DUMP("buff" , (uchar*) data, len); |
125 | if (rc != WSREP_OK) |
126 | { |
127 | WSREP_WARN("append_data() returned %d" , rc); |
128 | } |
129 | |
130 | return rc; |
131 | } |
132 | |
133 | /* |
134 | Write the contents of a cache to wsrep provider. |
135 | |
136 | This function quite the same as MYSQL_BIN_LOG::write_cache(), |
137 | with the exception that here we write in buffer instead of log file. |
138 | |
139 | This version reads all of cache into single buffer and then appends to a |
140 | writeset at once. |
141 | */ |
142 | static int wsrep_write_cache_once(wsrep_t* const wsrep, |
143 | THD* const thd, |
144 | IO_CACHE* const cache, |
145 | size_t* const len) |
146 | { |
147 | my_off_t const saved_pos(my_b_tell(cache)); |
148 | DBUG_ENTER("wsrep_write_cache_once" ); |
149 | |
150 | if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) |
151 | { |
152 | WSREP_ERROR("failed to initialize io-cache" ); |
153 | DBUG_RETURN(ER_ERROR_ON_WRITE); |
154 | } |
155 | |
156 | int err(WSREP_OK); |
157 | |
158 | size_t total_length(0); |
159 | uchar stack_buf[STACK_SIZE]; /* to avoid dynamic allocations for few data*/ |
160 | uchar* heap_buf(NULL); |
161 | uchar* buf(stack_buf); |
162 | size_t allocated(sizeof(stack_buf)); |
163 | size_t used(0); |
164 | |
165 | uint length(my_b_bytes_in_cache(cache)); |
166 | if (unlikely(0 == length)) length = my_b_fill(cache); |
167 | |
168 | if (likely(length > 0)) do |
169 | { |
170 | total_length += length; |
171 | /* |
172 | Bail out if buffer grows too large. |
173 | A temporary fix to avoid allocating indefinitely large buffer, |
174 | not a real limit on a writeset size which includes other things |
175 | like header and keys. |
176 | */ |
177 | if (unlikely(total_length > wsrep_max_ws_size)) |
178 | { |
179 | WSREP_WARN("transaction size limit (%lu) exceeded: %zu" , |
180 | wsrep_max_ws_size, total_length); |
181 | err = WSREP_TRX_SIZE_EXCEEDED; |
182 | goto cleanup; |
183 | } |
184 | |
185 | if (total_length > allocated) |
186 | { |
187 | size_t const new_size(heap_size(total_length)); |
188 | uchar* tmp = (uchar *)my_realloc(heap_buf, new_size, |
189 | MYF(MY_ALLOW_ZERO_PTR)); |
190 | if (!tmp) |
191 | { |
192 | WSREP_ERROR("could not (re)allocate buffer: %zu + %u" , |
193 | allocated, length); |
194 | err = WSREP_TRX_SIZE_EXCEEDED; |
195 | goto cleanup; |
196 | } |
197 | |
198 | heap_buf = tmp; |
199 | buf = heap_buf; |
200 | allocated = new_size; |
201 | |
202 | if (used <= STACK_SIZE && used > 0) // there's data in stack_buf |
203 | { |
204 | DBUG_ASSERT(buf == stack_buf); |
205 | memcpy(heap_buf, stack_buf, used); |
206 | } |
207 | } |
208 | |
209 | memcpy(buf + used, cache->read_pos, length); |
210 | used = total_length; |
211 | if (cache->file < 0) |
212 | { |
213 | cache->read_pos= cache->read_end; |
214 | break; |
215 | } |
216 | } while ((length = my_b_fill(cache))); |
217 | |
218 | if (used > 0) |
219 | err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used); |
220 | |
221 | if (WSREP_OK == err) *len = total_length; |
222 | |
223 | cleanup: |
224 | if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) |
225 | { |
226 | WSREP_ERROR("failed to reinitialize io-cache" ); |
227 | } |
228 | |
229 | if (unlikely(WSREP_OK != err)) |
230 | { |
231 | wsrep_dump_rbr_buf_with_header(thd, buf, used); |
232 | } |
233 | |
234 | my_free(heap_buf); |
235 | DBUG_RETURN(err); |
236 | } |
237 | |
238 | /* |
239 | Write the contents of a cache to wsrep provider. |
240 | |
241 | This function quite the same as MYSQL_BIN_LOG::write_cache(), |
242 | with the exception that here we write in buffer instead of log file. |
243 | |
244 | This version uses incremental data appending as it reads it from cache. |
245 | */ |
246 | static int wsrep_write_cache_inc(wsrep_t* const wsrep, |
247 | THD* const thd, |
248 | IO_CACHE* const cache, |
249 | size_t* const len) |
250 | { |
251 | my_off_t const saved_pos(my_b_tell(cache)); |
252 | DBUG_ENTER("wsrep_write_cache_inc" ); |
253 | |
254 | if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) |
255 | { |
256 | WSREP_ERROR("failed to initialize io-cache" ); |
257 | DBUG_RETURN(WSREP_TRX_ERROR); |
258 | } |
259 | |
260 | int err(WSREP_OK); |
261 | |
262 | size_t total_length(0); |
263 | |
264 | uint length(my_b_bytes_in_cache(cache)); |
265 | if (unlikely(0 == length)) length = my_b_fill(cache); |
266 | |
267 | if (likely(length > 0)) do |
268 | { |
269 | total_length += length; |
270 | /* bail out if buffer grows too large |
271 | not a real limit on a writeset size which includes other things |
272 | like header and keys. |
273 | */ |
274 | if (unlikely(total_length > wsrep_max_ws_size)) |
275 | { |
276 | WSREP_WARN("transaction size limit (%lu) exceeded: %zu" , |
277 | wsrep_max_ws_size, total_length); |
278 | err = WSREP_TRX_SIZE_EXCEEDED; |
279 | goto cleanup; |
280 | } |
281 | |
282 | if(WSREP_OK != (err=wsrep_append_data(wsrep, &thd->wsrep_ws_handle, |
283 | cache->read_pos, length))) |
284 | goto cleanup; |
285 | |
286 | if (cache->file < 0) |
287 | { |
288 | cache->read_pos= cache->read_end; |
289 | break; |
290 | } |
291 | } while ((length = my_b_fill(cache))); |
292 | |
293 | if (WSREP_OK == err) *len = total_length; |
294 | |
295 | cleanup: |
296 | if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) |
297 | { |
298 | WSREP_ERROR("failed to reinitialize io-cache" ); |
299 | } |
300 | |
301 | DBUG_RETURN(err); |
302 | } |
303 | |
304 | /* |
305 | Write the contents of a cache to wsrep provider. |
306 | |
307 | This function quite the same as MYSQL_BIN_LOG::write_cache(), |
308 | with the exception that here we write in buffer instead of log file. |
309 | */ |
310 | int wsrep_write_cache(wsrep_t* const wsrep, |
311 | THD* const thd, |
312 | IO_CACHE* const cache, |
313 | size_t* const len) |
314 | { |
315 | if (wsrep_incremental_data_collection) { |
316 | return wsrep_write_cache_inc(wsrep, thd, cache, len); |
317 | } |
318 | else { |
319 | return wsrep_write_cache_once(wsrep, thd, cache, len); |
320 | } |
321 | } |
322 | |
323 | void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len) |
324 | { |
325 | char filename[PATH_MAX]= {0}; |
326 | int len= snprintf(filename, PATH_MAX, "%s/GRA_%lld_%lld.log" , |
327 | wsrep_data_home_dir, (longlong) thd->thread_id, |
328 | (longlong) wsrep_thd_trx_seqno(thd)); |
329 | if (len >= PATH_MAX) |
330 | { |
331 | WSREP_ERROR("RBR dump path too long: %d, skipping dump." , len); |
332 | return; |
333 | } |
334 | |
335 | FILE *of= fopen(filename, "wb" ); |
336 | |
337 | if (of) |
338 | { |
339 | if (fwrite(rbr_buf, buf_len, 1, of) == 0) |
340 | WSREP_ERROR("Failed to write buffer of length %llu to '%s'" , |
341 | (unsigned long long)buf_len, filename); |
342 | |
343 | fclose(of); |
344 | } |
345 | else |
346 | { |
347 | WSREP_ERROR("Failed to open file '%s': %d (%s)" , |
348 | filename, errno, strerror(errno)); |
349 | } |
350 | } |
351 | |
352 | /* |
353 | wsrep exploits binlog's caches even if binlogging itself is not |
354 | activated. In such case connection close needs calling |
355 | actual binlog's method. |
356 | Todo: split binlog hton from its caches to use ones by wsrep |
357 | without referring to binlog's stuff. |
358 | */ |
359 | int wsrep_binlog_close_connection(THD* thd) |
360 | { |
361 | DBUG_ENTER("wsrep_binlog_close_connection" ); |
362 | if (thd_get_ha_data(thd, binlog_hton) != NULL) |
363 | binlog_hton->close_connection (binlog_hton, thd); |
364 | DBUG_RETURN(0); |
365 | } |
366 | |
367 | int wsrep_binlog_savepoint_set(THD *thd, void *sv) |
368 | { |
369 | if (!wsrep_emulate_bin_log) return 0; |
370 | int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv); |
371 | return rcode; |
372 | } |
373 | |
374 | int wsrep_binlog_savepoint_rollback(THD *thd, void *sv) |
375 | { |
376 | if (!wsrep_emulate_bin_log) return 0; |
377 | int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv); |
378 | return rcode; |
379 | } |
380 | |
381 | #if 0 |
/*
  Dump the contents of an IO cache straight to
  <wsrep_data_home_dir>/GRA_<thread_id>_<trx_seqno>.log, chunk by chunk,
  without gathering the data in an intermediate buffer.

  NOTE: this function is currently compiled out by the surrounding #if 0.

  The cache is switched to READ_CACHE for the duration of the dump and back
  to WRITE_CACHE at its saved position afterwards. Failures are only logged.
*/
void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache)
{
  char filename[PATH_MAX]= {0};
  int len= snprintf(filename, PATH_MAX, "%s/GRA_%lld_%lld.log",
                    wsrep_data_home_dir, (longlong) thd->thread_id,
                    (longlong) wsrep_thd_trx_seqno(thd));
  size_t bytes_in_cache = 0;
  // reject a file name that did not fit into the buffer
  if (len >= PATH_MAX)
  {
    WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
    return ;
  }
  // remember the current position and switch the cache to read mode
  my_off_t const saved_pos(my_b_tell(cache));
  if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
  {
    WSREP_ERROR("failed to initialize io-cache");
    return ;
  }
  // open the dump file; on failure the cache is still restored below
  FILE* of = fopen(filename, "wb");
  if (!of)
  {
    WSREP_ERROR("Failed to open file '%s': %d (%s)",
                filename, errno, strerror(errno));
    goto cleanup;
  }
  // write what is already buffered, refilling the cache until it is drained
  bytes_in_cache= my_b_bytes_in_cache(cache);
  if (unlikely(bytes_in_cache == 0)) bytes_in_cache = my_b_fill(cache);
  if (likely(bytes_in_cache > 0)) do
  {
    if (my_fwrite(of, cache->read_pos, bytes_in_cache,
                  MYF(MY_WME | MY_NABP)) == (size_t) -1)
    {
      WSREP_ERROR("Failed to write file '%s'", filename);
      goto cleanup;
    }

    // no file behind the cache: mark the buffer as consumed and stop
    if (cache->file < 0)
    {
      cache->read_pos= cache->read_end;
      break;
    }
  } while ((bytes_in_cache= my_b_fill(cache)));
  // an error flag of -1 on the cache is reported as an inconsistent dump
  if (cache->error == -1)
  {
    WSREP_ERROR("RBR inconsistent");
    goto cleanup;
  }
cleanup:
  // switch the cache back to write mode at the saved position
  if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
  {
    WSREP_ERROR("failed to reinitialize io-cache");
  }
  // close the dump file if it was opened
  if (of) fclose(of);
}
442 | #endif |
443 | |
444 | void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) |
445 | { |
446 | thd->binlog_flush_pending_rows_event(stmt_end); |
447 | } |
448 | |
449 | /* Dump replication buffer along with header to a file. */ |
450 | void (THD *thd, const void *rbr_buf, |
451 | size_t buf_len) |
452 | { |
453 | DBUG_ENTER("wsrep_dump_rbr_buf_with_header" ); |
454 | |
455 | char filename[PATH_MAX]= {0}; |
456 | File file; |
457 | IO_CACHE cache; |
458 | Log_event_writer writer(&cache, 0); |
459 | Format_description_log_event *ev= 0; |
460 | |
461 | int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%lld_%lld_v2.log" , |
462 | wsrep_data_home_dir, (longlong) thd->thread_id, |
463 | (long long) wsrep_thd_trx_seqno(thd)); |
464 | |
465 | if (len >= PATH_MAX) |
466 | { |
467 | WSREP_ERROR("RBR dump path too long: %d, skipping dump." , len); |
468 | DBUG_VOID_RETURN; |
469 | } |
470 | |
471 | if ((file= mysql_file_open(key_file_wsrep_gra_log, filename, |
472 | O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0) |
473 | { |
474 | WSREP_ERROR("Failed to open file '%s' : %d (%s)" , |
475 | filename, errno, strerror(errno)); |
476 | goto cleanup1; |
477 | } |
478 | |
479 | if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP))) |
480 | { |
481 | mysql_file_close(file, MYF(MY_WME)); |
482 | goto cleanup2; |
483 | } |
484 | |
485 | if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE)) |
486 | { |
487 | goto cleanup2; |
488 | } |
489 | |
490 | /* |
491 | Instantiate an FDLE object for non-wsrep threads (to be written |
492 | to the dump file). |
493 | */ |
494 | ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) : |
495 | (new Format_description_log_event(4)); |
496 | |
497 | if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) || |
498 | flush_io_cache(&cache)) |
499 | { |
500 | WSREP_ERROR("Failed to write to '%s'." , filename); |
501 | goto cleanup2; |
502 | } |
503 | |
504 | cleanup2: |
505 | end_io_cache(&cache); |
506 | |
507 | cleanup1: |
508 | mysql_file_close(file, MYF(MY_WME)); |
509 | |
510 | if (!thd->wsrep_applier) delete ev; |
511 | |
512 | DBUG_VOID_RETURN; |
513 | } |
514 | |
515 | |