1// Licensed to the .NET Foundation under one or more agreements.
2// The .NET Foundation licenses this file to you under the MIT license.
3// See the LICENSE file in the project root for more information.
4
5
6/*++
7
8Module Name:
9
10 Win32ThreadPool.cpp
11
12Abstract:
13
14 This module implements Threadpool support using Win32 APIs
15
16
17Revision History:
18 December 1999 - Created
19
20--*/
21
22#include "common.h"
23#include "log.h"
24#include "threadpoolrequest.h"
25#include "win32threadpool.h"
26#include "delegateinfo.h"
27#include "eeconfig.h"
28#include "dbginterface.h"
29#include "corhost.h"
30#include "eventtrace.h"
31#include "threads.h"
32#include "appdomain.inl"
33#include "nativeoverlapped.h"
34#include "hillclimbing.h"
35#include "configuration.h"
36
37
38#ifndef FEATURE_PAL
39#ifndef DACCESS_COMPILE
40
41// APIs that must be accessed through dynamic linking.
42typedef int (WINAPI *NtQueryInformationThreadProc) (
43 HANDLE ThreadHandle,
44 THREADINFOCLASS ThreadInformationClass,
45 PVOID ThreadInformation,
46 ULONG ThreadInformationLength,
47 PULONG ReturnLength);
48NtQueryInformationThreadProc g_pufnNtQueryInformationThread = NULL;
49
50typedef int (WINAPI *NtQuerySystemInformationProc) (
51 SYSTEM_INFORMATION_CLASS SystemInformationClass,
52 PVOID SystemInformation,
53 ULONG SystemInformationLength,
54 PULONG ReturnLength OPTIONAL);
55NtQuerySystemInformationProc g_pufnNtQuerySystemInformation = NULL;
56
57typedef HANDLE (WINAPI * CreateWaitableTimerExProc) (
58 LPSECURITY_ATTRIBUTES lpTimerAttributes,
59 LPCTSTR lpTimerName,
60 DWORD dwFlags,
61 DWORD dwDesiredAccess);
62CreateWaitableTimerExProc g_pufnCreateWaitableTimerEx = NULL;
63
64typedef BOOL (WINAPI * SetWaitableTimerExProc) (
65 HANDLE hTimer,
66 const LARGE_INTEGER *lpDueTime,
67 LONG lPeriod,
68 PTIMERAPCROUTINE pfnCompletionRoutine,
69 LPVOID lpArgToCompletionRoutine,
70 void* WakeContext, //should be PREASON_CONTEXT, but it's not defined for us (and we don't use it)
71 ULONG TolerableDelay);
72SetWaitableTimerExProc g_pufnSetWaitableTimerEx = NULL;
73
74#endif // !DACCESS_COMPILE
75#endif // !FEATURE_PAL
76
77BOOL ThreadpoolMgr::InitCompletionPortThreadpool = FALSE;
78HANDLE ThreadpoolMgr::GlobalCompletionPort; // used for binding io completions on file handles
79
80SVAL_IMPL(ThreadpoolMgr::ThreadCounter,ThreadpoolMgr,CPThreadCounter);
81
82SVAL_IMPL_INIT(LONG,ThreadpoolMgr,MaxLimitTotalCPThreads,1000); // = MaxLimitCPThreadsPerCPU * number of CPUS
83SVAL_IMPL(LONG,ThreadpoolMgr,MinLimitTotalCPThreads);
84SVAL_IMPL(LONG,ThreadpoolMgr,MaxFreeCPThreads); // = MaxFreeCPThreadsPerCPU * Number of CPUS
85
86Volatile<LONG> ThreadpoolMgr::NumCPInfrastructureThreads = 0; // number of threads currently busy handling draining cycle
87
88// Cacheline aligned, hot variable
89DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) SVAL_IMPL(ThreadpoolMgr::ThreadCounter, ThreadpoolMgr, WorkerCounter);
90
91SVAL_IMPL(LONG,ThreadpoolMgr,MinLimitTotalWorkerThreads); // = MaxLimitCPThreadsPerCPU * number of CPUS
92SVAL_IMPL(LONG,ThreadpoolMgr,MaxLimitTotalWorkerThreads); // = MaxLimitCPThreadsPerCPU * number of CPUS
93
94SVAL_IMPL(LONG,ThreadpoolMgr,cpuUtilization);
95LONG ThreadpoolMgr::cpuUtilizationAverage = 0;
96
97HillClimbing ThreadpoolMgr::HillClimbingInstance;
98
99// Cacheline aligned, 3 hot variables updated in a group
100DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) LONG ThreadpoolMgr::PriorCompletedWorkRequests = 0;
101DWORD ThreadpoolMgr::PriorCompletedWorkRequestsTime;
102DWORD ThreadpoolMgr::NextCompletedWorkRequestsTime;
103
104LARGE_INTEGER ThreadpoolMgr::CurrentSampleStartTime;
105
106unsigned int ThreadpoolMgr::WorkerThreadSpinLimit;
107bool ThreadpoolMgr::IsHillClimbingDisabled;
108int ThreadpoolMgr::ThreadAdjustmentInterval;
109
// Tuning constants for the thread pool. Values that are computed from other constants are
// fully parenthesized so that each macro expands to a single operand and can be used safely
// inside a larger expression (for example, multiplied or subtracted).
#define INVALID_HANDLE ((HANDLE) -1)
#define NEW_THREAD_THRESHOLD 7 // Number of requests outstanding before we start a new thread
#define CP_THREAD_PENDINGIO_WAIT 5000 // polling interval when thread is retired but has a pending io
#define GATE_THREAD_DELAY 500 /*milliseconds*/
#define GATE_THREAD_DELAY_TOLERANCE 50 /*milliseconds*/
#define DELAY_BETWEEN_SUSPENDS (5000 + GATE_THREAD_DELAY) // time to delay between suspensions
#define SUSPEND_TIME (GATE_THREAD_DELAY + 100) // milliseconds to suspend during SuspendProcessing
117
118LONG ThreadpoolMgr::Initialization=0; // indicator of whether the threadpool is initialized.
119
120// Cacheline aligned, hot variable
121DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) unsigned int ThreadpoolMgr::LastDequeueTime; // used to determine if work items are getting thread starved
122
123// Move out of from preceeding variables' cache line
124DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) int ThreadpoolMgr::offset_counter = 0;
125
126SPTR_IMPL(WorkRequest,ThreadpoolMgr,WorkRequestHead); // Head of work request queue
127SPTR_IMPL(WorkRequest,ThreadpoolMgr,WorkRequestTail); // Head of work request queue
128
129SVAL_IMPL(ThreadpoolMgr::LIST_ENTRY,ThreadpoolMgr,TimerQueue); // queue of timers
130
131//unsigned int ThreadpoolMgr::LastCpuSamplingTime=0; // last time cpu utilization was sampled by gate thread
132unsigned int ThreadpoolMgr::LastCPThreadCreation=0; // last time a completion port thread was created
133unsigned int ThreadpoolMgr::NumberOfProcessors; // = NumberOfWorkerThreads - no. of blocked threads
134
135
136CrstStatic ThreadpoolMgr::WorkerCriticalSection;
137CLREvent * ThreadpoolMgr::RetiredCPWakeupEvent; // wakeup event for completion port threads
138CrstStatic ThreadpoolMgr::WaitThreadsCriticalSection;
139ThreadpoolMgr::LIST_ENTRY ThreadpoolMgr::WaitThreadsHead;
140
141CLRLifoSemaphore* ThreadpoolMgr::WorkerSemaphore;
142CLRLifoSemaphore* ThreadpoolMgr::RetiredWorkerSemaphore;
143
144CrstStatic ThreadpoolMgr::TimerQueueCriticalSection;
145HANDLE ThreadpoolMgr::TimerThread=NULL;
146Thread *ThreadpoolMgr::pTimerThread=NULL;
147
148// Cacheline aligned, hot variable
149DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) DWORD ThreadpoolMgr::LastTickCount;
150
151#ifdef _DEBUG
152DWORD ThreadpoolMgr::TickCountAdjustment=0;
153#endif
154
155// Cacheline aligned, hot variable
156DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) LONG ThreadpoolMgr::GateThreadStatus=GATE_THREAD_STATUS_NOT_RUNNING;
157
158// Move out of from preceeding variables' cache line
159DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) ThreadpoolMgr::RecycledListsWrapper ThreadpoolMgr::RecycledLists;
160
161ThreadpoolMgr::TimerInfo *ThreadpoolMgr::TimerInfosToBeRecycled = NULL;
162
163BOOL ThreadpoolMgr::IsApcPendingOnWaitThread = FALSE;
164
165#ifndef DACCESS_COMPILE
166
// Macros for inserting/deleting from doubly linked list.
//
// All macros operate on LIST_ENTRY nodes linked through Flink (forward) and Blink (backward).
// A list is represented by a sentinel head node; an empty list has the head pointing at
// itself in both directions. Arguments may be evaluated more than once, so do not pass
// expressions with side effects.

// Makes ListHead an empty list.
#define InitializeListHead(ListHead) (\
    (ListHead)->Flink = (ListHead)->Blink = (ListHead))

//
// these are named the same as slightly different macros in the NT headers
//
#undef RemoveHeadList
#undef RemoveEntryList
#undef InsertTailList
#undef InsertHeadList

// Unlinks the first entry of the list and stores it in FirstEntry (an lvalue).
#define RemoveHeadList(ListHead,FirstEntry) \
    {\
        FirstEntry = (LIST_ENTRY*) (ListHead)->Flink;\
        ((LIST_ENTRY*)(FirstEntry)->Flink)->Blink = (ListHead);\
        (ListHead)->Flink = (FirstEntry)->Flink;\
    }

// Unlinks Entry from whatever list it is currently on.
#define RemoveEntryList(Entry) {\
        LIST_ENTRY* _EX_Entry;\
        _EX_Entry = (Entry);\
        ((LIST_ENTRY*) _EX_Entry->Blink)->Flink = _EX_Entry->Flink;\
        ((LIST_ENTRY*) _EX_Entry->Flink)->Blink = _EX_Entry->Blink;\
    }

// Links Entry in as the last element of the list. The statements are wrapped in braces,
// like the other multi-statement macros here, so that the whole insertion is controlled
// by an enclosing unbraced if/for/while instead of just its first statement.
#define InsertTailList(ListHead,Entry) {\
        (Entry)->Flink = (ListHead);\
        (Entry)->Blink = (ListHead)->Blink;\
        ((LIST_ENTRY*)(ListHead)->Blink)->Flink = (Entry);\
        (ListHead)->Blink = (Entry);\
    }

// Links Entry in as the first element of the list.
#define InsertHeadList(ListHead,Entry) {\
        LIST_ENTRY* _EX_Flink;\
        LIST_ENTRY* _EX_ListHead;\
        _EX_ListHead = (LIST_ENTRY*)(ListHead);\
        _EX_Flink = (LIST_ENTRY*) _EX_ListHead->Flink;\
        (Entry)->Flink = _EX_Flink;\
        (Entry)->Blink = _EX_ListHead;\
        _EX_Flink->Blink = (Entry);\
        _EX_ListHead->Flink = (Entry);\
    }

// Evaluates to true when the list contains no entries.
#define IsListEmpty(ListHead) \
    ((ListHead)->Flink == (ListHead))
213
// Sets the thread's last-error value from an HRESULT: a FACILITY_WIN32 HRESULT is unwrapped
// to its Win32 code, anything else is reported as ERROR_INVALID_DATA.
// The body is wrapped in do { } while (0) so the macro behaves as a single statement and
// cannot capture or break an `else` that follows it at the call site. Invoke it with a
// trailing semicolon. `hr` is evaluated more than once.
#define SetLastHRError(hr) \
    do {\
        if (HRESULT_FACILITY(hr) == FACILITY_WIN32)\
            SetLastError(HRESULT_CODE(hr));\
        else \
            SetLastError(ERROR_INVALID_DATA);\
    } while (0)

