1/*-------------------------------------------------------------------------
2 *
3 * inv_api.c
4 * routines for manipulating inversion fs large objects. This file
5 * contains the user-level large object application interface routines.
6 *
7 *
8 * Note: we access pg_largeobject.data using its C struct declaration.
9 * This is safe because it immediately follows pageno which is an int4 field,
10 * and therefore the data field will always be 4-byte aligned, even if it
11 * is in the short 1-byte-header format. We have to detoast it since it's
12 * quite likely to be in compressed or short format. We also need to check
13 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 *
15 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 * does most of the backend code. We expect that CurrentMemoryContext will
17 * be a short-lived context. Data that must persist across function calls
18 * is kept either in CacheMemoryContext (the Relation structs) or in the
19 * memory context given to inv_open (for LargeObjectDesc structs).
20 *
21 *
22 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
23 * Portions Copyright (c) 1994, Regents of the University of California
24 *
25 *
26 * IDENTIFICATION
27 * src/backend/storage/large_object/inv_api.c
28 *
29 *-------------------------------------------------------------------------
30 */
31#include "postgres.h"
32
33#include <limits.h>
34
35#include "access/genam.h"
36#include "access/sysattr.h"
37#include "access/table.h"
38#include "access/tuptoaster.h"
39#include "access/xact.h"
40#include "catalog/dependency.h"
41#include "catalog/indexing.h"
42#include "catalog/objectaccess.h"
43#include "catalog/pg_largeobject.h"
44#include "catalog/pg_largeobject_metadata.h"
45#include "libpq/libpq-fs.h"
46#include "miscadmin.h"
47#include "storage/large_object.h"
48#include "utils/fmgroids.h"
49#include "utils/rel.h"
50#include "utils/snapmgr.h"
51
52
53/*
54 * GUC: backwards-compatibility flag to suppress LO permission checks
55 */
56bool lo_compat_privileges;
57
58/*
59 * All accesses to pg_largeobject and its index make use of a single Relation
 * reference, so that we only need to open pg_largeobject once per
 * transaction.
61 * To avoid problems when the first such reference occurs inside a
62 * subtransaction, we execute a slightly klugy maneuver to assign ownership of
63 * the Relation reference to TopTransactionResourceOwner.
64 */
65static Relation lo_heap_r = NULL;
66static Relation lo_index_r = NULL;
67
68
69/*
70 * Open pg_largeobject and its index, if not already done in current xact
71 */
72static void
73open_lo_relation(void)
74{
75 ResourceOwner currentOwner;
76
77 if (lo_heap_r && lo_index_r)
78 return; /* already open in current xact */
79
80 /* Arrange for the top xact to own these relation references */
81 currentOwner = CurrentResourceOwner;
82 CurrentResourceOwner = TopTransactionResourceOwner;
83
84 /* Use RowExclusiveLock since we might either read or write */
85 if (lo_heap_r == NULL)
86 lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
87 if (lo_index_r == NULL)
88 lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
89
90 CurrentResourceOwner = currentOwner;
91}
92
93/*
94 * Clean up at main transaction end
95 */
96void
97close_lo_relation(bool isCommit)
98{
99 if (lo_heap_r || lo_index_r)
100 {
101 /*
102 * Only bother to close if committing; else abort cleanup will handle
103 * it
104 */
105 if (isCommit)
106 {
107 ResourceOwner currentOwner;
108
109 currentOwner = CurrentResourceOwner;
110 CurrentResourceOwner = TopTransactionResourceOwner;
111
112 if (lo_index_r)
113 index_close(lo_index_r, NoLock);
114 if (lo_heap_r)
115 table_close(lo_heap_r, NoLock);
116
117 CurrentResourceOwner = currentOwner;
118 }
119 lo_heap_r = NULL;
120 lo_index_r = NULL;
121 }
122}
123
124
125/*
126 * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
127 * read with can be specified.
128 */
129static bool
130myLargeObjectExists(Oid loid, Snapshot snapshot)
131{
132 Relation pg_lo_meta;
133 ScanKeyData skey[1];
134 SysScanDesc sd;
135 HeapTuple tuple;
136 bool retval = false;
137
138 ScanKeyInit(&skey[0],
139 Anum_pg_largeobject_metadata_oid,
140 BTEqualStrategyNumber, F_OIDEQ,
141 ObjectIdGetDatum(loid));
142
143 pg_lo_meta = table_open(LargeObjectMetadataRelationId,
144 AccessShareLock);
145
146 sd = systable_beginscan(pg_lo_meta,
147 LargeObjectMetadataOidIndexId, true,
148 snapshot, 1, skey);
149
150 tuple = systable_getnext(sd);
151 if (HeapTupleIsValid(tuple))
152 retval = true;
153
154 systable_endscan(sd);
155
156 table_close(pg_lo_meta, AccessShareLock);
157
158 return retval;
159}
160
161
162/*
163 * Extract data field from a pg_largeobject tuple, detoasting if needed
164 * and verifying that the length is sane. Returns data pointer (a bytea *),
165 * data length, and an indication of whether to pfree the data pointer.
166 */
167static void
168getdatafield(Form_pg_largeobject tuple,
169 bytea **pdatafield,
170 int *plen,
171 bool *pfreeit)
172{
173 bytea *datafield;
174 int len;
175 bool freeit;
176
177 datafield = &(tuple->data); /* see note at top of file */
178 freeit = false;
179 if (VARATT_IS_EXTENDED(datafield))
180 {
181 datafield = (bytea *)
182 heap_tuple_untoast_attr((struct varlena *) datafield);
183 freeit = true;
184 }
185 len = VARSIZE(datafield) - VARHDRSZ;
186 if (len < 0 || len > LOBLKSIZE)
187 ereport(ERROR,
188 (errcode(ERRCODE_DATA_CORRUPTED),
189 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
190 tuple->loid, tuple->pageno, len)));
191 *pdatafield = datafield;
192 *plen = len;
193 *pfreeit = freeit;
194}
195
196
197/*
198 * inv_create -- create a new large object
199 *
200 * Arguments:
201 * lobjId - OID to use for new large object, or InvalidOid to pick one
202 *
203 * Returns:
204 * OID of new object
205 *
206 * If lobjId is not InvalidOid, then an error occurs if the OID is already
207 * in use.
208 */
209Oid
210inv_create(Oid lobjId)
211{
212 Oid lobjId_new;
213
214 /*
215 * Create a new largeobject with empty data pages
216 */
217 lobjId_new = LargeObjectCreate(lobjId);
218
219 /*
220 * dependency on the owner of largeobject
221 *
222 * The reason why we use LargeObjectRelationId instead of
223 * LargeObjectMetadataRelationId here is to provide backward compatibility
224 * to the applications which utilize a knowledge about internal layout of
225 * system catalogs. OID of pg_largeobject_metadata and loid of
226 * pg_largeobject are same value, so there are no actual differences here.
227 */
228 recordDependencyOnOwner(LargeObjectRelationId,
229 lobjId_new, GetUserId());
230
231 /* Post creation hook for new large object */
232 InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
233
234 /*
235 * Advance command counter to make new tuple visible to later operations.
236 */
237 CommandCounterIncrement();
238
239 return lobjId_new;
240}
241
242/*
243 * inv_open -- access an existing large object.
244 *
245 * Returns:
246 * Large object descriptor, appropriately filled in. The descriptor
247 * and subsidiary data are allocated in the specified memory context,
248 * which must be suitably long-lived for the caller's purposes.
249 */
250LargeObjectDesc *
251inv_open(Oid lobjId, int flags, MemoryContext mcxt)
252{
253 LargeObjectDesc *retval;
254 Snapshot snapshot = NULL;
255 int descflags = 0;
256
257 /*
258 * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
259 * | INV_READ), the caller being allowed to read the large object
260 * descriptor in either case.
261 */
262 if (flags & INV_WRITE)
263 descflags |= IFS_WRLOCK | IFS_RDLOCK;
264 if (flags & INV_READ)
265 descflags |= IFS_RDLOCK;
266
267 if (descflags == 0)
268 ereport(ERROR,
269 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
270 errmsg("invalid flags for opening a large object: %d",
271 flags)));
272
273 /* Get snapshot. If write is requested, use an instantaneous snapshot. */
274 if (descflags & IFS_WRLOCK)
275 snapshot = NULL;
276 else
277 snapshot = GetActiveSnapshot();
278
279 /* Can't use LargeObjectExists here because we need to specify snapshot */
280 if (!myLargeObjectExists(lobjId, snapshot))
281 ereport(ERROR,
282 (errcode(ERRCODE_UNDEFINED_OBJECT),
283 errmsg("large object %u does not exist", lobjId)));
284
285 /* Apply permission checks, again specifying snapshot */
286 if ((descflags & IFS_RDLOCK) != 0)
287 {
288 if (!lo_compat_privileges &&
289 pg_largeobject_aclcheck_snapshot(lobjId,
290 GetUserId(),
291 ACL_SELECT,
292 snapshot) != ACLCHECK_OK)
293 ereport(ERROR,
294 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
295 errmsg("permission denied for large object %u",
296 lobjId)));
297 }
298 if ((descflags & IFS_WRLOCK) != 0)
299 {
300 if (!lo_compat_privileges &&
301 pg_largeobject_aclcheck_snapshot(lobjId,
302 GetUserId(),
303 ACL_UPDATE,
304 snapshot) != ACLCHECK_OK)
305 ereport(ERROR,
306 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
307 errmsg("permission denied for large object %u",
308 lobjId)));
309 }
310
311 /* OK to create a descriptor */
312 retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
313 sizeof(LargeObjectDesc));
314 retval->id = lobjId;
315 retval->subid = GetCurrentSubTransactionId();
316 retval->offset = 0;
317 retval->flags = descflags;
318
319 /*
320 * We must register the snapshot in TopTransaction's resowner, because it
321 * must stay alive until the LO is closed rather than until the current
322 * portal shuts down. Do this last to avoid uselessly leaking the
323 * snapshot if an error is thrown above.
324 */
325 if (snapshot)
326 snapshot = RegisterSnapshotOnOwner(snapshot,
327 TopTransactionResourceOwner);
328 retval->snapshot = snapshot;
329
330 return retval;
331}
332
333/*
334 * Closes a large object descriptor previously made by inv_open(), and
335 * releases the long-term memory used by it.
336 */
337void
338inv_close(LargeObjectDesc *obj_desc)
339{
340 Assert(PointerIsValid(obj_desc));
341
342 UnregisterSnapshotFromOwner(obj_desc->snapshot,
343 TopTransactionResourceOwner);
344
345 pfree(obj_desc);
346}
347
348/*
349 * Destroys an existing large object (not to be confused with a descriptor!)
350 *
351 * Note we expect caller to have done any required permissions check.
352 */
353int
354inv_drop(Oid lobjId)
355{
356 ObjectAddress object;
357
358 /*
359 * Delete any comments and dependencies on the large object
360 */
361 object.classId = LargeObjectRelationId;
362 object.objectId = lobjId;
363 object.objectSubId = 0;
364 performDeletion(&object, DROP_CASCADE, 0);
365
366 /*
367 * Advance command counter so that tuple removal will be seen by later
368 * large-object operations in this transaction.
369 */
370 CommandCounterIncrement();
371
372 /* For historical reasons, we always return 1 on success. */
373 return 1;
374}
375
376/*
377 * Determine size of a large object
378 *
379 * NOTE: LOs can contain gaps, just like Unix files. We actually return
380 * the offset of the last byte + 1.
381 */
382static uint64
383inv_getsize(LargeObjectDesc *obj_desc)
384{
385 uint64 lastbyte = 0;
386 ScanKeyData skey[1];
387 SysScanDesc sd;
388 HeapTuple tuple;
389
390 Assert(PointerIsValid(obj_desc));
391
392 open_lo_relation();
393
394 ScanKeyInit(&skey[0],
395 Anum_pg_largeobject_loid,
396 BTEqualStrategyNumber, F_OIDEQ,
397 ObjectIdGetDatum(obj_desc->id));
398
399 sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
400 obj_desc->snapshot, 1, skey);
401
402 /*
403 * Because the pg_largeobject index is on both loid and pageno, but we
404 * constrain only loid, a backwards scan should visit all pages of the
405 * large object in reverse pageno order. So, it's sufficient to examine
406 * the first valid tuple (== last valid page).
407 */
408 tuple = systable_getnext_ordered(sd, BackwardScanDirection);
409 if (HeapTupleIsValid(tuple))
410 {
411 Form_pg_largeobject data;
412 bytea *datafield;
413 int len;
414 bool pfreeit;
415
416 if (HeapTupleHasNulls(tuple)) /* paranoia */
417 elog(ERROR, "null field found in pg_largeobject");
418 data = (Form_pg_largeobject) GETSTRUCT(tuple);
419 getdatafield(data, &datafield, &len, &pfreeit);
420 lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
421 if (pfreeit)
422 pfree(datafield);
423 }
424
425 systable_endscan_ordered(sd);
426
427 return lastbyte;
428}
429
430int64
431inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
432{
433 int64 newoffset;
434
435 Assert(PointerIsValid(obj_desc));
436
437 /*
438 * We allow seek/tell if you have either read or write permission, so no
439 * need for a permission check here.
440 */
441
442 /*
443 * Note: overflow in the additions is possible, but since we will reject
444 * negative results, we don't need any extra test for that.
445 */
446 switch (whence)
447 {
448 case SEEK_SET:
449 newoffset = offset;
450 break;
451 case SEEK_CUR:
452 newoffset = obj_desc->offset + offset;
453 break;
454 case SEEK_END:
455 newoffset = inv_getsize(obj_desc) + offset;
456 break;
457 default:
458 ereport(ERROR,
459 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
460 errmsg("invalid whence setting: %d", whence)));
461 newoffset = 0; /* keep compiler quiet */
462 break;
463 }
464
465 /*
466 * use errmsg_internal here because we don't want to expose INT64_FORMAT
467 * in translatable strings; doing better is not worth the trouble
468 */
469 if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
470 ereport(ERROR,
471 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
472 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
473 newoffset)));
474
475 obj_desc->offset = newoffset;
476 return newoffset;
477}
478
479int64
480inv_tell(LargeObjectDesc *obj_desc)
481{
482 Assert(PointerIsValid(obj_desc));
483
484 /*
485 * We allow seek/tell if you have either read or write permission, so no
486 * need for a permission check here.
487 */
488
489 return obj_desc->offset;
490}
491
/*
 * inv_read -- read up to "nbytes" bytes from a large object into "buf",
 * starting at the descriptor's current offset.
 *
 * Returns the number of bytes actually read, which can be less than
 * nbytes if end-of-object is reached first.  Pages missing from
 * pg_largeobject ("holes") read back as zeroes.  The descriptor's offset
 * is advanced by the amount read.  Requires read permission.
 */
