123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- #include "RKNNManager.h"
- #include "PPYOLOE.hpp"
- #include "../DataManager/DataManager.h"
- #include "../DataManager/DataPackage.h"
- #include "../LogRecorder/LogOutput.h"
- // Image test
- #include "../ImageTest/ImageTest.h"
- #include <stdexcept>
- void RKNNManager::addRknnTask(std::string modelData)
- {
- try
- {
- m_threads.emplace_back(&RKNNManager::taskThread, this, modelData);
- m_threads.emplace_back(&RKNNManager::taskThread, this, modelData);
- m_threads.emplace_back(&RKNNManager::taskThread, this, modelData);
- }
- catch (const std::exception &e)
- {
- LOG_ERROR("Exception in addRknnTask: {}", e.what());
- }
- catch (...)
- {
- LOG_ERROR("Unknown exception in addRknnTask");
- }
- }
- void RKNNManager::taskThread(std::string modelpath)
- {
- grpc::ClientContext *context;
- R360::Empty response;
- std::string target_str = this->m_grpcServerAddress;
- grpc::ChannelArguments channel_args;
- channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大发送消息大小为100MB
- channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024); // 设置最大接收消息大小为100MB
- channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024);
- std::unique_ptr<MessageService::Stub> stub_ = MessageService::NewStub(grpc::CreateCustomChannel(target_str, grpc::InsecureChannelCredentials(), channel_args));
- std::unique_ptr<grpc::ClientWriter<DataList>> writer;
- context = new grpc::ClientContext();
- writer = stub_->R360SendMessage(context, &response);
- MessageServiceClient client;
- PPYOLOE infer;
- if (!infer.initialize(modelpath))
- {
- std::cout << "Failed to initialize model!" << std::endl;
- LOG_INFO("The model initialize is failed!!!\n");
- return;
- }
- m_threadSwitch = true;
- int index = infer.get_index();
- DataPackagePtr dataPackage;
- std::string pipeName = "rknn" + std::to_string(index);
- try
- {
- DataManager::getInstance().addDataPipe<DataPackage>(pipeName);
- }
- catch (const std::exception &e)
- {
- std::cerr << "Exception when creating data pipe: " << e.what() << std::endl;
- return;
- }
- // time test
- UsbTest::HighResolutionTimer timer;
- while (m_threadSwitch)
- {
- try
- {
- if (DataManager::getInstance().popData("resized" + std::to_string(index), dataPackage))
- {
- try
- {
- if (!infer.infer(index, (unsigned char *)dataPackage->pResizeData,
- dataPackage->nResizeWidth, dataPackage->nResizeHeight, dataPackage))
- {
- LOG_ERROR("Inference failed for index {}", index);
- }
- std::cout << "Inference result count " << dataPackage->Result.count << std::endl;
- DataList *pDatalist = client.R360SendMessage(dataPackage);
- if (!writer->Write(*pDatalist))
- {
- DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
- LOG_WARN("Failed to write data to writer for index {}", index);
- // reset the channel and continue
- writer.release();
- delete context;
- context = new grpc::ClientContext();
- writer = stub_->R360SendMessage(context, &response);
- continue;
- }
- // MemoryPool<DataPackage> *dataPool = DataManager::getInstance().getDataBuffer<DataPackage>();
- DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
- }
- catch (const std::exception &e)
- {
- LOG_ERROR("Exception in data processing loop: {}", e.what());
- if (dataPackage)
- {
- DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
- }
- }
- catch (...)
- {
- // in unknown exception case, we should release the data buffer
- LOG_ERROR("Unknown exception in data processing loop");
- if (dataPackage)
- DataManager::getInstance().releaseDataBuffer<DataPackage>(dataPackage);
- }
- continue;
- }
- }
- catch (const std::exception &e)
- {
- std::cerr << "Exception in data processing loop: " << e.what() << std::endl;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(20));
- }
- }
- bool RKNNManager::setupGrpcChannel(const std::string &target_str, std::unique_ptr<MessageService::Stub> &stub, std::unique_ptr<grpc::ClientWriter<DataList>> &writer, grpc::ClientContext &context, R360::Empty &response)
- {
- try
- {
- LOG_INFO("正在连接gRPC服务器: {}", target_str);
- // 设置超时
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(5);
- context.set_deadline(deadline);
- // 设置通道参数
- grpc::ChannelArguments channel_args;
- channel_args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 100 * 1024 * 1024);
- channel_args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 100 * 1024 * 1024);
- channel_args.SetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE, 4 * 1024 * 1024);
- channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 10000); // 10s保活
- channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000); // 超时5s
- channel_args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); // 允许无调用时发送保活
- // 创建通道和存根
- auto channel = grpc::CreateCustomChannel(
- target_str, grpc::InsecureChannelCredentials(), channel_args);
- // 等待通道连接就绪
- auto state = channel->GetState(true);
- if (!channel->WaitForConnected(deadline))
- {
- LOG_ERROR("gRPC通道连接超时");
- return false;
- }
- stub = MessageService::NewStub(channel);
- writer = stub->R360SendMessage(&context, &response);
- LOG_INFO("gRPC通道连接成功");
- return true;
- }
- catch (const std::exception &e)
- {
- LOG_ERROR("建立gRPC连接异常: {}", e.what());
- return false;
- }
- catch (...)
- {
- LOG_ERROR("建立gRPC连接时发生未知异常");
- return false;
- }
- }
|