1/*
2 Copyright (c) 2011, 2013 Monty Program Ab.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16
17
18#ifndef MY_APC_STANDALONE
19
20#include "mariadb.h"
21#include "sql_class.h"
22
23#endif
24
25/* For standalone testing of APC system, see unittest/sql/my_apc-t.cc */
26
27/*
28 Initialize the target.
29
30 @note
31 Initialization must be done prior to enabling/disabling the target, or making
32 any call requests to it.
33 Initial state after initialization is 'disabled'.
34*/
35void Apc_target::init(mysql_mutex_t *target_mutex)
36{
37 DBUG_ASSERT(!enabled);
38 LOCK_thd_kill_ptr= target_mutex;
39#ifndef DBUG_OFF
40 n_calls_processed= 0;
41#endif
42}
43
44
45/* [internal] Put request qe into the request list */
46
47void Apc_target::enqueue_request(Call_request *qe)
48{
49 mysql_mutex_assert_owner(LOCK_thd_kill_ptr);
50 if (apc_calls)
51 {
52 Call_request *after= apc_calls->prev;
53 qe->next= apc_calls;
54 apc_calls->prev= qe;
55
56 qe->prev= after;
57 after->next= qe;
58 }
59 else
60 {
61 apc_calls= qe;
62 qe->next= qe->prev= qe;
63 }
64}
65
66
67/*
68 [internal] Remove request qe from the request queue.
69
70 The request is not necessarily first in the queue.
71*/
72
73void Apc_target::dequeue_request(Call_request *qe)
74{
75 mysql_mutex_assert_owner(LOCK_thd_kill_ptr);
76 if (apc_calls == qe)
77 {
78 if ((apc_calls= apc_calls->next) == qe)
79 {
80 apc_calls= NULL;
81 }
82 }
83
84 qe->prev->next= qe->next;
85 qe->next->prev= qe->prev;
86}
87
88#ifdef HAVE_PSI_INTERFACE
89
90/* One key for all conds */
91PSI_cond_key key_show_explain_request_COND;
92
93static PSI_cond_info show_explain_psi_conds[]=
94{
95 { &key_show_explain_request_COND, "show_explain", 0 /* not using PSI_FLAG_GLOBAL*/ }
96};
97
98void init_show_explain_psi_keys(void)
99{
100 if (PSI_server == NULL)
101 return;
102
103 PSI_server->register_cond("sql", show_explain_psi_conds,
104 array_elements(show_explain_psi_conds));
105}
106#endif
107
108
109/*
110 Make an APC (Async Procedure Call) to another thread.
111
112 @detail
113 Make an APC call: schedule it for execution and wait until the target
114 thread has executed it.
115
116 - The caller is responsible for making sure he's not posting request
117 to the thread he's calling this function from.
118
119 - The caller must have locked target_mutex. The function will release it.
120
121 @retval FALSE - Ok, the call has been made
122 @retval TRUE - Call wasnt made (either the target is in disabled state or
123 timeout occurred)
124*/
125
126bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call,
127 int timeout_sec, bool *timed_out)
128{
129 bool res= TRUE;
130 *timed_out= FALSE;
131
132 if (enabled)
133 {
134 /* Create and post the request */
135 Call_request apc_request;
136 apc_request.call= call;
137 apc_request.processed= FALSE;
138 mysql_cond_init(key_show_explain_request_COND, &apc_request.COND_request,
139 NULL);
140 enqueue_request(&apc_request);
141 apc_request.what="enqueued by make_apc_call";
142
143 struct timespec abstime;
144 const int timeout= timeout_sec;
145 set_timespec(abstime, timeout);
146
147 int wait_res= 0;
148 PSI_stage_info old_stage;
149 caller_thd->ENTER_COND(&apc_request.COND_request, LOCK_thd_kill_ptr,
150 &stage_show_explain, &old_stage);
151 /* todo: how about processing other errors here? */
152 while (!apc_request.processed && (wait_res != ETIMEDOUT))
153 {
154 /* We own LOCK_thd_kill_ptr */
155 wait_res= mysql_cond_timedwait(&apc_request.COND_request,
156 LOCK_thd_kill_ptr, &abstime);
157 // &apc_request.LOCK_request, &abstime);
158 if (caller_thd->killed)
159 break;
160 }
161
162 if (!apc_request.processed)
163 {
164 /*
165 The wait has timed out, or this thread was KILLed.
166 Remove the request from the queue (ok to do because we own
167 LOCK_thd_kill_ptr)
168 */
169 apc_request.processed= TRUE;
170 dequeue_request(&apc_request);
171 *timed_out= TRUE;
172 res= TRUE;
173 }
174 else
175 {
176 /* Request was successfully executed and dequeued by the target thread */
177 res= FALSE;
178 }
179 /*
180 exit_cond() will call mysql_mutex_unlock(LOCK_thd_kill_ptr) for us:
181 */
182 caller_thd->EXIT_COND(&old_stage);
183
184 /* Destroy all APC request data */
185 mysql_cond_destroy(&apc_request.COND_request);
186 }
187 else
188 {
189 mysql_mutex_unlock(LOCK_thd_kill_ptr);
190 }
191 return res;
192}
193
194
195/*
196 Process all APC requests.
197 This should be called periodically by the APC target thread.
198*/
199
200void Apc_target::process_apc_requests()
201{
202 while (1)
203 {
204 Call_request *request;
205
206 mysql_mutex_lock(LOCK_thd_kill_ptr);
207 if (!(request= get_first_in_queue()))
208 {
209 /* No requests in the queue */
210 mysql_mutex_unlock(LOCK_thd_kill_ptr);
211 break;
212 }
213
214 /*
215 Remove the request from the queue (we're holding queue lock so we can be
216 sure that request owner won't try to remove it)
217 */
218 request->what="dequeued by process_apc_requests";
219 dequeue_request(request);
220 request->processed= TRUE;
221
222 request->call->call_in_target_thread();
223 request->what="func called by process_apc_requests";
224
225#ifndef DBUG_OFF
226 n_calls_processed++;
227#endif
228 mysql_cond_signal(&request->COND_request);
229 mysql_mutex_unlock(LOCK_thd_kill_ptr);
230 }
231}
232
233