220/************************************************************************/
221
222void ThreadpoolMgr::RecycledListsWrapper::Initialize( unsigned int numProcs )
223{
224 CONTRACTL
225 {
226 THROWS;
227 MODE_ANY;
228 GC_NOTRIGGER;
229 }
230 CONTRACTL_END;
231
232 pRecycledListPerProcessor = new RecycledListInfo[numProcs][MEMTYPE_COUNT];
233}
234
235//--//
236
237void ThreadpoolMgr::EnsureInitialized()
238{
239 CONTRACTL
240 {
241 THROWS; // Initialize can throw
242 MODE_ANY;
243 GC_NOTRIGGER;
244 }
245 CONTRACTL_END;
246
247 if (IsInitialized())
248 return;
249
250 DWORD dwSwitchCount = 0;
251
252retry:
253 if (InterlockedCompareExchange(&Initialization, 1, 0) == 0)
254 {
255 if (Initialize())
256 Initialization = -1;
257 else
258 {
259 Initialization = 0;
260 COMPlusThrowOM();
261 }
262 }
263 else // someone has already begun initializing.
264 {
265 // wait until it finishes
266 while (Initialization != -1)
267 {
268 __SwitchToThread(0, ++dwSwitchCount);
269 goto retry;
270 }
271 }
272}
273
274DWORD GetDefaultMaxLimitWorkerThreads(DWORD minLimit)
275{
276 CONTRACTL
277 {
278 MODE_ANY;
279 GC_NOTRIGGER;
280 NOTHROW;
281 }
282 CONTRACTL_END;
283
284 //
285 // We determine the max limit for worker threads as follows:
286 //
287 // 1) It must be at least MinLimitTotalWorkerThreads
288 // 2) It must be no greater than (half the virtual address space)/(thread stack size)
289 // 3) It must be <= MaxPossibleWorkerThreads
290 //
291 // TODO: what about CP threads? Can they follow a similar plan? How do we allocate
292 // thread counts between the two kinds of threads?
293 //
294 SIZE_T stackReserveSize = 0;
295 Thread::GetProcessDefaultStackSize(&stackReserveSize, NULL);
296
297 ULONGLONG halfVirtualAddressSpace;
298
299 MEMORYSTATUSEX memStats;
300 memStats.dwLength = sizeof(memStats);
301 if (GlobalMemoryStatusEx(&memStats))
302 {
303 halfVirtualAddressSpace = memStats.ullTotalVirtual / 2;
304 }
305 else
306 {
307 //assume the normal Win32 32-bit virtual address space
308 halfVirtualAddressSpace = 0x000000007FFE0000ull / 2;
309 }
310
311 ULONGLONG limit = halfVirtualAddressSpace / stackReserveSize;
312 limit = max(limit, (ULONGLONG)minLimit);
313 limit = min(limit, (ULONGLONG)ThreadpoolMgr::ThreadCounter::MaxPossibleCount);
314
315 _ASSERTE(FitsIn<DWORD>(limit));
316 return (DWORD)limit;
317}
318
319DWORD GetForceMinWorkerThreadsValue()
320{
321 WRAPPER_NO_CONTRACT;
322 return Configuration::GetKnobDWORDValue(W("System.Threading.ThreadPool.MinThreads"), CLRConfig::INTERNAL_ThreadPool_ForceMinWorkerThreads);
323}
324
325DWORD GetForceMaxWorkerThreadsValue()
326{
327 WRAPPER_NO_CONTRACT;
328 return Configuration::GetKnobDWORDValue(W("System.Threading.ThreadPool.MaxThreads"), CLRConfig::INTERNAL_ThreadPool_ForceMaxWorkerThreads);
329}
330
331BOOL ThreadpoolMgr::Initialize()
332{
333 CONTRACTL
334 {
335 THROWS;
336 MODE_ANY;
337 GC_NOTRIGGER;
338 INJECT_FAULT(COMPlusThrowOM());
339 }
340 CONTRACTL_END;
341
342 BOOL bRet = FALSE;
343 BOOL bExceptionCaught = FALSE;
344
345 UnManagedPerAppDomainTPCount* pADTPCount;
346 pADTPCount = PerAppDomainTPCountList::GetUnmanagedTPCount();
347
348 //ThreadPool_CPUGroup
349 CPUGroupInfo::EnsureInitialized();
350 if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups())
351 NumberOfProcessors = CPUGroupInfo::GetNumActiveProcessors();
352 else
353 NumberOfProcessors = GetCurrentProcessCpuCount();
354 InitPlatformVariables();
355
356 EX_TRY
357 {
358 WorkerThreadSpinLimit = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_UnfairSemaphoreSpinLimit);
359 IsHillClimbingDisabled = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_Disable) != 0;
360 ThreadAdjustmentInterval = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_SampleIntervalLow);
361
362 pADTPCount->InitResources();
363 WorkerCriticalSection.Init(CrstThreadpoolWorker);
364 WaitThreadsCriticalSection.Init(CrstThreadpoolWaitThreads);
365 TimerQueueCriticalSection.Init(CrstThreadpoolTimerQueue);
366
367 // initialize WaitThreadsHead
368 InitializeListHead(&WaitThreadsHead);
369
370 // initialize TimerQueue
371 InitializeListHead(&TimerQueue);
372
373 RetiredCPWakeupEvent = new CLREvent();
374 RetiredCPWakeupEvent->CreateAutoEvent(FALSE);
375 _ASSERTE(RetiredCPWakeupEvent->IsValid());
376
377 WorkerSemaphore = new CLRLifoSemaphore();
378 WorkerSemaphore->Create(0, ThreadCounter::MaxPossibleCount);
379
380 RetiredWorkerSemaphore = new CLRLifoSemaphore();
381 RetiredWorkerSemaphore->Create(0, ThreadCounter::MaxPossibleCount);
382
383 //ThreadPool_CPUGroup
384 if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups())
385 RecycledLists.Initialize( CPUGroupInfo::GetNumActiveProcessors() );
386 else
387 RecycledLists.Initialize( g_SystemInfo.dwNumberOfProcessors );
388 /*
389 {
390 SYSTEM_INFO sysInfo;
391
392 ::GetSystemInfo( &sysInfo );
393
394 RecycledLists.Initialize( sysInfo.dwNumberOfProcessors );
395 }
396 */
397 }
398 EX_CATCH
399 {
400 pADTPCount->CleanupResources();
401
402 if (RetiredCPWakeupEvent)
403 {
404 delete RetiredCPWakeupEvent;
405 RetiredCPWakeupEvent = NULL;
406 }
407
408 // Note: It is fine to call Destroy on unitialized critical sections
409 WorkerCriticalSection.Destroy();
410 WaitThreadsCriticalSection.Destroy();
411 TimerQueueCriticalSection.Destroy();
412
413 bExceptionCaught = TRUE;
414 }
415 EX_END_CATCH(SwallowAllExceptions);
416
417 if (bExceptionCaught)
418 {
419 goto end;
420 }
421
422 // initialize Worker and CP thread settings
423 DWORD forceMin;
424 forceMin = GetForceMinWorkerThreadsValue();
425 MinLimitTotalWorkerThreads = forceMin > 0 ? (LONG)forceMin : (LONG)NumberOfProcessors;
426
427 DWORD forceMax;
428 forceMax = GetForceMaxWorkerThreadsValue();
429 MaxLimitTotalWorkerThreads = forceMax > 0 ? (LONG)forceMax : (LONG)GetDefaultMaxLimitWorkerThreads(MinLimitTotalWorkerThreads);
430
431 ThreadCounter::Counts counts;
432 counts.NumActive = 0;
433 counts.NumWorking = 0;
434 counts.NumRetired = 0;
435 counts.MaxWorking = MinLimitTotalWorkerThreads;
436 WorkerCounter.counts.AsLongLong = counts.AsLongLong;
437
438#ifdef _DEBUG
439 TickCountAdjustment = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadpoolTickCountAdjustment);
440#endif
441
442 // initialize CP thread settings
443 MinLimitTotalCPThreads = NumberOfProcessors;
444
445 // Use volatile store to guarantee make the value visible to the DAC (the store can be optimized out otherwise)
446 VolatileStoreWithoutBarrier<LONG>(&MaxFreeCPThreads, NumberOfProcessors*MaxFreeCPThreadsPerCPU);
447
448 counts.NumActive = 0;
449 counts.NumWorking = 0;
450 counts.NumRetired = 0;
451 counts.MaxWorking = MinLimitTotalCPThreads;
452 CPThreadCounter.counts.AsLongLong = counts.AsLongLong;
453
454#ifndef FEATURE_PAL
455 {
456 GlobalCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
457 NULL,
458 0, /*ignored for invalid handle value*/
459 NumberOfProcessors);
460 }
461#endif // !FEATURE_PAL
462
463 HillClimbingInstance.Initialize();
464
465 bRet = TRUE;
466end:
467 return bRet;
468}
469
470void ThreadpoolMgr::InitPlatformVariables()
471{
472 CONTRACTL
473 {
474 NOTHROW;
475 MODE_ANY;
476 GC_NOTRIGGER;
477 }
478 CONTRACTL_END;
479
480#ifndef FEATURE_PAL
481 HINSTANCE hNtDll;
482 HINSTANCE hCoreSynch;
483 {
484 CONTRACT_VIOLATION(GCViolation|FaultViolation);
485 hNtDll = CLRLoadLibrary(W("ntdll.dll"));
486 _ASSERTE(hNtDll);
487#ifdef FEATURE_CORESYSTEM
488 hCoreSynch = CLRLoadLibrary(W("api-ms-win-core-synch-l1-1-0.dll"));
489#else
490 hCoreSynch = CLRLoadLibrary(W("kernel32.dll"));
491#endif
492 _ASSERTE(hCoreSynch);
493 }
494
495 // These APIs must be accessed via dynamic binding since they may be removed in future
496 // OS versions.
497 g_pufnNtQueryInformationThread = (NtQueryInformationThreadProc)GetProcAddress(hNtDll,"NtQueryInformationThread");
498 g_pufnNtQuerySystemInformation = (NtQuerySystemInformationProc)GetProcAddress(hNtDll,"NtQuerySystemInformation");
499
500
501 // These APIs are only supported on newer Windows versions
502 g_pufnCreateWaitableTimerEx = (CreateWaitableTimerExProc)GetProcAddress(hCoreSynch, "CreateWaitableTimerExW");
503 g_pufnSetWaitableTimerEx = (SetWaitableTimerExProc)GetProcAddress(hCoreSynch, "SetWaitableTimerEx");
504#endif
505}
506
507BOOL ThreadpoolMgr::SetMaxThreadsHelper(DWORD MaxWorkerThreads,
508 DWORD MaxIOCompletionThreads)
509{
510 CONTRACTL
511 {
512 THROWS; // Crst can throw and toggle GC mode
513 MODE_ANY;
514 GC_TRIGGERS;
515 }
516 CONTRACTL_END;
517
518 BOOL result = FALSE;
519
520 // doesn't need to be WorkerCS, but using it to avoid race condition between setting min and max, and didn't want to create a new CS.
521 CrstHolder csh(&WorkerCriticalSection);
522
523 if (MaxWorkerThreads >= (DWORD)MinLimitTotalWorkerThreads &&
524 MaxIOCompletionThreads >= (DWORD)MinLimitTotalCPThreads &&
525 MaxWorkerThreads != 0 &&
526 MaxIOCompletionThreads != 0)
527 {
528 BEGIN_SO_INTOLERANT_CODE(GetThread());
529
530 if (GetForceMaxWorkerThreadsValue() == 0)
531 {
532 MaxLimitTotalWorkerThreads = min(MaxWorkerThreads, (DWORD)ThreadCounter::MaxPossibleCount);
533
534 ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts();
535 while (counts.MaxWorking > MaxLimitTotalWorkerThreads)
536 {
537 ThreadCounter::Counts newCounts = counts;
538 newCounts.MaxWorking = MaxLimitTotalWorkerThreads;
539
540 ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
541 if (oldCounts == counts)
542 counts = newCounts;
543 else
544 counts = oldCounts;
545 }
546 }
547
548 END_SO_INTOLERANT_CODE;
549
550 MaxLimitTotalCPThreads = min(MaxIOCompletionThreads, (DWORD)ThreadCounter::MaxPossibleCount);
551
552 result = TRUE;
553 }
554
555 return result;
556 }
557
558/************************************************************************/
559BOOL ThreadpoolMgr::SetMaxThreads(DWORD MaxWorkerThreads,
560 DWORD MaxIOCompletionThreads)
561{
562 CONTRACTL
563 {
564 THROWS; // SetMaxThreadsHelper can throw and toggle GC mode
565 MODE_ANY;
566 GC_TRIGGERS;
567 }
568 CONTRACTL_END;
569
570 EnsureInitialized();
571
572 return SetMaxThreadsHelper(MaxWorkerThreads, MaxIOCompletionThreads);
573}
574
575BOOL ThreadpoolMgr::GetMaxThreads(DWORD* MaxWorkerThreads,
576 DWORD* MaxIOCompletionThreads)
577{
578 LIMITED_METHOD_CONTRACT;
579
580
581 if (!MaxWorkerThreads || !MaxIOCompletionThreads)
582 {
583 SetLastHRError(ERROR_INVALID_DATA);
584 return FALSE;
585 }
586
587 EnsureInitialized();
588
589 *MaxWorkerThreads = (DWORD)MaxLimitTotalWorkerThreads;
590 *MaxIOCompletionThreads = MaxLimitTotalCPThreads;
591 return TRUE;
592}
593
594BOOL ThreadpoolMgr::SetMinThreads(DWORD MinWorkerThreads,
595 DWORD MinIOCompletionThreads)
596{
597 CONTRACTL
598 {
599 THROWS; // Crst can throw and toggle GC mode
600 MODE_ANY;
601 GC_TRIGGERS;
602 }
603 CONTRACTL_END;
604
605 EnsureInitialized();
606
607 // doesn't need to be WorkerCS, but using it to avoid race condition between setting min and max, and didn't want to create a new CS.
608 CrstHolder csh(&WorkerCriticalSection);
609
610 BOOL init_result = FALSE;
611
612 if (MinWorkerThreads >= 0 && MinIOCompletionThreads >= 0 &&
613 MinWorkerThreads <= (DWORD) MaxLimitTotalWorkerThreads &&
614 MinIOCompletionThreads <= (DWORD) MaxLimitTotalCPThreads)
615 {
616 BEGIN_SO_INTOLERANT_CODE(GetThread());
617
618 if (GetForceMinWorkerThreadsValue() == 0)
619 {
620 MinLimitTotalWorkerThreads = max(1, min(MinWorkerThreads, (DWORD)ThreadCounter::MaxPossibleCount));
621
622 ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts();
623 while (counts.MaxWorking < MinLimitTotalWorkerThreads)
624 {
625 ThreadCounter::Counts newCounts = counts;
626 newCounts.MaxWorking = MinLimitTotalWorkerThreads;
627
628 ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
629 if (oldCounts == counts)
630 {
631 counts = newCounts;
632
633 // if we increased the limit, and there are pending workitems, we need
634 // to dispatch a thread to process the work.
635 if (newCounts.MaxWorking > oldCounts.MaxWorking &&
636 PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains())
637 {
638 MaybeAddWorkingWorker();
639 }
640 }
641 else
642 {
643 counts = oldCounts;
644 }
645 }
646 }
647
648 END_SO_INTOLERANT_CODE;
649
650 MinLimitTotalCPThreads = max(1, min(MinIOCompletionThreads, (DWORD)ThreadCounter::MaxPossibleCount));
651
652 init_result = TRUE;
653 }
654
655 return init_result;
656}
657
658BOOL ThreadpoolMgr::GetMinThreads(DWORD* MinWorkerThreads,
659 DWORD* MinIOCompletionThreads)
660{
661 LIMITED_METHOD_CONTRACT;
662
663
664 if (!MinWorkerThreads || !MinIOCompletionThreads)
665 {
666 SetLastHRError(ERROR_INVALID_DATA);
667 return FALSE;
668 }
669
670 EnsureInitialized();
671
672 *MinWorkerThreads = (DWORD)MinLimitTotalWorkerThreads;
673 *MinIOCompletionThreads = MinLimitTotalCPThreads;
674 return TRUE;
675}
676
677BOOL ThreadpoolMgr::GetAvailableThreads(DWORD* AvailableWorkerThreads,
678 DWORD* AvailableIOCompletionThreads)
679{
680 LIMITED_METHOD_CONTRACT;
681
682 if (!AvailableWorkerThreads || !AvailableIOCompletionThreads)
683 {
684 SetLastHRError(ERROR_INVALID_DATA);
685 return FALSE;
686 }
687
688 EnsureInitialized();
689
690 ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts();
691
692 if (MaxLimitTotalWorkerThreads < counts.NumActive)
693 *AvailableWorkerThreads = 0;
694 else
695 *AvailableWorkerThreads = MaxLimitTotalWorkerThreads - counts.NumWorking;
696
697 counts = CPThreadCounter.GetCleanCounts();
698 if (MaxLimitTotalCPThreads < counts.NumActive)
699 *AvailableIOCompletionThreads = counts.NumActive - counts.NumWorking;
700 else
701 *AvailableIOCompletionThreads = MaxLimitTotalCPThreads - counts.NumWorking;
702 return TRUE;
703}
704
705void QueueUserWorkItemHelp(LPTHREAD_START_ROUTINE Function, PVOID Context)
706{
707 STATIC_CONTRACT_THROWS;
708 STATIC_CONTRACT_GC_TRIGGERS;
709 STATIC_CONTRACT_MODE_ANY;
710 /* Cannot use contract here because of SEH
711 CONTRACTL
712 {
713 THROWS;
714 GC_TRIGGERS;
715 MODE_ANY;
716 }
717 CONTRACTL_END;*/
718
719 Function(Context);
720
721 Thread *pThread = GetThread();
722 if (pThread) {
723 if (pThread->IsAbortRequested())
724 pThread->EEResetAbort(Thread::TAR_ALL);
725 pThread->InternalReset();
726 }
727}
728
729//
730// WorkingThreadCounts tracks the number of worker threads currently doing user work, and the maximum number of such threads
731// since the last time TakeMaxWorkingThreadCount was called. This information is for diagnostic purposes only,
732// and is tracked only if the CLR config value INTERNAL_ThreadPool_EnableWorkerTracking is non-zero (this feature is off
733// by default).
734//
735union WorkingThreadCounts
736{
737 struct
738 {
739 int currentWorking : 16;
740 int maxWorking : 16;
741 };
742
743 LONG asLong;
744};
745
746WorkingThreadCounts g_workingThreadCounts;
747
748//
749// If worker tracking is enabled (see above) then this is called immediately before and after a worker thread executes
750// each work item.
751//
752void ThreadpoolMgr::ReportThreadStatus(bool isWorking)
753{
754 CONTRACTL
755 {
756 NOTHROW;
757 GC_NOTRIGGER;
758 SO_TOLERANT;
759 MODE_ANY;
760 }
761 CONTRACTL_END;
762 _ASSERTE(CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_EnableWorkerTracking));
763 while (true)
764 {
765 WorkingThreadCounts currentCounts, newCounts;
766 currentCounts.asLong = VolatileLoad(&g_workingThreadCounts.asLong);
767
768 newCounts = currentCounts;
769
770 if (isWorking)
771 newCounts.currentWorking++;
772
773 if (newCounts.currentWorking > newCounts.maxWorking)
774 newCounts.maxWorking = newCounts.currentWorking;
775
776 if (!isWorking)
777 newCounts.currentWorking--;
778
779 if (currentCounts.asLong == InterlockedCompareExchange(&g_workingThreadCounts.asLong, newCounts.asLong, currentCounts.asLong))
780 break;
781 }
782}
783
784//
785// Returns the max working count since the previous call to TakeMaxWorkingThreadCount, and resets WorkingThreadCounts.maxWorking.
786//
787int TakeMaxWorkingThreadCount()
788{
789 CONTRACTL
790 {
791 NOTHROW;
792 GC_NOTRIGGER;
793 SO_TOLERANT;
794 MODE_ANY;
795 }
796 CONTRACTL_END;
797 _ASSERTE(CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_EnableWorkerTracking));
798 while (true)
799 {
800 WorkingThreadCounts currentCounts, newCounts;
801 currentCounts.asLong = VolatileLoad(&g_workingThreadCounts.asLong);
802
803 newCounts = currentCounts;
804 newCounts.maxWorking = 0;
805
806 if (currentCounts.asLong == InterlockedCompareExchange(&g_workingThreadCounts.asLong, newCounts.asLong, currentCounts.asLong))
807 {
808 // If we haven't updated the counts since the last call to TakeMaxWorkingThreadCount, then we never updated maxWorking.
809 // In that case, the number of working threads for the whole period since the last TakeMaxWorkingThreadCount is the
810 // current number of working threads.
811 return currentCounts.maxWorking == 0 ? currentCounts.currentWorking : currentCounts.maxWorking;
812 }
813 }
814}
815
816
817/************************************************************************/
818
819BOOL ThreadpoolMgr::QueueUserWorkItem(LPTHREAD_START_ROUTINE Function,
820 PVOID Context,
821 DWORD Flags,
822 BOOL UnmanagedTPRequest)
823{
824 CONTRACTL
825 {
826 THROWS; // EnsureInitialized, EnqueueWorkRequest can throw OOM
827 GC_TRIGGERS;
828 MODE_ANY;
829 }
830 CONTRACTL_END;
831
832 EnsureInitialized();
833
834
835 if (Flags == CALL_OR_QUEUE)
836 {
837 // we've been asked to call this directly if the thread pressure is not too high
838
839 int MinimumAvailableCPThreads = (NumberOfProcessors < 3) ? 3 : NumberOfProcessors;
840
841 ThreadCounter::Counts counts = CPThreadCounter.GetCleanCounts();
842 if ((MaxLimitTotalCPThreads - counts.NumActive) >= MinimumAvailableCPThreads )
843 {
844 ThreadLocaleHolder localeHolder;
845
846 QueueUserWorkItemHelp(Function, Context);
847 return TRUE;
848 }
849
850 }
851
852 if (UnmanagedTPRequest)
853 {
854 UnManagedPerAppDomainTPCount* pADTPCount;
855 pADTPCount = PerAppDomainTPCountList::GetUnmanagedTPCount();
856 pADTPCount->QueueUnmanagedWorkRequest(Function, Context);
857 }
858 else
859 {
860 // caller has already registered its TPCount; this call is just to adjust the thread count
861 }
862
863 return TRUE;
864}
865
866
867bool ThreadpoolMgr::ShouldWorkerKeepRunning()
868{
869 WRAPPER_NO_CONTRACT;
870
871 //
872 // Maybe this thread should retire now. Let's see.
873 //
874 bool shouldThisThreadKeepRunning = true;
875
876 // Dirty read is OK here; the worst that can happen is that we won't retire this time. In the
877 // case where we might retire, we have to succeed a CompareExchange, which will have the effect
878 // of validating this read.
879 ThreadCounter::Counts counts = WorkerCounter.DangerousGetDirtyCounts();
880 while (true)
881 {
882 if (counts.NumActive <= counts.MaxWorking)
883 {
884 shouldThisThreadKeepRunning = true;
885 break;
886 }
887
888 ThreadCounter::Counts newCounts = counts;
889 newCounts.NumWorking--;
890 newCounts.NumActive--;
891 newCounts.NumRetired++;
892
893 ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
894
895 if (oldCounts == counts)
896 {
897 shouldThisThreadKeepRunning = false;
898 break;
899 }
900
901 counts = oldCounts;
902 }
903
904 return shouldThisThreadKeepRunning;
905}
906
907DangerousNonHostedSpinLock ThreadpoolMgr::ThreadAdjustmentLock;
908
909
910//
911// This method must only be called if ShouldAdjustMaxWorkersActive has returned true, *and*
912// ThreadAdjustmentLock is held.
913//
914void ThreadpoolMgr::AdjustMaxWorkersActive()
915{
916 CONTRACTL
917 {
918 NOTHROW;
919 if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
920 MODE_ANY;
921 }
922 CONTRACTL_END;
923
924 _ASSERTE(ThreadAdjustmentLock.IsHeld());
925
926 DWORD currentTicks = GetTickCount();
927 LONG totalNumCompletions = Thread::GetTotalThreadPoolCompletionCount();
928 LONG numCompletions = totalNumCompletions - VolatileLoad(&PriorCompletedWorkRequests);
929
930 LARGE_INTEGER startTime = CurrentSampleStartTime;
931 LARGE_INTEGER endTime;
932 QueryPerformanceCounter(&endTime);
933
934 static LARGE_INTEGER freq;
935 if (freq.QuadPart == 0)
936 QueryPerformanceFrequency(&freq);
937
938 double elapsed = (double)(endTime.QuadPart - startTime.QuadPart) / freq.QuadPart;
939
940 //
941 // It's possible for the current sample to be reset while we're holding
942 // ThreadAdjustmentLock. This will result in a very short sample, possibly
943 // with completely bogus counts. We'll try to detect this by checking the sample
944 // interval; if it's very short, then we try again later.
945 //
946 if (elapsed*1000.0 >= (ThreadAdjustmentInterval/2))
947 {
948 ThreadCounter::Counts currentCounts = WorkerCounter.GetCleanCounts();
949
950 int newMax = HillClimbingInstance.Update(
951 currentCounts.MaxWorking,
952 elapsed,
953 numCompletions,
954 &ThreadAdjustmentInterval);
955
956 while (newMax != currentCounts.MaxWorking)
957 {
958 ThreadCounter::Counts newCounts = currentCounts;
959 newCounts.MaxWorking = newMax;
960
961 ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, currentCounts);
962 if (oldCounts == currentCounts)
963 {
964 //
965 // If we're increasing the max, inject a thread. If that thread finds work, it will inject
966 // another thread, etc., until nobody finds work or we reach the new maximum.
967 //
968 // If we're reducing the max, whichever threads notice this first will retire themselves.
969 //
970 if (newMax > oldCounts.MaxWorking)
971 MaybeAddWorkingWorker();
972
973 break;
974 }
975 else
976 {
977 // we failed - maybe try again
978 if (oldCounts.MaxWorking > currentCounts.MaxWorking &&
979 oldCounts.MaxWorking >= newMax)
980 {
981 // someone (probably the gate thread) increased the thread count more than
982 // we are about to do. Don't interfere.
983 break;
984 }
985
986 currentCounts = oldCounts;
987 }
988 }
989
990 PriorCompletedWorkRequests = totalNumCompletions;
991 NextCompletedWorkRequestsTime = currentTicks + ThreadAdjustmentInterval;
992 MemoryBarrier(); // flush previous writes (especially NextCompletedWorkRequestsTime)
993 PriorCompletedWorkRequestsTime = currentTicks;
994 CurrentSampleStartTime = endTime;;
995 }
996}
997
998
999void ThreadpoolMgr::MaybeAddWorkingWorker()
1000{
1001 CONTRACTL
1002 {
1003 NOTHROW;
1004 if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
1005 MODE_ANY;
1006 }
1007 CONTRACTL_END;
1008
1009 // counts volatile read paired with CompareExchangeCounts loop set
1010 ThreadCounter::Counts counts = WorkerCounter.DangerousGetDirtyCounts();
1011 ThreadCounter::Counts newCounts;
1012 while (true)
1013 {
1014 newCounts = counts;
1015 newCounts.NumWorking = max(counts.NumWorking, min(counts.NumWorking + 1, counts.MaxWorking));
1016 newCounts.NumActive = max(counts.NumActive, newCounts.NumWorking);
1017 newCounts.NumRetired = max(0, counts.NumRetired - (newCounts.NumActive - counts.NumActive));
1018
1019 if (newCounts == counts)
1020 return;
1021
1022 ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
1023
1024 if (oldCounts == counts)
1025 break;
1026
1027 counts = oldCounts;
1028 }
1029
1030 int toUnretire = counts.NumRetired - newCounts.NumRetired;
1031 int toCreate = (newCounts.NumActive - counts.NumActive) - toUnretire;
1032 int toRelease = (newCounts.NumWorking - counts.NumWorking) - (toUnretire + toCreate);
1033
1034 _ASSERTE(toUnretire >= 0);
1035 _ASSERTE(toCreate >= 0);
1036 _ASSERTE(toRelease >= 0);
1037 _ASSERTE(toUnretire + toCreate + toRelease <= 1);
1038
1039 if (toUnretire > 0)
1040 {
1041 RetiredWorkerSemaphore->Release(toUnretire);
1042 }
1043
1044 if (toRelease > 0)
1045 WorkerSemaphore->Release(toRelease);
1046
1047 while (toCreate > 0)
1048 {
1049 if (CreateWorkerThread())
1050 {
1051 toCreate--;
1052 }
1053 else
1054 {
1055 //
1056 // Uh-oh, we promised to create a new thread, but the creation failed. We have to renege on our
1057 // promise. This may possibly result in no work getting done for a while, but the gate thread will
1058 // eventually notice that no completions are happening and force the creation of a new thread.
1059 // Of course, there's no guarantee *that* will work - but hopefully enough time will have passed
1060 // to allow whoever's using all the memory right now to release some.
1061 //
1062
1063 // counts volatile read paired with CompareExchangeCounts loop set
1064 counts = WorkerCounter.DangerousGetDirtyCounts();
1065 while (true)
1066 {
1067 //
1068 // If we said we would create a thread, we also said it would be working. So we need to
1069 // decrement both NumWorking and NumActive by the number of threads we will no longer be creating.
1070 //
1071 newCounts = counts;
1072 newCounts.NumWorking -= toCreate;
1073 newCounts.NumActive -= toCreate;
1074
1075 ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
1076
1077 if (oldCounts == counts)
1078 break;
1079
1080 counts = oldCounts;
1081 }
1082
1083 toCreate = 0;
1084 }
1085 }
1086}
1087
1088BOOL ThreadpoolMgr::PostQueuedCompletionStatus(LPOVERLAPPED lpOverlapped,
1089 LPOVERLAPPED_COMPLETION_ROUTINE Function)
1090{
1091 CONTRACTL
1092 {
1093 THROWS; // EnsureInitialized can throw OOM
1094 GC_TRIGGERS;
1095 MODE_ANY;
1096 }
1097 CONTRACTL_END;
1098
1099#ifndef FEATURE_PAL
1100 EnsureInitialized();
1101
1102 _ASSERTE(GlobalCompletionPort != NULL);
1103
1104 if (!InitCompletionPortThreadpool)
1105 InitCompletionPortThreadpool = TRUE;
1106
1107 GrowCompletionPortThreadpoolIfNeeded();
1108
1109 // In order to allow external ETW listeners to correlate activities that use our IO completion port
1110 // as a dispatch mechanism, we have to ensure the runtime's calls to ::PostQueuedCompletionStatus
1111 // and ::GetQueuedCompletionStatus are "annotated" with ETW events representing to operations
1112 // performed.
1113 // There are currently 4 codepaths that post to the GlobalCompletionPort:
1114 // 1. and 2. - the Overlapped drainage events. Those are uninteresting to ETW listeners and
1115 // currently call the global ::PostQueuedCompletionStatus directly.
1116 // 3. the managed API ThreadPool.UnsafeQueueNativeOverlapped(), calling CorPostQueuedCompletionStatus()
1117 // which already fires the ETW event as needed
1118 // 4. the managed API ThreadPool.RegisterWaitForSingleObject which needs to fire the ETW event
1119 // at the time the managed API is called (on the orignial user thread), and not when the ::PQCS
1120 // is called (from the dedicated wait thread).
1121 // If additional codepaths appear they need to either fire the ETW event before calling this or ensure
1122 // we do not fire an unmatched "dequeue" event in ThreadpoolMgr::CompletionPortThreadStart
1123 // The current possible values for Function:
1124 // - CallbackForInitiateDrainageOfCompletionPortQueue and
1125 // CallbackForContinueDrainageOfCompletionPortQueue for Drainage
1126 // - BindIoCompletionCallbackStub for ThreadPool.UnsafeQueueNativeOverlapped
1127 // - WaitIOCompletionCallback for ThreadPool.RegisterWaitForSingleObject
1128
1129 return ::PostQueuedCompletionStatus(GlobalCompletionPort,
1130 0,
1131 (ULONG_PTR) Function,
1132 lpOverlapped);
1133#else
1134 SetLastError(ERROR_CALL_NOT_IMPLEMENTED);
1135 return FALSE;
1136#endif // !FEATURE_PAL
1137}
1138
1139
1140void ThreadpoolMgr::WaitIOCompletionCallback(
1141 DWORD dwErrorCode,
1142 DWORD numBytesTransferred,
1143 LPOVERLAPPED lpOverlapped)
1144{
1145 CONTRACTL
1146 {
1147 THROWS;
1148 MODE_ANY;
1149 }
1150 CONTRACTL_END;
1151
1152 if (dwErrorCode == ERROR_SUCCESS)
1153 DWORD ret = AsyncCallbackCompletion((PVOID)lpOverlapped);
1154}
1155
#ifndef FEATURE_PAL
// We need to make sure that the next job picked up by each completion port thread
// was inserted into the queue after we started cleanup.  The cleanup starts when a completion
// port thread processes a special overlapped (overlappedForInitiateCleanup).
// To do this, we loop through all completion port threads.
// 1. If a thread is in cooperative mode, it is processing a job now, and the next job
//    it picks up will be after we start cleanup.
// 2. A completion port thread may be waiting for a job, or may be about to dispatch a job.
//    We cannot distinguish these two cases, so we queue a dummy job (overlappedForContinueCleanup)
//    to the queue after the starting job.
//
// Only the addresses of these two OVERLAPPED structures matter: HostIOCompletionCallback compares
// the incoming lpOverlapped against them to recognize the special packets.
OVERLAPPED overlappedForInitiateCleanup;
OVERLAPPED overlappedForContinueCleanup;
#endif  // !FEATURE_PAL

