1/*-------------------------------------------------------------------------
2 *
3 * tqueue.c
4 * Use shm_mq to send & receive tuples between parallel backends
5 *
6 * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
7 * under the hood, writes tuples from the executor to a shm_mq.
8 *
9 * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
10 *
11 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
13 *
14 * IDENTIFICATION
15 * src/backend/executor/tqueue.c
16 *
17 *-------------------------------------------------------------------------
18 */
19
20#include "postgres.h"
21
22#include "access/htup_details.h"
23#include "executor/tqueue.h"
24
25/*
26 * DestReceiver object's private contents
27 *
28 * queue is a pointer to data supplied by DestReceiver's caller.
29 */
30typedef struct TQueueDestReceiver
31{
32 DestReceiver pub; /* public fields */
33 shm_mq_handle *queue; /* shm_mq to send to */
34} TQueueDestReceiver;
35
36/*
37 * TupleQueueReader object's private contents
38 *
39 * queue is a pointer to data supplied by reader's caller.
40 *
41 * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
42 */
43struct TupleQueueReader
44{
45 shm_mq_handle *queue; /* shm_mq to receive from */
46};
47
48/*
49 * Receive a tuple from a query, and send it to the designated shm_mq.
50 *
51 * Returns true if successful, false if shm_mq has been detached.
52 */
53static bool
54tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
55{
56 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57 HeapTuple tuple;
58 shm_mq_result result;
59 bool should_free;
60
61 /* Send the tuple itself. */
62 tuple = ExecFetchSlotHeapTuple(slot, true, &should_free);
63 result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
64
65 if (should_free)
66 heap_freetuple(tuple);
67
68 /* Check for failure. */
69 if (result == SHM_MQ_DETACHED)
70 return false;
71 else if (result != SHM_MQ_SUCCESS)
72 ereport(ERROR,
73 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74 errmsg("could not send tuple to shared-memory queue")));
75
76 return true;
77}
78
79/*
80 * Prepare to receive tuples from executor.
81 */
82static void
83tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
84{
85 /* do nothing */
86}
87
88/*
89 * Clean up at end of an executor run
90 */
91static void
92tqueueShutdownReceiver(DestReceiver *self)
93{
94 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
95
96 if (tqueue->queue != NULL)
97 shm_mq_detach(tqueue->queue);
98 tqueue->queue = NULL;
99}
100
101/*
102 * Destroy receiver when done with it
103 */
104static void
105tqueueDestroyReceiver(DestReceiver *self)
106{
107 TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
108
109 /* We probably already detached from queue, but let's be sure */
110 if (tqueue->queue != NULL)
111 shm_mq_detach(tqueue->queue);
112 pfree(self);
113}
114
115/*
116 * Create a DestReceiver that writes tuples to a tuple queue.
117 */
118DestReceiver *
119CreateTupleQueueDestReceiver(shm_mq_handle *handle)
120{
121 TQueueDestReceiver *self;
122
123 self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
124
125 self->pub.receiveSlot = tqueueReceiveSlot;
126 self->pub.rStartup = tqueueStartupReceiver;
127 self->pub.rShutdown = tqueueShutdownReceiver;
128 self->pub.rDestroy = tqueueDestroyReceiver;
129 self->pub.mydest = DestTupleQueue;
130 self->queue = handle;
131
132 return (DestReceiver *) self;
133}
134
135/*
136 * Create a tuple queue reader.
137 */
138TupleQueueReader *
139CreateTupleQueueReader(shm_mq_handle *handle)
140{
141 TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
142
143 reader->queue = handle;
144
145 return reader;
146}
147
148/*
149 * Destroy a tuple queue reader.
150 *
151 * Note: cleaning up the underlying shm_mq is the caller's responsibility.
152 * We won't access it here, as it may be detached already.
153 */
154void
155DestroyTupleQueueReader(TupleQueueReader *reader)
156{
157 pfree(reader);
158}
159
160/*
161 * Fetch a tuple from a tuple queue reader.
162 *
163 * The return value is NULL if there are no remaining tuples or if
164 * nowait = true and no tuple is ready to return. *done, if not NULL,
165 * is set to true when there are no remaining tuples and otherwise to false.
166 *
167 * The returned tuple, if any, is allocated in CurrentMemoryContext.
168 * Note that this routine must not leak memory! (We used to allow that,
169 * but not any more.)
170 *
171 * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
172 * accumulate bytes from a partially-read message, so it's useful to call
173 * this with nowait = true even if nothing is returned.
174 */
175HeapTuple
176TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
177{
178 HeapTupleData htup;
179 shm_mq_result result;
180 Size nbytes;
181 void *data;
182
183 if (done != NULL)
184 *done = false;
185
186 /* Attempt to read a message. */
187 result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
188
189 /* If queue is detached, set *done and return NULL. */
190 if (result == SHM_MQ_DETACHED)
191 {
192 if (done != NULL)
193 *done = true;
194 return NULL;
195 }
196
197 /* In non-blocking mode, bail out if no message ready yet. */
198 if (result == SHM_MQ_WOULD_BLOCK)
199 return NULL;
200 Assert(result == SHM_MQ_SUCCESS);
201
202 /*
203 * Set up a dummy HeapTupleData pointing to the data from the shm_mq
204 * (which had better be sufficiently aligned).
205 */
206 ItemPointerSetInvalid(&htup.t_self);
207 htup.t_tableOid = InvalidOid;
208 htup.t_len = nbytes;
209 htup.t_data = data;
210
211 return heap_copytuple(&htup);
212}
213