1 | /* |
2 | * aggr.c |
3 | * |
4 | * Copyright (C) 2014-2015 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | #include "base/aggr.h" |
24 | |
25 | #include <stdbool.h> |
26 | #include <stdint.h> |
27 | #include <stddef.h> |
28 | #include <string.h> |
29 | |
30 | |
31 | #include "aerospike/as_val.h" |
32 | #include "aerospike/mod_lua.h" |
33 | #include "citrusleaf/cf_ll.h" |
34 | |
35 | #include "fault.h" |
36 | |
37 | #include "base/datamodel.h" |
38 | #include "base/proto.h" |
39 | #include "base/transaction.h" |
40 | #include "base/udf_arglist.h" |
41 | #include "base/udf_record.h" |
42 | #include "fabric/partition.h" |
43 | |
44 | |
// Status codes local to this file: zero means success, negative means the
// aggregation could not be set up.
#define AS_AGGR_OK 0
#define AS_AGGR_ERR -1
47 | |
48 | /* |
49 | * Aggregation Stream Object |
50 | */ |
51 | // ************************************************************************************************** |
52 | typedef struct { |
53 | // Iteration |
54 | cf_ll_iterator * iter; |
55 | as_index_keys_arr * keys_arr; |
56 | int keys_arr_offset; |
57 | |
58 | // Record |
59 | bool rec_open; // Record in stream open |
60 | as_rec * urec; // UDF record cloak |
61 | as_namespace * ns; |
62 | as_partition_reservation * rsv; // Reservation Object |
63 | |
64 | // Module Data |
65 | as_aggr_call * call; // Aggregation info |
66 | void * udata; // Execution context |
67 | } aggr_state; |
68 | |
69 | static as_partition_reservation * |
70 | ptn_reserve(aggr_state *astate, uint32_t pid, as_partition_reservation *rsv) |
71 | { |
72 | as_aggr_call *call = astate->call; |
73 | if (call && call->aggr_hooks && call->aggr_hooks->ptn_reserve) { |
74 | return call->aggr_hooks->ptn_reserve(astate->udata, astate->ns, pid, rsv); |
75 | } |
76 | return NULL; |
77 | } |
78 | |
79 | static void |
80 | ptn_release(aggr_state *astate) |
81 | { |
82 | as_aggr_call *call = astate->call; |
83 | if (call && call->aggr_hooks && call->aggr_hooks->ptn_release) { |
84 | call->aggr_hooks->ptn_release(astate->udata, astate->rsv); |
85 | } |
86 | } |
87 | |
#if 0
// Compiled out; kept in case the set_error hook is ever needed.
// Forwards `err` to the caller-supplied set_error hook when one is present.
static void
set_error(aggr_state *astate, int err)
{
	as_aggr_call *ag = astate->call;

	if (!ag || !ag->aggr_hooks || !ag->aggr_hooks->set_error) {
		return;
	}

	ag->aggr_hooks->set_error(astate->udata, err);
}
#endif // 0
99 | |
100 | static bool |
101 | pre_check(aggr_state *astate, void *skey) |
102 | { |
103 | as_aggr_call *call = astate->call; |
104 | if (call && call->aggr_hooks && call->aggr_hooks->pre_check) { |
105 | return call->aggr_hooks->pre_check(astate->udata, as_rec_source(astate->urec), skey); |
106 | } |
107 | return true; // if not defined pre_check succeeds |
108 | } |
109 | |
110 | static int |
111 | aopen(aggr_state *astate, const cf_digest *digest) |
112 | { |
113 | udf_record * urecord = as_rec_source(astate->urec); |
114 | as_transaction * tr = urecord->tr; |
115 | |
116 | int pid = as_partition_getid(digest); |
117 | urecord->keyd = *digest; |
118 | |
119 | astate->rsv = ptn_reserve(astate, pid, &tr->rsv); |
120 | if (!astate->rsv) { |
121 | cf_debug(AS_AGGR, "Reservation not done for partition %d" , pid); |
122 | return -1; |
123 | } |
124 | |
125 | // NB: Partial Initialization due to heaviness. Not everything needed |
126 | // TODO: Make such initialization Commodity |
127 | tr->rsv.ns = astate->rsv->ns; |
128 | tr->rsv.p = astate->rsv->p; |
129 | tr->rsv.tree = astate->rsv->tree; |
130 | tr->keyd = urecord->keyd; |
131 | |
132 | if (udf_record_open(urecord) == 0) { |
133 | astate->rec_open = true; |
134 | return 0; |
135 | } |
136 | ptn_release(astate); |
137 | return -1; |
138 | } |
139 | |
140 | void |
141 | aclose(aggr_state *astate) |
142 | { |
143 | // Bypassing doing the direct destroy because we need to |
144 | // avoid reducing the ref count. This rec (query_record |
145 | // implementation of as_rec) is ref counted when passed from |
146 | // here to Lua. If Lua access it even after moving to next |
147 | // element in the stream it does it at its own risk. Record |
148 | // may have changed under the hood. |
149 | if (astate->rec_open) { |
150 | udf_record_close(as_rec_source(astate->urec)); |
151 | ptn_release(astate); |
152 | astate->rec_open = false; |
153 | } |
154 | return; |
155 | } |
156 | |
157 | void |
158 | acleanup(aggr_state *astate) |
159 | { |
160 | if (astate->iter) { |
161 | cf_ll_releaseIterator(astate->iter); |
162 | astate->iter = NULL; |
163 | } |
164 | aclose(astate); |
165 | |
166 | as_rec_destroy(astate->urec); |
167 | } |
168 | |
169 | // ************************************************************************************************** |
170 | |
171 | /* |
172 | * Aggregation Input Stream |
173 | */ |
174 | // ************************************************************************************************** |
175 | cf_digest * |
176 | get_next(aggr_state *astate) |
177 | { |
178 | astate->keys_arr_offset++; |
179 | if (!astate->keys_arr || (astate->keys_arr_offset == astate->keys_arr->num)) { |
180 | |
181 | cf_ll_element * ele = cf_ll_getNext(astate->iter); |
182 | |
183 | // if NULL or number of element 0. No holes expected |
184 | if (!ele) { |
185 | return NULL; |
186 | } |
187 | |
188 | astate->keys_arr = ((as_index_keys_ll_element*)ele)->keys_arr; |
189 | if (!astate->keys_arr || (astate->keys_arr->num < 1)) { |
190 | astate->keys_arr = NULL; |
191 | return NULL; |
192 | } |
193 | |
194 | astate->keys_arr_offset = 0; |
195 | } |
196 | return &astate->keys_arr->pindex_digs[astate->keys_arr_offset]; |
197 | } |
198 | |
199 | // only operates on the record as_val in the stream points to |
200 | // and updates the references ... this function has to acquire |
201 | // partition reservation and also the object lock. So if the UDF |
202 | // does something stupid the object lock is gonna get held for |
203 | // a while ... there has to be timeout mechanism in here I think |
204 | static as_val * |
205 | istream_read(const as_stream *s) |
206 | { |
207 | aggr_state *astate = as_stream_source(s); |
208 | |
209 | aclose(astate); |
210 | |
211 | // Iterate through stream to get next digest and |
212 | // populate record with it |
213 | while (!astate->rec_open) { |
214 | |
215 | if (get_next(astate) == NULL) { |
216 | return NULL; |
217 | } |
218 | |
219 | if (!aopen(astate, &astate->keys_arr->pindex_digs[astate->keys_arr_offset])) { |
220 | if (!pre_check(astate, &astate->keys_arr->sindex_keys[astate->keys_arr_offset])) { |
221 | aclose(astate); |
222 | } |
223 | } |
224 | } |
225 | return (as_val *)astate->urec; |
226 | } |
227 | |
228 | const as_stream_hooks istream_hooks = { |
229 | .destroy = NULL, |
230 | .read = istream_read, |
231 | .write = NULL |
232 | }; |
233 | // ************************************************************************************************** |
234 | |
235 | |
236 | |
237 | /* |
238 | * Aggregation Output Stream |
239 | */ |
240 | // ************************************************************************************************** |
241 | as_stream_status |
242 | ostream_write(const as_stream *s, as_val *val) |
243 | { |
244 | aggr_state *astate = (aggr_state *)as_stream_source(s); |
245 | return astate->call->aggr_hooks->ostream_write(astate->udata, val); |
246 | } |
247 | |
248 | const as_stream_hooks ostream_hooks = { |
249 | .destroy = NULL, |
250 | .read = NULL, |
251 | .write = ostream_write |
252 | }; |
253 | // ************************************************************************************************** |
254 | |
255 | |
256 | /* |
257 | * Aggregation AS_AEROSPIKE interface for LUA |
258 | */ |
259 | // ************************************************************************************************** |
260 | static int |
261 | as_aggr_aerospike_log(const as_aerospike * a, const char * file, const int line, const int lvl, const char * msg) |
262 | { |
263 | cf_fault_event(AS_AGGR, lvl, file, line, "%s" , (char *) msg); |
264 | return 0; |
265 | } |
266 | |
267 | static const as_aerospike_hooks as_aggr_aerospike_hooks = { |
268 | .rec_update = NULL, |
269 | .rec_remove = NULL, |
270 | .rec_exists = NULL, |
271 | .log = as_aggr_aerospike_log, |
272 | .get_current_time = NULL, |
273 | .destroy = NULL |
274 | }; |
275 | // ************************************************************************************************** |
276 | |
277 | |
278 | |
279 | int |
280 | as_aggr_process(as_namespace *ns, as_aggr_call * ag_call, cf_ll * ap_recl, void * udata, as_result * ap_res) |
281 | { |
282 | as_index_ref r_ref; |
283 | as_storage_rd rd; |
284 | bzero(&rd, sizeof(as_storage_rd)); |
285 | as_transaction tr; |
286 | |
287 | |
288 | udf_record urecord; |
289 | udf_record_init(&urecord, false); |
290 | urecord.tr = &tr; |
291 | urecord.r_ref = &r_ref; |
292 | urecord.rd = &rd; |
293 | as_rec * urec = as_rec_new(&urecord, &udf_record_hooks); |
294 | |
295 | aggr_state astate = { |
296 | .iter = cf_ll_getIterator(ap_recl, true /*forward*/), |
297 | .urec = urec, |
298 | .keys_arr = NULL, |
299 | .keys_arr_offset = 0, |
300 | .call = ag_call, |
301 | .udata = udata, |
302 | .rec_open = false, |
303 | .rsv = &tr.rsv, |
304 | .ns = ns |
305 | }; |
306 | |
307 | if (!astate.iter) { |
308 | cf_warning (AS_AGGR, "Could not set up iterator .. possibly out of memory .. Aborting Query !!" ); |
309 | as_rec_destroy(urec); |
310 | return AS_AGGR_ERR; |
311 | } |
312 | |
313 | as_aerospike as; |
314 | as_aerospike_init(&as, NULL, &as_aggr_aerospike_hooks); |
315 | |
316 | // Input Stream |
317 | as_stream istream; |
318 | as_stream_init(&istream, &astate, &istream_hooks); |
319 | |
320 | // Output stream |
321 | as_stream ostream; |
322 | as_stream_init(&ostream, &astate, &ostream_hooks); |
323 | |
324 | as_udf_context ctx = { |
325 | .as = &as, |
326 | .timer = NULL, |
327 | .memtracker = NULL |
328 | }; |
329 | int ret = as_module_apply_stream(&mod_lua, &ctx, ag_call->def.filename, ag_call->def.function, &istream, ag_call->def.arglist, &ostream, ap_res); |
330 | |
331 | acleanup(&astate); |
332 | return ret; |
333 | } |
334 | |