int
inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
{
	int			nread = 0;
	int64		n;
	int64		off;
	int			len;
	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
	uint64		pageoff;
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	tuple;

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);

	if ((obj_desc->flags & IFS_RDLOCK) == 0)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for large object %u",
						obj_desc->id)));

	if (nbytes <= 0)
		return 0;

	open_lo_relation();

	/* Scan this LO's pages with pageno >= starting page, in pageno order */
	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		Form_pg_largeobject data;
		bytea	   *datafield;
		bool		pfreeit;

		if (HeapTupleHasNulls(tuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		data = (Form_pg_largeobject) GETSTRUCT(tuple);

		/*
		 * We expect the indexscan will deliver pages in order.  However,
		 * there may be missing pages if the LO contains unwritten "holes". We
		 * want missing sections to read out as zeroes.
		 */
		pageoff = ((uint64) data->pageno) * LOBLKSIZE;
		if (pageoff > obj_desc->offset)
		{
			/* The current page starts past our offset: emit zeroes for the hole */
			n = pageoff - obj_desc->offset;
			n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
			MemSet(buf + nread, 0, n);
			nread += n;
			obj_desc->offset += n;
		}

		if (nread < nbytes)
		{
			/* Copy the relevant portion of this page's data */
			Assert(obj_desc->offset >= pageoff);
			off = (int) (obj_desc->offset - pageoff);
			Assert(off >= 0 && off < LOBLKSIZE);

			getdatafield(data, &datafield, &len, &pfreeit);
			if (len > off)
			{
				n = len - off;
				n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
				memcpy(buf + nread, VARDATA(datafield) + off, n);
				nread += n;
				obj_desc->offset += n;
			}
			if (pfreeit)
				pfree(datafield);
		}

		if (nread >= nbytes)
			break;
	}

	systable_endscan_ordered(sd);

	return nread;
}
584
/*
 * inv_write -- write "nbytes" bytes from "buf" into a large object,
 * starting at the descriptor's current offset.
 *
 * Existing pages are updated in place; pages lying in holes or beyond the
 * current end of the object are created as needed.  On success the return
 * value is always nbytes (errors are reported via ereport).  The
 * descriptor's offset is advanced by the amount written.  Requires write
 * permission.
 */
