/*-------------------------------------------------------------------------
 *
 * copy.c
 *		Implements the COPY utility command
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/commands/copy.c
 *
 *-------------------------------------------------------------------------
 */
15#include "postgres.h"
16
17#include <ctype.h>
18#include <unistd.h>
19#include <sys/stat.h>
20
21#include "access/heapam.h"
22#include "access/htup_details.h"
23#include "access/sysattr.h"
24#include "access/tableam.h"
25#include "access/xact.h"
26#include "access/xlog.h"
27#include "catalog/dependency.h"
28#include "catalog/pg_authid.h"
29#include "catalog/pg_type.h"
30#include "commands/copy.h"
31#include "commands/defrem.h"
32#include "commands/trigger.h"
33#include "executor/execPartition.h"
34#include "executor/executor.h"
35#include "executor/nodeModifyTable.h"
36#include "executor/tuptable.h"
37#include "foreign/fdwapi.h"
38#include "libpq/libpq.h"
39#include "libpq/pqformat.h"
40#include "mb/pg_wchar.h"
41#include "miscadmin.h"
42#include "optimizer/optimizer.h"
43#include "nodes/makefuncs.h"
44#include "parser/parse_coerce.h"
45#include "parser/parse_collate.h"
46#include "parser/parse_expr.h"
47#include "parser/parse_relation.h"
48#include "port/pg_bswap.h"
49#include "rewrite/rewriteHandler.h"
50#include "storage/fd.h"
51#include "tcop/tcopprot.h"
52#include "utils/builtins.h"
53#include "utils/lsyscache.h"
54#include "utils/memutils.h"
55#include "utils/partcache.h"
56#include "utils/portal.h"
57#include "utils/rel.h"
58#include "utils/rls.h"
59#include "utils/snapmgr.h"
60
61
/* True when character c is an octal digit ('0' through '7'). */
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
/* Numeric value of octal digit c; only meaningful when ISOCTAL(c) holds. */
#define OCTVALUE(c) ((c) - '0')
64
/*
 * Represents the different source/dest cases we need to worry about at
 * the bottom level
 */
typedef enum CopyDest
{
	COPY_FILE,					/* to/from file (or a piped program) */
	COPY_OLD_FE,				/* to/from frontend (2.0 protocol) */
	COPY_NEW_FE,				/* to/from frontend (3.0 protocol) */
	COPY_CALLBACK				/* to/from callback function */
} CopyDest;
76
/*
 * Represents the end-of-line terminator type of the input
 */
typedef enum EolType
{
	EOL_UNKNOWN,				/* terminator not yet determined */
	EOL_NL,						/* "\n" */
	EOL_CR,						/* "\r" */
	EOL_CRNL					/* "\r\n" */
} EolType;
87
/*
 * Represents the heap insert method to be used during COPY FROM.
 */
typedef enum CopyInsertMethod
{
	CIM_SINGLE,					/* use table_tuple_insert or fdw routine */
	CIM_MULTI,					/* always use table_multi_insert */
	CIM_MULTI_CONDITIONAL		/* use table_multi_insert only if valid */
} CopyInsertMethod;
97
98/*
99 * This struct contains all the state variables used throughout a COPY
100 * operation. For simplicity, we use the same struct for all variants of COPY,
101 * even though some fields are used in only some cases.
102 *
103 * Multi-byte encodings: all supported client-side encodings encode multi-byte
104 * characters by having the first byte's high bit set. Subsequent bytes of the
105 * character can have the high bit not set. When scanning data in such an
106 * encoding to look for a match to a single-byte (ie ASCII) character, we must
107 * use the full pg_encoding_mblen() machinery to skip over multibyte
108 * characters, else we might find a false match to a trailing byte. In
109 * supported server encodings, there is no possibility of a false match, and
110 * it's faster to make useless comparisons to trailing bytes than it is to
111 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
112 * when we have to do it the hard way.
113 */
114typedef struct CopyStateData
115{
116 /* low-level state data */
117 CopyDest copy_dest; /* type of copy source/destination */
118 FILE *copy_file; /* used if copy_dest == COPY_FILE */
119 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
120 * dest == COPY_NEW_FE in COPY FROM */
121 bool is_copy_from; /* COPY TO, or COPY FROM? */
122 bool reached_eof; /* true if we read to end of copy data (not
123 * all copy_dest types maintain this) */
124 EolType eol_type; /* EOL type of input */
125 int file_encoding; /* file or remote side's character encoding */
126 bool need_transcoding; /* file encoding diff from server? */
127 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
128
129 /* parameters from the COPY command */
130 Relation rel; /* relation to copy to or from */
131 QueryDesc *queryDesc; /* executable query to copy from */
132 List *attnumlist; /* integer list of attnums to copy */
133 char *filename; /* filename, or NULL for STDIN/STDOUT */
134 bool is_program; /* is 'filename' a program to popen? */
135 copy_data_source_cb data_source_cb; /* function for reading data */
136 bool binary; /* binary format? */
137 bool freeze; /* freeze rows on loading? */
138 bool csv_mode; /* Comma Separated Value format? */
139 bool header_line; /* CSV header line? */
140 char *null_print; /* NULL marker string (server encoding!) */
141 int null_print_len; /* length of same */
142 char *null_print_client; /* same converted to file encoding */
143 char *delim; /* column delimiter (must be 1 byte) */
144 char *quote; /* CSV quote char (must be 1 byte) */
145 char *escape; /* CSV escape char (must be 1 byte) */
146 List *force_quote; /* list of column names */
147 bool force_quote_all; /* FORCE_QUOTE *? */
148 bool *force_quote_flags; /* per-column CSV FQ flags */
149 List *force_notnull; /* list of column names */
150 bool *force_notnull_flags; /* per-column CSV FNN flags */
151 List *force_null; /* list of column names */
152 bool *force_null_flags; /* per-column CSV FN flags */
153 bool convert_selectively; /* do selective binary conversion? */
154 List *convert_select; /* list of column names (can be NIL) */
155 bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
156 Node *whereClause; /* WHERE condition (or NULL) */
157
158 /* these are just for error messages, see CopyFromErrorCallback */
159 const char *cur_relname; /* table name for error messages */
160 uint64 cur_lineno; /* line number for error messages */
161 const char *cur_attname; /* current att for error messages */
162 const char *cur_attval; /* current att value for error messages */
163
164 /*
165 * Working state for COPY TO/FROM
166 */
167 MemoryContext copycontext; /* per-copy execution context */
168
169 /*
170 * Working state for COPY TO
171 */
172 FmgrInfo *out_functions; /* lookup info for output functions */
173 MemoryContext rowcontext; /* per-row evaluation context */
174
175 /*
176 * Working state for COPY FROM
177 */
178 AttrNumber num_defaults;
179 FmgrInfo oid_in_function;
180 FmgrInfo *in_functions; /* array of input functions for each attrs */
181 Oid *typioparams; /* array of element types for in_functions */
182 int *defmap; /* array of default att numbers */
183 ExprState **defexprs; /* array of default att expressions */
184 bool volatile_defexprs; /* is any of defexprs volatile? */
185 List *range_table;
186 ExprState *qualexpr;
187
188 TransitionCaptureState *transition_capture;
189
190 /*
191 * These variables are used to reduce overhead in textual COPY FROM.
192 *
193 * attribute_buf holds the separated, de-escaped text for each field of
194 * the current line. The CopyReadAttributes functions return arrays of
195 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
196 * the buffer on each cycle.
197 */
198 StringInfoData attribute_buf;
199
200 /* field raw data pointers found by COPY FROM */
201
202 int max_fields;
203 char **raw_fields;
204
205 /*
206 * Similarly, line_buf holds the whole input line being processed. The
207 * input cycle is first to read the whole line into line_buf, convert it
208 * to server encoding there, and then extract the individual attribute
209 * fields into attribute_buf. line_buf is preserved unmodified so that we
210 * can display it in error messages if appropriate.
211 */
212 StringInfoData line_buf;
213 bool line_buf_converted; /* converted to server encoding? */
214 bool line_buf_valid; /* contains the row being processed? */
215
216 /*
217 * Finally, raw_buf holds raw data read from the data source (file or
218 * client connection). CopyReadLine parses this data sufficiently to
219 * locate line boundaries, then transfers the data to line_buf and
220 * converts it. Note: we guarantee that there is a \0 at
221 * raw_buf[raw_buf_len].
222 */
223#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
224 char *raw_buf;
225 int raw_buf_index; /* next byte to process */
226 int raw_buf_len; /* total # of bytes stored */
227} CopyStateData;
228
229/* DestReceiver for COPY (query) TO */
230typedef struct
231{
232 DestReceiver pub; /* publicly-known function pointers */
233 CopyState cstate; /* CopyStateData for the command */
234 uint64 processed; /* # of tuples processed */
235} DR_copy;
236
237
/*
 * No more than this many tuples per CopyMultiInsertBuffer
 *
 * Caution: Don't make this too big, as we could end up with this many
 * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
 * multiInsertBuffers list.  Increasing this can cause quadratic growth in
 * memory requirements during copies into partitioned tables with a large
 * number of partitions.
 */
#define MAX_BUFFERED_TUPLES		1000

/*
 * Flush buffers if there are >= this many bytes, as counted by the input
 * size, of tuples stored.
 */
#define MAX_BUFFERED_BYTES		65535

/* Trim the list of buffers back down to this number after flushing */
#define MAX_PARTITION_BUFFERS	32
257
258/* Stores multi-insert data related to a single relation in CopyFrom. */
259typedef struct CopyMultiInsertBuffer
260{
261 TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
262 ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
263 BulkInsertState bistate; /* BulkInsertState for this rel */
264 int nused; /* number of 'slots' containing tuples */
265 uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
266 * stream */
267} CopyMultiInsertBuffer;
268
269/*
270 * Stores one or many CopyMultiInsertBuffers and details about the size and
271 * number of tuples which are stored in them. This allows multiple buffers to
272 * exist at once when COPYing into a partitioned table.
273 */
274typedef struct CopyMultiInsertInfo
275{
276 List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
277 int bufferedTuples; /* number of tuples buffered over all buffers */
278 int bufferedBytes; /* number of bytes from all buffered tuples */
279 CopyState cstate; /* Copy state for this CopyMultiInsertInfo */
280 EState *estate; /* Executor state used for COPY */
281 CommandId mycid; /* Command Id used for COPY */
282 int ti_options; /* table insert options */
283} CopyMultiInsertInfo;
284
285
/*
 * These macros centralize code used to process line_buf and raw_buf buffers.
 * They are macros because they often do continue/break control and to avoid
 * function call overhead in tight COPY loops.
 *
 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
 * prevent the continue/break processing from working.  We end the "if (1)"
 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
 * any "else" in the calling code, and to avoid any compiler warnings about
 * empty statements.  See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
 */

/*
 * This keeps the character read at the top of the loop in the buffer
 * even if there is more than one read-ahead.
 *
 * If fewer than extralen+1 bytes remain past raw_buf_ptr and EOF has not been
 * hit, rewind raw_buf_ptr to prev_raw_ptr, set need_data, and "continue" the
 * enclosing loop.  The expansion site must provide the locals raw_buf_ptr,
 * prev_raw_ptr, copy_buf_len, hit_eof and need_data.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
		need_data = true; \
		continue; \
	} \
} else ((void) 0)
312
/*
 * This consumes the remainder of the buffer and breaks.
 *
 * If fewer than extralen+1 bytes remain past raw_buf_ptr and EOF has been
 * hit, set result to true and "break" out of the enclosing loop/switch; when
 * extralen is nonzero, raw_buf_ptr is first advanced to copy_buf_len.  The
 * expansion site must provide the locals raw_buf_ptr, copy_buf_len, hit_eof
 * and result.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		break; \
	} \
} else ((void) 0)
326
/*
 * Transfer any approved data to line_buf; must do this to be sure
 * there is some room in raw_buf.
 *
 * Appends raw_buf[raw_buf_index .. raw_buf_ptr) to cstate->line_buf and
 * advances cstate->raw_buf_index to raw_buf_ptr; does nothing when there are
 * no such bytes.  The expansion site must provide cstate and raw_buf_ptr.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)
342
/*
 * Undo any read-ahead and jump out of the block.
 *
 * Resets raw_buf_ptr to one past prev_raw_ptr and jumps to the label
 * not_end_of_copy, which the expansion site must define along with both
 * locals.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else ((void) 0)
350
/*
 * 11-byte signature for COPY BINARY data.  The array is sized exactly to the
 * 11 listed characters (including the explicit trailing \0), so no additional
 * terminator is stored.
 */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
