1/*
2 * pgbench.c
3 *
4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
6 *
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2019, PostgreSQL Global Development Group
9 * ALL RIGHTS RESERVED;
10 *
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
15 *
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
21 *
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27 *
28 */
29
/*
 * On Windows, FD_SETSIZE is overridden here, ahead of every #include in this
 * file, because the value has to be in place before winsock2.h is seen.
 */
#ifdef WIN32
#define FD_SETSIZE 1024			/* must set before winsock2.h is included */
#endif
33
34#include "postgres_fe.h"
35#include "common/int.h"
36#include "common/logging.h"
37#include "fe_utils/conditional.h"
38#include "getopt_long.h"
39#include "libpq-fe.h"
40#include "portability/instr_time.h"
41
42#include <ctype.h>
43#include <float.h>
44#include <limits.h>
45#include <math.h>
46#include <signal.h>
47#include <time.h>
48#include <sys/time.h>
49#ifdef HAVE_SYS_RESOURCE_H
50#include <sys/resource.h> /* for getrlimit */
51#endif
52
/*
 * Select the socket-wait primitive.  ppoll() is used when HAVE_PPOLL is
 * defined; otherwise select() is used.  Exactly one of POLL_USING_PPOLL and
 * POLL_USING_SELECT ends up defined.
 *
 * For testing, PGBENCH_USE_SELECT can be defined to force use of the select()
 * code even where ppoll() is available.
 */
#if defined(HAVE_PPOLL) && !defined(PGBENCH_USE_SELECT)
#define POLL_USING_PPOLL
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#else							/* no ppoll(), so use select() */
#define POLL_USING_SELECT
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#endif
65
/* Fallback value of pi, used only when the included headers did not define M_PI */
#ifndef M_PI
#define M_PI 3.14159265358979323846
#endif
69
70#include "pgbench.h"
71
/* Error code string identifying an "undefined table" condition */
#define ERRCODE_UNDEFINED_TABLE  "42P01"

/*
 * Hashing constants
 *
 * FNV_PRIME and FNV_OFFSET_BASIS parameterize getHashFnv1a().
 * MM2_MUL and MM2_ROT are the multiplier and shift used by getHashMurmur2();
 * MM2_MUL_TIMES_8 is MM2_MUL * 8 truncated to 64 bits (8 being the size in
 * bytes of the hashed int64), and is mixed with the seed there.
 */
#define FNV_PRIME			UINT64CONST(0x100000001b3)
#define FNV_OFFSET_BASIS	UINT64CONST(0xcbf29ce484222325)
#define MM2_MUL				UINT64CONST(0xc6a4a7935bd1e995)
#define MM2_MUL_TIMES_8		UINT64CONST(0x35253c9ade8f4ca8)
#define MM2_ROT				47
82
/*
 * Multi-platform socket set implementations
 *
 * Only one of the two socket_set layouts below is compiled, depending on
 * which of POLL_USING_PPOLL / POLL_USING_SELECT is defined.
 * SOCKET_WAIT_METHOD is the name of the chosen primitive, as a string.
 */

#ifdef POLL_USING_PPOLL
#define SOCKET_WAIT_METHOD "ppoll"

/* ppoll() flavor: a variable-length array of struct pollfd entries */
typedef struct socket_set
{
	int			maxfds;			/* allocated length of pollfds[] array */
	int			curfds;			/* number currently in use */
	struct pollfd pollfds[FLEXIBLE_ARRAY_MEMBER];
} socket_set;

#endif							/* POLL_USING_PPOLL */

#ifdef POLL_USING_SELECT
#define SOCKET_WAIT_METHOD "select"

/* select() flavor: an fd_set plus the largest descriptor stored in it */
typedef struct socket_set
{
	int			maxfd;			/* largest FD currently set in fds */
	fd_set		fds;
} socket_set;

#endif							/* POLL_USING_SELECT */
109
/*
 * Multi-platform pthread implementations
 *
 * Three configurations are handled: WIN32 (pthread_create/pthread_join are
 * declared here as static functions over native threads), builds with
 * ENABLE_THREAD_SAFETY (the platform's <pthread.h> is used), and builds with
 * neither, where pthread_t is only a placeholder type.
 */

#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* No threads implementation, use none (-j 1) */
#define pthread_t void *
#endif
128
129
/********************************************************************
 * some configurable parameters */

#define DEFAULT_INIT_STEPS "dtgvp"	/* default -I setting */

#define LOG_STEP_SECONDS	5	/* seconds between log messages */
#define DEFAULT_NXACTS	10		/* default nxacts */

#define MIN_GAUSSIAN_PARAM		2.0 /* minimum parameter for gauss */

#define MIN_ZIPFIAN_PARAM		1.001	/* minimum parameter for zipfian */
#define MAX_ZIPFIAN_PARAM		1000.0	/* maximum parameter for zipfian */

/*
 * Run length: either a number of transactions per client or a duration.
 * All three start at 0.
 */
int			nxacts = 0;			/* number of transactions per client */
int			duration = 0;		/* duration in seconds */
int64		end_time = 0;		/* when to stop in micro seconds, under -T */

/*
 * scaling factor. for example, scale = 10 will make 1000000 tuples in
 * pgbench_accounts table (that is, scale * naccounts).
 */
int			scale = 1;

/*
 * fillfactor. for example, fillfactor = 90 will use only 90 percent
 * space during inserts and leave 10 percent free.
 */
int			fillfactor = 100;

/*
 * use unlogged tables?
 */
bool		unlogged_tables = false;

/*
 * log sampling rate (1.0 = log everything, 0.0 = option not given)
 */
double		sample_rate = 0.0;

/*
 * When threads are throttled to a given rate limit, this is the target delay
 * to reach that rate in usec.  0 is the default and means no throttling.
 */
double		throttle_delay = 0;

/*
 * Transactions which take longer than this limit (in usec) are counted as
 * late, and reported as such, although they are completed anyway. When
 * throttling is enabled, execution time slots that are more than this late
 * are skipped altogether, and counted separately.
 */
int64		latency_limit = 0;

/*
 * tablespace selection: one name for tables, one for indexes.  NULL is the
 * initial value of both.
 */
char	   *tablespace = NULL;
char	   *index_tablespace = NULL;

/* random seed used to initialize base_random_sequence; starts out as -1 */
int64		random_seed = -1;

/*
 * end of configurable parameters
 *********************************************************************/
195
/*
 * Base sizes that get multiplied by the scale factor; the builtin scripts
 * draw ids with e.g. random(1, naccounts * :scale).
 */
#define nbranches	1			/* Makes little sense to change this.  Change
								 * -s instead */
#define ntellers	10
#define naccounts	100000

/*
 * The scale factor at/beyond which 32-bit integers are no longer treated as
 * large enough: naccounts * scale reaches the limit of a signed 32-bit
 * integer at a scale of about 21474 (2^31 / 100000).
 *
 * Although the actual threshold is 21474, we use 20000 because it is easier to
 * document and remember, and isn't that far away from the real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000
209
/*
 * Global run-time settings.  Those declared without an initializer start out
 * as zero/false/NULL, as file-scope objects do in C.
 */
bool		use_log;			/* log transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
int			agg_interval;		/* log aggregates instead of individual
								 * transactions */
bool		per_script_stats = false;	/* whether to collect stats per script */
int			progress = 0;		/* thread progress report every this seconds */
bool		progress_timestamp = false; /* progress report with Unix time */
int			nclients = 1;		/* number of clients */
int			nthreads = 1;		/* number of threads */
bool		is_connect;			/* establish connection for each transaction */
bool		report_per_command; /* report per-command latencies */
int			main_pid;			/* main process id used in log filename */

/* connection parameters and naming; empty string is the initial host/port */
char	   *pghost = "";
char	   *pgport = "";
char	   *login = NULL;
char	   *dbName;
char	   *logfile_prefix = NULL;
const char *progname;			/* printed by usage() */

#define WSEP '@'				/* weight separator, as in NAME[@W] */

volatile bool timer_exceeded = false;	/* flag from signal handler */
233
/*
 * Variable definitions.
 *
 * A variable can be known by its string form, by its typed value, or both:
 *
 * - If a variable only has a string value, "svalue" is that string and
 *   "value" is "not set".
 *
 * - If the value is known, "value" contains it (in any variant).  In this
 *   case "svalue" contains the string equivalent of the value, if we've had
 *   occasion to compute that, or NULL if we haven't.
 */
typedef struct
{
	char	   *name;			/* variable's name */
	char	   *svalue;			/* its value in string form, if known */
	PgBenchValue value;			/* actual variable's value */
} Variable;
250
/* Fixed limits; MAX_SCRIPTS sizes sql_script[] and CState.prepared[] */
#define MAX_SCRIPTS		128		/* max number of SQL scripts allowed */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
253
/*
 * Simple data structure to keep stats about something.
 *
 * It keeps a running count, the extremes, and the sums of the values and of
 * their squares.
 *
 * XXX probably the first value should be kept and used as an offset for
 * better numerical stability...
 */
typedef struct SimpleStats
{
	int64		count;			/* how many values were encountered */
	double		min;			/* the minimum seen */
	double		max;			/* the maximum seen */
	double		sum;			/* sum of values */
	double		sum2;			/* sum of squared values */
} SimpleStats;
268
/*
 * Data structure to hold various statistics: per-thread and per-script stats
 * are maintained and merged together.
 *
 * Both TState and ParsedScript embed one of these.
 */
typedef struct StatsData
{
	time_t		start_time;		/* interval start time, for aggregates */
	int64		cnt;			/* number of transactions, including skipped */
	int64		skipped;		/* number of transactions skipped under --rate
								 * and --latency-limit */
	SimpleStats latency;		/* latency statistics */
	SimpleStats lag;			/* lag statistics */
} StatsData;
282
/*
 * Struct to keep random state.
 *
 * xseed[] is the three-element state array handed to pg_erand48() and
 * pg_jrand48().
 */
typedef struct RandomState
{
	unsigned short xseed[3];
} RandomState;

/* Various random sequences are initialized from this one (see initRandomState). */
static RandomState base_random_sequence;
293
/*
 * Connection state machine states.
 *
 * The value is stored in CState.state; the comments below describe what each
 * state does and which states may follow it.
 */
typedef enum
{
	/*
	 * The client must first choose a script to execute.  Once chosen, it can
	 * either be throttled (state CSTATE_PREPARE_THROTTLE under --rate), start
	 * right away (state CSTATE_START_TX) or not start at all if the timer was
	 * exceeded (state CSTATE_FINISHED).
	 */
	CSTATE_CHOOSE_SCRIPT,

	/*
	 * CSTATE_START_TX performs start-of-transaction processing.  It
	 * establishes a new connection for the transaction in --connect mode,
	 * records the transaction start time, and proceeds to the first command.
	 *
	 * Note: once a script is started, it will either error or run till its
	 * end, where it may be interrupted. It is not interrupted while running,
	 * so pgbench --time is to be understood as tx are allowed to start in
	 * that time, and will finish when their work is completed.
	 */
	CSTATE_START_TX,

	/*
	 * In CSTATE_PREPARE_THROTTLE state, we calculate when to begin the next
	 * transaction, and advance to CSTATE_THROTTLE.  CSTATE_THROTTLE state
	 * sleeps until that moment, then advances to CSTATE_START_TX, or
	 * CSTATE_FINISHED if the next transaction would start beyond the end of
	 * the run.
	 */
	CSTATE_PREPARE_THROTTLE,
	CSTATE_THROTTLE,

	/*
	 * We loop through these states, to process each command in the script:
	 *
	 * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
	 * command, the command is sent to the server, and we move to
	 * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is set,
	 * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
	 * meta-commands are executed immediately.  If the command about to start
	 * is actually beyond the end of the script, advance to CSTATE_END_TX.
	 *
	 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
	 * for the current command.
	 *
	 * CSTATE_SLEEP waits until the end of \sleep.
	 *
	 * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
	 * command counter, and loops back to CSTATE_START_COMMAND state.
	 *
	 * CSTATE_SKIP_COMMAND is used by conditional branches which are not
	 * executed.  It quickly skips commands that do not need any evaluation.
	 * This state can move forward several commands, till there is something
	 * to do or the end of the script.
	 */
	CSTATE_START_COMMAND,
	CSTATE_WAIT_RESULT,
	CSTATE_SLEEP,
	CSTATE_END_COMMAND,
	CSTATE_SKIP_COMMAND,

	/*
	 * CSTATE_END_TX performs end-of-transaction processing.  It calculates
	 * latency, and logs the transaction.  In --connect mode, it closes the
	 * current connection.
	 *
	 * Then either starts over in CSTATE_CHOOSE_SCRIPT, or enters
	 * CSTATE_FINISHED if we have no more work to do.
	 */
	CSTATE_END_TX,

	/*
	 * Final states.  CSTATE_ABORTED means that the script execution was
	 * aborted because a command failed, CSTATE_FINISHED means success.
	 */
	CSTATE_ABORTED,
	CSTATE_FINISHED
} ConnectionStateEnum;
375
/*
 * Connection state.
 *
 * One CState describes one client: its connection handle, its position in
 * the state machine and in the current script, its variables, timing data
 * for the transaction in progress, and its counters.
 */
typedef struct
{
	PGconn	   *con;			/* connection handle to DB */
	int			id;				/* client No. */
	ConnectionStateEnum state;	/* state machine's current state. */
	ConditionalStack cstack;	/* enclosing conditionals state */

	/*
	 * Separate randomness for each client. This is used for random functions
	 * PGBENCH_RANDOM_* during the execution of the script.
	 */
	RandomState cs_func_rs;

	int			use_file;		/* index in sql_script for this client */
	int			command;		/* command number in script */

	/* client variables */
	Variable   *variables;		/* array of variable definitions */
	int			nvariables;		/* number of variables */
	bool		vars_sorted;	/* are variables sorted by name? */

	/* various times about current transaction */
	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
	int64		sleep_until;	/* scheduled start time of next cmd (usec) */
	instr_time	txn_begin;		/* used for measuring schedule lag times */
	instr_time	stmt_begin;		/* used for measuring statement latencies */

	/* one flag per possible script, hence MAX_SCRIPTS entries */
	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */

	/* per client collected stats */
	int64		cnt;			/* client transaction count, for -t */
	int			ecnt;			/* error count */
} CState;
412
/*
 * Thread state
 *
 * One TState describes one thread: its handle, the array of client states
 * it holds, its private random sequences, its log file and its statistics.
 */
typedef struct
{
	int			tid;			/* thread id */
	pthread_t	thread;			/* thread handle */
	CState	   *state;			/* array of CState */
	int			nstate;			/* length of state[] */

	/*
	 * Separate randomness for each thread. Each thread option uses its own
	 * random state to make all of them independent of each other and
	 * therefore deterministic at the thread level.
	 */
	RandomState ts_choose_rs;	/* random state for selecting a script */
	RandomState ts_throttle_rs; /* random state for transaction throttling */
	RandomState ts_sample_rs;	/* random state for log sampling */

	int64		throttle_trigger;	/* previous/next throttling (us) */
	FILE	   *logfile;		/* where to log, or NULL */

	/* per thread collected stats */
	instr_time	start_time;		/* thread start time */
	instr_time	conn_time;
	StatsData	stats;
	int64		latency_late;	/* executed but late transactions */
} TState;

/* pthread_t value (zero, cast to the handle type) denoting "no thread" */
#define INVALID_THREAD	((pthread_t) 0)
443
/*
 * queries read from files
 *
 * These are the values stored in Command.type.
 */
#define SQL_COMMAND		1
#define META_COMMAND	2

/*
 * max number of backslash command arguments or SQL variables,
 * including the command or SQL statement itself
 *
 * This is the size of Command.argv[].
 */
#define MAX_ARGS		256
455
/*
 * Identifies which backslash meta-command a Command is.  META_NONE is the
 * first enumerator (value 0) and means "not a known meta-command".
 */
typedef enum MetaCommand
{
	META_NONE,					/* not a known meta-command */
	META_SET,					/* \set */
	META_SETSHELL,				/* \setshell */
	META_SHELL,					/* \shell */
	META_SLEEP,					/* \sleep */
	META_GSET,					/* \gset */
	META_IF,					/* \if */
	META_ELIF,					/* \elif */
	META_ELSE,					/* \else */
	META_ENDIF					/* \endif */
} MetaCommand;
469
/*
 * Protocol used to submit queries.  NUM_QUERYMODE is not a mode; being the
 * last enumerator, its value is the number of modes.
 */
typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE
} QueryMode;

/* current mode, and the mode names listed in QueryMode order */
static QueryMode querymode = QUERY_SIMPLE;
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
480
/*
 * struct Command represents one command in a script.
 *
 * lines		The raw, possibly multi-line command text.  Variable
 *				substitution not applied.
 * first_line	A short, single-line extract of 'lines', for error reporting.
 * type			SQL_COMMAND or META_COMMAND
 * meta			The type of meta-command, or META_NONE if command is SQL
 * argc			Number of arguments of the command, 0 if not yet processed.
 * argv			Command arguments, the first of which is the command or SQL
 *				string itself.  For SQL commands, after post-processing
 *				argv[0] is the same as 'lines' with variables substituted.
 *				Holds at most MAX_ARGS entries.
 * varprefix	SQL commands terminated with \gset have this set
 *				to a non NULL value.  If nonempty, it's used to prefix the
 *				variable name that receives the value.
 * expr			Parsed expression, if needed.
 * stats		Time spent in this command.
 */
typedef struct Command
{
	PQExpBufferData lines;
	char	   *first_line;
	int			type;
	MetaCommand meta;
	int			argc;
	char	   *argv[MAX_ARGS];
	char	   *varprefix;
	PgBenchExpr *expr;
	SimpleStats stats;
} Command;
511
/*
 * A script after parsing: its descriptor, its selection weight, its commands
 * and the statistics gathered for it.
 */
typedef struct ParsedScript
{
	const char *desc;			/* script descriptor (eg, file name) */
	int			weight;			/* selection weight */
	Command   **commands;		/* NULL-terminated array of Commands */
	StatsData	stats;			/* total time spent in script */
} ParsedScript;

/* table of scripts; at most MAX_SCRIPTS entries, num_scripts of them in use */
static ParsedScript sql_script[MAX_SCRIPTS];	/* SQL script files */
static int	num_scripts;		/* number of scripts in sql_script[] */
static int64 total_weight = 0;

static int	debug = 0;			/* debug flag */
525
/*
 * Builtin test scripts
 *
 * Each entry pairs a short name and a description with the script text.
 * The table sizes (naccounts, nbranches, ntellers) are pasted into the
 * script text at compile time through CppAsString2().
 */
typedef struct BuiltinScript
{
	const char *name;			/* very short name for -b ... */
	const char *desc;			/* short description */
	const char *script;			/* actual pgbench script */
} BuiltinScript;