int
inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
{
	int			nwritten = 0;
	int			n;
	int			off;
	int			len;
	int32		pageno = (int32) (obj_desc->offset / LOBLKSIZE);
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	oldtuple;
	Form_pg_largeobject olddata;
	bool		neednextpage;
	bytea	   *datafield;
	bool		pfreeit;
	union
	{
		bytea		hdr;
		/* this is to make the union big enough for a LO data chunk: */
		char		data[LOBLKSIZE + VARHDRSZ];
		/* ensure union is aligned well enough: */
		int32		align_it;
	}			workbuf;
	char	   *workb = VARDATA(&workbuf.hdr);
	HeapTuple	newtup;
	Datum		values[Natts_pg_largeobject];
	bool		nulls[Natts_pg_largeobject];
	bool		replace[Natts_pg_largeobject];
	CatalogIndexState indstate;

	Assert(PointerIsValid(obj_desc));
	Assert(buf != NULL);

	/* enforce writability because snapshot is probably wrong otherwise */
	if ((obj_desc->flags & IFS_WRLOCK) == 0)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for large object %u",
						obj_desc->id)));

	if (nbytes <= 0)
		return 0;

	/* this addition can't overflow because nbytes is only int32 */
	if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid large object write request size: %d",
						nbytes)));

	open_lo_relation();

	indstate = CatalogOpenIndexes(lo_heap_r);

	/* Scan this LO's pages with pageno >= first affected page, in order */
	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	oldtuple = NULL;
	olddata = NULL;
	neednextpage = true;

	while (nwritten < nbytes)
	{
		/*
		 * If possible, get next pre-existing page of the LO.  We expect the
		 * indexscan will deliver these in order --- but there may be holes.
		 */
		if (neednextpage)
		{
			if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
			{
				if (HeapTupleHasNulls(oldtuple))	/* paranoia */
					elog(ERROR, "null field found in pg_largeobject");
				olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
				Assert(olddata->pageno >= pageno);
			}
			neednextpage = false;
		}

		/*
		 * If we have a pre-existing page, see if it is the page we want to
		 * write, or a later one.
		 */
		if (olddata != NULL && olddata->pageno == pageno)
		{
			/*
			 * Update an existing page with fresh data.
			 *
			 * First, load old data into workbuf
			 */
			getdatafield(olddata, &datafield, &len, &pfreeit);
			memcpy(workb, VARDATA(datafield), len);
			if (pfreeit)
				pfree(datafield);

			/*
			 * Fill any hole (the offset may lie past the old page's valid
			 * length, e.g. after a seek beyond the stored data)
			 */
			off = (int) (obj_desc->offset % LOBLKSIZE);
			if (off > len)
				MemSet(workb + len, 0, off - len);

			/*
			 * Insert appropriate portion of new data
			 */
			n = LOBLKSIZE - off;
			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
			memcpy(workb + off, buf + nwritten, n);
			nwritten += n;
			obj_desc->offset += n;
			off += n;
			/* compute valid length of new page */
			len = (len >= off) ? len : off;
			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);

			/*
			 * Form and insert updated tuple
			 */
			memset(values, 0, sizeof(values));
			memset(nulls, false, sizeof(nulls));
			memset(replace, false, sizeof(replace));
			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
			replace[Anum_pg_largeobject_data - 1] = true;
			newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
									   values, nulls, replace);
			CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
									   indstate);
			heap_freetuple(newtup);

			/*
			 * We're done with this old page.
			 */
			oldtuple = NULL;
			olddata = NULL;
			neednextpage = true;
		}
		else
		{
			/*
			 * Write a brand new page (target page is in a hole, or past the
			 * last pre-existing page).
			 *
			 * First, fill any hole
			 */
			off = (int) (obj_desc->offset % LOBLKSIZE);
			if (off > 0)
				MemSet(workb, 0, off);

			/*
			 * Insert appropriate portion of new data
			 */
			n = LOBLKSIZE - off;
			n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
			memcpy(workb + off, buf + nwritten, n);
			nwritten += n;
			obj_desc->offset += n;
			/* compute valid length of new page */
			len = off + n;
			SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);

			/*
			 * Form and insert updated tuple
			 */
			memset(values, 0, sizeof(values));
			memset(nulls, false, sizeof(nulls));
			values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
			values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
			values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
			newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
			CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
			heap_freetuple(newtup);
		}
		pageno++;
	}

	systable_endscan_ordered(sd);

	CatalogCloseIndexes(indstate);

	/*
	 * Advance command counter so that my tuple updates will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();

	return nwritten;
}
781
/*
 * inv_truncate -- truncate a large object to "len" bytes.
 *
 * Pages wholly beyond the truncation point are deleted; the page containing
 * the truncation point (if one exists) is cut short.  If the truncation
 * point falls in a hole or past the last existing page, a new
 * (zero-filled as needed) page is written to mark the new end of data.
 * Requires write permission.
 */
