TECA
teca_thread_pool.h
#ifndef teca_thread_pool_h
#define teca_thread_pool_h

#include "teca_common.h"
#include "teca_algorithm_fwd.h"
#include "teca_thread_util.h"
#include "teca_threadsafe_queue.h"
#include "teca_mpi.h"

#include <memory>
#include <vector>
#include <deque>
#include <thread>
#include <atomic>
#include <mutex>
#include <future>
#include <chrono>
#include <algorithm>
#if defined(_GNU_SOURCE)
#include <pthread.h>
#include <sched.h>
#endif

template <typename task_t, typename data_t>
class teca_thread_pool;

template <typename task_t, typename data_t>
using p_teca_thread_pool = std::shared_ptr<teca_thread_pool<task_t, data_t>>;

// a class to manage a fixed size pool of threads that dispatch
// I/O work
template <typename task_t, typename data_t>
class teca_thread_pool
{
public:
    teca_thread_pool() = delete;

    // construct/destruct the thread pool.
    //
    // arguments:
    //
    //   comm    communicator over which to map threads. Use MPI_COMM_SELF
    //           for local mapping.
    //
    //   n       number of threads to create for the pool. -1 will create
    //           1 thread per physical CPU core. all MPI ranks running on
    //           the same node are taken into account, resulting in 1
    //           thread per core node wide.
    //
    //   bind    bind each thread to a specific core.
    //
    //   verbose print a report of the thread to core bindings.
    //
    // a construction sketch follows the destructor declaration below.
    teca_thread_pool(MPI_Comm comm, int n, bool bind, bool verbose);
    ~teca_thread_pool() noexcept;
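
    // construction sketch (illustrative only; my_task_t and my_data_t stand
    // in for the task and dataset types the pool is instantiated with):
    //
    //   // 1 thread per physical core node wide, bound to cores, verbose
    //   p_teca_thread_pool<my_task_t, my_data_t> pool =
    //       std::make_shared<teca_thread_pool<my_task_t, my_data_t>>(
    //           MPI_COMM_WORLD, -1, true, true);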

    // get rid of copy and assignment
    TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_thread_pool)

    // add a data request task to the queue, returns a future
    // from which the generated dataset can be accessed.
    void push_task(task_t &task);

    // wait for all of the requests to execute and transfer
    // datasets in the order that the corresponding requests
    // were added to the queue.
    template <template <typename ... > class container_t, typename ... args>
    void wait_all(container_t<data_t, args ...> &data);

    // wait for some of the requests to execute. datasets are returned as
    // they become ready. n_to_wait specifies how many datasets to gather,
    // but there are three cases where the number returned differs from
    // n_to_wait: when n_to_wait is larger than the number of tasks
    // remaining, datasets from all of the remaining tasks are returned;
    // when n_to_wait is smaller than the number of datasets already ready,
    // all of the currently ready data are returned; finally, when n_to_wait
    // is < 1 the call blocks until all of the tasks complete and all of the
    // data is returned. poll_interval is the time in nanoseconds to sleep
    // between scans for ready data. a polling sketch follows the
    // declaration below.
    template <template <typename ... > class container_t, typename ... args>
    int wait_some(long n_to_wait, long long poll_interval,
        container_t<data_t, args ...> &data);
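
    // polling sketch (illustrative only; my_data_t, process, the batch size
    // of 4, and the 100000 ns poll interval are placeholders):
    //
    //   std::vector<my_data_t> ready;
    //   int n_left = 1;
    //   while (n_left)
    //   {
    //       // gather up to 4 datasets, re-scanning every 100 us
    //       n_left = pool.wait_some(4, 100000, ready);
    //       process(ready);
    //       ready.clear();
    //   }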

    // get the number of threads
    unsigned int size() const noexcept
    { return m_threads.size(); }

private:
    // create n threads for the pool
    void create_threads(MPI_Comm comm, int n_threads, bool bind, bool verbose);

private:
    std::atomic<bool> m_live;
    teca_threadsafe_queue<task_t> m_queue;
    std::vector<std::future<data_t>> m_futures;
    std::vector<std::thread> m_threads;
};

// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
teca_thread_pool<task_t, data_t>::teca_thread_pool(MPI_Comm comm, int n,
    bool bind, bool verbose) : m_live(true)
{
    this->create_threads(comm, n, bind, verbose);
}

// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
void teca_thread_pool<task_t, data_t>::create_threads(MPI_Comm comm,
    int n_requested, bool bind, bool verbose)
{
    // this rank is excluded from computations
    if (comm == MPI_COMM_NULL)
        return;

    int n_threads = n_requested;

    std::deque<int> core_ids;

    if (teca_thread_util::thread_parameters(comm, -1,
        n_requested, bind, verbose, n_threads, core_ids))
    {
        TECA_WARNING("Failed to determine thread parameters."
            " Falling back to 1 thread, affinity disabled.")

        n_threads = 1;
        bind = false;
    }

    // allocate the threads
    for (int i = 0; i < n_threads; ++i)
    {
        m_threads.push_back(std::thread([this]()
        {
            // "main" for each thread in the pool. spin on the queue,
            // executing tasks as they arrive, until the pool is destroyed
            while (m_live.load())
            {
                task_t task;
                if (m_queue.try_pop(task))
                    task();
                else
                    std::this_thread::yield();
            }
        }));
#if defined(_GNU_SOURCE)
        // bind each thread to a hyperthread
        if (bind)
        {
            int core_id = core_ids.front();
            core_ids.pop_front();

            cpu_set_t core_mask;
            CPU_ZERO(&core_mask);
            CPU_SET(core_id, &core_mask);

            if (pthread_setaffinity_np(m_threads[i].native_handle(),
                sizeof(cpu_set_t), &core_mask))
            {
                TECA_WARNING("Failed to set thread affinity.")
            }
        }
#endif
    }
}

// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
teca_thread_pool<task_t, data_t>::~teca_thread_pool() noexcept
{
    m_live = false;
    std::for_each(m_threads.begin(), m_threads.end(),
        [](std::thread &t) { t.join(); });
}

// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
void teca_thread_pool<task_t, data_t>::push_task(task_t &task)
{
    m_futures.push_back(task.get_future());
    m_queue.push(std::move(task));
}

// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
template <template <typename ... > class container_t, typename ... args>
int teca_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
    long long poll_interval, container_t<data_t, args ...> &data)
{
    long n_tasks = m_futures.size();

    // wait for all
    if (n_to_wait < 1)
    {
        this->wait_all(data);
        return 0;
    }
    // wait for at most the number of queued tasks
    else if (n_to_wait > n_tasks)
        n_to_wait = n_tasks;

    // gather the requested number of datasets
    while (1)
    {
        // scan the tasks once. capture any data that is ready
        auto it = m_futures.begin();
        while (it != m_futures.end())
        {
            std::future_status stat = it->wait_for(std::chrono::seconds::zero());
            if (stat == std::future_status::ready)
            {
                data.push_back(it->get());
                it = m_futures.erase(it);
            }
            else
            {
                ++it;
            }
        }

        // if we have not accumulated the requested number of datasets
        // wait for the user supplied duration before re-scanning
        if (data.size() < static_cast<unsigned int>(n_to_wait))
            std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
        else
            break;
    }

    // return the number of tasks remaining
    return m_futures.size();
}

// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
template <template <typename ... > class container_t, typename ... args>
void teca_thread_pool<task_t, data_t>::wait_all(container_t<data_t, args ...> &data)
{
    // wait on all pending requests and gather the generated
    // datasets
    std::for_each(m_futures.begin(), m_futures.end(),
        [&data] (std::future<data_t> &f)
        {
            data.push_back(f.get());
        });
    m_futures.clear();
}

#endif
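
A minimal end-to-end sketch of driving the pool. None of this is prescribed by
the header above: the choice of data_t = double, task_t =
std::packaged_task<double()>, and the MPI_Init/MPI_Finalize calls are
illustrative and assume an MPI build.

#include "teca_thread_pool.h"

#include <future>
#include <iostream>
#include <vector>

// placeholder types for the sketch. any default constructible, movable
// callable whose get_future() yields the dataset type would work
using data_t = double;
using task_t = std::packaged_task<data_t()>;

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    {
        // 2 threads, no core binding, quiet
        teca_thread_pool<task_t, data_t> pool(MPI_COMM_SELF, 2, false, false);

        // queue a few tasks; each produces a dataset through its future
        for (int i = 0; i < 8; ++i)
        {
            task_t task([i]() -> data_t { return 2.0*i; });
            pool.push_task(task);
        }

        // block until all datasets are ready, returned in push order
        std::vector<data_t> data;
        pool.wait_all(data);

        for (data_t d : data)
            std::cerr << d << " ";
        std::cerr << std::endl;

        // the pool's destructor stops and joins the worker threads here
    }
    MPI_Finalize();

    return 0;
}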