1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <my_global.h>
40#include <toku_portability.h>
41#include "toku_os.h"
42#include "ft-internal.h"
43#include "loader/loader-internal.h"
44#include "loader/pqueue.h"
45
/*
 * Index arithmetic for the heap stored in q->d.  The array is 1-based
 * (slot 0 is never used), so the children of slot i are 2i and 2i + 1 and
 * the parent of slot i is i / 2.
 */
#define pqueue_left(i)   ((i) << 1)
#define pqueue_right(i)  (pqueue_left(i) + 1)
#define pqueue_parent(i) ((i) >> 1)
49
50int pqueue_init(pqueue_t **result, size_t n, int which_db, DB *db, ft_compare_func compare, struct error_callback_s *err_callback)
51{
52 pqueue_t *MALLOC(q);
53 if (!q) {
54 return get_error_errno();
55 }
56
57 /* Need to allocate n+1 elements since element 0 isn't used. */
58 MALLOC_N(n + 1, q->d);
59 if (!q->d) {
60 int r = get_error_errno();
61 toku_free(q);
62 return r;
63 }
64 q->size = 1;
65 q->avail = q->step = (n+1); /* see comment above about n+1 */
66
67 q->which_db = which_db;
68 q->db = db;
69 q->compare = compare;
70 q->dup_error = 0;
71
72 q->error_callback = err_callback;
73
74 *result = q;
75 return 0;
76}
77
78void pqueue_free(pqueue_t *q)
79{
80 toku_free(q->d);
81 toku_free(q);
82}
83
84
85size_t pqueue_size(pqueue_t *q)
86{
87 /* queue element 0 exists but doesn't count since it isn't used. */
88 return (q->size - 1);
89}
90
91static int pqueue_compare(pqueue_t *q, DBT *next_key, DBT *next_val, DBT *curr_key)
92{
93 int r = q->compare(q->db, next_key, curr_key);
94 if ( r == 0 ) { // duplicate key : next_key == curr_key
95 q->dup_error = 1;
96 if (q->error_callback)
97 ft_loader_set_error_and_callback(q->error_callback, DB_KEYEXIST, q->db, q->which_db, next_key, next_val);
98 }
99 return ( r > -1 );
100}
101
102static void pqueue_bubble_up(pqueue_t *q, size_t i)
103{
104 size_t parent_node;
105 pqueue_node_t *moving_node = q->d[i];
106 DBT *moving_key = moving_node->key;
107
108 for (parent_node = pqueue_parent(i);
109 ((i > 1) && pqueue_compare(q, q->d[parent_node]->key, q->d[parent_node]->val, moving_key));
110 i = parent_node, parent_node = pqueue_parent(i))
111 {
112 q->d[i] = q->d[parent_node];
113 }
114
115 q->d[i] = moving_node;
116}
117
118
119static size_t pqueue_maxchild(pqueue_t *q, size_t i)
120{
121 size_t child_node = pqueue_left(i);
122
123 if (child_node >= q->size)
124 return 0;
125
126 if ((child_node+1) < q->size &&
127 pqueue_compare(q, q->d[child_node]->key, q->d[child_node]->val, q->d[child_node+1]->key))
128 child_node++; /* use right child instead of left */
129
130 return child_node;
131}
132
133
134static void pqueue_percolate_down(pqueue_t *q, size_t i)
135{
136 size_t child_node;
137 pqueue_node_t *moving_node = q->d[i];
138 DBT *moving_key = moving_node->key;
139 DBT *moving_val = moving_node->val;
140
141 while ((child_node = pqueue_maxchild(q, i)) &&
142 pqueue_compare(q, moving_key, moving_val, q->d[child_node]->key))
143 {
144 q->d[i] = q->d[child_node];
145 i = child_node;
146 }
147
148 q->d[i] = moving_node;
149}
150
151
152int pqueue_insert(pqueue_t *q, pqueue_node_t *d)
153{
154 size_t i;
155
156 if (!q) return 1;
157 if (q->size >= q->avail) return 1;
158
159 /* insert item */
160 i = q->size++;
161 q->d[i] = d;
162 pqueue_bubble_up(q, i);
163
164 if ( q->dup_error ) return DB_KEYEXIST;
165 return 0;
166}
167
168int pqueue_pop(pqueue_t *q, pqueue_node_t **d)
169{
170 if (!q || q->size == 1) {
171 *d = NULL;
172 return 0;
173 }
174
175 *d = q->d[1];
176 q->d[1] = q->d[--q->size];
177 pqueue_percolate_down(q, 1);
178
179 if ( q->dup_error ) return DB_KEYEXIST;
180 return 0;
181}
182