1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
 * A master can be replicated by taking a binary copy of the 'bat' directory
 * when in quiescent mode, or by taking a more formal snapshot.
 * Alternatively you start with an empty database.
13 *
 * The wlc log records written are numbered 0 .. wlc_tag - 1.
 * The replicator copies all of them up to and including wlc_limit.
16 * This leads to the wlr_tag from -1 .. wlc_limit, wlr_tag,..., INT64_MAX
17 *
 * Replication starts after setting the master id and giving an (optional)
 * wlr_limit.
 * Any error encountered in replaying the log stops the process, because then
 * no guarantee can be given on the consistency with the master database.
 * A manual fix for an exceptional case is allowed, after which a call
 * to CALL wlrclear() accepts the failing transaction and prepares
 * for the next CALL replicate().
25 */
26#include "monetdb_config.h"
27#include "sql.h"
28#include "wlc.h"
29#include "wlr.h"
30#include "sql_scenario.h"
31#include "sql_execute.h"
32#include "opt_prelude.h"
33#include "mal_parser.h"
34#include "mal_client.h"
35#include "mal_authorize.h"
36#include "querylog.h"
37
/* States of the replicator thread, kept in wlr_state. */
#define WLR_WAIT 0
#define WLR_RUN 101
#define WLR_STOP 201

/* Values stored in cntxt->wlc_kind while a transaction block is replayed. */
#define WLC_COMMIT 40
#define WLC_ROLLBACK 50
#define WLC_ERROR 60

/* Taken around the updates of wlr_state in this file. */
MT_Lock wlr_lock = MT_LOCK_INITIALIZER("wlr_lock");

// #define _WLR_DEBUG_

/* The current status of the replica processing.
 * It is based on the assumption that at most one replica thread is running
 * importing data from a single master.
 */
static char wlr_master[IDLENGTH];	// name of the master; used to build the paths of its wlc.config and log files
static int wlr_batches; 			// the next file to be processed
static lng wlr_tag = -1;			// the last transaction id being processed
static char wlr_timelimit[26];		// stop re-processing transactions when time limit is reached
static char wlr_read[26];			// timestamp of the last transaction handed to the interpreter; reported by WLRgetclock
static int wlr_beat;				// period between successive synchronisations with master
static char wlr_error[FILENAME_MAX];	// errors should stop the process

static MT_Id wlr_thread = 0;		// The single replicator thread
static int wlr_state = WLR_WAIT;	// which state WAIT/RUN
static lng wlr_limit = -1;			// stop re-processing after transaction id 'wlr_limit' is processed

/* Size of the line buffer used when reading wlr.config. */
#define MAXLINE 2048
67
68/* Simple read the replica configuration status file */
69static int
70WLRgetConfig(void){
71 char *path;
72 char line[MAXLINE];
73 FILE *fd;
74 int len;
75
76 if((path = GDKfilepath(0, 0, "wlr.config", 0)) == NULL){
77 fprintf(stderr, "wlr.getConfig:Could not create wlr.config file path\n");
78 return -1;
79 }
80 fd = fopen(path,"r");
81 GDKfree(path);
82 if( fd == NULL){
83 // during start of the replicator it need not be there
84 return 1;
85 }
86 while( fgets(line, MAXLINE, fd) ){
87 line[strlen(line)-1]= 0;
88#ifdef _WLR_DEBUG_
89 fprintf(stderr,"#WLRgetConfig %s\n", line);
90#endif
91 if( strncmp("master=", line,7) == 0) {
92 len = snprintf(wlr_master, IDLENGTH, "%s", line + 7);
93 if (len == -1 || len >= IDLENGTH) {
94 fprintf(stderr, "wlr.getConfig:master config value is too large");
95 goto bailout;
96 } else
97 if (len == 0) {
98 fprintf(stderr, "wlr.getConfig:master config path missing");
99 goto bailout;
100 }
101 } else
102 if( strncmp("batches=", line, 8) == 0)
103 wlr_batches = atoi(line+ 8);
104 else
105 if( strncmp("tag=", line, 4) == 0)
106 wlr_tag = atoi(line+ 4);
107 else
108 if( strncmp("beat=", line, 5) == 0)
109 wlr_beat = atoi(line+ 5);
110 else
111 if( strncmp("timelimit=", line, 10) == 0)
112 strcpy(wlr_timelimit, line + 10);
113 else
114 if( strncmp("error=", line, 6) == 0) {
115 char *s;
116 len = snprintf(wlr_error, FILENAME_MAX, "%s", line + 6);
117 if (len == -1 || len >= FILENAME_MAX) {
118 fprintf(stderr, "wlr.getConfig:error config value is too large");
119 goto bailout;
120 }
121 s = strchr(wlr_error, (int) '\n');
122 if ( s) *s = 0;
123 } else{
124 fprintf(stderr, "wlr.getConfig:unknown configuration item '%s'", line);
125 goto bailout;
126 }
127 }
128 return 0;
129bailout:
130 fclose(fd);
131 return -1;
132}
133
134/* Keep the current status in the configuration status file */
135static void
136WLRputConfig(void){
137 char *path;
138 stream *fd;
139
140 if((path = GDKfilepath(0,0,"wlr.config",0)) == NULL){
141 fprintf(stderr,"wlr.setMaster:Could not access wlr.config file\n");
142 return ;
143 }
144 fd = open_wastream(path);
145 GDKfree(path);
146 if( fd == NULL){
147 fprintf(stderr,"wlr.setMaster:Could not create wlr.config file\n");
148 return;
149 }
150
151 mnstr_printf(fd,"master=%s\n", wlr_master);
152 mnstr_printf(fd,"batches=%d\n", wlr_batches);
153 mnstr_printf(fd,"tag="LLFMT"\n", wlr_tag);
154 mnstr_printf(fd,"beat=%d\n", wlr_beat);
155 if( wlr_timelimit[0])
156 mnstr_printf(fd,"timelimit=%s\n", wlr_timelimit);
157 if( wlr_error[0])
158 mnstr_printf(fd,"error=%s\n", wlr_error);
159 close_stream(fd);
160#ifdef _WLR_DEBUG_
161 fprintf(stderr,"#WLRput: batches %d tag " LLFMT " limit "LLFMT " beat %d timelimit %s\n",
162 wlr_batches, wlr_tag, wlr_limit, wlr_beat, wlr_timelimit);
163#endif
164}
165
166/*
167 * When the master database exist, we should set the replica administration.
168 * But only once.
169 *
170 * The log files are identified by a range. It starts with 0 when an empty database
171 * was used to bootstrap. Otherwise it is the range received from the dbmaster.
172 * At any time we should be able to restart the synchronization
173 * process by grabbing a new set of log files.
174 * This calls for keeping track in the replica what log files have been applied
175 * and what the last completed transaction was.
176 *
177 * Given that the replication thread runs independently, all errors encountered
178 * should be sent to the system logging system.
179 */
180static str
181WLRgetMaster(void)
182{
183 char path[FILENAME_MAX];
184 int len;
185 str dir;
186 FILE *fd;
187
188 if( wlr_master[0] == 0 )
189 return MAL_SUCCEED;
190
191 /* collect master properties */
192 len = snprintf(path, FILENAME_MAX, "..%c%s", DIR_SEP, wlr_master);
193 if (len == -1 || len >= FILENAME_MAX)
194 throw(MAL, "wlr.getMaster", "wlc.config filename path is too large");
195 if((dir = GDKfilepath(0,path,"wlc.config",0)) == NULL)
196 throw(MAL,"wlr.getMaster","Could not access wlc.config file %s/wlc.config\n", path);
197
198 fd = fopen(dir,"r");
199 GDKfree(dir);
200 if( fd ){
201 WLCreadConfig(fd);
202 if( ! wlr_master[0] )
203 throw(MAL,"wlr.getMaster","Master not identified\n");
204 wlc_state = WLC_CLONE; // not used as master
205 } else
206 throw(MAL,"wlr.getMaster","Could not get read access to '%s'config file\n", wlr_master);
207 return MAL_SUCCEED;
208}
209
/* each WLR block is turned into a separate MAL block and executed
 * This block is re-used as we consider the complete file.
 *
 * cleanup() calls resetMalBlkAndFreeInstructions and trimMalVariables on
 * the variable 'mb', which must be in scope at the point of use, so that
 * the block can be filled again with the next transaction.
 */