static const BuiltinScript builtin_script[] =
{
	/* updates accounts, tellers and branches, and records history */
	{
		"tpcb-like",
		"<builtin: TPC-B (sort of)>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
		"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	/* same as above minus the updates of pgbench_tellers and pgbench_branches */
	{
		"simple-update",
		"<builtin: simple update>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
		"\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
		"\\set delta random(-5000, 5000)\n"
		"BEGIN;\n"
		"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
		"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
		"END;\n"
	},
	/* a single SELECT on pgbench_accounts */
	{
		"select-only",
		"<builtin: select only>",
		"\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
		"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	}
};
571
572
/* Function prototypes */

/* setters for the variants of a PgBenchValue */
static void setNullValue(PgBenchValue *pv);
static void setBoolValue(PgBenchValue *pv, bool bval);
static void setIntValue(PgBenchValue *pv, int64 ival);
static void setDoubleValue(PgBenchValue *pv, double dval);

/* expression evaluation and meta-command execution */
static bool evaluateExpr(CState *st, PgBenchExpr *expr,
						 PgBenchValue *retval);
static ConnectionStateEnum executeMetaCommand(CState *st, instr_time *now);

/* logging and statistics */
static void doLog(TState *thread, CState *st,
				  StatsData *agg, bool skipped, double latency, double lag);
static void processXactStats(TState *thread, CState *st, instr_time *now,
							 bool skipped, StatsData *agg);

/* scripts, threads, connections, timer */
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
static void finishCon(CState *st);
static void setalarm(int seconds);

/* operations on the platform-specific socket_set */
static socket_set *alloc_socket_set(int count);
static void free_socket_set(socket_set *sa);
static void clear_socket_set(socket_set *sa);
static void add_socket_to_set(socket_set *sa, int fd, int idx);
static int	wait_on_socket_set(socket_set *sa, int64 usecs);
static bool socket_has_input(socket_set *sa, int fd, int idx);
595
596
/*
 * callback functions for our flex lexer
 *
 * The only slot initialized here is set to NULL.
 */
static const PsqlScanCallbacks pgbench_callbacks = {
	NULL,						/* don't need get_variable functionality */
};
601
602
/*
 * usage -- print the command-line help text on standard output.
 *
 * The whole text is a single format string; its two %s conversions are both
 * filled with progname.  "%%" in the --sampling-rate line prints a literal
 * percent sign.
 */
static void
usage(void)
{
	printf("%s is a benchmarking tool for PostgreSQL.\n\n"
		   "Usage:\n"
		   " %s [OPTION]... [DBNAME]\n"
		   "\nInitialization options:\n"
		   " -i, --initialize invokes initialization mode\n"
		   " -I, --init-steps=[dtgvpf]+ (default \"dtgvp\")\n"
		   " run selected initialization steps\n"
		   " -F, --fillfactor=NUM set fill factor\n"
		   " -n, --no-vacuum do not run VACUUM during initialization\n"
		   " -q, --quiet quiet logging (one message each 5 seconds)\n"
		   " -s, --scale=NUM scaling factor\n"
		   " --foreign-keys create foreign key constraints between tables\n"
		   " --index-tablespace=TABLESPACE\n"
		   " create indexes in the specified tablespace\n"
		   " --tablespace=TABLESPACE create tables in the specified tablespace\n"
		   " --unlogged-tables create tables as unlogged tables\n"
		   "\nOptions to select what to run:\n"
		   " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
		   " (use \"-b list\" to list available scripts)\n"
		   " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
		   " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
		   " (same as \"-b simple-update\")\n"
		   " -S, --select-only perform SELECT-only transactions\n"
		   " (same as \"-b select-only\")\n"
		   "\nBenchmarking options:\n"
		   " -c, --client=NUM number of concurrent database clients (default: 1)\n"
		   " -C, --connect establish new connection for each transaction\n"
		   " -D, --define=VARNAME=VALUE\n"
		   " define variable for use by custom script\n"
		   " -j, --jobs=NUM number of threads (default: 1)\n"
		   " -l, --log write transaction times to log file\n"
		   " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
		   " -M, --protocol=simple|extended|prepared\n"
		   " protocol for submitting queries (default: simple)\n"
		   " -n, --no-vacuum do not run VACUUM before tests\n"
		   " -P, --progress=NUM show thread progress report every NUM seconds\n"
		   " -r, --report-latencies report average latency per command\n"
		   " -R, --rate=NUM target rate in transactions per second\n"
		   " -s, --scale=NUM report this scale factor in output\n"
		   " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
		   " -T, --time=NUM duration of benchmark test in seconds\n"
		   " -v, --vacuum-all vacuum all four standard tables before tests\n"
		   " --aggregate-interval=NUM aggregate data over NUM seconds\n"
		   " --log-prefix=PREFIX prefix for transaction time log file\n"
		   " (default: \"pgbench_log\")\n"
		   " --progress-timestamp use Unix epoch timestamps for progress\n"
		   " --random-seed=SEED set random seed (\"time\", \"rand\", integer)\n"
		   " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
		   "\nCommon options:\n"
		   " -d, --debug print debugging output\n"
		   " -h, --host=HOSTNAME database server host or socket directory\n"
		   " -p, --port=PORT database server port number\n"
		   " -U, --username=USERNAME connect as specified database user\n"
		   " -V, --version output version information, then exit\n"
		   " -?, --help show this help, then exit\n"
		   "\n"
		   "Report bugs to <pgsql-bugs@lists.postgresql.org>.\n",
		   progname, progname);
}
665
/*
 * is_an_int -- return whether str matches "^\s*[-+]?[0-9]+$"
 *
 * That is: optional leading whitespace, an optional sign, then one or more
 * decimal digits running up to the end of the string.  Nothing is allowed
 * after the digits, not even whitespace.
 *
 * A string with no digit at all (empty, blank, or a lone sign) is rejected,
 * as the "+" in the pattern requires.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * At least one digit is required.  The terminating NUL is not a digit,
	 * so this also rejects a string that ends right here.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
691
692
693/*
694 * strtoint64 -- convert a string to 64-bit integer
695 *
696 * This function is a slightly modified version of scanint8() from
697 * src/backend/utils/adt/int8.c.
698 *
699 * The function returns whether the conversion worked, and if so
700 * "*result" is set to the result.
701 *
702 * If not errorOK, an error message is also printed out on errors.
703 */
704bool
705strtoint64(const char *str, bool errorOK, int64 *result)
706{
707 const char *ptr = str;
708 int64 tmp = 0;
709 bool neg = false;
710
711 /*
712 * Do our own scan, rather than relying on sscanf which might be broken
713 * for long long.
714 *
715 * As INT64_MIN can't be stored as a positive 64 bit integer, accumulate
716 * value as a negative number.
717 */
718
719 /* skip leading spaces */
720 while (*ptr && isspace((unsigned char) *ptr))
721 ptr++;
722
723 /* handle sign */
724 if (*ptr == '-')
725 {
726 ptr++;
727 neg = true;
728 }
729 else if (*ptr == '+')
730 ptr++;
731
732 /* require at least one digit */
733 if (unlikely(!isdigit((unsigned char) *ptr)))
734 goto invalid_syntax;
735
736 /* process digits */
737 while (*ptr && isdigit((unsigned char) *ptr))
738 {
739 int8 digit = (*ptr++ - '0');
740
741 if (unlikely(pg_mul_s64_overflow(tmp, 10, &tmp)) ||
742 unlikely(pg_sub_s64_overflow(tmp, digit, &tmp)))
743 goto out_of_range;
744 }
745
746 /* allow trailing whitespace, but not other trailing chars */
747 while (*ptr != '\0' && isspace((unsigned char) *ptr))
748 ptr++;
749
750 if (unlikely(*ptr != '\0'))
751 goto invalid_syntax;
752
753 if (!neg)
754 {
755 if (unlikely(tmp == PG_INT64_MIN))
756 goto out_of_range;
757 tmp = -tmp;
758 }
759
760 *result = tmp;
761 return true;
762
763out_of_range:
764 if (!errorOK)
765 fprintf(stderr,
766 "value \"%s\" is out of range for type bigint\n", str);
767 return false;
768
769invalid_syntax:
770 if (!errorOK)
771 fprintf(stderr,
772 "invalid input syntax for type bigint: \"%s\"\n", str);
773 return false;
774}
775
/*
 * strtodouble -- parse a string into a double with strtod()
 *
 * Returns true when the whole string was consumed and strtod() did not set
 * errno.  Otherwise returns false and, unless errorOK is set, prints a
 * message on stderr: "out of range" when errno was set, "invalid input
 * syntax" when nothing was parsed or characters were left over.
 *
 * "*dv" receives whatever strtod() returned, on failure as well.
 */
bool
strtodouble(const char *str, bool errorOK, double *dv)
{
	char	   *stop;
	bool		range_error;
	bool		syntax_error;

	/* errno must be cleared first so a stale value is not misread */
	errno = 0;
	*dv = strtod(str, &stop);
	range_error = (errno != 0);

	if (unlikely(range_error))
	{
		if (!errorOK)
			fprintf(stderr,
					"value \"%s\" is out of range for type double\n", str);
		return false;
	}

	/* nothing consumed, or junk after the number */
	syntax_error = (stop == str || *stop != '\0');

	if (unlikely(syntax_error))
	{
		if (!errorOK)
			fprintf(stderr,
					"invalid input syntax for type double: \"%s\"\n", str);
		return false;
	}

	return true;
}
802
803/*
804 * Initialize a random state struct.
805 *
806 * We derive the seed from base_random_sequence, which must be set up already.
807 */
808static void
809initRandomState(RandomState *random_state)
810{
811 random_state->xseed[0] = (unsigned short)
812 (pg_jrand48(base_random_sequence.xseed) & 0xFFFF);
813 random_state->xseed[1] = (unsigned short)
814 (pg_jrand48(base_random_sequence.xseed) & 0xFFFF);
815 random_state->xseed[2] = (unsigned short)
816 (pg_jrand48(base_random_sequence.xseed) & 0xFFFF);
817}
818
819/*
820 * Random number generator: uniform distribution from min to max inclusive.
821 *
822 * Although the limits are expressed as int64, you can't generate the full
823 * int64 range in one call, because the difference of the limits mustn't
824 * overflow int64. In practice it's unwise to ask for more than an int32
825 * range, because of the limited precision of pg_erand48().
826 */
827static int64
828getrand(RandomState *random_state, int64 min, int64 max)
829{
830 /*
831 * Odd coding is so that min and max have approximately the same chance of
832 * being selected as do numbers between them.
833 *
834 * pg_erand48() is thread-safe and concurrent, which is why we use it
835 * rather than random(), which in glibc is non-reentrant, and therefore
836 * protected by a mutex, and therefore a bottleneck on machines with many
837 * CPUs.
838 */
839 return min + (int64) ((max - min + 1) * pg_erand48(random_state->xseed));
840}
841
842/*
843 * random number generator: exponential distribution from min to max inclusive.
844 * the parameter is so that the density of probability for the last cut-off max
845 * value is exp(-parameter).
846 */
847static int64
848getExponentialRand(RandomState *random_state, int64 min, int64 max,
849 double parameter)
850{
851 double cut,
852 uniform,
853 rand;
854
855 /* abort if wrong parameter, but must really be checked beforehand */
856 Assert(parameter > 0.0);
857 cut = exp(-parameter);
858 /* erand in [0, 1), uniform in (0, 1] */
859 uniform = 1.0 - pg_erand48(random_state->xseed);
860
861 /*
862 * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
863 */
864 Assert((1.0 - cut) != 0.0);
865 rand = -log(cut + (1.0 - cut) * uniform) / parameter;
866 /* return int64 random number within between min and max */
867 return min + (int64) ((max - min + 1) * rand);
868}
869
870/* random number generator: gaussian distribution from min to max inclusive */
871static int64
872getGaussianRand(RandomState *random_state, int64 min, int64 max,
873 double parameter)
874{
875 double stdev;
876 double rand;
877
878 /* abort if parameter is too low, but must really be checked beforehand */
879 Assert(parameter >= MIN_GAUSSIAN_PARAM);
880
881 /*
882 * Get user specified random number from this loop, with -parameter <
883 * stdev <= parameter
884 *
885 * This loop is executed until the number is in the expected range.
886 *
887 * As the minimum parameter is 2.0, the probability of looping is low:
888 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
889 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
890 * the worst case. For a parameter value of 5.0, the looping probability
891 * is about e^{-5} * 2 / pi ~ 0.43%.
892 */
893 do
894 {
895 /*
896 * pg_erand48 generates [0,1), but for the basic version of the
897 * Box-Muller transform the two uniformly distributed random numbers
898 * are expected in (0, 1] (see
899 * https://en.wikipedia.org/wiki/Box-Muller_transform)
900 */
901 double rand1 = 1.0 - pg_erand48(random_state->xseed);
902 double rand2 = 1.0 - pg_erand48(random_state->xseed);
903
904 /* Box-Muller basic form transform */
905 double var_sqrt = sqrt(-2.0 * log(rand1));
906
907 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
908
909 /*
910 * we may try with cos, but there may be a bias induced if the
911 * previous value fails the test. To be on the safe side, let us try
912 * over.
913 */
914 }
915 while (stdev < -parameter || stdev >= parameter);
916
917 /* stdev is in [-parameter, parameter), normalization to [0,1) */
918 rand = (stdev + parameter) / (parameter * 2.0);
919
920 /* return int64 random number within between min and max */
921 return min + (int64) ((max - min + 1) * rand);
922}
923
924/*
925 * random number generator: generate a value, such that the series of values
926 * will approximate a Poisson distribution centered on the given value.
927 *
928 * Individual results are rounded to integers, though the center value need
929 * not be one.
930 */
931static int64
932getPoissonRand(RandomState *random_state, double center)
933{
934 /*
935 * Use inverse transform sampling to generate a value > 0, such that the
936 * expected (i.e. average) value is the given argument.
937 */
938 double uniform;
939
940 /* erand in [0, 1), uniform in (0, 1] */
941 uniform = 1.0 - pg_erand48(random_state->xseed);
942
943 return (int64) (-log(uniform) * center + 0.5);
944}
945
946/*
947 * Computing zipfian using rejection method, based on
948 * "Non-Uniform Random Variate Generation",
949 * Luc Devroye, p. 550-551, Springer 1986.
950 *
951 * This works for s > 1.0, but may perform badly for s very close to 1.0.
952 */
953static int64
954computeIterativeZipfian(RandomState *random_state, int64 n, double s)
955{
956 double b = pow(2.0, s - 1.0);
957 double x,
958 t,
959 u,
960 v;
961
962 /* Ensure n is sane */
963 if (n <= 1)
964 return 1;
965
966 while (true)
967 {
968 /* random variates */
969 u = pg_erand48(random_state->xseed);
970 v = pg_erand48(random_state->xseed);
971
972 x = floor(pow(u, -1.0 / (s - 1.0)));
973
974 t = pow(1.0 + 1.0 / x, s - 1.0);
975 /* reject if too large or out of bound */
976 if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
977 break;
978 }
979 return (int64) x;
980}
981
982/* random number generator: zipfian distribution from min to max inclusive */
983static int64
984getZipfianRand(RandomState *random_state, int64 min, int64 max, double s)
985{
986 int64 n = max - min + 1;
987
988 /* abort if parameter is invalid */
989 Assert(MIN_ZIPFIAN_PARAM <= s && s <= MAX_ZIPFIAN_PARAM);
990
991 return min - 1 + computeIterativeZipfian(random_state, n, s);
992}
993
994/*
995 * FNV-1a hash function
996 */
997static int64
998getHashFnv1a(int64 val, uint64 seed)
999{
1000 int64 result;
1001 int i;
1002
1003 result = FNV_OFFSET_BASIS ^ seed;
1004 for (i = 0; i < 8; ++i)
1005 {
1006 int32 octet = val & 0xff;
1007
1008 val = val >> 8;
1009 result = result ^ octet;
1010 result = result * FNV_PRIME;
1011 }
1012
1013 return result;
1014}
1015
1016/*
1017 * Murmur2 hash function
1018 *
1019 * Based on original work of Austin Appleby
1020 * https://github.com/aappleby/smhasher/blob/master/src/MurmurHash2.cpp
1021 */
1022static int64
1023getHashMurmur2(int64 val, uint64 seed)
1024{
1025 uint64 result = seed ^ MM2_MUL_TIMES_8; /* sizeof(int64) */
1026 uint64 k = (uint64) val;
1027
1028 k *= MM2_MUL;
1029 k ^= k >> MM2_ROT;
1030 k *= MM2_MUL;
1031
1032 result ^= k;
1033 result *= MM2_MUL;
1034
1035 result ^= result >> MM2_ROT;
1036 result *= MM2_MUL;
1037 result ^= result >> MM2_ROT;
1038
1039 return (int64) result;
1040}
1041
1042/*
1043 * Initialize the given SimpleStats struct to all zeroes
1044 */
1045static void
1046initSimpleStats(SimpleStats *ss)
1047{
1048 memset(ss, 0, sizeof(SimpleStats));
1049}
1050
1051/*
1052 * Accumulate one value into a SimpleStats struct.
1053 */
1054static void
1055addToSimpleStats(SimpleStats *ss, double val)
1056{
1057 if (ss->count == 0 || val < ss->min)
1058 ss->min = val;
1059 if (ss->count == 0 || val > ss->max)
1060 ss->max = val;
1061 ss->count++;
1062 ss->sum += val;
1063 ss->sum2 += val * val;
1064}
1065
1066/*
1067 * Merge two SimpleStats objects
1068 */
1069static void
1070mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
1071{
1072 if (acc->count == 0 || ss->min < acc->min)
1073 acc->min = ss->min;
1074 if (acc->count == 0 || ss->max > acc->max)
1075 acc->max = ss->max;
1076 acc->count += ss->count;
1077 acc->sum += ss->sum;
1078 acc->sum2 += ss->sum2;
1079}
1080
1081/*
1082 * Initialize a StatsData struct to mostly zeroes, with its start time set to
1083 * the given value.
1084 */
1085static void
1086initStats(StatsData *sd, time_t start_time)
1087{
1088 sd->start_time = start_time;
1089 sd->cnt = 0;
1090 sd->skipped = 0;
1091 initSimpleStats(&sd->latency);
1092 initSimpleStats(&sd->lag);
1093}
1094
1095/*
1096 * Accumulate one additional item into the given stats object.
1097 */
1098static void
1099accumStats(StatsData *stats, bool skipped, double lat, double lag)
1100{
1101 stats->cnt++;
1102
1103 if (skipped)
1104 {
1105 /* no latency to record on skipped transactions */
1106 stats->skipped++;
1107 }
1108 else
1109 {
1110 addToSimpleStats(&stats->latency, lat);
1111
1112 /* and possibly the same for schedule lag */
1113 if (throttle_delay)
1114 addToSimpleStats(&stats->lag, lag);
1115 }
1116}
1117
1118/* call PQexec() and exit() on failure */
1119static void
1120executeStatement(PGconn *con, const char *sql)
1121{
1122 PGresult *res;
1123
1124 res = PQexec(con, sql);
1125 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1126 {
1127 fprintf(stderr, "%s", PQerrorMessage(con));
1128 exit(1);
1129 }
1130 PQclear(res);
1131}
1132
1133/* call PQexec() and complain, but without exiting, on failure */
1134static void
1135tryExecuteStatement(PGconn *con, const char *sql)
1136{
1137 PGresult *res;
1138
1139 res = PQexec(con, sql);
1140 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1141 {
1142 fprintf(stderr, "%s", PQerrorMessage(con));
1143 fprintf(stderr, "(ignoring this error and continuing anyway)\n");
1144 }
1145 PQclear(res);
1146}
1147
1148/* set up a connection to the backend */
1149static PGconn *
1150doConnect(void)
1151{
1152 PGconn *conn;
1153 bool new_pass;
1154 static bool have_password = false;
1155 static char password[100];
1156
1157 /*
1158 * Start the connection. Loop until we have a password if requested by
1159 * backend.
1160 */
1161 do
1162 {
1163#define PARAMS_ARRAY_SIZE 7
1164
1165 const char *keywords[PARAMS_ARRAY_SIZE];
1166 const char *values[PARAMS_ARRAY_SIZE];
1167
1168 keywords[0] = "host";
1169 values[0] = pghost;
1170 keywords[1] = "port";
1171 values[1] = pgport;
1172 keywords[2] = "user";
1173 values[2] = login;
1174 keywords[3] = "password";
1175 values[3] = have_password ? password : NULL;
1176 keywords[4] = "dbname";
1177 values[4] = dbName;
1178 keywords[5] = "fallback_application_name";
1179 values[5] = progname;
1180 keywords[6] = NULL;
1181 values[6] = NULL;
1182
1183 new_pass = false;
1184
1185 conn = PQconnectdbParams(keywords, values, true);
1186
1187 if (!conn)
1188 {
1189 fprintf(stderr, "connection to database \"%s\" failed\n",
1190 dbName);
1191 return NULL;
1192 }
1193
1194 if (PQstatus(conn) == CONNECTION_BAD &&
1195 PQconnectionNeedsPassword(conn) &&
1196 !have_password)
1197 {
1198 PQfinish(conn);
1199 simple_prompt("Password: ", password, sizeof(password), false);
1200 have_password = true;
1201 new_pass = true;
1202 }
1203 } while (new_pass);
1204
1205 /* check to see that the backend connection was successfully made */
1206 if (PQstatus(conn) == CONNECTION_BAD)
1207 {
1208 fprintf(stderr, "connection to database \"%s\" failed:\n%s",
1209 dbName, PQerrorMessage(conn));
1210 PQfinish(conn);
1211 return NULL;
1212 }
1213
1214 return conn;
1215}
1216
1217/* qsort comparator for Variable array */
1218static int
1219compareVariableNames(const void *v1, const void *v2)
1220{
1221 return strcmp(((const Variable *) v1)->name,
1222 ((const Variable *) v2)->name);
1223}
1224
1225/* Locate a variable by name; returns NULL if unknown */
1226static Variable *
1227lookupVariable(CState *st, char *name)
1228{
1229 Variable key;
1230
1231 /* On some versions of Solaris, bsearch of zero items dumps core */
1232 if (st->nvariables <= 0)
1233 return NULL;
1234
1235 /* Sort if we have to */
1236 if (!st->vars_sorted)
1237 {
1238 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
1239 compareVariableNames);
1240 st->vars_sorted = true;
1241 }
1242
1243 /* Now we can search */
1244 key.name = name;
1245 return (Variable *) bsearch((void *) &key,
1246 (void *) st->variables,
1247 st->nvariables,
1248 sizeof(Variable),
1249 compareVariableNames);
1250}
1251
1252/* Get the value of a variable, in string form; returns NULL if unknown */
1253static char *
1254getVariable(CState *st, char *name)
1255{
1256 Variable *var;
1257 char stringform[64];
1258
1259 var = lookupVariable(st, name);
1260 if (var == NULL)
1261 return NULL; /* not found */
1262
1263 if (var->svalue)
1264 return var->svalue; /* we have it in string form */
1265
1266 /* We need to produce a string equivalent of the value */
1267 Assert(var->value.type != PGBT_NO_VALUE);
1268 if (var->value.type == PGBT_NULL)
1269 snprintf(stringform, sizeof(stringform), "NULL");
1270 else if (var->value.type == PGBT_BOOLEAN)
1271 snprintf(stringform, sizeof(stringform),
1272 "%s", var->value.u.bval ? "true" : "false");
1273 else if (var->value.type == PGBT_INT)
1274 snprintf(stringform, sizeof(stringform),
1275 INT64_FORMAT, var->value.u.ival);
1276 else if (var->value.type == PGBT_DOUBLE)
1277 snprintf(stringform, sizeof(stringform),
1278 "%.*g", DBL_DIG, var->value.u.dval);
1279 else /* internal error, unexpected type */
1280 Assert(0);
1281 var->svalue = pg_strdup(stringform);
1282 return var->svalue;
1283}
1284
1285/* Try to convert variable to a value; return false on failure */
1286static bool
1287makeVariableValue(Variable *var)
1288{
1289 size_t slen;
1290
1291 if (var->value.type != PGBT_NO_VALUE)
1292 return true; /* no work */
1293
1294 slen = strlen(var->svalue);
1295
1296 if (slen == 0)
1297 /* what should it do on ""? */
1298 return false;
1299
1300 if (pg_strcasecmp(var->svalue, "null") == 0)
1301 {
1302 setNullValue(&var->value);
1303 }
1304
1305 /*
1306 * accept prefixes such as y, ye, n, no... but not for "o". 0/1 are
1307 * recognized later as an int, which is converted to bool if needed.
1308 */
1309 else if (pg_strncasecmp(var->svalue, "true", slen) == 0 ||
1310 pg_strncasecmp(var->svalue, "yes", slen) == 0 ||
1311 pg_strcasecmp(var->svalue, "on") == 0)
1312 {
1313 setBoolValue(&var->value, true);
1314 }
1315 else if (pg_strncasecmp(var->svalue, "false", slen) == 0 ||
1316 pg_strncasecmp(var->svalue, "no", slen) == 0 ||
1317 pg_strcasecmp(var->svalue, "off") == 0 ||
1318 pg_strcasecmp(var->svalue, "of") == 0)
1319 {
1320 setBoolValue(&var->value, false);
1321 }
1322 else if (is_an_int(var->svalue))
1323 {
1324 /* if it looks like an int, it must be an int without overflow */
1325 int64 iv;
1326
1327 if (!strtoint64(var->svalue, false, &iv))
1328 return false;
1329
1330 setIntValue(&var->value, iv);
1331 }
1332 else /* type should be double */
1333 {
1334 double dv;
1335
1336 if (!strtodouble(var->svalue, true, &dv))
1337 {
1338 fprintf(stderr,
1339 "malformed variable \"%s\" value: \"%s\"\n",
1340 var->name, var->svalue);
1341 return false;
1342 }
1343 setDoubleValue(&var->value, dv);
1344 }
1345 return true;
1346}
1347
1348/*
1349 * Check whether a variable's name is allowed.
1350 *
1351 * We allow any non-ASCII character, as well as ASCII letters, digits, and
1352 * underscore.
1353 *
1354 * Keep this in sync with the definitions of variable name characters in
1355 * "src/fe_utils/psqlscan.l", "src/bin/psql/psqlscanslash.l" and
1356 * "src/bin/pgbench/exprscan.l". Also see parseVariable(), below.
1357 *
1358 * Note: this static function is copied from "src/bin/psql/variables.c"
1359 */
1360static bool
1361valid_variable_name(const char *name)
1362{
1363 const unsigned char *ptr = (const unsigned char *) name;
1364
1365 /* Mustn't be zero-length */
1366 if (*ptr == '\0')
1367 return false;
1368
1369 while (*ptr)
1370 {
1371 if (IS_HIGHBIT_SET(*ptr) ||
1372 strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
1373 "_0123456789", *ptr) != NULL)
1374 ptr++;
1375 else
1376 return false;
1377 }
1378
1379 return true;
1380}
1381
1382/*
1383 * Lookup a variable by name, creating it if need be.
1384 * Caller is expected to assign a value to the variable.
1385 * Returns NULL on failure (bad name).
1386 */
1387static Variable *
1388lookupCreateVariable(CState *st, const char *context, char *name)
1389{
1390 Variable *var;
1391
1392 var = lookupVariable(st, name);
1393 if (var == NULL)
1394 {
1395 Variable *newvars;
1396
1397 /*
1398 * Check for the name only when declaring a new variable to avoid
1399 * overhead.
1400 */
1401 if (!valid_variable_name(name))
1402 {
1403 fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
1404 context, name);
1405 return NULL;
1406 }
1407
1408 /* Create variable at the end of the array */
1409 if (st->variables)
1410 newvars = (Variable *) pg_realloc(st->variables,
1411 (st->nvariables + 1) * sizeof(Variable));
1412 else
1413 newvars = (Variable *) pg_malloc(sizeof(Variable));
1414
1415 st->variables = newvars;
1416
1417 var = &newvars[st->nvariables];
1418
1419 var->name = pg_strdup(name);
1420 var->svalue = NULL;
1421 /* caller is expected to initialize remaining fields */
1422
1423 st->nvariables++;
1424 /* we don't re-sort the array till we have to */
1425 st->vars_sorted = false;
1426 }
1427
1428 return var;
1429}
1430
1431/* Assign a string value to a variable, creating it if need be */
1432/* Returns false on failure (bad name) */
1433static bool
1434putVariable(CState *st, const char *context, char *name, const char *value)
1435{
1436 Variable *var;
1437 char *val;
1438
1439 var = lookupCreateVariable(st, context, name);
1440 if (!var)
1441 return false;
1442
1443 /* dup then free, in case value is pointing at this variable */
1444 val = pg_strdup(value);
1445
1446 if (var->svalue)
1447 free(var->svalue);
1448 var->svalue = val;
1449 var->value.type = PGBT_NO_VALUE;
1450
1451 return true;
1452}
1453
1454/* Assign a value to a variable, creating it if need be */
1455/* Returns false on failure (bad name) */
1456static bool
1457putVariableValue(CState *st, const char *context, char *name,
1458 const PgBenchValue *value)
1459{
1460 Variable *var;
1461
1462 var = lookupCreateVariable(st, context, name);
1463 if (!var)
1464 return false;
1465
1466 if (var->svalue)
1467 free(var->svalue);
1468 var->svalue = NULL;
1469 var->value = *value;
1470
1471 return true;
1472}
1473
1474/* Assign an integer value to a variable, creating it if need be */
1475/* Returns false on failure (bad name) */
1476static bool
1477putVariableInt(CState *st, const char *context, char *name, int64 value)
1478{
1479 PgBenchValue val;
1480
1481 setIntValue(&val, value);
1482 return putVariableValue(st, context, name, &val);
1483}
1484
/*
 * Parse a possible variable reference (:varname).
 *
 * "sql" points at a colon.  If what follows it looks like a valid
 * variable name, return a malloc'd string containing the variable name,
 * and set *eaten to the number of characters consumed (colon included).
 * Otherwise, return NULL and leave *eaten untouched.
 *
 * The set of accepted characters matches valid_variable_name().
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			i = 1;			/* starting at 1 skips the colon */
	char	   *name;

	/*
	 * The explicit end-of-string test is required: strchr() treats the
	 * terminating NUL as part of the searched string, so looking up '\0'
	 * succeeds and the scan would otherwise run past the end of "sql" when
	 * the variable name is the last thing in it.
	 */
	while (sql[i] != '\0' &&
		   (IS_HIGHBIT_SET(sql[i]) ||
			strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				   "_0123456789", sql[i]) != NULL))
		i++;

	if (i == 1)
		return NULL;			/* no valid variable name chars */

	/* i counts the colon, which leaves exactly the room for the NUL */
	name = pg_malloc(i);
	memcpy(name, &sql[1], i - 1);
	name[i - 1] = '\0';

	*eaten = i;
	return name;
}
1515
/*
 * Replace the "len" characters starting at "param" (which lies inside the
 * string *sql) with "value".  *sql is enlarged when the value is longer than
 * the text it replaces, so it may be moved.
 *
 * Returns a pointer just past the inserted value, inside the (possibly
 * moved) string.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			vlen = strlen(value);
	char	   *tail;

	if (vlen > len)
	{
		/* remember the position, since the buffer may move */
		size_t		pos = param - *sql;

		*sql = pg_realloc(*sql, strlen(*sql) - len + vlen + 1);
		param = *sql + pos;
	}

	/* shift the rest of the string, terminator included, into place */
	tail = param + len;
	if (vlen != len)
		memmove(param + vlen, tail, strlen(tail) + 1);

	memcpy(param, value, vlen);

	return param + vlen;
}
1535
1536static char *
1537assignVariables(CState *st, char *sql)
1538{
1539 char *p,
1540 *name,
1541 *val;
1542
1543 p = sql;
1544 while ((p = strchr(p, ':')) != NULL)
1545 {
1546 int eaten;
1547
1548 name = parseVariable(p, &eaten);
1549 if (name == NULL)
1550 {
1551 while (*p == ':')
1552 {
1553 p++;
1554 }
1555 continue;
1556 }
1557
1558 val = getVariable(st, name);
1559 free(name);
1560 if (val == NULL)
1561 {
1562 p++;
1563 continue;
1564 }
1565
1566 p = replaceVariable(&sql, p, eaten, val);
1567 }
1568
1569 return sql;
1570}
1571
1572static void
1573getQueryParams(CState *st, const Command *command, const char **params)
1574{
1575 int i;
1576
1577 for (i = 0; i < command->argc - 1; i++)
1578 params[i] = getVariable(st, command->argv[i + 1]);
1579}
1580
1581static char *
1582valueTypeName(PgBenchValue *pval)
1583{
1584 if (pval->type == PGBT_NO_VALUE)
1585 return "none";
1586 else if (pval->type == PGBT_NULL)
1587 return "null";
1588 else if (pval->type == PGBT_INT)
1589 return "int";
1590 else if (pval->type == PGBT_DOUBLE)
1591 return "double";
1592 else if (pval->type == PGBT_BOOLEAN)
1593 return "boolean";
1594 else
1595 {
1596 /* internal error, should never get there */
1597 Assert(false);
1598 return NULL;
1599 }
1600}
1601
1602/* get a value as a boolean, or tell if there is a problem */
1603static bool
1604coerceToBool(PgBenchValue *pval, bool *bval)
1605{
1606 if (pval->type == PGBT_BOOLEAN)
1607 {
1608 *bval = pval->u.bval;
1609 return true;
1610 }
1611 else /* NULL, INT or DOUBLE */
1612 {
1613 fprintf(stderr, "cannot coerce %s to boolean\n", valueTypeName(pval));
1614 *bval = false; /* suppress uninitialized-variable warnings */
1615 return false;
1616 }
1617}
1618
1619/*
1620 * Return true or false from an expression for conditional purposes.
1621 * Non zero numerical values are true, zero and NULL are false.
1622 */
1623static bool
1624valueTruth(PgBenchValue *pval)
1625{
1626 switch (pval->type)
1627 {
1628 case PGBT_NULL:
1629 return false;
1630 case PGBT_BOOLEAN:
1631 return pval->u.bval;
1632 case PGBT_INT:
1633 return pval->u.ival != 0;
1634 case PGBT_DOUBLE:
1635 return pval->u.dval != 0.0;
1636 default:
1637 /* internal error, unexpected type */
1638 Assert(0);
1639 return false;
1640 }
1641}
1642
1643/* get a value as an int, tell if there is a problem */
1644static bool
1645coerceToInt(PgBenchValue *pval, int64 *ival)
1646{
1647 if (pval->type == PGBT_INT)
1648 {
1649 *ival = pval->u.ival;
1650 return true;
1651 }
1652 else if (pval->type == PGBT_DOUBLE)
1653 {
1654 double dval = pval->u.dval;
1655
1656 if (dval < PG_INT64_MIN || PG_INT64_MAX < dval)
1657 {
1658 fprintf(stderr, "double to int overflow for %f\n", dval);
1659 return false;
1660 }
1661 *ival = (int64) dval;
1662 return true;
1663 }
1664 else /* BOOLEAN or NULL */
1665 {
1666 fprintf(stderr, "cannot coerce %s to int\n", valueTypeName(pval));
1667 return false;
1668 }
1669}
1670
1671/* get a value as a double, or tell if there is a problem */
1672static bool
1673coerceToDouble(PgBenchValue *pval, double *dval)
1674{
1675 if (pval->type == PGBT_DOUBLE)
1676 {
1677 *dval = pval->u.dval;
1678 return true;
1679 }
1680 else if (pval->type == PGBT_INT)
1681 {
1682 *dval = (double) pval->u.ival;
1683 return true;
1684 }
1685 else /* BOOLEAN or NULL */
1686 {
1687 fprintf(stderr, "cannot coerce %s to double\n", valueTypeName(pval));
1688 return false;
1689 }
1690}
1691
1692/* assign a null value */
1693static void
1694setNullValue(PgBenchValue *pv)
1695{
1696 pv->type = PGBT_NULL;
1697 pv->u.ival = 0;
1698}
1699
1700/* assign a boolean value */
1701static void
1702setBoolValue(PgBenchValue *pv, bool bval)
1703{
1704 pv->type = PGBT_BOOLEAN;
1705 pv->u.bval = bval;
1706}
1707
1708/* assign an integer value */
1709static void
1710setIntValue(PgBenchValue *pv, int64 ival)
1711{
1712 pv->type = PGBT_INT;
1713 pv->u.ival = ival;
1714}
1715
1716/* assign a double value */
1717static void
1718setDoubleValue(PgBenchValue *pv, double dval)
1719{
1720 pv->type = PGBT_DOUBLE;
1721 pv->u.dval = dval;
1722}
1723
1724static bool
1725isLazyFunc(PgBenchFunction func)
1726{
1727 return func == PGBENCH_AND || func == PGBENCH_OR || func == PGBENCH_CASE;
1728}
1729
1730/* lazy evaluation of some functions */
1731static bool
1732evalLazyFunc(CState *st,
1733 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
1734{
1735 PgBenchValue a1,
1736 a2;
1737 bool ba1,
1738 ba2;
1739
1740 Assert(isLazyFunc(func) && args != NULL && args->next != NULL);
1741
1742 /* args points to first condition */
1743 if (!evaluateExpr(st, args->expr, &a1))
1744 return false;
1745
1746 /* second condition for AND/OR and corresponding branch for CASE */
1747 args = args->next;
1748
1749 switch (func)
1750 {
1751 case PGBENCH_AND:
1752 if (a1.type == PGBT_NULL)
1753 {
1754 setNullValue(retval);
1755 return true;
1756 }
1757
1758 if (!coerceToBool(&a1, &ba1))
1759 return false;
1760
1761 if (!ba1)
1762 {
1763 setBoolValue(retval, false);
1764 return true;
1765 }
1766
1767 if (!evaluateExpr(st, args->expr, &a2))
1768 return false;
1769
1770 if (a2.type == PGBT_NULL)
1771 {
1772 setNullValue(retval);
1773 return true;
1774 }
1775 else if (!coerceToBool(&a2, &ba2))
1776 return false;
1777 else
1778 {
1779 setBoolValue(retval, ba2);
1780 return true;
1781 }
1782
1783 return true;
1784
1785 case PGBENCH_OR:
1786
1787 if (a1.type == PGBT_NULL)
1788 {
1789 setNullValue(retval);
1790 return true;
1791 }
1792
1793 if (!coerceToBool(&a1, &ba1))
1794 return false;
1795
1796 if (ba1)
1797 {
1798 setBoolValue(retval, true);
1799 return true;
1800 }
1801
1802 if (!evaluateExpr(st, args->expr, &a2))
1803 return false;
1804
1805 if (a2.type == PGBT_NULL)
1806 {
1807 setNullValue(retval);
1808 return true;
1809 }
1810 else if (!coerceToBool(&a2, &ba2))
1811 return false;
1812 else
1813 {
1814 setBoolValue(retval, ba2);
1815 return true;
1816 }
1817
1818 case PGBENCH_CASE:
1819 /* when true, execute branch */
1820 if (valueTruth(&a1))
1821 return evaluateExpr(st, args->expr, retval);
1822
1823 /* now args contains next condition or final else expression */
1824 args = args->next;
1825
1826 /* final else case? */
1827 if (args->next == NULL)
1828 return evaluateExpr(st, args->expr, retval);
1829
1830 /* no, another when, proceed */
1831 return evalLazyFunc(st, PGBENCH_CASE, args, retval);
1832
1833 default:
1834 /* internal error, cannot get here */
1835 Assert(0);
1836 break;
1837 }
1838 return false;
1839}
1840
1841/* maximum number of function arguments */
1842#define MAX_FARGS 16
1843
1844/*
1845 * Recursive evaluation of standard functions,
1846 * which do not require lazy evaluation.
1847 */
1848static bool
1849evalStandardFunc(CState *st,
1850 PgBenchFunction func, PgBenchExprLink *args,
1851 PgBenchValue *retval)
1852{
1853 /* evaluate all function arguments */
1854 int nargs = 0;
1855 PgBenchValue vargs[MAX_FARGS];
1856 PgBenchExprLink *l = args;
1857 bool has_null = false;
1858
1859 for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
1860 {
1861 if (!evaluateExpr(st, l->expr, &vargs[nargs]))
1862 return false;
1863 has_null |= vargs[nargs].type == PGBT_NULL;
1864 }
1865
1866 if (l != NULL)
1867 {
1868 fprintf(stderr,
1869 "too many function arguments, maximum is %d\n", MAX_FARGS);
1870 return false;
1871 }
1872
1873 /* NULL arguments */
1874 if (has_null && func != PGBENCH_IS && func != PGBENCH_DEBUG)
1875 {
1876 setNullValue(retval);
1877 return true;
1878 }
1879
1880 /* then evaluate function */
1881 switch (func)
1882 {
1883 /* overloaded operators */
1884 case PGBENCH_ADD:
1885 case PGBENCH_SUB:
1886 case PGBENCH_MUL:
1887 case PGBENCH_DIV:
1888 case PGBENCH_MOD:
1889 case PGBENCH_EQ:
1890 case PGBENCH_NE:
1891 case PGBENCH_LE:
1892 case PGBENCH_LT:
1893 {
1894 PgBenchValue *lval = &vargs[0],
1895 *rval = &vargs[1];
1896
1897 Assert(nargs == 2);
1898
1899 /* overloaded type management, double if some double */
1900 if ((lval->type == PGBT_DOUBLE ||
1901 rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
1902 {
1903 double ld,
1904 rd;
1905
1906 if (!coerceToDouble(lval, &ld) ||
1907 !coerceToDouble(rval, &rd))
1908 return false;
1909
1910 switch (func)
1911 {
1912 case PGBENCH_ADD:
1913 setDoubleValue(retval, ld + rd);
1914 return true;
1915
1916 case PGBENCH_SUB:
1917 setDoubleValue(retval, ld - rd);
1918 return true;
1919
1920 case PGBENCH_MUL:
1921 setDoubleValue(retval, ld * rd);
1922 return true;
1923
1924 case PGBENCH_DIV:
1925 setDoubleValue(retval, ld / rd);
1926 return true;
1927
1928 case PGBENCH_EQ:
1929 setBoolValue(retval, ld == rd);
1930 return true;
1931
1932 case PGBENCH_NE:
1933 setBoolValue(retval, ld != rd);
1934 return true;
1935
1936 case PGBENCH_LE:
1937 setBoolValue(retval, ld <= rd);
1938 return true;
1939
1940 case PGBENCH_LT:
1941 setBoolValue(retval, ld < rd);
1942 return true;
1943
1944 default:
1945 /* cannot get here */
1946 Assert(0);
1947 }
1948 }
1949 else /* we have integer operands, or % */
1950 {
1951 int64 li,
1952 ri,
1953 res;
1954
1955 if (!coerceToInt(lval, &li) ||
1956 !coerceToInt(rval, &ri))
1957 return false;
1958
1959 switch (func)
1960 {
1961 case PGBENCH_ADD:
1962 if (pg_add_s64_overflow(li, ri, &res))
1963 {
1964 fprintf(stderr, "bigint add out of range\n");
1965 return false;
1966 }
1967 setIntValue(retval, res);
1968 return true;
1969
1970 case PGBENCH_SUB:
1971 if (pg_sub_s64_overflow(li, ri, &res))
1972 {
1973 fprintf(stderr, "bigint sub out of range\n");
1974 return false;
1975 }
1976 setIntValue(retval, res);
1977 return true;
1978
1979 case PGBENCH_MUL:
1980 if (pg_mul_s64_overflow(li, ri, &res))
1981 {
1982 fprintf(stderr, "bigint mul out of range\n");
1983 return false;
1984 }
1985 setIntValue(retval, res);
1986 return true;
1987
1988 case PGBENCH_EQ:
1989 setBoolValue(retval, li == ri);
1990 return true;
1991
1992 case PGBENCH_NE:
1993 setBoolValue(retval, li != ri);
1994 return true;
1995
1996 case PGBENCH_LE:
1997 setBoolValue(retval, li <= ri);
1998 return true;
1999
2000 case PGBENCH_LT:
2001 setBoolValue(retval, li < ri);
2002 return true;
2003
2004 case PGBENCH_DIV:
2005 case PGBENCH_MOD:
2006 if (ri == 0)
2007 {
2008 fprintf(stderr, "division by zero\n");
2009 return false;
2010 }
2011 /* special handling of -1 divisor */
2012 if (ri == -1)
2013 {
2014 if (func == PGBENCH_DIV)
2015 {
2016 /* overflow check (needed for INT64_MIN) */
2017 if (li == PG_INT64_MIN)
2018 {
2019 fprintf(stderr, "bigint div out of range\n");
2020 return false;
2021 }
2022 else
2023 setIntValue(retval, -li);
2024 }
2025 else
2026 setIntValue(retval, 0);
2027 return true;
2028 }
2029 /* else divisor is not -1 */
2030 if (func == PGBENCH_DIV)
2031 setIntValue(retval, li / ri);
2032 else /* func == PGBENCH_MOD */
2033 setIntValue(retval, li % ri);
2034
2035 return true;
2036
2037 default:
2038 /* cannot get here */
2039 Assert(0);
2040 }
2041 }
2042
2043 Assert(0);
2044 return false; /* NOTREACHED */
2045 }
2046
2047 /* integer bitwise operators */
2048 case PGBENCH_BITAND:
2049 case PGBENCH_BITOR:
2050 case PGBENCH_BITXOR:
2051 case PGBENCH_LSHIFT:
2052 case PGBENCH_RSHIFT:
2053 {
2054 int64 li,
2055 ri;
2056
2057 if (!coerceToInt(&vargs[0], &li) || !coerceToInt(&vargs[1], &ri))
2058 return false;
2059
2060 if (func == PGBENCH_BITAND)
2061 setIntValue(retval, li & ri);
2062 else if (func == PGBENCH_BITOR)
2063 setIntValue(retval, li | ri);
2064 else if (func == PGBENCH_BITXOR)
2065 setIntValue(retval, li ^ ri);
2066 else if (func == PGBENCH_LSHIFT)
2067 setIntValue(retval, li << ri);
2068 else if (func == PGBENCH_RSHIFT)
2069 setIntValue(retval, li >> ri);
2070 else /* cannot get here */
2071 Assert(0);
2072
2073 return true;
2074 }
2075
2076 /* logical operators */
2077 case PGBENCH_NOT:
2078 {
2079 bool b;
2080
2081 if (!coerceToBool(&vargs[0], &b))
2082 return false;
2083
2084 setBoolValue(retval, !b);
2085 return true;
2086 }
2087
2088 /* no arguments */
2089 case PGBENCH_PI:
2090 setDoubleValue(retval, M_PI);
2091 return true;
2092
2093 /* 1 overloaded argument */
2094 case PGBENCH_ABS:
2095 {
2096 PgBenchValue *varg = &vargs[0];
2097
2098 Assert(nargs == 1);
2099
2100 if (varg->type == PGBT_INT)
2101 {
2102 int64 i = varg->u.ival;
2103
2104 setIntValue(retval, i < 0 ? -i : i);
2105 }
2106 else
2107 {
2108 double d = varg->u.dval;
2109
2110 Assert(varg->type == PGBT_DOUBLE);
2111 setDoubleValue(retval, d < 0.0 ? -d : d);
2112 }
2113
2114 return true;
2115 }
2116
2117 case PGBENCH_DEBUG:
2118 {
2119 PgBenchValue *varg = &vargs[0];
2120
2121 Assert(nargs == 1);
2122
2123 fprintf(stderr, "debug(script=%d,command=%d): ",
2124 st->use_file, st->command + 1);
2125
2126 if (varg->type == PGBT_NULL)
2127 fprintf(stderr, "null\n");
2128 else if (varg->type == PGBT_BOOLEAN)
2129 fprintf(stderr, "boolean %s\n", varg->u.bval ? "true" : "false");
2130 else if (varg->type == PGBT_INT)
2131 fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
2132 else if (varg->type == PGBT_DOUBLE)
2133 fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
2134 else /* internal error, unexpected type */
2135 Assert(0);
2136
2137 *retval = *varg;
2138
2139 return true;
2140 }
2141
2142 /* 1 double argument */
2143 case PGBENCH_DOUBLE:
2144 case PGBENCH_SQRT:
2145 case PGBENCH_LN:
2146 case PGBENCH_EXP:
2147 {
2148 double dval;
2149
2150 Assert(nargs == 1);
2151
2152 if (!coerceToDouble(&vargs[0], &dval))
2153 return false;
2154
2155 if (func == PGBENCH_SQRT)
2156 dval = sqrt(dval);
2157 else if (func == PGBENCH_LN)
2158 dval = log(dval);
2159 else if (func == PGBENCH_EXP)
2160 dval = exp(dval);
2161 /* else is cast: do nothing */
2162
2163 setDoubleValue(retval, dval);
2164 return true;
2165 }
2166
2167 /* 1 int argument */
2168 case PGBENCH_INT:
2169 {
2170 int64 ival;
2171
2172 Assert(nargs == 1);
2173
2174 if (!coerceToInt(&vargs[0], &ival))
2175 return false;
2176
2177 setIntValue(retval, ival);
2178 return true;
2179 }
2180
2181 /* variable number of arguments */
2182 case PGBENCH_LEAST:
2183 case PGBENCH_GREATEST:
2184 {
2185 bool havedouble;
2186 int i;
2187
2188 Assert(nargs >= 1);
2189
2190 /* need double result if any input is double */
2191 havedouble = false;
2192 for (i = 0; i < nargs; i++)
2193 {
2194 if (vargs[i].type == PGBT_DOUBLE)
2195 {
2196 havedouble = true;
2197 break;
2198 }
2199 }
2200 if (havedouble)
2201 {
2202 double extremum;
2203
2204 if (!coerceToDouble(&vargs[0], &extremum))
2205 return false;
2206 for (i = 1; i < nargs; i++)
2207 {
2208 double dval;
2209
2210 if (!coerceToDouble(&vargs[i], &dval))
2211 return false;
2212 if (func == PGBENCH_LEAST)
2213 extremum = Min(extremum, dval);
2214 else
2215 extremum = Max(extremum, dval);
2216 }
2217 setDoubleValue(retval, extremum);
2218 }
2219 else
2220 {
2221 int64 extremum;
2222
2223 if (!coerceToInt(&vargs[0], &extremum))
2224 return false;
2225 for (i = 1; i < nargs; i++)
2226 {
2227 int64 ival;
2228
2229 if (!coerceToInt(&vargs[i], &ival))
2230 return false;
2231 if (func == PGBENCH_LEAST)
2232 extremum = Min(extremum, ival);
2233 else
2234 extremum = Max(extremum, ival);
2235 }
2236 setIntValue(retval, extremum);
2237 }
2238 return true;
2239 }
2240
2241 /* random functions */
2242 case PGBENCH_RANDOM:
2243 case PGBENCH_RANDOM_EXPONENTIAL:
2244 case PGBENCH_RANDOM_GAUSSIAN:
2245 case PGBENCH_RANDOM_ZIPFIAN:
2246 {
2247 int64 imin,
2248 imax;
2249
2250 Assert(nargs >= 2);
2251
2252 if (!coerceToInt(&vargs[0], &imin) ||
2253 !coerceToInt(&vargs[1], &imax))
2254 return false;
2255
2256 /* check random range */
2257 if (imin > imax)
2258 {
2259 fprintf(stderr, "empty range given to random\n");
2260 return false;
2261 }
2262 else if (imax - imin < 0 || (imax - imin) + 1 < 0)
2263 {
2264 /* prevent int overflows in random functions */
2265 fprintf(stderr, "random range is too large\n");
2266 return false;
2267 }
2268
2269 if (func == PGBENCH_RANDOM)
2270 {
2271 Assert(nargs == 2);
2272 setIntValue(retval, getrand(&st->cs_func_rs, imin, imax));
2273 }
2274 else /* gaussian & exponential */
2275 {
2276 double param;
2277
2278 Assert(nargs == 3);
2279
2280 if (!coerceToDouble(&vargs[2], &param))
2281 return false;
2282
2283 if (func == PGBENCH_RANDOM_GAUSSIAN)
2284 {
2285 if (param < MIN_GAUSSIAN_PARAM)
2286 {
2287 fprintf(stderr,
2288 "gaussian parameter must be at least %f "
2289 "(not %f)\n", MIN_GAUSSIAN_PARAM, param);
2290 return false;
2291 }
2292
2293 setIntValue(retval,
2294 getGaussianRand(&st->cs_func_rs,
2295 imin, imax, param));
2296 }
2297 else if (func == PGBENCH_RANDOM_ZIPFIAN)
2298 {
2299 if (param < MIN_ZIPFIAN_PARAM || param > MAX_ZIPFIAN_PARAM)
2300 {
2301 fprintf(stderr,
2302 "zipfian parameter must be in range [%.3f, %.0f]"
2303 " (not %f)\n",
2304 MIN_ZIPFIAN_PARAM, MAX_ZIPFIAN_PARAM, param);
2305 return false;
2306 }
2307
2308 setIntValue(retval,
2309 getZipfianRand(&st->cs_func_rs, imin, imax, param));
2310 }
2311 else /* exponential */
2312 {
2313 if (param <= 0.0)
2314 {
2315 fprintf(stderr,
2316 "exponential parameter must be greater than zero"
2317 " (not %f)\n", param);
2318 return false;
2319 }
2320
2321 setIntValue(retval,
2322 getExponentialRand(&st->cs_func_rs,
2323 imin, imax, param));
2324 }
2325 }
2326
2327 return true;
2328 }
2329
2330 case PGBENCH_POW:
2331 {
2332 PgBenchValue *lval = &vargs[0];
2333 PgBenchValue *rval = &vargs[1];
2334 double ld,
2335 rd;
2336
2337 Assert(nargs == 2);
2338
2339 if (!coerceToDouble(lval, &ld) ||
2340 !coerceToDouble(rval, &rd))
2341 return false;
2342
2343 setDoubleValue(retval, pow(ld, rd));
2344
2345 return true;
2346 }
2347
2348 case PGBENCH_IS:
2349 {
2350 Assert(nargs == 2);
2351
2352 /*
2353 * note: this simple implementation is more permissive than
2354 * SQL
2355 */
2356 setBoolValue(retval,
2357 vargs[0].type == vargs[1].type &&
2358 vargs[0].u.bval == vargs[1].u.bval);
2359 return true;
2360 }
2361
2362 /* hashing */
2363 case PGBENCH_HASH_FNV1A:
2364 case PGBENCH_HASH_MURMUR2:
2365 {
2366 int64 val,
2367 seed;
2368
2369 Assert(nargs == 2);
2370
2371 if (!coerceToInt(&vargs[0], &val) ||
2372 !coerceToInt(&vargs[1], &seed))
2373 return false;
2374
2375 if (func == PGBENCH_HASH_MURMUR2)
2376 setIntValue(retval, getHashMurmur2(val, seed));
2377 else if (func == PGBENCH_HASH_FNV1A)
2378 setIntValue(retval, getHashFnv1a(val, seed));
2379 else
2380 /* cannot get here */
2381 Assert(0);
2382
2383 return true;
2384 }
2385
2386 default:
2387 /* cannot get here */
2388 Assert(0);
2389 /* dead code to avoid a compiler warning */
2390 return false;
2391 }
2392}
2393
2394/* evaluate some function */
2395static bool
2396evalFunc(CState *st,
2397 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
2398{
2399 if (isLazyFunc(func))
2400 return evalLazyFunc(st, func, args, retval);
2401 else
2402 return evalStandardFunc(st, func, args, retval);
2403}
2404
2405/*
2406 * Recursive evaluation of an expression in a pgbench script
2407 * using the current state of variables.
2408 * Returns whether the evaluation was ok,
2409 * the value itself is returned through the retval pointer.
2410 */
2411static bool
2412evaluateExpr(CState *st, PgBenchExpr *expr, PgBenchValue *retval)
2413{
2414 switch (expr->etype)
2415 {
2416 case ENODE_CONSTANT:
2417 {
2418 *retval = expr->u.constant;
2419 return true;
2420 }
2421
2422 case ENODE_VARIABLE:
2423 {
2424 Variable *var;
2425
2426 if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL)
2427 {
2428 fprintf(stderr, "undefined variable \"%s\"\n",
2429 expr->u.variable.varname);
2430 return false;
2431 }
2432
2433 if (!makeVariableValue(var))
2434 return false;
2435
2436 *retval = var->value;
2437 return true;
2438 }
2439
2440 case ENODE_FUNCTION:
2441 return evalFunc(st,
2442 expr->u.function.function,
2443 expr->u.function.args,
2444 retval);
2445
2446 default:
2447 /* internal error which should never occur */
2448 fprintf(stderr, "unexpected enode type in evaluation: %d\n",
2449 expr->etype);
2450 exit(1);
2451 }
2452}
2453
2454/*
2455 * Convert command name to meta-command enum identifier
2456 */
2457static MetaCommand
2458getMetaCommand(const char *cmd)
2459{
2460 MetaCommand mc;
2461
2462 if (cmd == NULL)
2463 mc = META_NONE;
2464 else if (pg_strcasecmp(cmd, "set") == 0)
2465 mc = META_SET;
2466 else if (pg_strcasecmp(cmd, "setshell") == 0)
2467 mc = META_SETSHELL;
2468 else if (pg_strcasecmp(cmd, "shell") == 0)
2469 mc = META_SHELL;
2470 else if (pg_strcasecmp(cmd, "sleep") == 0)
2471 mc = META_SLEEP;
2472 else if (pg_strcasecmp(cmd, "if") == 0)
2473 mc = META_IF;
2474 else if (pg_strcasecmp(cmd, "elif") == 0)
2475 mc = META_ELIF;
2476 else if (pg_strcasecmp(cmd, "else") == 0)
2477 mc = META_ELSE;
2478 else if (pg_strcasecmp(cmd, "endif") == 0)
2479 mc = META_ENDIF;
2480 else if (pg_strcasecmp(cmd, "gset") == 0)
2481 mc = META_GSET;
2482 else
2483 mc = META_NONE;
2484 return mc;
2485}
2486
2487/*
2488 * Run a shell command. The result is assigned to the variable if not NULL.
2489 * Return true if succeeded, or false on error.
2490 */
2491static bool
2492runShellCommand(CState *st, char *variable, char **argv, int argc)
2493{
2494 char command[SHELL_COMMAND_SIZE];
2495 int i,
2496 len = 0;
2497 FILE *fp;
2498 char res[64];
2499 char *endptr;
2500 int retval;
2501
2502 /*----------
2503 * Join arguments with whitespace separators. Arguments starting with
2504 * exactly one colon are treated as variables:
2505 * name - append a string "name"
2506 * :var - append a variable named 'var'
2507 * ::name - append a string ":name"
2508 *----------
2509 */
2510 for (i = 0; i < argc; i++)
2511 {
2512 char *arg;
2513 int arglen;
2514
2515 if (argv[i][0] != ':')
2516 {
2517 arg = argv[i]; /* a string literal */
2518 }
2519 else if (argv[i][1] == ':')
2520 {
2521 arg = argv[i] + 1; /* a string literal starting with colons */
2522 }
2523 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
2524 {
2525 fprintf(stderr, "%s: undefined variable \"%s\"\n",
2526 argv[0], argv[i]);
2527 return false;
2528 }
2529
2530 arglen = strlen(arg);
2531 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
2532 {
2533 fprintf(stderr, "%s: shell command is too long\n", argv[0]);
2534 return false;
2535 }
2536
2537 if (i > 0)
2538 command[len++] = ' ';
2539 memcpy(command + len, arg, arglen);
2540 len += arglen;
2541 }
2542
2543 command[len] = '\0';
2544
2545 /* Fast path for non-assignment case */
2546 if (variable == NULL)
2547 {
2548 if (system(command))
2549 {
2550 if (!timer_exceeded)
2551 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2552 return false;
2553 }
2554 return true;
2555 }
2556
2557 /* Execute the command with pipe and read the standard output. */
2558 if ((fp = popen(command, "r")) == NULL)
2559 {
2560 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
2561 return false;
2562 }
2563 if (fgets(res, sizeof(res), fp) == NULL)
2564 {
2565 if (!timer_exceeded)
2566 fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
2567 (void) pclose(fp);
2568 return false;
2569 }
2570 if (pclose(fp) < 0)
2571 {
2572 fprintf(stderr, "%s: could not close shell command\n", argv[0]);
2573 return false;
2574 }
2575
2576 /* Check whether the result is an integer and assign it to the variable */
2577 retval = (int) strtol(res, &endptr, 10);
2578 while (*endptr != '\0' && isspace((unsigned char) *endptr))
2579 endptr++;
2580 if (*res == '\0' || *endptr != '\0')
2581 {
2582 fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
2583 argv[0], res);
2584 return false;
2585 }
2586 if (!putVariableInt(st, "setshell", variable, retval))
2587 return false;
2588
2589#ifdef DEBUG
2590 printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
2591#endif
2592 return true;
2593}
2594
/* size of the buffers handed to preparedStatementName() */
#define MAX_PREPARE_NAME		32

/*
 * Build the name of the prepared statement for command number "state" of
 * script number "file", as "P<file>_<state>".
 *
 * "buffer" must have room for MAX_PREPARE_NAME bytes; the output is bounded
 * to that size so that the write can never run past the buffer.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
2601
2602static void
2603commandFailed(CState *st, const char *cmd, const char *message)
2604{
2605 fprintf(stderr,
2606 "client %d aborted in command %d (%s) of script %d; %s\n",
2607 st->id, st->command, cmd, st->use_file, message);
2608}
2609
2610/* return a script number with a weighted choice. */
2611static int
2612chooseScript(TState *thread)
2613{
2614 int i = 0;
2615 int64 w;
2616
2617 if (num_scripts == 1)
2618 return 0;
2619
2620 w = getrand(&thread->ts_choose_rs, 0, total_weight - 1);
2621 do
2622 {
2623 w -= sql_script[i++].weight;
2624 } while (w >= 0);
2625
2626 return i - 1;
2627}
2628
2629/* Send a SQL command, using the chosen querymode */
2630static bool
2631sendCommand(CState *st, Command *command)
2632{
2633 int r;
2634
2635 if (querymode == QUERY_SIMPLE)
2636 {
2637 char *sql;
2638
2639 sql = pg_strdup(command->argv[0]);
2640 sql = assignVariables(st, sql);
2641
2642 if (debug)
2643 fprintf(stderr, "client %d sending %s\n", st->id, sql);
2644 r = PQsendQuery(st->con, sql);
2645 free(sql);
2646 }
2647 else if (querymode == QUERY_EXTENDED)
2648 {
2649 const char *sql = command->argv[0];
2650 const char *params[MAX_ARGS];
2651
2652 getQueryParams(st, command, params);
2653
2654 if (debug)
2655 fprintf(stderr, "client %d sending %s\n", st->id, sql);
2656 r = PQsendQueryParams(st->con, sql, command->argc - 1,
2657 NULL, params, NULL, NULL, 0);
2658 }
2659 else if (querymode == QUERY_PREPARED)
2660 {
2661 char name[MAX_PREPARE_NAME];
2662 const char *params[MAX_ARGS];
2663
2664 if (!st->prepared[st->use_file])
2665 {
2666 int j;
2667 Command **commands = sql_script[st->use_file].commands;
2668
2669 for (j = 0; commands[j] != NULL; j++)
2670 {
2671 PGresult *res;
2672 char name[MAX_PREPARE_NAME];
2673
2674 if (commands[j]->type != SQL_COMMAND)
2675 continue;
2676 preparedStatementName(name, st->use_file, j);
2677 res = PQprepare(st->con, name,
2678 commands[j]->argv[0], commands[j]->argc - 1, NULL);
2679 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2680 fprintf(stderr, "%s", PQerrorMessage(st->con));
2681 PQclear(res);
2682 }
2683 st->prepared[st->use_file] = true;
2684 }
2685
2686 getQueryParams(st, command, params);
2687 preparedStatementName(name, st->use_file, st->command);
2688
2689 if (debug)
2690 fprintf(stderr, "client %d sending %s\n", st->id, name);
2691 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
2692 params, NULL, NULL, 0);
2693 }
2694 else /* unknown sql mode */
2695 r = 0;
2696
2697 if (r == 0)
2698 {
2699 if (debug)
2700 fprintf(stderr, "client %d could not send %s\n",
2701 st->id, command->argv[0]);
2702 st->ecnt++;
2703 return false;
2704 }
2705 else
2706 return true;
2707}
2708
2709/*
2710 * Process query response from the backend.
2711 *
2712 * If varprefix is not NULL, it's the variable name prefix where to store
2713 * the results of the *last* command.
2714 *
2715 * Returns true if everything is A-OK, false if any error occurs.
2716 */
2717static bool
2718readCommandResponse(CState *st, char *varprefix)
2719{
2720 PGresult *res;
2721 PGresult *next_res;
2722 int qrynum = 0;
2723
2724 res = PQgetResult(st->con);
2725
2726 while (res != NULL)
2727 {
2728 bool is_last;
2729
2730 /* peek at the next result to know whether the current is last */
2731 next_res = PQgetResult(st->con);
2732 is_last = (next_res == NULL);
2733
2734 switch (PQresultStatus(res))
2735 {
2736 case PGRES_COMMAND_OK: /* non-SELECT commands */
2737 case PGRES_EMPTY_QUERY: /* may be used for testing no-op overhead */
2738 if (is_last && varprefix != NULL)
2739 {
2740 fprintf(stderr,
2741 "client %d script %d command %d query %d: expected one row, got %d\n",
2742 st->id, st->use_file, st->command, qrynum, 0);
2743 goto error;
2744 }
2745 break;
2746
2747 case PGRES_TUPLES_OK:
2748 if (is_last && varprefix != NULL)
2749 {
2750 if (PQntuples(res) != 1)
2751 {
2752 fprintf(stderr,
2753 "client %d script %d command %d query %d: expected one row, got %d\n",
2754 st->id, st->use_file, st->command, qrynum, PQntuples(res));
2755 goto error;
2756 }
2757
2758 /* store results into variables */
2759 for (int fld = 0; fld < PQnfields(res); fld++)
2760 {
2761 char *varname = PQfname(res, fld);
2762
2763 /* allocate varname only if necessary, freed below */
2764 if (*varprefix != '\0')
2765 varname = psprintf("%s%s", varprefix, varname);
2766
2767 /* store result as a string */
2768 if (!putVariable(st, "gset", varname,
2769 PQgetvalue(res, 0, fld)))
2770 {
2771 /* internal error */
2772 fprintf(stderr,
2773 "client %d script %d command %d query %d: error storing into variable %s\n",
2774 st->id, st->use_file, st->command, qrynum,
2775 varname);
2776 goto error;
2777 }
2778
2779 if (*varprefix != '\0')
2780 pg_free(varname);
2781 }
2782 }
2783 /* otherwise the result is simply thrown away by PQclear below */
2784 break;
2785
2786 default:
2787 /* anything else is unexpected */
2788 fprintf(stderr,
2789 "client %d script %d aborted in command %d query %d: %s",
2790 st->id, st->use_file, st->command, qrynum,
2791 PQerrorMessage(st->con));
2792 goto error;
2793 }
2794
2795 PQclear(res);
2796 qrynum++;
2797 res = next_res;
2798 }
2799
2800 if (qrynum == 0)
2801 {
2802 fprintf(stderr, "client %d command %d: no results\n", st->id, st->command);
2803 st->ecnt++;
2804 return false;
2805 }
2806
2807 return true;
2808
2809error:
2810 st->ecnt++;
2811 PQclear(res);
2812 PQclear(next_res);
2813 do
2814 {
2815 res = PQgetResult(st->con);
2816 PQclear(res);
2817 } while (res);
2818
2819 return false;
2820}
2821
2822/*
2823 * Parse the argument to a \sleep command, and return the requested amount
2824 * of delay, in microseconds. Returns true on success, false on error.
2825 */
2826static bool
2827evaluateSleep(CState *st, int argc, char **argv, int *usecs)
2828{
2829 char *var;
2830 int usec;
2831
2832 if (*argv[1] == ':')
2833 {
2834 if ((var = getVariable(st, argv[1] + 1)) == NULL)
2835 {
2836 fprintf(stderr, "%s: undefined variable \"%s\"\n",
2837 argv[0], argv[1]);
2838 return false;
2839 }
2840 usec = atoi(var);
2841 }
2842 else
2843 usec = atoi(argv[1]);
2844
2845 if (argc > 2)
2846 {
2847 if (pg_strcasecmp(argv[2], "ms") == 0)
2848 usec *= 1000;
2849 else if (pg_strcasecmp(argv[2], "s") == 0)
2850 usec *= 1000000;
2851 }
2852 else
2853 usec *= 1000000;
2854
2855 *usecs = usec;
2856 return true;
2857}
2858
/*
 * Advance the state machine of a connection.
 *
 * "thread" is the state of the thread driving this client; its random
 * states, throttle trigger, connection time and statistics are used and
 * updated here.  "st" is the client whose st->state is advanced.  "agg" is
 * only handed over to processXactStats().
 *
 * The function keeps moving from state to state until the client has to
 * wait (throttling, \sleep, or a query result that is not complete yet),
 * until a transaction has ended, or until the client reaches a final
 * state, in which case its connection is closed with finishCon().
 */
static void
advanceConnectionState(TState *thread, CState *st, StatsData *agg)
{
	instr_time	now;

	/*
	 * gettimeofday() isn't free, so we get the current timestamp lazily the
	 * first time it's needed, and reuse the same value throughout this
	 * function after that.  This also ensures that e.g. the calculated
	 * latency reported in the log file and in the totals are the same. Zero
	 * means "not set yet".  Reset "now" when we execute shell commands or
	 * expressions, which might take a non-negligible amount of time, though.
	 */
	INSTR_TIME_SET_ZERO(now);

	/*
	 * Loop in the state machine, until we have to wait for a result from the
	 * server or have to sleep for throttling or \sleep.
	 *
	 * Note: In the switch-statement below, 'break' will loop back here,
	 * meaning "continue in the state machine".  Return is used to return to
	 * the caller, giving the thread the opportunity to advance another
	 * client.
	 */
	for (;;)
	{
		Command    *command;

		switch (st->state)
		{
				/* Select transaction (script) to run. */
			case CSTATE_CHOOSE_SCRIPT:
				st->use_file = chooseScript(thread);
				Assert(conditional_stack_empty(st->cstack));

				if (debug)
					fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
							sql_script[st->use_file].desc);

				/*
				 * If time is over, we're done; otherwise, get ready to start
				 * a new transaction, or to get throttled if that's requested.
				 */
				st->state = timer_exceeded ? CSTATE_FINISHED :
					throttle_delay > 0 ? CSTATE_PREPARE_THROTTLE : CSTATE_START_TX;
				break;

				/* Start new transaction (script) */
			case CSTATE_START_TX:

				/* establish connection if needed, i.e. under --connect */
				if (st->con == NULL)
				{
					instr_time	start;

					INSTR_TIME_SET_CURRENT_LAZY(now);
					start = now;
					if ((st->con = doConnect()) == NULL)
					{
						fprintf(stderr, "client %d aborted while establishing connection\n",
								st->id);
						st->state = CSTATE_ABORTED;
						break;
					}

					/* account the time spent connecting to the thread */
					INSTR_TIME_SET_CURRENT(now);
					INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);

					/*
					 * Reset session-local state: clearing the "prepared"
					 * flags makes sendCommand() prepare the statements again
					 * on this connection.
					 */
					memset(st->prepared, 0, sizeof(st->prepared));
				}

				/* record transaction start time */
				INSTR_TIME_SET_CURRENT_LAZY(now);
				st->txn_begin = now;

				/*
				 * When not throttling, this is also the transaction's
				 * scheduled start time.
				 */
				if (!throttle_delay)
					st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);

				/* Begin with the first command */
				st->state = CSTATE_START_COMMAND;
				st->command = 0;
				break;

				/*
				 * Handle throttling once per transaction by sleeping.
				 */
			case CSTATE_PREPARE_THROTTLE:

				/*
				 * Generate a delay such that the series of delays will
				 * approximate a Poisson distribution centered on the
				 * throttle_delay time.
				 *
				 * If transactions are too slow or a given wait is shorter
				 * than a transaction, the next transaction will start right
				 * away.
				 */
				Assert(throttle_delay > 0);

				thread->throttle_trigger +=
					getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
				st->txn_scheduled = thread->throttle_trigger;

				/*
				 * If --latency-limit is used, and this slot is already late
				 * so that the transaction will miss the latency limit even if
				 * it completed immediately, skip this time slot and schedule
				 * to continue running on the next slot that isn't late yet.
				 * But don't iterate beyond the -t limit, if one is given.
				 */
				if (latency_limit)
				{
					int64		now_us;

					INSTR_TIME_SET_CURRENT_LAZY(now);
					now_us = INSTR_TIME_GET_MICROSEC(now);

					while (thread->throttle_trigger < now_us - latency_limit &&
						   (nxacts <= 0 || st->cnt < nxacts))
					{
						/* record the late slot as a skipped transaction */
						processXactStats(thread, st, &now, true, agg);
						/* next rendez-vous */
						thread->throttle_trigger +=
							getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
						st->txn_scheduled = thread->throttle_trigger;
					}

					/*
					 * stop client if -t was exceeded in the previous skip
					 * loop
					 */
					if (nxacts > 0 && st->cnt >= nxacts)
					{
						st->state = CSTATE_FINISHED;
						break;
					}
				}

				/*
				 * stop client if next transaction is beyond pgbench end of
				 * execution; otherwise, throttle it.
				 */
				st->state = end_time > 0 && st->txn_scheduled > end_time ?
					CSTATE_FINISHED : CSTATE_THROTTLE;
				break;

				/*
				 * Wait until it's time to start next transaction.
				 */
			case CSTATE_THROTTLE:
				INSTR_TIME_SET_CURRENT_LAZY(now);

				if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
					return;		/* still sleeping, nothing to do here */

				/* done sleeping, but don't start transaction if we're done */
				st->state = timer_exceeded ? CSTATE_FINISHED : CSTATE_START_TX;
				break;

				/*
				 * Send a command to server (or execute a meta-command)
				 */
			case CSTATE_START_COMMAND:
				command = sql_script[st->use_file].commands[st->command];

				/* Transition to script end processing if done */
				if (command == NULL)
				{
					st->state = CSTATE_END_TX;
					break;
				}

				/* record begin time of next command, and initiate it */
				if (report_per_command)
				{
					INSTR_TIME_SET_CURRENT_LAZY(now);
					st->stmt_begin = now;
				}

				/* Execute the command */
				if (command->type == SQL_COMMAND)
				{
					if (!sendCommand(st, command))
					{
						commandFailed(st, "SQL", "SQL command send failed");
						st->state = CSTATE_ABORTED;
					}
					else
						st->state = CSTATE_WAIT_RESULT;
				}
				else if (command->type == META_COMMAND)
				{
					/*-----
					 * Possible state changes when executing meta commands:
					 * - on errors CSTATE_ABORTED
					 * - on sleep CSTATE_SLEEP
					 * - else CSTATE_END_COMMAND
					 */
					st->state = executeMetaCommand(st, &now);
				}

				/*
				 * We're now waiting for an SQL command to complete, or
				 * finished processing a metacommand, or need to sleep, or
				 * something bad happened.
				 */
				Assert(st->state == CSTATE_WAIT_RESULT ||
					   st->state == CSTATE_END_COMMAND ||
					   st->state == CSTATE_SLEEP ||
					   st->state == CSTATE_ABORTED);
				break;

				/*
				 * non executed conditional branch
				 */
			case CSTATE_SKIP_COMMAND:
				Assert(!conditional_active(st->cstack));
				/* quickly skip commands until something to do... */
				while (true)
				{
					Command    *command;

					command = sql_script[st->use_file].commands[st->command];

					/* cannot reach end of script in that state */
					Assert(command != NULL);

					/*
					 * if this is conditional related, update conditional
					 * state
					 */
					if (command->type == META_COMMAND &&
						(command->meta == META_IF ||
						 command->meta == META_ELIF ||
						 command->meta == META_ELSE ||
						 command->meta == META_ENDIF))
					{
						switch (conditional_stack_peek(st->cstack))
						{
							case IFSTATE_FALSE:
								if (command->meta == META_IF ||
									command->meta == META_ELIF)
								{
									/*
									 * we must evaluate the condition; the
									 * command index is left alone so that
									 * this very command gets executed
									 */
									st->state = CSTATE_START_COMMAND;
								}
								else if (command->meta == META_ELSE)
								{
									/* we must execute next command */
									conditional_stack_poke(st->cstack,
														   IFSTATE_ELSE_TRUE);
									st->state = CSTATE_START_COMMAND;
									st->command++;
								}
								else if (command->meta == META_ENDIF)
								{
									Assert(!conditional_stack_empty(st->cstack));
									conditional_stack_pop(st->cstack);
									if (conditional_active(st->cstack))
										st->state = CSTATE_START_COMMAND;

									/*
									 * else state remains in
									 * CSTATE_SKIP_COMMAND
									 */
									st->command++;
								}
								break;

							case IFSTATE_IGNORED:
							case IFSTATE_ELSE_FALSE:
								/* a nested \if in a skipped block is ignored too */
								if (command->meta == META_IF)
									conditional_stack_push(st->cstack,
														   IFSTATE_IGNORED);
								else if (command->meta == META_ENDIF)
								{
									Assert(!conditional_stack_empty(st->cstack));
									conditional_stack_pop(st->cstack);
									if (conditional_active(st->cstack))
										st->state = CSTATE_START_COMMAND;
								}
								/* could detect "else" & "elif" after "else" */
								st->command++;
								break;

							case IFSTATE_NONE:
							case IFSTATE_TRUE:
							case IFSTATE_ELSE_TRUE:
							default:

								/*
								 * inconsistent if inactive, unreachable dead
								 * code
								 */
								Assert(false);
						}
					}
					else
					{
						/* skip and consider next */
						st->command++;
					}

					if (st->state != CSTATE_SKIP_COMMAND)
						/* out of quick skip command loop */
						break;
				}
				break;

				/*
				 * Wait for the current SQL command to complete
				 */
			case CSTATE_WAIT_RESULT:
				if (debug)
					fprintf(stderr, "client %d receiving\n", st->id);
				if (!PQconsumeInput(st->con))
				{
					/* there's something wrong */
					commandFailed(st, "SQL", "perhaps the backend died while processing");
					st->state = CSTATE_ABORTED;
					break;
				}
				if (PQisBusy(st->con))
					return;		/* don't have the whole result yet */

				/*
				 * store or discard the query results, depending on whether
				 * the current command has a variable prefix
				 */
				if (readCommandResponse(st, sql_script[st->use_file].commands[st->command]->varprefix))
					st->state = CSTATE_END_COMMAND;
				else
					st->state = CSTATE_ABORTED;
				break;

				/*
				 * Wait until sleep is done. This state is entered after a
				 * \sleep metacommand. The behavior is similar to
				 * CSTATE_THROTTLE, but proceeds to CSTATE_END_COMMAND
				 * instead of CSTATE_START_TX.
				 */
			case CSTATE_SLEEP:
				INSTR_TIME_SET_CURRENT_LAZY(now);
				if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
					return;		/* still sleeping, nothing to do here */
				/* Else done sleeping. */
				st->state = CSTATE_END_COMMAND;
				break;

				/*
				 * End of command: record stats and proceed to next command.
				 */
			case CSTATE_END_COMMAND:

				/*
				 * command completed: accumulate per-command execution times
				 * in thread-local data structure, if per-command latencies
				 * are requested.
				 */
				if (report_per_command)
				{
					Command    *command;

					INSTR_TIME_SET_CURRENT_LAZY(now);

					command = sql_script[st->use_file].commands[st->command];
					/* XXX could use a mutex here, but we choose not to */
					addToSimpleStats(&command->stats,
									 INSTR_TIME_GET_DOUBLE(now) -
									 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
				}

				/* Go ahead with next command, to be executed or skipped */
				st->command++;
				st->state = conditional_active(st->cstack) ?
					CSTATE_START_COMMAND : CSTATE_SKIP_COMMAND;
				break;

				/*
				 * End of transaction (end of script, really).
				 */
			case CSTATE_END_TX:

				/* transaction finished: calculate latency and do log */
				processXactStats(thread, st, &now, false, agg);

				/*
				 * missing \endif... cannot happen if CheckConditional was
				 * okay
				 */
				Assert(conditional_stack_empty(st->cstack));

				/* under --connect, drop the connection after each transaction */
				if (is_connect)
				{
					finishCon(st);
					INSTR_TIME_SET_ZERO(now);
				}

				if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
				{
					/* script completed */
					st->state = CSTATE_FINISHED;
					break;
				}

				/* next transaction (script) */
				st->state = CSTATE_CHOOSE_SCRIPT;

				/*
				 * Ensure that we always return on this point, so as to avoid
				 * an infinite loop if the script only contains meta commands.
				 */
				return;

				/*
				 * Final states.  Close the connection if it's still open.
				 */
			case CSTATE_ABORTED:
			case CSTATE_FINISHED:
				finishCon(st);
				return;
		}
	}
}
3287
3288/*
3289 * Subroutine for advanceConnectionState -- initiate or execute the current
3290 * meta command, and return the next state to set.
3291 *
3292 * *now is updated to the current time, unless the command is expected to
3293 * take no time to execute.
3294 */
3295static ConnectionStateEnum
3296executeMetaCommand(CState *st, instr_time *now)
3297{
3298 Command *command = sql_script[st->use_file].commands[st->command];
3299 int argc;
3300 char **argv;
3301
3302 Assert(command != NULL && command->type == META_COMMAND);
3303
3304 argc = command->argc;
3305 argv = command->argv;
3306
3307 if (debug)
3308 {
3309 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
3310 for (int i = 1; i < argc; i++)
3311 fprintf(stderr, " %s", argv[i]);
3312 fprintf(stderr, "\n");
3313 }
3314
3315 if (command->meta == META_SLEEP)
3316 {
3317 int usec;
3318
3319 /*
3320 * A \sleep doesn't execute anything, we just get the delay from the
3321 * argument, and enter the CSTATE_SLEEP state. (The per-command
3322 * latency will be recorded in CSTATE_SLEEP state, not here, after the
3323 * delay has elapsed.)
3324 */
3325 if (!evaluateSleep(st, argc, argv, &usec))
3326 {
3327 commandFailed(st, "sleep", "execution of meta-command failed");
3328 return CSTATE_ABORTED;
3329 }
3330
3331 INSTR_TIME_SET_CURRENT_LAZY(*now);
3332 st->sleep_until = INSTR_TIME_GET_MICROSEC(*now) + usec;
3333 return CSTATE_SLEEP;
3334 }
3335 else if (command->meta == META_SET)
3336 {
3337 PgBenchExpr *expr = command->expr;
3338 PgBenchValue result;
3339
3340 if (!evaluateExpr(st, expr, &result))
3341 {
3342 commandFailed(st, argv[0], "evaluation of meta-command failed");
3343 return CSTATE_ABORTED;
3344 }
3345
3346 if (!putVariableValue(st, argv[0], argv[1], &result))
3347 {
3348 commandFailed(st, "set", "assignment of meta-command failed");
3349 return CSTATE_ABORTED;
3350 }
3351 }
3352 else if (command->meta == META_IF)
3353 {
3354 /* backslash commands with an expression to evaluate */
3355 PgBenchExpr *expr = command->expr;
3356 PgBenchValue result;
3357 bool cond;
3358
3359 if (!evaluateExpr(st, expr, &result))
3360 {
3361 commandFailed(st, argv[0], "evaluation of meta-command failed");
3362 return CSTATE_ABORTED;
3363 }
3364
3365 cond = valueTruth(&result);
3366 conditional_stack_push(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
3367 }
3368 else if (command->meta == META_ELIF)
3369 {
3370 /* backslash commands with an expression to evaluate */
3371 PgBenchExpr *expr = command->expr;
3372 PgBenchValue result;
3373 bool cond;
3374
3375 if (conditional_stack_peek(st->cstack) == IFSTATE_TRUE)
3376 {
3377 /* elif after executed block, skip eval and wait for endif. */
3378 conditional_stack_poke(st->cstack, IFSTATE_IGNORED);
3379 return CSTATE_END_COMMAND;
3380 }
3381
3382 if (!evaluateExpr(st, expr, &result))
3383 {
3384 commandFailed(st, argv[0], "evaluation of meta-command failed");
3385 return CSTATE_ABORTED;
3386 }
3387
3388 cond = valueTruth(&result);
3389 Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
3390 conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
3391 }
3392 else if (command->meta == META_ELSE)
3393 {
3394 switch (conditional_stack_peek(st->cstack))
3395 {
3396 case IFSTATE_TRUE:
3397 conditional_stack_poke(st->cstack, IFSTATE_ELSE_FALSE);
3398 break;
3399 case IFSTATE_FALSE: /* inconsistent if active */
3400 case IFSTATE_IGNORED: /* inconsistent if active */
3401 case IFSTATE_NONE: /* else without if */
3402 case IFSTATE_ELSE_TRUE: /* else after else */
3403 case IFSTATE_ELSE_FALSE: /* else after else */
3404 default:
3405 /* dead code if conditional check is ok */
3406 Assert(false);
3407 }
3408 }
3409 else if (command->meta == META_ENDIF)
3410 {
3411 Assert(!conditional_stack_empty(st->cstack));
3412 conditional_stack_pop(st->cstack);
3413 }
3414 else if (command->meta == META_SETSHELL)
3415 {
3416 if (!runShellCommand(st, argv[1], argv + 2, argc - 2))
3417 {
3418 commandFailed(st, "setshell", "execution of meta-command failed");
3419 return CSTATE_ABORTED;
3420 }
3421 }
3422 else if (command->meta == META_SHELL)
3423 {
3424 if (!runShellCommand(st, NULL, argv + 1, argc - 1))
3425 {
3426 commandFailed(st, "shell", "execution of meta-command failed");
3427 return CSTATE_ABORTED;
3428 }
3429 }
3430
3431 /*
3432 * executing the expression or shell command might have taken a
3433 * non-negligible amount of time, so reset 'now'
3434 */
3435 INSTR_TIME_SET_ZERO(*now);
3436
3437 return CSTATE_END_COMMAND;
3438}
3439
3440/*
3441 * Print log entry after completing one transaction.
3442 *
3443 * We print Unix-epoch timestamps in the log, so that entries can be
3444 * correlated against other logs. On some platforms this could be obtained
3445 * from the instr_time reading the caller has, but rather than get entangled
3446 * with that, we just eat the cost of an extra syscall in all cases.
3447 */
3448static void
3449doLog(TState *thread, CState *st,
3450 StatsData *agg, bool skipped, double latency, double lag)
3451{
3452 FILE *logfile = thread->logfile;
3453
3454 Assert(use_log);
3455
3456 /*
3457 * Skip the log entry if sampling is enabled and this row doesn't belong
3458 * to the random sample.
3459 */
3460 if (sample_rate != 0.0 &&
3461 pg_erand48(thread->ts_sample_rs.xseed) > sample_rate)
3462 return;
3463
3464 /* should we aggregate the results or not? */
3465 if (agg_interval > 0)
3466 {
3467 /*
3468 * Loop until we reach the interval of the current moment, and print
3469 * any empty intervals in between (this may happen with very low tps,
3470 * e.g. --rate=0.1).
3471 */
3472 time_t now = time(NULL);
3473
3474 while (agg->start_time + agg_interval <= now)
3475 {
3476 /* print aggregated report to logfile */
3477 fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
3478 (long) agg->start_time,
3479 agg->cnt,
3480 agg->latency.sum,
3481 agg->latency.sum2,
3482 agg->latency.min,
3483 agg->latency.max);
3484 if (throttle_delay)
3485 {
3486 fprintf(logfile, " %.0f %.0f %.0f %.0f",
3487 agg->lag.sum,
3488 agg->lag.sum2,
3489 agg->lag.min,
3490 agg->lag.max);
3491 if (latency_limit)
3492 fprintf(logfile, " " INT64_FORMAT, agg->skipped);
3493 }
3494 fputc('\n', logfile);
3495
3496 /* reset data and move to next interval */
3497 initStats(agg, agg->start_time + agg_interval);
3498 }
3499
3500 /* accumulate the current transaction */
3501 accumStats(agg, skipped, latency, lag);
3502 }
3503 else
3504 {
3505 /* no, print raw transactions */
3506 struct timeval tv;
3507
3508 gettimeofday(&tv, NULL);
3509 if (skipped)
3510 fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
3511 st->id, st->cnt, st->use_file,
3512 (long) tv.tv_sec, (long) tv.tv_usec);
3513 else
3514 fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
3515 st->id, st->cnt, latency, st->use_file,
3516 (long) tv.tv_sec, (long) tv.tv_usec);
3517 if (throttle_delay)
3518 fprintf(logfile, " %.0f", lag);
3519 fputc('\n', logfile);
3520 }
3521}
3522
3523/*
3524 * Accumulate and report statistics at end of a transaction.
3525 *
3526 * (This is also called when a transaction is late and thus skipped.
3527 * Note that even skipped transactions are counted in the "cnt" fields.)
3528 */
3529static void
3530processXactStats(TState *thread, CState *st, instr_time *now,
3531 bool skipped, StatsData *agg)
3532{
3533 double latency = 0.0,
3534 lag = 0.0;
3535 bool thread_details = progress || throttle_delay || latency_limit,
3536 detailed = thread_details || use_log || per_script_stats;
3537
3538 if (detailed && !skipped)
3539 {
3540 INSTR_TIME_SET_CURRENT_LAZY(*now);
3541
3542 /* compute latency & lag */
3543 latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
3544 lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
3545 }
3546
3547 if (thread_details)
3548 {
3549 /* keep detailed thread stats */
3550 accumStats(&thread->stats, skipped, latency, lag);
3551
3552 /* count transactions over the latency limit, if needed */
3553 if (latency_limit && latency > latency_limit)
3554 thread->latency_late++;
3555 }
3556 else
3557 {
3558 /* no detailed stats, just count */
3559 thread->stats.cnt++;
3560 }
3561
3562 /* client stat is just counting */
3563 st->cnt++;
3564
3565 if (use_log)
3566 doLog(thread, st, agg, skipped, latency, lag);
3567
3568 /* XXX could use a mutex here, but we choose not to */
3569 if (per_script_stats)
3570 accumStats(&sql_script[st->use_file].stats, skipped, latency, lag);
3571}
3572
3573
3574/* discard connections */
3575static void
3576disconnect_all(CState *state, int length)
3577{
3578 int i;
3579
3580 for (i = 0; i < length; i++)
3581 finishCon(&state[i]);
3582}
3583
3584/*
3585 * Remove old pgbench tables, if any exist
3586 */
3587static void
3588initDropTables(PGconn *con)
3589{
3590 fprintf(stderr, "dropping old tables...\n");
3591
3592 /*
3593 * We drop all the tables in one command, so that whether there are
3594 * foreign key dependencies or not doesn't matter.
3595 */
3596 executeStatement(con, "drop table if exists "
3597 "pgbench_accounts, "
3598 "pgbench_branches, "
3599 "pgbench_history, "
3600 "pgbench_tellers");
3601}
3602
3603/*
3604 * Create pgbench's standard tables
3605 */
3606static void
3607initCreateTables(PGconn *con)
3608{
3609 /*
3610 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
3611 * fields in these table declarations were intended to comply with that.
3612 * The pgbench_accounts table complies with that because the "filler"
3613 * column is set to blank-padded empty string. But for all other tables
3614 * the columns default to NULL and so don't actually take any space. We
3615 * could fix that by giving them non-null default values. However, that
3616 * would completely break comparability of pgbench results with prior
3617 * versions. Since pgbench has never pretended to be fully TPC-B compliant
3618 * anyway, we stick with the historical behavior.
3619 */
3620 struct ddlinfo
3621 {
3622 const char *table; /* table name */
3623 const char *smcols; /* column decls if accountIDs are 32 bits */
3624 const char *bigcols; /* column decls if accountIDs are 64 bits */
3625 int declare_fillfactor;
3626 };
3627 static const struct ddlinfo DDLs[] = {
3628 {
3629 "pgbench_history",
3630 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
3631 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
3632 0
3633 },
3634 {
3635 "pgbench_tellers",
3636 "tid int not null,bid int,tbalance int,filler char(84)",
3637 "tid int not null,bid int,tbalance int,filler char(84)",
3638 1
3639 },
3640 {
3641 "pgbench_accounts",
3642 "aid int not null,bid int,abalance int,filler char(84)",
3643 "aid bigint not null,bid int,abalance int,filler char(84)",
3644 1
3645 },
3646 {
3647 "pgbench_branches",
3648 "bid int not null,bbalance int,filler char(88)",
3649 "bid int not null,bbalance int,filler char(88)",
3650 1
3651 }
3652 };
3653 int i;
3654
3655 fprintf(stderr, "creating tables...\n");
3656
3657 for (i = 0; i < lengthof(DDLs); i++)
3658 {
3659 char opts[256];
3660 char buffer[256];
3661 const struct ddlinfo *ddl = &DDLs[i];
3662 const char *cols;
3663
3664 /* Construct new create table statement. */
3665 opts[0] = '\0';
3666 if (ddl->declare_fillfactor)
3667 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3668 " with (fillfactor=%d)", fillfactor);
3669 if (tablespace != NULL)
3670 {
3671 char *escape_tablespace;
3672
3673 escape_tablespace = PQescapeIdentifier(con, tablespace,
3674 strlen(tablespace));
3675 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
3676 " tablespace %s", escape_tablespace);
3677 PQfreemem(escape_tablespace);
3678 }
3679
3680 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
3681
3682 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
3683 unlogged_tables ? " unlogged" : "",
3684 ddl->table, cols, opts);
3685
3686 executeStatement(con, buffer);
3687 }
3688}
3689
3690/*
3691 * Fill the standard tables with some data
3692 */
3693static void
3694initGenerateData(PGconn *con)
3695{
3696 char sql[256];
3697 PGresult *res;
3698 int i;
3699 int64 k;
3700
3701 /* used to track elapsed time and estimate of the remaining time */
3702 instr_time start,
3703 diff;
3704 double elapsed_sec,
3705 remaining_sec;
3706 int log_interval = 1;
3707
3708 fprintf(stderr, "generating data...\n");
3709
3710 /*
3711 * we do all of this in one transaction to enable the backend's
3712 * data-loading optimizations
3713 */
3714 executeStatement(con, "begin");
3715
3716 /*
3717 * truncate away any old data, in one command in case there are foreign
3718 * keys
3719 */
3720 executeStatement(con, "truncate table "
3721 "pgbench_accounts, "
3722 "pgbench_branches, "
3723 "pgbench_history, "
3724 "pgbench_tellers");
3725
3726 /*
3727 * fill branches, tellers, accounts in that order in case foreign keys
3728 * already exist
3729 */
3730 for (i = 0; i < nbranches * scale; i++)
3731 {
3732 /* "filler" column defaults to NULL */
3733 snprintf(sql, sizeof(sql),
3734 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
3735 i + 1);
3736 executeStatement(con, sql);
3737 }
3738
3739 for (i = 0; i < ntellers * scale; i++)
3740 {
3741 /* "filler" column defaults to NULL */
3742 snprintf(sql, sizeof(sql),
3743 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
3744 i + 1, i / ntellers + 1);
3745 executeStatement(con, sql);
3746 }
3747
3748 /*
3749 * accounts is big enough to be worth using COPY and tracking runtime
3750 */
3751 res = PQexec(con, "copy pgbench_accounts from stdin");
3752 if (PQresultStatus(res) != PGRES_COPY_IN)
3753 {
3754 fprintf(stderr, "%s", PQerrorMessage(con));
3755 exit(1);
3756 }
3757 PQclear(res);
3758
3759 INSTR_TIME_SET_CURRENT(start);
3760
3761 for (k = 0; k < (int64) naccounts * scale; k++)
3762 {
3763 int64 j = k + 1;
3764
3765 /* "filler" column defaults to blank padded empty string */
3766 snprintf(sql, sizeof(sql),
3767 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
3768 j, k / naccounts + 1, 0);
3769 if (PQputline(con, sql))
3770 {
3771 fprintf(stderr, "PQputline failed\n");
3772 exit(1);
3773 }
3774
3775 /*
3776 * If we want to stick with the original logging, print a message each
3777 * 100k inserted rows.
3778 */
3779 if ((!use_quiet) && (j % 100000 == 0))
3780 {
3781 INSTR_TIME_SET_CURRENT(diff);
3782 INSTR_TIME_SUBTRACT(diff, start);
3783
3784 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3785 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3786
3787 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3788 j, (int64) naccounts * scale,
3789 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
3790 elapsed_sec, remaining_sec);
3791 }
3792 /* let's not call the timing for each row, but only each 100 rows */
3793 else if (use_quiet && (j % 100 == 0))
3794 {
3795 INSTR_TIME_SET_CURRENT(diff);
3796 INSTR_TIME_SUBTRACT(diff, start);
3797
3798 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
3799 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
3800
3801 /* have we reached the next interval (or end)? */
3802 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
3803 {
3804 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
3805 j, (int64) naccounts * scale,
3806 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
3807
3808 /* skip to the next interval */
3809 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
3810 }
3811 }
3812
3813 }
3814 if (PQputline(con, "\\.\n"))
3815 {
3816 fprintf(stderr, "very last PQputline failed\n");
3817 exit(1);
3818 }
3819 if (PQendcopy(con))
3820 {
3821 fprintf(stderr, "PQendcopy failed\n");
3822 exit(1);
3823 }
3824
3825 executeStatement(con, "commit");
3826}
3827
3828/*
3829 * Invoke vacuum on the standard tables
3830 */
3831static void
3832initVacuum(PGconn *con)
3833{
3834 fprintf(stderr, "vacuuming...\n");
3835 executeStatement(con, "vacuum analyze pgbench_branches");
3836 executeStatement(con, "vacuum analyze pgbench_tellers");
3837 executeStatement(con, "vacuum analyze pgbench_accounts");
3838 executeStatement(con, "vacuum analyze pgbench_history");
3839}
3840
3841/*
3842 * Create primary keys on the standard tables
3843 */
3844static void
3845initCreatePKeys(PGconn *con)
3846{
3847 static const char *const DDLINDEXes[] = {
3848 "alter table pgbench_branches add primary key (bid)",
3849 "alter table pgbench_tellers add primary key (tid)",
3850 "alter table pgbench_accounts add primary key (aid)"
3851 };
3852 int i;
3853
3854 fprintf(stderr, "creating primary keys...\n");
3855 for (i = 0; i < lengthof(DDLINDEXes); i++)
3856 {
3857 char buffer[256];
3858
3859 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
3860
3861 if (index_tablespace != NULL)
3862 {
3863 char *escape_tablespace;
3864
3865 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
3866 strlen(index_tablespace));
3867 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
3868 " using index tablespace %s", escape_tablespace);
3869 PQfreemem(escape_tablespace);
3870 }
3871
3872 executeStatement(con, buffer);
3873 }
3874}
3875
3876/*
3877 * Create foreign key constraints between the standard tables
3878 */
3879static void
3880initCreateFKeys(PGconn *con)
3881{
3882 static const char *const DDLKEYs[] = {
3883 "alter table pgbench_tellers add constraint pgbench_tellers_bid_fkey foreign key (bid) references pgbench_branches",
3884 "alter table pgbench_accounts add constraint pgbench_accounts_bid_fkey foreign key (bid) references pgbench_branches",
3885 "alter table pgbench_history add constraint pgbench_history_bid_fkey foreign key (bid) references pgbench_branches",
3886 "alter table pgbench_history add constraint pgbench_history_tid_fkey foreign key (tid) references pgbench_tellers",
3887 "alter table pgbench_history add constraint pgbench_history_aid_fkey foreign key (aid) references pgbench_accounts"
3888 };
3889 int i;
3890
3891 fprintf(stderr, "creating foreign keys...\n");
3892 for (i = 0; i < lengthof(DDLKEYs); i++)
3893 {
3894 executeStatement(con, DDLKEYs[i]);
3895 }
3896}
3897
/*
 * Check that an initialization-steps string is usable.
 *
 * The string must be non-empty and may contain only the step letters
 * d, t, g, v, p, f and spaces.  On any violation a message is printed to
 * stderr and the program exits with status 1; otherwise the function
 * simply returns.
 *
 * (runInitSteps() would also reject bad characters, but initialization can
 * take a while, so it is friendlier to complain during option parsing.)
 */
