1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * logicalfuncs.c |
4 | * |
5 | * Support functions for using logical decoding and management of |
6 | * logical replication slots via SQL. |
7 | * |
8 | * |
9 | * Copyright (c) 2012-2019, PostgreSQL Global Development Group |
10 | * |
11 | * IDENTIFICATION |
12 | * src/backend/replication/logicalfuncs.c |
13 | *------------------------------------------------------------------------- |
14 | */ |
15 | |
16 | #include "postgres.h" |
17 | |
18 | #include <unistd.h> |
19 | |
20 | #include "fmgr.h" |
21 | #include "funcapi.h" |
22 | #include "miscadmin.h" |
23 | |
24 | #include "access/xlog_internal.h" |
25 | #include "access/xlogutils.h" |
26 | |
27 | #include "access/xact.h" |
28 | |
29 | #include "catalog/pg_type.h" |
30 | |
31 | #include "nodes/makefuncs.h" |
32 | |
33 | #include "mb/pg_wchar.h" |
34 | |
35 | #include "utils/array.h" |
36 | #include "utils/builtins.h" |
37 | #include "utils/inval.h" |
38 | #include "utils/memutils.h" |
39 | #include "utils/pg_lsn.h" |
40 | #include "utils/regproc.h" |
41 | #include "utils/resowner.h" |
42 | #include "utils/lsyscache.h" |
43 | |
44 | #include "replication/decode.h" |
45 | #include "replication/logical.h" |
46 | #include "replication/logicalfuncs.h" |
47 | #include "replication/message.h" |
48 | |
49 | #include "storage/fd.h" |
50 | |
/*
 * Private data for writing out decoded changes.  An instance is installed as
 * ctx->output_writer_private and is read by LogicalOutputWrite().
 */
