flexiblesusy is hosted by Hepforge, IPPP Durham
FlexibleSUSY
thread_pool.hpp
Go to the documentation of this file.
1// ====================================================================
2// This file is part of FlexibleSUSY.
3//
4// FlexibleSUSY is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published
6// by the Free Software Foundation, either version 3 of the License,
7// or (at your option) any later version.
8//
9// FlexibleSUSY is distributed in the hope that it will be useful, but
10// WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12// General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License
15// along with FlexibleSUSY. If not, see
16// <http://www.gnu.org/licenses/>.
17// ====================================================================
18
19#ifndef THREAD_POOL_H
20#define THREAD_POOL_H
21
22#include "logger.hpp"
23
24#include <condition_variable>
25#include <functional>
26#include <future>
27#include <memory>
28#include <mutex>
29#include <queue>
30#include <stdexcept>
31#include <thread>
32#include <vector>
33
34namespace flexiblesusy {
35
48public:
49 explicit Thread_pool(std::size_t pool_size = std::thread::hardware_concurrency())
50 {
51 VERBOSE_MSG("launching " << pool_size << " threads ...");
52 for (std::size_t i = 0; i < pool_size; ++i)
53 threads.emplace_back(
54 [this] () {
55 for (;;) {
56 std::function<void()> task;
57
58 {
59 std::unique_lock<std::mutex> lock(mutex);
60 condition.wait(lock, [this]{ return stop || !tasks.empty(); });
61 if (stop && tasks.empty())
62 return;
63 task = std::move(tasks.front());
64 tasks.pop();
65 }
66
67 task();
68 }
69 });
70 }
71
72 Thread_pool(const Thread_pool&) = delete;
76
79 {
80 {
81 std::unique_lock<std::mutex> lock(mutex);
82 stop = true;
83 }
84
85 condition.notify_all();
86
87 try {
88 for (auto& t: threads)
89 t.join();
90 } catch (const std::exception& e) {
91 ERROR(e.what());
92 }
93 }
94
96 template <typename Task>
97 auto run_packaged_task(Task&& task) -> std::future<decltype(task())>
98 {
99 using return_t = decltype(task());
100
101 auto ptask = std::make_shared<std::packaged_task<return_t()>>([task](){ return task(); });
102
103 std::future<return_t> fut = ptask->get_future();
104
105 if (threads.empty()) {
106 (*ptask)();
107 } else {
108 {
109 std::unique_lock<std::mutex> lock(mutex);
110 tasks.emplace([ptask](){ (*ptask)(); });
111 }
112 condition.notify_one();
113 }
114
115 return fut;
116 }
117
119 template <typename Task>
120 void run_task(Task&& task)
121 {
122 if (threads.empty()) {
123 task();
124 } else {
125 {
126 std::unique_lock<std::mutex> lock(mutex);
127 tasks.emplace(std::forward<Task>(task));
128 }
129 condition.notify_one();
130 }
131 }
132
133 std::size_t size() const { return threads.size(); }
134
135private:
136 std::vector<std::thread> threads{};
137 std::queue<std::function<void()>> tasks{};
138 std::mutex mutex{};
139 std::condition_variable condition{};
140 bool stop{false};
141};
142
143} // namespace flexiblesusy
144
145#endif
LinearRange[start_, stop_, steps_] stop
A pool of threads.
Definition: thread_pool.hpp:47
auto run_packaged_task(Task &&task) -> std::future< decltype(task())>
runs task and returns future
Definition: thread_pool.hpp:97
Thread_pool(Thread_pool &&)=delete
Thread_pool & operator=(const Thread_pool &)=delete
void run_task(Task &&task)
runs task
std::vector< std::thread > threads
~Thread_pool()
waits for all tasks to finish and closes threads
Definition: thread_pool.hpp:78
Thread_pool(const Thread_pool &)=delete
Thread_pool & operator=(Thread_pool &&)=delete
std::size_t size() const
Thread_pool(std::size_t pool_size=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:49
#define ERROR(msg)
Definition: logger.hpp:65
#define VERBOSE_MSG(msg)
Definition: logger.hpp:57