DataPipe.h 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. #ifndef DATAPipe_H
  2. #define DATAPipe_H
  3. #include <queue>
  4. #include <thread>
  5. #include <memory>
  6. #include <mutex>
  7. #include <condition_variable>
  8. #include <functional>
  9. #include "DataPackage.h"
  /// Default capacity (number of queued pointers) used when a DataPipe is
  /// constructed without an explicit buffer size.
  #define BUFFER_SIZE 30

  /**
   * @brief Bounded, mutex-protected FIFO of raw `T*` pointers.
   *
   * Producers call pushData(); consumers poll with popData(). When the queue
   * is full, the oldest pointer is discarded to make room for the new one.
   *
   * Ownership: the pipe only stores and hands back pointers. It never calls
   * `delete` on them -- not when an element is evicted by pushData(), and not
   * for elements still queued when the pipe is destroyed.
   * NOTE(review): if callers allocate the pointees with `new` and expect the
   * consumer to free them, evicted/leftover elements leak -- confirm the
   * intended ownership with the callers.
   *
   * Thread-safety: pushData() and popData() both take m_mtx, so they may be
   * called concurrently from different threads.
   */
  template <typename T>
  class DataPipe
  {
  public:
      /**
       * @param bufferSize Maximum number of pointers kept in the queue.
       *                   A value of 0 behaves like 1 (the most recent
       *                   pointer is retained).
       */
      DataPipe(size_t bufferSize = BUFFER_SIZE);
      ~DataPipe();

      /**
       * @brief Append @p data, dropping the oldest element if the queue is full.
       * @param data Pointer to enqueue; stored as-is (may be null).
       */
      void pushData(T *data);

      /**
       * @brief Non-blocking pop of the oldest element.
       * @param[out] data Receives the oldest pointer on success; left
       *                  untouched when the queue is empty.
       * @return true if an element was removed, false if the queue was empty.
       */
      bool popData(T *&data);

  private:
      size_t m_bufferSize;            ///< Capacity limit enforced by pushData().
      std::queue<T *> m_bufferQueue;  ///< Pending pointers, oldest at the front.
      std::mutex m_mtx;               ///< Guards m_bufferQueue.
      std::condition_variable m_cv;   ///< Signalled on every push.
  };

  template <typename T>
  DataPipe<T>::DataPipe(size_t bufferSize) : m_bufferSize(bufferSize) {}

  // Nothing to release: queued pointers are not owned by the pipe.
  template <typename T>
  DataPipe<T>::~DataPipe() = default;

  template <typename T>
  void DataPipe<T>::pushData(T *data)
  {
      {
          std::lock_guard<std::mutex> guard(m_mtx);
          // Evict the oldest entry when at capacity. The emptiness check
          // matters for a capacity of 0: without it pop() would be called on
          // an empty queue, which is undefined behaviour.
          if (!m_bufferQueue.empty() && m_bufferQueue.size() >= m_bufferSize)
          {
              m_bufferQueue.pop();
          }
          m_bufferQueue.push(data);
      }
      // Notify after releasing the lock so a woken thread does not immediately
      // block on m_mtx. No method of this class currently waits on m_cv.
      m_cv.notify_one();
  }

  template <typename T>
  bool DataPipe<T>::popData(T *&data)
  {
      std::lock_guard<std::mutex> guard(m_mtx);
      if (m_bufferQueue.empty())
      {
          return false;
      }
      data = m_bufferQueue.front();
      m_bufferQueue.pop();
      return true;
  }
  52. typedef std::shared_ptr<DataPipe<DataPackage>> DataPipePtr;
  53. #endif // DATAPipe_H