4242#include <sys/stat.h>
4343#include <unistd.h>
4444
45+ #include "access/xact.h"
4546#include "libpq/be-fsstubs.h"
4647#include "libpq/libpq-fs.h"
4748#include "miscadmin.h"
5051#include "utils/acl.h"
5152#include "utils/builtins.h"
5253#include "utils/memutils.h"
54+ #include "utils/snapmgr.h"
5355
5456/* define this to enable debug logging */
5557/* #define FSDB 1 */
6769static LargeObjectDesc * * cookies = NULL ;
6870static int cookies_size = 0 ;
6971
72+ static bool lo_cleanup_needed = false;
7073static MemoryContext fscxt = NULL ;
7174
72- #define CreateFSContext () \
73- do { \
74- if (fscxt == NULL) \
75- fscxt = AllocSetContextCreate(TopMemoryContext, \
76- "Filesystem", \
77- ALLOCSET_DEFAULT_SIZES); \
78- } while (0)
79-
80-
81- static int newLOfd (LargeObjectDesc * lobjCookie );
82- static void deleteLOfd (int fd );
75+ static int newLOfd (void );
76+ static void closeLOfd (int fd );
8377static Oid lo_import_internal (text * filename , Oid lobjOid );
8478
8579
@@ -99,11 +93,26 @@ be_lo_open(PG_FUNCTION_ARGS)
9993 elog (DEBUG4 , "lo_open(%u,%d)" , lobjId , mode );
10094#endif
10195
102- CreateFSContext ();
96+ /*
97+ * Allocate a large object descriptor first. This will also create
98+ * 'fscxt' if this is the first LO opened in this transaction.
99+ */
100+ fd = newLOfd ();
103101
104102 lobjDesc = inv_open (lobjId , mode , fscxt );
103+ lobjDesc -> subid = GetCurrentSubTransactionId ();
104+
105+ /*
106+ * We must register the snapshot in TopTransaction's resowner so that it
107+ * stays alive until the LO is closed rather than until the current portal
108+ * shuts down.
109+ */
110+ if (lobjDesc -> snapshot )
111+ lobjDesc -> snapshot = RegisterSnapshotOnOwner (lobjDesc -> snapshot ,
112+ TopTransactionResourceOwner );
105113
106- fd = newLOfd (lobjDesc );
114+ Assert (cookies [fd ] == NULL );
115+ cookies [fd ] = lobjDesc ;
107116
108117 PG_RETURN_INT32 (fd );
109118}
@@ -122,9 +131,7 @@ be_lo_close(PG_FUNCTION_ARGS)
122131 elog (DEBUG4 , "lo_close(%d)" , fd );
123132#endif
124133
125- inv_close (cookies [fd ]);
126-
127- deleteLOfd (fd );
134+ closeLOfd (fd );
128135
129136 PG_RETURN_INT32 (0 );
130137}
@@ -238,12 +245,7 @@ be_lo_creat(PG_FUNCTION_ARGS)
238245{
239246 Oid lobjId ;
240247
241- /*
242- * We don't actually need to store into fscxt, but create it anyway to
243- * ensure that AtEOXact_LargeObject knows there is state to clean up
244- */
245- CreateFSContext ();
246-
248+ lo_cleanup_needed = true;
247249 lobjId = inv_create (InvalidOid );
248250
249251 PG_RETURN_OID (lobjId );
@@ -254,12 +256,7 @@ be_lo_create(PG_FUNCTION_ARGS)
254256{
255257 Oid lobjId = PG_GETARG_OID (0 );
256258
257- /*
258- * We don't actually need to store into fscxt, but create it anyway to
259- * ensure that AtEOXact_LargeObject knows there is state to clean up
260- */
261- CreateFSContext ();
262-
259+ lo_cleanup_needed = true;
263260 lobjId = inv_create (lobjId );
264261
265262 PG_RETURN_OID (lobjId );
@@ -330,16 +327,13 @@ be_lo_unlink(PG_FUNCTION_ARGS)
330327 for (i = 0 ; i < cookies_size ; i ++ )
331328 {
332329 if (cookies [i ] != NULL && cookies [i ]-> id == lobjId )
333- {
334- inv_close (cookies [i ]);
335- deleteLOfd (i );
336- }
330+ closeLOfd (i );
337331 }
338332 }
339333
340334 /*
341335 * inv_drop does not create a need for end-of-transaction cleanup and
342- * hence we don't need to have created fscxt .
336+ * hence we don't need to set lo_cleanup_needed .
343337 */
344338 PG_RETURN_INT32 (inv_drop (lobjId ));
345339}
@@ -419,8 +413,6 @@ lo_import_internal(text *filename, Oid lobjOid)
419413 LargeObjectDesc * lobj ;
420414 Oid oid ;
421415
422- CreateFSContext ();
423-
424416 /*
425417 * open the file to be read in
426418 */
@@ -435,12 +427,13 @@ lo_import_internal(text *filename, Oid lobjOid)
435427 /*
436428 * create an inversion object
437429 */
430+ lo_cleanup_needed = true;
438431 oid = inv_create (lobjOid );
439432
440433 /*
441434 * read in from the filesystem and write to the inversion object
442435 */
443- lobj = inv_open (oid , INV_WRITE , fscxt );
436+ lobj = inv_open (oid , INV_WRITE , CurrentMemoryContext );
444437
445438 while ((nbytes = read (fd , buf , BUFSIZE )) > 0 )
446439 {
@@ -482,12 +475,11 @@ be_lo_export(PG_FUNCTION_ARGS)
482475 LargeObjectDesc * lobj ;
483476 mode_t oumask ;
484477
485- CreateFSContext ();
486-
487478 /*
488479 * open the inversion object (no need to test for failure)
489480 */
490- lobj = inv_open (lobjId , INV_READ , fscxt );
481+ lo_cleanup_needed = true;
482+ lobj = inv_open (lobjId , INV_READ , CurrentMemoryContext );
491483
492484 /*
493485 * open the file to be written to
@@ -592,20 +584,22 @@ AtEOXact_LargeObject(bool isCommit)
592584{
593585 int i ;
594586
595- if (fscxt == NULL )
587+ if (! lo_cleanup_needed )
596588 return ; /* no LO operations in this xact */
597589
598590 /*
599591 * Close LO fds and clear cookies array so that LO fds are no longer good.
600- * On abort we skip the close step.
592+ * The memory context and resource owner holding them are going away at
593+ * the end-of-transaction anyway, but on commit, we need to close them to
594+ * avoid warnings about leaked resources at commit. On abort we can skip
595+ * this step.
601596 */
602- for ( i = 0 ; i < cookies_size ; i ++ )
597+ if ( isCommit )
603598 {
604- if ( cookies [ i ] != NULL )
599+ for ( i = 0 ; i < cookies_size ; i ++ )
605600 {
606- if (isCommit )
607- inv_close (cookies [i ]);
608- deleteLOfd (i );
601+ if (cookies [i ] != NULL )
602+ closeLOfd (i );
609603 }
610604 }
611605
@@ -614,11 +608,14 @@ AtEOXact_LargeObject(bool isCommit)
614608 cookies_size = 0 ;
615609
616610 /* Release the LO memory context to prevent permanent memory leaks. */
617- MemoryContextDelete (fscxt );
611+ if (fscxt )
612+ MemoryContextDelete (fscxt );
618613 fscxt = NULL ;
619614
620615 /* Give inv_api.c a chance to clean up, too */
621616 close_lo_relation (isCommit );
617+
618+ lo_cleanup_needed = false;
622619}
623620
624621/*
@@ -646,14 +643,7 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
646643 if (isCommit )
647644 lo -> subid = parentSubid ;
648645 else
649- {
650- /*
651- * Make sure we do not call inv_close twice if it errors out
652- * for some reason. Better a leak than a crash.
653- */
654- deleteLOfd (i );
655- inv_close (lo );
656- }
646+ closeLOfd (i );
657647 }
658648 }
659649}
@@ -663,19 +653,22 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
663653 *****************************************************************************/
664654
665655static int
666- newLOfd (LargeObjectDesc * lobjCookie )
656+ newLOfd (void )
667657{
668658 int i ,
669659 newsize ;
670660
661+ lo_cleanup_needed = true;
662+ if (fscxt == NULL )
663+ fscxt = AllocSetContextCreate (TopMemoryContext ,
664+ "Filesystem" ,
665+ ALLOCSET_DEFAULT_SIZES );
666+
671667 /* Try to find a free slot */
672668 for (i = 0 ; i < cookies_size ; i ++ )
673669 {
674670 if (cookies [i ] == NULL )
675- {
676- cookies [i ] = lobjCookie ;
677671 return i ;
678- }
679672 }
680673
681674 /* No free slot, so make the array bigger */
@@ -700,15 +693,25 @@ newLOfd(LargeObjectDesc *lobjCookie)
700693 cookies_size = newsize ;
701694 }
702695
703- Assert (cookies [i ] == NULL );
704- cookies [i ] = lobjCookie ;
705696 return i ;
706697}
707698
708699static void
709- deleteLOfd (int fd )
700+ closeLOfd (int fd )
710701{
702+ LargeObjectDesc * lobj ;
703+
704+ /*
705+ * Make sure we do not try to free twice if this errors out for some
706+ * reason. Better a leak than a crash.
707+ */
708+ lobj = cookies [fd ];
711709 cookies [fd ] = NULL ;
710+
711+ if (lobj -> snapshot )
712+ UnregisterSnapshotFromOwner (lobj -> snapshot ,
713+ TopTransactionResourceOwner );
714+ inv_close (lobj );
712715}
713716
714717/*****************************************************************************
@@ -727,13 +730,8 @@ lo_get_fragment_internal(Oid loOid, int64 offset, int32 nbytes)
727730 int total_read PG_USED_FOR_ASSERTS_ONLY ;
728731 bytea * result = NULL ;
729732
730- /*
731- * We don't actually need to store into fscxt, but create it anyway to
732- * ensure that AtEOXact_LargeObject knows there is state to clean up
733- */
734- CreateFSContext ();
735-
736- loDesc = inv_open (loOid , INV_READ , fscxt );
733+ lo_cleanup_needed = true;
734+ loDesc = inv_open (loOid , INV_READ , CurrentMemoryContext );
737735
738736 /*
739737 * Compute number of bytes we'll actually read, accommodating nbytes == -1
@@ -817,10 +815,9 @@ be_lo_from_bytea(PG_FUNCTION_ARGS)
817815 LargeObjectDesc * loDesc ;
818816 int written PG_USED_FOR_ASSERTS_ONLY ;
819817
820- CreateFSContext ();
821-
818+ lo_cleanup_needed = true;
822819 loOid = inv_create (loOid );
823- loDesc = inv_open (loOid , INV_WRITE , fscxt );
820+ loDesc = inv_open (loOid , INV_WRITE , CurrentMemoryContext );
824821 written = inv_write (loDesc , VARDATA_ANY (str ), VARSIZE_ANY_EXHDR (str ));
825822 Assert (written == VARSIZE_ANY_EXHDR (str ));
826823 inv_close (loDesc );
@@ -840,9 +837,8 @@ be_lo_put(PG_FUNCTION_ARGS)
840837 LargeObjectDesc * loDesc ;
841838 int written PG_USED_FOR_ASSERTS_ONLY ;
842839
843- CreateFSContext ();
844-
845- loDesc = inv_open (loOid , INV_WRITE , fscxt );
840+ lo_cleanup_needed = true;
841+ loDesc = inv_open (loOid , INV_WRITE , CurrentMemoryContext );
846842
847843 /* Permission check */
848844 if (!lo_compat_privileges &&
0 commit comments