33 * copyfrom.c
44 * COPY <table> FROM file/program/client
55 *
6+ * This file contains routines needed to efficiently load tuples into a
7+ * table. That includes looking up the correct partition, firing triggers,
8+ * calling the table AM function to insert the data, and updating indexes.
9+ * Reading data from the input file or client and parsing it into Datums
10+ * is handled in copyfromparse.c.
11+ *
612 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
713 * Portions Copyright (c) 1994, Regents of the University of California
814 *
2329#include "access/tableam.h"
2430#include "access/xact.h"
2531#include "access/xlog.h"
32+ #include "catalog/namespace.h"
2633#include "commands/copy.h"
2734#include "commands/copyfrom_internal.h"
2835#include "commands/progress.h"
@@ -87,7 +94,7 @@ typedef struct CopyMultiInsertInfo
8794 List * multiInsertBuffers ; /* List of tracked CopyMultiInsertBuffers */
8895 int bufferedTuples ; /* number of tuples buffered over all buffers */
8996 int bufferedBytes ; /* number of bytes from all buffered tuples */
90- CopyFromState cstate ; /* Copy state for this CopyMultiInsertInfo */
97+ CopyFromState cstate ; /* Copy state for this CopyMultiInsertInfo */
9198 EState * estate ; /* Executor state used for COPY */
9299 CommandId mycid ; /* Command Id used for COPY */
93100 int ti_options ; /* table insert options */
@@ -107,7 +114,7 @@ static void ClosePipeFromProgram(CopyFromState cstate);
107114void
108115CopyFromErrorCallback (void * arg )
109116{
110- CopyFromState cstate = (CopyFromState ) arg ;
117+ CopyFromState cstate = (CopyFromState ) arg ;
111118 char curlineno_str [32 ];
112119
113120 snprintf (curlineno_str , sizeof (curlineno_str ), UINT64_FORMAT ,
@@ -149,15 +156,9 @@ CopyFromErrorCallback(void *arg)
149156 /*
150157 * Error is relevant to a particular line.
151158 *
152- * If line_buf still contains the correct line, and it's already
153- * transcoded, print it. If it's still in a foreign encoding, it's
154- * quite likely that the error is precisely a failure to do
155- * encoding conversion (ie, bad data). We dare not try to convert
156- * it, and at present there's no way to regurgitate it without
157- * conversion. So we have to punt and just report the line number.
159+ * If line_buf still contains the correct line, print it.
158160 */
159- if (cstate -> line_buf_valid &&
160- (cstate -> line_buf_converted || !cstate -> need_transcoding ))
161+ if (cstate -> line_buf_valid )
161162 {
162163 char * lineval ;
163164
@@ -300,7 +301,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
300301 MemoryContext oldcontext ;
301302 int i ;
302303 uint64 save_cur_lineno ;
303- CopyFromState cstate = miinfo -> cstate ;
304+ CopyFromState cstate = miinfo -> cstate ;
304305 EState * estate = miinfo -> estate ;
305306 CommandId mycid = miinfo -> mycid ;
306307 int ti_options = miinfo -> ti_options ;
@@ -1191,7 +1192,7 @@ BeginCopyFrom(ParseState *pstate,
11911192 List * attnamelist ,
11921193 List * options )
11931194{
1194- CopyFromState cstate ;
1195+ CopyFromState cstate ;
11951196 bool pipe = (filename == NULL );
11961197 TupleDesc tupDesc ;
11971198 AttrNumber num_phys_attrs ,
@@ -1229,7 +1230,7 @@ BeginCopyFrom(ParseState *pstate,
12291230 oldcontext = MemoryContextSwitchTo (cstate -> copycontext );
12301231
12311232 /* Extract options from the statement node tree */
1232- ProcessCopyOptions (pstate , & cstate -> opts , true /* is_from */ , options );
1233+ ProcessCopyOptions (pstate , & cstate -> opts , true /* is_from */ , options );
12331234
12341235 /* Process the target relation */
12351236 cstate -> rel = rel ;
@@ -1320,15 +1321,20 @@ BeginCopyFrom(ParseState *pstate,
13201321 cstate -> file_encoding = cstate -> opts .file_encoding ;
13211322
13221323 /*
1323- * Set up encoding conversion info. Even if the file and server encodings
1324- * are the same, we must apply pg_any_to_server() to validate data in
1325- * multibyte encodings.
1324+ * Look up encoding conversion function.
13261325 */
1327- cstate -> need_transcoding =
1328- (cstate -> file_encoding != GetDatabaseEncoding () ||
1329- pg_database_encoding_max_length () > 1 );
1330- /* See Multibyte encoding comment above */
1331- cstate -> encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY (cstate -> file_encoding );
1326+ if (cstate -> file_encoding == GetDatabaseEncoding () ||
1327+ cstate -> file_encoding == PG_SQL_ASCII ||
1328+ GetDatabaseEncoding () == PG_SQL_ASCII )
1329+ {
1330+ cstate -> need_transcoding = false;
1331+ }
1332+ else
1333+ {
1334+ cstate -> need_transcoding = true;
1335+ cstate -> conversion_proc = FindDefaultConversionProc (cstate -> file_encoding ,
1336+ GetDatabaseEncoding ());
1337+ }
13321338
13331339 cstate -> copy_src = COPY_FILE ; /* default */
13341340
@@ -1339,27 +1345,43 @@ BeginCopyFrom(ParseState *pstate,
13391345 oldcontext = MemoryContextSwitchTo (cstate -> copycontext );
13401346
13411347 /* Initialize state variables */
1342- cstate -> reached_eof = false;
13431348 cstate -> eol_type = EOL_UNKNOWN ;
13441349 cstate -> cur_relname = RelationGetRelationName (cstate -> rel );
13451350 cstate -> cur_lineno = 0 ;
13461351 cstate -> cur_attname = NULL ;
13471352 cstate -> cur_attval = NULL ;
13481353
13491354 /*
1350- * Set up variables to avoid per-attribute overhead. attribute_buf and
1351- * raw_buf are used in both text and binary modes, but we use line_buf
1352- * only in text mode.
1355+ * Allocate buffers for the input pipeline.
1356+ *
1357+ * attribute_buf and raw_buf are used in both text and binary modes, but
1358+ * input_buf and line_buf only in text mode.
13531359 */
1354- initStringInfo (& cstate -> attribute_buf );
1355- cstate -> raw_buf = (char * ) palloc (RAW_BUF_SIZE + 1 );
1360+ cstate -> raw_buf = palloc (RAW_BUF_SIZE + 1 );
13561361 cstate -> raw_buf_index = cstate -> raw_buf_len = 0 ;
1362+ cstate -> raw_reached_eof = false;
1363+
13571364 if (!cstate -> opts .binary )
13581365 {
1366+ /*
1367+ * If encoding conversion is needed, we need another buffer to hold
1368+ * the converted input data. Otherwise, we can just point input_buf
1369+ * to the same buffer as raw_buf.
1370+ */
1371+ if (cstate -> need_transcoding )
1372+ {
1373+ cstate -> input_buf = (char * ) palloc (INPUT_BUF_SIZE + 1 );
1374+ cstate -> input_buf_index = cstate -> input_buf_len = 0 ;
1375+ }
1376+ else
1377+ cstate -> input_buf = cstate -> raw_buf ;
1378+ cstate -> input_reached_eof = false;
1379+
13591380 initStringInfo (& cstate -> line_buf );
1360- cstate -> line_buf_converted = false;
13611381 }
13621382
1383+ initStringInfo (& cstate -> attribute_buf );
1384+
13631385 /* Assign range table, we'll need it in CopyFrom. */
13641386 if (pstate )
13651387 cstate -> range_table = pstate -> p_rtable ;
@@ -1584,7 +1606,7 @@ ClosePipeFromProgram(CopyFromState cstate)
15841606 * should not report that as an error. Otherwise, SIGPIPE indicates a
15851607 * problem.
15861608 */
1587- if (!cstate -> reached_eof &&
1609+ if (!cstate -> raw_reached_eof &&
15881610 wait_result_is_signal (pclose_rc , SIGPIPE ))
15891611 return ;
15901612
0 commit comments