123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- from pymodbus.client import ModbusTcpClient
- import struct
- import logging
- class MotorClient:
- def __init__(self):
- """初始化Modbus RTU客户端。"""
- self.resolution = 2097152 # 设置分辨率
- self.pole_pairs = 8 # 电机极对数
- self.modbus_client = ModbusTcpClient(host="192.168.211.7", port=8899, timeout=3)
- # 尝试连接Modbus设备
- if not self.modbus_client.connect():
- raise ConnectionError("无法连接到Modbus RTU设备。")
- self.logger = logging.getLogger('motor')
- self.setup_logger('motor.log')
- def setup_logger(self, log_filename):
- """设置日志记录器。"""
- # 移除basicConfig,改为直接配置motor记录器
- self.logger.setLevel(logging.DEBUG)
- handler = logging.FileHandler(log_filename, encoding='utf-8')
- handler.setFormatter(logging.Formatter(
- '%(asctime)s - %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S'
- ))
- self.logger.addHandler(handler) # 添加文件处理器
- self.logger.propagate = False # 防止传播到根记录器
- def write(self, address, value, datatype='UNS16', unit=2):
- """发送消息到Modbus设备。"""
- # 根据数据类型发送消息
- if datatype == 'UNS16':
- # 写入单个16位寄存器
- response = self.modbus_client.write_register(address - 1, value, slave=2)
- # 获取原始报文字节
- # self.logger.debug(f"发送: {response}")
- else:
- # 写入32位整数到两个连续的16位寄存器
- value = int.from_bytes(struct.pack('>i', value), 'big')
- response = self.modbus_client.write_registers(address - 1, [value >> 16, value & 0xFFFF], slave=2)
- if response.isError():
- raise RuntimeError(f"写入错误: {response}")
- else:
- # print(f"写入: 地址 {address - 1}, 值 {value}, 数据类型 {datatype}")
- self.logger.info(msg=f"写入: 地址 {address - 1}, 值 {value}, 数据类型 {datatype}")
- def read(self, address, count, datatype='UNS16', unit=2):
- """当GPO有输出时,会读取不到寄存器的数据"""
- address -= 1 # 调整为0-based地址
- response = self.modbus_client.read_holding_registers(address, slave=2, count=count)
- if response.isError():
- print(f"读取: {response}")
- else:
- if datatype == 'UNS16':
- data = hex(response.registers[0])
- else:
- data = struct.unpack('>i', struct.pack('>HH', *response.registers))[0]
- self.logger.info(f"读取: 地址 {address + 1}, 数据 {data}, 数据类型 {datatype}")
- return data
- def shutdown(self):
- # 发送失能指令
- self.write(61443, 6, 'UNS16')
- def switch_on(self):
- # 发送开启指令
- self.write(61443, 7, 'UNS16')
- def enable_operation(self):
- # 发送启动操作指令
- self.write(61443, 15, 'UNS16')
- def disable_voltage(self):
- # 发送禁止电压指令
- self.write(61443, 9, 'UNS16')
- def quick_stop(self):
- # 发送快速停止指令
- self.write(61443, 10, 'UNS16')
- def disable_operation(self):
- # 发送禁止操作指令
- self.write(61443, 5, 'UNS16')
- def fault_reset(self):
- # 发送错误重置指令
- self.write(61443, 128, 'UNS16')
- def set_position_mode(self):
- self.write(61450, 1, 'UNS16')
- def set_velocity_mode(self):
- # 发送设置速度模式指令
- self.write(61450, 3, 'UNS16')
- def set_homing_mode(self):
- # 发送回零模式指令
- self.write(61450, 6, 'UNS16')
- def set_multi_mode(self):
- # 发送多段运行模式指令
- self.write(61450, 62, 'UNS16')
- def set_segment(self, segment_number, loop=True):
- # 发送运行模式指令
- self.write(1072, 255, 'UNS16')
- # 运行段数设置
- self.write(1073, 0, 'UNS16') # 进入段数设置模式
- self.write(1075, segment_number, 'UNS32')
- self.write(1073, 1, 'UNS16') # 进入循环方式设置
- loop_var = 1 if loop else 0
- self.write(1075, loop_var, 'UNS32')
- self.write(1073, 4, 'UNS16') # 进入重新起始段设置
- self.write(1075, 1, 'UNS32')
- def set_segment_parameters(self, segment_number, position, velocity, acceleration, wait_time,
- target_value, mode=1, search_mode=2
- ):
- """
- 设置多段运行模式的运行参数,包括目标位置、速度、加速度、等待时间和搜索目标值。
- :param search_mode:
- :param mode:
- :param segment_number: 段号
- :param position: 目标位置
- :param velocity: 目标速度(单位:rpm)
- :param acceleration: 加速度(单位:rpm/s)
- :param wait_time: 等待时间(单位:毫秒)
- :param target_value: 搜索目标值
- """
- # 进入第n段设置
- self.write(1072, segment_number, 'UNS16') # 进入段号设置模式
- # 进入运行模式设置
- self.write(1073, 0, 'UNS16')
- self.write(1075, mode, 'UNS32')
- # 设置目标值位置
- self.write(1073, 1, 'UNS16') # 目标位置设置模式
- self.write(1075, position, 'UNS32')
- # 设置目标搜索模式
- self.write(1073, 2, 'UNS16')
- self.write(1075, search_mode, 'UNS32')
- # 设置搜索目标值
- self.write(1073, 3, 'UNS16') # 搜索目标值设置模式
- self.write(1075, target_value, 'UNS32')
- # 设置等待时间
- self.write(1073, 4, 'UNS16') # 等待时间设置模式
- self.write(1075, wait_time, 'UNS32')
- velocity = int(self.resolution * velocity / 60)
- # 设置目标速度
- self.write(1073, 6, 'UNS16') # 目标速度设置模式
- self.write(1075, velocity, 'UNS32')
- acceleration = int(self.resolution * acceleration / 60)
- # 设置加速度
- self.write(1073, 7, 'UNS16') # 加速度设置模式
- self.write(1075, acceleration, 'UNS32')
- def set_start_segment(self):
- self.switch_on()
- self.write(61443, 31)
- def set_homing_strategy(self):
- # 设置回零方式
- self.write(61552, 4, 'INTEGER8')
- def set_homing_velocity(self, speed):
- velocity = int(self.resolution * speed / 60)
- self.write(61554, velocity, 'UNS32')
- self.write(61556, velocity, 'UNS32')
- def homing(self):
- self.set_homing_mode()
- self.set_homing_velocity(15)
- self.set_homing_strategy()
- self.shutdown()
- self.switch_on()
- self.write(61443, 31)
- try:
- finished_flag = True
- while finished_flag:
- flag = self.read(61444, 1)
- if flag == '0x1637':
- finished_flag = False
- self.write(61443, 6)
- self.set_velocity_mode()
- else:
- continue
- finally:
- print('回零完成')
- def set_absolute_position(self):
- self.write(61443, 63, 'UNS16')
- def set_relative_position(self):
- self.write(61443, 127, 'UNS16')
- def set_target_position(self, position):
- address = 61488
- self.write(address, position, 'INTEGER32')
- def set_velocity(self, velocity):
- """设置速度值。"""
- address = 61659
- self.write(address, velocity, 'INTEGER32')
- def get_current_position(self):
- current_position = self.read(61457, 2, 'INTEGER32')
- return current_position
- def normalize_mechanical_angle(self, position):
- position = position if position >= 0 else position + 2147483648
- angle = float((position % self.resolution) / self.resolution * 360)
- return angle
- def get_status(self):
- status_code = self.read(4, 1)
- return status_code
- def get_velocity(self):
- return self.read(61470, 2, 'INTEGER32')
- def get_mode(self):
- mode_code = self.read(61451, 2, 'INTEGER8')
- modes_dict = {
- 0: "空模式",
- 1: "位置模式 (PPM)",
- 3: "速度模式 (PVM)",
- 4: "力矩模式 (TPM)",
- 6: "回零模式 (HOME)",
- 8: "周期同步位置模式 (CSP)",
- 9: "周期同步速度模式 (CSV)",
- 11: "力控模式",
- 22: "模拟量控速度模式",
- 23: "模拟量控力矩模式",
- 31: "脉冲控位置模式",
- 42: "PWM控速度",
- 60: "往复位置模式",
- 61: "参考信号",
- 62: "多段运行模式"
- }
- return modes_dict[mode_code]
- def close(self):
- """关闭Modbus连接。"""
- self.modbus_client.close()
- if __name__ == "__main__":
- client = MotorClient()
- client.setup_logger("motor.log") # 必须在程序初始化时调用
- a = client.set_position_mode()
- print(a)
|