#ifndef teca_thread_pool_h
#define teca_thread_pool_h

#include "teca_common.h"
#include "teca_algorithm_fwd.h"
#include "teca_thread_util.h"
#include "teca_threadsafe_queue.h"

#include <mpi.h> // MPI_Comm, MPI_COMM_NULL (or the project's MPI compatibility header)

#include <memory>
#include <vector>
#include <deque>
#include <thread>
#include <atomic>
#include <future>
#include <chrono>
#include <algorithm>

#if defined(_GNU_SOURCE)
#include <pthread.h>
#include <sched.h>
#endif
// forward declaration of the pool so the pointer alias below can refer to it
template <typename task_t, typename data_t>
class teca_thread_pool;

// convenience alias for a shared pointer to a thread pool
template <typename task_t, typename data_t>
using p_teca_thread_pool = std::shared_ptr<teca_thread_pool<task_t, data_t>>;
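// Example usage (a minimal sketch, not taken from the TECA sources): because
// push_task calls task.get_future() and the worker threads invoke the popped
// task, task_t is assumed to be something like std::packaged_task<data_t()>.
// The names result_t and compute below are hypothetical placeholders.
//
//     using result_t = double;
//     using pool_t = teca_thread_pool<std::packaged_task<result_t()>, result_t>;
//
//     // 4 threads, bound to cores, verbose reporting
//     pool_t pool(MPI_COMM_WORLD, 4, true, true);
//
//     for (int i = 0; i < 16; ++i)
//     {
//         std::packaged_task<result_t()> task(
//             [i]() -> result_t { return compute(i); }); // user supplied work
//         pool.push_task(task);
//     }
//
//     // block until every task has run; results arrive in queue order
//     std::vector<result_t> results;
//     pool.wait_all(results);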
// a thread pool that executes queued tasks on a fixed set of worker
// threads and captures each task's result in a future
template <typename task_t, typename data_t>
class teca_thread_pool
{
public:
    // construct the pool. arguments are forwarded to create_threads
    teca_thread_pool(MPI_Comm comm, int n, bool bind, bool verbose);
    ~teca_thread_pool();

    // add a task to the queue, capturing its future so the result
    // can be gathered later
    void push_task(task_t &task);

    // wait for all queued tasks and gather their results in order
    template <template <typename ... > class container_t, typename ... args>
    void wait_all(container_t<data_t, args ...> &data);

    // wait until at least n_to_wait results are ready, sleeping
    // poll_interval nanoseconds between checks. returns the number
    // of tasks remaining
    template <template <typename ... > class container_t, typename ... args>
    int wait_some(long n_to_wait, long long poll_interval,
        container_t<data_t, args ...> &data);

    // get the number of threads in the pool
    unsigned int size() const noexcept
    { return m_threads.size(); }

private:
    // create the threads, optionally binding each to a specific core
    void create_threads(MPI_Comm comm, int n_threads,
        bool bind, bool verbose);

    std::atomic<bool> m_live;
    teca_threadsafe_queue<task_t> m_queue;
    std::vector<std::future<data_t>> m_futures;
    std::vector<std::thread> m_threads;
};
template <typename task_t, typename data_t>
teca_thread_pool<task_t, data_t>::teca_thread_pool(MPI_Comm comm, int n,
    bool bind, bool verbose) : m_live(true)
{
    this->create_threads(comm, n, bind, verbose);
}
template <typename task_t, typename data_t>
void teca_thread_pool<task_t, data_t>::create_threads(MPI_Comm comm,
    int n_requested, bool bind, bool verbose)
{
    // this rank is not in the communicator, create no threads
    if (comm == MPI_COMM_NULL)
        return;

    int n_threads = n_requested;

    // map the requested number of threads onto the available cores
    std::deque<int> core_ids;

    if (teca_thread_util::thread_parameters(comm, -1,
        n_requested, bind, verbose, n_threads, core_ids))
    {
        TECA_WARNING("Failed to determine thread parameters."
            " Falling back to 1 thread, affinity disabled.")

        n_threads = 1;
        bind = false;
    }

    // create the worker threads
    for (int i = 0; i < n_threads; ++i)
    {
        m_threads.push_back(std::thread([this]()
        {
            // worker main loop. poll the queue for tasks until the
            // pool is shut down
            while (m_live.load())
            {
                task_t task;
                if (m_queue.try_pop(task))
                    task();
                else
                    std::this_thread::yield();
            }
        }));
#if defined(_GNU_SOURCE)
        // bind the thread to the next available core
        if (bind)
        {
            int core_id = core_ids.front();
            core_ids.pop_front();

            cpu_set_t core_mask;
            CPU_ZERO(&core_mask);
            CPU_SET(core_id, &core_mask);

            if (pthread_setaffinity_np(m_threads[i].native_handle(),
                sizeof(cpu_set_t), &core_mask))
            {
                TECA_WARNING("Failed to set thread affinity.")
            }
        }
#endif
    }
}
template <typename task_t, typename data_t>
teca_thread_pool<task_t, data_t>::~teca_thread_pool()
{
    // signal the workers to exit their main loops and wait for them
    m_live = false;
    std::for_each(m_threads.begin(), m_threads.end(),
        [](std::thread &t) { t.join(); });
}
template <typename task_t, typename data_t>
void teca_thread_pool<task_t, data_t>::push_task(task_t &task)
{
    // capture the task's future, then move the task onto the work queue
    m_futures.push_back(task.get_future());
    m_queue.push(std::move(task));
}
template <typename task_t, typename data_t>
template <template <typename ... > class container_t, typename ... args>
int teca_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
    long long poll_interval, container_t<data_t, args ...> &data)
{
    long n_tasks = m_futures.size();

    // when no specific count is requested wait for everything
    if (n_to_wait < 1)
    {
        this->wait_all(data);
        return 0;
    }
    // can't wait for more results than there are outstanding tasks
    else if (n_to_wait > n_tasks)
    {
        n_to_wait = n_tasks;
    }

    // gather the requested number of results
    while (1)
    {
        // scan the tasks once, capturing any results that are ready
        auto it = m_futures.begin();
        while (it != m_futures.end())
        {
            std::future_status stat = it->wait_for(std::chrono::seconds::zero());
            if (stat == std::future_status::ready)
            {
                data.push_back(it->get());
                it = m_futures.erase(it);
            }
            else
            {
                ++it;
            }
        }

        // if not enough results are ready yet, sleep before re-scanning
        if (data.size() < static_cast<unsigned int>(n_to_wait))
            std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
        else
            break;
    }

    // return the number of tasks remaining
    return m_futures.size();
}
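// Usage sketch for wait_some (illustrative only, continuing the hypothetical
// pool/result_t example near the top of this header; consume is a user
// supplied placeholder): gather results in batches of at least two,
// polling every millisecond (1000000 ns), until no tasks remain.
//
//     std::vector<result_t> ready;
//     while (pool.wait_some(2, 1000000, ready))
//     {
//         consume(ready); // process the batch
//         ready.clear();
//     }
//     consume(ready); // results gathered by the final call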
template <typename task_t, typename data_t>
template <template <typename ... > class container_t, typename ... args>
void teca_thread_pool<task_t, data_t>::wait_all(container_t<data_t, args ...> &data)
{
    // wait on each pending task and gather its result. results are
    // appended in the order the tasks were queued
    std::for_each(m_futures.begin(), m_futures.end(),
        [&data] (std::future<data_t> &f)
        {
            data.push_back(f.get());
        });

    m_futures.clear();
}

#endif