TECA
teca_threadsafe_queue.h
1 #ifndef teca_threadsafe_queue_h
2 #define teca_threadsafe_queue_h
3 
4 #include <mutex>
5 #include <queue>
6 #include <condition_variable>
7 
8 template<typename T>
10 {
11 public:
14  void operator=(const teca_threadsafe_queue<T> &other);
15 
16  // report current size
17  typename std::queue<T>::size_type size() const;
18 
19  // push a value onto the queue
20  void push(const T &val);
21  void push(T &&val);
22 
23  // pop a value from the queue, will block until data is ready.
24  void pop(T &val);
25 
26  // pop a value from the queue if data is present, will return
27  // false if no data is in the queue.
28  bool try_pop(T &val);
29 
30  // swap the contents
31  void swap(teca_threadsafe_queue<T> &other);
32 
33  // clear
34  void clear();
35 
36 private:
37  mutable std::mutex m_mutex;
38  std::queue<T> m_queue;
39  std::condition_variable m_ready;
40  std::condition_variable m_empty;
41 };
42 
43 // --------------------------------------------------------------------------
44 template<typename T>
46  const teca_threadsafe_queue<T> &other)
47 {
48  std::lock_guard<std::mutex> lock(other.m_mutex);
49  std::queue<T> tmp(other.m_queue);
50  m_queue.swap(tmp);
51 }
52 
53 // --------------------------------------------------------------------------
54 template<typename T>
56  const teca_threadsafe_queue<T> &other)
57 {
58  std::lock(m_mutex, other.m_mutex);
59  std::lock_guard<std::mutex> lock(m_mutex, std::adopt_lock);
60  std::lock_guard<std::mutex> lock_other(other.m_mutex, std::adopt_lock);
61  std::queue<T> tmp(other.m_queue);
62  m_queue.swap(tmp);
63 }
64 
65 // --------------------------------------------------------------------------
66 template<typename T>
68 {
69  std::lock(m_mutex, other.m_mutex);
70  std::lock_guard<std::mutex> lock(m_mutex, std::adopt_lock);
71  std::lock_guard<std::mutex> lock_other(other.m_mutex, std::adopt_lock);
72  m_queue.swap(other.m_queue);
73 }
74 
75 // --------------------------------------------------------------------------
76 template<typename T>
77 typename std::queue<T>::size_type teca_threadsafe_queue<T>::size() const
78 {
79  std::lock_guard<std::mutex> lock(m_mutex);
80  return m_queue.size();
81 }
82 
83 // --------------------------------------------------------------------------
84 template<typename T>
85 void teca_threadsafe_queue<T>::push(const T &val)
86 {
87  std::lock_guard<std::mutex> lock(m_mutex);
88  m_queue.push(val);
89  m_ready.notify_one();
90 }
91 
92 // --------------------------------------------------------------------------
93 template<typename T>
95 {
96  std::lock_guard<std::mutex> lock(m_mutex);
97  m_queue.push(std::move(val));
98  m_ready.notify_one();
99 }
100 
101 // --------------------------------------------------------------------------
102 template<typename T>
104 {
105  std::unique_lock<std::mutex> lock(m_mutex);
106  m_ready.wait(lock, [this] { return !m_queue.empty(); });
107  val = std::move(m_queue.front());
108  m_queue.pop();
109 }
110 
111 // --------------------------------------------------------------------------
112 template<typename T>
114 {
115  std::unique_lock<std::mutex> lock(m_mutex);
116  if (m_queue.empty())
117  return false;
118  val = std::move(m_queue.front());
119  m_queue.pop();
120  return true;
121 }
122 
123 // --------------------------------------------------------------------------
124 template<typename T>
126 {
127  std::lock_guard<std::mutex> lock(m_mutex);
128  m_queue.clear();
129 }
130 
131 #endif
teca_threadsafe_queue
Definition: teca_threadsafe_queue.h:10