1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
10 * (c) 2017 Martin Kersten
11 * This module collects the workload-capture-replay statements during transaction execution,
12 * also known as asynchronous logical replication management. It can be used for
13 * multiple purposes: BACKUP, REPLICATION, and REPLAY
14 *
15 * For a BACKUP we need either a complete update log from the beginning, or
16 * a binary snapshot with a collection of logs recording its changes since.
17 * To ensure transaction ACID properties, the log record should be stored on
18 * disk within the transaction brackets, which may cause a serious IO load.
19 * (Tip, store these logs files on an SSD or NVM)
20 *
21 * For REPLICATION, also called a database clone or slave, we take a snapshot and the
22 * log files that reflect the recent changes. The log updates are replayed against
23 * the snapshot until a specific time point or transaction id is reached.
24 *
25 * Some systems also use the logical logs to REPLAY all (expensive) queries
26 * against the database. We skip this for the time being, as those queries
27 * can be captured already in the server.
28 * [A flag should be added to at least capture them]
29 *
30 * The goal of this module is to ease BACKUP and REPLICATION of a master database
31 * with a time-bounded delay. This means that both master and replica run at a certain beat
32 * (in seconds) by which information is made available or read by the replica.
33 *
34 * Such a replica is used in query workload sharing, database versioning, and (re-)partitioning.
35 * Tables taken from the master are not protected against local updates in the replica.
36 * However, any replay transaction that fails stops the cloning process.
37 * Furthermore, only persistent tables are considered for replication.
38 * Updates under the 'tmp' schema, i.e. temporary tables, are ignored.
39 *
40 * Simplicity and ease of end-user control has been the driving argument here.
41 *
42 * IMPLEMENTATION
43 * The underlying assumption of the techniques deployed is that the database
 * resides on a proper (global/distributed) file system to guarantee recovery
45 * from most storage system related failures, e.g. using RAID disks or LSF systems.
46 *
47 * A database can be set into 'master' mode only once using the SQL command:
 * CALL wcr_master.master() whose access permission is limited to the 'monetdb' user.[CHECK]
49 * An optional path to the log record directory can be given to reduce the IO latency,
 * e.g. using a nearby SSD, or where there is ample space to keep a long history,
51 * e.g. a HDD or cold storage location.
52 *
53 * By default, the command creates a directory .../dbfarm/dbname/wlc_logs to hold all logs
54 * and a configuration file .../dbfarm/dbname/wlc.config to hold the state of the transaction logs.
55 * It contains the following key=value pairs:
56 * snapshot=<path to a snapshot directory>
57 * logs=<path to the wlc log directory>
58 * state=<started, stopped>
59 * batches=<next available batch file to be applied>
60 * beat=<maximal delay between log files, in seconds>
61 * write=<timestamp of the last transaction recorded>
62 *
63 * A missing path to the snapshot denotes that we can start the clone with an empty database.
64 * The log files are stored as master/<dbname>_<batchnumber>. They belong to the snapshot.
65 *
66 * Each wlc log file contains a serial log of a number of committed compound transactions.
67 * The log records are represented as ordinary MAL statement blocks, which
68 * are executed in serial mode. (parallelism can be considered for large updates later)
69 * Each transaction job is identified by a unique id, its starting time, and the original responsible user.
70 * Each log-record should end with a commit to be allowed for re-execution.
71 * Log records with a rollback tag are merely for analysis by the DBA, their statements are ignored.
72 *
73 * A transaction log file is created by the master using a heartbeat (in seconds).
74 * A new transaction log file is published when the system has been collecting transaction records for some time.
75 * The beat can be set using a SQL command, e.g.
76 * CALL wcr_master.beat(duration)
77 * Setting it to zero leads to a log file per transaction and may cause a large log directory
78 * with thousands of small files.
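 * The default beat (the wlc_beat variable, initialized to 10 seconds in this file and
 * overridable through the configuration file) should balance polling overhead in most practical situations.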
80 * Intermittent flush() during this period ensures the committed log records survive
81 * a crash.
82 *
83 * A minor problem here is that we should ensure that the log file is closed even if there
84 * are no transactions running. It is solved with a separate monitor thread, which ensures
 * that a new log file is created at least after 'beat' seconds since the first logrecord was created.
86 * After closing, the replicas can see from the master configuration file that a new log batch is available.
87 *
 * The final step is to stop transaction logging with the command
89 * CALL wcr_master.stop().
90 * It typically is the end-of-life-time for a snapshot. For example, when planning to do
91 * a large bulk load of the database, stopping logging avoids a double write into the
92 * database. The database can only be brought back into master mode using a fresh snapshot.
93 *
 * [It is not advisable to temporarily stop logging and continue afterwards, because then there
95 * is no guarantee the user will see a consistent database.]
96 *
97 * One of the key challenges for a DBA is to keep the log directory manageable, because it grows
 * with the speed of updates being applied to the database. This calls for regularly checking
99 * for their disk footprint and taking a new snapshot as a frame of reference.
100 *
101 * [TODO A trigger should be added to stop logging and call for a fresh snapshot first]
102 * [TODO the batch files might include the snapshot id for ease of rebuild]
103 *
104 * The DBA tool 'monetdb' provides options to create a master and its replicas.
105 * It will also maintain the list of replicas for inspection and managing their drift.
106 * For example,
107 * monetdb master <dbname> [ <optional snapshot path>]
 * which locks the database, takes a safe copy, and changes its state to master.
109 *
110 * A fresh replica can be constructed as follows:
111 * monetdb replicate <dbname> <mastername>
112 *
113 * Instead of using the monetdb command line we can use the SQL calls directly
114 * sys.master() and sys.replicate(), provided we start with a fresh database.
115 *
116 * CLONE
117 *
118 * Every clone should start off with a copy of the binary snapshot identified by 'snapshot'.
119 * A fresh database can be turned into a clone using the call
120 * CALL wcr_replica.master('mastername')
 * It will grab the latest snapshot of the master and apply all
122 * available log files before releasing the system.
123 * The master has no knowledge about the number of clones and their whereabouts.
124 *
125 * The clone process will iterate in the background through the log files,
126 * applying all update transactions.
127 *
128 * An optional timestamp or transaction id can be added to the replicate() command to
129 * apply the logs until a given moment. This is particularly handy when an unexpected
 * disastrous user action (drop persistent table) has to be recovered from.
131 *
132 * CALL wcr_replica.master('mastername'); -- get logs from a specific master
133 * ...
 * CALL wcr_replicate.replicate(tag); -- stops after we are in sync with tag
135 * ...
 * CALL wcr_replicate.replicate(NOW()); -- stop after we synced all transactions
137 * ...
138 * CALL wcr_replicate.replicate(); -- synchronize in background continuously
139 * ...
 * CALL wcr_replicate.stop(); -- stop the synchronization thread
141 *
142 * SELECT wcr_replica.clock();
143 * returns the timestamp of the last replicated transaction.
144 * SELECT wcr_replica.tick();
145 * returns the transaction id of the last replicated transaction.
146 * SELECT wcr_master.clock();
147 * return the timestamp of the last committed transaction in the master.
148 * SELECT wcr_master.tick();
149 * return the transaction id of the last committed transaction in the master.
150 *
151 * Any failure encountered during a log replay terminates the replication process,
152 * leaving a message in the merovingian log configuration.
153 *
154 * The wlc files purposely have a textual format derived from the MAL statements.
155 * This provides a stepping stone for remote execution later.
156 *
 * [TODO consider logging the SQL session variables, i.e. optimizer_pipe
158 * as part of the log record]
159 * For updates we don't need special care for this.
160 */
161#include "monetdb_config.h"
162#include <time.h>
163#include "mal_builder.h"
164#include "wlc.h"
165
#undef _WLC_DEBUG_

/* Module lock. It is held while the current log stream (wlc_fd) is opened,
 * written and closed, and while the transaction tag counter wlc_tag is
 * advanced (see WLCsetlogger, WLCpreparewrite, WLCstart). */
MT_Lock wlc_lock = MT_LOCK_INITIALIZER("wlc_lock");

static char wlc_snapshot[FILENAME_MAX]; // The location of the snapshot against which the logs work
static stream *wlc_fd = 0;	// currently open log file; NULL when no batch file is open

// These properties are needed by the replica to direct the roll-forward.
// They are persisted in wlc.config by WLCsetConfig and restored by WLCreadConfig.
char wlc_dir[FILENAME_MAX]; // The location in the global file store for the logs
char wlc_name[IDLENGTH]; // The master database name
lng wlc_tag = 0; // next transaction id
int wlc_state = 0; // The current status of the logger in the life cycle
char wlc_write[26]; // The timestamp of the last committed transaction (as formatted by WLCsettime)
int wlc_batches = 0; // identifier of next batch
int wlc_beat = 10; // maximal period covered by a single log file in seconds (0: one file per transaction)
181
182/* The database snapshots are binary copies of the dbfarm/database/bat
183 * New snapshots are created currently using the 'monetdb snapshot <db>' command
184 * or a SQL procedure.
185 *
186 * The wlc logs are stored in the snapshot directory as a time-stamped list
187 */
188
189int
190WLCused(void)
191{
192 return wlc_dir[0] != 0;
193}
194
195/* The master configuration file is a simple key=value table */
196str
197WLCreadConfig(FILE *fd)
198{
199 str msg = MAL_SUCCEED;
200 char path[FILENAME_MAX];
201 int len;
202
203 while( fgets(path, FILENAME_MAX, fd) ){
204 path[strlen(path)-1] = 0;
205 if( strncmp("logs=", path,5) == 0) {
206 len = snprintf(wlc_dir, FILENAME_MAX, "%s", path + 5);
207 if (len == -1 || len >= FILENAME_MAX) {
208 msg = createException(MAL, "wlc.readConfig", "logs config value is too large");
209 goto bailout;
210 }
211 }
212 if( strncmp("snapshot=", path,9) == 0) {
213 len = snprintf(wlc_snapshot, FILENAME_MAX, "%s", path + 9);
214 if (len == -1 || len >= FILENAME_MAX) {
215 msg = createException(MAL, "wlc.readConfig", "snapshot config value is too large");
216 goto bailout;
217 }
218 }
219 if( strncmp("tag=", path,4) == 0)
220 wlc_tag = atol(path+ 4);
221 if( strncmp("write=", path,6) == 0) {
222 len = snprintf(wlc_write, 26, "%s", path + 6);
223 if (len == -1 || len >= 26) {
224 msg = createException(MAL, "wlc.readConfig", "write config value is too large");
225 goto bailout;
226 }
227 }
228 if( strncmp("batches=", path, 8) == 0)
229 wlc_batches = atoi(path+ 8);
230 if( strncmp("beat=", path, 5) == 0)
231 wlc_beat = atoi(path+ 5);
232 if( strncmp("state=", path, 6) == 0)
233 wlc_state = atoi(path+ 6);
234 }
235bailout:
236 fclose(fd);
237 return msg;
238}
239
240str
241WLCgetConfig(void){
242 str l;
243 FILE *fd;
244
245 if((l = GDKfilepath(0,0,"wlc.config",0)) == NULL)
246 throw(MAL,"wlc.getConfig","Could not access wlc.config file\n");
247 fd = fopen(l,"r");
248 GDKfree(l);
249 if( fd == NULL)
250 throw(MAL,"wlc.getConfig","Could not access wlc.config file\n");
251 return WLCreadConfig(fd);
252}
253
254static
255str WLCsetConfig(void){
256 str path;
257 stream *fd;
258
259 /* be aware to be safe, on a failing fopen */
260 if((path = GDKfilepath(0,0,"wlc.config",0)) == NULL)
261 throw(MAL,"wlc.setConfig","Could not access wlc.config\n");
262 fd = open_wastream(path);
263 GDKfree(path);
264 if( fd == NULL)
265 throw(MAL,"wlc.setConfig","Could not access wlc.config\n");
266 if( wlc_snapshot[0] )
267 mnstr_printf(fd,"snapshot=%s\n", wlc_snapshot);
268 mnstr_printf(fd,"logs=%s\n", wlc_dir);
269 mnstr_printf(fd,"tag="LLFMT"\n", wlc_tag );
270 mnstr_printf(fd,"write=%s\n", wlc_write );
271 mnstr_printf(fd,"state=%d\n", wlc_state );
272 mnstr_printf(fd,"batches=%d\n", wlc_batches );
273 mnstr_printf(fd,"beat=%d\n", wlc_beat );
274 close_stream(fd);
275 return MAL_SUCCEED;
276}
277
278// creation of the logger file and updating the configuration file should be atomic !!!
279// The log files are marked with the database name. This allows for easy recognition later on.
280static str
281WLCsetlogger(void)
282{
283 int len;
284 char path[FILENAME_MAX];
285 str msg = MAL_SUCCEED;
286
287 if( wlc_dir[0] == 0)
288 throw(MAL,"wlc.setlogger","Path not initalized");
289 MT_lock_set(&wlc_lock);
290 len = snprintf(path,FILENAME_MAX,"%s%c%s_%012d", wlc_dir, DIR_SEP, wlc_name, wlc_batches);
291 if (len == -1 || len >= FILENAME_MAX) {
292 MT_lock_unset(&wlc_lock);
293 throw(MAL, "wlc.setlogger", "Logger filename path is too large");
294 }
295 wlc_fd = open_wastream(path);
296 if( wlc_fd == 0){
297 MT_lock_unset(&wlc_lock);
298 fprintf(stderr, "wlc.logger:Could not create %s\n",path);
299 throw(MAL,"wlc.logger","Could not create %s\n",path);
300 }
301
302 wlc_batches++;
303 msg = WLCsetConfig();
304 MT_lock_unset(&wlc_lock);
305 return msg;
306}
307
308/* force the current log file to its storage container */
309static str
310WLCcloselogger(void)
311{
312 if( wlc_fd == NULL)
313 return MAL_SUCCEED;
314 mnstr_flush(wlc_fd);
315 mnstr_fsync(wlc_fd);
316 close_stream(wlc_fd);
317 wlc_fd= NULL;
318 return WLCsetConfig();
319}
320
321/* force the current log file to its storage container, but dont create a new one yet */
322str
323WLCflush(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
324{
325 (void) cntxt;
326 (void) mb;
327 (void) stk;
328 (void) pci;
329 if( wlc_fd == NULL)
330 return MAL_SUCCEED;
331 mnstr_flush(wlc_fd);
332 mnstr_fsync(wlc_fd);
333 return WLCsetConfig();
334}
335
336void
337WLCreset(void)
338{
339 str msg = MAL_SUCCEED;
340 MT_lock_set(&wlc_lock);
341 msg = WLCcloselogger();
342 wlc_snapshot[0]=0;
343 wlc_dir[0]= 0;
344 wlc_name[0]= 0;
345 wlc_write[0] =0;
346 MT_lock_unset(&wlc_lock);
347 if(msg) //TODO we have to return a possible error message somehow
348 freeException(msg);
349}
350
351/*
352 * The WLClogger process ensures that log files are properly closed
353 * and released when their cycle time window has expired.
354 */
355
356static MT_Id wlc_logger;
357
358static void
359WLClogger(void *arg)
360{
361 int seconds;
362 str msg = MAL_SUCCEED;
363
364 (void) arg;
365 while(!GDKexiting()){
366 if( wlc_dir[0] && wlc_fd ){
367 MT_lock_set(&wlc_lock);
368 if((msg = WLCcloselogger()) != MAL_SUCCEED) {
369 fprintf(stderr, "%s",msg);
370 freeException(msg);
371 }
372 MT_lock_unset(&wlc_lock);
373 }
374 for( seconds = 0; (wlc_beat == 0 || seconds < wlc_beat) && ! GDKexiting(); seconds++)
375 MT_sleep_ms( 1000);
376 }
377}
378/*
379 * The existence of the master directory should be checked upon server restart.
380 * Then the master record information should be set and the WLClogger started.
381 */
382str
383WLCinit(void)
384{
385 str conf, msg= MAL_SUCCEED;
386 int len;
387
388 if( wlc_state == WLC_STARTUP){
389 // use default location for master configuration file
390 if((conf = GDKfilepath(0,0,"wlc.config",0)) == NULL)
391 throw(MAL,"wlc.init","Could not access wlc.config\n");
392
393 if( access(conf,F_OK) ){
394 GDKfree(conf);
395 return MAL_SUCCEED;
396 }
397 GDKfree(conf);
398 // we are in master mode
399 len = snprintf(wlc_name, IDLENGTH, "%s", GDKgetenv("gdk_dbname"));
400 if (len == -1 || len >= IDLENGTH)
401 throw(MAL, "wlc.init", "gdk_dbname variable is too large");
402
403 msg = WLCgetConfig();
404 if( msg)
405 fprintf(stderr, "%s",msg);
406 if (MT_create_thread(&wlc_logger, WLClogger , (void*) 0,
407 MT_THR_DETACHED, "WLClogger") < 0) {
408 fprintf(stderr, "wlc.logger thread could not be spawned");
409 }
410 }
411 return MAL_SUCCEED;
412}
413
414str
415WLCinitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
416{
417 (void) cntxt;
418 (void) mb;
419 (void) stk;
420 (void) pci;
421 return WLCinit();
422}
423
424str
425WLCgetclock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
426{ str *ret = getArgReference_str(stk,pci,0);
427 (void) cntxt;
428 (void) mb;
429 if( wlc_write[0])
430 *ret = GDKstrdup(wlc_write);
431 else
432 *ret = GDKstrdup(str_nil);
433 if(*ret == NULL)
434 throw(MAL,"wlc.getclock", MAL_MALLOC_FAIL);
435 return MAL_SUCCEED;
436}
437
438str
439WLCgettick(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
440{ lng *ret = getArgReference_lng(stk,pci,0);
441 (void) cntxt;
442 (void) mb;
443 *ret = wlc_tag;
444 return MAL_SUCCEED;
445}
446
447/* Changing the beat should have immediate effect
448 * It forces a new log file
449 */
450str
451WLCsetbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
452{ int beat;
453 (void) mb;
454 (void) cntxt;
455 beat = * getArgReference_int(stk,pci,1);
456 if ( beat < 0)
457 throw(MAL, "wlc.setbeat", "beat should be a positive number");
458 wlc_beat = beat;
459 return WLCcloselogger();
460}
461
462str
463WLCgetbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
464{ int *ret = getArgReference_int(stk,pci,0);
465 (void) mb;
466 (void) cntxt;
467 *ret = wlc_beat;
468 return MAL_SUCCEED;
469}
470
471str
472WLCmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
473{
474 int len;
475 char path[FILENAME_MAX];
476 str l;
477
478 (void) cntxt;
479 (void) mb;
480 if( wlc_state == WLC_STOP)
481 throw(MAL,"master","WARNING: logging has been stopped. Use new snapshot");
482 if( wlc_state == WLC_RUN)
483 throw(MAL,"master","WARNING: already in master mode, call ignored");
484 if( pci->argc == 2) {
485 len = snprintf(path, FILENAME_MAX, "%s", *getArgReference_str(stk, pci,1));
486 if (len == -1 || len >= FILENAME_MAX)
487 throw(MAL, "wlc.master", "wlc master filename path is too large");
488 } else {
489 if((l = GDKfilepath(0,0,"wlc_logs",0)) == NULL)
490 throw(SQL,"wlc.master", MAL_MALLOC_FAIL);
491 len = snprintf(path,FILENAME_MAX,"%s%c",l, DIR_SEP);
492 GDKfree(l);
493 if (len == -1 || len >= FILENAME_MAX)
494 throw(MAL, "wlc.master", "wlc master filename path is too large");
495 }
496 // set location for logs
497 if( GDKcreatedir(path) == GDK_FAIL)
498 throw(SQL,"wlc.master","Could not create %s\n", path);
499 len = snprintf(wlc_name, IDLENGTH, "%s", GDKgetenv("gdk_dbname"));
500 if (len == -1 || len >= IDLENGTH)
501 throw(SQL,"wlc.master","gdk_dbname is too large");
502 len = snprintf(wlc_dir, FILENAME_MAX, "%s", path);
503 if (len == -1 || len >= FILENAME_MAX)
504 throw(SQL,"wlc.master","wlc_dir directory name is too large");
505 wlc_state= WLC_RUN;
506 return WLCsetConfig();
507}
508
509str
510WLCstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
511{
512 (void) cntxt;
513 (void) mb;
514 (void) stk;
515 (void) pci;
516 if( wlc_state != WLC_RUN )
517 throw(MAL,"wlc.stop","WARNING: master role not active");
518 wlc_state = WLC_STOP;
519 return WLCsetConfig();
520}
521
522static str
523WLCsettime(Client cntxt, InstrPtr pci, InstrPtr p, str fcn)
524{
525 struct timeval clock;
526 time_t clk ;
527 struct tm ctm;
528 char wlc_time[26];
529
530 (void) pci;
531 if(gettimeofday(&clock,NULL) == -1)
532 throw(MAL, fcn, "Unable to retrieve current time");
533 clk = clock.tv_sec;
534#ifdef HAVE_LOCALTIME_R
535 (void) localtime_r(&clk, &ctm);
536#else
537 ctm = *localtime(&clk);
538#endif
539 strftime(wlc_time, sizeof(wlc_time), "%Y-%m-%dT%H:%M:%S.000",&ctm);
540 if (pushStr(cntxt->wlc, p, wlc_time) == NULL)
541 throw(MAL, fcn, MAL_MALLOC_FAIL);
542 return MAL_SUCCEED;
543}
544
545/* Beware that a client context can be used in parallel and
546 * that we don't want transaction interference caused by merging
547 * the MAL instructions accidentally.
548 * The effectively means that the SQL transaction record should
549 * collect the MAL instructions and flush them.
550 */
551static str
552WLCpreparewrite(Client cntxt)
553{ str msg = MAL_SUCCEED;
554 // save the wlc record on a file
555#ifdef _WLC_DEBUG_
556 if( cntxt->wlc){
557 fprintf(stderr,"#WLCpreparewrite: %d %d\n", cntxt->wlc->stop , cntxt->wlc_kind);
558 fprintFunction(stderr, cntxt->wlc, 0, LIST_MAL_DEBUG );
559 }
560#endif
561 if( cntxt->wlc == 0 || cntxt->wlc->stop <= 1 || cntxt->wlc_kind == WLC_QUERY )
562 return MAL_SUCCEED;
563
564 if( wlc_state != WLC_RUN){
565#ifdef _WLC_DEBUG_
566 fprintf(stderr,"#WLCprepare: state %d\n", wlc_state);
567#endif
568 trimMalVariables(cntxt->wlc, NULL);
569 resetMalBlk(cntxt->wlc, 0);
570 cntxt->wlc_kind = WLC_QUERY;
571 return MAL_SUCCEED;
572 }
573 if( wlc_dir[0] ){
574 if (wlc_fd == NULL){
575 msg = WLCsetlogger();
576 if( msg) {
577#ifdef _WLC_DEBUG_
578 fprintf(stderr,"#WLCprepare: setlogger %s \n", msg);
579#endif
580 return msg;
581 }
582 }
583
584 MT_lock_set(&wlc_lock);
585 printFunction(wlc_fd, cntxt->wlc, 0, LIST_MAL_DEBUG );
586 (void) mnstr_flush(wlc_fd);
587 // close file if no delay is allowed
588 if( wlc_beat == 0 )
589 msg = WLCcloselogger();
590
591 MT_lock_unset(&wlc_lock);
592 trimMalVariables(cntxt->wlc, NULL);
593 resetMalBlk(cntxt->wlc, 0);
594 cntxt->wlc_kind = WLC_QUERY;
595 } else
596 throw(MAL,"wlc.write","WLC log path missing ");
597
598#ifdef _WLC_DEBUG_
599 fprintFunction(stderr, cntxt->wlc, 0, LIST_MAL_ALL );
600#endif
601 if( wlc_state == WLC_STOP)
602 throw(MAL,"wlc.write","Logging for this snapshot has been stopped. Use a new snapshot to continue logging.");
603 return msg;
604}
605
606static str
607WLCstart(Client cntxt, str fcn)
608{
609 InstrPtr pci;
610 str msg = MAL_SUCCEED;
611 MalBlkPtr mb = cntxt->wlc;
612 lng tag;
613
614 if( cntxt->wlc == NULL){
615 if((cntxt->wlc = newMalBlk(STMT_INCREMENT)) == NULL)
616 throw(MAL, fcn, MAL_MALLOC_FAIL);
617 mb = cntxt->wlc;
618 }
619 /* Find a single transaction sequence ending with COMMIT or ROLLBACK */
620 if( mb->stop > 1 ){
621 pci = getInstrPtr(mb, mb->stop -1 );
622 if ( ! (strcmp( getFunctionId(pci), "commit") == 0 || strcmp( getFunctionId(pci), "rollback") == 0))
623 return MAL_SUCCEED;
624 }
625
626 /* create the start of a new transaction block */
627 MT_lock_set(&wlc_lock);
628 tag = wlc_tag;
629 wlc_tag++; // Update wlc administration
630
631 pci = newStmt(mb,"wlr", "transaction");
632 pci = pushLng(mb, pci, tag);
633 if((msg = WLCsettime(cntxt,pci, pci, fcn)) == MAL_SUCCEED) {
634 snprintf(wlc_write, 26, "%s", getVarConstant(cntxt->wlc, getArg(pci, 2)).val.sval);
635 pci = pushStr(mb, pci, cntxt->username);
636 pci->ticks = GDKms();
637 }
638 MT_lock_unset(&wlc_lock);
639
640 return msg;
641}
642
643str
644WLCtransaction(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
645{
646 (void) cntxt;
647 (void) mb;
648 (void) stk;
649 (void) pci;
650
651 return MAL_SUCCEED;
652}
653
654str
655WLCquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
656{
657 InstrPtr p;
658 str msg = MAL_SUCCEED;
659
660 (void) stk;
661 if ( strcmp("-- no query",getVarConstant(mb, getArg(pci,1)).val.sval) == 0)
662 return MAL_SUCCEED; // ignore system internal queries.
663 msg = WLCstart(cntxt, "wlr.query");
664 if(msg)
665 return msg;
666 cntxt->wlc_kind = WLC_QUERY;
667 p = newStmt(cntxt->wlc, "wlr","query");
668 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
669 return msg;
670}
671
672str
673WLCcatalog(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
674{
675 InstrPtr p;
676 str msg = MAL_SUCCEED;
677
678 (void) stk;
679 msg = WLCstart(cntxt, "wlr.catalog");
680 if(msg)
681 return msg;
682 cntxt->wlc_kind = WLC_CATALOG;
683 p = newStmt(cntxt->wlc, "wlr","catalog");
684 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
685 return msg;
686}
687
688str
689WLCaction(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
690{
691 InstrPtr p;
692 str msg = MAL_SUCCEED;
693
694 (void) stk;
695 msg = WLCstart(cntxt, "wlr.action");
696 if(msg)
697 return msg;
698 cntxt->wlc_kind = WLC_UPDATE;
699 p = newStmt(cntxt->wlc, "wlr","action");
700 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
701 return msg;
702}
703
704/*
705 * We actually don't need the catalog operations in the log.
706 * It is sufficient to upgrade the replay block to WLR_CATALOG.
707 */
708str
709WLCgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
710{
711 InstrPtr p;
712 int i, tpe, varid;
713 str msg = MAL_SUCCEED;
714
715 (void) stk;
716 msg = WLCstart(cntxt, "wlr.generic");
717 if(msg)
718 return msg;
719 cntxt->wlc_kind = WLC_IGNORE;
720 p = newStmt(cntxt->wlc, "wlr",getFunctionId(pci));
721 for( i = pci->retc; i< pci->argc; i++){
722 tpe =getArgType(mb, pci, i);
723 switch(tpe){
724 case TYPE_str:
725 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci, i)).val.sval);
726 break;
727 default:
728 varid = defConstant(cntxt->wlc, tpe, getArgReference(stk, pci, i));
729 p = pushArgument(cntxt->wlc, p, varid);
730 }
731 }
732 p->ticks = GDKms();
733 cntxt->wlc_kind = WLC_CATALOG;
734 return msg;
735}
736
/*
 * bulk(TPE1, TPE2): append every value of the fixed-width BAT `b` (C element
 * type TPE1) as an argument of instruction `pci`, using push##TPE2
 * (e.g. pushInt). Expects cntxt, b, pci, sch, tbl and col in scope at the
 * expansion site. To keep instructions bounded, a fresh
 * wlr.<same function>(sch, tbl, col, ...) statement is started whenever
 * k % 32 == 31, i.e. before the 32nd value and every 32 values thereafter.
 */
#define bulk(TPE1, TPE2)\
{ TPE1 *p = (TPE1 *) Tloc(b,0);\
	TPE1 *q = (TPE1 *) Tloc(b, BUNlast(b));\
	int k=0; \
	for( ; p < q; p++, k++){\
		if( k % 32 == 31){\
			pci = newStmt(cntxt->wlc, "wlr",getFunctionId(pci));\
			pci = pushStr(cntxt->wlc, pci, sch);\
			pci = pushStr(cntxt->wlc, pci, tbl);\
			pci = pushStr(cntxt->wlc, pci, col);\
			pci->ticks = GDKms();\
		}\
		pci = push##TPE2(cntxt->wlc, pci ,*p);\
} }
751
/*
 * updateBatch(TPE1, TPE2): emit one wlr.update(sch, tbl, col, oid, value)
 * statement per value of the fixed-width BAT `bval` (C element type TPE1,
 * pushed with push##TPE2). The oid is taken from the oid array `ol` when it is
 * non-NULL, otherwise from the dense counter `o`; the one in use is advanced
 * per value. Expects cntxt, b, bval, p, sch, tbl, col, o and ol in scope.
 * NOTE(review): the end pointer applies BUNlast(b) to bval's tail, so this
 * assumes b and bval have the same count.
 */
#define updateBatch(TPE1,TPE2)\
{ TPE1 *x = (TPE1 *) Tloc(bval,0);\
	TPE1 *y = (TPE1 *) Tloc(bval, BUNlast(b));\
	int k=0; \
	for( ; x < y; x++, k++){\
		p = newStmt(cntxt->wlc, "wlr","update");\
		p = pushStr(cntxt->wlc, p, sch);\
		p = pushStr(cntxt->wlc, p, tbl);\
		p = pushStr(cntxt->wlc, p, col);\
		p = pushOid(cntxt->wlc, p, (ol? *ol++: o++));\
		p = push##TPE2(cntxt->wlc, p ,*x);\
} }
764
765static str
766WLCdatashipping(Client cntxt, MalBlkPtr mb, InstrPtr pci, int bid)
767{ BAT *b;
768 str sch, tbl, col;
769 str msg = MAL_SUCCEED;
770 (void) mb;
771
772 b = BATdescriptor(bid);
773 if (b == NULL) {
774 throw(MAL, "wlc.datashipping", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
775 }
776
777// large BATs can also be re-created using the query.
778// Copy into should always be expanded, because the source may not
779// be accessible in the replica. TODO
780
781 sch = GDKstrdup(getVarConstant(cntxt->wlc, getArg(pci,1)).val.sval);
782 tbl = GDKstrdup(getVarConstant(cntxt->wlc, getArg(pci,2)).val.sval);
783 col = GDKstrdup(getVarConstant(cntxt->wlc, getArg(pci,3)).val.sval);
784 if(!sch || !tbl || !col) {
785 msg = createException(MAL, "wlc.datashipping", SQLSTATE(HY001) MAL_MALLOC_FAIL);
786 goto finish;
787 }
788 if (cntxt->wlc_kind < WLC_UPDATE)
789 cntxt->wlc_kind = WLC_UPDATE;
790 switch( ATOMstorage(b->ttype)){
791 case TYPE_bit: bulk(bit,Bit); break;
792 case TYPE_bte: bulk(bte,Bte); break;
793 case TYPE_sht: bulk(sht,Sht); break;
794 case TYPE_int: bulk(int,Int); break;
795 case TYPE_lng: bulk(lng,Lng); break;
796 case TYPE_flt: bulk(flt,Flt); break;
797 case TYPE_dbl: bulk(dbl,Dbl); break;
798#ifdef HAVE_HGE
799 case TYPE_hge: bulk(hge,Hge); break;
800#endif
801 case TYPE_str:
802 { BATiter bi;
803 BUN p,q;
804 int k=0;
805 bi= bat_iterator(b);
806 BATloop(b,p,q){
807 if( k % 32 == 31){
808 pci = newStmt(cntxt->wlc, "wlr",getFunctionId(pci));
809 pci = pushStr(cntxt->wlc, pci, sch);
810 pci = pushStr(cntxt->wlc, pci, tbl);
811 pci = pushStr(cntxt->wlc, pci, col);
812 }
813 k++;
814 pci = pushStr(cntxt->wlc, pci ,(str) BUNtvar(bi,p));
815 } }
816 break;
817 default:
818 fprintf(stderr, "#wlc datashipping, non-supported type %d\n", ATOMstorage(b->ttype));
819 cntxt->wlc_kind = WLC_CATALOG;
820 }
821finish:
822 BBPunfix(b->batCacheid);
823 if (sch)
824 GDKfree(sch);
825 if (tbl)
826 GDKfree(tbl);
827 if (col)
828 GDKfree(col);
829 return msg;
830}
831
832str
833WLCappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
834{
835 InstrPtr p;
836 int tpe, varid;
837 str msg = MAL_SUCCEED;
838
839 (void) stk;
840 (void) mb;
841 msg = WLCstart(cntxt, "wlr.append");
842 if(msg)
843 return msg;
844 p = newStmt(cntxt->wlc, "wlr","append");
845 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
846 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
847 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,3)).val.sval);
848
849 // extend the instructions with all values.
850 // If this become too large we can always switch to a "catalog" mode
851 // forcing re-execution instead
852 tpe= getArgType(mb,pci,4);
853 if (isaBatType(tpe) ){
854 // actually check the size of the BAT first, most have few elements
855 msg = WLCdatashipping(cntxt, mb, p, stk->stk[getArg(pci,4)].val.bval);
856 } else {
857 ValRecord cst;
858 if (VALcopy(&cst, getArgReference(stk,pci,4)) != NULL){
859 varid = defConstant(cntxt->wlc, tpe, &cst);
860 p = pushArgument(cntxt->wlc, p, varid);
861 }
862 }
863 if( cntxt->wlc_kind < WLC_UPDATE)
864 cntxt->wlc_kind = WLC_UPDATE;
865
866 return msg;
867}
868
869/* check for empty BATs first */
870str
871WLCdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
872{ InstrPtr p;
873 int tpe, k = 0;
874 int bid = stk->stk[getArg(pci,3)].val.bval;
875 oid o=0, last, *ol;
876 BAT *b;
877 str msg = MAL_SUCCEED;
878
879 (void) stk;
880 (void) mb;
881 b= BBPquickdesc(bid, false);
882 if( BATcount(b) == 0)
883 return MAL_SUCCEED;
884 msg = WLCstart(cntxt, "wlr.delete");
885 if(msg) {
886 BBPunfix(b->batCacheid);
887 return msg;
888 }
889 cntxt->wlc_kind = WLC_UPDATE;
890 p = newStmt(cntxt->wlc, "wlr","delete");
891 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
892 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
893
894 tpe= getArgType(mb,pci,3);
895 if (isaBatType(tpe) ){
896 b= BATdescriptor(bid);
897 if (b == NULL)
898 throw(MAL, "wlc.delete", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
899 o = b->tseqbase;
900 last = o + BATcount(b);
901 if( b->ttype == TYPE_void){
902 for( ; o < last; o++, k++){
903 if( k%32 == 31){
904 p = newStmt(cntxt->wlc, "wlr","delete");
905 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
906 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
907 }
908 p = pushOid(cntxt->wlc,p, o);
909 }
910 } else {
911 ol = (oid*) Tloc(b,0);
912 for( ; o < last; o++, k++){
913 if( k%32 == 31){
914 p = newStmt(cntxt->wlc, "wlr","delete");
915 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
916 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
917 }
918 p = pushOid(cntxt->wlc,p, *ol);
919 }
920 }
921 BBPunfix(b->batCacheid);
922 } else
923 throw(MAL,"wlc.delete","BAT expected");
924 if( cntxt->wlc_kind < WLC_UPDATE)
925 cntxt->wlc_kind = WLC_UPDATE;
926
927 return msg;
928}
929
930str
931WLCupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
932{ InstrPtr p;
933 str sch,tbl,col, msg = MAL_SUCCEED;
934 ValRecord cst;
935 int tpe, varid;
936 oid o = 0, *ol = 0;
937
938 sch = *getArgReference_str(stk,pci,1);
939 tbl = *getArgReference_str(stk,pci,2);
940 col = *getArgReference_str(stk,pci,3);
941 msg = WLCstart(cntxt, "wlr.update");
942 if(msg)
943 return msg;
944 cntxt->wlc_kind = WLC_UPDATE;
945 tpe= getArgType(mb,pci,5);
946 if (isaBatType(tpe) ){
947 BAT *b, *bval;
948 b= BATdescriptor(stk->stk[getArg(pci,4)].val.bval);
949 bval= BATdescriptor(stk->stk[getArg(pci,5)].val.bval);
950 if(b == NULL || bval == NULL) {
951 if(b)
952 BBPunfix(b->batCacheid);
953 if(bval)
954 BBPunfix(bval->batCacheid);
955 throw(MAL, "wlr.update", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
956 }
957 if( b->ttype == TYPE_void)
958 o = b->tseqbase;
959 else
960 ol = (oid*) Tloc(b,0);
961 switch( ATOMstorage(bval->ttype)){
962 case TYPE_bit: updateBatch(bit,Bit); break;
963 case TYPE_bte: updateBatch(bte,Bte); break;
964 case TYPE_sht: updateBatch(sht,Sht); break;
965 case TYPE_int: updateBatch(int,Int); break;
966 case TYPE_lng: updateBatch(lng,Lng); break;
967 case TYPE_flt: updateBatch(flt,Flt); break;
968 case TYPE_dbl: updateBatch(dbl,Dbl); break;
969#ifdef HAVE_HGE
970 case TYPE_hge: updateBatch(hge,Hge); break;
971#endif
972 case TYPE_str:
973 { BATiter bi;
974 int k=0;
975 BUN x,y;
976 bi = bat_iterator(bval);
977 BATloop(bval,x,y){
978 p = newStmt(cntxt->wlc, "wlr","update");
979 p = pushStr(cntxt->wlc, p, sch);
980 p = pushStr(cntxt->wlc, p, tbl);
981 p = pushStr(cntxt->wlc, p, col);
982 p = pushOid(cntxt->wlc, p, (ol? *ol++ : o++));
983 p = pushStr(cntxt->wlc, p , BUNtvar(bi,x));
984 k++;
985 } }
986 /* fall through */
987 default:
988 cntxt->wlc_kind = WLC_CATALOG;
989 }
990 BBPunfix(b->batCacheid);
991 } else {
992 p = newStmt(cntxt->wlc, "wlr","update");
993 p = pushStr(cntxt->wlc, p, sch);
994 p = pushStr(cntxt->wlc, p, tbl);
995 p = pushStr(cntxt->wlc, p, col);
996 o = *getArgReference_oid(stk,pci,4);
997 p = pushOid(cntxt->wlc,p, o);
998 if (VALcopy(&cst, getArgReference(stk,pci,5)) != NULL){
999 varid = defConstant(cntxt->wlc, tpe, &cst);
1000 p = pushArgument(cntxt->wlc, p, varid);
1001 }
1002 }
1003
1004 if( cntxt->wlc_kind < WLC_UPDATE)
1005 cntxt->wlc_kind = WLC_UPDATE;
1006 return msg;
1007}
1008
1009str
1010WLCclear_table(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1011{
1012 InstrPtr p;
1013 str msg = MAL_SUCCEED;
1014 (void) stk;
1015 msg = WLCstart(cntxt, "wlr.clear_table");
1016 if(msg)
1017 return msg;
1018 cntxt->wlc_kind = WLC_UPDATE;
1019 p = newStmt(cntxt->wlc, "wlr","clear_table");
1020 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
1021 p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
1022 if( cntxt->wlc_kind < WLC_UPDATE)
1023 cntxt->wlc_kind = WLC_UPDATE;
1024
1025 return msg;
1026}
1027
1028
1029str
1030WLCcommit(int clientid)
1031{
1032 if( mal_clients[clientid].wlc && mal_clients[clientid].wlc->stop > 1){
1033 newStmt(mal_clients[clientid].wlc,"wlr","commit");
1034 return WLCpreparewrite( &mal_clients[clientid]);
1035 }
1036 return MAL_SUCCEED;
1037}
1038
1039str
1040WLCcommitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1041{ str msg = MAL_SUCCEED;
1042 msg = WLCstart(cntxt, "wlr.commit");
1043 if(msg)
1044 return msg;
1045 (void) mb;
1046 (void) stk;
1047 (void) pci;
1048 cntxt->wlc_kind = WLC_UPDATE;
1049 return WLCcommit(cntxt->idx);
1050}
1051
1052str
1053WLCrollback(int clientid)
1054{
1055 if( mal_clients[clientid].wlc){
1056 newStmt(mal_clients[clientid].wlc,"wlr","rollback");
1057 return WLCpreparewrite( &mal_clients[clientid]);
1058 }
1059 return MAL_SUCCEED;
1060}
1061str
1062WLCrollbackCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1063{ str msg = MAL_SUCCEED;
1064 msg = WLCstart(cntxt, "wlr.rollback");
1065 if(msg)
1066 return msg;
1067 (void) mb;
1068 (void) stk;
1069 (void) pci;
1070 cntxt->wlc_kind = WLC_UPDATE;
1071 return WLCrollback(cntxt->idx);
1072}
1073