// Non-zero while a drain of the completion port queue is in progress. It is set and cleared by
// CallbackForInitiateDrainageOfCompletionPortQueue and polled by
// CallbackForContinueDrainageOfCompletionPortQueue.
Volatile<ULONG> g_fCompletionPortDrainNeeded = FALSE;
1171
1172VOID ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue(
1173 DWORD dwErrorCode,
1174 DWORD dwNumberOfBytesTransfered,
1175 LPOVERLAPPED lpOverlapped
1176 )
1177{
1178 CONTRACTL
1179 {
1180 NOTHROW;
1181 GC_NOTRIGGER;
1182 MODE_PREEMPTIVE;
1183 }
1184 CONTRACTL_END;
1185
1186#ifndef FEATURE_PAL
1187 CounterHolder hldNumCPIT(&NumCPInfrastructureThreads);
1188
1189 // It is OK if this overlapped is from a previous round.
1190 // We have started a new round. The next job picked by this thread is
1191 // going to be after the marker.
1192 Thread* pThread = GetThread();
1193 if (pThread && !pThread->IsCompletionPortDrained())
1194 {
1195 pThread->MarkCompletionPortDrained();
1196 }
1197 if (g_fCompletionPortDrainNeeded)
1198 {
1199 ::PostQueuedCompletionStatus(GlobalCompletionPort,
1200 0,
1201 (ULONG_PTR)CallbackForContinueDrainageOfCompletionPortQueue,
1202 &overlappedForContinueCleanup);
1203 // IO Completion port thread is LIFO queue. We want our special packet to be picked up by a different thread.
1204 while (g_fCompletionPortDrainNeeded && pThread->IsCompletionPortDrained())
1205 {
1206 __SwitchToThread(100, CALLER_LIMITS_SPINNING);
1207 }
1208 }
1209#endif // !FEATURE_PAL
1210}
1211
1212
1213VOID
1214ThreadpoolMgr::CallbackForInitiateDrainageOfCompletionPortQueue(
1215 DWORD dwErrorCode,
1216 DWORD dwNumberOfBytesTransfered,
1217 LPOVERLAPPED lpOverlapped
1218 )
1219{
1220 #ifndef FEATURE_PAL
1221 CONTRACTL
1222 {
1223 NOTHROW;
1224 MODE_ANY;
1225 }
1226 CONTRACTL_END;
1227
1228 CounterHolder hldNumCPIT(&NumCPInfrastructureThreads);
1229 {
1230 ThreadStoreLockHolder tsl;
1231 Thread *pThread = NULL;
1232 while ((pThread = ThreadStore::GetAllThreadList(pThread, Thread::TS_CompletionPortThread, Thread::TS_CompletionPortThread)) != NULL)
1233 {
1234 pThread->UnmarkCompletionPortDrained();
1235 }
1236 }
1237
1238 FastInterlockOr(&g_fCompletionPortDrainNeeded, 1);
1239
1240 // Wake up retiring CP Threads so it can mark its status.
1241 ThreadCounter::Counts counts = CPThreadCounter.GetCleanCounts();
1242 if (counts.NumRetired > 0)
1243 RetiredCPWakeupEvent->Set();
1244
1245 DWORD nTry = 0;
1246 BOOL fTryNextTime = FALSE;
1247 BOOL fMore = TRUE;
1248 BOOL fFirstTime = TRUE;
1249 while (fMore)
1250 {
1251 fMore = FALSE;
1252 Thread *pCurThread = GetThread();
1253 Thread *pThread = NULL;
1254 {
1255
1256 ThreadStoreLockHolder tsl;
1257
1258 ::FlushProcessWriteBuffers();
1259
1260 while ((pThread = ThreadStore::GetAllThreadList(pThread, Thread::TS_CompletionPortThread, Thread::TS_CompletionPortThread)) != NULL)
1261 {
1262 if (pThread == pCurThread || pThread->IsDead() || pThread->IsCompletionPortDrained())
1263 {
1264 continue;
1265 }
1266
1267 if (pThread->PreemptiveGCDisabledOther() || pThread->GetFrame() != FRAME_TOP)
1268 {
1269 // The thread is processing an IO job now. When it picks up next job, the job
1270 // will be after the marker.
1271 pThread->MarkCompletionPortDrained();
1272 }
1273 else
1274 {
1275 if (fFirstTime)
1276 {
1277 ::PostQueuedCompletionStatus(GlobalCompletionPort,
1278 0,
1279 (ULONG_PTR)CallbackForContinueDrainageOfCompletionPortQueue,
1280 &overlappedForContinueCleanup);
1281 }
1282 fMore = TRUE;
1283 }
1284 }
1285 }
1286 if (fMore)
1287 {
1288 __SwitchToThread(10, CALLER_LIMITS_SPINNING);
1289 nTry ++;
1290 if (nTry > 1000)
1291 {
1292 fTryNextTime = TRUE;
1293 break;
1294 }
1295 }
1296 fFirstTime = FALSE;
1297 }
1298
1299 FastInterlockAnd(&g_fCompletionPortDrainNeeded, 0);
1300#endif // !FEATURE_PAL
1301}
1302
1303extern void WINAPI BindIoCompletionCallbackStub(DWORD ErrorCode,
1304 DWORD numBytesTransferred,
1305 LPOVERLAPPED lpOverlapped);
1306
1307void HostIOCompletionCallback(
1308 DWORD ErrorCode,
1309 DWORD numBytesTransferred,
1310 LPOVERLAPPED lpOverlapped)
1311{
1312#ifndef FEATURE_PAL
1313 if (lpOverlapped == &overlappedForInitiateCleanup)
1314 {
1315 ThreadpoolMgr::CallbackForInitiateDrainageOfCompletionPortQueue (
1316 ErrorCode,
1317 numBytesTransferred,
1318 lpOverlapped);
1319 }
1320 else if (lpOverlapped == &overlappedForContinueCleanup)
1321 {
1322 ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue(
1323 ErrorCode,
1324 numBytesTransferred,
1325 lpOverlapped);
1326 }
1327 else
1328 {
1329 BindIoCompletionCallbackStub (
1330 ErrorCode,
1331 numBytesTransferred,
1332 lpOverlapped);
1333 }
1334#endif // !FEATURE_PAL
1335}
1336
1337BOOL ThreadpoolMgr::DrainCompletionPortQueue()
1338{
1339#ifndef FEATURE_PAL
1340 CONTRACTL
1341 {
1342 NOTHROW;
1343 GC_TRIGGERS;
1344 MODE_ANY;
1345 }
1346 CONTRACTL_END;
1347
1348 if (GlobalCompletionPort == 0)
1349 {
1350 return FALSE;
1351 }
1352
1353 return ::PostQueuedCompletionStatus(GlobalCompletionPort,
1354 0,
1355 (ULONG_PTR)CallbackForInitiateDrainageOfCompletionPortQueue,
1356 &overlappedForInitiateCleanup);
1357#else
1358 return FALSE;
1359#endif // !FEATURE_PAL
1360}
1361
1362
1363// This is either made by a worker thread or a CP thread
1364// indicated by threadTypeStatus
1365void ThreadpoolMgr::EnsureGateThreadRunning()
1366{
1367 LIMITED_METHOD_CONTRACT;
1368
1369 while (true)
1370 {
1371 switch (GateThreadStatus)
1372 {
1373 case GATE_THREAD_STATUS_REQUESTED:
1374 //
1375 // No action needed; the gate thread is running, and someone else has already registered a request
1376 // for it to stay.
1377 //
1378 return;
1379
1380 case GATE_THREAD_STATUS_WAITING_FOR_REQUEST:
1381 //
1382 // Prevent the gate thread from exiting, if it hasn't already done so. If it has, we'll create it on the next iteration of
1383 // this loop.
1384 //
1385 FastInterlockCompareExchange(&GateThreadStatus, GATE_THREAD_STATUS_REQUESTED, GATE_THREAD_STATUS_WAITING_FOR_REQUEST);
1386 break;
1387
1388 case GATE_THREAD_STATUS_NOT_RUNNING:
1389 //
1390 // We need to create a new gate thread
1391 //
1392 if (FastInterlockCompareExchange(&GateThreadStatus, GATE_THREAD_STATUS_REQUESTED, GATE_THREAD_STATUS_NOT_RUNNING) == GATE_THREAD_STATUS_NOT_RUNNING)
1393 {
1394 if (!CreateGateThread())
1395 {
1396 //
1397 // If we failed to create the gate thread, someone else will need to try again later.
1398 //
1399 GateThreadStatus = GATE_THREAD_STATUS_NOT_RUNNING;
1400 }
1401 return;
1402 }
1403 break;
1404
1405 default:
1406 _ASSERTE(!"Invalid value of ThreadpoolMgr::GateThreadStatus");
1407 }
1408 }
1409
1410 return;
1411}
1412
1413
1414bool ThreadpoolMgr::ShouldGateThreadKeepRunning()
1415{
1416 LIMITED_METHOD_CONTRACT;
1417
1418 _ASSERTE(GateThreadStatus == GATE_THREAD_STATUS_WAITING_FOR_REQUEST ||
1419 GateThreadStatus == GATE_THREAD_STATUS_REQUESTED);
1420
1421 //
1422 // Switch to WAITING_FOR_REQUEST, and see if we had a request since the last check.
1423 //
1424 LONG previousStatus = FastInterlockExchange(&GateThreadStatus, GATE_THREAD_STATUS_WAITING_FOR_REQUEST);
1425
1426 if (previousStatus == GATE_THREAD_STATUS_WAITING_FOR_REQUEST)
1427 {
1428 //
1429 // No recent requests for the gate thread. Check to see if we're still needed.
1430 //
1431
1432 //
1433 // Are there any free threads in the I/O completion pool? If there are, we don't need a gate thread.
1434 // This implies that whenever we decrement NumFreeCPThreads to 0, we need to call EnsureGateThreadRunning().
1435 //
1436 ThreadCounter::Counts counts = CPThreadCounter.GetCleanCounts();
1437 bool needGateThreadForCompletionPort =
1438 InitCompletionPortThreadpool &&
1439 (counts.NumActive - counts.NumWorking) <= 0;
1440
1441 //
1442 // Are there any work requests in any worker queue? If so, we need a gate thread.
1443 // This imples that whenever a work queue goes from empty to non-empty, we need to call EnsureGateThreadRunning().
1444 //
1445 bool needGateThreadForWorkerThreads =
1446 PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains();
1447
1448 //
1449 // If worker tracking is enabled, we need to fire periodic ETW events with active worker counts. This is
1450 // done by the gate thread.
1451 // We don't have to do anything special with EnsureGateThreadRunning() here, because this is only needed
1452 // once work has been added to the queue for the first time (which is covered above).
1453 //
1454 bool needGateThreadForWorkerTracking =
1455 0 != CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_EnableWorkerTracking);
1456
1457 if (!(needGateThreadForCompletionPort ||
1458 needGateThreadForWorkerThreads ||
1459 needGateThreadForWorkerTracking))
1460 {
1461 //
1462 // It looks like we shouldn't be running. But another thread may now tell us to run. If so, they will set GateThreadStatus
1463 // back to GATE_THREAD_STATUS_REQUESTED.
1464 //
1465 previousStatus = FastInterlockCompareExchange(&GateThreadStatus, GATE_THREAD_STATUS_NOT_RUNNING, GATE_THREAD_STATUS_WAITING_FOR_REQUEST);
1466 if (previousStatus == GATE_THREAD_STATUS_WAITING_FOR_REQUEST)
1467 return false;
1468 }
1469 }
1470
1471
1472 _ASSERTE(GateThreadStatus == GATE_THREAD_STATUS_WAITING_FOR_REQUEST ||
1473 GateThreadStatus == GATE_THREAD_STATUS_REQUESTED);
1474 return true;
1475}
1476
1477
1478
1479//************************************************************************
1480void ThreadpoolMgr::EnqueueWorkRequest(WorkRequest* workRequest)
1481{
1482 CONTRACTL
1483 {
1484 NOTHROW;
1485 MODE_ANY;
1486 GC_NOTRIGGER;
1487 }
1488 CONTRACTL_END;
1489
1490 AppendWorkRequest(workRequest);
1491}
1492
1493WorkRequest* ThreadpoolMgr::DequeueWorkRequest()
1494{
1495 WorkRequest* entry = NULL;
1496 CONTRACT(WorkRequest*)
1497 {
1498 NOTHROW;
1499 GC_NOTRIGGER;
1500 MODE_PREEMPTIVE;
1501
1502 POSTCONDITION(CheckPointer(entry, NULL_OK));
1503 } CONTRACT_END;
1504
1505 entry = RemoveWorkRequest();
1506
1507 RETURN entry;
1508}
1509
1510DWORD WINAPI ThreadpoolMgr::ExecuteHostRequest(PVOID pArg)
1511{
1512 CONTRACTL
1513 {
1514 THROWS;
1515 GC_TRIGGERS;
1516 MODE_ANY;
1517 }
1518 CONTRACTL_END;
1519
1520 ThreadLocaleHolder localeHolder;
1521
1522 bool foundWork, wasNotRecalled;
1523 ExecuteWorkRequest(&foundWork, &wasNotRecalled);
1524 return ERROR_SUCCESS;
1525}
1526
1527void ThreadpoolMgr::ExecuteWorkRequest(bool* foundWork, bool* wasNotRecalled)
1528{
1529 CONTRACTL
1530 {
1531 THROWS; // QueueUserWorkItem can throw
1532 GC_TRIGGERS;
1533 MODE_ANY;
1534 }
1535 CONTRACTL_END;
1536
1537 IPerAppDomainTPCount* pAdCount;
1538
1539 LONG index = PerAppDomainTPCountList::GetAppDomainIndexForThreadpoolDispatch();
1540
1541 if (index == 0)
1542 {
1543 *foundWork = false;
1544 *wasNotRecalled = true;
1545 return;
1546 }
1547
1548 if (index == -1)
1549 {
1550 pAdCount = PerAppDomainTPCountList::GetUnmanagedTPCount();
1551 }
1552 else
1553 {
1554
1555 pAdCount = PerAppDomainTPCountList::GetPerAppdomainCount(TPIndex((DWORD)index));
1556 _ASSERTE(pAdCount);
1557 }
1558
1559 pAdCount->DispatchWorkItem(foundWork, wasNotRecalled);
1560}
1561
1562//--------------------------------------------------------------------------
1563//This function informs the thread scheduler that the first requests has been
1564//queued on an appdomain, or it's the first unmanaged TP request.
1565//Arguments:
1566// UnmanagedTP: Indicates that the request arises from the unmanaged
1567//part of Thread Pool.
1568//Assumptions:
1569// This function must be called under a per-appdomain lock or the
1570//correct lock under unmanaged TP queue.
1571//
1572BOOL ThreadpoolMgr::SetAppDomainRequestsActive(BOOL UnmanagedTP)
1573{
1574 CONTRACTL
1575 {
1576 NOTHROW;
1577 MODE_ANY;
1578 GC_TRIGGERS;
1579 SO_INTOLERANT;
1580 }
1581 CONTRACTL_END;
1582
1583 BOOL fShouldSignalEvent = FALSE;
1584
1585 IPerAppDomainTPCount* pAdCount;
1586
1587 if(UnmanagedTP)
1588 {
1589 pAdCount = PerAppDomainTPCountList::GetUnmanagedTPCount();
1590 _ASSERTE(pAdCount);
1591 }
1592 else
1593 {
1594 Thread* pCurThread = GetThread();
1595 _ASSERTE( pCurThread);
1596
1597 AppDomain* pAppDomain = pCurThread->GetDomain();
1598 _ASSERTE(pAppDomain);
1599
1600 TPIndex tpindex = pAppDomain->GetTPIndex();
1601 pAdCount = PerAppDomainTPCountList::GetPerAppdomainCount(tpindex);
1602
1603 _ASSERTE(pAdCount);
1604 }
1605
1606 pAdCount->SetAppDomainRequestsActive();
1607
1608 return fShouldSignalEvent;
1609}
1610
1611void ThreadpoolMgr::ClearAppDomainRequestsActive(BOOL UnmanagedTP, BOOL AdUnloading, LONG id)
1612//--------------------------------------------------------------------------
1613//This function informs the thread scheduler that the kast request has been
1614//dequeued on an appdomain, or it's the last unmanaged TP request.
1615//Arguments:
1616// UnmanagedTP: Indicates that the request arises from the unmanaged
1617//part of Thread Pool.
1618// id: Indicates the id of the appdomain. The id is needed as this
1619//function can be called (indirectly) from the appdomain unload thread from
1620//unmanaged code to clear per-appdomain state during rude unload.
1621//Assumptions:
1622// This function must be called under a per-appdomain lock or the
1623//correct lock under unmanaged TP queue.
1624//
1625{
1626 CONTRACTL
1627 {
1628 NOTHROW;
1629 MODE_ANY;
1630 GC_TRIGGERS;
1631 SO_INTOLERANT;
1632 }
1633 CONTRACTL_END;
1634
1635 IPerAppDomainTPCount* pAdCount;
1636
1637 if(UnmanagedTP)
1638 {
1639 pAdCount = PerAppDomainTPCountList::GetUnmanagedTPCount();
1640 _ASSERTE(pAdCount);
1641 }
1642 else
1643 {
1644 if (AdUnloading)
1645 {
1646 pAdCount = PerAppDomainTPCountList::GetPerAppdomainCount(TPIndex(id));
1647 }
1648 else
1649 {
1650 Thread* pCurThread = GetThread();
1651 _ASSERTE( pCurThread);
1652
1653 AppDomain* pAppDomain = pCurThread->GetDomain();
1654 _ASSERTE(pAppDomain);
1655
1656 TPIndex tpindex = pAppDomain->GetTPIndex();
1657
1658 pAdCount = PerAppDomainTPCountList::GetPerAppdomainCount(tpindex);
1659 }
1660
1661 _ASSERTE(pAdCount);
1662 }
1663
1664 pAdCount->ClearAppDomainRequestsActive();
1665}
1666
1667
1668// Remove a block from the appropriate recycleList and return.
1669// If recycleList is empty, fall back to new.
1670LPVOID ThreadpoolMgr::GetRecycledMemory(enum MemType memType)
1671{
1672 LPVOID result = NULL;
1673 CONTRACT(LPVOID)
1674 {
1675 THROWS;
1676 GC_NOTRIGGER;
1677 MODE_ANY;
1678 INJECT_FAULT(COMPlusThrowOM());
1679 POSTCONDITION(CheckPointer(result));
1680 } CONTRACT_END;
1681
1682 if(RecycledLists.IsInitialized())
1683 {
1684 RecycledListInfo& list = RecycledLists.GetRecycleMemoryInfo( memType );
1685
1686 result = list.Remove();
1687 }
1688
1689 if(result == NULL)
1690 {
1691 switch (memType)
1692 {
1693 case MEMTYPE_DelegateInfo:
1694 result = new DelegateInfo;
1695 break;
1696 case MEMTYPE_AsyncCallback:
1697 result = new AsyncCallback;
1698 break;
1699 case MEMTYPE_WorkRequest:
1700 result = new WorkRequest;
1701 break;
1702 default:
1703 _ASSERTE(!"Unknown Memtype");
1704 result = NULL;
1705 break;
1706 }
1707 }
1708
1709 RETURN result;
1710}
1711
1712// Insert freed block in recycle list. If list is full, return to system heap
1713void ThreadpoolMgr::RecycleMemory(LPVOID mem, enum MemType memType)
1714{
1715 CONTRACTL
1716 {
1717 NOTHROW;
1718 GC_NOTRIGGER;
1719 SO_TOLERANT;
1720 MODE_ANY;
1721 }
1722 CONTRACTL_END;
1723
1724 if(RecycledLists.IsInitialized())
1725 {
1726 RecycledListInfo& list = RecycledLists.GetRecycleMemoryInfo( memType );
1727
1728 if(list.CanInsert())
1729 {
1730 list.Insert( mem );
1731 return;
1732 }
1733 }
1734
1735 switch (memType)
1736 {
1737 case MEMTYPE_DelegateInfo:
1738 delete (DelegateInfo*) mem;
1739 break;
1740 case MEMTYPE_AsyncCallback:
1741 delete (AsyncCallback*) mem;
1742 break;
1743 case MEMTYPE_WorkRequest:
1744 delete (WorkRequest*) mem;
1745 break;
1746 default:
1747 _ASSERTE(!"Unknown Memtype");
1748
1749 }
1750}
1751
1752#define THROTTLE_RATE 0.10 /* rate by which we increase the delay as number of threads increase */
1753
1754// This is to avoid the 64KB/1MB aliasing problem present on Pentium 4 processors,
1755// which can significantly impact performance with HyperThreading enabled
1756DWORD WINAPI ThreadpoolMgr::intermediateThreadProc(PVOID arg)
1757{
1758 WRAPPER_NO_CONTRACT;
1759 STATIC_CONTRACT_SO_INTOLERANT;
1760
1761 offset_counter++;
1762 if (offset_counter * offset_multiplier > (int)GetOsPageSize())
1763 offset_counter = 0;
1764
1765 (void)_alloca(offset_counter * offset_multiplier);
1766
1767 intermediateThreadParam* param = (intermediateThreadParam*)arg;
1768
1769 LPTHREAD_START_ROUTINE ThreadFcnPtr = param->lpThreadFunction;
1770 PVOID args = param->lpArg;
1771 delete param;
1772
1773 return ThreadFcnPtr(args);
1774}
1775
1776Thread* ThreadpoolMgr::CreateUnimpersonatedThread(LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpArgs, BOOL *pIsCLRThread)
1777{
1778 STATIC_CONTRACT_NOTHROW;
1779 if (GetThread()) { STATIC_CONTRACT_GC_TRIGGERS;} else {DISABLED(STATIC_CONTRACT_GC_NOTRIGGER);}
1780 STATIC_CONTRACT_MODE_ANY;
1781 /* cannot use contract because of SEH
1782 CONTRACTL
1783 {
1784 NOTHROW;
1785 GC_NOTRIGGER;
1786 MODE_ANY;
1787 }
1788 CONTRACTL_END;*/
1789
1790 Thread* pThread = NULL;
1791
1792 if (g_fEEStarted) {
1793 *pIsCLRThread = TRUE;
1794 }
1795 else
1796 *pIsCLRThread = FALSE;
1797 if (*pIsCLRThread) {
1798 EX_TRY
1799 {
1800 pThread = SetupUnstartedThread();
1801 }
1802 EX_CATCH
1803 {
1804 pThread = NULL;
1805 }
1806 EX_END_CATCH(SwallowAllExceptions);
1807 if (pThread == NULL) {
1808 return NULL;
1809 }
1810 }
1811 DWORD threadId;
1812 BOOL bOK = FALSE;
1813 HANDLE threadHandle = NULL;
1814
1815 if (*pIsCLRThread) {
1816 // CreateNewThread takes care of reverting any impersonation - so dont do anything here.
1817 bOK = pThread->CreateNewThread(0, // default stack size
1818 lpStartAddress,
1819 lpArgs, //arguments
1820 W(".NET ThreadPool Worker"));
1821 }
1822 else {
1823#ifndef FEATURE_PAL
1824 HandleHolder token;
1825 BOOL bReverted = FALSE;
1826 bOK = RevertIfImpersonated(&bReverted, &token);
1827 if (bOK != TRUE)
1828 return NULL;
1829#endif // !FEATURE_PAL
1830 NewHolder<intermediateThreadParam> lpThreadArgs(new (nothrow) intermediateThreadParam);
1831 if (lpThreadArgs != NULL)
1832 {
1833 lpThreadArgs->lpThreadFunction = lpStartAddress;
1834 lpThreadArgs->lpArg = lpArgs;
1835 threadHandle = CreateThread(NULL, // security descriptor
1836 0, // default stack size
1837 intermediateThreadProc,
1838 lpThreadArgs, // arguments
1839 CREATE_SUSPENDED,
1840 &threadId);
1841#ifndef FEATURE_PAL
1842 SetThreadName(threadHandle, W(".NET ThreadPool Worker"));
1843#endif // !FEATURE_PAL
1844 if (threadHandle != NULL)
1845 lpThreadArgs.SuppressRelease();
1846 }
1847#ifndef FEATURE_PAL
1848 UndoRevert(bReverted, token);
1849#endif // !FEATURE_PAL
1850 }
1851
1852 if (*pIsCLRThread && !bOK)
1853 {
1854 pThread->DecExternalCount(FALSE);
1855 pThread = NULL;
1856 }
1857
1858 if (*pIsCLRThread) {
1859 return pThread;
1860 }
1861 else
1862 return (Thread*)threadHandle;
1863}
1864
1865
1866BOOL ThreadpoolMgr::CreateWorkerThread()
1867{
1868 CONTRACTL
1869 {
1870 if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
1871 NOTHROW;
1872 MODE_ANY; // We may try to add a worker thread while queuing a work item thru an fcall
1873 }
1874 CONTRACTL_END;
1875
1876 Thread *pThread;
1877 BOOL fIsCLRThread;
1878 if ((pThread = CreateUnimpersonatedThread(WorkerThreadStart, NULL, &fIsCLRThread)) != NULL)
1879 {
1880 if (fIsCLRThread) {
1881 pThread->ChooseThreadCPUGroupAffinity();
1882 pThread->StartThread();
1883 }
1884 else {
1885 DWORD status;
1886 status = ResumeThread((HANDLE)pThread);
1887 _ASSERTE(status != (DWORD) (-1));
1888 CloseHandle((HANDLE)pThread); // we don't need this anymore
1889 }
1890
1891 return TRUE;
1892 }
1893
1894 return FALSE;
1895}
1896
1897
1898DWORD WINAPI ThreadpoolMgr::WorkerThreadStart(LPVOID lpArgs)
1899{
1900 ClrFlsSetThreadType (ThreadType_Threadpool_Worker);
1901
1902 CONTRACTL
1903 {
1904 THROWS;
1905 GC_TRIGGERS;
1906 MODE_PREEMPTIVE;
1907 SO_INTOLERANT;
1908 }
1909 CONTRACTL_END;
1910
1911 Thread *pThread = NULL;
1912 DWORD dwSwitchCount = 0;
1913 BOOL fThreadInit = FALSE;
1914
1915 ThreadCounter::Counts counts, oldCounts, newCounts;
1916 bool foundWork = true, wasNotRecalled = true;
1917
1918 counts = WorkerCounter.GetCleanCounts();
1919 FireEtwThreadPoolWorkerThreadStart(counts.NumActive, counts.NumRetired, GetClrInstanceId());
1920
1921#ifdef FEATURE_COMINTEROP
1922 BOOL fCoInited = FALSE;
1923 // Threadpool threads should be initialized as MTA. If we are unable to do so,
1924 // return failure.
1925 {
1926 fCoInited = SUCCEEDED(::CoInitializeEx(NULL, COINIT_MULTITHREADED));
1927 if (!fCoInited)
1928 {
1929 goto Exit;
1930 }
1931 }
1932#endif // FEATURE_COMINTEROP
1933Work:
1934
1935 if (!fThreadInit) {
1936 if (g_fEEStarted) {
1937 pThread = SetupThreadNoThrow();
1938 if (pThread == NULL) {
1939 __SwitchToThread(0, ++dwSwitchCount);
1940 goto Work;
1941 }
1942
1943 // converted to CLRThread and added to ThreadStore, pick an group affinity for this thread
1944 pThread->ChooseThreadCPUGroupAffinity();
1945
1946 #ifdef FEATURE_COMINTEROP
1947 if (pThread->SetApartment(Thread::AS_InMTA, TRUE) != Thread::AS_InMTA)
1948 {
1949 // counts volatile read paired with CompareExchangeCounts loop set
1950 counts = WorkerCounter.DangerousGetDirtyCounts();
1951 while (true)
1952 {
1953 newCounts = counts;
1954 newCounts.NumActive--;
1955 newCounts.NumWorking--;
1956 oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
1957 if (oldCounts == counts)
1958 break;
1959 counts = oldCounts;
1960 }
1961 goto Exit;
1962 }
1963 #endif // FEATURE_COMINTEROP
1964
1965 pThread->SetBackground(TRUE);
1966 fThreadInit = TRUE;
1967 }
1968 }
1969
1970 GCX_PREEMP_NO_DTOR();
1971 _ASSERTE(pThread == NULL || !pThread->PreemptiveGCDisabled());
1972
1973 // make sure there's really work. If not, go back to sleep
1974
1975 // counts volatile read paired with CompareExchangeCounts loop set
1976 counts = WorkerCounter.DangerousGetDirtyCounts();
1977 while (true)
1978 {
1979 _ASSERTE(counts.NumActive > 0);
1980 _ASSERTE(counts.NumWorking > 0);
1981
1982 newCounts = counts;
1983
1984 bool retired;
1985
1986 if (counts.NumActive > counts.MaxWorking)
1987 {
1988 newCounts.NumActive--;
1989 newCounts.NumRetired++;
1990 retired = true;
1991 }
1992 else
1993 {
1994 retired = false;
1995
1996 if (foundWork)
1997 break;
1998 }
1999
2000 newCounts.NumWorking--;
2001
2002 oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
2003
2004 if (oldCounts == counts)
2005 {
2006 if (retired)
2007 goto Retire;
2008 else
2009 goto WaitForWork;
2010 }
2011
2012 counts = oldCounts;
2013 }
2014
2015 if (GCHeapUtilities::IsGCInProgress(TRUE))
2016 {
2017 // GC is imminent, so wait until GC is complete before executing next request.
2018 // this reduces in-flight objects allocated right before GC, easing the GC's work
2019 GCHeapUtilities::WaitForGCCompletion(TRUE);
2020 }
2021
2022 {
2023 ThreadLocaleHolder localeHolder;
2024
2025 ThreadpoolMgr::UpdateLastDequeueTime();
2026 ThreadpoolMgr::ExecuteWorkRequest(&foundWork, &wasNotRecalled);
2027 }
2028
2029 if (foundWork)
2030 {
2031 // Reset TLS etc. for next WorkRequest.
2032 if (pThread == NULL)
2033 pThread = GetThread();
2034
2035 if (pThread)
2036 {
2037 if (pThread->IsAbortRequested())
2038 pThread->EEResetAbort(Thread::TAR_ALL);
2039 pThread->InternalReset();
2040 }
2041 }
2042
2043 if (wasNotRecalled)
2044 goto Work;
2045
2046Retire:
2047
2048 counts = WorkerCounter.GetCleanCounts();
2049 FireEtwThreadPoolWorkerThreadRetirementStart(counts.NumActive, counts.NumRetired, GetClrInstanceId());
2050
2051 // It's possible that some work came in just before we decremented the active thread count, in which
2052 // case whoever queued that work may be expecting us to pick it up - so they would not have signalled
2053 // the worker semaphore. If there are other threads waiting, they will never be woken up, because
2054 // whoever queued the work expects that it's already been picked up. The solution is to signal the semaphore
2055 // if there's any work available.
2056 if (PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains())
2057 MaybeAddWorkingWorker();
2058
2059 while (true)
2060 {
2061RetryRetire:
2062 if (RetiredWorkerSemaphore->Wait(AppX::IsAppXProcess() ? WorkerTimeoutAppX : WorkerTimeout))
2063 {
2064 foundWork = true;
2065
2066 counts = WorkerCounter.GetCleanCounts();
2067 FireEtwThreadPoolWorkerThreadRetirementStop(counts.NumActive, counts.NumRetired, GetClrInstanceId());
2068 goto Work;
2069 }
2070
2071 if (!IsIoPending())
2072 {
2073 //
2074 // We're going to exit. There's a nasty race here. We're about to decrement NumRetired,
2075 // since we're going to exit. Once we've done that, nobody will expect this thread
2076 // to be waiting for RetiredWorkerSemaphore. But between now and then, other threads still
2077 // think we're waiting on the semaphore, and they will happily do the following to try to
2078 // wake us up:
2079 //
2080 // 1) Decrement NumRetired
2081 // 2) Increment NumActive
2082 // 3) Increment NumWorking
2083 // 4) Signal RetiredWorkerSemaphore
2084 //
2085 // We will not receive that signal. If we don't do something special here,
2086 // we will decrement NumRetired an extra time, and leave the world thinking there
2087 // are fewer retired threads, and more working threads than reality.
2088 //
2089 // What can we do about this? First, we *need* to decrement NumRetired. If someone did it before us,
2090 // it might go negative. This is the easiest way to tell that we've encountered this race. In that case,
2091 // we will simply not commit the decrement, swallow the signal that was sent, and proceed as if we
2092 // got WAIT_OBJECT_0 in the wait above.
2093 //
2094 // If we don't hit zero while decrementing NumRetired, we still may have encountered this race. But
2095 // if we don't hit zero, then there's another retired thread that will pick up this signal. So it's ok
2096 // to exit.
2097 //
2098
2099 // counts volatile read paired with CompareExchangeCounts loop set
2100 counts = WorkerCounter.DangerousGetDirtyCounts();
2101 while (true)
2102 {
2103 if (counts.NumRetired == 0)
2104 goto RetryRetire;
2105
2106 newCounts = counts;
2107 newCounts.NumRetired--;
2108
2109 oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
2110 if (oldCounts == counts)
2111 {
2112 counts = newCounts;
2113 break;
2114 }
2115 counts = oldCounts;
2116 }
2117
2118 FireEtwThreadPoolWorkerThreadRetirementStop(counts.NumActive, counts.NumRetired, GetClrInstanceId());
2119 goto Exit;
2120 }
2121 }
2122
2123WaitForWork:
2124
2125 // It's possible that we decided we had no work just before some work came in,
2126 // but reduced the worker count *after* the work came in. In this case, we might
2127 // miss the notification of available work. So we make a sweep through the ADs here,
2128 // and wake up a thread (maybe this one!) if there is work to do.
2129 if (PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains())
2130 {
2131 foundWork = true;
2132 MaybeAddWorkingWorker();
2133 }
2134
2135 FireEtwThreadPoolWorkerThreadWait(counts.NumActive, counts.NumRetired, GetClrInstanceId());
2136
2137RetryWaitForWork:
2138 if (WorkerSemaphore->Wait(AppX::IsAppXProcess() ? WorkerTimeoutAppX : WorkerTimeout, WorkerThreadSpinLimit, NumberOfProcessors))
2139 {
2140 foundWork = true;
2141 goto Work;
2142 }
2143
2144 if (!IsIoPending())
2145 {
2146 //
2147 // We timed out, and are about to exit. This puts us in a very similar situation to the
2148 // retirement case above - someone may think we're still waiting, and go ahead and:
2149 //
2150 // 1) Increment NumWorking
2151 // 2) Signal WorkerSemaphore
2152 //
2153 // The solution is much like retirement; when we're decrementing NumActive, we need to make
2154 // sure it doesn't drop below NumWorking. If it would, then we need to go back and wait
2155 // again.
2156 //
2157
2158 DangerousNonHostedSpinLockHolder tal(&ThreadAdjustmentLock);
2159
2160 // counts volatile read paired with CompareExchangeCounts loop set
2161 counts = WorkerCounter.DangerousGetDirtyCounts();
2162 while (true)
2163 {
2164 if (counts.NumActive == counts.NumWorking)
2165 {
2166 goto RetryWaitForWork;
2167 }
2168
2169 newCounts = counts;
2170 newCounts.NumActive--;
2171
2172 // if we timed out while active, then Hill Climbing needs to be told that we need fewer threads
2173 newCounts.MaxWorking = max(MinLimitTotalWorkerThreads, min(newCounts.NumActive, newCounts.MaxWorking));
2174
2175 oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
2176
2177 if (oldCounts == counts)
2178 {
2179 HillClimbingInstance.ForceChange(newCounts.MaxWorking, ThreadTimedOut);
2180 goto Exit;
2181 }
2182
2183 counts = oldCounts;
2184 }
2185 }
2186 else
2187 {
2188 goto RetryWaitForWork;
2189 }
2190
2191Exit:
2192
2193#ifdef FEATURE_COMINTEROP
2194 if (pThread) {
2195 pThread->SetApartment(Thread::AS_Unknown, TRUE);
2196 pThread->CoUninitialize();
2197 }
2198
2199 // Couninit the worker thread
2200 if (fCoInited)
2201 {
2202 CoUninitialize();
2203 }
2204#endif
2205
2206 if (pThread) {
2207 pThread->ClearThreadCPUGroupAffinity();
2208
2209 DestroyThread(pThread);
2210 }
2211
2212 _ASSERTE(!IsIoPending());
2213
2214 counts = WorkerCounter.GetCleanCounts();
2215 FireEtwThreadPoolWorkerThreadStop(counts.NumActive, counts.NumRetired, GetClrInstanceId());
2216
2217 return ERROR_SUCCESS;
2218}
2219
2220
2221BOOL ThreadpoolMgr::SuspendProcessing()
2222{
2223 CONTRACTL
2224 {
2225 NOTHROW;
2226 GC_NOTRIGGER;
2227 MODE_PREEMPTIVE;
2228 }
2229 CONTRACTL_END;
2230
2231 BOOL shouldRetire = TRUE;
2232 DWORD sleepInterval = SUSPEND_TIME;
2233 int oldCpuUtilization = cpuUtilization;
2234 for (int i = 0; i < shouldRetire; i++)
2235 {
2236 __SwitchToThread(sleepInterval, CALLER_LIMITS_SPINNING);
2237 if ((cpuUtilization <= (oldCpuUtilization - 4)))
2238 { // if cpu util. dips by 4% or more, then put it back in circulation
2239 shouldRetire = FALSE;
2240 break;
2241 }
2242 }
2243
2244 return shouldRetire;
2245}
2246
2247
// this should only be called by unmanaged thread (i.e. there should be no mgd
// caller on the stack) since we are swallowing terminal exceptions
//
// Waits on ev for up to sleepTime and returns the status reported by the wait.
// Any exception raised while waiting is swallowed; in that case the initial
// WAIT_TIMEOUT value is returned.
// NOTE(review): the alertable parameter is not used - the wait is always issued with
// FALSE as its second argument. Confirm whether callers rely on that.
DWORD ThreadpoolMgr::SafeWait(CLREvent * ev, DWORD sleepTime, BOOL alertable)
{
    STATIC_CONTRACT_NOTHROW;
    STATIC_CONTRACT_GC_NOTRIGGER;
    STATIC_CONTRACT_MODE_PREEMPTIVE;
    /* cannot use contract because of SEH
    CONTRACTL
    {
        NOTHROW;
        GC_NOTRIGGER;
        MODE_PREEMPTIVE;
    }
    CONTRACTL_END;*/

    // Default result, returned unchanged if the wait throws before assigning status.
    DWORD status = WAIT_TIMEOUT;
    EX_TRY
    {
        status = ev->Wait(sleepTime,FALSE);
    }
    EX_CATCH
    {
        // Intentionally empty: the exception is swallowed and the default status is returned.
    }
    EX_END_CATCH(SwallowAllExceptions)
    return status;
}
2275
2276/************************************************************************/
2277
2278BOOL ThreadpoolMgr::RegisterWaitForSingleObject(PHANDLE phNewWaitObject,
2279 HANDLE hWaitObject,
2280 WAITORTIMERCALLBACK Callback,
2281 PVOID Context,
2282 ULONG timeout,
2283 DWORD dwFlag )
2284{
2285 CONTRACTL
2286 {
2287 THROWS;
2288 MODE_ANY;
2289 if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
2290 }
2291 CONTRACTL_END;
2292 EnsureInitialized();
2293
2294 ThreadCB* threadCB;
2295 {
2296 CrstHolder csh(&WaitThreadsCriticalSection);
2297
2298 threadCB = FindWaitThread();
2299 }
2300
2301 *phNewWaitObject = NULL;
2302
2303 if (threadCB)
2304 {
2305 WaitInfo* waitInfo = new (nothrow) WaitInfo;
2306
2307 if (waitInfo == NULL)
2308 return FALSE;
2309
2310 waitInfo->waitHandle = hWaitObject;
2311 waitInfo->Callback = Callback;
2312 waitInfo->Context = Context;
2313 waitInfo->timeout = timeout;
2314 waitInfo->flag = dwFlag;
2315 waitInfo->threadCB = threadCB;
2316 waitInfo->state = 0;
2317 waitInfo->refCount = 1; // safe to do this since no wait has yet been queued, so no other thread could be modifying this
2318 waitInfo->ExternalCompletionEvent = INVALID_HANDLE;
2319 waitInfo->ExternalEventSafeHandle = NULL;
2320 waitInfo->handleOwningAD = (ADID) 0;
2321
2322 waitInfo->timer.startTime = GetTickCount();
2323 waitInfo->timer.remainingTime = timeout;
2324
2325 *phNewWaitObject = waitInfo;
2326
2327 // We fire the "enqueue" ETW event here, to "mark" the thread that had called the API, rather than the
2328 // thread that will PostQueuedCompletionStatus (the dedicated WaitThread).
2329 // This event correlates with ThreadPoolIODequeue in ThreadpoolMgr::AsyncCallbackCompletion
2330 if (ETW_EVENT_ENABLED(MICROSOFT_WINDOWS_DOTNETRUNTIME_PROVIDER_Context, ThreadPoolIOEnqueue))
2331 FireEtwThreadPoolIOEnqueue((LPOVERLAPPED)waitInfo, reinterpret_cast<void*>(Callback), (dwFlag & WAIT_SINGLE_EXECUTION) == 0, GetClrInstanceId());
2332
2333 BOOL status = QueueUserAPC((PAPCFUNC)InsertNewWaitForSelf, threadCB->threadHandle, (size_t) waitInfo);
2334
2335 if (status == FALSE)
2336 {
2337 *phNewWaitObject = NULL;
2338 delete waitInfo;
2339 }
2340
2341 return status;
2342 }
2343
2344 return FALSE;
2345}
2346
2347
2348// Returns a wait thread that can accomodate another wait request. The
2349// caller is responsible for synchronizing access to the WaitThreadsHead
2350ThreadpoolMgr::ThreadCB* ThreadpoolMgr::FindWaitThread()
2351{
2352 CONTRACTL
2353 {
2354 THROWS; // CreateWaitThread can throw
2355 MODE_ANY;
2356 GC_TRIGGERS;
2357 }
2358 CONTRACTL_END;
2359 do
2360 {
2361 for (LIST_ENTRY* Node = (LIST_ENTRY*) WaitThreadsHead.Flink ;
2362 Node != &WaitThreadsHead ;
2363 Node = (LIST_ENTRY*)Node->Flink)
2364 {
2365 _ASSERTE(offsetof(WaitThreadInfo,link) == 0);
2366
2367 ThreadCB* threadCB = ((WaitThreadInfo*) Node)->threadCB;
2368
2369 if (threadCB->NumWaitHandles < MAX_WAITHANDLES) // this test and following ...
2370
2371 {
2372 InterlockedIncrement(&threadCB->NumWaitHandles); // ... increment are protected by WaitThreadsCriticalSection.
2373 // but there might be a concurrent decrement in DeactivateWait
2374 // or InsertNewWaitForSelf, hence the interlock
2375 return threadCB;
2376 }
2377 }
2378
2379 // if reached here, there are no wait threads available, so need to create a new one
2380 if (!CreateWaitThread())
2381 return NULL;
2382
2383
2384 // Now loop back
2385 } while (TRUE);
2386
2387}
2388
2389BOOL ThreadpoolMgr::CreateWaitThread()
2390{
2391 CONTRACTL
2392 {
2393 THROWS; // CLREvent::CreateAutoEvent can throw OOM
2394 GC_TRIGGERS;
2395 MODE_ANY;
2396 INJECT_FAULT(COMPlusThrowOM());
2397 }
2398 CONTRACTL_END;
2399 DWORD threadId;
2400
2401 if (g_fEEShutDown & ShutDown_Finalize2){
2402 // The process is shutting down. Shutdown thread has ThreadStore lock,
2403 // wait thread is blocked on the lock.
2404 return FALSE;
2405 }
2406
2407 NewHolder<WaitThreadInfo> waitThreadInfo(new (nothrow) WaitThreadInfo);
2408 if (waitThreadInfo == NULL)
2409 return FALSE;
2410
2411 NewHolder<ThreadCB> threadCB(new (nothrow) ThreadCB);
2412
2413 if (threadCB == NULL)
2414 {
2415 return FALSE;
2416 }
2417
2418 threadCB->startEvent.CreateAutoEvent(FALSE);
2419 HANDLE threadHandle = Thread::CreateUtilityThread(Thread::StackSize_Small, WaitThreadStart, (LPVOID)threadCB, W(".NET ThreadPool Wait"), CREATE_SUSPENDED, &threadId);
2420
2421 if (threadHandle == NULL)
2422 {
2423 threadCB->startEvent.CloseEvent();
2424 return FALSE;
2425 }
2426
2427 waitThreadInfo.SuppressRelease();
2428 threadCB.SuppressRelease();
2429 threadCB->threadHandle = threadHandle;
2430 threadCB->threadId = threadId; // may be useful for debugging otherwise not used
2431 threadCB->NumWaitHandles = 0;
2432 threadCB->NumActiveWaits = 0;
2433 for (int i=0; i< MAX_WAITHANDLES; i++)
2434 {
2435 InitializeListHead(&(threadCB->waitPointer[i]));
2436 }
2437
2438 waitThreadInfo->threadCB = threadCB;
2439
2440 DWORD status = ResumeThread(threadHandle);
2441
2442 {
2443 // We will QueueUserAPC on the newly created thread.
2444 // Let us wait until the thread starts running.
2445 GCX_PREEMP();
2446 DWORD timeout=500;
2447 while (TRUE) {
2448 if (g_fEEShutDown & ShutDown_Finalize2){
2449 // The process is shutting down. Shutdown thread has ThreadStore lock,
2450 // wait thread is blocked on the lock.
2451 return FALSE;
2452 }
2453 DWORD wait_status = threadCB->startEvent.Wait(timeout, FALSE);
2454 if (wait_status == WAIT_OBJECT_0) {
2455 break;
2456 }
2457 }
2458 }
2459 threadCB->startEvent.CloseEvent();
2460
2461 // check to see if setup succeeded
2462 if (threadCB->threadHandle == NULL)
2463 return FALSE;
2464
2465 InsertHeadList(&WaitThreadsHead,&waitThreadInfo->link);
2466
2467 _ASSERTE(status != (DWORD) (-1));
2468
2469 return (status != (DWORD) (-1));
2470
2471}
2472
2473// Executed as an APC on a WaitThread. Add the wait specified in pArg to the list of objects it is waiting on
2474void ThreadpoolMgr::InsertNewWaitForSelf(WaitInfo* pArgs)
2475{
2476 WRAPPER_NO_CONTRACT;
2477 STATIC_CONTRACT_SO_INTOLERANT;
2478
2479 WaitInfo* waitInfo = pArgs;
2480
2481 // the following is safe since only this thread is allowed to change the state
2482 if (!(waitInfo->state & WAIT_DELETE))
2483 {
2484 waitInfo->state = (WAIT_REGISTERED | WAIT_ACTIVE);
2485 }
2486 else
2487 {
2488 // some thread unregistered the wait
2489 DeleteWait(waitInfo);
2490 return;
2491 }
2492
2493
2494 ThreadCB* threadCB = waitInfo->threadCB;
2495
2496 _ASSERTE(threadCB->NumActiveWaits <= threadCB->NumWaitHandles);
2497
2498 int index = FindWaitIndex(threadCB, waitInfo->waitHandle);
2499 _ASSERTE(index >= 0 && index <= threadCB->NumActiveWaits);
2500
2501 if (index == threadCB->NumActiveWaits)
2502 {
2503 threadCB->waitHandle[threadCB->NumActiveWaits] = waitInfo->waitHandle;
2504 threadCB->NumActiveWaits++;
2505 }
2506 else
2507 {
2508 // this is a duplicate waithandle, so the increment in FindWaitThread
2509 // wasn't strictly necessary. This will avoid unnecessary thread creation.
2510 InterlockedDecrement(&threadCB->NumWaitHandles);
2511 }
2512
2513 _ASSERTE(offsetof(WaitInfo, link) == 0);
2514 InsertTailList(&(threadCB->waitPointer[index]), (&waitInfo->link));
2515
2516 return;
2517}
2518
2519// returns the index of the entry that matches waitHandle or next free entry if not found
2520int ThreadpoolMgr::FindWaitIndex(const ThreadCB* threadCB, const HANDLE waitHandle)
2521{
2522 LIMITED_METHOD_CONTRACT;
2523
2524 for (int i=0;i<threadCB->NumActiveWaits; i++)
2525 if (threadCB->waitHandle[i] == waitHandle)
2526 return i;
2527
2528 // else not found
2529 return threadCB->NumActiveWaits;
2530}
2531
2532
// TimeExpired: if no wraparound occurred (last <= now), then the timer is expired if duetime
// lies between last and now (inclusive). If wraparound occurred, then the timer is expired if
// duetime is greater than or equal to last, or less than or equal to now.
//
// TimeInterval: the time elapsed from start to end, accounting for a single wraparound of a
// 32-bit counter.
//
// All macro arguments are parenthesized so that expression arguments (e.g. "a + b") expand
// correctly. Arguments may be evaluated more than once, so they must be free of side effects.
#define TimeExpired(last,now,duetime) ((last) <= (now) ? \
                                       ((duetime) <= (now) && (duetime) >= (last)): \
                                       ((duetime) >= (last) || (duetime) <= (now)))

