1/*
2 * cf_thread.c
3 *
4 * Copyright (C) 2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
#include "cf_thread.h"

#include <errno.h>
#include <execinfo.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>

#include "cf_mutex.h"
#include "dynbuf.h"
#include "fault.h"

#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_ll.h"

#include "warnings.h"
46
47
48//==========================================================
49// Typedefs & constants.
50//
51
52typedef struct thread_info_s {
53 cf_ll_element link; // base object must be first
54 cf_thread_run_fn run;
55 void* udata;
56 pid_t sys_tid;
57} thread_info;
58
59
60//==========================================================
61// Globals.
62//
63
64__thread pid_t g_sys_tid = 0;
65
66static pthread_attr_t g_attr_detached;
67
68static cf_ll g_thread_list;
69static __thread thread_info* g_thread_info;
70
71static cf_mutex g_trace_lock = CF_MUTEX_INIT;
72static cf_dyn_buf* g_trace_db;
73static volatile uint32_t g_traces_pending;
74static volatile uint32_t g_traces_done;
75
76
77//==========================================================
78// Forward declarations.
79//
80
81static thread_info* make_thread_info(cf_thread_run_fn run, void* udata);
82static void register_thread_info(void* udata);
83static void deregister_thread_info(void);
84static void* detached_shim_fn(void* udata);
85static void* joinable_shim_fn(void* udata);
86static int32_t traces_cb(cf_ll_element* ele, void* udata);
87
88
89//==========================================================
90// Public API.
91//
92
93void
94cf_thread_init(void)
95{
96 // TODO - check system thread limit and warn if too low?
97
98 pthread_attr_init(&g_attr_detached);
99 pthread_attr_setdetachstate(&g_attr_detached, PTHREAD_CREATE_DETACHED);
100
101 cf_ll_init(&g_thread_list, NULL, true);
102}
103
104cf_tid
105cf_thread_create_detached(cf_thread_run_fn run, void* udata)
106{
107 cf_assert(g_alloc_started, CF_MISC, "started thread too early");
108
109 thread_info* info = make_thread_info(run, udata);
110 pthread_t tid;
111 int result = pthread_create(&tid, &g_attr_detached, detached_shim_fn, info);
112
113 if (result != 0) {
114 // Non-zero return values are errno values.
115 cf_crash(CF_MISC, "failed to create detached thread: %d (%s)", result,
116 cf_strerror(result));
117 }
118
119 return (cf_tid)tid;
120}
121
122cf_tid
123cf_thread_create_joinable(cf_thread_run_fn run, void* udata)
124{
125 cf_assert(g_alloc_started, CF_MISC, "started thread too early");
126
127 thread_info* info = make_thread_info(run, udata);
128 pthread_t tid;
129 int result = pthread_create(&tid, NULL, joinable_shim_fn, info);
130
131 if (result != 0) {
132 // Non-zero return values are errno values.
133 cf_crash(CF_MISC, "failed to create joinable thread: %d (%s)", result,
134 cf_strerror(result));
135 }
136
137 return (cf_tid)tid;
138}
139
140int32_t
141cf_thread_traces(char* key, cf_dyn_buf* db)
142{
143 (void)key;
144
145 g_trace_db = db;
146 g_traces_pending = 0;
147 g_traces_done = 0;
148
149 cf_ll_reduce(&g_thread_list, true, traces_cb, NULL);
150
151 // Quit after 15 seconds - may not get all done if a thread exits after
152 // we signal it but before its action is handled.
153 for (uint32_t i = 0; i < 1500; i++) {
154 if (g_traces_done == g_traces_pending) {
155 break;
156 }
157
158 usleep(10 * 1000);
159 }
160
161 cf_dyn_buf_chomp(db);
162 g_trace_db = NULL;
163
164 return 0;
165}
166
167void
168cf_thread_traces_action(int32_t sig_num, siginfo_t* info, void* ctx)
169{
170 (void)sig_num;
171 (void)info;
172 (void)ctx;
173
174 cf_mutex_lock(&g_trace_lock);
175
176 cf_dyn_buf_append_format(g_trace_db, "---------- %d (0x%lx) ----------;",
177 g_thread_info->sys_tid, cf_fault_strip_aslr(g_thread_info->run));
178
179 void* addrs[50];
180 int32_t n_addrs = backtrace(addrs, 50);
181 char** syms = backtrace_symbols(addrs, n_addrs);
182
183 if (syms == NULL) {
184 cf_dyn_buf_append_format(g_trace_db, "failed;");
185 g_traces_done++;
186 cf_mutex_unlock(&g_trace_lock);
187 return;
188 }
189
190 for (int32_t i = 0; i < n_addrs; i++) {
191 cf_dyn_buf_append_format(g_trace_db, "%s;", syms[i]);
192 }
193
194 g_traces_done++;
195
196 cf_mutex_unlock(&g_trace_lock);
197}
198
199
200//==========================================================
201// Local helpers.
202//
203
204static thread_info*
205make_thread_info(cf_thread_run_fn run, void* udata)
206{
207 thread_info* info = cf_calloc(1, sizeof(thread_info));
208
209 info->run = run;
210 info->udata = udata;
211
212 return info;
213}
214
215static void
216register_thread_info(void* udata)
217{
218 g_thread_info = (thread_info*)udata;
219 g_thread_info->sys_tid = cf_thread_sys_tid();
220
221 cf_ll_append(&g_thread_list, &g_thread_info->link);
222}
223
224static void
225deregister_thread_info(void)
226{
227 cf_ll_delete(&g_thread_list, &g_thread_info->link);
228 cf_free(g_thread_info);
229}
230
231static void*
232detached_shim_fn(void* udata)
233{
234 register_thread_info(udata);
235
236 void* rv = g_thread_info->run(g_thread_info->udata);
237
238 // Prevent crashes in glibc 2.24 for short-lived detached threads.
239 usleep(100 * 1000);
240
241 deregister_thread_info();
242
243 return rv;
244}
245
246static void*
247joinable_shim_fn(void* udata)
248{
249 register_thread_info(udata);
250
251 void* rv = g_thread_info->run(g_thread_info->udata);
252
253 deregister_thread_info();
254
255 return rv;
256}
257
258static int32_t
259traces_cb(cf_ll_element* ele, void* udata)
260{
261 (void)udata;
262
263 thread_info* info = (thread_info*)ele;
264
265 if (syscall(SYS_tgkill, getpid(), info->sys_tid, SIGUSR2) < 0) {
266 cf_warning(CF_MISC, "failed to signal thread %d: %d (%s)",
267 info->sys_tid, errno, cf_strerror(errno));
268 return 0;
269 }
270
271 g_traces_pending++;
272
273 return 0;
274}
275