1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <toku_portability.h>
40#include "toku_os.h"
41#include <errno.h>
42#include <toku_assert.h>
43#include "queue.h"
44#include "memory.h"
45#include <toku_pthread.h>
46
47toku_instr_key *queue_result_mutex_key;
48toku_instr_key *queue_result_cond_key;
49
50struct qitem;
51
52struct qitem {
53 void *item;
54 struct qitem *next;
55 uint64_t weight;
56};
57
58struct queue {
59 uint64_t contents_weight; // how much stuff is in there?
60 uint64_t weight_limit; // Block enqueueing when the contents gets to be bigger than the weight.
61 struct qitem *head, *tail;
62
63 bool eof;
64
65 toku_mutex_t mutex;
66 toku_cond_t cond;
67};
68
69// Representation invariant:
70// q->contents_weight is the sum of the weights of everything in the queue.
71// q->weight_limit is the limit on the weight before we block.
72// q->head is the oldest thing in the queue. q->tail is the newest. (If nothing is in the queue then both are NULL)
73// If q->head is not null:
74// q->head->item is the oldest item.
75// q->head->weight is the weight of that item.
76// q->head->next is the next youngest thing.
77// q->eof indicates that the producer has said "that's all".
78// q->mutex and q->cond are used as condition variables.
79
80
81int toku_queue_create (QUEUE *q, uint64_t weight_limit)
82{
83 QUEUE CALLOC(result);
84 if (result==NULL) return get_error_errno();
85 result->contents_weight = 0;
86 result->weight_limit = weight_limit;
87 result->head = NULL;
88 result->tail = NULL;
89 result->eof = false;
90 toku_mutex_init(*queue_result_mutex_key, &result->mutex, nullptr);
91 toku_cond_init(*queue_result_cond_key, &result->cond, nullptr);
92 *q = result;
93 return 0;
94}
95
96int toku_queue_destroy (QUEUE q)
97{
98 if (q->head) return EINVAL;
99 assert(q->contents_weight==0);
100 toku_mutex_destroy(&q->mutex);
101 toku_cond_destroy(&q->cond);
102 toku_free(q);
103 return 0;
104}
105
106int toku_queue_enq (QUEUE q, void *item, uint64_t weight, uint64_t *total_weight_after_enq)
107{
108 toku_mutex_lock(&q->mutex);
109 assert(!q->eof);
110 // Go ahead and put it in, even if it's too much.
111 struct qitem *MALLOC(qi);
112 if (qi==NULL) {
113 int r = get_error_errno();
114 toku_mutex_unlock(&q->mutex);
115 return r;
116 }
117 q->contents_weight += weight;
118 qi->item = item;
119 qi->weight = weight;
120 qi->next = NULL;
121 if (q->tail) {
122 q->tail->next = qi;
123 } else {
124 assert(q->head==NULL);
125 q->head = qi;
126 }
127 q->tail = qi;
128 // Wake up the consumer.
129 toku_cond_signal(&q->cond);
130 // Now block if there's too much stuff in there.
131 while (q->weight_limit < q->contents_weight) {
132 toku_cond_wait(&q->cond, &q->mutex);
133 }
134 // we are allowed to return.
135 if (total_weight_after_enq) {
136 *total_weight_after_enq = q->contents_weight;
137 }
138 toku_mutex_unlock(&q->mutex);
139 return 0;
140}
141
142int toku_queue_eof (QUEUE q)
143{
144 toku_mutex_lock(&q->mutex);
145 assert(!q->eof);
146 q->eof = true;
147 toku_cond_signal(&q->cond);
148 toku_mutex_unlock(&q->mutex);
149 return 0;
150}
151
152int toku_queue_deq (QUEUE q, void **item, uint64_t *weight, uint64_t *total_weight_after_deq)
153{
154 toku_mutex_lock(&q->mutex);
155 int result;
156 while (q->head==NULL && !q->eof) {
157 toku_cond_wait(&q->cond, &q->mutex);
158 }
159 if (q->head==NULL) {
160 assert(q->eof);
161 result = EOF;
162 } else {
163 struct qitem *head = q->head;
164 q->contents_weight -= head->weight;
165 *item = head->item;
166 if (weight)
167 *weight = head->weight;
168 if (total_weight_after_deq)
169 *total_weight_after_deq = q->contents_weight;
170 q->head = head->next;
171 toku_free(head);
172 if (q->head==NULL) {
173 q->tail = NULL;
174 }
175 // wake up the producer, since we decreased the contents_weight.
176 toku_cond_signal(&q->cond);
177 // Successful result.
178 result = 0;
179 }
180 toku_mutex_unlock(&q->mutex);
181 return result;
182}
183