1 | /* |
2 | * This Source Code Form is subject to the terms of the Mozilla Public |
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this |
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
5 | * |
6 | * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. |
7 | */ |
8 | |
9 | #include "monetdb_config.h" |
10 | |
11 | #ifdef HAVE_FORK |
12 | |
13 | #include "gdk_interprocess.h" |
14 | #include "gdk.h" |
15 | #include "gdk_private.h" |
16 | |
17 | #include <string.h> |
18 | |
19 | #include <sys/types.h> |
20 | #include <sys/ipc.h> |
21 | #include <sys/shm.h> |
22 | #include <sys/wait.h> |
23 | #include <unistd.h> |
24 | #include <sys/mman.h> |
25 | #include <sys/stat.h> |
26 | #include <fcntl.h> |
27 | #include <sched.h> |
28 | #include <sys/sem.h> |
29 | #include <time.h> |
30 | |
31 | static ATOMIC_TYPE interprocess_unique_id = ATOMIC_VAR_INIT(1); |
32 | static key_t base_key = 800000000; |
33 | |
34 | // Regular ftok produces too many collisions |
35 | static inline void |
36 | ftok_enhanced(int id, key_t * return_key) |
37 | { |
38 | *return_key = base_key + id; |
39 | } |
40 | |
41 | //! Obtain a set of unique identifiers that can be used to create memory mapped files or semaphores |
42 | /* offset: The amount of unique identifiers necessary |
43 | * return: The first unique identifier reserved. The consecutive [offset] identifiers are also reserved. |
44 | * (ex. if offset = 5 and the return value is 10, then the identifiers 10-14 are reserved) |
45 | */ |
46 | size_t |
47 | GDKuniqueid(size_t offset) |
48 | { |
49 | return (size_t) ATOMIC_ADD(&interprocess_unique_id, (ATOMIC_BASE_TYPE) offset); |
50 | } |
51 | |
52 | //! Create a memory mapped file if it does not exist and open it |
53 | /* id: The unique identifier of the memory mapped file (use GDKuniquemmapid to get a unique identifier) |
54 | * size: Minimum required size of the file |
55 | * return: Return value pointing into the file, NULL if not successful |
56 | */ |
57 | void * |
58 | GDKinitmmap(size_t id, size_t size, size_t *return_size) |
59 | { |
60 | char address[100]; |
61 | void *ptr; |
62 | int fd; |
63 | int mod = MMAP_READ | MMAP_WRITE | MMAP_SEQUENTIAL | MMAP_SYNC | MAP_SHARED; |
64 | char *path; |
65 | |
66 | GDKmmapfile(address, sizeof(address), id); |
67 | |
68 | /* round up to multiple of GDK_mmap_pagesize with a |
69 | * minimum of one |
70 | size = (maxsize + GDK_mmap_pagesize - 1) & ~(GDK_mmap_pagesize - 1); |
71 | if (size == 0) |
72 | size = GDK_mmap_pagesize; */ |
73 | path = GDKfilepath(0, BATDIR, address, "tmp" ); |
74 | if (path == NULL) { |
75 | return NULL; |
76 | } |
77 | fd = GDKfdlocate(NOFARM, path, "wb" , NULL); |
78 | if (fd < 0) { |
79 | GDKfree(path); |
80 | return NULL; |
81 | } |
82 | if (GDKextendf(fd, size, path) != GDK_SUCCEED) { |
83 | close(fd); |
84 | GDKfree(path); |
85 | return NULL; |
86 | } |
87 | close(fd); |
88 | ptr = GDKmmap(path, mod, size); |
89 | GDKfree(path); |
90 | if (ptr == NULL) { |
91 | return NULL; |
92 | } |
93 | if (return_size != NULL) { |
94 | *return_size = size; |
95 | } |
96 | return ptr; |
97 | } |
98 | |
99 | //! Release a memory mapped file that was created through GDKinitmmap |
100 | /* ptr: Pointer to the file |
101 | * size: Size of the file |
102 | * id: Identifier of the file |
103 | * return: GDK_SUCCEED if successful, GDK_FAIL if not successful |
104 | */ |
105 | gdk_return |
106 | GDKreleasemmap(void *ptr, size_t size, size_t id) |
107 | { |
108 | char address[100]; |
109 | char *path; |
110 | int ret; |
111 | GDKmmapfile(address, sizeof(address), id); |
112 | if (GDKmunmap(ptr, size) != GDK_SUCCEED) { |
113 | return GDK_FAIL; |
114 | } |
115 | path = GDKfilepath(0, BATDIR, address, "tmp" ); |
116 | if (path == NULL) { |
117 | return GDK_FAIL; |
118 | } |
119 | ret = remove(path); |
120 | if (ret < 0) |
121 | GDKsyserror("cannot remove '%s'" , path); |
122 | GDKfree(path); |
123 | return ret < 0 ? GDK_FAIL : GDK_SUCCEED; |
124 | } |
125 | |
126 | //! snprintf the file name of a memory mapped file (as created by GDKinitmmap) |
127 | /* buffer: The buffer to write the name to |
128 | * max: The maxsize of the buffer (should be at least ~10 characters) |
129 | * id: Identifier of the file |
130 | */ |
131 | gdk_return |
132 | GDKmmapfile(str buffer, size_t max, size_t id) |
133 | { |
134 | snprintf(buffer, max, "pymmap%zu" , id); |
135 | return GDK_SUCCEED; |
136 | } |
137 | |
138 | static gdk_return |
139 | interprocess_init_semaphore(int id, int count, int flags, int *semid) |
140 | { |
141 | key_t key; |
142 | ftok_enhanced(id, &key); |
143 | *semid = semget(key, count, flags | 0666); |
144 | if (*semid < 0) { |
145 | GDKsyserror("semget failed" ); |
146 | return GDK_FAIL; |
147 | } |
148 | return GDK_SUCCEED; |
149 | } |
150 | |
151 | //! Create an interprocess semaphore |
152 | /* id: identifier (obtain from GDKuniqueid) |
153 | * count: amount of semaphores |
154 | * semid: identifier of the created semaphore (only set if function returns GDK_SUCCEED) |
155 | */ |
156 | gdk_return |
157 | GDKcreatesem(int id, int count, int *semid) |
158 | { |
159 | return interprocess_init_semaphore(id, count, IPC_CREAT, semid); |
160 | } |
161 | |
162 | //! Get an interprocess semaphore that was already created using GDKcreatesem |
163 | /* id: identifier (obtain from GDKuniqueid) |
164 | * count: amount of semaphores |
165 | * semid: identifier of the semaphore (only set if function returns GDK_SUCCEED) |
166 | */ |
167 | gdk_return |
168 | GDKgetsem(int id, int count, int *semid) |
169 | { |
170 | return interprocess_init_semaphore(id, count, 0, semid); |
171 | } |
172 | |
173 | //! Gets the value of an interprocess semaphore |
174 | /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem) |
175 | * number: the semaphore number (must be less than 'count' given when creating the semaphore) |
176 | * semval: the value of the semaphore (only set if function returns GDK_SUCCEED) |
177 | */ |
178 | gdk_return |
179 | GDKgetsemval(int sem_id, int number, int *semval) |
180 | { |
181 | *semval = semctl(sem_id, number, GETVAL, 0); |
182 | if (*semval < 0) { |
183 | GDKsyserror("semctl failed" ); |
184 | return GDK_FAIL; |
185 | } |
186 | return GDK_SUCCEED; |
187 | } |
188 | |
189 | //! Change the value of an interprocess semaphore |
190 | /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem) |
191 | * number: the semaphore number (must be less than 'count' given when creating the semaphore) |
192 | * change: The change to apply to the semaphore value |
193 | */ |
194 | gdk_return |
195 | GDKchangesemval(int sem_id, int number, int change) |
196 | { |
197 | struct sembuf buffer; |
198 | buffer.sem_num = number; |
199 | buffer.sem_op = change; |
200 | buffer.sem_flg = 0; |
201 | |
202 | if (semop(sem_id, &buffer, 1) < 0) { |
203 | GDKsyserror("semop failed" ); |
204 | return GDK_FAIL; |
205 | } |
206 | return GDK_SUCCEED; |
207 | } |
208 | |
209 | //! Change the value of an interprocess semaphore with a timeout |
210 | /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem) |
211 | * number: the semaphore number (must be less than 'count' given when creating the semaphore) |
212 | * change: The change to apply to the semaphore value |
213 | * timeout_mseconds: The timeout in milliseconds |
214 | * succeed: Set to true if the value was successfully changed, or false if the timeout was reached |
215 | */ |
216 | gdk_return |
217 | GDKchangesemval_timeout(int sem_id, int number, int change, int timeout_mseconds, bool *succeed) |
218 | { |
219 | #ifdef HAVE_SEMTIMEDOP |
220 | // Some linux installations don't have semtimedop |
221 | // The easiest solution is to just call semop instead |
222 | // The only reason we use semtimedop is to prevent deadlocks when there are segfaults in a subprocess, which really shouldn't happen anyway |
223 | // So having semtimedop is not vital to the functioning of the program |
224 | struct timespec timeout; |
225 | struct sembuf buffer; |
226 | buffer.sem_num = number; |
227 | buffer.sem_op = change; |
228 | buffer.sem_flg = 0; |
229 | *succeed = false; |
230 | |
231 | timeout.tv_sec = (timeout_mseconds / 1000); |
232 | timeout.tv_nsec = (timeout_mseconds % 1000) * 1000; |
233 | |
234 | if (semtimedop(sem_id, &buffer, 1, &timeout) < 0) { |
235 | if (errno == EAGAIN || errno == EINTR) { |
236 | // operation timed out; not an error |
237 | errno = 0; |
238 | return GDK_SUCCEED; |
239 | } else { |
240 | GDKsyserror("semtimedop failed" ); |
241 | return GDK_FAIL; |
242 | } |
243 | } |
244 | *succeed = true; |
245 | return GDK_SUCCEED; |
246 | #else |
247 | (void) timeout_mseconds; |
248 | *succeed = true; |
249 | return GDKchangesemval(sem_id, number, change); |
250 | #endif |
251 | } |
252 | |
253 | //! Destroy an interprocess semaphore |
254 | /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem) |
255 | */ |
256 | gdk_return |
257 | GDKreleasesem(int sem_id) |
258 | { |
259 | if (semctl(sem_id, 0, IPC_RMID) < 0) { |
260 | GDKsyserror("semctl failed" ); |
261 | return GDK_FAIL; |
262 | } |
263 | return GDK_SUCCEED; |
264 | } |
265 | |
// Round a byte count up to the next multiple of 8.
// The argument is parenthesized so that expressions with low-precedence
// operators (?:, <<, |, ...) are rounded as a whole, and the arithmetic is
// done in size_t so the mask never depends on the width of int.
#define align(sz) (((size_t) (sz) + 7) & ~(size_t) 7)
268 | |
269 | size_t |
270 | GDKbatcopysize(BAT *bat, str colname) |
271 | { |
272 | size_t size = 0; |
273 | |
274 | size += align(strlen(colname) + 1); //[COLNAME] |
275 | size += align(sizeof(BAT)); //[BAT] |
276 | size += align(bat->twidth * BATcount(bat)); //[DATA] |
277 | |
278 | if (bat->tvheap != NULL) { |
279 | size += align(sizeof(Heap)); //[VHEAP] |
280 | size += align(bat->tvheap->size); //[VHEAPDATA] |
281 | } |
282 | return size; |
283 | } |
284 | |
285 | size_t |
286 | GDKbatcopy(char *dest, BAT *bat, str colname) |
287 | { |
288 | size_t batsize = bat->twidth * BATcount(bat); |
289 | size_t position = 0; |
290 | |
291 | //[COLNAME] |
292 | memcpy(dest + position, colname, strlen(colname) + 1); |
293 | position += align(strlen(colname) + 1); |
294 | //[BAT] |
295 | memcpy(dest + position, bat, sizeof(BAT)); |
296 | position += align(sizeof(BAT)); |
297 | //[DATA] |
298 | memcpy(dest + position, Tloc(bat, 0), batsize); |
299 | position += align(batsize); |
300 | if (bat->tvheap != NULL) { |
301 | //[VHEAP] |
302 | memcpy(dest + position, bat->tvheap, sizeof(Heap)); |
303 | position += align(sizeof(Heap)); |
304 | //[VHEAPDATA] |
305 | memcpy(dest + position, bat->tvheap->base, bat->tvheap->size); |
306 | position += align(bat->tvheap->size); |
307 | } |
308 | return position; |
309 | } |
310 | |
311 | size_t |
312 | GDKbatread(char *src, BAT **bat, str *colname) |
313 | { |
314 | size_t position = 0; |
315 | BAT *b; |
316 | //load the data for this column from shared memory |
317 | //[COLNAME] |
318 | *colname = src + position; |
319 | position += align(strlen(*colname) + 1); |
320 | //[BAT] |
321 | b = (BAT *) (src + position); |
322 | position += align(sizeof(BAT)); |
323 | //[DATA] |
324 | b->theap.base = (void *) (src + position); |
325 | position += align(b->twidth * BATcount(b)); |
326 | if (b->tvheap != NULL) { |
327 | //[VHEAP] |
328 | b->tvheap = (Heap *) (src + position); |
329 | position += align(sizeof(Heap)); |
330 | //[VHEAPDATA] |
331 | b->tvheap->base = (void *) (src + position); |
332 | position += align(b->tvheap->size); |
333 | } |
334 | *bat = b; |
335 | return position; |
336 | } |
337 | |
338 | #endif |
339 | |