1#include <roaring/roaring.h>
2
3struct roaring_pq_element_s {
4 uint64_t size;
5 bool is_temporary;
6 roaring_bitmap_t *bitmap;
7};
8
9typedef struct roaring_pq_element_s roaring_pq_element_t;
10
11struct roaring_pq_s {
12 roaring_pq_element_t *elements;
13 uint64_t size;
14};
15
16typedef struct roaring_pq_s roaring_pq_t;
17
18static inline bool compare(roaring_pq_element_t *t1, roaring_pq_element_t *t2) {
19 return t1->size < t2->size;
20}
21
22static void pq_add(roaring_pq_t *pq, roaring_pq_element_t *t) {
23 uint64_t i = pq->size;
24 pq->elements[pq->size++] = *t;
25 while (i > 0) {
26 uint64_t p = (i - 1) >> 1;
27 roaring_pq_element_t ap = pq->elements[p];
28 if (!compare(t, &ap)) break;
29 pq->elements[i] = ap;
30 i = p;
31 }
32 pq->elements[i] = *t;
33}
34
35static void pq_free(roaring_pq_t *pq) {
36 free(pq->elements);
37 pq->elements = NULL; // paranoid
38 free(pq);
39}
40
41static void percolate_down(roaring_pq_t *pq, uint32_t i) {
42 uint32_t size = (uint32_t)pq->size;
43 uint32_t hsize = size >> 1;
44 roaring_pq_element_t ai = pq->elements[i];
45 while (i < hsize) {
46 uint32_t l = (i << 1) + 1;
47 uint32_t r = l + 1;
48 roaring_pq_element_t bestc = pq->elements[l];
49 if (r < size) {
50 if (compare(pq->elements + r, &bestc)) {
51 l = r;
52 bestc = pq->elements[r];
53 }
54 }
55 if (!compare(&bestc, &ai)) {
56 break;
57 }
58 pq->elements[i] = bestc;
59 i = l;
60 }
61 pq->elements[i] = ai;
62}
63
64static roaring_pq_t *create_pq(const roaring_bitmap_t **arr, uint32_t length) {
65 roaring_pq_t *answer = (roaring_pq_t *)malloc(sizeof(roaring_pq_t));
66 answer->elements =
67 (roaring_pq_element_t *)malloc(sizeof(roaring_pq_element_t) * length);
68 answer->size = length;
69 for (uint32_t i = 0; i < length; i++) {
70 answer->elements[i].bitmap = (roaring_bitmap_t *)arr[i];
71 answer->elements[i].is_temporary = false;
72 answer->elements[i].size =
73 roaring_bitmap_portable_size_in_bytes(arr[i]);
74 }
75 for (int32_t i = (length >> 1); i >= 0; i--) {
76 percolate_down(answer, i);
77 }
78 return answer;
79}
80
81static roaring_pq_element_t pq_poll(roaring_pq_t *pq) {
82 roaring_pq_element_t ans = *pq->elements;
83 if (pq->size > 1) {
84 pq->elements[0] = pq->elements[--pq->size];
85 percolate_down(pq, 0);
86 } else
87 --pq->size;
88 // memmove(pq->elements,pq->elements+1,(pq->size-1)*sizeof(roaring_pq_element_t));--pq->size;
89 return ans;
90}
91
92// this function consumes and frees the inputs
93static roaring_bitmap_t *lazy_or_from_lazy_inputs(roaring_bitmap_t *x1,
94 roaring_bitmap_t *x2) {
95 uint8_t container_result_type = 0;
96 const int length1 = ra_get_size(&x1->high_low_container),
97 length2 = ra_get_size(&x2->high_low_container);
98 if (0 == length1) {
99 roaring_bitmap_free(x1);
100 return x2;
101 }
102 if (0 == length2) {
103 roaring_bitmap_free(x2);
104 return x1;
105 }
106 uint32_t neededcap = length1 > length2 ? length2 : length1;
107 roaring_bitmap_t *answer = roaring_bitmap_create_with_capacity(neededcap);
108 int pos1 = 0, pos2 = 0;
109 uint8_t container_type_1, container_type_2;
110 uint16_t s1 = ra_get_key_at_index(&x1->high_low_container, pos1);
111 uint16_t s2 = ra_get_key_at_index(&x2->high_low_container, pos2);
112 while (true) {
113 if (s1 == s2) {
114 // todo: unsharing can be inefficient as it may create a clone where
115 // none
116 // is needed, but it has the benefit of being easy to reason about.
117 ra_unshare_container_at_index(&x1->high_low_container, pos1);
118 void *c1 = ra_get_container_at_index(&x1->high_low_container, pos1,
119 &container_type_1);
120 assert(container_type_1 != SHARED_CONTAINER_TYPE_CODE);
121 ra_unshare_container_at_index(&x2->high_low_container, pos2);
122 void *c2 = ra_get_container_at_index(&x2->high_low_container, pos2,
123 &container_type_2);
124 assert(container_type_2 != SHARED_CONTAINER_TYPE_CODE);
125 void *c;
126
127 if ((container_type_2 == BITSET_CONTAINER_TYPE_CODE) &&
128 (container_type_1 != BITSET_CONTAINER_TYPE_CODE)) {
129 c = container_lazy_ior(c2, container_type_2, c1,
130 container_type_1,
131 &container_result_type);
132 container_free(c1, container_type_1);
133 if (c != c2) {
134 container_free(c2, container_type_2);
135 }
136 } else {
137 c = container_lazy_ior(c1, container_type_1, c2,
138 container_type_2,
139 &container_result_type);
140 container_free(c2, container_type_2);
141 if (c != c1) {
142 container_free(c1, container_type_1);
143 }
144 }
145 // since we assume that the initial containers are non-empty, the
146 // result here
147 // can only be non-empty
148 ra_append(&answer->high_low_container, s1, c,
149 container_result_type);
150 ++pos1;
151 ++pos2;
152 if (pos1 == length1) break;
153 if (pos2 == length2) break;
154 s1 = ra_get_key_at_index(&x1->high_low_container, pos1);
155 s2 = ra_get_key_at_index(&x2->high_low_container, pos2);
156
157 } else if (s1 < s2) { // s1 < s2
158 void *c1 = ra_get_container_at_index(&x1->high_low_container, pos1,
159 &container_type_1);
160 ra_append(&answer->high_low_container, s1, c1, container_type_1);
161 pos1++;
162 if (pos1 == length1) break;
163 s1 = ra_get_key_at_index(&x1->high_low_container, pos1);
164
165 } else { // s1 > s2
166 void *c2 = ra_get_container_at_index(&x2->high_low_container, pos2,
167 &container_type_2);
168 ra_append(&answer->high_low_container, s2, c2, container_type_2);
169 pos2++;
170 if (pos2 == length2) break;
171 s2 = ra_get_key_at_index(&x2->high_low_container, pos2);
172 }
173 }
174 if (pos1 == length1) {
175 ra_append_move_range(&answer->high_low_container,
176 &x2->high_low_container, pos2, length2);
177 } else if (pos2 == length2) {
178 ra_append_move_range(&answer->high_low_container,
179 &x1->high_low_container, pos1, length1);
180 }
181 ra_clear_without_containers(&x1->high_low_container);
182 ra_clear_without_containers(&x2->high_low_container);
183 free(x1);
184 free(x2);
185 return answer;
186}
187
188/**
189 * Compute the union of 'number' bitmaps using a heap. This can
190 * sometimes be faster than roaring_bitmap_or_many which uses
191 * a naive algorithm. Caller is responsible for freeing the
192 * result.
193 */
194roaring_bitmap_t *roaring_bitmap_or_many_heap(uint32_t number,
195 const roaring_bitmap_t **x) {
196 if (number == 0) {
197 return roaring_bitmap_create();
198 }
199 if (number == 1) {
200 return roaring_bitmap_copy(x[0]);
201 }
202 roaring_pq_t *pq = create_pq(x, number);
203 while (pq->size > 1) {
204 roaring_pq_element_t x1 = pq_poll(pq);
205 roaring_pq_element_t x2 = pq_poll(pq);
206
207 if (x1.is_temporary && x2.is_temporary) {
208 roaring_bitmap_t *newb =
209 lazy_or_from_lazy_inputs(x1.bitmap, x2.bitmap);
210 // should normally return a fresh new bitmap *except* that
211 // it can return x1.bitmap or x2.bitmap in degenerate cases
212 bool temporary = !((newb == x1.bitmap) && (newb == x2.bitmap));
213 uint64_t bsize = roaring_bitmap_portable_size_in_bytes(newb);
214 roaring_pq_element_t newelement = {
215 .size = bsize, .is_temporary = temporary, .bitmap = newb};
216 pq_add(pq, &newelement);
217 } else if (x2.is_temporary) {
218 roaring_bitmap_lazy_or_inplace(x2.bitmap, x1.bitmap, false);
219 x2.size = roaring_bitmap_portable_size_in_bytes(x2.bitmap);
220 pq_add(pq, &x2);
221 } else if (x1.is_temporary) {
222 roaring_bitmap_lazy_or_inplace(x1.bitmap, x2.bitmap, false);
223 x1.size = roaring_bitmap_portable_size_in_bytes(x1.bitmap);
224
225 pq_add(pq, &x1);
226 } else {
227 roaring_bitmap_t *newb =
228 roaring_bitmap_lazy_or(x1.bitmap, x2.bitmap, false);
229 uint64_t bsize = roaring_bitmap_portable_size_in_bytes(newb);
230 roaring_pq_element_t newelement = {
231 .size = bsize, .is_temporary = true, .bitmap = newb};
232
233 pq_add(pq, &newelement);
234 }
235 }
236 roaring_pq_element_t X = pq_poll(pq);
237 roaring_bitmap_t *answer = X.bitmap;
238 roaring_bitmap_repair_after_lazy(answer);
239 pq_free(pq);
240 return answer;
241}
242