1/*
2 * thr_query.c
3 *
4 * Copyright (C) 2012-2015 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23/*
 * This code is responsible for query execution. Each query received is set up
 * as a query transaction (qtr) for the query threads to execute. A query has two parts:
 * a) Generator : Queries the Aerospike secondary index B-tree, creates the digest list and
 * queues it up for LOOKUP / UDF / AGGREGATION
 * b) Aggregator : Does the required processing of each record and sends the
 * response back to the client.
30 * LOOKUP: Read the record from the disk and based on the
31 * records selected by query packs it into the buffer
32 * and returns it back to the client
33 * UDF: Reads the record from the disk and based on the
34 * query applies UDF and packs the result back into
35 * the buffer and returns it back to the client.
 * AGGREGATION: Creates an istream (over the digest list) and an ostream
 * (over the network buffer) and applies the aggregator
38 * functions. For a single query this can be called
39 * multiple times. The istream interface takes care
40 * of partition reservation / record opening/ closing
41 * and object lock synchronization. Whole of which
42 * is driven by as_stream_read / as_stream_write from
 * inside the aggregation UDF. The ostream keeps sending
 * batched results to the client.
45 *
46 * Please note all these parts can either be performed under single thread
47 * context or by different set of threads. For the namespace with data on disk
48 * I/O is performed separately in different set of I/O pools
49 *
50 * Flow of code looks like
51 *
52 * 1. thr_tsvc()
53 *
54 * ---------------------------------> query_generator
55 * / /|\ |
56 * as_query ----- | | qtr released
57 * (sets up qtr) \ qtr reserved | \|/
58 * ----------------> g_query_q ------> query_th
59 *
60 *
61 * 2. Query Threads
62 * ---------------------------------> qwork_process
63 * / /|\ |
64 * query_generator -- | | qtr released
65 * (sets up qwork) \ qtr reserved | \|/
66 * --------------> g_query_work_queue -> query_th
67 *
68 *
69 *
70 * 3. I/O threads
71 * query_process_ioreq --> query_io
72 * /
73 * qwork_process -----------------query_process_udfreq --> internal txn
74 * \
75 * query_process_aggreq --> ag_aggr_process
76 *
77 * (Releases all the resources qtr and qwork if allocated)
78 *
 * A query may execute in a single thread or across multiple threads. In
 * single-threaded execution all the functions are called in one thread context
 * and no queue is involved. In the multi-threaded case the qtr is set up by thr_tsvc
 * and picked up by the query threads, which either service it in a single thread
 * or queue it up to the I/O worker threads (generally done for data on SSD).
84 *
85 */
86
87#include "base/thr_query.h"
88
89#include <assert.h>
90#include <errno.h>
91#include <pthread.h>
92#include <stdio.h>
93#include <string.h>
94#include <strings.h>
95#include <unistd.h>
96#include <sys/time.h>
97
98#include "aerospike/as_atomic.h"
99#include "aerospike/as_buffer.h"
100#include "aerospike/as_integer.h"
101#include "aerospike/as_list.h"
102#include "aerospike/as_map.h"
103#include "aerospike/as_msgpack.h"
104#include "aerospike/as_serializer.h"
105#include "aerospike/as_stream.h"
106#include "aerospike/as_string.h"
107#include "aerospike/as_rec.h"
108#include "aerospike/as_val.h"
109#include "aerospike/mod_lua.h"
110#include "citrusleaf/cf_ll.h"
111#include "citrusleaf/cf_queue.h"
112
113#include "ai_btree.h"
114#include "bt.h"
115#include "bt_iterator.h"
116#include "cf_mutex.h"
117#include "cf_thread.h"
118#include "rchash.h"
119
120#include "base/aggr.h"
121#include "base/as_stap.h"
122#include "base/datamodel.h"
123#include "base/predexp.h"
124#include "base/proto.h"
125#include "base/secondary_index.h"
126#include "base/service.h"
127#include "base/stats.h"
128#include "base/transaction.h"
129#include "base/udf_record.h"
130#include "fabric/fabric.h"
131#include "fabric/partition.h"
132#include "geospatial/geospatial.h"
133#include "transaction/udf.h"
134#include "transaction/write.h"
135
136
/*
 * Lifecycle states of a query transaction (qtr).
 *
 * A qtr starts in INIT and moves to RUNNING when it is picked up. From
 * RUNNING it ends in ABORT, ERR or DONE. ABORT may also replace DONE (see
 * qtr_set_abort()).
 */
// **************************************************************************************************
typedef enum {
	AS_QTR_STATE_INIT    = 0, // Set up, not yet picked up
	AS_QTR_STATE_RUNNING = 1, // Picked up and executing
	AS_QTR_STATE_ABORT   = 2, // Aborted (e.g. network I/O failure)
	AS_QTR_STATE_ERR     = 3, // Failed (e.g. timed out)
	AS_QTR_STATE_DONE    = 4, // Completed
} qtr_state;
// **************************************************************************************************
149
/*
 * Query transaction type: what is done with the records matched by the
 * secondary index lookup.
 */
// **************************************************************************************************
typedef enum {
	QUERY_TYPE_LOOKUP  = 0,  // Return matching records to the client
	QUERY_TYPE_AGGR    = 1,  // Stream (aggregation) UDF over matching records
	QUERY_TYPE_UDF_BG  = 2,  // Background record UDF
	QUERY_TYPE_OPS_BG  = 3,  // Background ops

	QUERY_TYPE_UNKNOWN = -1
} query_type;
162
163
164
165/*
166 * Query Transaction Structure
167 */
168// **************************************************************************************************
169typedef struct as_query_transaction_s {
170
171 /*
172 * MT (Read Only) No protection required
173 */
174 /************************** Query Parameter ********************************/
175 uint64_t trid;
176 as_namespace * ns;
177 char * setname;
178 as_sindex * si;
179 as_sindex_range * srange;
180 query_type job_type; // Job type [LOOKUP/AGG/UDF]
181 bool no_bin_data;
182 predexp_eval_t * predexp_eval;
183 cf_vector * binlist;
184 as_file_handle * fd_h; // ref counted nonetheless
185 /************************** Run Time Data *********************************/
186 bool blocking;
187 uint32_t priority;
188 uint64_t start_time; // Start time
189 uint64_t end_time; // timeout value
190
191 /*
192 * MT (Single Writer / Single Threaded / Multiple Readers)
193 * Atomics or no Protection
194 */
195 /****************** Stats (only generator) ***********************/
196 uint64_t querying_ai_time_ns; // Time spent by query to run lookup secondary index trees.
197 uint32_t n_digests; // Digests picked by from secondary index
198 // including record read
199 bool short_running;
200 bool track;
201
202 /*
203 * MT (Multiple Writers)
204 * These fields are either needs to be atomic or protected by lock.
205 */
206 /****************** Stats (worker threads) ***********************/
207 cf_atomic64 n_result_records; // Number of records returned as result
208 // if aggregation returns 1 record count
209 // is 1, irrelevant of number of record
210 // being touched.
211 cf_atomic64 net_io_bytes;
212 cf_atomic64 n_read_success;
213
214 /********************** Query Progress ***********************************/
215 cf_atomic32 n_qwork_active;
216 cf_atomic32 n_io_outstanding;
217 cf_atomic32 n_udf_tr_queued; // Throttling: max in flight scan
218 cf_atomic32 n_ops_tr_queued; // Throttling: max in flight scan
219
220 /********************* Net IO packet order *******************************/
221 cf_atomic32 netio_push_seq;
222 cf_atomic32 netio_pop_seq;
223
224 /********************** IO Buf Builder ***********************************/
225 cf_mutex buf_mutex;
226 cf_buf_builder * bb_r;
227 /****************** Query State and Result Code **************************/
228 cf_mutex slock;
229 bool do_requeue;
230 qtr_state state;
231 int result_code;
232
233 /********************* Fields Not Memzeroed **********************
234 *
235 * Empirically, some of the following fields *still* require memzero
236 * initialization. Please test with a memset(qtr, 0xff, sizeof(*qtr))
237 * right after allocation before you initialize before moving them
238 * into the uninitialized section.
239 *
240 * NB: Read Only or Single threaded
241 */
242 struct ai_obj bkey;
243 as_aggr_call agg_call; // Stream UDF Details
244 iudf_origin iudf_orig; // Background UDF details
245 iops_origin iops_orig; // Background ops details
246 as_sindex_qctx qctx; // Secondary Index details
247 as_partition_reservation * rsv;
248} as_query_transaction;
249// **************************************************************************************************
250
251
252
/*
 * Kind of work a query_work item carries to the worker threads.
 */
// **************************************************************************************************
typedef enum {
	QUERY_WORK_TYPE_NONE   = -1, // Unassigned (pooled or freshly handed out item)
	QUERY_WORK_TYPE_LOOKUP = 0,  // Request for I/O
	QUERY_WORK_TYPE_AGG    = 1,  // Request for aggregation
	QUERY_WORK_TYPE_UDF_BG = 2,  // Request for running UDF on query result
	QUERY_WORK_TYPE_OPS_BG = 3   // Request for running ops on query result
} query_work_type;
// **************************************************************************************************
265
266
267/*
268 * Query Request
269 */
270// **************************************************************************************************
271typedef struct query_work_s {
272 query_work_type type;
273 as_query_transaction * qtr;
274 cf_ll * recl;
275 uint64_t queued_time_ns;
276} query_work;
277// **************************************************************************************************
278
279
280/*
281 * Job Monitoring
282 */
283// **************************************************************************************************
284typedef struct query_jobstat_s {
285 int index;
286 as_mon_jobstat ** jobstat;
287 int max_size;
288} query_jobstat;
289// **************************************************************************************************
290
291/*
292 * Skey list
293 */
294// **************************************************************************************************
295typedef struct qtr_skey_s {
296 as_query_transaction * qtr;
297 as_sindex_key * skey;
298} qtr_skey;
299// **************************************************************************************************
300
301
302/*
303 * Query Engine Global
304 */
305// **************************************************************************************************
306static int g_current_queries_count = 0;
307static pthread_rwlock_t g_query_lock
308 = PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP;
309static cf_rchash * g_query_job_hash = NULL;
310// Buf Builder Pool
311static cf_queue * g_query_response_bb_pool = 0;
312static cf_queue * g_query_qwork_pool = 0;
313cf_mutex g_query_pool_mutex = CF_MUTEX_INIT;
314as_query_transaction * g_query_pool_head = NULL;
315size_t g_query_pool_count = 0;
316//
317// GENERATOR
318static cf_queue * g_query_short_queue = 0;
319static cf_queue * g_query_long_queue = 0;
320static cf_atomic32 g_query_threadcnt = 0;
321
322cf_atomic32 g_query_short_running = 0;
323cf_atomic32 g_query_long_running = 0;
324
325// I/O & AGGREGATOR
326static cf_queue * g_query_work_queue = 0;
327static cf_atomic32 g_query_worker_threadcnt = 0;
328// **************************************************************************************************
329
330/*
331 * Extern Functions
332 */
333// **************************************************************************************************
334
335extern cf_vector * as_sindex_binlist_from_msg(as_namespace *ns, as_msg *msgp, int * numbins);
336
337// **************************************************************************************************
338
339/*
340 * Forward Declaration
341 */
342// **************************************************************************************************
343
344static void qtr_finish_work(as_query_transaction *qtr, cf_atomic32 *stat, char *fname, int lineno, bool release);
345
346// **************************************************************************************************
347
348/*
349 * Histograms
350 */
351// **************************************************************************************************
352histogram * query_txn_q_wait_hist; // Histogram to track time spend in trasaction queue. Transaction
353 // queue backing, it is busy. Check if query in transaction is
354 // true from query perspective.
355histogram * query_query_q_wait_hist; // Histogram to track time spend waiting in queue for query thread.
356 // Query queue backing up. Try increasing query thread in case CPU is
357 // not fully utilized or if system is not IO bound
358histogram * query_prepare_batch_hist; // Histogram to track time spend while preparing batches. Secondary index
359 // slow. Check batch is too big
360histogram * query_batch_io_q_wait_hist; // Histogram to track time spend waiting in queue for worker thread.
361histogram * query_batch_io_hist; // Histogram to track time spend doing I/O per batch. This includes
362 // priority based sleep after n units of work.
363 // For above two Query worker thread busy if not IO bound then try bumping
364 // up the priority. Query thread may be yielding too much.
365histogram * query_net_io_hist; // Histogram to track time spend sending results to client. Network problem!!
366 // or client too slow
367
/*
 * Record a value in a query histogram. Nothing is recorded unless query
 * histograms are enabled (g_config.query_enable_histogram), the value is
 * non-zero and the histogram pointer is non-NULL.
 *
 * QUERY_HIST_INSERT_DATA_POINT records the latency since start_time_ns.
 * QUERY_HIST_INSERT_RAW records time_ns as-is.
 *
 * Arguments are parenthesized. The expansion has no trailing ';', so
 * "QUERY_HIST_...(...);" is exactly one statement, even as the body of an
 * unbraced if/else.
 */
#define QUERY_HIST_INSERT_DATA_POINT(type, start_time_ns) \
do { \
	if (g_config.query_enable_histogram && (start_time_ns) != 0) { \
		if (type) { \
			histogram_insert_data_point((type), (start_time_ns)); \
		} \
	} \
} while (0)

