1/*
2 * aggr.c
3 *
4 * Copyright (C) 2014-2015 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#include "base/aggr.h"
24
25#include <stdbool.h>
26#include <stdint.h>
27#include <stddef.h>
28#include <string.h>
29
30
31#include "aerospike/as_val.h"
32#include "aerospike/mod_lua.h"
33#include "citrusleaf/cf_ll.h"
34
35#include "fault.h"
36
37#include "base/datamodel.h"
38#include "base/proto.h"
39#include "base/transaction.h"
40#include "base/udf_arglist.h"
41#include "base/udf_record.h"
42#include "fabric/partition.h"
43
44
// Status codes for the aggregation module. as_aggr_process() returns
// AS_AGGR_ERR when it cannot set up the stream; AS_AGGR_OK is the matching
// success value.
#define AS_AGGR_ERR -1
#define AS_AGGR_OK 0
47
48/*
49 * Aggregation Stream Object
50 */
51// **************************************************************************************************
// Per-invocation state shared by the aggregation input and output streams.
// One instance is built on the stack by as_aggr_process() and handed to both
// streams as their source.
typedef struct {
	// Iteration
	cf_ll_iterator * iter; // Walks the list of key-array elements (see get_next())
	as_index_keys_arr * keys_arr; // Key array currently being consumed, NULL before the first read
	int keys_arr_offset; // Index of the current entry within keys_arr

	// Record
	bool rec_open; // True between a successful aopen() and the matching aclose()
	as_rec * urec; // as_rec wrapper around the udf_record handed to the UDF
	as_namespace * ns; // Namespace passed to the ptn_reserve hook
	as_partition_reservation * rsv; // Reservation returned by the ptn_reserve hook, given back via ptn_release

	// Module Data
	as_aggr_call * call; // Aggregation call: hook table plus UDF definition
	void * udata; // Opaque caller context forwarded to every hook
} aggr_state;
68
69static as_partition_reservation *
70ptn_reserve(aggr_state *astate, uint32_t pid, as_partition_reservation *rsv)
71{
72 as_aggr_call *call = astate->call;
73 if (call && call->aggr_hooks && call->aggr_hooks->ptn_reserve) {
74 return call->aggr_hooks->ptn_reserve(astate->udata, astate->ns, pid, rsv);
75 }
76 return NULL;
77}
78
79static void
80ptn_release(aggr_state *astate)
81{
82 as_aggr_call *call = astate->call;
83 if (call && call->aggr_hooks && call->aggr_hooks->ptn_release) {
84 call->aggr_hooks->ptn_release(astate->udata, astate->rsv);
85 }
86}
87
#if 0
// Currently compiled out; kept in case the set_error hook is ever needed.
// Forwards `err` to the caller-supplied set_error hook, if one is set.
static void
set_error(aggr_state *astate, int err)
{
	as_aggr_call *ag = astate->call;

	if (!ag || !ag->aggr_hooks || !ag->aggr_hooks->set_error) {
		return;
	}

	ag->aggr_hooks->set_error(astate->udata, err);
}
#endif // 0
99
100static bool
101pre_check(aggr_state *astate, void *skey)
102{
103 as_aggr_call *call = astate->call;
104 if (call && call->aggr_hooks && call->aggr_hooks->pre_check) {
105 return call->aggr_hooks->pre_check(astate->udata, as_rec_source(astate->urec), skey);
106 }
107 return true; // if not defined pre_check succeeds
108}
109
110static int
111aopen(aggr_state *astate, const cf_digest *digest)
112{
113 udf_record * urecord = as_rec_source(astate->urec);
114 as_transaction * tr = urecord->tr;
115
116 int pid = as_partition_getid(digest);
117 urecord->keyd = *digest;
118
119 astate->rsv = ptn_reserve(astate, pid, &tr->rsv);
120 if (!astate->rsv) {
121 cf_debug(AS_AGGR, "Reservation not done for partition %d", pid);
122 return -1;
123 }
124
125 // NB: Partial Initialization due to heaviness. Not everything needed
126 // TODO: Make such initialization Commodity
127 tr->rsv.ns = astate->rsv->ns;
128 tr->rsv.p = astate->rsv->p;
129 tr->rsv.tree = astate->rsv->tree;
130 tr->keyd = urecord->keyd;
131
132 if (udf_record_open(urecord) == 0) {
133 astate->rec_open = true;
134 return 0;
135 }
136 ptn_release(astate);
137 return -1;
138}
139
140void
141aclose(aggr_state *astate)
142{
143 // Bypassing doing the direct destroy because we need to
144 // avoid reducing the ref count. This rec (query_record
145 // implementation of as_rec) is ref counted when passed from
146 // here to Lua. If Lua access it even after moving to next
147 // element in the stream it does it at its own risk. Record
148 // may have changed under the hood.
149 if (astate->rec_open) {
150 udf_record_close(as_rec_source(astate->urec));
151 ptn_release(astate);
152 astate->rec_open = false;
153 }
154 return;
155}
156
157void
158acleanup(aggr_state *astate)
159{
160 if (astate->iter) {
161 cf_ll_releaseIterator(astate->iter);
162 astate->iter = NULL;
163 }
164 aclose(astate);
165
166 as_rec_destroy(astate->urec);
167}
168
169// **************************************************************************************************
170
171/*
172 * Aggregation Input Stream
173 */
174// **************************************************************************************************
175cf_digest *
176get_next(aggr_state *astate)
177{
178 astate->keys_arr_offset++;
179 if (!astate->keys_arr || (astate->keys_arr_offset == astate->keys_arr->num)) {
180
181 cf_ll_element * ele = cf_ll_getNext(astate->iter);
182
183 // if NULL or number of element 0. No holes expected
184 if (!ele) {
185 return NULL;
186 }
187
188 astate->keys_arr = ((as_index_keys_ll_element*)ele)->keys_arr;
189 if (!astate->keys_arr || (astate->keys_arr->num < 1)) {
190 astate->keys_arr = NULL;
191 return NULL;
192 }
193
194 astate->keys_arr_offset = 0;
195 }
196 return &astate->keys_arr->pindex_digs[astate->keys_arr_offset];
197}
198
199// only operates on the record as_val in the stream points to
200// and updates the references ... this function has to acquire
201// partition reservation and also the object lock. So if the UDF
202// does something stupid the object lock is gonna get held for
203// a while ... there has to be timeout mechanism in here I think
204static as_val *
205istream_read(const as_stream *s)
206{
207 aggr_state *astate = as_stream_source(s);
208
209 aclose(astate);
210
211 // Iterate through stream to get next digest and
212 // populate record with it
213 while (!astate->rec_open) {
214
215 if (get_next(astate) == NULL) {
216 return NULL;
217 }
218
219 if (!aopen(astate, &astate->keys_arr->pindex_digs[astate->keys_arr_offset])) {
220 if (!pre_check(astate, &astate->keys_arr->sindex_keys[astate->keys_arr_offset])) {
221 aclose(astate);
222 }
223 }
224 }
225 return (as_val *)astate->urec;
226}
227
228const as_stream_hooks istream_hooks = {
229 .destroy = NULL,
230 .read = istream_read,
231 .write = NULL
232};
233// **************************************************************************************************
234
235
236
237/*
238 * Aggregation Output Stream
239 */
240// **************************************************************************************************
241as_stream_status
242ostream_write(const as_stream *s, as_val *val)
243{
244 aggr_state *astate = (aggr_state *)as_stream_source(s);
245 return astate->call->aggr_hooks->ostream_write(astate->udata, val);
246}
247
248const as_stream_hooks ostream_hooks = {
249 .destroy = NULL,
250 .read = NULL,
251 .write = ostream_write
252};
253// **************************************************************************************************
254
255
256/*
257 * Aggregation AS_AEROSPIKE interface for LUA
258 */
259// **************************************************************************************************
260static int
261as_aggr_aerospike_log(const as_aerospike * a, const char * file, const int line, const int lvl, const char * msg)
262{
263 cf_fault_event(AS_AGGR, lvl, file, line, "%s", (char *) msg);
264 return 0;
265}
266
267static const as_aerospike_hooks as_aggr_aerospike_hooks = {
268 .rec_update = NULL,
269 .rec_remove = NULL,
270 .rec_exists = NULL,
271 .log = as_aggr_aerospike_log,
272 .get_current_time = NULL,
273 .destroy = NULL
274};
275// **************************************************************************************************
276
277
278
279int
280as_aggr_process(as_namespace *ns, as_aggr_call * ag_call, cf_ll * ap_recl, void * udata, as_result * ap_res)
281{
282 as_index_ref r_ref;
283 as_storage_rd rd;
284 bzero(&rd, sizeof(as_storage_rd));
285 as_transaction tr;
286
287
288 udf_record urecord;
289 udf_record_init(&urecord, false);
290 urecord.tr = &tr;
291 urecord.r_ref = &r_ref;
292 urecord.rd = &rd;
293 as_rec * urec = as_rec_new(&urecord, &udf_record_hooks);
294
295 aggr_state astate = {
296 .iter = cf_ll_getIterator(ap_recl, true /*forward*/),
297 .urec = urec,
298 .keys_arr = NULL,
299 .keys_arr_offset = 0,
300 .call = ag_call,
301 .udata = udata,
302 .rec_open = false,
303 .rsv = &tr.rsv,
304 .ns = ns
305 };
306
307 if (!astate.iter) {
308 cf_warning (AS_AGGR, "Could not set up iterator .. possibly out of memory .. Aborting Query !!");
309 as_rec_destroy(urec);
310 return AS_AGGR_ERR;
311 }
312
313 as_aerospike as;
314 as_aerospike_init(&as, NULL, &as_aggr_aerospike_hooks);
315
316 // Input Stream
317 as_stream istream;
318 as_stream_init(&istream, &astate, &istream_hooks);
319
320 // Output stream
321 as_stream ostream;
322 as_stream_init(&ostream, &astate, &ostream_hooks);
323
324 as_udf_context ctx = {
325 .as = &as,
326 .timer = NULL,
327 .memtracker = NULL
328 };
329 int ret = as_module_apply_stream(&mod_lua, &ctx, ag_call->def.filename, ag_call->def.function, &istream, ag_call->def.arglist, &ostream, ap_res);
330
331 acleanup(&astate);
332 return ret;
333}
334