1 | /* |
2 | * thr_tsvc.c |
3 | * |
4 | * Copyright (C) 2008-2014 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "base/thr_tsvc.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | |
33 | #include "citrusleaf/alloc.h" |
34 | #include "citrusleaf/cf_atomic.h" |
35 | #include "citrusleaf/cf_clock.h" |
36 | #include "citrusleaf/cf_digest.h" |
37 | |
38 | #include "fault.h" |
39 | #include "node.h" |
40 | |
41 | #include "base/cfg.h" |
42 | #include "base/datamodel.h" |
43 | #include "base/proto.h" |
44 | #include "base/scan.h" |
45 | #include "base/secondary_index.h" |
46 | #include "base/security.h" |
47 | #include "base/stats.h" |
48 | #include "base/transaction.h" |
49 | #include "base/transaction_policy.h" |
50 | #include "base/xdr_serverside.h" |
51 | #include "fabric/partition.h" |
52 | #include "fabric/partition_balance.h" |
53 | #include "storage/storage.h" |
54 | #include "transaction/delete.h" |
55 | #include "transaction/proxy.h" |
56 | #include "transaction/re_replicate.h" |
57 | #include "transaction/read.h" |
58 | #include "transaction/udf.h" |
59 | #include "transaction/write.h" |
60 | |
61 | |
62 | //========================================================== |
63 | // Inlines & macros. |
64 | // |
65 | |
66 | static inline bool |
67 | should_security_check_data_op(const as_transaction *tr) |
68 | { |
69 | return tr->origin == FROM_CLIENT || tr->origin == FROM_BATCH; |
70 | } |
71 | |
72 | static inline as_sec_perm |
73 | scan_perm(const as_transaction *tr) |
74 | { |
75 | if (as_transaction_is_udf(tr)) { |
76 | return PERM_UDF_SCAN; |
77 | } |
78 | |
79 | return (tr->msgp->msg.info2 & AS_MSG_INFO2_WRITE) != 0 ? |
80 | PERM_OPS_SCAN : PERM_SCAN; |
81 | } |
82 | |
83 | static inline as_sec_perm |
84 | query_perm(const as_transaction *tr) |
85 | { |
86 | if (as_transaction_is_udf(tr)) { |
87 | return PERM_UDF_QUERY; |
88 | } |
89 | |
90 | return (tr->msgp->msg.info2 & AS_MSG_INFO2_WRITE) != 0 ? |
91 | PERM_OPS_QUERY : PERM_QUERY; |
92 | } |
93 | |
94 | static inline const char* |
95 | write_type_tag(const as_transaction *tr) |
96 | { |
97 | return as_transaction_is_delete(tr) ? "delete" : |
98 | (as_transaction_is_udf(tr) ? "udf" : "write" ); |
99 | } |
100 | |
101 | static inline void |
102 | detail_unique_client_rw(const as_transaction *tr, bool is_write) |
103 | { |
104 | if (tr->origin == FROM_CLIENT) { |
105 | cf_detail_digest(AS_RW_CLIENT, &tr->keyd, "{%s} client %s %s " , |
106 | tr->rsv.ns->name, tr->from.proto_fd_h->client, |
107 | is_write ? write_type_tag(tr) : "read" ); |
108 | } |
109 | } |
110 | |
111 | |
112 | //========================================================== |
113 | // Public API. |
114 | // |
115 | |
116 | // Handle the transaction, including proxy to another node if necessary. |
117 | void |
118 | as_tsvc_process_transaction(as_transaction *tr) |
119 | { |
120 | if (tr->msgp->proto.type == PROTO_TYPE_INTERNAL_XDR) { |
121 | as_xdr_read_txn(tr); |
122 | return; |
123 | } |
124 | |
125 | int rv; |
126 | bool free_msgp = true; |
127 | cl_msg *msgp = tr->msgp; |
128 | as_msg *m = &msgp->msg; |
129 | |
130 | as_transaction_init_body(tr); |
131 | |
132 | // Check that the socket is authenticated. |
133 | if (tr->origin == FROM_CLIENT) { |
134 | uint8_t result = as_security_check(tr->from.proto_fd_h, PERM_NONE); |
135 | |
136 | if (result != AS_OK) { |
137 | as_security_log(tr->from.proto_fd_h, result, PERM_NONE, NULL, NULL); |
138 | as_transaction_error(tr, NULL, (uint32_t)result); |
139 | goto Cleanup; |
140 | } |
141 | } |
142 | |
143 | // All transactions must have a namespace. |
144 | as_msg_field *nf = as_msg_field_get(m, AS_MSG_FIELD_TYPE_NAMESPACE); |
145 | |
146 | if (! nf) { |
147 | cf_warning(AS_TSVC, "no namespace in protocol request" ); |
148 | as_transaction_error(tr, NULL, AS_ERR_NAMESPACE); |
149 | goto Cleanup; |
150 | } |
151 | |
152 | as_namespace *ns = as_namespace_get_bymsgfield(nf); |
153 | |
154 | if (! ns) { |
155 | uint32_t ns_sz = as_msg_field_get_value_sz(nf); |
156 | CF_ZSTR_DEFINE(ns_name, AS_ID_NAMESPACE_SZ, nf->data, ns_sz); |
157 | |
158 | cf_warning(AS_TSVC, "unknown namespace %s (%u) in protocol request - check configuration file" , |
159 | ns_name, ns_sz); |
160 | |
161 | as_transaction_error(tr, NULL, AS_ERR_NAMESPACE); |
162 | goto Cleanup; |
163 | } |
164 | |
165 | // Have we finished the very first partition balance? |
166 | if (! as_partition_balance_is_init_resolved()) { |
167 | if (tr->origin == FROM_PROXY) { |
168 | as_proxy_return_to_sender(tr, ns); |
169 | tr->from.proxy_node = 0; // pattern, not needed |
170 | } |
171 | else { |
172 | cf_debug(AS_TSVC, "rejecting transaction - initial partition balance unresolved" ); |
173 | as_transaction_error(tr, NULL, AS_ERR_UNAVAILABLE); |
174 | // Note that we forfeited namespace info above so scan & query don't |
175 | // get counted as single-record error. |
176 | } |
177 | |
178 | goto Cleanup; |
179 | } |
180 | |
181 | //------------------------------------------------------ |
182 | // Multi-record transaction. |
183 | // |
184 | |
185 | if (as_transaction_is_multi_record(tr)) { |
186 | if (m->transaction_ttl != 0) { |
187 | // Queries may specify transaction_ttl, but don't use |
188 | // g_config.transaction_max_ns as a default. Assuming specified TTL |
189 | // is large enough that it's not worth checking for timeout here. |
190 | tr->end_time = tr->start_time + |
191 | ((uint64_t)m->transaction_ttl * 1000000); |
192 | } |
193 | |
194 | if (as_transaction_is_batch_direct(tr)) { |
195 | // Old batch - deprecated. |
196 | as_multi_rec_transaction_error(tr, AS_ERR_UNSUPPORTED_FEATURE); |
197 | } |
198 | else if (as_transaction_is_query(tr)) { |
199 | // Query. |
200 | cf_atomic64_incr(&ns->query_reqs); |
201 | |
202 | if (! as_security_check_data_op(tr, ns, query_perm(tr))) { |
203 | as_multi_rec_transaction_error(tr, tr->result_code); |
204 | goto Cleanup; |
205 | } |
206 | |
207 | if (as_query(tr, ns) != 0) { |
208 | cf_atomic64_incr(&ns->query_fail); |
209 | as_multi_rec_transaction_error(tr, tr->result_code); |
210 | } |
211 | } |
212 | else { |
213 | // Scan. |
214 | if (! as_security_check_data_op(tr, ns, scan_perm(tr))) { |
215 | as_multi_rec_transaction_error(tr, tr->result_code); |
216 | goto Cleanup; |
217 | } |
218 | |
219 | if ((rv = as_scan(tr, ns)) != 0) { |
220 | as_multi_rec_transaction_error(tr, rv); |
221 | } |
222 | } |
223 | |
224 | goto Cleanup; |
225 | } |
226 | |
227 | //------------------------------------------------------ |
228 | // Single-record transaction. |
229 | // |
230 | |
231 | // Calculate end_time based on message transaction TTL. May be recalculating |
232 | // for re-queued transactions, but nice if end_time not copied on/off queue. |
233 | if (m->transaction_ttl != 0) { |
234 | tr->end_time = tr->start_time + |
235 | ((uint64_t)m->transaction_ttl * 1000000); |
236 | } |
237 | else { |
238 | // Incorporate g_config.transaction_max_ns if appropriate. |
239 | // TODO - should g_config.transaction_max_ns = 0 be special? |
240 | tr->end_time = tr->start_time + g_config.transaction_max_ns; |
241 | } |
242 | |
243 | // Did the transaction time out while on the queue? |
244 | if (cf_getns() > tr->end_time) { |
245 | cf_debug(AS_TSVC, "transaction timed out in queue" ); |
246 | as_transaction_error(tr, ns, AS_ERR_TIMEOUT); |
247 | goto Cleanup; |
248 | } |
249 | |
250 | // All single-record transactions must have a digest, or a key from which |
251 | // to calculate it. |
252 | if (as_transaction_has_digest(tr)) { |
253 | // Modern client - just copy digest into tr. |
254 | |
255 | as_msg_field *df = as_msg_field_get(m, AS_MSG_FIELD_TYPE_DIGEST_RIPE); |
256 | uint32_t digest_sz = as_msg_field_get_value_sz(df); |
257 | |
258 | if (digest_sz != sizeof(cf_digest)) { |
259 | cf_warning(AS_TSVC, "digest msg field size %u" , digest_sz); |
260 | as_transaction_error(tr, ns, AS_ERR_PARAMETER); |
261 | goto Cleanup; |
262 | } |
263 | |
264 | tr->keyd = *(cf_digest *)df->data; |
265 | } |
266 | else if (as_transaction_has_key(tr)) { |
267 | // Old client - calculate digest from key & set, directly into tr. |
268 | |
269 | as_msg_field *kf = as_msg_field_get(m, AS_MSG_FIELD_TYPE_KEY); |
270 | uint32_t key_sz = as_msg_field_get_value_sz(kf); |
271 | |
272 | as_msg_field *sf = as_transaction_has_set(tr) ? |
273 | as_msg_field_get(m, AS_MSG_FIELD_TYPE_SET) : NULL; |
274 | uint32_t set_sz = sf ? as_msg_field_get_value_sz(sf) : 0; |
275 | |
276 | cf_digest_compute2(sf->data, set_sz, kf->data, key_sz, &tr->keyd); |
277 | } |
278 | // else - batch sub-transactions & all internal transactions have neither |
279 | // digest nor key in the message - digest is already in tr. |
280 | |
281 | // Process the transaction. |
282 | |
283 | bool is_write = (m->info2 & AS_MSG_INFO2_WRITE) != 0; |
284 | bool is_read = (m->info1 & AS_MSG_INFO1_READ) != 0; |
285 | // Both can be set together, but is_write puts us on the 'write path' - |
286 | // write reservation, replica writes, etc. Writes quickly get split into |
287 | // write, delete, or UDF after the reservation. |
288 | |
289 | uint32_t pid = as_partition_getid(&tr->keyd); |
290 | cf_node dest; |
291 | |
292 | if (is_write) { |
293 | if (should_security_check_data_op(tr) && |
294 | ! as_security_check_data_op(tr, ns, |
295 | PERM_WRITE | (is_read ? PERM_READ : 0))) { |
296 | as_transaction_error(tr, ns, tr->result_code); |
297 | goto Cleanup; |
298 | } |
299 | |
300 | rv = as_partition_reserve_write(ns, pid, &tr->rsv, &dest); |
301 | } |
302 | else if (is_read) { |
303 | if (should_security_check_data_op(tr) && |
304 | ! as_security_check_data_op(tr, ns, PERM_READ)) { |
305 | as_transaction_error(tr, ns, tr->result_code); |
306 | goto Cleanup; |
307 | } |
308 | |
309 | rv = as_partition_reserve_read_tr(ns, pid, tr, &dest); |
310 | } |
311 | else { |
312 | cf_warning(AS_TSVC, "transaction is neither read nor write - unexpected" ); |
313 | as_transaction_error(tr, ns, AS_ERR_PARAMETER); |
314 | goto Cleanup; |
315 | } |
316 | |
317 | if (rv == -2) { |
318 | // Partition is unavailable. |
319 | as_transaction_error(tr, ns, AS_ERR_UNAVAILABLE); |
320 | goto Cleanup; |
321 | } |
322 | |
323 | if (dest == 0) { |
324 | cf_crash(AS_TSVC, "invalid destination while reserving partition" ); |
325 | } |
326 | |
327 | if (rv == 0) { |
328 | // <><><><><><> Reservation Succeeded <><><><><><> |
329 | |
330 | if (! as_transaction_is_restart(tr)) { |
331 | tr->benchmark_time = 0; |
332 | detail_unique_client_rw(tr, is_write); |
333 | } |
334 | |
335 | transaction_status status; |
336 | |
337 | if (is_write) { |
338 | if (as_transaction_is_delete(tr)) { |
339 | status = as_delete_start(tr); |
340 | } |
341 | else if (tr->origin == FROM_IUDF || as_transaction_is_udf(tr)) { |
342 | status = as_udf_start(tr); |
343 | } |
344 | else if (tr->origin == FROM_RE_REPL) { |
345 | status = as_re_replicate_start(tr); |
346 | } |
347 | else { |
348 | status = as_write_start(tr); |
349 | } |
350 | } |
351 | else { |
352 | status = as_read_start(tr); |
353 | } |
354 | |
355 | switch (status) { |
356 | case TRANS_DONE_ERROR: |
357 | case TRANS_DONE_SUCCESS: |
358 | // Done, response already sent - free msg & release reservation. |
359 | as_partition_release(&tr->rsv); |
360 | break; |
361 | case TRANS_IN_PROGRESS: |
362 | // Don't free msg or release reservation - both owned by rw_request. |
363 | free_msgp = false; |
364 | break; |
365 | case TRANS_WAITING: |
366 | // Will be re-queued - don't free msg, but release reservation. |
367 | free_msgp = false; |
368 | as_partition_release(&tr->rsv); |
369 | break; |
370 | default: |
371 | cf_crash(AS_TSVC, "invalid transaction status %d" , status); |
372 | break; |
373 | } |
374 | } |
375 | else { |
376 | // <><><><><><> Reservation Failed <><><><><><> |
377 | |
378 | switch (tr->origin) { |
379 | case FROM_CLIENT: |
380 | case FROM_BATCH: |
381 | as_proxy_divert(dest, tr, ns); |
382 | // CLIENT: fabric owns msgp, BATCH: it's shared, don't free it. |
383 | free_msgp = false; |
384 | break; |
385 | case FROM_PROXY: |
386 | as_proxy_return_to_sender(tr, ns); |
387 | tr->from.proxy_node = 0; // pattern, not needed |
388 | break; |
389 | case FROM_IUDF: |
390 | tr->from.iudf_orig->cb(tr->from.iudf_orig->udata, AS_ERR_UNKNOWN); |
391 | tr->from.iudf_orig = NULL; // pattern, not needed |
392 | break; |
393 | case FROM_IOPS: |
394 | tr->from.iops_orig->cb(tr->from.iops_orig->udata, AS_ERR_UNKNOWN); |
395 | tr->from.iops_orig = NULL; // pattern, not needed |
396 | break; |
397 | case FROM_RE_REPL: |
398 | tr->from.re_repl_orig_cb(tr); |
399 | tr->from.re_repl_orig_cb = NULL; // pattern, not needed |
400 | break; |
401 | default: |
402 | cf_crash(AS_TSVC, "unexpected transaction origin %u" , tr->origin); |
403 | break; |
404 | } |
405 | } |
406 | |
407 | Cleanup: |
408 | |
409 | if (free_msgp && ! SHARED_MSGP(tr)) { |
410 | cf_free(msgp); |
411 | } |
412 | } // end process_transaction() |
413 | |