1/*
2 * rw_request.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/rw_request.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <string.h>
32
33#include "citrusleaf/alloc.h"
34#include "citrusleaf/cf_atomic.h"
35#include "citrusleaf/cf_digest.h"
36
37#include "cf_mutex.h"
38#include "dynbuf.h"
39#include "fault.h"
40
41#include "base/datamodel.h"
42#include "base/proto.h"
43#include "base/service.h"
44#include "base/transaction.h"
45#include "fabric/fabric.h"
46#include "fabric/partition.h"
47
48
49//==========================================================
50// Typedefs & constants.
51//
52
53typedef struct rw_wait_ele_s {
54 uint8_t tr_head[AS_TRANSACTION_HEAD_SIZE];
55 struct rw_wait_ele_s* next;
56} rw_wait_ele;
57
58
59//==========================================================
60// Globals.
61//
62
63static cf_atomic32 g_rw_tid = 0;
64
65
66//==========================================================
67// Public API.
68//
69
70rw_request*
71rw_request_create(cf_digest* keyd)
72{
73 rw_request* rw = cf_rc_alloc(sizeof(rw_request));
74
75 // as_transaction look-alike:
76 rw->msgp = NULL;
77 rw->msg_fields = 0;
78 rw->origin = 0;
79 rw->from_flags = 0;
80 rw->from.any = NULL;
81 rw->from_data.any = 0;
82 rw->keyd = *keyd;
83 rw->start_time = 0;
84 rw->benchmark_time = 0;
85
86 AS_PARTITION_RESERVATION_INIT(rw->rsv);
87
88 rw->end_time = 0;
89 rw->result_code = AS_OK;
90 rw->flags = 0;
91 rw->generation = 0;
92 rw->void_time = 0;
93 rw->last_update_time = 0;
94 // End of as_transaction look-alike.
95
96 cf_mutex_init(&rw->lock);
97
98 rw->wait_queue_head = NULL;
99 rw->wait_queue_tail = NULL;
100 rw->wait_queue_depth = 0;
101
102 rw->is_set_up = false;
103
104 rw->is_old_pickle = false;
105 rw->pickle = NULL;
106 rw->pickle_sz = 0;
107 rw->set_name = NULL;
108 rw->set_name_len = 0;
109 rw->key = NULL;
110 rw->key_size = 0;
111
112 rw->response_db.buf = NULL;
113 rw->response_db.is_stack = false;
114 rw->response_db.alloc_sz = 0;
115 rw->response_db.used_sz = 0;
116
117 rw->tid = cf_atomic32_incr(&g_rw_tid);
118 rw->dup_res_complete = false;
119 rw->repl_write_complete = false;
120 rw->repl_ping_complete = false;
121 rw->dup_res_cb = NULL;
122 rw->repl_write_cb = NULL;
123 rw->repl_ping_cb = NULL;
124 rw->timeout_cb = NULL;
125
126 rw->dest_msg = NULL;
127 rw->xmit_ms = 0;
128 rw->retry_interval_ms = 0;
129
130 rw->n_dest_nodes = 0;
131
132 rw->best_dup_msg = NULL;
133 rw->best_dup_result_code = AS_OK;
134 rw->best_dup_gen = 0;
135 rw->best_dup_lut = 0;
136
137 rw->tie_was_replicated = false;
138
139 rw->repl_start_us = 0;
140
141 return rw;
142}
143
144
145void
146rw_request_destroy(rw_request* rw)
147{
148 // Paranoia:
149 if (rw->from.any) {
150 cf_crash(AS_RW, "rw_request_destroy: origin %d has non-null 'from'",
151 rw->origin);
152 }
153
154 if (rw->msgp != NULL && ! SHARED_MSGP(rw)) {
155 cf_free(rw->msgp);
156 }
157
158 if (rw->pickle) {
159 cf_free(rw->pickle);
160 }
161
162 if (rw->key) {
163 cf_free(rw->key);
164 }
165
166 cf_dyn_buf_free(&rw->response_db);
167
168 if (rw->dest_msg) {
169 as_fabric_msg_put(rw->dest_msg);
170 }
171
172 if (rw->is_set_up) {
173 if (rw->best_dup_msg) {
174 as_fabric_msg_put(rw->best_dup_msg);
175 }
176
177 as_partition_release(&rw->rsv);
178 }
179
180 cf_mutex_destroy(&rw->lock);
181
182 rw_wait_ele* e = rw->wait_queue_head;
183
184 while (e) {
185 rw_wait_ele* next = e->next;
186 as_transaction* tr = (as_transaction*)e->tr_head;
187
188 tr->from_flags |= FROM_FLAG_RESTART;
189 as_service_enqueue_internal(tr);
190
191 cf_free(e);
192 e = next;
193 }
194}
195
196
197void
198rw_request_wait_q_push(rw_request* rw, as_transaction* tr)
199{
200 rw_wait_ele* e = cf_malloc(sizeof(rw_wait_ele));
201
202 as_transaction_copy_head((as_transaction*)e->tr_head, tr);
203 tr->from.any = NULL;
204 tr->msgp = NULL;
205
206 e->next = NULL;
207
208 if (rw->wait_queue_tail) {
209 rw->wait_queue_tail->next = e;
210 rw->wait_queue_tail = e;
211 }
212 else {
213 rw->wait_queue_head = e;
214 rw->wait_queue_tail = e;
215 }
216
217 rw->wait_queue_depth++;
218}
219
220
221void
222rw_request_wait_q_push_head(rw_request* rw, as_transaction* tr)
223{
224 rw_wait_ele* e = cf_malloc(sizeof(rw_wait_ele));
225 cf_assert(e, AS_RW, "alloc rw_wait_ele");
226
227 as_transaction_copy_head((as_transaction*)e->tr_head, tr);
228 tr->from.any = NULL;
229 tr->msgp = NULL;
230
231 e->next = rw->wait_queue_head;
232 rw->wait_queue_head = e;
233
234 if (! rw->wait_queue_tail) {
235 rw->wait_queue_tail = e;
236 }
237
238 rw->wait_queue_depth++;
239}
240