#define cleanup(){\
	resetMalBlkAndFreeInstructions(mb, 1);\
	trimMalVariables(mb, NULL);\
	}
218
219static void
220WLRprocessBatch(void *arg)
221{
222 Client cntxt = (Client) arg;
223 int i, len;
224 char path[FILENAME_MAX];
225 stream *fd = NULL;
226 Client c;
227 size_t sz;
228 MalBlkPtr mb;
229 InstrPtr q;
230 str msg, other;
231 mvc *sql;
232 Symbol prev = NULL;
233 lng tag = wlr_tag;
234 char tag_read[26]; // stop re-processing transactions when time limit is reached
235
236 c =MCforkClient(cntxt);
237 if( c == 0){
238 fprintf(stderr, "#Could not create user for WLR process\n");
239 return;
240 }
241 c->promptlength = 0;
242 c->listing = 0;
243 c->fdout = open_wastream(".wlr");
244 if(c->fdout == NULL) {
245 MCcloseClient(c);
246 fprintf(stderr, "#Could not create user for WLR process\n");
247 return;
248 }
249
250 /* Cook a log file into a concreate MAL function for multiple transactions */
251 prev = newFunction(putName("user"), putName("wlr"), FUNCTIONsymbol);
252 if(prev == NULL) {
253 MCcloseClient(c);
254 fprintf(stderr, "#Could not create user for WLR process\n");
255 return;
256 }
257 c->curprg = prev;
258 mb = c->curprg->def;
259 setVarType(mb, 0, TYPE_void);
260
261 msg = SQLinitClient(c);
262 if( msg != MAL_SUCCEED)
263 fprintf(stderr,"#Failed to initialize the client\n");
264 msg = getSQLContext(c, mb, &sql, NULL);
265 if( msg)
266 fprintf(stderr,"#Failed to access the transaction context: %s\n",msg);
267 if ((msg = checkSQLContext(c)) != NULL)
268 fprintf(stderr,"#Inconsistent SQL context: %s\n",msg);
269
270#ifdef _WLR_DEBUG_
271 fprintf(stderr,"#Ready to start the replay against batches state %d wlr "LLFMT" wlr_limit "LLFMT" wlr %d wlc %d taglimit "LLFMT" exit %d\n",
272 wlr_state, wlr_tag, wlr_limit, wlr_batches, wlc_batches, wlr_limit, GDKexiting() );
273#endif
274 path[0]=0;
275 for( i= wlr_batches; i < wlc_batches && !GDKexiting() && wlr_state != WLR_STOP && wlr_tag < wlr_limit; i++){
276 len = snprintf(path,FILENAME_MAX,"%s%c%s_%012d", wlc_dir, DIR_SEP, wlr_master, i);
277 if (len == -1 || len >= FILENAME_MAX) {
278 fprintf(stderr,"#wlr.process: filename path is too large\n");
279 continue;
280 }
281 fd= open_rastream(path);
282 if( fd == NULL){
283 fprintf(stderr,"#wlr.process:'%s' can not be accessed \n",path);
284 // Be careful not to miss log files.
285 continue;
286 }
287 sz = getFileSize(fd);
288 if (sz > (size_t) 1 << 29) {
289 close_stream(fd);
290 fprintf(stderr, "#wlr.process File %s too large to process", path);
291 continue;
292 }
293 if((c->fdin = bstream_create(fd, sz == 0 ? (size_t) (2 * 128 * BLOCK) : sz)) == NULL) {
294 close_stream(fd);
295 fprintf(stderr, "#wlr.process Failed to open stream for file %s", path);
296 continue;
297 }
298 if (bstream_next(c->fdin) < 0){
299 fprintf(stderr, "!WARNING: could not read %s\n", path);
300 continue;
301 }
302
303 c->yycur = 0;
304#ifdef _WLR_DEBUG_
305 fprintf(stderr,"#REPLAY LOG FILE:%s\n",path);
306#endif
307
308 // now parse the file line by line to reconstruct the WLR blocks
309 do{
310 parseMAL(c, c->curprg, 1, 1);
311
312 mb = c->curprg->def;
313 if( mb->errors){
314 char line[FILENAME_MAX];
315 snprintf(line, FILENAME_MAX,"#wlr.process:failed further parsing '%s':",path);
316 snprintf(wlr_error, FILENAME_MAX, "%.*s", FILENAME_MAX, line);
317 fprintf(stderr,"%s\n",line);
318 fprintFunction(stderr, mb, 0, LIST_MAL_DEBUG );
319 cleanup();
320#ifdef _WLR_DEBUG_
321 fprintf(stderr,"#redo transaction error \n");
322#endif
323 continue;
324 }
325 q= getInstrPtr(mb, mb->stop - 1);
326 if( getModuleId(q) != wlrRef){
327#ifdef _WLR_DEBUG_XTRA
328 fprintf(stderr,"#unexpected instruction ");
329 fprintInstruction(stderr, mb, 0, q, LIST_MAL_ALL);
330#endif
331 cleanup();
332 break;
333 }
334 if( getModuleId(q) == wlrRef && getFunctionId(q) == transactionRef){
335 tag = getVarConstant(mb, getArg(q,1)).val.lval;
336 snprintf(tag_read, sizeof(wlr_read), "%s", getVarConstant(mb, getArg(q,2)).val.sval);
337#ifdef _WLR_DEBUG_
338 fprintf(stderr,"#do transaction tag "LLFMT" wlr_limit "LLFMT" wlr_tag "LLFMT"\n", tag, wlr_limit, wlr_tag);
339#endif
340 // break loop if we don't see a the next expected transaction
341 if ( tag <= wlr_tag){
342 /* skip already executed transaction log */
343 continue;
344 } else
345 if( ( tag > wlr_limit) ||
346 ( wlr_timelimit[0] && strcmp(tag_read, wlr_timelimit) > 0)){
347 /* stop execution of the transactions if your reached the limit */
348 cleanup();
349#ifdef _WLR_DEBUG_
350 fprintf(stderr,"#Found final transaction "LLFMT"("LLFMT")\n", wlr_limit, wlr_tag);
351#endif
352 break;
353 }
354#ifdef _WLR_DEBUG_
355 fprintf(stderr,"#run against tlimit %s wlr_tag "LLFMT" tag" LLFMT" \n", wlr_timelimit, wlr_tag, tag);
356#endif
357 }
358 // only re-execute successful transactions.
359 if ( getModuleId(q) == wlrRef && getFunctionId(q) ==commitRef ){
360 pushEndInstruction(mb);
361 // execute this block if no errors are found
362 chkTypes(c->usermodule, mb, FALSE);
363 chkFlow(mb);
364 chkDeclarations(mb);
365
366 if( mb->errors == 0){
367 sql->session->auto_commit = 0;
368 sql->session->ac_on_commit = 1;
369 sql->session->level = 0;
370 if(mvc_trans(sql) < 0) {
371 fprintf(stderr,"#Allocation failure while starting the transaction \n");
372 } else {
373#ifdef _WLR_DEBUG_
374 fprintf(stderr,"#process a transaction\n");
375 fprintFunction(stderr, mb, 0, LIST_MAL_DEBUG | LIST_MAL_MAPI );
376#endif
377 wlr_tag = tag; // remember which transaction we executed
378 snprintf(wlr_read, sizeof(wlr_read), "%s", tag_read);
379 msg= runMAL(c,mb,0,0);
380 if( msg == MAL_SUCCEED){
381 /* at this point we have updated the replica, but the configuration has not been changed.
382 * If at this point an error occurs, we could redo the same transaction twice later on.
383 * The solution is to make sure that we recognize that a transaction has started and is completed successfully
384 */
385 WLRputConfig();
386 }
387 // ignore warnings
388 if (msg && strstr(msg,"WARNING"))
389 msg = MAL_SUCCEED;
390 if( msg != MAL_SUCCEED){
391 // they should always succeed
392 msg =createException(MAL,"wlr.process", "batch %d:"LLFMT" :%s\n", i, tag, msg);
393 //fprintFunction(stderr, mb, 0, LIST_MAL_DEBUG );
394 if((other = mvc_rollback(sql,0,NULL, false)) != MAL_SUCCEED) //an error was already established
395 GDKfree(other);
396 } else
397 if((other = mvc_commit(sql, 0, 0, false)) != MAL_SUCCEED) {
398 msg = createException(MAL,"wlr.process", "transaction %d:"LLFMT" commit failed: %s\n", i, tag, other);
399 freeException(other);
400 }
401 }
402 } else {
403 char line[FILENAME_MAX];
404 snprintf(line, FILENAME_MAX,"#wlr.process:typechecking failed '%s':\n",path);
405 snprintf(wlr_error, FILENAME_MAX, "%s", line);
406 fprintf(stderr,"%s",line);
407 fprintFunction(stderr, mb, 0, LIST_MAL_DEBUG );
408 }
409 cleanup();
410 if ( wlr_tag + 1 == wlc_tag || tag == wlr_limit)
411 break;
412 } else
413 if ( getModuleId(q) == wlrRef && (getFunctionId(q) == rollbackRef || getFunctionId(q) == commitRef)){
414 cleanup();
415 if ( wlr_tag + 1 == wlc_tag || tag == wlr_limit)
416 break;
417 }
418 } while(wlr_state != WLR_STOP && mb->errors == 0 && msg == MAL_SUCCEED);
419#ifdef _WLR_DEBUG_
420 fprintf(stderr,"#wlr.process:processed log file wlr_tag "LLFMT" wlr_limit "LLFMT" time %s\n", wlr_tag, wlr_limit, wlr_timelimit);
421#endif
422 // skip to next file when all is read
423 wlr_batches++;
424 if( msg != MAL_SUCCEED)
425 snprintf(wlr_error, FILENAME_MAX, "%s", msg);
426 WLRputConfig();
427 bstream_destroy(c->fdin);
428 if ( wlr_tag == wlr_limit)
429 break;
430 }
431 (void) fflush(stderr);
432 close_stream(c->fdout);
433 SQLexitClient(c);
434 MCcloseClient(c);
435 if(prev)
436 freeSymbol(prev);
437}
438
439/*
440 * A single WLR thread is allowed to run in the background.
441 * If it happens to crash then replication roll forward is suspended.
442 * Moreover, the background job can only leave error messages in the merovingian log.
443 *
444 * A timing issue.
445 * The WLRprocess can only start after an SQL environment has been initialized.
446 * It is therefore initialized when a SQLclient() is issued.
447 */
448static void
449WLRprocessScheduler(void *arg)
450{ Client cntxt = (Client) arg;
451 int i, duration = 0;
452 struct timeval clock;
453 time_t clk;
454 struct tm ctm;
455 char clktxt[26];
456
457 if( ( i = WLRgetConfig()) ){
458 if ( i> 0)
459 WLRputConfig();
460 else return;
461 }
462 assert(wlr_master[0]);
463 cntxt = MCinitClient(MAL_ADMIN, NULL,NULL);
464
465 MT_lock_set(&wlr_lock);
466 if ( wlr_state != WLR_STOP)
467 wlr_state = WLR_RUN;
468 MT_lock_unset(&wlr_lock);
469#ifdef _WLR_DEBUG_
470 fprintf(stderr, "#Run the replicator %d %d\n", GDKexiting(), wlr_state);
471#endif
472 while( wlr_state != WLR_STOP && !wlr_error[0]){
473 // wait at most for the cycle period, also at start
474 duration = (wlc_beat > 0 ? wlc_beat:1) * 1000 ;
475 if( wlr_timelimit[0]){
476 gettimeofday(&clock, NULL);
477 clk = clock.tv_sec;
478#ifdef HAVE_LOCALTIME_R
479 (void) localtime_r(&clk, &ctm);
480#else
481 ctm = *localtime(&clk);
482#endif
483
484 strftime(clktxt, sizeof(clktxt), "%Y-%m-%dT%H:%M:%S.000",&ctm);
485#ifdef _WLR_DEBUG_
486 fprintf(stderr,"#now %s tlimit %s\n",clktxt, wlr_timelimit);
487#endif
488 // actually never wait longer then the timelimit requires
489 // preference is given to the beat.
490 MT_thread_setworking("sleeping");
491 if(strncmp(clktxt, wlr_timelimit,sizeof(wlr_timelimit)) >= 0 && duration >100)
492 MT_sleep_ms(duration);
493 }
494 for( ; duration > 0 && wlr_state != WLR_STOP; duration -= 200){
495 if ( wlr_tag + 1 == wlc_tag || wlr_tag >= wlr_limit || wlr_limit == -1){
496 MT_thread_setworking("sleeping");
497 MT_sleep_ms(200);
498 }
499 }
500 MT_thread_setworking("processing");
501 if( WLRgetMaster() == 0 && wlr_tag + 1 < wlc_tag && wlr_tag < wlr_limit && wlr_batches <= wlc_batches && wlr_state != WLR_STOP)
502 WLRprocessBatch(cntxt);
503
504 /* Can not use GDKexiting(), because a test may already reach that point before it did anything.
505 * Instead wait for the explicit WLR_STOP
506 */
507 if( GDKexiting()){
508#ifdef _WLR_DEBUG_
509 fprintf(stderr, "#Replicator thread stopped due to GDKexiting()\n");
510#endif
511 MT_lock_set(&wlr_lock);
512 wlr_state = WLR_STOP;
513 MT_lock_unset(&wlr_lock);
514 break;
515 }
516 }
517 wlr_thread = 0;
518 MT_lock_set(&wlr_lock);
519 if( wlr_state == WLR_RUN)
520 wlr_state = WLR_WAIT;
521 MT_lock_unset(&wlr_lock);
522 MCcloseClient(cntxt);
523
524#ifdef _WLR_DEBUG_
525 fprintf(stderr, "#Replicator thread is stopped \n");
526#endif
527}
528
529// The replicate() command can be issued at the SQL console
530// which can accept exceptions
531str
532WLRmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
533{
534 if( getArgType(mb, pci, 1) == TYPE_str)
535 return WLRstart(cntxt, mb, stk, pci);
536 throw(MAL, "wlr.master", "No master configuration");
537}
538
539str
540WLRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
541{ str timelimit = wlr_timelimit;
542 size_t size = sizeof(wlr_timelimit);
543 struct timeval clock;
544 time_t clk;
545 struct tm ctm;
546 char clktxt[26];
547 int duration = 4000;
548 lng limit = INT64_MAX;
549 str msg = MAL_SUCCEED;
550 (void) cntxt;
551
552 if( WLRgetConfig())
553 throw(MAL, "sql.replicate", "No replication configuration");
554
555 if( !wlr_thread)
556 throw(MAL, "sql.replicate", "Replicator not started, call wlr.master() first ");
557
558 if( pci->argc == 0)
559 wlr_limit = INT64_MAX;
560 else
561 if( getArgType(mb, pci, 1) == TYPE_timestamp){
562 if (timestamp_precision_tostr(&timelimit, &size, *getArgReference_TYPE(stk, pci, 1, timestamp), 3, true) < 0)
563 throw(SQL, "wlr.replicate", GDK_EXCEPTION);
564 fprintf(stderr,"#time limit %s\n",timelimit);
565 } else
566 if( getArgType(mb, pci, 1) == TYPE_bte)
567 limit = getVarConstant(mb,getArg(pci,1)).val.btval;
568 else
569 if( getArgType(mb, pci, 1) == TYPE_sht)
570 limit = getVarConstant(mb,getArg(pci,1)).val.shval;
571 else
572 if( getArgType(mb, pci, 1) == TYPE_int)
573 limit = getVarConstant(mb,getArg(pci,1)).val.ival;
574 else
575 if( getArgType(mb, pci, 1) == TYPE_lng)
576 limit = getVarConstant(mb,getArg(pci,1)).val.lval;
577
578 if ( limit < 0 && timelimit[0] == 0)
579 throw(MAL, "sql.replicate", "Stop tag limit should be positive or timestamp should be set");
580 if (limit < INT64_MAX && limit >= wlc_tag)
581 throw(MAL, "sql.replicate", "Stop tag limit "LLFMT" be less than wlc_tag "LLFMT, limit, wlc_tag);
582 if ( limit >= 0)
583 wlr_limit = limit;
584
585 if ( wlc_state != WLC_CLONE)
586 throw(MAL, "sql.replicate", "No replication master set");
587 WLRputConfig();
588
589 // the client thread should wait for the replicator to its job
590 gettimeofday(&clock, NULL);
591 clk = clock.tv_sec;
592#ifdef HAVE_LOCALTIME_R
593 (void) localtime_r(&clk, &ctm);
594#else
595 ctm = *localtime(&clk);
596#endif
597 strftime(clktxt, sizeof(clktxt), "%Y-%m-%dT%H:%M:%S.000",&ctm);
598#ifdef _WLR_DEBUG_
599 fprintf(stderr, "#replicate: wait until wlr_limit = "LLFMT" (tag "LLFMT") time %s (%s)\n", wlr_limit, wlr_tag, (wlr_timelimit[0]? wlr_timelimit:""), clktxt);
600#endif
601
602 while ( (wlr_tag < wlr_limit ) || (wlr_timelimit[0] && strncmp(clktxt, wlr_timelimit, sizeof(wlr_timelimit)) > 0) ) {
603 if( wlr_state == WLR_STOP)
604 break;
605 if( wlr_limit == INT64_MAX && wlr_tag >= wlc_tag -1 )
606 break;
607
608 if ( wlr_error[0])
609 throw(MAL, "sql.replicate", "tag "LLFMT": %s", wlr_tag, wlr_error);
610 if ( wlr_tag == wlc_tag)
611 break;
612
613#ifdef _WLR_DEBUG_
614 fprintf(stderr, "#replicate wait state %d wlr_limit "LLFMT" (wlr_tag "LLFMT") wlc_tag "LLFMT" wlr_batches %d\n",
615 wlr_state, wlr_limit, wlr_tag, wlc_tag, wlr_batches);
616 fflush(stderr);
617#endif
618 if ( !wlr_thread ){
619 if( wlr_error[0])
620 throw(SQL,"wlr.startreplicate",SQLSTATE(42000) "Replicator terminated prematurely %s", wlr_error);
621 throw(SQL,"wlr.startreplicate",SQLSTATE(42000) "Replicator terminated prematurelys");
622 }
623
624 duration -= 200;
625 if ( duration < 0){
626 if( wlr_limit == INT64_MAX && wlr_timelimit[0] == 0)
627 break;
628 throw(SQL,"wlr.startreplicate",SQLSTATE(42000) "Timeout to wait for replicator to catch up."
629 "Catched up until "LLFMT", " LLFMT " pending", wlr_tag, wlr_limit - wlr_tag);
630 }
631 // don't make the sleep too short.
632 MT_sleep_ms( 200);
633 }
634#ifdef _WLR_DEBUG_
635 fprintf(stderr, "#replicate finished "LLFMT" (tag "LLFMT")\n", wlr_limit, wlr_tag);
636#endif
637 return msg;
638}
639
640/* watch out, each log record can contain multiple transaction COMMIT/ROLLBACKs
641 * This means the wlc_kind can not be set to the last one.
642 */
643str
644WLRtransaction(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
645{ InstrPtr p;
646 int i;
647
648 (void) cntxt;
649 (void) pci;
650 (void) stk;
651 cntxt->wlc_kind = 0;
652 if( wlr_error[0]){
653 cntxt->wlc_kind = WLC_ERROR;
654 return MAL_SUCCEED;
655 }
656 for( i = mb->stop-1; cntxt->wlc_kind == 0 && i > 1; i--){
657 p = getInstrPtr(mb,i);
658 if( getModuleId(p) == wlrRef && getFunctionId(p)== commitRef)
659 cntxt->wlc_kind = WLC_COMMIT;
660 if( getModuleId(p) == wlrRef && getFunctionId(p)== rollbackRef)
661 cntxt->wlc_kind = WLC_ROLLBACK;
662 }
663 return MAL_SUCCEED;
664}
665
666
667/* the Configuration is shared with the Logger thread, so protect its access */
668str
669WLRstart(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
670{
671 int len;
672 str msg;
673
674 (void) cntxt;
675 (void) mb;
676 if( wlr_thread)
677 throw(MAL, "wlr.replicate", SQLSTATE(42000) "replicator thread already running for %s ", wlr_master);
678
679 len = snprintf(wlr_master, IDLENGTH, "%s", *getArgReference_str(stk, pci, 1));
680 if (len == -1 || len >= IDLENGTH)
681 throw(MAL, "wlr.replicate", SQLSTATE(42000) "Input value is too large for wlr_master buffer");
682 if( (msg =WLRgetMaster()) != MAL_SUCCEED)
683 return msg;
684
685
686 // time the consolidation process in the background
687 if (MT_create_thread(&wlr_thread, WLRprocessScheduler, (void*) NULL,
688 MT_THR_DETACHED, "WLRprocessSched") < 0) {
689 throw(SQL,"wlr.init",SQLSTATE(42000) "Starting wlr manager failed");
690 }
691#ifdef _WLR_DEBUG_
692 fprintf(stderr,"#WLR scheduler forked\n");
693#else
694 (void) cntxt;
695#endif
696 // Wait until the replicator is properly initialized
697 while( wlr_state != WLR_RUN && wlr_error[0] == 0){
698#ifdef _WLR_DEBUG_
699 fprintf(stderr,"#WLR replicator initializing\n");
700#endif
701 MT_sleep_ms( 50);
702 }
703 return MAL_SUCCEED;
704}
705
706str
707WLRstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
708{
709 (void) cntxt;
710 (void) mb;
711 (void) stk;
712 (void) pci;
713 // kill the replicator thread and reset for a new one
714#ifdef _WLR_DEBUG_
715 fprintf(stderr,"#WLR stop replication\n");
716#endif
717 MT_lock_set(&wlr_lock);
718 if( wlr_state == WLR_RUN)
719 wlr_state = WLR_STOP;
720 MT_lock_unset(&wlr_lock);
721
722 return MAL_SUCCEED;
723}
724
725str
726WLRgetclock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
727{
728 str *ret = getArgReference_str(stk,pci,0);
729 str msg = MAL_SUCCEED;
730
731 (void) cntxt;
732 (void) mb;
733
734 if( WLRgetConfig())
735 return msg;
736 if( wlr_read[0])
737 *ret= GDKstrdup(wlr_read);
738 else *ret= GDKstrdup(str_nil);
739 if (*ret == NULL)
740 throw(MAL, "wlr.getreplicaclock", SQLSTATE(HY001) MAL_MALLOC_FAIL);
741 return msg;
742}
743
744str
745WLRgettick(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
746{
747 lng *ret = getArgReference_lng(stk,pci,0);
748 str msg = MAL_SUCCEED;
749
750 (void) cntxt;
751 (void) mb;
752
753 if( WLRgetConfig())
754 return msg;
755 *ret = wlr_tag;
756 return msg;
757}
758
759/* the replica cycle can be set to fixed interval.
760 * This allows for ensuring an up to date version every N seconds
761 */
762str
763WLRsetbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
764{ int new;
765 (void) cntxt;
766 (void) mb;
767 new = *getArgReference_int(stk,pci,1);
768 if ( new < wlc_beat || new < 1)
769 throw(SQL,"replicatebeat",SQLSTATE(42000) "Cycle time should be larger then master or >= 1 second");
770 wlr_beat = new;
771 return MAL_SUCCEED;
772}
773
774static str
775WLRquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
776{ str qry = *getArgReference_str(stk,pci,1);
777 str msg = MAL_SUCCEED;
778 char *x, *y, *qtxt;
779
780 (void) mb;
781 if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
782 return msg;
783 // execute the query in replay mode when required.
784 // we need to get rid of the escaped quote.
785 x = qtxt= (char*) GDKmalloc(strlen(qry) +1);
786 if( qtxt == NULL)
787 throw(SQL,"wlr.query",SQLSTATE(HY001) MAL_MALLOC_FAIL);
788 for(y = qry; *y; y++){
789 if( *y == '\\' ){
790 if( *(y+1) == '\'')
791 y += 1;
792 }
793 *x++ = *y;
794 }
795 *x = 0;
796 msg = SQLstatementIntern(cntxt, &qtxt, "SQLstatement", TRUE, TRUE, NULL);
797 GDKfree(qtxt);
798 return msg;
799}
800
801/* A change event need not be executed, because it is already captured
802 * in the update/append/delete
803 */
804str
805WLRaccept(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
806{
807 (void) cntxt;
808 (void) pci;
809 (void) stk;
810 (void) mb;
811 return MAL_SUCCEED;
812}
813
814str
815WLRcommit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
816{
817 (void) cntxt;
818 (void) pci;
819 (void) stk;
820 (void) mb;
821 return MAL_SUCCEED;
822}
823
824str
825WLRrollback(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
826{
827 (void) cntxt;
828 (void) pci;
829 (void) stk;
830 (void) mb;
831 return MAL_SUCCEED;
832}
833
834str
835WLRaction(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
836{
837 (void) cntxt;
838 (void) pci;
839 (void) stk;
840 (void) mb;
841 return MAL_SUCCEED;
842}
843
844str
845WLRcatalog(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
846{
847 return WLRquery(cntxt,mb,stk,pci);
848}
849
850str
851WLRgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
852{
853 // currently they are informative only
854 (void) cntxt;
855 (void) mb;
856 (void) stk;
857 (void) pci;
858 return MAL_SUCCEED;
859}
860
861/* TODO: Martin take a look at this.
862 *
863 * PSA: DO NOT USE THIS OUT OF WLRappend or very bad things will happen!
864 * (variable msg and tag cleanup will not be defined).
865 */
866#define WLRcolumn(TPE) \
867 for( i = 4; i < pci->argc; i++){ \
868 TPE val = *getArgReference_##TPE(stk,pci,i); \
869 if (BUNappend(ins, (void*) &val, false) != GDK_SUCCEED) { \
870 msg = createException(MAL, "WLRappend", "BUNappend failed"); \
871 goto cleanup; \
872 } \
873 }
874
875str
876WLRappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
877{
878 str sname, tname, cname;
879 int tpe,i;
880 mvc *m=NULL;
881 sql_schema *s;
882 sql_table *t;
883 sql_column *c;
884 BAT *ins = 0;
885 str msg = MAL_SUCCEED;
886
887 if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
888 return msg;
889 sname = *getArgReference_str(stk,pci,1);
890 tname = *getArgReference_str(stk,pci,2);
891 cname = *getArgReference_str(stk,pci,3);
892
893 if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
894 return msg;
895 if ((msg = checkSQLContext(cntxt)) != NULL)
896 return msg;
897
898 s = mvc_bind_schema(m, sname);
899 if (s == NULL)
900 throw(SQL, "sql.append", SQLSTATE(3F000) "Schema missing %s",sname);
901 t = mvc_bind_table(m, s, tname);
902 if (t == NULL)
903 throw(SQL, "sql.append", SQLSTATE(42S02) "Table missing %s.%s",sname,tname);
904 // get the data into local BAT
905
906 tpe= getArgType(mb,pci,4);
907 ins = COLnew(0, tpe, 0, TRANSIENT);
908 if( ins == NULL){
909 throw(SQL,"WLRappend",SQLSTATE(HY001) MAL_MALLOC_FAIL);
910 }
911
912 switch(ATOMstorage(tpe)){
913 case TYPE_bit: WLRcolumn(bit); break;
914 case TYPE_bte: WLRcolumn(bte); break;
915 case TYPE_sht: WLRcolumn(sht); break;
916 case TYPE_int: WLRcolumn(int); break;
917 case TYPE_lng: WLRcolumn(lng); break;
918 case TYPE_oid: WLRcolumn(oid); break;
919 case TYPE_flt: WLRcolumn(flt); break;
920 case TYPE_dbl: WLRcolumn(dbl); break;
921#ifdef HAVE_HGE
922 case TYPE_hge: WLRcolumn(hge); break;
923#endif
924 case TYPE_str:
925 for( i = 4; i < pci->argc; i++){
926 str val = *getArgReference_str(stk,pci,i);
927 if (BUNappend(ins, (void*) val, false) != GDK_SUCCEED) {
928 msg = createException(MAL, "WLRappend", "BUNappend failed");
929 goto cleanup;
930 }
931 }
932 break;
933 }
934
935 if (cname[0] != '%' && (c = mvc_bind_column(m, t, cname)) != NULL) {
936 store_funcs.append_col(m->session->tr, c, ins, TYPE_bat);
937 } else if (cname[0] == '%') {
938 sql_idx *i = mvc_bind_idx(m, s, cname + 1);
939 if (i)
940 store_funcs.append_idx(m->session->tr, i, ins, tpe);
941 }
942cleanup:
943 BBPunfix(((BAT *) ins)->batCacheid);
944 return msg;
945}
946
947str
948WLRdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
949{
950 str sname, tname;
951 int i;
952 mvc *m=NULL;
953 sql_schema *s;
954 sql_table *t;
955 BAT *ins = 0;
956 oid o;
957 str msg= MAL_SUCCEED;
958
959 if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
960 return msg;
961 sname = *getArgReference_str(stk,pci,1);
962 tname = *getArgReference_str(stk,pci,2);
963
964 if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
965 return msg;
966 if ((msg = checkSQLContext(cntxt)) != NULL)
967 return msg;
968
969 s = mvc_bind_schema(m, sname);
970 if (s == NULL)
971 throw(SQL, "sql.append", SQLSTATE(3F000) "Schema missing %s",sname);
972 t = mvc_bind_table(m, s, tname);
973 if (t == NULL)
974 throw(SQL, "sql.append", SQLSTATE(42S02) "Table missing %s.%s",sname,tname);
975 // get the data into local BAT
976
977 ins = COLnew(0, TYPE_oid, 0, TRANSIENT);
978 if( ins == NULL){
979 throw(SQL,"WLRappend",SQLSTATE(HY001) MAL_MALLOC_FAIL);
980 }
981
982 for( i = 3; i < pci->argc; i++){
983 o = *getArgReference_oid(stk,pci,i);
984 if (BUNappend(ins, (void*) &o, false) != GDK_SUCCEED) {
985 msg = createException(MAL, "WLRdelete", "BUNappend failed");
986 goto cleanup;
987 }
988 }
989
990 store_funcs.delete_tab(m->session->tr, t, ins, TYPE_bat);
991cleanup:
992 BBPunfix(((BAT *) ins)->batCacheid);
993 return msg;
994}
995
996/* TODO: Martin take a look at this.
997 *
998 * PSA: DO NOT USE THIS OUT OF WLRupdate or very bad things will happen!
999 * (variable msg and tag cleanup will not be defined).
1000 */
1001#define WLRvalue(TPE) \
1002 { TPE val = *getArgReference_##TPE(stk,pci,5); \
1003 if (BUNappend(upd, (void*) &val, false) != GDK_SUCCEED) { \
1004 fprintf(stderr, "WLRupdate:BUNappend failed"); \
1005 goto cleanup; \
1006 } \
1007 }
1008
1009str
1010WLRupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1011{
1012 str sname, tname, cname;
1013 mvc *m=NULL;
1014 sql_schema *s;
1015 sql_table *t;
1016 sql_column *c;
1017 BAT *upd = 0, *tids=0;
1018 str msg= MAL_SUCCEED;
1019 oid o;
1020 int tpe = getArgType(mb,pci,5);
1021
1022 if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
1023 return msg;
1024 sname = *getArgReference_str(stk,pci,1);
1025 tname = *getArgReference_str(stk,pci,2);
1026 cname = *getArgReference_str(stk,pci,3);
1027 o = *getArgReference_oid(stk,pci,4);
1028
1029 if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
1030 return msg;
1031 if ((msg = checkSQLContext(cntxt)) != NULL)
1032 return msg;
1033
1034 s = mvc_bind_schema(m, sname);
1035 if (s == NULL)
1036 throw(SQL, "sql.update", SQLSTATE(3F000) "Schema missing %s",sname);
1037 t = mvc_bind_table(m, s, tname);
1038 if (t == NULL)
1039 throw(SQL, "sql.update", SQLSTATE(42S02) "Table missing %s.%s",sname,tname);
1040 // get the data into local BAT
1041
1042 tids = COLnew(0, TYPE_oid, 0, TRANSIENT);
1043 if( tids == NULL){
1044 throw(SQL,"WLRupdate",SQLSTATE(HY001) MAL_MALLOC_FAIL);
1045 }
1046 upd = COLnew(0, tpe, 0, TRANSIENT);
1047 if( upd == NULL){
1048 BBPunfix(((BAT *) tids)->batCacheid);
1049 throw(SQL,"WLRupdate",SQLSTATE(HY001) MAL_MALLOC_FAIL);
1050 }
1051 if (BUNappend(tids, &o, false) != GDK_SUCCEED) {
1052 msg = createException(MAL, "WLRupdate", "BUNappend failed");
1053 goto cleanup;
1054 }
1055
1056 switch(ATOMstorage(tpe)){
1057 case TYPE_bit: WLRvalue(bit); break;
1058 case TYPE_bte: WLRvalue(bte); break;
1059 case TYPE_sht: WLRvalue(sht); break;
1060 case TYPE_int: WLRvalue(int); break;
1061 case TYPE_lng: WLRvalue(lng); break;
1062 case TYPE_oid: WLRvalue(oid); break;
1063 case TYPE_flt: WLRvalue(flt); break;
1064 case TYPE_dbl: WLRvalue(dbl); break;
1065#ifdef HAVE_HGE
1066 case TYPE_hge: WLRvalue(hge); break;
1067#endif
1068 case TYPE_str:
1069 {
1070 str val = *getArgReference_str(stk,pci,5);
1071 if (BUNappend(upd, (void*) val, false) != GDK_SUCCEED) {
1072 msg = createException(MAL, "WLRupdate", "BUNappend failed");
1073 goto cleanup;
1074 }
1075 }
1076 break;
1077 default:
1078 fprintf(stderr, "Missing type in WLRupdate");
1079 }
1080
1081 BATmsync(tids);
1082 BATmsync(upd);
1083 if (cname[0] != '%' && (c = mvc_bind_column(m, t, cname)) != NULL) {
1084 store_funcs.update_col(m->session->tr, c, tids, upd, TYPE_bat);
1085 } else if (cname[0] == '%') {
1086 sql_idx *i = mvc_bind_idx(m, s, cname + 1);
1087 if (i)
1088 store_funcs.update_idx(m->session->tr, i, tids, upd, TYPE_bat);
1089 }
1090
1091cleanup:
1092 BBPunfix(((BAT *) tids)->batCacheid);
1093 BBPunfix(((BAT *) upd)->batCacheid);
1094 return msg;
1095}
1096
1097str
1098WLRclear_table(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1099{
1100 sql_schema *s;
1101 sql_table *t;
1102 mvc *m = NULL;
1103 str msg= MAL_SUCCEED;
1104 str *sname = getArgReference_str(stk, pci, 1);
1105 str *tname = getArgReference_str(stk, pci, 2);
1106
1107 if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
1108 return msg;
1109 if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
1110 return msg;
1111 if ((msg = checkSQLContext(cntxt)) != NULL)
1112 return msg;
1113 s = mvc_bind_schema(m, *sname);
1114 if (s == NULL)
1115 throw(SQL, "sql.clear_table", SQLSTATE(3F000) "Schema missing %s",*sname);
1116 t = mvc_bind_table(m, s, *tname);
1117 if (t == NULL)
1118 throw(SQL, "sql.clear_table", SQLSTATE(42S02) "Table missing %s.%s",*sname,*tname);
1119 (void) mvc_clear_table(m, t);
1120 return MAL_SUCCEED;
1121}
1122