static void
checkInitSteps(const char *initialize_steps)
{
    static const char allowed_steps[] = "dtgvpf ";
    const char *cur = initialize_steps;

    if (*cur == '\0')
    {
        fprintf(stderr, "no initialization steps specified\n");
        exit(1);
    }

    while (*cur != '\0')
    {
        if (strchr(allowed_steps, *cur) == NULL)
        {
            fprintf(stderr, "unrecognized initialization step \"%c\"\n",
                    *cur);
            fprintf(stderr, "allowed steps are: \"d\", \"t\", \"g\", \"v\", \"p\", \"f\"\n");
            exit(1);
        }
        cur++;
    }
}
3927
3928/*
3929 * Invoke each initialization step in the given string
3930 */
3931static void
3932runInitSteps(const char *initialize_steps)
3933{
3934 PGconn *con;
3935 const char *step;
3936
3937 if ((con = doConnect()) == NULL)
3938 exit(1);
3939
3940 for (step = initialize_steps; *step != '\0'; step++)
3941 {
3942 switch (*step)
3943 {
3944 case 'd':
3945 initDropTables(con);
3946 break;
3947 case 't':
3948 initCreateTables(con);
3949 break;
3950 case 'g':
3951 initGenerateData(con);
3952 break;
3953 case 'v':
3954 initVacuum(con);
3955 break;
3956 case 'p':
3957 initCreatePKeys(con);
3958 break;
3959 case 'f':
3960 initCreateFKeys(con);
3961 break;
3962 case ' ':
3963 break; /* ignore */
3964 default:
3965 fprintf(stderr, "unrecognized initialization step \"%c\"\n",
3966 *step);
3967 PQfinish(con);
3968 exit(1);
3969 }
3970 }
3971
3972 fprintf(stderr, "done.\n");
3973 PQfinish(con);
3974}
3975
3976/*
3977 * Replace :param with $n throughout the command's SQL text, which
3978 * is a modifiable string in cmd->lines.
3979 */
3980static bool
3981parseQuery(Command *cmd)
3982{
3983 char *sql,
3984 *p;
3985
3986 cmd->argc = 1;
3987
3988 p = sql = pg_strdup(cmd->lines.data);
3989 while ((p = strchr(p, ':')) != NULL)
3990 {
3991 char var[13];
3992 char *name;
3993 int eaten;
3994
3995 name = parseVariable(p, &eaten);
3996 if (name == NULL)
3997 {
3998 while (*p == ':')
3999 {
4000 p++;
4001 }
4002 continue;
4003 }
4004
4005 /*
4006 * cmd->argv[0] is the SQL statement itself, so the max number of
4007 * arguments is one less than MAX_ARGS
4008 */
4009 if (cmd->argc >= MAX_ARGS)
4010 {
4011 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n",
4012 MAX_ARGS - 1, cmd->lines.data);
4013 pg_free(name);
4014 return false;
4015 }
4016
4017 sprintf(var, "$%d", cmd->argc);
4018 p = replaceVariable(&sql, p, eaten, var);
4019
4020 cmd->argv[cmd->argc] = name;
4021 cmd->argc++;
4022 }
4023
4024 Assert(cmd->argv[0] == NULL);
4025 cmd->argv[0] = sql;
4026 return true;
4027}
4028
/*
 * Report a syntax error found while parsing a script, then exit(1).
 * (In practice this happens while parsing a backslash command, because
 * syntax errors in SQL are not detected here.)
 *
 * source:  source of script (filename or builtin-script ID)
 * lineno:  line number within script (count from 1)
 * line:    whole line of backslash command, if available
 * command: backslash command name, if available
 * msg:     the actual error message
 * more:    optional extra message
 * column:  zero-based column number, or -1 if unknown
 *
 * When the offending line is available it is echoed, followed by a caret
 * marker under the error column; otherwise a known column is reported as a
 * one-based number in the message itself.
 */
