1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * proto.c |
4 | * logical replication protocol functions |
5 | * |
6 | * Copyright (c) 2015-2019, PostgreSQL Global Development Group |
7 | * |
8 | * IDENTIFICATION |
9 | * src/backend/replication/logical/proto.c |
10 | * |
11 | *------------------------------------------------------------------------- |
12 | */ |
13 | #include "postgres.h" |
14 | |
15 | #include "access/sysattr.h" |
16 | #include "catalog/pg_namespace.h" |
17 | #include "catalog/pg_type.h" |
18 | #include "libpq/pqformat.h" |
19 | #include "replication/logicalproto.h" |
20 | #include "utils/builtins.h" |
21 | #include "utils/lsyscache.h" |
22 | #include "utils/syscache.h" |
23 | |
24 | /* |
25 | * Protocol message flags. |
26 | */ |
27 | #define LOGICALREP_IS_REPLICA_IDENTITY 1 |
28 | |
29 | #define TRUNCATE_CASCADE (1<<0) |
30 | #define TRUNCATE_RESTART_SEQS (1<<1) |
31 | |
32 | static void logicalrep_write_attrs(StringInfo out, Relation rel); |
33 | static void logicalrep_write_tuple(StringInfo out, Relation rel, |
34 | HeapTuple tuple); |
35 | |
36 | static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); |
37 | static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); |
38 | |
39 | static void logicalrep_write_namespace(StringInfo out, Oid nspid); |
40 | static const char *logicalrep_read_namespace(StringInfo in); |
41 | |
42 | /* |
43 | * Write BEGIN to the output stream. |
44 | */ |
45 | void |
46 | logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) |
47 | { |
48 | pq_sendbyte(out, 'B'); /* BEGIN */ |
49 | |
50 | /* fixed fields */ |
51 | pq_sendint64(out, txn->final_lsn); |
52 | pq_sendint64(out, txn->commit_time); |
53 | pq_sendint32(out, txn->xid); |
54 | } |
55 | |
56 | /* |
57 | * Read transaction BEGIN from the stream. |
58 | */ |
59 | void |
60 | logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data) |
61 | { |
62 | /* read fields */ |
63 | begin_data->final_lsn = pq_getmsgint64(in); |
64 | if (begin_data->final_lsn == InvalidXLogRecPtr) |
65 | elog(ERROR, "final_lsn not set in begin message" ); |
66 | begin_data->committime = pq_getmsgint64(in); |
67 | begin_data->xid = pq_getmsgint(in, 4); |
68 | } |
69 | |
70 | |
71 | /* |
72 | * Write COMMIT to the output stream. |
73 | */ |
74 | void |
75 | logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, |
76 | XLogRecPtr commit_lsn) |
77 | { |
78 | uint8 flags = 0; |
79 | |
80 | pq_sendbyte(out, 'C'); /* sending COMMIT */ |
81 | |
82 | /* send the flags field (unused for now) */ |
83 | pq_sendbyte(out, flags); |
84 | |
85 | /* send fields */ |
86 | pq_sendint64(out, commit_lsn); |
87 | pq_sendint64(out, txn->end_lsn); |
88 | pq_sendint64(out, txn->commit_time); |
89 | } |
90 | |
91 | /* |
92 | * Read transaction COMMIT from the stream. |
93 | */ |
94 | void |
95 | logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) |
96 | { |
97 | /* read flags (unused for now) */ |
98 | uint8 flags = pq_getmsgbyte(in); |
99 | |
100 | if (flags != 0) |
101 | elog(ERROR, "unrecognized flags %u in commit message" , flags); |
102 | |
103 | /* read fields */ |
104 | commit_data->commit_lsn = pq_getmsgint64(in); |
105 | commit_data->end_lsn = pq_getmsgint64(in); |
106 | commit_data->committime = pq_getmsgint64(in); |
107 | } |
108 | |
109 | /* |
110 | * Write ORIGIN to the output stream. |
111 | */ |
112 | void |
113 | logicalrep_write_origin(StringInfo out, const char *origin, |
114 | XLogRecPtr origin_lsn) |
115 | { |
116 | pq_sendbyte(out, 'O'); /* ORIGIN */ |
117 | |
118 | /* fixed fields */ |
119 | pq_sendint64(out, origin_lsn); |
120 | |
121 | /* origin string */ |
122 | pq_sendstring(out, origin); |
123 | } |
124 | |
125 | /* |
126 | * Read ORIGIN from the output stream. |
127 | */ |
128 | char * |
129 | logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) |
130 | { |
131 | /* fixed fields */ |
132 | *origin_lsn = pq_getmsgint64(in); |
133 | |
134 | /* return origin */ |
135 | return pstrdup(pq_getmsgstring(in)); |
136 | } |
137 | |
138 | /* |
139 | * Write INSERT to the output stream. |
140 | */ |
141 | void |
142 | logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) |
143 | { |
144 | pq_sendbyte(out, 'I'); /* action INSERT */ |
145 | |
146 | Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || |
147 | rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || |
148 | rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); |
149 | |
150 | /* use Oid as relation identifier */ |
151 | pq_sendint32(out, RelationGetRelid(rel)); |
152 | |
153 | pq_sendbyte(out, 'N'); /* new tuple follows */ |
154 | logicalrep_write_tuple(out, rel, newtuple); |
155 | } |
156 | |
157 | /* |
158 | * Read INSERT from stream. |
159 | * |
160 | * Fills the new tuple. |
161 | */ |
162 | LogicalRepRelId |
163 | logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) |
164 | { |
165 | char action; |
166 | LogicalRepRelId relid; |
167 | |
168 | /* read the relation id */ |
169 | relid = pq_getmsgint(in, 4); |
170 | |
171 | action = pq_getmsgbyte(in); |
172 | if (action != 'N') |
173 | elog(ERROR, "expected new tuple but got %d" , |
174 | action); |
175 | |
176 | logicalrep_read_tuple(in, newtup); |
177 | |
178 | return relid; |
179 | } |
180 | |
181 | /* |
182 | * Write UPDATE to the output stream. |
183 | */ |
184 | void |
185 | logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, |
186 | HeapTuple newtuple) |
187 | { |
188 | pq_sendbyte(out, 'U'); /* action UPDATE */ |
189 | |
190 | Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || |
191 | rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || |
192 | rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); |
193 | |
194 | /* use Oid as relation identifier */ |
195 | pq_sendint32(out, RelationGetRelid(rel)); |
196 | |
197 | if (oldtuple != NULL) |
198 | { |
199 | if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) |
200 | pq_sendbyte(out, 'O'); /* old tuple follows */ |
201 | else |
202 | pq_sendbyte(out, 'K'); /* old key follows */ |
203 | logicalrep_write_tuple(out, rel, oldtuple); |
204 | } |
205 | |
206 | pq_sendbyte(out, 'N'); /* new tuple follows */ |
207 | logicalrep_write_tuple(out, rel, newtuple); |
208 | } |
209 | |
210 | /* |
211 | * Read UPDATE from stream. |
212 | */ |
213 | LogicalRepRelId |
214 | logicalrep_read_update(StringInfo in, bool *has_oldtuple, |
215 | LogicalRepTupleData *oldtup, |
216 | LogicalRepTupleData *newtup) |
217 | { |
218 | char action; |
219 | LogicalRepRelId relid; |
220 | |
221 | /* read the relation id */ |
222 | relid = pq_getmsgint(in, 4); |
223 | |
224 | /* read and verify action */ |
225 | action = pq_getmsgbyte(in); |
226 | if (action != 'K' && action != 'O' && action != 'N') |
227 | elog(ERROR, "expected action 'N', 'O' or 'K', got %c" , |
228 | action); |
229 | |
230 | /* check for old tuple */ |
231 | if (action == 'K' || action == 'O') |
232 | { |
233 | logicalrep_read_tuple(in, oldtup); |
234 | *has_oldtuple = true; |
235 | |
236 | action = pq_getmsgbyte(in); |
237 | } |
238 | else |
239 | *has_oldtuple = false; |
240 | |
241 | /* check for new tuple */ |
242 | if (action != 'N') |
243 | elog(ERROR, "expected action 'N', got %c" , |
244 | action); |
245 | |
246 | logicalrep_read_tuple(in, newtup); |
247 | |
248 | return relid; |
249 | } |
250 | |
251 | /* |
252 | * Write DELETE to the output stream. |
253 | */ |
254 | void |
255 | logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) |
256 | { |
257 | Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || |
258 | rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || |
259 | rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); |
260 | |
261 | pq_sendbyte(out, 'D'); /* action DELETE */ |
262 | |
263 | /* use Oid as relation identifier */ |
264 | pq_sendint32(out, RelationGetRelid(rel)); |
265 | |
266 | if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) |
267 | pq_sendbyte(out, 'O'); /* old tuple follows */ |
268 | else |
269 | pq_sendbyte(out, 'K'); /* old key follows */ |
270 | |
271 | logicalrep_write_tuple(out, rel, oldtuple); |
272 | } |
273 | |
274 | /* |
275 | * Read DELETE from stream. |
276 | * |
277 | * Fills the old tuple. |
278 | */ |
279 | LogicalRepRelId |
280 | logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) |
281 | { |
282 | char action; |
283 | LogicalRepRelId relid; |
284 | |
285 | /* read the relation id */ |
286 | relid = pq_getmsgint(in, 4); |
287 | |
288 | /* read and verify action */ |
289 | action = pq_getmsgbyte(in); |
290 | if (action != 'K' && action != 'O') |
291 | elog(ERROR, "expected action 'O' or 'K', got %c" , action); |
292 | |
293 | logicalrep_read_tuple(in, oldtup); |
294 | |
295 | return relid; |
296 | } |
297 | |
298 | /* |
299 | * Write TRUNCATE to the output stream. |
300 | */ |
301 | void |
302 | logicalrep_write_truncate(StringInfo out, |
303 | int nrelids, |
304 | Oid relids[], |
305 | bool cascade, bool restart_seqs) |
306 | { |
307 | int i; |
308 | uint8 flags = 0; |
309 | |
310 | pq_sendbyte(out, 'T'); /* action TRUNCATE */ |
311 | |
312 | pq_sendint32(out, nrelids); |
313 | |
314 | /* encode and send truncate flags */ |
315 | if (cascade) |
316 | flags |= TRUNCATE_CASCADE; |
317 | if (restart_seqs) |
318 | flags |= TRUNCATE_RESTART_SEQS; |
319 | pq_sendint8(out, flags); |
320 | |
321 | for (i = 0; i < nrelids; i++) |
322 | pq_sendint32(out, relids[i]); |
323 | } |
324 | |
325 | /* |
326 | * Read TRUNCATE from stream. |
327 | */ |
328 | List * |
329 | logicalrep_read_truncate(StringInfo in, |
330 | bool *cascade, bool *restart_seqs) |
331 | { |
332 | int i; |
333 | int nrelids; |
334 | List *relids = NIL; |
335 | uint8 flags; |
336 | |
337 | nrelids = pq_getmsgint(in, 4); |
338 | |
339 | /* read and decode truncate flags */ |
340 | flags = pq_getmsgint(in, 1); |
341 | *cascade = (flags & TRUNCATE_CASCADE) > 0; |
342 | *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0; |
343 | |
344 | for (i = 0; i < nrelids; i++) |
345 | relids = lappend_oid(relids, pq_getmsgint(in, 4)); |
346 | |
347 | return relids; |
348 | } |
349 | |
350 | /* |
351 | * Write relation description to the output stream. |
352 | */ |
353 | void |
354 | logicalrep_write_rel(StringInfo out, Relation rel) |
355 | { |
356 | char *relname; |
357 | |
358 | pq_sendbyte(out, 'R'); /* sending RELATION */ |
359 | |
360 | /* use Oid as relation identifier */ |
361 | pq_sendint32(out, RelationGetRelid(rel)); |
362 | |
363 | /* send qualified relation name */ |
364 | logicalrep_write_namespace(out, RelationGetNamespace(rel)); |
365 | relname = RelationGetRelationName(rel); |
366 | pq_sendstring(out, relname); |
367 | |
368 | /* send replica identity */ |
369 | pq_sendbyte(out, rel->rd_rel->relreplident); |
370 | |
371 | /* send the attribute info */ |
372 | logicalrep_write_attrs(out, rel); |
373 | } |
374 | |
375 | /* |
376 | * Read the relation info from stream and return as LogicalRepRelation. |
377 | */ |
378 | LogicalRepRelation * |
379 | logicalrep_read_rel(StringInfo in) |
380 | { |
381 | LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation)); |
382 | |
383 | rel->remoteid = pq_getmsgint(in, 4); |
384 | |
385 | /* Read relation name from stream */ |
386 | rel->nspname = pstrdup(logicalrep_read_namespace(in)); |
387 | rel->relname = pstrdup(pq_getmsgstring(in)); |
388 | |
389 | /* Read the replica identity. */ |
390 | rel->replident = pq_getmsgbyte(in); |
391 | |
392 | /* Get attribute description */ |
393 | logicalrep_read_attrs(in, rel); |
394 | |
395 | return rel; |
396 | } |
397 | |
398 | /* |
399 | * Write type info to the output stream. |
400 | * |
401 | * This function will always write base type info. |
402 | */ |
403 | void |
404 | logicalrep_write_typ(StringInfo out, Oid typoid) |
405 | { |
406 | Oid basetypoid = getBaseType(typoid); |
407 | HeapTuple tup; |
408 | Form_pg_type typtup; |
409 | |
410 | pq_sendbyte(out, 'Y'); /* sending TYPE */ |
411 | |
412 | tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid)); |
413 | if (!HeapTupleIsValid(tup)) |
414 | elog(ERROR, "cache lookup failed for type %u" , basetypoid); |
415 | typtup = (Form_pg_type) GETSTRUCT(tup); |
416 | |
417 | /* use Oid as relation identifier */ |
418 | pq_sendint32(out, typoid); |
419 | |
420 | /* send qualified type name */ |
421 | logicalrep_write_namespace(out, typtup->typnamespace); |
422 | pq_sendstring(out, NameStr(typtup->typname)); |
423 | |
424 | ReleaseSysCache(tup); |
425 | } |
426 | |
427 | /* |
428 | * Read type info from the output stream. |
429 | */ |
430 | void |
431 | logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) |
432 | { |
433 | ltyp->remoteid = pq_getmsgint(in, 4); |
434 | |
435 | /* Read type name from stream */ |
436 | ltyp->nspname = pstrdup(logicalrep_read_namespace(in)); |
437 | ltyp->typname = pstrdup(pq_getmsgstring(in)); |
438 | } |
439 | |
440 | /* |
441 | * Write a tuple to the outputstream, in the most efficient format possible. |
442 | */ |
443 | static void |
444 | logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) |
445 | { |
446 | TupleDesc desc; |
447 | Datum values[MaxTupleAttributeNumber]; |
448 | bool isnull[MaxTupleAttributeNumber]; |
449 | int i; |
450 | uint16 nliveatts = 0; |
451 | |
452 | desc = RelationGetDescr(rel); |
453 | |
454 | for (i = 0; i < desc->natts; i++) |
455 | { |
456 | if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) |
457 | continue; |
458 | nliveatts++; |
459 | } |
460 | pq_sendint16(out, nliveatts); |
461 | |
462 | /* try to allocate enough memory from the get-go */ |
463 | enlargeStringInfo(out, tuple->t_len + |
464 | nliveatts * (1 + 4)); |
465 | |
466 | heap_deform_tuple(tuple, desc, values, isnull); |
467 | |
468 | /* Write the values */ |
469 | for (i = 0; i < desc->natts; i++) |
470 | { |
471 | HeapTuple typtup; |
472 | Form_pg_type typclass; |
473 | Form_pg_attribute att = TupleDescAttr(desc, i); |
474 | char *outputstr; |
475 | |
476 | if (att->attisdropped || att->attgenerated) |
477 | continue; |
478 | |
479 | if (isnull[i]) |
480 | { |
481 | pq_sendbyte(out, 'n'); /* null column */ |
482 | continue; |
483 | } |
484 | else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])) |
485 | { |
486 | pq_sendbyte(out, 'u'); /* unchanged toast column */ |
487 | continue; |
488 | } |
489 | |
490 | typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid)); |
491 | if (!HeapTupleIsValid(typtup)) |
492 | elog(ERROR, "cache lookup failed for type %u" , att->atttypid); |
493 | typclass = (Form_pg_type) GETSTRUCT(typtup); |
494 | |
495 | pq_sendbyte(out, 't'); /* 'text' data follows */ |
496 | |
497 | outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); |
498 | pq_sendcountedtext(out, outputstr, strlen(outputstr), false); |
499 | pfree(outputstr); |
500 | |
501 | ReleaseSysCache(typtup); |
502 | } |
503 | } |
504 | |
505 | /* |
506 | * Read tuple in remote format from stream. |
507 | * |
508 | * The returned tuple points into the input stringinfo. |
509 | */ |
510 | static void |
511 | logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) |
512 | { |
513 | int i; |
514 | int natts; |
515 | |
516 | /* Get number of attributes */ |
517 | natts = pq_getmsgint(in, 2); |
518 | |
519 | memset(tuple->changed, 0, sizeof(tuple->changed)); |
520 | |
521 | /* Read the data */ |
522 | for (i = 0; i < natts; i++) |
523 | { |
524 | char kind; |
525 | |
526 | kind = pq_getmsgbyte(in); |
527 | |
528 | switch (kind) |
529 | { |
530 | case 'n': /* null */ |
531 | tuple->values[i] = NULL; |
532 | tuple->changed[i] = true; |
533 | break; |
534 | case 'u': /* unchanged column */ |
535 | /* we don't receive the value of an unchanged column */ |
536 | tuple->values[i] = NULL; |
537 | break; |
538 | case 't': /* text formatted value */ |
539 | { |
540 | int len; |
541 | |
542 | tuple->changed[i] = true; |
543 | |
544 | len = pq_getmsgint(in, 4); /* read length */ |
545 | |
546 | /* and data */ |
547 | tuple->values[i] = palloc(len + 1); |
548 | pq_copymsgbytes(in, tuple->values[i], len); |
549 | tuple->values[i][len] = '\0'; |
550 | } |
551 | break; |
552 | default: |
553 | elog(ERROR, "unrecognized data representation type '%c'" , kind); |
554 | } |
555 | } |
556 | } |
557 | |
558 | /* |
559 | * Write relation attributes to the stream. |
560 | */ |
561 | static void |
562 | logicalrep_write_attrs(StringInfo out, Relation rel) |
563 | { |
564 | TupleDesc desc; |
565 | int i; |
566 | uint16 nliveatts = 0; |
567 | Bitmapset *idattrs = NULL; |
568 | bool replidentfull; |
569 | |
570 | desc = RelationGetDescr(rel); |
571 | |
572 | /* send number of live attributes */ |
573 | for (i = 0; i < desc->natts; i++) |
574 | { |
575 | if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) |
576 | continue; |
577 | nliveatts++; |
578 | } |
579 | pq_sendint16(out, nliveatts); |
580 | |
581 | /* fetch bitmap of REPLICATION IDENTITY attributes */ |
582 | replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); |
583 | if (!replidentfull) |
584 | idattrs = RelationGetIndexAttrBitmap(rel, |
585 | INDEX_ATTR_BITMAP_IDENTITY_KEY); |
586 | |
587 | /* send the attributes */ |
588 | for (i = 0; i < desc->natts; i++) |
589 | { |
590 | Form_pg_attribute att = TupleDescAttr(desc, i); |
591 | uint8 flags = 0; |
592 | |
593 | if (att->attisdropped || att->attgenerated) |
594 | continue; |
595 | |
596 | /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ |
597 | if (replidentfull || |
598 | bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, |
599 | idattrs)) |
600 | flags |= LOGICALREP_IS_REPLICA_IDENTITY; |
601 | |
602 | pq_sendbyte(out, flags); |
603 | |
604 | /* attribute name */ |
605 | pq_sendstring(out, NameStr(att->attname)); |
606 | |
607 | /* attribute type id */ |
608 | pq_sendint32(out, (int) att->atttypid); |
609 | |
610 | /* attribute mode */ |
611 | pq_sendint32(out, att->atttypmod); |
612 | } |
613 | |
614 | bms_free(idattrs); |
615 | } |
616 | |
617 | /* |
618 | * Read relation attribute names from the stream. |
619 | */ |
620 | static void |
621 | logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) |
622 | { |
623 | int i; |
624 | int natts; |
625 | char **attnames; |
626 | Oid *atttyps; |
627 | Bitmapset *attkeys = NULL; |
628 | |
629 | natts = pq_getmsgint(in, 2); |
630 | attnames = palloc(natts * sizeof(char *)); |
631 | atttyps = palloc(natts * sizeof(Oid)); |
632 | |
633 | /* read the attributes */ |
634 | for (i = 0; i < natts; i++) |
635 | { |
636 | uint8 flags; |
637 | |
638 | /* Check for replica identity column */ |
639 | flags = pq_getmsgbyte(in); |
640 | if (flags & LOGICALREP_IS_REPLICA_IDENTITY) |
641 | attkeys = bms_add_member(attkeys, i); |
642 | |
643 | /* attribute name */ |
644 | attnames[i] = pstrdup(pq_getmsgstring(in)); |
645 | |
646 | /* attribute type id */ |
647 | atttyps[i] = (Oid) pq_getmsgint(in, 4); |
648 | |
649 | /* we ignore attribute mode for now */ |
650 | (void) pq_getmsgint(in, 4); |
651 | } |
652 | |
653 | rel->attnames = attnames; |
654 | rel->atttyps = atttyps; |
655 | rel->attkeys = attkeys; |
656 | rel->natts = natts; |
657 | } |
658 | |
659 | /* |
660 | * Write the namespace name or empty string for pg_catalog (to save space). |
661 | */ |
662 | static void |
663 | logicalrep_write_namespace(StringInfo out, Oid nspid) |
664 | { |
665 | if (nspid == PG_CATALOG_NAMESPACE) |
666 | pq_sendbyte(out, '\0'); |
667 | else |
668 | { |
669 | char *nspname = get_namespace_name(nspid); |
670 | |
671 | if (nspname == NULL) |
672 | elog(ERROR, "cache lookup failed for namespace %u" , |
673 | nspid); |
674 | |
675 | pq_sendstring(out, nspname); |
676 | } |
677 | } |
678 | |
679 | /* |
680 | * Read the namespace name while treating empty string as pg_catalog. |
681 | */ |
682 | static const char * |
683 | logicalrep_read_namespace(StringInfo in) |
684 | { |
685 | const char *nspname = pq_getmsgstring(in); |
686 | |
687 | if (nspname[0] == '\0') |
688 | nspname = "pg_catalog" ; |
689 | |
690 | return nspname; |
691 | } |
692 | |