1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*#define DEBUG*/
10
11#include "monetdb_config.h"
12#include "rel_distribute.h"
13#include "rel_rel.h"
14#include "rel_exp.h"
15#include "rel_prop.h"
16#include "rel_dump.h"
17
18static int
19has_remote_or_replica( sql_rel *rel )
20{
21 if (!rel)
22 return 0;
23
24 switch (rel->op) {
25 case op_basetable: {
26 sql_table *t = rel->l;
27
28 if (t && (isReplicaTable(t) || isRemote(t)))
29 return 1;
30 break;
31 }
32 case op_table:
33 break;
34 case op_join:
35 case op_left:
36 case op_right:
37 case op_full:
38
39 case op_semi:
40 case op_anti:
41
42 case op_union:
43 case op_inter:
44 case op_except:
45 if (has_remote_or_replica( rel->l ) ||
46 has_remote_or_replica( rel->r ))
47 return 1;
48 break;
49 case op_project:
50 case op_select:
51 case op_groupby:
52 case op_topn:
53 case op_sample:
54 if (has_remote_or_replica( rel->l ))
55 return 1;
56 break;
57 case op_ddl:
58 if (has_remote_or_replica( rel->l ))
59 return 1;
60 /* fall through */
61 case op_insert:
62 case op_update:
63 case op_delete:
64 case op_truncate:
65 if (rel->r && has_remote_or_replica( rel->r ))
66 return 1;
67 break;
68 }
69 return 0;
70}
71
72static sql_rel *
73rewrite_replica( mvc *sql, sql_rel *rel, sql_table *t, sql_part *pd, int remote_prop)
74{
75 node *n, *m;
76 sql_table *p = find_sql_table(t->s, pd->base.name);
77 sql_rel *r = rel_basetable(sql, p, t->base.name);
78
79 for (n = rel->exps->h, m = r->exps->h; n && m; n = n->next, m = m->next) {
80 sql_exp *e = n->data;
81 sql_exp *ne = m->data;
82
83 exp_prop_alias(sql->sa, ne, e);
84 }
85 rel_destroy(rel);
86
87 /* set_remote() */
88 if (remote_prop && p && isRemote(p)) {
89 char *local_name = sa_strconcat(sql->sa, sa_strconcat(sql->sa, p->s->base.name, "."), p->base.name);
90 if (!local_name) {
91 return NULL;
92 }
93 prop *p = r->p = prop_create(sql->sa, PROP_REMOTE, r->p);
94 if (!p) {
95 return NULL;
96 }
97
98 p->value = local_name;
99 }
100 return r;
101}
102
103static list * exps_replica(mvc *sql, list *exps, char *uri) ;
104static sql_rel * replica(mvc *sql, sql_rel *rel, char *uri);
105
106static sql_exp *
107exp_replica(mvc *sql, sql_exp *e, char *uri)
108{
109 if (e->type != e_psm)
110 return e;
111 if (e->flag & PSM_VAR)
112 return e;
113 if (e->flag & PSM_SET || e->flag & PSM_RETURN)
114 e->l = exp_replica(sql, e->l, uri);
115 if (e->flag & PSM_WHILE || e->flag & PSM_IF) {
116 e->l = exp_replica(sql, e->l, uri);
117 e->r = exps_replica(sql, e->r, uri);
118 if (e->f)
119 e->f = exps_replica(sql, e->f, uri);
120 return e;
121 }
122 if (e->flag & PSM_REL)
123 e->l = replica(sql, e->l, uri);
124 if (e->flag & PSM_EXCEPTION)
125 e->l = exp_replica(sql, e->l, uri);
126 return e;
127}
128
129static list *
130exps_replica(mvc *sql, list *exps, char *uri)
131{
132 node *n;
133
134 if (!exps)
135 return exps;
136 for( n = exps->h; n; n = n->next)
137 n->data = exp_replica(sql, n->data, uri);
138 return exps;
139}
140
141static sql_rel *
142replica(mvc *sql, sql_rel *rel, char *uri)
143{
144 if (!rel)
145 return rel;
146
147 if (rel_is_ref(rel)) {
148 if (has_remote_or_replica(rel)) {
149 sql_rel *nrel = rel_copy(sql, rel, 0);
150
151 if (nrel && rel->p)
152 nrel->p = prop_copy(sql->sa, rel->p);
153 rel_destroy(rel);
154 rel = nrel;
155 } else {
156 return rel;
157 }
158 }
159 switch (rel->op) {
160 case op_basetable: {
161 sql_table *t = rel->l;
162
163 if (t && isReplicaTable(t)) {
164 node *n;
165
166 if (uri) {
167 /* replace by the replica which matches the uri */
168 for (n = t->members.set->h; n; n = n->next) {
169 sql_part *p = n->data;
170 sql_table *pt = find_sql_table(t->s, p->base.name);
171
172 if (isRemote(pt) && strcmp(uri, pt->query) == 0) {
173 rel = rewrite_replica(sql, rel, t, p, 0);
174 break;
175 }
176 }
177 } else { /* no match, find one without remote or use first */
178 if (t->members.set) {
179 int fnd = 0;
180 sql_part *p;
181 for (n = t->members.set->h; n; n = n->next) {
182 sql_part *p = n->data;
183 sql_table *pt = find_sql_table(t->s, p->base.name);
184
185 if (!isRemote(pt)) {
186 fnd = 1;
187 rel = rewrite_replica(sql, rel, t, p, 0);
188 break;
189 }
190 }
191 if (!fnd) {
192 p = t->members.set->h->data;
193 rel = rewrite_replica(sql, rel, t, p, 1);
194 }
195 } else {
196 rel = NULL;
197 }
198 }
199 }
200 break;
201 }
202 case op_table:
203 break;
204 case op_join:
205 case op_left:
206 case op_right:
207 case op_full:
208
209 case op_semi:
210 case op_anti:
211
212 case op_union:
213 case op_inter:
214 case op_except:
215 rel->l = replica(sql, rel->l, uri);
216 rel->r = replica(sql, rel->r, uri);
217 break;
218 case op_project:
219 case op_select:
220 case op_groupby:
221 case op_topn:
222 case op_sample:
223 rel->l = replica(sql, rel->l, uri);
224 break;
225 case op_ddl:
226 if ((rel->flag == ddl_psm || rel->flag == ddl_exception) && rel->exps)
227 rel->exps = exps_replica(sql, rel->exps, uri);
228 rel->l = replica(sql, rel->l, uri);
229 if (rel->r)
230 rel->r = replica(sql, rel->r, uri);
231 break;
232 case op_insert:
233 case op_update:
234 case op_delete:
235 case op_truncate:
236 rel->r = replica(sql, rel->r, uri);
237 break;
238 }
239 return rel;
240}
241
242static list * exps_distribute(mvc *sql, list *exps) ;
243static sql_rel * distribute(mvc *sql, sql_rel *rel);
244
245static sql_exp *
246exp_distribute(mvc *sql, sql_exp *e)
247{
248 if (e->type != e_psm)
249 return e;
250 if (e->flag & PSM_VAR)
251 return e;
252 if (e->flag & PSM_SET || e->flag & PSM_RETURN)
253 e->l = exp_distribute(sql, e->l);
254 if (e->flag & PSM_WHILE || e->flag & PSM_IF) {
255 e->l = exp_distribute(sql, e->l);
256 e->r = exps_distribute(sql, e->r);
257 if (e->f)
258 e->f = exps_distribute(sql, e->f);
259 return e;
260 }
261 if (e->flag & PSM_REL)
262 e->l = distribute(sql, e->l);
263 if (e->flag & PSM_EXCEPTION)
264 e->l = exp_distribute(sql, e->l);
265 return e;
266}
267
268static list *
269exps_distribute(mvc *sql, list *exps)
270{
271 node *n;
272
273 if (!exps)
274 return exps;
275 for( n = exps->h; n; n = n->next)
276 n->data = exp_distribute(sql, n->data);
277 return exps;
278}
279
280static sql_rel *
281distribute(mvc *sql, sql_rel *rel)
282{
283 sql_rel *l = NULL, *r = NULL;
284 prop *p, *pl, *pr;
285
286 if (!rel)
287 return rel;
288
289 if (rel_is_ref(rel)) {
290 if (has_remote_or_replica(rel)) {
291 sql_rel *nrel = rel_copy(sql, rel, 0);
292
293 if (nrel && rel->p)
294 nrel->p = prop_copy(sql->sa, rel->p);
295 rel_destroy(rel);
296 rel = nrel;
297 } else {
298 return rel;
299 }
300 }
301
302 switch (rel->op) {
303 case op_basetable: {
304 sql_table *t = rel->l;
305
306 /* set_remote() */
307 if (t && isRemote(t)) {
308 //TODO: check for allocation failure
309 char *local_name = sa_strconcat(sql->sa, sa_strconcat(sql->sa, t->s->base.name, "."), t->base.name);
310 if (!local_name) {
311 return NULL;
312 }
313
314 p = rel->p = prop_create(sql->sa, PROP_REMOTE, rel->p);
315 if (!p) {
316 return NULL;
317 }
318 p->value = local_name;
319 }
320 break;
321 }
322 case op_table:
323 break;
324 case op_join:
325 case op_left:
326 case op_right:
327 case op_full:
328
329 case op_semi:
330 case op_anti:
331
332 case op_union:
333 case op_inter:
334 case op_except:
335 l = rel->l = distribute(sql, rel->l);
336 r = rel->r = distribute(sql, rel->r);
337
338 if (is_join(rel->op) && list_empty(rel->exps) &&
339 find_prop(l->p, PROP_REMOTE) == NULL &&
340 find_prop(r->p, PROP_REMOTE) == NULL) {
341 /* cleanup replica's */
342 l = rel->l = replica(sql, l, NULL);
343 r = rel->r = replica(sql, r, NULL);
344 }
345 if (l && (pl = find_prop(l->p, PROP_REMOTE)) != NULL &&
346 r && find_prop(r->p, PROP_REMOTE) == NULL) {
347 r = rel->r = distribute(sql, replica(sql, rel->r, pl->value));
348 } else if (l && find_prop(l->p, PROP_REMOTE) == NULL &&
349 r && (pr = find_prop(r->p, PROP_REMOTE)) != NULL) {
350 l = rel->l = distribute(sql, replica(sql, rel->l, pr->value));
351 }
352
353 if (l && (pl = find_prop(l->p, PROP_REMOTE)) != NULL &&
354 r && (pr = find_prop(r->p, PROP_REMOTE)) != NULL &&
355 strcmp(pl->value, pr->value) == 0) {
356 l->p = prop_remove(l->p, pl);
357 r->p = prop_remove(r->p, pr);
358 pl->p = rel->p;
359 rel->p = pl;
360 }
361 break;
362 case op_project:
363 case op_select:
364 case op_groupby:
365 case op_topn:
366 case op_sample:
367 rel->l = distribute(sql, rel->l);
368 l = rel->l;
369 if (l && (p = find_prop(l->p, PROP_REMOTE)) != NULL) {
370 l->p = prop_remove(l->p, p);
371 p->p = rel->p;
372 rel->p = p;
373 }
374 break;
375 case op_ddl:
376 if ((rel->flag == ddl_psm || rel->flag == ddl_exception) && rel->exps)
377 rel->exps = exps_distribute(sql, rel->exps);
378 rel->l = distribute(sql, rel->l);
379 if (rel->r)
380 rel->r = distribute(sql, rel->r);
381 break;
382 case op_insert:
383 case op_update:
384 case op_delete:
385 case op_truncate:
386 rel->r = distribute(sql, rel->r);
387 break;
388 }
389 return rel;
390}
391
392static list * exps_remote_func(mvc *sql, list *exps) ;
393static sql_rel * rel_remote_func(mvc *sql, sql_rel *rel);
394
395static sql_exp *
396exp_remote_func(mvc *sql, sql_exp *e)
397{
398 if (e->type != e_psm)
399 return e;
400 if (e->flag & PSM_VAR)
401 return e;
402 if (e->flag & PSM_SET || e->flag & PSM_RETURN)
403 e->l = exp_remote_func(sql, e->l);
404 if (e->flag & PSM_WHILE || e->flag & PSM_IF) {
405 e->l = exp_remote_func(sql, e->l);
406 e->r = exps_remote_func(sql, e->r);
407 if (e->f)
408 e->f = exps_remote_func(sql, e->f);
409 return e;
410 }
411 if (e->flag & PSM_REL)
412 e->l = rel_remote_func(sql, e->l);
413 if (e->flag & PSM_EXCEPTION)
414 e->l = exp_remote_func(sql, e->l);
415 return e;
416}
417
418static list *
419exps_remote_func(mvc *sql, list *exps)
420{
421 node *n;
422
423 if (!exps)
424 return exps;
425 for( n = exps->h; n; n = n->next)
426 n->data = exp_remote_func(sql, n->data);
427 return exps;
428}
429
430static sql_rel *
431rel_remote_func(mvc *sql, sql_rel *rel)
432{
433 if (!rel)
434 return rel;
435
436 switch (rel->op) {
437 case op_basetable:
438 case op_table:
439 break;
440 case op_join:
441 case op_left:
442 case op_right:
443 case op_full:
444
445 case op_semi:
446 case op_anti:
447
448 case op_union:
449 case op_inter:
450 case op_except:
451 rel->l = rel_remote_func(sql, rel->l);
452 rel->r = rel_remote_func(sql, rel->r);
453 break;
454 case op_project:
455 case op_select:
456 case op_groupby:
457 case op_topn:
458 case op_sample:
459 rel->l = rel_remote_func(sql, rel->l);
460 break;
461 case op_ddl:
462 if ((rel->flag == ddl_psm || rel->flag == ddl_exception) && rel->exps)
463 rel->exps = exps_remote_func(sql, rel->exps);
464 rel->l = rel_remote_func(sql, rel->l);
465 if (rel->r)
466 rel->r = rel_remote_func(sql, rel->r);
467 break;
468 case op_insert:
469 case op_update:
470 case op_delete:
471 case op_truncate:
472 rel->r = rel_remote_func(sql, rel->r);
473 break;
474 }
475 if (find_prop(rel->p, PROP_REMOTE) != NULL) {
476 list *exps = rel_projections(sql, rel, NULL, 1, 1);
477 rel = rel_relational_func(sql->sa, rel, exps);
478 }
479 return rel;
480}
481
482sql_rel *
483rel_distribute(mvc *sql, sql_rel *rel)
484{
485 rel = distribute(sql, rel);
486 rel = replica(sql, rel, NULL);
487 return rel_remote_func(sql, rel);
488}
489