void
syntax_error(const char *source, int lineno,
             const char *line, const char *command,
             const char *msg, const char *more, int column)
{
    /* headline: location, message and optional qualifiers */
    fprintf(stderr, "%s:%d: %s", source, lineno, msg);

    if (more != NULL)
        fprintf(stderr, " (%s)", more);

    if (line == NULL && column >= 0)
        fprintf(stderr, " at column %d", column + 1);

    if (command != NULL)
        fprintf(stderr, " in command \"%s\"", command);

    fprintf(stderr, "\n");

    /* echo the source line with a marker, when we have it */
    if (line != NULL)
    {
        fprintf(stderr, "%s\n", line);

        if (column >= 0)
        {
            int         pad = column;

            while (pad-- > 0)
                fprintf(stderr, " ");
            fprintf(stderr, "^ error found here\n");
        }
    }

    exit(1);
}
4068
/*
 * Find where the SQL command proper begins.
 *
 * Leading whitespace and "--" comments (each running to the end of its
 * line) are skipped.  Returns a pointer into sql_command at the first
 * character of the command, or NULL if the string holds nothing but
 * whitespace and comments.
 */
static char *
skip_sql_comments(char *sql_command)
{
    char       *cur = sql_command;

    while (*cur != '\0')
    {
        if (isspace((unsigned char) *cur))
        {
            cur++;
            continue;
        }

        /* anything other than a "--" comment starts the command */
        if (cur[0] != '-' || cur[1] != '-')
            return cur;

        /* skip the comment; an unterminated one swallows the rest */
        cur = strchr(cur, '\n');
        if (cur == NULL)
            return NULL;
        cur++;
    }

    /* ran off the end without finding a command */
    return NULL;
}
4101
4102/*
4103 * Parse a SQL command; return a Command struct, or NULL if it's a comment
4104 *
4105 * On entry, psqlscan.l has collected the command into "buf", so we don't
4106 * really need to do much here except check for comments and set up a Command
4107 * struct.
4108 */
4109static Command *
4110create_sql_command(PQExpBuffer buf, const char *source)
4111{
4112 Command *my_command;
4113 char *p = skip_sql_comments(buf->data);
4114
4115 if (p == NULL)
4116 return NULL;
4117
4118 /* Allocate and initialize Command structure */
4119 my_command = (Command *) pg_malloc(sizeof(Command));
4120 initPQExpBuffer(&my_command->lines);
4121 appendPQExpBufferStr(&my_command->lines, p);
4122 my_command->first_line = NULL; /* this is set later */
4123 my_command->type = SQL_COMMAND;
4124 my_command->meta = META_NONE;
4125 my_command->argc = 0;
4126 memset(my_command->argv, 0, sizeof(my_command->argv));
4127 my_command->varprefix = NULL; /* allocated later, if needed */
4128 my_command->expr = NULL;
4129 initSimpleStats(&my_command->stats);
4130
4131 return my_command;
4132}
4133
4134/* Free a Command structure and associated data */
4135static void
4136free_command(Command *command)
4137{
4138 termPQExpBuffer(&command->lines);
4139 if (command->first_line)
4140 pg_free(command->first_line);
4141 for (int i = 0; i < command->argc; i++)
4142 pg_free(command->argv[i]);
4143 if (command->varprefix)
4144 pg_free(command->varprefix);
4145
4146 /*
4147 * It should also free expr recursively, but this is currently not needed
4148 * as only gset commands (which do not have an expression) are freed.
4149 */
4150 pg_free(command);
4151}
4152
4153/*
4154 * Once an SQL command is fully parsed, possibly by accumulating several
4155 * parts, complete other fields of the Command structure.
4156 */
4157static void
4158postprocess_sql_command(Command *my_command)
4159{
4160 char buffer[128];
4161
4162 Assert(my_command->type == SQL_COMMAND);
4163
4164 /* Save the first line for error display. */
4165 strlcpy(buffer, my_command->lines.data, sizeof(buffer));
4166 buffer[strcspn(buffer, "\n\r")] = '\0';
4167 my_command->first_line = pg_strdup(buffer);
4168
4169 /* parse query if necessary */
4170 switch (querymode)
4171 {
4172 case QUERY_SIMPLE:
4173 my_command->argv[0] = my_command->lines.data;
4174 my_command->argc++;
4175 break;
4176 case QUERY_EXTENDED:
4177 case QUERY_PREPARED:
4178 if (!parseQuery(my_command))
4179 exit(1);
4180 break;
4181 default:
4182 exit(1);
4183 }
4184}
4185
/*
 * Parse a backslash command; return a Command struct, or NULL if comment
 *
 * At call, we have scanned only the initial backslash.
 *
 * sstate: scanner state positioned just after the backslash
 * source: script name, used only in error messages
 *
 * The returned Command has type META_COMMAND, argv[0] set to the command
 * name, "meta" set to its enum form and first_line set to the text of the
 * command.  For \set, \if and \elif the remainder of the line is parsed as
 * an expression and stored in "expr"; for every other command the remaining
 * words are collected into argv and then checked per command.
 *
 * Errors are reported through syntax_error(), which exits the program.
 * NULL is returned only when no command word could be read.
 */
