from pymodbus.client import ModbusTcpClient import struct import logging class MotorClient: def __init__(self): """初始化Modbus RTU客户端。""" self.resolution = 2097152 # 设置分辨率 self.pole_pairs = 8 # 电机极对数 self.modbus_client = ModbusTcpClient(host="192.168.211.7", port=8899, timeout=3) # 尝试连接Modbus设备 if not self.modbus_client.connect(): raise ConnectionError("无法连接到Modbus RTU设备。") self.logger = logging.getLogger('motor') self.setup_logger('motor.log') def setup_logger(self, log_filename): """设置日志记录器。""" # 移除basicConfig,改为直接配置motor记录器 self.logger.setLevel(logging.DEBUG) handler = logging.FileHandler(log_filename, encoding='utf-8') handler.setFormatter(logging.Formatter( '%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' )) self.logger.addHandler(handler) # 添加文件处理器 self.logger.propagate = False # 防止传播到根记录器 def write(self, address, value, datatype='UNS16', unit=2): """发送消息到Modbus设备。""" # 根据数据类型发送消息 if datatype == 'UNS16': # 写入单个16位寄存器 response = self.modbus_client.write_register(address - 1, value, slave=2) # 获取原始报文字节 # self.logger.debug(f"发送: {response}") else: # 写入32位整数到两个连续的16位寄存器 value = int.from_bytes(struct.pack('>i', value), 'big') response = self.modbus_client.write_registers(address - 1, [value >> 16, value & 0xFFFF], slave=2) if response.isError(): raise RuntimeError(f"写入错误: {response}") else: # print(f"写入: 地址 {address - 1}, 值 {value}, 数据类型 {datatype}") self.logger.info(msg=f"写入: 地址 {address - 1}, 值 {value}, 数据类型 {datatype}") def read(self, address, count, datatype='UNS16', unit=2): """当GPO有输出时,会读取不到寄存器的数据""" address -= 1 # 调整为0-based地址 response = self.modbus_client.read_holding_registers(address, slave=2, count=count) if response.isError(): print(f"读取: {response}") else: if datatype == 'UNS16': data = hex(response.registers[0]) else: data = struct.unpack('>i', struct.pack('>HH', *response.registers))[0] self.logger.info(f"读取: 地址 {address + 1}, 数据 {data}, 数据类型 {datatype}") return data def shutdown(self): # 发送失能指令 self.write(61443, 6, 'UNS16') def switch_on(self): # 发送开启指令 self.write(61443, 7, 'UNS16') def enable_operation(self): # 发送启动操作指令 self.write(61443, 15, 'UNS16') def disable_voltage(self): # 发送禁止电压指令 self.write(61443, 9, 'UNS16') def quick_stop(self): # 发送快速停止指令 self.write(61443, 10, 'UNS16') def disable_operation(self): # 发送禁止操作指令 self.write(61443, 5, 'UNS16') def fault_reset(self): # 发送错误重置指令 self.write(61443, 128, 'UNS16') def set_position_mode(self): self.write(61450, 1, 'UNS16') def set_velocity_mode(self): # 发送设置速度模式指令 self.write(61450, 3, 'UNS16') def set_homing_mode(self): # 发送回零模式指令 self.write(61450, 6, 'UNS16') def set_multi_mode(self): # 发送多段运行模式指令 self.write(61450, 62, 'UNS16') def set_segment(self, segment_number, loop=True): # 发送运行模式指令 self.write(1072, 255, 'UNS16') # 运行段数设置 self.write(1073, 0, 'UNS16') # 进入段数设置模式 self.write(1075, segment_number, 'UNS32') self.write(1073, 1, 'UNS16') # 进入循环方式设置 loop_var = 1 if loop else 0 self.write(1075, loop_var, 'UNS32') self.write(1073, 4, 'UNS16') # 进入重新起始段设置 self.write(1075, 1, 'UNS32') def set_segment_parameters(self, segment_number, position, velocity, acceleration, wait_time, target_value, mode=1, search_mode=2 ): """ 设置多段运行模式的运行参数,包括目标位置、速度、加速度、等待时间和搜索目标值。 :param search_mode: :param mode: :param segment_number: 段号 :param position: 目标位置 :param velocity: 目标速度(单位:rpm) :param acceleration: 加速度(单位:rpm/s) :param wait_time: 等待时间(单位:毫秒) :param target_value: 搜索目标值 """ # 进入第n段设置 self.write(1072, segment_number, 'UNS16') # 进入段号设置模式 # 进入运行模式设置 self.write(1073, 0, 'UNS16') self.write(1075, mode, 'UNS32') # 设置目标值位置 self.write(1073, 1, 'UNS16') # 目标位置设置模式 self.write(1075, position, 'UNS32') # 设置目标搜索模式 self.write(1073, 2, 'UNS16') self.write(1075, search_mode, 'UNS32') # 设置搜索目标值 self.write(1073, 3, 'UNS16') # 搜索目标值设置模式 self.write(1075, target_value, 'UNS32') # 设置等待时间 self.write(1073, 4, 'UNS16') # 等待时间设置模式 self.write(1075, wait_time, 'UNS32') velocity = int(self.resolution * velocity / 60) # 设置目标速度 self.write(1073, 6, 'UNS16') # 目标速度设置模式 self.write(1075, velocity, 'UNS32') acceleration = int(self.resolution * acceleration / 60) # 设置加速度 self.write(1073, 7, 'UNS16') # 加速度设置模式 self.write(1075, acceleration, 'UNS32') def set_start_segment(self): self.switch_on() self.write(61443, 31) def set_homing_strategy(self): # 设置回零方式 self.write(61552, 4, 'INTEGER8') def set_homing_velocity(self, speed): velocity = int(self.resolution * speed / 60) self.write(61554, velocity, 'UNS32') self.write(61556, velocity, 'UNS32') def homing(self): self.set_homing_mode() self.set_homing_velocity(15) self.set_homing_strategy() self.shutdown() self.switch_on() self.write(61443, 31) try: finished_flag = True while finished_flag: flag = self.read(61444, 1) if flag == '0x1637': finished_flag = False self.write(61443, 6) self.set_velocity_mode() else: continue finally: print('回零完成') def set_absolute_position(self): self.write(61443, 63, 'UNS16') def set_relative_position(self): self.write(61443, 127, 'UNS16') def set_target_position(self, position): address = 61488 self.write(address, position, 'INTEGER32') def set_velocity(self, velocity): """设置速度值。""" address = 61659 self.write(address, velocity, 'INTEGER32') def get_current_position(self): current_position = self.read(61457, 2, 'INTEGER32') return current_position def normalize_mechanical_angle(self, position): position = position if position >= 0 else position + 2147483648 angle = float((position % self.resolution) / self.resolution * 360) return angle def get_status(self): status_code = self.read(4, 1) return status_code def get_velocity(self): return self.read(61470, 2, 'INTEGER32') def get_mode(self): mode_code = self.read(61451, 2, 'INTEGER8') modes_dict = { 0: "空模式", 1: "位置模式 (PPM)", 3: "速度模式 (PVM)", 4: "力矩模式 (TPM)", 6: "回零模式 (HOME)", 8: "周期同步位置模式 (CSP)", 9: "周期同步速度模式 (CSV)", 11: "力控模式", 22: "模拟量控速度模式", 23: "模拟量控力矩模式", 31: "脉冲控位置模式", 42: "PWM控速度", 60: "往复位置模式", 61: "参考信号", 62: "多段运行模式" } return modes_dict[mode_code] def close(self): """关闭Modbus连接。""" self.modbus_client.close() if __name__ == "__main__": client = MotorClient() client.setup_logger("motor.log") # 必须在程序初始化时调用 a = client.set_position_mode() print(a)