1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * tqueue.c |
4 | * Use shm_mq to send & receive tuples between parallel backends |
5 | * |
6 | * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver |
7 | * under the hood, writes tuples from the executor to a shm_mq. |
8 | * |
9 | * A TupleQueueReader reads tuples from a shm_mq and returns the tuples. |
10 | * |
11 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
12 | * Portions Copyright (c) 1994, Regents of the University of California |
13 | * |
14 | * IDENTIFICATION |
15 | * src/backend/executor/tqueue.c |
16 | * |
17 | *------------------------------------------------------------------------- |
18 | */ |
19 | |
20 | #include "postgres.h" |
21 | |
22 | #include "access/htup_details.h" |
23 | #include "executor/tqueue.h" |
24 | |
25 | /* |
26 | * DestReceiver object's private contents |
27 | * |
28 | * queue is a pointer to data supplied by DestReceiver's caller. |
29 | */ |
30 | typedef struct TQueueDestReceiver |
31 | { |
32 | DestReceiver pub; /* public fields */ |
33 | shm_mq_handle *queue; /* shm_mq to send to */ |
34 | } TQueueDestReceiver; |
35 | |
36 | /* |
37 | * TupleQueueReader object's private contents |
38 | * |
39 | * queue is a pointer to data supplied by reader's caller. |
40 | * |
41 | * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h |
42 | */ |
43 | struct TupleQueueReader |
44 | { |
45 | shm_mq_handle *queue; /* shm_mq to receive from */ |
46 | }; |
47 | |
48 | /* |
49 | * Receive a tuple from a query, and send it to the designated shm_mq. |
50 | * |
51 | * Returns true if successful, false if shm_mq has been detached. |
52 | */ |
53 | static bool |
54 | tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) |
55 | { |
56 | TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; |
57 | HeapTuple tuple; |
58 | shm_mq_result result; |
59 | bool should_free; |
60 | |
61 | /* Send the tuple itself. */ |
62 | tuple = ExecFetchSlotHeapTuple(slot, true, &should_free); |
63 | result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false); |
64 | |
65 | if (should_free) |
66 | heap_freetuple(tuple); |
67 | |
68 | /* Check for failure. */ |
69 | if (result == SHM_MQ_DETACHED) |
70 | return false; |
71 | else if (result != SHM_MQ_SUCCESS) |
72 | ereport(ERROR, |
73 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
74 | errmsg("could not send tuple to shared-memory queue" ))); |
75 | |
76 | return true; |
77 | } |
78 | |
79 | /* |
80 | * Prepare to receive tuples from executor. |
81 | */ |
82 | static void |
83 | tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) |
84 | { |
85 | /* do nothing */ |
86 | } |
87 | |
88 | /* |
89 | * Clean up at end of an executor run |
90 | */ |
91 | static void |
92 | tqueueShutdownReceiver(DestReceiver *self) |
93 | { |
94 | TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; |
95 | |
96 | if (tqueue->queue != NULL) |
97 | shm_mq_detach(tqueue->queue); |
98 | tqueue->queue = NULL; |
99 | } |
100 | |
101 | /* |
102 | * Destroy receiver when done with it |
103 | */ |
104 | static void |
105 | tqueueDestroyReceiver(DestReceiver *self) |
106 | { |
107 | TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; |
108 | |
109 | /* We probably already detached from queue, but let's be sure */ |
110 | if (tqueue->queue != NULL) |
111 | shm_mq_detach(tqueue->queue); |
112 | pfree(self); |
113 | } |
114 | |
115 | /* |
116 | * Create a DestReceiver that writes tuples to a tuple queue. |
117 | */ |
118 | DestReceiver * |
119 | CreateTupleQueueDestReceiver(shm_mq_handle *handle) |
120 | { |
121 | TQueueDestReceiver *self; |
122 | |
123 | self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver)); |
124 | |
125 | self->pub.receiveSlot = tqueueReceiveSlot; |
126 | self->pub.rStartup = tqueueStartupReceiver; |
127 | self->pub.rShutdown = tqueueShutdownReceiver; |
128 | self->pub.rDestroy = tqueueDestroyReceiver; |
129 | self->pub.mydest = DestTupleQueue; |
130 | self->queue = handle; |
131 | |
132 | return (DestReceiver *) self; |
133 | } |
134 | |
135 | /* |
136 | * Create a tuple queue reader. |
137 | */ |
138 | TupleQueueReader * |
139 | CreateTupleQueueReader(shm_mq_handle *handle) |
140 | { |
141 | TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader)); |
142 | |
143 | reader->queue = handle; |
144 | |
145 | return reader; |
146 | } |
147 | |
148 | /* |
149 | * Destroy a tuple queue reader. |
150 | * |
151 | * Note: cleaning up the underlying shm_mq is the caller's responsibility. |
152 | * We won't access it here, as it may be detached already. |
153 | */ |
154 | void |
155 | DestroyTupleQueueReader(TupleQueueReader *reader) |
156 | { |
157 | pfree(reader); |
158 | } |
159 | |
160 | /* |
161 | * Fetch a tuple from a tuple queue reader. |
162 | * |
163 | * The return value is NULL if there are no remaining tuples or if |
164 | * nowait = true and no tuple is ready to return. *done, if not NULL, |
165 | * is set to true when there are no remaining tuples and otherwise to false. |
166 | * |
167 | * The returned tuple, if any, is allocated in CurrentMemoryContext. |
168 | * Note that this routine must not leak memory! (We used to allow that, |
169 | * but not any more.) |
170 | * |
171 | * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still |
172 | * accumulate bytes from a partially-read message, so it's useful to call |
173 | * this with nowait = true even if nothing is returned. |
174 | */ |
175 | HeapTuple |
176 | TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) |
177 | { |
178 | HeapTupleData htup; |
179 | shm_mq_result result; |
180 | Size nbytes; |
181 | void *data; |
182 | |
183 | if (done != NULL) |
184 | *done = false; |
185 | |
186 | /* Attempt to read a message. */ |
187 | result = shm_mq_receive(reader->queue, &nbytes, &data, nowait); |
188 | |
189 | /* If queue is detached, set *done and return NULL. */ |
190 | if (result == SHM_MQ_DETACHED) |
191 | { |
192 | if (done != NULL) |
193 | *done = true; |
194 | return NULL; |
195 | } |
196 | |
197 | /* In non-blocking mode, bail out if no message ready yet. */ |
198 | if (result == SHM_MQ_WOULD_BLOCK) |
199 | return NULL; |
200 | Assert(result == SHM_MQ_SUCCESS); |
201 | |
202 | /* |
203 | * Set up a dummy HeapTupleData pointing to the data from the shm_mq |
204 | * (which had better be sufficiently aligned). |
205 | */ |
206 | ItemPointerSetInvalid(&htup.t_self); |
207 | htup.t_tableOid = InvalidOid; |
208 | htup.t_len = nbytes; |
209 | htup.t_data = data; |
210 | |
211 | return heap_copytuple(&htup); |
212 | } |
213 | |