1/************************************************************************************
2 Copyright (C) 2018 MariaDB Corpoeation AB
3
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Library General Public
6 License as published by the Free Software Foundation; either
7 version 2 of the License, or (at your option) any later version.
8
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Library General Public License for more details.
13
14 You should have received a copy of the GNU Library General Public
15 License along with this library; if not see <http://www.gnu.org/licenses>
16 or write to the Free Software Foundation, Inc.,
17 51 Franklin St., Fifth Floor, Boston, MA 02110, USA
18
19*************************************************************************************/
20
21#include <ma_global.h>
22#include <ma_sys.h>
23#include <mysql.h>
24#include <errmsg.h>
25#include <stdlib.h>
26#include <string.h>
27#include <stdarg.h>
28#include <zlib.h>
29#include <mariadb_rpl.h>
30
31static int rpl_alloc_string(MARIADB_RPL_EVENT *event,
32 MARIADB_STRING *s,
33 unsigned char *buffer,
34 size_t len)
35{
36 if (!(s->str= ma_alloc_root(&event->memroot, len)))
37 return 1;
38 memcpy(s->str, buffer, len);
39 s->length= len;
40 return 0;
41}
42
43MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version)
44{
45 MARIADB_RPL *rpl;
46
47 if (version < MARIADB_RPL_REQUIRED_VERSION ||
48 version > MARIADB_RPL_VERSION)
49 {
50 my_set_error(mysql, CR_VERSION_MISMATCH, SQLSTATE_UNKNOWN, 0, version,
51 MARIADB_RPL_VERSION, MARIADB_RPL_REQUIRED_VERSION);
52 return 0;
53 }
54
55 if (!mysql)
56 return NULL;
57
58 if (!(rpl= (MARIADB_RPL *)calloc(1, sizeof(MARIADB_RPL))))
59 {
60 SET_CLIENT_ERROR(mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
61 return 0;
62 }
63 rpl->version= version;
64 rpl->mysql= mysql;
65 return rpl;
66}
67
68void STDCALL mariadb_free_rpl_event(MARIADB_RPL_EVENT *event)
69{
70 if (event)
71 {
72 ma_free_root(&event->memroot, MYF(0));
73 free(event);
74 }
75}
76
77int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl)
78{
79 unsigned char *ptr, *buf;
80 if (!rpl || !rpl->mysql)
81 return 1;
82
83 /* COM_BINLOG_DUMP:
84 Ofs Len Data
85 0 1 COM_BINLOG_DUMP
86 1 4 position
87 5 2 flags
88 7 4 server id
89 11 * filename
90
91 * = filename length
92
93 */
94 ptr= buf=
95#ifdef WIN32
96 (unsigned char *)_alloca(rpl->filename_length + 11);
97#else
98 (unsigned char *)alloca(rpl->filename_length + 11);
99#endif
100
101 int4store(ptr, (unsigned int)rpl->start_position);
102 ptr+= 4;
103 int2store(ptr, rpl->flags);
104 ptr+= 2;
105 int4store(ptr, rpl->server_id);
106 ptr+= 4;
107 memcpy(ptr, rpl->filename, rpl->filename_length);
108 ptr+= rpl->filename_length;
109
110 if (ma_simple_command(rpl->mysql, COM_BINLOG_DUMP, (const char *)buf, ptr - buf, 1, 0))
111 return 1;
112 return 0;
113}
114
115MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event)
116{
117 unsigned char *ev;
118 size_t len;
119 MARIADB_RPL_EVENT *rpl_event= 0;
120
121 if (!rpl || !rpl->mysql)
122 return 0;
123
124 while (1) {
125 unsigned long pkt_len= ma_net_safe_read(rpl->mysql);
126
127 if (pkt_len == packet_error)
128 {
129 rpl->buffer_size= 0;
130 return 0;
131 }
132
133 /* EOF packet:
134 see https://mariadb.com/kb/en/library/eof_packet/
135 Packet length must be less than 9 bytes, EOF header
136 is 0xFE.
137 */
138 if (pkt_len < 9 && rpl->mysql->net.read_pos[0] == 0xFE)
139 {
140 rpl->buffer_size= 0;
141 return 0;
142 }
143
144 /* if ignore heartbeat flag was set, we ignore this
145 record and continue to fetch next record.
146 The first byte is always status byte (0x00)
147 For event header description see
148 https://mariadb.com/kb/en/library/2-binlog-event-header/ */
149 if (rpl->flags & MARIADB_RPL_IGNORE_HEARTBEAT)
150 {
151 if (rpl->mysql->net.read_pos[1 + 4] == HEARTBEAT_LOG_EVENT)
152 continue;
153 }
154
155 rpl->buffer_size= pkt_len;
156 rpl->buffer= rpl->mysql->net.read_pos;
157
158 if (event)
159 {
160 MA_MEM_ROOT memroot= event->memroot;
161 rpl_event= event;
162 ma_free_root(&memroot, MYF(MY_KEEP_PREALLOC));
163 memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT));
164 rpl_event->memroot= memroot;
165 } else {
166 if (!(rpl_event = (MARIADB_RPL_EVENT *)malloc(sizeof(MARIADB_RPL_EVENT))))
167 goto mem_error;
168 memset(rpl_event, 0, sizeof(MARIADB_RPL_EVENT));
169 ma_init_alloc_root(&rpl_event->memroot, 8192, 0);
170 }
171 rpl_event->checksum= uint4korr(rpl->buffer + rpl->buffer_size - 4);
172
173 rpl_event->ok= rpl->buffer[0];
174 rpl_event->timestamp= uint4korr(rpl->buffer + 1);
175 rpl_event->event_type= (unsigned char)*(rpl->buffer + 5);
176 rpl_event->server_id= uint4korr(rpl->buffer + 6);
177 rpl_event->event_length= uint4korr(rpl->buffer + 10);
178 rpl_event->next_event_pos= uint4korr(rpl->buffer + 14);
179 rpl_event->flags= uint2korr(rpl->buffer + 18);
180
181 ev= rpl->buffer + EVENT_HEADER_OFS;
182
183 if (rpl->use_checksum)
184 {
185 rpl_event->checksum= *(ev + rpl_event->event_length - 4);
186 rpl_event->event_length-= 4;
187 }
188
189 switch(rpl_event->event_type) {
190 case HEARTBEAT_LOG_EVENT:
191 rpl_event->event.heartbeat.timestamp= uint4korr(ev);
192 ev+= 4;
193 rpl_event->event.heartbeat.next_position= uint4korr(ev);
194 ev+= 4;
195 rpl_event->event.heartbeat.type= (uint8_t)*ev;
196 ev+= 1;
197 rpl_event->event.heartbeat.flags= uint2korr(ev);
198 break;
199 case BINLOG_CHECKPOINT_EVENT:
200 len= uint4korr(ev);
201 ev+= 4;
202 if (rpl_alloc_string(rpl_event, &rpl_event->event.checkpoint.filename, ev, len))
203 goto mem_error;
204 break;
205 case FORMAT_DESCRIPTION_EVENT:
206 rpl_event->event.format_description.format = uint2korr(ev);
207 ev+= 2;
208 rpl_event->event.format_description.server_version = (char *)(ev);
209 ev+= 50;
210 rpl_event->event.format_description.timestamp= uint4korr(ev);
211 ev+= 4;
212 rpl->fd_header_len= rpl_event->event.format_description.header_len= (uint8_t)*ev;
213 ev= rpl->buffer + rpl->buffer_size - 5;
214 rpl->use_checksum= *ev;
215 break;
216 case QUERY_EVENT:
217 {
218 size_t db_len, status_len;
219 rpl_event->event.query.thread_id= uint4korr(ev);
220 ev+= 4;
221 rpl_event->event.query.seconds= uint4korr(ev);
222 ev+= 4;
223 db_len= *ev;
224 ev++;
225 rpl_event->event.query.errornr= uint2korr(ev);
226 ev+= 2;
227 status_len= uint2korr(ev);
228 ev+= 2;
229 if (rpl_alloc_string(rpl_event, &rpl_event->event.query.status, ev, status_len))
230 goto mem_error;
231 ev+= status_len;
232
233 if (rpl_alloc_string(rpl_event, &rpl_event->event.query.database, ev, db_len))
234 goto mem_error;
235 ev+= db_len + 1; /* zero terminated */
236
237 /* calculate statement size: buffer + buffer_size - current_ofs (ev) - crc_size */
238 len= (size_t)(rpl->buffer + rpl->buffer_size - ev - 4);
239 if (rpl_alloc_string(rpl_event, &rpl_event->event.query.statement, ev, len))
240 goto mem_error;
241 break;
242 }
243 case TABLE_MAP_EVENT:
244 rpl_event->event.table_map.table_id= uint6korr(ev);
245 ev+= 8;
246 len= *ev;
247 ev++;
248 if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.database, ev, len))
249 goto mem_error;
250 ev+= len + 1;
251 len= *ev;
252 ev++;
253 if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.table, ev, len))
254 goto mem_error;
255 ev+= len + 1;
256 rpl_event->event.table_map.column_count= mysql_net_field_length(&ev);
257 len= rpl_event->event.table_map.column_count;
258 if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.column_types, ev, len))
259 goto mem_error;
260 ev+= len;
261 len= mysql_net_field_length(&ev);
262 if (rpl_alloc_string(rpl_event, &rpl_event->event.table_map.metadata, ev, len))
263 goto mem_error;
264 break;
265 case RAND_EVENT:
266 rpl_event->event.rand.first_seed= uint8korr(ev);
267 ev+= 8;
268 rpl_event->event.rand.second_seed= uint8korr(ev);
269 break;
270 case INTVAR_EVENT:
271 rpl_event->event.intvar.type= *ev;
272 ev++;
273 rpl_event->event.intvar.value= uint8korr(ev);
274 break;
275 case USER_VAR_EVENT:
276 len= uint4korr(ev);
277 ev+= 4;
278 if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.name, ev, len))
279 goto mem_error;
280 ev+= len;
281 if (!(rpl_event->event.uservar.is_null= (uint8)*ev))
282 {
283 ev++;
284 rpl_event->event.uservar.type= *ev;
285 ev++;
286 rpl_event->event.uservar.charset_nr= uint4korr(ev);
287 ev+= 4;
288 len= uint4korr(ev);
289 ev+= 4;
290 if (rpl_alloc_string(rpl_event, &rpl_event->event.uservar.value, ev, len))
291 goto mem_error;
292 ev+= len;
293 if ((unsigned long)(ev - rpl->buffer) < rpl->buffer_size)
294 rpl_event->event.uservar.flags= *ev;
295 }
296 break;
297 case START_ENCRYPTION_EVENT:
298 rpl_event->event.encryption.scheme= *ev;
299 ev++;
300 rpl_event->event.encryption.key_version= uint4korr(ev);
301 ev+= 4;
302 rpl_event->event.encryption.nonce= (char *)ev;
303 break;
304 case ANNOTATE_ROWS_EVENT:
305 len= (uint32)(rpl->buffer + rpl->buffer_size - (unsigned char *)ev - 4);
306 if (rpl_alloc_string(rpl_event, &rpl_event->event.annotate_rows.statement, ev, len))
307 goto mem_error;
308 break;
309 case ROTATE_EVENT:
310 rpl_event->event.rotate.position= uint8korr(ev);
311 ev+= 8;
312 len= rpl_event->event_length - rpl->fd_header_len - 8;
313 if (rpl_alloc_string(rpl_event, &rpl_event->event.rotate.filename, ev, len))
314 goto mem_error;
315 break;
316 case XID_EVENT:
317 rpl_event->event.xid.transaction_nr= uint8korr(ev);
318 break;
319 case STOP_EVENT:
320 /* nothing to do here */
321 break;
322 case GTID_EVENT:
323 rpl_event->event.gtid.sequence_nr= uint8korr(ev);
324 ev+= 8;
325 rpl_event->event.gtid.domain_id= uint4korr(ev);
326 ev+= 4;
327 rpl_event->event.gtid.flags= *ev;
328 ev++;
329 if (rpl_event->event.gtid.flags & FL_GROUP_COMMIT_ID)
330 rpl_event->event.gtid.commit_id= uint8korr(ev);
331 break;
332 case GTID_LIST_EVENT:
333 {
334 uint32 i;
335 rpl_event->event.gtid_list.gtid_cnt= uint4korr(ev);
336 ev++;
337 if (!(rpl_event->event.gtid_list.gtid= (MARIADB_GTID *)ma_alloc_root(&rpl_event->memroot, sizeof(MARIADB_GTID) * rpl_event->event.gtid_list.gtid_cnt)))
338 goto mem_error;
339 for (i=0; i < rpl_event->event.gtid_list.gtid_cnt; i++)
340 {
341 rpl_event->event.gtid_list.gtid[i].domain_id= uint4korr(ev);
342 ev+= 4;
343 rpl_event->event.gtid_list.gtid[i].server_id= uint4korr(ev);
344 ev+= 4;
345 rpl_event->event.gtid_list.gtid[i].sequence_nr= uint8korr(ev);
346 ev+= 8;
347 }
348 break;
349 }
350 case WRITE_ROWS_EVENT_V1:
351 case UPDATE_ROWS_EVENT_V1:
352 case DELETE_ROWS_EVENT_V1:
353 rpl_event->event.rows.type= rpl_event->event_type - WRITE_ROWS_EVENT_V1;
354 if (rpl->fd_header_len == 6)
355 {
356 rpl_event->event.rows.table_id= uint4korr(ev);
357 ev+= 4;
358 } else {
359 rpl_event->event.rows.table_id= uint6korr(ev);
360 ev+= 6;
361 }
362 rpl_event->event.rows.flags= uint2korr(ev);
363 ev+= 2;
364 len= rpl_event->event.rows.column_count= mysql_net_field_length(&ev);
365 if (!len)
366 break;
367 if (!(rpl_event->event.rows.column_bitmap =
368 (char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8)))
369 goto mem_error;
370 memcpy(rpl_event->event.rows.column_bitmap, ev, (len + 7) / 8);
371 ev+= (len + 7) / 8;
372 if (rpl_event->event_type == UPDATE_ROWS_EVENT_V1)
373 {
374 if (!(rpl_event->event.rows.column_update_bitmap =
375 (char *)ma_alloc_root(&rpl_event->memroot, (len + 7) / 8)))
376 goto mem_error;
377 memcpy(rpl_event->event.rows.column_update_bitmap, ev, (len + 7) / 8);
378 ev+= (len + 7) / 8;
379 }
380 len= (rpl->buffer + rpl_event->event_length + EVENT_HEADER_OFS - rpl->fd_header_len) - ev;
381 if ((rpl_event->event.rows.row_data_size= len))
382 {
383 if (!(rpl_event->event.rows.row_data =
384 (char *)ma_alloc_root(&rpl_event->memroot, rpl_event->event.rows.row_data_size)))
385 goto mem_error;
386 memcpy(rpl_event->event.rows.row_data, ev, rpl_event->event.rows.row_data_size);
387 }
388 break;
389 default:
390 return NULL;
391 break;
392 }
393 return rpl_event;
394 }
395mem_error:
396 SET_CLIENT_ERROR(rpl->mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
397 return 0;
398}
399
400void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl)
401{
402 if (!rpl)
403 return;
404 if (rpl->filename)
405 free((void *)rpl->filename);
406 free(rpl);
407 return;
408}
409
410int STDCALL mariadb_rpl_optionsv(MARIADB_RPL *rpl,
411 enum mariadb_rpl_option option,
412 ...)
413{
414 va_list ap;
415 int rc= 0;
416
417 if (!rpl)
418 return 1;
419
420 va_start(ap, option);
421
422 switch (option) {
423 case MARIADB_RPL_FILENAME:
424 {
425 char *arg1= va_arg(ap, char *);
426 rpl->filename_length= (uint32_t)va_arg(ap, size_t);
427 free((void *)rpl->filename);
428 rpl->filename= NULL;
429 if (rpl->filename_length)
430 {
431 rpl->filename= (char *)malloc(rpl->filename_length);
432 memcpy((void *)rpl->filename, arg1, rpl->filename_length);
433 }
434 else if (arg1)
435 {
436 rpl->filename= strdup((const char *)arg1);
437 rpl->filename_length= (uint32_t)strlen(rpl->filename);
438 }
439 break;
440 }
441 case MARIADB_RPL_SERVER_ID:
442 {
443 rpl->server_id= va_arg(ap, unsigned int);
444 break;
445 }
446 case MARIADB_RPL_FLAGS:
447 {
448 rpl->flags= va_arg(ap, unsigned int);
449 break;
450 }
451 case MARIADB_RPL_START:
452 {
453 rpl->start_position= va_arg(ap, unsigned long);
454 break;
455 }
456 default:
457 rc= -1;
458 goto end;
459 }
460end:
461 return rc;
462}
463
464int STDCALL mariadb_rpl_get_optionsv(MARIADB_RPL *rpl,
465 enum mariadb_rpl_option option,
466 ...)
467{
468 va_list ap;
469
470 if (!rpl)
471 return 1;
472
473 va_start(ap, option);
474
475 switch (option) {
476 case MARIADB_RPL_FILENAME:
477 {
478 const char **name= (const char **)va_arg(ap, char **);
479 size_t *len= (size_t*)va_arg(ap, size_t *);
480
481 *name= rpl->filename;
482 *len= rpl->filename_length;
483 break;
484 }
485 case MARIADB_RPL_SERVER_ID:
486 {
487 unsigned int *id= va_arg(ap, unsigned int *);
488 *id= rpl->server_id;
489 break;
490 }
491 case MARIADB_RPL_FLAGS:
492 {
493 unsigned int *flags= va_arg(ap, unsigned int *);
494 *flags= rpl->flags;
495 break;
496 }
497 case MARIADB_RPL_START:
498 {
499 unsigned long *start= va_arg(ap, unsigned long *);
500 *start= rpl->start_position;
501 break;
502 }
503 default:
504 return 1;
505 break;
506 }
507 return 0;
508}
509