RKNNManager.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #include "RKNNManager.h"
  2. #include "PPYOLOE.hpp"
  3. #include "../DataManager/DataManager.h"
  4. #include "../DataManager/DataPackage.h"
  5. #include "../LogRecorder/LogOutput.h"
  6. // Image test
  7. #include "../ImageTest/ImageTest.h"
  8. #include <stdexcept>
  9. void RKNNManager::addRknnTask(std::string modelData)
  10. {
  11. try
  12. {
  13. m_threads.emplace_back(&RKNNManager::taskThread, this, modelData);
  14. m_threads.emplace_back(&RKNNManager::taskThread, this, modelData);
  15. m_threads.emplace_back(&RKNNManager::taskThread, this, modelData);
  16. }
  17. catch (const std::exception &e)
  18. {
  19. LOG_ERROR("Exception in addRknnTask: {}", e.what());
  20. }
  21. catch (...)
  22. {
  23. LOG_ERROR("Unknown exception in addRknnTask");
  24. }
  25. }
  26. void RKNNManager::taskThread(std::string modelpath)
  27. {
  28. grpc::ClientContext *context;
  29. R360::Empty response;
  30. std::string target_str = this->m_grpcServerAddress;
  31. grpc::ChannelArguments channel_args;
  32. channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大发送消息大小为100MB
  33. channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大接收消息大小为100MB
  34. channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024);
  35. std::unique_ptr<MessageService::Stub> stub_ = MessageService::NewStub(grpc::CreateCustomChannel(target_str, grpc::InsecureChannelCredentials(), channel_args));
  36. std::unique_ptr<grpc::ClientWriter<DataList>> writer;
  37. context = new grpc::ClientContext();
  38. writer = stub_->R360SendMessage(context, &response);
  39. MessageServiceClient client;
  40. PPYOLOE infer;
  41. if (!infer.initialize(modelpath))
  42. {
  43. std::cout << "Failed to initialize model!" << std::endl;
  44. LOG_INFO("The model initialize is failed!!!\n");
  45. return;
  46. }
  47. m_threadSwitch = true;
  48. int index = infer.get_index();
  49. DataPackagePtr dataPackage;
  50. std::string pipeName = "rknn" + std::to_string(index);
  51. try
  52. {
  53. DataManager::getInstance().addDataPipe<DataPackage>(pipeName);
  54. }
  55. catch (const std::exception &e)
  56. {
  57. std::cerr << "Exception when creating data pipe: " << e.what() << std::endl;
  58. return;
  59. }
  60. // time test
  61. UsbTest::HighResolutionTimer timer;
  62. while (m_threadSwitch)
  63. {
  64. try
  65. {
  66. if (DataManager::getInstance().popData("resized" + std::to_string(index), dataPackage))
  67. {
  68. try
  69. {
  70. if (!infer.infer(index, (unsigned char *)dataPackage->pResizeData,
  71. dataPackage->nResizeWidth, dataPackage->nResizeHeight, dataPackage))
  72. {
  73. LOG_ERROR("Inference failed for index {}", index);
  74. }
  75. std::cout << "Inference result count " << dataPackage->Result.count << std::endl;
  76. DataList *pDatalist = client.R360SendMessage(dataPackage);
  77. if (!writer->Write(*pDatalist))
  78. {
  79. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  80. LOG_WARN("Failed to write data to writer for index {}", index);
  81. // reset the channel and continue
  82. writer.release();
  83. delete context;
  84. context = new grpc::ClientContext();
  85. writer = stub_->R360SendMessage(context, &response);
  86. continue;
  87. }
  88. // MemoryPool<DataPackage> *dataPool = DataManager::getInstance().getDataBuffer<DataPackage>();
  89. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  90. }
  91. catch (const std::exception &e)
  92. {
  93. LOG_ERROR("Exception in data processing loop: {}", e.what());
  94. if (dataPackage)
  95. {
  96. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  97. }
  98. }
  99. catch (...)
  100. {
  101. // in unknown exception case, we should release the data buffer
  102. LOG_ERROR("Unknown exception in data processing loop");
  103. if (dataPackage)
  104. DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
  105. }
  106. continue;
  107. }
  108. }
  109. catch (const std::exception &e)
  110. {
  111. std::cerr << "Exception in data processing loop: " << e.what() << std::endl;
  112. }
  113. std::this_thread::sleep_for(std::chrono::milliseconds(20));
  114. }
  115. }
  116. bool RKNNManager::setupGrpcChannel(const std::string &target_str, std::unique_ptr<MessageService::Stub> &stub, std::unique_ptr<grpc::ClientWriter<DataList>> &writer, grpc::ClientContext &context, R360::Empty &response)
  117. {
  118. try
  119. {
  120. LOG_INFO("正在连接gRPC服务器: {}", target_str);
  121. // 设置超时
  122. std::chrono::system_clock::time_point deadline =
  123. std::chrono::system_clock::now() + std::chrono::seconds(5);
  124. context.set_deadline(deadline);
  125. // 设置通道参数
  126. grpc::ChannelArguments channel_args;
  127. channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024);
  128. channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024);
  129. channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024);
  130. channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 10000); // 10s保活
  131. channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000); // 超时5s
  132. channel_args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); // 允许无调用时发送保活
  133. // 创建通道和存根
  134. auto channel = grpc::CreateCustomChannel(
  135. target_str, grpc::InsecureChannelCredentials(), channel_args);
  136. // 等待通道连接就绪
  137. auto state = channel->GetState(true);
  138. if (!channel->WaitForConnected(deadline))
  139. {
  140. LOG_ERROR("gRPC通道连接超时");
  141. return false;
  142. }
  143. stub = MessageService::NewStub(channel);
  144. writer = stub->R360SendMessage(&context, &response);
  145. LOG_INFO("gRPC通道连接成功");
  146. return true;
  147. }
  148. catch (const std::exception &e)
  149. {
  150. LOG_ERROR("建立gRPC连接异常: {}", e.what());
  151. return false;
  152. }
  153. catch (...)
  154. {
  155. LOG_ERROR("建立gRPC连接时发生未知异常");
  156. return false;
  157. }
  158. }