352
353
354/* non-export function prototypes */
355static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
356 RawStmt *raw_query, Oid queryRelId, List *attnamelist,
357 List *options);
358static void EndCopy(CopyState cstate);
359static void ClosePipeToProgram(CopyState cstate);
360static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
361 Oid queryRelId, const char *filename, bool is_program,
362 List *attnamelist, List *options);
363static void EndCopyTo(CopyState cstate);
364static uint64 DoCopyTo(CopyState cstate);
365static uint64 CopyTo(CopyState cstate);
366static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
367static bool CopyReadLine(CopyState cstate);
368static bool CopyReadLineText(CopyState cstate);
369static int CopyReadAttributesText(CopyState cstate);
370static int CopyReadAttributesCSV(CopyState cstate);
371static Datum CopyReadBinaryAttribute(CopyState cstate,
372 int column_no, FmgrInfo *flinfo,
373 Oid typioparam, int32 typmod,
374 bool *isnull);
375static void CopyAttributeOutText(CopyState cstate, char *string);
376static void CopyAttributeOutCSV(CopyState cstate, char *string,
377 bool use_quote, bool single_attr);
378static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
379 List *attnamelist);
380static char *limit_printout_length(const char *str);
381
382/* Low-level communications functions */
383static void SendCopyBegin(CopyState cstate);
384static void ReceiveCopyBegin(CopyState cstate);
385static void SendCopyEnd(CopyState cstate);
386static void CopySendData(CopyState cstate, const void *databuf, int datasize);
387static void CopySendString(CopyState cstate, const char *str);
388static void CopySendChar(CopyState cstate, char c);
389static void CopySendEndOfRow(CopyState cstate);
390static int CopyGetData(CopyState cstate, void *databuf,
391 int minread, int maxread);
392static void CopySendInt32(CopyState cstate, int32 val);
393static bool CopyGetInt32(CopyState cstate, int32 *val);
394static void CopySendInt16(CopyState cstate, int16 val);
395static bool CopyGetInt16(CopyState cstate, int16 *val);
396
397
398/*
399 * Send copy start/stop messages for frontend copies. These have changed
400 * in past protocol redesigns.
401 */
402static void
403SendCopyBegin(CopyState cstate)
404{
405 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
406 {
407 /* new way */
408 StringInfoData buf;
409 int natts = list_length(cstate->attnumlist);
410 int16 format = (cstate->binary ? 1 : 0);
411 int i;
412
413 pq_beginmessage(&buf, 'H');
414 pq_sendbyte(&buf, format); /* overall format */
415 pq_sendint16(&buf, natts);
416 for (i = 0; i < natts; i++)
417 pq_sendint16(&buf, format); /* per-column formats */
418 pq_endmessage(&buf);
419 cstate->copy_dest = COPY_NEW_FE;
420 }
421 else
422 {
423 /* old way */
424 if (cstate->binary)
425 ereport(ERROR,
426 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
427 errmsg("COPY BINARY is not supported to stdout or from stdin")));
428 pq_putemptymessage('H');
429 /* grottiness needed for old COPY OUT protocol */
430 pq_startcopyout();
431 cstate->copy_dest = COPY_OLD_FE;
432 }
433}
434
435static void
436ReceiveCopyBegin(CopyState cstate)
437{
438 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
439 {
440 /* new way */
441 StringInfoData buf;
442 int natts = list_length(cstate->attnumlist);
443 int16 format = (cstate->binary ? 1 : 0);
444 int i;
445
446 pq_beginmessage(&buf, 'G');
447 pq_sendbyte(&buf, format); /* overall format */
448 pq_sendint16(&buf, natts);
449 for (i = 0; i < natts; i++)
450 pq_sendint16(&buf, format); /* per-column formats */
451 pq_endmessage(&buf);
452 cstate->copy_dest = COPY_NEW_FE;
453 cstate->fe_msgbuf = makeStringInfo();
454 }
455 else
456 {
457 /* old way */
458 if (cstate->binary)
459 ereport(ERROR,
460 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
461 errmsg("COPY BINARY is not supported to stdout or from stdin")));
462 pq_putemptymessage('G');
463 /* any error in old protocol will make us lose sync */
464 pq_startmsgread();
465 cstate->copy_dest = COPY_OLD_FE;
466 }
467 /* We *must* flush here to ensure FE knows it can send. */
468 pq_flush();
469}
470
471static void
472SendCopyEnd(CopyState cstate)
473{
474 if (cstate->copy_dest == COPY_NEW_FE)
475 {
476 /* Shouldn't have any unsent data */
477 Assert(cstate->fe_msgbuf->len == 0);
478 /* Send Copy Done message */
479 pq_putemptymessage('c');
480 }
481 else
482 {
483 CopySendData(cstate, "\\.", 2);
484 /* Need to flush out the trailer (this also appends a newline) */
485 CopySendEndOfRow(cstate);
486 pq_endcopyout(false);
487 }
488}
489
490/*----------
491 * CopySendData sends output data to the destination (file or frontend)
492 * CopySendString does the same for null-terminated strings
493 * CopySendChar does the same for single characters
494 * CopySendEndOfRow does the appropriate thing at end of each data row
495 * (data is not actually flushed except by CopySendEndOfRow)
496 *
497 * NB: no data conversion is applied by these functions
498 *----------
499 */
500static void
501CopySendData(CopyState cstate, const void *databuf, int datasize)
502{
503 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
504}
505
506static void
507CopySendString(CopyState cstate, const char *str)
508{
509 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
510}
511
512static void
513CopySendChar(CopyState cstate, char c)
514{
515 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
516}
517
518static void
519CopySendEndOfRow(CopyState cstate)
520{
521 StringInfo fe_msgbuf = cstate->fe_msgbuf;
522
523 switch (cstate->copy_dest)
524 {
525 case COPY_FILE:
526 if (!cstate->binary)
527 {
528 /* Default line termination depends on platform */
529#ifndef WIN32
530 CopySendChar(cstate, '\n');
531#else
532 CopySendString(cstate, "\r\n");
533#endif
534 }
535
536 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
537 cstate->copy_file) != 1 ||
538 ferror(cstate->copy_file))
539 {
540 if (cstate->is_program)
541 {
542 if (errno == EPIPE)
543 {
544 /*
545 * The pipe will be closed automatically on error at
546 * the end of transaction, but we might get a better
547 * error message from the subprocess' exit code than
548 * just "Broken Pipe"
549 */
550 ClosePipeToProgram(cstate);
551
552 /*
553 * If ClosePipeToProgram() didn't throw an error, the
554 * program terminated normally, but closed the pipe
555 * first. Restore errno, and throw an error.
556 */
557 errno = EPIPE;
558 }
559 ereport(ERROR,
560 (errcode_for_file_access(),
561 errmsg("could not write to COPY program: %m")));
562 }
563 else
564 ereport(ERROR,
565 (errcode_for_file_access(),
566 errmsg("could not write to COPY file: %m")));
567 }
568 break;
569 case COPY_OLD_FE:
570 /* The FE/BE protocol uses \n as newline for all platforms */
571 if (!cstate->binary)
572 CopySendChar(cstate, '\n');
573
574 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
575 {
576 /* no hope of recovering connection sync, so FATAL */
577 ereport(FATAL,
578 (errcode(ERRCODE_CONNECTION_FAILURE),
579 errmsg("connection lost during COPY to stdout")));
580 }
581 break;
582 case COPY_NEW_FE:
583 /* The FE/BE protocol uses \n as newline for all platforms */
584 if (!cstate->binary)
585 CopySendChar(cstate, '\n');
586
587 /* Dump the accumulated row as one CopyData message */
588 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
589 break;
590 case COPY_CALLBACK:
591 Assert(false); /* Not yet supported. */
592 break;
593 }
594
595 resetStringInfo(fe_msgbuf);
596}
597
598/*
599 * CopyGetData reads data from the source (file or frontend)
600 *
601 * We attempt to read at least minread, and at most maxread, bytes from
602 * the source. The actual number of bytes read is returned; if this is
603 * less than minread, EOF was detected.
604 *
605 * Note: when copying from the frontend, we expect a proper EOF mark per
606 * protocol; if the frontend simply drops the connection, we raise error.
607 * It seems unwise to allow the COPY IN to complete normally in that case.
608 *
609 * NB: no data conversion is applied here.
610 */
611static int
612CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
613{
614 int bytesread = 0;
615
616 switch (cstate->copy_dest)
617 {
618 case COPY_FILE:
619 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
620 if (ferror(cstate->copy_file))
621 ereport(ERROR,
622 (errcode_for_file_access(),
623 errmsg("could not read from COPY file: %m")));
624 if (bytesread == 0)
625 cstate->reached_eof = true;
626 break;
627 case COPY_OLD_FE:
628
629 /*
630 * We cannot read more than minread bytes (which in practice is 1)
631 * because old protocol doesn't have any clear way of separating
632 * the COPY stream from following data. This is slow, but not any
633 * slower than the code path was originally, and we don't care
634 * much anymore about the performance of old protocol.
635 */
636 if (pq_getbytes((char *) databuf, minread))
637 {
638 /* Only a \. terminator is legal EOF in old protocol */
639 ereport(ERROR,
640 (errcode(ERRCODE_CONNECTION_FAILURE),
641 errmsg("unexpected EOF on client connection with an open transaction")));
642 }
643 bytesread = minread;
644 break;
645 case COPY_NEW_FE:
646 while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
647 {
648 int avail;
649
650 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
651 {
652 /* Try to receive another message */
653 int mtype;
654
655 readmessage:
656 HOLD_CANCEL_INTERRUPTS();
657 pq_startmsgread();
658 mtype = pq_getbyte();
659 if (mtype == EOF)
660 ereport(ERROR,
661 (errcode(ERRCODE_CONNECTION_FAILURE),
662 errmsg("unexpected EOF on client connection with an open transaction")));
663 if (pq_getmessage(cstate->fe_msgbuf, 0))
664 ereport(ERROR,
665 (errcode(ERRCODE_CONNECTION_FAILURE),
666 errmsg("unexpected EOF on client connection with an open transaction")));
667 RESUME_CANCEL_INTERRUPTS();
668 switch (mtype)
669 {
670 case 'd': /* CopyData */
671 break;
672 case 'c': /* CopyDone */
673 /* COPY IN correctly terminated by frontend */
674 cstate->reached_eof = true;
675 return bytesread;
676 case 'f': /* CopyFail */
677 ereport(ERROR,
678 (errcode(ERRCODE_QUERY_CANCELED),
679 errmsg("COPY from stdin failed: %s",
680 pq_getmsgstring(cstate->fe_msgbuf))));
681 break;
682 case 'H': /* Flush */
683 case 'S': /* Sync */
684
685 /*
686 * Ignore Flush/Sync for the convenience of client
687 * libraries (such as libpq) that may send those
688 * without noticing that the command they just
689 * sent was COPY.
690 */
691 goto readmessage;
692 default:
693 ereport(ERROR,
694 (errcode(ERRCODE_PROTOCOL_VIOLATION),
695 errmsg("unexpected message type 0x%02X during COPY from stdin",
696 mtype)));
697 break;
698 }
699 }
700 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
701 if (avail > maxread)
702 avail = maxread;
703 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
704 databuf = (void *) ((char *) databuf + avail);
705 maxread -= avail;
706 bytesread += avail;
707 }
708 break;
709 case COPY_CALLBACK:
710 bytesread = cstate->data_source_cb(databuf, minread, maxread);
711 break;
712 }
713
714 return bytesread;
715}
716
717
718/*
719 * These functions do apply some data conversion
720 */
721
722/*
723 * CopySendInt32 sends an int32 in network byte order
724 */
725static void
726CopySendInt32(CopyState cstate, int32 val)
727{
728 uint32 buf;
729
730 buf = pg_hton32((uint32) val);
731 CopySendData(cstate, &buf, sizeof(buf));
732}
733
734/*
735 * CopyGetInt32 reads an int32 that appears in network byte order
736 *
737 * Returns true if OK, false if EOF
738 */
739static bool
740CopyGetInt32(CopyState cstate, int32 *val)
741{
742 uint32 buf;
743
744 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
745 {
746 *val = 0; /* suppress compiler warning */
747 return false;
748 }
749 *val = (int32) pg_ntoh32(buf);
750 return true;
751}
752
753/*
754 * CopySendInt16 sends an int16 in network byte order
755 */
756static void
757CopySendInt16(CopyState cstate, int16 val)
758{
759 uint16 buf;
760
761 buf = pg_hton16((uint16) val);
762 CopySendData(cstate, &buf, sizeof(buf));
763}
764
765/*
766 * CopyGetInt16 reads an int16 that appears in network byte order
767 */
768static bool
769CopyGetInt16(CopyState cstate, int16 *val)
770{
771 uint16 buf;
772
773 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
774 {
775 *val = 0; /* suppress compiler warning */
776 return false;
777 }
778 *val = (int16) pg_ntoh16(buf);
779 return true;
780}
781
782
783/*
784 * CopyLoadRawBuf loads some more data into raw_buf
785 *
786 * Returns true if able to obtain at least one more byte, else false.
787 *
788 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
789 * down to the start of the buffer and then we load more data after that.
790 * This case is used only when a frontend multibyte character crosses a
791 * bufferload boundary.
792 */
793static bool
794CopyLoadRawBuf(CopyState cstate)
795{
796 int nbytes;
797 int inbytes;
798
799 if (cstate->raw_buf_index < cstate->raw_buf_len)
800 {
801 /* Copy down the unprocessed data */
802 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
803 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
804 nbytes);
805 }
806 else
807 nbytes = 0; /* no data need be saved */
808
809 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
810 1, RAW_BUF_SIZE - nbytes);
811 nbytes += inbytes;
812 cstate->raw_buf[nbytes] = '\0';
813 cstate->raw_buf_index = 0;
814 cstate->raw_buf_len = nbytes;
815 return (inbytes > 0);
816}
817
818
819/*
820 * DoCopy executes the SQL COPY statement
821 *
822 * Either unload or reload contents of table <relation>, depending on <from>.
823 * (<from> = true means we are inserting into the table.) In the "TO" case
824 * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
825 * or DELETE query.
826 *
827 * If <pipe> is false, transfer is between the table and the file named
828 * <filename>. Otherwise, transfer is between the table and our regular
829 * input/output stream. The latter could be either stdin/stdout or a
830 * socket, depending on whether we're running under Postmaster control.
831 *
832 * Do not allow a Postgres user without the 'pg_read_server_files' or
833 * 'pg_write_server_files' role to read from or write to a file.
834 *
835 * Do not allow the copy if user doesn't have proper permission to access
836 * the table or the specifically requested columns.
837 */
838void
839DoCopy(ParseState *pstate, const CopyStmt *stmt,
840 int stmt_location, int stmt_len,
841 uint64 *processed)
842{
843 CopyState cstate;
844 bool is_from = stmt->is_from;
845 bool pipe = (stmt->filename == NULL);
846 Relation rel;
847 Oid relid;
848 RawStmt *query = NULL;
849 Node *whereClause = NULL;
850
851 /*
852 * Disallow COPY to/from file or program except to users with the
853 * appropriate role.
854 */
855 if (!pipe)
856 {
857 if (stmt->is_program)
858 {
859 if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM))
860 ereport(ERROR,
861 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
862 errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"),
863 errhint("Anyone can COPY to stdout or from stdin. "
864 "psql's \\copy command also works for anyone.")));
865 }
866 else
867 {
868 if (is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
869 ereport(ERROR,
870 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
871 errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"),
872 errhint("Anyone can COPY to stdout or from stdin. "
873 "psql's \\copy command also works for anyone.")));
874
875 if (!is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES))
876 ereport(ERROR,
877 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
878 errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"),
879 errhint("Anyone can COPY to stdout or from stdin. "
880 "psql's \\copy command also works for anyone.")));
881 }
882 }
883
884 if (stmt->relation)
885 {
886 LOCKMODE lockmode = is_from ? RowExclusiveLock : AccessShareLock;
887 RangeTblEntry *rte;
888 TupleDesc tupDesc;
889 List *attnums;
890 ListCell *cur;
891
892 Assert(!stmt->query);
893
894 /* Open and lock the relation, using the appropriate lock type. */
895 rel = table_openrv(stmt->relation, lockmode);
896
897 relid = RelationGetRelid(rel);
898
899 rte = addRangeTableEntryForRelation(pstate, rel, lockmode,
900 NULL, false, false);
901 rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
902
903 if (stmt->whereClause)
904 {
905 /* add rte to column namespace */
906 addRTEtoQuery(pstate, rte, false, true, true);
907
908 /* Transform the raw expression tree */
909 whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);
910
911 /* Make sure it yields a boolean result. */
912 whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");
913
914 /* we have to fix its collations too */
915 assign_expr_collations(pstate, whereClause);
916
917 whereClause = eval_const_expressions(NULL, whereClause);
918
919 whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false);
920 whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
921 }
922
923 tupDesc = RelationGetDescr(rel);
924 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
925 foreach(cur, attnums)
926 {
927 int attno = lfirst_int(cur) -
928 FirstLowInvalidHeapAttributeNumber;
929
930 if (is_from)
931 rte->insertedCols = bms_add_member(rte->insertedCols, attno);
932 else
933 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
934 }
935 ExecCheckRTPerms(pstate->p_rtable, true);
936
937 /*
938 * Permission check for row security policies.
939 *
940 * check_enable_rls will ereport(ERROR) if the user has requested
941 * something invalid and will otherwise indicate if we should enable
942 * RLS (returns RLS_ENABLED) or not for this COPY statement.
943 *
944 * If the relation has a row security policy and we are to apply it
945 * then perform a "query" copy and allow the normal query processing
946 * to handle the policies.
947 *
948 * If RLS is not enabled for this, then just fall through to the
949 * normal non-filtering relation handling.
950 */
951 if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
952 {
953 SelectStmt *select;
954 ColumnRef *cr;
955 ResTarget *target;
956 RangeVar *from;
957 List *targetList = NIL;
958
959 if (is_from)
960 ereport(ERROR,
961 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
962 errmsg("COPY FROM not supported with row-level security"),
963 errhint("Use INSERT statements instead.")));
964
965 /*
966 * Build target list
967 *
968 * If no columns are specified in the attribute list of the COPY
969 * command, then the target list is 'all' columns. Therefore, '*'
970 * should be used as the target list for the resulting SELECT
971 * statement.
972 *
973 * In the case that columns are specified in the attribute list,
974 * create a ColumnRef and ResTarget for each column and add them
975 * to the target list for the resulting SELECT statement.
976 */
977 if (!stmt->attlist)
978 {
979 cr = makeNode(ColumnRef);
980 cr->fields = list_make1(makeNode(A_Star));
981 cr->location = -1;
982
983 target = makeNode(ResTarget);
984 target->name = NULL;
985 target->indirection = NIL;
986 target->val = (Node *) cr;
987 target->location = -1;
988
989 targetList = list_make1(target);
990 }
991 else
992 {
993 ListCell *lc;
994
995 foreach(lc, stmt->attlist)
996 {
997 /*
998 * Build the ColumnRef for each column. The ColumnRef
999 * 'fields' property is a String 'Value' node (see
1000 * nodes/value.h) that corresponds to the column name
1001 * respectively.
1002 */
1003 cr = makeNode(ColumnRef);
1004 cr->fields = list_make1(lfirst(lc));
1005 cr->location = -1;
1006
1007 /* Build the ResTarget and add the ColumnRef to it. */
1008 target = makeNode(ResTarget);
1009 target->name = NULL;
1010 target->indirection = NIL;
1011 target->val = (Node *) cr;
1012 target->location = -1;
1013
1014 /* Add each column to the SELECT statement's target list */
1015 targetList = lappend(targetList, target);
1016 }
1017 }
1018
1019 /*
1020 * Build RangeVar for from clause, fully qualified based on the
1021 * relation which we have opened and locked.
1022 */
1023 from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
1024 pstrdup(RelationGetRelationName(rel)),
1025 -1);
1026
1027 /* Build query */
1028 select = makeNode(SelectStmt);
1029 select->targetList = targetList;
1030 select->fromClause = list_make1(from);
1031
1032 query = makeNode(RawStmt);
1033 query->stmt = (Node *) select;
1034 query->stmt_location = stmt_location;
1035 query->stmt_len = stmt_len;
1036
1037 /*
1038 * Close the relation for now, but keep the lock on it to prevent
1039 * changes between now and when we start the query-based COPY.
1040 *
1041 * We'll reopen it later as part of the query-based COPY.
1042 */
1043 table_close(rel, NoLock);
1044 rel = NULL;
1045 }
1046 }
1047 else
1048 {
1049 Assert(stmt->query);
1050
1051 query = makeNode(RawStmt);
1052 query->stmt = stmt->query;
1053 query->stmt_location = stmt_location;
1054 query->stmt_len = stmt_len;
1055
1056 relid = InvalidOid;
1057 rel = NULL;
1058 }
1059
1060 if (is_from)
1061 {
1062 Assert(rel);
1063
1064 /* check read-only transaction and parallel mode */
1065 if (XactReadOnly && !rel->rd_islocaltemp)
1066 PreventCommandIfReadOnly("COPY FROM");
1067 PreventCommandIfParallelMode("COPY FROM");
1068
1069 cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
1070 NULL, stmt->attlist, stmt->options);
1071 cstate->whereClause = whereClause;
1072 *processed = CopyFrom(cstate); /* copy from file to database */
1073 EndCopyFrom(cstate);
1074 }
1075 else
1076 {
1077 cstate = BeginCopyTo(pstate, rel, query, relid,
1078 stmt->filename, stmt->is_program,
1079 stmt->attlist, stmt->options);
1080 *processed = DoCopyTo(cstate); /* copy from database to file */
1081 EndCopyTo(cstate);
1082 }
1083
1084 /*
1085 * Close the relation. If reading, we can release the AccessShareLock we
1086 * got; if writing, we should hold the lock until end of transaction to
1087 * ensure that updates will be committed before lock is released.
1088 */
1089 if (rel != NULL)
1090 table_close(rel, (is_from ? NoLock : AccessShareLock));
1091}
1092
1093/*
1094 * Process the statement option list for COPY.
1095 *
1096 * Scan the options list (a list of DefElem) and transpose the information
1097 * into cstate, applying appropriate error checking.
1098 *
1099 * cstate is assumed to be filled with zeroes initially.
1100 *
1101 * This is exported so that external users of the COPY API can sanity-check
1102 * a list of options. In that usage, cstate should be passed as NULL
1103 * (since external users don't know sizeof(CopyStateData)) and the collected
1104 * data is just leaked until CurrentMemoryContext is reset.
1105 *
1106 * Note that additional checking, such as whether column names listed in FORCE
1107 * QUOTE actually exist, has to be applied later. This just checks for
1108 * self-consistency of the options list.
1109 */
1110void
1111ProcessCopyOptions(ParseState *pstate,
1112 CopyState cstate,
1113 bool is_from,
1114 List *options)
1115{
1116 bool format_specified = false;
1117 ListCell *option;
1118
1119 /* Support external use for option sanity checking */
1120 if (cstate == NULL)
1121 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1122
1123 cstate->is_copy_from = is_from;
1124
1125 cstate->file_encoding = -1;
1126
1127 /* Extract options from the statement node tree */
1128 foreach(option, options)
1129 {
1130 DefElem *defel = lfirst_node(DefElem, option);
1131
1132 if (strcmp(defel->defname, "format") == 0)
1133 {
1134 char *fmt = defGetString(defel);
1135
1136 if (format_specified)
1137 ereport(ERROR,
1138 (errcode(ERRCODE_SYNTAX_ERROR),
1139 errmsg("conflicting or redundant options"),
1140 parser_errposition(pstate, defel->location)));
1141 format_specified = true;
1142 if (strcmp(fmt, "text") == 0)
1143 /* default format */ ;
1144 else if (strcmp(fmt, "csv") == 0)
1145 cstate->csv_mode = true;
1146 else if (strcmp(fmt, "binary") == 0)
1147 cstate->binary = true;
1148 else
1149 ereport(ERROR,
1150 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1151 errmsg("COPY format \"%s\" not recognized", fmt),
1152 parser_errposition(pstate, defel->location)));
1153 }
1154 else if (strcmp(defel->defname, "freeze") == 0)
1155 {
1156 if (cstate->freeze)
1157 ereport(ERROR,
1158 (errcode(ERRCODE_SYNTAX_ERROR),
1159 errmsg("conflicting or redundant options"),
1160 parser_errposition(pstate, defel->location)));
1161 cstate->freeze = defGetBoolean(defel);
1162 }
1163 else if (strcmp(defel->defname, "delimiter") == 0)
1164 {
1165 if (cstate->delim)
1166 ereport(ERROR,
1167 (errcode(ERRCODE_SYNTAX_ERROR),
1168 errmsg("conflicting or redundant options"),
1169 parser_errposition(pstate, defel->location)));
1170 cstate->delim = defGetString(defel);
1171 }
1172 else if (strcmp(defel->defname, "null") == 0)
1173 {
1174 if (cstate->null_print)
1175 ereport(ERROR,
1176 (errcode(ERRCODE_SYNTAX_ERROR),
1177 errmsg("conflicting or redundant options"),
1178 parser_errposition(pstate, defel->location)));
1179 cstate->null_print = defGetString(defel);
1180 }
1181 else if (strcmp(defel->defname, "header") == 0)
1182 {
1183 if (cstate->header_line)
1184 ereport(ERROR,
1185 (errcode(ERRCODE_SYNTAX_ERROR),
1186 errmsg("conflicting or redundant options"),
1187 parser_errposition(pstate, defel->location)));
1188 cstate->header_line = defGetBoolean(defel);
1189 }
1190 else if (strcmp(defel->defname, "quote") == 0)
1191 {
1192 if (cstate->quote)
1193 ereport(ERROR,
1194 (errcode(ERRCODE_SYNTAX_ERROR),
1195 errmsg("conflicting or redundant options"),
1196 parser_errposition(pstate, defel->location)));
1197 cstate->quote = defGetString(defel);
1198 }
1199 else if (strcmp(defel->defname, "escape") == 0)
1200 {
1201 if (cstate->escape)
1202 ereport(ERROR,
1203 (errcode(ERRCODE_SYNTAX_ERROR),
1204 errmsg("conflicting or redundant options"),
1205 parser_errposition(pstate, defel->location)));
1206 cstate->escape = defGetString(defel);
1207 }
1208 else if (strcmp(defel->defname, "force_quote") == 0)
1209 {
1210 if (cstate->force_quote || cstate->force_quote_all)
1211 ereport(ERROR,
1212 (errcode(ERRCODE_SYNTAX_ERROR),
1213 errmsg("conflicting or redundant options"),
1214 parser_errposition(pstate, defel->location)));
1215 if (defel->arg && IsA(defel->arg, A_Star))
1216 cstate->force_quote_all = true;
1217 else if (defel->arg && IsA(defel->arg, List))
1218 cstate->force_quote = castNode(List, defel->arg);
1219 else
1220 ereport(ERROR,
1221 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1222 errmsg("argument to option \"%s\" must be a list of column names",
1223 defel->defname),
1224 parser_errposition(pstate, defel->location)));
1225 }
1226 else if (strcmp(defel->defname, "force_not_null") == 0)
1227 {
1228 if (cstate->force_notnull)
1229 ereport(ERROR,
1230 (errcode(ERRCODE_SYNTAX_ERROR),
1231 errmsg("conflicting or redundant options"),
1232 parser_errposition(pstate, defel->location)));
1233 if (defel->arg && IsA(defel->arg, List))
1234 cstate->force_notnull = castNode(List, defel->arg);
1235 else
1236 ereport(ERROR,
1237 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1238 errmsg("argument to option \"%s\" must be a list of column names",
1239 defel->defname),
1240 parser_errposition(pstate, defel->location)));
1241 }
1242 else if (strcmp(defel->defname, "force_null") == 0)
1243 {
1244 if (cstate->force_null)
1245 ereport(ERROR,
1246 (errcode(ERRCODE_SYNTAX_ERROR),
1247 errmsg("conflicting or redundant options")));
1248 if (defel->arg && IsA(defel->arg, List))
1249 cstate->force_null = castNode(List, defel->arg);
1250 else
1251 ereport(ERROR,
1252 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1253 errmsg("argument to option \"%s\" must be a list of column names",
1254 defel->defname),
1255 parser_errposition(pstate, defel->location)));
1256 }
1257 else if (strcmp(defel->defname, "convert_selectively") == 0)
1258 {
1259 /*
1260 * Undocumented, not-accessible-from-SQL option: convert only the
1261 * named columns to binary form, storing the rest as NULLs. It's
1262 * allowed for the column list to be NIL.
1263 */
1264 if (cstate->convert_selectively)
1265 ereport(ERROR,
1266 (errcode(ERRCODE_SYNTAX_ERROR),
1267 errmsg("conflicting or redundant options"),
1268 parser_errposition(pstate, defel->location)));
1269 cstate->convert_selectively = true;
1270 if (defel->arg == NULL || IsA(defel->arg, List))
1271 cstate->convert_select = castNode(List, defel->arg);
1272 else
1273 ereport(ERROR,
1274 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1275 errmsg("argument to option \"%s\" must be a list of column names",
1276 defel->defname),
1277 parser_errposition(pstate, defel->location)));
1278 }
1279 else if (strcmp(defel->defname, "encoding") == 0)
1280 {
1281 if (cstate->file_encoding >= 0)
1282 ereport(ERROR,
1283 (errcode(ERRCODE_SYNTAX_ERROR),
1284 errmsg("conflicting or redundant options"),
1285 parser_errposition(pstate, defel->location)));
1286 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1287 if (cstate->file_encoding < 0)
1288 ereport(ERROR,
1289 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1290 errmsg("argument to option \"%s\" must be a valid encoding name",
1291 defel->defname),
1292 parser_errposition(pstate, defel->location)));
1293 }
1294 else
1295 ereport(ERROR,
1296 (errcode(ERRCODE_SYNTAX_ERROR),
1297 errmsg("option \"%s\" not recognized",
1298 defel->defname),
1299 parser_errposition(pstate, defel->location)));
1300 }
1301
1302 /*
1303 * Check for incompatible options (must do these two before inserting
1304 * defaults)
1305 */
1306 if (cstate->binary && cstate->delim)
1307 ereport(ERROR,
1308 (errcode(ERRCODE_SYNTAX_ERROR),
1309 errmsg("cannot specify DELIMITER in BINARY mode")));
1310
1311 if (cstate->binary && cstate->null_print)
1312 ereport(ERROR,
1313 (errcode(ERRCODE_SYNTAX_ERROR),
1314 errmsg("cannot specify NULL in BINARY mode")));
1315
1316 /* Set defaults for omitted options */
1317 if (!cstate->delim)
1318 cstate->delim = cstate->csv_mode ? "," : "\t";
1319
1320 if (!cstate->null_print)
1321 cstate->null_print = cstate->csv_mode ? "" : "\\N";
1322 cstate->null_print_len = strlen(cstate->null_print);
1323
1324 if (cstate->csv_mode)
1325 {
1326 if (!cstate->quote)
1327 cstate->quote = "\"";
1328 if (!cstate->escape)
1329 cstate->escape = cstate->quote;
1330 }
1331
1332 /* Only single-byte delimiter strings are supported. */
1333 if (strlen(cstate->delim) != 1)
1334 ereport(ERROR,
1335 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1336 errmsg("COPY delimiter must be a single one-byte character")));
1337
1338 /* Disallow end-of-line characters */
1339 if (strchr(cstate->delim, '\r') != NULL ||
1340 strchr(cstate->delim, '\n') != NULL)
1341 ereport(ERROR,
1342 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1343 errmsg("COPY delimiter cannot be newline or carriage return")));
1344
1345 if (strchr(cstate->null_print, '\r') != NULL ||
1346 strchr(cstate->null_print, '\n') != NULL)
1347 ereport(ERROR,
1348 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1349 errmsg("COPY null representation cannot use newline or carriage return")));
1350
1351 /*
1352 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1353 * backslash because it would be ambiguous. We can't allow the other
1354 * cases because data characters matching the delimiter must be
1355 * backslashed, and certain backslash combinations are interpreted
1356 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1357 * more than strictly necessary, but seems best for consistency and
1358 * future-proofing. Likewise we disallow all digits though only octal
1359 * digits are actually dangerous.
1360 */
1361 if (!cstate->csv_mode &&
1362 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1363 cstate->delim[0]) != NULL)
1364 ereport(ERROR,
1365 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1366 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1367
1368 /* Check header */
1369 if (!cstate->csv_mode && cstate->header_line)
1370 ereport(ERROR,
1371 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1372 errmsg("COPY HEADER available only in CSV mode")));
1373
1374 /* Check quote */
1375 if (!cstate->csv_mode && cstate->quote != NULL)
1376 ereport(ERROR,
1377 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1378 errmsg("COPY quote available only in CSV mode")));
1379
1380 if (cstate->csv_mode && strlen(cstate->quote) != 1)
1381 ereport(ERROR,
1382 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1383 errmsg("COPY quote must be a single one-byte character")));
1384
1385 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1386 ereport(ERROR,
1387 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1388 errmsg("COPY delimiter and quote must be different")));
1389
1390 /* Check escape */
1391 if (!cstate->csv_mode && cstate->escape != NULL)
1392 ereport(ERROR,
1393 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1394 errmsg("COPY escape available only in CSV mode")));
1395
1396 if (cstate->csv_mode && strlen(cstate->escape) != 1)
1397 ereport(ERROR,
1398 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1399 errmsg("COPY escape must be a single one-byte character")));
1400
1401 /* Check force_quote */
1402 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1403 ereport(ERROR,
1404 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1405 errmsg("COPY force quote available only in CSV mode")));
1406 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1407 ereport(ERROR,
1408 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1409 errmsg("COPY force quote only available using COPY TO")));
1410
1411 /* Check force_notnull */
1412 if (!cstate->csv_mode && cstate->force_notnull != NIL)
1413 ereport(ERROR,
1414 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1415 errmsg("COPY force not null available only in CSV mode")));
1416 if (cstate->force_notnull != NIL && !is_from)
1417 ereport(ERROR,
1418 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1419 errmsg("COPY force not null only available using COPY FROM")));
1420
1421 /* Check force_null */
1422 if (!cstate->csv_mode && cstate->force_null != NIL)
1423 ereport(ERROR,
1424 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1425 errmsg("COPY force null available only in CSV mode")));
1426
1427 if (cstate->force_null != NIL && !is_from)
1428 ereport(ERROR,
1429 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1430 errmsg("COPY force null only available using COPY FROM")));
1431
1432 /* Don't allow the delimiter to appear in the null string. */
1433 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1434 ereport(ERROR,
1435 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1436 errmsg("COPY delimiter must not appear in the NULL specification")));
1437
1438 /* Don't allow the CSV quote char to appear in the null string. */
1439 if (cstate->csv_mode &&
1440 strchr(cstate->null_print, cstate->quote[0]) != NULL)
1441 ereport(ERROR,
1442 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1443 errmsg("CSV quote character must not appear in the NULL specification")));
1444}
1445
1446/*
1447 * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1448 *
1449 * Iff <binary>, unload or reload in the binary format, as opposed to the
1450 * more wasteful but more robust and portable text format.
1451 *
1452 * Iff <oids>, unload or reload the format that includes OID information.
1453 * On input, we accept OIDs whether or not the table has an OID column,
1454 * but silently drop them if it does not. On output, we report an error
1455 * if the user asks for OIDs in a table that has none (not providing an
1456 * OID column might seem friendlier, but could seriously confuse programs).
1457 *
1458 * If in the text format, delimit columns with delimiter <delim> and print
1459 * NULL values as <null_print>.
1460 */
1461static CopyState
1462BeginCopy(ParseState *pstate,
1463 bool is_from,
1464 Relation rel,
1465 RawStmt *raw_query,
1466 Oid queryRelId,
1467 List *attnamelist,
1468 List *options)
1469{
1470 CopyState cstate;
1471 TupleDesc tupDesc;
1472 int num_phys_attrs;
1473 MemoryContext oldcontext;
1474
1475 /* Allocate workspace and zero all fields */
1476 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1477
1478 /*
1479 * We allocate everything used by a cstate in a new memory context. This
1480 * avoids memory leaks during repeated use of COPY in a query.
1481 */
1482 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1483 "COPY",
1484 ALLOCSET_DEFAULT_SIZES);
1485
1486 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1487
1488 /* Extract options from the statement node tree */
1489 ProcessCopyOptions(pstate, cstate, is_from, options);
1490
1491 /* Process the source/target relation or query */
1492 if (rel)
1493 {
1494 Assert(!raw_query);
1495
1496 cstate->rel = rel;
1497
1498 tupDesc = RelationGetDescr(cstate->rel);
1499 }
1500 else
1501 {
1502 List *rewritten;
1503 Query *query;
1504 PlannedStmt *plan;
1505 DestReceiver *dest;
1506
1507 Assert(!is_from);
1508 cstate->rel = NULL;
1509
1510 /*
1511 * Run parse analysis and rewrite. Note this also acquires sufficient
1512 * locks on the source table(s).
1513 *
1514 * Because the parser and planner tend to scribble on their input, we
1515 * make a preliminary copy of the source querytree. This prevents
1516 * problems in the case that the COPY is in a portal or plpgsql
1517 * function and is executed repeatedly. (See also the same hack in
1518 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1519 */
1520 rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1521 pstate->p_sourcetext, NULL, 0,
1522 NULL);
1523
1524 /* check that we got back something we can work with */
1525 if (rewritten == NIL)
1526 {
1527 ereport(ERROR,
1528 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1529 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1530 }
1531 else if (list_length(rewritten) > 1)
1532 {
1533 ListCell *lc;
1534
1535 /* examine queries to determine which error message to issue */
1536 foreach(lc, rewritten)
1537 {
1538 Query *q = lfirst_node(Query, lc);
1539
1540 if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
1541 ereport(ERROR,
1542 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1543 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1544 if (q->querySource == QSRC_NON_INSTEAD_RULE)
1545 ereport(ERROR,
1546 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1547 errmsg("DO ALSO rules are not supported for the COPY")));
1548 }
1549
1550 ereport(ERROR,
1551 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1552 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1553 }
1554
1555 query = linitial_node(Query, rewritten);
1556
1557 /* The grammar allows SELECT INTO, but we don't support that */
1558 if (query->utilityStmt != NULL &&
1559 IsA(query->utilityStmt, CreateTableAsStmt))
1560 ereport(ERROR,
1561 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1562 errmsg("COPY (SELECT INTO) is not supported")));
1563
1564 Assert(query->utilityStmt == NULL);
1565
1566 /*
1567 * Similarly the grammar doesn't enforce the presence of a RETURNING
1568 * clause, but this is required here.
1569 */
1570 if (query->commandType != CMD_SELECT &&
1571 query->returningList == NIL)
1572 {
1573 Assert(query->commandType == CMD_INSERT ||
1574 query->commandType == CMD_UPDATE ||
1575 query->commandType == CMD_DELETE);
1576
1577 ereport(ERROR,
1578 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1579 errmsg("COPY query must have a RETURNING clause")));
1580 }
1581
1582 /* plan the query */
1583 plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1584
1585 /*
1586 * With row level security and a user using "COPY relation TO", we
1587 * have to convert the "COPY relation TO" to a query-based COPY (eg:
1588 * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1589 * in any RLS clauses.
1590 *
1591 * When this happens, we are passed in the relid of the originally
1592 * found relation (which we have locked). As the planner will look up
1593 * the relation again, we double-check here to make sure it found the
1594 * same one that we have locked.
1595 */
1596 if (queryRelId != InvalidOid)
1597 {
1598 /*
1599 * Note that with RLS involved there may be multiple relations,
1600 * and while the one we need is almost certainly first, we don't
1601 * make any guarantees of that in the planner, so check the whole
1602 * list and make sure we find the original relation.
1603 */
1604 if (!list_member_oid(plan->relationOids, queryRelId))
1605 ereport(ERROR,
1606 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1607 errmsg("relation referenced by COPY statement has changed")));
1608 }
1609
1610 /*
1611 * Use a snapshot with an updated command ID to ensure this query sees
1612 * results of any previously executed queries.
1613 */
1614 PushCopiedSnapshot(GetActiveSnapshot());
1615 UpdateActiveSnapshotCommandId();
1616
1617 /* Create dest receiver for COPY OUT */
1618 dest = CreateDestReceiver(DestCopyOut);
1619 ((DR_copy *) dest)->cstate = cstate;
1620
1621 /* Create a QueryDesc requesting no output */
1622 cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1623 GetActiveSnapshot(),
1624 InvalidSnapshot,
1625 dest, NULL, NULL, 0);
1626
1627 /*
1628 * Call ExecutorStart to prepare the plan for execution.
1629 *
1630 * ExecutorStart computes a result tupdesc for us
1631 */
1632 ExecutorStart(cstate->queryDesc, 0);
1633
1634 tupDesc = cstate->queryDesc->tupDesc;
1635 }
1636
1637 /* Generate or convert list of attributes to process */
1638 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1639
1640 num_phys_attrs = tupDesc->natts;
1641
1642 /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1643 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1644 if (cstate->force_quote_all)
1645 {
1646 int i;
1647
1648 for (i = 0; i < num_phys_attrs; i++)
1649 cstate->force_quote_flags[i] = true;
1650 }
1651 else if (cstate->force_quote)
1652 {
1653 List *attnums;
1654 ListCell *cur;
1655
1656 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1657
1658 foreach(cur, attnums)
1659 {
1660 int attnum = lfirst_int(cur);
1661 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1662
1663 if (!list_member_int(cstate->attnumlist, attnum))
1664 ereport(ERROR,
1665 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1666 errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1667 NameStr(attr->attname))));
1668 cstate->force_quote_flags[attnum - 1] = true;
1669 }
1670 }
1671
1672 /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1673 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1674 if (cstate->force_notnull)
1675 {
1676 List *attnums;
1677 ListCell *cur;
1678
1679 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1680
1681 foreach(cur, attnums)
1682 {
1683 int attnum = lfirst_int(cur);
1684 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1685
1686 if (!list_member_int(cstate->attnumlist, attnum))
1687 ereport(ERROR,
1688 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1689 errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1690 NameStr(attr->attname))));
1691 cstate->force_notnull_flags[attnum - 1] = true;
1692 }
1693 }
1694
1695 /* Convert FORCE_NULL name list to per-column flags, check validity */
1696 cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1697 if (cstate->force_null)
1698 {
1699 List *attnums;
1700 ListCell *cur;
1701
1702 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1703
1704 foreach(cur, attnums)
1705 {
1706 int attnum = lfirst_int(cur);
1707 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1708
1709 if (!list_member_int(cstate->attnumlist, attnum))
1710 ereport(ERROR,
1711 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1712 errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1713 NameStr(attr->attname))));
1714 cstate->force_null_flags[attnum - 1] = true;
1715 }
1716 }
1717
1718 /* Convert convert_selectively name list to per-column flags */
1719 if (cstate->convert_selectively)
1720 {
1721 List *attnums;
1722 ListCell *cur;
1723
1724 cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1725
1726 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1727
1728 foreach(cur, attnums)
1729 {
1730 int attnum = lfirst_int(cur);
1731 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1732
1733 if (!list_member_int(cstate->attnumlist, attnum))
1734 ereport(ERROR,
1735 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1736 errmsg_internal("selected column \"%s\" not referenced by COPY",
1737 NameStr(attr->attname))));
1738 cstate->convert_select_flags[attnum - 1] = true;
1739 }
1740 }
1741
1742 /* Use client encoding when ENCODING option is not specified. */
1743 if (cstate->file_encoding < 0)
1744 cstate->file_encoding = pg_get_client_encoding();
1745
1746 /*
1747 * Set up encoding conversion info. Even if the file and server encodings
1748 * are the same, we must apply pg_any_to_server() to validate data in
1749 * multibyte encodings.
1750 */
1751 cstate->need_transcoding =
1752 (cstate->file_encoding != GetDatabaseEncoding() ||
1753 pg_database_encoding_max_length() > 1);
1754 /* See Multibyte encoding comment above */
1755 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1756
1757 cstate->copy_dest = COPY_FILE; /* default */
1758
1759 MemoryContextSwitchTo(oldcontext);
1760
1761 return cstate;
1762}
1763
1764/*
1765 * Closes the pipe to an external program, checking the pclose() return code.
1766 */
1767static void
1768ClosePipeToProgram(CopyState cstate)
1769{
1770 int pclose_rc;
1771
1772 Assert(cstate->is_program);
1773
1774 pclose_rc = ClosePipeStream(cstate->copy_file);
1775 if (pclose_rc == -1)
1776 ereport(ERROR,
1777 (errcode_for_file_access(),
1778 errmsg("could not close pipe to external command: %m")));
1779 else if (pclose_rc != 0)
1780 {
1781 /*
1782 * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1783 * expectable for the called program to fail with SIGPIPE, and we
1784 * should not report that as an error. Otherwise, SIGPIPE indicates a
1785 * problem.
1786 */
1787 if (cstate->is_copy_from && !cstate->reached_eof &&
1788 wait_result_is_signal(pclose_rc, SIGPIPE))
1789 return;
1790
1791 ereport(ERROR,
1792 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1793 errmsg("program \"%s\" failed",
1794 cstate->filename),
1795 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1796 }
1797}
1798
1799/*
1800 * Release resources allocated in a cstate for COPY TO/FROM.
1801 */
1802static void
1803EndCopy(CopyState cstate)
1804{
1805 if (cstate->is_program)
1806 {
1807 ClosePipeToProgram(cstate);
1808 }
1809 else
1810 {
1811 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1812 ereport(ERROR,
1813 (errcode_for_file_access(),
1814 errmsg("could not close file \"%s\": %m",
1815 cstate->filename)));
1816 }
1817
1818 MemoryContextDelete(cstate->copycontext);
1819 pfree(cstate);
1820}
1821
1822/*
1823 * Setup CopyState to read tuples from a table or a query for COPY TO.
1824 */
1825static CopyState
1826BeginCopyTo(ParseState *pstate,
1827 Relation rel,
1828 RawStmt *query,
1829 Oid queryRelId,
1830 const char *filename,
1831 bool is_program,
1832 List *attnamelist,
1833 List *options)
1834{
1835 CopyState cstate;
1836 bool pipe = (filename == NULL);
1837 MemoryContext oldcontext;
1838
1839 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1840 {
1841 if (rel->rd_rel->relkind == RELKIND_VIEW)
1842 ereport(ERROR,
1843 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1844 errmsg("cannot copy from view \"%s\"",
1845 RelationGetRelationName(rel)),
1846 errhint("Try the COPY (SELECT ...) TO variant.")));
1847 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1848 ereport(ERROR,
1849 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1850 errmsg("cannot copy from materialized view \"%s\"",
1851 RelationGetRelationName(rel)),
1852 errhint("Try the COPY (SELECT ...) TO variant.")));
1853 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1854 ereport(ERROR,
1855 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1856 errmsg("cannot copy from foreign table \"%s\"",
1857 RelationGetRelationName(rel)),
1858 errhint("Try the COPY (SELECT ...) TO variant.")));
1859 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1860 ereport(ERROR,
1861 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1862 errmsg("cannot copy from sequence \"%s\"",
1863 RelationGetRelationName(rel))));
1864 else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1865 ereport(ERROR,
1866 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1867 errmsg("cannot copy from partitioned table \"%s\"",
1868 RelationGetRelationName(rel)),
1869 errhint("Try the COPY (SELECT ...) TO variant.")));
1870 else
1871 ereport(ERROR,
1872 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1873 errmsg("cannot copy from non-table relation \"%s\"",
1874 RelationGetRelationName(rel))));
1875 }
1876
1877 cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1878 options);
1879 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1880
1881 if (pipe)
1882 {
1883 Assert(!is_program); /* the grammar does not allow this */
1884 if (whereToSendOutput != DestRemote)
1885 cstate->copy_file = stdout;
1886 }
1887 else
1888 {
1889 cstate->filename = pstrdup(filename);
1890 cstate->is_program = is_program;
1891
1892 if (is_program)
1893 {
1894 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1895 if (cstate->copy_file == NULL)
1896 ereport(ERROR,
1897 (errcode_for_file_access(),
1898 errmsg("could not execute command \"%s\": %m",
1899 cstate->filename)));
1900 }
1901 else
1902 {
1903 mode_t oumask; /* Pre-existing umask value */
1904 struct stat st;
1905
1906 /*
1907 * Prevent write to relative path ... too easy to shoot oneself in
1908 * the foot by overwriting a database file ...
1909 */
1910 if (!is_absolute_path(filename))
1911 ereport(ERROR,
1912 (errcode(ERRCODE_INVALID_NAME),
1913 errmsg("relative path not allowed for COPY to file")));
1914
1915 oumask = umask(S_IWGRP | S_IWOTH);
1916 PG_TRY();
1917 {
1918 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1919 }
1920 PG_CATCH();
1921 {
1922 umask(oumask);
1923 PG_RE_THROW();
1924 }
1925 PG_END_TRY();
1926 umask(oumask);
1927 if (cstate->copy_file == NULL)
1928 {
1929 /* copy errno because ereport subfunctions might change it */
1930 int save_errno = errno;
1931
1932 ereport(ERROR,
1933 (errcode_for_file_access(),
1934 errmsg("could not open file \"%s\" for writing: %m",
1935 cstate->filename),
1936 (save_errno == ENOENT || save_errno == EACCES) ?
1937 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1938 "You may want a client-side facility such as psql's \\copy.") : 0));
1939 }
1940
1941 if (fstat(fileno(cstate->copy_file), &st))
1942 ereport(ERROR,
1943 (errcode_for_file_access(),
1944 errmsg("could not stat file \"%s\": %m",
1945 cstate->filename)));
1946
1947 if (S_ISDIR(st.st_mode))
1948 ereport(ERROR,
1949 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1950 errmsg("\"%s\" is a directory", cstate->filename)));
1951 }
1952 }
1953
1954 MemoryContextSwitchTo(oldcontext);
1955
1956 return cstate;
1957}
1958
1959/*
1960 * This intermediate routine exists mainly to localize the effects of setjmp
1961 * so we don't need to plaster a lot of variables with "volatile".
1962 */
1963static uint64
1964DoCopyTo(CopyState cstate)
1965{
1966 bool pipe = (cstate->filename == NULL);
1967 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1968 uint64 processed;
1969
1970 PG_TRY();
1971 {
1972 if (fe_copy)
1973 SendCopyBegin(cstate);
1974
1975 processed = CopyTo(cstate);
1976
1977 if (fe_copy)
1978 SendCopyEnd(cstate);
1979 }
1980 PG_CATCH();
1981 {
1982 /*
1983 * Make sure we turn off old-style COPY OUT mode upon error. It is
1984 * okay to do this in all cases, since it does nothing if the mode is
1985 * not on.
1986 */
1987 pq_endcopyout(true);
1988 PG_RE_THROW();
1989 }
1990 PG_END_TRY();
1991
1992 return processed;
1993}
1994
1995/*
1996 * Clean up storage and release resources for COPY TO.
1997 */
1998static void
1999EndCopyTo(CopyState cstate)
2000{
2001 if (cstate->queryDesc != NULL)
2002 {
2003 /* Close down the query and free resources. */
2004 ExecutorFinish(cstate->queryDesc);
2005 ExecutorEnd(cstate->queryDesc);
2006 FreeQueryDesc(cstate->queryDesc);
2007 PopActiveSnapshot();
2008 }
2009
2010 /* Clean up storage */
2011 EndCopy(cstate);
2012}
2013
2014/*
2015 * Copy from relation or query TO file.
2016 */
2017static uint64
2018CopyTo(CopyState cstate)
2019{
2020 TupleDesc tupDesc;
2021 int num_phys_attrs;
2022 ListCell *cur;
2023 uint64 processed;
2024
2025 if (cstate->rel)
2026 tupDesc = RelationGetDescr(cstate->rel);
2027 else
2028 tupDesc = cstate->queryDesc->tupDesc;
2029 num_phys_attrs = tupDesc->natts;
2030 cstate->null_print_client = cstate->null_print; /* default */
2031
2032 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
2033 cstate->fe_msgbuf = makeStringInfo();
2034
2035 /* Get info about the columns we need to process. */
2036 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2037 foreach(cur, cstate->attnumlist)
2038 {
2039 int attnum = lfirst_int(cur);
2040 Oid out_func_oid;
2041 bool isvarlena;
2042 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
2043
2044 if (cstate->binary)
2045 getTypeBinaryOutputInfo(attr->atttypid,
2046 &out_func_oid,
2047 &isvarlena);
2048 else
2049 getTypeOutputInfo(attr->atttypid,
2050 &out_func_oid,
2051 &isvarlena);
2052 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
2053 }
2054
2055 /*
2056 * Create a temporary memory context that we can reset once per row to
2057 * recover palloc'd memory. This avoids any problems with leaks inside
2058 * datatype output routines, and should be faster than retail pfree's
2059 * anyway. (We don't need a whole econtext as CopyFrom does.)
2060 */
2061 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
2062 "COPY TO",
2063 ALLOCSET_DEFAULT_SIZES);
2064
2065 if (cstate->binary)
2066 {
2067 /* Generate header for a binary copy */
2068 int32 tmp;
2069
2070 /* Signature */
2071 CopySendData(cstate, BinarySignature, 11);
2072 /* Flags field */
2073 tmp = 0;
2074 CopySendInt32(cstate, tmp);
2075 /* No header extension */
2076 tmp = 0;
2077 CopySendInt32(cstate, tmp);
2078 }
2079 else
2080 {
2081 /*
2082 * For non-binary copy, we need to convert null_print to file
2083 * encoding, because it will be sent directly with CopySendString.
2084 */
2085 if (cstate->need_transcoding)
2086 cstate->null_print_client = pg_server_to_any(cstate->null_print,
2087 cstate->null_print_len,
2088 cstate->file_encoding);
2089
2090 /* if a header has been requested send the line */
2091 if (cstate->header_line)
2092 {
2093 bool hdr_delim = false;
2094
2095 foreach(cur, cstate->attnumlist)
2096 {
2097 int attnum = lfirst_int(cur);
2098 char *colname;
2099
2100 if (hdr_delim)
2101 CopySendChar(cstate, cstate->delim[0]);
2102 hdr_delim = true;
2103
2104 colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
2105
2106 CopyAttributeOutCSV(cstate, colname, false,
2107 list_length(cstate->attnumlist) == 1);
2108 }
2109
2110 CopySendEndOfRow(cstate);
2111 }
2112 }
2113
2114 if (cstate->rel)
2115 {
2116 TupleTableSlot *slot;
2117 TableScanDesc scandesc;
2118
2119 scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2120 slot = table_slot_create(cstate->rel, NULL);
2121
2122 processed = 0;
2123 while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
2124 {
2125 CHECK_FOR_INTERRUPTS();
2126
2127 /* Deconstruct the tuple ... */
2128 slot_getallattrs(slot);
2129
2130 /* Format and send the data */
2131 CopyOneRowTo(cstate, slot);
2132 processed++;
2133 }
2134
2135 ExecDropSingleTupleTableSlot(slot);
2136 table_endscan(scandesc);
2137 }
2138 else
2139 {
2140 /* run the plan --- the dest receiver will send tuples */
2141 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2142 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2143 }
2144
2145 if (cstate->binary)
2146 {
2147 /* Generate trailer for a binary copy */
2148 CopySendInt16(cstate, -1);
2149 /* Need to flush out the trailer */
2150 CopySendEndOfRow(cstate);
2151 }
2152
2153 MemoryContextDelete(cstate->rowcontext);
2154
2155 return processed;
2156}
2157
2158/*
2159 * Emit one row during CopyTo().
2160 */
2161static void
2162CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
2163{
2164 bool need_delim = false;
2165 FmgrInfo *out_functions = cstate->out_functions;
2166 MemoryContext oldcontext;
2167 ListCell *cur;
2168 char *string;
2169
2170 MemoryContextReset(cstate->rowcontext);
2171 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2172
2173 if (cstate->binary)
2174 {
2175 /* Binary per-tuple header */
2176 CopySendInt16(cstate, list_length(cstate->attnumlist));
2177 }
2178
2179 /* Make sure the tuple is fully deconstructed */
2180 slot_getallattrs(slot);
2181
2182 foreach(cur, cstate->attnumlist)
2183 {
2184 int attnum = lfirst_int(cur);
2185 Datum value = slot->tts_values[attnum - 1];
2186 bool isnull = slot->tts_isnull[attnum - 1];
2187
2188 if (!cstate->binary)
2189 {
2190 if (need_delim)
2191 CopySendChar(cstate, cstate->delim[0]);
2192 need_delim = true;
2193 }
2194
2195 if (isnull)
2196 {
2197 if (!cstate->binary)
2198 CopySendString(cstate, cstate->null_print_client);
2199 else
2200 CopySendInt32(cstate, -1);
2201 }
2202 else
2203 {
2204 if (!cstate->binary)
2205 {
2206 string = OutputFunctionCall(&out_functions[attnum - 1],
2207 value);
2208 if (cstate->csv_mode)
2209 CopyAttributeOutCSV(cstate, string,
2210 cstate->force_quote_flags[attnum - 1],
2211 list_length(cstate->attnumlist) == 1);
2212 else
2213 CopyAttributeOutText(cstate, string);
2214 }
2215 else
2216 {
2217 bytea *outputbytes;
2218
2219 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2220 value);
2221 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2222 CopySendData(cstate, VARDATA(outputbytes),
2223 VARSIZE(outputbytes) - VARHDRSZ);
2224 }
2225 }
2226 }
2227
2228 CopySendEndOfRow(cstate);
2229
2230 MemoryContextSwitchTo(oldcontext);
2231}
2232
2233
2234/*
2235 * error context callback for COPY FROM
2236 *
2237 * The argument for the error context must be CopyState.
2238 */
2239void
2240CopyFromErrorCallback(void *arg)
2241{
2242 CopyState cstate = (CopyState) arg;
2243 char curlineno_str[32];
2244
2245 snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
2246 cstate->cur_lineno);
2247
2248 if (cstate->binary)
2249 {
2250 /* can't usefully display the data */
2251 if (cstate->cur_attname)
2252 errcontext("COPY %s, line %s, column %s",
2253 cstate->cur_relname, curlineno_str,
2254 cstate->cur_attname);
2255 else
2256 errcontext("COPY %s, line %s",
2257 cstate->cur_relname, curlineno_str);
2258 }
2259 else
2260 {
2261 if (cstate->cur_attname && cstate->cur_attval)
2262 {
2263 /* error is relevant to a particular column */
2264 char *attval;
2265
2266 attval = limit_printout_length(cstate->cur_attval);
2267 errcontext("COPY %s, line %s, column %s: \"%s\"",
2268 cstate->cur_relname, curlineno_str,
2269 cstate->cur_attname, attval);
2270 pfree(attval);
2271 }
2272 else if (cstate->cur_attname)
2273 {
2274 /* error is relevant to a particular column, value is NULL */
2275 errcontext("COPY %s, line %s, column %s: null input",
2276 cstate->cur_relname, curlineno_str,
2277 cstate->cur_attname);
2278 }
2279 else
2280 {
2281 /*
2282 * Error is relevant to a particular line.
2283 *
2284 * If line_buf still contains the correct line, and it's already
2285 * transcoded, print it. If it's still in a foreign encoding, it's
2286 * quite likely that the error is precisely a failure to do
2287 * encoding conversion (ie, bad data). We dare not try to convert
2288 * it, and at present there's no way to regurgitate it without
2289 * conversion. So we have to punt and just report the line number.
2290 */
2291 if (cstate->line_buf_valid &&
2292 (cstate->line_buf_converted || !cstate->need_transcoding))
2293 {
2294 char *lineval;
2295
2296 lineval = limit_printout_length(cstate->line_buf.data);
2297 errcontext("COPY %s, line %s: \"%s\"",
2298 cstate->cur_relname, curlineno_str, lineval);
2299 pfree(lineval);
2300 }
2301 else
2302 {
2303 errcontext("COPY %s, line %s",
2304 cstate->cur_relname, curlineno_str);
2305 }
2306 }
2307 }
2308}
2309
/*
 * Keep the amount of COPY data quoted in a message within reason.
 *
 * Using the sprintf "precision" limit to truncate would look simpler, but
 * some versions of glibc have a bug/misfeature whereby vsnprintf always
 * fails (returns -1) when asked to truncate a string containing byte
 * sequences that are invalid in the current encoding.  So the truncation is
 * done here by hand.
 *
 * The result is always a freshly allocated string: a plain copy when the
 * input is at most MAX_COPY_DATA_DISPLAY bytes long, otherwise a prefix
 * clipped by pg_mbcliplen() with "..." appended.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			clip_len;
	char	   *clipped;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Let the encoding-aware helper pick the cut point */
		clip_len = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* Room for the prefix, the three dots and the terminating NUL */
		clipped = (char *) palloc(clip_len + 4);
		memcpy(clipped, str, clip_len);
		memcpy(clipped + clip_len, "...", 4);

		return clipped;
	}

	/* Short enough already: just duplicate it */
	return pstrdup(str);
}
2344
2345/*
2346 * Allocate memory and initialize a new CopyMultiInsertBuffer for this
2347 * ResultRelInfo.
2348 */
2349static CopyMultiInsertBuffer *
2350CopyMultiInsertBufferInit(ResultRelInfo *rri)
2351{
2352 CopyMultiInsertBuffer *buffer;
2353
2354 buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
2355 memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
2356 buffer->resultRelInfo = rri;
2357 buffer->bistate = GetBulkInsertState();
2358 buffer->nused = 0;
2359
2360 return buffer;
2361}
2362
2363/*
2364 * Make a new buffer for this ResultRelInfo.
2365 */
2366static inline void
2367CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
2368 ResultRelInfo *rri)
2369{
2370 CopyMultiInsertBuffer *buffer;
2371
2372 buffer = CopyMultiInsertBufferInit(rri);
2373
2374 /* Setup back-link so we can easily find this buffer again */
2375 rri->ri_CopyMultiInsertBuffer = buffer;
2376 /* Record that we're tracking this buffer */
2377 miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
2378}
2379
2380/*
2381 * Initialize an already allocated CopyMultiInsertInfo.
2382 *
2383 * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
2384 * for that table.
2385 */
2386static void
2387CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
2388 CopyState cstate, EState *estate, CommandId mycid,
2389 int ti_options)
2390{
2391 miinfo->multiInsertBuffers = NIL;
2392 miinfo->bufferedTuples = 0;
2393 miinfo->bufferedBytes = 0;
2394 miinfo->cstate = cstate;
2395 miinfo->estate = estate;
2396 miinfo->mycid = mycid;
2397 miinfo->ti_options = ti_options;
2398
2399 /*
2400 * Only setup the buffer when not dealing with a partitioned table.
2401 * Buffers for partitioned tables will just be setup when we need to send
2402 * tuples their way for the first time.
2403 */
2404 if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
2405 CopyMultiInsertInfoSetupBuffer(miinfo, rri);
2406}
2407
2408/*
2409 * Returns true if the buffers are full
2410 */
2411static inline bool
2412CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
2413{
2414 if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
2415 miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
2416 return true;
2417 return false;
2418}
2419
2420/*
2421 * Returns true if we have no buffered tuples
2422 */
2423static inline bool
2424CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
2425{
2426 return miinfo->bufferedTuples == 0;
2427}
2428
2429/*
2430 * Write the tuples stored in 'buffer' out to the table.
2431 */
2432static inline void
2433CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
2434 CopyMultiInsertBuffer *buffer)
2435{
2436 MemoryContext oldcontext;
2437 int i;
2438 uint64 save_cur_lineno;
2439 CopyState cstate = miinfo->cstate;
2440 EState *estate = miinfo->estate;
2441 CommandId mycid = miinfo->mycid;
2442 int ti_options = miinfo->ti_options;
2443 bool line_buf_valid = cstate->line_buf_valid;
2444 int nused = buffer->nused;
2445 ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
2446 TupleTableSlot **slots = buffer->slots;
2447
2448 /* Set es_result_relation_info to the ResultRelInfo we're flushing. */
2449 estate->es_result_relation_info = resultRelInfo;
2450
2451 /*
2452 * Print error context information correctly, if one of the operations
2453 * below fail.
2454 */
2455 cstate->line_buf_valid = false;
2456 save_cur_lineno = cstate->cur_lineno;
2457
2458 /*
2459 * table_multi_insert may leak memory, so switch to short-lived memory
2460 * context before calling it.
2461 */
2462 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2463 table_multi_insert(resultRelInfo->ri_RelationDesc,
2464 slots,
2465 nused,
2466 mycid,
2467 ti_options,
2468 buffer->bistate);
2469 MemoryContextSwitchTo(oldcontext);
2470
2471 for (i = 0; i < nused; i++)
2472 {
2473 /*
2474 * If there are any indexes, update them for all the inserted tuples,
2475 * and run AFTER ROW INSERT triggers.
2476 */
2477 if (resultRelInfo->ri_NumIndices > 0)
2478 {
2479 List *recheckIndexes;
2480
2481 cstate->cur_lineno = buffer->linenos[i];
2482 recheckIndexes =
2483 ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
2484 NIL);
2485 ExecARInsertTriggers(estate, resultRelInfo,
2486 slots[i], recheckIndexes,
2487 cstate->transition_capture);
2488 list_free(recheckIndexes);
2489 }
2490
2491 /*
2492 * There's no indexes, but see if we need to run AFTER ROW INSERT
2493 * triggers anyway.
2494 */
2495 else if (resultRelInfo->ri_TrigDesc != NULL &&
2496 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
2497 resultRelInfo->ri_TrigDesc->trig_insert_new_table))
2498 {
2499 cstate->cur_lineno = buffer->linenos[i];
2500 ExecARInsertTriggers(estate, resultRelInfo,
2501 slots[i], NIL, cstate->transition_capture);
2502 }
2503
2504 ExecClearTuple(slots[i]);
2505 }
2506
2507 /* Mark that all slots are free */
2508 buffer->nused = 0;
2509
2510 /* reset cur_lineno and line_buf_valid to what they were */
2511 cstate->line_buf_valid = line_buf_valid;
2512 cstate->cur_lineno = save_cur_lineno;
2513}
2514
2515/*
2516 * Drop used slots and free member for this buffer.
2517 *
2518 * The buffer must be flushed before cleanup.
2519 */
2520static inline void
2521CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
2522 CopyMultiInsertBuffer *buffer)
2523{
2524 int i;
2525
2526 /* Ensure buffer was flushed */
2527 Assert(buffer->nused == 0);
2528
2529 /* Remove back-link to ourself */
2530 buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
2531
2532 FreeBulkInsertState(buffer->bistate);
2533
2534 /* Since we only create slots on demand, just drop the non-null ones. */
2535 for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
2536 ExecDropSingleTupleTableSlot(buffer->slots[i]);
2537
2538 table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
2539 miinfo->ti_options);
2540
2541 pfree(buffer);
2542}
2543
2544/*
2545 * Write out all stored tuples in all buffers out to the tables.
2546 *
2547 * Once flushed we also trim the tracked buffers list down to size by removing
2548 * the buffers created earliest first.
2549 *
2550 * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
2551 * used. When cleaning up old buffers we'll never remove the one for
2552 * 'curr_rri'.
2553 */
2554static inline void
2555CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
2556{
2557 ListCell *lc;
2558
2559 foreach(lc, miinfo->multiInsertBuffers)
2560 {
2561 CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
2562
2563 CopyMultiInsertBufferFlush(miinfo, buffer);
2564 }
2565
2566 miinfo->bufferedTuples = 0;
2567 miinfo->bufferedBytes = 0;
2568
2569 /*
2570 * Trim the list of tracked buffers down if it exceeds the limit. Here we
2571 * remove buffers starting with the ones we created first. It seems more
2572 * likely that these older ones are less likely to be needed than ones
2573 * that were just created.
2574 */
2575 while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
2576 {
2577 CopyMultiInsertBuffer *buffer;
2578
2579 buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
2580
2581 /*
2582 * We never want to remove the buffer that's currently being used, so
2583 * if we happen to find that then move it to the end of the list.
2584 */
2585 if (buffer->resultRelInfo == curr_rri)
2586 {
2587 miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
2588 miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
2589 buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
2590 }
2591
2592 CopyMultiInsertBufferCleanup(miinfo, buffer);
2593 miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
2594 }
2595}
2596
2597/*
2598 * Cleanup allocated buffers and free memory
2599 */
2600static inline void
2601CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
2602{
2603 ListCell *lc;
2604
2605 foreach(lc, miinfo->multiInsertBuffers)
2606 CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
2607
2608 list_free(miinfo->multiInsertBuffers);
2609}
2610
2611/*
2612 * Get the next TupleTableSlot that the next tuple should be stored in.
2613 *
2614 * Callers must ensure that the buffer is not full.
2615 */
2616static inline TupleTableSlot *
2617CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
2618 ResultRelInfo *rri)
2619{
2620 CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
2621 int nused = buffer->nused;
2622
2623 Assert(buffer != NULL);
2624 Assert(nused < MAX_BUFFERED_TUPLES);
2625
2626 if (buffer->slots[nused] == NULL)
2627 buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
2628 return buffer->slots[nused];
2629}
2630
2631/*
2632 * Record the previously reserved TupleTableSlot that was reserved by
2633 * CopyMultiInsertInfoNextFreeSlot as being consumed.
2634 */
2635static inline void
2636CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
2637 TupleTableSlot *slot, int tuplen, uint64 lineno)
2638{
2639 CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
2640
2641 Assert(buffer != NULL);
2642 Assert(slot == buffer->slots[buffer->nused]);
2643
2644 /* Store the line number so we can properly report any errors later */
2645 buffer->linenos[buffer->nused] = lineno;
2646
2647 /* Record this slot as being used */
2648 buffer->nused++;
2649
2650 /* Update how many tuples are stored and their size */
2651 miinfo->bufferedTuples++;
2652 miinfo->bufferedBytes += tuplen;
2653}
2654
2655/*
2656 * Copy FROM file to relation.
2657 */
2658uint64
2659CopyFrom(CopyState cstate)
2660{
2661 ResultRelInfo *resultRelInfo;
2662 ResultRelInfo *target_resultRelInfo;
2663 ResultRelInfo *prevResultRelInfo = NULL;
2664 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2665 ModifyTableState *mtstate;
2666 ExprContext *econtext;
2667 TupleTableSlot *singleslot = NULL;
2668 MemoryContext oldcontext = CurrentMemoryContext;
2669
2670 PartitionTupleRouting *proute = NULL;
2671 ErrorContextCallback errcallback;
2672 CommandId mycid = GetCurrentCommandId(true);
2673 int ti_options = 0; /* start with default options for insert */
2674 BulkInsertState bistate = NULL;
2675 CopyInsertMethod insertMethod;
2676 CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
2677 uint64 processed = 0;
2678 bool has_before_insert_row_trig;
2679 bool has_instead_insert_row_trig;
2680 bool leafpart_use_multi_insert = false;
2681
2682 Assert(cstate->rel);
2683
2684 /*
2685 * The target must be a plain, foreign, or partitioned relation, or have
2686 * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
2687 * allowed on views, so we only hint about them in the view case.)
2688 */
2689 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2690 cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
2691 cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2692 !(cstate->rel->trigdesc &&
2693 cstate->rel->trigdesc->trig_insert_instead_row))
2694 {
2695 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2696 ereport(ERROR,
2697 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2698 errmsg("cannot copy to view \"%s\"",
2699 RelationGetRelationName(cstate->rel)),
2700 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2701 else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2702 ereport(ERROR,
2703 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2704 errmsg("cannot copy to materialized view \"%s\"",
2705 RelationGetRelationName(cstate->rel))));
2706 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2707 ereport(ERROR,
2708 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2709 errmsg("cannot copy to sequence \"%s\"",
2710 RelationGetRelationName(cstate->rel))));
2711 else
2712 ereport(ERROR,
2713 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2714 errmsg("cannot copy to non-table relation \"%s\"",
2715 RelationGetRelationName(cstate->rel))));
2716 }
2717
2718 /*----------
2719 * Check to see if we can avoid writing WAL
2720 *
2721 * If archive logging/streaming is not enabled *and* either
2722 * - table was created in same transaction as this COPY
2723 * - data is being written to relfilenode created in this transaction
2724 * then we can skip writing WAL. It's safe because if the transaction
2725 * doesn't commit, we'll discard the table (or the new relfilenode file).
2726 * If it does commit, we'll have done the table_finish_bulk_insert() at
2727 * the bottom of this routine first.
2728 *
2729 * As mentioned in comments in utils/rel.h, the in-same-transaction test
2730 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2731 * can be cleared before the end of the transaction. The exact case is
2732 * when a relation sets a new relfilenode twice in same transaction, yet
2733 * the second one fails in an aborted subtransaction, e.g.
2734 *
2735 * BEGIN;
2736 * TRUNCATE t;
2737 * SAVEPOINT save;
2738 * TRUNCATE t;
2739 * ROLLBACK TO save;
2740 * COPY ...
2741 *
2742 * Also, if the target file is new-in-transaction, we assume that checking
2743 * FSM for free space is a waste of time, even if we must use WAL because
2744 * of archiving. This could possibly be wrong, but it's unlikely.
2745 *
2746 * The comments for table_tuple_insert and RelationGetBufferForTuple
2747 * specify that skipping WAL logging is only safe if we ensure that our
2748 * tuples do not go into pages containing tuples from any other
2749 * transactions --- but this must be the case if we have a new table or
2750 * new relfilenode, so we need no additional work to enforce that.
2751 *
2752 * We currently don't support this optimization if the COPY target is a
2753 * partitioned table as we currently only lazily initialize partition
2754 * information when routing the first tuple to the partition. We cannot
2755 * know at this stage if we can perform this optimization. It should be
2756 * possible to improve on this, but it does mean maintaining heap insert
2757 * option flags per partition and setting them when we first open the
2758 * partition.
2759 *
2760 * This optimization is not supported for relation types which do not
2761 * have any physical storage, with foreign tables and views using
2762 * INSTEAD OF triggers entering in this category. Partitioned tables
2763 * are not supported as per the description above.
2764 *----------
2765 */
2766 /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2767 if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
2768 (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2769 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
2770 {
2771 ti_options |= TABLE_INSERT_SKIP_FSM;
2772 if (!XLogIsNeeded())
2773 ti_options |= TABLE_INSERT_SKIP_WAL;
2774 }
2775
2776 /*
2777 * Optimize if new relfilenode was created in this subxact or one of its
2778 * committed children and we won't see those rows later as part of an
2779 * earlier scan or command. The subxact test ensures that if this subxact
2780 * aborts then the frozen rows won't be visible after xact cleanup. Note
2781 * that the stronger test of exactly which subtransaction created it is
2782 * crucial for correctness of this optimization. The test for an earlier
2783 * scan or command tolerates false negatives. FREEZE causes other sessions
2784 * to see rows they would not see under MVCC, and a false negative merely
2785 * spreads that anomaly to the current session.
2786 */
2787 if (cstate->freeze)
2788 {
2789 /*
2790 * We currently disallow COPY FREEZE on partitioned tables. The
2791 * reason for this is that we've simply not yet opened the partitions
2792 * to determine if the optimization can be applied to them. We could
2793 * go and open them all here, but doing so may be quite a costly
2794 * overhead for small copies. In any case, we may just end up routing
2795 * tuples to a small number of partitions. It seems better just to
2796 * raise an ERROR for partitioned tables.
2797 */
2798 if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2799 {
2800 ereport(ERROR,
2801 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2802 errmsg("cannot perform COPY FREEZE on a partitioned table")));
2803 }
2804
2805 /*
2806 * Tolerate one registration for the benefit of FirstXactSnapshot.
2807 * Scan-bearing queries generally create at least two registrations,
2808 * though relying on that is fragile, as is ignoring ActiveSnapshot.
2809 * Clear CatalogSnapshot to avoid counting its registration. We'll
2810 * still detect ongoing catalog scans, each of which separately
2811 * registers the snapshot it uses.
2812 */
2813 InvalidateCatalogSnapshot();
2814 if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2815 ereport(ERROR,
2816 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2817 errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
2818
2819 if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2820 cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2821 ereport(ERROR,
2822 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2823 errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
2824
2825 ti_options |= TABLE_INSERT_FROZEN;
2826 }
2827
2828 /*
2829 * We need a ResultRelInfo so we can use the regular executor's
2830 * index-entry-making machinery. (There used to be a huge amount of code
2831 * here that basically duplicated execUtils.c ...)
2832 */
2833 resultRelInfo = makeNode(ResultRelInfo);
2834 InitResultRelInfo(resultRelInfo,
2835 cstate->rel,
2836 1, /* must match rel's position in range_table */
2837 NULL,
2838 0);
2839 target_resultRelInfo = resultRelInfo;
2840
2841 /* Verify the named relation is a valid target for INSERT */
2842 CheckValidResultRel(resultRelInfo, CMD_INSERT);
2843
2844 ExecOpenIndices(resultRelInfo, false);
2845
2846 estate->es_result_relations = resultRelInfo;
2847 estate->es_num_result_relations = 1;
2848 estate->es_result_relation_info = resultRelInfo;
2849
2850 ExecInitRangeTable(estate, cstate->range_table);
2851
2852 /*
2853 * Set up a ModifyTableState so we can let FDW(s) init themselves for
2854 * foreign-table result relation(s).
2855 */
2856 mtstate = makeNode(ModifyTableState);
2857 mtstate->ps.plan = NULL;
2858 mtstate->ps.state = estate;
2859 mtstate->operation = CMD_INSERT;
2860 mtstate->resultRelInfo = estate->es_result_relations;
2861
2862 if (resultRelInfo->ri_FdwRoutine != NULL &&
2863 resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
2864 resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
2865 resultRelInfo);
2866
2867 /* Prepare to catch AFTER triggers. */
2868 AfterTriggerBeginQuery();
2869
2870 /*
2871 * If there are any triggers with transition tables on the named relation,
2872 * we need to be prepared to capture transition tuples.
2873 *
2874 * Because partition tuple routing would like to know about whether
2875 * transition capture is active, we also set it in mtstate, which is
2876 * passed to ExecFindPartition() below.
2877 */
2878 cstate->transition_capture = mtstate->mt_transition_capture =
2879 MakeTransitionCaptureState(cstate->rel->trigdesc,
2880 RelationGetRelid(cstate->rel),
2881 CMD_INSERT);
2882
2883 /*
2884 * If the named relation is a partitioned table, initialize state for
2885 * CopyFrom tuple routing.
2886 */
2887 if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2888 proute = ExecSetupPartitionTupleRouting(estate, NULL, cstate->rel);
2889
2890 if (cstate->whereClause)
2891 cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
2892 &mtstate->ps);
2893
2894 /*
2895 * It's generally more efficient to prepare a bunch of tuples for
2896 * insertion, and insert them in one table_multi_insert() call, than call
2897 * table_tuple_insert() separately for every tuple. However, there are a
2898 * number of reasons why we might not be able to do this. These are
2899 * explained below.
2900 */
2901 if (resultRelInfo->ri_TrigDesc != NULL &&
2902 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2903 resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
2904 {
2905 /*
2906 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
2907 * triggers on the table. Such triggers might query the table we're
2908 * inserting into and act differently if the tuples that have already
2909 * been processed and prepared for insertion are not there.
2910 */
2911 insertMethod = CIM_SINGLE;
2912 }
2913 else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
2914 resultRelInfo->ri_TrigDesc->trig_insert_new_table)
2915 {
2916 /*
2917 * For partitioned tables we can't support multi-inserts when there
2918 * are any statement level insert triggers. It might be possible to
2919 * allow partitioned tables with such triggers in the future, but for
2920 * now, CopyMultiInsertInfoFlush expects that any before row insert
2921 * and statement level insert triggers are on the same relation.
2922 */
2923 insertMethod = CIM_SINGLE;
2924 }
2925 else if (resultRelInfo->ri_FdwRoutine != NULL ||
2926 cstate->volatile_defexprs)
2927 {
2928 /*
2929 * Can't support multi-inserts to foreign tables or if there are any
2930 * volatile default expressions in the table. Similarly to the
2931 * trigger case above, such expressions may query the table we're
2932 * inserting into.
2933 *
2934 * Note: It does not matter if any partitions have any volatile
2935 * default expressions as we use the defaults from the target of the
2936 * COPY command.
2937 */
2938 insertMethod = CIM_SINGLE;
2939 }
2940 else if (contain_volatile_functions(cstate->whereClause))
2941 {
2942 /*
2943 * Can't support multi-inserts if there are any volatile function
2944 * expressions in WHERE clause. Similarly to the trigger case above,
2945 * such expressions may query the table we're inserting into.
2946 */
2947 insertMethod = CIM_SINGLE;
2948 }
2949 else
2950 {
2951 /*
2952 * For partitioned tables, we may still be able to perform bulk
2953 * inserts. However, the possibility of this depends on which types
2954 * of triggers exist on the partition. We must disable bulk inserts
2955 * if the partition is a foreign table or it has any before row insert
2956 * or insert instead triggers (same as we checked above for the parent
2957 * table). Since the partition's resultRelInfos are initialized only
2958 * when we actually need to insert the first tuple into them, we must
2959 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
2960 * flag that we must later determine if we can use bulk-inserts for
2961 * the partition being inserted into.
2962 */
2963 if (proute)
2964 insertMethod = CIM_MULTI_CONDITIONAL;
2965 else
2966 insertMethod = CIM_MULTI;
2967
2968 CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
2969 estate, mycid, ti_options);
2970 }
2971
2972 /*
2973 * If not using batch mode (which allocates slots as needed) set up a
2974 * tuple slot too. When inserting into a partitioned table, we also need
2975 * one, even if we might batch insert, to read the tuple in the root
2976 * partition's form.
2977 */
2978 if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
2979 {
2980 singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
2981 &estate->es_tupleTable);
2982 bistate = GetBulkInsertState();
2983 }
2984
2985 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
2986 resultRelInfo->ri_TrigDesc->trig_insert_before_row);
2987
2988 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
2989 resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
2990
2991 /*
2992 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2993 * should do this for COPY, since it's not really an "INSERT" statement as
2994 * such. However, executing these triggers maintains consistency with the
2995 * EACH ROW triggers that we already fire on COPY.
2996 */
2997 ExecBSInsertTriggers(estate, resultRelInfo);
2998
2999 econtext = GetPerTupleExprContext(estate);
3000
3001 /* Set up callback to identify error line number */
3002 errcallback.callback = CopyFromErrorCallback;
3003 errcallback.arg = (void *) cstate;
3004 errcallback.previous = error_context_stack;
3005 error_context_stack = &errcallback;
3006
3007 for (;;)
3008 {
3009 TupleTableSlot *myslot;
3010 bool skip_tuple;
3011
3012 CHECK_FOR_INTERRUPTS();
3013
3014 /*
3015 * Reset the per-tuple exprcontext. We do this after every tuple, to
3016 * clean-up after expression evaluations etc.
3017 */
3018 ResetPerTupleExprContext(estate);
3019
3020 /* select slot to (initially) load row into */
3021 if (insertMethod == CIM_SINGLE || proute)
3022 {
3023 myslot = singleslot;
3024 Assert(myslot != NULL);
3025 }
3026 else
3027 {
3028 Assert(resultRelInfo == target_resultRelInfo);
3029 Assert(insertMethod == CIM_MULTI);
3030
3031 myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
3032 resultRelInfo);
3033 }
3034
3035 /*
3036 * Switch to per-tuple context before calling NextCopyFrom, which does
3037 * evaluate default expressions etc. and requires per-tuple context.
3038 */
3039 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3040
3041 ExecClearTuple(myslot);
3042
3043 /* Directly store the values/nulls array in the slot */
3044 if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
3045 break;
3046
3047 ExecStoreVirtualTuple(myslot);
3048
3049 /*
3050 * Constraints and where clause might reference the tableoid column,
3051 * so (re-)initialize tts_tableOid before evaluating them.
3052 */
3053 myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
3054
3055 /* Triggers and stuff need to be invoked in query context. */
3056 MemoryContextSwitchTo(oldcontext);
3057
3058 if (cstate->whereClause)
3059 {
3060 econtext->ecxt_scantuple = myslot;
3061 /* Skip items that don't match COPY's WHERE clause */
3062 if (!ExecQual(cstate->qualexpr, econtext))
3063 continue;
3064 }
3065
3066 /* Determine the partition to insert the tuple into */
3067 if (proute)
3068 {
3069 TupleConversionMap *map;
3070
3071 /*
3072 * Attempt to find a partition suitable for this tuple.
3073 * ExecFindPartition() will raise an error if none can be found or
3074 * if the found partition is not suitable for INSERTs.
3075 */
3076 resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
3077 proute, myslot, estate);
3078
3079 if (prevResultRelInfo != resultRelInfo)
3080 {
3081 /* Determine which triggers exist on this partition */
3082 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
3083 resultRelInfo->ri_TrigDesc->trig_insert_before_row);
3084
3085 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
3086 resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
3087
3088 /*
3089 * Disable multi-inserts when the partition has BEFORE/INSTEAD
3090 * OF triggers, or if the partition is a foreign partition.
3091 */
3092 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
3093 !has_before_insert_row_trig &&
3094 !has_instead_insert_row_trig &&
3095 resultRelInfo->ri_FdwRoutine == NULL;
3096
3097 /* Set the multi-insert buffer to use for this partition. */
3098 if (leafpart_use_multi_insert)
3099 {
3100 if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
3101 CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
3102 resultRelInfo);
3103 }
3104 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
3105 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
3106 {
3107 /*
3108 * Flush pending inserts if this partition can't use
3109 * batching, so rows are visible to triggers etc.
3110 */
3111 CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
3112 }
3113
3114 if (bistate != NULL)
3115 ReleaseBulkInsertStatePin(bistate);
3116 prevResultRelInfo = resultRelInfo;
3117 }
3118
3119 /*
3120 * For ExecInsertIndexTuples() to work on the partition's indexes
3121 */
3122 estate->es_result_relation_info = resultRelInfo;
3123
3124 /*
3125 * If we're capturing transition tuples, we might need to convert
3126 * from the partition rowtype to root rowtype.
3127 */
3128 if (cstate->transition_capture != NULL)
3129 {
3130 if (has_before_insert_row_trig)
3131 {
3132 /*
3133 * If there are any BEFORE triggers on the partition,
3134 * we'll have to be ready to convert their result back to
3135 * tuplestore format.
3136 */
3137 cstate->transition_capture->tcs_original_insert_tuple = NULL;
3138 cstate->transition_capture->tcs_map =
3139 resultRelInfo->ri_PartitionInfo->pi_PartitionToRootMap;
3140 }
3141 else
3142 {
3143 /*
3144 * Otherwise, just remember the original unconverted
3145 * tuple, to avoid a needless round trip conversion.
3146 */
3147 cstate->transition_capture->tcs_original_insert_tuple = myslot;
3148 cstate->transition_capture->tcs_map = NULL;
3149 }
3150 }
3151
3152 /*
3153 * We might need to convert from the root rowtype to the partition
3154 * rowtype.
3155 */
3156 map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
3157 if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
3158 {
3159 /* non batch insert */
3160 if (map != NULL)
3161 {
3162 TupleTableSlot *new_slot;
3163
3164 new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
3165 myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
3166 }
3167 }
3168 else
3169 {
3170 /*
3171 * Prepare to queue up tuple for later batch insert into
3172 * current partition.
3173 */
3174 TupleTableSlot *batchslot;
3175
3176 /* no other path available for partitioned table */
3177 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
3178
3179 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
3180 resultRelInfo);
3181
3182 if (map != NULL)
3183 myslot = execute_attr_map_slot(map->attrMap, myslot,
3184 batchslot);
3185 else
3186 {
3187 /*
3188 * This looks more expensive than it is (Believe me, I
3189 * optimized it away. Twice.). The input is in virtual
3190 * form, and we'll materialize the slot below - for most
3191 * slot types the copy performs the work materialization
3192 * would later require anyway.
3193 */
3194 ExecCopySlot(batchslot, myslot);
3195 myslot = batchslot;
3196 }
3197 }
3198
3199 /* ensure that triggers etc see the right relation */
3200 myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
3201 }
3202
3203 skip_tuple = false;
3204
3205 /* BEFORE ROW INSERT Triggers */
3206 if (has_before_insert_row_trig)
3207 {
3208 if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
3209 skip_tuple = true; /* "do nothing" */
3210 }
3211
3212 if (!skip_tuple)
3213 {
3214 /*
3215 * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
3216 * tuple. Otherwise, proceed with inserting the tuple into the
3217 * table or foreign table.
3218 */
3219 if (has_instead_insert_row_trig)
3220 {
3221 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
3222 }
3223 else
3224 {
3225 /* Compute stored generated columns */
3226 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
3227 resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
3228 ExecComputeStoredGenerated(estate, myslot);
3229
3230 /*
3231 * If the target is a plain table, check the constraints of
3232 * the tuple.
3233 */
3234 if (resultRelInfo->ri_FdwRoutine == NULL &&
3235 resultRelInfo->ri_RelationDesc->rd_att->constr)
3236 ExecConstraints(resultRelInfo, myslot, estate);
3237
3238 /*
3239 * Also check the tuple against the partition constraint, if
3240 * there is one; except that if we got here via tuple-routing,
3241 * we don't need to if there's no BR trigger defined on the
3242 * partition.
3243 */
3244 if (resultRelInfo->ri_PartitionCheck &&
3245 (proute == NULL || has_before_insert_row_trig))
3246 ExecPartitionCheck(resultRelInfo, myslot, estate, true);
3247
3248 /* Store the slot in the multi-insert buffer, when enabled. */
3249 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
3250 {
3251 /*
3252 * The slot previously might point into the per-tuple
3253 * context. For batching it needs to be longer lived.
3254 */
3255 ExecMaterializeSlot(myslot);
3256
3257 /* Add this tuple to the tuple buffer */
3258 CopyMultiInsertInfoStore(&multiInsertInfo,
3259 resultRelInfo, myslot,
3260 cstate->line_buf.len,
3261 cstate->cur_lineno);
3262
3263 /*
3264 * If enough inserts have queued up, then flush all
3265 * buffers out to their tables.
3266 */
3267 if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
3268 CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
3269 }
3270 else
3271 {
3272 List *recheckIndexes = NIL;
3273
3274 /* OK, store the tuple */
3275 if (resultRelInfo->ri_FdwRoutine != NULL)
3276 {
3277 myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
3278 resultRelInfo,
3279 myslot,
3280 NULL);
3281
3282 if (myslot == NULL) /* "do nothing" */
3283 continue; /* next tuple please */
3284
3285 /*
3286 * AFTER ROW Triggers might reference the tableoid
3287 * column, so (re-)initialize tts_tableOid before
3288 * evaluating them.
3289 */
3290 myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
3291 }
3292 else
3293 {
3294 /* OK, store the tuple and create index entries for it */
3295 table_tuple_insert(resultRelInfo->ri_RelationDesc,
3296 myslot, mycid, ti_options, bistate);
3297
3298 if (resultRelInfo->ri_NumIndices > 0)
3299 recheckIndexes = ExecInsertIndexTuples(myslot,
3300 estate,
3301 false,
3302 NULL,
3303 NIL);
3304 }
3305
3306 /* AFTER ROW INSERT Triggers */
3307 ExecARInsertTriggers(estate, resultRelInfo, myslot,
3308 recheckIndexes, cstate->transition_capture);
3309
3310 list_free(recheckIndexes);
3311 }
3312 }
3313
3314 /*
3315 * We count only tuples not suppressed by a BEFORE INSERT trigger
3316 * or FDW; this is the same definition used by nodeModifyTable.c
3317 * for counting tuples inserted by an INSERT command.
3318 */
3319 processed++;
3320 }
3321 }
3322
3323 /* Flush any remaining buffered tuples */
3324 if (insertMethod != CIM_SINGLE)
3325 {
3326 if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
3327 CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
3328 }
3329
3330 /* Done, clean up */
3331 error_context_stack = errcallback.previous;
3332
3333 if (bistate != NULL)
3334 FreeBulkInsertState(bistate);
3335
3336 MemoryContextSwitchTo(oldcontext);
3337
3338 /*
3339 * In the old protocol, tell pqcomm that we can process normal protocol
3340 * messages again.
3341 */
3342 if (cstate->copy_dest == COPY_OLD_FE)
3343 pq_endmsgread();
3344
3345 /* Execute AFTER STATEMENT insertion triggers */
3346 ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
3347
3348 /* Handle queued AFTER triggers */
3349 AfterTriggerEndQuery(estate);
3350
3351 ExecResetTupleTable(estate->es_tupleTable, false);
3352
3353 /* Allow the FDW to shut down */
3354 if (target_resultRelInfo->ri_FdwRoutine != NULL &&
3355 target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
3356 target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
3357 target_resultRelInfo);
3358
3359 /* Tear down the multi-insert buffer data */
3360 if (insertMethod != CIM_SINGLE)
3361 CopyMultiInsertInfoCleanup(&multiInsertInfo);
3362
3363 ExecCloseIndices(target_resultRelInfo);
3364
3365 /* Close all the partitioned tables, leaf partitions, and their indices */
3366 if (proute)
3367 ExecCleanupTupleRouting(mtstate, proute);
3368
3369 /* Close any trigger target relations */
3370 ExecCleanUpTriggerState(estate);
3371
3372 FreeExecutorState(estate);
3373
3374 return processed;
3375}
3376
3377/*
3378 * Setup to read tuples from a file for COPY FROM.
3379 *
3380 * 'rel': Used as a template for the tuples
3381 * 'filename': Name of server-local file to read
3382 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
3383 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
3384 *
3385 * Returns a CopyState, to be passed to NextCopyFrom and related functions.
3386 */
3387CopyState
3388BeginCopyFrom(ParseState *pstate,
3389 Relation rel,
3390 const char *filename,
3391 bool is_program,
3392 copy_data_source_cb data_source_cb,
3393 List *attnamelist,
3394 List *options)
3395{
3396 CopyState cstate;
3397 bool pipe = (filename == NULL);
3398 TupleDesc tupDesc;
3399 AttrNumber num_phys_attrs,
3400 num_defaults;
3401 FmgrInfo *in_functions;
3402 Oid *typioparams;
3403 int attnum;
3404 Oid in_func_oid;
3405 int *defmap;
3406 ExprState **defexprs;
3407 MemoryContext oldcontext;
3408 bool volatile_defexprs;
3409
3410 cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
3411 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
3412
3413 /* Initialize state variables */
3414 cstate->reached_eof = false;
3415 cstate->eol_type = EOL_UNKNOWN;
3416 cstate->cur_relname = RelationGetRelationName(cstate->rel);
3417 cstate->cur_lineno = 0;
3418 cstate->cur_attname = NULL;
3419 cstate->cur_attval = NULL;
3420
3421 /* Set up variables to avoid per-attribute overhead. */
3422 initStringInfo(&cstate->attribute_buf);
3423 initStringInfo(&cstate->line_buf);
3424 cstate->line_buf_converted = false;
3425 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
3426 cstate->raw_buf_index = cstate->raw_buf_len = 0;
3427
3428 /* Assign range table, we'll need it in CopyFrom. */
3429 if (pstate)
3430 cstate->range_table = pstate->p_rtable;
3431
3432 tupDesc = RelationGetDescr(cstate->rel);
3433 num_phys_attrs = tupDesc->natts;
3434 num_defaults = 0;
3435 volatile_defexprs = false;
3436
3437 /*
3438 * Pick up the required catalog information for each attribute in the
3439 * relation, including the input function, the element type (to pass to
3440 * the input function), and info about defaults and constraints. (Which
3441 * input function we use depends on text/binary format choice.)
3442 */
3443 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
3444 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
3445 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
3446 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
3447
3448 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
3449 {
3450 Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
3451
3452 /* We don't need info for dropped attributes */
3453 if (att->attisdropped)
3454 continue;
3455
3456 /* Fetch the input function and typioparam info */
3457 if (cstate->binary)
3458 getTypeBinaryInputInfo(att->atttypid,
3459 &in_func_oid, &typioparams[attnum - 1]);
3460 else
3461 getTypeInputInfo(att->atttypid,
3462 &in_func_oid, &typioparams[attnum - 1]);
3463 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
3464
3465 /* Get default info if needed */
3466 if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated)
3467 {
3468 /* attribute is NOT to be copied from input */
3469 /* use default value if one exists */
3470 Expr *defexpr = (Expr *) build_column_default(cstate->rel,
3471 attnum);
3472
3473 if (defexpr != NULL)
3474 {
3475 /* Run the expression through planner */
3476 defexpr = expression_planner(defexpr);
3477
3478 /* Initialize executable expression in copycontext */
3479 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
3480 defmap[num_defaults] = attnum - 1;
3481 num_defaults++;
3482
3483 /*
3484 * If a default expression looks at the table being loaded,
3485 * then it could give the wrong answer when using
3486 * multi-insert. Since database access can be dynamic this is
3487 * hard to test for exactly, so we use the much wider test of
3488 * whether the default expression is volatile. We allow for
3489 * the special case of when the default expression is the
3490 * nextval() of a sequence which in this specific case is
3491 * known to be safe for use with the multi-insert
3492 * optimization. Hence we use this special case function
3493 * checker rather than the standard check for
3494 * contain_volatile_functions().
3495 */
3496 if (!volatile_defexprs)
3497 volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
3498 }
3499 }
3500 }
3501
3502 /* We keep those variables in cstate. */
3503 cstate->in_functions = in_functions;
3504 cstate->typioparams = typioparams;
3505 cstate->defmap = defmap;
3506 cstate->defexprs = defexprs;
3507 cstate->volatile_defexprs = volatile_defexprs;
3508 cstate->num_defaults = num_defaults;
3509 cstate->is_program = is_program;
3510
3511 if (data_source_cb)
3512 {
3513 cstate->copy_dest = COPY_CALLBACK;
3514 cstate->data_source_cb = data_source_cb;
3515 }
3516 else if (pipe)
3517 {
3518 Assert(!is_program); /* the grammar does not allow this */
3519 if (whereToSendOutput == DestRemote)
3520 ReceiveCopyBegin(cstate);
3521 else
3522 cstate->copy_file = stdin;
3523 }
3524 else
3525 {
3526 cstate->filename = pstrdup(filename);
3527
3528 if (cstate->is_program)
3529 {
3530 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3531 if (cstate->copy_file == NULL)
3532 ereport(ERROR,
3533 (errcode_for_file_access(),
3534 errmsg("could not execute command \"%s\": %m",
3535 cstate->filename)));
3536 }
3537 else
3538 {
3539 struct stat st;
3540
3541 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3542 if (cstate->copy_file == NULL)
3543 {
3544 /* copy errno because ereport subfunctions might change it */
3545 int save_errno = errno;
3546
3547 ereport(ERROR,
3548 (errcode_for_file_access(),
3549 errmsg("could not open file \"%s\" for reading: %m",
3550 cstate->filename),
3551 (save_errno == ENOENT || save_errno == EACCES) ?
3552 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3553 "You may want a client-side facility such as psql's \\copy.") : 0));
3554 }
3555
3556 if (fstat(fileno(cstate->copy_file), &st))
3557 ereport(ERROR,
3558 (errcode_for_file_access(),
3559 errmsg("could not stat file \"%s\": %m",
3560 cstate->filename)));
3561
3562 if (S_ISDIR(st.st_mode))
3563 ereport(ERROR,
3564 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3565 errmsg("\"%s\" is a directory", cstate->filename)));
3566 }
3567 }
3568
3569 if (cstate->binary)
3570 {
3571 /* Read and verify binary header */
3572 char readSig[11];
3573 int32 tmp;
3574
3575 /* Signature */
3576 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3577 memcmp(readSig, BinarySignature, 11) != 0)
3578 ereport(ERROR,
3579 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3580 errmsg("COPY file signature not recognized")));
3581 /* Flags field */
3582 if (!CopyGetInt32(cstate, &tmp))
3583 ereport(ERROR,
3584 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3585 errmsg("invalid COPY file header (missing flags)")));
3586 if ((tmp & (1 << 16)) != 0)
3587 ereport(ERROR,
3588 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3589 errmsg("invalid COPY file header (WITH OIDS)")));
3590 tmp &= ~(1 << 16);
3591 if ((tmp >> 16) != 0)
3592 ereport(ERROR,
3593 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3594 errmsg("unrecognized critical flags in COPY file header")));
3595 /* Header extension length */
3596 if (!CopyGetInt32(cstate, &tmp) ||
3597 tmp < 0)
3598 ereport(ERROR,
3599 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3600 errmsg("invalid COPY file header (missing length)")));
3601 /* Skip extension header, if present */
3602 while (tmp-- > 0)
3603 {
3604 if (CopyGetData(cstate, readSig, 1, 1) != 1)
3605 ereport(ERROR,
3606 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3607 errmsg("invalid COPY file header (wrong length)")));
3608 }
3609 }
3610
3611 /* create workspace for CopyReadAttributes results */
3612 if (!cstate->binary)
3613 {
3614 AttrNumber attr_count = list_length(cstate->attnumlist);
3615
3616 cstate->max_fields = attr_count;
3617 cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
3618 }
3619
3620 MemoryContextSwitchTo(oldcontext);
3621
3622 return cstate;
3623}
3624
3625/*
3626 * Read raw fields in the next line for COPY FROM in text or csv mode.
3627 * Return false if no more lines.
3628 *
3629 * An internal temporary buffer is returned via 'fields'. It is valid until
3630 * the next call of the function. Since the function returns all raw fields
3631 * in the input file, 'nfields' could be different from the number of columns
3632 * in the relation.
3633 *
3634 * NOTE: force_not_null option are not applied to the returned fields.
3635 */
3636bool
3637NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3638{
3639 int fldct;
3640 bool done;
3641
3642 /* only available for text or csv input */
3643 Assert(!cstate->binary);
3644
3645 /* on input just throw the header line away */
3646 if (cstate->cur_lineno == 0 && cstate->header_line)
3647 {
3648 cstate->cur_lineno++;
3649 if (CopyReadLine(cstate))
3650 return false; /* done */
3651 }
3652
3653 cstate->cur_lineno++;
3654
3655 /* Actually read the line into memory here */
3656 done = CopyReadLine(cstate);
3657
3658 /*
3659 * EOF at start of line means we're done. If we see EOF after some
3660 * characters, we act as though it was newline followed by EOF, ie,
3661 * process the line and then exit loop on next iteration.
3662 */
3663 if (done && cstate->line_buf.len == 0)
3664 return false;
3665
3666 /* Parse the line into de-escaped field values */
3667 if (cstate->csv_mode)
3668 fldct = CopyReadAttributesCSV(cstate);
3669 else
3670 fldct = CopyReadAttributesText(cstate);
3671
3672 *fields = cstate->raw_fields;
3673 *nfields = fldct;
3674 return true;
3675}
3676
3677/*
3678 * Read next tuple from file for COPY FROM. Return false if no more tuples.
3679 *
3680 * 'econtext' is used to evaluate default expression for each columns not
3681 * read from the file. It can be NULL when no default values are used, i.e.
3682 * when all columns are read from the file.
3683 *
3684 * 'values' and 'nulls' arrays must be the same length as columns of the
3685 * relation passed to BeginCopyFrom. This function fills the arrays.
3686 * Oid of the tuple is returned with 'tupleOid' separately.
3687 */
3688bool
3689NextCopyFrom(CopyState cstate, ExprContext *econtext,
3690 Datum *values, bool *nulls)
3691{
3692 TupleDesc tupDesc;
3693 AttrNumber num_phys_attrs,
3694 attr_count,
3695 num_defaults = cstate->num_defaults;
3696 FmgrInfo *in_functions = cstate->in_functions;
3697 Oid *typioparams = cstate->typioparams;
3698 int i;
3699 int *defmap = cstate->defmap;
3700 ExprState **defexprs = cstate->defexprs;
3701
3702 tupDesc = RelationGetDescr(cstate->rel);
3703 num_phys_attrs = tupDesc->natts;
3704 attr_count = list_length(cstate->attnumlist);
3705
3706 /* Initialize all values for row to NULL */
3707 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3708 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3709
3710 if (!cstate->binary)
3711 {
3712 char **field_strings;
3713 ListCell *cur;
3714 int fldct;
3715 int fieldno;
3716 char *string;
3717
3718 /* read raw fields in the next line */
3719 if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3720 return false;
3721
3722 /* check for overflowing fields */
3723 if (attr_count > 0 && fldct > attr_count)
3724 ereport(ERROR,
3725 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3726 errmsg("extra data after last expected column")));
3727
3728 fieldno = 0;
3729
3730 /* Loop to read the user attributes on the line. */
3731 foreach(cur, cstate->attnumlist)
3732 {
3733 int attnum = lfirst_int(cur);
3734 int m = attnum - 1;
3735 Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3736
3737 if (fieldno >= fldct)
3738 ereport(ERROR,
3739 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3740 errmsg("missing data for column \"%s\"",
3741 NameStr(att->attname))));
3742 string = field_strings[fieldno++];
3743
3744 if (cstate->convert_select_flags &&
3745 !cstate->convert_select_flags[m])
3746 {
3747 /* ignore input field, leaving column as NULL */
3748 continue;
3749 }
3750
3751 if (cstate->csv_mode)
3752 {
3753 if (string == NULL &&
3754 cstate->force_notnull_flags[m])
3755 {
3756 /*
3757 * FORCE_NOT_NULL option is set and column is NULL -
3758 * convert it to the NULL string.
3759 */
3760 string = cstate->null_print;
3761 }
3762 else if (string != NULL && cstate->force_null_flags[m]
3763 && strcmp(string, cstate->null_print) == 0)
3764 {
3765 /*
3766 * FORCE_NULL option is set and column matches the NULL
3767 * string. It must have been quoted, or otherwise the
3768 * string would already have been set to NULL. Convert it
3769 * to NULL as specified.
3770 */
3771 string = NULL;
3772 }
3773 }
3774
3775 cstate->cur_attname = NameStr(att->attname);
3776 cstate->cur_attval = string;
3777 values[m] = InputFunctionCall(&in_functions[m],
3778 string,
3779 typioparams[m],
3780 att->atttypmod);
3781 if (string != NULL)
3782 nulls[m] = false;
3783 cstate->cur_attname = NULL;
3784 cstate->cur_attval = NULL;
3785 }
3786
3787 Assert(fieldno == attr_count);
3788 }
3789 else
3790 {
3791 /* binary */
3792 int16 fld_count;
3793 ListCell *cur;
3794
3795 cstate->cur_lineno++;
3796
3797 if (!CopyGetInt16(cstate, &fld_count))
3798 {
3799 /* EOF detected (end of file, or protocol-level EOF) */
3800 return false;
3801 }
3802
3803 if (fld_count == -1)
3804 {
3805 /*
3806 * Received EOF marker. In a V3-protocol copy, wait for the
3807 * protocol-level EOF, and complain if it doesn't come
3808 * immediately. This ensures that we correctly handle CopyFail,
3809 * if client chooses to send that now.
3810 *
3811 * Note that we MUST NOT try to read more data in an old-protocol
3812 * copy, since there is no protocol-level EOF marker then. We
3813 * could go either way for copy from file, but choose to throw
3814 * error if there's data after the EOF marker, for consistency
3815 * with the new-protocol case.
3816 */
3817 char dummy;
3818
3819 if (cstate->copy_dest != COPY_OLD_FE &&
3820 CopyGetData(cstate, &dummy, 1, 1) > 0)
3821 ereport(ERROR,
3822 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3823 errmsg("received copy data after EOF marker")));
3824 return false;
3825 }
3826
3827 if (fld_count != attr_count)
3828 ereport(ERROR,
3829 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3830 errmsg("row field count is %d, expected %d",
3831 (int) fld_count, attr_count)));
3832
3833 i = 0;
3834 foreach(cur, cstate->attnumlist)
3835 {
3836 int attnum = lfirst_int(cur);
3837 int m = attnum - 1;
3838 Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3839
3840 cstate->cur_attname = NameStr(att->attname);
3841 i++;
3842 values[m] = CopyReadBinaryAttribute(cstate,
3843 i,
3844 &in_functions[m],
3845 typioparams[m],
3846 att->atttypmod,
3847 &nulls[m]);
3848 cstate->cur_attname = NULL;
3849 }
3850 }
3851
3852 /*
3853 * Now compute and insert any defaults available for the columns not
3854 * provided by the input data. Anything not processed here or above will
3855 * remain NULL.
3856 */
3857 for (i = 0; i < num_defaults; i++)
3858 {
3859 /*
3860 * The caller must supply econtext and have switched into the
3861 * per-tuple memory context in it.
3862 */
3863 Assert(econtext != NULL);
3864 Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
3865
3866 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3867 &nulls[defmap[i]]);
3868 }
3869
3870 return true;
3871}
3872
/*
 * Clean up storage and release resources for COPY FROM.
 *
 * 'cstate' is the state returned by BeginCopyFrom.  This is a thin wrapper:
 * all of the teardown is delegated to EndCopy().
 */
