1/*-------------------------------------------------------------------------
2 *
3 * proto.c
4 * logical replication protocol functions
5 *
6 * Copyright (c) 2015-2019, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/proto.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "access/sysattr.h"
16#include "catalog/pg_namespace.h"
17#include "catalog/pg_type.h"
18#include "libpq/pqformat.h"
19#include "replication/logicalproto.h"
20#include "utils/builtins.h"
21#include "utils/lsyscache.h"
22#include "utils/syscache.h"
23
/*
 * Protocol message flags.
 *
 * LOGICALREP_IS_REPLICA_IDENTITY is a bit in the per-attribute flags byte
 * written by logicalrep_write_attrs(); it marks a column as part of the
 * relation's replica identity.
 *
 * TRUNCATE_CASCADE and TRUNCATE_RESTART_SEQS are bits in the flags byte of a
 * TRUNCATE message; they carry the "cascade" and "restart_seqs" booleans
 * passed to logicalrep_write_truncate().
 */
#define LOGICALREP_IS_REPLICA_IDENTITY 1

#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)

/* Writers for a relation's attribute list and for one tuple's column data. */
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
								   HeapTuple tuple);

/* Readers matching the two writers above. */
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);

/* Schema name helpers; pg_catalog is represented by an empty string. */
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
static const char *logicalrep_read_namespace(StringInfo in);
41
42/*
43 * Write BEGIN to the output stream.
44 */
45void
46logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
47{
48 pq_sendbyte(out, 'B'); /* BEGIN */
49
50 /* fixed fields */
51 pq_sendint64(out, txn->final_lsn);
52 pq_sendint64(out, txn->commit_time);
53 pq_sendint32(out, txn->xid);
54}
55
56/*
57 * Read transaction BEGIN from the stream.
58 */
59void
60logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
61{
62 /* read fields */
63 begin_data->final_lsn = pq_getmsgint64(in);
64 if (begin_data->final_lsn == InvalidXLogRecPtr)
65 elog(ERROR, "final_lsn not set in begin message");
66 begin_data->committime = pq_getmsgint64(in);
67 begin_data->xid = pq_getmsgint(in, 4);
68}
69
70
71/*
72 * Write COMMIT to the output stream.
73 */
74void
75logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
76 XLogRecPtr commit_lsn)
77{
78 uint8 flags = 0;
79
80 pq_sendbyte(out, 'C'); /* sending COMMIT */
81
82 /* send the flags field (unused for now) */
83 pq_sendbyte(out, flags);
84
85 /* send fields */
86 pq_sendint64(out, commit_lsn);
87 pq_sendint64(out, txn->end_lsn);
88 pq_sendint64(out, txn->commit_time);
89}
90
91/*
92 * Read transaction COMMIT from the stream.
93 */
94void
95logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
96{
97 /* read flags (unused for now) */
98 uint8 flags = pq_getmsgbyte(in);
99
100 if (flags != 0)
101 elog(ERROR, "unrecognized flags %u in commit message", flags);
102
103 /* read fields */
104 commit_data->commit_lsn = pq_getmsgint64(in);
105 commit_data->end_lsn = pq_getmsgint64(in);
106 commit_data->committime = pq_getmsgint64(in);
107}
108
109/*
110 * Write ORIGIN to the output stream.
111 */
112void
113logicalrep_write_origin(StringInfo out, const char *origin,
114 XLogRecPtr origin_lsn)
115{
116 pq_sendbyte(out, 'O'); /* ORIGIN */
117
118 /* fixed fields */
119 pq_sendint64(out, origin_lsn);
120
121 /* origin string */
122 pq_sendstring(out, origin);
123}
124
125/*
126 * Read ORIGIN from the output stream.
127 */
128char *
129logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
130{
131 /* fixed fields */
132 *origin_lsn = pq_getmsgint64(in);
133
134 /* return origin */
135 return pstrdup(pq_getmsgstring(in));
136}
137
138/*
139 * Write INSERT to the output stream.
140 */
141void
142logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
143{
144 pq_sendbyte(out, 'I'); /* action INSERT */
145
146 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
147 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
148 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
149
150 /* use Oid as relation identifier */
151 pq_sendint32(out, RelationGetRelid(rel));
152
153 pq_sendbyte(out, 'N'); /* new tuple follows */
154 logicalrep_write_tuple(out, rel, newtuple);
155}
156
157/*
158 * Read INSERT from stream.
159 *
160 * Fills the new tuple.
161 */
162LogicalRepRelId
163logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
164{
165 char action;
166 LogicalRepRelId relid;
167
168 /* read the relation id */
169 relid = pq_getmsgint(in, 4);
170
171 action = pq_getmsgbyte(in);
172 if (action != 'N')
173 elog(ERROR, "expected new tuple but got %d",
174 action);
175
176 logicalrep_read_tuple(in, newtup);
177
178 return relid;
179}
180
181/*
182 * Write UPDATE to the output stream.
183 */
184void
185logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
186 HeapTuple newtuple)
187{
188 pq_sendbyte(out, 'U'); /* action UPDATE */
189
190 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
191 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
192 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
193
194 /* use Oid as relation identifier */
195 pq_sendint32(out, RelationGetRelid(rel));
196
197 if (oldtuple != NULL)
198 {
199 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
200 pq_sendbyte(out, 'O'); /* old tuple follows */
201 else
202 pq_sendbyte(out, 'K'); /* old key follows */
203 logicalrep_write_tuple(out, rel, oldtuple);
204 }
205
206 pq_sendbyte(out, 'N'); /* new tuple follows */
207 logicalrep_write_tuple(out, rel, newtuple);
208}
209
210/*
211 * Read UPDATE from stream.
212 */
213LogicalRepRelId
214logicalrep_read_update(StringInfo in, bool *has_oldtuple,
215 LogicalRepTupleData *oldtup,
216 LogicalRepTupleData *newtup)
217{
218 char action;
219 LogicalRepRelId relid;
220
221 /* read the relation id */
222 relid = pq_getmsgint(in, 4);
223
224 /* read and verify action */
225 action = pq_getmsgbyte(in);
226 if (action != 'K' && action != 'O' && action != 'N')
227 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
228 action);
229
230 /* check for old tuple */
231 if (action == 'K' || action == 'O')
232 {
233 logicalrep_read_tuple(in, oldtup);
234 *has_oldtuple = true;
235
236 action = pq_getmsgbyte(in);
237 }
238 else
239 *has_oldtuple = false;
240
241 /* check for new tuple */
242 if (action != 'N')
243 elog(ERROR, "expected action 'N', got %c",
244 action);
245
246 logicalrep_read_tuple(in, newtup);
247
248 return relid;
249}
250
251/*
252 * Write DELETE to the output stream.
253 */
254void
255logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
256{
257 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
258 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
259 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
260
261 pq_sendbyte(out, 'D'); /* action DELETE */
262
263 /* use Oid as relation identifier */
264 pq_sendint32(out, RelationGetRelid(rel));
265
266 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
267 pq_sendbyte(out, 'O'); /* old tuple follows */
268 else
269 pq_sendbyte(out, 'K'); /* old key follows */
270
271 logicalrep_write_tuple(out, rel, oldtuple);
272}
273
274/*
275 * Read DELETE from stream.
276 *
277 * Fills the old tuple.
278 */
279LogicalRepRelId
280logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
281{
282 char action;
283 LogicalRepRelId relid;
284
285 /* read the relation id */
286 relid = pq_getmsgint(in, 4);
287
288 /* read and verify action */
289 action = pq_getmsgbyte(in);
290 if (action != 'K' && action != 'O')
291 elog(ERROR, "expected action 'O' or 'K', got %c", action);
292
293 logicalrep_read_tuple(in, oldtup);
294
295 return relid;
296}
297
298/*
299 * Write TRUNCATE to the output stream.
300 */
301void
302logicalrep_write_truncate(StringInfo out,
303 int nrelids,
304 Oid relids[],
305 bool cascade, bool restart_seqs)
306{
307 int i;
308 uint8 flags = 0;
309
310 pq_sendbyte(out, 'T'); /* action TRUNCATE */
311
312 pq_sendint32(out, nrelids);
313
314 /* encode and send truncate flags */
315 if (cascade)
316 flags |= TRUNCATE_CASCADE;
317 if (restart_seqs)
318 flags |= TRUNCATE_RESTART_SEQS;
319 pq_sendint8(out, flags);
320
321 for (i = 0; i < nrelids; i++)
322 pq_sendint32(out, relids[i]);
323}
324
325/*
326 * Read TRUNCATE from stream.
327 */
328List *
329logicalrep_read_truncate(StringInfo in,
330 bool *cascade, bool *restart_seqs)
331{
332 int i;
333 int nrelids;
334 List *relids = NIL;
335 uint8 flags;
336
337 nrelids = pq_getmsgint(in, 4);
338
339 /* read and decode truncate flags */
340 flags = pq_getmsgint(in, 1);
341 *cascade = (flags & TRUNCATE_CASCADE) > 0;
342 *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
343
344 for (i = 0; i < nrelids; i++)
345 relids = lappend_oid(relids, pq_getmsgint(in, 4));
346
347 return relids;
348}
349
350/*
351 * Write relation description to the output stream.
352 */
353void
354logicalrep_write_rel(StringInfo out, Relation rel)
355{
356 char *relname;
357
358 pq_sendbyte(out, 'R'); /* sending RELATION */
359
360 /* use Oid as relation identifier */
361 pq_sendint32(out, RelationGetRelid(rel));
362
363 /* send qualified relation name */
364 logicalrep_write_namespace(out, RelationGetNamespace(rel));
365 relname = RelationGetRelationName(rel);
366 pq_sendstring(out, relname);
367
368 /* send replica identity */
369 pq_sendbyte(out, rel->rd_rel->relreplident);
370
371 /* send the attribute info */
372 logicalrep_write_attrs(out, rel);
373}
374
375/*
376 * Read the relation info from stream and return as LogicalRepRelation.
377 */
378LogicalRepRelation *
379logicalrep_read_rel(StringInfo in)
380{
381 LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
382
383 rel->remoteid = pq_getmsgint(in, 4);
384
385 /* Read relation name from stream */
386 rel->nspname = pstrdup(logicalrep_read_namespace(in));
387 rel->relname = pstrdup(pq_getmsgstring(in));
388
389 /* Read the replica identity. */
390 rel->replident = pq_getmsgbyte(in);
391
392 /* Get attribute description */
393 logicalrep_read_attrs(in, rel);
394
395 return rel;
396}
397
398/*
399 * Write type info to the output stream.
400 *
401 * This function will always write base type info.
402 */
403void
404logicalrep_write_typ(StringInfo out, Oid typoid)
405{
406 Oid basetypoid = getBaseType(typoid);
407 HeapTuple tup;
408 Form_pg_type typtup;
409
410 pq_sendbyte(out, 'Y'); /* sending TYPE */
411
412 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
413 if (!HeapTupleIsValid(tup))
414 elog(ERROR, "cache lookup failed for type %u", basetypoid);
415 typtup = (Form_pg_type) GETSTRUCT(tup);
416
417 /* use Oid as relation identifier */
418 pq_sendint32(out, typoid);
419
420 /* send qualified type name */
421 logicalrep_write_namespace(out, typtup->typnamespace);
422 pq_sendstring(out, NameStr(typtup->typname));
423
424 ReleaseSysCache(tup);
425}
426
427/*
428 * Read type info from the output stream.
429 */
430void
431logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
432{
433 ltyp->remoteid = pq_getmsgint(in, 4);
434
435 /* Read type name from stream */
436 ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
437 ltyp->typname = pstrdup(pq_getmsgstring(in));
438}
439
440/*
441 * Write a tuple to the outputstream, in the most efficient format possible.
442 */
443static void
444logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
445{
446 TupleDesc desc;
447 Datum values[MaxTupleAttributeNumber];
448 bool isnull[MaxTupleAttributeNumber];
449 int i;
450 uint16 nliveatts = 0;
451
452 desc = RelationGetDescr(rel);
453
454 for (i = 0; i < desc->natts; i++)
455 {
456 if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
457 continue;
458 nliveatts++;
459 }
460 pq_sendint16(out, nliveatts);
461
462 /* try to allocate enough memory from the get-go */
463 enlargeStringInfo(out, tuple->t_len +
464 nliveatts * (1 + 4));
465
466 heap_deform_tuple(tuple, desc, values, isnull);
467
468 /* Write the values */
469 for (i = 0; i < desc->natts; i++)
470 {
471 HeapTuple typtup;
472 Form_pg_type typclass;
473 Form_pg_attribute att = TupleDescAttr(desc, i);
474 char *outputstr;
475
476 if (att->attisdropped || att->attgenerated)
477 continue;
478
479 if (isnull[i])
480 {
481 pq_sendbyte(out, 'n'); /* null column */
482 continue;
483 }
484 else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
485 {
486 pq_sendbyte(out, 'u'); /* unchanged toast column */
487 continue;
488 }
489
490 typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
491 if (!HeapTupleIsValid(typtup))
492 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
493 typclass = (Form_pg_type) GETSTRUCT(typtup);
494
495 pq_sendbyte(out, 't'); /* 'text' data follows */
496
497 outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
498 pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
499 pfree(outputstr);
500
501 ReleaseSysCache(typtup);
502 }
503}
504
505/*
506 * Read tuple in remote format from stream.
507 *
508 * The returned tuple points into the input stringinfo.
509 */
510static void
511logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
512{
513 int i;
514 int natts;
515
516 /* Get number of attributes */
517 natts = pq_getmsgint(in, 2);
518
519 memset(tuple->changed, 0, sizeof(tuple->changed));
520
521 /* Read the data */
522 for (i = 0; i < natts; i++)
523 {
524 char kind;
525
526 kind = pq_getmsgbyte(in);
527
528 switch (kind)
529 {
530 case 'n': /* null */
531 tuple->values[i] = NULL;
532 tuple->changed[i] = true;
533 break;
534 case 'u': /* unchanged column */
535 /* we don't receive the value of an unchanged column */
536 tuple->values[i] = NULL;
537 break;
538 case 't': /* text formatted value */
539 {
540 int len;
541
542 tuple->changed[i] = true;
543
544 len = pq_getmsgint(in, 4); /* read length */
545
546 /* and data */
547 tuple->values[i] = palloc(len + 1);
548 pq_copymsgbytes(in, tuple->values[i], len);
549 tuple->values[i][len] = '\0';
550 }
551 break;
552 default:
553 elog(ERROR, "unrecognized data representation type '%c'", kind);
554 }
555 }
556}
557
558/*
559 * Write relation attributes to the stream.
560 */
561static void
562logicalrep_write_attrs(StringInfo out, Relation rel)
563{
564 TupleDesc desc;
565 int i;
566 uint16 nliveatts = 0;
567 Bitmapset *idattrs = NULL;
568 bool replidentfull;
569
570 desc = RelationGetDescr(rel);
571
572 /* send number of live attributes */
573 for (i = 0; i < desc->natts; i++)
574 {
575 if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
576 continue;
577 nliveatts++;
578 }
579 pq_sendint16(out, nliveatts);
580
581 /* fetch bitmap of REPLICATION IDENTITY attributes */
582 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
583 if (!replidentfull)
584 idattrs = RelationGetIndexAttrBitmap(rel,
585 INDEX_ATTR_BITMAP_IDENTITY_KEY);
586
587 /* send the attributes */
588 for (i = 0; i < desc->natts; i++)
589 {
590 Form_pg_attribute att = TupleDescAttr(desc, i);
591 uint8 flags = 0;
592
593 if (att->attisdropped || att->attgenerated)
594 continue;
595
596 /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
597 if (replidentfull ||
598 bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
599 idattrs))
600 flags |= LOGICALREP_IS_REPLICA_IDENTITY;
601
602 pq_sendbyte(out, flags);
603
604 /* attribute name */
605 pq_sendstring(out, NameStr(att->attname));
606
607 /* attribute type id */
608 pq_sendint32(out, (int) att->atttypid);
609
610 /* attribute mode */
611 pq_sendint32(out, att->atttypmod);
612 }
613
614 bms_free(idattrs);
615}
616
617/*
618 * Read relation attribute names from the stream.
619 */
620static void
621logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
622{
623 int i;
624 int natts;
625 char **attnames;
626 Oid *atttyps;
627 Bitmapset *attkeys = NULL;
628
629 natts = pq_getmsgint(in, 2);
630 attnames = palloc(natts * sizeof(char *));
631 atttyps = palloc(natts * sizeof(Oid));
632
633 /* read the attributes */
634 for (i = 0; i < natts; i++)
635 {
636 uint8 flags;
637
638 /* Check for replica identity column */
639 flags = pq_getmsgbyte(in);
640 if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
641 attkeys = bms_add_member(attkeys, i);
642
643 /* attribute name */
644 attnames[i] = pstrdup(pq_getmsgstring(in));
645
646 /* attribute type id */
647 atttyps[i] = (Oid) pq_getmsgint(in, 4);
648
649 /* we ignore attribute mode for now */
650 (void) pq_getmsgint(in, 4);
651 }
652
653 rel->attnames = attnames;
654 rel->atttyps = atttyps;
655 rel->attkeys = attkeys;
656 rel->natts = natts;
657}
658
659/*
660 * Write the namespace name or empty string for pg_catalog (to save space).
661 */
662static void
663logicalrep_write_namespace(StringInfo out, Oid nspid)
664{
665 if (nspid == PG_CATALOG_NAMESPACE)
666 pq_sendbyte(out, '\0');
667 else
668 {
669 char *nspname = get_namespace_name(nspid);
670
671 if (nspname == NULL)
672 elog(ERROR, "cache lookup failed for namespace %u",
673 nspid);
674
675 pq_sendstring(out, nspname);
676 }
677}
678
679/*
680 * Read the namespace name while treating empty string as pg_catalog.
681 */
682static const char *
683logicalrep_read_namespace(StringInfo in)
684{
685 const char *nspname = pq_getmsgstring(in);
686
687 if (nspname[0] == '\0')
688 nspname = "pg_catalog";
689
690 return nspname;
691}
692