#define TimeInterval(end,start) ( (end) > (start) ? ((end) - (start)) : ((0xffffffff - (start)) + (end) + 1) )
2540
2541// Returns the minimum of the remaining time to reach a timeout among all the waits
2542DWORD ThreadpoolMgr::MinimumRemainingWait(LIST_ENTRY* waitInfo, unsigned int numWaits)
2543{
2544 LIMITED_METHOD_CONTRACT;
2545
2546 unsigned int min = (unsigned int) -1;
2547 DWORD currentTime = GetTickCount();
2548
2549 for (unsigned i=0; i < numWaits ; i++)
2550 {
2551 WaitInfo* waitInfoPtr = (WaitInfo*) (waitInfo[i].Flink);
2552 PVOID waitInfoHead = &(waitInfo[i]);
2553 do
2554 {
2555 if (waitInfoPtr->timeout != INFINITE)
2556 {
2557 // compute remaining time
2558 DWORD elapsedTime = TimeInterval(currentTime,waitInfoPtr->timer.startTime );
2559
2560 __int64 remainingTime = (__int64) (waitInfoPtr->timeout) - (__int64) elapsedTime;
2561
2562 // update remaining time
2563 waitInfoPtr->timer.remainingTime = remainingTime > 0 ? (int) remainingTime : 0;
2564
2565 // ... and min
2566 if (waitInfoPtr->timer.remainingTime < min)
2567 min = waitInfoPtr->timer.remainingTime;
2568 }
2569
2570 waitInfoPtr = (WaitInfo*) (waitInfoPtr->link.Flink);
2571
2572 } while ((PVOID) waitInfoPtr != waitInfoHead);
2573
2574 }
2575 return min;
2576}
2577
2578#ifdef _MSC_VER
2579#ifdef _WIN64
2580#pragma warning (disable : 4716)
2581#else
2582#pragma warning (disable : 4715)
2583#endif
2584#endif
2585#ifdef _PREFAST_
2586#pragma warning(push)
2587#pragma warning(disable:22008) // "Prefast integer overflow check on (0 + lval) is bogus. Tried local disable without luck, doing whole method."
2588#endif
2589
// Thread procedure of a dedicated wait thread. lpArgs is the ThreadCB describing this thread.
//
// Startup: if SetupThreadNoThrow fails, threadCB->threadHandle is cleared to report the
// failure, startEvent is signalled and the thread returns 0. Otherwise startEvent is
// signalled and the thread enters a loop that never exits:
//   - with no active waits, it sleeps alertably until an APC arrives;
//   - if an APC is flagged as pending, it does a zero-length alertable sleep and starts over;
//   - otherwise it waits alertably on all active handles, using the smallest remaining
//     timeout among the registered waits.
// The result of the wait is then dispatched: timed-out waits and signalled waits are handed
// to ProcessWaitCompletion, and on WAIT_FAILED the waits on the failing handle are deactivated.
DWORD WINAPI ThreadpoolMgr::WaitThreadStart(LPVOID lpArgs)
{
    CONTRACTL
    {
        THROWS;
        GC_TRIGGERS;
        MODE_PREEMPTIVE;
        SO_TOLERANT;
    }
    CONTRACTL_END;

    ClrFlsSetThreadType (ThreadType_Wait);

    ThreadCB* threadCB = (ThreadCB*) lpArgs;
    Thread* pThread = SetupThreadNoThrow();

    if (pThread == NULL)
    {
        // Setup failed: clear the handle field so that whoever waits on startEvent can tell.
        _ASSERTE(threadCB->threadHandle != NULL);
        threadCB->threadHandle = NULL;
    }

    // Signal that startup has finished, whether or not it succeeded.
    threadCB->startEvent.Set();

    if (pThread == NULL)
    {
        return 0;
    }

    BEGIN_SO_INTOLERANT_CODE(pThread);  // we probe at the top of the thread so we can safely call anything below here.
    {
        // wait threads never die. (Why?)
        for (;;)
        {
            DWORD status;
            DWORD timeout = 0;

            if (threadCB->NumActiveWaits == 0)
            {

#undef SleepEx
                // <TODO>@TODO Consider doing a sleep for an idle period and terminating the thread if no activity</TODO>
                //We use SleepEx instead of CLRSleepEx because CLRSleepEx calls into SQL(or other hosts) in hosted
                //scenarios. SQL does not deliver APC's, and the wait thread wait insertion/deletion logic depends on
                //APC's being delivered.
                status = SleepEx(INFINITE,TRUE);
#define SleepEx(a,b) Dont_Use_SleepEx(a,b)

                _ASSERTE(status == WAIT_IO_COMPLETION);
            }
            else if (IsWaitThreadAPCPending())
            {
                //Do a sleep if an APC is pending, This was done to solve the corner case where the wait is signaled,
                //and APC to deregister the wait never fires. That scenario leads to an infinite loop. This check would
                //allow the thread to enter alertable wait and thus cause the APC to fire.

                ResetWaitThreadAPCPending();

                //We use SleepEx instead of CLRSleepEx because CLRSleepEx calls into SQL(or other hosts) in hosted
                //scenarios. SQL does not deliver APC's, and the wait thread wait insertion/deletion logic depends on
                //APC's being delivered.

                #undef SleepEx
                status = SleepEx(0,TRUE);
                #define SleepEx(a,b) Dont_Use_SleepEx(a,b)

                continue;
            }
            else
            {
                // compute minimum timeout. this call also updates the remainingTime field for each wait
                timeout = MinimumRemainingWait(threadCB->waitPointer,threadCB->NumActiveWaits);

                status = WaitForMultipleObjectsEx(  threadCB->NumActiveWaits,
                                                    threadCB->waitHandle,
                                                    FALSE,                      // waitall
                                                    timeout,
                                                    TRUE  );                    // alertable

                _ASSERTE( (status == WAIT_TIMEOUT) ||
                          (status == WAIT_IO_COMPLETION) ||
                          //It could be that there are no waiters at this point,
                          //as the APC to deregister the wait may have run.
                          (status == WAIT_OBJECT_0) ||
                          (status >= WAIT_OBJECT_0 && status < (DWORD)(WAIT_OBJECT_0 + threadCB->NumActiveWaits))  ||
                          (status == WAIT_FAILED));

                //It could be that the last waiter also got deregistered.
                if (threadCB->NumActiveWaits == 0)
                {
                    continue;
                }
            }

            // An APC ran during the alertable wait; the wait set may have changed, so start over.
            if (status == WAIT_IO_COMPLETION)
                continue;

            if (status == WAIT_TIMEOUT)
            {
                // Complete every wait whose remaining time equals the timeout that just elapsed.
                for (int i=0; i< threadCB->NumActiveWaits; i++)
                {
                    WaitInfo* waitInfo = (WaitInfo*) (threadCB->waitPointer[i]).Flink;
                    PVOID waitInfoHead = &(threadCB->waitPointer[i]);

                    do
                    {
                        _ASSERTE(waitInfo->timer.remainingTime >= timeout);

                        // Fetch the successor first: processing the wait may unlink it from the list.
                        WaitInfo* wTemp = (WaitInfo*) waitInfo->link.Flink;

                        if (waitInfo->timer.remainingTime == timeout)
                        {
                            ProcessWaitCompletion(waitInfo,i,TRUE);
                        }

                        waitInfo = wTemp;

                    } while ((PVOID) waitInfo != waitInfoHead);
                }
            }
            else if (status >= WAIT_OBJECT_0 && status < (DWORD)(WAIT_OBJECT_0 + threadCB->NumActiveWaits))
            {
                // One of the handles was signalled; index identifies its slot in the wait arrays.
                unsigned index = status - WAIT_OBJECT_0;
                WaitInfo* waitInfo = (WaitInfo*) (threadCB->waitPointer[index]).Flink;
                PVOID waitInfoHead = &(threadCB->waitPointer[index]);
                BOOL isAutoReset;

                // Setting to unconditional TRUE is inefficient since we will re-enter the wait and release
                // the next waiter, but short of using undocumented NT apis is the only solution.
                // Querying the state with a WaitForSingleObject is not an option as it will reset an
                // auto reset event if it has been signalled since the previous wait.
                isAutoReset = TRUE;

                // Because isAutoReset is always TRUE, this loop processes exactly one wait per signal.
                do
                {
                    WaitInfo* wTemp = (WaitInfo*) waitInfo->link.Flink;
                    ProcessWaitCompletion(waitInfo,index,FALSE);

                    waitInfo = wTemp;

                } while (((PVOID) waitInfo != waitInfoHead) && !isAutoReset);

                // If an app registers a recurring wait for an event that is always signalled (!),
                // then no apc's will be executed since the thread never enters the alertable state.
                // This can be fixed by doing the following:
                //     SleepEx(0,TRUE);
                // However, it causes an unnecessary context switch. It is not worth penalizing well
                // behaved apps to protect poorly written apps.


            }
            else
            {
                _ASSERTE(status == WAIT_FAILED);
                // wait failed: application error
                // find out which wait handle caused the wait to fail
                for (int i = 0; i < threadCB->NumActiveWaits; i++)
                {
                    DWORD subRet = WaitForSingleObject(threadCB->waitHandle[i], 0);

                    if (subRet != WAIT_FAILED)
                        continue;

                    // remove all waits associated with this wait handle

                    WaitInfo* waitInfo = (WaitInfo*) (threadCB->waitPointer[i]).Flink;
                    PVOID waitInfoHead = &(threadCB->waitPointer[i]);

                    do
                    {
                        WaitInfo* temp  = (WaitInfo*) waitInfo->link.Flink;

                        DeactivateNthWait(waitInfo,i);


                        // Note, we cannot cleanup here since there is no way to suppress finalization
                        // we will just leak, and rely on the finalizer to clean up the memory
                        //if (InterlockedDecrement(&waitInfo->refCount) == 0)
                        //    DeleteWait(waitInfo);


                        waitInfo = temp;

                    } while ((PVOID) waitInfo != waitInfoHead);

                    // Only the first failing handle is handled per pass.
                    break;
                }
            }
        }
    }
    END_SO_INTOLERANT_CODE;

    //This is unreachable...so no return required.
}
2784#ifdef _PREFAST_
2785#pragma warning(pop)
2786#endif
2787
2788#ifdef _MSC_VER
2789#ifdef _WIN64
2790#pragma warning (default : 4716)
2791#else
2792#pragma warning (default : 4715)
2793#endif
2794#endif
2795
// Handles a completed wait (signalled or timed out) and queues its callback for execution.
//
//   waitInfo     - the wait that completed
//   index        - the slot of the wait's handle in the owning thread's wait arrays
//   waitTimedOut - TRUE when the wait completed by timeout; forwarded to the queued callback
//
// A single-execution wait is deactivated; any other wait has its timer restarted. An
// AsyncCallback is then created and queued; if queuing fails the callback is released again.
// Exceptions are swallowed when SwallowUnhandledExceptions() says so and rethrown otherwise.
void ThreadpoolMgr::ProcessWaitCompletion(WaitInfo* waitInfo,
                                          unsigned index,
                                          BOOL waitTimedOut
                                         )
{
    STATIC_CONTRACT_THROWS;
    STATIC_CONTRACT_GC_TRIGGERS;
    STATIC_CONTRACT_MODE_PREEMPTIVE;
    /* cannot use contract because of SEH
    CONTRACTL
    {
        THROWS;
        GC_TRIGGERS;
        MODE_PREEMPTIVE;
    }
    CONTRACTL_END;*/

    AsyncCallback* asyncCallback = NULL;
    EX_TRY{
        if ( waitInfo->flag & WAIT_SINGLE_EXECUTION)
        {
            DeactivateNthWait (waitInfo,index) ;
        }
        else
        {   // reactivate wait by resetting timer
            waitInfo->timer.startTime = GetTickCount();
        }

        asyncCallback = MakeAsyncCallback();
        if (asyncCallback)
        {
            asyncCallback->wait = waitInfo;
            asyncCallback->waitTimedOut = waitTimedOut;

            // The queued callback holds its own reference on the wait.
            InterlockedIncrement(&waitInfo->refCount);

#ifndef FEATURE_PAL
            if (FALSE == PostQueuedCompletionStatus((LPOVERLAPPED)asyncCallback, (LPOVERLAPPED_COMPLETION_ROUTINE)WaitIOCompletionCallback))
#else  // FEATURE_PAL
            if (FALSE == QueueUserWorkItem(AsyncCallbackCompletion, asyncCallback, QUEUE_ONLY))
#endif // !FEATURE_PAL
                ReleaseAsyncCallback(asyncCallback);
        }
    }
    EX_CATCH {
        // Do not leak the callback if something threw after it was created.
        if (asyncCallback)
            ReleaseAsyncCallback(asyncCallback);

        if (SwallowUnhandledExceptions())
        {
            // Do nothing to swallow the exception
        }
        else
        {
            EX_RETHROW;
        }
    }
    EX_END_CATCH(SwallowAllExceptions);
}
2855
2856
// Runs the user callback of a completed wait. pArgs is the AsyncCallback queued by
// ProcessWaitCompletion.
//
// If the current thread has no Thread object yet, one is set up first; when that fails the
// resulting HRESULT is returned and the callback is not run. Otherwise the wait's callback is
// invoked with its context and a flag telling whether the wait timed out, and ERROR_SUCCESS is
// returned. ERROR_STACK_OVERFLOW is returned if the stack probe at the top fails.
DWORD WINAPI ThreadpoolMgr::AsyncCallbackCompletion(PVOID pArgs)
{
    CONTRACTL
    {
        THROWS;
        MODE_PREEMPTIVE;
        GC_TRIGGERS;
        SO_TOLERANT;
    }
    CONTRACTL_END;

    Thread * pThread = GetThread();

    if (pThread == NULL)
    {
        HRESULT hr = ERROR_SUCCESS;

        ClrFlsSetThreadType(ThreadType_Threadpool_Worker);
        pThread = SetupThreadNoThrow(&hr);

        if (pThread == NULL)
        {
            return hr;
        }
    }

    BEGIN_SO_INTOLERANT_CODE_NOTHROW(pThread, return ERROR_STACK_OVERFLOW);
    {
        AsyncCallback * asyncCallback = (AsyncCallback*) pArgs;

        WaitInfo * waitInfo = asyncCallback->wait;

        // The holder takes charge of asyncCallback for the rest of this scope.
        AsyncCallbackHolder asyncCBHolder;
        asyncCBHolder.Assign(asyncCallback);

        // We fire the "dequeue" ETW event here, before executing the user code, to enable correlation with
        // the ThreadPoolIOEnqueue fired in ThreadpoolMgr::RegisterWaitForSingleObject
        if (ETW_EVENT_ENABLED(MICROSOFT_WINDOWS_DOTNETRUNTIME_PROVIDER_Context, ThreadPoolIODequeue))
            FireEtwThreadPoolIODequeue(waitInfo, reinterpret_cast<void*>(waitInfo->Callback), GetClrInstanceId());

        // the user callback can throw, the host must be prepared to handle it.
        // SQL is ok, since they have a top-level SEH handler. However, there's
        // no easy way to verify it

        ((WAITORTIMERCALLBACKFUNC) waitInfo->Callback)
                                    ( waitInfo->Context, asyncCallback->waitTimedOut != FALSE);
    }
    END_SO_INTOLERANT_CODE;

    return ERROR_SUCCESS;
}
2908
2909void ThreadpoolMgr::DeactivateWait(WaitInfo* waitInfo)
2910{
2911 LIMITED_METHOD_CONTRACT;
2912
2913 ThreadCB* threadCB = waitInfo->threadCB;
2914 DWORD endIndex = threadCB->NumActiveWaits-1;
2915 DWORD index;
2916
2917 for (index = 0; index <= endIndex; index++)
2918 {
2919 LIST_ENTRY* head = &(threadCB->waitPointer[index]);
2920 LIST_ENTRY* current = head;
2921 do {
2922 if (current->Flink == (PVOID) waitInfo)
2923 goto FOUND;
2924
2925 current = (LIST_ENTRY*) current->Flink;
2926
2927 } while (current != head);
2928 }
2929
2930FOUND:
2931 _ASSERTE(index <= endIndex);
2932
2933 DeactivateNthWait(waitInfo, index);
2934}
2935
2936
// Removes waitInfo from the wait list at slot index of its wait thread and clears its
// WAIT_ACTIVE state.
//
// If other waits remain on that list, only the list entry is unlinked. If waitInfo is the
// last wait on the list, the slot itself is retired: the following slots are shifted left by
// one, the list links that pointed at the moved list heads are repaired, the vacated last
// slot is reinitialized, and both NumActiveWaits and NumWaitHandles are decremented.
void ThreadpoolMgr::DeactivateNthWait(WaitInfo* waitInfo, DWORD index)
{
    LIMITED_METHOD_CONTRACT;

    ThreadCB* threadCB = waitInfo->threadCB;

    // Flink and Blink differ when waitInfo is not the only entry on its list
    // (NOTE(review): relies on a sole entry having both links point at the list head - confirm).
    if (waitInfo->link.Flink != waitInfo->link.Blink)
    {
        RemoveEntryList(&(waitInfo->link));
    }
    else
    {

        ULONG EndIndex = threadCB->NumActiveWaits -1;

        // Move the remaining ActiveWaitArray left.

        ShiftWaitArray( threadCB, index+1, index,EndIndex - index ) ;

        // repair the blink and flink of the first and last elements in the list
        // (the list heads moved, so the entries still point at the old head locations)
        for (unsigned int i = 0; i< EndIndex-index; i++)
        {
            WaitInfo* firstWaitInfo = (WaitInfo*) threadCB->waitPointer[index+i].Flink;
            WaitInfo* lastWaitInfo = (WaitInfo*) threadCB->waitPointer[index+i].Blink;
            firstWaitInfo->link.Blink =  &(threadCB->waitPointer[index+i]);
            lastWaitInfo->link.Flink =  &(threadCB->waitPointer[index+i]);
        }
        // initialize the entry just freed
        InitializeListHead(&(threadCB->waitPointer[EndIndex]));

        threadCB->NumActiveWaits-- ;
        // Interlocked because FindWaitThread may increment this count from another thread.
        InterlockedDecrement(&threadCB->NumWaitHandles);
    }

    waitInfo->state &= ~WAIT_ACTIVE ;

}
2974
2975void ThreadpoolMgr::DeleteWait(WaitInfo* waitInfo)
2976{
2977 CONTRACTL
2978 {
2979 if (waitInfo->ExternalEventSafeHandle != NULL) { THROWS;} else { NOTHROW; }
2980 MODE_ANY;
2981 if (GetThread()) {GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
2982 }
2983 CONTRACTL_END;
2984
2985 if(waitInfo->Context && (waitInfo->flag & WAIT_FREE_CONTEXT)) {
2986 DelegateInfo* pDelegate = (DelegateInfo*) waitInfo->Context;
2987
2988 // Since the delegate release destroys a handle, we need to be in
2989 // co-operative mode
2990 {
2991 GCX_COOP();
2992 pDelegate->Release();
2993 }
2994
2995 RecycleMemory( pDelegate, MEMTYPE_DelegateInfo );
2996 }
2997
2998 if (waitInfo->flag & WAIT_INTERNAL_COMPLETION)
2999 {
3000 waitInfo->InternalCompletionEvent.Set();
3001 return; // waitInfo will be deleted by the thread that's waiting on this event
3002 }
3003 else if (waitInfo->ExternalCompletionEvent != INVALID_HANDLE)
3004 {
3005 UnsafeSetEvent(waitInfo->ExternalCompletionEvent);
3006 }
3007 else if (waitInfo->ExternalEventSafeHandle != NULL)
3008 {
3009 // Release the safe handle and the GC handle holding it
3010 ReleaseWaitInfo(waitInfo);
3011 }
3012
3013 delete waitInfo;
3014
3015
3016}
3017
3018
3019
3020/************************************************************************/
// Unregisters a wait previously returned by RegisterWaitForSingleObject.
//
//   hWaitObject - the registered wait (a WaitInfo*); NULL makes the call return FALSE
//   Event       - (HANDLE) -1 requests a blocking unregister that returns only after the
//                 wait has been deleted; any other non-NULL value is stored as the wait's
//                 external completion event; NULL means no completion event
//
// The deregistration itself is queued to the owning wait thread. Returns FALSE when
// hWaitObject is NULL or the request could not be queued, TRUE otherwise. In both modes the
// call blocks until the wait thread has processed the request.
BOOL ThreadpoolMgr::UnregisterWaitEx(HANDLE hWaitObject,HANDLE Event)
{
    CONTRACTL
    {
        THROWS; //NOTHROW;
        if (GetThread()) {GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
        MODE_ANY;
    }
    CONTRACTL_END;

    _ASSERTE(IsInitialized());              // cannot call unregister before first registering

    const BOOL Blocking = (Event == (HANDLE) -1);
    WaitInfo* waitInfo = (WaitInfo*) hWaitObject;

    if (!hWaitObject)
    {
        return FALSE;
    }

    // we do not allow callbacks to run in the wait thread, hence the assert
    _ASSERTE(GetCurrentThreadId() != waitInfo->threadCB->threadId);


    if (Blocking)
    {
        // DeleteWait signals this event instead of deleting the wait; we delete it below.
        waitInfo->InternalCompletionEvent.CreateAutoEvent(FALSE);
        waitInfo->flag |= WAIT_INTERNAL_COMPLETION;

    }
    else
    {
        waitInfo->ExternalCompletionEvent = (Event ? Event : INVALID_HANDLE);
        _ASSERTE((waitInfo->flag & WAIT_INTERNAL_COMPLETION) == 0);
        // we still want to block until the wait has been deactivated
        waitInfo->PartialCompletionEvent.CreateAutoEvent(FALSE);
    }

    BOOL status = QueueDeregisterWait(waitInfo->threadCB->threadHandle, waitInfo);


    if (status == 0)
    {
        STRESS_LOG1(LF_THREADPOOL, LL_ERROR, "Queue APC failed in UnregisterWaitEx %x", status);

        // The request never reached the wait thread; close the event created above.
        if (Blocking)
            waitInfo->InternalCompletionEvent.CloseEvent();
        else
            waitInfo->PartialCompletionEvent.CloseEvent();
        return FALSE;
    }

    if (!Blocking)
    {
        waitInfo->PartialCompletionEvent.Wait(INFINITE,TRUE);
        waitInfo->PartialCompletionEvent.CloseEvent();
        // we cannot do DeleteWait in DeregisterWait, since the DeleteWait could happen before
        // we close the event. So, the code has been moved here.
        if (InterlockedDecrement(&waitInfo->refCount) == 0)
        {
            DeleteWait(waitInfo);
        }
    }

    else        // i.e. blocking
    {
        _ASSERTE(waitInfo->flag & WAIT_INTERNAL_COMPLETION);
        _ASSERTE(waitInfo->ExternalEventSafeHandle == NULL);

        waitInfo->InternalCompletionEvent.Wait(INFINITE,TRUE);
        waitInfo->InternalCompletionEvent.CloseEvent();
        delete waitInfo;  // if WAIT_INTERNAL_COMPLETION is not set, waitInfo will be deleted in DeleteWait
    }
    return TRUE;
}
3096
3097
// Removes the wait described by pArgs from its wait thread.
//
// If the wait has not been registered yet, it is only marked WAIT_DELETE (so that it never
// gets registered), its reference count is dropped, and PartialCompletionEvent is signalled
// when valid. Otherwise an active wait is deactivated and then either PartialCompletionEvent
// is signalled (the waiter on that event finishes the cleanup) or the reference count is
// dropped and, when it reaches zero and shutdown suspension is not in effect, the wait is
// deleted.
void ThreadpoolMgr::DeregisterWait(WaitInfo* pArgs)
{

    WRAPPER_NO_CONTRACT;
    STATIC_CONTRACT_SO_INTOLERANT;

    WaitInfo* waitInfo = pArgs;

    if ( ! (waitInfo->state & WAIT_REGISTERED) )
    {
        // set state to deleted, so that it does not get registered
        waitInfo->state |= WAIT_DELETE ;

        // since the wait has not even been registered, we dont need an interlock to decrease the RefCount
        waitInfo->refCount--;

        if (waitInfo->PartialCompletionEvent.IsValid())
        {
            waitInfo->PartialCompletionEvent.Set();
        }
        return;
    }

    if (waitInfo->state & WAIT_ACTIVE)
    {
        DeactivateWait(waitInfo);
    }

    if ( waitInfo->PartialCompletionEvent.IsValid())
    {
        waitInfo->PartialCompletionEvent.Set();
        return;     // we cannot delete the wait here since the PartialCompletionEvent
                    // may not have been closed yet. so, we return and rely on the waiter of PartialCompletionEvent
                    // to do the close
    }

    if (InterlockedDecrement(&waitInfo->refCount) == 0)
    {
        // After we suspend EE during shutdown, a thread may be blocked in WaitForEndOfShutdown in alertable state.
        // We don't allow a thread reenter runtime while processing APC or pumping message.
        if (!g_fSuspendOnShutdown )
        {
            DeleteWait(waitInfo);
        }
    }
    return;
}
3145
3146
3147/* This gets called in a finalizer thread ONLY IF an app does not deregister the
3148 the wait. Note that just because the registeredWaitHandle is collected by GC
3149 does not mean it is safe to delete the wait. The refcount tells us when it is
3150 safe.
3151*/
3152void ThreadpoolMgr::WaitHandleCleanup(HANDLE hWaitObject)
3153{
3154 LIMITED_METHOD_CONTRACT;
3155
3156 WaitInfo* waitInfo = (WaitInfo*) hWaitObject;
3157 _ASSERTE(waitInfo->refCount > 0);
3158
3159 DWORD result = QueueDeregisterWait(waitInfo->threadCB->threadHandle, waitInfo);
3160
3161 if (result == 0)
3162 STRESS_LOG1(LF_THREADPOOL, LL_ERROR, "Queue APC failed in WaitHandleCleanup %x", result);
3163
3164}
3165
3166BOOL ThreadpoolMgr::CreateGateThread()
3167{
3168 LIMITED_METHOD_CONTRACT;
3169
3170 HANDLE threadHandle = Thread::CreateUtilityThread(Thread::StackSize_Small, GateThreadStart, NULL, W(".NET ThreadPool Gate"));
3171
3172 if (threadHandle)
3173 {
3174 CloseHandle(threadHandle); //we don't need this anymore
3175 return TRUE;
3176 }
3177
3178 return FALSE;
3179}
3180
3181
3182
3183/************************************************************************/
3184
3185BOOL ThreadpoolMgr::BindIoCompletionCallback(HANDLE FileHandle,
3186 LPOVERLAPPED_COMPLETION_ROUTINE Function,
3187 ULONG Flags,
3188 DWORD& errCode)
3189{
3190
3191 CONTRACTL
3192 {
3193 THROWS; // EnsureInitialized can throw
3194 if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
3195 MODE_ANY;
3196 }
3197 CONTRACTL_END;
3198
3199#ifndef FEATURE_PAL
3200
3201 errCode = S_OK;
3202
3203 EnsureInitialized();
3204
3205
3206 _ASSERTE(GlobalCompletionPort != NULL);
3207
3208 if (!InitCompletionPortThreadpool)
3209 InitCompletionPortThreadpool = TRUE;
3210
3211 GrowCompletionPortThreadpoolIfNeeded();
3212
3213 HANDLE h = CreateIoCompletionPort(FileHandle,
3214 GlobalCompletionPort,
3215 (ULONG_PTR) Function,
3216 NumberOfProcessors);
3217 if (h == NULL)
3218 {
3219 errCode = GetLastError();
3220 return FALSE;
3221 }
3222
3223 _ASSERTE(h == GlobalCompletionPort);
3224
3225 return TRUE;
3226#else // FEATURE_PAL
3227 SetLastError(ERROR_CALL_NOT_IMPLEMENTED);
3228 return FALSE;
3229#endif // !FEATURE_PAL
3230}
3231
3232#ifndef FEATURE_PAL
3233BOOL ThreadpoolMgr::CreateCompletionPortThread(LPVOID lpArgs)
3234{
3235 CONTRACTL
3236 {
3237 NOTHROW;
3238 if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
3239 MODE_ANY;
3240 }
3241 CONTRACTL_END;
3242
3243 Thread *pThread;
3244 BOOL fIsCLRThread;
3245 if ((pThread = CreateUnimpersonatedThread(CompletionPortThreadStart, lpArgs, &fIsCLRThread)) != NULL)
3246 {
3247 LastCPThreadCreation = GetTickCount(); // record this for use by logic to spawn additional threads
3248
3249 if (fIsCLRThread) {
3250 pThread->ChooseThreadCPUGroupAffinity();
3251 pThread->StartThread();
3252 }
3253 else {
3254 DWORD status;
3255 status = ResumeThread((HANDLE)pThread);
3256 _ASSERTE(status != (DWORD) (-1));
3257 CloseHandle((HANDLE)pThread); // we don't need this anymore
3258 }
3259
3260 ThreadCounter::Counts counts = CPThreadCounter.GetCleanCounts();
3261 FireEtwIOThreadCreate_V1(counts.NumActive + counts.NumRetired, counts.NumRetired, GetClrInstanceId());
3262
3263 return TRUE;
3264 }
3265
3266
3267 return FALSE;
3268}
3269
//
// Thread procedure for I/O completion port (IOCP) threads.
//
// lpArgs is either NULL or a heap-allocated QueuedStatus describing a completion
// packet that the creator already dequeued. In the latter case the packet is
// consumed, and the QueuedStatus deleted, on the first loop iteration.
//
// Each loop iteration:
//   1. marks this thread as no longer "working" in CPThreadCounter,
//   2. obtains a completion packet - from lpArgs, from the packet stashed in the
//      thread's IOCompletionContext, or from GetQueuedCompletionStatus,
//   3. on WAIT_TIMEOUT either keeps waiting (if it is the last free thread),
//      exits, or "retires" (if I/O is still pending on the thread),
//   4. otherwise marks itself "working" and calls the completion routine stored
//      in the packet's key.
//
// Always returns 0.
//
DWORD WINAPI ThreadpoolMgr::CompletionPortThreadStart(LPVOID lpArgs)
{
    ClrFlsSetThreadType (ThreadType_Threadpool_IOCompletion);

    CONTRACTL
    {
        THROWS;
        if (GetThread()) { MODE_PREEMPTIVE;} else { DISABLED(MODE_ANY);}
        if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
        SO_INTOLERANT;
    }
    CONTRACTL_END;

    // The most recently obtained completion packet.
    DWORD numBytes=0;
    size_t key=0;

    LPOVERLAPPED pOverlapped = NULL;
    DWORD errorCode;
    PIOCompletionContext context;
    BOOL fIsCompletionContext;      // TRUE when the packet came from the thread's IOCompletionContext

    // Idle timeout used when waiting on the port (unless this is the only active thread).
    const DWORD CP_THREAD_WAIT = AppX::IsAppXProcess() ? 5000 : 15000; /* milliseconds */

    _ASSERTE(GlobalCompletionPort != NULL);

    BOOL fThreadInit = FALSE;       // TRUE once a Thread object has been set up for this OS thread
    Thread *pThread = NULL;

    DWORD cpThreadWait = 0;

    if (g_fEEStarted) {
        pThread = SetupThreadNoThrow();
        if (pThread == NULL) {
            // NOTE(review): this early return neither adjusts CPThreadCounter nor frees
            // lpArgs within this function - confirm that this is intended.
            return 0;
        }

        // converted to CLRThread and added to ThreadStore, pick a group affinity for this thread
        pThread->ChooseThreadCPUGroupAffinity();

        fThreadInit = TRUE;
    }

#ifdef FEATURE_COMINTEROP
    // Threadpool threads should be initialized as MTA. If we are unable to do so,
    // return failure.
    BOOL fCoInited = FALSE;
    {
        fCoInited = SUCCEEDED(::CoInitializeEx(NULL, COINIT_MULTITHREADED));
        if (!fCoInited)
        {
            goto Exit;
        }
    }

    if (pThread && pThread->SetApartment(Thread::AS_InMTA, TRUE) != Thread::AS_InMTA)
    {
        // @todo: should we log the failure
        goto Exit;
    }
#endif // FEATURE_COMINTEROP

    ThreadCounter::Counts oldCounts;
    ThreadCounter::Counts newCounts;

    cpThreadWait = CP_THREAD_WAIT;
    for (;; )
    {
Top:
        // If the EE had not started when this thread began, retry the Thread setup
        // on each pass until it succeeds.
        if (!fThreadInit) {
            if (g_fEEStarted) {
                pThread = SetupThreadNoThrow();
                if (pThread == NULL) {
                    break;
                }

                // converted to CLRThread and added to ThreadStore, pick a group affinity for this thread
                pThread->ChooseThreadCPUGroupAffinity();

#ifdef FEATURE_COMINTEROP
                if (pThread->SetApartment(Thread::AS_InMTA, TRUE) != Thread::AS_InMTA)
                {
                    // @todo: should we log the failure
                    goto Exit;
                }
#endif // FEATURE_COMINTEROP

                fThreadInit = TRUE;
            }
        }

        GCX_PREEMP_NO_DTOR();

        //
        // We're about to wait on the IOCP; mark ourselves as no longer "working."
        //
        while (true)
        {
            // Note: these locals shadow the function-scope oldCounts/newCounts.
            ThreadCounter::Counts oldCounts = CPThreadCounter.DangerousGetDirtyCounts();
            ThreadCounter::Counts newCounts = oldCounts;
            newCounts.NumWorking--;

            //
            // If we've only got one thread left, it won't be allowed to exit, because we need to keep
            // one thread listening for completions.  So there's no point in having a timeout; it will
            // only use power unnecessarily.
            //
            cpThreadWait = (newCounts.NumActive == 1) ? INFINITE : CP_THREAD_WAIT;

            // Retry until the decrement is applied to an unchanged snapshot.
            if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
                break;
        }

        errorCode = S_OK;

        if (lpArgs == NULL)
        {
            // No packet was handed over by the creator: use the packet stashed in the
            // thread's IOCompletionContext if there is one, otherwise wait on the port.
            CONTRACT_VIOLATION(ThrowsViolation);

            if (g_fCompletionPortDrainNeeded && pThread)
            {
                // We have started draining completion port.
                // The next job picked up by this thread is going to be after our special marker.
                if (!pThread->IsCompletionPortDrained())
                {
                    pThread->MarkCompletionPortDrained();
                }
            }

            context = NULL;
            fIsCompletionContext = FALSE;

            if (pThread == NULL)
            {
                pThread = GetThread();
            }

            if (pThread)
            {

                context = (PIOCompletionContext) pThread->GetIOCompletionContext();

                // A non-NULL lpOverlapped marks a stashed packet (see StoreOverlappedInfoInThread).
                if (context->lpOverlapped != NULL)
                {
                    errorCode = context->ErrorCode;
                    numBytes = context->numBytesTransferred;
                    pOverlapped = context->lpOverlapped;
                    key = context->key;

                    // Clear the slot so that the packet is processed only once.
                    context->lpOverlapped = NULL;
                    fIsCompletionContext = TRUE;
                }
            }

            if((context == NULL) || (!fIsCompletionContext))
            {
                _ASSERTE (context == NULL || context->lpOverlapped == NULL);

                BOOL status = GetQueuedCompletionStatus(
                    GlobalCompletionPort,
                    &numBytes,
                    (PULONG_PTR)&key,
                    &pOverlapped,
                    cpThreadWait
                    );

                // A failed call covers both a timeout (WAIT_TIMEOUT) and a dequeued
                // packet for a failed I/O; errorCode tells them apart below.
                if (status == 0)
                    errorCode = GetLastError();
            }
        }
        else
        {
            // The creator already dequeued a packet and passed it in lpArgs: copy it
            // out and free the carrier object.
            QueuedStatus *CompletionStatus = (QueuedStatus*)lpArgs;
            numBytes = CompletionStatus->numBytes;
            key = (size_t)CompletionStatus->key;
            pOverlapped = CompletionStatus->pOverlapped;
            errorCode = CompletionStatus->errorCode;
            delete CompletionStatus;
            lpArgs = NULL;  // one-time deal for initial CP packet
        }

        // We fire IODequeue events whether the IO completion was retrieved in the above call to
        // GetQueuedCompletionStatus or during an earlier call (e.g. in GateThreadStart, and passed here in lpArgs,
        // or in CompletionPortDispatchWorkWithinAppDomain, and passed here through StoreOverlappedInfoInThread)

        // For the purposes of activity correlation we only fire ETW events here, if needed OR if not fired at a higher
        // abstraction level (e.g. ThreadpoolMgr::RegisterWaitForSingleObject)
        // Note: we still fire the event for managed async IO, despite the fact we don't have a paired IOEnqueue event
        // for this case. We do this to "mark" the end of the previous workitem. When we provide full support at the higher
        // abstraction level for managed IO we can remove the IODequeues fired here
        if (ETW_EVENT_ENABLED(MICROSOFT_WINDOWS_DOTNETRUNTIME_PROVIDER_Context, ThreadPoolIODequeue)
                && !AreEtwIOQueueEventsSpeciallyHandled((LPOVERLAPPED_COMPLETION_ROUTINE)key) && pOverlapped != NULL)
        {
            FireEtwThreadPoolIODequeue(pOverlapped, OverlappedDataObject::GetOverlappedForTracing(pOverlapped), GetClrInstanceId());
        }

        bool enterRetirement;

        while (true)
        {
            //
            // When we reach this point, this thread is "active" but not "working."  Depending on the result of the call to GetQueuedCompletionStatus,
            // and the state of the rest of the IOCP threads, we need to figure out whether to de-activate (exit) this thread, retire this thread,
            // or transition to "working."
            //

            // counts volatile read paired with CompareExchangeCounts loop set
            oldCounts = CPThreadCounter.DangerousGetDirtyCounts();
            newCounts = oldCounts;
            enterRetirement = false;

            if (errorCode == WAIT_TIMEOUT)
            {
                //
                // We timed out, and are going to try to exit or retire.
                //
                newCounts.NumActive--;

                //
                // We need at least one free thread, or we have no way of knowing if completions are being queued.
                //
                if (newCounts.NumWorking == newCounts.NumActive)
                {
                    // Every other active thread is working, so this one must keep listening:
                    // undo the tentative decrement and go back to waiting.
                    newCounts = oldCounts;
                    newCounts.NumWorking++; // not really working, but we'll decrement it at the top
                    if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
                        goto Top;
                    else
                        continue;
                }

                //
                // We can't exit a thread that has pending I/O - we'll "retire" it instead.
                //
                if (IsIoPending())
                {
                    enterRetirement = true;
                    newCounts.NumRetired++;
                }
            }
            else
            {
                //
                // We have work to do
                //
                newCounts.NumWorking++;
            }

            if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
                break;
        }

        if (errorCode == WAIT_TIMEOUT)
        {
            if (!enterRetirement)
            {
                // NumActive was already decremented by the loop above.
                goto Exit;
            }
            else
            {
                // now in "retired mode" waiting for pending io to complete
                FireEtwIOThreadRetire_V1(newCounts.NumActive + newCounts.NumRetired, newCounts.NumRetired, GetClrInstanceId());

                for (;;)
                {
#ifndef FEATURE_PAL
                    if (g_fCompletionPortDrainNeeded && pThread)
                    {
                        // The thread is not going to process IO job now.
                        if (!pThread->IsCompletionPortDrained())
                        {
                            pThread->MarkCompletionPortDrained();
                        }
                    }
#endif // !FEATURE_PAL

                    // Wait until either a wake-up is signalled (a thread is needed again)
                    // or the timeout elapses (re-check whether I/O is still pending).
                    DWORD status = SafeWait(RetiredCPWakeupEvent,CP_THREAD_PENDINGIO_WAIT,FALSE);
                    _ASSERTE(status == WAIT_TIMEOUT || status == WAIT_OBJECT_0);

                    if (status == WAIT_TIMEOUT)
                    {
                        if (IsIoPending())
                        {
                            continue;
                        }
                        else
                        {
                            // We can now exit; decrement the retired count.
                            while (true)
                            {
                                // counts volatile read paired with CompareExchangeCounts loop set
                                oldCounts = CPThreadCounter.DangerousGetDirtyCounts();
                                newCounts = oldCounts;
                                newCounts.NumRetired--;
                                if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
                                    break;
                            }
                            goto Exit;
                        }
                    }
                    else
                    {
                        // put back into rotation -- we need a thread
                        while (true)
                        {
                            // counts volatile read paired with CompareExchangeCounts loop set
                            oldCounts = CPThreadCounter.DangerousGetDirtyCounts();
                            newCounts = oldCounts;
                            newCounts.NumRetired--;
                            newCounts.NumActive++;
                            newCounts.NumWorking++; //we're not really working, but we'll decrement this before waiting for work.
                            if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
                                break;
                        }
                        FireEtwIOThreadUnretire_V1(newCounts.NumActive + newCounts.NumRetired, newCounts.NumRetired, GetClrInstanceId());
                        goto Top;
                    }
                }
            }
        }

        // we should not reach this point unless we have work to do
        _ASSERTE(errorCode != WAIT_TIMEOUT && !enterRetirement);

        // if we have no more free threads, start the gate thread
        if (newCounts.NumWorking >= newCounts.NumActive)
            EnsureGateThreadRunning();


        // We can not assert here.  If stdin/stdout/stderr of child process are redirected based on
        // async io, GetQueuedCompletionStatus returns when child process operates on its stdin/stdout/stderr.
        // Parent process does not issue any ReadFile/WriteFile, and hence pOverlapped is going to be NULL.
        //_ASSERTE(pOverlapped != NULL);

        if (pOverlapped != NULL)
        {
            _ASSERTE(key != 0);  // should be a valid function address

            if (key != 0)
            {
                if (GCHeapUtilities::IsGCInProgress(TRUE))
                {
                    //Indicate that this thread is free, and waiting on GC, not doing any user work.
                    //This helps in threads not getting injected when some threads have woken up from the
                    //GC event, and some have not.
                    while (true)
                    {
                        // counts volatile read paired with CompareExchangeCounts loop set
                        oldCounts = CPThreadCounter.DangerousGetDirtyCounts();
                        newCounts = oldCounts;
                        newCounts.NumWorking--;
                        if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
                            break;
                    }

                    // GC is imminent, so wait until GC is complete before executing next request.
                    // this reduces in-flight objects allocated right before GC, easing the GC's work
                    GCHeapUtilities::WaitForGCCompletion(TRUE);

                    // The GC is over: count this thread as "working" again.
                    while (true)
                    {
                        // counts volatile read paired with CompareExchangeCounts loop set
                        oldCounts = CPThreadCounter.DangerousGetDirtyCounts();
                        newCounts = oldCounts;
                        newCounts.NumWorking++;
                        if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
                            break;
                    }

                    if (newCounts.NumWorking >= newCounts.NumActive)
                        EnsureGateThreadRunning();
                }
                else
                {
                    GrowCompletionPortThreadpoolIfNeeded();
                }

                {
                    CONTRACT_VIOLATION(ThrowsViolation);

                    ThreadLocaleHolder localeHolder;

                    // The completion key is the routine registered for the handle; invoke it.
                    ((LPOVERLAPPED_COMPLETION_ROUTINE) key)(errorCode, numBytes, pOverlapped);
                }

                if (pThread == NULL) {
                    pThread = GetThread();
                }
                // Reset per-thread state after the callback, before the next packet.
                if (pThread) {
                    if (pThread->IsAbortRequested())
                        pThread->EEResetAbort(Thread::TAR_ALL);
                    pThread->InternalReset();
                }
            }
            else
            {
                // Application bug - can't do much, just ignore it
            }

        }

    }   // for (;;)

Exit:

    oldCounts = CPThreadCounter.GetCleanCounts();

    // we should never destroy or retire all IOCP threads, because then we won't have any threads to notice incoming completions.
    _ASSERTE(oldCounts.NumActive > 0);

    FireEtwIOThreadTerminate_V1(oldCounts.NumActive + oldCounts.NumRetired, oldCounts.NumRetired, GetClrInstanceId());

#ifdef FEATURE_COMINTEROP
    if (pThread) {
        pThread->SetApartment(Thread::AS_Unknown, TRUE);
        pThread->CoUninitialize();
    }
    // Couninit the worker thread
    if (fCoInited)
    {
        CoUninitialize();
    }
#endif

    if (pThread) {
        pThread->ClearThreadCPUGroupAffinity();

        DestroyThread(pThread);
    }

    return 0;
}
3701
3702LPOVERLAPPED ThreadpoolMgr::CompletionPortDispatchWorkWithinAppDomain(
3703 Thread* pThread,
3704 DWORD* pErrorCode,
3705 DWORD* pNumBytes,
3706 size_t* pKey,
3707 DWORD adid)
3708//
3709//This function is called just after dispatching the previous BindIO callback
3710//to Managed code. This is a perf optimization to do a quick call to
3711//GetQueuedCompletionStatus with a timeout of 0 ms. If there is work in the
3712//same appdomain, dispatch it back immediately. If not stick it in a well known
3713//place, and reenter the target domain. The timeout of zero is chosen so as to
3714//not delay appdomain unloads.
3715//
3716{
3717 STATIC_CONTRACT_THROWS;
3718 STATIC_CONTRACT_GC_NOTRIGGER;
3719 STATIC_CONTRACT_MODE_ANY;
3720 STATIC_CONTRACT_SO_TOLERANT;
3721
3722 LPOVERLAPPED lpOverlapped=NULL;
3723
3724 BOOL status=FALSE;
3725 OVERLAPPEDDATAREF overlapped=NULL;
3726 BOOL ManagedCallback=FALSE;
3727
3728 *pErrorCode = S_OK;
3729
3730
3731 //Very Very Important!
3732 //Do not change the timeout for GetQueuedCompletionStatus to a non-zero value.
3733 //Selecting a non-zero value can cause the thread to block, and lead to expensive context switches.
3734 //In real life scenarios, we have noticed a packet to be not availabe immediately, but very shortly
3735 //(after few 100's of instructions), and falling back to the VM is good in that case as compared to
3736 //taking a context switch. Changing the timeout to non-zero can lead to perf degrades, that are very
3737 //hard to diagnose.
3738
3739 status = ::GetQueuedCompletionStatus(
3740 GlobalCompletionPort,
3741 pNumBytes,
3742 (PULONG_PTR)pKey,
3743 &lpOverlapped,
3744 0);
3745
3746 DWORD lastError = GetLastError();
3747
3748 if (status == 0)
3749 {
3750 if (lpOverlapped != NULL)
3751 {
3752 *pErrorCode = lastError;
3753 }
3754 else
3755 {
3756 return NULL;
3757 }
3758 }
3759
3760 if (((LPOVERLAPPED_COMPLETION_ROUTINE) *pKey) != BindIoCompletionCallbackStub)
3761 {
3762 //_ASSERTE(FALSE);
3763 }
3764 else
3765 {
3766 ManagedCallback = TRUE;
3767 overlapped = ObjectToOVERLAPPEDDATAREF(OverlappedDataObject::GetOverlapped(lpOverlapped));
3768 }
3769
3770 if (ManagedCallback)
3771 {
3772 _ASSERTE(*pKey != 0); // should be a valid function address
3773
3774 if (*pKey ==0)
3775 {
3776 //Application Bug.
3777 return NULL;
3778 }
3779 }
3780 else
3781 {
3782 //Just retruned back from managed code, a Thread structure should exist.
3783 _ASSERTE (pThread);
3784
3785 //Oops, this is an overlapped fom a different appdomain. STick it in
3786 //the thread. We will process it later.
3787
3788 StoreOverlappedInfoInThread(pThread, *pErrorCode, *pNumBytes, *pKey, lpOverlapped);
3789
3790 lpOverlapped = NULL;
3791 }
3792
3793#ifndef DACCESS_COMPILE
3794 return lpOverlapped;
3795#endif
3796}
3797
3798void ThreadpoolMgr::StoreOverlappedInfoInThread(Thread* pThread, DWORD dwErrorCode, DWORD dwNumBytes, size_t key, LPOVERLAPPED lpOverlapped)
3799{
3800 STATIC_CONTRACT_NOTHROW;
3801 STATIC_CONTRACT_GC_NOTRIGGER;
3802 STATIC_CONTRACT_MODE_ANY;
3803 STATIC_CONTRACT_SO_TOLERANT;
3804
3805 _ASSERTE(pThread);
3806
3807 PIOCompletionContext context;
3808
3809 context = (PIOCompletionContext) pThread->GetIOCompletionContext();
3810
3811 _ASSERTE(context);
3812
3813 context->ErrorCode = dwErrorCode;
3814 context->numBytesTransferred = dwNumBytes;
3815 context->lpOverlapped = lpOverlapped;
3816 context->key = key;
3817}
3818
3819BOOL ThreadpoolMgr::ShouldGrowCompletionPortThreadpool(ThreadCounter::Counts counts)
3820{
3821 CONTRACTL
3822 {
3823 GC_NOTRIGGER;
3824 NOTHROW;
3825 MODE_ANY;
3826 SO_TOLERANT;
3827 }
3828 CONTRACTL_END;
3829
3830 if (counts.NumWorking >= counts.NumActive
3831 && NumCPInfrastructureThreads == 0
3832 && (counts.NumActive == 0 || !GCHeapUtilities::IsGCInProgress(TRUE))
3833 )
3834 {
3835 // adjust limit if neeeded
3836 if (counts.NumRetired == 0)
3837 {
3838 if (counts.NumActive + counts.NumRetired < MaxLimitTotalCPThreads &&
3839 (counts.NumActive < MinLimitTotalCPThreads || cpuUtilization < CpuUtilizationLow))
3840 {
3841 // add one more check to make sure that we haven't fired off a new
3842 // thread since the last time time we checked the cpu utilization.
3843 // However, don't bother if we haven't reached the MinLimit (2*number of cpus)
3844 if ((counts.NumActive < MinLimitTotalCPThreads) ||
3845 SufficientDelaySinceLastSample(LastCPThreadCreation,counts.NumActive))
3846 {
3847 return TRUE;
3848 }
3849 }
3850 }
3851
3852 if (counts.NumRetired > 0)
3853 return TRUE;
3854 }
3855 return FALSE;
3856}
3857
3858void ThreadpoolMgr::GrowCompletionPortThreadpoolIfNeeded()
3859{
3860 CONTRACTL
3861 {
3862 if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
3863 NOTHROW;
3864 MODE_ANY;
3865 }
3866 CONTRACTL_END;
3867
3868 ThreadCounter::Counts oldCounts, newCounts;
3869 while (true)
3870 {
3871 oldCounts = CPThreadCounter.GetCleanCounts();
3872 newCounts = oldCounts;
3873
3874 if(!ShouldGrowCompletionPortThreadpool(oldCounts))
3875 {
3876 break;
3877 }
3878 else
3879 {
3880 if (oldCounts.NumRetired > 0)
3881 {
3882 // wakeup retired thread instead
3883 RetiredCPWakeupEvent->Set();
3884 return;
3885 }
3886 else
3887 {
3888 // create a new thread. New IOCP threads start as "active" and "working"
3889 newCounts.NumActive++;
3890 newCounts.NumWorking++;
3891 if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
3892 {
3893 if (!CreateCompletionPortThread(NULL))
3894 {
3895 // if thread creation failed, we have to adjust the counts back down.
3896 while (true)
3897 {
3898 // counts volatile read paired with CompareExchangeCounts loop set
3899 oldCounts = CPThreadCounter.DangerousGetDirtyCounts();
3900 newCounts = oldCounts;
3901 newCounts.NumActive--;
3902 newCounts.NumWorking--;
3903 if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
3904 break;
3905 }
3906 }
3907 return;
3908 }
3909 }
3910 }
3911 }
3912}
3913#endif // !FEATURE_PAL
3914
3915// Returns true if there is pending io on the thread.
3916BOOL ThreadpoolMgr::IsIoPending()
3917{
3918 CONTRACTL
3919 {
3920 NOTHROW;
3921 MODE_ANY;
3922 GC_NOTRIGGER;
3923 }
3924 CONTRACTL_END;
3925
3926#ifndef FEATURE_PAL
3927 int Status;
3928 ULONG IsIoPending;
3929
3930 if (g_pufnNtQueryInformationThread)
3931 {
3932 Status =(int) (*g_pufnNtQueryInformationThread)(GetCurrentThread(),
3933 ThreadIsIoPending,
3934 &IsIoPending,
3935 sizeof(IsIoPending),
3936 NULL);
3937
3938
3939 if ((Status < 0) || IsIoPending)
3940 return TRUE;
3941 else
3942 return FALSE;
3943 }
3944 return TRUE;
3945#else
3946 return FALSE;
3947#endif // !FEATURE_PAL
3948}
3949
3950#ifndef FEATURE_PAL
3951
3952#ifdef _WIN64
3953#pragma warning (disable : 4716)
3954#else
3955#pragma warning (disable : 4715)
3956#endif
3957
3958int ThreadpoolMgr::GetCPUBusyTime_NT(PROCESS_CPU_INFORMATION* pOldInfo)
3959{
3960 LIMITED_METHOD_CONTRACT;
3961
3962 PROCESS_CPU_INFORMATION newUsage;
3963 newUsage.idleTime.QuadPart = 0;
3964 newUsage.kernelTime.QuadPart = 0;
3965 newUsage.userTime.QuadPart = 0;
3966
3967 if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups())
3968 {
3969#if !defined(FEATURE_REDHAWK) && !defined(FEATURE_PAL)
3970 FILETIME newIdleTime, newKernelTime, newUserTime;
3971
3972 CPUGroupInfo::GetSystemTimes(&newIdleTime, &newKernelTime, &newUserTime);
3973 newUsage.idleTime.u.LowPart = newIdleTime.dwLowDateTime;
3974 newUsage.idleTime.u.HighPart = newIdleTime.dwHighDateTime;
3975 newUsage.kernelTime.u.LowPart = newKernelTime.dwLowDateTime;
3976 newUsage.kernelTime.u.HighPart = newKernelTime.dwHighDateTime;
3977 newUsage.userTime.u.LowPart = newUserTime.dwLowDateTime;
3978 newUsage.userTime.u.HighPart = newUserTime.dwHighDateTime;
3979#endif
3980 }
3981 else
3982 {
3983 (*g_pufnNtQuerySystemInformation)(SystemProcessorPerformanceInformation,
3984 pOldInfo->usageBuffer,
3985 pOldInfo->usageBufferSize,
3986 NULL);
3987
3988 SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION* pInfoArray = pOldInfo->usageBuffer;
3989 DWORD_PTR pmask = pOldInfo->affinityMask;
3990
3991 int proc_no = 0;
3992 while (pmask)
3993 {
3994 if (pmask & 1)
3995 { //should be good: 1CPU 28823 years, 256CPUs 100+years
3996 newUsage.idleTime.QuadPart += pInfoArray[proc_no].IdleTime.QuadPart;
3997 newUsage.kernelTime.QuadPart += pInfoArray[proc_no].KernelTime.QuadPart;
3998 newUsage.userTime.QuadPart += pInfoArray[proc_no].UserTime.QuadPart;
3999 }
4000
4001 pmask >>=1;
4002 proc_no++;
4003 }
4004 }
4005
4006 __int64 cpuTotalTime, cpuBusyTime;
4007
4008 cpuTotalTime = (newUsage.userTime.QuadPart - pOldInfo->userTime.QuadPart) +
4009 (newUsage.kernelTime.QuadPart - pOldInfo->kernelTime.QuadPart);
4010 cpuBusyTime = cpuTotalTime -
4011 (newUsage.idleTime.QuadPart - pOldInfo->idleTime.QuadPart);
4012
4013 // Preserve reading
4014 pOldInfo->idleTime = newUsage.idleTime;
4015 pOldInfo->kernelTime = newUsage.kernelTime;
4016 pOldInfo->userTime = newUsage.userTime;
4017
4018 __int64 reading = 0;
4019
4020 if (cpuTotalTime > 0)
4021 reading = ((cpuBusyTime * 100) / cpuTotalTime);
4022
4023 _ASSERTE(FitsIn<int>(reading));
4024 return (int)reading;
4025}
4026
4027#else // !FEATURE_PAL
4028
4029int ThreadpoolMgr::GetCPUBusyTime_NT(PAL_IOCP_CPU_INFORMATION* pOldInfo)
4030{
4031 return PAL_GetCPUBusyTime(pOldInfo);
4032}
4033
4034#endif // !FEATURE_PAL
4035
4036//
4037// A timer that ticks every GATE_THREAD_DELAY milliseconds.
4038// On platforms that support it, we use a coalescable waitable timer object.
4039// For other platforms, we use Sleep, via __SwitchToThread.
4040//
4041class GateThreadTimer
4042{
4043#ifndef FEATURE_PAL
4044 HANDLE m_hTimer;
4045
4046public:
4047 GateThreadTimer()
4048 : m_hTimer(NULL)
4049 {
4050 CONTRACTL
4051 {
4052 NOTHROW;
4053 MODE_PREEMPTIVE;
4054 }
4055 CONTRACTL_END;
4056
4057 if (g_pufnCreateWaitableTimerEx && g_pufnSetWaitableTimerEx)
4058 {
4059 m_hTimer = g_pufnCreateWaitableTimerEx(NULL, NULL, 0, TIMER_ALL_ACCESS);
4060 if (m_hTimer)
4061 {
4062 //
4063 // Set the timer to fire GATE_THREAD_DELAY milliseconds from now, then every GATE_THREAD_DELAY milliseconds thereafter.
4064 // We also set the tolerance to GET_THREAD_DELAY_TOLERANCE, allowing the OS to coalesce this timer.
4065 //
4066 LARGE_INTEGER dueTime;
4067 dueTime.QuadPart = MILLI_TO_100NANO(-(LONGLONG)GATE_THREAD_DELAY); //negative value indicates relative time
4068 if (!g_pufnSetWaitableTimerEx(m_hTimer, &dueTime, GATE_THREAD_DELAY, NULL, NULL, NULL, GATE_THREAD_DELAY_TOLERANCE))
4069 {
4070 CloseHandle(m_hTimer);
4071 m_hTimer = NULL;
4072 }
4073 }
4074 }
4075 }
4076
4077 ~GateThreadTimer()
4078 {
4079 CONTRACTL
4080 {
4081 NOTHROW;
4082 MODE_PREEMPTIVE;
4083 }
4084 CONTRACTL_END;
4085
4086 if (m_hTimer)
4087 {
4088 CloseHandle(m_hTimer);
4089 m_hTimer = NULL;
4090 }
4091 }
4092
4093#endif // !FEATURE_PAL
4094
4095public:
4096 void Wait()
4097 {
4098 CONTRACTL
4099 {
4100 NOTHROW;
4101 MODE_PREEMPTIVE;
4102 }
4103 CONTRACTL_END;
4104
4105#ifndef FEATURE_PAL
4106 if (m_hTimer)
4107 WaitForSingleObject(m_hTimer, INFINITE);
4108 else
4109#endif // !FEATURE_PAL
4110 __SwitchToThread(GATE_THREAD_DELAY, CALLER_LIMITS_SPINNING);
4111 }
4112};
4113
4114
//
// Thread procedure for the gate thread. lpArgs is not used.
//
// After one initial timer tick and a baseline CPU reading, every tick it:
//   - optionally fires the working-thread-count ETW event,
//   - skips the rest of the tick while the debugger has the process stopped,
//   - blocks on g_ClrResumeEvent while the runtime is paused,
//   - refreshes cpuUtilization,
//   - (non-PAL) when every active IOCP thread is working, polls the completion
//     port and, if a packet is waiting, creates a new IOCP thread that is handed
//     the packet; otherwise wakes a retired IOCP thread when CPU utilization is low,
//   - unless disabled by configuration, detects worker-queue starvation and
//     raises the worker thread goal by one.
// The loop runs for as long as ShouldGateThreadKeepRunning() returns true.
//
// Always returns 0.
//
DWORD WINAPI ThreadpoolMgr::GateThreadStart(LPVOID lpArgs)
{
    ClrFlsSetThreadType (ThreadType_Gate);

    CONTRACTL
    {
        NOTHROW;
        GC_TRIGGERS;
        MODE_PREEMPTIVE;
        SO_INTOLERANT;
    }
    CONTRACTL_END;

    _ASSERTE(GateThreadStatus == GATE_THREAD_STATUS_REQUESTED);

    GateThreadTimer timer;

    // TODO: do we need to do this?
    timer.Wait(); // delay getting initial CPU reading

#ifndef FEATURE_PAL
    PROCESS_CPU_INFORMATION prevCPUInfo;

    if (!g_pufnNtQuerySystemInformation)
    {
        _ASSERT(!"NtQuerySystemInformation API not available!");
        return 0;
    }

    //GateThread can start before EESetup, so ensure CPU group information is initialized;
    CPUGroupInfo::EnsureInitialized();

    // initialize CPU usage information structure;
    prevCPUInfo.idleTime.QuadPart   = 0;
    prevCPUInfo.kernelTime.QuadPart = 0;
    prevCPUInfo.userTime.QuadPart   = 0;

    PREFIX_ASSUME(NumberOfProcessors < 65536);
    prevCPUInfo.numberOfProcessors = NumberOfProcessors;

    /* In following cases, affinity mask can be zero
     * 1. hosted, the hosted process already uses multiple cpu groups.
     *    thus, during CLR initialization, GetCurrentProcessCpuCount() returns 64, and GC threads
     *    are created to fill up the initial CPU group. ==> use g_SystemInfo.dwNumberOfProcessors
     * 2. GCCpuGroups=1, CLR creates GC threads for all processors in all CPU groups
     *    thus, the threadpool thread would use a whole CPU group (if Thread_UseAllCpuGroups is not set).
     *    ==> use g_SystemInfo.dwNumberOfProcessors.
     * 3. !defined(FEATURE_PAL) but defined(FEATURE_CORESYSTEM), GetCurrentProcessCpuCount()
     *    returns g_SystemInfo.dwNumberOfProcessors ==> use g_SystemInfo.dwNumberOfProcessors;
     * Other cases:
     * 1. Normal case: the mask is all or a subset of all processors in a CPU group;
     * 2. GCCpuGroups=1 && Thread_UseAllCpuGroups = 1, the mask is not used
     */
    prevCPUInfo.affinityMask = GetCurrentProcessCpuMask();
    if (prevCPUInfo.affinityMask == 0)
    {   // create a mask that has g_SystemInfo.dwNumberOfProcessors;
        // (the low dwNumberOfProcessors bits are set, one per loop iteration)
        DWORD_PTR mask = 0, maskpos = 1;
        for (unsigned int i=0; i < g_SystemInfo.dwNumberOfProcessors; i++)
        {
            mask |= maskpos;
            maskpos <<= 1;
        }
        prevCPUInfo.affinityMask = mask;
    }

    // in some cases GetCurrentProcessCpuCount() returns a number larger than
    // g_SystemInfo.dwNumberOfProcessor when there are CPU groups, use the larger
    // one to create buffer. This buffer must be cleared with 0's to get correct
    // CPU usage statistics
    int elementsNeeded = NumberOfProcessors > g_SystemInfo.dwNumberOfProcessors ?
                                                  NumberOfProcessors : g_SystemInfo.dwNumberOfProcessors;
    // Overflow-checked computation of the buffer size in bytes.
    if (!ClrSafeInt<int>::multiply(elementsNeeded, sizeof(SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION),
                                                  prevCPUInfo.usageBufferSize))
        return 0;

    // Stack-allocated, so the buffer lives for the lifetime of this function.
    prevCPUInfo.usageBuffer = (SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION *)alloca(prevCPUInfo.usageBufferSize);
    if (prevCPUInfo.usageBuffer == NULL)
        return 0;

    memset((void *)prevCPUInfo.usageBuffer, 0, prevCPUInfo.usageBufferSize); //must clear it with 0s

    // Baseline reading; the returned percentage is ignored.
    GetCPUBusyTime_NT(&prevCPUInfo);
#else // !FEATURE_PAL
    PAL_IOCP_CPU_INFORMATION prevCPUInfo;
    GetCPUBusyTime_NT(&prevCPUInfo);                  // ignore return value the first time
#endif // !FEATURE_PAL

    // Set when a sample was taken during a GC; the following sample is then
    // clamped in the same way.
    BOOL IgnoreNextSample = FALSE;

    do
    {
        timer.Wait();

        if(CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_EnableWorkerTracking))
            FireEtwThreadPoolWorkingThreadCount(TakeMaxWorkingThreadCount(), GetClrInstanceId());

#ifdef DEBUGGING_SUPPORTED
        // if we are stopped at a debug breakpoint, go back to sleep
        // (in a do-while loop, 'continue' jumps to the loop condition at the bottom)
        if (CORDebuggerAttached() && g_pDebugInterface->IsStopped())
            continue;
#endif // DEBUGGING_SUPPORTED

        if(g_IsPaused)
        {
            _ASSERTE(g_ClrResumeEvent.IsValid());
            EX_TRY {
                g_ClrResumeEvent.Wait(INFINITE, TRUE);
            }
            EX_CATCH {
                // Assert on debug builds
                _ASSERTE(FALSE);
            }
            EX_END_CATCH(SwallowAllExceptions);
        }

        if (!GCHeapUtilities::IsGCInProgress(FALSE) )
        {
            if (IgnoreNextSample)
            {
                IgnoreNextSample = FALSE;
                int cpuUtilizationTemp = GetCPUBusyTime_NT(&prevCPUInfo);            // updates prevCPUInfo as side effect
                // don't artificially drive down average if cpu is high
                // (a low reading is replaced by a value just above the "low" threshold)
                if (cpuUtilizationTemp <= CpuUtilizationLow)
                    cpuUtilization = CpuUtilizationLow + 1;
                else
                    cpuUtilization = cpuUtilizationTemp;
            }
            else
            {
                cpuUtilization = GetCPUBusyTime_NT(&prevCPUInfo);            // updates prevCPUInfo as side effect
            }
        }
        else
        {
            // A GC is in progress: clamp a low reading as above, and clamp the next sample too.
            int cpuUtilizationTemp = GetCPUBusyTime_NT(&prevCPUInfo);            // updates prevCPUInfo as side effect
            // don't artificially drive down average if cpu is high
            if (cpuUtilizationTemp <= CpuUtilizationLow)
                cpuUtilization = CpuUtilizationLow + 1;
            else
                cpuUtilization = cpuUtilizationTemp;
            IgnoreNextSample = TRUE;
        }

#ifndef FEATURE_PAL
        // don't mess with CP thread pool settings if not initialized yet
        if (InitCompletionPortThreadpool)
        {
            ThreadCounter::Counts oldCounts, newCounts;
            oldCounts = CPThreadCounter.GetCleanCounts();

            // Every active IOCP thread is working, none is retired and the pool is
            // below its maximum: check whether a completion is waiting on the port.
            if (oldCounts.NumActive == oldCounts.NumWorking &&
                oldCounts.NumRetired == 0 &&
                oldCounts.NumActive < MaxLimitTotalCPThreads &&
                !g_fCompletionPortDrainNeeded &&
                NumCPInfrastructureThreads == 0 &&       // infrastructure threads count as "to be free as needed"
                !GCHeapUtilities::IsGCInProgress(TRUE))

            {
                BOOL status;
                DWORD numBytes;
                size_t key;
                LPOVERLAPPED pOverlapped;
                DWORD errorCode;

                errorCode = S_OK;

                status = GetQueuedCompletionStatus(
                            GlobalCompletionPort,
                            &numBytes,
                            (PULONG_PTR)&key,
                            &pOverlapped,
                            0 // immediate return
                            );

                if (status == 0)
                {
                    errorCode = GetLastError();
                }

                if(pOverlapped == &overlappedForContinueCleanup)
                {
                    // if we picked up a "Continue Drainage" notification DO NOT create a new CP thread
                }
                else
                if (errorCode != WAIT_TIMEOUT)
                {
                    // A packet was dequeued: package it up and hand it to a new IOCP thread.
                    QueuedStatus *CompletionStatus = NULL;

                    // loop, retrying until memory is allocated.  Under such conditions the gate
                    // thread is not useful anyway, so I feel comfortable with this behavior
                    do
                    {
                        // make sure to free mem later in thread
                        CompletionStatus = new (nothrow) QueuedStatus;
                        if (CompletionStatus == NULL)
                        {
                            __SwitchToThread(GATE_THREAD_DELAY, CALLER_LIMITS_SPINNING);
                        }
                    }
                    while (CompletionStatus == NULL);

                    CompletionStatus->numBytes = numBytes;
                    CompletionStatus->key = (PULONG_PTR)key;
                    CompletionStatus->pOverlapped = pOverlapped;
                    CompletionStatus->errorCode = errorCode;

                    // IOCP threads are created as "active" and "working"
                    while (true)
                    {
                        // counts volatile read paired with CompareExchangeCounts loop set
                        oldCounts = CPThreadCounter.DangerousGetDirtyCounts();
                        newCounts = oldCounts;
                        newCounts.NumActive++;
                        newCounts.NumWorking++;
                        if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts))
                            break;
                    }

                    // loop, retrying until thread is created.
                    // (CompletionPortThreadStart deletes CompletionStatus once it has copied the packet)
                    while (!CreateCompletionPortThread((LPVOID)CompletionStatus))
                    {
                        __SwitchToThread(GATE_THREAD_DELAY, CALLER_LIMITS_SPINNING);
                    }
                }
            }
            else if (cpuUtilization < CpuUtilizationLow)
            {
                // this could be an indication that threads might be getting blocked or there is no work
                if (oldCounts.NumWorking == oldCounts.NumActive &&   // don't bump the limit if there are already free threads
                    oldCounts.NumRetired > 0)
                {
                    RetiredCPWakeupEvent->Set();
                }
            }
        }