#define QUERY_HIST_INSERT_RAW(type, time_ns) \
do { \
	if (g_config.query_enable_histogram && (time_ns) != 0) { \
		if (type) { \
			histogram_insert_raw((type), (time_ns)); \
		} \
	} \
} while (0)
385
386// **************************************************************************************************
387
388
389/*
390 * Query Locks
391 */
392// **************************************************************************************************
393static void
394qtr_lock(as_query_transaction *qtr) {
395 if (qtr) {
396 cf_mutex_lock(&qtr->slock);
397 }
398}
399static void
400qtr_unlock(as_query_transaction *qtr) {
401 if (qtr) {
402 cf_mutex_unlock(&qtr->slock);
403 }
404}
405// **************************************************************************************************
406
407
408/*
409 * Query Transaction Pool
410 */
411// **************************************************************************************************
412static as_query_transaction *
413qtr_alloc()
414{
415 cf_mutex_lock(&g_query_pool_mutex);
416
417 as_query_transaction * qtr;
418
419 if (!g_query_pool_head) {
420 qtr = cf_rc_alloc(sizeof(as_query_transaction));
421 } else {
422 qtr = g_query_pool_head;
423 g_query_pool_head = * (as_query_transaction **) qtr;
424 --g_query_pool_count;
425 cf_rc_reserve(qtr);
426 }
427
428 cf_mutex_unlock(&g_query_pool_mutex);
429 return qtr;
430}
431
432static void
433qtr_free(as_query_transaction * qtr)
434{
435 cf_mutex_lock(&g_query_pool_mutex);
436
437 if (g_query_pool_count >= AS_QUERY_MAX_QTR_POOL) {
438 cf_rc_free(qtr);
439 }
440 else {
441 // Use the initial location as a next pointer.
442 * (as_query_transaction **) qtr = g_query_pool_head;
443 g_query_pool_head = qtr;
444 ++g_query_pool_count;
445 }
446
447 cf_mutex_unlock(&g_query_pool_mutex);
448}
449// **************************************************************************************************
450
451
452/*
453 * Bufbuilder buffer pool
454 */
455// **************************************************************************************************
456static int
457bb_poolrelease(cf_buf_builder *bb_r)
458{
459 int ret = AS_QUERY_OK;
460 if ((cf_queue_sz(g_query_response_bb_pool) > g_config.query_bufpool_size)
461 || g_config.query_buf_size != cf_buf_builder_size(bb_r)) {
462 cf_detail(AS_QUERY, "Freed Buffer of Size %zu with", bb_r->alloc_sz + sizeof(as_msg));
463 cf_buf_builder_free(bb_r);
464 } else {
465 cf_detail(AS_QUERY, "Pushed %p %"PRIu64" %d ", bb_r, g_config.query_buf_size, cf_buf_builder_size(bb_r));
466 cf_queue_push(g_query_response_bb_pool, &bb_r);
467 }
468 return ret;
469}
470
471static cf_buf_builder *
472bb_poolrequest()
473{
474 cf_buf_builder *bb_r;
475 int rv = cf_queue_pop(g_query_response_bb_pool, &bb_r, CF_QUEUE_NOWAIT);
476 if (rv == CF_QUEUE_EMPTY) {
477 bb_r = cf_buf_builder_create(g_config.query_buf_size);
478 } else if (rv == CF_QUEUE_OK) {
479 bb_r->used_sz = 0;
480 cf_detail(AS_QUERY, "Popped %p", bb_r);
481 } else {
482 cf_warning(AS_QUERY, "Failed to find response buffer in the pool%d", rv);
483 return NULL;
484 }
485 return bb_r;
486};
487// **************************************************************************************************
488
489/*
490 * Query Request Pool
491 */
492// **************************************************************************************************
493static int
494qwork_poolrelease(query_work *qwork)
495{
496 if (!qwork) return AS_QUERY_OK;
497 qwork->qtr = 0;
498 qwork->type = QUERY_WORK_TYPE_NONE;
499
500 if (cf_queue_sz(g_query_qwork_pool) < AS_QUERY_MAX_QREQ) {
501 cf_detail(AS_QUERY, "Pushed qwork %p", qwork);
502 cf_queue_push(g_query_qwork_pool, &qwork);
503 } else {
504 cf_detail(AS_QUERY, "Freed qwork %p", qwork);
505 cf_free(qwork);
506 }
507
508 return AS_QUERY_OK;
509}
510
511static query_work *
512qwork_poolrequest()
513{
514 query_work *qwork = NULL;
515 int rv = cf_queue_pop(g_query_qwork_pool, &qwork, CF_QUEUE_NOWAIT);
516 if (rv == CF_QUEUE_EMPTY) {
517 qwork = cf_malloc(sizeof(query_work));
518 memset(qwork, 0, sizeof(query_work));
519 } else if (rv != CF_QUEUE_OK) {
520 cf_warning(AS_QUERY, "Failed to find query work in the pool");
521 return NULL;
522 }
523 qwork->qtr = 0;
524 qwork->type = QUERY_WORK_TYPE_NONE;
525 return qwork;
526};
527// **************************************************************************************************
528
529
530/*
531 * Query State set/get function
532 */
533// **************************************************************************************************
534static void
535qtr_set_running(as_query_transaction *qtr) {
536 qtr_lock(qtr);
537 if (qtr->state == AS_QTR_STATE_INIT) {
538 qtr->state = AS_QTR_STATE_RUNNING;
539 } else {
540 cf_crash(AS_QUERY, "Invalid Query state %d while moving to running state ...", qtr->state);
541 }
542 qtr_unlock(qtr);
543}
544
545/*
546 * Query in non init state (picked up by generator) means it is
547 * running. Could be RUNNING/ABORT/FAIL/DONE
548 */
549static bool
550qtr_started(as_query_transaction *qtr) {
551 qtr_lock(qtr);
552 bool started = false;
553 if (qtr->state != AS_QTR_STATE_INIT) {
554 started = true;
555 }
556 qtr_unlock(qtr);
557 return started;
558}
559
560static void
561qtr_set_abort(as_query_transaction *qtr, int result_code, char *fname, int lineno)
562{
563 qtr_lock(qtr);
564 if (qtr->state == AS_QTR_STATE_RUNNING
565 || qtr->state == AS_QTR_STATE_DONE) {
566 cf_debug(AS_QUERY, "Query %p Aborted at %s:%d", qtr, fname, lineno);
567 qtr->state = AS_QTR_STATE_ABORT;
568 qtr->result_code = result_code;
569 }
570 qtr_unlock(qtr);
571}
572
573static void
574qtr_set_err(as_query_transaction *qtr, int result_code, char *fname, int lineno)
575{
576 qtr_lock(qtr);
577 if (qtr->state == AS_QTR_STATE_RUNNING) {
578 cf_debug(AS_QUERY, "Query %p Error at %s:%d", qtr, fname, lineno);
579 qtr->state = AS_QTR_STATE_ERR;
580 qtr->result_code = result_code;
581 }
582 qtr_unlock(qtr);
583}
584
585static void
586qtr_set_done(as_query_transaction *qtr, int result_code, char *fname, int lineno)
587{
588 qtr_lock(qtr);
589 if (qtr->state == AS_QTR_STATE_RUNNING) {
590 cf_debug(AS_QUERY, "Query %p Done at %s:%d", qtr, fname, lineno);
591 qtr->state = AS_QTR_STATE_DONE;
592 qtr->result_code = result_code;
593 }
594 qtr_unlock(qtr);
595}
596
597static bool
598qtr_failed(as_query_transaction *qtr)
599{
600 qtr_lock(qtr);
601 bool abort = false;
602 if ((qtr->state == AS_QTR_STATE_ABORT)
603 || (qtr->state == AS_QTR_STATE_ERR)) {
604 abort = true;
605 }
606 qtr_unlock(qtr);
607 return abort;
608}
609
610static bool
611qtr_is_abort(as_query_transaction *qtr)
612{
613 qtr_lock(qtr);
614 bool abort = false;
615 if (qtr->state == AS_QTR_STATE_ABORT) {
616 abort = true;
617 }
618 qtr_unlock(qtr);
619 return abort;
620}
621
622
623static bool
624qtr_finished(as_query_transaction *qtr)
625{
626 qtr_lock(qtr);
627 bool finished = false;
628 if ((qtr->state == AS_QTR_STATE_DONE)
629 || (qtr->state == AS_QTR_STATE_ERR)
630 || (qtr->state == AS_QTR_STATE_ABORT)) {
631 finished = true;
632 }
633 qtr_unlock(qtr);
634 return finished;
635}
636
637static void
638query_check_timeout(as_query_transaction *qtr)
639{
640 if ((qtr)
641 && (qtr->end_time != 0)
642 && (cf_getns() > qtr->end_time)) {
643 cf_debug(AS_QUERY, "Query Timed-out %lu %lu", cf_getns(), qtr->end_time);
644 qtr_set_err(qtr, AS_ERR_QUERY_TIMEOUT, __FILE__, __LINE__);
645 }
646}
647// **************************************************************************************************
648
649
650/*
651 * Query Destructor Function
652 */
653// **************************************************************************************************
654static void
655query_release_prereserved_partitions(as_query_transaction * qtr)
656{
657 if (!qtr) {
658 cf_warning(AS_QUERY, "qtr is NULL");
659 return;
660 }
661 if (qtr->qctx.partitions_pre_reserved) {
662 for (int i=0; i<AS_PARTITIONS; i++) {
663 if (qtr->qctx.can_partition_query[i]) {
664 as_partition_release(&qtr->rsv[i]);
665 }
666 }
667 if (qtr->rsv) {
668 cf_free(qtr->rsv);
669 }
670 }
671}
672
673/*
674 * NB: These stats come into picture only if query really started
675 * running. If it fails before even running it is accounted in
676 * fail
677 */
678static inline void
679query_update_stats(as_query_transaction *qtr)
680{
681 uint64_t rows = cf_atomic64_get(qtr->n_result_records);
682
683 switch (qtr->job_type) {
684 case QUERY_TYPE_LOOKUP:
685 if (qtr->state == AS_QTR_STATE_ABORT) {
686 cf_atomic64_incr(&qtr->ns->n_lookup_abort);
687 } else if (qtr->state == AS_QTR_STATE_ERR) {
688 cf_atomic64_incr(&(qtr->si->stats.lookup_errs));
689 cf_atomic64_incr(&qtr->ns->n_lookup_errs);
690 }
691 if (!qtr_failed(qtr))
692 cf_atomic64_incr(&qtr->ns->n_lookup_success);
693 cf_atomic64_incr(&qtr->si->stats.n_lookup);
694 cf_atomic64_add(&qtr->si->stats.lookup_response_size, qtr->net_io_bytes);
695 cf_atomic64_add(&qtr->si->stats.lookup_num_records, rows);
696 cf_atomic64_add(&qtr->ns->lookup_response_size, qtr->net_io_bytes);
697 cf_atomic64_add(&qtr->ns->lookup_num_records, rows);
698 break;
699
700 case QUERY_TYPE_AGGR:
701 if (qtr->state == AS_QTR_STATE_ABORT) {
702 cf_atomic64_incr(&qtr->ns->n_agg_abort);
703 } else if (qtr->state == AS_QTR_STATE_ERR) {
704 cf_atomic64_incr(&(qtr->si->stats.agg_errs));
705 cf_atomic64_incr(&qtr->ns->n_agg_errs);
706 }
707 if (!qtr_failed(qtr))
708 cf_atomic64_incr(&qtr->ns->n_agg_success);
709 cf_atomic64_incr(&qtr->si->stats.n_aggregation);
710 cf_atomic64_add(&qtr->si->stats.agg_response_size, qtr->net_io_bytes);
711 cf_atomic64_add(&qtr->si->stats.agg_num_records, rows);
712 cf_atomic64_add(&qtr->ns->agg_response_size, qtr->net_io_bytes);
713 cf_atomic64_add(&qtr->ns->agg_num_records, rows);
714 break;
715
716 case QUERY_TYPE_UDF_BG:
717 if (qtr_failed(qtr)) {
718 cf_atomic64_incr(&qtr->ns->n_query_udf_bg_failure);
719 } else {
720 cf_atomic64_incr(&qtr->ns->n_query_udf_bg_success);
721 }
722 break;
723
724 case QUERY_TYPE_OPS_BG:
725 if (qtr_failed(qtr)) {
726 cf_atomic64_incr(&qtr->ns->n_query_ops_bg_failure);
727 } else {
728 cf_atomic64_incr(&qtr->ns->n_query_ops_bg_success);
729 }
730 break;
731
732 default:
733 cf_crash(AS_QUERY, "Unknown Query Type !!");
734 break;
735 }
736
737 // Can't use macro that tr and rw use.
738 qtr->ns->query_hist_active = true;
739 cf_hist_track_insert_data_point(qtr->ns->query_hist, qtr->start_time);
740
741 SINDEX_HIST_INSERT_DATA_POINT(qtr->si, query_hist, qtr->start_time);
742
743 if (qtr->querying_ai_time_ns) {
744 QUERY_HIST_INSERT_RAW(query_prepare_batch_hist, qtr->querying_ai_time_ns);
745 }
746
747 if (qtr->n_digests) {
748 SINDEX_HIST_INSERT_RAW(qtr->si, query_rcnt_hist, qtr->n_digests);
749 if (rows) {
750 // Can't use macro that tr and rw use.
751 qtr->ns->query_rec_count_hist_active = true;
752 histogram_insert_raw(qtr->ns->query_rec_count_hist, rows);
753
754 SINDEX_HIST_INSERT_RAW(qtr->si, query_diff_hist, qtr->n_digests - rows);
755 }
756 }
757
758
759
760 uint64_t query_stop_time = cf_getns();
761 uint64_t elapsed_us = (query_stop_time - qtr->start_time) / 1000;
762 cf_detail(AS_QUERY,
763 "Total time elapsed %"PRIu64" us, %"PRIu64" of %d read operations avg latency %"PRIu64" us",
764 elapsed_us, rows, qtr->n_digests, rows > 0 ? elapsed_us / rows : 0);
765}
766
767static void
768query_run_teardown(as_query_transaction *qtr)
769{
770 query_update_stats(qtr);
771
772 if (qtr->n_udf_tr_queued != 0) {
773 cf_warning(AS_QUERY, "QUEUED UDF not equal to zero when query transaction is done");
774 }
775
776 if (qtr->qctx.recl) {
777 cf_ll_reduce(qtr->qctx.recl, true /*forward*/, as_index_keys_ll_reduce_fn, NULL);
778 cf_free(qtr->qctx.recl);
779 qtr->qctx.recl = NULL;
780 }
781
782 if (qtr->short_running) {
783 cf_atomic32_decr(&g_query_short_running);
784 } else {
785 cf_atomic32_decr(&g_query_long_running);
786 }
787
788 // Release all the partitions
789 query_release_prereserved_partitions(qtr);
790
791
792 if (qtr->bb_r) {
793 bb_poolrelease(qtr->bb_r);
794 qtr->bb_r = NULL;
795 }
796
797 cf_mutex_destroy(&qtr->buf_mutex);
798}
799
800static void
801query_teardown(as_query_transaction *qtr)
802{
803 if (qtr->srange) as_sindex_range_free(&qtr->srange);
804 if (qtr->si) AS_SINDEX_RELEASE(qtr->si);
805 if (qtr->binlist) cf_vector_destroy(qtr->binlist);
806 if (qtr->setname) cf_free(qtr->setname);
807
808 predexp_destroy(qtr->predexp_eval);
809
810 if (qtr->job_type == QUERY_TYPE_AGGR && qtr->agg_call.def.arglist) {
811 as_list_destroy(qtr->agg_call.def.arglist);
812 }
813 else if (qtr->job_type == QUERY_TYPE_UDF_BG) {
814 iudf_origin_destroy(&qtr->iudf_orig);
815 }
816 else if (qtr->job_type == QUERY_TYPE_OPS_BG) {
817 iops_origin_destroy(&qtr->iops_orig);
818 }
819
820 cf_mutex_destroy(&qtr->slock);
821}
822
823static void
824query_release_fd(as_file_handle *fd_h, bool force_close)
825{
826 if (fd_h) {
827 fd_h->last_used = cf_getns();
828 as_end_of_transaction(fd_h, force_close);
829 }
830}
831
832static void
833query_transaction_done(as_query_transaction *qtr)
834{
835
836#if defined(USE_SYSTEMTAP)
837 uint64_t nodeid = g_config.self_node;
838#endif
839
840 if (!qtr)
841 return;
842
843 ASD_QUERY_TRANS_DONE(nodeid, qtr->trid, (void *) qtr);
844
845 if (qtr_started(qtr)) {
846 query_run_teardown(qtr);
847 }
848
849
850 // if query is aborted force close connection.
851 // Not to be reused
852 query_release_fd(qtr->fd_h, qtr_is_abort(qtr));
853 qtr->fd_h = NULL;
854 query_teardown(qtr);
855
856 ASD_QUERY_QTR_FREE(nodeid, qtr->trid, (void *) qtr);
857
858 qtr_free(qtr);
859}
860// **************************************************************************************************
861
862
863/*
864 * Query Transaction Ref Counts
865 */
866// **************************************************************************************************
867int
868qtr_release(as_query_transaction *qtr, char *fname, int lineno)
869{
870 if (qtr) {
871 int val = cf_rc_release(qtr);
872 if (val == 0) {
873 cf_detail(AS_QUERY, "Released qtr [%s:%d] %p %d ", fname, lineno, qtr, val);
874 query_transaction_done(qtr);
875 }
876 cf_detail(AS_QUERY, "Released qtr [%s:%d] %p %d ", fname, lineno, qtr, val);
877 }
878 return AS_QUERY_OK;
879}
880
881static int
882qtr_reserve(as_query_transaction *qtr, char *fname, int lineno)
883{
884 if (!qtr) {
885 return AS_QUERY_ERR;
886 }
887 int val = cf_rc_reserve(qtr);
888 cf_detail(AS_QUERY, "Reserved qtr [%s:%d] %p %d ", fname, lineno, qtr, val);
889 return AS_QUERY_OK;
890}
891// **************************************************************************************************
892
893
894/*
895 * Async Network IO Entry Point
896 */
897// **************************************************************************************************
898/* Call back function to determine if the IO should go ahead or not.
899 * Purpose
900 * 1. If our sequence number does not match requeue
901 * 2. If query aborted fail IO.
902 * 3. In all other cases let the IO go through. That would mean
903 * if IO is queued it will be done before the fin with error
904 * result_code is sent !!
905 */
906int
907query_netio_start_cb(void *udata, int seq)
908{
909 as_netio *io = (as_netio *)udata;
910 as_query_transaction *qtr = (as_query_transaction *)io->data;
911 cf_detail(AS_QUERY, "Netio Started_CB %d %d %d %d ", io->offset, io->seq, qtr->netio_pop_seq, qtr->state);
912
913 // It is needed to send all the packets in sequence
914 // A packet can be requeued after being half sent.
915 if (seq > cf_atomic32_get(qtr->netio_pop_seq)) {
916 return AS_NETIO_CONTINUE;
917 }
918
919 if (qtr_is_abort(qtr)) {
920 return AS_QUERY_ERR;
921 }
922
923 return AS_NETIO_OK;
924}
925
926/*
927 * The function after the IO on the network has been done.
928 * 1. If OK was done successfully bump up the sequence number and
929 * fix stats
930 * 2. Release the qtr if something fails ... which would trigger
931 * fin packet send and eventually free up qtr
932 * Abort it set if something goes wrong
933 */
934int
935query_netio_finish_cb(void *data, int retcode)
936{
937 as_netio *io = (as_netio *)data;
938 cf_detail(AS_QUERY, "Query Finish Callback io seq %d with retCode %d", io->seq, retcode);
939 as_query_transaction *qtr = (as_query_transaction *)io->data;
940 if (qtr && (retcode != AS_NETIO_CONTINUE)) {
941 // If send success make stat is updated
942 if (retcode == AS_NETIO_OK) {
943 cf_atomic64_add(&qtr->net_io_bytes, io->bb_r->used_sz + 8);
944 } else {
945 qtr_set_abort(qtr, AS_ERR_QUERY_NET_IO, __FILE__, __LINE__);
946 }
947 QUERY_HIST_INSERT_DATA_POINT(query_net_io_hist, io->start_time);
948
949 // Undo the increment from query_netio(). Cannot reach zero here: the
950 // increment owned by the transaction will only be undone after all netio
951 // is complete.
952 cf_rc_release(io->fd_h);
953 io->fd_h = NULL;
954 bb_poolrelease(io->bb_r);
955
956 cf_atomic32_incr(&qtr->netio_pop_seq);
957
958 qtr_finish_work(qtr, &qtr->n_io_outstanding, __FILE__, __LINE__, true);
959 }
960 return retcode;
961}
962
963#define MAX_OUTSTANDING_IO_REQ 2
964static int
965query_netio_wait(as_query_transaction *qtr)
966{
967 return (cf_atomic32_get(qtr->n_io_outstanding) > MAX_OUTSTANDING_IO_REQ) ? AS_QUERY_ERR : AS_QUERY_OK;
968}
969
970// Returns AS_NETIO_OK always
971static int
972query_netio(as_query_transaction *qtr)
973{
974#if defined(USE_SYSTEMTAP)
975 uint64_t nodeid = g_config.self_node;
976#endif
977
978 ASD_QUERY_NETIO_STARTING(nodeid, qtr->trid);
979
980 as_netio io;
981
982 io.finish_cb = query_netio_finish_cb;
983 io.start_cb = query_netio_start_cb;
984
985 qtr_reserve(qtr, __FILE__, __LINE__);
986 io.data = qtr;
987
988 io.bb_r = qtr->bb_r;
989 qtr->bb_r = NULL;
990
991 cf_rc_reserve(qtr->fd_h);
992 io.fd_h = qtr->fd_h;
993
994 io.offset = 0;
995
996 cf_atomic32_incr(&qtr->n_io_outstanding);
997 io.seq = cf_atomic32_incr(&qtr->netio_push_seq);
998 io.start_time = cf_getns();
999
1000 int ret = as_netio_send(&io, false, qtr->blocking);
1001 qtr->bb_r = bb_poolrequest();
1002 cf_buf_builder_reserve(&qtr->bb_r, 8, NULL);
1003
1004 ASD_QUERY_NETIO_FINISHED(nodeid, qtr->trid);
1005
1006 return ret;
1007}
1008// **************************************************************************************************
1009
1010
1011/*
1012 * Query Reservation Abstraction
1013 */
1014// **************************************************************************************************
1015// Returns NULL if partition with is 'pid' is not query-able Else
1016// if all the partitions are reserved upfront returns the rsv used for reserving the partition
1017// else reserves the partition and returns rsv
1018as_partition_reservation *
1019query_reserve_partition(as_namespace * ns, as_query_transaction * qtr, uint32_t pid, as_partition_reservation * rsv)
1020{
1021 if (qtr->qctx.partitions_pre_reserved) {
1022 if (!qtr->qctx.can_partition_query[pid]) {
1023 cf_debug(AS_QUERY, "Getting digest in rec list which do not belong to query-able partition.");
1024 return NULL;
1025 }
1026 return &qtr->rsv[pid];
1027 }
1028
1029 // Works for scan aggregation
1030 if (!rsv) {
1031 cf_warning(AS_QUERY, "rsv is null while reserving partition.");
1032 return NULL;
1033 }
1034
1035 if (0 != as_partition_reserve_query(ns, pid, rsv)) {
1036 return NULL;
1037 }
1038
1039 return rsv;
1040}
1041
1042void
1043query_release_partition(as_query_transaction * qtr, as_partition_reservation * rsv)
1044{
1045 if (!qtr->qctx.partitions_pre_reserved) {
1046 as_partition_release(rsv);
1047 }
1048}
1049
1050// Pre reserves query-able partitions
1051void
1052as_query_pre_reserve_partitions(as_query_transaction * qtr)
1053{
1054 if (!qtr) {
1055 cf_warning(AS_QUERY, "qtr is NULL");
1056 return;
1057 }
1058 if (qtr->qctx.partitions_pre_reserved) {
1059 qtr->rsv = cf_malloc(sizeof(as_partition_reservation) * AS_PARTITIONS);
1060 as_partition_prereserve_query(qtr->ns, qtr->qctx.can_partition_query, qtr->rsv);
1061 } else {
1062 qtr->rsv = NULL;
1063 }
1064}
1065
1066// **************************************************************************************************
1067
1068
1069/*
1070 * Query tracking
1071 */
1072// **************************************************************************************************
1073// Put qtr in a global hash
1074static int
1075hash_put_qtr(as_query_transaction * qtr)
1076{
1077 if (!qtr->track) {
1078 return AS_QUERY_CONTINUE;
1079 }
1080
1081 int rc = cf_rchash_put_unique(g_query_job_hash, &qtr->trid, sizeof(qtr->trid), qtr);
1082 if (rc) {
1083 cf_warning(AS_SINDEX, "QTR Put in hash failed with error %d", rc);
1084 }
1085
1086 return rc;
1087}
1088
1089// Get Qtr from global hash
1090static int
1091hash_get_qtr(uint64_t trid, as_query_transaction ** qtr)
1092{
1093 int rv = cf_rchash_get(g_query_job_hash, &trid, sizeof(trid), (void **) qtr);
1094 if (CF_RCHASH_OK != rv) {
1095 cf_info(AS_SINDEX, "Query job with transaction id [%"PRIu64"] does not exist", trid );
1096 }
1097 return rv;
1098}
1099
1100// Delete Qtr from global hash
1101static int
1102hash_delete_qtr(as_query_transaction *qtr)
1103{
1104 if (!qtr->track) {
1105 return AS_QUERY_CONTINUE;
1106 }
1107
1108 int rv = cf_rchash_delete(g_query_job_hash, &qtr->trid, sizeof(qtr->trid));
1109 if (CF_RCHASH_OK != rv) {
1110 cf_warning(AS_SINDEX, "Failed to delete qtr from query hash.");
1111 }
1112 return rv;
1113}
1114// If any query run from more than g_config.query_untracked_time_ms
1115// we are going to track it
1116// else no.
1117int
1118hash_track_qtr(as_query_transaction *qtr)
1119{
1120 if (!qtr->track) {
1121 if ((cf_getns() - qtr->start_time) > (g_config.query_untracked_time_ms * 1000000)) {
1122 qtr->track = true;
1123 qtr_reserve(qtr, __FILE__, __LINE__);
1124 int ret = hash_put_qtr(qtr);
1125 if (ret != 0 && ret != AS_QUERY_CONTINUE) {
1126 // track should be disabled otherwise at the
1127 // qtr cleanup stage some other qtr with the same
1128 // trid can get cleaned up.
1129 qtr->track = false;
1130 qtr_release(qtr, __FILE__, __LINE__);
1131 return AS_QUERY_ERR;
1132 }
1133 }
1134 }
1135 return AS_QUERY_OK;
1136}
1137// **************************************************************************************************
1138
1139
1140
1141/*
1142 * Query Request IO functions
1143 */
1144// **************************************************************************************************
1145/*
1146 * Function query_add_response
1147 *
1148 * Returns -
1149 * AS_QUERY_OK - On success.
1150 * AS_QUERY_ERR - On failure.
1151 *
1152 * Notes -
1153 * Basic query call back function. Fills up the client response buffer;
1154 * sends out buffer and then
1155 * reinitializes the buf for the next set of requests,
1156 * In case buffer is full Bail out quick if unable to send response back to client
1157 *
1158 * On success, qtr->n_result_records is incremented by 1.
1159 *
1160 * Synchronization -
1161 * Takes a lock over qtr->buf
1162 */
1163static int
1164query_add_response(void *void_qtr, as_storage_rd *rd)
1165{
1166 as_query_transaction *qtr = (as_query_transaction *)void_qtr;
1167
1168 // TODO - check and handle error result (< 0 - drive IO) explicitly?
1169 size_t msg_sz = (size_t)as_msg_make_response_bufbuilder(NULL, rd,
1170 qtr->no_bin_data, qtr->binlist);
1171 int ret = 0;
1172
1173 cf_mutex_lock(&qtr->buf_mutex);
1174 cf_buf_builder *bb_r = qtr->bb_r;
1175 if (bb_r == NULL) {
1176 // Assert that query is aborted if bb_r is found to be null
1177 cf_mutex_unlock(&qtr->buf_mutex);
1178 return AS_QUERY_ERR;
1179 }
1180
1181 if (msg_sz > (bb_r->alloc_sz - bb_r->used_sz) && bb_r->used_sz != 0) {
1182 query_netio(qtr);
1183 }
1184
1185 int32_t result = as_msg_make_response_bufbuilder(&qtr->bb_r, rd,
1186 qtr->no_bin_data, qtr->binlist);
1187
1188 if (result < 0) {
1189 ret = result;
1190 cf_warning(AS_QUERY, "Weird there is space but still the packing failed "
1191 "available = %zd msg size = %zu",
1192 bb_r->alloc_sz - bb_r->used_sz, msg_sz);
1193 }
1194 cf_atomic64_incr(&qtr->n_result_records);
1195 cf_mutex_unlock(&qtr->buf_mutex);
1196 return ret;
1197}
1198
1199
1200static int
1201query_add_fin(as_query_transaction *qtr)
1202{
1203
1204#if defined(USE_SYSTEMTAP)
1205 uint64_t nodeid = g_config.self_node;
1206#endif
1207 cf_detail(AS_QUERY, "Adding fin %p", qtr);
1208 uint8_t *b;
1209 // in case of aborted query, the bb_r is already released
1210 if (qtr->bb_r == NULL) {
1211 // Assert that query is aborted if bb_r is found to be null
1212 return AS_QUERY_ERR;
1213 }
1214 cf_buf_builder_reserve(&qtr->bb_r, sizeof(as_msg), &b);
1215
1216 ASD_QUERY_ADDFIN(nodeid, qtr->trid);
1217 // set up the header
1218 uint8_t *buf = b;
1219 as_msg *msgp = (as_msg *) buf;
1220 msgp->header_sz = sizeof(as_msg);
1221 msgp->info1 = 0;
1222 msgp->info2 = 0;
1223 msgp->info3 = AS_MSG_INFO3_LAST;
1224 msgp->unused = 0;
1225 msgp->result_code = qtr->result_code;
1226 msgp->generation = 0;
1227 msgp->record_ttl = 0;
1228 msgp->n_fields = 0;
1229 msgp->n_ops = 0;
1230 msgp->transaction_ttl = 0;
1231 as_msg_swap_header(msgp);
1232 return AS_QUERY_OK;
1233}
1234
1235static int
1236query_send_fin(as_query_transaction *qtr) {
1237 // Send out the final data back
1238 if (qtr->fd_h) {
1239 query_add_fin(qtr);
1240 query_netio(qtr);
1241 }
1242 return AS_QUERY_OK;
1243}
1244
1245static void
1246query_send_bg_udf_response(as_transaction *tr)
1247{
1248 cf_detail(AS_QUERY, "Send Fin for Background UDF");
1249 bool force_close = ! as_msg_send_fin(&tr->from.proto_fd_h->sock, AS_OK);
1250 query_release_fd(tr->from.proto_fd_h, force_close);
1251 tr->from.proto_fd_h = NULL;
1252}
1253
1254static void
1255query_send_bg_ops_response(as_transaction *tr)
1256{
1257 cf_detail(AS_QUERY, "send fin for background ops");
1258 bool force_close = ! as_msg_send_fin(&tr->from.proto_fd_h->sock, AS_OK);
1259 query_release_fd(tr->from.proto_fd_h, force_close);
1260 tr->from.proto_fd_h = NULL;
1261}
1262
1263static bool
1264query_match_integer_fromval(as_query_transaction * qtr, as_val *v, as_sindex_key *skey)
1265{
1266 as_sindex_bin_data *start = &qtr->srange->start;
1267 as_sindex_bin_data *end = &qtr->srange->end;
1268
1269 if ((AS_PARTICLE_TYPE_INTEGER != as_sindex_pktype(qtr->si->imd))
1270 || (AS_PARTICLE_TYPE_INTEGER != start->type)
1271 || (AS_PARTICLE_TYPE_INTEGER != end->type)) {
1272 cf_debug(AS_QUERY, "query_record_matches: Type mismatch %d!=%d!=%d!=%d binname=%s index=%s",
1273 AS_PARTICLE_TYPE_INTEGER, start->type, end->type, as_sindex_pktype(qtr->si->imd),
1274 qtr->si->imd->bname, qtr->si->imd->iname);
1275 return false;
1276 }
1277 as_integer * i = as_integer_fromval(v);
1278 int64_t value = as_integer_get(i);
1279 if (skey->key.int_key != value) {
1280 cf_debug(AS_QUERY, "query_record_matches: sindex key does "
1281 "not matches bin value in record. skey %ld bin value %ld", skey->key.int_key, value);
1282 return false;
1283 }
1284
1285 return true;
1286}
1287
1288static bool
1289query_match_string_fromval(as_query_transaction * qtr, as_val *v, as_sindex_key *skey)
1290{
1291 as_sindex_bin_data *start = &qtr->srange->start;
1292 as_sindex_bin_data *end = &qtr->srange->end;
1293
1294 if ((AS_PARTICLE_TYPE_STRING != as_sindex_pktype(qtr->si->imd))
1295 || (AS_PARTICLE_TYPE_STRING != start->type)
1296 || (AS_PARTICLE_TYPE_STRING != end->type)) {
1297 cf_debug(AS_QUERY, "query_record_matches: Type mismatch %d!=%d!=%d!=%d binname=%s index=%s",
1298 AS_PARTICLE_TYPE_STRING, start->type, end->type, as_sindex_pktype(qtr->si->imd),
1299 qtr->si->imd->bname, qtr->si->imd->iname);
1300 return false;
1301 }
1302
1303 char * str_val = as_string_get(as_string_fromval(v));
1304 cf_digest str_digest;
1305 cf_digest_compute(str_val, strlen(str_val), &str_digest);
1306
1307 if (memcmp(&str_digest, &skey->key.str_key, AS_DIGEST_KEY_SZ)) {
1308 return false;
1309 }
1310 return true;
1311}
1312
1313static bool
1314query_match_geojson_fromval(as_query_transaction * qtr, as_val *v, as_sindex_key *skey)
1315{
1316 as_sindex_bin_data *start = &qtr->srange->start;
1317 as_sindex_bin_data *end = &qtr->srange->end;
1318
1319 if ((AS_PARTICLE_TYPE_GEOJSON != as_sindex_pktype(qtr->si->imd))
1320 || (AS_PARTICLE_TYPE_GEOJSON != start->type)
1321 || (AS_PARTICLE_TYPE_GEOJSON != end->type)) {
1322 cf_debug(AS_QUERY, "query_record_matches: Type mismatch %d!=%d!=%d!=%d binname=%s index=%s",
1323 AS_PARTICLE_TYPE_GEOJSON, start->type, end->type,
1324 as_sindex_pktype(qtr->si->imd), qtr->si->imd->bname,
1325 qtr->si->imd->iname);
1326 return false;
1327 }
1328
1329 return as_particle_geojson_match_asval(v, qtr->srange->cellid,
1330 qtr->srange->region, qtr->ns->geo2dsphere_within_strict);
1331}
1332
1333// If the value matches foreach should stop iterating the
1334bool
1335query_match_mapkeys_foreach(const as_val * key, const as_val * val, void * udata)
1336{
1337 qtr_skey * q_s = (qtr_skey *)udata;
1338 switch (key->type) {
1339 case AS_STRING:
1340 // If matches return false
1341 return !query_match_string_fromval(q_s->qtr, (as_val *)key, q_s->skey);
1342 case AS_INTEGER:
1343 // If matches return false
1344 return !query_match_integer_fromval(q_s->qtr,(as_val *) key, q_s->skey);
1345 case AS_GEOJSON:
1346 // If matches return false
1347 return !query_match_geojson_fromval(q_s->qtr,(as_val *) key, q_s->skey);
1348 default:
1349 // All others don't match
1350 return true;
1351 }
1352}
1353
1354static bool
1355query_match_mapvalues_foreach(const as_val * key, const as_val * val, void * udata)
1356{
1357 qtr_skey * q_s = (qtr_skey *)udata;
1358 switch (val->type) {
1359 case AS_STRING:
1360 // If matches return false
1361 return !query_match_string_fromval(q_s->qtr, (as_val *)val, q_s->skey);
1362 case AS_INTEGER:
1363 // If matches return false
1364 return !query_match_integer_fromval(q_s->qtr, (as_val *)val, q_s->skey);
1365 case AS_GEOJSON:
1366 // If matches return false
1367 return !query_match_geojson_fromval(q_s->qtr, (as_val *)val, q_s->skey);
1368 default:
1369 // All others don't match
1370 return true;
1371 }
1372}
1373
1374static bool
1375query_match_listele_foreach(as_val * val, void * udata)
1376{
1377 qtr_skey * q_s = (qtr_skey *)udata;
1378 switch (val->type) {
1379 case AS_STRING:
1380 // If matches return false
1381 return !query_match_string_fromval(q_s->qtr, val, q_s->skey);
1382 case AS_INTEGER:
1383 // If matches return false
1384 return !query_match_integer_fromval(q_s->qtr, val, q_s->skey);
1385 case AS_GEOJSON:
1386 // If matches return false
1387 return !query_match_geojson_fromval(q_s->qtr, val, q_s->skey);
1388 default:
1389 // All others don't match
1390 return true;
1391 }
1392}
1393/*
1394 * Validate record based on its content and query make sure it indeed should
1395 * be selected. Secondary index does lazy delete for the entries for the record
1396 * for which data is on ssd. See sindex design doc for details. Hence it is
1397 * possible that it returns digest for which record may have changed. Do the
1398 * validation before returning the row.
1399 */
1400static bool
1401query_record_matches(as_query_transaction *qtr, as_storage_rd *rd, as_sindex_key * skey)
1402{
1403 // TODO: Add counters and make sure it is not a performance hit
1404 as_sindex_bin_data *start = &qtr->srange->start;
1405 as_sindex_bin_data *end = &qtr->srange->end;
1406
1407 //TODO: Make it more general to support sindex over multiple bins
1408 as_bin * b = as_bin_get_by_id(rd, qtr->si->imd->binid);
1409
1410 if (!b) {
1411 cf_debug(AS_QUERY , "as_query_record_validation: "
1412 "Bin name %s not found ", qtr->si->imd->bname);
1413 // Possible bin may not be there anymore classic case of
1414 // bin delete.
1415 return false;
1416 }
1417 uint8_t type = as_bin_get_particle_type(b);
1418
1419 // If the bin is of type cdt, we need to see if anyone of the value within cdt
1420 // matches the query.
1421 // This can be performance hit for big list and maps.
1422 as_val * res_val = NULL;
1423 as_val * val = NULL;
1424 bool matches = false;
1425 bool from_cdt = false;
1426 switch (type) {
1427 case AS_PARTICLE_TYPE_INTEGER : {
1428 if ((type != as_sindex_pktype(qtr->si->imd))
1429 || (type != start->type)
1430 || (type != end->type)) {
1431 cf_debug(AS_QUERY, "query_record_matches: Type mismatch %d!=%d!=%d!=%d binname=%s index=%s",
1432 type, start->type, end->type, as_sindex_pktype(qtr->si->imd),
1433 qtr->si->imd->bname, qtr->si->imd->iname);
1434 matches = false;
1435 break;
1436 }
1437
1438 int64_t i = as_bin_particle_integer_value(b);
1439 if (skey->key.int_key != i) {
1440 cf_debug(AS_QUERY, "query_record_matches: sindex key does "
1441 "not matches bin value in record. bin value %ld skey value %ld", i, skey->key.int_key);
1442 matches = false;
1443 break;
1444 }
1445 matches = true;
1446 break;
1447 }
1448 case AS_PARTICLE_TYPE_STRING : {
1449 if ((type != as_sindex_pktype(qtr->si->imd))
1450 || (type != start->type)
1451 || (type != end->type)) {
1452 cf_debug(AS_QUERY, "query_record_matches: Type mismatch %d!=%d!=%d!=%d binname=%s index=%s",
1453 type, start->type, end->type, as_sindex_pktype(qtr->si->imd),
1454 qtr->si->imd->bname, qtr->si->imd->iname);
1455 matches = false;
1456 break;
1457 }
1458
1459 char * buf;
1460 uint32_t psz = as_bin_particle_string_ptr(b, &buf);
1461 cf_digest bin_digest;
1462 cf_digest_compute(buf, psz, &bin_digest);
1463 if (memcmp(&skey->key.str_key, &bin_digest, AS_DIGEST_KEY_SZ)) {
1464 matches = false;
1465 break;
1466 }
1467 matches = true;
1468 break;
1469 }
1470 case AS_PARTICLE_TYPE_GEOJSON : {
1471 if ((type != as_sindex_pktype(qtr->si->imd))
1472 || (type != start->type)
1473 || (type != end->type)) {
1474 cf_debug(AS_QUERY, "as_query_record_matches: Type mismatch %d!=%d!=%d!=%d binname=%s index=%s",
1475 type, start->type, end->type, as_sindex_pktype(qtr->si->imd),
1476 qtr->si->imd->bname, qtr->si->imd->iname);
1477 return false;
1478 }
1479
1480 bool iswithin = as_particle_geojson_match(b->particle,
1481 qtr->srange->cellid, qtr->srange->region,
1482 qtr->ns->geo2dsphere_within_strict);
1483
1484 // We either found a valid point or a false positive.
1485 if (iswithin) {
1486 cf_atomic64_incr(&qtr->ns->geo_region_query_points);
1487 }
1488 else {
1489 cf_atomic64_incr(&qtr->ns->geo_region_query_falsepos);
1490 }
1491
1492 return iswithin;
1493 }
1494 case AS_PARTICLE_TYPE_MAP : {
1495 val = as_bin_particle_to_asval(b);
1496 res_val = as_sindex_extract_val_from_path(qtr->si->imd, val);
1497 if (!res_val) {
1498 matches = false;
1499 break;
1500 }
1501 from_cdt = true;
1502 break;
1503 }
1504 case AS_PARTICLE_TYPE_LIST : {
1505 val = as_bin_particle_to_asval(b);
1506 res_val = as_sindex_extract_val_from_path(qtr->si->imd, val);
1507 if (!res_val) {
1508 matches = false;
1509 break;
1510 }
1511 from_cdt = true;
1512 break;
1513 }
1514 default: {
1515 break;
1516 }
1517 }
1518
1519 if (from_cdt) {
1520 if (res_val->type == AS_INTEGER) {
1521 // Defensive check.
1522 if (qtr->si->imd->itype == AS_SINDEX_ITYPE_DEFAULT) {
1523 matches = query_match_integer_fromval(qtr, res_val, skey);
1524 }
1525 else {
1526 matches = false;
1527 }
1528 }
1529 else if (res_val->type == AS_STRING) {
1530 // Defensive check.
1531 if (qtr->si->imd->itype == AS_SINDEX_ITYPE_DEFAULT) {
1532 matches = query_match_string_fromval(qtr, res_val, skey);
1533 }
1534 else {
1535 matches = false;
1536 }
1537 }
1538 else if (res_val->type == AS_MAP) {
1539 qtr_skey q_s;
1540 q_s.qtr = qtr;
1541 q_s.skey = skey;
1542 // Defensive check.
1543 if (qtr->si->imd->itype == AS_SINDEX_ITYPE_MAPKEYS) {
1544 as_map * map = as_map_fromval(res_val);
1545 matches = !as_map_foreach(map, query_match_mapkeys_foreach, &q_s);
1546 }
1547 else if (qtr->si->imd->itype == AS_SINDEX_ITYPE_MAPVALUES){
1548 as_map * map = as_map_fromval(res_val);
1549 matches = !as_map_foreach(map, query_match_mapvalues_foreach, &q_s);
1550 }
1551 else {
1552 matches = false;
1553 }
1554 }
1555 else if (res_val->type == AS_LIST) {
1556 qtr_skey q_s;
1557 q_s.qtr = qtr;
1558 q_s.skey = skey;
1559
1560 // Defensive check
1561 if (qtr->si->imd->itype == AS_SINDEX_ITYPE_LIST) {
1562 as_list * list = as_list_fromval(res_val);
1563 matches = !as_list_foreach(list, query_match_listele_foreach, &q_s);
1564 }
1565 else {
1566 matches = false;
1567 }
1568 }
1569 }
1570
1571 if (val) {
1572 as_val_destroy(val);
1573 }
1574 return matches;
1575}
1576
1577
1578
/*
 * Process one digest produced by a LOOKUP query: load the record, filter it,
 * and append a matching record to the client response.
 *
 * Steps:
 *  1. Reserve the digest's partition (query_reserve_partition()); if it is no
 *     longer query-able, return AS_QUERY_OK without output.
 *  2. Fetch the live record; a missing record is logged at detail level only.
 *  3. Apply the predicate expression to metadata, skip doomed records, load
 *     the bins, and re-apply the predicate to the record when the metadata
 *     pass returned PREDEXP_UNKNOWN.
 *  4. Re-validate against the sindex key (query_record_matches()); a mismatch
 *     counts as a query false positive.
 *  5. Append the record via query_add_response().
 *
 * Returns AS_QUERY_ERR (after flagging AS_ERR_QUERY_CB on qtr) when the
 * response could not be added, AS_QUERY_OK otherwise. Every path that got a
 * reservation releases it with query_release_partition().
 */