void
EndCopyFrom(CopyState cstate)
{
	/* No COPY FROM related resources except memory. */

	EndCopy(cstate);
}
3883
3884/*
3885 * Read the next input line and stash it in line_buf, with conversion to
3886 * server encoding.
3887 *
3888 * Result is true if read was terminated by EOF, false if terminated
3889 * by newline. The terminating newline or EOF marker is not included
3890 * in the final value of line_buf.
3891 */
3892static bool
3893CopyReadLine(CopyState cstate)
3894{
3895 bool result;
3896
3897 resetStringInfo(&cstate->line_buf);
3898 cstate->line_buf_valid = true;
3899
3900 /* Mark that encoding conversion hasn't occurred yet */
3901 cstate->line_buf_converted = false;
3902
3903 /* Parse data and transfer into line_buf */
3904 result = CopyReadLineText(cstate);
3905
3906 if (result)
3907 {
3908 /*
3909 * Reached EOF. In protocol version 3, we should ignore anything
3910 * after \. up to the protocol end of copy data. (XXX maybe better
3911 * not to treat \. as special?)
3912 */
3913 if (cstate->copy_dest == COPY_NEW_FE)
3914 {
3915 do
3916 {
3917 cstate->raw_buf_index = cstate->raw_buf_len;
3918 } while (CopyLoadRawBuf(cstate));
3919 }
3920 }
3921 else
3922 {
3923 /*
3924 * If we didn't hit EOF, then we must have transferred the EOL marker
3925 * to line_buf along with the data. Get rid of it.
3926 */
3927 switch (cstate->eol_type)
3928 {
3929 case EOL_NL:
3930 Assert(cstate->line_buf.len >= 1);
3931 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3932 cstate->line_buf.len--;
3933 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3934 break;
3935 case EOL_CR:
3936 Assert(cstate->line_buf.len >= 1);
3937 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3938 cstate->line_buf.len--;
3939 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3940 break;
3941 case EOL_CRNL:
3942 Assert(cstate->line_buf.len >= 2);
3943 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3944 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3945 cstate->line_buf.len -= 2;
3946 cstate->line_buf.data[cstate->line_buf.len] = '\0';
3947 break;
3948 case EOL_UNKNOWN:
3949 /* shouldn't get here */
3950 Assert(false);
3951 break;
3952 }
3953 }
3954
3955 /* Done reading the line. Convert it to server encoding. */
3956 if (cstate->need_transcoding)
3957 {
3958 char *cvt;
3959
3960 cvt = pg_any_to_server(cstate->line_buf.data,
3961 cstate->line_buf.len,
3962 cstate->file_encoding);
3963 if (cvt != cstate->line_buf.data)
3964 {
3965 /* transfer converted data back to line_buf */
3966 resetStringInfo(&cstate->line_buf);
3967 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3968 pfree(cvt);
3969 }
3970 }
3971
3972 /* Now it's safe to use the buffer in error messages */
3973 cstate->line_buf_converted = true;
3974
3975 return result;
3976}
3977
/*
 * CopyReadLineText - inner loop of CopyReadLine for text mode
 *
 * Scans cstate->raw_buf starting at raw_buf_index, reloading it through
 * CopyLoadRawBuf() as needed, and appends the bytes of the next input line
 * to cstate->line_buf.  As a side effect it may set cstate->eol_type the
 * first time a line terminator is seen, and in CSV mode it bumps
 * cstate->cur_lineno for line breaks embedded in quoted fields.
 *
 * Returns false when the line was ended by a line terminator, and true when
 * the input ran out or a valid end-of-copy marker (\.) was found.
 *
 * NOTE(review): REFILL_LINEBUF, IF_NEED_REFILL_AND_NOT_EOF_CONTINUE,
 * IF_NEED_REFILL_AND_EOF_BREAK and NO_END_OF_COPY_GOTO are defined outside
 * this section.  They presumably reference this function's locals by name
 * and perform continue/break/goto on the main loop --- confirm against their
 * definitions before renaming a local or nesting any of them inside another
 * loop or switch.
 */
