1 | // Licensed to the .NET Foundation under one or more agreements. |
2 | // The .NET Foundation licenses this file to you under the MIT license. |
3 | // See the LICENSE file in the project root for more information. |
4 | |
5 | |
6 | /*++ |
7 | |
8 | Module Name: |
9 | |
10 | Win32ThreadPool.cpp |
11 | |
12 | Abstract: |
13 | |
14 | This module implements Threadpool support using Win32 APIs |
15 | |
16 | |
17 | Revision History: |
18 | December 1999 - Created |
19 | |
20 | --*/ |
21 | |
22 | #include "common.h" |
23 | #include "log.h" |
24 | #include "threadpoolrequest.h" |
25 | #include "win32threadpool.h" |
26 | #include "delegateinfo.h" |
27 | #include "eeconfig.h" |
28 | #include "dbginterface.h" |
29 | #include "corhost.h" |
30 | #include "eventtrace.h" |
31 | #include "threads.h" |
32 | #include "appdomain.inl" |
33 | #include "nativeoverlapped.h" |
34 | #include "hillclimbing.h" |
35 | #include "configuration.h" |
36 | |
37 | |
38 | #ifndef FEATURE_PAL |
39 | #ifndef DACCESS_COMPILE |
40 | |
41 | // APIs that must be accessed through dynamic linking. |
42 | typedef int (WINAPI *NtQueryInformationThreadProc) ( |
43 | HANDLE ThreadHandle, |
44 | THREADINFOCLASS ThreadInformationClass, |
45 | PVOID ThreadInformation, |
46 | ULONG ThreadInformationLength, |
47 | PULONG ReturnLength); |
48 | NtQueryInformationThreadProc g_pufnNtQueryInformationThread = NULL; |
49 | |
50 | typedef int (WINAPI *NtQuerySystemInformationProc) ( |
51 | SYSTEM_INFORMATION_CLASS SystemInformationClass, |
52 | PVOID SystemInformation, |
53 | ULONG SystemInformationLength, |
54 | PULONG ReturnLength OPTIONAL); |
55 | NtQuerySystemInformationProc g_pufnNtQuerySystemInformation = NULL; |
56 | |
57 | typedef HANDLE (WINAPI * CreateWaitableTimerExProc) ( |
58 | LPSECURITY_ATTRIBUTES lpTimerAttributes, |
59 | LPCTSTR lpTimerName, |
60 | DWORD dwFlags, |
61 | DWORD dwDesiredAccess); |
62 | CreateWaitableTimerExProc g_pufnCreateWaitableTimerEx = NULL; |
63 | |
64 | typedef BOOL (WINAPI * SetWaitableTimerExProc) ( |
65 | HANDLE hTimer, |
66 | const LARGE_INTEGER *lpDueTime, |
67 | LONG lPeriod, |
68 | PTIMERAPCROUTINE pfnCompletionRoutine, |
69 | LPVOID lpArgToCompletionRoutine, |
70 | void* WakeContext, //should be PREASON_CONTEXT, but it's not defined for us (and we don't use it) |
71 | ULONG TolerableDelay); |
72 | SetWaitableTimerExProc g_pufnSetWaitableTimerEx = NULL; |
73 | |
74 | #endif // !DACCESS_COMPILE |
75 | #endif // !FEATURE_PAL |
76 | |
77 | BOOL ThreadpoolMgr::InitCompletionPortThreadpool = FALSE; |
78 | HANDLE ThreadpoolMgr::GlobalCompletionPort; // used for binding io completions on file handles |
79 | |
80 | SVAL_IMPL(ThreadpoolMgr::ThreadCounter,ThreadpoolMgr,CPThreadCounter); |
81 | |
82 | SVAL_IMPL_INIT(LONG,ThreadpoolMgr,MaxLimitTotalCPThreads,1000); // = MaxLimitCPThreadsPerCPU * number of CPUS |
83 | SVAL_IMPL(LONG,ThreadpoolMgr,MinLimitTotalCPThreads); |
84 | SVAL_IMPL(LONG,ThreadpoolMgr,MaxFreeCPThreads); // = MaxFreeCPThreadsPerCPU * Number of CPUS |
85 | |
Volatile<LONG> ThreadpoolMgr::NumCPInfrastructureThreads = 0;   // number of threads currently busy handling the draining cycle
87 | |
88 | // Cacheline aligned, hot variable |
89 | DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) SVAL_IMPL(ThreadpoolMgr::ThreadCounter, ThreadpoolMgr, WorkerCounter); |
90 | |
SVAL_IMPL(LONG,ThreadpoolMgr,MinLimitTotalWorkerThreads);       // defaults to the number of processors unless forced via config (see Initialize)
SVAL_IMPL(LONG,ThreadpoolMgr,MaxLimitTotalWorkerThreads);       // defaults to GetDefaultMaxLimitWorkerThreads(MinLimitTotalWorkerThreads) unless forced via config
93 | |
94 | SVAL_IMPL(LONG,ThreadpoolMgr,cpuUtilization); |
95 | LONG ThreadpoolMgr::cpuUtilizationAverage = 0; |
96 | |
97 | HillClimbing ThreadpoolMgr::HillClimbingInstance; |
98 | |
99 | // Cacheline aligned, 3 hot variables updated in a group |
100 | DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) LONG ThreadpoolMgr::PriorCompletedWorkRequests = 0; |
101 | DWORD ThreadpoolMgr::PriorCompletedWorkRequestsTime; |
102 | DWORD ThreadpoolMgr::NextCompletedWorkRequestsTime; |
103 | |
104 | LARGE_INTEGER ThreadpoolMgr::CurrentSampleStartTime; |
105 | |
106 | unsigned int ThreadpoolMgr::WorkerThreadSpinLimit; |
107 | bool ThreadpoolMgr::IsHillClimbingDisabled; |
108 | int ThreadpoolMgr::ThreadAdjustmentInterval; |
109 | |
110 | #define INVALID_HANDLE ((HANDLE) -1) |
111 | #define NEW_THREAD_THRESHOLD 7 // Number of requests outstanding before we start a new thread |
112 | #define CP_THREAD_PENDINGIO_WAIT 5000 // polling interval when thread is retired but has a pending io |
113 | #define GATE_THREAD_DELAY 500 /*milliseconds*/ |
114 | #define GATE_THREAD_DELAY_TOLERANCE 50 /*milliseconds*/ |
#define DELAY_BETWEEN_SUSPENDS  (5000 + GATE_THREAD_DELAY)      // time to delay between suspensions
#define SUSPEND_TIME            (GATE_THREAD_DELAY + 100)       // milliseconds to suspend during SuspendProcessing
117 | |
118 | LONG ThreadpoolMgr::Initialization=0; // indicator of whether the threadpool is initialized. |
119 | |
120 | // Cacheline aligned, hot variable |
121 | DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) unsigned int ThreadpoolMgr::LastDequeueTime; // used to determine if work items are getting thread starved |
122 | |
// Keep this out of the preceding variables' cache line
124 | DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) int ThreadpoolMgr::offset_counter = 0; |
125 | |
126 | SPTR_IMPL(WorkRequest,ThreadpoolMgr,WorkRequestHead); // Head of work request queue |
SPTR_IMPL(WorkRequest,ThreadpoolMgr,WorkRequestTail);           // Tail of work request queue
128 | |
129 | SVAL_IMPL(ThreadpoolMgr::LIST_ENTRY,ThreadpoolMgr,TimerQueue); // queue of timers |
130 | |
131 | //unsigned int ThreadpoolMgr::LastCpuSamplingTime=0; // last time cpu utilization was sampled by gate thread |
132 | unsigned int ThreadpoolMgr::LastCPThreadCreation=0; // last time a completion port thread was created |
unsigned int ThreadpoolMgr::NumberOfProcessors;                 // number of processors used to size the thread pool (see Initialize)
134 | |
135 | |
136 | CrstStatic ThreadpoolMgr::WorkerCriticalSection; |
137 | CLREvent * ThreadpoolMgr::RetiredCPWakeupEvent; // wakeup event for completion port threads |
138 | CrstStatic ThreadpoolMgr::WaitThreadsCriticalSection; |
139 | ThreadpoolMgr::LIST_ENTRY ThreadpoolMgr::WaitThreadsHead; |
140 | |
141 | CLRLifoSemaphore* ThreadpoolMgr::WorkerSemaphore; |
142 | CLRLifoSemaphore* ThreadpoolMgr::RetiredWorkerSemaphore; |
143 | |
144 | CrstStatic ThreadpoolMgr::TimerQueueCriticalSection; |
145 | HANDLE ThreadpoolMgr::TimerThread=NULL; |
146 | Thread *ThreadpoolMgr::pTimerThread=NULL; |
147 | |
148 | // Cacheline aligned, hot variable |
149 | DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) DWORD ThreadpoolMgr::LastTickCount; |
150 | |
151 | #ifdef _DEBUG |
152 | DWORD ThreadpoolMgr::TickCountAdjustment=0; |
153 | #endif |
154 | |
155 | // Cacheline aligned, hot variable |
156 | DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) LONG ThreadpoolMgr::GateThreadStatus=GATE_THREAD_STATUS_NOT_RUNNING; |
157 | |
// Keep this out of the preceding variables' cache line
159 | DECLSPEC_ALIGN(MAX_CACHE_LINE_SIZE) ThreadpoolMgr::RecycledListsWrapper ThreadpoolMgr::RecycledLists; |
160 | |
161 | ThreadpoolMgr::TimerInfo *ThreadpoolMgr::TimerInfosToBeRecycled = NULL; |
162 | |
163 | BOOL ThreadpoolMgr::IsApcPendingOnWaitThread = FALSE; |
164 | |
165 | #ifndef DACCESS_COMPILE |
166 | |
167 | // Macros for inserting/deleting from doubly linked list |
168 | |
169 | #define InitializeListHead(ListHead) (\ |
170 | (ListHead)->Flink = (ListHead)->Blink = (ListHead)) |
171 | |
172 | // |
173 | // these are named the same as slightly different macros in the NT headers |
174 | // |
175 | #undef RemoveHeadList |
176 | #undef RemoveEntryList |
177 | #undef InsertTailList |
178 | #undef InsertHeadList |
179 | |
180 | #define RemoveHeadList(ListHead,FirstEntry) \ |
181 | {\ |
182 | FirstEntry = (LIST_ENTRY*) (ListHead)->Flink;\ |
183 | ((LIST_ENTRY*)FirstEntry->Flink)->Blink = (ListHead);\ |
184 | (ListHead)->Flink = FirstEntry->Flink;\ |
185 | } |
186 | |
187 | #define RemoveEntryList(Entry) {\ |
188 | LIST_ENTRY* _EX_Entry;\ |
189 | _EX_Entry = (Entry);\ |
190 | ((LIST_ENTRY*) _EX_Entry->Blink)->Flink = _EX_Entry->Flink;\ |
191 | ((LIST_ENTRY*) _EX_Entry->Flink)->Blink = _EX_Entry->Blink;\ |
192 | } |
193 | |
194 | #define InsertTailList(ListHead,Entry) \ |
195 | (Entry)->Flink = (ListHead);\ |
196 | (Entry)->Blink = (ListHead)->Blink;\ |
197 | ((LIST_ENTRY*)(ListHead)->Blink)->Flink = (Entry);\ |
198 | (ListHead)->Blink = (Entry); |
199 | |
200 | #define InsertHeadList(ListHead,Entry) {\ |
201 | LIST_ENTRY* _EX_Flink;\ |
202 | LIST_ENTRY* _EX_ListHead;\ |
203 | _EX_ListHead = (LIST_ENTRY*)(ListHead);\ |
204 | _EX_Flink = (LIST_ENTRY*) _EX_ListHead->Flink;\ |
205 | (Entry)->Flink = _EX_Flink;\ |
206 | (Entry)->Blink = _EX_ListHead;\ |
207 | _EX_Flink->Blink = (Entry);\ |
208 | _EX_ListHead->Flink = (Entry);\ |
209 | } |
210 | |
211 | #define IsListEmpty(ListHead) \ |
212 | ((ListHead)->Flink == (ListHead)) |
213 | |
214 | #define SetLastHRError(hr) \ |
215 | if (HRESULT_FACILITY(hr) == FACILITY_WIN32)\ |
216 | SetLastError(HRESULT_CODE(hr));\ |
217 | else \ |
218 | SetLastError(ERROR_INVALID_DATA);\ |
219 | |
220 | /************************************************************************/ |
221 | |
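// Allocates a per-processor array of recycled-memory lists, one RecycledListInfo per MemType,
// so DelegateInfo/AsyncCallback/WorkRequest blocks can be reused without going back to the heap.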
222 | void ThreadpoolMgr::RecycledListsWrapper::Initialize( unsigned int numProcs ) |
223 | { |
224 | CONTRACTL |
225 | { |
226 | THROWS; |
227 | MODE_ANY; |
228 | GC_NOTRIGGER; |
229 | } |
230 | CONTRACTL_END; |
231 | |
232 | pRecycledListPerProcessor = new RecycledListInfo[numProcs][MEMTYPE_COUNT]; |
233 | } |
234 | |
235 | //--// |
236 | |
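// Initialization is a tri-state flag: 0 = not started, 1 = initialization in progress,
// -1 = fully initialized.  The first caller to flip 0 -> 1 runs Initialize(); everyone else
// yields until the flag becomes -1.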
237 | void ThreadpoolMgr::EnsureInitialized() |
238 | { |
239 | CONTRACTL |
240 | { |
241 | THROWS; // Initialize can throw |
242 | MODE_ANY; |
243 | GC_NOTRIGGER; |
244 | } |
245 | CONTRACTL_END; |
246 | |
247 | if (IsInitialized()) |
248 | return; |
249 | |
250 | DWORD dwSwitchCount = 0; |
251 | |
252 | retry: |
253 | if (InterlockedCompareExchange(&Initialization, 1, 0) == 0) |
254 | { |
255 | if (Initialize()) |
256 | Initialization = -1; |
257 | else |
258 | { |
259 | Initialization = 0; |
260 | COMPlusThrowOM(); |
261 | } |
262 | } |
263 | else // someone has already begun initializing. |
264 | { |
265 | // wait until it finishes |
266 | while (Initialization != -1) |
267 | { |
268 | __SwitchToThread(0, ++dwSwitchCount); |
269 | goto retry; |
270 | } |
271 | } |
272 | } |
273 | |
274 | DWORD GetDefaultMaxLimitWorkerThreads(DWORD minLimit) |
275 | { |
276 | CONTRACTL |
277 | { |
278 | MODE_ANY; |
279 | GC_NOTRIGGER; |
280 | NOTHROW; |
281 | } |
282 | CONTRACTL_END; |
283 | |
284 | // |
285 | // We determine the max limit for worker threads as follows: |
286 | // |
287 | // 1) It must be at least MinLimitTotalWorkerThreads |
288 | // 2) It must be no greater than (half the virtual address space)/(thread stack size) |
289 | // 3) It must be <= MaxPossibleWorkerThreads |
290 | // |
291 | // TODO: what about CP threads? Can they follow a similar plan? How do we allocate |
292 | // thread counts between the two kinds of threads? |
293 | // |
294 | SIZE_T stackReserveSize = 0; |
295 | Thread::GetProcessDefaultStackSize(&stackReserveSize, NULL); |
296 | |
297 | ULONGLONG halfVirtualAddressSpace; |
298 | |
299 | MEMORYSTATUSEX memStats; |
300 | memStats.dwLength = sizeof(memStats); |
301 | if (GlobalMemoryStatusEx(&memStats)) |
302 | { |
303 | halfVirtualAddressSpace = memStats.ullTotalVirtual / 2; |
304 | } |
305 | else |
306 | { |
307 | //assume the normal Win32 32-bit virtual address space |
308 | halfVirtualAddressSpace = 0x000000007FFE0000ull / 2; |
309 | } |
310 | |
311 | ULONGLONG limit = halfVirtualAddressSpace / stackReserveSize; |
312 | limit = max(limit, (ULONGLONG)minLimit); |
313 | limit = min(limit, (ULONGLONG)ThreadpoolMgr::ThreadCounter::MaxPossibleCount); |
314 | |
315 | _ASSERTE(FitsIn<DWORD>(limit)); |
316 | return (DWORD)limit; |
317 | } |
318 | |
319 | DWORD GetForceMinWorkerThreadsValue() |
320 | { |
321 | WRAPPER_NO_CONTRACT; |
322 | return Configuration::GetKnobDWORDValue(W("System.Threading.ThreadPool.MinThreads" ), CLRConfig::INTERNAL_ThreadPool_ForceMinWorkerThreads); |
323 | } |
324 | |
325 | DWORD GetForceMaxWorkerThreadsValue() |
326 | { |
327 | WRAPPER_NO_CONTRACT; |
328 | return Configuration::GetKnobDWORDValue(W("System.Threading.ThreadPool.MaxThreads" ), CLRConfig::INTERNAL_ThreadPool_ForceMaxWorkerThreads); |
329 | } |
330 | |
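// One-time initialization of the thread pool: determines the processor count, reads configuration,
// sets the worker and completion-port thread limits, creates the synchronization objects, and
// (outside FEATURE_PAL) creates the global IO completion port.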
331 | BOOL ThreadpoolMgr::Initialize() |
332 | { |
333 | CONTRACTL |
334 | { |
335 | THROWS; |
336 | MODE_ANY; |
337 | GC_NOTRIGGER; |
338 | INJECT_FAULT(COMPlusThrowOM()); |
339 | } |
340 | CONTRACTL_END; |
341 | |
342 | BOOL bRet = FALSE; |
343 | BOOL bExceptionCaught = FALSE; |
344 | |
345 | UnManagedPerAppDomainTPCount* pADTPCount; |
346 | pADTPCount = PerAppDomainTPCountList::GetUnmanagedTPCount(); |
347 | |
348 | //ThreadPool_CPUGroup |
349 | CPUGroupInfo::EnsureInitialized(); |
350 | if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups()) |
351 | NumberOfProcessors = CPUGroupInfo::GetNumActiveProcessors(); |
352 | else |
353 | NumberOfProcessors = GetCurrentProcessCpuCount(); |
354 | InitPlatformVariables(); |
355 | |
356 | EX_TRY |
357 | { |
358 | WorkerThreadSpinLimit = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_UnfairSemaphoreSpinLimit); |
359 | IsHillClimbingDisabled = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_Disable) != 0; |
360 | ThreadAdjustmentInterval = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_SampleIntervalLow); |
361 | |
362 | pADTPCount->InitResources(); |
363 | WorkerCriticalSection.Init(CrstThreadpoolWorker); |
364 | WaitThreadsCriticalSection.Init(CrstThreadpoolWaitThreads); |
365 | TimerQueueCriticalSection.Init(CrstThreadpoolTimerQueue); |
366 | |
367 | // initialize WaitThreadsHead |
368 | InitializeListHead(&WaitThreadsHead); |
369 | |
370 | // initialize TimerQueue |
371 | InitializeListHead(&TimerQueue); |
372 | |
373 | RetiredCPWakeupEvent = new CLREvent(); |
374 | RetiredCPWakeupEvent->CreateAutoEvent(FALSE); |
375 | _ASSERTE(RetiredCPWakeupEvent->IsValid()); |
376 | |
377 | WorkerSemaphore = new CLRLifoSemaphore(); |
378 | WorkerSemaphore->Create(0, ThreadCounter::MaxPossibleCount); |
379 | |
380 | RetiredWorkerSemaphore = new CLRLifoSemaphore(); |
381 | RetiredWorkerSemaphore->Create(0, ThreadCounter::MaxPossibleCount); |
382 | |
383 | //ThreadPool_CPUGroup |
384 | if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups()) |
385 | RecycledLists.Initialize( CPUGroupInfo::GetNumActiveProcessors() ); |
386 | else |
387 | RecycledLists.Initialize( g_SystemInfo.dwNumberOfProcessors ); |
388 | /* |
389 | { |
390 | SYSTEM_INFO sysInfo; |
391 | |
392 | ::GetSystemInfo( &sysInfo ); |
393 | |
394 | RecycledLists.Initialize( sysInfo.dwNumberOfProcessors ); |
395 | } |
396 | */ |
397 | } |
398 | EX_CATCH |
399 | { |
400 | pADTPCount->CleanupResources(); |
401 | |
402 | if (RetiredCPWakeupEvent) |
403 | { |
404 | delete RetiredCPWakeupEvent; |
405 | RetiredCPWakeupEvent = NULL; |
406 | } |
407 | |
        // Note: It is fine to call Destroy on uninitialized critical sections
409 | WorkerCriticalSection.Destroy(); |
410 | WaitThreadsCriticalSection.Destroy(); |
411 | TimerQueueCriticalSection.Destroy(); |
412 | |
413 | bExceptionCaught = TRUE; |
414 | } |
415 | EX_END_CATCH(SwallowAllExceptions); |
416 | |
417 | if (bExceptionCaught) |
418 | { |
419 | goto end; |
420 | } |
421 | |
422 | // initialize Worker and CP thread settings |
423 | DWORD forceMin; |
424 | forceMin = GetForceMinWorkerThreadsValue(); |
425 | MinLimitTotalWorkerThreads = forceMin > 0 ? (LONG)forceMin : (LONG)NumberOfProcessors; |
426 | |
427 | DWORD forceMax; |
428 | forceMax = GetForceMaxWorkerThreadsValue(); |
429 | MaxLimitTotalWorkerThreads = forceMax > 0 ? (LONG)forceMax : (LONG)GetDefaultMaxLimitWorkerThreads(MinLimitTotalWorkerThreads); |
430 | |
431 | ThreadCounter::Counts counts; |
432 | counts.NumActive = 0; |
433 | counts.NumWorking = 0; |
434 | counts.NumRetired = 0; |
435 | counts.MaxWorking = MinLimitTotalWorkerThreads; |
436 | WorkerCounter.counts.AsLongLong = counts.AsLongLong; |
437 | |
438 | #ifdef _DEBUG |
439 | TickCountAdjustment = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadpoolTickCountAdjustment); |
440 | #endif |
441 | |
442 | // initialize CP thread settings |
443 | MinLimitTotalCPThreads = NumberOfProcessors; |
444 | |
    // Use a volatile store to guarantee the value is visible to the DAC (the store could otherwise be optimized out)
446 | VolatileStoreWithoutBarrier<LONG>(&MaxFreeCPThreads, NumberOfProcessors*MaxFreeCPThreadsPerCPU); |
447 | |
448 | counts.NumActive = 0; |
449 | counts.NumWorking = 0; |
450 | counts.NumRetired = 0; |
451 | counts.MaxWorking = MinLimitTotalCPThreads; |
452 | CPThreadCounter.counts.AsLongLong = counts.AsLongLong; |
453 | |
454 | #ifndef FEATURE_PAL |
455 | { |
456 | GlobalCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, |
457 | NULL, |
458 | 0, /*ignored for invalid handle value*/ |
459 | NumberOfProcessors); |
460 | } |
461 | #endif // !FEATURE_PAL |
462 | |
463 | HillClimbingInstance.Initialize(); |
464 | |
465 | bRet = TRUE; |
466 | end: |
467 | return bRet; |
468 | } |
469 | |
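// Resolves OS entry points that must be looked up dynamically (NtQueryInformationThread,
// NtQuerySystemInformation, CreateWaitableTimerEx, SetWaitableTimerEx) since they are not
// guaranteed to be present on every supported Windows version.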
470 | void ThreadpoolMgr::InitPlatformVariables() |
471 | { |
472 | CONTRACTL |
473 | { |
474 | NOTHROW; |
475 | MODE_ANY; |
476 | GC_NOTRIGGER; |
477 | } |
478 | CONTRACTL_END; |
479 | |
480 | #ifndef FEATURE_PAL |
481 | HINSTANCE hNtDll; |
482 | HINSTANCE hCoreSynch; |
483 | { |
484 | CONTRACT_VIOLATION(GCViolation|FaultViolation); |
485 | hNtDll = CLRLoadLibrary(W("ntdll.dll" )); |
486 | _ASSERTE(hNtDll); |
487 | #ifdef FEATURE_CORESYSTEM |
488 | hCoreSynch = CLRLoadLibrary(W("api-ms-win-core-synch-l1-1-0.dll" )); |
489 | #else |
490 | hCoreSynch = CLRLoadLibrary(W("kernel32.dll" )); |
491 | #endif |
492 | _ASSERTE(hCoreSynch); |
493 | } |
494 | |
495 | // These APIs must be accessed via dynamic binding since they may be removed in future |
496 | // OS versions. |
497 | g_pufnNtQueryInformationThread = (NtQueryInformationThreadProc)GetProcAddress(hNtDll,"NtQueryInformationThread" ); |
498 | g_pufnNtQuerySystemInformation = (NtQuerySystemInformationProc)GetProcAddress(hNtDll,"NtQuerySystemInformation" ); |
499 | |
500 | |
501 | // These APIs are only supported on newer Windows versions |
502 | g_pufnCreateWaitableTimerEx = (CreateWaitableTimerExProc)GetProcAddress(hCoreSynch, "CreateWaitableTimerExW" ); |
503 | g_pufnSetWaitableTimerEx = (SetWaitableTimerExProc)GetProcAddress(hCoreSynch, "SetWaitableTimerEx" ); |
504 | #endif |
505 | } |
506 | |
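// Validates and applies new maximum worker/IO-completion thread counts under WorkerCriticalSection.
// A forced worker maximum from configuration (GetForceMaxWorkerThreadsValue) takes precedence over
// the requested value.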
507 | BOOL ThreadpoolMgr::SetMaxThreadsHelper(DWORD MaxWorkerThreads, |
508 | DWORD MaxIOCompletionThreads) |
509 | { |
510 | CONTRACTL |
511 | { |
512 | THROWS; // Crst can throw and toggle GC mode |
513 | MODE_ANY; |
514 | GC_TRIGGERS; |
515 | } |
516 | CONTRACTL_END; |
517 | |
518 | BOOL result = FALSE; |
519 | |
    // This doesn't need to be WorkerCriticalSection specifically; it is reused to avoid a race between setting the min and max limits without introducing a new critical section.
521 | CrstHolder csh(&WorkerCriticalSection); |
522 | |
523 | if (MaxWorkerThreads >= (DWORD)MinLimitTotalWorkerThreads && |
524 | MaxIOCompletionThreads >= (DWORD)MinLimitTotalCPThreads && |
525 | MaxWorkerThreads != 0 && |
526 | MaxIOCompletionThreads != 0) |
527 | { |
528 | BEGIN_SO_INTOLERANT_CODE(GetThread()); |
529 | |
530 | if (GetForceMaxWorkerThreadsValue() == 0) |
531 | { |
532 | MaxLimitTotalWorkerThreads = min(MaxWorkerThreads, (DWORD)ThreadCounter::MaxPossibleCount); |
533 | |
534 | ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts(); |
535 | while (counts.MaxWorking > MaxLimitTotalWorkerThreads) |
536 | { |
537 | ThreadCounter::Counts newCounts = counts; |
538 | newCounts.MaxWorking = MaxLimitTotalWorkerThreads; |
539 | |
540 | ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
541 | if (oldCounts == counts) |
542 | counts = newCounts; |
543 | else |
544 | counts = oldCounts; |
545 | } |
546 | } |
547 | |
548 | END_SO_INTOLERANT_CODE; |
549 | |
550 | MaxLimitTotalCPThreads = min(MaxIOCompletionThreads, (DWORD)ThreadCounter::MaxPossibleCount); |
551 | |
552 | result = TRUE; |
553 | } |
554 | |
555 | return result; |
556 | } |
557 | |
558 | /************************************************************************/ |
559 | BOOL ThreadpoolMgr::SetMaxThreads(DWORD MaxWorkerThreads, |
560 | DWORD MaxIOCompletionThreads) |
561 | { |
562 | CONTRACTL |
563 | { |
564 | THROWS; // SetMaxThreadsHelper can throw and toggle GC mode |
565 | MODE_ANY; |
566 | GC_TRIGGERS; |
567 | } |
568 | CONTRACTL_END; |
569 | |
570 | EnsureInitialized(); |
571 | |
572 | return SetMaxThreadsHelper(MaxWorkerThreads, MaxIOCompletionThreads); |
573 | } |
574 | |
575 | BOOL ThreadpoolMgr::GetMaxThreads(DWORD* MaxWorkerThreads, |
576 | DWORD* MaxIOCompletionThreads) |
577 | { |
578 | LIMITED_METHOD_CONTRACT; |
579 | |
580 | |
581 | if (!MaxWorkerThreads || !MaxIOCompletionThreads) |
582 | { |
583 | SetLastHRError(ERROR_INVALID_DATA); |
584 | return FALSE; |
585 | } |
586 | |
587 | EnsureInitialized(); |
588 | |
589 | *MaxWorkerThreads = (DWORD)MaxLimitTotalWorkerThreads; |
590 | *MaxIOCompletionThreads = MaxLimitTotalCPThreads; |
591 | return TRUE; |
592 | } |
593 | |
594 | BOOL ThreadpoolMgr::SetMinThreads(DWORD MinWorkerThreads, |
595 | DWORD MinIOCompletionThreads) |
596 | { |
597 | CONTRACTL |
598 | { |
599 | THROWS; // Crst can throw and toggle GC mode |
600 | MODE_ANY; |
601 | GC_TRIGGERS; |
602 | } |
603 | CONTRACTL_END; |
604 | |
605 | EnsureInitialized(); |
606 | |
    // This doesn't need to be WorkerCriticalSection specifically; it is reused to avoid a race between setting the min and max limits without introducing a new critical section.
608 | CrstHolder csh(&WorkerCriticalSection); |
609 | |
610 | BOOL init_result = FALSE; |
611 | |
612 | if (MinWorkerThreads >= 0 && MinIOCompletionThreads >= 0 && |
613 | MinWorkerThreads <= (DWORD) MaxLimitTotalWorkerThreads && |
614 | MinIOCompletionThreads <= (DWORD) MaxLimitTotalCPThreads) |
615 | { |
616 | BEGIN_SO_INTOLERANT_CODE(GetThread()); |
617 | |
618 | if (GetForceMinWorkerThreadsValue() == 0) |
619 | { |
620 | MinLimitTotalWorkerThreads = max(1, min(MinWorkerThreads, (DWORD)ThreadCounter::MaxPossibleCount)); |
621 | |
622 | ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts(); |
623 | while (counts.MaxWorking < MinLimitTotalWorkerThreads) |
624 | { |
625 | ThreadCounter::Counts newCounts = counts; |
626 | newCounts.MaxWorking = MinLimitTotalWorkerThreads; |
627 | |
628 | ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
629 | if (oldCounts == counts) |
630 | { |
631 | counts = newCounts; |
632 | |
633 | // if we increased the limit, and there are pending workitems, we need |
634 | // to dispatch a thread to process the work. |
635 | if (newCounts.MaxWorking > oldCounts.MaxWorking && |
636 | PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains()) |
637 | { |
638 | MaybeAddWorkingWorker(); |
639 | } |
640 | } |
641 | else |
642 | { |
643 | counts = oldCounts; |
644 | } |
645 | } |
646 | } |
647 | |
648 | END_SO_INTOLERANT_CODE; |
649 | |
650 | MinLimitTotalCPThreads = max(1, min(MinIOCompletionThreads, (DWORD)ThreadCounter::MaxPossibleCount)); |
651 | |
652 | init_result = TRUE; |
653 | } |
654 | |
655 | return init_result; |
656 | } |
657 | |
658 | BOOL ThreadpoolMgr::GetMinThreads(DWORD* MinWorkerThreads, |
659 | DWORD* MinIOCompletionThreads) |
660 | { |
661 | LIMITED_METHOD_CONTRACT; |
662 | |
663 | |
664 | if (!MinWorkerThreads || !MinIOCompletionThreads) |
665 | { |
666 | SetLastHRError(ERROR_INVALID_DATA); |
667 | return FALSE; |
668 | } |
669 | |
670 | EnsureInitialized(); |
671 | |
672 | *MinWorkerThreads = (DWORD)MinLimitTotalWorkerThreads; |
673 | *MinIOCompletionThreads = MinLimitTotalCPThreads; |
674 | return TRUE; |
675 | } |
676 | |
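// Reports how many more worker and IO-completion threads could still be dispatched,
// i.e. the configured maximum minus the number currently working (clamped when the active
// count already exceeds the maximum).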
677 | BOOL ThreadpoolMgr::GetAvailableThreads(DWORD* AvailableWorkerThreads, |
678 | DWORD* AvailableIOCompletionThreads) |
679 | { |
680 | LIMITED_METHOD_CONTRACT; |
681 | |
682 | if (!AvailableWorkerThreads || !AvailableIOCompletionThreads) |
683 | { |
684 | SetLastHRError(ERROR_INVALID_DATA); |
685 | return FALSE; |
686 | } |
687 | |
688 | EnsureInitialized(); |
689 | |
690 | ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts(); |
691 | |
692 | if (MaxLimitTotalWorkerThreads < counts.NumActive) |
693 | *AvailableWorkerThreads = 0; |
694 | else |
695 | *AvailableWorkerThreads = MaxLimitTotalWorkerThreads - counts.NumWorking; |
696 | |
697 | counts = CPThreadCounter.GetCleanCounts(); |
698 | if (MaxLimitTotalCPThreads < counts.NumActive) |
699 | *AvailableIOCompletionThreads = counts.NumActive - counts.NumWorking; |
700 | else |
701 | *AvailableIOCompletionThreads = MaxLimitTotalCPThreads - counts.NumWorking; |
702 | return TRUE; |
703 | } |
704 | |
705 | void QueueUserWorkItemHelp(LPTHREAD_START_ROUTINE Function, PVOID Context) |
706 | { |
707 | STATIC_CONTRACT_THROWS; |
708 | STATIC_CONTRACT_GC_TRIGGERS; |
709 | STATIC_CONTRACT_MODE_ANY; |
710 | /* Cannot use contract here because of SEH |
711 | CONTRACTL |
712 | { |
713 | THROWS; |
714 | GC_TRIGGERS; |
715 | MODE_ANY; |
716 | } |
717 | CONTRACTL_END;*/ |
718 | |
719 | Function(Context); |
720 | |
721 | Thread *pThread = GetThread(); |
722 | if (pThread) { |
723 | if (pThread->IsAbortRequested()) |
724 | pThread->EEResetAbort(Thread::TAR_ALL); |
725 | pThread->InternalReset(); |
726 | } |
727 | } |
728 | |
729 | // |
730 | // WorkingThreadCounts tracks the number of worker threads currently doing user work, and the maximum number of such threads |
731 | // since the last time TakeMaxWorkingThreadCount was called. This information is for diagnostic purposes only, |
732 | // and is tracked only if the CLR config value INTERNAL_ThreadPool_EnableWorkerTracking is non-zero (this feature is off |
733 | // by default). |
734 | // |
735 | union WorkingThreadCounts |
736 | { |
737 | struct |
738 | { |
739 | int currentWorking : 16; |
740 | int maxWorking : 16; |
741 | }; |
742 | |
743 | LONG asLong; |
744 | }; |
745 | |
746 | WorkingThreadCounts g_workingThreadCounts; |
747 | |
748 | // |
749 | // If worker tracking is enabled (see above) then this is called immediately before and after a worker thread executes |
750 | // each work item. |
751 | // |
752 | void ThreadpoolMgr::ReportThreadStatus(bool isWorking) |
753 | { |
754 | CONTRACTL |
755 | { |
756 | NOTHROW; |
757 | GC_NOTRIGGER; |
758 | SO_TOLERANT; |
759 | MODE_ANY; |
760 | } |
761 | CONTRACTL_END; |
762 | _ASSERTE(CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_EnableWorkerTracking)); |
763 | while (true) |
764 | { |
765 | WorkingThreadCounts currentCounts, newCounts; |
766 | currentCounts.asLong = VolatileLoad(&g_workingThreadCounts.asLong); |
767 | |
768 | newCounts = currentCounts; |
769 | |
770 | if (isWorking) |
771 | newCounts.currentWorking++; |
772 | |
773 | if (newCounts.currentWorking > newCounts.maxWorking) |
774 | newCounts.maxWorking = newCounts.currentWorking; |
775 | |
776 | if (!isWorking) |
777 | newCounts.currentWorking--; |
778 | |
779 | if (currentCounts.asLong == InterlockedCompareExchange(&g_workingThreadCounts.asLong, newCounts.asLong, currentCounts.asLong)) |
780 | break; |
781 | } |
782 | } |
783 | |
784 | // |
785 | // Returns the max working count since the previous call to TakeMaxWorkingThreadCount, and resets WorkingThreadCounts.maxWorking. |
786 | // |
787 | int TakeMaxWorkingThreadCount() |
788 | { |
789 | CONTRACTL |
790 | { |
791 | NOTHROW; |
792 | GC_NOTRIGGER; |
793 | SO_TOLERANT; |
794 | MODE_ANY; |
795 | } |
796 | CONTRACTL_END; |
797 | _ASSERTE(CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_EnableWorkerTracking)); |
798 | while (true) |
799 | { |
800 | WorkingThreadCounts currentCounts, newCounts; |
801 | currentCounts.asLong = VolatileLoad(&g_workingThreadCounts.asLong); |
802 | |
803 | newCounts = currentCounts; |
804 | newCounts.maxWorking = 0; |
805 | |
806 | if (currentCounts.asLong == InterlockedCompareExchange(&g_workingThreadCounts.asLong, newCounts.asLong, currentCounts.asLong)) |
807 | { |
808 | // If we haven't updated the counts since the last call to TakeMaxWorkingThreadCount, then we never updated maxWorking. |
809 | // In that case, the number of working threads for the whole period since the last TakeMaxWorkingThreadCount is the |
810 | // current number of working threads. |
811 | return currentCounts.maxWorking == 0 ? currentCounts.currentWorking : currentCounts.maxWorking; |
812 | } |
813 | } |
814 | } |
815 | |
816 | |
817 | /************************************************************************/ |
818 | |
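// Queues a work item to the unmanaged thread pool queue.  With CALL_OR_QUEUE the item may instead
// be executed inline on the calling thread when completion-port thread pressure is low enough.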
819 | BOOL ThreadpoolMgr::QueueUserWorkItem(LPTHREAD_START_ROUTINE Function, |
820 | PVOID Context, |
821 | DWORD Flags, |
822 | BOOL UnmanagedTPRequest) |
823 | { |
824 | CONTRACTL |
825 | { |
826 | THROWS; // EnsureInitialized, EnqueueWorkRequest can throw OOM |
827 | GC_TRIGGERS; |
828 | MODE_ANY; |
829 | } |
830 | CONTRACTL_END; |
831 | |
832 | EnsureInitialized(); |
833 | |
834 | |
835 | if (Flags == CALL_OR_QUEUE) |
836 | { |
837 | // we've been asked to call this directly if the thread pressure is not too high |
838 | |
839 | int MinimumAvailableCPThreads = (NumberOfProcessors < 3) ? 3 : NumberOfProcessors; |
840 | |
841 | ThreadCounter::Counts counts = CPThreadCounter.GetCleanCounts(); |
842 | if ((MaxLimitTotalCPThreads - counts.NumActive) >= MinimumAvailableCPThreads ) |
843 | { |
844 | ThreadLocaleHolder localeHolder; |
845 | |
846 | QueueUserWorkItemHelp(Function, Context); |
847 | return TRUE; |
848 | } |
849 | |
850 | } |
851 | |
852 | if (UnmanagedTPRequest) |
853 | { |
854 | UnManagedPerAppDomainTPCount* pADTPCount; |
855 | pADTPCount = PerAppDomainTPCountList::GetUnmanagedTPCount(); |
856 | pADTPCount->QueueUnmanagedWorkRequest(Function, Context); |
857 | } |
858 | else |
859 | { |
860 | // caller has already registered its TPCount; this call is just to adjust the thread count |
861 | } |
862 | |
863 | return TRUE; |
864 | } |
865 | |
866 | |
867 | bool ThreadpoolMgr::ShouldWorkerKeepRunning() |
868 | { |
869 | WRAPPER_NO_CONTRACT; |
870 | |
871 | // |
872 | // Maybe this thread should retire now. Let's see. |
873 | // |
874 | bool shouldThisThreadKeepRunning = true; |
875 | |
876 | // Dirty read is OK here; the worst that can happen is that we won't retire this time. In the |
877 | // case where we might retire, we have to succeed a CompareExchange, which will have the effect |
878 | // of validating this read. |
879 | ThreadCounter::Counts counts = WorkerCounter.DangerousGetDirtyCounts(); |
880 | while (true) |
881 | { |
882 | if (counts.NumActive <= counts.MaxWorking) |
883 | { |
884 | shouldThisThreadKeepRunning = true; |
885 | break; |
886 | } |
887 | |
888 | ThreadCounter::Counts newCounts = counts; |
889 | newCounts.NumWorking--; |
890 | newCounts.NumActive--; |
891 | newCounts.NumRetired++; |
892 | |
893 | ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
894 | |
895 | if (oldCounts == counts) |
896 | { |
897 | shouldThisThreadKeepRunning = false; |
898 | break; |
899 | } |
900 | |
901 | counts = oldCounts; |
902 | } |
903 | |
904 | return shouldThisThreadKeepRunning; |
905 | } |
906 | |
907 | DangerousNonHostedSpinLock ThreadpoolMgr::ThreadAdjustmentLock; |
908 | |
909 | |
910 | // |
911 | // This method must only be called if ShouldAdjustMaxWorkersActive has returned true, *and* |
912 | // ThreadAdjustmentLock is held. |
913 | // |
914 | void ThreadpoolMgr::AdjustMaxWorkersActive() |
915 | { |
916 | CONTRACTL |
917 | { |
918 | NOTHROW; |
919 | if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
920 | MODE_ANY; |
921 | } |
922 | CONTRACTL_END; |
923 | |
924 | _ASSERTE(ThreadAdjustmentLock.IsHeld()); |
925 | |
926 | DWORD currentTicks = GetTickCount(); |
927 | LONG totalNumCompletions = Thread::GetTotalThreadPoolCompletionCount(); |
928 | LONG numCompletions = totalNumCompletions - VolatileLoad(&PriorCompletedWorkRequests); |
929 | |
930 | LARGE_INTEGER startTime = CurrentSampleStartTime; |
931 | LARGE_INTEGER endTime; |
932 | QueryPerformanceCounter(&endTime); |
933 | |
934 | static LARGE_INTEGER freq; |
935 | if (freq.QuadPart == 0) |
936 | QueryPerformanceFrequency(&freq); |
937 | |
938 | double elapsed = (double)(endTime.QuadPart - startTime.QuadPart) / freq.QuadPart; |
939 | |
940 | // |
941 | // It's possible for the current sample to be reset while we're holding |
942 | // ThreadAdjustmentLock. This will result in a very short sample, possibly |
943 | // with completely bogus counts. We'll try to detect this by checking the sample |
944 | // interval; if it's very short, then we try again later. |
945 | // |
946 | if (elapsed*1000.0 >= (ThreadAdjustmentInterval/2)) |
947 | { |
948 | ThreadCounter::Counts currentCounts = WorkerCounter.GetCleanCounts(); |
949 | |
950 | int newMax = HillClimbingInstance.Update( |
951 | currentCounts.MaxWorking, |
952 | elapsed, |
953 | numCompletions, |
954 | &ThreadAdjustmentInterval); |
955 | |
956 | while (newMax != currentCounts.MaxWorking) |
957 | { |
958 | ThreadCounter::Counts newCounts = currentCounts; |
959 | newCounts.MaxWorking = newMax; |
960 | |
961 | ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, currentCounts); |
962 | if (oldCounts == currentCounts) |
963 | { |
964 | // |
965 | // If we're increasing the max, inject a thread. If that thread finds work, it will inject |
966 | // another thread, etc., until nobody finds work or we reach the new maximum. |
967 | // |
968 | // If we're reducing the max, whichever threads notice this first will retire themselves. |
969 | // |
970 | if (newMax > oldCounts.MaxWorking) |
971 | MaybeAddWorkingWorker(); |
972 | |
973 | break; |
974 | } |
975 | else |
976 | { |
977 | // we failed - maybe try again |
978 | if (oldCounts.MaxWorking > currentCounts.MaxWorking && |
979 | oldCounts.MaxWorking >= newMax) |
980 | { |
981 | // someone (probably the gate thread) increased the thread count more than |
982 | // we are about to do. Don't interfere. |
983 | break; |
984 | } |
985 | |
986 | currentCounts = oldCounts; |
987 | } |
988 | } |
989 | |
990 | PriorCompletedWorkRequests = totalNumCompletions; |
991 | NextCompletedWorkRequestsTime = currentTicks + ThreadAdjustmentInterval; |
992 | MemoryBarrier(); // flush previous writes (especially NextCompletedWorkRequestsTime) |
993 | PriorCompletedWorkRequestsTime = currentTicks; |
        CurrentSampleStartTime = endTime;
995 | } |
996 | } |
997 | |
998 | |
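// Tries to raise NumWorking by one (never above MaxWorking), satisfying the increase by
// unretiring a retired worker, releasing one blocked on WorkerSemaphore, or creating a
// brand new worker thread.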
999 | void ThreadpoolMgr::MaybeAddWorkingWorker() |
1000 | { |
1001 | CONTRACTL |
1002 | { |
1003 | NOTHROW; |
1004 | if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
1005 | MODE_ANY; |
1006 | } |
1007 | CONTRACTL_END; |
1008 | |
1009 | // counts volatile read paired with CompareExchangeCounts loop set |
1010 | ThreadCounter::Counts counts = WorkerCounter.DangerousGetDirtyCounts(); |
1011 | ThreadCounter::Counts newCounts; |
1012 | while (true) |
1013 | { |
1014 | newCounts = counts; |
1015 | newCounts.NumWorking = max(counts.NumWorking, min(counts.NumWorking + 1, counts.MaxWorking)); |
1016 | newCounts.NumActive = max(counts.NumActive, newCounts.NumWorking); |
1017 | newCounts.NumRetired = max(0, counts.NumRetired - (newCounts.NumActive - counts.NumActive)); |
1018 | |
1019 | if (newCounts == counts) |
1020 | return; |
1021 | |
1022 | ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
1023 | |
1024 | if (oldCounts == counts) |
1025 | break; |
1026 | |
1027 | counts = oldCounts; |
1028 | } |
1029 | |
1030 | int toUnretire = counts.NumRetired - newCounts.NumRetired; |
1031 | int toCreate = (newCounts.NumActive - counts.NumActive) - toUnretire; |
1032 | int toRelease = (newCounts.NumWorking - counts.NumWorking) - (toUnretire + toCreate); |
1033 | |
1034 | _ASSERTE(toUnretire >= 0); |
1035 | _ASSERTE(toCreate >= 0); |
1036 | _ASSERTE(toRelease >= 0); |
1037 | _ASSERTE(toUnretire + toCreate + toRelease <= 1); |
1038 | |
1039 | if (toUnretire > 0) |
1040 | { |
1041 | RetiredWorkerSemaphore->Release(toUnretire); |
1042 | } |
1043 | |
1044 | if (toRelease > 0) |
1045 | WorkerSemaphore->Release(toRelease); |
1046 | |
1047 | while (toCreate > 0) |
1048 | { |
1049 | if (CreateWorkerThread()) |
1050 | { |
1051 | toCreate--; |
1052 | } |
1053 | else |
1054 | { |
1055 | // |
1056 | // Uh-oh, we promised to create a new thread, but the creation failed. We have to renege on our |
1057 | // promise. This may possibly result in no work getting done for a while, but the gate thread will |
1058 | // eventually notice that no completions are happening and force the creation of a new thread. |
1059 | // Of course, there's no guarantee *that* will work - but hopefully enough time will have passed |
1060 | // to allow whoever's using all the memory right now to release some. |
1061 | // |
1062 | |
1063 | // counts volatile read paired with CompareExchangeCounts loop set |
1064 | counts = WorkerCounter.DangerousGetDirtyCounts(); |
1065 | while (true) |
1066 | { |
1067 | // |
1068 | // If we said we would create a thread, we also said it would be working. So we need to |
1069 | // decrement both NumWorking and NumActive by the number of threads we will no longer be creating. |
1070 | // |
1071 | newCounts = counts; |
1072 | newCounts.NumWorking -= toCreate; |
1073 | newCounts.NumActive -= toCreate; |
1074 | |
1075 | ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
1076 | |
1077 | if (oldCounts == counts) |
1078 | break; |
1079 | |
1080 | counts = oldCounts; |
1081 | } |
1082 | |
1083 | toCreate = 0; |
1084 | } |
1085 | } |
1086 | } |
1087 | |
1088 | BOOL ThreadpoolMgr::PostQueuedCompletionStatus(LPOVERLAPPED lpOverlapped, |
1089 | LPOVERLAPPED_COMPLETION_ROUTINE Function) |
1090 | { |
1091 | CONTRACTL |
1092 | { |
1093 | THROWS; // EnsureInitialized can throw OOM |
1094 | GC_TRIGGERS; |
1095 | MODE_ANY; |
1096 | } |
1097 | CONTRACTL_END; |
1098 | |
1099 | #ifndef FEATURE_PAL |
1100 | EnsureInitialized(); |
1101 | |
1102 | _ASSERTE(GlobalCompletionPort != NULL); |
1103 | |
1104 | if (!InitCompletionPortThreadpool) |
1105 | InitCompletionPortThreadpool = TRUE; |
1106 | |
1107 | GrowCompletionPortThreadpoolIfNeeded(); |
1108 | |
1109 | // In order to allow external ETW listeners to correlate activities that use our IO completion port |
1110 | // as a dispatch mechanism, we have to ensure the runtime's calls to ::PostQueuedCompletionStatus |
    // and ::GetQueuedCompletionStatus are "annotated" with ETW events representing the operations
1112 | // performed. |
1113 | // There are currently 4 codepaths that post to the GlobalCompletionPort: |
1114 | // 1. and 2. - the Overlapped drainage events. Those are uninteresting to ETW listeners and |
1115 | // currently call the global ::PostQueuedCompletionStatus directly. |
1116 | // 3. the managed API ThreadPool.UnsafeQueueNativeOverlapped(), calling CorPostQueuedCompletionStatus() |
1117 | // which already fires the ETW event as needed |
1118 | // 4. the managed API ThreadPool.RegisterWaitForSingleObject which needs to fire the ETW event |
    //    at the time the managed API is called (on the original user thread), and not when the ::PQCS
1120 | // is called (from the dedicated wait thread). |
1121 | // If additional codepaths appear they need to either fire the ETW event before calling this or ensure |
1122 | // we do not fire an unmatched "dequeue" event in ThreadpoolMgr::CompletionPortThreadStart |
1123 | // The current possible values for Function: |
1124 | // - CallbackForInitiateDrainageOfCompletionPortQueue and |
1125 | // CallbackForContinueDrainageOfCompletionPortQueue for Drainage |
1126 | // - BindIoCompletionCallbackStub for ThreadPool.UnsafeQueueNativeOverlapped |
1127 | // - WaitIOCompletionCallback for ThreadPool.RegisterWaitForSingleObject |
1128 | |
1129 | return ::PostQueuedCompletionStatus(GlobalCompletionPort, |
1130 | 0, |
1131 | (ULONG_PTR) Function, |
1132 | lpOverlapped); |
1133 | #else |
1134 | SetLastError(ERROR_CALL_NOT_IMPLEMENTED); |
1135 | return FALSE; |
1136 | #endif // !FEATURE_PAL |
1137 | } |
1138 | |
1139 | |
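// Completion callback for packets posted on behalf of ThreadPool.RegisterWaitForSingleObject
// (see the comment above); on success it forwards the overlapped to AsyncCallbackCompletion.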
1140 | void ThreadpoolMgr::WaitIOCompletionCallback( |
1141 | DWORD dwErrorCode, |
1142 | DWORD numBytesTransferred, |
1143 | LPOVERLAPPED lpOverlapped) |
1144 | { |
1145 | CONTRACTL |
1146 | { |
1147 | THROWS; |
1148 | MODE_ANY; |
1149 | } |
1150 | CONTRACTL_END; |
1151 | |
    if (dwErrorCode == ERROR_SUCCESS)
        AsyncCallbackCompletion((PVOID)lpOverlapped);
1154 | } |
1155 | |
1156 | #ifndef FEATURE_PAL |
// We need to make sure that the next job picked up by a completion port thread
// is one inserted into the queue after cleanup starts.  Cleanup starts when a completion
// port thread processes a special overlapped (overlappedForInitiateCleanup).
// To do this, we loop through all completion port threads:
// 1. If a thread is in cooperative mode, it is processing a job now, and the next job
//    it picks up will be after cleanup starts.
// 2. A completion port thread may be waiting for a job, or may be about to dispatch one.
//    We cannot distinguish these two cases, so we queue a dummy job behind the starting job.
1166 | OVERLAPPED overlappedForInitiateCleanup; |
1167 | OVERLAPPED overlappedForContinueCleanup; |
1168 | #endif // !FEATURE_PAL |
1169 | |
1170 | Volatile<ULONG> g_fCompletionPortDrainNeeded = FALSE; |
1171 | |
1172 | VOID ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue( |
1173 | DWORD dwErrorCode, |
1174 | DWORD dwNumberOfBytesTransfered, |
1175 | LPOVERLAPPED lpOverlapped |
1176 | ) |
1177 | { |
1178 | CONTRACTL |
1179 | { |
1180 | NOTHROW; |
1181 | GC_NOTRIGGER; |
1182 | MODE_PREEMPTIVE; |
1183 | } |
1184 | CONTRACTL_END; |
1185 | |
1186 | #ifndef FEATURE_PAL |
1187 | CounterHolder hldNumCPIT(&NumCPInfrastructureThreads); |
1188 | |
1189 | // It is OK if this overlapped is from a previous round. |
1190 | // We have started a new round. The next job picked by this thread is |
1191 | // going to be after the marker. |
1192 | Thread* pThread = GetThread(); |
1193 | if (pThread && !pThread->IsCompletionPortDrained()) |
1194 | { |
1195 | pThread->MarkCompletionPortDrained(); |
1196 | } |
1197 | if (g_fCompletionPortDrainNeeded) |
1198 | { |
1199 | ::PostQueuedCompletionStatus(GlobalCompletionPort, |
1200 | 0, |
1201 | (ULONG_PTR)CallbackForContinueDrainageOfCompletionPortQueue, |
1202 | &overlappedForContinueCleanup); |
        // IO completion port threads are released in LIFO order; we want our special packet to be picked up by a different thread.
1204 | while (g_fCompletionPortDrainNeeded && pThread->IsCompletionPortDrained()) |
1205 | { |
1206 | __SwitchToThread(100, CALLER_LIMITS_SPINNING); |
1207 | } |
1208 | } |
1209 | #endif // !FEATURE_PAL |
1210 | } |
1211 | |
1212 | |
1213 | VOID |
1214 | ThreadpoolMgr::CallbackForInitiateDrainageOfCompletionPortQueue( |
1215 | DWORD dwErrorCode, |
1216 | DWORD dwNumberOfBytesTransfered, |
1217 | LPOVERLAPPED lpOverlapped |
1218 | ) |
1219 | { |
1220 | #ifndef FEATURE_PAL |
1221 | CONTRACTL |
1222 | { |
1223 | NOTHROW; |
1224 | MODE_ANY; |
1225 | } |
1226 | CONTRACTL_END; |
1227 | |
1228 | CounterHolder hldNumCPIT(&NumCPInfrastructureThreads); |
1229 | { |
1230 | ThreadStoreLockHolder tsl; |
1231 | Thread *pThread = NULL; |
1232 | while ((pThread = ThreadStore::GetAllThreadList(pThread, Thread::TS_CompletionPortThread, Thread::TS_CompletionPortThread)) != NULL) |
1233 | { |
1234 | pThread->UnmarkCompletionPortDrained(); |
1235 | } |
1236 | } |
1237 | |
1238 | FastInterlockOr(&g_fCompletionPortDrainNeeded, 1); |
1239 | |
    // Wake up retired CP threads so they can mark their status.
1241 | ThreadCounter::Counts counts = CPThreadCounter.GetCleanCounts(); |
1242 | if (counts.NumRetired > 0) |
1243 | RetiredCPWakeupEvent->Set(); |
1244 | |
1245 | DWORD nTry = 0; |
1246 | BOOL fTryNextTime = FALSE; |
1247 | BOOL fMore = TRUE; |
1248 | BOOL fFirstTime = TRUE; |
1249 | while (fMore) |
1250 | { |
1251 | fMore = FALSE; |
1252 | Thread *pCurThread = GetThread(); |
1253 | Thread *pThread = NULL; |
1254 | { |
1255 | |
1256 | ThreadStoreLockHolder tsl; |
1257 | |
1258 | ::FlushProcessWriteBuffers(); |
1259 | |
1260 | while ((pThread = ThreadStore::GetAllThreadList(pThread, Thread::TS_CompletionPortThread, Thread::TS_CompletionPortThread)) != NULL) |
1261 | { |
1262 | if (pThread == pCurThread || pThread->IsDead() || pThread->IsCompletionPortDrained()) |
1263 | { |
1264 | continue; |
1265 | } |
1266 | |
1267 | if (pThread->PreemptiveGCDisabledOther() || pThread->GetFrame() != FRAME_TOP) |
1268 | { |
1269 | // The thread is processing an IO job now. When it picks up next job, the job |
1270 | // will be after the marker. |
1271 | pThread->MarkCompletionPortDrained(); |
1272 | } |
1273 | else |
1274 | { |
1275 | if (fFirstTime) |
1276 | { |
1277 | ::PostQueuedCompletionStatus(GlobalCompletionPort, |
1278 | 0, |
1279 | (ULONG_PTR)CallbackForContinueDrainageOfCompletionPortQueue, |
1280 | &overlappedForContinueCleanup); |
1281 | } |
1282 | fMore = TRUE; |
1283 | } |
1284 | } |
1285 | } |
1286 | if (fMore) |
1287 | { |
1288 | __SwitchToThread(10, CALLER_LIMITS_SPINNING); |
1289 | nTry ++; |
1290 | if (nTry > 1000) |
1291 | { |
1292 | fTryNextTime = TRUE; |
1293 | break; |
1294 | } |
1295 | } |
1296 | fFirstTime = FALSE; |
1297 | } |
1298 | |
1299 | FastInterlockAnd(&g_fCompletionPortDrainNeeded, 0); |
1300 | #endif // !FEATURE_PAL |
1301 | } |
1302 | |
1303 | extern void WINAPI BindIoCompletionCallbackStub(DWORD ErrorCode, |
1304 | DWORD numBytesTransferred, |
1305 | LPOVERLAPPED lpOverlapped); |
1306 | |
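// Routes a completion packet to the drainage callbacks when it carries one of the special
// cleanup overlappeds, and to BindIoCompletionCallbackStub otherwise.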
1307 | void HostIOCompletionCallback( |
1308 | DWORD ErrorCode, |
1309 | DWORD numBytesTransferred, |
1310 | LPOVERLAPPED lpOverlapped) |
1311 | { |
1312 | #ifndef FEATURE_PAL |
1313 | if (lpOverlapped == &overlappedForInitiateCleanup) |
1314 | { |
1315 | ThreadpoolMgr::CallbackForInitiateDrainageOfCompletionPortQueue ( |
1316 | ErrorCode, |
1317 | numBytesTransferred, |
1318 | lpOverlapped); |
1319 | } |
1320 | else if (lpOverlapped == &overlappedForContinueCleanup) |
1321 | { |
1322 | ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue( |
1323 | ErrorCode, |
1324 | numBytesTransferred, |
1325 | lpOverlapped); |
1326 | } |
1327 | else |
1328 | { |
1329 | BindIoCompletionCallbackStub ( |
1330 | ErrorCode, |
1331 | numBytesTransferred, |
1332 | lpOverlapped); |
1333 | } |
1334 | #endif // !FEATURE_PAL |
1335 | } |
1336 | |
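// Starts a drain cycle by posting the overlappedForInitiateCleanup packet to the global
// completion port; returns FALSE if the port has not been created.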
1337 | BOOL ThreadpoolMgr::DrainCompletionPortQueue() |
1338 | { |
1339 | #ifndef FEATURE_PAL |
1340 | CONTRACTL |
1341 | { |
1342 | NOTHROW; |
1343 | GC_TRIGGERS; |
1344 | MODE_ANY; |
1345 | } |
1346 | CONTRACTL_END; |
1347 | |
1348 | if (GlobalCompletionPort == 0) |
1349 | { |
1350 | return FALSE; |
1351 | } |
1352 | |
1353 | return ::PostQueuedCompletionStatus(GlobalCompletionPort, |
1354 | 0, |
1355 | (ULONG_PTR)CallbackForInitiateDrainageOfCompletionPortQueue, |
1356 | &overlappedForInitiateCleanup); |
1357 | #else |
1358 | return FALSE; |
1359 | #endif // !FEATURE_PAL |
1360 | } |
1361 | |
1362 | |
// Called by a worker thread or a CP thread to ensure the gate thread is running
// (and to register a request for it to keep running).
1365 | void ThreadpoolMgr::EnsureGateThreadRunning() |
1366 | { |
1367 | LIMITED_METHOD_CONTRACT; |
1368 | |
1369 | while (true) |
1370 | { |
1371 | switch (GateThreadStatus) |
1372 | { |
1373 | case GATE_THREAD_STATUS_REQUESTED: |
1374 | // |
1375 | // No action needed; the gate thread is running, and someone else has already registered a request |
1376 | // for it to stay. |
1377 | // |
1378 | return; |
1379 | |
1380 | case GATE_THREAD_STATUS_WAITING_FOR_REQUEST: |
1381 | // |
1382 | // Prevent the gate thread from exiting, if it hasn't already done so. If it has, we'll create it on the next iteration of |
1383 | // this loop. |
1384 | // |
1385 | FastInterlockCompareExchange(&GateThreadStatus, GATE_THREAD_STATUS_REQUESTED, GATE_THREAD_STATUS_WAITING_FOR_REQUEST); |
1386 | break; |
1387 | |
1388 | case GATE_THREAD_STATUS_NOT_RUNNING: |
1389 | // |
1390 | // We need to create a new gate thread |
1391 | // |
1392 | if (FastInterlockCompareExchange(&GateThreadStatus, GATE_THREAD_STATUS_REQUESTED, GATE_THREAD_STATUS_NOT_RUNNING) == GATE_THREAD_STATUS_NOT_RUNNING) |
1393 | { |
1394 | if (!CreateGateThread()) |
1395 | { |
1396 | // |
1397 | // If we failed to create the gate thread, someone else will need to try again later. |
1398 | // |
1399 | GateThreadStatus = GATE_THREAD_STATUS_NOT_RUNNING; |
1400 | } |
1401 | return; |
1402 | } |
1403 | break; |
1404 | |
1405 | default: |
1406 | _ASSERTE(!"Invalid value of ThreadpoolMgr::GateThreadStatus" ); |
1407 | } |
1408 | } |
1409 | |
1410 | return; |
1411 | } |
1412 | |
1413 | |
1414 | bool ThreadpoolMgr::ShouldGateThreadKeepRunning() |
1415 | { |
1416 | LIMITED_METHOD_CONTRACT; |
1417 | |
1418 | _ASSERTE(GateThreadStatus == GATE_THREAD_STATUS_WAITING_FOR_REQUEST || |
1419 | GateThreadStatus == GATE_THREAD_STATUS_REQUESTED); |
1420 | |
1421 | // |
1422 | // Switch to WAITING_FOR_REQUEST, and see if we had a request since the last check. |
1423 | // |
1424 | LONG previousStatus = FastInterlockExchange(&GateThreadStatus, GATE_THREAD_STATUS_WAITING_FOR_REQUEST); |
1425 | |
1426 | if (previousStatus == GATE_THREAD_STATUS_WAITING_FOR_REQUEST) |
1427 | { |
1428 | // |
1429 | // No recent requests for the gate thread. Check to see if we're still needed. |
1430 | // |
1431 | |
1432 | // |
1433 | // Are there any free threads in the I/O completion pool? If there are, we don't need a gate thread. |
1434 | // This implies that whenever we decrement NumFreeCPThreads to 0, we need to call EnsureGateThreadRunning(). |
1435 | // |
1436 | ThreadCounter::Counts counts = CPThreadCounter.GetCleanCounts(); |
1437 | bool needGateThreadForCompletionPort = |
1438 | InitCompletionPortThreadpool && |
1439 | (counts.NumActive - counts.NumWorking) <= 0; |
1440 | |
1441 | // |
1442 | // Are there any work requests in any worker queue? If so, we need a gate thread. |
        // This implies that whenever a work queue goes from empty to non-empty, we need to call EnsureGateThreadRunning().
1444 | // |
1445 | bool needGateThreadForWorkerThreads = |
1446 | PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains(); |
1447 | |
1448 | // |
1449 | // If worker tracking is enabled, we need to fire periodic ETW events with active worker counts. This is |
1450 | // done by the gate thread. |
1451 | // We don't have to do anything special with EnsureGateThreadRunning() here, because this is only needed |
1452 | // once work has been added to the queue for the first time (which is covered above). |
1453 | // |
1454 | bool needGateThreadForWorkerTracking = |
1455 | 0 != CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_EnableWorkerTracking); |
1456 | |
1457 | if (!(needGateThreadForCompletionPort || |
1458 | needGateThreadForWorkerThreads || |
1459 | needGateThreadForWorkerTracking)) |
1460 | { |
1461 | // |
1462 | // It looks like we shouldn't be running. But another thread may now tell us to run. If so, they will set GateThreadStatus |
1463 | // back to GATE_THREAD_STATUS_REQUESTED. |
1464 | // |
1465 | previousStatus = FastInterlockCompareExchange(&GateThreadStatus, GATE_THREAD_STATUS_NOT_RUNNING, GATE_THREAD_STATUS_WAITING_FOR_REQUEST); |
1466 | if (previousStatus == GATE_THREAD_STATUS_WAITING_FOR_REQUEST) |
1467 | return false; |
1468 | } |
1469 | } |
1470 | |
1471 | |
1472 | _ASSERTE(GateThreadStatus == GATE_THREAD_STATUS_WAITING_FOR_REQUEST || |
1473 | GateThreadStatus == GATE_THREAD_STATUS_REQUESTED); |
1474 | return true; |
1475 | } |
1476 | |
1477 | |
1478 | |
1479 | //************************************************************************ |
1480 | void ThreadpoolMgr::EnqueueWorkRequest(WorkRequest* workRequest) |
1481 | { |
1482 | CONTRACTL |
1483 | { |
1484 | NOTHROW; |
1485 | MODE_ANY; |
1486 | GC_NOTRIGGER; |
1487 | } |
1488 | CONTRACTL_END; |
1489 | |
1490 | AppendWorkRequest(workRequest); |
1491 | } |
1492 | |
1493 | WorkRequest* ThreadpoolMgr::DequeueWorkRequest() |
1494 | { |
1495 | WorkRequest* entry = NULL; |
1496 | CONTRACT(WorkRequest*) |
1497 | { |
1498 | NOTHROW; |
1499 | GC_NOTRIGGER; |
1500 | MODE_PREEMPTIVE; |
1501 | |
1502 | POSTCONDITION(CheckPointer(entry, NULL_OK)); |
1503 | } CONTRACT_END; |
1504 | |
1505 | entry = RemoveWorkRequest(); |
1506 | |
1507 | RETURN entry; |
1508 | } |
1509 | |
1510 | DWORD WINAPI ThreadpoolMgr::ExecuteHostRequest(PVOID pArg) |
1511 | { |
1512 | CONTRACTL |
1513 | { |
1514 | THROWS; |
1515 | GC_TRIGGERS; |
1516 | MODE_ANY; |
1517 | } |
1518 | CONTRACTL_END; |
1519 | |
1520 | ThreadLocaleHolder localeHolder; |
1521 | |
1522 | bool foundWork, wasNotRecalled; |
1523 | ExecuteWorkRequest(&foundWork, &wasNotRecalled); |
1524 | return ERROR_SUCCESS; |
1525 | } |
1526 | |
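// Dispatches one work item.  GetAppDomainIndexForThreadpoolDispatch returns 0 when no work is
// pending, -1 for the unmanaged queue, and otherwise the index of the appdomain whose queue
// should be serviced.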
1527 | void ThreadpoolMgr::ExecuteWorkRequest(bool* foundWork, bool* wasNotRecalled) |
1528 | { |
1529 | CONTRACTL |
1530 | { |
1531 | THROWS; // QueueUserWorkItem can throw |
1532 | GC_TRIGGERS; |
1533 | MODE_ANY; |
1534 | } |
1535 | CONTRACTL_END; |
1536 | |
1537 | IPerAppDomainTPCount* pAdCount; |
1538 | |
1539 | LONG index = PerAppDomainTPCountList::GetAppDomainIndexForThreadpoolDispatch(); |
1540 | |
1541 | if (index == 0) |
1542 | { |
1543 | *foundWork = false; |
1544 | *wasNotRecalled = true; |
1545 | return; |
1546 | } |
1547 | |
1548 | if (index == -1) |
1549 | { |
1550 | pAdCount = PerAppDomainTPCountList::GetUnmanagedTPCount(); |
1551 | } |
1552 | else |
1553 | { |
1554 | |
1555 | pAdCount = PerAppDomainTPCountList::GetPerAppdomainCount(TPIndex((DWORD)index)); |
1556 | _ASSERTE(pAdCount); |
1557 | } |
1558 | |
1559 | pAdCount->DispatchWorkItem(foundWork, wasNotRecalled); |
1560 | } |
1561 | |
1562 | //-------------------------------------------------------------------------- |
//This function informs the thread scheduler that the first request has been
1564 | //queued on an appdomain, or it's the first unmanaged TP request. |
1565 | //Arguments: |
1566 | // UnmanagedTP: Indicates that the request arises from the unmanaged |
1567 | //part of Thread Pool. |
1568 | //Assumptions: |
1569 | // This function must be called under a per-appdomain lock or the |
1570 | //correct lock under unmanaged TP queue. |
1571 | // |
1572 | BOOL ThreadpoolMgr::SetAppDomainRequestsActive(BOOL UnmanagedTP) |
1573 | { |
1574 | CONTRACTL |
1575 | { |
1576 | NOTHROW; |
1577 | MODE_ANY; |
1578 | GC_TRIGGERS; |
1579 | SO_INTOLERANT; |
1580 | } |
1581 | CONTRACTL_END; |
1582 | |
1583 | BOOL fShouldSignalEvent = FALSE; |
1584 | |
1585 | IPerAppDomainTPCount* pAdCount; |
1586 | |
1587 | if(UnmanagedTP) |
1588 | { |
1589 | pAdCount = PerAppDomainTPCountList::GetUnmanagedTPCount(); |
1590 | _ASSERTE(pAdCount); |
1591 | } |
1592 | else |
1593 | { |
1594 | Thread* pCurThread = GetThread(); |
1595 | _ASSERTE( pCurThread); |
1596 | |
1597 | AppDomain* pAppDomain = pCurThread->GetDomain(); |
1598 | _ASSERTE(pAppDomain); |
1599 | |
1600 | TPIndex tpindex = pAppDomain->GetTPIndex(); |
1601 | pAdCount = PerAppDomainTPCountList::GetPerAppdomainCount(tpindex); |
1602 | |
1603 | _ASSERTE(pAdCount); |
1604 | } |
1605 | |
1606 | pAdCount->SetAppDomainRequestsActive(); |
1607 | |
1608 | return fShouldSignalEvent; |
1609 | } |
1610 | |
1611 | void ThreadpoolMgr::ClearAppDomainRequestsActive(BOOL UnmanagedTP, BOOL AdUnloading, LONG id) |
1612 | //-------------------------------------------------------------------------- |
//This function informs the thread scheduler that the last request has been
1614 | //dequeued on an appdomain, or it's the last unmanaged TP request. |
1615 | //Arguments: |
1616 | // UnmanagedTP: Indicates that the request arises from the unmanaged |
1617 | //part of Thread Pool. |
1618 | // id: Indicates the id of the appdomain. The id is needed as this |
1619 | //function can be called (indirectly) from the appdomain unload thread from |
1620 | //unmanaged code to clear per-appdomain state during rude unload. |
1621 | //Assumptions: |
1622 | // This function must be called under a per-appdomain lock or the |
1623 | //correct lock under unmanaged TP queue. |
1624 | // |
1625 | { |
1626 | CONTRACTL |
1627 | { |
1628 | NOTHROW; |
1629 | MODE_ANY; |
1630 | GC_TRIGGERS; |
1631 | SO_INTOLERANT; |
1632 | } |
1633 | CONTRACTL_END; |
1634 | |
1635 | IPerAppDomainTPCount* pAdCount; |
1636 | |
1637 | if(UnmanagedTP) |
1638 | { |
1639 | pAdCount = PerAppDomainTPCountList::GetUnmanagedTPCount(); |
1640 | _ASSERTE(pAdCount); |
1641 | } |
1642 | else |
1643 | { |
1644 | if (AdUnloading) |
1645 | { |
1646 | pAdCount = PerAppDomainTPCountList::GetPerAppdomainCount(TPIndex(id)); |
1647 | } |
1648 | else |
1649 | { |
1650 | Thread* pCurThread = GetThread(); |
1651 | _ASSERTE( pCurThread); |
1652 | |
1653 | AppDomain* pAppDomain = pCurThread->GetDomain(); |
1654 | _ASSERTE(pAppDomain); |
1655 | |
1656 | TPIndex tpindex = pAppDomain->GetTPIndex(); |
1657 | |
1658 | pAdCount = PerAppDomainTPCountList::GetPerAppdomainCount(tpindex); |
1659 | } |
1660 | |
1661 | _ASSERTE(pAdCount); |
1662 | } |
1663 | |
1664 | pAdCount->ClearAppDomainRequestsActive(); |
1665 | } |
1666 | |
1667 | |
1668 | // Remove a block from the appropriate recycleList and return. |
1669 | // If recycleList is empty, fall back to new. |
1670 | LPVOID ThreadpoolMgr::GetRecycledMemory(enum MemType memType) |
1671 | { |
1672 | LPVOID result = NULL; |
1673 | CONTRACT(LPVOID) |
1674 | { |
1675 | THROWS; |
1676 | GC_NOTRIGGER; |
1677 | MODE_ANY; |
1678 | INJECT_FAULT(COMPlusThrowOM()); |
1679 | POSTCONDITION(CheckPointer(result)); |
1680 | } CONTRACT_END; |
1681 | |
1682 | if(RecycledLists.IsInitialized()) |
1683 | { |
1684 | RecycledListInfo& list = RecycledLists.GetRecycleMemoryInfo( memType ); |
1685 | |
1686 | result = list.Remove(); |
1687 | } |
1688 | |
1689 | if(result == NULL) |
1690 | { |
1691 | switch (memType) |
1692 | { |
1693 | case MEMTYPE_DelegateInfo: |
1694 | result = new DelegateInfo; |
1695 | break; |
1696 | case MEMTYPE_AsyncCallback: |
1697 | result = new AsyncCallback; |
1698 | break; |
1699 | case MEMTYPE_WorkRequest: |
1700 | result = new WorkRequest; |
1701 | break; |
1702 | default: |
1703 | _ASSERTE(!"Unknown Memtype" ); |
1704 | result = NULL; |
1705 | break; |
1706 | } |
1707 | } |
1708 | |
1709 | RETURN result; |
1710 | } |
1711 | |
1712 | // Insert freed block in recycle list. If list is full, return to system heap |
1713 | void ThreadpoolMgr::RecycleMemory(LPVOID mem, enum MemType memType) |
1714 | { |
1715 | CONTRACTL |
1716 | { |
1717 | NOTHROW; |
1718 | GC_NOTRIGGER; |
1719 | SO_TOLERANT; |
1720 | MODE_ANY; |
1721 | } |
1722 | CONTRACTL_END; |
1723 | |
1724 | if(RecycledLists.IsInitialized()) |
1725 | { |
1726 | RecycledListInfo& list = RecycledLists.GetRecycleMemoryInfo( memType ); |
1727 | |
1728 | if(list.CanInsert()) |
1729 | { |
1730 | list.Insert( mem ); |
1731 | return; |
1732 | } |
1733 | } |
1734 | |
1735 | switch (memType) |
1736 | { |
1737 | case MEMTYPE_DelegateInfo: |
1738 | delete (DelegateInfo*) mem; |
1739 | break; |
1740 | case MEMTYPE_AsyncCallback: |
1741 | delete (AsyncCallback*) mem; |
1742 | break; |
1743 | case MEMTYPE_WorkRequest: |
1744 | delete (WorkRequest*) mem; |
1745 | break; |
1746 | default: |
1747 | _ASSERTE(!"Unknown Memtype" ); |
1748 | |
1749 | } |
1750 | } |
1751 | |
1752 | #define THROTTLE_RATE 0.10 /* rate by which we increase the delay as number of threads increase */ |
1753 | |
1754 | // This is to avoid the 64KB/1MB aliasing problem present on Pentium 4 processors, |
1755 | // which can significantly impact performance with HyperThreading enabled |
1756 | DWORD WINAPI ThreadpoolMgr::intermediateThreadProc(PVOID arg) |
1757 | { |
1758 | WRAPPER_NO_CONTRACT; |
1759 | STATIC_CONTRACT_SO_INTOLERANT; |
1760 | |
1761 | offset_counter++; |
1762 | if (offset_counter * offset_multiplier > (int)GetOsPageSize()) |
1763 | offset_counter = 0; |
1764 | |
1765 | (void)_alloca(offset_counter * offset_multiplier); |
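// Note: the _alloca above shifts each new pool thread's initial stack pointer by a different
// multiple of offset_multiplier, so that the hot stack frames of concurrently running pool
// threads do not all land on the same 64KB/1MB aliasing boundaries mentioned above.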
1766 | |
1767 | intermediateThreadParam* param = (intermediateThreadParam*)arg; |
1768 | |
1769 | LPTHREAD_START_ROUTINE ThreadFcnPtr = param->lpThreadFunction; |
1770 | PVOID args = param->lpArg; |
1771 | delete param; |
1772 | |
1773 | return ThreadFcnPtr(args); |
1774 | } |
1775 | |
1776 | Thread* ThreadpoolMgr::CreateUnimpersonatedThread(LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpArgs, BOOL *pIsCLRThread) |
1777 | { |
1778 | STATIC_CONTRACT_NOTHROW; |
1779 | if (GetThread()) { STATIC_CONTRACT_GC_TRIGGERS;} else {DISABLED(STATIC_CONTRACT_GC_NOTRIGGER);} |
1780 | STATIC_CONTRACT_MODE_ANY; |
1781 | /* cannot use contract because of SEH |
1782 | CONTRACTL |
1783 | { |
1784 | NOTHROW; |
1785 | GC_NOTRIGGER; |
1786 | MODE_ANY; |
1787 | } |
1788 | CONTRACTL_END;*/ |
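// Note on the return value: when the EE has started (*pIsCLRThread == TRUE) this returns a real
// Thread*; otherwise it returns the raw OS thread HANDLE cast to Thread*, and callers such as
// CreateWorkerThread cast it back to a HANDLE to resume and close it.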
1789 | |
1790 | Thread* pThread = NULL; |
1791 | |
1792 | if (g_fEEStarted) { |
1793 | *pIsCLRThread = TRUE; |
1794 | } |
1795 | else |
1796 | *pIsCLRThread = FALSE; |
1797 | if (*pIsCLRThread) { |
1798 | EX_TRY |
1799 | { |
1800 | pThread = SetupUnstartedThread(); |
1801 | } |
1802 | EX_CATCH |
1803 | { |
1804 | pThread = NULL; |
1805 | } |
1806 | EX_END_CATCH(SwallowAllExceptions); |
1807 | if (pThread == NULL) { |
1808 | return NULL; |
1809 | } |
1810 | } |
1811 | DWORD threadId; |
1812 | BOOL bOK = FALSE; |
1813 | HANDLE threadHandle = NULL; |
1814 | |
1815 | if (*pIsCLRThread) { |
// CreateNewThread takes care of reverting any impersonation - so don't do anything here.
1817 | bOK = pThread->CreateNewThread(0, // default stack size |
1818 | lpStartAddress, |
1819 | lpArgs, //arguments |
1820 | W(".NET ThreadPool Worker" )); |
1821 | } |
1822 | else { |
1823 | #ifndef FEATURE_PAL |
1824 | HandleHolder token; |
1825 | BOOL bReverted = FALSE; |
1826 | bOK = RevertIfImpersonated(&bReverted, &token); |
1827 | if (bOK != TRUE) |
1828 | return NULL; |
1829 | #endif // !FEATURE_PAL |
1830 | NewHolder<intermediateThreadParam> lpThreadArgs(new (nothrow) intermediateThreadParam); |
1831 | if (lpThreadArgs != NULL) |
1832 | { |
1833 | lpThreadArgs->lpThreadFunction = lpStartAddress; |
1834 | lpThreadArgs->lpArg = lpArgs; |
1835 | threadHandle = CreateThread(NULL, // security descriptor |
1836 | 0, // default stack size |
1837 | intermediateThreadProc, |
1838 | lpThreadArgs, // arguments |
1839 | CREATE_SUSPENDED, |
1840 | &threadId); |
1841 | #ifndef FEATURE_PAL |
1842 | SetThreadName(threadHandle, W(".NET ThreadPool Worker" )); |
1843 | #endif // !FEATURE_PAL |
1844 | if (threadHandle != NULL) |
1845 | lpThreadArgs.SuppressRelease(); |
1846 | } |
1847 | #ifndef FEATURE_PAL |
1848 | UndoRevert(bReverted, token); |
1849 | #endif // !FEATURE_PAL |
1850 | } |
1851 | |
1852 | if (*pIsCLRThread && !bOK) |
1853 | { |
1854 | pThread->DecExternalCount(FALSE); |
1855 | pThread = NULL; |
1856 | } |
1857 | |
1858 | if (*pIsCLRThread) { |
1859 | return pThread; |
1860 | } |
1861 | else |
1862 | return (Thread*)threadHandle; |
1863 | } |
1864 | |
1865 | |
1866 | BOOL ThreadpoolMgr::CreateWorkerThread() |
1867 | { |
1868 | CONTRACTL |
1869 | { |
1870 | if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
1871 | NOTHROW; |
MODE_ANY;             // We may try to add a worker thread while queuing a work item through an FCall
1873 | } |
1874 | CONTRACTL_END; |
1875 | |
1876 | Thread *pThread; |
1877 | BOOL fIsCLRThread; |
1878 | if ((pThread = CreateUnimpersonatedThread(WorkerThreadStart, NULL, &fIsCLRThread)) != NULL) |
1879 | { |
1880 | if (fIsCLRThread) { |
1881 | pThread->ChooseThreadCPUGroupAffinity(); |
1882 | pThread->StartThread(); |
1883 | } |
1884 | else { |
1885 | DWORD status; |
1886 | status = ResumeThread((HANDLE)pThread); |
1887 | _ASSERTE(status != (DWORD) (-1)); |
1888 | CloseHandle((HANDLE)pThread); // we don't need this anymore |
1889 | } |
1890 | |
1891 | return TRUE; |
1892 | } |
1893 | |
1894 | return FALSE; |
1895 | } |
1896 | |
1897 | |
1898 | DWORD WINAPI ThreadpoolMgr::WorkerThreadStart(LPVOID lpArgs) |
1899 | { |
1900 | ClrFlsSetThreadType (ThreadType_Threadpool_Worker); |
1901 | |
1902 | CONTRACTL |
1903 | { |
1904 | THROWS; |
1905 | GC_TRIGGERS; |
1906 | MODE_PREEMPTIVE; |
1907 | SO_INTOLERANT; |
1908 | } |
1909 | CONTRACTL_END; |
1910 | |
1911 | Thread *pThread = NULL; |
1912 | DWORD dwSwitchCount = 0; |
1913 | BOOL fThreadInit = FALSE; |
1914 | |
1915 | ThreadCounter::Counts counts, oldCounts, newCounts; |
1916 | bool foundWork = true, wasNotRecalled = true; |
1917 | |
1918 | counts = WorkerCounter.GetCleanCounts(); |
1919 | FireEtwThreadPoolWorkerThreadStart(counts.NumActive, counts.NumRetired, GetClrInstanceId()); |
1920 | |
1921 | #ifdef FEATURE_COMINTEROP |
1922 | BOOL fCoInited = FALSE; |
1923 | // Threadpool threads should be initialized as MTA. If we are unable to do so, |
1924 | // return failure. |
1925 | { |
1926 | fCoInited = SUCCEEDED(::CoInitializeEx(NULL, COINIT_MULTITHREADED)); |
1927 | if (!fCoInited) |
1928 | { |
1929 | goto Exit; |
1930 | } |
1931 | } |
1932 | #endif // FEATURE_COMINTEROP |
1933 | Work: |
1934 | |
1935 | if (!fThreadInit) { |
1936 | if (g_fEEStarted) { |
1937 | pThread = SetupThreadNoThrow(); |
1938 | if (pThread == NULL) { |
1939 | __SwitchToThread(0, ++dwSwitchCount); |
1940 | goto Work; |
1941 | } |
1942 | |
// Converted to a CLR thread and added to the ThreadStore; pick a group affinity for this thread
1944 | pThread->ChooseThreadCPUGroupAffinity(); |
1945 | |
1946 | #ifdef FEATURE_COMINTEROP |
1947 | if (pThread->SetApartment(Thread::AS_InMTA, TRUE) != Thread::AS_InMTA) |
1948 | { |
1949 | // counts volatile read paired with CompareExchangeCounts loop set |
1950 | counts = WorkerCounter.DangerousGetDirtyCounts(); |
1951 | while (true) |
1952 | { |
1953 | newCounts = counts; |
1954 | newCounts.NumActive--; |
1955 | newCounts.NumWorking--; |
1956 | oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
1957 | if (oldCounts == counts) |
1958 | break; |
1959 | counts = oldCounts; |
1960 | } |
1961 | goto Exit; |
1962 | } |
1963 | #endif // FEATURE_COMINTEROP |
1964 | |
1965 | pThread->SetBackground(TRUE); |
1966 | fThreadInit = TRUE; |
1967 | } |
1968 | } |
1969 | |
1970 | GCX_PREEMP_NO_DTOR(); |
1971 | _ASSERTE(pThread == NULL || !pThread->PreemptiveGCDisabled()); |
1972 | |
1973 | // make sure there's really work. If not, go back to sleep |
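// The compare-exchange loop below atomically picks one of three outcomes: proceed to do work
// (work was found and we are within the MaxWorking target), retire this thread (NumActive exceeds
// MaxWorking), or drop back to waiting on the worker semaphore (no work was found).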
1974 | |
1975 | // counts volatile read paired with CompareExchangeCounts loop set |
1976 | counts = WorkerCounter.DangerousGetDirtyCounts(); |
1977 | while (true) |
1978 | { |
1979 | _ASSERTE(counts.NumActive > 0); |
1980 | _ASSERTE(counts.NumWorking > 0); |
1981 | |
1982 | newCounts = counts; |
1983 | |
1984 | bool retired; |
1985 | |
1986 | if (counts.NumActive > counts.MaxWorking) |
1987 | { |
1988 | newCounts.NumActive--; |
1989 | newCounts.NumRetired++; |
1990 | retired = true; |
1991 | } |
1992 | else |
1993 | { |
1994 | retired = false; |
1995 | |
1996 | if (foundWork) |
1997 | break; |
1998 | } |
1999 | |
2000 | newCounts.NumWorking--; |
2001 | |
2002 | oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
2003 | |
2004 | if (oldCounts == counts) |
2005 | { |
2006 | if (retired) |
2007 | goto Retire; |
2008 | else |
2009 | goto WaitForWork; |
2010 | } |
2011 | |
2012 | counts = oldCounts; |
2013 | } |
2014 | |
2015 | if (GCHeapUtilities::IsGCInProgress(TRUE)) |
2016 | { |
2017 | // GC is imminent, so wait until GC is complete before executing next request. |
2018 | // this reduces in-flight objects allocated right before GC, easing the GC's work |
2019 | GCHeapUtilities::WaitForGCCompletion(TRUE); |
2020 | } |
2021 | |
2022 | { |
2023 | ThreadLocaleHolder localeHolder; |
2024 | |
2025 | ThreadpoolMgr::UpdateLastDequeueTime(); |
2026 | ThreadpoolMgr::ExecuteWorkRequest(&foundWork, &wasNotRecalled); |
2027 | } |
2028 | |
2029 | if (foundWork) |
2030 | { |
2031 | // Reset TLS etc. for next WorkRequest. |
2032 | if (pThread == NULL) |
2033 | pThread = GetThread(); |
2034 | |
2035 | if (pThread) |
2036 | { |
2037 | if (pThread->IsAbortRequested()) |
2038 | pThread->EEResetAbort(Thread::TAR_ALL); |
2039 | pThread->InternalReset(); |
2040 | } |
2041 | } |
2042 | |
2043 | if (wasNotRecalled) |
2044 | goto Work; |
2045 | |
2046 | Retire: |
2047 | |
2048 | counts = WorkerCounter.GetCleanCounts(); |
2049 | FireEtwThreadPoolWorkerThreadRetirementStart(counts.NumActive, counts.NumRetired, GetClrInstanceId()); |
2050 | |
2051 | // It's possible that some work came in just before we decremented the active thread count, in which |
2052 | // case whoever queued that work may be expecting us to pick it up - so they would not have signalled |
2053 | // the worker semaphore. If there are other threads waiting, they will never be woken up, because |
2054 | // whoever queued the work expects that it's already been picked up. The solution is to signal the semaphore |
2055 | // if there's any work available. |
2056 | if (PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains()) |
2057 | MaybeAddWorkingWorker(); |
2058 | |
2059 | while (true) |
2060 | { |
2061 | RetryRetire: |
2062 | if (RetiredWorkerSemaphore->Wait(AppX::IsAppXProcess() ? WorkerTimeoutAppX : WorkerTimeout)) |
2063 | { |
2064 | foundWork = true; |
2065 | |
2066 | counts = WorkerCounter.GetCleanCounts(); |
2067 | FireEtwThreadPoolWorkerThreadRetirementStop(counts.NumActive, counts.NumRetired, GetClrInstanceId()); |
2068 | goto Work; |
2069 | } |
2070 | |
2071 | if (!IsIoPending()) |
2072 | { |
2073 | // |
2074 | // We're going to exit. There's a nasty race here. We're about to decrement NumRetired, |
2075 | // since we're going to exit. Once we've done that, nobody will expect this thread |
2076 | // to be waiting for RetiredWorkerSemaphore. But between now and then, other threads still |
2077 | // think we're waiting on the semaphore, and they will happily do the following to try to |
2078 | // wake us up: |
2079 | // |
2080 | // 1) Decrement NumRetired |
2081 | // 2) Increment NumActive |
2082 | // 3) Increment NumWorking |
2083 | // 4) Signal RetiredWorkerSemaphore |
2084 | // |
2085 | // We will not receive that signal. If we don't do something special here, |
2086 | // we will decrement NumRetired an extra time, and leave the world thinking there |
2087 | // are fewer retired threads, and more working threads than reality. |
2088 | // |
2089 | // What can we do about this? First, we *need* to decrement NumRetired. If someone did it before us, |
2090 | // it might go negative. This is the easiest way to tell that we've encountered this race. In that case, |
2091 | // we will simply not commit the decrement, swallow the signal that was sent, and proceed as if we |
2092 | // got WAIT_OBJECT_0 in the wait above. |
2093 | // |
2094 | // If we don't hit zero while decrementing NumRetired, we still may have encountered this race. But |
2095 | // if we don't hit zero, then there's another retired thread that will pick up this signal. So it's ok |
2096 | // to exit. |
2097 | // |
2098 | |
2099 | // counts volatile read paired with CompareExchangeCounts loop set |
2100 | counts = WorkerCounter.DangerousGetDirtyCounts(); |
2101 | while (true) |
2102 | { |
2103 | if (counts.NumRetired == 0) |
2104 | goto RetryRetire; |
2105 | |
2106 | newCounts = counts; |
2107 | newCounts.NumRetired--; |
2108 | |
2109 | oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
2110 | if (oldCounts == counts) |
2111 | { |
2112 | counts = newCounts; |
2113 | break; |
2114 | } |
2115 | counts = oldCounts; |
2116 | } |
2117 | |
2118 | FireEtwThreadPoolWorkerThreadRetirementStop(counts.NumActive, counts.NumRetired, GetClrInstanceId()); |
2119 | goto Exit; |
2120 | } |
2121 | } |
2122 | |
2123 | WaitForWork: |
2124 | |
2125 | // It's possible that we decided we had no work just before some work came in, |
2126 | // but reduced the worker count *after* the work came in. In this case, we might |
2127 | // miss the notification of available work. So we make a sweep through the ADs here, |
2128 | // and wake up a thread (maybe this one!) if there is work to do. |
2129 | if (PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains()) |
2130 | { |
2131 | foundWork = true; |
2132 | MaybeAddWorkingWorker(); |
2133 | } |
2134 | |
2135 | FireEtwThreadPoolWorkerThreadWait(counts.NumActive, counts.NumRetired, GetClrInstanceId()); |
2136 | |
2137 | RetryWaitForWork: |
2138 | if (WorkerSemaphore->Wait(AppX::IsAppXProcess() ? WorkerTimeoutAppX : WorkerTimeout, WorkerThreadSpinLimit, NumberOfProcessors)) |
2139 | { |
2140 | foundWork = true; |
2141 | goto Work; |
2142 | } |
2143 | |
2144 | if (!IsIoPending()) |
2145 | { |
2146 | // |
2147 | // We timed out, and are about to exit. This puts us in a very similar situation to the |
2148 | // retirement case above - someone may think we're still waiting, and go ahead and: |
2149 | // |
2150 | // 1) Increment NumWorking |
2151 | // 2) Signal WorkerSemaphore |
2152 | // |
2153 | // The solution is much like retirement; when we're decrementing NumActive, we need to make |
2154 | // sure it doesn't drop below NumWorking. If it would, then we need to go back and wait |
2155 | // again. |
2156 | // |
2157 | |
2158 | DangerousNonHostedSpinLockHolder tal(&ThreadAdjustmentLock); |
2159 | |
2160 | // counts volatile read paired with CompareExchangeCounts loop set |
2161 | counts = WorkerCounter.DangerousGetDirtyCounts(); |
2162 | while (true) |
2163 | { |
2164 | if (counts.NumActive == counts.NumWorking) |
2165 | { |
2166 | goto RetryWaitForWork; |
2167 | } |
2168 | |
2169 | newCounts = counts; |
2170 | newCounts.NumActive--; |
2171 | |
2172 | // if we timed out while active, then Hill Climbing needs to be told that we need fewer threads |
2173 | newCounts.MaxWorking = max(MinLimitTotalWorkerThreads, min(newCounts.NumActive, newCounts.MaxWorking)); |
2174 | |
2175 | oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
2176 | |
2177 | if (oldCounts == counts) |
2178 | { |
2179 | HillClimbingInstance.ForceChange(newCounts.MaxWorking, ThreadTimedOut); |
2180 | goto Exit; |
2181 | } |
2182 | |
2183 | counts = oldCounts; |
2184 | } |
2185 | } |
2186 | else |
2187 | { |
2188 | goto RetryWaitForWork; |
2189 | } |
2190 | |
2191 | Exit: |
2192 | |
2193 | #ifdef FEATURE_COMINTEROP |
2194 | if (pThread) { |
2195 | pThread->SetApartment(Thread::AS_Unknown, TRUE); |
2196 | pThread->CoUninitialize(); |
2197 | } |
2198 | |
// CoUninitialize the worker thread
2200 | if (fCoInited) |
2201 | { |
2202 | CoUninitialize(); |
2203 | } |
2204 | #endif |
2205 | |
2206 | if (pThread) { |
2207 | pThread->ClearThreadCPUGroupAffinity(); |
2208 | |
2209 | DestroyThread(pThread); |
2210 | } |
2211 | |
2212 | _ASSERTE(!IsIoPending()); |
2213 | |
2214 | counts = WorkerCounter.GetCleanCounts(); |
2215 | FireEtwThreadPoolWorkerThreadStop(counts.NumActive, counts.NumRetired, GetClrInstanceId()); |
2216 | |
2217 | return ERROR_SUCCESS; |
2218 | } |
2219 | |
2220 | |
2221 | BOOL ThreadpoolMgr::SuspendProcessing() |
2222 | { |
2223 | CONTRACTL |
2224 | { |
2225 | NOTHROW; |
2226 | GC_NOTRIGGER; |
2227 | MODE_PREEMPTIVE; |
2228 | } |
2229 | CONTRACTL_END; |
2230 | |
2231 | BOOL shouldRetire = TRUE; |
2232 | DWORD sleepInterval = SUSPEND_TIME; |
2233 | int oldCpuUtilization = cpuUtilization; |
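// Nap for sleepInterval and observe CPU utilization; if it drops by 4 percentage points or more
// while we sleep, the pool is presumably no longer saturated, so report that this thread should
// go back into circulation rather than retire.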
2234 | for (int i = 0; i < shouldRetire; i++) |
2235 | { |
2236 | __SwitchToThread(sleepInterval, CALLER_LIMITS_SPINNING); |
2237 | if ((cpuUtilization <= (oldCpuUtilization - 4))) |
2238 | { // if cpu util. dips by 4% or more, then put it back in circulation |
2239 | shouldRetire = FALSE; |
2240 | break; |
2241 | } |
2242 | } |
2243 | |
2244 | return shouldRetire; |
2245 | } |
2246 | |
2247 | |
// This should only be called by an unmanaged thread (i.e. there should be no managed
// caller on the stack) since we are swallowing terminal exceptions.
2250 | DWORD ThreadpoolMgr::SafeWait(CLREvent * ev, DWORD sleepTime, BOOL alertable) |
2251 | { |
2252 | STATIC_CONTRACT_NOTHROW; |
2253 | STATIC_CONTRACT_GC_NOTRIGGER; |
2254 | STATIC_CONTRACT_MODE_PREEMPTIVE; |
2255 | /* cannot use contract because of SEH |
2256 | CONTRACTL |
2257 | { |
2258 | NOTHROW; |
2259 | GC_NOTRIGGER; |
2260 | MODE_PREEMPTIVE; |
2261 | } |
2262 | CONTRACTL_END;*/ |
2263 | |
2264 | DWORD status = WAIT_TIMEOUT; |
2265 | EX_TRY |
2266 | { |
2267 | status = ev->Wait(sleepTime,FALSE); |
2268 | } |
2269 | EX_CATCH |
2270 | { |
2271 | } |
2272 | EX_END_CATCH(SwallowAllExceptions) |
2273 | return status; |
2274 | } |
2275 | |
2276 | /************************************************************************/ |
2277 | |
2278 | BOOL ThreadpoolMgr::RegisterWaitForSingleObject(PHANDLE phNewWaitObject, |
2279 | HANDLE hWaitObject, |
2280 | WAITORTIMERCALLBACK Callback, |
2281 | PVOID Context, |
2282 | ULONG timeout, |
2283 | DWORD dwFlag ) |
2284 | { |
2285 | CONTRACTL |
2286 | { |
2287 | THROWS; |
2288 | MODE_ANY; |
2289 | if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
2290 | } |
2291 | CONTRACTL_END; |
2292 | EnsureInitialized(); |
2293 | |
2294 | ThreadCB* threadCB; |
2295 | { |
2296 | CrstHolder csh(&WaitThreadsCriticalSection); |
2297 | |
2298 | threadCB = FindWaitThread(); |
2299 | } |
2300 | |
2301 | *phNewWaitObject = NULL; |
2302 | |
2303 | if (threadCB) |
2304 | { |
2305 | WaitInfo* waitInfo = new (nothrow) WaitInfo; |
2306 | |
2307 | if (waitInfo == NULL) |
2308 | return FALSE; |
2309 | |
2310 | waitInfo->waitHandle = hWaitObject; |
2311 | waitInfo->Callback = Callback; |
2312 | waitInfo->Context = Context; |
2313 | waitInfo->timeout = timeout; |
2314 | waitInfo->flag = dwFlag; |
2315 | waitInfo->threadCB = threadCB; |
2316 | waitInfo->state = 0; |
2317 | waitInfo->refCount = 1; // safe to do this since no wait has yet been queued, so no other thread could be modifying this |
2318 | waitInfo->ExternalCompletionEvent = INVALID_HANDLE; |
2319 | waitInfo->ExternalEventSafeHandle = NULL; |
2320 | waitInfo->handleOwningAD = (ADID) 0; |
2321 | |
2322 | waitInfo->timer.startTime = GetTickCount(); |
2323 | waitInfo->timer.remainingTime = timeout; |
2324 | |
2325 | *phNewWaitObject = waitInfo; |
2326 | |
2327 | // We fire the "enqueue" ETW event here, to "mark" the thread that had called the API, rather than the |
2328 | // thread that will PostQueuedCompletionStatus (the dedicated WaitThread). |
2329 | // This event correlates with ThreadPoolIODequeue in ThreadpoolMgr::AsyncCallbackCompletion |
2330 | if (ETW_EVENT_ENABLED(MICROSOFT_WINDOWS_DOTNETRUNTIME_PROVIDER_Context, ThreadPoolIOEnqueue)) |
2331 | FireEtwThreadPoolIOEnqueue((LPOVERLAPPED)waitInfo, reinterpret_cast<void*>(Callback), (dwFlag & WAIT_SINGLE_EXECUTION) == 0, GetClrInstanceId()); |
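// The wait is not linked into the wait thread's arrays here. Instead we queue an APC so that
// InsertNewWaitForSelf runs on the wait thread itself, which is the only thread allowed to
// mutate its wait state (see the comment in InsertNewWaitForSelf).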
2332 | |
2333 | BOOL status = QueueUserAPC((PAPCFUNC)InsertNewWaitForSelf, threadCB->threadHandle, (size_t) waitInfo); |
2334 | |
2335 | if (status == FALSE) |
2336 | { |
2337 | *phNewWaitObject = NULL; |
2338 | delete waitInfo; |
2339 | } |
2340 | |
2341 | return status; |
2342 | } |
2343 | |
2344 | return FALSE; |
2345 | } |
2346 | |
2347 | |
// Returns a wait thread that can accommodate another wait request. The
// caller is responsible for synchronizing access to the WaitThreadsHead.
2350 | ThreadpoolMgr::ThreadCB* ThreadpoolMgr::FindWaitThread() |
2351 | { |
2352 | CONTRACTL |
2353 | { |
2354 | THROWS; // CreateWaitThread can throw |
2355 | MODE_ANY; |
2356 | GC_TRIGGERS; |
2357 | } |
2358 | CONTRACTL_END; |
2359 | do |
2360 | { |
2361 | for (LIST_ENTRY* Node = (LIST_ENTRY*) WaitThreadsHead.Flink ; |
2362 | Node != &WaitThreadsHead ; |
2363 | Node = (LIST_ENTRY*)Node->Flink) |
2364 | { |
2365 | _ASSERTE(offsetof(WaitThreadInfo,link) == 0); |
2366 | |
2367 | ThreadCB* threadCB = ((WaitThreadInfo*) Node)->threadCB; |
2368 | |
2369 | if (threadCB->NumWaitHandles < MAX_WAITHANDLES) // this test and following ... |
2370 | |
2371 | { |
2372 | InterlockedIncrement(&threadCB->NumWaitHandles); // ... increment are protected by WaitThreadsCriticalSection. |
2373 | // but there might be a concurrent decrement in DeactivateWait |
2374 | // or InsertNewWaitForSelf, hence the interlock |
2375 | return threadCB; |
2376 | } |
2377 | } |
2378 | |
2379 | // if reached here, there are no wait threads available, so need to create a new one |
2380 | if (!CreateWaitThread()) |
2381 | return NULL; |
2382 | |
2383 | |
2384 | // Now loop back |
2385 | } while (TRUE); |
2386 | |
2387 | } |
2388 | |
2389 | BOOL ThreadpoolMgr::CreateWaitThread() |
2390 | { |
2391 | CONTRACTL |
2392 | { |
2393 | THROWS; // CLREvent::CreateAutoEvent can throw OOM |
2394 | GC_TRIGGERS; |
2395 | MODE_ANY; |
2396 | INJECT_FAULT(COMPlusThrowOM()); |
2397 | } |
2398 | CONTRACTL_END; |
2399 | DWORD threadId; |
2400 | |
2401 | if (g_fEEShutDown & ShutDown_Finalize2){ |
2402 | // The process is shutting down. Shutdown thread has ThreadStore lock, |
2403 | // wait thread is blocked on the lock. |
2404 | return FALSE; |
2405 | } |
2406 | |
2407 | NewHolder<WaitThreadInfo> waitThreadInfo(new (nothrow) WaitThreadInfo); |
2408 | if (waitThreadInfo == NULL) |
2409 | return FALSE; |
2410 | |
2411 | NewHolder<ThreadCB> threadCB(new (nothrow) ThreadCB); |
2412 | |
2413 | if (threadCB == NULL) |
2414 | { |
2415 | return FALSE; |
2416 | } |
2417 | |
2418 | threadCB->startEvent.CreateAutoEvent(FALSE); |
2419 | HANDLE threadHandle = Thread::CreateUtilityThread(Thread::StackSize_Small, WaitThreadStart, (LPVOID)threadCB, W(".NET ThreadPool Wait" ), CREATE_SUSPENDED, &threadId); |
2420 | |
2421 | if (threadHandle == NULL) |
2422 | { |
2423 | threadCB->startEvent.CloseEvent(); |
2424 | return FALSE; |
2425 | } |
2426 | |
2427 | waitThreadInfo.SuppressRelease(); |
2428 | threadCB.SuppressRelease(); |
2429 | threadCB->threadHandle = threadHandle; |
threadCB->threadId = threadId;              // may be useful for debugging; otherwise not used
2431 | threadCB->NumWaitHandles = 0; |
2432 | threadCB->NumActiveWaits = 0; |
2433 | for (int i=0; i< MAX_WAITHANDLES; i++) |
2434 | { |
2435 | InitializeListHead(&(threadCB->waitPointer[i])); |
2436 | } |
2437 | |
2438 | waitThreadInfo->threadCB = threadCB; |
2439 | |
2440 | DWORD status = ResumeThread(threadHandle); |
2441 | |
2442 | { |
2443 | // We will QueueUserAPC on the newly created thread. |
2444 | // Let us wait until the thread starts running. |
2445 | GCX_PREEMP(); |
2446 | DWORD timeout=500; |
2447 | while (TRUE) { |
2448 | if (g_fEEShutDown & ShutDown_Finalize2){ |
2449 | // The process is shutting down. Shutdown thread has ThreadStore lock, |
2450 | // wait thread is blocked on the lock. |
2451 | return FALSE; |
2452 | } |
2453 | DWORD wait_status = threadCB->startEvent.Wait(timeout, FALSE); |
2454 | if (wait_status == WAIT_OBJECT_0) { |
2455 | break; |
2456 | } |
2457 | } |
2458 | } |
2459 | threadCB->startEvent.CloseEvent(); |
2460 | |
2461 | // check to see if setup succeeded |
2462 | if (threadCB->threadHandle == NULL) |
2463 | return FALSE; |
2464 | |
2465 | InsertHeadList(&WaitThreadsHead,&waitThreadInfo->link); |
2466 | |
2467 | _ASSERTE(status != (DWORD) (-1)); |
2468 | |
2469 | return (status != (DWORD) (-1)); |
2470 | |
2471 | } |
2472 | |
2473 | // Executed as an APC on a WaitThread. Add the wait specified in pArg to the list of objects it is waiting on |
2474 | void ThreadpoolMgr::InsertNewWaitForSelf(WaitInfo* pArgs) |
2475 | { |
2476 | WRAPPER_NO_CONTRACT; |
2477 | STATIC_CONTRACT_SO_INTOLERANT; |
2478 | |
2479 | WaitInfo* waitInfo = pArgs; |
2480 | |
2481 | // the following is safe since only this thread is allowed to change the state |
2482 | if (!(waitInfo->state & WAIT_DELETE)) |
2483 | { |
2484 | waitInfo->state = (WAIT_REGISTERED | WAIT_ACTIVE); |
2485 | } |
2486 | else |
2487 | { |
2488 | // some thread unregistered the wait |
2489 | DeleteWait(waitInfo); |
2490 | return; |
2491 | } |
2492 | |
2493 | |
2494 | ThreadCB* threadCB = waitInfo->threadCB; |
2495 | |
2496 | _ASSERTE(threadCB->NumActiveWaits <= threadCB->NumWaitHandles); |
2497 | |
2498 | int index = FindWaitIndex(threadCB, waitInfo->waitHandle); |
2499 | _ASSERTE(index >= 0 && index <= threadCB->NumActiveWaits); |
2500 | |
2501 | if (index == threadCB->NumActiveWaits) |
2502 | { |
2503 | threadCB->waitHandle[threadCB->NumActiveWaits] = waitInfo->waitHandle; |
2504 | threadCB->NumActiveWaits++; |
2505 | } |
2506 | else |
2507 | { |
2508 | // this is a duplicate waithandle, so the increment in FindWaitThread |
2509 | // wasn't strictly necessary. This will avoid unnecessary thread creation. |
2510 | InterlockedDecrement(&threadCB->NumWaitHandles); |
2511 | } |
2512 | |
2513 | _ASSERTE(offsetof(WaitInfo, link) == 0); |
2514 | InsertTailList(&(threadCB->waitPointer[index]), (&waitInfo->link)); |
2515 | |
2516 | return; |
2517 | } |
2518 | |
2519 | // returns the index of the entry that matches waitHandle or next free entry if not found |
2520 | int ThreadpoolMgr::FindWaitIndex(const ThreadCB* threadCB, const HANDLE waitHandle) |
2521 | { |
2522 | LIMITED_METHOD_CONTRACT; |
2523 | |
2524 | for (int i=0;i<threadCB->NumActiveWaits; i++) |
2525 | if (threadCB->waitHandle[i] == waitHandle) |
2526 | return i; |
2527 | |
2528 | // else not found |
2529 | return threadCB->NumActiveWaits; |
2530 | } |
2531 | |
2532 | |
// If no wraparound occurred (last <= now), the timer has expired if dueTime falls between the last observed time and now.
// If wraparound occurred, the timer has expired if dueTime is greater than or equal to the last time, or less than or equal to now.
2535 | #define TimeExpired(last,now,duetime) (last <= now ? \ |
2536 | (duetime <= now && duetime >= last): \ |
2537 | (duetime >= last || duetime <= now)) |
2538 | |
2539 | #define TimeInterval(end,start) ( end > start ? (end - start) : ((0xffffffff - start) + end + 1) ) |
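// Worked example of the wraparound handling (illustrative values only): with 32-bit tick counts,
// take last = 0xFFFFFF00 and now = 0x00000100 (the counter wrapped, so last > now). Then
// TimeExpired(last, now, 0x00000080) is true (dueTime <= now: the due time fell after the wrap), while
// TimeExpired(last, now, 0x00001000) is false (dueTime is neither >= last nor <= now: still in the future).
// Similarly, TimeInterval(0x00000010, 0xFFFFFFF0) = (0xffffffff - 0xFFFFFFF0) + 0x10 + 1 = 32 ticks.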
2540 | |
2541 | // Returns the minimum of the remaining time to reach a timeout among all the waits |
2542 | DWORD ThreadpoolMgr::MinimumRemainingWait(LIST_ENTRY* waitInfo, unsigned int numWaits) |
2543 | { |
2544 | LIMITED_METHOD_CONTRACT; |
2545 | |
2546 | unsigned int min = (unsigned int) -1; |
2547 | DWORD currentTime = GetTickCount(); |
2548 | |
2549 | for (unsigned i=0; i < numWaits ; i++) |
2550 | { |
2551 | WaitInfo* waitInfoPtr = (WaitInfo*) (waitInfo[i].Flink); |
2552 | PVOID waitInfoHead = &(waitInfo[i]); |
2553 | do |
2554 | { |
2555 | if (waitInfoPtr->timeout != INFINITE) |
2556 | { |
2557 | // compute remaining time |
2558 | DWORD elapsedTime = TimeInterval(currentTime,waitInfoPtr->timer.startTime ); |
2559 | |
2560 | __int64 remainingTime = (__int64) (waitInfoPtr->timeout) - (__int64) elapsedTime; |
2561 | |
2562 | // update remaining time |
2563 | waitInfoPtr->timer.remainingTime = remainingTime > 0 ? (int) remainingTime : 0; |
2564 | |
2565 | // ... and min |
2566 | if (waitInfoPtr->timer.remainingTime < min) |
2567 | min = waitInfoPtr->timer.remainingTime; |
2568 | } |
2569 | |
2570 | waitInfoPtr = (WaitInfo*) (waitInfoPtr->link.Flink); |
2571 | |
2572 | } while ((PVOID) waitInfoPtr != waitInfoHead); |
2573 | |
2574 | } |
2575 | return min; |
2576 | } |
2577 | |
2578 | #ifdef _MSC_VER |
2579 | #ifdef _WIN64 |
2580 | #pragma warning (disable : 4716) |
2581 | #else |
2582 | #pragma warning (disable : 4715) |
2583 | #endif |
2584 | #endif |
2585 | #ifdef _PREFAST_ |
2586 | #pragma warning(push) |
2587 | #pragma warning(disable:22008) // "Prefast integer overflow check on (0 + lval) is bogus. Tried local disable without luck, doing whole method." |
2588 | #endif |
2589 | |
2590 | DWORD WINAPI ThreadpoolMgr::WaitThreadStart(LPVOID lpArgs) |
2591 | { |
2592 | CONTRACTL |
2593 | { |
2594 | THROWS; |
2595 | GC_TRIGGERS; |
2596 | MODE_PREEMPTIVE; |
2597 | SO_TOLERANT; |
2598 | } |
2599 | CONTRACTL_END; |
2600 | |
2601 | ClrFlsSetThreadType (ThreadType_Wait); |
2602 | |
2603 | ThreadCB* threadCB = (ThreadCB*) lpArgs; |
2604 | Thread* pThread = SetupThreadNoThrow(); |
2605 | |
2606 | if (pThread == NULL) |
2607 | { |
2608 | _ASSERTE(threadCB->threadHandle != NULL); |
2609 | threadCB->threadHandle = NULL; |
2610 | } |
2611 | |
2612 | threadCB->startEvent.Set(); |
2613 | |
2614 | if (pThread == NULL) |
2615 | { |
2616 | return 0; |
2617 | } |
2618 | |
2619 | BEGIN_SO_INTOLERANT_CODE(pThread); // we probe at the top of the thread so we can safely call anything below here. |
2620 | { |
2621 | // wait threads never die. (Why?) |
2622 | for (;;) |
2623 | { |
2624 | DWORD status; |
2625 | DWORD timeout = 0; |
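// Each iteration takes one of three paths: with no active waits, park in an alertable infinite
// SleepEx so wait-insertion APCs can run; with a deregistration APC pending, do a zero-length
// alertable sleep so the APC gets a chance to fire; otherwise do an alertable wait on all active
// handles using the minimum remaining timeout computed by MinimumRemainingWait.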
2626 | |
2627 | if (threadCB->NumActiveWaits == 0) |
2628 | { |
2629 | |
2630 | #undef SleepEx |
2631 | // <TODO>@TODO Consider doing a sleep for an idle period and terminating the thread if no activity</TODO> |
//We use SleepEx instead of CLRSleepEx because CLRSleepEx calls into SQL (or other hosts) in hosted
//scenarios. SQL does not deliver APCs, and the wait thread's wait insertion/deletion logic depends on
//APCs being delivered.
2635 | status = SleepEx(INFINITE,TRUE); |
2636 | #define SleepEx(a,b) Dont_Use_SleepEx(a,b) |
2637 | |
2638 | _ASSERTE(status == WAIT_IO_COMPLETION); |
2639 | } |
2640 | else if (IsWaitThreadAPCPending()) |
2641 | { |
//Do a sleep if an APC is pending. This was done to solve the corner case where the wait is signaled
//but the APC to deregister the wait never fires. That scenario leads to an infinite loop. This check
//allows the thread to enter an alertable wait and thus lets the APC fire.
2645 | |
2646 | ResetWaitThreadAPCPending(); |
2647 | |
//We use SleepEx instead of CLRSleepEx because CLRSleepEx calls into SQL (or other hosts) in hosted
//scenarios. SQL does not deliver APCs, and the wait thread's wait insertion/deletion logic depends on
//APCs being delivered.
2651 | |
2652 | #undef SleepEx |
2653 | status = SleepEx(0,TRUE); |
2654 | #define SleepEx(a,b) Dont_Use_SleepEx(a,b) |
2655 | |
2656 | continue; |
2657 | } |
2658 | else |
2659 | { |
2660 | // compute minimum timeout. this call also updates the remainingTime field for each wait |
2661 | timeout = MinimumRemainingWait(threadCB->waitPointer,threadCB->NumActiveWaits); |
2662 | |
2663 | status = WaitForMultipleObjectsEx( threadCB->NumActiveWaits, |
2664 | threadCB->waitHandle, |
2665 | FALSE, // waitall |
2666 | timeout, |
2667 | TRUE ); // alertable |
2668 | |
2669 | _ASSERTE( (status == WAIT_TIMEOUT) || |
2670 | (status == WAIT_IO_COMPLETION) || |
2671 | //It could be that there are no waiters at this point, |
2672 | //as the APC to deregister the wait may have run. |
2673 | (status == WAIT_OBJECT_0) || |
2674 | (status >= WAIT_OBJECT_0 && status < (DWORD)(WAIT_OBJECT_0 + threadCB->NumActiveWaits)) || |
2675 | (status == WAIT_FAILED)); |
2676 | |
2677 | //It could be that the last waiter also got deregistered. |
2678 | if (threadCB->NumActiveWaits == 0) |
2679 | { |
2680 | continue; |
2681 | } |
2682 | } |
2683 | |
2684 | if (status == WAIT_IO_COMPLETION) |
2685 | continue; |
2686 | |
2687 | if (status == WAIT_TIMEOUT) |
2688 | { |
2689 | for (int i=0; i< threadCB->NumActiveWaits; i++) |
2690 | { |
2691 | WaitInfo* waitInfo = (WaitInfo*) (threadCB->waitPointer[i]).Flink; |
2692 | PVOID waitInfoHead = &(threadCB->waitPointer[i]); |
2693 | |
2694 | do |
2695 | { |
2696 | _ASSERTE(waitInfo->timer.remainingTime >= timeout); |
2697 | |
2698 | WaitInfo* wTemp = (WaitInfo*) waitInfo->link.Flink; |
2699 | |
2700 | if (waitInfo->timer.remainingTime == timeout) |
2701 | { |
2702 | ProcessWaitCompletion(waitInfo,i,TRUE); |
2703 | } |
2704 | |
2705 | waitInfo = wTemp; |
2706 | |
2707 | } while ((PVOID) waitInfo != waitInfoHead); |
2708 | } |
2709 | } |
2710 | else if (status >= WAIT_OBJECT_0 && status < (DWORD)(WAIT_OBJECT_0 + threadCB->NumActiveWaits)) |
2711 | { |
2712 | unsigned index = status - WAIT_OBJECT_0; |
2713 | WaitInfo* waitInfo = (WaitInfo*) (threadCB->waitPointer[index]).Flink; |
2714 | PVOID waitInfoHead = &(threadCB->waitPointer[index]); |
2715 | BOOL isAutoReset; |
2716 | |
// Setting this to an unconditional TRUE is inefficient since we will re-enter the wait and release
// the next waiter, but short of using undocumented NT APIs it is the only solution.
// Querying the state with a WaitForSingleObject is not an option as it would reset an
// auto-reset event if it has been signalled since the previous wait.
2721 | isAutoReset = TRUE; |
2722 | |
2723 | do |
2724 | { |
2725 | WaitInfo* wTemp = (WaitInfo*) waitInfo->link.Flink; |
2726 | ProcessWaitCompletion(waitInfo,index,FALSE); |
2727 | |
2728 | waitInfo = wTemp; |
2729 | |
2730 | } while (((PVOID) waitInfo != waitInfoHead) && !isAutoReset); |
2731 | |
2732 | // If an app registers a recurring wait for an event that is always signalled (!), |
// then no APCs will be executed since the thread never enters the alertable state.
2734 | // This can be fixed by doing the following: |
2735 | // SleepEx(0,TRUE); |
// However, it causes an unnecessary context switch. It is not worth penalizing
// well-behaved apps to protect poorly written apps.
2738 | |
2739 | |
2740 | } |
2741 | else |
2742 | { |
2743 | _ASSERTE(status == WAIT_FAILED); |
2744 | // wait failed: application error |
2745 | // find out which wait handle caused the wait to fail |
2746 | for (int i = 0; i < threadCB->NumActiveWaits; i++) |
2747 | { |
2748 | DWORD subRet = WaitForSingleObject(threadCB->waitHandle[i], 0); |
2749 | |
2750 | if (subRet != WAIT_FAILED) |
2751 | continue; |
2752 | |
2753 | // remove all waits associated with this wait handle |
2754 | |
2755 | WaitInfo* waitInfo = (WaitInfo*) (threadCB->waitPointer[i]).Flink; |
2756 | PVOID waitInfoHead = &(threadCB->waitPointer[i]); |
2757 | |
2758 | do |
2759 | { |
2760 | WaitInfo* temp = (WaitInfo*) waitInfo->link.Flink; |
2761 | |
2762 | DeactivateNthWait(waitInfo,i); |
2763 | |
2764 | |
// Note, we cannot clean up here since there is no way to suppress finalization;
// we will just leak, and rely on the finalizer to clean up the memory.
2767 | //if (InterlockedDecrement(&waitInfo->refCount) == 0) |
2768 | // DeleteWait(waitInfo); |
2769 | |
2770 | |
2771 | waitInfo = temp; |
2772 | |
2773 | } while ((PVOID) waitInfo != waitInfoHead); |
2774 | |
2775 | break; |
2776 | } |
2777 | } |
2778 | } |
2779 | } |
2780 | END_SO_INTOLERANT_CODE; |
2781 | |
2782 | //This is unreachable...so no return required. |
2783 | } |
2784 | #ifdef _PREFAST_ |
2785 | #pragma warning(pop) |
2786 | #endif |
2787 | |
2788 | #ifdef _MSC_VER |
2789 | #ifdef _WIN64 |
2790 | #pragma warning (default : 4716) |
2791 | #else |
2792 | #pragma warning (default : 4715) |
2793 | #endif |
2794 | #endif |
2795 | |
2796 | void ThreadpoolMgr::ProcessWaitCompletion(WaitInfo* waitInfo, |
2797 | unsigned index, |
2798 | BOOL waitTimedOut |
2799 | ) |
2800 | { |
2801 | STATIC_CONTRACT_THROWS; |
2802 | STATIC_CONTRACT_GC_TRIGGERS; |
2803 | STATIC_CONTRACT_MODE_PREEMPTIVE; |
2804 | /* cannot use contract because of SEH |
2805 | CONTRACTL |
2806 | { |
2807 | THROWS; |
2808 | GC_TRIGGERS; |
2809 | MODE_PREEMPTIVE; |
2810 | } |
2811 | CONTRACTL_END;*/ |
2812 | |
2813 | AsyncCallback* asyncCallback = NULL; |
2814 | EX_TRY{ |
2815 | if ( waitInfo->flag & WAIT_SINGLE_EXECUTION) |
2816 | { |
2817 | DeactivateNthWait (waitInfo,index) ; |
2818 | } |
2819 | else |
2820 | { // reactivate wait by resetting timer |
2821 | waitInfo->timer.startTime = GetTickCount(); |
2822 | } |
2823 | |
2824 | asyncCallback = MakeAsyncCallback(); |
2825 | if (asyncCallback) |
2826 | { |
2827 | asyncCallback->wait = waitInfo; |
2828 | asyncCallback->waitTimedOut = waitTimedOut; |
2829 | |
2830 | InterlockedIncrement(&waitInfo->refCount); |
2831 | |
2832 | #ifndef FEATURE_PAL |
2833 | if (FALSE == PostQueuedCompletionStatus((LPOVERLAPPED)asyncCallback, (LPOVERLAPPED_COMPLETION_ROUTINE)WaitIOCompletionCallback)) |
2834 | #else // FEATURE_PAL |
2835 | if (FALSE == QueueUserWorkItem(AsyncCallbackCompletion, asyncCallback, QUEUE_ONLY)) |
2836 | #endif // !FEATURE_PAL |
2837 | ReleaseAsyncCallback(asyncCallback); |
2838 | } |
2839 | } |
2840 | EX_CATCH { |
2841 | if (asyncCallback) |
2842 | ReleaseAsyncCallback(asyncCallback); |
2843 | |
2844 | if (SwallowUnhandledExceptions()) |
2845 | { |
2846 | // Do nothing to swallow the exception |
2847 | } |
2848 | else |
2849 | { |
2850 | EX_RETHROW; |
2851 | } |
2852 | } |
2853 | EX_END_CATCH(SwallowAllExceptions); |
2854 | } |
2855 | |
2856 | |
2857 | DWORD WINAPI ThreadpoolMgr::AsyncCallbackCompletion(PVOID pArgs) |
2858 | { |
2859 | CONTRACTL |
2860 | { |
2861 | THROWS; |
2862 | MODE_PREEMPTIVE; |
2863 | GC_TRIGGERS; |
2864 | SO_TOLERANT; |
2865 | } |
2866 | CONTRACTL_END; |
2867 | |
2868 | Thread * pThread = GetThread(); |
2869 | |
2870 | if (pThread == NULL) |
2871 | { |
2872 | HRESULT hr = ERROR_SUCCESS; |
2873 | |
2874 | ClrFlsSetThreadType(ThreadType_Threadpool_Worker); |
2875 | pThread = SetupThreadNoThrow(&hr); |
2876 | |
2877 | if (pThread == NULL) |
2878 | { |
2879 | return hr; |
2880 | } |
2881 | } |
2882 | |
2883 | BEGIN_SO_INTOLERANT_CODE_NOTHROW(pThread, return ERROR_STACK_OVERFLOW); |
2884 | { |
2885 | AsyncCallback * asyncCallback = (AsyncCallback*) pArgs; |
2886 | |
2887 | WaitInfo * waitInfo = asyncCallback->wait; |
2888 | |
2889 | AsyncCallbackHolder asyncCBHolder; |
2890 | asyncCBHolder.Assign(asyncCallback); |
2891 | |
2892 | // We fire the "dequeue" ETW event here, before executing the user code, to enable correlation with |
2893 | // the ThreadPoolIOEnqueue fired in ThreadpoolMgr::RegisterWaitForSingleObject |
2894 | if (ETW_EVENT_ENABLED(MICROSOFT_WINDOWS_DOTNETRUNTIME_PROVIDER_Context, ThreadPoolIODequeue)) |
2895 | FireEtwThreadPoolIODequeue(waitInfo, reinterpret_cast<void*>(waitInfo->Callback), GetClrInstanceId()); |
2896 | |
// The user callback can throw; the host must be prepared to handle it.
// SQL is OK, since it has a top-level SEH handler. However, there's
// no easy way to verify that.
2900 | |
2901 | ((WAITORTIMERCALLBACKFUNC) waitInfo->Callback) |
2902 | ( waitInfo->Context, asyncCallback->waitTimedOut != FALSE); |
2903 | } |
2904 | END_SO_INTOLERANT_CODE; |
2905 | |
2906 | return ERROR_SUCCESS; |
2907 | } |
2908 | |
2909 | void ThreadpoolMgr::DeactivateWait(WaitInfo* waitInfo) |
2910 | { |
2911 | LIMITED_METHOD_CONTRACT; |
2912 | |
2913 | ThreadCB* threadCB = waitInfo->threadCB; |
2914 | DWORD endIndex = threadCB->NumActiveWaits-1; |
2915 | DWORD index; |
2916 | |
2917 | for (index = 0; index <= endIndex; index++) |
2918 | { |
2919 | LIST_ENTRY* head = &(threadCB->waitPointer[index]); |
2920 | LIST_ENTRY* current = head; |
2921 | do { |
2922 | if (current->Flink == (PVOID) waitInfo) |
2923 | goto FOUND; |
2924 | |
2925 | current = (LIST_ENTRY*) current->Flink; |
2926 | |
2927 | } while (current != head); |
2928 | } |
2929 | |
2930 | FOUND: |
2931 | _ASSERTE(index <= endIndex); |
2932 | |
2933 | DeactivateNthWait(waitInfo, index); |
2934 | } |
2935 | |
2936 | |
2937 | void ThreadpoolMgr::DeactivateNthWait(WaitInfo* waitInfo, DWORD index) |
2938 | { |
2939 | LIMITED_METHOD_CONTRACT; |
2940 | |
2941 | ThreadCB* threadCB = waitInfo->threadCB; |
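// If this wait is not the only entry in its handle's list (Flink != Blink), just unlink it and
// leave the handle slot in place. Otherwise the slot itself is retired: the tail of the wait
// arrays is shifted left and the circular lists are re-linked to their new head addresses.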
2942 | |
2943 | if (waitInfo->link.Flink != waitInfo->link.Blink) |
2944 | { |
2945 | RemoveEntryList(&(waitInfo->link)); |
2946 | } |
2947 | else |
2948 | { |
2949 | |
2950 | ULONG EndIndex = threadCB->NumActiveWaits -1; |
2951 | |
2952 | // Move the remaining ActiveWaitArray left. |
2953 | |
2954 | ShiftWaitArray( threadCB, index+1, index,EndIndex - index ) ; |
2955 | |
2956 | // repair the blink and flink of the first and last elements in the list |
2957 | for (unsigned int i = 0; i< EndIndex-index; i++) |
2958 | { |
2959 | WaitInfo* firstWaitInfo = (WaitInfo*) threadCB->waitPointer[index+i].Flink; |
2960 | WaitInfo* lastWaitInfo = (WaitInfo*) threadCB->waitPointer[index+i].Blink; |
2961 | firstWaitInfo->link.Blink = &(threadCB->waitPointer[index+i]); |
2962 | lastWaitInfo->link.Flink = &(threadCB->waitPointer[index+i]); |
2963 | } |
2964 | // initialize the entry just freed |
2965 | InitializeListHead(&(threadCB->waitPointer[EndIndex])); |
2966 | |
2967 | threadCB->NumActiveWaits-- ; |
2968 | InterlockedDecrement(&threadCB->NumWaitHandles); |
2969 | } |
2970 | |
2971 | waitInfo->state &= ~WAIT_ACTIVE ; |
2972 | |
2973 | } |
2974 | |
2975 | void ThreadpoolMgr::DeleteWait(WaitInfo* waitInfo) |
2976 | { |
2977 | CONTRACTL |
2978 | { |
2979 | if (waitInfo->ExternalEventSafeHandle != NULL) { THROWS;} else { NOTHROW; } |
2980 | MODE_ANY; |
2981 | if (GetThread()) {GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
2982 | } |
2983 | CONTRACTL_END; |
2984 | |
2985 | if(waitInfo->Context && (waitInfo->flag & WAIT_FREE_CONTEXT)) { |
2986 | DelegateInfo* pDelegate = (DelegateInfo*) waitInfo->Context; |
2987 | |
2988 | // Since the delegate release destroys a handle, we need to be in |
2989 | // co-operative mode |
2990 | { |
2991 | GCX_COOP(); |
2992 | pDelegate->Release(); |
2993 | } |
2994 | |
2995 | RecycleMemory( pDelegate, MEMTYPE_DelegateInfo ); |
2996 | } |
2997 | |
2998 | if (waitInfo->flag & WAIT_INTERNAL_COMPLETION) |
2999 | { |
3000 | waitInfo->InternalCompletionEvent.Set(); |
3001 | return; // waitInfo will be deleted by the thread that's waiting on this event |
3002 | } |
3003 | else if (waitInfo->ExternalCompletionEvent != INVALID_HANDLE) |
3004 | { |
3005 | UnsafeSetEvent(waitInfo->ExternalCompletionEvent); |
3006 | } |
3007 | else if (waitInfo->ExternalEventSafeHandle != NULL) |
3008 | { |
3009 | // Release the safe handle and the GC handle holding it |
3010 | ReleaseWaitInfo(waitInfo); |
3011 | } |
3012 | |
3013 | delete waitInfo; |
3014 | |
3015 | |
3016 | } |
3017 | |
3018 | |
3019 | |
3020 | /************************************************************************/ |
3021 | BOOL ThreadpoolMgr::UnregisterWaitEx(HANDLE hWaitObject,HANDLE Event) |
3022 | { |
3023 | CONTRACTL |
3024 | { |
3025 | THROWS; //NOTHROW; |
3026 | if (GetThread()) {GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
3027 | MODE_ANY; |
3028 | } |
3029 | CONTRACTL_END; |
3030 | |
3031 | _ASSERTE(IsInitialized()); // cannot call unregister before first registering |
3032 | |
3033 | const BOOL Blocking = (Event == (HANDLE) -1); |
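// Passing Event == (HANDLE)-1 is the sentinel for a blocking unregister: we create an internal
// completion event below and wait on it before returning. Any other value (including NULL) makes
// the unregister asynchronous, with the supplied event signalled once the wait is fully deleted.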
3034 | WaitInfo* waitInfo = (WaitInfo*) hWaitObject; |
3035 | |
3036 | if (!hWaitObject) |
3037 | { |
3038 | return FALSE; |
3039 | } |
3040 | |
3041 | // we do not allow callbacks to run in the wait thread, hence the assert |
3042 | _ASSERTE(GetCurrentThreadId() != waitInfo->threadCB->threadId); |
3043 | |
3044 | |
3045 | if (Blocking) |
3046 | { |
3047 | waitInfo->InternalCompletionEvent.CreateAutoEvent(FALSE); |
3048 | waitInfo->flag |= WAIT_INTERNAL_COMPLETION; |
3049 | |
3050 | } |
3051 | else |
3052 | { |
3053 | waitInfo->ExternalCompletionEvent = (Event ? Event : INVALID_HANDLE); |
3054 | _ASSERTE((waitInfo->flag & WAIT_INTERNAL_COMPLETION) == 0); |
3055 | // we still want to block until the wait has been deactivated |
3056 | waitInfo->PartialCompletionEvent.CreateAutoEvent(FALSE); |
3057 | } |
3058 | |
3059 | BOOL status = QueueDeregisterWait(waitInfo->threadCB->threadHandle, waitInfo); |
3060 | |
3061 | |
3062 | if (status == 0) |
3063 | { |
3064 | STRESS_LOG1(LF_THREADPOOL, LL_ERROR, "Queue APC failed in UnregisterWaitEx %x" , status); |
3065 | |
3066 | if (Blocking) |
3067 | waitInfo->InternalCompletionEvent.CloseEvent(); |
3068 | else |
3069 | waitInfo->PartialCompletionEvent.CloseEvent(); |
3070 | return FALSE; |
3071 | } |
3072 | |
3073 | if (!Blocking) |
3074 | { |
3075 | waitInfo->PartialCompletionEvent.Wait(INFINITE,TRUE); |
3076 | waitInfo->PartialCompletionEvent.CloseEvent(); |
3077 | // we cannot do DeleteWait in DeregisterWait, since the DeleteWait could happen before |
3078 | // we close the event. So, the code has been moved here. |
3079 | if (InterlockedDecrement(&waitInfo->refCount) == 0) |
3080 | { |
3081 | DeleteWait(waitInfo); |
3082 | } |
3083 | } |
3084 | |
3085 | else // i.e. blocking |
3086 | { |
3087 | _ASSERTE(waitInfo->flag & WAIT_INTERNAL_COMPLETION); |
3088 | _ASSERTE(waitInfo->ExternalEventSafeHandle == NULL); |
3089 | |
3090 | waitInfo->InternalCompletionEvent.Wait(INFINITE,TRUE); |
3091 | waitInfo->InternalCompletionEvent.CloseEvent(); |
3092 | delete waitInfo; // if WAIT_INTERNAL_COMPLETION is not set, waitInfo will be deleted in DeleteWait |
3093 | } |
3094 | return TRUE; |
3095 | } |
3096 | |
3097 | |
3098 | void ThreadpoolMgr::DeregisterWait(WaitInfo* pArgs) |
3099 | { |
3100 | |
3101 | WRAPPER_NO_CONTRACT; |
3102 | STATIC_CONTRACT_SO_INTOLERANT; |
3103 | |
3104 | WaitInfo* waitInfo = pArgs; |
3105 | |
3106 | if ( ! (waitInfo->state & WAIT_REGISTERED) ) |
3107 | { |
3108 | // set state to deleted, so that it does not get registered |
3109 | waitInfo->state |= WAIT_DELETE ; |
3110 | |
// since the wait has not even been registered, we don't need an interlock to decrement the refCount
3112 | waitInfo->refCount--; |
3113 | |
3114 | if (waitInfo->PartialCompletionEvent.IsValid()) |
3115 | { |
3116 | waitInfo->PartialCompletionEvent.Set(); |
3117 | } |
3118 | return; |
3119 | } |
3120 | |
3121 | if (waitInfo->state & WAIT_ACTIVE) |
3122 | { |
3123 | DeactivateWait(waitInfo); |
3124 | } |
3125 | |
3126 | if ( waitInfo->PartialCompletionEvent.IsValid()) |
3127 | { |
3128 | waitInfo->PartialCompletionEvent.Set(); |
3129 | return; // we cannot delete the wait here since the PartialCompletionEvent |
3130 | // may not have been closed yet. so, we return and rely on the waiter of PartialCompletionEvent |
3131 | // to do the close |
3132 | } |
3133 | |
3134 | if (InterlockedDecrement(&waitInfo->refCount) == 0) |
3135 | { |
// After we suspend the EE during shutdown, a thread may be blocked in WaitForEndOfShutdown in an alertable state.
// We don't allow a thread to reenter the runtime while processing an APC or pumping messages.
3138 | if (!g_fSuspendOnShutdown ) |
3139 | { |
3140 | DeleteWait(waitInfo); |
3141 | } |
3142 | } |
3143 | return; |
3144 | } |
3145 | |
3146 | |
/* This gets called in a finalizer thread ONLY IF an app does not deregister
the wait. Note that just because the registeredWaitHandle is collected by the GC
3149 | does not mean it is safe to delete the wait. The refcount tells us when it is |
3150 | safe. |
3151 | */ |
3152 | void ThreadpoolMgr::WaitHandleCleanup(HANDLE hWaitObject) |
3153 | { |
3154 | LIMITED_METHOD_CONTRACT; |
3155 | |
3156 | WaitInfo* waitInfo = (WaitInfo*) hWaitObject; |
3157 | _ASSERTE(waitInfo->refCount > 0); |
3158 | |
3159 | DWORD result = QueueDeregisterWait(waitInfo->threadCB->threadHandle, waitInfo); |
3160 | |
3161 | if (result == 0) |
3162 | STRESS_LOG1(LF_THREADPOOL, LL_ERROR, "Queue APC failed in WaitHandleCleanup %x" , result); |
3163 | |
3164 | } |
3165 | |
3166 | BOOL ThreadpoolMgr::CreateGateThread() |
3167 | { |
3168 | LIMITED_METHOD_CONTRACT; |
3169 | |
3170 | HANDLE threadHandle = Thread::CreateUtilityThread(Thread::StackSize_Small, GateThreadStart, NULL, W(".NET ThreadPool Gate" )); |
3171 | |
3172 | if (threadHandle) |
3173 | { |
3174 | CloseHandle(threadHandle); //we don't need this anymore |
3175 | return TRUE; |
3176 | } |
3177 | |
3178 | return FALSE; |
3179 | } |
3180 | |
3181 | |
3182 | |
3183 | /************************************************************************/ |
3184 | |
3185 | BOOL ThreadpoolMgr::BindIoCompletionCallback(HANDLE FileHandle, |
3186 | LPOVERLAPPED_COMPLETION_ROUTINE Function, |
3187 | ULONG Flags, |
3188 | DWORD& errCode) |
3189 | { |
3190 | |
3191 | CONTRACTL |
3192 | { |
3193 | THROWS; // EnsureInitialized can throw |
3194 | if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
3195 | MODE_ANY; |
3196 | } |
3197 | CONTRACTL_END; |
3198 | |
3199 | #ifndef FEATURE_PAL |
3200 | |
3201 | errCode = S_OK; |
3202 | |
3203 | EnsureInitialized(); |
3204 | |
3205 | |
3206 | _ASSERTE(GlobalCompletionPort != NULL); |
3207 | |
3208 | if (!InitCompletionPortThreadpool) |
3209 | InitCompletionPortThreadpool = TRUE; |
3210 | |
3211 | GrowCompletionPortThreadpoolIfNeeded(); |
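// Associate the file handle with the single process-wide completion port. The callback pointer
// itself is used as the completion key; CompletionPortThreadStart later casts the key back to an
// LPOVERLAPPED_COMPLETION_ROUTINE when a packet for this handle is dequeued.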
3212 | |
3213 | HANDLE h = CreateIoCompletionPort(FileHandle, |
3214 | GlobalCompletionPort, |
3215 | (ULONG_PTR) Function, |
3216 | NumberOfProcessors); |
3217 | if (h == NULL) |
3218 | { |
3219 | errCode = GetLastError(); |
3220 | return FALSE; |
3221 | } |
3222 | |
3223 | _ASSERTE(h == GlobalCompletionPort); |
3224 | |
3225 | return TRUE; |
3226 | #else // FEATURE_PAL |
3227 | SetLastError(ERROR_CALL_NOT_IMPLEMENTED); |
3228 | return FALSE; |
3229 | #endif // !FEATURE_PAL |
3230 | } |
3231 | |
3232 | #ifndef FEATURE_PAL |
3233 | BOOL ThreadpoolMgr::CreateCompletionPortThread(LPVOID lpArgs) |
3234 | { |
3235 | CONTRACTL |
3236 | { |
3237 | NOTHROW; |
3238 | if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
3239 | MODE_ANY; |
3240 | } |
3241 | CONTRACTL_END; |
3242 | |
3243 | Thread *pThread; |
3244 | BOOL fIsCLRThread; |
3245 | if ((pThread = CreateUnimpersonatedThread(CompletionPortThreadStart, lpArgs, &fIsCLRThread)) != NULL) |
3246 | { |
3247 | LastCPThreadCreation = GetTickCount(); // record this for use by logic to spawn additional threads |
3248 | |
3249 | if (fIsCLRThread) { |
3250 | pThread->ChooseThreadCPUGroupAffinity(); |
3251 | pThread->StartThread(); |
3252 | } |
3253 | else { |
3254 | DWORD status; |
3255 | status = ResumeThread((HANDLE)pThread); |
3256 | _ASSERTE(status != (DWORD) (-1)); |
3257 | CloseHandle((HANDLE)pThread); // we don't need this anymore |
3258 | } |
3259 | |
3260 | ThreadCounter::Counts counts = CPThreadCounter.GetCleanCounts(); |
3261 | FireEtwIOThreadCreate_V1(counts.NumActive + counts.NumRetired, counts.NumRetired, GetClrInstanceId()); |
3262 | |
3263 | return TRUE; |
3264 | } |
3265 | |
3266 | |
3267 | return FALSE; |
3268 | } |
3269 | |
3270 | DWORD WINAPI ThreadpoolMgr::CompletionPortThreadStart(LPVOID lpArgs) |
3271 | { |
3272 | ClrFlsSetThreadType (ThreadType_Threadpool_IOCompletion); |
3273 | |
3274 | CONTRACTL |
3275 | { |
3276 | THROWS; |
3277 | if (GetThread()) { MODE_PREEMPTIVE;} else { DISABLED(MODE_ANY);} |
3278 | if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
3279 | SO_INTOLERANT; |
3280 | } |
3281 | CONTRACTL_END; |
3282 | |
3283 | DWORD numBytes=0; |
3284 | size_t key=0; |
3285 | |
3286 | LPOVERLAPPED pOverlapped = NULL; |
3287 | DWORD errorCode; |
3288 | PIOCompletionContext context; |
3289 | BOOL fIsCompletionContext; |
3290 | |
3291 | const DWORD CP_THREAD_WAIT = AppX::IsAppXProcess() ? 5000 : 15000; /* milliseconds */ |
3292 | |
3293 | _ASSERTE(GlobalCompletionPort != NULL); |
3294 | |
3295 | BOOL fThreadInit = FALSE; |
3296 | Thread *pThread = NULL; |
3297 | |
3298 | DWORD cpThreadWait = 0; |
3299 | |
3300 | if (g_fEEStarted) { |
3301 | pThread = SetupThreadNoThrow(); |
3302 | if (pThread == NULL) { |
3303 | return 0; |
3304 | } |
3305 | |
// Converted to a CLR thread and added to the ThreadStore; pick a group affinity for this thread
3307 | pThread->ChooseThreadCPUGroupAffinity(); |
3308 | |
3309 | fThreadInit = TRUE; |
3310 | } |
3311 | |
3312 | #ifdef FEATURE_COMINTEROP |
3313 | // Threadpool threads should be initialized as MTA. If we are unable to do so, |
3314 | // return failure. |
3315 | BOOL fCoInited = FALSE; |
3316 | { |
3317 | fCoInited = SUCCEEDED(::CoInitializeEx(NULL, COINIT_MULTITHREADED)); |
3318 | if (!fCoInited) |
3319 | { |
3320 | goto Exit; |
3321 | } |
3322 | } |
3323 | |
3324 | if (pThread && pThread->SetApartment(Thread::AS_InMTA, TRUE) != Thread::AS_InMTA) |
3325 | { |
3326 | // @todo: should we log the failure |
3327 | goto Exit; |
3328 | } |
3329 | #endif // FEATURE_COMINTEROP |
3330 | |
3331 | ThreadCounter::Counts oldCounts; |
3332 | ThreadCounter::Counts newCounts; |
3333 | |
3334 | cpThreadWait = CP_THREAD_WAIT; |
3335 | for (;; ) |
3336 | { |
3337 | Top: |
3338 | if (!fThreadInit) { |
3339 | if (g_fEEStarted) { |
3340 | pThread = SetupThreadNoThrow(); |
3341 | if (pThread == NULL) { |
3342 | break; |
3343 | } |
3344 | |
// Converted to a CLR thread and added to the ThreadStore; pick a group affinity for this thread
3346 | pThread->ChooseThreadCPUGroupAffinity(); |
3347 | |
3348 | #ifdef FEATURE_COMINTEROP |
3349 | if (pThread->SetApartment(Thread::AS_InMTA, TRUE) != Thread::AS_InMTA) |
3350 | { |
3351 | // @todo: should we log the failure |
3352 | goto Exit; |
3353 | } |
3354 | #endif // FEATURE_COMINTEROP |
3355 | |
3356 | fThreadInit = TRUE; |
3357 | } |
3358 | } |
3359 | |
3360 | GCX_PREEMP_NO_DTOR(); |
3361 | |
3362 | // |
3363 | // We're about to wait on the IOCP; mark ourselves as no longer "working." |
3364 | // |
3365 | while (true) |
3366 | { |
3367 | ThreadCounter::Counts oldCounts = CPThreadCounter.DangerousGetDirtyCounts(); |
3368 | ThreadCounter::Counts newCounts = oldCounts; |
3369 | newCounts.NumWorking--; |
3370 | |
3371 | // |
3372 | // If we've only got one thread left, it won't be allowed to exit, because we need to keep |
3373 | // one thread listening for completions. So there's no point in having a timeout; it will |
3374 | // only use power unnecessarily. |
3375 | // |
3376 | cpThreadWait = (newCounts.NumActive == 1) ? INFINITE : CP_THREAD_WAIT; |
3377 | |
3378 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
3379 | break; |
3380 | } |
3381 | |
3382 | errorCode = S_OK; |
3383 | |
3384 | if (lpArgs == NULL) |
3385 | { |
3386 | CONTRACT_VIOLATION(ThrowsViolation); |
3387 | |
3388 | if (g_fCompletionPortDrainNeeded && pThread) |
3389 | { |
// We have started draining the completion port.
3391 | // The next job picked up by this thread is going to be after our special marker. |
3392 | if (!pThread->IsCompletionPortDrained()) |
3393 | { |
3394 | pThread->MarkCompletionPortDrained(); |
3395 | } |
3396 | } |
3397 | |
3398 | context = NULL; |
3399 | fIsCompletionContext = FALSE; |
3400 | |
3401 | if (pThread == NULL) |
3402 | { |
3403 | pThread = GetThread(); |
3404 | } |
3405 | |
3406 | if (pThread) |
3407 | { |
3408 | |
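// The thread may already hold a completion packet that was dequeued earlier by
// CompletionPortDispatchWorkWithinAppDomain and stashed via StoreOverlappedInfoInThread.
// Consume that stored packet first, before polling the completion port again.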
3409 | context = (PIOCompletionContext) pThread->GetIOCompletionContext(); |
3410 | |
3411 | if (context->lpOverlapped != NULL) |
3412 | { |
3413 | errorCode = context->ErrorCode; |
3414 | numBytes = context->numBytesTransferred; |
3415 | pOverlapped = context->lpOverlapped; |
3416 | key = context->key; |
3417 | |
3418 | context->lpOverlapped = NULL; |
3419 | fIsCompletionContext = TRUE; |
3420 | } |
3421 | } |
3422 | |
3423 | if((context == NULL) || (!fIsCompletionContext)) |
3424 | { |
3425 | _ASSERTE (context == NULL || context->lpOverlapped == NULL); |
3426 | |
3427 | BOOL status = GetQueuedCompletionStatus( |
3428 | GlobalCompletionPort, |
3429 | &numBytes, |
3430 | (PULONG_PTR)&key, |
3431 | &pOverlapped, |
3432 | cpThreadWait |
3433 | ); |
3434 | |
3435 | if (status == 0) |
3436 | errorCode = GetLastError(); |
3437 | } |
3438 | } |
3439 | else |
3440 | { |
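// lpArgs carries a completion packet that the gate thread already dequeued in GateThreadStart
// and handed to this newly created thread; consume it once, then fall back to
// GetQueuedCompletionStatus on subsequent iterations.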
3441 | QueuedStatus *CompletionStatus = (QueuedStatus*)lpArgs; |
3442 | numBytes = CompletionStatus->numBytes; |
3443 | key = (size_t)CompletionStatus->key; |
3444 | pOverlapped = CompletionStatus->pOverlapped; |
3445 | errorCode = CompletionStatus->errorCode; |
3446 | delete CompletionStatus; |
3447 | lpArgs = NULL; // one-time deal for initial CP packet |
3448 | } |
3449 | |
3450 | // We fire IODequeue events whether the IO completion was retrieved in the above call to |
3451 | // GetQueuedCompletionStatus or during an earlier call (e.g. in GateThreadStart and passed here in lpArgs,
3452 | // or in CompletionPortDispatchWorkWithinAppDomain and passed here through StoreOverlappedInfoInThread).
3453 | |
3454 | // For the purposes of activity correlation we only fire ETW events here if needed, i.e. if they were not already
3455 | // fired at a higher abstraction level (e.g. ThreadpoolMgr::RegisterWaitForSingleObject).
3456 | // Note: we still fire the event for managed async IO, despite the fact that we don't have a paired IOEnqueue event
3457 | // for this case. We do this to "mark" the end of the previous work item. Once we provide full support at the higher
3458 | // abstraction level for managed IO, we can remove the IODequeue events fired here.
3459 | if (ETW_EVENT_ENABLED(MICROSOFT_WINDOWS_DOTNETRUNTIME_PROVIDER_Context, ThreadPoolIODequeue) |
3460 | && !AreEtwIOQueueEventsSpeciallyHandled((LPOVERLAPPED_COMPLETION_ROUTINE)key) && pOverlapped != NULL) |
3461 | { |
3462 | FireEtwThreadPoolIODequeue(pOverlapped, OverlappedDataObject::GetOverlappedForTracing(pOverlapped), GetClrInstanceId()); |
3463 | } |
3464 | |
3465 | bool enterRetirement; |
3466 | |
3467 | while (true) |
3468 | { |
3469 | // |
3470 | // When we reach this point, this thread is "active" but not "working." Depending on the result of the call to GetQueuedCompletionStatus, |
3471 | // and the state of the rest of the IOCP threads, we need to figure out whether to de-activate (exit) this thread, retire this thread, |
3472 | // or transition to "working." |
3473 | // |
3474 | |
3475 | // counts volatile read paired with CompareExchangeCounts loop set |
3476 | oldCounts = CPThreadCounter.DangerousGetDirtyCounts(); |
3477 | newCounts = oldCounts; |
3478 | enterRetirement = false; |
3479 | |
3480 | if (errorCode == WAIT_TIMEOUT) |
3481 | { |
3482 | // |
3483 | // We timed out, and are going to try to exit or retire. |
3484 | // |
3485 | newCounts.NumActive--; |
3486 | |
3487 | // |
3488 | // We need at least one free thread, or we have no way of knowing if completions are being queued. |
3489 | // |
3490 | if (newCounts.NumWorking == newCounts.NumActive) |
3491 | { |
3492 | newCounts = oldCounts; |
3493 | newCounts.NumWorking++; //not really working, but we'll decrement it at the top
3494 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
3495 | goto Top; |
3496 | else |
3497 | continue; |
3498 | } |
3499 | |
3500 | // |
3501 | // We can't exit a thread that has pending I/O - we'll "retire" it instead. |
3502 | // |
3503 | if (IsIoPending()) |
3504 | { |
3505 | enterRetirement = true; |
3506 | newCounts.NumRetired++; |
3507 | } |
3508 | } |
3509 | else |
3510 | { |
3511 | // |
3512 | // We have work to do |
3513 | // |
3514 | newCounts.NumWorking++; |
3515 | } |
3516 | |
3517 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
3518 | break; |
3519 | } |
3520 | |
3521 | if (errorCode == WAIT_TIMEOUT) |
3522 | { |
3523 | if (!enterRetirement) |
3524 | { |
3525 | goto Exit; |
3526 | } |
3527 | else |
3528 | { |
3529 | // now in "retired mode" waiting for pending io to complete |
3530 | FireEtwIOThreadRetire_V1(newCounts.NumActive + newCounts.NumRetired, newCounts.NumRetired, GetClrInstanceId()); |
3531 | |
3532 | for (;;) |
3533 | { |
3534 | #ifndef FEATURE_PAL |
3535 | if (g_fCompletionPortDrainNeeded && pThread) |
3536 | { |
3537 | // The thread is not going to process IO job now. |
3538 | if (!pThread->IsCompletionPortDrained()) |
3539 | { |
3540 | pThread->MarkCompletionPortDrained(); |
3541 | } |
3542 | } |
3543 | #endif // !FEATURE_PAL |
3544 | |
3545 | DWORD status = SafeWait(RetiredCPWakeupEvent,CP_THREAD_PENDINGIO_WAIT,FALSE); |
3546 | _ASSERTE(status == WAIT_TIMEOUT || status == WAIT_OBJECT_0); |
3547 | |
3548 | if (status == WAIT_TIMEOUT) |
3549 | { |
3550 | if (IsIoPending()) |
3551 | { |
3552 | continue; |
3553 | } |
3554 | else |
3555 | { |
3556 | // We can now exit; decrement the retired count. |
3557 | while (true) |
3558 | { |
3559 | // counts volatile read paired with CompareExchangeCounts loop set |
3560 | oldCounts = CPThreadCounter.DangerousGetDirtyCounts(); |
3561 | newCounts = oldCounts; |
3562 | newCounts.NumRetired--; |
3563 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
3564 | break; |
3565 | } |
3566 | goto Exit; |
3567 | } |
3568 | } |
3569 | else |
3570 | { |
3571 | // put back into rotation -- we need a thread |
3572 | while (true) |
3573 | { |
3574 | // counts volatile read paired with CompareExchangeCounts loop set |
3575 | oldCounts = CPThreadCounter.DangerousGetDirtyCounts(); |
3576 | newCounts = oldCounts; |
3577 | newCounts.NumRetired--; |
3578 | newCounts.NumActive++; |
3579 | newCounts.NumWorking++; //we're not really working, but we'll decrement this before waiting for work. |
3580 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
3581 | break; |
3582 | } |
3583 | FireEtwIOThreadUnretire_V1(newCounts.NumActive + newCounts.NumRetired, newCounts.NumRetired, GetClrInstanceId()); |
3584 | goto Top; |
3585 | } |
3586 | } |
3587 | } |
3588 | } |
3589 | |
3590 | // we should not reach this point unless we have work to do |
3591 | _ASSERTE(errorCode != WAIT_TIMEOUT && !enterRetirement); |
3592 | |
3593 | // if we have no more free threads, start the gate thread |
3594 | if (newCounts.NumWorking >= newCounts.NumActive) |
3595 | EnsureGateThreadRunning(); |
3596 | |
3597 | |
3598 | // We cannot assert here. If stdin/stdout/stderr of a child process are redirected based on
3599 | // async io, GetQueuedCompletionStatus returns when the child process operates on its stdin/stdout/stderr.
3600 | // The parent process does not issue any ReadFile/WriteFile, and hence pOverlapped is going to be NULL.
3601 | //_ASSERTE(pOverlapped != NULL); |
3602 | |
3603 | if (pOverlapped != NULL) |
3604 | { |
3605 | _ASSERTE(key != 0); // should be a valid function address |
3606 | |
3607 | if (key != 0) |
3608 | { |
3609 | if (GCHeapUtilities::IsGCInProgress(TRUE)) |
3610 | { |
3611 | //Indicate that this thread is free and waiting on the GC, not doing any user work.
3612 | //This helps keep new threads from being injected when some threads have woken up from the
3613 | //GC event and some have not.
3614 | while (true) |
3615 | { |
3616 | // counts volatile read paired with CompareExchangeCounts loop set |
3617 | oldCounts = CPThreadCounter.DangerousGetDirtyCounts(); |
3618 | newCounts = oldCounts; |
3619 | newCounts.NumWorking--; |
3620 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
3621 | break; |
3622 | } |
3623 | |
3624 | // GC is imminent, so wait until the GC is complete before executing the next request.
3625 | // This reduces the number of in-flight objects allocated right before the GC, easing the GC's work.
3626 | GCHeapUtilities::WaitForGCCompletion(TRUE); |
3627 | |
3628 | while (true) |
3629 | { |
3630 | // counts volatile read paired with CompareExchangeCounts loop set |
3631 | oldCounts = CPThreadCounter.DangerousGetDirtyCounts(); |
3632 | newCounts = oldCounts; |
3633 | newCounts.NumWorking++; |
3634 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
3635 | break; |
3636 | } |
3637 | |
3638 | if (newCounts.NumWorking >= newCounts.NumActive) |
3639 | EnsureGateThreadRunning(); |
3640 | } |
3641 | else |
3642 | { |
3643 | GrowCompletionPortThreadpoolIfNeeded(); |
3644 | } |
3645 | |
3646 | { |
3647 | CONTRACT_VIOLATION(ThrowsViolation); |
3648 | |
3649 | ThreadLocaleHolder localeHolder; |
3650 | |
3651 | ((LPOVERLAPPED_COMPLETION_ROUTINE) key)(errorCode, numBytes, pOverlapped); |
3652 | } |
3653 | |
3654 | if (pThread == NULL) { |
3655 | pThread = GetThread(); |
3656 | } |
3657 | if (pThread) { |
3658 | if (pThread->IsAbortRequested()) |
3659 | pThread->EEResetAbort(Thread::TAR_ALL); |
3660 | pThread->InternalReset(); |
3661 | } |
3662 | } |
3663 | else |
3664 | { |
3665 | // Application bug - can't do much, just ignore it |
3666 | } |
3667 | |
3668 | } |
3669 | |
3670 | } // for (;;) |
3671 | |
3672 | Exit: |
3673 | |
3674 | oldCounts = CPThreadCounter.GetCleanCounts(); |
3675 | |
3676 | // we should never destroy or retire all IOCP threads, because then we won't have any threads to notice incoming completions. |
3677 | _ASSERTE(oldCounts.NumActive > 0); |
3678 | |
3679 | FireEtwIOThreadTerminate_V1(oldCounts.NumActive + oldCounts.NumRetired, oldCounts.NumRetired, GetClrInstanceId()); |
3680 | |
3681 | #ifdef FEATURE_COMINTEROP |
3682 | if (pThread) { |
3683 | pThread->SetApartment(Thread::AS_Unknown, TRUE); |
3684 | pThread->CoUninitialize(); |
3685 | } |
3686 | // CoUninitialize the worker thread
3687 | if (fCoInited) |
3688 | { |
3689 | CoUninitialize(); |
3690 | } |
3691 | #endif |
3692 | |
3693 | if (pThread) { |
3694 | pThread->ClearThreadCPUGroupAffinity(); |
3695 | |
3696 | DestroyThread(pThread); |
3697 | } |
3698 | |
3699 | return 0; |
3700 | } |
3701 | |
3702 | LPOVERLAPPED ThreadpoolMgr::CompletionPortDispatchWorkWithinAppDomain( |
3703 | Thread* pThread, |
3704 | DWORD* pErrorCode, |
3705 | DWORD* pNumBytes, |
3706 | size_t* pKey, |
3707 | DWORD adid) |
3708 | // |
3709 | //This function is called just after dispatching the previous BindIO callback |
3710 | //to managed code. This is a perf optimization to do a quick call to
3711 | //GetQueuedCompletionStatus with a timeout of 0 ms. If there is work in the
3712 | //same appdomain, dispatch it back immediately. If not, stick it in a well-known
3713 | //place and reenter the target domain. The timeout of zero is chosen so as to
3714 | //not delay appdomain unloads. |
3715 | // |
3716 | { |
3717 | STATIC_CONTRACT_THROWS; |
3718 | STATIC_CONTRACT_GC_NOTRIGGER; |
3719 | STATIC_CONTRACT_MODE_ANY; |
3720 | STATIC_CONTRACT_SO_TOLERANT; |
3721 | |
3722 | LPOVERLAPPED lpOverlapped=NULL; |
3723 | |
3724 | BOOL status=FALSE; |
3725 | OVERLAPPEDDATAREF overlapped=NULL; |
3726 | BOOL ManagedCallback=FALSE; |
3727 | |
3728 | *pErrorCode = S_OK; |
3729 | |
3730 | |
3731 | //Very Very Important! |
3732 | //Do not change the timeout for GetQueuedCompletionStatus to a non-zero value. |
3733 | //Selecting a non-zero value can cause the thread to block, and lead to expensive context switches. |
3734 | //In real-life scenarios we have noticed that a packet is often not available immediately, but becomes
3735 | //available very shortly afterwards (within a few hundred instructions), and falling back to the VM in that
3736 | //case is cheaper than taking a context switch. Changing the timeout to a non-zero value can lead to perf
3737 | //degradations that are very hard to diagnose.
3738 | |
3739 | status = ::GetQueuedCompletionStatus( |
3740 | GlobalCompletionPort, |
3741 | pNumBytes, |
3742 | (PULONG_PTR)pKey, |
3743 | &lpOverlapped, |
3744 | 0); |
3745 | |
3746 | DWORD lastError = GetLastError(); |
3747 | |
3748 | if (status == 0) |
3749 | { |
3750 | if (lpOverlapped != NULL) |
3751 | { |
3752 | *pErrorCode = lastError; |
3753 | } |
3754 | else |
3755 | { |
3756 | return NULL; |
3757 | } |
3758 | } |
3759 | |
3760 | if (((LPOVERLAPPED_COMPLETION_ROUTINE) *pKey) != BindIoCompletionCallbackStub) |
3761 | { |
3762 | //_ASSERTE(FALSE); |
3763 | } |
3764 | else |
3765 | { |
3766 | ManagedCallback = TRUE; |
3767 | overlapped = ObjectToOVERLAPPEDDATAREF(OverlappedDataObject::GetOverlapped(lpOverlapped)); |
3768 | } |
3769 | |
3770 | if (ManagedCallback) |
3771 | { |
3772 | _ASSERTE(*pKey != 0); // should be a valid function address |
3773 | |
3774 | if (*pKey ==0) |
3775 | { |
3776 | //Application Bug. |
3777 | return NULL; |
3778 | } |
3779 | } |
3780 | else |
3781 | { |
3782 | //We just returned from managed code, so a Thread structure should exist.
3783 | _ASSERTE (pThread); |
3784 | |
3785 | //This is an overlapped from a different appdomain. Stick it in
3786 | //the thread; we will process it later.
3787 | |
3788 | StoreOverlappedInfoInThread(pThread, *pErrorCode, *pNumBytes, *pKey, lpOverlapped); |
3789 | |
3790 | lpOverlapped = NULL; |
3791 | } |
3792 | |
3793 | #ifndef DACCESS_COMPILE |
3794 | return lpOverlapped; |
3795 | #endif |
3796 | } |
3797 | |
3798 | void ThreadpoolMgr::StoreOverlappedInfoInThread(Thread* pThread, DWORD dwErrorCode, DWORD dwNumBytes, size_t key, LPOVERLAPPED lpOverlapped) |
3799 | { |
3800 | STATIC_CONTRACT_NOTHROW; |
3801 | STATIC_CONTRACT_GC_NOTRIGGER; |
3802 | STATIC_CONTRACT_MODE_ANY; |
3803 | STATIC_CONTRACT_SO_TOLERANT; |
3804 | |
3805 | _ASSERTE(pThread); |
3806 | |
3807 | PIOCompletionContext context; |
3808 | |
3809 | context = (PIOCompletionContext) pThread->GetIOCompletionContext(); |
3810 | |
3811 | _ASSERTE(context); |
3812 | |
3813 | context->ErrorCode = dwErrorCode; |
3814 | context->numBytesTransferred = dwNumBytes; |
3815 | context->lpOverlapped = lpOverlapped; |
3816 | context->key = key; |
3817 | } |
3818 | |
3819 | BOOL ThreadpoolMgr::ShouldGrowCompletionPortThreadpool(ThreadCounter::Counts counts) |
3820 | { |
3821 | CONTRACTL |
3822 | { |
3823 | GC_NOTRIGGER; |
3824 | NOTHROW; |
3825 | MODE_ANY; |
3826 | SO_TOLERANT; |
3827 | } |
3828 | CONTRACTL_END; |
3829 | |
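// Grow only when every active IOCP thread is busy, no infrastructure thread is holding capacity,
// and (unless there are no active threads) a GC is not in progress. If any retired thread exists
// the answer is simply "yes"; the caller then wakes a retired thread instead of creating a new one.
// Otherwise a new thread is allowed only while under MaxLimitTotalCPThreads and either below
// MinLimitTotalCPThreads or when CPU utilization is low, rate-limited by SufficientDelaySinceLastSample.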
3830 | if (counts.NumWorking >= counts.NumActive |
3831 | && NumCPInfrastructureThreads == 0 |
3832 | && (counts.NumActive == 0 || !GCHeapUtilities::IsGCInProgress(TRUE)) |
3833 | ) |
3834 | { |
3835 | // adjust limit if needed
3836 | if (counts.NumRetired == 0) |
3837 | { |
3838 | if (counts.NumActive + counts.NumRetired < MaxLimitTotalCPThreads && |
3839 | (counts.NumActive < MinLimitTotalCPThreads || cpuUtilization < CpuUtilizationLow)) |
3840 | { |
3841 | // add one more check to make sure that we haven't fired off a new |
3842 | // thread since the last time we checked the cpu utilization.
3843 | // However, don't bother if we haven't reached the MinLimit (2 * number of CPUs).
3844 | if ((counts.NumActive < MinLimitTotalCPThreads) || |
3845 | SufficientDelaySinceLastSample(LastCPThreadCreation,counts.NumActive)) |
3846 | { |
3847 | return TRUE; |
3848 | } |
3849 | } |
3850 | } |
3851 | |
3852 | if (counts.NumRetired > 0) |
3853 | return TRUE; |
3854 | } |
3855 | return FALSE; |
3856 | } |
3857 | |
3858 | void ThreadpoolMgr::GrowCompletionPortThreadpoolIfNeeded() |
3859 | { |
3860 | CONTRACTL |
3861 | { |
3862 | if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
3863 | NOTHROW; |
3864 | MODE_ANY; |
3865 | } |
3866 | CONTRACTL_END; |
3867 | |
3868 | ThreadCounter::Counts oldCounts, newCounts; |
3869 | while (true) |
3870 | { |
3871 | oldCounts = CPThreadCounter.GetCleanCounts(); |
3872 | newCounts = oldCounts; |
3873 | |
3874 | if(!ShouldGrowCompletionPortThreadpool(oldCounts)) |
3875 | { |
3876 | break; |
3877 | } |
3878 | else |
3879 | { |
3880 | if (oldCounts.NumRetired > 0) |
3881 | { |
3882 | // wake up a retired thread instead
3883 | RetiredCPWakeupEvent->Set(); |
3884 | return; |
3885 | } |
3886 | else |
3887 | { |
3888 | // create a new thread. New IOCP threads start as "active" and "working" |
3889 | newCounts.NumActive++; |
3890 | newCounts.NumWorking++; |
3891 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
3892 | { |
3893 | if (!CreateCompletionPortThread(NULL)) |
3894 | { |
3895 | // if thread creation failed, we have to adjust the counts back down. |
3896 | while (true) |
3897 | { |
3898 | // counts volatile read paired with CompareExchangeCounts loop set |
3899 | oldCounts = CPThreadCounter.DangerousGetDirtyCounts(); |
3900 | newCounts = oldCounts; |
3901 | newCounts.NumActive--; |
3902 | newCounts.NumWorking--; |
3903 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
3904 | break; |
3905 | } |
3906 | } |
3907 | return; |
3908 | } |
3909 | } |
3910 | } |
3911 | } |
3912 | } |
3913 | #endif // !FEATURE_PAL |
3914 | |
3915 | // Returns true if there is pending io on the thread. |
3916 | BOOL ThreadpoolMgr::IsIoPending() |
3917 | { |
3918 | CONTRACTL |
3919 | { |
3920 | NOTHROW; |
3921 | MODE_ANY; |
3922 | GC_NOTRIGGER; |
3923 | } |
3924 | CONTRACTL_END; |
3925 | |
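// Ask the OS whether the current thread has outstanding asynchronous I/O via
// NtQueryInformationThread(ThreadIsIoPending). If the API is unavailable or the query fails,
// conservatively report that I/O is pending so the thread is retired rather than destroyed.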
3926 | #ifndef FEATURE_PAL |
3927 | int Status; |
3928 | ULONG IsIoPending; |
3929 | |
3930 | if (g_pufnNtQueryInformationThread) |
3931 | { |
3932 | Status =(int) (*g_pufnNtQueryInformationThread)(GetCurrentThread(), |
3933 | ThreadIsIoPending, |
3934 | &IsIoPending, |
3935 | sizeof(IsIoPending), |
3936 | NULL); |
3937 | |
3938 | |
3939 | if ((Status < 0) || IsIoPending) |
3940 | return TRUE; |
3941 | else |
3942 | return FALSE; |
3943 | } |
3944 | return TRUE; |
3945 | #else |
3946 | return FALSE; |
3947 | #endif // !FEATURE_PAL |
3948 | } |
3949 | |
3950 | #ifndef FEATURE_PAL |
3951 | |
3952 | #ifdef _WIN64 |
3953 | #pragma warning (disable : 4716) |
3954 | #else |
3955 | #pragma warning (disable : 4715) |
3956 | #endif |
3957 | |
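// Returns overall CPU utilization (0-100) for the processors this process runs on, measured over
// the interval since the previous call. With CPU groups enabled the system-wide times are used;
// otherwise per-processor performance counters are summed for the processors in the affinity mask.
// In either case:
//
//     busy% = 100 * ((deltaKernel + deltaUser) - deltaIdle) / (deltaKernel + deltaUser)
//
// where each delta is taken against the previous sample stored in *pOldInfo, which is updated as a
// side effect.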
3958 | int ThreadpoolMgr::GetCPUBusyTime_NT(PROCESS_CPU_INFORMATION* pOldInfo) |
3959 | { |
3960 | LIMITED_METHOD_CONTRACT; |
3961 | |
3962 | PROCESS_CPU_INFORMATION newUsage; |
3963 | newUsage.idleTime.QuadPart = 0; |
3964 | newUsage.kernelTime.QuadPart = 0; |
3965 | newUsage.userTime.QuadPart = 0; |
3966 | |
3967 | if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups()) |
3968 | { |
3969 | #if !defined(FEATURE_REDHAWK) && !defined(FEATURE_PAL) |
3970 | FILETIME newIdleTime, newKernelTime, newUserTime; |
3971 | |
3972 | CPUGroupInfo::GetSystemTimes(&newIdleTime, &newKernelTime, &newUserTime); |
3973 | newUsage.idleTime.u.LowPart = newIdleTime.dwLowDateTime; |
3974 | newUsage.idleTime.u.HighPart = newIdleTime.dwHighDateTime; |
3975 | newUsage.kernelTime.u.LowPart = newKernelTime.dwLowDateTime; |
3976 | newUsage.kernelTime.u.HighPart = newKernelTime.dwHighDateTime; |
3977 | newUsage.userTime.u.LowPart = newUserTime.dwLowDateTime; |
3978 | newUsage.userTime.u.HighPart = newUserTime.dwHighDateTime; |
3979 | #endif |
3980 | } |
3981 | else |
3982 | { |
3983 | (*g_pufnNtQuerySystemInformation)(SystemProcessorPerformanceInformation, |
3984 | pOldInfo->usageBuffer, |
3985 | pOldInfo->usageBufferSize, |
3986 | NULL); |
3987 | |
3988 | SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION* pInfoArray = pOldInfo->usageBuffer; |
3989 | DWORD_PTR pmask = pOldInfo->affinityMask; |
3990 | |
3991 | int proc_no = 0; |
3992 | while (pmask) |
3993 | { |
3994 | if (pmask & 1) |
3995 | { //should be good: 1CPU 28823 years, 256CPUs 100+years |
3996 | newUsage.idleTime.QuadPart += pInfoArray[proc_no].IdleTime.QuadPart; |
3997 | newUsage.kernelTime.QuadPart += pInfoArray[proc_no].KernelTime.QuadPart; |
3998 | newUsage.userTime.QuadPart += pInfoArray[proc_no].UserTime.QuadPart; |
3999 | } |
4000 | |
4001 | pmask >>=1; |
4002 | proc_no++; |
4003 | } |
4004 | } |
4005 | |
4006 | __int64 cpuTotalTime, cpuBusyTime; |
4007 | |
4008 | cpuTotalTime = (newUsage.userTime.QuadPart - pOldInfo->userTime.QuadPart) + |
4009 | (newUsage.kernelTime.QuadPart - pOldInfo->kernelTime.QuadPart); |
4010 | cpuBusyTime = cpuTotalTime - |
4011 | (newUsage.idleTime.QuadPart - pOldInfo->idleTime.QuadPart); |
4012 | |
4013 | // Preserve reading |
4014 | pOldInfo->idleTime = newUsage.idleTime; |
4015 | pOldInfo->kernelTime = newUsage.kernelTime; |
4016 | pOldInfo->userTime = newUsage.userTime; |
4017 | |
4018 | __int64 reading = 0; |
4019 | |
4020 | if (cpuTotalTime > 0) |
4021 | reading = ((cpuBusyTime * 100) / cpuTotalTime); |
4022 | |
4023 | _ASSERTE(FitsIn<int>(reading)); |
4024 | return (int)reading; |
4025 | } |
4026 | |
4027 | #else // !FEATURE_PAL |
4028 | |
4029 | int ThreadpoolMgr::GetCPUBusyTime_NT(PAL_IOCP_CPU_INFORMATION* pOldInfo) |
4030 | { |
4031 | return PAL_GetCPUBusyTime(pOldInfo); |
4032 | } |
4033 | |
4034 | #endif // !FEATURE_PAL |
4035 | |
4036 | // |
4037 | // A timer that ticks every GATE_THREAD_DELAY milliseconds. |
4038 | // On platforms that support it, we use a coalescable waitable timer object. |
4039 | // For other platforms, we use Sleep, via __SwitchToThread. |
4040 | // |
4041 | class GateThreadTimer |
4042 | { |
4043 | #ifndef FEATURE_PAL |
4044 | HANDLE m_hTimer; |
4045 | |
4046 | public: |
4047 | GateThreadTimer() |
4048 | : m_hTimer(NULL) |
4049 | { |
4050 | CONTRACTL |
4051 | { |
4052 | NOTHROW; |
4053 | MODE_PREEMPTIVE; |
4054 | } |
4055 | CONTRACTL_END; |
4056 | |
4057 | if (g_pufnCreateWaitableTimerEx && g_pufnSetWaitableTimerEx) |
4058 | { |
4059 | m_hTimer = g_pufnCreateWaitableTimerEx(NULL, NULL, 0, TIMER_ALL_ACCESS); |
4060 | if (m_hTimer) |
4061 | { |
4062 | // |
4063 | // Set the timer to fire GATE_THREAD_DELAY milliseconds from now, then every GATE_THREAD_DELAY milliseconds thereafter. |
4064 | // We also set the tolerance to GATE_THREAD_DELAY_TOLERANCE, allowing the OS to coalesce this timer.
4065 | // |
4066 | LARGE_INTEGER dueTime; |
4067 | dueTime.QuadPart = MILLI_TO_100NANO(-(LONGLONG)GATE_THREAD_DELAY); //negative value indicates relative time |
4068 | if (!g_pufnSetWaitableTimerEx(m_hTimer, &dueTime, GATE_THREAD_DELAY, NULL, NULL, NULL, GATE_THREAD_DELAY_TOLERANCE)) |
4069 | { |
4070 | CloseHandle(m_hTimer); |
4071 | m_hTimer = NULL; |
4072 | } |
4073 | } |
4074 | } |
4075 | } |
4076 | |
4077 | ~GateThreadTimer() |
4078 | { |
4079 | CONTRACTL |
4080 | { |
4081 | NOTHROW; |
4082 | MODE_PREEMPTIVE; |
4083 | } |
4084 | CONTRACTL_END; |
4085 | |
4086 | if (m_hTimer) |
4087 | { |
4088 | CloseHandle(m_hTimer); |
4089 | m_hTimer = NULL; |
4090 | } |
4091 | } |
4092 | |
4093 | #endif // !FEATURE_PAL |
4094 | |
4095 | public: |
4096 | void Wait() |
4097 | { |
4098 | CONTRACTL |
4099 | { |
4100 | NOTHROW; |
4101 | MODE_PREEMPTIVE; |
4102 | } |
4103 | CONTRACTL_END; |
4104 | |
4105 | #ifndef FEATURE_PAL |
4106 | if (m_hTimer) |
4107 | WaitForSingleObject(m_hTimer, INFINITE); |
4108 | else |
4109 | #endif // !FEATURE_PAL |
4110 | __SwitchToThread(GATE_THREAD_DELAY, CALLER_LIMITS_SPINNING); |
4111 | } |
4112 | }; |
4113 | |
4114 | |
4115 | DWORD WINAPI ThreadpoolMgr::GateThreadStart(LPVOID lpArgs) |
4116 | { |
4117 | ClrFlsSetThreadType (ThreadType_Gate); |
4118 | |
4119 | CONTRACTL |
4120 | { |
4121 | NOTHROW; |
4122 | GC_TRIGGERS; |
4123 | MODE_PREEMPTIVE; |
4124 | SO_INTOLERANT; |
4125 | } |
4126 | CONTRACTL_END; |
4127 | |
4128 | _ASSERTE(GateThreadStatus == GATE_THREAD_STATUS_REQUESTED); |
4129 | |
4130 | GateThreadTimer timer; |
4131 | |
4132 | // TODO: do we need to do this? |
4133 | timer.Wait(); // delay getting initial CPU reading |
4134 | |
4135 | #ifndef FEATURE_PAL |
4136 | PROCESS_CPU_INFORMATION prevCPUInfo; |
4137 | |
4138 | if (!g_pufnNtQuerySystemInformation) |
4139 | { |
4140 | _ASSERT(!"NtQuerySystemInformation API not available!" ); |
4141 | return 0; |
4142 | } |
4143 | |
4144 | //The gate thread can start before EE setup, so ensure CPU group information is initialized.
4145 | CPUGroupInfo::EnsureInitialized(); |
4146 | |
4147 | // initialize CPU usage information structure; |
4148 | prevCPUInfo.idleTime.QuadPart = 0; |
4149 | prevCPUInfo.kernelTime.QuadPart = 0; |
4150 | prevCPUInfo.userTime.QuadPart = 0; |
4151 | |
4152 | PREFIX_ASSUME(NumberOfProcessors < 65536); |
4153 | prevCPUInfo.numberOfProcessors = NumberOfProcessors; |
4154 | |
4155 | /* In the following cases, the affinity mask can be zero
4156 | * 1. hosted, the hosted process already uses multiple cpu groups. |
4157 | * thus, during CLR initialization, GetCurrentProcessCpuCount() returns 64, and GC threads |
4158 | * are created to fill up the initial CPU group. ==> use g_SystemInfo.dwNumberOfProcessors |
4159 | * 2. GCCpuGroups=1, CLR creates GC threads for all processors in all CPU groups |
4160 | * thus, the threadpool thread would use a whole CPU group (if Thread_UseAllCpuGroups is not set). |
4161 | * ==> use g_SystemInfo.dwNumberOfProcessors. |
4162 | * 3. !defined(FEATURE_PAL) but defined(FEATURE_CORESYSTEM), GetCurrentProcessCpuCount() |
4163 | * returns g_SystemInfo.dwNumberOfProcessors ==> use g_SystemInfo.dwNumberOfProcessors; |
4164 | * Other cases: |
4165 | * 1. Normal case: the mask is all or a subset of all processors in a CPU group; |
4166 | * 2. GCCpuGroups=1 && Thread_UseAllCpuGroups = 1, the mask is not used |
4167 | */ |
4168 | prevCPUInfo.affinityMask = GetCurrentProcessCpuMask(); |
4169 | if (prevCPUInfo.affinityMask == 0) |
4170 | { // create a mask that has g_SystemInfo.dwNumberOfProcessors; |
4171 | DWORD_PTR mask = 0, maskpos = 1; |
4172 | for (unsigned int i=0; i < g_SystemInfo.dwNumberOfProcessors; i++) |
4173 | { |
4174 | mask |= maskpos; |
4175 | maskpos <<= 1; |
4176 | } |
4177 | prevCPUInfo.affinityMask = mask; |
4178 | } |
4179 | |
4180 | // in some cases GetCurrentProcessCpuCount() returns a number larger than
4181 | // g_SystemInfo.dwNumberOfProcessors when there are CPU groups; use the larger
4182 | // of the two to size the buffer. This buffer must be cleared with 0's to get correct
4183 | // CPU usage statistics.
4184 | int elementsNeeded = NumberOfProcessors > g_SystemInfo.dwNumberOfProcessors ? |
4185 | NumberOfProcessors : g_SystemInfo.dwNumberOfProcessors; |
4186 | if (!ClrSafeInt<int>::multiply(elementsNeeded, sizeof(SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION), |
4187 | prevCPUInfo.usageBufferSize)) |
4188 | return 0; |
4189 | |
4190 | prevCPUInfo.usageBuffer = (SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION *)alloca(prevCPUInfo.usageBufferSize); |
4191 | if (prevCPUInfo.usageBuffer == NULL) |
4192 | return 0; |
4193 | |
4194 | memset((void *)prevCPUInfo.usageBuffer, 0, prevCPUInfo.usageBufferSize); //must clear it with 0s |
4195 | |
4196 | GetCPUBusyTime_NT(&prevCPUInfo); |
4197 | #else // !FEATURE_PAL |
4198 | PAL_IOCP_CPU_INFORMATION prevCPUInfo; |
4199 | GetCPUBusyTime_NT(&prevCPUInfo); // ignore return value the first time |
4200 | #endif // !FEATURE_PAL |
4201 | |
4202 | BOOL IgnoreNextSample = FALSE; |
4203 | |
4204 | do |
4205 | { |
4206 | timer.Wait(); |
4207 | |
4208 | if(CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_EnableWorkerTracking)) |
4209 | FireEtwThreadPoolWorkingThreadCount(TakeMaxWorkingThreadCount(), GetClrInstanceId()); |
4210 | |
4211 | #ifdef DEBUGGING_SUPPORTED |
4212 | // if we are stopped at a debug breakpoint, go back to sleep |
4213 | if (CORDebuggerAttached() && g_pDebugInterface->IsStopped()) |
4214 | continue; |
4215 | #endif // DEBUGGING_SUPPORTED |
4216 | |
4217 | if(g_IsPaused) |
4218 | { |
4219 | _ASSERTE(g_ClrResumeEvent.IsValid()); |
4220 | EX_TRY { |
4221 | g_ClrResumeEvent.Wait(INFINITE, TRUE); |
4222 | } |
4223 | EX_CATCH { |
4224 | // Assert on debug builds |
4225 | _ASSERTE(FALSE); |
4226 | } |
4227 | EX_END_CATCH(SwallowAllExceptions); |
4228 | } |
4229 | |
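// CPU samples that overlap a GC are treated specially: the reading is clamped above
// CpuUtilizationLow and the following sample is ignored as well, presumably because the
// measurement interval is distorted by GC suspension and should not drive the average down.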
4230 | if (!GCHeapUtilities::IsGCInProgress(FALSE) ) |
4231 | { |
4232 | if (IgnoreNextSample) |
4233 | { |
4234 | IgnoreNextSample = FALSE; |
4235 | int cpuUtilizationTemp = GetCPUBusyTime_NT(&prevCPUInfo); // updates prevCPUInfo as side effect |
4236 | // don't artificially drive down average if cpu is high |
4237 | if (cpuUtilizationTemp <= CpuUtilizationLow) |
4238 | cpuUtilization = CpuUtilizationLow + 1; |
4239 | else |
4240 | cpuUtilization = cpuUtilizationTemp; |
4241 | } |
4242 | else |
4243 | { |
4244 | cpuUtilization = GetCPUBusyTime_NT(&prevCPUInfo); // updates prevCPUInfo as side effect |
4245 | } |
4246 | } |
4247 | else |
4248 | { |
4249 | int cpuUtilizationTemp = GetCPUBusyTime_NT(&prevCPUInfo); // updates prevCPUInfo as side effect |
4250 | // don't artificially drive down average if cpu is high |
4251 | if (cpuUtilizationTemp <= CpuUtilizationLow) |
4252 | cpuUtilization = CpuUtilizationLow + 1; |
4253 | else |
4254 | cpuUtilization = cpuUtilizationTemp; |
4255 | IgnoreNextSample = TRUE; |
4256 | } |
4257 | |
4258 | #ifndef FEATURE_PAL |
4259 | // don't mess with CP thread pool settings if not initialized yet |
4260 | if (InitCompletionPortThreadpool) |
4261 | { |
4262 | ThreadCounter::Counts oldCounts, newCounts; |
4263 | oldCounts = CPThreadCounter.GetCleanCounts(); |
4264 | |
4265 | if (oldCounts.NumActive == oldCounts.NumWorking && |
4266 | oldCounts.NumRetired == 0 && |
4267 | oldCounts.NumActive < MaxLimitTotalCPThreads && |
4268 | !g_fCompletionPortDrainNeeded && |
4269 | NumCPInfrastructureThreads == 0 && // infrastructure threads count as "to be free as needed" |
4270 | !GCHeapUtilities::IsGCInProgress(TRUE)) |
4271 | |
4272 | { |
4273 | BOOL status; |
4274 | DWORD numBytes; |
4275 | size_t key; |
4276 | LPOVERLAPPED pOverlapped; |
4277 | DWORD errorCode; |
4278 | |
4279 | errorCode = S_OK; |
4280 | |
4281 | status = GetQueuedCompletionStatus( |
4282 | GlobalCompletionPort, |
4283 | &numBytes, |
4284 | (PULONG_PTR)&key, |
4285 | &pOverlapped, |
4286 | 0 // immediate return |
4287 | ); |
4288 | |
4289 | if (status == 0) |
4290 | { |
4291 | errorCode = GetLastError(); |
4292 | } |
4293 | |
4294 | if(pOverlapped == &overlappedForContinueCleanup) |
4295 | { |
4296 | // if we picked up a "Continue Drainage" notification DO NOT create a new CP thread |
4297 | } |
4298 | else |
4299 | if (errorCode != WAIT_TIMEOUT) |
4300 | { |
4301 | QueuedStatus *CompletionStatus = NULL; |
4302 | |
4303 | // loop, retrying until memory is allocated. Under such conditions the gate
4304 | // thread is not useful anyway, so blocking here is acceptable.
4305 | do |
4306 | { |
4307 | // make sure to free mem later in thread |
4308 | CompletionStatus = new (nothrow) QueuedStatus; |
4309 | if (CompletionStatus == NULL) |
4310 | { |
4311 | __SwitchToThread(GATE_THREAD_DELAY, CALLER_LIMITS_SPINNING); |
4312 | } |
4313 | } |
4314 | while (CompletionStatus == NULL); |
4315 | |
4316 | CompletionStatus->numBytes = numBytes; |
4317 | CompletionStatus->key = (PULONG_PTR)key; |
4318 | CompletionStatus->pOverlapped = pOverlapped; |
4319 | CompletionStatus->errorCode = errorCode; |
4320 | |
4321 | // IOCP threads are created as "active" and "working" |
4322 | while (true) |
4323 | { |
4324 | // counts volatile read paired with CompareExchangeCounts loop set |
4325 | oldCounts = CPThreadCounter.DangerousGetDirtyCounts(); |
4326 | newCounts = oldCounts; |
4327 | newCounts.NumActive++; |
4328 | newCounts.NumWorking++; |
4329 | if (oldCounts == CPThreadCounter.CompareExchangeCounts(newCounts, oldCounts)) |
4330 | break; |
4331 | } |
4332 | |
4333 | // loop, retrying until thread is created. |
4334 | while (!CreateCompletionPortThread((LPVOID)CompletionStatus)) |
4335 | { |
4336 | __SwitchToThread(GATE_THREAD_DELAY, CALLER_LIMITS_SPINNING); |
4337 | } |
4338 | } |
4339 | } |
4340 | else if (cpuUtilization < CpuUtilizationLow) |
4341 | { |
4342 | // this could be an indication that threads might be getting blocked or there is no work |
4343 | if (oldCounts.NumWorking == oldCounts.NumActive && // don't bump the limit if there are already free threads |
4344 | oldCounts.NumRetired > 0) |
4345 | { |
4346 | RetiredCPWakeupEvent->Set(); |
4347 | } |
4348 | } |
4349 | } |
4350 | #endif // !FEATURE_PAL |
4351 | |
4352 | if (0 == CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_DisableStarvationDetection)) |
4353 | { |
4354 | if (PerAppDomainTPCountList::AreRequestsPendingInAnyAppDomains() && SufficientDelaySinceLastDequeue()) |
4355 | { |
4356 | DangerousNonHostedSpinLockHolder tal(&ThreadAdjustmentLock); |
4357 | |
4358 | ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts(); |
4359 | while (counts.NumActive < MaxLimitTotalWorkerThreads && //don't add a thread if we're at the max |
4360 | counts.NumActive >= counts.MaxWorking) //don't add a thread if we're already in the process of adding threads |
4361 | { |
4362 | bool breakIntoDebugger = (0 != CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_DebugBreakOnWorkerStarvation)); |
4363 | if (breakIntoDebugger) |
4364 | { |
4365 | OutputDebugStringW(W("The CLR ThreadPool detected work queue starvation!" )); |
4366 | DebugBreak(); |
4367 | } |
4368 | |
4369 | ThreadCounter::Counts newCounts = counts; |
4370 | newCounts.MaxWorking = newCounts.NumActive + 1; |
4371 | |
4372 | ThreadCounter::Counts oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); |
4373 | if (oldCounts == counts) |
4374 | { |
4375 | HillClimbingInstance.ForceChange(newCounts.MaxWorking, Starvation); |
4376 | MaybeAddWorkingWorker(); |
4377 | break; |
4378 | } |
4379 | else |
4380 | { |
4381 | counts = oldCounts; |
4382 | } |
4383 | } |
4384 | } |
4385 | } |
4386 | } |
4387 | while (ShouldGateThreadKeepRunning()); |
4388 | |
4389 | return 0; |
4390 | } |
4391 | |
4392 | // called by logic to spawn a new completion port thread. |
4393 | // return false if not enough time has elapsed since the last |
4394 | // time we sampled the cpu utilization. |
4395 | BOOL ThreadpoolMgr::SufficientDelaySinceLastSample(unsigned int LastThreadCreationTime, |
4396 | unsigned NumThreads, // total number of threads of that type (worker or CP) |
4397 | double throttleRate // the delay is increased by this percentage for each extra thread |
4398 | ) |
4399 | { |
4400 | LIMITED_METHOD_CONTRACT; |
4401 | |
4402 | unsigned dwCurrentTickCount = GetTickCount(); |
4403 | |
4404 | unsigned delaySinceLastThreadCreation = dwCurrentTickCount - LastThreadCreationTime; |
4405 | |
4406 | unsigned minWaitBetweenThreadCreation = GATE_THREAD_DELAY; |
4407 | |
4408 | if (throttleRate > 0.0) |
4409 | { |
4410 | _ASSERTE(throttleRate <= 1.0); |
4411 | |
4412 | unsigned adjustedThreadCount = NumThreads > NumberOfProcessors ? (NumThreads - NumberOfProcessors) : 0; |
4413 | |
4414 | minWaitBetweenThreadCreation = (unsigned) (GATE_THREAD_DELAY * pow((1.0 + throttleRate),(double)adjustedThreadCount)); |
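// For example, with a throttleRate of 0.1 and four threads beyond the processor count, the wait
// grows to GATE_THREAD_DELAY * 1.1^4, i.e. roughly 1.46x the base delay.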
4415 | } |
4416 | // the amount of time to wait should grow as the number of threads increases
4417 | |
4418 | return (delaySinceLastThreadCreation > minWaitBetweenThreadCreation); |
4419 | |
4420 | } |
4421 | |
4422 | |
4423 | // called by logic to spawn new worker threads, return true if it's been too long |
4424 | // since the last dequeue operation - takes number of worker threads into account |
4425 | // in deciding "too long" |
4426 | BOOL ThreadpoolMgr::SufficientDelaySinceLastDequeue() |
4427 | { |
4428 | LIMITED_METHOD_CONTRACT; |
4429 | |
4430 | #define DEQUEUE_DELAY_THRESHOLD (GATE_THREAD_DELAY * 2) |
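// A dequeue is considered overdue after a single gate-thread delay when CPU utilization is low,
// or after MaxWorking * DEQUEUE_DELAY_THRESHOLD otherwise, so the tolerance scales with the
// number of workers that are expected to be dequeuing work.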
4431 | |
4432 | unsigned delay = GetTickCount() - VolatileLoad(&LastDequeueTime); |
4433 | unsigned tooLong; |
4434 | |
4435 | if(cpuUtilization < CpuUtilizationLow) |
4436 | { |
4437 | tooLong = GATE_THREAD_DELAY; |
4438 | } |
4439 | else |
4440 | { |
4441 | ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts(); |
4442 | unsigned numThreads = counts.MaxWorking; |
4443 | tooLong = numThreads * DEQUEUE_DELAY_THRESHOLD; |
4444 | } |
4445 | |
4446 | return (delay > tooLong); |
4447 | |
4448 | } |
4449 | |
4450 | |
4451 | #ifdef _MSC_VER |
4452 | #ifdef _WIN64 |
4453 | #pragma warning (default : 4716) |
4454 | #else |
4455 | #pragma warning (default : 4715) |
4456 | #endif |
4457 | #endif |
4458 | |
4459 | /************************************************************************/ |
4460 | |
4461 | struct CreateTimerThreadParams { |
4462 | CLREvent event; |
4463 | BOOL setupSucceeded; |
4464 | }; |
4465 | |
4466 | BOOL ThreadpoolMgr::CreateTimerQueueTimer(PHANDLE phNewTimer, |
4467 | WAITORTIMERCALLBACK Callback, |
4468 | PVOID Parameter, |
4469 | DWORD DueTime, |
4470 | DWORD Period, |
4471 | ULONG Flag) |
4472 | { |
4473 | CONTRACTL |
4474 | { |
4475 | THROWS; // EnsureInitialized, CreateAutoEvent can throw |
4476 | if (GetThread()) {GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} // There can be calls thru ICorThreadpool |
4477 | MODE_ANY; |
4478 | INJECT_FAULT(COMPlusThrowOM()); |
4479 | } |
4480 | CONTRACTL_END; |
4481 | |
4482 | EnsureInitialized(); |
4483 | |
4484 | // For now we use just one timer thread. Consider using multiple timer threads if |
4485 | // the number of timers in the queue exceeds a certain threshold. The logic and code
4486 | // would be similar to the one for creating wait threads. |
4487 | if (NULL == TimerThread) |
4488 | { |
4489 | CrstHolder csh(&TimerQueueCriticalSection); |
4490 | |
4491 | // check again |
4492 | if (NULL == TimerThread) |
4493 | { |
4494 | CreateTimerThreadParams params; |
4495 | params.event.CreateAutoEvent(FALSE); |
4496 | |
4497 | params.setupSucceeded = FALSE; |
4498 | |
4499 | HANDLE TimerThreadHandle = Thread::CreateUtilityThread(Thread::StackSize_Small, TimerThreadStart, ¶ms, W(".NET Timer" )); |
4500 | |
4501 | if (TimerThreadHandle == NULL) |
4502 | { |
4503 | params.event.CloseEvent(); |
4504 | ThrowOutOfMemory(); |
4505 | } |
4506 | |
4507 | { |
4508 | GCX_PREEMP(); |
4509 | for(;;) |
4510 | { |
4511 | // if a host throws because it couldn't allocate another thread,
4512 | // just retry the wait. |
4513 | if (SafeWait(¶ms.event,INFINITE, FALSE) != WAIT_TIMEOUT) |
4514 | break; |
4515 | } |
4516 | } |
4517 | params.event.CloseEvent(); |
4518 | |
4519 | if (!params.setupSucceeded) |
4520 | { |
4521 | CloseHandle(TimerThreadHandle); |
4522 | return FALSE; |
4523 | } |
4524 | |
4525 | TimerThread = TimerThreadHandle; |
4526 | } |
4527 | |
4528 | } |
4529 | |
4530 | |
4531 | NewHolder<TimerInfo> timerInfoHolder; |
4532 | TimerInfo * timerInfo = new (nothrow) TimerInfo; |
4533 | *phNewTimer = (HANDLE) timerInfo; |
4534 | |
4535 | if (NULL == timerInfo) |
4536 | ThrowOutOfMemory(); |
4537 | |
4538 | timerInfoHolder.Assign(timerInfo); |
4539 | |
4540 | timerInfo->FiringTime = DueTime; |
4541 | timerInfo->Function = Callback; |
4542 | timerInfo->Context = Parameter; |
4543 | timerInfo->Period = Period; |
4544 | timerInfo->state = 0; |
4545 | timerInfo->flag = Flag; |
4546 | timerInfo->ExternalCompletionEvent = INVALID_HANDLE; |
4547 | timerInfo->ExternalEventSafeHandle = NULL; |
4548 | timerInfo->handleOwningAD = (ADID) 0; |
4549 | |
4550 | BOOL status = QueueUserAPC((PAPCFUNC)InsertNewTimer,TimerThread,(size_t)timerInfo); |
4551 | if (FALSE == status) |
4552 | { |
4553 | return FALSE; |
4554 | } |
4555 | |
4556 | timerInfoHolder.SuppressRelease(); |
4557 | return TRUE; |
4558 | } |
4559 | |
4560 | #ifdef _MSC_VER |
4561 | #ifdef _WIN64 |
4562 | #pragma warning (disable : 4716) |
4563 | #else |
4564 | #pragma warning (disable : 4715) |
4565 | #endif |
4566 | #endif |
4567 | DWORD WINAPI ThreadpoolMgr::TimerThreadStart(LPVOID p) |
4568 | { |
4569 | ClrFlsSetThreadType (ThreadType_Timer); |
4570 | |
4571 | STATIC_CONTRACT_THROWS; |
4572 | STATIC_CONTRACT_GC_TRIGGERS; // due to SetApartment |
4573 | STATIC_CONTRACT_MODE_PREEMPTIVE; |
4574 | STATIC_CONTRACT_SO_INTOLERANT; |
4575 | /* cannot use contract because of SEH |
4576 | CONTRACTL |
4577 | { |
4578 | NOTHROW; |
4579 | GC_NOTRIGGER; |
4580 | MODE_PREEMPTIVE; |
4581 | } |
4582 | CONTRACTL_END;*/ |
4583 | |
4584 | CreateTimerThreadParams* params = (CreateTimerThreadParams*)p; |
4585 | |
4586 | Thread* pThread = SetupThreadNoThrow(); |
4587 | |
4588 | params->setupSucceeded = (pThread == NULL) ? 0 : 1; |
4589 | params->event.Set(); |
4590 | |
4591 | if (pThread == NULL) |
4592 | return 0; |
4593 | |
4594 | pTimerThread = pThread; |
4595 | // Timer threads never die |
4596 | |
4597 | LastTickCount = GetTickCount(); |
4598 | |
4599 | #ifdef FEATURE_COMINTEROP |
4600 | if (pThread->SetApartment(Thread::AS_InMTA, TRUE) != Thread::AS_InMTA) |
4601 | { |
4602 | // @todo: should we log the failure |
4603 | goto Exit; |
4604 | } |
4605 | #endif // FEATURE_COMINTEROP |
4606 | |
4607 | for (;;) |
4608 | { |
4609 | // moved to its own function since EX_TRY consumes stack |
4610 | #ifdef _MSC_VER |
4611 | #pragma inline_depth (0) // the function containing EX_TRY can't be inlined here |
4612 | #endif |
4613 | TimerThreadFire(); |
4614 | #ifdef _MSC_VER |
4615 | #pragma inline_depth (20) |
4616 | #endif |
4617 | } |
4618 | |
4619 | #ifdef FEATURE_COMINTEROP |
4620 | // unreachable code |
4621 | // if (pThread) { |
4622 | // pThread->SetApartment(Thread::AS_Unknown, TRUE); |
4623 | // } |
4624 | Exit: |
4625 | |
4626 | // @todo: replace with host provided ExitThread |
4627 | return 0; |
4628 | #endif |
4629 | } |
4630 | |
4631 | void ThreadpoolMgr::TimerThreadFire() |
4632 | { |
4633 | CONTRACTL |
4634 | { |
4635 | THROWS; |
4636 | GC_TRIGGERS; |
4637 | MODE_PREEMPTIVE; |
4638 | } |
4639 | CONTRACTL_END; |
4640 | |
4641 | EX_TRY { |
4642 | DWORD timeout = FireTimers(); |
4643 | |
4644 | #undef SleepEx |
4645 | SleepEx(timeout, TRUE); |
4646 | #define SleepEx(a,b) Dont_Use_SleepEx(a,b) |
4647 | |
4648 | // the thread could wake up either because an APC completed or because the sleep timed out;
4649 | // in both cases we need to sweep the timer queue, firing timers and readjusting
4650 | // the next firing time
4651 | |
4652 | } |
4653 | EX_CATCH { |
4654 | // Assert on debug builds since a dead timer thread is a fatal error |
4655 | _ASSERTE(FALSE); |
4656 | if (SwallowUnhandledExceptions()) |
4657 | { |
4658 | // Do nothing to swallow the exception |
4659 | } |
4660 | else |
4661 | { |
4662 | EX_RETHROW; |
4663 | } |
4664 | } |
4665 | EX_END_CATCH(SwallowAllExceptions); |
4666 | } |
4667 | |
4668 | #ifdef _MSC_VER |
4669 | #ifdef _WIN64 |
4670 | #pragma warning (default : 4716) |
4671 | #else |
4672 | #pragma warning (default : 4715) |
4673 | #endif |
4674 | #endif |
4675 | |
4676 | // Executed as an APC in timer thread |
4677 | void ThreadpoolMgr::InsertNewTimer(TimerInfo* pArg) |
4678 | { |
4679 | CONTRACTL |
4680 | { |
4681 | NOTHROW; |
4682 | GC_TRIGGERS; |
4683 | MODE_ANY; |
4684 | } |
4685 | CONTRACTL_END; |
4686 | STATIC_CONTRACT_SO_INTOLERANT; |
4687 | |
4688 | _ASSERTE(pArg); |
4689 | TimerInfo * timerInfo = pArg; |
4690 | |
4691 | if (timerInfo->state & TIMER_DELETE) |
4692 | { // timer was deleted before it could be registered |
4693 | DeleteTimer(timerInfo); |
4694 | return; |
4695 | } |
4696 | |
4697 | // set the firing time = current time + due time (note initially firing time = due time) |
4698 | DWORD currentTime = GetTickCount(); |
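// A FiringTime of (ULONG)-1 at this point means the timer was created with an infinite due time
// (this appears to correspond to an infinite timeout on the caller's side): register it and take a
// reference, but do not schedule it; ChangeTimerQueueTimer can activate it later.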
4699 | if (timerInfo->FiringTime == (ULONG) -1) |
4700 | { |
4701 | timerInfo->state = TIMER_REGISTERED; |
4702 | timerInfo->refCount = 1; |
4703 | |
4704 | } |
4705 | else |
4706 | { |
4707 | timerInfo->FiringTime += currentTime; |
4708 | |
4709 | timerInfo->state = (TIMER_REGISTERED | TIMER_ACTIVE); |
4710 | timerInfo->refCount = 1; |
4711 | |
4712 | // insert the timer in the queue |
4713 | InsertTailList(&TimerQueue,(&timerInfo->link)); |
4714 | } |
4715 | |
4716 | return; |
4717 | } |
4718 | |
4719 | |
4720 | // executed by the Timer thread |
4721 | // sweeps through the list of timers, readjusting the firing times, queueing APCs for |
4722 | // those that have expired, and returns the next firing time interval |
4723 | DWORD ThreadpoolMgr::FireTimers() |
4724 | { |
4725 | CONTRACTL |
4726 | { |
4727 | THROWS; // QueueUserWorkItem can throw |
4728 | if (GetThread()) { GC_TRIGGERS;} else {DISABLED(GC_NOTRIGGER);} |
4729 | if (GetThread()) { MODE_PREEMPTIVE;} else { DISABLED(MODE_ANY);} |
4730 | } |
4731 | CONTRACTL_END; |
4732 | |
4733 | DWORD currentTime = GetTickCount(); |
4734 | DWORD nextFiringInterval = (DWORD) -1; |
4735 | TimerInfo* timerInfo = NULL; |
4736 | |
4737 | EX_TRY |
4738 | { |
4739 | for (LIST_ENTRY* node = (LIST_ENTRY*) TimerQueue.Flink; |
4740 | node != &TimerQueue; |
4741 | ) |
4742 | { |
4743 | timerInfo = (TimerInfo*) node; |
4744 | node = (LIST_ENTRY*) node->Flink; |
4745 | |
4746 | if (TimeExpired(LastTickCount, currentTime, timerInfo->FiringTime)) |
4747 | { |
4748 | if (timerInfo->Period == 0 || timerInfo->Period == (ULONG) -1) |
4749 | { |
4750 | DeactivateTimer(timerInfo); |
4751 | } |
4752 | |
4753 | InterlockedIncrement(&timerInfo->refCount); |
4754 | |
4755 | QueueUserWorkItem(AsyncTimerCallbackCompletion, |
4756 | timerInfo, |
4757 | QUEUE_ONLY /* the callback takes care of deleting the TimerInfo */);
4758 | |
4759 | if (timerInfo->Period != 0 && timerInfo->Period != (ULONG)-1) |
4760 | { |
4761 | ULONG nextFiringTime = timerInfo->FiringTime + timerInfo->Period; |
4762 | DWORD firingInterval; |
4763 | if (TimeExpired(timerInfo->FiringTime, currentTime, nextFiringTime)) |
4764 | { |
4765 | // Enough time has elapsed to fire the timer yet again. The timer is not able to keep up with the short
4766 | // period; have it fire 1 ms from now to avoid spinning without a delay.
4767 | timerInfo->FiringTime = currentTime + 1; |
4768 | firingInterval = 1; |
4769 | } |
4770 | else |
4771 | { |
4772 | timerInfo->FiringTime = nextFiringTime; |
4773 | firingInterval = TimeInterval(nextFiringTime, currentTime); |
4774 | } |
4775 | |
4776 | if (firingInterval < nextFiringInterval) |
4777 | nextFiringInterval = firingInterval; |
4778 | } |
4779 | } |
4780 | else |
4781 | { |
4782 | DWORD firingInterval = TimeInterval(timerInfo->FiringTime, currentTime); |
4783 | if (firingInterval < nextFiringInterval) |
4784 | nextFiringInterval = firingInterval; |
4785 | } |
4786 | } |
4787 | } |
4788 | EX_CATCH |
4789 | { |
4790 | // If QueueUserWorkItem throws OOM, swallow the exception and retry on
4791 | // the next call to FireTimers(); otherwise rethrow.
4792 | Exception *ex = GET_EXCEPTION(); |
4793 | // undo the call to DeactivateTimer() |
4794 | InterlockedDecrement(&timerInfo->refCount); |
4795 | timerInfo->state = timerInfo->state & TIMER_ACTIVE; |
4796 | InsertTailList(&TimerQueue, (&timerInfo->link)); |
4797 | if (ex->GetHR() != E_OUTOFMEMORY) |
4798 | { |
4799 | EX_RETHROW; |
4800 | } |
4801 | } |
4802 | EX_END_CATCH(RethrowTerminalExceptions); |
4803 | |
4804 | LastTickCount = currentTime; |
4805 | |
4806 | return nextFiringInterval; |
4807 | } |
4808 | |
4809 | DWORD WINAPI ThreadpoolMgr::AsyncTimerCallbackCompletion(PVOID pArgs) |
4810 | { |
4811 | CONTRACTL |
4812 | { |
4813 | THROWS; |
4814 | GC_TRIGGERS; |
4815 | MODE_PREEMPTIVE; |
4816 | SO_TOLERANT; |
4817 | } |
4818 | CONTRACTL_END; |
4819 | |
4820 | Thread* pThread = GetThread(); |
4821 | |
4822 | if (pThread == NULL) |
4823 | { |
4824 | HRESULT hr = ERROR_SUCCESS; |
4825 | |
4826 | ClrFlsSetThreadType(ThreadType_Threadpool_Worker); |
4827 | pThread = SetupThreadNoThrow(&hr); |
4828 | |
4829 | if (pThread == NULL) |
4830 | { |
4831 | return hr; |
4832 | } |
4833 | } |
4834 | |
4835 | BEGIN_SO_INTOLERANT_CODE(pThread); |
4836 | { |
4837 | TimerInfo* timerInfo = (TimerInfo*) pArgs; |
4838 | ((WAITORTIMERCALLBACKFUNC) timerInfo->Function) (timerInfo->Context, TRUE) ; |
4839 | |
4840 | if (InterlockedDecrement(&timerInfo->refCount) == 0) |
4841 | { |
4842 | DeleteTimer(timerInfo); |
4843 | } |
4844 | } |
4845 | END_SO_INTOLERANT_CODE; |
4846 | |
4847 | return ERROR_SUCCESS; |
4848 | } |
4849 | |
4850 | |
4851 | // removes the timer from the timer queue, thereby cancelling it |
4852 | // there may still be pending callbacks that haven't completed |
4853 | void ThreadpoolMgr::DeactivateTimer(TimerInfo* timerInfo) |
4854 | { |
4855 | LIMITED_METHOD_CONTRACT; |
4856 | |
4857 | RemoveEntryList((LIST_ENTRY*) timerInfo); |
4858 | |
4859 | // This timer info could go into another linked list of timer infos |
4860 | // waiting to be released. Reinitialize the list pointers |
4861 | InitializeListHead(&timerInfo->link); |
4862 | timerInfo->state = timerInfo->state & ~TIMER_ACTIVE; |
4863 | } |
4864 | |
4865 | DWORD WINAPI ThreadpoolMgr::AsyncDeleteTimer(PVOID pArgs) |
4866 | { |
4867 | CONTRACTL |
4868 | { |
4869 | THROWS; |
4870 | MODE_PREEMPTIVE; |
4871 | GC_TRIGGERS; |
4872 | } |
4873 | CONTRACTL_END; |
4874 | |
4875 | Thread * pThread = GetThread(); |
4876 | |
4877 | if (pThread == NULL) |
4878 | { |
4879 | HRESULT hr = ERROR_SUCCESS; |
4880 | |
4881 | ClrFlsSetThreadType(ThreadType_Threadpool_Worker); |
4882 | pThread = SetupThreadNoThrow(&hr); |
4883 | |
4884 | if (pThread == NULL) |
4885 | { |
4886 | return hr; |
4887 | } |
4888 | } |
4889 | |
4890 | DeleteTimer((TimerInfo*) pArgs); |
4891 | |
4892 | return ERROR_SUCCESS; |
4893 | } |
4894 | |
4895 | void ThreadpoolMgr::DeleteTimer(TimerInfo* timerInfo) |
4896 | { |
4897 | CONTRACTL |
4898 | { |
4899 | if (GetThread() == pTimerThread) { NOTHROW; } else { THROWS; } |
4900 | GC_TRIGGERS; |
4901 | MODE_ANY; |
4902 | } |
4903 | CONTRACTL_END; |
4904 | |
4905 | _ASSERTE((timerInfo->state & TIMER_ACTIVE) == 0); |
4906 | |
4907 | _ASSERTE(!(timerInfo->flag & WAIT_FREE_CONTEXT)); |
4908 | |
4909 | if (timerInfo->flag & WAIT_INTERNAL_COMPLETION) |
4910 | { |
4911 | timerInfo->InternalCompletionEvent.Set(); |
4912 | return; // the timerInfo will be deleted by the thread that's waiting on InternalCompletionEvent |
4913 | } |
4914 | |
4915 | // ExternalCompletionEvent comes from Host, ExternalEventSafeHandle from managed code. |
4916 | // They are mutually exclusive. |
4917 | _ASSERTE(!(timerInfo->ExternalCompletionEvent != INVALID_HANDLE && |
4918 | timerInfo->ExternalEventSafeHandle != NULL)); |
4919 | |
4920 | if (timerInfo->ExternalCompletionEvent != INVALID_HANDLE) |
4921 | { |
4922 | UnsafeSetEvent(timerInfo->ExternalCompletionEvent); |
4923 | timerInfo->ExternalCompletionEvent = INVALID_HANDLE; |
4924 | } |
4925 | |
4926 | // We cannot block the timer thread, so some cleanup is deferred to other threads. |
4927 | if (GetThread() == pTimerThread) |
4928 | { |
4929 | // Notify the ExternalEventSafeHandle with a user work item
4930 | if (timerInfo->ExternalEventSafeHandle != NULL) |
4931 | { |
4932 | BOOL success = FALSE; |
4933 | EX_TRY |
4934 | { |
4935 | if (QueueUserWorkItem(AsyncDeleteTimer, |
4936 | timerInfo, |
4937 | QUEUE_ONLY) != FALSE) |
4938 | { |
4939 | success = TRUE; |
4940 | } |
4941 | } |
4942 | EX_CATCH |
4943 | { |
4944 | } |
4945 | EX_END_CATCH(SwallowAllExceptions); |
4946 | |
4947 | // If unable to queue a user work item, fall back to queueing the timer for release,
4948 | // which will happen *sometime* in the future.
4949 | if (success == FALSE) |
4950 | { |
4951 | QueueTimerInfoForRelease(timerInfo); |
4952 | } |
4953 | |
4954 | return; |
4955 | } |
4956 | |
4957 | // Releasing GC handles can block, so we won't do this on the timer thread.
4958 | // We'll put it in a list which will be processed by a worker thread.
4959 | if (timerInfo->Context != NULL) |
4960 | { |
4961 | QueueTimerInfoForRelease(timerInfo); |
4962 | return; |
4963 | } |
4964 | } |
4965 | |
4966 | // To get here we are either not the Timer thread or there is no blocking work to be done |
4967 | |
4968 | if (timerInfo->Context != NULL) |
4969 | { |
4970 | GCX_COOP(); |
4971 | delete (ThreadpoolMgr::TimerInfoContext*)timerInfo->Context; |
4972 | } |
4973 | |
4974 | if (timerInfo->ExternalEventSafeHandle != NULL) |
4975 | { |
4976 | ReleaseTimerInfo(timerInfo); |
4977 | } |
4978 | |
4979 | delete timerInfo; |
4980 | |
4981 | } |
4982 | |
4983 | // We add TimerInfos from deleted timers into a linked list. |
4984 | // A worker thread will later release the handles held by the TimerInfo |
4985 | // and recycle them if possible. |
4986 | void ThreadpoolMgr::QueueTimerInfoForRelease(TimerInfo *pTimerInfo) |
4987 | { |
4988 | CONTRACTL |
4989 | { |
4990 | NOTHROW; |
4991 | GC_NOTRIGGER; |
4992 | MODE_ANY; |
4993 | } |
4994 | CONTRACTL_END; |
4995 | |
4996 | // The synchronization in this method depends on the fact that |
4997 | // - There is only one timer thread |
4998 | // - The one and only timer thread is executing this method. |
4999 | // - This function won't go into an alertable state, which could trigger another APC.
5000 | // Otherwise two threads could be queueing timer infos and a race could
5001 | // lead to leaked memory and handles.
5002 | _ASSERTE(GetThread()); |
5003 | _ASSERTE(pTimerThread == GetThread()); |
5004 | TimerInfo *pHead = NULL; |
5005 | |
5006 | // Make sure this timer info has been deactivated and removed from any other lists |
5007 | _ASSERTE((pTimerInfo->state & TIMER_ACTIVE) == 0); |
5008 | //_ASSERTE(pTimerInfo->link.Blink == &(pTimerInfo->link) && |
5009 | // pTimerInfo->link.Flink == &(pTimerInfo->link)); |
5010 | // Make sure "link" is the first field in TimerInfo |
5011 | _ASSERTE(pTimerInfo == (PVOID)&pTimerInfo->link); |
5012 | |
5013 | // Grab any previously published list |
5014 | if ((pHead = InterlockedExchangeT(&TimerInfosToBeRecycled, NULL)) != NULL) |
5015 | { |
5016 | // If there already is a list, just append |
5017 | InsertTailList((LIST_ENTRY *)pHead, &pTimerInfo->link); |
5018 | pTimerInfo = pHead; |
5019 | } |
5020 | else |
5021 | // If this is the head, make its next and previous ptrs point to itself |
5022 | InitializeListHead((LIST_ENTRY*)&pTimerInfo->link); |
5023 | |
5024 | // Publish the list |
5025 | (void) InterlockedExchangeT(&TimerInfosToBeRecycled, pTimerInfo); |
5026 | |
5027 | } |
5028 | |
5029 | void ThreadpoolMgr::FlushQueueOfTimerInfos() |
5030 | { |
5031 | CONTRACTL |
5032 | { |
5033 | THROWS; |
5034 | GC_TRIGGERS; |
5035 | MODE_ANY; |
5036 | } |
5037 | CONTRACTL_END; |
5038 | |
5039 | TimerInfo *pHeadTimerInfo = NULL, *pCurrTimerInfo = NULL; |
5040 | LIST_ENTRY *pNextInfo = NULL; |
5041 | |
5042 | if ((pHeadTimerInfo = InterlockedExchangeT(&TimerInfosToBeRecycled, NULL)) == NULL) |
5043 | return; |
5044 | |
5045 | do |
5046 | { |
5047 | RemoveHeadList((LIST_ENTRY *)pHeadTimerInfo, pNextInfo); |
5048 | _ASSERTE(pNextInfo != NULL); |
5049 | |
5050 | pCurrTimerInfo = (TimerInfo *) pNextInfo; |
5051 | |
5052 | GCX_COOP(); |
5053 | if (pCurrTimerInfo->Context != NULL) |
5054 | { |
5055 | delete (ThreadpoolMgr::TimerInfoContext*)pCurrTimerInfo->Context; |
5056 | } |
5057 | |
5058 | if (pCurrTimerInfo->ExternalEventSafeHandle != NULL) |
5059 | { |
5060 | ReleaseTimerInfo(pCurrTimerInfo); |
5061 | } |
5062 | |
5063 | delete pCurrTimerInfo; |
5064 | |
5065 | } |
5066 | while ((TimerInfo *)pNextInfo != pHeadTimerInfo); |
5067 | } |
5068 | |
5069 | /************************************************************************/ |
5070 | BOOL ThreadpoolMgr::ChangeTimerQueueTimer( |
5071 | HANDLE Timer, |
5072 | ULONG DueTime, |
5073 | ULONG Period) |
5074 | { |
5075 | CONTRACTL |
5076 | { |
5077 | THROWS; |
5078 | MODE_ANY; |
5079 | GC_NOTRIGGER; |
5080 | INJECT_FAULT(COMPlusThrowOM()); |
5081 | } |
5082 | CONTRACTL_END; |
5083 | |
5084 | _ASSERTE(IsInitialized()); |
5085 | _ASSERTE(Timer); // not possible to pass an invalid handle from managed code
5086 | |
5087 | NewHolder<TimerUpdateInfo> updateInfoHolder; |
5088 | TimerUpdateInfo *updateInfo = new TimerUpdateInfo; |
5089 | updateInfoHolder.Assign(updateInfo); |
5090 | |
5091 | updateInfo->Timer = (TimerInfo*) Timer; |
5092 | updateInfo->DueTime = DueTime; |
5093 | updateInfo->Period = Period; |
5094 | |
5095 | BOOL status = QueueUserAPC((PAPCFUNC)UpdateTimer, |
5096 | TimerThread, |
5097 | (size_t) updateInfo); |
5098 | |
5099 | if (status) |
5100 | updateInfoHolder.SuppressRelease(); |
5101 | |
5102 | return(status); |
5103 | } |
5104 | |
5105 | void ThreadpoolMgr::UpdateTimer(TimerUpdateInfo* pArgs) |
5106 | { |
5107 | CONTRACTL |
5108 | { |
5109 | NOTHROW; |
5110 | GC_NOTRIGGER; |
5111 | MODE_ANY; |
5112 | } |
5113 | CONTRACTL_END; |
5114 | |
5115 | TimerUpdateInfo* updateInfo = (TimerUpdateInfo*) pArgs; |
5116 | TimerInfo* timerInfo = updateInfo->Timer; |
5117 | |
5118 | timerInfo->Period = updateInfo->Period; |
5119 | |
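// A DueTime of (ULONG)-1 (infinite) deactivates the timer: remove it from the queue if it is
// currently scheduled, leaving it registered so a later change can re-arm it.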
    if (updateInfo->DueTime == (ULONG) -1)
    {
        if (timerInfo->state & TIMER_ACTIVE)
        {
            DeactivateTimer(timerInfo);
        }
        // else, no-op (the timer was already inactive)
        _ASSERTE((timerInfo->state & TIMER_ACTIVE) == 0);

        delete updateInfo;
        return;
    }

    DWORD currentTime = GetTickCount();
    timerInfo->FiringTime = currentTime + updateInfo->DueTime;

    delete updateInfo;

    if (! (timerInfo->state & TIMER_ACTIVE))
    {
        // timer not active (probably a one-shot timer that has expired), so activate it
        timerInfo->state |= TIMER_ACTIVE;
        _ASSERTE(timerInfo->refCount >= 1);
        // insert the timer in the queue
        InsertTailList(&TimerQueue, &timerInfo->link);

    }

    return;
}

/************************************************************************/
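// Deletes a timer by queueing a DeregisterTimer APC to the timer thread. The Event argument
// selects how completion is reported: (HANDLE)-1 makes this call block on an internal event
// until deregistration finishes, a non-NULL handle is recorded as ExternalCompletionEvent
// for later signalling, and NULL means no notification is requested.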
BOOL ThreadpoolMgr::DeleteTimerQueueTimer(
    HANDLE Timer,
    HANDLE Event)
{
    CONTRACTL
    {
        THROWS;
        MODE_ANY;
        GC_TRIGGERS;
    }
    CONTRACTL_END;

    _ASSERTE(IsInitialized());          // cannot call delete before creating a timer
    _ASSERTE(Timer);                    // not possible to pass an invalid handle from managed code

    // Make the pointer volatile to keep the compiler from reordering the check to after the
    // async call; otherwise, DeregisterTimer could delete timerInfo before the comparison.
    VolatilePtr<TimerInfo> timerInfo = (TimerInfo*) Timer;

    if (Event == (HANDLE) -1)
    {
        //CONTRACT_VIOLATION(ThrowsViolation);
        timerInfo->InternalCompletionEvent.CreateAutoEvent(FALSE);
        timerInfo->flag |= WAIT_INTERNAL_COMPLETION;
    }
    else if (Event)
    {
        timerInfo->ExternalCompletionEvent = Event;
    }
#ifdef _DEBUG
    else /* Event == NULL */
    {
        _ASSERTE(timerInfo->ExternalCompletionEvent == INVALID_HANDLE);
    }
#endif

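    // Capture the blocking flag before queueing the APC: in the non-blocking case,
    // DeregisterTimer may free timerInfo as soon as it runs on the timer thread, so
    // timerInfo must not be dereferenced after a successful queue.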
    BOOL isBlocking = timerInfo->flag & WAIT_INTERNAL_COMPLETION;

    BOOL status = QueueUserAPC((PAPCFUNC)DeregisterTimer,
                               TimerThread,
                               (size_t)(TimerInfo*)timerInfo);

    if (FALSE == status)
    {
        if (isBlocking)
            timerInfo->InternalCompletionEvent.CloseEvent();
        return FALSE;
    }

    if (isBlocking)
    {
        _ASSERTE(timerInfo->ExternalEventSafeHandle == NULL);
        _ASSERTE(timerInfo->ExternalCompletionEvent == INVALID_HANDLE);
        _ASSERTE(GetThread() != pTimerThread);

        timerInfo->InternalCompletionEvent.Wait(INFINITE, TRUE /*alertable*/);
        timerInfo->InternalCompletionEvent.CloseEvent();
        // Release handles and delete TimerInfo
        _ASSERTE(timerInfo->refCount == 0);
        // clear WAIT_INTERNAL_COMPLETION so that timerInfo gets deleted in the DeleteTimer call below
        timerInfo->flag &= ~WAIT_INTERNAL_COMPLETION;
        DeleteTimer(timerInfo);
    }
    return status;
}

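// APC callback queued by DeleteTimerQueueTimer; runs on the timer thread. A timer that never
// got registered is just marked TIMER_DELETE and its refCount dropped; otherwise an active
// timer is deactivated and the TimerInfo is deleted once its refCount reaches zero.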
void ThreadpoolMgr::DeregisterTimer(TimerInfo* pArgs)
{
    CONTRACTL
    {
        NOTHROW;
        GC_TRIGGERS;
        MODE_PREEMPTIVE;
        SO_INTOLERANT;
    }
    CONTRACTL_END;

    TimerInfo* timerInfo = (TimerInfo*) pArgs;

    if (! (timerInfo->state & TIMER_REGISTERED) )
    {
        // set state to deleted, so that it does not get registered
        timerInfo->state |= TIMER_DELETE;

        // since the timer has not even been registered, we don't need an interlock to decrement the refCount
        timerInfo->refCount--;

        return;
    }

    if (timerInfo->state & TIMER_ACTIVE)
    {
        DeactivateTimer(timerInfo);
    }

    if (InterlockedDecrement(&timerInfo->refCount) == 0)
    {
        DeleteTimer(timerInfo);
    }
    return;
}

#endif // !DACCESS_COMPILE