static int
query_io(as_query_transaction *qtr, cf_digest *dig, as_sindex_key * skey)
{
#if defined(USE_SYSTEMTAP)
	uint64_t nodeid = g_config.self_node;
#endif

	as_namespace * ns = qtr->ns;
	as_partition_reservation rsv_stack;
	as_partition_reservation * rsv = &rsv_stack;

	// We make sure while making digest list that current partition is query-able
	// Attempt the query reservation here as well. If this partition is not
	// query-able anymore then no need to return anything
	// Since we are reserving all the partitions upfront, this is a defensive check
	uint32_t pid = as_partition_getid(dig);
	rsv = query_reserve_partition(ns, qtr, pid, rsv);
	if (!rsv) {
		return AS_QUERY_OK;
	}

	ASD_QUERY_IO_STARTING(nodeid, qtr->trid);

	as_index_ref r_ref;
	int rec_rv = as_record_get_live(rsv->tree, dig, &r_ref, ns);

	if (rec_rv == 0) {
		as_index *r = r_ref.r;

		predexp_args_t predargs = { .ns = ns, .md = r, .vl = NULL, .rd = NULL };
		// PREDEXP_TRUE when there is no predicate. A metadata-only evaluation
		// may yield PREDEXP_UNKNOWN, in which case the predicate is evaluated
		// again below once the record's bins are loaded.
		predexp_retval_t predrv = PREDEXP_TRUE;

		if (qtr->predexp_eval != NULL) {
			predrv = predexp_matches_metadata(qtr->predexp_eval, &predargs);

			if (predrv == PREDEXP_FALSE) {
				as_record_done(&r_ref, ns);
				goto CLEANUP;
			}
		}

		// check to see this isn't a record waiting to die
		if (as_record_is_doomed(r, ns)) {
			as_record_done(&r_ref, ns);
			cf_debug(AS_QUERY,
					"build_response: record expired. treat as not found");
			// Not sending error message to client as per the agreement
			// that server will never send a error result code to the query client.
			goto CLEANUP;
		}

		// make sure it's brought in from storage if necessary
		as_storage_rd rd;
		as_storage_record_open(ns, r, &rd);
		qtr->n_read_success += 1;

		// TODO - even if qtr->no_bin_data is true, we still read bins in order
		// to check via query_record_matches() below. If sindex evolves to not
		// have to do that, optimize this case and bypass reading bins.

		as_storage_rd_load_n_bins(&rd); // TODO - handle error returned

		// Note: This array must stay in scope until the response
		// for this record has been built, since in the get
		// data w/ record on device case, it's copied by
		// reference directly into the record descriptor!
		// NOTE(review): when data is in memory (or n_bins is 0) this is a
		// zero-length VLA, which ISO C leaves undefined; it relies on the
		// GCC zero-length array extension.
		as_bin stack_bins[rd.ns->storage_data_in_memory ? 0 : rd.n_bins];

		// Figure out which bins you want - for now, all
		as_storage_rd_load_bins(&rd, stack_bins); // TODO - handle error returned
		rd.n_bins = as_bin_inuse_count(&rd);

		// Now we have a record.
		predargs.rd = &rd;

		// Second predicate pass, against the loaded bins.
		if (predrv == PREDEXP_UNKNOWN &&
				! predexp_matches_record(qtr->predexp_eval, &predargs)) {
			as_storage_record_close(&rd);
			as_record_done(&r_ref, ns);
			goto CLEANUP;
		}

		// Call Back
		// Re-validate against the sindex key: the index may be stale.
		if (!query_record_matches(qtr, &rd, skey)) {
			as_storage_record_close(&rd);
			as_record_done(&r_ref, ns);
			query_release_partition(qtr, rsv);
			cf_atomic64_incr(&g_stats.query_false_positives);
			ASD_QUERY_IO_NOTMATCH(nodeid, qtr->trid);
			return AS_QUERY_OK;
		}

		int ret = query_add_response(qtr, &rd);
		if (ret != 0) {
			as_storage_record_close(&rd);
			as_record_done(&r_ref, ns);
			qtr_set_err(qtr, AS_ERR_QUERY_CB, __FILE__, __LINE__);
			query_release_partition(qtr, rsv);
			ASD_QUERY_IO_ERROR(nodeid, qtr->trid);
			return AS_QUERY_ERR;
		}
		as_storage_record_close(&rd);
		as_record_done(&r_ref, ns);
	} else {
		// What do we do about empty records?
		// 1. Should gin up an empty record
		// 2. Current error is returned back to the client.
		cf_detail(AS_QUERY, "query_generator: "
				"as_record_get returned %d : key %"PRIx64, rec_rv,
				*(uint64_t *)dig);
	}
CLEANUP :
	query_release_partition(qtr, rsv);

	ASD_QUERY_IO_FINISHED(nodeid, qtr->trid);

	return AS_QUERY_OK;
}
1697// **************************************************************************************************
1698
1699/*
1700 * Query Aggregation Request Workhorse Function
1701 */
1702// **************************************************************************************************
1703static int
1704query_add_val_response(void *void_qtr, const as_val *val, bool success)
1705{
1706 as_query_transaction *qtr = (as_query_transaction *)void_qtr;
1707 if (!qtr) {
1708 return AS_QUERY_ERR;
1709 }
1710
1711 uint32_t msg_sz = as_particle_asval_client_value_size(val);
1712 if (0 == msg_sz) {
1713 cf_warning(AS_PROTO, "particle to buf: could not copy data!");
1714 }
1715
1716 cf_mutex_lock(&qtr->buf_mutex);
1717 cf_buf_builder *bb_r = qtr->bb_r;
1718 if (bb_r == NULL) {
1719 // Assert that query is aborted if bb_r is found to be null
1720 cf_mutex_unlock(&qtr->buf_mutex);
1721 return AS_QUERY_ERR;
1722 }
1723
1724 if (msg_sz > (bb_r->alloc_sz - bb_r->used_sz) && bb_r->used_sz != 0) {
1725 query_netio(qtr);
1726 }
1727
1728 as_msg_make_val_response_bufbuilder(val, &qtr->bb_r, msg_sz, success);
1729 cf_atomic64_incr(&qtr->n_result_records);
1730
1731 cf_mutex_unlock(&qtr->buf_mutex);
1732 return 0;
1733}
1734
1735
1736static void
1737query_add_result(char *res, as_query_transaction *qtr, bool success)
1738{
1739 const as_val * v = (as_val *) as_string_new (res, false);
1740 query_add_val_response((void *) qtr, v, success);
1741 as_val_destroy(v);
1742}
1743
1744
1745static int
1746query_process_aggreq(query_work *qagg)
1747{
1748 as_query_transaction *qtr = qagg->qtr;
1749 if (!qtr) {
1750 return AS_QUERY_ERR;
1751 }
1752
1753 if (!cf_ll_size(qagg->recl)) {
1754 return AS_QUERY_ERR;
1755 }
1756
1757 as_result *res = as_result_new();
1758 int ret = as_aggr_process(qtr->ns, &qtr->agg_call, qagg->recl, (void *)qtr, res);
1759
1760 if (ret != 0) {
1761 char *rs = as_module_err_string(ret);
1762 if (res->value != NULL) {
1763 as_string * lua_s = as_string_fromval(res->value);
1764 char * lua_err = (char *) as_string_tostring(lua_s);
1765 if (lua_err != NULL) {
1766 int l_rs_len = strlen(rs);
1767 rs = cf_realloc(rs,l_rs_len + strlen(lua_err) + 4);
1768 sprintf(&rs[l_rs_len]," : %s",lua_err);
1769 }
1770 }
1771 query_add_result(rs, qtr, false);
1772 cf_free(rs);
1773 }
1774 as_result_destroy(res);
1775 return ret;
1776}
1777// **************************************************************************************************
1778
1779
1780/*
1781 * Aggregation HOOKS
1782 */
1783// **************************************************************************************************
1784as_stream_status
1785agg_ostream_write(void *udata, as_val *v)
1786{
1787 as_query_transaction *qtr = (as_query_transaction *)udata;
1788 if (!v) {
1789 return AS_STREAM_OK;
1790 }
1791 int ret = AS_STREAM_OK;
1792 if (query_add_val_response((void *)qtr, v, true)) {
1793 ret = AS_STREAM_ERR;
1794 }
1795 as_val_destroy(v);
1796 return ret;
1797}
1798
1799static as_partition_reservation *
1800agg_reserve_partition(void *udata, as_namespace *ns, uint32_t pid, as_partition_reservation *rsv)
1801{
1802 return query_reserve_partition(ns, (as_query_transaction *)udata, pid, rsv);
1803}
1804
1805static void
1806agg_release_partition(void *udata, as_partition_reservation *rsv)
1807{
1808 query_release_partition((as_query_transaction *)udata, rsv);
1809}
1810
1811static void
1812agg_set_error(void * udata, int err)
1813{
1814 qtr_set_err((as_query_transaction *)udata, AS_ERR_QUERY_CB, __FILE__, __LINE__);
1815}
1816
1817// true if matches
1818static bool
1819agg_record_matches(void *udata, udf_record *urecord, void *key_data)
1820{
1821 as_query_transaction * qtr = (as_query_transaction*)udata;
1822 as_sindex_key *skey = (void *)key_data;
1823 qtr->n_read_success++;
1824 if (query_record_matches(qtr, urecord->rd, skey) == false) {
1825 cf_atomic64_incr(&g_stats.query_false_positives); // PUT IT INSIDE PRE_CHECK
1826 return false;
1827 }
1828 return true;
1829}
1830
1831const as_aggr_hooks query_aggr_hooks = {
1832 .ostream_write = agg_ostream_write,
1833 .set_error = agg_set_error,
1834 .ptn_reserve = agg_reserve_partition,
1835 .ptn_release = agg_release_partition,
1836 .pre_check = agg_record_matches
1837};
1838// **************************************************************************************************
1839
1840
1841
1842
1843
1844/*
1845 * Query Request UDF functions
1846 */
1847// **************************************************************************************************
1848// NB: Caller holds a write hash lock _BE_CAREFUL_ if you intend to take
1849// lock inside this function
1850void
1851query_udf_bg_tr_complete(void *udata, int result)
1852{
1853 as_query_transaction *qtr = (as_query_transaction *)udata;
1854
1855 cf_assert(qtr != NULL, AS_QUERY, "complete called with null udata");
1856
1857 qtr_finish_work(qtr, &qtr->n_udf_tr_queued, __FILE__, __LINE__, true);
1858}
1859
1860// Creates a internal transaction for per record UDF execution triggered
1861// from inside generator. The generator could be scan job generating digest
1862// or query generating digest.
1863int
1864query_udf_bg_tr_start(as_query_transaction *qtr, cf_digest *keyd)
1865{
1866 if (qtr->iudf_orig.predexp != NULL) {
1867 as_partition_reservation rsv_stack;
1868 as_partition_reservation *rsv = &rsv_stack;
1869 uint32_t pid = as_partition_getid(keyd);
1870
1871 if (! (rsv = query_reserve_partition(qtr->ns, qtr, pid, rsv))) {
1872 return AS_QUERY_OK;
1873 }
1874
1875 as_index_ref r_ref;
1876
1877 if (as_record_get_live(rsv->tree, keyd, &r_ref, qtr->ns) != 0) {
1878 query_release_partition(qtr, rsv);
1879 return AS_QUERY_OK;
1880 }
1881
1882 predexp_args_t predargs = {
1883 .ns = qtr->ns, .md = r_ref.r, .vl = NULL, .rd = NULL
1884 };
1885
1886 if (predexp_matches_metadata(qtr->iudf_orig.predexp, &predargs) ==
1887 PREDEXP_FALSE) {
1888 as_record_done(&r_ref, qtr->ns);
1889 query_release_partition(qtr, rsv);
1890 as_incr_uint64(&qtr->ns->n_udf_sub_udf_filtered_out);
1891 return AS_QUERY_OK;
1892 }
1893
1894 as_record_done(&r_ref, qtr->ns);
1895 query_release_partition(qtr, rsv);
1896 }
1897
1898 as_transaction tr;
1899
1900 as_transaction_init_iudf(&tr, qtr->ns, keyd, &qtr->iudf_orig);
1901
1902 qtr_reserve(qtr, __FILE__, __LINE__);
1903 cf_atomic32_incr(&qtr->n_udf_tr_queued);
1904
1905 as_service_enqueue_internal(&tr);
1906
1907 return AS_QUERY_OK;
1908}
1909
1910static int
1911query_process_udfreq(query_work *qudf)
1912{
1913 int ret = AS_QUERY_OK;
1914 cf_ll_element * ele = NULL;
1915 cf_ll_iterator * iter = NULL;
1916 as_query_transaction *qtr = qudf->qtr;
1917 if (!qtr) return AS_QUERY_ERR;
1918 cf_detail(AS_QUERY, "Performing UDF");
1919 iter = cf_ll_getIterator(qudf->recl, true /*forward*/);
1920 if (!iter) {
1921 ret = AS_QUERY_ERR;
1922 qtr_set_err(qtr, AS_SINDEX_ERR_NO_MEMORY, __FILE__, __LINE__);
1923 goto Cleanup;
1924 }
1925
1926 while ((ele = cf_ll_getNext(iter))) {
1927 as_index_keys_ll_element * node;
1928 node = (as_index_keys_ll_element *) ele;
1929 as_index_keys_arr * keys_arr = node->keys_arr;
1930 if (!keys_arr) {
1931 continue;
1932 }
1933 node->keys_arr = NULL;
1934
1935 for (int i = 0; i < keys_arr->num; i++) {
1936
1937 while (cf_atomic32_get(qtr->n_udf_tr_queued) >= (AS_QUERY_MAX_UDF_TRANSACTIONS * (qtr->priority / 10 + 1))) {
1938 usleep(g_config.query_sleep_us);
1939 query_check_timeout(qtr);
1940 if (qtr_failed(qtr)) {
1941 ret = AS_QUERY_ERR;
1942 goto Cleanup;
1943 }
1944 }
1945
1946 if (AS_QUERY_ERR == query_udf_bg_tr_start(qtr, &keys_arr->pindex_digs[i])) {
1947 as_index_keys_release_arr_to_queue(keys_arr);
1948 ret = AS_QUERY_ERR;
1949 goto Cleanup;
1950 }
1951 }
1952 as_index_keys_release_arr_to_queue(keys_arr);
1953 }
1954Cleanup:
1955 if (iter) {
1956 cf_ll_releaseIterator(iter);
1957 iter = NULL;
1958 }
1959 return ret;
1960}
1961// **************************************************************************************************
1962
1963
1964
1965
1966
1967/*
1968 * Query Request ops functions
1969 */
1970// **************************************************************************************************
1971// NB: Caller holds a write hash lock _BE_CAREFUL_ if you intend to take
1972// lock inside this function
1973void
1974query_ops_bg_tr_complete(void *udata, int result)
1975{
1976 as_query_transaction *qtr = (as_query_transaction *)udata;
1977
1978 cf_assert(qtr != NULL, AS_QUERY, "complete called with null udata");
1979
1980 qtr_finish_work(qtr, &qtr->n_ops_tr_queued, __FILE__, __LINE__, true);
1981}
1982
1983// Creates a internal transaction for per record ops execution triggered
1984// from inside generator. The generator could be scan job generating digest
1985// or query generating digest.
1986int
1987query_ops_bg_tr_start(as_query_transaction *qtr, cf_digest *keyd)
1988{
1989 if (qtr->iops_orig.predexp != NULL) {
1990 as_partition_reservation rsv_stack;
1991 as_partition_reservation *rsv = &rsv_stack;
1992 uint32_t pid = as_partition_getid(keyd);
1993
1994 if (! (rsv = query_reserve_partition(qtr->ns, qtr, pid, rsv))) {
1995 return AS_QUERY_OK;
1996 }
1997
1998 as_index_ref r_ref;
1999
2000 if (as_record_get_live(rsv->tree, keyd, &r_ref, qtr->ns) != 0) {
2001 query_release_partition(qtr, rsv);
2002 return AS_QUERY_OK;
2003 }
2004
2005 predexp_args_t predargs = {
2006 .ns = qtr->ns, .md = r_ref.r, .vl = NULL, .rd = NULL
2007 };
2008
2009 if (predexp_matches_metadata(qtr->iops_orig.predexp, &predargs) ==
2010 PREDEXP_FALSE) {
2011 as_record_done(&r_ref, qtr->ns);
2012 query_release_partition(qtr, rsv);
2013 as_incr_uint64(&qtr->ns->n_ops_sub_write_filtered_out);
2014 return AS_QUERY_OK;
2015 }
2016
2017 as_record_done(&r_ref, qtr->ns);
2018 query_release_partition(qtr, rsv);
2019 }
2020
2021 as_transaction tr;
2022
2023 as_transaction_init_iops(&tr, qtr->ns, keyd, &qtr->iops_orig);
2024
2025 qtr_reserve(qtr, __FILE__, __LINE__);
2026 cf_atomic32_incr(&qtr->n_ops_tr_queued);
2027
2028 as_service_enqueue_internal(&tr);
2029
2030 return AS_QUERY_OK;
2031}
2032
2033static int
2034query_process_opsreq(query_work *qops)
2035{
2036 int ret = AS_QUERY_OK;
2037 cf_ll_element * ele = NULL;
2038 cf_ll_iterator * iter = NULL;
2039 as_query_transaction *qtr = qops->qtr;
2040 if (!qtr) return AS_QUERY_ERR;
2041 cf_detail(AS_QUERY, "Performing ops");
2042 iter = cf_ll_getIterator(qops->recl, true /*forward*/);
2043 if (!iter) {
2044 ret = AS_QUERY_ERR;
2045 qtr_set_err(qtr, AS_SINDEX_ERR_NO_MEMORY, __FILE__, __LINE__);
2046 goto Cleanup;
2047 }
2048
2049 while ((ele = cf_ll_getNext(iter))) {
2050 as_index_keys_ll_element * node;
2051 node = (as_index_keys_ll_element *) ele;
2052 as_index_keys_arr * keys_arr = node->keys_arr;
2053 if (!keys_arr) {
2054 continue;
2055 }
2056 node->keys_arr = NULL;
2057
2058 for (int i = 0; i < keys_arr->num; i++) {
2059
2060 while (cf_atomic32_get(qtr->n_ops_tr_queued) >= (AS_QUERY_MAX_OPS_TRANSACTIONS * (qtr->priority / 10 + 1))) {
2061 usleep(g_config.query_sleep_us);
2062 query_check_timeout(qtr);
2063 if (qtr_failed(qtr)) {
2064 ret = AS_QUERY_ERR;
2065 goto Cleanup;
2066 }
2067 }
2068
2069 if (AS_QUERY_ERR == query_ops_bg_tr_start(qtr, &keys_arr->pindex_digs[i])) {
2070 as_index_keys_release_arr_to_queue(keys_arr);
2071 ret = AS_QUERY_ERR;
2072 goto Cleanup;
2073 }
2074 }
2075 as_index_keys_release_arr_to_queue(keys_arr);
2076 }
2077Cleanup:
2078 if (iter) {
2079 cf_ll_releaseIterator(iter);
2080 iter = NULL;
2081 }
2082 return ret;
2083}
2084// **************************************************************************************************
2085
2086
2087
2088
2089static int
2090query_process_ioreq(query_work *qio)
2091{
2092
2093#if defined(USE_SYSTEMTAP)
2094 uint64_t nodeid = g_config.self_node;
2095#endif
2096
2097 as_query_transaction *qtr = qio->qtr;
2098 if (!qtr) {
2099 return AS_QUERY_ERR;
2100 }
2101
2102 ASD_QUERY_IOREQ_STARTING(nodeid, qtr->trid);
2103
2104 cf_ll_element * ele = NULL;
2105 cf_ll_iterator * iter = NULL;
2106
2107 cf_detail(AS_QUERY, "Performing IO");
2108 uint64_t time_ns = 0;
2109 if (g_config.query_enable_histogram || qtr->si->enable_histogram) {
2110 time_ns = cf_getns();
2111 }
2112 iter = cf_ll_getIterator(qio->recl, true /*forward*/);
2113 if (!iter) {
2114 cf_crash(AS_QUERY, "Cannot allocate iterator... out of memory !!");
2115 }
2116
2117 while ((ele = cf_ll_getNext(iter))) {
2118 as_index_keys_ll_element * node;
2119 node = (as_index_keys_ll_element *) ele;
2120 as_index_keys_arr *keys_arr = node->keys_arr;
2121 if (!keys_arr) {
2122 continue;
2123 }
2124 node->keys_arr = NULL;
2125 for (int i = 0; i < keys_arr->num; i++) {
2126 if (AS_QUERY_OK != query_io(qtr, &keys_arr->pindex_digs[i], &keys_arr->sindex_keys[i])) {
2127 as_index_keys_release_arr_to_queue(keys_arr);
2128 goto Cleanup;
2129 }
2130
2131 int64_t nresults = cf_atomic64_get(qtr->n_result_records);
2132 if (nresults > 0 && (nresults % qtr->priority == 0))
2133 {
2134 usleep(g_config.query_sleep_us);
2135 query_check_timeout(qtr);
2136 if (qtr_failed(qtr)) {
2137 as_index_keys_release_arr_to_queue(keys_arr);
2138 goto Cleanup;
2139 }
2140 }
2141 }
2142 as_index_keys_release_arr_to_queue(keys_arr);
2143 }
2144Cleanup:
2145
2146 if (iter) {
2147 cf_ll_releaseIterator(iter);
2148 iter = NULL;
2149 }
2150 QUERY_HIST_INSERT_DATA_POINT(query_batch_io_hist, time_ns);
2151 SINDEX_HIST_INSERT_DATA_POINT(qtr->si, query_batch_io, time_ns);
2152
2153 ASD_QUERY_IOREQ_FINISHED(nodeid, qtr->trid);
2154
2155 return AS_QUERY_OK;
2156}
2157
2158// **************************************************************************************************
2159
2160
2161/*
2162 * Query Request Processing
2163 */
2164// **************************************************************************************************
2165static int
2166qwork_process(query_work *qworkp)
2167{
2168 QUERY_HIST_INSERT_DATA_POINT(query_batch_io_q_wait_hist, qworkp->queued_time_ns);
2169 cf_detail(AS_QUERY, "Processing Request %d", qworkp->type);
2170 if (qtr_failed(qworkp->qtr)) {
2171 return AS_QUERY_ERR;
2172 }
2173 int ret = AS_QUERY_OK;
2174 switch (qworkp->type) {
2175 case QUERY_WORK_TYPE_LOOKUP:
2176 ret = query_process_ioreq(qworkp);
2177 break;
2178 case QUERY_WORK_TYPE_UDF_BG: // Does it need different call ??
2179 ret = query_process_udfreq(qworkp);
2180 break;
2181 case QUERY_WORK_TYPE_OPS_BG: // Does it need different call ??
2182 ret = query_process_opsreq(qworkp);
2183 break;
2184 case QUERY_WORK_TYPE_AGG:
2185 ret = query_process_aggreq(qworkp);
2186 break;
2187 default:
2188 cf_warning(AS_QUERY, "Unsupported query type %d.. Dropping it", qworkp->type);
2189 break;
2190 }
2191 return ret;
2192}
2193
2194static void
2195qwork_setup(query_work *qworkp, as_query_transaction *qtr)
2196{
2197 qtr_reserve(qtr, __FILE__, __LINE__);
2198 qworkp->qtr = qtr;
2199 qworkp->recl = qtr->qctx.recl;
2200 qtr->qctx.recl = NULL;
2201 qworkp->queued_time_ns = cf_getns();
2202 qtr->n_digests += qtr->qctx.n_bdigs;
2203 qtr->qctx.n_bdigs = 0;
2204
2205 switch (qtr->job_type) {
2206 case QUERY_TYPE_LOOKUP:
2207 qworkp->type = QUERY_WORK_TYPE_LOOKUP;
2208 break;
2209 case QUERY_TYPE_AGGR:
2210 qworkp->type = QUERY_WORK_TYPE_AGG;
2211 break;
2212 case QUERY_TYPE_UDF_BG:
2213 qworkp->type = QUERY_WORK_TYPE_UDF_BG;
2214 break;
2215 case QUERY_TYPE_OPS_BG:
2216 qworkp->type = QUERY_WORK_TYPE_OPS_BG;
2217 break;
2218 default:
2219 cf_crash(AS_QUERY, "Unknown Query Type !!");
2220 }
2221}
2222
2223static void
2224qwork_teardown(query_work *qworkp)
2225{
2226 if (qworkp->recl) {
2227 cf_ll_reduce(qworkp->recl, true /*forward*/, as_index_keys_ll_reduce_fn, NULL);
2228 cf_free(qworkp->recl);
2229 qworkp->recl = NULL;
2230 }
2231 qtr_release(qworkp->qtr, __FILE__, __LINE__);
2232 qworkp->qtr = NULL;
2233}
2234// **************************************************************************************************
2235
2236
2237void *
2238qwork_th(void *q_to_wait_on)
2239{
2240 unsigned int thread_id = cf_atomic32_incr(&g_query_worker_threadcnt);
2241 cf_detail(AS_QUERY, "Created Query Worker Thread %d", thread_id);
2242 query_work * qworkp = NULL;
2243 int ret = AS_QUERY_OK;
2244
2245 while (1) {
2246 // Kill self if thread id is greater than that of number of configured
2247 // Config change should be flag for quick check
2248 if (thread_id > g_config.query_worker_threads) {
2249 pthread_rwlock_rdlock(&g_query_lock);
2250 if (thread_id > g_config.query_worker_threads) {
2251 cf_atomic32_decr(&g_query_worker_threadcnt);
2252 pthread_rwlock_unlock(&g_query_lock);
2253 cf_detail(AS_QUERY, "Query Worker thread %d exited", thread_id);
2254 return NULL;
2255 }
2256 pthread_rwlock_unlock(&g_query_lock);
2257 }
2258 if (cf_queue_pop(g_query_work_queue, &qworkp, CF_QUEUE_FOREVER) != 0) {
2259 cf_crash(AS_QUERY, "Failed to pop from Query worker queue.");
2260 }
2261 cf_detail(AS_QUERY, "Popped I/O work [%p,%p]", qworkp, qworkp->qtr);
2262
2263 ret = qwork_process(qworkp);
2264
2265 as_query_transaction *qtr = qworkp->qtr;
2266 if ((ret != AS_QUERY_OK) && !qtr_failed(qtr)) {
2267 cf_warning(AS_QUERY, "Request processing failed but query is not qtr_failed .... ret %d", ret);
2268 }
2269 qtr_finish_work(qtr, &qtr->n_qwork_active, __FILE__, __LINE__, false);
2270 qwork_teardown(qworkp);
2271 qwork_poolrelease(qworkp);
2272 }
2273
2274 return NULL;
2275}
2276
2277/*
2278 * Query Generator
2279 */
2280// **************************************************************************************************
2281/*
2282 * Function query_get_nextbatch
2283 *
2284 * Notes-
2285 * Function generates the next batch of digest list after looking up
2286 * secondary index tree. The function populates qctx->recl with the
2287 * digest list.
2288 *
2289 * Returns
2290 * AS_QUERY_OK: If the batch is full qctx->n_bdigs == qctx->bsize. The caller
2291 * then processes the batch and reset the qctx->recl and qctx->n_bdigs.
2292 *
2293 * AS_QUERY_CONTINUE: If the caller should continue calling this function.
2294 *
2295 * AS_QUERY_ERR: In case of error
2296 */
2297int
2298query_get_nextbatch(as_query_transaction *qtr)
2299{
2300 int ret = AS_QUERY_OK;
2301 as_sindex *si = qtr->si;
2302 as_sindex_qctx *qctx = &qtr->qctx;
2303 uint64_t time_ns = 0;
2304 if (g_config.query_enable_histogram
2305 || qtr->si->enable_histogram) {
2306 time_ns = cf_getns();
2307 }
2308
2309 as_sindex_range *srange = &qtr->srange[qctx->range_index];
2310
2311 if (qctx->pimd_idx == -1) {
2312 if (!srange->isrange) {
2313 qctx->pimd_idx = ai_btree_key_hash_from_sbin(si->imd, &srange->start);
2314 } else {
2315 qctx->pimd_idx = 0;
2316 }
2317 }
2318
2319 if (!qctx->recl) {
2320 qctx->recl = cf_malloc(sizeof(cf_ll));
2321 cf_ll_init(qctx->recl, as_index_keys_ll_destroy_fn, false /*no lock*/);
2322 qctx->n_bdigs = 0;
2323 } else {
2324 // Following condition may be true if the
2325 // query has moved from short query pool to
2326 // long running query pool
2327 if (qctx->n_bdigs >= qctx->bsize)
2328 return ret;
2329 }
2330
2331 // Query Aerospike Index
2332 int qret = as_sindex_query(qtr->si, srange, &qtr->qctx);
2333 cf_detail(AS_QUERY, "start %ld end %ld @ %d pimd found %"PRIu64, srange->start.u.i64, srange->end.u.i64, qctx->pimd_idx, qctx->n_bdigs);
2334
2335 qctx->new_ibtr = false;
2336 if (qret < 0) { // [AS_SINDEX_OK, AS_SINDEX_CONTINUE] -> OK
2337 qtr_set_err(qtr, as_sindex_err_to_clienterr(qret, __FILE__, __LINE__), __FILE__, __LINE__);
2338 ret = AS_QUERY_ERR;
2339 goto batchout;
2340 }
2341
2342 if (time_ns) {
2343 if (g_config.query_enable_histogram) {
2344 qtr->querying_ai_time_ns += cf_getns() - time_ns;
2345 } else if (qtr->si->enable_histogram) {
2346 SINDEX_HIST_INSERT_DATA_POINT(qtr->si, query_batch_lookup, time_ns);
2347 }
2348 }
2349 if (qctx->n_bdigs < qctx->bsize) {
2350 qctx->new_ibtr = true;
2351 qctx->nbtr_done = false;
2352 qctx->pimd_idx++;
2353 cf_detail(AS_QUERY, "All the Data finished moving to next tree %d", qctx->pimd_idx);
2354 if (!srange->isrange) {
2355 qtr->result_code = AS_OK;
2356 ret = AS_QUERY_DONE;
2357 goto batchout;
2358 }
2359 if (qctx->pimd_idx == si->imd->nprts) {
2360
2361 // Geospatial queries need to search multiple ranges. The
2362 // srange object is a vector of MAX_REGION_CELLS elements.
2363 // We iterate over ranges until we encounter an empty
2364 // srange (num_binval == 0).
2365 //
2366 if (qctx->range_index == (MAX_REGION_CELLS - 1) ||
2367 qtr->srange[qctx->range_index+1].num_binval == 0) {
2368 qtr->result_code = AS_OK;
2369 ret = AS_QUERY_DONE;
2370 goto batchout;
2371 }
2372 qctx->range_index++;
2373 qctx->pimd_idx = -1;
2374 }
2375 ret = AS_QUERY_CONTINUE;
2376 goto batchout;
2377 }
2378batchout:
2379 return ret;
2380}
2381
2382
2383/*
2384 * Phase II setup just after the generator picks up query for
2385 * the first time
2386 */
2387static int
2388query_run_setup(as_query_transaction *qtr)
2389{
2390
2391#if defined(USE_SYSTEMTAP)
2392 uint64_t nodeid = g_config.self_node;
2393#endif
2394
2395 QUERY_HIST_INSERT_DATA_POINT(query_query_q_wait_hist, qtr->start_time);
2396 cf_atomic64_set(&qtr->n_result_records, 0);
2397 qtr->track = false;
2398 qtr->querying_ai_time_ns = 0;
2399 qtr->n_io_outstanding = 0;
2400 qtr->netio_push_seq = 0;
2401 qtr->netio_pop_seq = 1;
2402 qtr->blocking = false;
2403 cf_mutex_init(&qtr->buf_mutex);
2404
2405 // Aerospike Index object initialization
2406 qtr->result_code = AS_OK;
2407
2408 // Initialize qctx
2409 // start with the threshold value
2410 qtr->qctx.bsize = g_config.query_threshold;
2411 qtr->qctx.new_ibtr = true;
2412 qtr->qctx.nbtr_done = false;
2413 qtr->qctx.pimd_idx = -1;
2414 qtr->qctx.recl = NULL;
2415 qtr->qctx.n_bdigs = 0;
2416 qtr->qctx.range_index = 0;
2417 qtr->qctx.partitions_pre_reserved = g_config.partitions_pre_reserved;
2418 qtr->qctx.bkey = &qtr->bkey;
2419 init_ai_obj(qtr->qctx.bkey);
2420 bzero(&qtr->qctx.bdig, sizeof(cf_digest));
2421 // Populate all the paritions for which this partition is query-able
2422 as_query_pre_reserve_partitions(qtr);
2423
2424 qtr->priority = g_config.query_priority;
2425 qtr->bb_r = bb_poolrequest();
2426 cf_buf_builder_reserve(&qtr->bb_r, 8, NULL);
2427
2428 qtr_set_running(qtr);
2429 cf_atomic64_incr(&qtr->ns->query_short_reqs);
2430 cf_atomic32_incr(&g_query_short_running);
2431
2432 // This needs to be distant from the initialization of nodeid to
2433 // workaround a lame systemtap/compiler interaction.
2434 ASD_QUERY_INIT(nodeid, qtr->trid);
2435
2436 return AS_QUERY_OK;
2437}
2438
2439static int
2440query_qtr_enqueue(as_query_transaction *qtr, bool is_requeue)
2441{
2442 uint64_t limit = 0;
2443 uint32_t size = 0;
2444 cf_queue * q;
2445 cf_atomic64 * queue_full_err;
2446 if (qtr->short_running) {
2447 limit = g_config.query_short_q_max_size;
2448 size = cf_atomic32_get(g_query_short_running);
2449 q = g_query_short_queue;
2450 queue_full_err = &qtr->ns->query_short_queue_full;
2451 }
2452 else {
2453 limit = g_config.query_long_q_max_size;
2454 size = cf_atomic32_get(g_query_long_running);
2455 q = g_query_long_queue;
2456 queue_full_err = &qtr->ns->query_long_queue_full;
2457 }
2458
2459 // Allow requeue without limit check, to cover for dynamic
2460 // config change while query
2461 if (!is_requeue && (size > limit)) {
2462 cf_atomic64_incr(queue_full_err);
2463 return AS_QUERY_ERR;
2464 } else {
2465 cf_queue_push(q, &qtr);
2466 cf_detail(AS_QUERY, "Logged query ");
2467 }
2468
2469 return AS_QUERY_OK;
2470}
2471
2472int
2473query_requeue(as_query_transaction *qtr)
2474{
2475 int ret = AS_QUERY_OK;
2476 if (query_qtr_enqueue(qtr, true) != 0) {
2477 cf_warning(AS_QUERY, "Queuing Error... continue!!");
2478 qtr_set_err(qtr, AS_ERR_QUERY_QUEUE_FULL, __FILE__, __LINE__);
2479 ret = AS_QUERY_ERR;
2480 } else {
2481 cf_detail(AS_QUERY, "Query Queued Due to Network");
2482 ret = AS_QUERY_OK;
2483 }
2484 return ret;
2485}
2486
2487static void
2488qtr_finish_work(as_query_transaction *qtr, cf_atomic32 *stat, char *fname, int lineno, bool release)
2489{
2490 qtr_lock(qtr);
2491 uint32_t val = cf_atomic32_decr(stat);
2492 if ((val == 0) && qtr->do_requeue) {
2493 query_requeue(qtr);
2494 cf_detail(AS_QUERY, "(%s:%d) Job Requeued %p", fname, lineno, qtr);
2495 qtr->do_requeue = false;
2496 }
2497 qtr_unlock(qtr);
2498 if (release) {
2499 qtr_release(qtr, fname, lineno);
2500 }
2501}
2502
2503//
2504// 0: Successfully requeued
2505// -1: Query Err
2506// 1: Not requeued continue
2507// 2: Query finished
2508//
2509static int
2510query_qtr_check_and_requeue(as_query_transaction *qtr)
2511{
2512 bool do_enqueue = false;
2513 // Step 1: If the query batch is done then wait for number of outstanding qwork to
2514 // finish. This may slow down query responses get the better model
2515 if (qtr_finished(qtr)) {
2516 if ((cf_atomic32_get(qtr->n_qwork_active) == 0)
2517 && (cf_atomic32_get(qtr->n_io_outstanding) == 0)
2518 && (cf_atomic32_get(qtr->n_udf_tr_queued) == 0)) {
2519 cf_detail(AS_QUERY, "Request is finished");
2520 return AS_QUERY_DONE;
2521 }
2522 do_enqueue = true;
2523 cf_detail(AS_QUERY, "Request not finished qwork(%d) io(%d)", cf_atomic32_get(qtr->n_qwork_active), cf_atomic32_get(qtr->n_io_outstanding));
2524 }
2525
2526 // Step 2: Client is slow requeue
2527 if (query_netio_wait(qtr) != AS_QUERY_OK) {
2528 do_enqueue = true;
2529 }
2530
2531 // Step 3: Check to see if this is long running query. This is determined by
2532 // checking number of records read. Please note that it makes sure the false
2533 // entries in secondary index does not effect this decision. All short running
2534 // queries perform I/O in the batch thread context.
2535 if ((cf_atomic64_get(qtr->n_result_records) >= g_config.query_threshold)
2536 && qtr->short_running) {
2537 qtr->short_running = false;
2538 // Change batch size to the long running job batch size value
2539 qtr->qctx.bsize = g_config.query_bsize;
2540 cf_atomic32_decr(&g_query_short_running);
2541 cf_atomic32_incr(&g_query_long_running);
2542 cf_atomic64_incr(&qtr->ns->query_long_reqs);
2543 cf_atomic64_decr(&qtr->ns->query_short_reqs);
2544 cf_detail(AS_QUERY, "Query Queued Into Long running thread pool %ld %d", cf_atomic64_get(qtr->n_result_records), qtr->short_running);
2545 do_enqueue = true;
2546 }
2547
2548 if (do_enqueue) {
2549 int ret = AS_QUERY_OK;
2550 qtr_lock(qtr);
2551 if ((cf_atomic32_get(qtr->n_qwork_active) != 0)
2552 || (cf_atomic32_get(qtr->n_io_outstanding) != 0)
2553 || (cf_atomic32_get(qtr->n_udf_tr_queued) != 0)) {
2554 cf_detail(AS_QUERY, "Job Setup for Requeue %p", qtr);
2555
2556 // Release of one of the above will perform requeue... look for
2557 // qtr_finish_work();
2558 qtr->do_requeue = true;
2559 ret = AS_QUERY_OK;
2560 } else {
2561 ret = query_requeue(qtr);
2562 }
2563 qtr_unlock(qtr);
2564 return ret;
2565 }
2566
2567 return AS_QUERY_CONTINUE;
2568}
2569static bool
2570query_process_inline(as_query_transaction *qtr)
2571{
2572 if ( g_config.query_req_in_query_thread
2573 || (cf_atomic32_get((qtr)->n_qwork_active) > g_config.query_req_max_inflight)
2574 || (qtr && qtr->short_running)
2575 || (qtr && qtr_finished(qtr))) {
2576 return true;
2577 }
2578 else {
2579 return false;
2580 }
2581}
2582/*
2583 * Process the query work either inilne or pass it on to the
2584 * worker thread
2585 *
2586 * Returns
2587 * -1 : Fail
2588 * 0 : Success
2589 */
2590static int
2591qtr_process(as_query_transaction *qtr)
2592{
2593 if (query_process_inline(qtr)) {
2594 query_work qwork;
2595 qwork_setup(&qwork, qtr);
2596
2597 int ret = qwork_process(&qwork);
2598
2599 qwork_teardown(&qwork);
2600 return ret;
2601
2602 } else {
2603 query_work *qworkp = qwork_poolrequest();
2604 if (!qworkp) {
2605 cf_warning(AS_QUERY, "Could not allocate query "
2606 "request structure .. out of memory .. Aborting !!!");
2607 return AS_QUERY_ERR;
2608 }
2609 // Successfully queued
2610 cf_atomic32_incr(&qtr->n_qwork_active);
2611 qwork_setup(qworkp, qtr);
2612 cf_queue_push(g_query_work_queue, &qworkp);
2613
2614 }
2615 return AS_QUERY_OK;
2616}
2617
2618static int
2619query_check_bound(as_query_transaction *qtr)
2620{
2621 if (cf_atomic64_get(qtr->n_result_records) > g_config.query_rec_count_bound) {
2622 return AS_QUERY_ERR;
2623 }
2624 return AS_QUERY_OK;
2625}
2626/*
2627 * Function query_generator
2628 *
2629 * Does the following
2630 * 1. Calls the sindex layer for fetching digest list
2631 * 2. If short running query performs I/O inline and for long running query
2632 * queues it up for work threads to execute.
2633 * 3. If the query is short_running and has hit threshold. Requeue it for
2634 * long running generator threads
2635 *
2636 * Returns -
2637 * Nothing, sets the qtr status accordingly
2638 */
2639static void
2640query_generator(as_query_transaction *qtr)
2641{
2642#if defined(USE_SYSTEMTAP)
2643 uint64_t nodeid = g_config.self_node;
2644 uint64_t trid = qtr->trid;
2645 size_t nrecs = 0;
2646#endif
2647
2648 // Query can get requeue for many different reason. Check if it is
2649 // already started before indulging in act to setting it up for run
2650 if (!qtr_started(qtr)) {
2651 query_run_setup(qtr);
2652 }
2653
2654 int loop = 0;
2655 while (true) {
2656
2657 // Step 1: Check for requeue
2658 int ret = query_qtr_check_and_requeue(qtr);
2659 if (ret == AS_QUERY_ERR) {
2660 cf_warning(AS_QUERY, "Unexpected requeue failure .. shutdown connection.. abort!!");
2661 qtr_set_abort(qtr, AS_ERR_QUERY_NET_IO, __FILE__, __LINE__);
2662 break;
2663 } else if (ret == AS_QUERY_DONE) {
2664 break;
2665 } else if (ret == AS_QUERY_OK) {
2666 return;
2667 }
2668 // Step 2: Check for timeout
2669 query_check_timeout(qtr);
2670 if (qtr_failed(qtr)) {
2671 qtr_set_err(qtr, AS_ERR_QUERY_TIMEOUT, __FILE__, __LINE__);
2672 continue;
2673 }
2674 // Step 3: Conditionally track
2675 if (hash_track_qtr(qtr)) {
2676 qtr_set_err(qtr, AS_ERR_QUERY_DUPLICATE, __FILE__, __LINE__);
2677 continue;
2678 }
2679
2680 // Step 4: If needs user based abort
2681 if (query_check_bound(qtr)) {
2682 qtr_set_err(qtr, AS_ERR_QUERY_USER_ABORT, __FILE__, __LINE__);
2683 continue;
2684 }
2685
2686 // Step 5: Get Next Batch
2687 loop++;
2688 int qret = query_get_nextbatch(qtr);
2689
2690 cf_detail(AS_QUERY, "Loop=%d, Selected=%"PRIu64", ret=%d", loop, qtr->qctx.n_bdigs, qret);
2691 switch (qret) {
2692 case AS_QUERY_OK:
2693 case AS_QUERY_DONE:
2694 break;
2695 case AS_QUERY_ERR:
2696 continue;
2697 case AS_QUERY_CONTINUE:
2698 continue;
2699 default:
2700 cf_warning(AS_QUERY, "Unexpected return type");
2701 continue;
2702 }
2703
2704 if (qret == AS_QUERY_DONE) {
2705 // In case all physical tree is done return. if not range loop
2706 // till less than batch size results are returned
2707#if defined(USE_SYSTEMTAP)
2708 nrecs = qtr->n_result_records;
2709#endif
2710 qtr_set_done(qtr, AS_OK, __FILE__, __LINE__);
2711 }
2712
2713 // Step 6: Prepare Query Request either to process inline or for
2714 // queueing up for offline processing
2715 if (qtr_process(qtr)) {
2716 qtr_set_err(qtr, AS_ERR_QUERY_CB, __FILE__, __LINE__);
2717 continue;
2718 }
2719 }
2720
2721 if (!qtr_is_abort(qtr)) {
2722 // Send the fin packet in it is NOT a shutdown
2723 query_send_fin(qtr);
2724 }
2725 // deleting it from the global hash.
2726 hash_delete_qtr(qtr);
2727 qtr_release(qtr, __FILE__, __LINE__);
2728 ASD_QUERY_DONE(nodeid, trid, nrecs);
2729}
2730
2731/*
2732 * Function as_query_worker
2733 *
2734 * Notes -
2735 * Process one queue's Query requests.
2736 * - Immediately fail if query has timed out
2737 * - Maximum queries that can be served is number of threads
2738 *
2739 * Releases the qtr, which will call as_query_trasaction_done
2740 *
2741 * Synchronization -
2742 * Takes a global query lock while
2743 */
2744void*
2745query_th(void* q_to_wait_on)
2746{
2747 cf_queue * query_queue = (cf_queue*)q_to_wait_on;
2748 unsigned int thread_id = cf_atomic32_incr(&g_query_threadcnt);
2749 cf_detail(AS_QUERY, "Query Thread Created %d", thread_id);
2750 as_query_transaction *qtr = NULL;
2751
2752 while (1) {
2753 // Kill self if thread id is greater than that of number of configured
2754 // thread
2755 if (thread_id > g_config.query_threads) {
2756 pthread_rwlock_rdlock(&g_query_lock);
2757 if (thread_id > g_config.query_threads) {
2758 cf_atomic32_decr(&g_query_threadcnt);
2759 pthread_rwlock_unlock(&g_query_lock);
2760 cf_detail(AS_QUERY, "Query thread %d exited", thread_id);
2761 return NULL;
2762 }
2763 pthread_rwlock_unlock(&g_query_lock);
2764 }
2765 if (cf_queue_pop(query_queue, &qtr, CF_QUEUE_FOREVER) != 0) {
2766 cf_crash(AS_QUERY, "Failed to pop from Query worker queue.");
2767 }
2768
2769 query_generator(qtr);
2770 }
2771 return AS_QUERY_OK;
2772}
2773
2774/*
2775 * Parse the UDF OP type to find what type of UDF this is or otherwise not even
2776 * UDF
2777 */
2778query_type
2779query_get_type(as_transaction* tr)
2780{
2781 if (! as_transaction_is_udf(tr)) {
2782 return (tr->msgp->msg.info2 & AS_MSG_INFO2_WRITE) != 0 ?
2783 QUERY_TYPE_OPS_BG : QUERY_TYPE_LOOKUP;
2784 }
2785
2786 as_msg_field *udf_op_f = as_transaction_has_udf_op(tr) ?
2787 as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_UDF_OP) : NULL;
2788
2789 if (udf_op_f && *udf_op_f->data == (uint8_t)AS_UDF_OP_AGGREGATE) {
2790 return QUERY_TYPE_AGGR;
2791 }
2792
2793 if (udf_op_f && *udf_op_f->data == (uint8_t)AS_UDF_OP_BACKGROUND) {
2794 return QUERY_TYPE_UDF_BG;
2795 }
2796
2797 return QUERY_TYPE_UNKNOWN;
2798}
2799
2800/*
2801 * Function aggr_query_init
2802 */
2803int
2804aggr_query_init(as_aggr_call * call, as_transaction *tr)
2805{
2806 if (! udf_def_init_from_msg(&call->def, tr)) {
2807 return AS_QUERY_ERR;
2808 }
2809
2810 call->aggr_hooks = &query_aggr_hooks;
2811 return AS_QUERY_OK;
2812}
2813
2814static bool
2815query_setup_udf_call(as_query_transaction *qtr, as_transaction *tr)
2816{
2817 switch (qtr->job_type) {
2818 case QUERY_TYPE_LOOKUP:
2819 cf_atomic64_incr(&qtr->ns->n_lookup);
2820 break;
2821 case QUERY_TYPE_AGGR:
2822 if (aggr_query_init(&qtr->agg_call, tr) != AS_QUERY_OK) {
2823 tr->result_code = AS_ERR_PARAMETER;
2824 return false;
2825 }
2826 cf_atomic64_incr(&qtr->ns->n_aggregation);
2827 break;
2828 case QUERY_TYPE_UDF_BG:
2829 if (! udf_def_init_from_msg(&qtr->iudf_orig.def, tr)) {
2830 tr->result_code = AS_ERR_PARAMETER;
2831 return false;
2832 }
2833 break;
2834 case QUERY_TYPE_OPS_BG:
2835 break;
2836 default:
2837 cf_crash(AS_QUERY, "Invalid QUERY TYPE %d !!!", qtr->job_type);
2838 break;
2839 }
2840 return true;
2841}
2842
2843static bool
2844query_setup_shared_msgp(as_query_transaction *qtr, as_transaction *tr)
2845{
2846 switch (qtr->job_type) {
2847 case QUERY_TYPE_LOOKUP:
2848 case QUERY_TYPE_AGGR:
2849 break;
2850 case QUERY_TYPE_UDF_BG: {
2851 uint8_t info2 = AS_MSG_INFO2_WRITE |
2852 (tr->msgp->msg.info2 & AS_MSG_INFO2_DURABLE_DELETE);
2853
2854 qtr->iudf_orig.msgp = as_msg_create_internal(qtr->ns->name, 0,
2855 info2, 0, 0, NULL, 0);
2856 }
2857 break;
2858 case QUERY_TYPE_OPS_BG: {
2859 as_msg* om = &tr->msgp->msg;
2860
2861 if ((om->info1 & AS_MSG_INFO1_READ) != 0) {
2862 cf_warning(AS_QUERY, "ops not write only");
2863 return false;
2864 }
2865
2866 if (om->n_ops == 0) {
2867 cf_warning(AS_QUERY, "ops query has no ops");
2868 return false;
2869 }
2870
2871 uint8_t info2 = AS_MSG_INFO2_WRITE |
2872 (om->info2 & AS_MSG_INFO2_DURABLE_DELETE);
2873 uint8_t info3 = AS_MSG_INFO3_UPDATE_ONLY |
2874 (om->info3 & AS_MSG_INFO3_REPLACE_ONLY);
2875
2876 int i = 0;
2877 uint8_t* ops = (uint8_t*)as_msg_op_iterate(om, NULL, &i);
2878
2879 qtr->iops_orig.msgp = as_msg_create_internal(qtr->ns->name, 0,
2880 info2, info3, om->n_ops, ops,
2881 tr->msgp->proto.sz - (ops - (uint8_t*)om));
2882 }
2883 break;
2884 default:
2885 cf_crash(AS_QUERY, "Invalid QUERY TYPE %d !!!", qtr->job_type);
2886 break;
2887 }
2888 return true;
2889}
2890
2891static void
2892query_setup_fd(as_query_transaction *qtr, as_transaction *tr)
2893{
2894 switch (qtr->job_type) {
2895 case QUERY_TYPE_LOOKUP:
2896 case QUERY_TYPE_AGGR:
2897 qtr->fd_h = tr->from.proto_fd_h;
2898 break;
2899 case QUERY_TYPE_UDF_BG:
2900 case QUERY_TYPE_OPS_BG:
2901 qtr->fd_h = NULL;
2902 break;
2903 default:
2904 cf_crash(AS_QUERY, "Invalid QUERY TYPE %d !!!", qtr->job_type);
2905 break;
2906 }
2907}
2908/*
2909 * Phase I query setup which happens just before query is queued for generator
2910 * Populates valid qtrp in case of success and NULL in case of failure.
2911 * All the query related parsing code sits here
2912 *
2913 * Returns:
2914 * AS_QUERY_OK in case of successful
2915 * AS_QUERY_DONE in case nothing to be like scan on non-existent set
2916 * AS_QUERY_ERR in case of parsing failure
2917 *
2918 */
2919static int
2920query_setup(as_transaction *tr, as_namespace *ns, as_query_transaction **qtrp)
2921{
2922
2923#if defined(USE_SYSTEMTAP)
2924 uint64_t nodeid = g_config.self_node;
2925 uint64_t trid = tr ? as_transaction_trid(tr) : 0;
2926#endif
2927
2928 int rv = AS_QUERY_ERR;
2929 *qtrp = NULL;
2930
2931 ASD_QUERY_STARTING(nodeid, trid);
2932
2933 uint64_t start_time = cf_getns();
2934 as_sindex *si = NULL;
2935 cf_vector *binlist = 0;
2936 as_sindex_range *srange = 0;
2937 predexp_eval_t *predexp_eval = NULL;
2938 char *setname = NULL;
2939 as_query_transaction *qtr = NULL;
2940
2941 bool has_sindex = as_sindex_ns_has_sindex(ns);
2942 if (!has_sindex) {
2943 tr->result_code = AS_ERR_SINDEX_NOT_FOUND;
2944 cf_debug(AS_QUERY, "No Secondary Index on namespace %s", ns->name);
2945 goto Cleanup;
2946 }
2947
2948 as_msg *m = &tr->msgp->msg;
2949
2950 // TODO - still lots of redundant msg field parsing (e.g. for set) - fix.
2951 if ((si = as_sindex_from_msg(ns, m)) == NULL) {
2952 cf_debug(AS_QUERY, "No Index Defined in the Query");
2953 }
2954
2955 ASD_SINDEX_MSGRANGE_STARTING(nodeid, trid);
2956 int ret = as_sindex_rangep_from_msg(ns, m, &srange);
2957 if (AS_QUERY_OK != ret) {
2958 cf_debug(AS_QUERY, "Could not instantiate index range metadata... "
2959 "Err, %s", as_sindex_err_str(ret));
2960 tr->result_code = as_sindex_err_to_clienterr(ret, __FILE__, __LINE__);
2961 goto Cleanup;
2962 }
2963
2964 ASD_SINDEX_MSGRANGE_FINISHED(nodeid, trid);
2965 // get optional set
2966 as_msg_field *sfp = as_transaction_has_set(tr) ?
2967 as_msg_field_get(m, AS_MSG_FIELD_TYPE_SET) : NULL;
2968
2969 if (sfp) {
2970 uint32_t setname_len = as_msg_field_get_value_sz(sfp);
2971
2972 if (setname_len >= AS_SET_NAME_MAX_SIZE) {
2973 cf_warning(AS_QUERY, "set name too long");
2974 tr->result_code = AS_ERR_PARAMETER;
2975 goto Cleanup;
2976 }
2977
2978 if (setname_len != 0) {
2979 setname = cf_strndup((const char *)sfp->data, setname_len);
2980 }
2981 }
2982
2983 if (si) {
2984
2985 if (! as_sindex_can_query(si)) {
2986 tr->result_code = as_sindex_err_to_clienterr(
2987 AS_SINDEX_ERR_NOT_READABLE, __FILE__, __LINE__);
2988 goto Cleanup;
2989 }
2990 } else {
2991 // Look up sindex by bin in the query in case not
2992 // specified in query
2993 si = as_sindex_from_range(ns, setname, srange);
2994 }
2995
2996 if (as_transaction_has_predexp(tr)) {
2997 as_msg_field * pfp = as_msg_field_get(m, AS_MSG_FIELD_TYPE_PREDEXP);
2998 predexp_eval = predexp_build(pfp);
2999 if (! predexp_eval) {
3000 cf_warning(AS_QUERY, "Failed to build predicate expression");
3001 tr->result_code = AS_ERR_PARAMETER;
3002 goto Cleanup;
3003 }
3004 }
3005
3006 int numbins = 0;
3007 // Populate binlist to be Projected by the Query
3008 binlist = as_sindex_binlist_from_msg(ns, m, &numbins);
3009
3010 // If anyone of the bin in the bin is bad, fail the query
3011 if (numbins != 0 && !binlist) {
3012 tr->result_code = AS_ERR_SINDEX_GENERIC;
3013 goto Cleanup;
3014 }
3015
3016 if (!has_sindex || !si) {
3017 tr->result_code = AS_ERR_SINDEX_NOT_FOUND;
3018 goto Cleanup;
3019 }
3020
3021 // quick check if there is any data with the certain set name
3022 if (setname && as_namespace_get_set_id(ns, setname) == INVALID_SET_ID) {
3023 cf_info(AS_QUERY, "Query on non-existent set %s", setname);
3024 tr->result_code = AS_OK;
3025 rv = AS_QUERY_DONE;
3026 goto Cleanup;
3027 }
3028 cf_detail(AS_QUERY, "Query on index %s ",
3029 ((as_sindex_metadata *)si->imd)->iname);
3030
3031 query_type qtype = query_get_type(tr);
3032 if (qtype == QUERY_TYPE_UNKNOWN) {
3033 tr->result_code = AS_ERR_PARAMETER;
3034 rv = AS_QUERY_ERR;
3035 goto Cleanup;
3036 }
3037
3038 if (qtype == QUERY_TYPE_AGGR && as_transaction_has_predexp(tr)) {
3039 cf_warning(AS_QUERY, "aggregation queries do not support predexp filters");
3040 tr->result_code = AS_ERR_UNSUPPORTED_FEATURE;
3041 rv = AS_QUERY_ERR;
3042 goto Cleanup;
3043 }
3044
3045 ASD_QUERY_QTRSETUP_STARTING(nodeid, trid);
3046 qtr = qtr_alloc();
3047 if (!qtr) {
3048 tr->result_code = AS_ERR_UNKNOWN;
3049 goto Cleanup;
3050 }
3051 ASD_QUERY_QTR_ALLOC(nodeid, trid, (void *) qtr);
3052 // Be aware of the size of qtr
3053 // Memset it partial
3054 memset(qtr, 0, offsetof(as_query_transaction, bkey));
3055
3056 ASD_QUERY_QTRSETUP_FINISHED(nodeid, trid);
3057
3058 qtr->ns = ns;
3059 qtr->job_type = qtype;
3060
3061 if (! query_setup_udf_call(qtr, tr)) {
3062 rv = AS_QUERY_ERR;
3063 cf_free(qtr);
3064 goto Cleanup;
3065 }
3066
3067 if (! query_setup_shared_msgp(qtr, tr)) {
3068 rv = AS_QUERY_ERR;
3069 cf_free(qtr);
3070 goto Cleanup;
3071 // Nothing to clean from udf setup - query types that allocate there
3072 // can't fail here.
3073 }
3074
3075 query_setup_fd(qtr, tr);
3076
3077 if (qtr->job_type == QUERY_TYPE_LOOKUP) {
3078 qtr->predexp_eval = predexp_eval;
3079 qtr->no_bin_data = (m->info1 & AS_MSG_INFO1_GET_NO_BINS) != 0;
3080 }
3081 else if (qtr->job_type == QUERY_TYPE_UDF_BG) {
3082 qtr->iudf_orig.predexp = predexp_eval;
3083 qtr->iudf_orig.cb = query_udf_bg_tr_complete;
3084 qtr->iudf_orig.udata = (void *)qtr;
3085 }
3086 else if (qtr->job_type == QUERY_TYPE_OPS_BG) {
3087 qtr->iops_orig.predexp = predexp_eval;
3088 qtr->iops_orig.cb = query_ops_bg_tr_complete;
3089 qtr->iops_orig.udata = (void *)qtr;
3090 }
3091
3092 // Consume everything from tr rest will be picked up in init
3093 qtr->trid = as_transaction_trid(tr);
3094 qtr->setname = setname;
3095 qtr->si = si;
3096 qtr->srange = srange;
3097 qtr->binlist = binlist;
3098 qtr->start_time = start_time;
3099 qtr->end_time = tr->end_time;
3100 qtr->rsv = NULL;
3101
3102 rv = AS_QUERY_OK;
3103
3104 cf_mutex_init(&qtr->slock);
3105 qtr->state = AS_QTR_STATE_INIT;
3106 qtr->do_requeue = false;
3107 qtr->short_running = true;
3108
3109 *qtrp = qtr;
3110 return rv;
3111
3112Cleanup:
3113 // Pre Query Setup Failure
3114 if (setname) cf_free(setname);
3115 if (si) AS_SINDEX_RELEASE(si);
3116
3117 predexp_destroy(predexp_eval);
3118
3119 if (srange) as_sindex_range_free(&srange);
3120 if (binlist) cf_vector_destroy(binlist);
3121 return rv;
3122}
3123
3124/*
3125 * Arguments -
3126 * tr - transaction coming from the client.
3127 *
3128 * Returns -
3129 * AS_QUERY_OK - on success. Responds, frees msgp and proto_fd
3130 * AS_QUERY_ERR - on failure. That means the query was not even started.
3131 * frees msgp, response is responsibility of caller
3132 *
3133 * Notes -
3134 * Allocates and reserves the qtr if query_in_transaction_thr
3135 * is set to false or data is in not in memory.
3136 * Has the responsibility to free tr->msgp.
3137 * Either call query_transaction_done or Cleanup to free the msgp
3138 */
3139int
3140as_query(as_transaction *tr, as_namespace *ns)
3141{
3142 if (tr) {
3143 QUERY_HIST_INSERT_DATA_POINT(query_txn_q_wait_hist, tr->start_time);
3144 }
3145
3146 as_query_transaction *qtr;
3147 int rv = query_setup(tr, ns, &qtr);
3148
3149 if (rv == AS_QUERY_DONE) {
3150 // Send FIN packet to client to ignore this.
3151 bool force_close = ! as_msg_send_fin(&tr->from.proto_fd_h->sock, AS_OK);
3152 query_release_fd(tr->from.proto_fd_h, force_close);
3153 tr->from.proto_fd_h = NULL; // Paranoid
3154 return AS_QUERY_OK;
3155 } else if (rv == AS_QUERY_ERR) {
3156 // tsvc takes care of managing fd
3157 return AS_QUERY_ERR;
3158 }
3159
3160 if (g_config.query_in_transaction_thr) {
3161 if (qtr->job_type == QUERY_TYPE_UDF_BG) {
3162 query_send_bg_udf_response(tr);
3163 }
3164 else if (qtr->job_type == QUERY_TYPE_OPS_BG) {
3165 query_send_bg_ops_response(tr);
3166 }
3167 query_generator(qtr);
3168 } else {
3169 if (query_qtr_enqueue(qtr, false)) {
3170 // This error will be accounted by thr_tsvc layer. Thus
3171 // reset fd_h before calling qtr release, and let the
3172 // transaction handler deal with the failure.
3173 qtr->fd_h = NULL;
3174 qtr_release(qtr, __FILE__, __LINE__);
3175 tr->result_code = AS_ERR_QUERY_QUEUE_FULL;
3176 return AS_QUERY_ERR;
3177 }
3178 // Respond after queuing is successfully.
3179 if (qtr->job_type == QUERY_TYPE_UDF_BG) {
3180 query_send_bg_udf_response(tr);
3181 }
3182 else if (qtr->job_type == QUERY_TYPE_OPS_BG) {
3183 query_send_bg_ops_response(tr);
3184 }
3185 }
3186
3187 // Query engine will reply to queued query as needed.
3188 tr->from.proto_fd_h = NULL;
3189 return AS_QUERY_OK;
3190}
3191// **************************************************************************************************
3192
3193
3194/*
3195 * Query Utility and Monitoring functions
3196 */
3197// **************************************************************************************************
3198
3199// Find matching trid and kill the query
3200int
3201as_query_kill(uint64_t trid)
3202{
3203 as_query_transaction *qtr;
3204 int rv = hash_get_qtr(trid, &qtr);
3205
3206 if (rv != AS_QUERY_OK) {
3207 cf_warning(AS_QUERY, "Cannot kill query with trid [%"PRIu64"]", trid);
3208 } else {
3209 qtr_set_abort(qtr, AS_ERR_QUERY_USER_ABORT, __FILE__, __LINE__);
3210 rv = AS_QUERY_OK;
3211 qtr_release(qtr, __FILE__, __LINE__);
3212 }
3213
3214 return rv;
3215}
3216
3217// Find matching trid and set priority
3218int
3219as_query_set_priority(uint64_t trid, uint32_t priority)
3220{
3221 as_query_transaction *qtr;
3222 int rv = hash_get_qtr(trid, &qtr);
3223
3224 if (rv != AS_QUERY_OK) {
3225 cf_warning(AS_QUERY, "Cannot set priority for query with trid [%"PRIu64"]", trid);
3226 } else {
3227 uint32_t old_priority = qtr->priority;
3228 qtr->priority = priority;
3229 cf_info(AS_QUERY, "Query priority changed from %d to %d", old_priority, priority);
3230 rv = AS_QUERY_OK;
3231 qtr_release(qtr, __FILE__, __LINE__);
3232 }
3233 return rv;
3234}
3235
3236int
3237as_query_list_job_reduce_fn(const void *key, uint32_t keylen, void *object, void *udata)
3238{
3239 as_query_transaction * qtr = (as_query_transaction*)object;
3240 cf_dyn_buf * db = (cf_dyn_buf*) udata;
3241
3242 cf_dyn_buf_append_string(db, "trid=");
3243 cf_dyn_buf_append_uint64(db, qtr->trid);
3244 cf_dyn_buf_append_string(db, ":job_type=");
3245 cf_dyn_buf_append_int(db, qtr->job_type);
3246 cf_dyn_buf_append_string(db, ":n_result_records=");
3247 cf_dyn_buf_append_uint64(db, cf_atomic_int_get(qtr->n_result_records));
3248 cf_dyn_buf_append_string(db, ":run_time=");
3249 cf_dyn_buf_append_uint64(db, (cf_getns() - qtr->start_time) / 1000);
3250 cf_dyn_buf_append_string(db, ":state=");
3251 if(qtr_failed(qtr)) {
3252 cf_dyn_buf_append_string(db, "ABORTED");
3253 } else {
3254 cf_dyn_buf_append_string(db, "RUNNING");
3255 }
3256 cf_dyn_buf_append_string(db, ";");
3257 return AS_QUERY_OK;
3258}
3259
3260// Lists thr current running queries
3261int
3262as_query_list(char *name, cf_dyn_buf *db)
3263{
3264 uint32_t size = cf_rchash_get_size(g_query_job_hash);
3265 // No elements in the query job hash, return failure
3266 if (!size) {
3267 cf_dyn_buf_append_string(db, "No running queries");
3268 }
3269 // Else go through all the jobs in the hash and list their statistics
3270 else {
3271 cf_rchash_reduce(g_query_job_hash, as_query_list_job_reduce_fn, db);
3272 cf_dyn_buf_chomp(db);
3273 }
3274 return AS_QUERY_OK;
3275}
3276
3277
3278// query module to monitor
3279void
3280as_query_fill_jobstat(as_query_transaction *qtr, as_mon_jobstat *stat)
3281{
3282 stat->trid = qtr->trid;
3283 stat->run_time = (cf_getns() - qtr->start_time) / 1000000;
3284 stat->recs_succeeded = qtr->n_read_success;
3285 stat->net_io_bytes = qtr->net_io_bytes;
3286 stat->priority = qtr->priority;
3287
3288 // Not relevant:
3289 stat->active_threads = 0;
3290 stat->socket_timeout = 0;
3291
3292 // Not implemented:
3293 stat->rps = 0;
3294 stat->client[0] = '\0';
3295 stat->progress_pct = 0;
3296 stat->time_since_done = 0;
3297 stat->job_type[0] = '\0';
3298
3299 stat->recs_throttled = 0;
3300 stat->recs_filtered_meta = 0;
3301 stat->recs_filtered_bins = 0;
3302 stat->recs_failed = 0;
3303
3304 strcpy(stat->ns, qtr->ns->name);
3305
3306 if (qtr->setname) {
3307 strcpy(stat->set, qtr->setname);
3308 } else {
3309 strcpy(stat->set, "NULL");
3310 }
3311
3312 strcpy(stat->status, "active");
3313
3314 char *specific_data = stat->jdata;
3315 sprintf(specific_data, ":sindex-name=%s:", qtr->si->imd->iname);
3316}
3317
3318/*
3319 * Populates the as_mon_jobstat and returns to mult-key lookup monitoring infrastructure.
3320 * Serves as a callback function
3321 *
3322 * Returns -
3323 * NULL - In case of failure.
3324 * as_mon_jobstat - On success.
3325 */
3326as_mon_jobstat *
3327as_query_get_jobstat(uint64_t trid)
3328{
3329 as_mon_jobstat *stat;
3330 as_query_transaction *qtr;
3331 int rv = hash_get_qtr(trid, &qtr);
3332
3333 if (rv != AS_QUERY_OK) {
3334 cf_warning(AS_MON, "No query was found with trid [%"PRIu64"]", trid);
3335 stat = NULL;
3336 }
3337 else {
3338 stat = cf_malloc(sizeof(as_mon_jobstat));
3339 as_query_fill_jobstat(qtr, stat);
3340 qtr_release(qtr, __FILE__, __LINE__);
3341 }
3342 return stat;
3343}
3344
3345
3346int
3347as_mon_query_jobstat_reduce_fn(const void *key, uint32_t keylen, void *object, void *udata)
3348{
3349 as_query_transaction * qtr = (as_query_transaction*)object;
3350 query_jobstat *job_pool = (query_jobstat*) udata;
3351
3352 if ( job_pool->index >= job_pool->max_size) return AS_QUERY_OK;
3353 as_mon_jobstat * stat = *(job_pool->jobstat);
3354 stat = stat + job_pool->index;
3355 as_query_fill_jobstat(qtr, stat);
3356 (job_pool->index)++;
3357 return AS_QUERY_OK;
3358}
3359
3360as_mon_jobstat *
3361as_query_get_jobstat_all(int * size)
3362{
3363 *size = cf_rchash_get_size(g_query_job_hash);
3364 if(*size == 0) return AS_QUERY_OK;
3365
3366 as_mon_jobstat * job_stats;
3367 query_jobstat job_pool;
3368
3369 job_stats = (as_mon_jobstat *) cf_malloc(sizeof(as_mon_jobstat) * (*size));
3370 job_pool.jobstat = &job_stats;
3371 job_pool.index = 0;
3372 job_pool.max_size = *size;
3373 cf_rchash_reduce(g_query_job_hash, as_mon_query_jobstat_reduce_fn, &job_pool);
3374 *size = job_pool.index;
3375 return job_stats;
3376}
3377
3378void
3379as_query_histogram_dumpall()
3380{
3381 if (g_config.query_enable_histogram == false)
3382 {
3383 return;
3384 }
3385
3386 if (query_txn_q_wait_hist) {
3387 histogram_dump(query_txn_q_wait_hist);
3388 }
3389 if (query_query_q_wait_hist) {
3390 histogram_dump(query_query_q_wait_hist);
3391 }
3392 if (query_prepare_batch_hist) {
3393 histogram_dump(query_prepare_batch_hist);
3394 }
3395 if (query_batch_io_q_wait_hist) {
3396 histogram_dump(query_batch_io_q_wait_hist);
3397 }
3398 if (query_batch_io_hist) {
3399 histogram_dump(query_batch_io_hist);
3400 }
3401 if (query_net_io_hist) {
3402 histogram_dump(query_net_io_hist);
3403 }
3404}
3405
3406
3407/*
3408 * Query Subsystem Initialization function
3409 */
3410// **************************************************************************************************
3411void
3412as_query_gconfig_default(as_config *c)
3413{
3414 // NB: Do not change query_threads default to odd. as_query_reinit code cannot
3415 // handle it. Code to handle it is unnecessarily complicated code, hence opted
3416 // to make the default value even.
3417 c->query_threads = 6;
3418 c->query_worker_threads = 15;
3419 c->query_priority = 10;
3420 c->query_sleep_us = 1;
3421 c->query_bsize = QUERY_BATCH_SIZE;
3422 c->query_in_transaction_thr = 0;
3423 c->query_req_max_inflight = AS_QUERY_MAX_QREQ_INFLIGHT;
3424 c->query_bufpool_size = AS_QUERY_MAX_BUFS;
3425 c->query_short_q_max_size = AS_QUERY_MAX_SHORT_QUEUE_SZ;
3426 c->query_long_q_max_size = AS_QUERY_MAX_LONG_QUEUE_SZ;
3427 c->query_buf_size = AS_QUERY_BUF_SIZE;
3428 c->query_threshold = 10; // threshold after which the query is considered long running
3429 // no reason for choosing 10
3430 c->query_rec_count_bound = UINT64_MAX; // Unlimited
3431 c->query_req_in_query_thread = 0;
3432 c->query_untracked_time_ms = AS_QUERY_UNTRACKED_TIME;
3433
3434 c->partitions_pre_reserved = false;
3435}
3436
3437
3438void
3439as_query_init()
3440{
3441 g_current_queries_count = 0;
3442 cf_detail(AS_QUERY, "Initialize %d Query Worker threads.", g_config.query_threads);
3443
3444 // global job hash to keep track of the query job
3445 g_query_job_hash = cf_rchash_create(cf_rchash_fn_u32, NULL, sizeof(uint64_t), 64, CF_RCHASH_MANY_LOCK);
3446
3447 // I/O threads
3448 g_query_qwork_pool = cf_queue_create(sizeof(query_work *), true);
3449 g_query_response_bb_pool = cf_queue_create(sizeof(void *), true);
3450 g_query_work_queue = cf_queue_create(sizeof(query_work *), true);
3451
3452 for (uint32_t i = 0; i < g_config.query_worker_threads; i++) {
3453 cf_thread_create_detached(qwork_th, (void*)g_query_work_queue);
3454 }
3455
3456 g_query_short_queue = cf_queue_create(sizeof(as_query_transaction *), true);
3457 g_query_long_queue = cf_queue_create(sizeof(as_query_transaction *), true);
3458
3459 for (uint32_t i = 0; i < g_config.query_threads; i += 2) {
3460 cf_thread_create_detached(query_th, (void*)g_query_short_queue);
3461 cf_thread_create_detached(query_th, (void*)g_query_long_queue);
3462 }
3463
3464 char hist_name[64];
3465
3466 sprintf(hist_name, "query_txn_q_wait_us");
3467 query_txn_q_wait_hist = histogram_create(hist_name, HIST_MICROSECONDS);
3468
3469 sprintf(hist_name, "query_query_q_wait_us");
3470 query_query_q_wait_hist = histogram_create(hist_name, HIST_MICROSECONDS);
3471
3472 sprintf(hist_name, "query_prepare_batch_us");
3473 query_prepare_batch_hist = histogram_create(hist_name, HIST_MICROSECONDS);
3474
3475 sprintf(hist_name, "query_batch_io_q_wait_us");
3476 query_batch_io_q_wait_hist = histogram_create(hist_name, HIST_MICROSECONDS);
3477
3478 sprintf(hist_name, "query_batch_io_us");
3479 query_batch_io_hist = histogram_create(hist_name, HIST_MICROSECONDS);
3480
3481 sprintf(hist_name, "query_net_io_us");
3482 query_net_io_hist = histogram_create(hist_name, HIST_MICROSECONDS);
3483
3484 g_config.query_enable_histogram = false;
3485}
3486
3487/*
3488 * Description -
3489 * It tries to set the query_worker_threads to the given value.
3490 *
3491 * Synchronization -
3492 * Takes a global query lock to protect the config of
3493 *
3494 * Arguments -
3495 * set_size - Value which one want to assign to query_threads.
3496 *
3497 * Returns -
3498 * AS_QUERY_OK - On successful resize of query threads.
3499 * AS_QUERY_ERR - Either the set_size exceeds AS_QUERY_MAX_THREADS
3500 * OR Query threads were not initialized on the first place.
3501 */
3502int
3503as_query_worker_reinit(int set_size, int *actual_size)
3504{
3505 if (set_size > AS_QUERY_MAX_WORKER_THREADS) {
3506 cf_warning(AS_QUERY, "Cannot increase query threads more than %d",
3507 AS_QUERY_MAX_WORKER_THREADS);
3508 //unlock
3509 return AS_QUERY_ERR;
3510 }
3511
3512 pthread_rwlock_wrlock(&g_query_lock);
3513 // Add threads if count is increased
3514 int i = cf_atomic32_get(g_query_worker_threadcnt);
3515 g_config.query_worker_threads = set_size;
3516 if (set_size > g_query_worker_threadcnt) {
3517 for (; i < set_size; i++) {
3518 cf_detail(AS_QUERY, "Creating thread %d", i);
3519 cf_thread_create_detached(qwork_th, (void*)g_query_work_queue);
3520 }
3521 g_config.query_worker_threads = i;
3522 }
3523 *actual_size = g_config.query_worker_threads;
3524
3525 pthread_rwlock_unlock(&g_query_lock);
3526
3527 return AS_QUERY_OK;
3528}
3529
3530/*
3531 * Description -
3532 * It tries to set the query_threads to the given value.
3533 *
3534 * Synchronization -
3535 * Takes a global query lock to protect the config of
3536 *
3537 * Arguments -
3538 * set_size - Value which one want to assign to query_threads.
3539 *
3540 * Returns -
3541 * AS_QUERY_OK - On successful resize of query threads.
3542 * AS_QUERY_ERR - Either the set_size exceeds AS_QUERY_MAX_THREADS
3543 * OR Query threads were not initialized on the first place.
3544 */
3545int
3546as_query_reinit(int set_size, int *actual_size)
3547{
3548 if (set_size > AS_QUERY_MAX_THREADS) {
3549 cf_warning(AS_QUERY, "Cannot increase query threads more than %d",
3550 AS_QUERY_MAX_THREADS);
3551 return AS_QUERY_ERR;
3552 }
3553
3554 pthread_rwlock_wrlock(&g_query_lock);
3555 // Add threads if count is increased
3556 int i = cf_atomic32_get(g_query_threadcnt);
3557
3558 // make it multiple of 2
3559 if (set_size % 2 != 0)
3560 set_size++;
3561
3562 g_config.query_threads = set_size;
3563 if (set_size > g_query_threadcnt) {
3564 for (; i < set_size; i += 2) {
3565 cf_detail(AS_QUERY, "Creating thread %d", i);
3566 cf_thread_create_detached(query_th, (void*)g_query_short_queue);
3567 cf_thread_create_detached(query_th, (void*)g_query_long_queue);
3568 }
3569 g_config.query_threads = i;
3570 }
3571 *actual_size = g_config.query_threads;
3572
3573 pthread_rwlock_unlock(&g_query_lock);
3574
3575 return AS_QUERY_OK;
3576}
3577// **************************************************************************************************
3578