1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10
11#ifdef HAVE_FORK
12
13#include "gdk_interprocess.h"
14#include "gdk.h"
15#include "gdk_private.h"
16
17#include <string.h>
18
19#include <sys/types.h>
20#include <sys/ipc.h>
21#include <sys/shm.h>
22#include <sys/wait.h>
23#include <unistd.h>
24#include <sys/mman.h>
25#include <sys/stat.h>
26#include <fcntl.h>
27#include <sched.h>
28#include <sys/sem.h>
29#include <time.h>
30
31static ATOMIC_TYPE interprocess_unique_id = ATOMIC_VAR_INIT(1);
32static key_t base_key = 800000000;
33
34// Regular ftok produces too many collisions
35static inline void
36ftok_enhanced(int id, key_t * return_key)
37{
38 *return_key = base_key + id;
39}
40
41//! Obtain a set of unique identifiers that can be used to create memory mapped files or semaphores
42/* offset: The amount of unique identifiers necessary
43 * return: The first unique identifier reserved. The consecutive [offset] identifiers are also reserved.
44 * (ex. if offset = 5 and the return value is 10, then the identifiers 10-14 are reserved)
45*/
46size_t
47GDKuniqueid(size_t offset)
48{
49 return (size_t) ATOMIC_ADD(&interprocess_unique_id, (ATOMIC_BASE_TYPE) offset);
50}
51
52//! Create a memory mapped file if it does not exist and open it
53/* id: The unique identifier of the memory mapped file (use GDKuniquemmapid to get a unique identifier)
54 * size: Minimum required size of the file
55 * return: Return value pointing into the file, NULL if not successful
56*/
57void *
58GDKinitmmap(size_t id, size_t size, size_t *return_size)
59{
60 char address[100];
61 void *ptr;
62 int fd;
63 int mod = MMAP_READ | MMAP_WRITE | MMAP_SEQUENTIAL | MMAP_SYNC | MAP_SHARED;
64 char *path;
65
66 GDKmmapfile(address, sizeof(address), id);
67
68 /* round up to multiple of GDK_mmap_pagesize with a
69 * minimum of one
70 size = (maxsize + GDK_mmap_pagesize - 1) & ~(GDK_mmap_pagesize - 1);
71 if (size == 0)
72 size = GDK_mmap_pagesize; */
73 path = GDKfilepath(0, BATDIR, address, "tmp");
74 if (path == NULL) {
75 return NULL;
76 }
77 fd = GDKfdlocate(NOFARM, path, "wb", NULL);
78 if (fd < 0) {
79 GDKfree(path);
80 return NULL;
81 }
82 if (GDKextendf(fd, size, path) != GDK_SUCCEED) {
83 close(fd);
84 GDKfree(path);
85 return NULL;
86 }
87 close(fd);
88 ptr = GDKmmap(path, mod, size);
89 GDKfree(path);
90 if (ptr == NULL) {
91 return NULL;
92 }
93 if (return_size != NULL) {
94 *return_size = size;
95 }
96 return ptr;
97}
98
99//! Release a memory mapped file that was created through GDKinitmmap
100/* ptr: Pointer to the file
101 * size: Size of the file
102 * id: Identifier of the file
103 * return: GDK_SUCCEED if successful, GDK_FAIL if not successful
104*/
105gdk_return
106GDKreleasemmap(void *ptr, size_t size, size_t id)
107{
108 char address[100];
109 char *path;
110 int ret;
111 GDKmmapfile(address, sizeof(address), id);
112 if (GDKmunmap(ptr, size) != GDK_SUCCEED) {
113 return GDK_FAIL;
114 }
115 path = GDKfilepath(0, BATDIR, address, "tmp");
116 if (path == NULL) {
117 return GDK_FAIL;
118 }
119 ret = remove(path);
120 if (ret < 0)
121 GDKsyserror("cannot remove '%s'", path);
122 GDKfree(path);
123 return ret < 0 ? GDK_FAIL : GDK_SUCCEED;
124}
125
126//! snprintf the file name of a memory mapped file (as created by GDKinitmmap)
127/* buffer: The buffer to write the name to
128 * max: The maxsize of the buffer (should be at least ~10 characters)
129 * id: Identifier of the file
130*/
131gdk_return
132GDKmmapfile(str buffer, size_t max, size_t id)
133{
134 snprintf(buffer, max, "pymmap%zu", id);
135 return GDK_SUCCEED;
136}
137
138static gdk_return
139interprocess_init_semaphore(int id, int count, int flags, int *semid)
140{
141 key_t key;
142 ftok_enhanced(id, &key);
143 *semid = semget(key, count, flags | 0666);
144 if (*semid < 0) {
145 GDKsyserror("semget failed");
146 return GDK_FAIL;
147 }
148 return GDK_SUCCEED;
149}
150
151//! Create an interprocess semaphore
152/* id: identifier (obtain from GDKuniqueid)
153 * count: amount of semaphores
154 * semid: identifier of the created semaphore (only set if function returns GDK_SUCCEED)
155 */
156gdk_return
157GDKcreatesem(int id, int count, int *semid)
158{
159 return interprocess_init_semaphore(id, count, IPC_CREAT, semid);
160}
161
162//! Get an interprocess semaphore that was already created using GDKcreatesem
163/* id: identifier (obtain from GDKuniqueid)
164 * count: amount of semaphores
165 * semid: identifier of the semaphore (only set if function returns GDK_SUCCEED)
166 */
167gdk_return
168GDKgetsem(int id, int count, int *semid)
169{
170 return interprocess_init_semaphore(id, count, 0, semid);
171}
172
173//! Gets the value of an interprocess semaphore
174/* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
175 * number: the semaphore number (must be less than 'count' given when creating the semaphore)
176 * semval: the value of the semaphore (only set if function returns GDK_SUCCEED)
177 */
178gdk_return
179GDKgetsemval(int sem_id, int number, int *semval)
180{
181 *semval = semctl(sem_id, number, GETVAL, 0);
182 if (*semval < 0) {
183 GDKsyserror("semctl failed");
184 return GDK_FAIL;
185 }
186 return GDK_SUCCEED;
187}
188
189//! Change the value of an interprocess semaphore
190/* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
191 * number: the semaphore number (must be less than 'count' given when creating the semaphore)
192 * change: The change to apply to the semaphore value
193 */
194gdk_return
195GDKchangesemval(int sem_id, int number, int change)
196{
197 struct sembuf buffer;
198 buffer.sem_num = number;
199 buffer.sem_op = change;
200 buffer.sem_flg = 0;
201
202 if (semop(sem_id, &buffer, 1) < 0) {
203 GDKsyserror("semop failed");
204 return GDK_FAIL;
205 }
206 return GDK_SUCCEED;
207}
208
209//! Change the value of an interprocess semaphore with a timeout
210/* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
211 * number: the semaphore number (must be less than 'count' given when creating the semaphore)
212 * change: The change to apply to the semaphore value
213 * timeout_mseconds: The timeout in milliseconds
214 * succeed: Set to true if the value was successfully changed, or false if the timeout was reached
215 */
216gdk_return
217GDKchangesemval_timeout(int sem_id, int number, int change, int timeout_mseconds, bool *succeed)
218{
219#ifdef HAVE_SEMTIMEDOP
220 // Some linux installations don't have semtimedop
221 // The easiest solution is to just call semop instead
222 // The only reason we use semtimedop is to prevent deadlocks when there are segfaults in a subprocess, which really shouldn't happen anyway
223 // So having semtimedop is not vital to the functioning of the program
224 struct timespec timeout;
225 struct sembuf buffer;
226 buffer.sem_num = number;
227 buffer.sem_op = change;
228 buffer.sem_flg = 0;
229 *succeed = false;
230
231 timeout.tv_sec = (timeout_mseconds / 1000);
232 timeout.tv_nsec = (timeout_mseconds % 1000) * 1000;
233
234 if (semtimedop(sem_id, &buffer, 1, &timeout) < 0) {
235 if (errno == EAGAIN || errno == EINTR) {
236 // operation timed out; not an error
237 errno = 0;
238 return GDK_SUCCEED;
239 } else {
240 GDKsyserror("semtimedop failed");
241 return GDK_FAIL;
242 }
243 }
244 *succeed = true;
245 return GDK_SUCCEED;
246#else
247 (void) timeout_mseconds;
248 *succeed = true;
249 return GDKchangesemval(sem_id, number, change);
250#endif
251}
252
253//! Destroy an interprocess semaphore
254/* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
255 */
256gdk_return
257GDKreleasesem(int sem_id)
258{
259 if (semctl(sem_id, 0, IPC_RMID) < 0) {
260 GDKsyserror("semctl failed");
261 return GDK_FAIL;
262 }
263 return GDK_SUCCEED;
264}
265
// align to 8 bytes: round sz up to the next multiple of 8.
// The argument is parenthesized so that expressions containing low-precedence
// operators (e.g. ?: or |) expand correctly, and the arithmetic is carried out
// in size_t so the mask covers the full width whatever the argument's type.
#define align(sz) ((((size_t) (sz)) + 7) & ~(size_t) 7)
268
269size_t
270GDKbatcopysize(BAT *bat, str colname)
271{
272 size_t size = 0;
273
274 size += align(strlen(colname) + 1); //[COLNAME]
275 size += align(sizeof(BAT)); //[BAT]
276 size += align(bat->twidth * BATcount(bat)); //[DATA]
277
278 if (bat->tvheap != NULL) {
279 size += align(sizeof(Heap)); //[VHEAP]
280 size += align(bat->tvheap->size); //[VHEAPDATA]
281 }
282 return size;
283}
284
285size_t
286GDKbatcopy(char *dest, BAT *bat, str colname)
287{
288 size_t batsize = bat->twidth * BATcount(bat);
289 size_t position = 0;
290
291 //[COLNAME]
292 memcpy(dest + position, colname, strlen(colname) + 1);
293 position += align(strlen(colname) + 1);
294 //[BAT]
295 memcpy(dest + position, bat, sizeof(BAT));
296 position += align(sizeof(BAT));
297 //[DATA]
298 memcpy(dest + position, Tloc(bat, 0), batsize);
299 position += align(batsize);
300 if (bat->tvheap != NULL) {
301 //[VHEAP]
302 memcpy(dest + position, bat->tvheap, sizeof(Heap));
303 position += align(sizeof(Heap));
304 //[VHEAPDATA]
305 memcpy(dest + position, bat->tvheap->base, bat->tvheap->size);
306 position += align(bat->tvheap->size);
307 }
308 return position;
309}
310
311size_t
312GDKbatread(char *src, BAT **bat, str *colname)
313{
314 size_t position = 0;
315 BAT *b;
316 //load the data for this column from shared memory
317 //[COLNAME]
318 *colname = src + position;
319 position += align(strlen(*colname) + 1);
320 //[BAT]
321 b = (BAT *) (src + position);
322 position += align(sizeof(BAT));
323 //[DATA]
324 b->theap.base = (void *) (src + position);
325 position += align(b->twidth * BATcount(b));
326 if (b->tvheap != NULL) {
327 //[VHEAP]
328 b->tvheap = (Heap *) (src + position);
329 position += align(sizeof(Heap));
330 //[VHEAPDATA]
331 b->tvheap->base = (void *) (src + position);
332 position += align(b->tvheap->size);
333 }
334 *bat = b;
335 return position;
336}
337
338#endif
339