1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include "store_sequence.h"
11#include "sql_storage.h"
12#include "gdk_logger.h"
13
/*
 * In-memory bookkeeping for one sequence; instances are kept in the
 * file-global list sql_seqs and looked up by seqid.
 */
typedef struct store_sequence {
	sqlid seqid;	/* id of the sequence (copied from seq->base.id) */
	bit called;	/* 0 after creation/restart, 1 once a value has been handed out;
			 * when set, the next value is cur + increment */
	lng cur;	/* current position: the last value handed out when called is
			 * set, otherwise the first value still to be handed out */
	lng cached;	/* value most recently passed to the logger for this sequence;
			 * advanced by cacheinc*increment when cur reaches it */
} store_sequence;
20
/*
 * List of store_sequence entries, one per sequence that has been touched.
 * Created by sequences_init() (with sequence_destroy as element destructor),
 * released by sequences_exit(), and searched linearly on seqid.
 */
static list *sql_seqs = NULL;
22
23static void
24sequence_destroy( store_sequence *s )
25{
26 _DELETE(s);
27}
28
29void*
30sequences_init(void)
31{
32 sql_seqs = list_create( (fdestroy)sequence_destroy );
33 return (void*) sql_seqs;
34}
35
36void
37sequences_exit(void)
38{
39 if(sql_seqs) {
40 list_destroy(sql_seqs);
41 sql_seqs = NULL;
42 }
43}
44
45/* lock is held */
46static void
47sql_update_sequence_cache(sql_sequence *seq, lng cached)
48{
49 logger_funcs.log_sequence(seq->base.id, cached);
50}
51
52/* lock is held */
53static store_sequence *
54sql_create_sequence(sql_sequence *seq )
55{
56 lng id = 0;
57 store_sequence *s = NULL;
58 s = MNEW(store_sequence);
59 if(!s)
60 return NULL;
61
62 s -> seqid = seq->base.id;
63 s -> called = 0;
64 s -> cur = seq->start;
65 s -> cached = seq->start;
66
67 if (!isNew(seq) && logger_funcs.get_sequence(seq->base.id, &id ))
68 s->cached = id;
69 s -> cur = s->cached;
70 return s;
71}
72
73int
74seq_restart(sql_sequence *seq, lng start)
75{
76 node *n = NULL;
77 store_sequence *s;
78
79 assert(!is_lng_nil(start));
80 store_lock();
81 for ( n = sql_seqs->h; n; n = n ->next ) {
82 s = n->data;
83 if (s->seqid == seq->base.id)
84 break;
85 }
86 if (!n) {
87 s = sql_create_sequence(seq);
88 if (!s) {
89 store_unlock();
90 return 0;
91 }
92 list_append(sql_seqs, s);
93 } else {
94 s = n->data;
95 }
96 s->called = 0;
97 s->cur = start;
98 s->cached = start;
99 /* handle min/max and cycle */
100 if ((seq->maxvalue && s->cur > seq->maxvalue) ||
101 (seq->minvalue && s->cur < seq->minvalue))
102 {
103 /* we're out of numbers */
104 store_unlock();
105 return 0;
106 }
107 sql_update_sequence_cache(seq, s->cached);
108 store_unlock();
109 return 1;
110}
111
112int
113seq_next_value(sql_sequence *seq, lng *val)
114{
115 lng nr = 0;
116 node *n = NULL;
117 store_sequence *s;
118 int save = 0;
119
120 *val = 0;
121 store_lock();
122 for ( n = sql_seqs->h; n; n = n ->next ) {
123 s = n->data;
124 if (s->seqid == seq->base.id)
125 break;
126 }
127 if (!n) {
128 s = sql_create_sequence(seq);
129 if (!s) {
130 store_unlock();
131 return 0;
132 }
133 list_append(sql_seqs, s);
134 } else {
135 s = n->data;
136 if (s->called)
137 s->cur += seq->increment;
138 }
139 /* handle min/max and cycle */
140 if ((seq->maxvalue && s->cur > seq->maxvalue) ||
141 (seq->minvalue && s->cur < seq->minvalue))
142 {
143 if (seq->cycle) {
144 /* cycle to the min value again */
145 s->cur = seq->minvalue;
146 save = 1;
147 } else { /* we're out of numbers */
148 store_unlock();
149 return 0;
150 }
151 }
152 s->called = 1;
153 nr = s->cur;
154 *val = nr;
155 if (save || nr == s->cached) {
156 s->cached = nr + seq->cacheinc*seq->increment;
157 sql_update_sequence_cache(seq, s->cached);
158 store_unlock();
159 return 1;
160 }
161 assert(nr<s->cached);
162 store_unlock();
163 return 1;
164}
165
166seqbulk *
167seqbulk_create(sql_sequence *seq, BUN cnt)
168{
169 seqbulk *sb = MNEW(seqbulk);
170 store_sequence *s;
171 node *n = NULL;
172
173 if (!sb)
174 return NULL;
175
176 store_lock();
177 sb->seq = seq;
178 sb->cnt = cnt;
179 sb->save = 0;
180
181 for ( n = sql_seqs->h; n; n = n ->next ) {
182 s = n->data;
183 if (s->seqid == seq->base.id)
184 break;
185 }
186 if (!n) {
187 s = sql_create_sequence(seq);
188 if (!s) {
189 _DELETE(sb);
190 store_unlock();
191 return NULL;
192 }
193 list_append(sql_seqs, s);
194 } else {
195 s = n->data;
196 }
197 sb->internal_seq = s;
198 return sb;
199}
200
201void
202seqbulk_destroy(seqbulk *sb)
203{
204 if (sb->save) {
205 sql_sequence *seq = sb->seq;
206 store_sequence *s = sb->internal_seq;
207
208 sql_update_sequence_cache(seq, s->cached);
209 }
210 _DELETE(sb);
211 store_unlock();
212}
213
214int
215seqbulk_next_value(seqbulk *sb, lng *val)
216{
217 lng nr = 0;
218 store_sequence *s = sb->internal_seq;
219 sql_sequence *seq = sb->seq;
220
221 if (s->called)
222 s->cur += seq->increment;
223
224 /* handle min/max and cycle */
225 if ((seq->maxvalue && s->cur > seq->maxvalue) ||
226 (seq->minvalue && s->cur < seq->minvalue))
227 {
228 if (seq->cycle) {
229 /* cycle to the min value again */
230 s->cur = seq->minvalue;
231 sb->save = 1;
232 } else { /* we're out of numbers */
233 return 0;
234 }
235 }
236 s->called = 1;
237 nr = s->cur;
238 *val = nr;
239 if (nr == s->cached) {
240 s->cached = nr + seq->cacheinc*seq->increment;
241 sb->save = 1;
242 return 1;
243 }
244 assert(nr<s->cached);
245 return 1;
246}
247
248int
249seq_get_value(sql_sequence *seq, lng *val)
250{
251 node *n = NULL;
252 store_sequence *s;
253
254 *val = 0;
255 store_lock();
256 for ( n = sql_seqs->h; n; n = n ->next ) {
257 s = n->data;
258 if (s->seqid == seq->base.id)
259 break;
260 }
261 if (!n) {
262 s = sql_create_sequence(seq);
263 if (!s) {
264 store_unlock();
265 return 0;
266 }
267 list_append(sql_seqs, s);
268 } else {
269 s = n->data;
270 }
271 *val = s->cur;
272 if (s->called)
273 *val += seq->increment;
274 /* handle min/max and cycle */
275 if ((seq->maxvalue && *val > seq->maxvalue) ||
276 (seq->minvalue && *val < seq->minvalue))
277 {
278 if (seq->cycle) {
279 /* cycle to the min value again */
280 *val = seq->minvalue;
281 } else { /* we're out of numbers */
282 store_unlock();
283 return 0;
284 }
285 }
286 store_unlock();
287 return 1;
288}
289
290int
291seqbulk_get_value(seqbulk *sb, lng *val)
292{
293 store_sequence *s = sb->internal_seq;
294 sql_sequence *seq = sb->seq;
295
296 *val = s->cur;
297 if (s->called)
298 *val += seq->increment;
299 /* handle min/max and cycle */
300 if ((seq->maxvalue && s->cur > seq->maxvalue) ||
301 (seq->minvalue && s->cur < seq->minvalue))
302 {
303 if (seq->cycle) {
304 /* cycle to the min value again */
305 s->cur = seq->minvalue;
306 sb->save = 1;
307 } else { /* we're out of numbers */
308 return 0;
309 }
310 }
311 return 1;
312}
313
314int
315seqbulk_restart(seqbulk *sb, lng start)
316{
317 store_sequence *s = sb->internal_seq;
318 sql_sequence *seq = sb->seq;
319
320 assert(!is_lng_nil(start));
321 s->called = 0;
322 s->cur = start;
323 s->cached = start;
324 /* handle min/max and cycle */
325 if ((seq->maxvalue && s->cur > seq->maxvalue) ||
326 (seq->minvalue && s->cur < seq->minvalue))
327 {
328 return 0;
329 }
330 sql_update_sequence_cache(seq, s->cached);
331 return 1;
332}
333