Blender  V3.3
task.cpp
Go to the documentation of this file.
1 /* SPDX-License-Identifier: Apache-2.0
2  * Copyright 2011-2022 Blender Foundation */
3 
4 #include "util/task.h"
5 #include "util/foreach.h"
6 #include "util/log.h"
7 #include "util/system.h"
8 #include "util/time.h"
9 
11 
12 /* Task Pool */
13 
14 TaskPool::TaskPool() : start_time(time_dt()), num_tasks_pushed(0)
15 {
16 }
17 
/* NOTE(review): the signature line (listing line 18) is missing from this extract; the
 * cross-reference index at the end of the page maps task.cpp:18 to ~TaskPool().
 * Destroying the pool calls cancel(). */
19 {
20  cancel();
21 }
22 
/* NOTE(review): the signature line (listing line 23) and listing line 26 are missing from this
 * extract; the index at the end of the page maps task.cpp:23 to push(TaskRunFunction &&task).
 * Hands the callable to the TBB task group, moving it instead of copying it. */
24 {
25  tbb_group.run(std::move(task));
27 }
28 
/* NOTE(review): the signature line (listing line 29) and listing line 35 are missing from this
 * extract; the index at the end of the page maps task.cpp:29 to wait_work(Summary *stats=NULL).
 * Blocks until the TBB task group has finished, optionally fills in `stats`, then resets the
 * pushed-task counter. */
30 {
31  tbb_group.wait();
32 
33  if (stats != NULL) {
    /* Elapsed time since the pool was constructed (start_time is set in the constructor). */
34  stats->time_total = time_dt() - start_time;
36  }
37 
38  num_tasks_pushed = 0;
39 }
40 
/* NOTE(review): the signature line (listing line 41) is missing from this extract; the index at
 * the end of the page maps task.cpp:41 to cancel().
 * Does nothing when no tasks are recorded as pushed. Otherwise requests cancellation of the TBB
 * group, waits for it, and resets the pushed-task counter. */
42 {
43  if (num_tasks_pushed > 0) {
44  tbb_group.cancel();
45  tbb_group.wait();
46  num_tasks_pushed = 0;
47  }
48 }
49 
/* NOTE(review): the signature line (listing line 50) is missing from this extract; the index at
 * the end of the page maps task.cpp:50 to `static bool canceled()`.
 * Forwards to TBB's query for whether the current task group is being cancelled. */
51 {
52  return tbb::is_current_task_group_canceling();
53 }
54 
55 /* Task Scheduler */
56 
/* NOTE(review): listing lines 57 and 59 are missing from this extract. */
/* Number of init() calls that have not yet been matched by exit(). */
58 int TaskScheduler::users = 0;
/* Only allocated by init() when an explicit thread count is requested; null otherwise. */
60 tbb::global_control *TaskScheduler::global_control = nullptr;
61 
/* Register one more user of the shared scheduler. Only the first user configures threading:
 * a positive `num_threads` caps TBB's allowed parallelism, anything else uses TBB's own
 * concurrency for the current arena.
 * NOTE(review): listing line 64 is missing from this extract. */
62 void TaskScheduler::init(int num_threads)
63 {
65  /* Multiple cycles instances can use this task scheduler, sharing the same
66  * threads, so we keep track of the number of users. */
67  ++users;
68  if (users != 1) {
69  return;
70  }
71  if (num_threads > 0) {
72  /* Explicit number of threads requested: cap TBB's allowed parallelism. */
73  VLOG_INFO << "Overriding number of TBB threads to " << num_threads << ".";
74  global_control = new tbb::global_control(tbb::global_control::max_allowed_parallelism,
75  num_threads);
76  active_num_threads = num_threads;
77  }
78  else {
    /* Automatic number of threads: use whatever concurrency TBB reports. */
79  active_num_threads = tbb::this_task_arena::max_concurrency();
80  }
81 }
82 
/* NOTE(review): the signature line (listing line 83) and listing lines 85 and 90 are missing
 * from this extract; the index at the end of the page maps task.cpp:83 to `static void exit()`.
 * Drops one user. When the last user is gone the TBB global_control object is deleted and the
 * pointer is reset (deleting a null pointer is a no-op, so this is fine when init() never
 * allocated one). */
