1// Licensed to the .NET Foundation under one or more agreements.
2// The .NET Foundation licenses this file to you under the MIT license.
3// See the LICENSE file in the project root for more information.
4
5//=========================================================================
6
7//
8// HillClimbing.cpp
9//
10// Defines classes for the ThreadPool's HillClimbing concurrency-optimization
11// algorithm.
12//
13
14//=========================================================================
15
16//
17// TODO: write an essay about how/why this works. Maybe put it in BotR?
18//
19
20#include "common.h"
21#include "hillclimbing.h"
22#include "win32threadpool.h"
23
24//
// Default compilation mode is /fp:precise, which disables fp intrinsics. This causes us to pull in FP stuff (sin, cos, etc.) from
// the CRT, and increases our download size by ~5k. We don't need the extra precision this gets us, so let's switch to
// the intrinsic versions.
28//
29#ifdef _MSC_VER
30#pragma float_control(precise, off)
31#endif
32
33
34
// The constant pi, rounded to the nearest double. Spelled out here because M_PI is not
// portable (MSVC only defines it when _USE_MATH_DEFINES is set before including <math.h>).
const double pi = 3.14159265358979323846;
36
37void HillClimbing::Initialize()
38{
39 CONTRACTL
40 {
41 THROWS;
42 GC_NOTRIGGER;
43 MODE_ANY;
44 }
45 CONTRACTL_END;
46
47 m_wavePeriod = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_WavePeriod);
48 m_maxThreadWaveMagnitude = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_MaxWaveMagnitude);
49 m_threadMagnitudeMultiplier = (double)CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_WaveMagnitudeMultiplier) / 100.0;
50 m_samplesToMeasure = m_wavePeriod * (int)CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_WaveHistorySize);
51 m_targetThroughputRatio = (double)CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_Bias) / 100.0;
52 m_targetSignalToNoiseRatio = (double)CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_TargetSignalToNoiseRatio) / 100.0;
53 m_maxChangePerSecond = (double)CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_MaxChangePerSecond);
54 m_maxChangePerSample = (double)CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_MaxChangePerSample);
55 m_sampleIntervalLow = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_SampleIntervalLow);
56 m_sampleIntervalHigh = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_SampleIntervalHigh);
57 m_throughputErrorSmoothingFactor = (double)CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_ErrorSmoothingFactor) / 100.0;
58 m_gainExponent = (double)CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_GainExponent) / 100.0;
59 m_maxSampleError = (double)CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_MaxSampleErrorPercent) / 100.0;
60 m_currentControlSetting = 0;
61 m_totalSamples = 0;
62 m_lastThreadCount = 0;
63 m_averageThroughputNoise = 0;
64 m_elapsedSinceLastChange = 0;
65 m_completionsSinceLastChange = 0;
66 m_accumulatedCompletionCount = 0;
67 m_accumulatedSampleDuration = 0;
68
69 m_samples = new double[m_samplesToMeasure];
70 m_threadCounts = new double[m_samplesToMeasure];
71
72 // seed our random number generator with the CLR instance ID and the process ID, to avoid correlations with other CLR ThreadPool instances.
73#ifndef DACCESS_COMPILE
74 m_randomIntervalGenerator.Init(((int)GetClrInstanceId() << 16) ^ (int)GetCurrentProcessId());
75#endif
76 m_currentSampleInterval = m_randomIntervalGenerator.Next(m_sampleIntervalLow, m_sampleIntervalHigh+1);
77}
78
//
// Called with the results of each sample interval. Records the throughput observed during the
// sample, runs the frequency-domain analysis over the sample history, and computes the thread
// count the pool should use next.
//
//   currentThreadCount - the thread count in effect during this sample.
//   sampleDuration     - length of the sample. Presumably in seconds, since the gain below is
//                        m_maxChangePerSecond * sampleDuration -- confirm against the caller.
//   numCompletions     - number of work items completed during the sample.
//   pNewSampleInterval - [out] receives the interval to use before the next call.
//
// Returns the new thread count. If the sample is too noisy to use (see the error check below),
// the data is accumulated into the next sample, *pNewSampleInterval is set to 10, and
// currentThreadCount is returned unchanged. Otherwise the result is clamped to
// [ThreadpoolMgr::MinLimitTotalWorkerThreads, ThreadpoolMgr::MaxLimitTotalWorkerThreads].
//
// Under DACCESS_COMPILE this is a stub that returns 1.
//
int HillClimbing::Update(int currentThreadCount, double sampleDuration, int numCompletions, int* pNewSampleInterval)
{
    LIMITED_METHOD_CONTRACT;

#ifdef DACCESS_COMPILE
    return 1;
#else

    //
    // If someone changed the thread count without telling us, update our records accordingly.
    //
    if (currentThreadCount != m_lastThreadCount)
        ForceChange(currentThreadCount, Initializing);

    //
    // Update the cumulative stats for this thread count
    //
    m_elapsedSinceLastChange += sampleDuration;
    m_completionsSinceLastChange += numCompletions;

    //
    // Add in any data we've already collected about this sample
    //
    sampleDuration += m_accumulatedSampleDuration;
    numCompletions += m_accumulatedCompletionCount;

    //
    // We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
    // of each work item, we are going to be missing some data about what really happened during the
    // sample interval. The count produced by each thread includes an initial work item that may have
    // started well before the start of the interval, and each thread may have been running some new
    // work item for some time before the end of the interval, which did not yet get counted. So
    // our count is going to be off by +/- threadCount workitems.
    //
    // The exception is that the thread that reported to us last time definitely wasn't running any work
    // at that time, and the thread that's reporting now definitely isn't running a work item now. So
    // we really only need to consider threadCount-1 threads.
    //
    // Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
    //
    // We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
    // of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
    // then the next one likely will be too. The one after that will include the sum of the completions
    // we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
    // two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
    // range we're targeting, which will not be filtered by the frequency-domain translation.
    //
    // (The very first sample, m_totalSamples == 0, is always accepted.)
    //
    if (m_totalSamples > 0 && ((currentThreadCount-1.0) / numCompletions) >= m_maxSampleError)
    {
        // not accurate enough yet. Let's accumulate the data so far, and tell the ThreadPool
        // to collect a little more.
        m_accumulatedSampleDuration = sampleDuration;
        m_accumulatedCompletionCount = numCompletions;
        *pNewSampleInterval = 10;
        return currentThreadCount;
    }

    //
    // We've got enough data for our sample; reset our accumulators for next time.
    //
    m_accumulatedSampleDuration = 0;
    m_accumulatedCompletionCount = 0;

    //
    // Add the current thread count and throughput sample to our history.
    // m_samples and m_threadCounts are circular buffers of m_samplesToMeasure entries.
    //
    double throughput = (double)numCompletions / sampleDuration;
    FireEtwThreadPoolWorkerThreadAdjustmentSample(throughput, GetClrInstanceId());

    int sampleIndex = m_totalSamples % m_samplesToMeasure;
    m_samples[sampleIndex] = throughput;
    m_threadCounts[sampleIndex] = currentThreadCount;
    m_totalSamples++;

    //
    // Set up defaults for our metrics
    //
    Complex threadWaveComponent = 0;
    Complex throughputWaveComponent = 0;
    double throughputErrorEstimate = 0;
    Complex ratio = 0;
    double confidence = 0;

    HillClimbingStateTransition transition = Warmup;

    //
    // How many samples will we use? It must be at least the three wave periods we're looking for, and it must also be a whole
    // multiple of the primary wave's period; otherwise the frequency we're looking for will fall between two frequency bands
    // in the Fourier analysis, and we won't be able to measure it accurately.
    //
    // (The code below rounds down to a whole number of periods and only runs the analysis once more
    // than one full period of history is available.)
    //
    int sampleCount = ((int)min(m_totalSamples-1, m_samplesToMeasure) / m_wavePeriod) * m_wavePeriod;

    if (sampleCount > m_wavePeriod)
    {
        //
        // Average the throughput and thread count samples, so we can scale the wave magnitudes later.
        //
        double sampleSum = 0;
        double threadSum = 0;
        for (int i = 0; i < sampleCount; i++)
        {
            sampleSum += m_samples[(m_totalSamples - sampleCount + i) % m_samplesToMeasure];
            threadSum += m_threadCounts[(m_totalSamples - sampleCount + i) % m_samplesToMeasure];
        }
        double averageThroughput = sampleSum / sampleCount;
        double averageThreadCount = threadSum / sampleCount;

        if (averageThroughput > 0 && averageThreadCount > 0)
        {
            //
            // Calculate the periods of the adjacent frequency bands we'll be using to measure noise levels.
            // We want the two adjacent Fourier frequency bands.
            //
            double adjacentPeriod1 = sampleCount / (((double)sampleCount / (double)m_wavePeriod) + 1);
            double adjacentPeriod2 = sampleCount / (((double)sampleCount / (double)m_wavePeriod) - 1);

            //
            // Get the three different frequency components of the throughput (scaled by average
            // throughput). Our "error" estimate (the amount of noise that might be present in the
            // frequency band we're really interested in) is the larger of the adjacent bands' magnitudes.
            // The lower adjacent band is only used when its period still fits in the history.
            //
            throughputWaveComponent = GetWaveComponent(m_samples, sampleCount, m_wavePeriod) / averageThroughput;
            throughputErrorEstimate = abs(GetWaveComponent(m_samples, sampleCount, adjacentPeriod1) / averageThroughput);
            if (adjacentPeriod2 <= sampleCount)
                throughputErrorEstimate = max(throughputErrorEstimate, abs(GetWaveComponent(m_samples, sampleCount, adjacentPeriod2) / averageThroughput));

            //
            // Do the same for the thread counts, so we have something to compare to. We don't measure thread count
            // noise, because there is none; these are exact measurements.
            //
            threadWaveComponent = GetWaveComponent(m_threadCounts, sampleCount, m_wavePeriod) / averageThreadCount;

            //
            // Update our moving average of the throughput noise. We'll use this later as feedback to
            // determine the new size of the thread wave. The first measurement seeds the average;
            // after that it is an exponential moving average weighted by m_throughputErrorSmoothingFactor.
            //
            if (m_averageThroughputNoise == 0)
                m_averageThroughputNoise = throughputErrorEstimate;
            else
                m_averageThroughputNoise = (m_throughputErrorSmoothingFactor * throughputErrorEstimate) + ((1.0-m_throughputErrorSmoothingFactor) * m_averageThroughputNoise);

            if (abs(threadWaveComponent) > 0)
            {
                //
                // Adjust the throughput wave so it's centered around the target wave, and then calculate the adjusted throughput/thread ratio.
                //
                ratio = (throughputWaveComponent - (m_targetThroughputRatio * threadWaveComponent)) / threadWaveComponent;
                transition = ClimbingMove;
            }
            else
            {
                ratio = 0;
                transition = Stabilizing;
            }

            //
            // Calculate how confident we are in the ratio. More noise == less confident. This has
            // the effect of slowing down movements that might be affected by random noise.
            //
            double noiseForConfidence = max(m_averageThroughputNoise, throughputErrorEstimate);
            if (noiseForConfidence > 0)
                confidence = (abs(threadWaveComponent) / noiseForConfidence) / m_targetSignalToNoiseRatio;
            else
                confidence = 1.0; //there is no noise!

        }
    }

    //
    // We use just the real part of the complex ratio we just calculated. If the throughput signal
    // is exactly in phase with the thread signal, this will be the same as taking the magnitude of
    // the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
    // backward (because this indicates that our changes are having the opposite of the intended effect).
    // If they're 90 degrees out of phase, we won't move at all, because we can't tell whether we're
    // having a negative or positive effect on throughput.
    //
    double move = min(1.0, max(-1.0, ratio.r));

    //
    // Apply our confidence multiplier (clamped to [0, 1]).
    //
    move *= min(1.0, max(0.0, confidence));

    //
    // Now apply non-linear gain, such that values around zero are attenuated, while higher values
    // are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
    // if we're getting close, giving us rapid ramp-up without wild oscillations around the target.
    //
    // Only upward moves are capped by m_maxChangePerSample here; downward moves are bounded later by
    // the MinLimitTotalWorkerThreads clamp.
    //
    double gain = m_maxChangePerSecond * sampleDuration;
    move = pow(fabs(move), m_gainExponent) * (move >= 0.0 ? 1 : -1) * gain;
    move = min(move, m_maxChangePerSample);

    //
    // If the result was positive, and CPU utilization is above CpuUtilizationHigh (presumably 95%), refuse the move.
    //
    if (move > 0.0 && ThreadpoolMgr::cpuUtilization > CpuUtilizationHigh)
        move = 0.0;

    //
    // Apply the move to our control setting
    //
    m_currentControlSetting += move;

    //
    // Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of
    // the throughput error. This average starts at zero, so we'll start with a nice safe little wave at first.
    // The magnitude is clamped to [1, m_maxThreadWaveMagnitude].
    //
    int newThreadWaveMagnitude = (int)(0.5 + (m_currentControlSetting * m_averageThroughputNoise * m_targetSignalToNoiseRatio * m_threadMagnitudeMultiplier * 2.0));
    newThreadWaveMagnitude = min(newThreadWaveMagnitude, m_maxThreadWaveMagnitude);
    newThreadWaveMagnitude = max(newThreadWaveMagnitude, 1);

    //
    // Make sure our control setting is within the ThreadPool's limits
    // (leaving room below the maximum for the top of the wave).
    //
    m_currentControlSetting = min(ThreadpoolMgr::MaxLimitTotalWorkerThreads-newThreadWaveMagnitude, m_currentControlSetting);
    m_currentControlSetting = max(ThreadpoolMgr::MinLimitTotalWorkerThreads, m_currentControlSetting);

    //
    // Calculate the new thread count (control setting + square wave). The wave is "high" for
    // m_wavePeriod/2 samples, then "low" for m_wavePeriod/2 samples.
    //
    int newThreadCount = (int)(m_currentControlSetting + newThreadWaveMagnitude * ((m_totalSamples / (m_wavePeriod/2)) % 2));

    //
    // Make sure the new thread count doesn't exceed the ThreadPool's limits
    //
    newThreadCount = min(ThreadpoolMgr::MaxLimitTotalWorkerThreads, newThreadCount);
    newThreadCount = max(ThreadpoolMgr::MinLimitTotalWorkerThreads, newThreadCount);

    //
    // Record these numbers for posterity
    //
    FireEtwThreadPoolWorkerThreadAdjustmentStats(
        sampleDuration,
        throughput,
        threadWaveComponent.r,
        throughputWaveComponent.r,
        throughputErrorEstimate,
        m_averageThroughputNoise,
        ratio.r,
        confidence,
        m_currentControlSetting,
        (unsigned short)newThreadWaveMagnitude,
        GetClrInstanceId());

    //
    // If all of this caused an actual change in thread count, log that as well.
    // (ChangeThreadCount also picks a new random m_currentSampleInterval.)
    //
    if (newThreadCount != currentThreadCount)
        ChangeThreadCount(newThreadCount, transition);

    //
    // Return the new thread count and sample interval. This is randomized to prevent correlations with other periodic
    // changes in throughput. Among other things, this prevents us from getting confused by Hill Climbing instances
    // running in other processes.
    //
    // If we're at minThreads, and we seem to be hurting performance by going higher, we can't go any lower to fix this. So
    // we'll simply stay at minThreads much longer (up to 10x the normal interval, scaled by how negative the ratio is),
    // and only occasionally try a higher value.
    //
    if (ratio.r < 0.0 && newThreadCount == ThreadpoolMgr::MinLimitTotalWorkerThreads)
        *pNewSampleInterval = (int)(0.5 + m_currentSampleInterval * (10.0 * min(-ratio.r, 1.0)));
    else
        *pNewSampleInterval = m_currentSampleInterval;

    return newThreadCount;

#endif //DACCESS_COMPILE
}
346
347
348void HillClimbing::ForceChange(int newThreadCount, HillClimbingStateTransition transition)
349{
350 LIMITED_METHOD_CONTRACT;
351
352 if (newThreadCount != m_lastThreadCount)
353 {
354 m_currentControlSetting += (newThreadCount - m_lastThreadCount);
355 ChangeThreadCount(newThreadCount, transition);
356 }
357}
358
359
360void HillClimbing::ChangeThreadCount(int newThreadCount, HillClimbingStateTransition transition)
361{
362 LIMITED_METHOD_CONTRACT;
363
364 m_lastThreadCount = newThreadCount;
365 m_currentSampleInterval = m_randomIntervalGenerator.Next(m_sampleIntervalLow, m_sampleIntervalHigh+1);
366 double throughput = (m_elapsedSinceLastChange > 0) ? (m_completionsSinceLastChange / m_elapsedSinceLastChange) : 0;
367 LogTransition(newThreadCount, throughput, transition);
368 m_elapsedSinceLastChange = 0;
369 m_completionsSinceLastChange = 0;
370}
371
372
//
// Fixed-capacity circular log of HillClimbing transitions, appended to by HillClimbing::LogTransition.
// HillClimbingLogFirstIndex is the slot of the oldest entry; HillClimbingLogSize is the number of
// valid entries (at most HillClimbingLogCapacity). They are declared through the GARY_IMPL/GVAL_IMPL
// macros, presumably so they can also be inspected through the DAC -- confirm in daccess.h.
//
GARY_IMPL(HillClimbingLogEntry, HillClimbingLog, HillClimbingLogCapacity);
GVAL_IMPL(int, HillClimbingLogFirstIndex);
GVAL_IMPL(int, HillClimbingLogSize);
376
377
//
// Appends an entry to the global circular HillClimbingLog and fires the
// ThreadPoolWorkerThreadAdjustmentAdjustment ETW event. When the log is full, the oldest
// entry is overwritten.
//
//   threadCount - the new thread count (stored as the entry's NewControlSetting).
//   throughput  - the throughput value supplied by the caller (stored as LastHistoryMean, narrowed to float).
//   transition  - the reason for the change.
//
// Compiles to nothing under DACCESS_COMPILE.
//
void HillClimbing::LogTransition(int threadCount, double throughput, HillClimbingStateTransition transition)
{
    LIMITED_METHOD_CONTRACT;

#ifndef DACCESS_COMPILE
    // Slot just past the newest entry (wraps around to the oldest slot when the log is full).
    int index = (HillClimbingLogFirstIndex + HillClimbingLogSize) % HillClimbingLogCapacity;

    if (HillClimbingLogSize == HillClimbingLogCapacity)
    {
        // Full: drop the oldest entry, whose slot is the one we are about to overwrite.
        HillClimbingLogFirstIndex = (HillClimbingLogFirstIndex + 1) % HillClimbingLogCapacity;
        HillClimbingLogSize--; //hide this slot while we update it
    }

    HillClimbingLogEntry* entry = &HillClimbingLog[index];

    entry->TickCount = GetTickCount();
    entry->Transition = transition;
    entry->NewControlSetting = threadCount;

    // Number of history samples the analysis would currently use: the history length rounded
    // down to a whole number of wave periods.
    entry->LastHistoryCount = (int)(min(m_totalSamples, m_samplesToMeasure) / m_wavePeriod) * m_wavePeriod;
    entry->LastHistoryMean = (float) throughput;

    // Publish the entry only after it is fully written (see "hide this slot" above).
    HillClimbingLogSize++;

    FireEtwThreadPoolWorkerThreadAdjustmentAdjustment(
        throughput,
        threadCount,
        transition,
        GetClrInstanceId());

#endif //DACCESS_COMPILE
}
410
411Complex HillClimbing::GetWaveComponent(double* samples, int sampleCount, double period)
412{
413 LIMITED_METHOD_CONTRACT;
414
415 _ASSERTE(sampleCount >= period); //can't measure a wave that doesn't fit
416 _ASSERTE(period >= 2); //can't measure above the Nyquist frequency
417
418 //
419 // Calculate the sinusoid with the given period.
420 // We're using the Goertzel algorithm for this. See http://en.wikipedia.org/wiki/Goertzel_algorithm.
421 //
422 double w = 2.0 * pi / period;
423 double cosine = cos(w);
424 double sine = sin(w);
425 double coeff = 2.0 * cosine;
426 double q0 = 0, q1 = 0, q2 = 0;
427
428 for (int i = 0; i < sampleCount; i++)
429 {
430 double sample = samples[(m_totalSamples - sampleCount + i) % m_samplesToMeasure];
431
432 q0 = coeff * q1 - q2 + sample;
433 q2 = q1;
434 q1 = q0;
435 }
436
437 return Complex(q1 - q2 * cosine, q2 * sine) / (double)sampleCount;
438}
439
440