typedef struct DecodingOutputState
{
	Tuplestorestate *tupstore;	/* result rows; handed to the executor via
								 * rsinfo->setResult */
	TupleDesc	tupdesc;		/* descriptor of the (lsn, xid, data) rows */
	bool		binary_output;	/* true for the *_binary_changes variants;
								 * skips the encoding check on output */
	int64		returned_rows;	/* rows written so far; compared against
								 * upto_nchanges to stop decoding */
} DecodingOutputState;
59 | |
60 | /* |
61 | * Prepare for an output plugin write. |
62 | */ |
63 | static void |
64 | LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, |
65 | bool last_write) |
66 | { |
67 | resetStringInfo(ctx->out); |
68 | } |
69 | |
70 | /* |
71 | * Perform output plugin write into tuplestore. |
72 | */ |
73 | static void |
74 | LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, |
75 | bool last_write) |
76 | { |
77 | Datum values[3]; |
78 | bool nulls[3]; |
79 | DecodingOutputState *p; |
80 | |
81 | /* SQL Datums can only be of a limited length... */ |
82 | if (ctx->out->len > MaxAllocSize - VARHDRSZ) |
83 | elog(ERROR, "too much output for sql interface" ); |
84 | |
85 | p = (DecodingOutputState *) ctx->output_writer_private; |
86 | |
87 | memset(nulls, 0, sizeof(nulls)); |
88 | values[0] = LSNGetDatum(lsn); |
89 | values[1] = TransactionIdGetDatum(xid); |
90 | |
91 | /* |
92 | * Assert ctx->out is in database encoding when we're writing textual |
93 | * output. |
94 | */ |
95 | if (!p->binary_output) |
96 | Assert(pg_verify_mbstr(GetDatabaseEncoding(), |
97 | ctx->out->data, ctx->out->len, |
98 | false)); |
99 | |
100 | /* ick, but cstring_to_text_with_len works for bytea perfectly fine */ |
101 | values[2] = PointerGetDatum( |
102 | cstring_to_text_with_len(ctx->out->data, ctx->out->len)); |
103 | |
104 | tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); |
105 | p->returned_rows++; |
106 | } |
107 | |
108 | static void |
109 | check_permissions(void) |
110 | { |
111 | if (!superuser() && !has_rolreplication(GetUserId())) |
112 | ereport(ERROR, |
113 | (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
114 | (errmsg("must be superuser or replication role to use replication slots" )))); |
115 | } |
116 | |
117 | int |
118 | logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, |
119 | int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) |
120 | { |
121 | return read_local_xlog_page(state, targetPagePtr, reqLen, |
122 | targetRecPtr, cur_page, pageTLI); |
123 | } |
124 | |
125 | /* |
126 | * Helper function for the various SQL callable logical decoding functions. |
127 | */ |
128 | static Datum |
129 | pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary) |
130 | { |
131 | Name name; |
132 | XLogRecPtr upto_lsn; |
133 | int32 upto_nchanges; |
134 | ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; |
135 | MemoryContext per_query_ctx; |
136 | MemoryContext oldcontext; |
137 | XLogRecPtr end_of_wal; |
138 | XLogRecPtr startptr; |
139 | LogicalDecodingContext *ctx; |
140 | ResourceOwner old_resowner = CurrentResourceOwner; |
141 | ArrayType *arr; |
142 | Size ndim; |
143 | List *options = NIL; |
144 | DecodingOutputState *p; |
145 | |
146 | check_permissions(); |
147 | |
148 | CheckLogicalDecodingRequirements(); |
149 | |
150 | if (PG_ARGISNULL(0)) |
151 | ereport(ERROR, |
152 | (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), |
153 | errmsg("slot name must not be null" ))); |
154 | name = PG_GETARG_NAME(0); |
155 | |
156 | if (PG_ARGISNULL(1)) |
157 | upto_lsn = InvalidXLogRecPtr; |
158 | else |
159 | upto_lsn = PG_GETARG_LSN(1); |
160 | |
161 | if (PG_ARGISNULL(2)) |
162 | upto_nchanges = InvalidXLogRecPtr; |
163 | else |
164 | upto_nchanges = PG_GETARG_INT32(2); |
165 | |
166 | if (PG_ARGISNULL(3)) |
167 | ereport(ERROR, |
168 | (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), |
169 | errmsg("options array must not be null" ))); |
170 | arr = PG_GETARG_ARRAYTYPE_P(3); |
171 | |
172 | /* check to see if caller supports us returning a tuplestore */ |
173 | if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) |
174 | ereport(ERROR, |
175 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
176 | errmsg("set-valued function called in context that cannot accept a set" ))); |
177 | if (!(rsinfo->allowedModes & SFRM_Materialize)) |
178 | ereport(ERROR, |
179 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
180 | errmsg("materialize mode required, but it is not allowed in this context" ))); |
181 | |
182 | /* state to write output to */ |
183 | p = palloc0(sizeof(DecodingOutputState)); |
184 | |
185 | p->binary_output = binary; |
186 | |
187 | /* Build a tuple descriptor for our result type */ |
188 | if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE) |
189 | elog(ERROR, "return type must be a row type" ); |
190 | |
191 | per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; |
192 | oldcontext = MemoryContextSwitchTo(per_query_ctx); |
193 | |
194 | /* Deconstruct options array */ |
195 | ndim = ARR_NDIM(arr); |
196 | if (ndim > 1) |
197 | { |
198 | ereport(ERROR, |
199 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
200 | errmsg("array must be one-dimensional" ))); |
201 | } |
202 | else if (array_contains_nulls(arr)) |
203 | { |
204 | ereport(ERROR, |
205 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
206 | errmsg("array must not contain nulls" ))); |
207 | } |
208 | else if (ndim == 1) |
209 | { |
210 | int nelems; |
211 | Datum *datum_opts; |
212 | int i; |
213 | |
214 | Assert(ARR_ELEMTYPE(arr) == TEXTOID); |
215 | |
216 | deconstruct_array(arr, TEXTOID, -1, false, 'i', |
217 | &datum_opts, NULL, &nelems); |
218 | |
219 | if (nelems % 2 != 0) |
220 | ereport(ERROR, |
221 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
222 | errmsg("array must have even number of elements" ))); |
223 | |
224 | for (i = 0; i < nelems; i += 2) |
225 | { |
226 | char *name = TextDatumGetCString(datum_opts[i]); |
227 | char *opt = TextDatumGetCString(datum_opts[i + 1]); |
228 | |
229 | options = lappend(options, makeDefElem(name, (Node *) makeString(opt), -1)); |
230 | } |
231 | } |
232 | |
233 | p->tupstore = tuplestore_begin_heap(true, false, work_mem); |
234 | rsinfo->returnMode = SFRM_Materialize; |
235 | rsinfo->setResult = p->tupstore; |
236 | rsinfo->setDesc = p->tupdesc; |
237 | |
238 | /* |
239 | * Compute the current end-of-wal and maintain ThisTimeLineID. |
240 | * RecoveryInProgress() will update ThisTimeLineID on promotion. |
241 | */ |
242 | if (!RecoveryInProgress()) |
243 | end_of_wal = GetFlushRecPtr(); |
244 | else |
245 | end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); |
246 | |
247 | ReplicationSlotAcquire(NameStr(*name), true); |
248 | |
249 | PG_TRY(); |
250 | { |
251 | /* restart at slot's confirmed_flush */ |
252 | ctx = CreateDecodingContext(InvalidXLogRecPtr, |
253 | options, |
254 | false, |
255 | logical_read_local_xlog_page, |
256 | LogicalOutputPrepareWrite, |
257 | LogicalOutputWrite, NULL); |
258 | |
259 | MemoryContextSwitchTo(oldcontext); |
260 | |
261 | /* |
262 | * Check whether the output plugin writes textual output if that's |
263 | * what we need. |
264 | */ |
265 | if (!binary && |
266 | ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT) |
267 | ereport(ERROR, |
268 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
269 | errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data" , |
270 | NameStr(MyReplicationSlot->data.plugin), |
271 | format_procedure(fcinfo->flinfo->fn_oid)))); |
272 | |
273 | ctx->output_writer_private = p; |
274 | |
275 | /* |
276 | * Decoding of WAL must start at restart_lsn so that the entirety of |
277 | * xacts that committed after the slot's confirmed_flush can be |
278 | * accumulated into reorder buffers. |
279 | */ |
280 | startptr = MyReplicationSlot->data.restart_lsn; |
281 | |
282 | /* invalidate non-timetravel entries */ |
283 | InvalidateSystemCaches(); |
284 | |
285 | /* Decode until we run out of records */ |
286 | while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || |
287 | (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal)) |
288 | { |
289 | XLogRecord *record; |
290 | char *errm = NULL; |
291 | |
292 | record = XLogReadRecord(ctx->reader, startptr, &errm); |
293 | if (errm) |
294 | elog(ERROR, "%s" , errm); |
295 | |
296 | /* |
297 | * Now that we've set up the xlog reader state, subsequent calls |
298 | * pass InvalidXLogRecPtr to say "continue from last record" |
299 | */ |
300 | startptr = InvalidXLogRecPtr; |
301 | |
302 | /* |
303 | * The {begin_txn,change,commit_txn}_wrapper callbacks above will |
304 | * store the description into our tuplestore. |
305 | */ |
306 | if (record != NULL) |
307 | LogicalDecodingProcessRecord(ctx, ctx->reader); |
308 | |
309 | /* check limits */ |
310 | if (upto_lsn != InvalidXLogRecPtr && |
311 | upto_lsn <= ctx->reader->EndRecPtr) |
312 | break; |
313 | if (upto_nchanges != 0 && |
314 | upto_nchanges <= p->returned_rows) |
315 | break; |
316 | CHECK_FOR_INTERRUPTS(); |
317 | } |
318 | |
319 | tuplestore_donestoring(tupstore); |
320 | |
321 | /* |
322 | * Logical decoding could have clobbered CurrentResourceOwner during |
323 | * transaction management, so restore the executor's value. (This is |
324 | * a kluge, but it's not worth cleaning up right now.) |
325 | */ |
326 | CurrentResourceOwner = old_resowner; |
327 | |
328 | /* |
329 | * Next time, start where we left off. (Hunting things, the family |
330 | * business..) |
331 | */ |
332 | if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm) |
333 | { |
334 | LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr); |
335 | |
336 | /* |
337 | * If only the confirmed_flush_lsn has changed the slot won't get |
338 | * marked as dirty by the above. Callers on the walsender |
339 | * interface are expected to keep track of their own progress and |
340 | * don't need it written out. But SQL-interface users cannot |
341 | * specify their own start positions and it's harder for them to |
342 | * keep track of their progress, so we should make more of an |
343 | * effort to save it for them. |
344 | * |
345 | * Dirty the slot so it's written out at the next checkpoint. |
346 | * We'll still lose its position on crash, as documented, but it's |
347 | * better than always losing the position even on clean restart. |
348 | */ |
349 | ReplicationSlotMarkDirty(); |
350 | } |
351 | |
352 | /* free context, call shutdown callback */ |
353 | FreeDecodingContext(ctx); |
354 | |
355 | ReplicationSlotRelease(); |
356 | InvalidateSystemCaches(); |
357 | } |
358 | PG_CATCH(); |
359 | { |
360 | /* clear all timetravel entries */ |
361 | InvalidateSystemCaches(); |
362 | |
363 | PG_RE_THROW(); |
364 | } |
365 | PG_END_TRY(); |
366 | |
367 | return (Datum) 0; |
368 | } |
369 | |
370 | /* |
371 | * SQL function returning the changestream as text, consuming the data. |
372 | */ |
373 | Datum |
374 | pg_logical_slot_get_changes(PG_FUNCTION_ARGS) |
375 | { |
376 | return pg_logical_slot_get_changes_guts(fcinfo, true, false); |
377 | } |
378 | |
379 | /* |
380 | * SQL function returning the changestream as text, only peeking ahead. |
381 | */ |
382 | Datum |
383 | pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) |
384 | { |
385 | return pg_logical_slot_get_changes_guts(fcinfo, false, false); |
386 | } |
387 | |
388 | /* |
389 | * SQL function returning the changestream in binary, consuming the data. |
390 | */ |
391 | Datum |
392 | pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) |
393 | { |
394 | return pg_logical_slot_get_changes_guts(fcinfo, true, true); |
395 | } |
396 | |
397 | /* |
398 | * SQL function returning the changestream in binary, only peeking ahead. |
399 | */ |
400 | Datum |
401 | pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) |
402 | { |
403 | return pg_logical_slot_get_changes_guts(fcinfo, false, true); |
404 | } |
405 | |
406 | |
407 | /* |
408 | * SQL function for writing logical decoding message into WAL. |
409 | */ |
410 | Datum |
411 | pg_logical_emit_message_bytea(PG_FUNCTION_ARGS) |
412 | { |
413 | bool transactional = PG_GETARG_BOOL(0); |
414 | char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1)); |
415 | bytea *data = PG_GETARG_BYTEA_PP(2); |
416 | XLogRecPtr lsn; |
417 | |
418 | lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data), |
419 | transactional); |
420 | PG_RETURN_LSN(lsn); |
421 | } |
422 | |
423 | Datum |
424 | pg_logical_emit_message_text(PG_FUNCTION_ARGS) |
425 | { |
426 | /* bytea and text are compatible */ |
427 | return pg_logical_emit_message_bytea(fcinfo); |
428 | } |
429 | |