1/*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2018 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#ifndef _RDAVG_H_
30#define _RDAVG_H_
31
32
33#if WITH_HDRHISTOGRAM
34#include "rdhdrhistogram.h"
35#endif
36
37typedef struct rd_avg_s {
38 struct {
39 int64_t maxv;
40 int64_t minv;
41 int64_t avg;
42 int64_t sum;
43 int cnt;
44 rd_ts_t start;
45 } ra_v;
46 mtx_t ra_lock;
47 int ra_enabled;
48 enum {
49 RD_AVG_GAUGE,
50 RD_AVG_COUNTER,
51 } ra_type;
52#if WITH_HDRHISTOGRAM
53 rd_hdr_histogram_t *ra_hdr;
54#endif
55 /* Histogram results, calculated for dst in rollover().
56 * Will be all zeroes if histograms are not supported. */
57 struct {
58 /* Quantiles */
59 int64_t p50;
60 int64_t p75;
61 int64_t p90;
62 int64_t p95;
63 int64_t p99;
64 int64_t p99_99;
65
66 int64_t oor; /**< Values out of range */
67 int32_t hdrsize; /**< hdr.allocatedSize */
68 double stddev;
69 double mean;
70 } ra_hist;
71} rd_avg_t;
72
73
74/**
75 * @brief Add value \p v to averager \p ra.
76 */
77static RD_UNUSED void rd_avg_add (rd_avg_t *ra, int64_t v) {
78 mtx_lock(&ra->ra_lock);
79 if (!ra->ra_enabled) {
80 mtx_unlock(&ra->ra_lock);
81 return;
82 }
83 if (v > ra->ra_v.maxv)
84 ra->ra_v.maxv = v;
85 if (ra->ra_v.minv == 0 || v < ra->ra_v.minv)
86 ra->ra_v.minv = v;
87 ra->ra_v.sum += v;
88 ra->ra_v.cnt++;
89#if WITH_HDRHISTOGRAM
90 rd_hdr_histogram_record(ra->ra_hdr, v);
91#endif
92 mtx_unlock(&ra->ra_lock);
93}
94
95
96/**
97 * @brief Calculate the average
98 */
99static RD_UNUSED void rd_avg_calc (rd_avg_t *ra, rd_ts_t now) {
100 if (ra->ra_type == RD_AVG_GAUGE) {
101 if (ra->ra_v.cnt)
102 ra->ra_v.avg = ra->ra_v.sum / ra->ra_v.cnt;
103 else
104 ra->ra_v.avg = 0;
105 } else {
106 rd_ts_t elapsed = now - ra->ra_v.start;
107
108 if (elapsed)
109 ra->ra_v.avg = (ra->ra_v.sum * 1000000llu) / elapsed;
110 else
111 ra->ra_v.avg = 0;
112
113 ra->ra_v.start = elapsed;
114 }
115}
116
117
118/**
119 * @returns the quantile \q for \p ra, or 0 if histograms are not supported
120 * in this build.
121 *
122 * @remark ra will be not locked by this function.
123 */
124static RD_UNUSED int64_t
125rd_avg_quantile (const rd_avg_t *ra, double q) {
126#if WITH_HDRHISTOGRAM
127 return rd_hdr_histogram_quantile(ra->ra_hdr, q);
128#else
129 return 0;
130#endif
131}
132
133/**
134 * @brief Rolls over statistics in \p src and stores the average in \p dst.
135 * \p src is cleared and ready to be reused.
136 *
137 * Caller must free avg internal members by calling rd_avg_destroy()
138 * on the \p dst.
139 */
140static RD_UNUSED void rd_avg_rollover (rd_avg_t *dst, rd_avg_t *src) {
141 rd_ts_t now;
142
143 mtx_lock(&src->ra_lock);
144 if (!src->ra_enabled) {
145 memset(dst, 0, sizeof(*dst));
146 dst->ra_type = src->ra_type;
147 mtx_unlock(&src->ra_lock);
148 return;
149 }
150
151 mtx_init(&dst->ra_lock, mtx_plain);
152 dst->ra_type = src->ra_type;
153 dst->ra_v = src->ra_v;
154#if WITH_HDRHISTOGRAM
155 dst->ra_hdr = NULL;
156
157 dst->ra_hist.stddev = rd_hdr_histogram_stddev(src->ra_hdr);
158 dst->ra_hist.mean = rd_hdr_histogram_mean(src->ra_hdr);
159 dst->ra_hist.oor = src->ra_hdr->outOfRangeCount;
160 dst->ra_hist.hdrsize = src->ra_hdr->allocatedSize;
161 dst->ra_hist.p50 = rd_hdr_histogram_quantile(src->ra_hdr, 50.0);
162 dst->ra_hist.p75 = rd_hdr_histogram_quantile(src->ra_hdr, 75.0);
163 dst->ra_hist.p90 = rd_hdr_histogram_quantile(src->ra_hdr, 90.0);
164 dst->ra_hist.p95 = rd_hdr_histogram_quantile(src->ra_hdr, 95.0);
165 dst->ra_hist.p99 = rd_hdr_histogram_quantile(src->ra_hdr, 99.0);
166 dst->ra_hist.p99_99 = rd_hdr_histogram_quantile(src->ra_hdr, 99.99);
167#else
168 memset(&dst->ra_hist, 0, sizeof(dst->ra_hist));
169#endif
170 memset(&src->ra_v, 0, sizeof(src->ra_v));
171
172 now = rd_clock();
173 src->ra_v.start = now;
174
175#if WITH_HDRHISTOGRAM
176 /* Adapt histogram span to fit future out of range entries
177 * from this period. */
178 if (src->ra_hdr->totalCount > 0) {
179 int64_t vmin = src->ra_hdr->lowestTrackableValue;
180 int64_t vmax = src->ra_hdr->highestTrackableValue;
181 int64_t mindiff, maxdiff;
182
183 mindiff = src->ra_hdr->lowestTrackableValue -
184 src->ra_hdr->lowestOutOfRange;
185
186 if (mindiff > 0) {
187 /* There were low out of range values, grow lower
188 * span to fit lowest out of range value + 20%. */
189 vmin = src->ra_hdr->lowestOutOfRange +
190 (int64_t)((double)mindiff * 0.2);
191 }
192
193 maxdiff = src->ra_hdr->highestOutOfRange -
194 src->ra_hdr->highestTrackableValue;
195
196 if (maxdiff > 0) {
197 /* There were high out of range values, grow higher
198 * span to fit highest out of range value + 20%. */
199 vmax = src->ra_hdr->highestOutOfRange +
200 (int64_t)((double)maxdiff * 0.2);
201 }
202
203 if (vmin == src->ra_hdr->lowestTrackableValue &&
204 vmax == src->ra_hdr->highestTrackableValue) {
205 /* No change in min,max, use existing hdr */
206 rd_hdr_histogram_reset(src->ra_hdr);
207
208 } else {
209 int sigfigs = (int)src->ra_hdr->significantFigures;
210 /* Create new hdr for adapted range */
211 rd_hdr_histogram_destroy(src->ra_hdr);
212 src->ra_hdr = rd_hdr_histogram_new(vmin, vmax, sigfigs);
213 }
214
215 } else {
216 /* No records, no need to reset. */
217 }
218#endif
219
220 mtx_unlock(&src->ra_lock);
221
222 rd_avg_calc(dst, now);
223}
224
225
226/**
227 * Initialize an averager
228 */
229static RD_UNUSED void rd_avg_init (rd_avg_t *ra, int type,
230 int64_t exp_min, int64_t exp_max,
231 int sigfigs, int enable) {
232 memset(ra, 0, sizeof(*ra));
233 mtx_init(&ra->ra_lock, 0);
234 ra->ra_enabled = enable;
235 if (!enable)
236 return;
237 ra->ra_type = type;
238 ra->ra_v.start = rd_clock();
239#if WITH_HDRHISTOGRAM
240 /* Start off the histogram with expected min,max span,
241 * we'll adapt the size on each rollover. */
242 ra->ra_hdr = rd_hdr_histogram_new(exp_min, exp_max, sigfigs);
243#endif
244}
245
246
247/**
248 * Destroy averager
249 */
250static RD_UNUSED void rd_avg_destroy (rd_avg_t *ra) {
251#if WITH_HDRHISTOGRAM
252 if (ra->ra_hdr)
253 rd_hdr_histogram_destroy(ra->ra_hdr);
254#endif
255 mtx_destroy(&ra->ra_lock);
256}
257
258#endif /* _RDAVG_H_ */
259