btTaskScheduler.cpp
#include "LinearMath/btMinMax.h"
#include "LinearMath/btAlignedObjectArray.h"
#include "LinearMath/btThreads.h"
#include "LinearMath/btQuickprof.h"  // btClock, BT_PROFILE
#include <stdio.h>
#include <algorithm>

#if BT_THREADSAFE

#include "btThreadSupportInterface.h"

#if defined(_WIN32)

#define WIN32_LEAN_AND_MEAN

#include <windows.h>

#endif

typedef unsigned long long btU64;
static const int kCacheLineSize = 64;

void btSpinPause()
{
#if defined(_WIN32)
	YieldProcessor();
#endif
}

struct WorkerThreadStatus
{
	enum Type
	{
		kInvalid,
		kWaitingForWork,
		kWorking,
		kSleeping,
	};
};

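// Per-thread work directives, written by the main thread and polled by the
// workers. All directives are packed into a single 64-byte-aligned cacheline,
// so the main thread can retarget every worker with one short write loop.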
ATTRIBUTE_ALIGNED64(class)
WorkerThreadDirectives
{
	static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;
	// directives for all worker threads packed into a single cacheline
	char m_threadDirs[kMaxThreadCount];

public:
	enum Type
	{
		kInvalid,
		kGoToSleep,         // go to sleep
		kStayAwakeButIdle,  // stay awake, but don't actively scan the job queue
		kScanForJobs,       // actively scan job queue for jobs
	};
	WorkerThreadDirectives()
	{
		for (int i = 0; i < kMaxThreadCount; ++i)
		{
			m_threadDirs[i] = 0;
		}
	}

	Type getDirective(int threadId)
	{
		btAssert(threadId < kMaxThreadCount);
		return static_cast<Type>(m_threadDirs[threadId]);
	}

	void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
	{
		btAssert(threadBegin < threadEnd);
		btAssert(threadEnd <= kMaxThreadCount);
		char dirChar = static_cast<char>(dir);
		for (int i = threadBegin; i < threadEnd; ++i)
		{
			m_threadDirs[i] = dirChar;
		}
	}
};

class JobQueue;

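// Per-thread state, one cacheline-aligned entry per thread (including the main
// thread) to avoid false sharing. Each worker records its status and
// finished-job count here; ParallelSumJob accumulates into m_sumResult, which
// the main thread totals after waitJobs().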
ATTRIBUTE_ALIGNED64(struct)
ThreadLocalStorage
{
	int m_threadId;
	WorkerThreadStatus::Type m_status;
	int m_numJobsFinished;
	btSpinMutex m_mutex;
	btScalar m_sumResult;
	WorkerThreadDirectives* m_directive;
	JobQueue* m_queue;
	btClock* m_clock;
	unsigned int m_cooldownTime;
};

struct IJob
{
	virtual void executeJob(int threadId) = 0;
};

class ParallelForJob : public IJob
{
	const btIParallelForBody* m_body;
	int m_begin;
	int m_end;

public:
	ParallelForJob(int iBegin, int iEnd, const btIParallelForBody& body)
	{
		m_body = &body;
		m_begin = iBegin;
		m_end = iEnd;
	}
	virtual void executeJob(int threadId) BT_OVERRIDE
	{
		BT_PROFILE("executeJob");

		// call the functor body to do the work
		m_body->forLoop(m_begin, m_end);
	}
};

class ParallelSumJob : public IJob
{
	const btIParallelSumBody* m_body;
	ThreadLocalStorage* m_threadLocalStoreArray;
	int m_begin;
	int m_end;

public:
	ParallelSumJob(int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls)
	{
		m_body = &body;
		m_threadLocalStoreArray = tls;
		m_begin = iBegin;
		m_end = iEnd;
	}
	virtual void executeJob(int threadId) BT_OVERRIDE
	{
		BT_PROFILE("executeJob");

		// call the functor body to do the work
		btScalar val = m_body->sumLoop(m_begin, m_end);
#if BT_PARALLEL_SUM_DETERMINISTISM
		// by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
		const float TRUNC_SCALE = float(1 << 19);
		val = floor(val * TRUNC_SCALE + 0.5f) / TRUNC_SCALE;  // truncate some bits
#endif
		m_threadLocalStoreArray[threadId].m_sumResult += val;
	}
};

ATTRIBUTE_ALIGNED64(class)
JobQueue
{
	btThreadSupportInterface* m_threadSupport;
	btCriticalSection* m_queueLock;
	btSpinMutex m_mutex;

	btAlignedObjectArray<IJob*> m_jobQueue;
	char* m_jobMem;
	int m_jobMemSize;
	bool m_queueIsEmpty;
	int m_tailIndex;
	int m_headIndex;
	int m_allocSize;
	bool m_useSpinMutex;
	btAlignedObjectArray<JobQueue*> m_neighborContexts;
	char m_cachePadding[kCacheLineSize];  // prevent false sharing

	void freeJobMem()
	{
		if (m_jobMem)
		{
			// free old
			btAlignedFree(m_jobMem);
			m_jobMem = NULL;
		}
	}
	void resizeJobMem(int newSize)
	{
		if (newSize > m_jobMemSize)
		{
			freeJobMem();
			m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
			m_jobMemSize = newSize;
		}
	}

public:
	JobQueue()
	{
		m_jobMem = NULL;
		m_jobMemSize = 0;
		m_threadSupport = NULL;
		m_queueLock = NULL;
		m_headIndex = 0;
		m_tailIndex = 0;
		m_useSpinMutex = false;
	}
	~JobQueue()
	{
		exit();
	}
	void exit()
	{
		freeJobMem();
		if (m_queueLock && m_threadSupport)
		{
			m_threadSupport->deleteCriticalSection(m_queueLock);
			m_queueLock = NULL;
			m_threadSupport = 0;
		}
	}

	void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
	{
		m_threadSupport = threadSup;
		if (threadSup)
		{
			m_queueLock = m_threadSupport->createCriticalSection();
		}
		setupJobStealing(contextArray, contextArray->size());
	}
	void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
	{
		btAlignedObjectArray<JobQueue>& contexts = *contextArray;
		int selfIndex = 0;
		for (int i = 0; i < contexts.size(); ++i)
		{
			if (this == &contexts[i])
			{
				selfIndex = i;
				break;
			}
		}
		int numNeighbors = btMin(2, contexts.size() - 1);
		int neighborOffsets[] = {-1, 1, -2, 2, -3, 3};
		int numOffsets = sizeof(neighborOffsets) / sizeof(neighborOffsets[0]);
		m_neighborContexts.reserve(numNeighbors);
		m_neighborContexts.resizeNoInitialize(0);
		for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
		{
			int neighborIndex = selfIndex + neighborOffsets[i];
			if (neighborIndex >= 0 && neighborIndex < numActiveContexts)
			{
				m_neighborContexts.push_back(&contexts[neighborIndex]);
			}
		}
	}

	bool isQueueEmpty() const { return m_queueIsEmpty; }
	void lockQueue()
	{
		if (m_useSpinMutex)
		{
			m_mutex.lock();
		}
		else
		{
			m_queueLock->lock();
		}
	}
	void unlockQueue()
	{
		if (m_useSpinMutex)
		{
			m_mutex.unlock();
		}
		else
		{
			m_queueLock->unlock();
		}
	}
	void clearQueue(int jobCount, int jobSize)
	{
		lockQueue();
		m_headIndex = 0;
		m_tailIndex = 0;
		m_allocSize = 0;
		m_queueIsEmpty = true;
		int jobBufSize = jobSize * jobCount;
		// make sure we have enough memory allocated to store jobs
		if (jobBufSize > m_jobMemSize)
		{
			resizeJobMem(jobBufSize);
		}
		// make sure job queue is big enough
		if (jobCount > m_jobQueue.capacity())
		{
			m_jobQueue.reserve(jobCount);
		}
		unlockQueue();
		m_jobQueue.resizeNoInitialize(0);
	}
	void* allocJobMem(int jobSize)
	{
		btAssert(m_jobMemSize >= (m_allocSize + jobSize));
		void* jobMem = &m_jobMem[m_allocSize];
		m_allocSize += jobSize;
		return jobMem;
	}
	void submitJob(IJob* job)
	{
		btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
		m_jobQueue.push_back(job);
		lockQueue();
		m_tailIndex++;
		m_queueIsEmpty = false;
		unlockQueue();
	}
	IJob* consumeJobFromOwnQueue()
	{
		if (m_queueIsEmpty)
		{
			// lock free path. even if this is taken erroneously it isn't harmful
			return NULL;
		}
		IJob* job = NULL;
		lockQueue();
		if (!m_queueIsEmpty)
		{
			job = m_jobQueue[m_headIndex++];
			btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
			if (m_headIndex == m_tailIndex)
			{
				m_queueIsEmpty = true;
			}
		}
		unlockQueue();
		return job;
	}
	IJob* consumeJob()
	{
		if (IJob* job = consumeJobFromOwnQueue())
		{
			return job;
		}
		// own queue is empty, try to steal from neighbor
		for (int i = 0; i < m_neighborContexts.size(); ++i)
		{
			JobQueue* otherContext = m_neighborContexts[i];
			if (IJob* job = otherContext->consumeJobFromOwnQueue())
			{
				return job;
			}
		}
		return NULL;
	}
};

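// Worker thread main loop: drain jobs from this thread's queue (stealing from
// neighbors when it runs dry), then spin-wait while watching the per-thread
// directive. kScanForJobs resets the idle clock, kGoToSleep exits the loop,
// and otherwise the thread puts itself to sleep once the queue has stayed
// empty for m_cooldownTime microseconds.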
static void WorkerThreadFunc(void* userPtr)
{
	BT_PROFILE("WorkerThreadFunc");
	ThreadLocalStorage* localStorage = (ThreadLocalStorage*)userPtr;
	JobQueue* jobQueue = localStorage->m_queue;

	bool shouldSleep = false;
	int threadId = localStorage->m_threadId;
	while (!shouldSleep)
	{
		// do work
		localStorage->m_mutex.lock();
		while (IJob* job = jobQueue->consumeJob())
		{
			localStorage->m_status = WorkerThreadStatus::kWorking;
			job->executeJob(threadId);
			localStorage->m_numJobsFinished++;
		}
		localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
		localStorage->m_mutex.unlock();
		btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
		// while queue is empty,
		while (jobQueue->isQueueEmpty())
		{
			// todo: spin wait a bit to avoid hammering the empty queue
			btSpinPause();
			if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep)
			{
				shouldSleep = true;
				break;
			}
			// if jobs are incoming,
			if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs)
			{
				clockStart = localStorage->m_clock->getTimeMicroseconds();  // reset clock
			}
			else
			{
				for (int i = 0; i < 50; ++i)
				{
					btSpinPause();
					btSpinPause();
					btSpinPause();
					btSpinPause();
					if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
					{
						break;
					}
				}
				// if no jobs incoming and queue has been empty for the cooldown time, sleep
				btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
				if (timeElapsed > localStorage->m_cooldownTime)
				{
					shouldSleep = true;
					break;
				}
			}
		}
	}
	{
		BT_PROFILE("sleep");
		// go sleep
		localStorage->m_mutex.lock();
		localStorage->m_status = WorkerThreadStatus::kSleeping;
		localStorage->m_mutex.unlock();
	}
}

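// Default task scheduler built on btThreadSupportInterface. Thread 0 is the
// calling (main) thread and owns no queue; each worker thread is assigned a
// JobQueue (one per worker, or shared between two threads when the
// logical-to-physical core ratio is above one). parallelFor / parallelSum
// carve the range into grainSize-sized jobs, placement-new them into the
// queues round-robin, wake the workers, and then the main thread helps drain
// the queues in waitJobs().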
class btTaskSchedulerDefault : public btITaskScheduler
{
	btThreadSupportInterface* m_threadSupport;
	WorkerThreadDirectives* m_workerDirective;
	btAlignedObjectArray<JobQueue> m_jobQueues;
	btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
	btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
	btSpinMutex m_antiNestingLock;  // prevent nested parallel-for
	btClock m_clock;
	int m_numThreads;
	int m_numWorkerThreads;
	int m_numActiveJobQueues;
	int m_maxNumThreads;
	int m_numJobs;
	static const int kFirstWorkerThreadId = 1;

public:
	btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
	{
		m_threadSupport = NULL;
		m_workerDirective = NULL;
	}

	virtual ~btTaskSchedulerDefault()
	{
		waitForWorkersToSleep();

		for (int i = 0; i < m_jobQueues.size(); ++i)
		{
			m_jobQueues[i].exit();
		}

		if (m_threadSupport)
		{
			delete m_threadSupport;
			m_threadSupport = NULL;
		}
		if (m_workerDirective)
		{
			btAlignedFree(m_workerDirective);
			m_workerDirective = NULL;
		}
	}

	void init()
	{
		btThreadSupportInterface::ConstructionInfo constructionInfo("TaskScheduler", WorkerThreadFunc);
		m_threadSupport = btThreadSupportInterface::create(constructionInfo);
		m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));

		m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
		m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
		m_numThreads = m_maxNumThreads;
		// ideal to have one job queue for each physical processor (except for the main thread which needs no queue)
		int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
		int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads - 1) : (m_maxNumThreads / numThreadsPerQueue);
		m_jobQueues.resize(numJobQueues);
		m_numActiveJobQueues = numJobQueues;
		for (int i = 0; i < m_jobQueues.size(); ++i)
		{
			m_jobQueues[i].init(m_threadSupport, &m_jobQueues);
		}
		m_perThreadJobQueues.resize(m_numThreads);
		for (int i = 0; i < m_numThreads; i++)
		{
			JobQueue* jq = NULL;
			// only worker threads get a job queue
			if (i > 0)
			{
				if (numThreadsPerQueue == 1)
				{
					// one queue per worker thread
					jq = &m_jobQueues[i - kFirstWorkerThreadId];
				}
				else
				{
					// 2 threads share each queue
					jq = &m_jobQueues[i / numThreadsPerQueue];
				}
			}
			m_perThreadJobQueues[i] = jq;
		}
		m_threadLocalStorage.resize(m_numThreads);
		for (int i = 0; i < m_numThreads; i++)
		{
			ThreadLocalStorage& storage = m_threadLocalStorage[i];
			storage.m_threadId = i;
			storage.m_directive = m_workerDirective;
			storage.m_status = WorkerThreadStatus::kSleeping;
			storage.m_cooldownTime = 100;  // 100 microseconds, threads go to sleep after this long if they have nothing to do
			storage.m_clock = &m_clock;
			storage.m_queue = m_perThreadJobQueues[i];
		}
		setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);  // no work for them yet
		setNumThreads(m_threadSupport->getCacheFriendlyNumThreads());
	}

	void setWorkerDirectives(WorkerThreadDirectives::Type dir)
	{
		m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
	}

	virtual int getMaxNumThreads() const BT_OVERRIDE
	{
		return m_maxNumThreads;
	}

	virtual int getNumThreads() const BT_OVERRIDE
	{
		return m_numThreads;
	}

	virtual void setNumThreads(int numThreads) BT_OVERRIDE
	{
		m_numThreads = btMax(btMin(numThreads, int(m_maxNumThreads)), 1);
		m_numWorkerThreads = m_numThreads - 1;
		m_numActiveJobQueues = 0;
		// if there is at least 1 worker,
		if (m_numWorkerThreads > 0)
		{
			// re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
			JobQueue* lastActiveContext = m_perThreadJobQueues[m_numThreads - 1];
			int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
			m_numActiveJobQueues = iLastActiveContext + 1;
			for (int i = 0; i < m_jobQueues.size(); ++i)
			{
				m_jobQueues[i].setupJobStealing(&m_jobQueues, m_numActiveJobQueues);
			}
		}
		m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
	}

	void waitJobs()
	{
		BT_PROFILE("waitJobs");
		// have the main thread work until the job queues are empty
		int numMainThreadJobsFinished = 0;
		for (int i = 0; i < m_numActiveJobQueues; ++i)
		{
			while (IJob* job = m_jobQueues[i].consumeJob())
			{
				job->executeJob(0);
				numMainThreadJobsFinished++;
			}
		}

		// done with jobs for now, tell workers to rest (but not sleep)
		setWorkerDirectives(WorkerThreadDirectives::kStayAwakeButIdle);

		btU64 clockStart = m_clock.getTimeMicroseconds();
		// wait for workers to finish any jobs in progress
		while (true)
		{
			int numWorkerJobsFinished = 0;
			for (int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread)
			{
				ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
				storage->m_mutex.lock();
				numWorkerJobsFinished += storage->m_numJobsFinished;
				storage->m_mutex.unlock();
			}
			if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
			{
				break;
			}
			btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
			btAssert(timeElapsed < 1000);
			if (timeElapsed > 100000)
			{
				break;
			}
			btSpinPause();
		}
	}

	void wakeWorkers(int numWorkersToWake)
	{
		BT_PROFILE("wakeWorkers");
		btAssert(m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs);
		int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
		int numActiveWorkers = 0;
		for (int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker)
		{
			// note this count of active workers is not necessarily totally reliable, because a worker thread could be
			// just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
			ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
			if (storage.m_status != WorkerThreadStatus::kSleeping)
			{
				numActiveWorkers++;
			}
		}
		for (int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker)
		{
			ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
			if (storage.m_status == WorkerThreadStatus::kSleeping)
			{
				m_threadSupport->runTask(iWorker, &storage);
				numActiveWorkers++;
			}
		}
	}

	void waitForWorkersToSleep()
	{
		BT_PROFILE("waitForWorkersToSleep");
		setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
		m_threadSupport->waitForAllTasks();
		for (int i = kFirstWorkerThreadId; i < m_numThreads; i++)
		{
			ThreadLocalStorage& storage = m_threadLocalStorage[i];
			btAssert(storage.m_status == WorkerThreadStatus::kSleeping);
		}
	}

	virtual void sleepWorkerThreadsHint() BT_OVERRIDE
	{
		BT_PROFILE("sleepWorkerThreadsHint");
		// hint the task scheduler that we may not be using these threads for a little while
		setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
	}

	void prepareWorkerThreads()
	{
		for (int i = kFirstWorkerThreadId; i < m_numThreads; ++i)
		{
			ThreadLocalStorage& storage = m_threadLocalStorage[i];
			storage.m_mutex.lock();
			storage.m_numJobsFinished = 0;
			storage.m_mutex.unlock();
		}
		setWorkerDirectives(WorkerThreadDirectives::kScanForJobs);
	}

	virtual void parallelFor(int iBegin, int iEnd, int grainSize, const btIParallelForBody& body) BT_OVERRIDE
	{
		BT_PROFILE("parallelFor_ThreadSupport");
		btAssert(iEnd >= iBegin);
		btAssert(grainSize >= 1);
		int iterationCount = iEnd - iBegin;
		if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
		{
			typedef ParallelForJob JobType;
			int jobCount = (iterationCount + grainSize - 1) / grainSize;
			m_numJobs = jobCount;
			btAssert(jobCount >= 2);  // need more than one job for multithreading
			int jobSize = sizeof(JobType);

			for (int i = 0; i < m_numActiveJobQueues; ++i)
			{
				m_jobQueues[i].clearQueue(jobCount, jobSize);
			}
			// prepare worker threads for incoming work
			prepareWorkerThreads();
			// submit all of the jobs
			int iJob = 0;
			int iThread = kFirstWorkerThreadId;  // first worker thread
			for (int i = iBegin; i < iEnd; i += grainSize)
			{
				btAssert(iJob < jobCount);
				int iE = btMin(i + grainSize, iEnd);
				JobQueue* jq = m_perThreadJobQueues[iThread];
				btAssert(jq);
				btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
				void* jobMem = jq->allocJobMem(jobSize);
				JobType* job = new (jobMem) ParallelForJob(i, iE, body);  // placement new
				jq->submitJob(job);
				iJob++;
				iThread++;
				if (iThread >= m_numThreads)
				{
					iThread = kFirstWorkerThreadId;  // first worker thread
				}
			}
			wakeWorkers(jobCount - 1);

			// put the main thread to work on emptying the job queue and then wait for all workers to finish
			waitJobs();
			m_antiNestingLock.unlock();
		}
		else
		{
			BT_PROFILE("parallelFor_mainThread");
			// just run on main thread
			body.forLoop(iBegin, iEnd);
		}
	}
	virtual btScalar parallelSum(int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body) BT_OVERRIDE
	{
		BT_PROFILE("parallelSum_ThreadSupport");
		btAssert(iEnd >= iBegin);
		btAssert(grainSize >= 1);
		int iterationCount = iEnd - iBegin;
		if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
		{
			typedef ParallelSumJob JobType;
			int jobCount = (iterationCount + grainSize - 1) / grainSize;
			m_numJobs = jobCount;
			btAssert(jobCount >= 2);  // need more than one job for multithreading
			int jobSize = sizeof(JobType);
			for (int i = 0; i < m_numActiveJobQueues; ++i)
			{
				m_jobQueues[i].clearQueue(jobCount, jobSize);
			}

			// initialize summation
			for (int iThread = 0; iThread < m_numThreads; ++iThread)
			{
				m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
			}

			// prepare worker threads for incoming work
			prepareWorkerThreads();
			// submit all of the jobs
			int iJob = 0;
			int iThread = kFirstWorkerThreadId;  // first worker thread
			for (int i = iBegin; i < iEnd; i += grainSize)
			{
				btAssert(iJob < jobCount);
				int iE = btMin(i + grainSize, iEnd);
				JobQueue* jq = m_perThreadJobQueues[iThread];
				btAssert(jq);
				btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
				void* jobMem = jq->allocJobMem(jobSize);
				JobType* job = new (jobMem) ParallelSumJob(i, iE, body, &m_threadLocalStorage[0]);  // placement new
				jq->submitJob(job);
				iJob++;
				iThread++;
				if (iThread >= m_numThreads)
				{
					iThread = kFirstWorkerThreadId;  // first worker thread
				}
			}
			wakeWorkers(jobCount - 1);

			// put the main thread to work on emptying the job queue and then wait for all workers to finish
			waitJobs();

			// add up all the thread sums
			btScalar sum = btScalar(0);
			for (int iThread = 0; iThread < m_numThreads; ++iThread)
			{
				sum += m_threadLocalStorage[iThread].m_sumResult;
			}
			m_antiNestingLock.unlock();
			return sum;
		}
		else
		{
			BT_PROFILE("parallelSum_mainThread");
			// just run on main thread
			return body.sumLoop(iBegin, iEnd);
		}
	}
};

btITaskScheduler* btCreateDefaultTaskScheduler()
{
	btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
	ts->init();
	return ts;
}

#else  // #if BT_THREADSAFE

btITaskScheduler* btCreateDefaultTaskScheduler()
{
	return NULL;
}

#endif  // #else // #if BT_THREADSAFE
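#if 0
// Usage sketch (illustrative only, not part of this file): create the default
// scheduler and run a parallel-for over a float array. Assumes
// btSetTaskScheduler() and btIParallelForBody from "LinearMath/btThreads.h";
// SquareBody and exampleParallelSquare are hypothetical names.
struct SquareBody : public btIParallelForBody
{
	float* m_data;
	virtual void forLoop(int iBegin, int iEnd) const BT_OVERRIDE
	{
		// square each element in the assigned sub-range
		for (int i = iBegin; i < iEnd; ++i)
		{
			m_data[i] *= m_data[i];
		}
	}
};

void exampleParallelSquare(float* data, int count)
{
	btITaskScheduler* scheduler = btCreateDefaultTaskScheduler();
	btSetTaskScheduler(scheduler);  // assumed global setter from btThreads.h
	SquareBody body;
	body.m_data = data;
	// grain size of 64 iterations per job; smaller ranges just run on the main thread
	scheduler->parallelFor(0, count, 64, body);
	btSetTaskScheduler(NULL);
	delete scheduler;
}
#endif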