static bool
CopyReadLineText(CopyState cstate)
{
	char	   *copy_raw_buf;
	int			raw_buf_ptr;
	int			copy_buf_len;
	bool		need_data = false;
	bool		hit_eof = false;
	bool		result = false;
	char		mblen_str[2];	/* one byte plus terminator, for the
								 * pg_encoding_mblen() call below */

	/* CSV variables */
	bool		first_char_in_line = true;
	bool		in_quote = false,
				last_was_esc = false;
	char		quotec = '\0';
	char		escapec = '\0';

	if (cstate->csv_mode)
	{
		quotec = cstate->quote[0];
		escapec = cstate->escape[0];
		/* ignore special escape processing if it's the same as quotec */
		if (quotec == escapec)
			escapec = '\0';
	}

	mblen_str[1] = '\0';

	/*
	 * The objective of this loop is to transfer the entire next input line
	 * into line_buf.  Hence, we only care for detecting newlines (\r and/or
	 * \n) and the end-of-copy marker (\.).
	 *
	 * In CSV mode, \r and \n inside a quoted field are just part of the data
	 * value and are put in line_buf.  We keep just enough state to know if we
	 * are currently in a quoted field or not.
	 *
	 * These four characters, and the CSV escape and quote characters, are
	 * assumed the same in frontend and backend encodings.
	 *
	 * For speed, we try to move data from raw_buf to line_buf in chunks
	 * rather than one character at a time.  raw_buf_ptr points to the next
	 * character to examine; any characters from raw_buf_index to raw_buf_ptr
	 * have been determined to be part of the line, but not yet transferred to
	 * line_buf.
	 *
	 * For a little extra speed within the loop, we copy raw_buf and
	 * raw_buf_len into local variables.
	 */
	copy_raw_buf = cstate->raw_buf;
	raw_buf_ptr = cstate->raw_buf_index;
	copy_buf_len = cstate->raw_buf_len;

	for (;;)
	{
		int			prev_raw_ptr;
		char		c;

		/*
		 * Load more data if needed.  Ideally we would just force four bytes
		 * of read-ahead and avoid the many calls to
		 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
		 * does not allow us to read too far ahead or we might read into the
		 * next data, so we read-ahead only as far we know we can.  One
		 * optimization would be to read-ahead four bytes here if
		 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
		 * considering the size of the buffer.
		 */
		if (raw_buf_ptr >= copy_buf_len || need_data)
		{
			/* hand over what has been scanned so far before reloading */
			REFILL_LINEBUF;

			/*
			 * Try to read some more data.  This will certainly reset
			 * raw_buf_index to zero, and raw_buf_ptr must go with it.
			 */
			if (!CopyLoadRawBuf(cstate))
				hit_eof = true;
			raw_buf_ptr = 0;
			copy_buf_len = cstate->raw_buf_len;

			/*
			 * If we are completely out of data, break out of the loop,
			 * reporting EOF.
			 */
			if (copy_buf_len <= 0)
			{
				result = true;
				break;
			}
			need_data = false;
		}

		/* OK to fetch a character */
		prev_raw_ptr = raw_buf_ptr; /* remembered so the fetch can be undone */
		c = copy_raw_buf[raw_buf_ptr++];

		if (cstate->csv_mode)
		{
			/*
			 * If character is '\\' or '\r', we may need to look ahead below.
			 * Force fetch of the next character if we don't already have it.
			 * We need to do this before changing CSV state, in case one of
			 * these characters is also the quote or escape character.
			 *
			 * Note: old-protocol does not like forced prefetch, but it's OK
			 * here since we cannot validly be at EOF.
			 */
			if (c == '\\' || c == '\r')
			{
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			}

			/*
			 * Dealing with quotes and escapes here is mildly tricky. If the
			 * quote char is also the escape char, there's no problem - we
			 * just use the char as a toggle. If they are different, we need
			 * to ensure that we only take account of an escape inside a
			 * quoted field and immediately preceding a quote char, and not
			 * the second in an escape-escape sequence.
			 */
			if (in_quote && c == escapec)
				last_was_esc = !last_was_esc;
			if (c == quotec && !last_was_esc)
				in_quote = !in_quote;
			if (c != escapec)
				last_was_esc = false;

			/*
			 * Updating the line count for embedded CR and/or LF chars is
			 * necessarily a little fragile - this test is probably about the
			 * best we can do.  (XXX it's arguable whether we should do this
			 * at all --- is cur_lineno a physical or logical count?)
			 */
			if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
				cstate->cur_lineno++;
		}

		/* Process \r */
		if (c == '\r' && (!cstate->csv_mode || !in_quote))
		{
			/* Check for \r\n on first line, _and_ handle \r\n. */
			if (cstate->eol_type == EOL_UNKNOWN ||
				cstate->eol_type == EOL_CRNL)
			{
				/*
				 * If need more data, go back to loop top to load it.
				 *
				 * Note that if we are at EOF, c will wind up as '\0' because
				 * of the guaranteed pad of raw_buf.
				 */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);

				/* get next char */
				c = copy_raw_buf[raw_buf_ptr];

				if (c == '\n')
				{
					raw_buf_ptr++;	/* eat newline */
					cstate->eol_type = EOL_CRNL;	/* in case not set yet */
				}
				else
				{
					/* found \r, but no \n */
					if (cstate->eol_type == EOL_CRNL)
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 !cstate->csv_mode ?
								 errmsg("literal carriage return found in data") :
								 errmsg("unquoted carriage return found in data"),
								 !cstate->csv_mode ?
								 errhint("Use \"\\r\" to represent carriage return.") :
								 errhint("Use quoted CSV field to represent carriage return.")));

					/*
					 * if we got here, it is the first line and we didn't find
					 * \n, so don't consume the peeked character
					 */
					cstate->eol_type = EOL_CR;
				}
			}
			else if (cstate->eol_type == EOL_NL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal carriage return found in data") :
						 errmsg("unquoted carriage return found in data"),
						 !cstate->csv_mode ?
						 errhint("Use \"\\r\" to represent carriage return.") :
						 errhint("Use quoted CSV field to represent carriage return.")));
			/* If reach here, we have found the line terminator */
			break;
		}

		/* Process \n */
		if (c == '\n' && (!cstate->csv_mode || !in_quote))
		{
			/* a bare \n contradicts an already-established \r or \r\n style */
			if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
				ereport(ERROR,
						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
						 !cstate->csv_mode ?
						 errmsg("literal newline found in data") :
						 errmsg("unquoted newline found in data"),
						 !cstate->csv_mode ?
						 errhint("Use \"\\n\" to represent newline.") :
						 errhint("Use quoted CSV field to represent newline.")));
			cstate->eol_type = EOL_NL;	/* in case not set yet */
			/* If reach here, we have found the line terminator */
			break;
		}

		/*
		 * In CSV mode, we only recognize \. alone on a line.  This is because
		 * \. is a valid CSV data value.
		 */
		if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
		{
			char		c2;

			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
			IF_NEED_REFILL_AND_EOF_BREAK(0);

			/* -----
			 * get next character
			 * Note: we do not change c so if it isn't \., we can fall
			 * through and continue processing for file encoding.
			 * -----
			 */
			c2 = copy_raw_buf[raw_buf_ptr];

			if (c2 == '.')
			{
				raw_buf_ptr++;	/* consume the '.' */

				/*
				 * Note: if we loop back for more data here, it does not
				 * matter that the CSV state change checks are re-executed; we
				 * will come back here with no important state changed.
				 */
				if (cstate->eol_type == EOL_CRNL)
				{
					/* Get the next character */
					IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
					/* if hit_eof, c2 will become '\0' */
					c2 = copy_raw_buf[raw_buf_ptr++];

					if (c2 == '\n')
					{
						if (!cstate->csv_mode)
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker does not match previous newline style")));
						else
							NO_END_OF_COPY_GOTO;
					}
					else if (c2 != '\r')
					{
						if (!cstate->csv_mode)
							ereport(ERROR,
									(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
									 errmsg("end-of-copy marker corrupt")));
						else
							NO_END_OF_COPY_GOTO;
					}
				}

				/* Get the next character */
				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
				/* if hit_eof, c2 will become '\0' */
				c2 = copy_raw_buf[raw_buf_ptr++];

				/* the marker must be followed directly by a line terminator */
				if (c2 != '\r' && c2 != '\n')
				{
					if (!cstate->csv_mode)
						ereport(ERROR,
								(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
								 errmsg("end-of-copy marker corrupt")));
					else
						NO_END_OF_COPY_GOTO;
				}

				/* ... and that terminator must agree with the known style */
				if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
					(cstate->eol_type == EOL_CRNL && c2 != '\n') ||
					(cstate->eol_type == EOL_CR && c2 != '\r'))
				{
					ereport(ERROR,
							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
							 errmsg("end-of-copy marker does not match previous newline style")));
				}

				/*
				 * Transfer only the data before the \. into line_buf, then
				 * discard the data and the \. sequence.
				 */
				if (prev_raw_ptr > cstate->raw_buf_index)
					appendBinaryStringInfo(&cstate->line_buf,
										   cstate->raw_buf + cstate->raw_buf_index,
										   prev_raw_ptr - cstate->raw_buf_index);
				cstate->raw_buf_index = raw_buf_ptr;
				result = true;	/* report EOF */
				break;
			}
			else if (!cstate->csv_mode)

				/*
				 * If we are here, it means we found a backslash followed by
				 * something other than a period.  In non-CSV mode, anything
				 * after a backslash is special, so we skip over that second
				 * character too.  If we didn't do that \\. would be
				 * considered an end-of-copy marker, while in non-CSV mode it
				 * is a literal backslash followed by a period.  In CSV mode,
				 * backslashes are not special, so we want to process the
				 * character after the backslash just like a normal character,
				 * so we don't increment in those cases.
				 */
				raw_buf_ptr++;
		}

		/*
		 * This label is for CSV cases where \. appears at the start of a
		 * line, but there is more text after it, meaning it was a data value.
		 * We are more strict for \. in CSV mode because \. could be a data
		 * value, while in non-CSV mode, \. cannot be a data value.
		 */