84 {
86  users--;
87  if (users == 0) {
88  delete global_control;
89  global_control = nullptr;
91  }
92 }
93 
/* NOTE(review): the signature line (listing line 94) is missing from this extract; the index at
 * the end of the page maps task.cpp:94 to `static void free_memory()`.
 * Only checks (in builds with assertions enabled) that no users remain. */
95 {
96  assert(users == 0);
97 }
98 
/* NOTE(review): the signature line (listing line 99) and listing line 101 are missing from this
 * extract; the index at the end of the page maps task.cpp:99 to `static int max_concurrency()`.
 * While at least one user is registered, returns the thread count recorded by init(); otherwise
 * asks TBB for the current arena's concurrency. */
100 {
102  return (users > 0) ? active_num_threads : tbb::this_task_arena::max_concurrency();
103 }
104 
105 /* Dedicated Task Pool */
106 
/* NOTE(review): the signature line (listing line 107) and listing line 113 are missing from this
 * extract. This is presumably the dedicated pool's constructor - confirm against the original
 * file. The visible part clears the cancel/exit flags and the outstanding-task counter.
 * Presumably the missing line creates worker_thread, which is joined and deleted further down
 * in this file - TODO confirm. */
108 {
109  do_cancel = false;
110  do_exit = false;
111  num = 0;
112 
114 }
115 
117 {
118  wait();
119 
120  do_exit = true;
121  queue_cond.notify_all();
122 
123  worker_thread->join();
124  delete worker_thread;
125 }
126 
128 {
129  num_increase();
130 
131  /* add task to queue */
132  queue_mutex.lock();
133  if (front)
134  queue.emplace_front(std::move(task));
135  else
136  queue.emplace_back(std::move(task));
137 
138  queue_cond.notify_one();
139  queue_mutex.unlock();
140 }
141 
/* NOTE(review): the signature line (listing line 142) is missing from this extract; the name
 * wait() is inferred from the calls to it elsewhere in this file - confirm.
 * Blocks until the outstanding-task counter `num` reaches zero. The loop re-checks the counter
 * after every wake-up of num_cond. */
143 {
144  thread_scoped_lock num_lock(num_mutex);
145 
146  while (num)
147  num_cond.wait(num_lock);
148 }
149 
/* NOTE(review): the signature line (listing line 150) is missing from this extract; the index at
 * the end of the page maps task.cpp:150 to cancel().
 * Raises the cancel flag, drops every task still in the queue, waits for the outstanding-task
 * counter to reach zero, then lowers the flag again. */
151 {
152  do_cancel = true;
153 
154  clear();
155  wait();
156 
157  do_cancel = false;
158 }
159 
/* NOTE(review): the signature line (listing line 160) is missing from this extract; the index at
 * the end of the page maps task.cpp:160 to `bool canceled()`.
 * Reports the current value of the cancel flag.
 * NOTE(review): the index lists do_cancel as a plain `bool` and it is read here without taking
 * a lock - confirm that unsynchronized reads are intended. */
161 {
162  return do_cancel;
163 }
164 
/* NOTE(review): the signature line (listing line 165) is missing from this extract; the index at
 * the end of the page maps task.cpp:165 to num_decrease(int done).
 * Subtracts `done` from the outstanding-task counter under num_mutex and wakes every waiter on
 * num_cond once the counter reaches zero. */
166 {
167  thread_scoped_lock num_lock(num_mutex);
168  num -= done;
169 
170  assert(num >= 0);
171  if (num == 0)
172  num_cond.notify_all();
173 }
174 
/* NOTE(review): the signature line (listing line 175) is missing from this extract; the index at
 * the end of the page maps task.cpp:175 to num_increase().
 * Adds one to the outstanding-task counter under num_mutex and notifies all waiters on
 * num_cond. */
176 {
177  thread_scoped_lock num_lock(num_mutex);
178  num++;
179  num_cond.notify_all();
180 }
181 
183 {
184  thread_scoped_lock queue_lock(queue_mutex);
185 
186  while (queue.empty() && !do_exit)
187  queue_cond.wait(queue_lock);
188 
189  if (queue.empty()) {
190  assert(do_exit);
191  return false;
192  }
193 
194  task = queue.front();
195  queue.pop_front();
196 
197  return true;
198 }
199 
/* NOTE(review): the signature line (listing line 200) and listing line 202 are missing from this
 * extract; the index at the end of the page maps task.cpp:200 to thread_run(). The missing line
 * presumably declares the local `task` used below - confirm.
 * Loop that pops and runs queued tasks one at a time until thread_wait_pop() returns false. */
