My Project 3.2.0
C++ Distributed Hash Table
Loading...
Searching...
No Matches
thread_pool.h
1/*
2 * Copyright (C) 2014-2023 Savoir-faire Linux Inc.
3 *
4 * Author: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20#pragma once
21
22#include "def.h"
23
24#include <condition_variable>
25#include <vector>
26#include <queue>
27#include <future>
28#include <functional>
29
30#include <ciso646> // fix windows compiler bug
31
32namespace dht {
33
34class OPENDHT_PUBLIC ThreadPool {
35public:
36 static ThreadPool& computation();
37 static ThreadPool& io();
38
39 ThreadPool();
40 ThreadPool(unsigned minThreads, unsigned maxThreads = 0);
42
43 void run(std::function<void()>&& cb);
44
45 template<class T>
46 std::future<T> get(std::function<T()>&& cb) {
47 auto ret = std::make_shared<std::promise<T>>();
48 run([cb = std::move(cb), ret]() mutable {
49 try {
50 ret->set_value(cb());
51 } catch (...) {
52 try {
53 ret->set_exception(std::current_exception());
54 } catch(...) {}
55 }
56 });
57 return ret->get_future();
58 }
59 template<class T>
60 std::shared_future<T> getShared(std::function<T()>&& cb) {
61 return get(std::move(cb));
62 }
63
64 void stop(bool wait = true);
65 void join();
66 void detach();
67
68private:
69 std::mutex lock_ {};
70 std::condition_variable cv_ {};
71 std::queue<std::function<void()>> tasks_ {};
72 std::vector<std::unique_ptr<std::thread>> threads_;
73 unsigned readyThreads_ {0};
74 bool running_ {true};
75
76 unsigned minThreads_;
77 const unsigned maxThreads_;
78 std::chrono::steady_clock::duration threadExpirationDelay {std::chrono::minutes(5)};
79 double threadDelayRatio_ {2};
80
81 void threadEnded(std::thread&);
82};
83
84class OPENDHT_PUBLIC Executor : public std::enable_shared_from_this<Executor> {
85public:
86 Executor(ThreadPool& pool, unsigned maxConcurrent = 1)
87 : threadPool_(pool), maxConcurrent_(maxConcurrent)
88 {}
89
90 void run(std::function<void()>&& task);
91
92private:
93 std::reference_wrapper<ThreadPool> threadPool_;
94 const unsigned maxConcurrent_ {1};
95 std::mutex lock_ {};
96 unsigned current_ {0};
97 std::queue<std::function<void()>> tasks_ {};
98
99 void run_(std::function<void()>&& task);
100 void schedule();
101};
102
103class OPENDHT_PUBLIC ExecutionContext {
104public:
106 : threadPool_(pool), state_(std::make_shared<SharedState>())
107 {}
108
110 state_->destroy();
111 }
112
114 void stop() {
115 state_->destroy(false);
116 }
117
118 void run(std::function<void()>&& task) {
119 std::lock_guard<std::mutex> lock(state_->mtx);
120 if (state_->shutdown_) return;
121 state_->pendingTasks++;
122 threadPool_.get().run([task = std::move(task), state = state_] {
123 state->run(task);
124 });
125 }
126
127private:
/**
 * Bookkeeping shared between an execution context and the tasks it has
 * scheduled: counts queued and running tasks and coordinates shutdown.
 */
struct SharedState {
    std::mutex mtx {};
    std::condition_variable cv {};
    /** Tasks handed to the pool whose run() has not started yet. */
    unsigned pendingTasks {0};
    /** Tasks whose run() has started and not yet finished. */
    unsigned ongoingTasks {0};
    /** Set by destroy(); read under mtx by the scheduling side to reject new tasks. */
    bool shutdown_ {false};
    /** Set at the end of destroy(); run() skips the task when it is true. */
    std::atomic_bool destroyed {false};

    /**
     * Shut the state down and block until it is safe to do so.
     *
     * @param wait when true, block until there is neither a pending nor an
     *             ongoing task, then flag the shutdown. When false, flag
     *             the shutdown first and block only until ongoing tasks
     *             have finished.
     *
     * Returns immediately if a previous call already completed.
     */
    void destroy(bool wait = true) {
        std::unique_lock<std::mutex> lock(mtx);
        if (destroyed) return;
        if (wait) {
            cv.wait(lock, [this] { return pendingTasks == 0 && ongoingTasks == 0; });
            shutdown_ = true;
        } else {
            shutdown_ = true;
            cv.wait(lock, [this] { return ongoingTasks == 0; });
        }
        destroyed = true;
    }

    /**
     * Execute one previously scheduled task (called from a worker thread).
     *
     * Moves the task from "pending" to "ongoing", runs it unless the state
     * has been destroyed, then marks it finished and wakes any destroy()
     * caller. Exceptions thrown by the task propagate to the caller.
     */
    void run(const std::function<void()>& task) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            pendingTasks--;
            ongoingTasks++;
        }
        // Completion is recorded by a scope guard so that the counter is
        // restored on every exit path. Without it, a task that throws would
        // leave ongoingTasks above zero and destroy() would wait forever.
        struct Completion {
            SharedState& state;
            ~Completion() {
                std::lock_guard<std::mutex> lock(state.mtx);
                state.ongoingTasks--;
                state.cv.notify_all();
            }
        } completion {*this};
        if (destroyed) return;
        task();
    }
};
166 std::reference_wrapper<ThreadPool> threadPool_;
167 std::shared_ptr<SharedState> state_;
168};
169
170}