RKNNManager.cpp 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. #include "RKNNManager.h"
  2. #include "PPYOLOE.hpp"
  3. #include "../DataManager/DataManager.h"
  4. #include "../DataManager/DataPackage.h"
  5. #include "../LogRecorder/LogOutput.h"
  6. // Image test
  7. #include "../ImageTest/ImageTest.h"
  8. #include <stdexcept>
  9. void RKNNManager::addRknnTask(std::string modelData)
  10. {
  11. try
  12. {
  13. m_threads.emplace_back(&RKNNManager::taskThread, this, modelData);
  14. m_threads.emplace_back(&RKNNManager::taskThread, this, modelData);
  15. m_threads.emplace_back(&RKNNManager::taskThread, this, modelData);
  16. }
  17. catch (const std::exception &e)
  18. {
  19. LOG_ERROR("Exception in addRknnTask: {}", e.what());
  20. }
  21. catch (...)
  22. {
  23. LOG_ERROR("Unknown exception in addRknnTask");
  24. }
  25. }
  26. void RKNNManager::taskThread(std::string modelpath)
  27. {
  28. grpc::ClientContext *context;
  29. R360::Empty response;
  30. std::string target_str = this->m_grpcServerAddress;
  31. grpc::ChannelArguments channel_args;
  32. channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大发送消息大小为100MB
  33. channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大接收消息大小为100MB
  34. channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024);
  35. std::unique_ptr<MessageService::Stub> stub_ = MessageService::NewStub(grpc::CreateCustomChannel(target_str, grpc::InsecureChannelCredentials(), channel_args));
  36. std::unique_ptr<grpc::ClientWriter<DataList>> writer;
  37. context = new grpc::ClientContext();
  38. writer = stub_->R360SendMessage(context, &response);
  39. MessageServiceClient client;
  40. PPYOLOE infer;
  41. if (!infer.initialize(modelpath))
  42. {
  43. std::cout << "Failed to initialize model!" << std::endl;
  44. LOG_INFO("The model initialize is failed!!!\n");
  45. return;
  46. }
  47. m_threadSwitch = true;
  48. int index = infer.get_index();
  49. DataPackagePtr dataPackage;
  50. std::string pipeName = "rknn" + std::to_string(index);
  51. try
  52. {
  53. DataManager::getInstance().addDataPipe<DataPackage>(pipeName);
  54. }
  55. catch (const std::exception &e)
  56. {
  57. std::cerr << "Exception when creating data pipe: " << e.what() << std::endl;
  58. return;
  59. }
  60. // time test
  61. UsbTest::HighResolutionTimer timer;
  62. while (m_threadSwitch)
  63. {
  64. try
  65. {
  66. if (DataManager::getInstance().popData("resized" + std::to_string(index), dataPackage))
  67. {
  68. try
  69. {
  70. if (!infer.infer(index, (unsigned char *)dataPackage->pResizeData,
  71. dataPackage->nResizeWidth, dataPackage->nResizeHeight, dataPackage))
  72. {
  73. LOG_ERROR("Inference failed for index {}", index);
  74. }
  75. DataList *pDatalist = client.R360SendMessage(dataPackage);
  76. if (!writer->Write(*pDatalist))
  77. {
  78. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  79. LOG_WARN("Failed to write data to writer for index {}", index);
  80. // reset the channel and continue
  81. writer.release();
  82. delete context;
  83. context = new grpc::ClientContext();
  84. writer = stub_->R360SendMessage(context, &response);
  85. continue;
  86. }
  87. // MemoryPool<DataPackage> *dataPool = DataManager::getInstance().getDataBuffer<DataPackage>();
  88. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  89. }
  90. catch (const std::exception &e)
  91. {
  92. LOG_ERROR("Exception in data processing loop: {}", e.what());
  93. if (dataPackage)
  94. {
  95. try
  96. {
  97. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  98. }
  99. catch (...)
  100. { /* 忽略释放时的异常 */
  101. }
  102. }
  103. }
  104. catch (...)
  105. {
  106. // in unknown exception case, we should release the data buffer
  107. LOG_ERROR("Unknown exception in data processing loop");
  108. if (dataPackage)
  109. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  110. }
  111. continue;
  112. }
  113. }
  114. catch (const std::exception &e)
  115. {
  116. std::cerr << "Exception in data processing loop: " << e.what() << std::endl;
  117. }
  118. std::this_thread::sleep_for(std::chrono::milliseconds(20));
  119. }
  120. }
  121. bool RKNNManager::setupGrpcChannel(const std::string &target_str, std::unique_ptr<MessageService::Stub> &stub, std::unique_ptr<grpc::ClientWriter<DataList>> &writer, grpc::ClientContext &context, R360::Empty &response)
  122. {
  123. try
  124. {
  125. LOG_INFO("正在连接gRPC服务器: {}", target_str);
  126. // 设置超时
  127. std::chrono::system_clock::time_point deadline =
  128. std::chrono::system_clock::now() + std::chrono::seconds(5);
  129. context.set_deadline(deadline);
  130. // 设置通道参数
  131. grpc::ChannelArguments channel_args;
  132. channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024);
  133. channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024);
  134. channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024);
  135. channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 10000); // 10s保活
  136. channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000); // 超时5s
  137. channel_args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); // 允许无调用时发送保活
  138. // 创建通道和存根
  139. auto channel = grpc::CreateCustomChannel(
  140. target_str, grpc::InsecureChannelCredentials(), channel_args);
  141. // 等待通道连接就绪
  142. auto state = channel->GetState(true);
  143. if (!channel->WaitForConnected(deadline))
  144. {
  145. LOG_ERROR("gRPC通道连接超时");
  146. return false;
  147. }
  148. stub = MessageService::NewStub(channel);
  149. writer = stub->R360SendMessage(&context, &response);
  150. LOG_INFO("gRPC通道连接成功");
  151. return true;
  152. }
  153. catch (const std::exception &e)
  154. {
  155. LOG_ERROR("建立gRPC连接异常: {}", e.what());
  156. return false;
  157. }
  158. catch (...)
  159. {
  160. LOG_ERROR("建立gRPC连接时发生未知异常");
  161. return false;
  162. }
  163. }