#include "RKNNManager.h" #include "PPYOLOE.hpp" #include "../DataManager/DataManager.h" #include "../DataManager/DataPackage.h" #include "../LogRecorder/LogOutput.h" #include "../ThreadGuardian/ThreadGuardian.h" // Image test #include "../ImageTest/ImageTest.h" #include void RKNNManager::addRknnTask(std::string modelData) { try { m_vThreadSwitch.resize(3); m_threads.clear(); m_bThreadSwitch = true; m_threads.emplace_back(&RKNNManager::taskThread, this, 0, modelData); int index = m_threads.size() - 1; ThreadGuardian::getInstance().registerAIThread(index, std::bind(&RKNNManager::restartRknnThread, this, 0)); m_threads.emplace_back(&RKNNManager::taskThread, this, 1, modelData); index = m_threads.size() - 1; ThreadGuardian::getInstance().registerAIThread(index, std::bind(&RKNNManager::restartRknnThread, this, 1)); m_threads.emplace_back(&RKNNManager::taskThread, this, 2, modelData); index = m_threads.size() - 1; ThreadGuardian::getInstance().registerAIThread(index, std::bind(&RKNNManager::restartRknnThread, this, 2)); } catch (const std::exception &e) { LOG_ERROR("Exception in addRknnTask: {}", e.what()); } catch (...) { LOG_ERROR("Unknown exception in addRknnTask"); } } void RKNNManager::restartRknnThread(int threadIndex) { // 先停止当前线程 // if (m_threads[threadIndex].joinable()) // { // m_vThreadSwitch[threadIndex] = false; // m_threads[threadIndex].join(); // } if (threadIndex >= 0 && threadIndex < m_threads.size()) { // // 1. 检查旧线程是否可合并(仍在运行) // if (m_threads[threadIndex].joinable()) // { // // 2. 选择等待旧线程结束或分离它(二选一) // m_threads[threadIndex].join(); // 等待旧线程完成 // // 或者 // // m_threads[threadIndex].detach(); // 让旧线程独立运行(有风险!) // } // 重新启动线程 if (threadIndex >= 0 && threadIndex < m_threads.size()) { m_threads[threadIndex] = std::thread(&RKNNManager::taskThread, this, threadIndex, m_modelData); } } } void RKNNManager::updateThreadStatus(int threadIndex) { std::string threadName = "rknnThread_" + std::to_string(threadIndex); ThreadGuardian::getInstance().updateThreadHeartbeat(threadName); } void RKNNManager::taskThread(int index, std::string modelpath) { grpc::ClientContext *context; R360::Empty response; std::string target_str = this->m_grpcServerAddress; grpc::ChannelArguments channel_args; channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大发送消息大小为100MB channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大接收消息大小为100MB channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024); channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 10000); // 10秒 channel_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 1000); // 1秒 channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 1000); // 1 channel_args.SetInt(GRPC_ARG_ENABLE_RETRIES, 1); // 启用重试机制 std::unique_ptr stub_ = MessageService::NewStub(grpc::CreateCustomChannel(target_str, grpc::InsecureChannelCredentials(), channel_args)); std::unique_ptr> writer; context = new grpc::ClientContext(); writer = stub_->R360SendMessage(context, &response); MessageServiceClient client; PPYOLOE infer; if (!infer.initialize(modelpath)) { std::cout << "Failed to initialize model!" << std::endl; LOG_INFO("The model initialize is failed!!!\n"); return; } DataPackagePtr dataPackage; std::string pipeName = "rknn" + std::to_string(index); try { DataManager::getInstance().addDataPipe(pipeName); } catch (const std::exception &e) { std::cerr << "Exception when creating data pipe: " << e.what() << std::endl; return; } // time test UsbTest::HighResolutionTimer timer; while (m_bThreadSwitch) { try { updateThreadStatus(index); if (DataManager::getInstance().popData("resized" + std::to_string(index), dataPackage)) { try { if (!infer.infer(index, (unsigned char *)dataPackage->pResizeData, dataPackage->nResizeWidth, dataPackage->nResizeHeight, dataPackage)) { LOG_ERROR("Inference failed for index {}", index); } std::cout << "Inference result count " << dataPackage->Result.count << std::endl; DataList *pDatalist = client.R360SendMessage(dataPackage); if (!writer->Write(*pDatalist)) { DataManager::getInstance().releaseDataBuffer(dataPackage); LOG_WARN("Failed to write data to writer for index {}", index); // reset the channel and continue writer.release(); delete context; context = new grpc::ClientContext(); writer = stub_->R360SendMessage(context, &response); continue; } // MemoryPool *dataPool = DataManager::getInstance().getDataBuffer(); DataManager::getInstance().releaseDataBuffer(dataPackage); } catch (const std::exception &e) { LOG_ERROR("Exception in data processing loop: {}", e.what()); if (dataPackage) { DataManager::getInstance().releaseDataBuffer(dataPackage); } } catch (...) { // in unknown exception case, we should release the data buffer LOG_ERROR("Unknown exception in data processing loop"); if (dataPackage) DataManager::getInstance().releaseDataBuffer(dataPackage); } continue; } } catch (const std::exception &e) { std::cerr << "Exception in data processing loop: " << e.what() << std::endl; } std::this_thread::sleep_for(std::chrono::milliseconds(20)); } } bool RKNNManager::setupGrpcChannel(const std::string &target_str, std::unique_ptr &stub, std::unique_ptr> &writer, grpc::ClientContext &context, R360::Empty &response) { try { LOG_INFO("正在连接gRPC服务器: {}", target_str); // 设置超时 std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + std::chrono::seconds(5); context.set_deadline(deadline); // 设置通道参数 grpc::ChannelArguments channel_args; channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024); channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024); channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024); channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 10000); // 10s保活 channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000); // 超时5s channel_args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); // 允许无调用时发送保活 // 创建通道和存根 auto channel = grpc::CreateCustomChannel( target_str, grpc::InsecureChannelCredentials(), channel_args); // 等待通道连接就绪 auto state = channel->GetState(true); if (!channel->WaitForConnected(deadline)) { LOG_ERROR("gRPC通道连接超时"); return false; } stub = MessageService::NewStub(channel); writer = stub->R360SendMessage(&context, &response); LOG_INFO("gRPC通道连接成功"); return true; } catch (const std::exception &e) { LOG_ERROR("建立gRPC连接异常: {}", e.what()); return false; } catch (...) { LOG_ERROR("建立gRPC连接时发生未知异常"); return false; } }