#endif // !FEATURE_PAL

        // Worker starvation detection: requests are pending but nothing has been
        // dequeued for long enough.
        if (0 == CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_DisableStarvationDetection))
        {
            if (PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains() && SufficientDelaySinceLastDequeue())
            {
                DangerousNonHostedSpinLockHolder tal(&ThreadAdjustmentLock);

                ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts();
                while (counts.NumActive < MaxLimitTotalWorkerThreads && //don't add a thread if we're at the max
                       counts.NumActive >= counts.MaxWorking)            //don't add a thread if we're already in the process of adding threads
                {
                    bool breakIntoDebugger = (0 != CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_DebugBreakOnWorkerStarvation));
                    if (breakIntoDebugger)
                    {
                        OutputDebugStringW(W("The CLR ThreadPool detected work queue starvation!"));
                        DebugBreak();
                    }

                    // Raise the goal to one more than the current number of active threads.
                    ThreadCounter::Counts newCounts = counts;
                    newCounts.MaxWorking = newCounts.NumActive + 1;

                    ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts);
                    if (oldCounts == counts)
                    {
                        HillClimbingInstance.ForceChange(newCounts.MaxWorking, Starvation);
                        MaybeAddWorkingWorker();
                        break;
                    }
                    else
                    {
                        // The counts changed underneath us; re-evaluate with the current value.
                        counts = oldCounts;
                    }
                }
            }
        }
    }
    while (ShouldGateThreadKeepRunning());

    return 0;
}
4391
4392// called by logic to spawn a new completion port thread.
4393// return false if not enough time has elapsed since the last
4394// time we sampled the cpu utilization.
4395BOOL ThreadpoolMgr::SufficientDelaySinceLastSample(unsigned int LastThreadCreationTime,
4396 unsigned NumThreads, // total number of threads of that type (worker or CP)
4397 double throttleRate // the delay is increased by this percentage for each extra thread
4398 )
4399{
4400 LIMITED_METHOD_CONTRACT;
4401
4402 unsigned dwCurrentTickCount = GetTickCount();
4403
4404 unsigned delaySinceLastThreadCreation = dwCurrentTickCount - LastThreadCreationTime;
4405
4406 unsigned minWaitBetweenThreadCreation = GATE_THREAD_DELAY;
4407
4408 if (throttleRate > 0.0)
4409 {
4410 _ASSERTE(throttleRate <= 1.0);
4411
4412 unsigned adjustedThreadCount = NumThreads > NumberOfProcessors ? (NumThreads - NumberOfProcessors) : 0;
4413
4414 minWaitBetweenThreadCreation = (unsigned) (GATE_THREAD_DELAY * pow((1.0 + throttleRate),(double)adjustedThreadCount));
4415 }
4416 // the amount of time to wait should grow up as the number of threads is increased
4417
4418 return (delaySinceLastThreadCreation > minWaitBetweenThreadCreation);
4419
4420}
4421
4422
4423// called by logic to spawn new worker threads, return true if it's been too long
4424// since the last dequeue operation - takes number of worker threads into account
4425// in deciding "too long"
4426BOOL ThreadpoolMgr::SufficientDelaySinceLastDequeue()
4427{
4428 LIMITED_METHOD_CONTRACT;
4429
4430 #define DEQUEUE_DELAY_THRESHOLD (GATE_THREAD_DELAY * 2)
4431
4432 unsigned delay = GetTickCount() - VolatileLoad(&LastDequeueTime);
4433 unsigned tooLong;
4434
4435 if(cpuUtilization < CpuUtilizationLow)
4436 {
4437 tooLong = GATE_THREAD_DELAY;
4438 }
4439 else
4440 {
4441 ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts();
4442 unsigned numThreads = counts.MaxWorking;
4443 tooLong = numThreads * DEQUEUE_DELAY_THRESHOLD;
4444 }
4445
4446 return (delay > tooLong);
4447
4448}
4449
4450
4451#ifdef _MSC_VER
4452#ifdef _WIN64
4453#pragma warning (default : 4716)
4454#else
4455#pragma warning (default : 4715)
4456#endif
4457#endif
4458
4459/************************************************************************/
4460
// Startup handshake data passed from CreateTimerQueueTimer to TimerThreadStart.
struct CreateTimerThreadParams {
    CLREvent    event;          // set by TimerThreadStart once thread setup has been attempted
    BOOL        setupSucceeded; // written by TimerThreadStart before 'event' is set:
                                // nonzero iff SetupThreadNoThrow returned a Thread
};
4465
4466BOOL ThreadpoolMgr::CreateTimerQueueTimer(PHANDLE phNewTimer,
4467 WAITORTIMERCALLBACK Callback,
4468 PVOID Parameter,
4469 DWORD DueTime,
4470 DWORD Period,
4471 ULONG Flag)
4472{
4473 CONTRACTL
4474 {
4475 THROWS; // EnsureInitialized, CreateAutoEvent can throw
4476 if (GetThread()) {GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} // There can be calls thru ICorThreadpool
4477 MODE_ANY;
4478 INJECT_FAULT(COMPlusThrowOM());
4479 }
4480 CONTRACTL_END;
4481
4482 EnsureInitialized();
4483
4484 // For now we use just one timer thread. Consider using multiple timer threads if
4485 // number of timers in the queue exceeds a certain threshold. The logic and code
4486 // would be similar to the one for creating wait threads.
4487 if (NULL == TimerThread)
4488 {
4489 CrstHolder csh(&TimerQueueCriticalSection);
4490
4491 // check again
4492 if (NULL == TimerThread)
4493 {
4494 CreateTimerThreadParams params;
4495 params.event.CreateAutoEvent(FALSE);
4496
4497 params.setupSucceeded = FALSE;
4498
4499 HANDLE TimerThreadHandle = Thread::CreateUtilityThread(Thread::StackSize_Small, TimerThreadStart, &params, W(".NET Timer"));
4500
4501 if (TimerThreadHandle == NULL)
4502 {
4503 params.event.CloseEvent();
4504 ThrowOutOfMemory();
4505 }
4506
4507 {
4508 GCX_PREEMP();
4509 for(;;)
4510 {
4511 // if a host throws because it couldnt allocate another thread,
4512 // just retry the wait.
4513 if (SafeWait(&params.event,INFINITE, FALSE) != WAIT_TIMEOUT)
4514 break;
4515 }
4516 }
4517 params.event.CloseEvent();
4518
4519 if (!params.setupSucceeded)
4520 {
4521 CloseHandle(TimerThreadHandle);
4522 return FALSE;
4523 }
4524
4525 TimerThread = TimerThreadHandle;
4526 }
4527
4528 }
4529
4530
4531 NewHolder<TimerInfo> timerInfoHolder;
4532 TimerInfo * timerInfo = new (nothrow) TimerInfo;
4533 *phNewTimer = (HANDLE) timerInfo;
4534
4535 if (NULL == timerInfo)
4536 ThrowOutOfMemory();
4537
4538 timerInfoHolder.Assign(timerInfo);
4539
4540 timerInfo->FiringTime = DueTime;
4541 timerInfo->Function = Callback;
4542 timerInfo->Context = Parameter;
4543 timerInfo->Period = Period;
4544 timerInfo->state = 0;
4545 timerInfo->flag = Flag;
4546 timerInfo->ExternalCompletionEvent = INVALID_HANDLE;
4547 timerInfo->ExternalEventSafeHandle = NULL;
4548 timerInfo->handleOwningAD = (ADID) 0;
4549
4550 BOOL status = QueueUserAPC((PAPCFUNC)InsertNewTimer,TimerThread,(size_t)timerInfo);
4551 if (FALSE == status)
4552 {
4553 return FALSE;
4554 }
4555
4556 timerInfoHolder.SuppressRelease();
4557 return TRUE;
4558}
4559
4560#ifdef _MSC_VER
4561#ifdef _WIN64
4562#pragma warning (disable : 4716)
4563#else
4564#pragma warning (disable : 4715)
4565#endif
4566#endif
// Thread procedure of the timer thread.
//
// 'p' points at the CreateTimerThreadParams supplied by CreateTimerQueueTimer. The
// function reports the outcome of thread setup through it, then loops forever calling
// TimerThreadFire. It only returns (0) if thread setup fails or, under
// FEATURE_COMINTEROP, if the thread cannot be put into the MTA. Because not every
// build has a return statement after the loop, the "not all paths return a value"
// warnings (4715/4716) are disabled around this function.
DWORD WINAPI ThreadpoolMgr::TimerThreadStart(LPVOID p)
{
    ClrFlsSetThreadType (ThreadType_Timer);

    STATIC_CONTRACT_THROWS;
    STATIC_CONTRACT_GC_TRIGGERS;        // due to SetApartment
    STATIC_CONTRACT_MODE_PREEMPTIVE;
    STATIC_CONTRACT_SO_INTOLERANT;
    /* cannot use contract because of SEH
    CONTRACTL
    {
        NOTHROW;
        GC_NOTRIGGER;
        MODE_PREEMPTIVE;
    }
    CONTRACTL_END;*/

    CreateTimerThreadParams* params = (CreateTimerThreadParams*)p;

    Thread* pThread = SetupThreadNoThrow();

    // Report the setup result, then wake the creator. 'params' is a local of
    // CreateTimerQueueTimer, which stops waiting (and closes the event) once the event
    // is set, so 'params' must not be touched after this point.
    params->setupSucceeded = (pThread == NULL) ? 0 : 1;
    params->event.Set();

    if (pThread == NULL)
        return 0;

    pTimerThread = pThread;
    // Timer threads never die

    // Baseline for the first FireTimers sweep.
    LastTickCount = GetTickCount();

#ifdef FEATURE_COMINTEROP
    if (pThread->SetApartment(Thread::AS_InMTA, TRUE) != Thread::AS_InMTA)
    {
        // @todo: should we log the failure
        goto Exit;
    }
#endif // FEATURE_COMINTEROP

    for (;;)
    {
         // moved to its own function since EX_TRY consumes stack
#ifdef _MSC_VER
#pragma inline_depth (0) // the function containing EX_TRY can't be inlined here
#endif
        TimerThreadFire();
#ifdef _MSC_VER
#pragma inline_depth (20)
#endif
    }

#ifdef FEATURE_COMINTEROP
// unreachable code
//    if (pThread) {
//        pThread->SetApartment(Thread::AS_Unknown, TRUE);
//    }
Exit:

    // Reached only via the goto above (apartment setup failed).
    // @todo: replace with host provided ExitThread
    return 0;
#endif
}
4630
4631void ThreadpoolMgr::TimerThreadFire()
4632{
4633 CONTRACTL
4634 {
4635 THROWS;
4636 GC_TRIGGERS;
4637 MODE_PREEMPTIVE;
4638 }
4639 CONTRACTL_END;
4640
4641 EX_TRY {
4642 DWORD timeout = FireTimers();
4643
4644#undef SleepEx
4645 SleepEx(timeout, TRUE);
4646#define SleepEx(a,b) Dont_Use_SleepEx(a,b)
4647
4648 // the thread could wake up either because an APC completed or the sleep timeout
4649 // in both case, we need to sweep the timer queue, firing timers, and readjusting
4650 // the next firing time
4651
4652 }
4653 EX_CATCH {
4654 // Assert on debug builds since a dead timer thread is a fatal error
4655 _ASSERTE(FALSE);
4656 if (SwallowUnhandledExceptions())
4657 {
4658 // Do nothing to swallow the exception
4659 }
4660 else
4661 {
4662 EX_RETHROW;
4663 }
4664 }
4665 EX_END_CATCH(SwallowAllExceptions);
4666}
4667
4668#ifdef _MSC_VER
4669#ifdef _WIN64
4670#pragma warning (default : 4716)
4671#else
4672#pragma warning (default : 4715)
4673#endif
4674#endif
4675
4676// Executed as an APC in timer thread
4677void ThreadpoolMgr::InsertNewTimer(TimerInfo* pArg)
4678{
4679 CONTRACTL
4680 {
4681 NOTHROW;
4682 GC_TRIGGERS;
4683 MODE_ANY;
4684 }
4685 CONTRACTL_END;
4686 STATIC_CONTRACT_SO_INTOLERANT;
4687
4688 _ASSERTE(pArg);
4689 TimerInfo * timerInfo = pArg;
4690
4691 if (timerInfo->state & TIMER_DELETE)
4692 { // timer was deleted before it could be registered
4693 DeleteTimer(timerInfo);
4694 return;
4695 }
4696
4697 // set the firing time = current time + due time (note initially firing time = due time)
4698 DWORD currentTime = GetTickCount();
4699 if (timerInfo->FiringTime == (ULONG) -1)
4700 {
4701 timerInfo->state = TIMER_REGISTERED;
4702 timerInfo->refCount = 1;
4703
4704 }
4705 else
4706 {
4707 timerInfo->FiringTime += currentTime;
4708
4709 timerInfo->state = (TIMER_REGISTERED | TIMER_ACTIVE);
4710 timerInfo->refCount = 1;
4711
4712 // insert the timer in the queue
4713 InsertTailList(&TimerQueue,(&timerInfo->link));
4714 }
4715
4716 return;
4717}
4718
4719
4720// executed by the Timer thread
4721// sweeps through the list of timers, readjusting the firing times, queueing APCs for
4722// those that have expired, and returns the next firing time interval
4723DWORD ThreadpoolMgr::FireTimers()
4724{
4725 CONTRACTL
4726 {
4727 THROWS; // QueueUserWorkItem can throw
4728 if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);}
4729 if (GetThread()) { MODE_PREEMPTIVE;} else { DISABLED(MODE_ANY);}
4730 }
4731 CONTRACTL_END;
4732
4733 DWORD currentTime = GetTickCount();
4734 DWORD nextFiringInterval = (DWORD) -1;
4735 TimerInfo* timerInfo = NULL;
4736
4737 EX_TRY
4738 {
4739 for (LIST_ENTRY* node = (LIST_ENTRY*) TimerQueue.Flink;
4740 node != &TimerQueue;
4741 )
4742 {
4743 timerInfo = (TimerInfo*) node;
4744 node = (LIST_ENTRY*) node->Flink;
4745
4746 if (TimeExpired(LastTickCount, currentTime, timerInfo->FiringTime))
4747 {
4748 if (timerInfo->Period == 0 || timerInfo->Period == (ULONG) -1)
4749 {
4750 DeactivateTimer(timerInfo);
4751 }
4752
4753 InterlockedIncrement(&timerInfo->refCount);
4754
4755 QueueUserWorkItem(AsyncTimerCallbackCompletion,
4756 timerInfo,
4757 QUEUE_ONLY /* TimerInfo take care of deleting*/);
4758
4759 if (timerInfo->Period != 0 && timerInfo->Period != (ULONG)-1)
4760 {
4761 ULONG nextFiringTime = timerInfo->FiringTime + timerInfo->Period;
4762 DWORD firingInterval;
4763 if (TimeExpired(timerInfo->FiringTime, currentTime, nextFiringTime))
4764 {
4765 // Enough time has elapsed to fire the timer yet again. The timer is not able to keep up with the short
4766 // period, have it fire 1 ms from now to avoid spinning without a delay.
4767 timerInfo->FiringTime = currentTime + 1;
4768 firingInterval = 1;
4769 }
4770 else
4771 {
4772 timerInfo->FiringTime = nextFiringTime;
4773 firingInterval = TimeInterval(nextFiringTime, currentTime);
4774 }
4775
4776 if (firingInterval < nextFiringInterval)
4777 nextFiringInterval = firingInterval;
4778 }
4779 }
4780 else
4781 {
4782 DWORD firingInterval = TimeInterval(timerInfo->FiringTime, currentTime);
4783 if (firingInterval < nextFiringInterval)
4784 nextFiringInterval = firingInterval;
4785 }
4786 }
4787 }
4788 EX_CATCH
4789 {
4790 // If QueueUserWorkItem throws OOM, swallow the exception and retry on
4791 // the next call to FireTimers(), otherwise retrhow.
4792 Exception *ex = GET_EXCEPTION();
4793 // undo the call to DeactivateTimer()
4794 InterlockedDecrement(&timerInfo->refCount);
4795 timerInfo->state = timerInfo->state & TIMER_ACTIVE;
4796 InsertTailList(&TimerQueue, (&timerInfo->link));
4797 if (ex->GetHR() != E_OUTOFMEMORY)
4798 {
4799 EX_RETHROW;
4800 }
4801 }
4802 EX_END_CATCH(RethrowTerminalExceptions);
4803
4804 LastTickCount = currentTime;
4805
4806 return nextFiringInterval;
4807}
4808
4809DWORD WINAPI ThreadpoolMgr::AsyncTimerCallbackCompletion(PVOID pArgs)
4810{
4811 CONTRACTL
4812 {
4813 THROWS;
4814 GC_TRIGGERS;
4815 MODE_PREEMPTIVE;
4816 SO_TOLERANT;
4817 }
4818 CONTRACTL_END;
4819
4820 Thread* pThread = GetThread();
4821
4822 if (pThread == NULL)
4823 {
4824 HRESULT hr = ERROR_SUCCESS;
4825
4826 ClrFlsSetThreadType(ThreadType_Threadpool_Worker);
4827 pThread = SetupThreadNoThrow(&hr);
4828
4829 if (pThread == NULL)
4830 {
4831 return hr;
4832 }
4833 }
4834
4835 BEGIN_SO_INTOLERANT_CODE(pThread);
4836 {
4837 TimerInfo* timerInfo = (TimerInfo*) pArgs;
4838 ((WAITORTIMERCALLBACKFUNC) timerInfo->Function) (timerInfo->Context, TRUE) ;
4839
4840 if (InterlockedDecrement(&timerInfo->refCount) == 0)
4841 {
4842 DeleteTimer(timerInfo);
4843 }
4844 }
4845 END_SO_INTOLERANT_CODE;
4846
4847 return ERROR_SUCCESS;
4848}
4849
4850
4851// removes the timer from the timer queue, thereby cancelling it
4852// there may still be pending callbacks that haven't completed
4853void ThreadpoolMgr::DeactivateTimer(TimerInfo* timerInfo)
4854{
4855 LIMITED_METHOD_CONTRACT;
4856
4857 RemoveEntryList((LIST_ENTRY*) timerInfo);
4858
4859 // This timer info could go into another linked list of timer infos
4860 // waiting to be released. Reinitialize the list pointers
4861 InitializeListHead(&timerInfo->link);
4862 timerInfo->state = timerInfo->state & ~TIMER_ACTIVE;
4863}
4864
4865DWORD WINAPI ThreadpoolMgr::AsyncDeleteTimer(PVOID pArgs)
4866{
4867 CONTRACTL
4868 {
4869 THROWS;
4870 MODE_PREEMPTIVE;
4871 GC_TRIGGERS;
4872 }
4873 CONTRACTL_END;
4874
4875 Thread * pThread = GetThread();
4876
4877 if (pThread == NULL)
4878 {
4879 HRESULT hr = ERROR_SUCCESS;
4880
4881 ClrFlsSetThreadType(ThreadType_Threadpool_Worker);
4882 pThread = SetupThreadNoThrow(&hr);
4883
4884 if (pThread == NULL)
4885 {
4886 return hr;
4887 }
4888 }
4889
4890 DeleteTimer((TimerInfo*) pArgs);
4891
4892 return ERROR_SUCCESS;
4893}
4894
4895void ThreadpoolMgr::DeleteTimer(TimerInfo* timerInfo)
4896{
4897 CONTRACTL
4898 {
4899 if (GetThread() == pTimerThread) { NOTHROW; } else { THROWS; }
4900 GC_TRIGGERS;
4901 MODE_ANY;
4902 }
4903 CONTRACTL_END;
4904
4905 _ASSERTE((timerInfo->state & TIMER_ACTIVE) == 0);
4906
4907 _ASSERTE(!(timerInfo->flag & WAIT_FREE_CONTEXT));
4908
4909 if (timerInfo->flag & WAIT_INTERNAL_COMPLETION)
4910 {
4911 timerInfo->InternalCompletionEvent.Set();
4912 return; // the timerInfo will be deleted by the thread that's waiting on InternalCompletionEvent
4913 }
4914
4915 // ExternalCompletionEvent comes from Host, ExternalEventSafeHandle from managed code.
4916 // They are mutually exclusive.
4917 _ASSERTE(!(timerInfo->ExternalCompletionEvent != INVALID_HANDLE &&
4918 timerInfo->ExternalEventSafeHandle != NULL));
4919
4920 if (timerInfo->ExternalCompletionEvent != INVALID_HANDLE)
4921 {
4922 UnsafeSetEvent(timerInfo->ExternalCompletionEvent);
4923 timerInfo->ExternalCompletionEvent = INVALID_HANDLE;
4924 }
4925
4926 // We cannot block the timer thread, so some cleanup is deferred to other threads.
4927 if (GetThread() == pTimerThread)
4928 {
4929 // Notify the ExternalEventSafeHandle with an user work item
4930 if (timerInfo->ExternalEventSafeHandle != NULL)
4931 {
4932 BOOL success = FALSE;
4933 EX_TRY
4934 {
4935 if (QueueUserWorkItem(AsyncDeleteTimer,
4936 timerInfo,
4937 QUEUE_ONLY) != FALSE)
4938 {
4939 success = TRUE;
4940 }
4941 }
4942 EX_CATCH
4943 {
4944 }
4945 EX_END_CATCH(SwallowAllExceptions);
4946
4947 // If unable to queue a user work item, fall back to queueing timer for release
4948 // which will happen *sometime* in the future.
4949 if (success == FALSE)
4950 {
4951 QueueTimerInfoForRelease(timerInfo);
4952 }
4953
4954 return;
4955 }
4956
4957 // Releasing GC handles can block. So we wont do this on the timer thread.
4958 // We'll put it in a list which will be processed by a worker thread
4959 if (timerInfo->Context != NULL)
4960 {
4961 QueueTimerInfoForRelease(timerInfo);
4962 return;
4963 }
4964 }
4965
4966 // To get here we are either not the Timer thread or there is no blocking work to be done
4967
4968 if (timerInfo->Context != NULL)
4969 {
4970 GCX_COOP();
4971 delete (ThreadpoolMgr::TimerInfoContext*)timerInfo->Context;
4972 }
4973
4974 if (timerInfo->ExternalEventSafeHandle != NULL)
4975 {
4976 ReleaseTimerInfo(timerInfo);
4977 }
4978
4979 delete timerInfo;
4980
4981}
4982
4983// We add TimerInfos from deleted timers into a linked list.
4984// A worker thread will later release the handles held by the TimerInfo
4985// and recycle them if possible.
4986void ThreadpoolMgr::QueueTimerInfoForRelease(TimerInfo *pTimerInfo)
4987{
4988 CONTRACTL
4989 {
4990 NOTHROW;
4991 GC_NOTRIGGER;
4992 MODE_ANY;
4993 }
4994 CONTRACTL_END;
4995
4996 // The synchronization in this method depends on the fact that
4997 // - There is only one timer thread
4998 // - The one and only timer thread is executing this method.
4999 // - This function wont go into an alertable state. That could trigger another APC.
5000 // Else two threads can be queueing timerinfos and a race could
5001 // lead to leaked memory and handles
5002 _ASSERTE(GetThread());
5003 _ASSERTE(pTimerThread == GetThread());
5004 TimerInfo *pHead = NULL;
5005
5006 // Make sure this timer info has been deactivated and removed from any other lists
5007 _ASSERTE((pTimerInfo->state & TIMER_ACTIVE) == 0);
5008 //_ASSERTE(pTimerInfo->link.Blink == &(pTimerInfo->link) &&
5009 // pTimerInfo->link.Flink == &(pTimerInfo->link));
5010 // Make sure "link" is the first field in TimerInfo
5011 _ASSERTE(pTimerInfo == (PVOID)&pTimerInfo->link);
5012
5013 // Grab any previously published list
5014 if ((pHead = InterlockedExchangeT(&TimerInfosToBeRecycled, NULL)) != NULL)
5015 {
5016 // If there already is a list, just append
5017 InsertTailList((LIST_ENTRY *)pHead, &pTimerInfo->link);
5018 pTimerInfo = pHead;
5019 }
5020 else
5021 // If this is the head, make its next and previous ptrs point to itself
5022 InitializeListHead((LIST_ENTRY*)&pTimerInfo->link);
5023
5024 // Publish the list
5025 (void) InterlockedExchangeT(&TimerInfosToBeRecycled, pTimerInfo);
5026
5027}
5028
5029void ThreadpoolMgr::FlushQueueOfTimerInfos()
5030{
5031 CONTRACTL
5032 {
5033 THROWS;
5034 GC_TRIGGERS;
5035 MODE_ANY;
5036 }
5037 CONTRACTL_END;
5038
5039 TimerInfo *pHeadTimerInfo = NULL, *pCurrTimerInfo = NULL;
5040 LIST_ENTRY *pNextInfo = NULL;
5041
5042 if ((pHeadTimerInfo = InterlockedExchangeT(&TimerInfosToBeRecycled, NULL)) == NULL)
5043 return;
5044
5045 do
5046 {
5047 RemoveHeadList((LIST_ENTRY *)pHeadTimerInfo, pNextInfo);
5048 _ASSERTE(pNextInfo != NULL);
5049
5050 pCurrTimerInfo = (TimerInfo *) pNextInfo;
5051
5052 GCX_COOP();
5053 if (pCurrTimerInfo->Context != NULL)
5054 {
5055 delete (ThreadpoolMgr::TimerInfoContext*)pCurrTimerInfo->Context;
5056 }
5057
5058 if (pCurrTimerInfo->ExternalEventSafeHandle != NULL)
5059 {
5060 ReleaseTimerInfo(pCurrTimerInfo);
5061 }
5062
5063 delete pCurrTimerInfo;
5064
5065 }
5066 while ((TimerInfo *)pNextInfo != pHeadTimerInfo);
5067}
5068
5069/************************************************************************/
5070BOOL ThreadpoolMgr::ChangeTimerQueueTimer(
5071 HANDLE Timer,
5072 ULONG DueTime,
5073 ULONG Period)
5074{
5075 CONTRACTL
5076 {
5077 THROWS;
5078 MODE_ANY;
5079 GC_NOTRIGGER;
5080 INJECT_FAULT(COMPlusThrowOM());
5081 }
5082 CONTRACTL_END;
5083
5084 _ASSERTE(IsInitialized());
5085 _ASSERTE(Timer); // not possible to give invalid handle in managed code
5086
5087 NewHolder<TimerUpdateInfo> updateInfoHolder;
5088 TimerUpdateInfo *updateInfo = new TimerUpdateInfo;
5089 updateInfoHolder.Assign(updateInfo);
5090
5091 updateInfo->Timer = (TimerInfo*) Timer;
5092 updateInfo->DueTime = DueTime;
5093 updateInfo->Period = Period;
5094
5095 BOOL status = QueueUserAPC((PAPCFUNC)UpdateTimer,
5096 TimerThread,
5097 (size_t) updateInfo);
5098
5099 if (status)
5100 updateInfoHolder.SuppressRelease();
5101
5102 return(status);
5103}
5104
5105void ThreadpoolMgr::UpdateTimer(TimerUpdateInfo* pArgs)
5106{
5107 CONTRACTL
5108 {
5109 NOTHROW;
5110 GC_NOTRIGGER;
5111 MODE_ANY;
5112 }
5113 CONTRACTL_END;
5114
5115 TimerUpdateInfo* updateInfo = (TimerUpdateInfo*) pArgs;
5116 TimerInfo* timerInfo = updateInfo->Timer;
5117
5118 timerInfo->Period = updateInfo->Period;
5119
5120 if (updateInfo->DueTime == (ULONG) -1)
5121 {
5122 if (timerInfo->state & TIMER_ACTIVE)
5123 {
5124 DeactivateTimer(timerInfo);
5125 }
5126 // else, noop (the timer was already inactive)
5127 _ASSERTE((timerInfo->state & TIMER_ACTIVE) == 0);
5128
5129 delete updateInfo;
5130 return;
5131 }
5132
5133 DWORD currentTime = GetTickCount();
5134 timerInfo->FiringTime = currentTime + updateInfo->DueTime;
5135
5136 delete updateInfo;
5137
5138 if (! (timerInfo->state & TIMER_ACTIVE))
5139 {
5140 // timer not active (probably a one shot timer that has expired), so activate it
5141 timerInfo->state |= TIMER_ACTIVE;
5142 _ASSERTE(timerInfo->refCount >= 1);
5143 // insert the timer in the queue
5144 InsertTailList(&TimerQueue,(&timerInfo->link));
5145
5146 }
5147
5148 return;
5149}
5150
5151/************************************************************************/
5152BOOL ThreadpoolMgr::DeleteTimerQueueTimer(
5153 HANDLE Timer,
5154 HANDLE Event)
5155{
5156 CONTRACTL
5157 {
5158 THROWS;
5159 MODE_ANY;
5160 GC_TRIGGERS;
5161 }
5162 CONTRACTL_END;
5163
5164 _ASSERTE(IsInitialized()); // cannot call delete before creating timer
5165 _ASSERTE(Timer); // not possible to give invalid handle in managed code
5166
5167 // make volatile to avoid compiler reordering check after async call.
5168 // otherwise, DeregisterTimer could delete timerInfo before the comparison.
5169 VolatilePtr<TimerInfo> timerInfo = (TimerInfo*) Timer;
5170
5171 if (Event == (HANDLE) -1)
5172 {
5173 //CONTRACT_VIOLATION(ThrowsViolation);
5174 timerInfo->InternalCompletionEvent.CreateAutoEvent(FALSE);
5175 timerInfo->flag |= WAIT_INTERNAL_COMPLETION;
5176 }
5177 else if (Event)
5178 {
5179 timerInfo->ExternalCompletionEvent = Event;
5180 }
5181#ifdef _DEBUG
5182 else /* Event == NULL */
5183 {
5184 _ASSERTE(timerInfo->ExternalCompletionEvent == INVALID_HANDLE);
5185 }
5186#endif
5187
5188 BOOL isBlocking = timerInfo->flag & WAIT_INTERNAL_COMPLETION;
5189
5190 BOOL status = QueueUserAPC((PAPCFUNC)DeregisterTimer,
5191 TimerThread,
5192 (size_t)(TimerInfo*)timerInfo);
5193
5194 if (FALSE == status)
5195 {
5196 if (isBlocking)
5197 timerInfo->InternalCompletionEvent.CloseEvent();
5198 return FALSE;
5199 }
5200
5201 if (isBlocking)
5202 {
5203 _ASSERTE(timerInfo->ExternalEventSafeHandle == NULL);
5204 _ASSERTE(timerInfo->ExternalCompletionEvent == INVALID_HANDLE);
5205 _ASSERTE(GetThread() != pTimerThread);
5206
5207 timerInfo->InternalCompletionEvent.Wait(INFINITE,TRUE /*alertable*/);
5208 timerInfo->InternalCompletionEvent.CloseEvent();
5209 // Release handles and delete TimerInfo
5210 _ASSERTE(timerInfo->refCount == 0);
5211 // if WAIT_INTERNAL_COMPLETION flag is not set, timerInfo will be deleted in DeleteTimer.
5212 timerInfo->flag &= ~WAIT_INTERNAL_COMPLETION;
5213 DeleteTimer(timerInfo);
5214 }
5215 return status;
5216}
5217
5218void ThreadpoolMgr::DeregisterTimer(TimerInfo* pArgs)
5219{
5220 CONTRACTL
5221 {
5222 NOTHROW;
5223 GC_TRIGGERS;
5224 MODE_PREEMPTIVE;
5225 SO_INTOLERANT;
5226 }
5227 CONTRACTL_END;
5228
5229 TimerInfo* timerInfo = (TimerInfo*) pArgs;
5230
5231 if (! (timerInfo->state & TIMER_REGISTERED) )
5232 {
5233 // set state to deleted, so that it does not get registered
5234 timerInfo->state |= TIMER_DELETE ;
5235
5236 // since the timer has not even been registered, we dont need an interlock to decrease the RefCount
5237 timerInfo->refCount--;
5238
5239 return;
5240 }
5241
5242 if (timerInfo->state & TIMER_ACTIVE)
5243 {
5244 DeactivateTimer(timerInfo);
5245 }
5246
5247 if (InterlockedDecrement(&timerInfo->refCount) == 0 )
5248 {
5249 DeleteTimer(timerInfo);
5250 }
5251 return;
5252}
5253
5254#endif // !DACCESS_COMPILE
5255