void
inv_truncate(LargeObjectDesc *obj_desc, int64 len)
{
	int32		pageno = (int32) (len / LOBLKSIZE);
	int32		off;
	ScanKeyData skey[2];
	SysScanDesc sd;
	HeapTuple	oldtuple;
	Form_pg_largeobject olddata;
	union
	{
		bytea		hdr;
		/* this is to make the union big enough for a LO data chunk: */
		char		data[LOBLKSIZE + VARHDRSZ];
		/* ensure union is aligned well enough: */
		int32		align_it;
	}			workbuf;
	char	   *workb = VARDATA(&workbuf.hdr);
	HeapTuple	newtup;
	Datum		values[Natts_pg_largeobject];
	bool		nulls[Natts_pg_largeobject];
	bool		replace[Natts_pg_largeobject];
	CatalogIndexState indstate;

	Assert(PointerIsValid(obj_desc));

	/* enforce writability because snapshot is probably wrong otherwise */
	if ((obj_desc->flags & IFS_WRLOCK) == 0)
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied for large object %u",
						obj_desc->id)));

	/*
	 * use errmsg_internal here because we don't want to expose INT64_FORMAT
	 * in translatable strings; doing better is not worth the trouble
	 */
	if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
								 len)));

	open_lo_relation();

	indstate = CatalogOpenIndexes(lo_heap_r);

	/*
	 * Set up to find all pages with desired loid and pageno >= target
	 */
	ScanKeyInit(&skey[0],
				Anum_pg_largeobject_loid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(obj_desc->id));

	ScanKeyInit(&skey[1],
				Anum_pg_largeobject_pageno,
				BTGreaterEqualStrategyNumber, F_INT4GE,
				Int32GetDatum(pageno));

	sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
									obj_desc->snapshot, 2, skey);

	/*
	 * If possible, get the page the truncation point is in. The truncation
	 * point may be beyond the end of the LO or in a hole.
	 */
	olddata = NULL;
	if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
	{
		if (HeapTupleHasNulls(oldtuple))	/* paranoia */
			elog(ERROR, "null field found in pg_largeobject");
		olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
		Assert(olddata->pageno >= pageno);
	}

	/*
	 * If we found the page of the truncation point we need to truncate the
	 * data in it.  Otherwise if we're in a hole, we need to create a page to
	 * mark the end of data.
	 */
	if (olddata != NULL && olddata->pageno == pageno)
	{
		/* First, load old data into workbuf */
		bytea	   *datafield;
		int			pagelen;
		bool		pfreeit;

		getdatafield(olddata, &datafield, &pagelen, &pfreeit);
		memcpy(workb, VARDATA(datafield), pagelen);
		if (pfreeit)
			pfree(datafield);

		/*
		 * Fill any hole (the truncation point may lie past this page's
		 * stored data length)
		 */
		off = len % LOBLKSIZE;
		if (off > pagelen)
			MemSet(workb + pagelen, 0, off - pagelen);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert updated tuple
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		memset(replace, false, sizeof(replace));
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		replace[Anum_pg_largeobject_data - 1] = true;
		newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
								   values, nulls, replace);
		CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
								   indstate);
		heap_freetuple(newtup);
	}
	else
	{
		/*
		 * If the first page we found was after the truncation point, we're in
		 * a hole that we'll fill, but we need to delete the later page
		 * because the loop below won't visit it again.
		 */
		if (olddata != NULL)
		{
			Assert(olddata->pageno > pageno);
			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
		}

		/*
		 * Write a brand new page.
		 *
		 * Fill the hole up to the truncation point
		 */
		off = len % LOBLKSIZE;
		if (off > 0)
			MemSet(workb, 0, off);

		/* compute length of new page */
		SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);

		/*
		 * Form and insert new tuple
		 */
		memset(values, 0, sizeof(values));
		memset(nulls, false, sizeof(nulls));
		values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
		values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
		values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
		newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
		CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
		heap_freetuple(newtup);
	}

	/*
	 * Delete any pages after the truncation point.  If the initial search
	 * didn't find a page, then of course there's nothing more to do.
	 */
	if (olddata != NULL)
	{
		while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
		{
			CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
		}
	}

	systable_endscan_ordered(sd);

	CatalogCloseIndexes(indstate);

	/*
	 * Advance command counter so that tuple updates will be seen by later
	 * large-object operations in this transaction.
	 */
	CommandCounterIncrement();
}
959