TECA
teca_threaded_algorithm.h
1 #ifndef teca_threaded_algorithm_h
2 #define teca_threaded_algorithm_h
3 
4 #include "teca_algorithm.h"
5 #include "teca_threaded_algorithm_fwd.h"
6 #include "teca_algorithm_output_port.h"
7 #include "teca_dataset.h"
8 
9 template <typename task_t, typename data_t>
10 class teca_thread_pool;
11 
12 class teca_metadata;
14 
15 #include <thread>
16 #include <future>
17 
18 // declare the thread pool type
19 using teca_data_request_task = std::packaged_task<const_p_teca_dataset()>;
20 
21 class teca_data_request;
24 
25 using p_teca_data_request_queue = std::shared_ptr<teca_data_request_queue>;
26 
27 p_teca_data_request_queue new_teca_data_request_queue(MPI_Comm comm,
28  int n, bool bind, bool verbose);
29 
30 // this is the base class defining a threaded algorithm.
31 // the stratgey employed is to parallelize over upstream
32 // data requests using a thread pool.
34 {
35 public:
36  TECA_ALGORITHM_STATIC_NEW(teca_threaded_algorithm)
37  TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_threaded_algorithm)
38  TECA_ALGORITHM_CLASS_NAME(teca_threaded_algorithm)
39  virtual ~teca_threaded_algorithm() noexcept;
40 
41  // report/initialize to/from Boost program options
42  // objects.
43  TECA_GET_ALGORITHM_PROPERTIES_DESCRIPTION()
44  TECA_SET_ALGORITHM_PROPERTIES()
45 
46  // set/get the number of threads in the pool. setting
47  // to -1 results in a thread per core factoring in all MPI
48  // ranks running on the node. the default is -1.
49  void set_thread_pool_size(int n_threads);
50  unsigned int get_thread_pool_size() const noexcept;
51 
52  // set/get the verbosity level.
53  TECA_ALGORITHM_PROPERTY(int, verbose);
54 
55  // set/get thread affinity mode. When 0 threads are not bound
56  // CPU cores, allowing for migration among all cores. This will
57  // likely degrade performance. Default is 1.
58  TECA_ALGORITHM_PROPERTY(int, bind_threads);
59 
60  // set the smallest number of datasets to gather per call to
61  // execute. the default (-1) results in all datasets being
62  // gathered. In practice more datasets will be returned if
63  // ready
64  TECA_ALGORITHM_PROPERTY(int, stream_size);
65 
66  // set the duration in nano seconds to wait between checking
67  // for completed tasks
68  TECA_ALGORITHM_PROPERTY(long long, poll_interval);
69 
70  // explicitly set the thread pool to submit requests to
71  void set_data_request_queue(const p_teca_data_request_queue &queue);
72 
73 protected:
75 
76  // streaming execute. streaming flag will be set when there is more
77  // data to process. it is not safe to use MPI when the streaming flag
78  // is set. on the last call streaming flag will not be set, at that
79  // point MPI may be used.
80  virtual
81  const_p_teca_dataset execute(unsigned int port,
82  const std::vector<const_p_teca_dataset> &input_data,
83  const teca_metadata &request, int streaming);
84 
85  // forward to streaming execute
86  const_p_teca_dataset execute(unsigned int port,
87  const std::vector<const_p_teca_dataset> &input_data,
88  const teca_metadata &request) override;
89 
90  // driver function that manages execution of the given
91  // requst on the named port. each upstream request issued
92  // will be executed by the thread pool.
93  const_p_teca_dataset request_data(teca_algorithm_output_port &port,
94  const teca_metadata &request) override;
95 private:
96  int verbose;
97  int bind_threads;
98  int stream_size;
99  long long poll_interval;
100 
102 };
103 
104 #endif
teca_metadata
Definition: teca_metadata.h:17
teca_thread_pool
Definition: teca_thread_pool.h:33
teca_threaded_algorithm_internals
Definition: teca_threaded_algorithm.cxx:51
teca_threaded_algorithm
Definition: teca_threaded_algorithm.h:34
teca_algorithm
Definition: teca_algorithm.h:25
teca_data_request
Definition: teca_threaded_algorithm.cxx:31