201 {
203 
204  /* keep popping off tasks */
205  while (thread_wait_pop(task)) {
206  /* run task */
207  task();
208 
209  /* delete task */
    /* Resetting to nullptr releases the callable before the task is reported as done. */
210  task = nullptr;
211 
212  /* notify task was done */
213  num_decrease(1);
214  }
215 }
216 
/* NOTE(review): the signature line (listing line 217) is missing from this extract; the index at
 * the end of the page maps task.cpp:217 to clear().
 * Discards every task still waiting in the queue and subtracts that many from the
 * outstanding-task counter. */
218 {
219  thread_scoped_lock queue_lock(queue_mutex);
220 
221  /* erase all tasks from the queue */
    /* The container's size is converted to int here because num_decrease() takes an int. */
222  int done = queue.size();
223  queue.clear();
224 
    /* Release queue_mutex before num_decrease() takes num_mutex, so both are never held at once
     * here. */
225  queue_lock.unlock();
226 
227  /* notify done */
228  num_decrease(done);
229 }
230 
/* NOTE(review): the signature line (listing line 231) is missing from this extract; the index at
 * the end of the page maps task.cpp:231 to `string full_report() const`.
 * Builds a two-line text report: the total time followed by the number of tasks handled. */
232 {
233  string report = "";
234  report += string_printf("Total time: %f\n", time_total);
235  report += string_printf("Tasks handled: %d\n", num_tasks_handled);
236  return report;
237 }
238 
volatile int lock
thread_condition_variable num_cond
Definition: task.h:119
bool do_exit
Definition: task.h:127
bool do_cancel
Definition: task.h:126
void thread_run()
Definition: task.cpp:200
void clear()
Definition: task.cpp:217
thread_mutex queue_mutex
Definition: task.h:122
thread_mutex num_mutex
Definition: task.h:118
void num_decrease(int done)
Definition: task.cpp:165
void push(TaskRunFunction &&run, bool front=false)
Definition: task.cpp:127
void cancel()
Definition: task.cpp:150
thread * worker_thread
Definition: task.h:129
thread_condition_variable queue_cond
Definition: task.h:123
bool canceled()
Definition: task.cpp:160
void num_increase()
Definition: task.cpp:175
list< TaskRunFunction > queue
Definition: task.h:121
bool thread_wait_pop(TaskRunFunction &task)
Definition: task.cpp:182
static void free_memory()
Definition: task.cpp:94
static void exit()
Definition: task.cpp:83
static void init(int num_threads=0)
Definition: task.cpp:62
static thread_mutex mutex
Definition: task.h:81
static int users
Definition: task.h:82
static int active_num_threads
Definition: task.h:83
static int max_concurrency()
Definition: task.cpp:99
Definition: thread.h:34
bool join()
Definition: thread.cpp:42
#define CCL_NAMESPACE_END
Definition: cuda/compat.h:9
#define function_bind
#define VLOG_INFO
Definition: log.h:77
struct blender::compositor::@179::@181 task
CCL_NAMESPACE_BEGIN string string_printf(const char *format,...)
Definition: string.cpp:22
string full_report() const
Definition: task.cpp:231
int num_tasks_handled
Definition: task.h:35
double time_total
Definition: task.h:32
void push(TaskRunFunction &&task)
Definition: task.cpp:23
static bool canceled()
Definition: task.cpp:50
int num_tasks_pushed
Definition: task.h:62
tbb::task_group tbb_group
Definition: task.h:54
double start_time
Definition: task.h:59
~TaskPool()
Definition: task.cpp:18
void cancel()
Definition: task.cpp:41
TaskPool()
Definition: task.cpp:14
void wait_work(Summary *stats=NULL)
Definition: task.cpp:29
function< void(void)> TaskRunFunction
Definition: task.h:16
std::unique_lock< std::mutex > thread_scoped_lock
Definition: thread.h:28
CCL_NAMESPACE_BEGIN typedef std::mutex thread_mutex
Definition: thread.h:27
CCL_NAMESPACE_BEGIN double time_dt()
Definition: time.cpp:35