not_end_of_copy:

		/*
		 * Process all bytes of a multi-byte character as a group.
		 *
		 * We only support multi-byte sequences where the first byte has the
		 * high-bit set, so as an optimization we can avoid this block
		 * entirely if it is not set.
		 */
		if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
		{
			int			mblen;

			/*
			 * It is enough to look at the first byte in all our encodings, to
			 * get the length.  (GB18030 is a bit special, but still works for
			 * our purposes; see comment in pg_gb18030_mblen())
			 */
			mblen_str[0] = c;
			mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);

			/* make sure the character's remaining bytes are available */
			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
			IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
			raw_buf_ptr += mblen - 1;
		}
		first_char_in_line = false;
	}							/* end of outer loop */

	/*
	 * Transfer any still-uncopied data to line_buf.
	 */
	REFILL_LINEBUF;

	return result;
}
4341
/*
 * Convert one hexadecimal digit character to its numeric value (0..15).
 *
 * The argument is expected to be a valid hex digit already; anything that
 * is not a decimal digit is treated as a letter, case-insensitively.
 */
static int
GetDecimalFromHex(char hex)
{
	unsigned char digit = (unsigned char) hex;

	return isdigit(digit) ? hex - '0' : tolower(digit) - 'a' + 10;
}
4353
4354/*
4355 * Parse the current line into separate attributes (fields),
4356 * performing de-escaping as needed.
4357 *
4358 * The input is in line_buf. We use attribute_buf to hold the result
4359 * strings. cstate->raw_fields[k] is set to point to the k'th attribute
4360 * string, or NULL when the input matches the null marker string.
4361 * This array is expanded as necessary.
4362 *
4363 * (Note that the caller cannot check for nulls since the returned
4364 * string would be the post-de-escaping equivalent, which may look
4365 * the same as some valid data string.)
4366 *
4367 * delim is the column delimiter string (must be just one byte for now).
4368 * null_print is the null marker string. Note that this is compared to
4369 * the pre-de-escaped input string.
4370 *
4371 * The return value is the number of fields actually read.
4372 */
4373static int
4374CopyReadAttributesText(CopyState cstate)
4375{
4376 char delimc = cstate->delim[0];
4377 int fieldno;
4378 char *output_ptr;
4379 char *cur_ptr;
4380 char *line_end_ptr;
4381
4382 /*
4383 * We need a special case for zero-column tables: check that the input
4384 * line is empty, and return.
4385 */
4386 if (cstate->max_fields <= 0)
4387 {
4388 if (cstate->line_buf.len != 0)
4389 ereport(ERROR,
4390 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4391 errmsg("extra data after last expected column")));
4392 return 0;
4393 }
4394
4395 resetStringInfo(&cstate->attribute_buf);
4396
4397 /*
4398 * The de-escaped attributes will certainly not be longer than the input
4399 * data line, so we can just force attribute_buf to be large enough and
4400 * then transfer data without any checks for enough space. We need to do
4401 * it this way because enlarging attribute_buf mid-stream would invalidate
4402 * pointers already stored into cstate->raw_fields[].
4403 */
4404 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4405 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4406 output_ptr = cstate->attribute_buf.data;
4407
4408 /* set pointer variables for loop */
4409 cur_ptr = cstate->line_buf.data;
4410 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4411
4412 /* Outer loop iterates over fields */
4413 fieldno = 0;
4414 for (;;)
4415 {
4416 bool found_delim = false;
4417 char *start_ptr;
4418 char *end_ptr;
4419 int input_len;
4420 bool saw_non_ascii = false;
4421
4422 /* Make sure there is enough space for the next value */
4423 if (fieldno >= cstate->max_fields)
4424 {
4425 cstate->max_fields *= 2;
4426 cstate->raw_fields =
4427 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4428 }
4429
4430 /* Remember start of field on both input and output sides */
4431 start_ptr = cur_ptr;
4432 cstate->raw_fields[fieldno] = output_ptr;
4433
4434 /*
4435 * Scan data for field.
4436 *
4437 * Note that in this loop, we are scanning to locate the end of field
4438 * and also speculatively performing de-escaping. Once we find the
4439 * end-of-field, we can match the raw field contents against the null
4440 * marker string. Only after that comparison fails do we know that
4441 * de-escaping is actually the right thing to do; therefore we *must
4442 * not* throw any syntax errors before we've done the null-marker
4443 * check.
4444 */
4445 for (;;)
4446 {
4447 char c;
4448
4449 end_ptr = cur_ptr;
4450 if (cur_ptr >= line_end_ptr)
4451 break;
4452 c = *cur_ptr++;
4453 if (c == delimc)
4454 {
4455 found_delim = true;
4456 break;
4457 }
4458 if (c == '\\')
4459 {
4460 if (cur_ptr >= line_end_ptr)
4461 break;
4462 c = *cur_ptr++;
4463 switch (c)
4464 {
4465 case '0':
4466 case '1':
4467 case '2':
4468 case '3':
4469 case '4':
4470 case '5':
4471 case '6':
4472 case '7':
4473 {
4474 /* handle \013 */
4475 int val;
4476
4477 val = OCTVALUE(c);
4478 if (cur_ptr < line_end_ptr)
4479 {
4480 c = *cur_ptr;
4481 if (ISOCTAL(c))
4482 {
4483 cur_ptr++;
4484 val = (val << 3) + OCTVALUE(c);
4485 if (cur_ptr < line_end_ptr)
4486 {
4487 c = *cur_ptr;
4488 if (ISOCTAL(c))
4489 {
4490 cur_ptr++;
4491 val = (val << 3) + OCTVALUE(c);
4492 }
4493 }
4494 }
4495 }
4496 c = val & 0377;
4497 if (c == '\0' || IS_HIGHBIT_SET(c))
4498 saw_non_ascii = true;
4499 }
4500 break;
4501 case 'x':
4502 /* Handle \x3F */
4503 if (cur_ptr < line_end_ptr)
4504 {
4505 char hexchar = *cur_ptr;
4506
4507 if (isxdigit((unsigned char) hexchar))
4508 {
4509 int val = GetDecimalFromHex(hexchar);
4510
4511 cur_ptr++;
4512 if (cur_ptr < line_end_ptr)
4513 {
4514 hexchar = *cur_ptr;
4515 if (isxdigit((unsigned char) hexchar))
4516 {
4517 cur_ptr++;
4518 val = (val << 4) + GetDecimalFromHex(hexchar);
4519 }
4520 }
4521 c = val & 0xff;
4522 if (c == '\0' || IS_HIGHBIT_SET(c))
4523 saw_non_ascii = true;
4524 }
4525 }
4526 break;
4527 case 'b':
4528 c = '\b';
4529 break;
4530 case 'f':
4531 c = '\f';
4532 break;
4533 case 'n':
4534 c = '\n';
4535 break;
4536 case 'r':
4537 c = '\r';
4538 break;
4539 case 't':
4540 c = '\t';
4541 break;
4542 case 'v':
4543 c = '\v';
4544 break;
4545
4546 /*
4547 * in all other cases, take the char after '\'
4548 * literally
4549 */
4550 }
4551 }
4552
4553 /* Add c to output string */
4554 *output_ptr++ = c;
4555 }
4556
4557 /* Check whether raw input matched null marker */
4558 input_len = end_ptr - start_ptr;
4559 if (input_len == cstate->null_print_len &&
4560 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4561 cstate->raw_fields[fieldno] = NULL;
4562 else
4563 {
4564 /*
4565 * At this point we know the field is supposed to contain data.
4566 *
4567 * If we de-escaped any non-7-bit-ASCII chars, make sure the
4568 * resulting string is valid data for the db encoding.
4569 */
4570 if (saw_non_ascii)
4571 {
4572 char *fld = cstate->raw_fields[fieldno];
4573
4574 pg_verifymbstr(fld, output_ptr - fld, false);
4575 }
4576 }
4577
4578 /* Terminate attribute value in output area */
4579 *output_ptr++ = '\0';
4580
4581 fieldno++;
4582 /* Done if we hit EOL instead of a delim */
4583 if (!found_delim)
4584 break;
4585 }
4586
4587 /* Clean up state of attribute_buf */
4588 output_ptr--;
4589 Assert(*output_ptr == '\0');
4590 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4591
4592 return fieldno;
4593}
4594
4595/*
4596 * Parse the current line into separate attributes (fields),
4597 * performing de-escaping as needed. This has exactly the same API as
4598 * CopyReadAttributesText, except we parse the fields according to
4599 * "standard" (i.e. common) CSV usage.
4600 */
4601static int
4602CopyReadAttributesCSV(CopyState cstate)
4603{
4604 char delimc = cstate->delim[0];
4605 char quotec = cstate->quote[0];
4606 char escapec = cstate->escape[0];
4607 int fieldno;
4608 char *output_ptr;
4609 char *cur_ptr;
4610 char *line_end_ptr;
4611
4612 /*
4613 * We need a special case for zero-column tables: check that the input
4614 * line is empty, and return.
4615 */
4616 if (cstate->max_fields <= 0)
4617 {
4618 if (cstate->line_buf.len != 0)
4619 ereport(ERROR,
4620 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4621 errmsg("extra data after last expected column")));
4622 return 0;
4623 }
4624
4625 resetStringInfo(&cstate->attribute_buf);
4626
4627 /*
4628 * The de-escaped attributes will certainly not be longer than the input
4629 * data line, so we can just force attribute_buf to be large enough and
4630 * then transfer data without any checks for enough space. We need to do
4631 * it this way because enlarging attribute_buf mid-stream would invalidate
4632 * pointers already stored into cstate->raw_fields[].
4633 */
4634 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4635 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4636 output_ptr = cstate->attribute_buf.data;
4637
4638 /* set pointer variables for loop */
4639 cur_ptr = cstate->line_buf.data;
4640 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4641
4642 /* Outer loop iterates over fields */
4643 fieldno = 0;
4644 for (;;)
4645 {
4646 bool found_delim = false;
4647 bool saw_quote = false;
4648 char *start_ptr;
4649 char *end_ptr;
4650 int input_len;
4651
4652 /* Make sure there is enough space for the next value */
4653 if (fieldno >= cstate->max_fields)
4654 {
4655 cstate->max_fields *= 2;
4656 cstate->raw_fields =
4657 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4658 }
4659
4660 /* Remember start of field on both input and output sides */
4661 start_ptr = cur_ptr;
4662 cstate->raw_fields[fieldno] = output_ptr;
4663
4664 /*
4665 * Scan data for field,
4666 *
4667 * The loop starts in "not quote" mode and then toggles between that
4668 * and "in quote" mode. The loop exits normally if it is in "not
4669 * quote" mode and a delimiter or line end is seen.
4670 */
4671 for (;;)
4672 {
4673 char c;
4674
4675 /* Not in quote */
4676 for (;;)
4677 {
4678 end_ptr = cur_ptr;
4679 if (cur_ptr >= line_end_ptr)
4680 goto endfield;
4681 c = *cur_ptr++;
4682 /* unquoted field delimiter */
4683 if (c == delimc)
4684 {
4685 found_delim = true;
4686 goto endfield;
4687 }
4688 /* start of quoted field (or part of field) */
4689 if (c == quotec)
4690 {
4691 saw_quote = true;
4692 break;
4693 }
4694 /* Add c to output string */
4695 *output_ptr++ = c;
4696 }
4697
4698 /* In quote */
4699 for (;;)
4700 {
4701 end_ptr = cur_ptr;
4702 if (cur_ptr >= line_end_ptr)
4703 ereport(ERROR,
4704 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4705 errmsg("unterminated CSV quoted field")));
4706
4707 c = *cur_ptr++;
4708
4709 /* escape within a quoted field */
4710 if (c == escapec)
4711 {
4712 /*
4713 * peek at the next char if available, and escape it if it
4714 * is an escape char or a quote char
4715 */
4716 if (cur_ptr < line_end_ptr)
4717 {
4718 char nextc = *cur_ptr;
4719
4720 if (nextc == escapec || nextc == quotec)
4721 {
4722 *output_ptr++ = nextc;
4723 cur_ptr++;
4724 continue;
4725 }
4726 }
4727 }
4728
4729 /*
4730 * end of quoted field. Must do this test after testing for
4731 * escape in case quote char and escape char are the same
4732 * (which is the common case).
4733 */
4734 if (c == quotec)
4735 break;
4736
4737 /* Add c to output string */
4738 *output_ptr++ = c;
4739 }
4740 }
4741endfield:
4742
4743 /* Terminate attribute value in output area */
4744 *output_ptr++ = '\0';
4745
4746 /* Check whether raw input matched null marker */
4747 input_len = end_ptr - start_ptr;
4748 if (!saw_quote && input_len == cstate->null_print_len &&
4749 strncmp(start_ptr, cstate->null_print, input_len) == 0)
4750 cstate->raw_fields[fieldno] = NULL;
4751
4752 fieldno++;
4753 /* Done if we hit EOL instead of a delim */
4754 if (!found_delim)
4755 break;
4756 }
4757
4758 /* Clean up state of attribute_buf */
4759 output_ptr--;
4760 Assert(*output_ptr == '\0');
4761 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4762
4763 return fieldno;
4764}
4765
4766
4767/*
4768 * Read a binary attribute
4769 */
4770static Datum
4771CopyReadBinaryAttribute(CopyState cstate,
4772 int column_no, FmgrInfo *flinfo,
4773 Oid typioparam, int32 typmod,
4774 bool *isnull)
4775{
4776 int32 fld_size;
4777 Datum result;
4778
4779 if (!CopyGetInt32(cstate, &fld_size))
4780 ereport(ERROR,
4781 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4782 errmsg("unexpected EOF in COPY data")));
4783 if (fld_size == -1)
4784 {
4785 *isnull = true;
4786 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4787 }
4788 if (fld_size < 0)
4789 ereport(ERROR,
4790 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4791 errmsg("invalid field size")));
4792
4793 /* reset attribute_buf to empty, and load raw data in it */
4794 resetStringInfo(&cstate->attribute_buf);
4795
4796 enlargeStringInfo(&cstate->attribute_buf, fld_size);
4797 if (CopyGetData(cstate, cstate->attribute_buf.data,
4798 fld_size, fld_size) != fld_size)
4799 ereport(ERROR,
4800 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4801 errmsg("unexpected EOF in COPY data")));
4802
4803 cstate->attribute_buf.len = fld_size;
4804 cstate->attribute_buf.data[fld_size] = '\0';
4805
4806 /* Call the column type's binary input converter */
4807 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4808 typioparam, typmod);
4809
4810 /* Trouble if it didn't eat the whole buffer */
4811 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4812 ereport(ERROR,
4813 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4814 errmsg("incorrect binary data format")));
4815
4816 *isnull = false;
4817 return result;
4818}
4819
4820/*
4821 * Send text representation of one attribute, with conversion and escaping
4822 */
4823#define DUMPSOFAR() \
4824 do { \
4825 if (ptr > start) \
4826 CopySendData(cstate, start, ptr - start); \
4827 } while (0)
4828
4829static void
4830CopyAttributeOutText(CopyState cstate, char *string)
4831{
4832 char *ptr;
4833 char *start;
4834 char c;
4835 char delimc = cstate->delim[0];
4836
4837 if (cstate->need_transcoding)
4838 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4839 else
4840 ptr = string;
4841
4842 /*
4843 * We have to grovel through the string searching for control characters
4844 * and instances of the delimiter character. In most cases, though, these
4845 * are infrequent. To avoid overhead from calling CopySendData once per
4846 * character, we dump out all characters between escaped characters in a
4847 * single call. The loop invariant is that the data from "start" to "ptr"
4848 * can be sent literally, but hasn't yet been.
4849 *
4850 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4851 * in valid backend encodings, extra bytes of a multibyte character never
4852 * look like ASCII. This loop is sufficiently performance-critical that
4853 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4854 * of the normal safe-encoding path.
4855 */
4856 if (cstate->encoding_embeds_ascii)
4857 {
4858 start = ptr;
4859 while ((c = *ptr) != '\0')
4860 {
4861 if ((unsigned char) c < (unsigned char) 0x20)
4862 {
4863 /*
4864 * \r and \n must be escaped, the others are traditional. We
4865 * prefer to dump these using the C-like notation, rather than
4866 * a backslash and the literal character, because it makes the
4867 * dump file a bit more proof against Microsoftish data
4868 * mangling.
4869 */
4870 switch (c)
4871 {
4872 case '\b':
4873 c = 'b';
4874 break;
4875 case '\f':
4876 c = 'f';
4877 break;
4878 case '\n':
4879 c = 'n';
4880 break;
4881 case '\r':
4882 c = 'r';
4883 break;
4884 case '\t':
4885 c = 't';
4886 break;
4887 case '\v':
4888 c = 'v';
4889 break;
4890 default:
4891 /* If it's the delimiter, must backslash it */
4892 if (c == delimc)
4893 break;
4894 /* All ASCII control chars are length 1 */
4895 ptr++;
4896 continue; /* fall to end of loop */
4897 }
4898 /* if we get here, we need to convert the control char */
4899 DUMPSOFAR();
4900 CopySendChar(cstate, '\\');
4901 CopySendChar(cstate, c);
4902 start = ++ptr; /* do not include char in next run */
4903 }
4904 else if (c == '\\' || c == delimc)
4905 {
4906 DUMPSOFAR();
4907 CopySendChar(cstate, '\\');
4908 start = ptr++; /* we include char in next run */
4909 }
4910 else if (IS_HIGHBIT_SET(c))
4911 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4912 else
4913 ptr++;
4914 }
4915 }
4916 else
4917 {
4918 start = ptr;
4919 while ((c = *ptr) != '\0')
4920 {
4921 if ((unsigned char) c < (unsigned char) 0x20)
4922 {
4923 /*
4924 * \r and \n must be escaped, the others are traditional. We
4925 * prefer to dump these using the C-like notation, rather than
4926 * a backslash and the literal character, because it makes the
4927 * dump file a bit more proof against Microsoftish data
4928 * mangling.
4929 */
4930 switch (c)
4931 {
4932 case '\b':
4933 c = 'b';
4934 break;
4935 case '\f':
4936 c = 'f';
4937 break;
4938 case '\n':
4939 c = 'n';
4940 break;
4941 case '\r':
4942 c = 'r';
4943 break;
4944 case '\t':
4945 c = 't';
4946 break;
4947 case '\v':
4948 c = 'v';
4949 break;
4950 default:
4951 /* If it's the delimiter, must backslash it */
4952 if (c == delimc)
4953 break;
4954 /* All ASCII control chars are length 1 */
4955 ptr++;
4956 continue; /* fall to end of loop */
4957 }
4958 /* if we get here, we need to convert the control char */
4959 DUMPSOFAR();
4960 CopySendChar(cstate, '\\');
4961 CopySendChar(cstate, c);
4962 start = ++ptr; /* do not include char in next run */
4963 }
4964 else if (c == '\\' || c == delimc)
4965 {
4966 DUMPSOFAR();
4967 CopySendChar(cstate, '\\');
4968 start = ptr++; /* we include char in next run */
4969 }
4970 else
4971 ptr++;
4972 }
4973 }
4974
4975 DUMPSOFAR();
4976}
4977
4978/*
4979 * Send text representation of one attribute, with conversion and
4980 * CSV-style escaping
4981 */
4982static void
4983CopyAttributeOutCSV(CopyState cstate, char *string,
4984 bool use_quote, bool single_attr)
4985{
4986 char *ptr;
4987 char *start;
4988 char c;
4989 char delimc = cstate->delim[0];
4990 char quotec = cstate->quote[0];
4991 char escapec = cstate->escape[0];
4992
4993 /* force quoting if it matches null_print (before conversion!) */
4994 if (!use_quote && strcmp(string, cstate->null_print) == 0)
4995 use_quote = true;
4996
4997 if (cstate->need_transcoding)
4998 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4999 else
5000 ptr = string;
5001
5002 /*
5003 * Make a preliminary pass to discover if it needs quoting
5004 */
5005 if (!use_quote)
5006 {
5007 /*
5008 * Because '\.' can be a data value, quote it if it appears alone on a
5009 * line so it is not interpreted as the end-of-data marker.
5010 */
5011 if (single_attr && strcmp(ptr, "\\.") == 0)
5012 use_quote = true;
5013 else
5014 {
5015 char *tptr = ptr;
5016
5017 while ((c = *tptr) != '\0')
5018 {
5019 if (c == delimc || c == quotec || c == '\n' || c == '\r')
5020 {
5021 use_quote = true;
5022 break;
5023 }
5024 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
5025 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
5026 else
5027 tptr++;
5028 }
5029 }
5030 }
5031
5032 if (use_quote)
5033 {
5034 CopySendChar(cstate, quotec);
5035
5036 /*
5037 * We adopt the same optimization strategy as in CopyAttributeOutText
5038 */
5039 start = ptr;
5040 while ((c = *ptr) != '\0')
5041 {
5042 if (c == quotec || c == escapec)
5043 {
5044 DUMPSOFAR();
5045 CopySendChar(cstate, escapec);
5046 start = ptr; /* we include char in next run */
5047 }
5048 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
5049 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
5050 else
5051 ptr++;
5052 }
5053 DUMPSOFAR();
5054
5055 CopySendChar(cstate, quotec);
5056 }
5057 else
5058 {
5059 /* If it doesn't need quoting, we can just dump it as-is */
5060 CopySendString(cstate, ptr);
5061 }
5062}
5063
5064/*
5065 * CopyGetAttnums - build an integer list of attnums to be copied
5066 *
5067 * The input attnamelist is either the user-specified column list,
5068 * or NIL if there was none (in which case we want all the non-dropped
5069 * columns).
5070 *
5071 * We don't include generated columns in the generated full list and we don't
5072 * allow them to be specified explicitly. They don't make sense for COPY
5073 * FROM, but we could possibly allow them for COPY TO. But this way it's at
5074 * least ensured that whatever we copy out can be copied back in.
5075 *
5076 * rel can be NULL ... it's only used for error reports.
5077 */
5078static List *
5079CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
5080{
5081 List *attnums = NIL;
5082
5083 if (attnamelist == NIL)
5084 {
5085 /* Generate default column list */
5086 int attr_count = tupDesc->natts;
5087 int i;
5088
5089 for (i = 0; i < attr_count; i++)
5090 {
5091 if (TupleDescAttr(tupDesc, i)->attisdropped)
5092 continue;
5093 if (TupleDescAttr(tupDesc, i)->attgenerated)
5094 continue;
5095 attnums = lappend_int(attnums, i + 1);
5096 }
5097 }
5098 else
5099 {
5100 /* Validate the user-supplied list and extract attnums */
5101 ListCell *l;
5102
5103 foreach(l, attnamelist)
5104 {
5105 char *name = strVal(lfirst(l));
5106 int attnum;
5107 int i;
5108
5109 /* Lookup column name */
5110 attnum = InvalidAttrNumber;
5111 for (i = 0; i < tupDesc->natts; i++)
5112 {
5113 Form_pg_attribute att = TupleDescAttr(tupDesc, i);
5114
5115 if (att->attisdropped)
5116 continue;
5117 if (namestrcmp(&(att->attname), name) == 0)
5118 {
5119 if (att->attgenerated)
5120 ereport(ERROR,
5121 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
5122 errmsg("column \"%s\" is a generated column",
5123 name),
5124 errdetail("Generated columns cannot be used in COPY.")));
5125 attnum = att->attnum;
5126 break;
5127 }
5128 }
5129 if (attnum == InvalidAttrNumber)
5130 {
5131 if (rel != NULL)
5132 ereport(ERROR,
5133 (errcode(ERRCODE_UNDEFINED_COLUMN),
5134 errmsg("column \"%s\" of relation \"%s\" does not exist",
5135 name, RelationGetRelationName(rel))));
5136 else
5137 ereport(ERROR,
5138 (errcode(ERRCODE_UNDEFINED_COLUMN),
5139 errmsg("column \"%s\" does not exist",
5140 name)));
5141 }
5142 /* Check for duplicates */
5143 if (list_member_int(attnums, attnum))
5144 ereport(ERROR,
5145 (errcode(ERRCODE_DUPLICATE_COLUMN),
5146 errmsg("column \"%s\" specified more than once",
5147 name)));
5148 attnums = lappend_int(attnums, attnum);
5149 }
5150 }
5151
5152 return attnums;
5153}
5154
5155
5156/*
5157 * copy_dest_startup --- executor startup
5158 */
5159static void
5160copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
5161{
5162 /* no-op */
5163}
5164
5165/*
5166 * copy_dest_receive --- receive one tuple
5167 */
5168static bool
5169copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
5170{
5171 DR_copy *myState = (DR_copy *) self;
5172 CopyState cstate = myState->cstate;
5173
5174 /* Send the data */
5175 CopyOneRowTo(cstate, slot);
5176 myState->processed++;
5177
5178 return true;
5179}
5180
5181/*
5182 * copy_dest_shutdown --- executor end
5183 */
5184static void
5185copy_dest_shutdown(DestReceiver *self)
5186{
5187 /* no-op */
5188}
5189
5190/*
5191 * copy_dest_destroy --- release DestReceiver object
5192 */
5193static void
5194copy_dest_destroy(DestReceiver *self)
5195{
5196 pfree(self);
5197}
5198
5199/*
5200 * CreateCopyDestReceiver -- create a suitable DestReceiver object
5201 */
5202DestReceiver *
5203CreateCopyDestReceiver(void)
5204{
5205 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
5206
5207 self->pub.receiveSlot = copy_dest_receive;
5208 self->pub.rStartup = copy_dest_startup;
5209 self->pub.rShutdown = copy_dest_shutdown;
5210 self->pub.rDestroy = copy_dest_destroy;
5211 self->pub.mydest = DestCopyOut;
5212
5213 self->cstate = NULL; /* will be set later */
5214 self->processed = 0;
5215
5216 return (DestReceiver *) self;
5217}
5218