static Command *
process_backslash_command(PsqlScanState sstate, const char *source)
{
    Command    *my_command;
    PQExpBufferData word_buf;
    int         word_offset;
    int         offsets[MAX_ARGS];  /* offsets of argument words */
    int         start_offset;
    int         lineno;
    int         j;

    initPQExpBuffer(&word_buf);

    /* Remember location of the backslash */
    start_offset = expr_scanner_offset(sstate) - 1;
    lineno = expr_scanner_get_lineno(sstate, start_offset);

    /* Collect first word of command */
    if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
    {
        termPQExpBuffer(&word_buf);
        return NULL;
    }

    /*
     * Allocate and initialize Command structure.  Fields not set explicitly
     * below (first_line among them) are presumably left zeroed by
     * pg_malloc0 -- TODO confirm -- so the early syntax_error() calls that
     * pass my_command->first_line would be passing NULL.
     */
    my_command = (Command *) pg_malloc0(sizeof(Command));
    my_command->type = META_COMMAND;
    my_command->argc = 0;
    initSimpleStats(&my_command->stats);

    /* Save first word (command name) */
    j = 0;
    offsets[j] = word_offset;
    my_command->argv[j++] = pg_strdup(word_buf.data);
    my_command->argc++;

    /* ... and convert it to enum form */
    my_command->meta = getMetaCommand(my_command->argv[0]);

    /* Commands that take an expression are handled separately. */
    if (my_command->meta == META_SET ||
        my_command->meta == META_IF ||
        my_command->meta == META_ELIF)
    {
        yyscan_t    yyscanner;

        /* For \set, collect var name */
        if (my_command->meta == META_SET)
        {
            if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
                syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                             "missing argument", NULL, -1);

            offsets[j] = word_offset;
            my_command->argv[j++] = pg_strdup(word_buf.data);
            my_command->argc++;
        }

        /* then for all parse the expression */
        yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
                                      my_command->argv[0]);

        if (expr_yyparse(yyscanner) != 0)
        {
            /* dead code: exit done from syntax_error called by yyerror */
            exit(1);
        }

        /* the parser leaves its result in expr_parse_result */
        my_command->expr = expr_parse_result;

        /* Save line, trimming any trailing newline */
        my_command->first_line =
            expr_scanner_get_substring(sstate,
                                       start_offset,
                                       expr_scanner_offset(sstate),
                                       true);

        expr_scanner_finish(yyscanner);

        termPQExpBuffer(&word_buf);

        return my_command;
    }

    /* For all other commands, collect remaining words. */
    while (expr_lex_one_word(sstate, &word_buf, &word_offset))
    {
        /*
         * my_command->argv[0] is the command itself, so the max number of
         * arguments is one less than MAX_ARGS
         */
        if (j >= MAX_ARGS)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                         "too many arguments", NULL, -1);

        offsets[j] = word_offset;
        my_command->argv[j++] = pg_strdup(word_buf.data);
        my_command->argc++;
    }

    /* Save line, trimming any trailing newline */
    my_command->first_line =
        expr_scanner_get_substring(sstate,
                                   start_offset,
                                   expr_scanner_offset(sstate),
                                   true);

    /* Per-command argument checks; each failure exits via syntax_error(). */
    if (my_command->meta == META_SLEEP)
    {
        /* \sleep takes a duration and an optional unit */
        if (my_command->argc < 2)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                         "missing argument", NULL, -1);

        if (my_command->argc > 3)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                         "too many arguments", NULL,
                         offsets[3] - start_offset);

        /*
         * Split argument into number and unit to allow "sleep 1ms" etc. We
         * don't have to terminate the number argument with null because it
         * will be parsed with atoi, which ignores trailing non-digit
         * characters.
         *
         * Note that after the split argv[2] points into the string owned
         * by argv[1]; it is not a separate allocation.
         */
        if (my_command->argc == 2 && my_command->argv[1][0] != ':')
        {
            char       *c = my_command->argv[1];

            /* find the first character that is not part of the number */
            while (isdigit((unsigned char) *c))
                c++;
            if (*c)
            {
                my_command->argv[2] = c;
                offsets[2] = offsets[1] + (c - my_command->argv[1]);
                my_command->argc = 3;
            }
        }

        /* a unit, whether separate or split off above, must be known */
        if (my_command->argc == 3)
        {
            if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
                pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
                pg_strcasecmp(my_command->argv[2], "s") != 0)
                syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                             "unrecognized time unit, must be us, ms or s",
                             my_command->argv[2], offsets[2] - start_offset);
        }
    }
    else if (my_command->meta == META_SETSHELL)
    {
        /* needs at least two words after the command name */
        if (my_command->argc < 3)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                         "missing argument", NULL, -1);
    }
    else if (my_command->meta == META_SHELL)
    {
        /* needs at least one word after the command name */
        if (my_command->argc < 2)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                         "missing command", NULL, -1);
    }
    else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
    {
        /* \else and \endif take no arguments at all */
        if (my_command->argc != 1)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                         "unexpected argument", NULL, -1);
    }
    else if (my_command->meta == META_GSET)
    {
        /* \gset takes at most one argument */
        if (my_command->argc > 2)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                         "too many arguments", NULL, -1);
    }
    else
    {
        /* my_command->meta == META_NONE */
        syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                     "invalid command", NULL, -1);
    }

    termPQExpBuffer(&word_buf);

    return my_command;
}
4373
/*
 * Report a problem with the \if structure of a script and exit(1).
 *
 * desc: script description
 * cmdn: number of the offending command
 * msg:  what is wrong
 */
