1/*
2 * rw_request_hash.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/rw_request_hash.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <string.h>
33#include <unistd.h>
34
35#include "citrusleaf/alloc.h"
36#include "citrusleaf/cf_atomic.h"
37#include "citrusleaf/cf_clock.h"
38
39#include "cf_mutex.h"
40#include "cf_thread.h"
41#include "fault.h"
42#include "msg.h"
43#include "node.h"
44#include "rchash.h"
45
46#include "base/cfg.h"
47#include "base/datamodel.h"
48#include "base/proto.h"
49#include "base/transaction.h"
50#include "base/transaction_policy.h"
51#include "fabric/fabric.h"
52#include "transaction/duplicate_resolve.h"
53#include "transaction/replica_ping.h"
54#include "transaction/replica_write.h"
55#include "transaction/rw_request.h"
56#include "transaction/rw_utils.h"
57
58
59//==========================================================
60// Typedefs & constants.
61//
62
// Fabric message template for M_TYPE_RW messages - registered with
// as_fabric_register_msg_fn() in as_rw_init(). Each entry pairs an RW_FIELD_*
// id with its wire field type.
// NOTE(review): entries appear to be listed in RW_FIELD_* order - presumably
// required by the msg layer; confirm before reordering or inserting entries.
// NOTE(review): the RW_FIELD_UNUSED_* slots look like retired fields kept as
// placeholders so later field ids don't shift - confirm before removing.
const msg_template rw_mt[] = {
	{ RW_FIELD_OP, M_FT_UINT32 },
	{ RW_FIELD_RESULT, M_FT_UINT32 },
	{ RW_FIELD_NAMESPACE, M_FT_BUF },
	{ RW_FIELD_NS_ID, M_FT_UINT32 },
	{ RW_FIELD_GENERATION, M_FT_UINT32 },
	{ RW_FIELD_DIGEST, M_FT_BUF },
	{ RW_FIELD_RECORD, M_FT_BUF },
	{ RW_FIELD_UNUSED_7, M_FT_BUF },
	{ RW_FIELD_CLUSTER_KEY, M_FT_UINT64 },
	{ RW_FIELD_OLD_RECORD, M_FT_BUF },
	{ RW_FIELD_TID, M_FT_UINT32 },
	{ RW_FIELD_VOID_TIME, M_FT_UINT32 },
	{ RW_FIELD_INFO, M_FT_UINT32 },
	{ RW_FIELD_UNUSED_13, M_FT_BUF },
	{ RW_FIELD_UNUSED_14, M_FT_BUF },
	{ RW_FIELD_UNUSED_15, M_FT_UINT64 },
	{ RW_FIELD_LAST_UPDATE_TIME, M_FT_UINT64 },
	{ RW_FIELD_SET_NAME, M_FT_BUF },
	{ RW_FIELD_KEY, M_FT_BUF },
	{ RW_FIELD_REGIME, M_FT_UINT32 }
};

// Build fails if the template gets out of sync with the number of RW fields.
COMPILER_ASSERT(sizeof(rw_mt) / sizeof(msg_template) == NUM_RW_FIELDS);

// Scratch size passed to as_fabric_register_msg_fn() for M_TYPE_RW messages.
// NOTE(review): how the fabric/msg layer uses this scratch space is not
// visible here - confirm before changing the value.
#define RW_MSG_SCRATCH_SIZE 192
89
90
91//==========================================================
92// Globals.
93//
94
// Hash of rw_request objects keyed by rw_request_hkey. Created in as_rw_init(),
// used by the rw_request_hash_insert/get/delete API, and reduced over
// periodically by the retransmit thread (run_retransmit()).
static cf_rchash* g_rw_request_hash = NULL;
96
97
98//==========================================================
99// Forward declarations.
100//
101
102uint32_t rw_request_hash_fn(const void* value, uint32_t value_len);
103transaction_status handle_hot_key(rw_request* rw0, as_transaction* tr);
104
105void* run_retransmit(void* arg);
106int retransmit_reduce_fn(const void* key, uint32_t keylen, void* data, void* udata);
107void update_retransmit_stats(const rw_request* rw);
108
109int rw_msg_cb(cf_node id, msg* m, void* udata);
110
111
112//==========================================================
113// Public API.
114//
115
116void
117as_rw_init()
118{
119 g_rw_request_hash = cf_rchash_create(rw_request_hash_fn,
120 rw_request_hdestroy, sizeof(rw_request_hkey), 32 * 1024,
121 CF_RCHASH_MANY_LOCK);
122
123 cf_thread_create_detached(run_retransmit, NULL);
124
125 as_fabric_register_msg_fn(M_TYPE_RW, rw_mt, sizeof(rw_mt),
126 RW_MSG_SCRATCH_SIZE, rw_msg_cb, NULL);
127}
128
129
130uint32_t
131rw_request_hash_count()
132{
133 return cf_rchash_get_size(g_rw_request_hash);
134}
135
136
137transaction_status
138rw_request_hash_insert(rw_request_hkey* hkey, rw_request* rw,
139 as_transaction* tr)
140{
141 while (cf_rchash_put_unique(g_rw_request_hash, hkey, sizeof(*hkey), rw) !=
142 CF_RCHASH_OK) {
143 // rw_request with this digest already in hash - get it.
144
145 rw_request* rw0;
146
147 if (cf_rchash_get(g_rw_request_hash, hkey, sizeof(*hkey),
148 (void**)&rw0) != CF_RCHASH_OK) {
149 // But now it's gone - try insertion again immediately.
150 continue;
151 }
152 // else - got it - handle "hot key" scenario.
153
154 cf_mutex_lock(&rw0->lock);
155
156 transaction_status status = handle_hot_key(rw0, tr);
157
158 cf_mutex_unlock(&rw0->lock);
159 rw_request_release(rw0);
160
161 return status; // rw_request was not inserted in the hash
162 }
163
164 return TRANS_IN_PROGRESS; // rw_request was inserted in the hash
165}
166
167
168void
169rw_request_hash_delete(rw_request_hkey* hkey, rw_request* rw)
170{
171 cf_rchash_delete_object(g_rw_request_hash, hkey, sizeof(*hkey), rw);
172}
173
174
175rw_request*
176rw_request_hash_get(rw_request_hkey* hkey)
177{
178 rw_request* rw = NULL;
179
180 cf_rchash_get(g_rw_request_hash, hkey, sizeof(*hkey), (void**)&rw);
181
182 return rw;
183}
184
185
186// For debugging only.
187void
188rw_request_hash_dump()
189{
190 cf_info(AS_RW, "rw_request_hash dump not yet implemented");
191 // TODO - implement something, or deprecate.
192}
193
194
195//==========================================================
196// Local helpers - hash insertion.
197//
198
199uint32_t
200rw_request_hash_fn(const void* key, uint32_t key_size)
201{
202 rw_request_hkey* hkey = (rw_request_hkey*)key;
203
204 return *(uint32_t*)&hkey->keyd.digest[DIGEST_HASH_BASE_BYTE];
205}
206
207
208transaction_status
209handle_hot_key(rw_request* rw0, as_transaction* tr)
210{
211 as_namespace* ns = tr->rsv.ns;
212
213 if (tr->origin == FROM_RE_REPL) {
214 // Always put this transaction at the head of the original rw_request's
215 // queue - it will be retried (first) when the original is complete.
216 rw_request_wait_q_push_head(rw0, tr);
217
218 return TRANS_WAITING;
219 }
220 else if (ns->transaction_pending_limit != 0 &&
221 rw0->wait_queue_depth > ns->transaction_pending_limit) {
222 // If we're over the hot key pending limit, fail this transaction.
223 cf_detail_digest(AS_RW, &tr->keyd, "{%s} key busy ", ns->name);
224
225 cf_atomic64_incr(&ns->n_fail_key_busy);
226 tr->result_code = AS_ERR_KEY_BUSY;
227
228 return TRANS_DONE_ERROR;
229 }
230 else {
231 // Queue this transaction on the original rw_request - it will be
232 // retried when the original is complete.
233 rw_request_wait_q_push(rw0, tr);
234
235 return TRANS_WAITING;
236 }
237}
238
239
240//==========================================================
241// Local helpers - retransmit.
242//
243
244void*
245run_retransmit(void* arg)
246{
247 while (true) {
248 usleep(130 * 1000);
249
250 now_times now;
251
252 now.now_ns = cf_getns();
253 now.now_ms = now.now_ns / 1000000;
254
255 cf_rchash_reduce(g_rw_request_hash, retransmit_reduce_fn, &now);
256 }
257
258 return NULL;
259}
260
261
262int
263retransmit_reduce_fn(const void* key, uint32_t keylen, void* data, void* udata)
264{
265 rw_request* rw = data;
266 now_times* now = (now_times*)udata;
267
268 if (! rw->is_set_up) {
269 return 0;
270 }
271
272 if (now->now_ns > rw->end_time) {
273 cf_mutex_lock(&rw->lock);
274
275 rw->timeout_cb(rw);
276
277 cf_mutex_unlock(&rw->lock);
278
279 return CF_RCHASH_REDUCE_DELETE;
280 }
281
282 if (rw->xmit_ms < now->now_ms) {
283 cf_mutex_lock(&rw->lock);
284
285 if (rw->from.any) {
286 rw->xmit_ms = now->now_ms + rw->retry_interval_ms;
287 rw->retry_interval_ms *= 2;
288
289 send_rw_messages(rw);
290 update_retransmit_stats(rw);
291 }
292 // else - lost race against dup-res or repl-write callback.
293
294 cf_mutex_unlock(&rw->lock);
295 }
296
297 return 0;
298}
299
300
301void
302update_retransmit_stats(const rw_request* rw)
303{
304 as_namespace* ns = rw->rsv.ns;
305 as_msg* m = &rw->msgp->msg;
306 bool is_dup_res = rw->repl_write_cb == NULL;
307
308 // Note - only one retransmit thread, so no need for atomic increments.
309
310 switch (rw->origin) {
311 case FROM_PROXY:
312 if (rw_request_is_batch_sub(rw)) {
313 ns->n_retransmit_all_batch_sub_dup_res++;
314 break;
315 }
316 // No break.
317 case FROM_CLIENT: {
318 bool is_write = (m->info2 & AS_MSG_INFO2_WRITE) != 0;
319 bool is_delete = (m->info2 & AS_MSG_INFO2_DELETE) != 0;
320 bool is_udf = (rw->msg_fields & AS_MSG_FIELD_BIT_UDF_FILENAME) != 0;
321
322 if (is_dup_res) {
323 if (is_write) {
324 if (is_delete) {
325 ns->n_retransmit_all_delete_dup_res++;
326 }
327 else if (is_udf) {
328 ns->n_retransmit_all_udf_dup_res++;
329 }
330 else {
331 ns->n_retransmit_all_write_dup_res++;
332 }
333 }
334 else {
335 ns->n_retransmit_all_read_dup_res++;
336 }
337 }
338 else {
339 cf_assert(is_write, AS_RW, "read doing replica write");
340
341 if (is_delete) {
342 ns->n_retransmit_all_delete_repl_write++;
343 }
344 else if (is_udf) {
345 ns->n_retransmit_all_udf_repl_write++;
346 }
347 else {
348 ns->n_retransmit_all_write_repl_write++;
349 }
350 }
351 }
352 break;
353 case FROM_BATCH:
354 // For now batch sub transactions are read-only.
355 ns->n_retransmit_all_batch_sub_dup_res++;
356 break;
357 case FROM_IUDF:
358 if (is_dup_res) {
359 ns->n_retransmit_udf_sub_dup_res++;
360 }
361 else {
362 ns->n_retransmit_udf_sub_repl_write++;
363 }
364 break;
365 case FROM_IOPS:
366 if (is_dup_res) {
367 ns->n_retransmit_ops_sub_dup_res++;
368 }
369 else {
370 ns->n_retransmit_ops_sub_repl_write++;
371 }
372 break;
373 case FROM_RE_REPL:
374 // For now we don't report re-replication retransmit stats.
375 break;
376 default:
377 cf_crash(AS_RW, "unexpected transaction origin %u", rw->origin);
378 break;
379 }
380}
381
382
383//==========================================================
384// Local helpers - handle RW fabric messages.
385//
386
387int
388rw_msg_cb(cf_node id, msg* m, void* udata)
389{
390 uint32_t op;
391
392 if (msg_get_uint32(m, RW_FIELD_OP, &op) != 0) {
393 cf_warning(AS_RW, "got rw msg without op field");
394 as_fabric_msg_put(m);
395 return 0;
396 }
397
398 switch (op) {
399 //--------------------------------------------
400 // Duplicate resolution:
401 //
402 case RW_OP_DUP:
403 dup_res_handle_request(id, m);
404 break;
405 case RW_OP_DUP_ACK:
406 dup_res_handle_ack(id, m);
407 break;
408
409 //--------------------------------------------
410 // Replica writes:
411 //
412 case RW_OP_REPL_WRITE:
413 repl_write_handle_op(id, m);
414 break;
415 case RW_OP_WRITE:
416 repl_write_handle_old_op(id, m);
417 break;
418 case RW_OP_WRITE_ACK:
419 repl_write_handle_ack(id, m);
420 break;
421 case RW_OP_REPL_CONFIRM:
422 repl_write_handle_confirmation(m);
423 break;
424
425 //--------------------------------------------
426 // Replica pings:
427 //
428 case RW_OP_REPL_PING:
429 repl_ping_handle_op(id, m);
430 break;
431 case RW_OP_REPL_PING_ACK:
432 repl_ping_handle_ack(id, m);
433 break;
434
435 default:
436 cf_warning(AS_RW, "got rw msg with unrecognized op %u", op);
437 as_fabric_msg_put(m);
438 break;
439 }
440
441 return 0;
442}
443