RKNNManager.cpp 8.6 KB


  1. #include "RKNNManager.h"
  2. #include "PPYOLOE.hpp"
  3. #include "../DataManager/DataManager.h"
  4. #include "../DataManager/DataPackage.h"
  5. #include "../LogRecorder/LogOutput.h"
  6. #include "../ThreadGuardian/ThreadGuardian.h"
  7. // Image test
  8. #include "../ImageTest/ImageTest.h"
  9. #include <stdexcept>
  10. void RKNNManager::addRknnTask(std::string modelData)
  11. {
  12. try
  13. {
  14. m_vThreadSwitch.resize(3);
  15. m_threads.clear();
  16. m_bThreadSwitch = true;
  17. m_threads.emplace_back(&RKNNManager::taskThread, this, 0, modelData);
  18. int index = m_threads.size() - 1;
  19. ThreadGuardian::getInstance().registerAIThread(index, std::bind(&RKNNManager::restartRknnThread, this, 0));
  20. m_threads.emplace_back(&RKNNManager::taskThread, this, 1, modelData);
  21. index = m_threads.size() - 1;
  22. ThreadGuardian::getInstance().registerAIThread(index, std::bind(&RKNNManager::restartRknnThread, this, 1));
  23. m_threads.emplace_back(&RKNNManager::taskThread, this, 2, modelData);
  24. index = m_threads.size() - 1;
  25. ThreadGuardian::getInstance().registerAIThread(index, std::bind(&RKNNManager::restartRknnThread, this, 2));
  26. }
  27. catch (const std::exception &e)
  28. {
  29. LOG_ERROR("Exception in addRknnTask: {}", e.what());
  30. }
  31. catch (...)
  32. {
  33. LOG_ERROR("Unknown exception in addRknnTask");
  34. }
  35. }
  36. void RKNNManager::restartRknnThread(int threadIndex)
  37. {
  38. // 先停止当前线程
  39. // if (m_threads[threadIndex].joinable())
  40. // {
  41. // m_vThreadSwitch[threadIndex] = false;
  42. // m_threads[threadIndex].join();
  43. // }
  44. if (threadIndex >= 0 && threadIndex < m_threads.size())
  45. {
  46. // // 1. 检查旧线程是否可合并(仍在运行)
  47. // if (m_threads[threadIndex].joinable())
  48. // {
  49. // // 2. 选择等待旧线程结束或分离它(二选一)
  50. // m_threads[threadIndex].join(); // 等待旧线程完成
  51. // // 或者
  52. // // m_threads[threadIndex].detach(); // 让旧线程独立运行(有风险!)
  53. // }
  54. // 重新启动线程
  55. if (threadIndex >= 0 && threadIndex < m_threads.size())
  56. {
  57. m_threads[threadIndex] = std::thread(&RKNNManager::taskThread, this, threadIndex, m_modelData);
  58. }
  59. }
  60. }
  61. void RKNNManager::updateThreadStatus(int threadIndex)
  62. {
  63. std::string threadName = "rknnThread_" + std::to_string(threadIndex);
  64. ThreadGuardian::getInstance().updateThreadHeartbeat(threadName);
  65. }
  66. void RKNNManager::taskThread(int index, std::string modelpath)
  67. {
  68. grpc::ClientContext *context;
  69. R360::Empty response;
  70. std::string target_str = this->m_grpcServerAddress;
  71. grpc::ChannelArguments channel_args;
  72. channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大发送消息大小为100MB
  73. channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大接收消息大小为100MB
  74. channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024);
  75. channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 10000); // 10秒
  76. channel_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 1000); // 1秒
  77. channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 1000); // 1
  78. channel_args.SetInt(GRPC_ARG_ENABLE_RETRIES, 1); // 启用重试机制
  79. std::unique_ptr<MessageService::Stub> stub_ = MessageService::NewStub(grpc::CreateCustomChannel(target_str, grpc::InsecureChannelCredentials(), channel_args));
  80. std::unique_ptr<grpc::ClientWriter<DataList>> writer;
  81. context = new grpc::ClientContext();
  82. writer = stub_->R360SendMessage(context, &response);
  83. MessageServiceClient client;
  84. PPYOLOE infer;
  85. if (!infer.initialize(modelpath))
  86. {
  87. std::cout << "Failed to initialize model!" << std::endl;
  88. LOG_INFO("The model initialize is failed!!!\n");
  89. return;
  90. }
  91. DataPackagePtr dataPackage;
  92. std::string pipeName = "rknn" + std::to_string(index);
  93. try
  94. {
  95. DataManager::getInstance().addDataPipe<DataPackage>(pipeName);
  96. }
  97. catch (const std::exception &e)
  98. {
  99. std::cerr << "Exception when creating data pipe: " << e.what() << std::endl;
  100. return;
  101. }
  102. // time test
  103. UsbTest::HighResolutionTimer timer;
  104. while (m_bThreadSwitch)
  105. {
  106. try
  107. {
  108. updateThreadStatus(index);
  109. if (DataManager::getInstance().popData("resized" + std::to_string(index), dataPackage))
  110. {
  111. try
  112. {
  113. if (!infer.infer(index, (unsigned char *)dataPackage->pResizeData,
  114. dataPackage->nResizeWidth, dataPackage->nResizeHeight, dataPackage))
  115. {
  116. LOG_ERROR("Inference failed for index {}", index);
  117. }
  118. std::cout << "Inference result count " << dataPackage->Result.count << std::endl;
  119. DataList *pDatalist = client.R360SendMessage(dataPackage);
  120. if (!writer->Write(*pDatalist))
  121. {
  122. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  123. LOG_WARN("Failed to write data to writer for index {}", index);
  124. // reset the channel and continue
  125. writer.release();
  126. delete context;
  127. context = new grpc::ClientContext();
  128. writer = stub_->R360SendMessage(context, &response);
  129. continue;
  130. }
  131. // MemoryPool<DataPackage> *dataPool = DataManager::getInstance().getDataBuffer<DataPackage>();
  132. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  133. }
  134. catch (const std::exception &e)
  135. {
  136. LOG_ERROR("Exception in data processing loop: {}", e.what());
  137. if (dataPackage)
  138. {
  139. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  140. }
  141. }
  142. catch (...)
  143. {
  144. // in unknown exception case, we should release the data buffer
  145. LOG_ERROR("Unknown exception in data processing loop");
  146. if (dataPackage)
  147. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  148. }
  149. continue;
  150. }
  151. }
  152. catch (const std::exception &e)
  153. {
  154. std::cerr << "Exception in data processing loop: " << e.what() << std::endl;
  155. }
  156. std::this_thread::sleep_for(std::chrono::milliseconds(20));
  157. }
  158. }
  159. bool RKNNManager::setupGrpcChannel(const std::string &target_str, std::unique_ptr<MessageService::Stub> &stub, std::unique_ptr<grpc::ClientWriter<DataList>> &writer, grpc::ClientContext &context, R360::Empty &response)
  160. {
  161. try
  162. {
  163. LOG_INFO("正在连接gRPC服务器: {}", target_str);
  164. // 设置超时
  165. std::chrono::system_clock::time_point deadline =
  166. std::chrono::system_clock::now() + std::chrono::seconds(5);
  167. context.set_deadline(deadline);
  168. // 设置通道参数
  169. grpc::ChannelArguments channel_args;
  170. channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024);
  171. channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024);
  172. channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024);
  173. channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 10000); // 10s保活
  174. channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000); // 超时5s
  175. channel_args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); // 允许无调用时发送保活
  176. // 创建通道和存根
  177. auto channel = grpc::CreateCustomChannel(
  178. target_str, grpc::InsecureChannelCredentials(), channel_args);
  179. // 等待通道连接就绪
  180. auto state = channel->GetState(true);
  181. if (!channel->WaitForConnected(deadline))
  182. {
  183. LOG_ERROR("gRPC通道连接超时");
  184. return false;
  185. }
  186. stub = MessageService::NewStub(channel);
  187. writer = stub->R360SendMessage(&context, &response);
  188. LOG_INFO("gRPC通道连接成功");
  189. return true;
  190. }
  191. catch (const std::exception &e)
  192. {
  193. LOG_ERROR("建立gRPC连接异常: {}", e.what());
  194. return false;
  195. }
  196. catch (...)
  197. {
  198. LOG_ERROR("建立gRPC连接时发生未知异常");
  199. return false;
  200. }
  201. }