static void
ConditionError(const char *desc, int cmdn, const char *msg)
{
    fprintf(stderr, "condition error in script \"%s\" command %d: %s\n",
            desc,
            cmdn,
            msg);

    exit(1);
}
4382
4383/*
4384 * Partial evaluation of conditionals before recording and running the script.
4385 */
4386static void
4387CheckConditional(ParsedScript ps)
4388{
4389 /* statically check conditional structure */
4390 ConditionalStack cs = conditional_stack_create();
4391 int i;
4392
4393 for (i = 0; ps.commands[i] != NULL; i++)
4394 {
4395 Command *cmd = ps.commands[i];
4396
4397 if (cmd->type == META_COMMAND)
4398 {
4399 switch (cmd->meta)
4400 {
4401 case META_IF:
4402 conditional_stack_push(cs, IFSTATE_FALSE);
4403 break;
4404 case META_ELIF:
4405 if (conditional_stack_empty(cs))
4406 ConditionError(ps.desc, i + 1, "\\elif without matching \\if");
4407 if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
4408 ConditionError(ps.desc, i + 1, "\\elif after \\else");
4409 break;
4410 case META_ELSE:
4411 if (conditional_stack_empty(cs))
4412 ConditionError(ps.desc, i + 1, "\\else without matching \\if");
4413 if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
4414 ConditionError(ps.desc, i + 1, "\\else after \\else");
4415 conditional_stack_poke(cs, IFSTATE_ELSE_FALSE);
4416 break;
4417 case META_ENDIF:
4418 if (!conditional_stack_pop(cs))
4419 ConditionError(ps.desc, i + 1, "\\endif without matching \\if");
4420 break;
4421 default:
4422 /* ignore anything else... */
4423 break;
4424 }
4425 }
4426 }
4427 if (!conditional_stack_empty(cs))
4428 ConditionError(ps.desc, i + 1, "\\if without matching \\endif");
4429 conditional_stack_destroy(cs);
4430}
4431
4432/*
4433 * Parse a script (either the contents of a file, or a built-in script)
4434 * and add it to the list of scripts.
4435 */
4436static void
4437ParseScript(const char *script, const char *desc, int weight)
4438{
4439 ParsedScript ps;
4440 PsqlScanState sstate;
4441 PQExpBufferData line_buf;
4442 int alloc_num;
4443 int index;
4444 int lineno;
4445 int start_offset;
4446
4447#define COMMANDS_ALLOC_NUM 128
4448 alloc_num = COMMANDS_ALLOC_NUM;
4449
4450 /* Initialize all fields of ps */
4451 ps.desc = desc;
4452 ps.weight = weight;
4453 ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
4454 initStats(&ps.stats, 0);
4455
4456 /* Prepare to parse script */
4457 sstate = psql_scan_create(&pgbench_callbacks);
4458
4459 /*
4460 * Ideally, we'd scan scripts using the encoding and stdstrings settings
4461 * we get from a DB connection. However, without major rearrangement of
4462 * pgbench's argument parsing, we can't have a DB connection at the time
4463 * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
4464 * with any backend-safe encoding, though conceivably we could be fooled
4465 * if a script file uses a client-only encoding. We also assume that
4466 * stdstrings should be true, which is a bit riskier.
4467 */
4468 psql_scan_setup(sstate, script, strlen(script), 0, true);
4469 start_offset = expr_scanner_offset(sstate) - 1;
4470
4471 initPQExpBuffer(&line_buf);
4472
4473 index = 0;
4474
4475 for (;;)
4476 {
4477 PsqlScanResult sr;
4478 promptStatus_t prompt;
4479 Command *command = NULL;
4480
4481 resetPQExpBuffer(&line_buf);
4482 lineno = expr_scanner_get_lineno(sstate, start_offset);
4483
4484 sr = psql_scan(sstate, &line_buf, &prompt);
4485
4486 /* If we collected a new SQL command, process that */
4487 command = create_sql_command(&line_buf, desc);
4488
4489 /* store new command */
4490 if (command)
4491 ps.commands[index++] = command;
4492
4493 /* If we reached a backslash, process that */
4494 if (sr == PSCAN_BACKSLASH)
4495 {
4496 command = process_backslash_command(sstate, desc);
4497
4498 if (command)
4499 {
4500 /*
4501 * If this is gset, merge into the preceding command. (We
4502 * don't use a command slot in this case).
4503 */
4504 if (command->meta == META_GSET)
4505 {
4506 Command *cmd;
4507
4508 if (index == 0)
4509 syntax_error(desc, lineno, NULL, NULL,
4510 "\\gset must follow a SQL command",
4511 NULL, -1);
4512
4513 cmd = ps.commands[index - 1];
4514
4515 if (cmd->type != SQL_COMMAND ||
4516 cmd->varprefix != NULL)
4517 syntax_error(desc, lineno, NULL, NULL,
4518 "\\gset must follow a SQL command",
4519 cmd->first_line, -1);
4520
4521 /* get variable prefix */
4522 if (command->argc <= 1 || command->argv[1][0] == '\0')
4523 cmd->varprefix = pg_strdup("");
4524 else
4525 cmd->varprefix = pg_strdup(command->argv[1]);
4526
4527 /* cleanup unused command */
4528 free_command(command);
4529
4530 continue;
4531 }
4532
4533 /* Attach any other backslash command as a new command */
4534 ps.commands[index++] = command;
4535 }
4536 }
4537
4538 /*
4539 * Since we used a command slot, allocate more if needed. Note we
4540 * always allocate one more in order to accommodate the NULL
4541 * terminator below.
4542 */
4543 if (index >= alloc_num)
4544 {
4545 alloc_num += COMMANDS_ALLOC_NUM;
4546 ps.commands = (Command **)
4547 pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
4548 }
4549
4550 /* Done if we reached EOF */
4551 if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
4552 break;
4553 }
4554
4555 ps.commands[index] = NULL;
4556
4557 addScript(ps);
4558
4559 termPQExpBuffer(&line_buf);
4560 psql_scan_finish(sstate);
4561 psql_scan_destroy(sstate);
4562}
4563
/*
 * Slurp everything that can be read from "fd" into a freshly allocated,
 * NUL-terminated buffer and return it.
 *
 * The buffer grows by BUFSIZ whenever a read fills the space offered to
 * it, so it usually ends up larger than the data.  That is fine here: the
 * caller frees it as soon as the script has been parsed.  A short read
 * ends the loop whether it was caused by end of file or by an error; the
 * caller is expected to check ferror().
 */
static char *
read_file_contents(FILE *fd)
{
    size_t      capacity = BUFSIZ;
    size_t      length = 0;
    size_t      got;
    char       *data;

    data = (char *) pg_malloc(capacity);

    /* keep going for as long as each read comes back completely full */
    while ((got = fread(data + length, 1, BUFSIZ, fd)) == BUFSIZ)
    {
        length += got;

        /* offer another BUFSIZ bytes to the next read */
        capacity += BUFSIZ;
        data = (char *) pg_realloc(data, capacity);
    }

    /* account for the final, short read */
    length += got;

    /* the short read left at least one byte free for the terminator */
    data[length] = '\0';

    return data;
}
4597
/*
 * Load the script stored in the named file and add it to the list of
 * scripts.  The name "-" stands for standard input.
 *
 * Exits with status 1 if the file cannot be opened or read.
 *
 * NB: filename must be storage that won't disappear.
 */
