/*-------------------------------------------------------------------------
 *
 * worker_internal.h
 *	  Internal headers shared by logical replication workers.
 *
 * Portions Copyright (c) 2016-2019, PostgreSQL Global Development Group
 *
 * src/include/replication/worker_internal.h
 *
 *-------------------------------------------------------------------------
 */
12#ifndef WORKER_INTERNAL_H
13#define WORKER_INTERNAL_H
14
15#include <signal.h>
16
17#include "access/xlogdefs.h"
18#include "catalog/pg_subscription.h"
19#include "datatype/timestamp.h"
20#include "storage/lock.h"
21
22typedef struct LogicalRepWorker
23{
24 /* Time at which this worker was launched. */
25 TimestampTz launch_time;
26
27 /* Indicates if this slot is used or free. */
28 bool in_use;
29
30 /* Increased every time the slot is taken by new worker. */
31 uint16 generation;
32
33 /* Pointer to proc array. NULL if not running. */
34 PGPROC *proc;
35
36 /* Database id to connect to. */
37 Oid dbid;
38
39 /* User to use for connection (will be same as owner of subscription). */
40 Oid userid;
41
42 /* Subscription id for the worker. */
43 Oid subid;
44
45 /* Used for initial table synchronization. */
46 Oid relid;
47 char relstate;
48 XLogRecPtr relstate_lsn;
49 slock_t relmutex;
50
51 /* Stats. */
52 XLogRecPtr last_lsn;
53 TimestampTz last_send_time;
54 TimestampTz last_recv_time;
55 XLogRecPtr reply_lsn;
56 TimestampTz reply_time;
57} LogicalRepWorker;
58
59/* Main memory context for apply worker. Permanent during worker lifetime. */
60extern MemoryContext ApplyContext;
61
62/* libpqreceiver connection */
63extern struct WalReceiverConn *wrconn;
64
65/* Worker and subscription objects. */
66extern Subscription *MySubscription;
67extern LogicalRepWorker *MyLogicalRepWorker;
68
69extern bool in_remote_transaction;
70
71extern void logicalrep_worker_attach(int slot);
72extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
73 bool only_running);
74extern List *logicalrep_workers_find(Oid subid, bool only_running);
75extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
76 Oid userid, Oid relid);
77extern void logicalrep_worker_stop(Oid subid, Oid relid);
78extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
79extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
80extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
81
82extern int logicalrep_sync_worker_count(Oid subid);
83
84extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
85void process_syncing_tables(XLogRecPtr current_lsn);
86void invalidate_syncing_table_states(Datum arg, int cacheid,
87 uint32 hashvalue);
88
89static inline bool
90am_tablesync_worker(void)
91{
92 return OidIsValid(MyLogicalRepWorker->relid);
93}
94
95#endif /* WORKER_INTERNAL_H */
96