static void
process_file(const char *filename, int weight)
{
    FILE       *fd;
    char       *contents;

    /* Pick the input stream. */
    if (strcmp(filename, "-") != 0)
    {
        fd = fopen(filename, "r");
        if (fd == NULL)
        {
            fprintf(stderr, "could not open file \"%s\": %s\n",
                    filename, strerror(errno));
            exit(1);
        }
    }
    else
        fd = stdin;

    /* Read all of it, then see whether the read ended on an error. */
    contents = read_file_contents(fd);

    if (ferror(fd))
    {
        fprintf(stderr, "could not read file \"%s\": %s\n",
                filename, strerror(errno));
        exit(1);
    }

    /* stdin is not ours to close */
    if (fd != stdin)
        fclose(fd);

    ParseScript(contents, filename, weight);

    free(contents);
}
4635
4636/* Parse the given builtin script and add it to the list. */
4637static void
4638process_builtin(const BuiltinScript *bi, int weight)
4639{
4640 ParseScript(bi->script, bi->desc, weight);
4641}
4642
4643/* show available builtin scripts */
4644static void
4645listAvailableScripts(void)
4646{
4647 int i;
4648
4649 fprintf(stderr, "Available builtin scripts:\n");
4650 for (i = 0; i < lengthof(builtin_script); i++)
4651 fprintf(stderr, "\t%s\n", builtin_script[i].name);
4652 fprintf(stderr, "\n");
4653}
4654
4655/* return builtin script "name" if unambiguous, fails if not found */
4656static const BuiltinScript *
4657findBuiltin(const char *name)
4658{
4659 int i,
4660 found = 0,
4661 len = strlen(name);
4662 const BuiltinScript *result = NULL;
4663
4664 for (i = 0; i < lengthof(builtin_script); i++)
4665 {
4666 if (strncmp(builtin_script[i].name, name, len) == 0)
4667 {
4668 result = &builtin_script[i];
4669 found++;
4670 }
4671 }
4672
4673 /* ok, unambiguous result */
4674 if (found == 1)
4675 return result;
4676
4677 /* error cases */
4678 if (found == 0)
4679 fprintf(stderr, "no builtin script found for name \"%s\"\n", name);
4680 else /* found > 1 */
4681 fprintf(stderr,
4682 "ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name);
4683
4684 listAvailableScripts();
4685 exit(1);
4686}
4687
4688/*
4689 * Determine the weight specification from a script option (-b, -f), if any,
4690 * and return it as an integer (1 is returned if there's no weight). The
4691 * script name is returned in *script as a malloc'd string.
4692 */
4693static int
4694parseScriptWeight(const char *option, char **script)
4695{
4696 char *sep;
4697 int weight;
4698
4699 if ((sep = strrchr(option, WSEP)))
4700 {
4701 int namelen = sep - option;
4702 long wtmp;
4703 char *badp;
4704
4705 /* generate the script name */
4706 *script = pg_malloc(namelen + 1);
4707 strncpy(*script, option, namelen);
4708 (*script)[namelen] = '\0';
4709
4710 /* process digits of the weight spec */
4711 errno = 0;
4712 wtmp = strtol(sep + 1, &badp, 10);
4713 if (errno != 0 || badp == sep + 1 || *badp != '\0')
4714 {
4715 fprintf(stderr, "invalid weight specification: %s\n", sep);
4716 exit(1);
4717 }
4718 if (wtmp > INT_MAX || wtmp < 0)
4719 {
4720 fprintf(stderr,
4721 "weight specification out of range (0 .. %u): " INT64_FORMAT "\n",
4722 INT_MAX, (int64) wtmp);
4723 exit(1);
4724 }
4725 weight = wtmp;
4726 }
4727 else
4728 {
4729 *script = pg_strdup(option);
4730 weight = 1;
4731 }
4732
4733 return weight;
4734}
4735
4736/* append a script to the list of scripts to process */
4737static void
4738addScript(ParsedScript script)
4739{
4740 if (script.commands == NULL || script.commands[0] == NULL)
4741 {
4742 fprintf(stderr, "empty command list for script \"%s\"\n", script.desc);
4743 exit(1);
4744 }
4745
4746 if (num_scripts >= MAX_SCRIPTS)
4747 {
4748 fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS);
4749 exit(1);
4750 }
4751
4752 CheckConditional(script);
4753
4754 sql_script[num_scripts] = script;
4755 num_scripts++;
4756}
4757
4758/*
4759 * Print progress report.
4760 *
4761 * On entry, *last and *last_report contain the statistics and time of last
4762 * progress report. On exit, they are updated with the new stats.
4763 */
4764static void
4765printProgressReport(TState *threads, int64 test_start, int64 now,
4766 StatsData *last, int64 *last_report)
4767{
4768 /* generate and show report */
4769 int64 run = now - *last_report,
4770 ntx;
4771 double tps,
4772 total_run,
4773 latency,
4774 sqlat,
4775 lag,
4776 stdev;
4777 char tbuf[315];
4778 StatsData cur;
4779
4780 /*
4781 * Add up the statistics of all threads.
4782 *
4783 * XXX: No locking. There is no guarantee that we get an atomic snapshot
4784 * of the transaction count and latencies, so these figures can well be
4785 * off by a small amount. The progress report's purpose is to give a
4786 * quick overview of how the test is going, so that shouldn't matter too
4787 * much. (If a read from a 64-bit integer is not atomic, you might get a
4788 * "torn" read and completely bogus latencies though!)
4789 */
4790 initStats(&cur, 0);
4791 for (int i = 0; i < nthreads; i++)
4792 {
4793 mergeSimpleStats(&cur.latency, &threads[i].stats.latency);
4794 mergeSimpleStats(&cur.lag, &threads[i].stats.lag);
4795 cur.cnt += threads[i].stats.cnt;
4796 cur.skipped += threads[i].stats.skipped;
4797 }
4798
4799 /* we count only actually executed transactions */
4800 ntx = (cur.cnt - cur.skipped) - (last->cnt - last->skipped);
4801 total_run = (now - test_start) / 1000000.0;
4802 tps = 1000000.0 * ntx / run;
4803 if (ntx > 0)
4804 {
4805 latency = 0.001 * (cur.latency.sum - last->latency.sum) / ntx;
4806 sqlat = 1.0 * (cur.latency.sum2 - last->latency.sum2) / ntx;
4807 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
4808 lag = 0.001 * (cur.lag.sum - last->lag.sum) / ntx;
4809 }
4810 else
4811 {
4812 latency = sqlat = stdev = lag = 0;
4813 }
4814
4815 if (progress_timestamp)
4816 {
4817 /*
4818 * On some platforms the current system timestamp is available in
4819 * now_time, but rather than get entangled with that, we just eat the
4820 * cost of an extra syscall in all cases.
4821 */
4822 struct timeval tv;
4823
4824 gettimeofday(&tv, NULL);
4825 snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s",
4826 (long) tv.tv_sec, (long) (tv.tv_usec / 1000));
4827 }
4828 else
4829 {
4830 /* round seconds are expected, but the thread may be late */
4831 snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
4832 }
4833
4834 fprintf(stderr,
4835 "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
4836 tbuf, tps, latency, stdev);
4837
4838 if (throttle_delay)
4839 {
4840 fprintf(stderr, ", lag %.3f ms", lag);
4841 if (latency_limit)
4842 fprintf(stderr, ", " INT64_FORMAT " skipped",
4843 cur.skipped - last->skipped);
4844 }
4845 fprintf(stderr, "\n");
4846
4847 *last = cur;
4848 *last_report = now;
4849}
4850
4851static void
4852printSimpleStats(const char *prefix, SimpleStats *ss)
4853{
4854 if (ss->count > 0)
4855 {
4856 double latency = ss->sum / ss->count;
4857 double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
4858
4859 printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
4860 printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
4861 }
4862}
4863
4864/* print out results */
4865static void
4866printResults(StatsData *total, instr_time total_time,
4867 instr_time conn_total_time, int64 latency_late)
4868{
4869 double time_include,
4870 tps_include,
4871 tps_exclude;
4872 int64 ntx = total->cnt - total->skipped;
4873
4874 time_include = INSTR_TIME_GET_DOUBLE(total_time);
4875
4876 /* tps is about actually executed transactions */
4877 tps_include = ntx / time_include;
4878 tps_exclude = ntx /
4879 (time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
4880
4881 /* Report test parameters. */
4882 printf("transaction type: %s\n",
4883 num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
4884 printf("scaling factor: %d\n", scale);
4885 printf("query mode: %s\n", QUERYMODE[querymode]);
4886 printf("number of clients: %d\n", nclients);
4887 printf("number of threads: %d\n", nthreads);
4888 if (duration <= 0)
4889 {
4890 printf("number of transactions per client: %d\n", nxacts);
4891 printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
4892 ntx, nxacts * nclients);
4893 }
4894 else
4895 {
4896 printf("duration: %d s\n", duration);
4897 printf("number of transactions actually processed: " INT64_FORMAT "\n",
4898 ntx);
4899 }
4900
4901 /* Remaining stats are nonsensical if we failed to execute any xacts */
4902 if (total->cnt <= 0)
4903 return;
4904
4905 if (throttle_delay && latency_limit)
4906 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
4907 total->skipped,
4908 100.0 * total->skipped / total->cnt);
4909
4910 if (latency_limit)
4911 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f %%)\n",
4912 latency_limit / 1000.0, latency_late, ntx,
4913 (ntx > 0) ? 100.0 * latency_late / ntx : 0.0);
4914
4915 if (throttle_delay || progress || latency_limit)
4916 printSimpleStats("latency", &total->latency);
4917 else
4918 {
4919 /* no measurement, show average latency computed from run time */
4920 printf("latency average = %.3f ms\n",
4921 1000.0 * time_include * nclients / total->cnt);
4922 }
4923
4924 if (throttle_delay)
4925 {
4926 /*
4927 * Report average transaction lag under rate limit throttling. This
4928 * is the delay between scheduled and actual start times for the
4929 * transaction. The measured lag may be caused by thread/client load,
4930 * the database load, or the Poisson throttling process.
4931 */
4932 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
4933 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
4934 }
4935
4936 printf("tps = %f (including connections establishing)\n", tps_include);
4937 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
4938
4939 /* Report per-script/command statistics */
4940 if (per_script_stats || report_per_command)
4941 {
4942 int i;
4943
4944 for (i = 0; i < num_scripts; i++)
4945 {
4946 if (per_script_stats)
4947 {
4948 StatsData *sstats = &sql_script[i].stats;
4949
4950 printf("SQL script %d: %s\n"
4951 " - weight: %d (targets %.1f%% of total)\n"
4952 " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
4953 i + 1, sql_script[i].desc,
4954 sql_script[i].weight,
4955 100.0 * sql_script[i].weight / total_weight,
4956 sstats->cnt,
4957 100.0 * sstats->cnt / total->cnt,
4958 (sstats->cnt - sstats->skipped) / time_include);
4959
4960 if (throttle_delay && latency_limit && sstats->cnt > 0)
4961 printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
4962 sstats->skipped,
4963 100.0 * sstats->skipped / sstats->cnt);
4964
4965 printSimpleStats(" - latency", &sstats->latency);
4966 }
4967
4968 /* Report per-command latencies */
4969 if (report_per_command)
4970 {
4971 Command **commands;
4972
4973 if (per_script_stats)
4974 printf(" - statement latencies in milliseconds:\n");
4975 else
4976 printf("statement latencies in milliseconds:\n");
4977
4978 for (commands = sql_script[i].commands;
4979 *commands != NULL;
4980 commands++)
4981 {
4982 SimpleStats *cstats = &(*commands)->stats;
4983
4984 printf(" %11.3f %s\n",
4985 (cstats->count > 0) ?
4986 1000.0 * cstats->sum / cstats->count : 0.0,
4987 (*commands)->first_line);
4988 }
4989 }
4990 }
4991 }
4992}
4993
4994/*
4995 * Set up a random seed according to seed parameter (NULL means default),
4996 * and initialize base_random_sequence for use in initializing other sequences.
4997 */
4998static bool
4999set_random_seed(const char *seed)
5000{
5001 uint64 iseed;
5002
5003 if (seed == NULL || strcmp(seed, "time") == 0)
5004 {
5005 /* rely on current time */
5006 instr_time now;
5007
5008 INSTR_TIME_SET_CURRENT(now);
5009 iseed = (uint64) INSTR_TIME_GET_MICROSEC(now);
5010 }
5011 else if (strcmp(seed, "rand") == 0)
5012 {
5013 /* use some "strong" random source */
5014 if (!pg_strong_random(&iseed, sizeof(iseed)))
5015 {
5016 fprintf(stderr, "could not generate random seed.\n");
5017 return false;
5018 }
5019 }
5020 else
5021 {
5022 /* parse unsigned-int seed value */
5023 unsigned long ulseed;
5024 char garbage;
5025
5026 /* Don't try to use UINT64_FORMAT here; it might not work for sscanf */
5027 if (sscanf(seed, "%lu%c", &ulseed, &garbage) != 1)
5028 {
5029 fprintf(stderr,
5030 "unrecognized random seed option \"%s\": expecting an unsigned integer, \"time\" or \"rand\"\n",
5031 seed);
5032 return false;
5033 }
5034 iseed = (uint64) ulseed;
5035 }
5036
5037 if (seed != NULL)
5038 fprintf(stderr, "setting random seed to " UINT64_FORMAT "\n", iseed);
5039 random_seed = iseed;
5040
5041 /* Fill base_random_sequence with low-order bits of seed */
5042 base_random_sequence.xseed[0] = iseed & 0xFFFF;
5043 base_random_sequence.xseed[1] = (iseed >> 16) & 0xFFFF;
5044 base_random_sequence.xseed[2] = (iseed >> 32) & 0xFFFF;
5045
5046 return true;
5047}
5048
5049int
5050main(int argc, char **argv)
5051{
5052 static struct option long_options[] = {
5053 /* systematic long/short named options */
5054 {"builtin", required_argument, NULL, 'b'},
5055 {"client", required_argument, NULL, 'c'},
5056 {"connect", no_argument, NULL, 'C'},
5057 {"debug", no_argument, NULL, 'd'},
5058 {"define", required_argument, NULL, 'D'},
5059 {"file", required_argument, NULL, 'f'},
5060 {"fillfactor", required_argument, NULL, 'F'},
5061 {"host", required_argument, NULL, 'h'},
5062 {"initialize", no_argument, NULL, 'i'},
5063 {"init-steps", required_argument, NULL, 'I'},
5064 {"jobs", required_argument, NULL, 'j'},
5065 {"log", no_argument, NULL, 'l'},
5066 {"latency-limit", required_argument, NULL, 'L'},
5067 {"no-vacuum", no_argument, NULL, 'n'},
5068 {"port", required_argument, NULL, 'p'},
5069 {"progress", required_argument, NULL, 'P'},
5070 {"protocol", required_argument, NULL, 'M'},
5071 {"quiet", no_argument, NULL, 'q'},
5072 {"report-latencies", no_argument, NULL, 'r'},
5073 {"rate", required_argument, NULL, 'R'},
5074 {"scale", required_argument, NULL, 's'},
5075 {"select-only", no_argument, NULL, 'S'},
5076 {"skip-some-updates", no_argument, NULL, 'N'},
5077 {"time", required_argument, NULL, 'T'},
5078 {"transactions", required_argument, NULL, 't'},
5079 {"username", required_argument, NULL, 'U'},
5080 {"vacuum-all", no_argument, NULL, 'v'},
5081 /* long-named only options */
5082 {"unlogged-tables", no_argument, NULL, 1},
5083 {"tablespace", required_argument, NULL, 2},
5084 {"index-tablespace", required_argument, NULL, 3},
5085 {"sampling-rate", required_argument, NULL, 4},
5086 {"aggregate-interval", required_argument, NULL, 5},
5087 {"progress-timestamp", no_argument, NULL, 6},
5088 {"log-prefix", required_argument, NULL, 7},
5089 {"foreign-keys", no_argument, NULL, 8},
5090 {"random-seed", required_argument, NULL, 9},
5091 {NULL, 0, NULL, 0}
5092 };
5093
5094 int c;
5095 bool is_init_mode = false; /* initialize mode? */
5096 char *initialize_steps = NULL;
5097 bool foreign_keys = false;
5098 bool is_no_vacuum = false;
5099 bool do_vacuum_accounts = false; /* vacuum accounts table? */
5100 int optindex;
5101 bool scale_given = false;
5102
5103 bool benchmarking_option_set = false;
5104 bool initialization_option_set = false;
5105 bool internal_script_used = false;
5106
5107 CState *state; /* status of clients */
5108 TState *threads; /* array of thread */
5109
5110 instr_time start_time; /* start up time */
5111 instr_time total_time;
5112 instr_time conn_total_time;
5113 int64 latency_late = 0;
5114 StatsData stats;
5115 int weight;
5116
5117 int i;
5118 int nclients_dealt;
5119
5120#ifdef HAVE_GETRLIMIT
5121 struct rlimit rlim;
5122#endif
5123
5124 PGconn *con;
5125 PGresult *res;
5126 char *env;
5127
5128 int exit_code = 0;
5129
5130 pg_logging_init(argv[0]);
5131 progname = get_progname(argv[0]);
5132
5133 if (argc > 1)
5134 {
5135 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
5136 {
5137 usage();
5138 exit(0);
5139 }
5140 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
5141 {
5142 puts("pgbench (PostgreSQL) " PG_VERSION);
5143 exit(0);
5144 }
5145 }
5146
5147 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
5148 pghost = env;
5149 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
5150 pgport = env;
5151 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
5152 login = env;
5153
5154 state = (CState *) pg_malloc0(sizeof(CState));
5155
5156 /* set random seed early, because it may be used while parsing scripts. */
5157 if (!set_random_seed(getenv("PGBENCH_RANDOM_SEED")))
5158 {
5159 fprintf(stderr, "error while setting random seed from PGBENCH_RANDOM_SEED environment variable\n");
5160 exit(1);
5161 }
5162
5163 while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
5164 {
5165 char *script;
5166
5167 switch (c)
5168 {
5169 case 'i':
5170 is_init_mode = true;
5171 break;
5172 case 'I':
5173 if (initialize_steps)
5174 pg_free(initialize_steps);
5175 initialize_steps = pg_strdup(optarg);
5176 checkInitSteps(initialize_steps);
5177 initialization_option_set = true;
5178 break;
5179 case 'h':
5180 pghost = pg_strdup(optarg);
5181 break;
5182 case 'n':
5183 is_no_vacuum = true;
5184 break;
5185 case 'v':
5186 benchmarking_option_set = true;
5187 do_vacuum_accounts = true;
5188 break;
5189 case 'p':
5190 pgport = pg_strdup(optarg);
5191 break;
5192 case 'd':
5193 debug++;
5194 break;
5195 case 'c':
5196 benchmarking_option_set = true;
5197 nclients = atoi(optarg);
5198 if (nclients <= 0)
5199 {
5200 fprintf(stderr, "invalid number of clients: \"%s\"\n",
5201 optarg);
5202 exit(1);
5203 }
5204#ifdef HAVE_GETRLIMIT
5205#ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
5206 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
5207#else /* but BSD doesn't ... */
5208 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
5209#endif /* RLIMIT_NOFILE */
5210 {
5211 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
5212 exit(1);
5213 }
5214 if (rlim.rlim_cur < nclients + 3)
5215 {
5216 fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
5217 nclients + 3, (long) rlim.rlim_cur);
5218 fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
5219 exit(1);
5220 }
5221#endif /* HAVE_GETRLIMIT */
5222 break;
5223 case 'j': /* jobs */
5224 benchmarking_option_set = true;
5225 nthreads = atoi(optarg);
5226 if (nthreads <= 0)
5227 {
5228 fprintf(stderr, "invalid number of threads: \"%s\"\n",
5229 optarg);
5230 exit(1);
5231 }
5232#ifndef ENABLE_THREAD_SAFETY
5233 if (nthreads != 1)
5234 {
5235 fprintf(stderr, "threads are not supported on this platform; use -j1\n");
5236 exit(1);
5237 }
5238#endif /* !ENABLE_THREAD_SAFETY */
5239 break;
5240 case 'C':
5241 benchmarking_option_set = true;
5242 is_connect = true;
5243 break;
5244 case 'r':
5245 benchmarking_option_set = true;
5246 report_per_command = true;
5247 break;
5248 case 's':
5249 scale_given = true;
5250 scale = atoi(optarg);
5251 if (scale <= 0)
5252 {
5253 fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
5254 exit(1);
5255 }
5256 break;
5257 case 't':
5258 benchmarking_option_set = true;
5259 nxacts = atoi(optarg);
5260 if (nxacts <= 0)
5261 {
5262 fprintf(stderr, "invalid number of transactions: \"%s\"\n",
5263 optarg);
5264 exit(1);
5265 }
5266 break;
5267 case 'T':
5268 benchmarking_option_set = true;
5269 duration = atoi(optarg);
5270 if (duration <= 0)
5271 {
5272 fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
5273 exit(1);
5274 }
5275 break;
5276 case 'U':
5277 login = pg_strdup(optarg);
5278 break;
5279 case 'l':
5280 benchmarking_option_set = true;
5281 use_log = true;
5282 break;
5283 case 'q':
5284 initialization_option_set = true;
5285 use_quiet = true;
5286 break;
5287 case 'b':
5288 if (strcmp(optarg, "list") == 0)
5289 {
5290 listAvailableScripts();
5291 exit(0);
5292 }
5293 weight = parseScriptWeight(optarg, &script);
5294 process_builtin(findBuiltin(script), weight);
5295 benchmarking_option_set = true;
5296 internal_script_used = true;
5297 break;
5298 case 'S':
5299 process_builtin(findBuiltin("select-only"), 1);
5300 benchmarking_option_set = true;
5301 internal_script_used = true;
5302 break;
5303 case 'N':
5304 process_builtin(findBuiltin("simple-update"), 1);
5305 benchmarking_option_set = true;
5306 internal_script_used = true;
5307 break;
5308 case 'f':
5309 weight = parseScriptWeight(optarg, &script);
5310 process_file(script, weight);
5311 benchmarking_option_set = true;
5312 break;
5313 case 'D':
5314 {
5315 char *p;
5316
5317 benchmarking_option_set = true;
5318
5319 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
5320 {
5321 fprintf(stderr, "invalid variable definition: \"%s\"\n",
5322 optarg);
5323 exit(1);
5324 }
5325
5326 *p++ = '\0';
5327 if (!putVariable(&state[0], "option", optarg, p))
5328 exit(1);
5329 }
5330 break;
5331 case 'F':
5332 initialization_option_set = true;
5333 fillfactor = atoi(optarg);
5334 if (fillfactor < 10 || fillfactor > 100)
5335 {
5336 fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
5337 exit(1);
5338 }
5339 break;
5340 case 'M':
5341 benchmarking_option_set = true;
5342 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
5343 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
5344 break;
5345 if (querymode >= NUM_QUERYMODE)
5346 {
5347 fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
5348 optarg);
5349 exit(1);
5350 }
5351 break;
5352 case 'P':
5353 benchmarking_option_set = true;
5354 progress = atoi(optarg);
5355 if (progress <= 0)
5356 {
5357 fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
5358 optarg);
5359 exit(1);
5360 }
5361 break;
5362 case 'R':
5363 {
5364 /* get a double from the beginning of option value */
5365 double throttle_value = atof(optarg);
5366
5367 benchmarking_option_set = true;
5368
5369 if (throttle_value <= 0.0)
5370 {
5371 fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
5372 exit(1);
5373 }
5374 /* Invert rate limit into per-transaction delay in usec */
5375 throttle_delay = 1000000.0 / throttle_value;
5376 }
5377 break;
5378 case 'L':
5379 {
5380 double limit_ms = atof(optarg);
5381
5382 if (limit_ms <= 0.0)
5383 {
5384 fprintf(stderr, "invalid latency limit: \"%s\"\n",
5385 optarg);
5386 exit(1);
5387 }
5388 benchmarking_option_set = true;
5389 latency_limit = (int64) (limit_ms * 1000);
5390 }
5391 break;
5392 case 1: /* unlogged-tables */
5393 initialization_option_set = true;
5394 unlogged_tables = true;
5395 break;
5396 case 2: /* tablespace */
5397 initialization_option_set = true;
5398 tablespace = pg_strdup(optarg);
5399 break;
5400 case 3: /* index-tablespace */
5401 initialization_option_set = true;
5402 index_tablespace = pg_strdup(optarg);
5403 break;
5404 case 4: /* sampling-rate */
5405 benchmarking_option_set = true;
5406 sample_rate = atof(optarg);
5407 if (sample_rate <= 0.0 || sample_rate > 1.0)
5408 {
5409 fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
5410 exit(1);
5411 }
5412 break;
5413 case 5: /* aggregate-interval */
5414 benchmarking_option_set = true;
5415 agg_interval = atoi(optarg);
5416 if (agg_interval <= 0)
5417 {
5418 fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
5419 optarg);
5420 exit(1);
5421 }
5422 break;
5423 case 6: /* progress-timestamp */
5424 progress_timestamp = true;
5425 benchmarking_option_set = true;
5426 break;
5427 case 7: /* log-prefix */
5428 benchmarking_option_set = true;
5429 logfile_prefix = pg_strdup(optarg);
5430 break;
5431 case 8: /* foreign-keys */
5432 initialization_option_set = true;
5433 foreign_keys = true;
5434 break;
5435 case 9: /* random-seed */
5436 benchmarking_option_set = true;
5437 if (!set_random_seed(optarg))
5438 {
5439 fprintf(stderr, "error while setting random seed from --random-seed option\n");
5440 exit(1);
5441 }
5442 break;
5443 default:
5444 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
5445 exit(1);
5446 break;
5447 }
5448 }
5449
5450 /* set default script if none */
5451 if (num_scripts == 0 && !is_init_mode)
5452 {
5453 process_builtin(findBuiltin("tpcb-like"), 1);
5454 benchmarking_option_set = true;
5455 internal_script_used = true;
5456 }
5457
5458 /* complete SQL command initialization and compute total weight */
5459 for (i = 0; i < num_scripts; i++)
5460 {
5461 Command **commands = sql_script[i].commands;
5462
5463 for (int j = 0; commands[j] != NULL; j++)
5464 if (commands[j]->type == SQL_COMMAND)
5465 postprocess_sql_command(commands[j]);
5466
5467 /* cannot overflow: weight is 32b, total_weight 64b */
5468 total_weight += sql_script[i].weight;
5469 }
5470
5471 if (total_weight == 0 && !is_init_mode)
5472 {
5473 fprintf(stderr, "total script weight must not be zero\n");
5474 exit(1);
5475 }
5476
5477 /* show per script stats if several scripts are used */
5478 if (num_scripts > 1)
5479 per_script_stats = true;
5480
5481 /*
5482 * Don't need more threads than there are clients. (This is not merely an
5483 * optimization; throttle_delay is calculated incorrectly below if some
5484 * threads have no clients assigned to them.)
5485 */
5486 if (nthreads > nclients)
5487 nthreads = nclients;
5488
5489 /*
5490 * Convert throttle_delay to a per-thread delay time. Note that this
5491 * might be a fractional number of usec, but that's OK, since it's just
5492 * the center of a Poisson distribution of delays.
5493 */
5494 throttle_delay *= nthreads;
5495
5496 if (argc > optind)
5497 dbName = argv[optind];
5498 else
5499 {
5500 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
5501 dbName = env;
5502 else if (login != NULL && *login != '\0')
5503 dbName = login;
5504 else
5505 dbName = "";
5506 }
5507
5508 if (is_init_mode)
5509 {
5510 if (benchmarking_option_set)
5511 {
5512 fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
5513 exit(1);
5514 }
5515
5516 if (initialize_steps == NULL)
5517 initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
5518
5519 if (is_no_vacuum)
5520 {
5521 /* Remove any vacuum step in initialize_steps */
5522 char *p;
5523
5524 while ((p = strchr(initialize_steps, 'v')) != NULL)
5525 *p = ' ';
5526 }
5527
5528 if (foreign_keys)
5529 {
5530 /* Add 'f' to end of initialize_steps, if not already there */
5531 if (strchr(initialize_steps, 'f') == NULL)
5532 {
5533 initialize_steps = (char *)
5534 pg_realloc(initialize_steps,
5535 strlen(initialize_steps) + 2);
5536 strcat(initialize_steps, "f");
5537 }
5538 }
5539
5540 runInitSteps(initialize_steps);
5541 exit(0);
5542 }
5543 else
5544 {
5545 if (initialization_option_set)
5546 {
5547 fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
5548 exit(1);
5549 }
5550 }
5551
5552 if (nxacts > 0 && duration > 0)
5553 {
5554 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
5555 exit(1);
5556 }
5557
5558 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
5559 if (nxacts <= 0 && duration <= 0)
5560 nxacts = DEFAULT_NXACTS;
5561
5562 /* --sampling-rate may be used only with -l */
5563 if (sample_rate > 0.0 && !use_log)
5564 {
5565 fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
5566 exit(1);
5567 }
5568
5569 /* --sampling-rate may not be used with --aggregate-interval */
5570 if (sample_rate > 0.0 && agg_interval > 0)
5571 {
5572 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
5573 exit(1);
5574 }
5575
5576 if (agg_interval > 0 && !use_log)
5577 {
5578 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
5579 exit(1);
5580 }
5581
5582 if (!use_log && logfile_prefix)
5583 {
5584 fprintf(stderr, "log file prefix (--log-prefix) is allowed only when logging transactions (-l)\n");
5585 exit(1);
5586 }
5587
5588 if (duration > 0 && agg_interval > duration)
5589 {
5590 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
5591 exit(1);
5592 }
5593
5594 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
5595 {
5596 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
5597 exit(1);
5598 }
5599
5600 if (progress_timestamp && progress == 0)
5601 {
5602 fprintf(stderr, "--progress-timestamp is allowed only under --progress\n");
5603 exit(1);
5604 }
5605
5606 /*
5607 * save main process id in the global variable because process id will be
5608 * changed after fork.
5609 */
5610 main_pid = (int) getpid();
5611
5612 if (nclients > 1)
5613 {
5614 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
5615 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
5616
5617 /* copy any -D switch values to all clients */
5618 for (i = 1; i < nclients; i++)
5619 {
5620 int j;
5621
5622 state[i].id = i;
5623 for (j = 0; j < state[0].nvariables; j++)
5624 {
5625 Variable *var = &state[0].variables[j];
5626
5627 if (var->value.type != PGBT_NO_VALUE)
5628 {
5629 if (!putVariableValue(&state[i], "startup",
5630 var->name, &var->value))
5631 exit(1);
5632 }
5633 else
5634 {
5635 if (!putVariable(&state[i], "startup",
5636 var->name, var->svalue))
5637 exit(1);
5638 }
5639 }
5640 }
5641 }
5642
5643 /* other CState initializations */
5644 for (i = 0; i < nclients; i++)
5645 {
5646 state[i].cstack = conditional_stack_create();
5647 initRandomState(&state[i].cs_func_rs);
5648 }
5649
5650 if (debug)
5651 {
5652 if (duration <= 0)
5653 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
5654 pghost, pgport, nclients, nxacts, dbName);
5655 else
5656 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
5657 pghost, pgport, nclients, duration, dbName);
5658 }
5659
5660 /* opening connection... */
5661 con = doConnect();
5662 if (con == NULL)
5663 exit(1);
5664
5665 if (PQstatus(con) == CONNECTION_BAD)
5666 {
5667 fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
5668 fprintf(stderr, "%s", PQerrorMessage(con));
5669 exit(1);
5670 }
5671
5672 if (internal_script_used)
5673 {
5674 /*
5675 * get the scaling factor that should be same as count(*) from
5676 * pgbench_branches if this is not a custom query
5677 */
5678 res = PQexec(con, "select count(*) from pgbench_branches");
5679 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5680 {
5681 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
5682
5683 fprintf(stderr, "%s", PQerrorMessage(con));
5684 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
5685 {
5686 fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
5687 }
5688
5689 exit(1);
5690 }
5691 scale = atoi(PQgetvalue(res, 0, 0));
5692 if (scale < 0)
5693 {
5694 fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
5695 PQgetvalue(res, 0, 0));
5696 exit(1);
5697 }
5698 PQclear(res);
5699
5700 /* warn if we override user-given -s switch */
5701 if (scale_given)
5702 fprintf(stderr,
5703 "scale option ignored, using count from pgbench_branches table (%d)\n",
5704 scale);
5705 }
5706
5707 /*
5708 * :scale variables normally get -s or database scale, but don't override
5709 * an explicit -D switch
5710 */
5711 if (lookupVariable(&state[0], "scale") == NULL)
5712 {
5713 for (i = 0; i < nclients; i++)
5714 {
5715 if (!putVariableInt(&state[i], "startup", "scale", scale))
5716 exit(1);
5717 }
5718 }
5719
5720 /*
5721 * Define a :client_id variable that is unique per connection. But don't
5722 * override an explicit -D switch.
5723 */
5724 if (lookupVariable(&state[0], "client_id") == NULL)
5725 {
5726 for (i = 0; i < nclients; i++)
5727 if (!putVariableInt(&state[i], "startup", "client_id", i))
5728 exit(1);
5729 }
5730
5731 /* set default seed for hash functions */
5732 if (lookupVariable(&state[0], "default_seed") == NULL)
5733 {
5734 uint64 seed =
5735 ((uint64) pg_jrand48(base_random_sequence.xseed) & 0xFFFFFFFF) |
5736 (((uint64) pg_jrand48(base_random_sequence.xseed) & 0xFFFFFFFF) << 32);
5737
5738 for (i = 0; i < nclients; i++)
5739 if (!putVariableInt(&state[i], "startup", "default_seed", (int64) seed))
5740 exit(1);
5741 }
5742
5743 /* set random seed unless overwritten */
5744 if (lookupVariable(&state[0], "random_seed") == NULL)
5745 {
5746 for (i = 0; i < nclients; i++)
5747 if (!putVariableInt(&state[i], "startup", "random_seed", random_seed))
5748 exit(1);
5749 }
5750
5751 if (!is_no_vacuum)
5752 {
5753 fprintf(stderr, "starting vacuum...");
5754 tryExecuteStatement(con, "vacuum pgbench_branches");
5755 tryExecuteStatement(con, "vacuum pgbench_tellers");
5756 tryExecuteStatement(con, "truncate pgbench_history");
5757 fprintf(stderr, "end.\n");
5758
5759 if (do_vacuum_accounts)
5760 {
5761 fprintf(stderr, "starting vacuum pgbench_accounts...");
5762 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
5763 fprintf(stderr, "end.\n");
5764 }
5765 }
5766 PQfinish(con);
5767
5768 /* set up thread data structures */
5769 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
5770 nclients_dealt = 0;
5771
5772 for (i = 0; i < nthreads; i++)
5773 {
5774 TState *thread = &threads[i];
5775
5776 thread->tid = i;
5777 thread->state = &state[nclients_dealt];
5778 thread->nstate =
5779 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
5780 initRandomState(&thread->ts_choose_rs);
5781 initRandomState(&thread->ts_throttle_rs);
5782 initRandomState(&thread->ts_sample_rs);
5783 thread->logfile = NULL; /* filled in later */
5784 thread->latency_late = 0;
5785 initStats(&thread->stats, 0);
5786
5787 nclients_dealt += thread->nstate;
5788 }
5789
5790 /* all clients must be assigned to a thread */
5791 Assert(nclients_dealt == nclients);
5792
5793 /* get start up time */
5794 INSTR_TIME_SET_CURRENT(start_time);
5795
5796 /* set alarm if duration is specified. */
5797 if (duration > 0)
5798 setalarm(duration);
5799
5800 /* start threads */
5801#ifdef ENABLE_THREAD_SAFETY
5802 for (i = 0; i < nthreads; i++)
5803 {
5804 TState *thread = &threads[i];
5805
5806 INSTR_TIME_SET_CURRENT(thread->start_time);
5807
5808 /* compute when to stop */
5809 if (duration > 0)
5810 end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
5811 (int64) 1000000 * duration;
5812
5813 /* the first thread (i = 0) is executed by main thread */
5814 if (i > 0)
5815 {
5816 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
5817
5818 if (err != 0 || thread->thread == INVALID_THREAD)
5819 {
5820 fprintf(stderr, "could not create thread: %s\n", strerror(err));
5821 exit(1);
5822 }
5823 }
5824 else
5825 {
5826 thread->thread = INVALID_THREAD;
5827 }
5828 }
5829#else
5830 INSTR_TIME_SET_CURRENT(threads[0].start_time);
5831 /* compute when to stop */
5832 if (duration > 0)
5833 end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
5834 (int64) 1000000 * duration;
5835 threads[0].thread = INVALID_THREAD;
5836#endif /* ENABLE_THREAD_SAFETY */
5837
5838 /* wait for threads and accumulate results */
5839 initStats(&stats, 0);
5840 INSTR_TIME_SET_ZERO(conn_total_time);
5841 for (i = 0; i < nthreads; i++)
5842 {
5843 TState *thread = &threads[i];
5844
5845#ifdef ENABLE_THREAD_SAFETY
5846 if (threads[i].thread == INVALID_THREAD)
5847 /* actually run this thread directly in the main thread */
5848 (void) threadRun(thread);
5849 else
5850 /* wait of other threads. should check that 0 is returned? */
5851 pthread_join(thread->thread, NULL);
5852#else
5853 (void) threadRun(thread);
5854#endif /* ENABLE_THREAD_SAFETY */
5855
5856 for (int j = 0; j < thread->nstate; j++)
5857 if (thread->state[j].state == CSTATE_ABORTED)
5858 exit_code = 2;
5859
5860 /* aggregate thread level stats */
5861 mergeSimpleStats(&stats.latency, &thread->stats.latency);
5862 mergeSimpleStats(&stats.lag, &thread->stats.lag);
5863 stats.cnt += thread->stats.cnt;
5864 stats.skipped += thread->stats.skipped;
5865 latency_late += thread->latency_late;
5866 INSTR_TIME_ADD(conn_total_time, thread->conn_time);
5867 }
5868 disconnect_all(state, nclients);
5869
5870 /*
5871 * XXX We compute results as though every client of every thread started
5872 * and finished at the same time. That model can diverge noticeably from
5873 * reality for a short benchmark run involving relatively many threads.
5874 * The first thread may process notably many transactions before the last
5875 * thread begins. Improving the model alone would bring limited benefit,
5876 * because performance during those periods of partial thread count can
5877 * easily exceed steady state performance. This is one of the many ways
5878 * short runs convey deceptive performance figures.
5879 */
5880 INSTR_TIME_SET_CURRENT(total_time);
5881 INSTR_TIME_SUBTRACT(total_time, start_time);
5882 printResults(&stats, total_time, conn_total_time, latency_late);
5883
5884 if (exit_code != 0)
5885 fprintf(stderr, "Run was aborted; the above results are incomplete.\n");
5886
5887 return exit_code;
5888}
5889
5890static void *
5891threadRun(void *arg)
5892{
5893 TState *thread = (TState *) arg;
5894 CState *state = thread->state;
5895 instr_time start,
5896 end;
5897 int nstate = thread->nstate;
5898 int remains = nstate; /* number of remaining clients */
5899 socket_set *sockets = alloc_socket_set(nstate);
5900 int i;
5901
5902 /* for reporting progress: */
5903 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
5904 int64 last_report = thread_start;
5905 int64 next_report = last_report + (int64) progress * 1000000;
5906 StatsData last,
5907 aggs;
5908
5909 /*
5910 * Initialize throttling rate target for all of the thread's clients. It
5911 * might be a little more accurate to reset thread->start_time here too.
5912 * The possible drift seems too small relative to typical throttle delay
5913 * times to worry about it.
5914 */
5915 INSTR_TIME_SET_CURRENT(start);
5916 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
5917
5918 INSTR_TIME_SET_ZERO(thread->conn_time);
5919
5920 initStats(&aggs, time(NULL));
5921 last = aggs;
5922
5923 /* open log file if requested */
5924 if (use_log)
5925 {
5926 char logpath[MAXPGPATH];
5927 char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
5928
5929 if (thread->tid == 0)
5930 snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
5931 else
5932 snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
5933
5934 thread->logfile = fopen(logpath, "w");
5935
5936 if (thread->logfile == NULL)
5937 {
5938 fprintf(stderr, "could not open logfile \"%s\": %s\n",
5939 logpath, strerror(errno));
5940 goto done;
5941 }
5942 }
5943
5944 if (!is_connect)
5945 {
5946 /* make connections to the database before starting */
5947 for (i = 0; i < nstate; i++)
5948 {
5949 if ((state[i].con = doConnect()) == NULL)
5950 goto done;
5951 }
5952 }
5953
5954 /* time after thread and connections set up */
5955 INSTR_TIME_SET_CURRENT(thread->conn_time);
5956 INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
5957
5958 /* explicitly initialize the state machines */
5959 for (i = 0; i < nstate; i++)
5960 {
5961 state[i].state = CSTATE_CHOOSE_SCRIPT;
5962 }
5963
5964 /* loop till all clients have terminated */
5965 while (remains > 0)
5966 {
5967 int nsocks; /* number of sockets to be waited for */
5968 int64 min_usec;
5969 int64 now_usec = 0; /* set this only if needed */
5970
5971 /*
5972 * identify which client sockets should be checked for input, and
5973 * compute the nearest time (if any) at which we need to wake up.
5974 */
5975 clear_socket_set(sockets);
5976 nsocks = 0;
5977 min_usec = PG_INT64_MAX;
5978 for (i = 0; i < nstate; i++)
5979 {
5980 CState *st = &state[i];
5981
5982 if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
5983 {
5984 /* a nap from the script, or under throttling */
5985 int64 this_usec;
5986
5987 /* get current time if needed */
5988 if (now_usec == 0)
5989 {
5990 instr_time now;
5991
5992 INSTR_TIME_SET_CURRENT(now);
5993 now_usec = INSTR_TIME_GET_MICROSEC(now);
5994 }
5995
5996 /* min_usec should be the minimum delay across all clients */
5997 this_usec = (st->state == CSTATE_SLEEP ?
5998 st->sleep_until : st->txn_scheduled) - now_usec;
5999 if (min_usec > this_usec)
6000 min_usec = this_usec;
6001 }
6002 else if (st->state == CSTATE_WAIT_RESULT)
6003 {
6004 /*
6005 * waiting for result from server - nothing to do unless the
6006 * socket is readable
6007 */
6008 int sock = PQsocket(st->con);
6009
6010 if (sock < 0)
6011 {
6012 fprintf(stderr, "invalid socket: %s",
6013 PQerrorMessage(st->con));
6014 goto done;
6015 }
6016
6017 add_socket_to_set(sockets, sock, nsocks++);
6018 }
6019 else if (st->state != CSTATE_ABORTED &&
6020 st->state != CSTATE_FINISHED)
6021 {
6022 /*
6023 * This client thread is ready to do something, so we don't
6024 * want to wait. No need to examine additional clients.
6025 */
6026 min_usec = 0;
6027 break;
6028 }
6029 }
6030
6031 /* also wake up to print the next progress report on time */
6032 if (progress && min_usec > 0 && thread->tid == 0)
6033 {
6034 /* get current time if needed */
6035 if (now_usec == 0)
6036 {
6037 instr_time now;
6038
6039 INSTR_TIME_SET_CURRENT(now);
6040 now_usec = INSTR_TIME_GET_MICROSEC(now);
6041 }
6042
6043 if (now_usec >= next_report)
6044 min_usec = 0;
6045 else if ((next_report - now_usec) < min_usec)
6046 min_usec = next_report - now_usec;
6047 }
6048
6049 /*
6050 * If no clients are ready to execute actions, sleep until we receive
6051 * data on some client socket or the timeout (if any) elapses.
6052 */
6053 if (min_usec > 0)
6054 {
6055 int rc = 0;
6056
6057 if (min_usec != PG_INT64_MAX)
6058 {
6059 if (nsocks > 0)
6060 {
6061 rc = wait_on_socket_set(sockets, min_usec);
6062 }
6063 else /* nothing active, simple sleep */
6064 {
6065 pg_usleep(min_usec);
6066 }
6067 }
6068 else /* no explicit delay, wait without timeout */
6069 {
6070 rc = wait_on_socket_set(sockets, 0);
6071 }
6072
6073 if (rc < 0)
6074 {
6075 if (errno == EINTR)
6076 {
6077 /* On EINTR, go back to top of loop */
6078 continue;
6079 }
6080 /* must be something wrong */
6081 fprintf(stderr, "%s() failed: %s\n", SOCKET_WAIT_METHOD, strerror(errno));
6082 goto done;
6083 }
6084 }
6085 else
6086 {
6087 /* min_usec <= 0, i.e. something needs to be executed now */
6088
6089 /* If we didn't wait, don't try to read any data */
6090 clear_socket_set(sockets);
6091 }
6092
6093 /* ok, advance the state machine of each connection */
6094 nsocks = 0;
6095 for (i = 0; i < nstate; i++)
6096 {
6097 CState *st = &state[i];
6098
6099 if (st->state == CSTATE_WAIT_RESULT)
6100 {
6101 /* don't call advanceConnectionState unless data is available */
6102 int sock = PQsocket(st->con);
6103
6104 if (sock < 0)
6105 {
6106 fprintf(stderr, "invalid socket: %s",
6107 PQerrorMessage(st->con));
6108 goto done;
6109 }
6110
6111 if (!socket_has_input(sockets, sock, nsocks++))
6112 continue;
6113 }
6114 else if (st->state == CSTATE_FINISHED ||
6115 st->state == CSTATE_ABORTED)
6116 {
6117 /* this client is done, no need to consider it anymore */
6118 continue;
6119 }
6120
6121 advanceConnectionState(thread, st, &aggs);
6122
6123 /*
6124 * If advanceConnectionState changed client to finished state,
6125 * that's one less client that remains.
6126 */
6127 if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
6128 remains--;
6129 }
6130
6131 /* progress report is made by thread 0 for all threads */
6132 if (progress && thread->tid == 0)
6133 {
6134 instr_time now_time;
6135 int64 now;
6136
6137 INSTR_TIME_SET_CURRENT(now_time);
6138 now = INSTR_TIME_GET_MICROSEC(now_time);
6139 if (now >= next_report)
6140 {
6141 /*
6142 * Horrible hack: this relies on the thread pointer we are
6143 * passed to be equivalent to threads[0], that is the first
6144 * entry of the threads array. That is why this MUST be done
6145 * by thread 0 and not any other.
6146 */
6147 printProgressReport(thread, thread_start, now,
6148 &last, &last_report);
6149
6150 /*
6151 * Ensure that the next report is in the future, in case
6152 * pgbench/postgres got stuck somewhere.
6153 */
6154 do
6155 {
6156 next_report += (int64) progress * 1000000;
6157 } while (now >= next_report);
6158 }
6159 }
6160 }
6161
6162done:
6163 INSTR_TIME_SET_CURRENT(start);
6164 disconnect_all(state, nstate);
6165 INSTR_TIME_SET_CURRENT(end);
6166 INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
6167 if (thread->logfile)
6168 {
6169 if (agg_interval > 0)
6170 {
6171 /* log aggregated but not yet reported transactions */
6172 doLog(thread, state, &aggs, false, 0, 0);
6173 }
6174 fclose(thread->logfile);
6175 thread->logfile = NULL;
6176 }
6177 free_socket_set(sockets);
6178 return NULL;
6179}
6180
6181static void
6182finishCon(CState *st)
6183{
6184 if (st->con != NULL)
6185 {
6186 PQfinish(st->con);
6187 st->con = NULL;
6188 }
6189}
6190
6191/*
6192 * Support for duration option: set timer_exceeded after so many seconds.
6193 */
6194
6195#ifndef WIN32
6196
6197static void
6198handle_sig_alarm(SIGNAL_ARGS)
6199{
6200 timer_exceeded = true;
6201}
6202
6203static void
6204setalarm(int seconds)
6205{
6206 pqsignal(SIGALRM, handle_sig_alarm);
6207 alarm(seconds);
6208}
6209
6210#else /* WIN32 */
6211
6212static VOID CALLBACK
6213win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
6214{
6215 timer_exceeded = true;
6216}
6217
6218static void
6219setalarm(int seconds)
6220{
6221 HANDLE queue;
6222 HANDLE timer;
6223
6224 /* This function will be called at most once, so we can cheat a bit. */
6225 queue = CreateTimerQueue();
6226 if (seconds > ((DWORD) -1) / 1000 ||
6227 !CreateTimerQueueTimer(&timer, queue,
6228 win32_timer_callback, NULL, seconds * 1000, 0,
6229 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
6230 {
6231 fprintf(stderr, "failed to set timer\n");
6232 exit(1);
6233 }
6234}
6235
6236#endif /* WIN32 */
6237
6238
6239/*
6240 * These functions provide an abstraction layer that hides the syscall
6241 * we use to wait for input on a set of sockets.
6242 *
6243 * Currently there are two implementations, based on ppoll(2) and select(2).
6244 * ppoll() is preferred where available due to its typically higher ceiling
6245 * on the number of usable sockets. We do not use the more-widely-available
6246 * poll(2) because it only offers millisecond timeout resolution, which could
6247 * be problematic with high --rate settings.
6248 *
6249 * Function APIs:
6250 *
6251 * alloc_socket_set: allocate an empty socket set with room for up to
6252 * "count" sockets.
6253 *
6254 * free_socket_set: deallocate a socket set.
6255 *
6256 * clear_socket_set: reset a socket set to empty.
6257 *
6258 * add_socket_to_set: add socket with indicated FD to slot "idx" in the
6259 * socket set. Slots must be filled in order, starting with 0.
6260 *
6261 * wait_on_socket_set: wait for input on any socket in set, or for timeout
6262 * to expire. timeout is measured in microseconds; 0 means wait forever.
6263 * Returns result code of underlying syscall (>=0 if OK, else see errno).
6264 *
6265 * socket_has_input: after waiting, call this to see if given socket has
6266 * input. fd and idx parameters should match some previous call to
6267 * add_socket_to_set.
6268 *
6269 * Note that wait_on_socket_set destructively modifies the state of the
6270 * socket set. After checking for input, caller must apply clear_socket_set
6271 * and add_socket_to_set again before waiting again.
6272 */
6273
6274#ifdef POLL_USING_PPOLL
6275
6276static socket_set *
6277alloc_socket_set(int count)
6278{
6279 socket_set *sa;
6280
6281 sa = (socket_set *) pg_malloc0(offsetof(socket_set, pollfds) +
6282 sizeof(struct pollfd) * count);
6283 sa->maxfds = count;
6284 sa->curfds = 0;
6285 return sa;
6286}
6287
6288static void
6289free_socket_set(socket_set *sa)
6290{
6291 pg_free(sa);
6292}
6293
6294static void
6295clear_socket_set(socket_set *sa)
6296{
6297 sa->curfds = 0;
6298}
6299
6300static void
6301add_socket_to_set(socket_set *sa, int fd, int idx)
6302{
6303 Assert(idx < sa->maxfds && idx == sa->curfds);
6304 sa->pollfds[idx].fd = fd;
6305 sa->pollfds[idx].events = POLLIN;
6306 sa->pollfds[idx].revents = 0;
6307 sa->curfds++;
6308}
6309
6310static int
6311wait_on_socket_set(socket_set *sa, int64 usecs)
6312{
6313 if (usecs > 0)
6314 {
6315 struct timespec timeout;
6316
6317 timeout.tv_sec = usecs / 1000000;
6318 timeout.tv_nsec = (usecs % 1000000) * 1000;
6319 return ppoll(sa->pollfds, sa->curfds, &timeout, NULL);
6320 }
6321 else
6322 {
6323 return ppoll(sa->pollfds, sa->curfds, NULL, NULL);
6324 }
6325}
6326
6327static bool
6328socket_has_input(socket_set *sa, int fd, int idx)
6329{
6330 /*
6331 * In some cases, threadRun will apply clear_socket_set and then try to
6332 * apply socket_has_input anyway with arguments that it used before that,
6333 * or might've used before that except that it exited its setup loop
6334 * early. Hence, if the socket set is empty, silently return false
6335 * regardless of the parameters. If it's not empty, we can Assert that
6336 * the parameters match a previous call.
6337 */
6338 if (sa->curfds == 0)
6339 return false;
6340
6341 Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd);
6342 return (sa->pollfds[idx].revents & POLLIN) != 0;
6343}
6344
6345#endif /* POLL_USING_PPOLL */
6346
6347#ifdef POLL_USING_SELECT
6348
6349static socket_set *
6350alloc_socket_set(int count)
6351{
6352 return (socket_set *) pg_malloc0(sizeof(socket_set));
6353}
6354
6355static void
6356free_socket_set(socket_set *sa)
6357{
6358 pg_free(sa);
6359}
6360
6361static void
6362clear_socket_set(socket_set *sa)
6363{
6364 FD_ZERO(&sa->fds);
6365 sa->maxfd = -1;
6366}
6367
6368static void
6369add_socket_to_set(socket_set *sa, int fd, int idx)
6370{
6371 if (fd < 0 || fd >= FD_SETSIZE)
6372 {
6373 /*
6374 * Doing a hard exit here is a bit grotty, but it doesn't seem worth
6375 * complicating the API to make it less grotty.
6376 */
6377 fprintf(stderr, "too many client connections for select()\n");
6378 exit(1);
6379 }
6380 FD_SET(fd, &sa->fds);
6381 if (fd > sa->maxfd)
6382 sa->maxfd = fd;
6383}
6384
6385static int
6386wait_on_socket_set(socket_set *sa, int64 usecs)
6387{
6388 if (usecs > 0)
6389 {
6390 struct timeval timeout;
6391
6392 timeout.tv_sec = usecs / 1000000;
6393 timeout.tv_usec = usecs % 1000000;
6394 return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout);
6395 }
6396 else
6397 {
6398 return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL);
6399 }
6400}
6401
6402static bool
6403socket_has_input(socket_set *sa, int fd, int idx)
6404{
6405 return (FD_ISSET(fd, &sa->fds) != 0);
6406}
6407
6408#endif /* POLL_USING_SELECT */
6409
6410
6411/* partial pthread implementation for Windows */
6412
6413#ifdef WIN32
6414
6415typedef struct win32_pthread
6416{
6417 HANDLE handle;
6418 void *(*routine) (void *);
6419 void *arg;
6420 void *result;
6421} win32_pthread;
6422
6423static unsigned __stdcall
6424win32_pthread_run(void *arg)
6425{
6426 win32_pthread *th = (win32_pthread *) arg;
6427
6428 th->result = th->routine(th->arg);
6429
6430 return 0;
6431}
6432
6433static int
6434pthread_create(pthread_t *thread,
6435 pthread_attr_t *attr,
6436 void *(*start_routine) (void *),
6437 void *arg)
6438{
6439 int save_errno;
6440 win32_pthread *th;
6441
6442 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
6443 th->routine = start_routine;
6444 th->arg = arg;
6445 th->result = NULL;
6446
6447 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
6448 if (th->handle == NULL)
6449 {
6450 save_errno = errno;
6451 free(th);
6452 return save_errno;
6453 }
6454
6455 *thread = th;
6456 return 0;
6457}
6458
6459static int
6460pthread_join(pthread_t th, void **thread_return)
6461{
6462 if (th == NULL || th->handle == NULL)
6463 return errno = EINVAL;
6464
6465 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
6466 {
6467 _dosmaperr(GetLastError());
6468 return errno;
6469 }
6470
6471 if (thread_return)
6472 *thread_return = th->result;
6473
6474 CloseHandle(th->handle);
6475 free(th);
6476 return 0;
6477}
6478
6479#endif /* WIN32 */
6480