MotorControl.py 9.0 KB


  1. from pymodbus.client import ModbusTcpClient
  2. import struct
  3. import logging
  4. class MotorClient:
  5. def __init__(self):
  6. """初始化Modbus RTU客户端。"""
  7. self.resolution = 2097152 # 设置分辨率
  8. self.pole_pairs = 8 # 电机极对数
  9. self.modbus_client = ModbusTcpClient(host="192.168.211.7", port=8899, timeout=3)
  10. # 尝试连接Modbus设备
  11. if not self.modbus_client.connect():
  12. raise ConnectionError("无法连接到Modbus RTU设备。")
  13. self.logger = logging.getLogger('motor')
  14. self.setup_logger('motor.log')
  15. def setup_logger(self, log_filename):
  16. """设置日志记录器。"""
  17. # 移除basicConfig,改为直接配置motor记录器
  18. self.logger.setLevel(logging.DEBUG)
  19. handler = logging.FileHandler(log_filename, encoding='utf-8')
  20. handler.setFormatter(logging.Formatter(
  21. '%(asctime)s - %(message)s',
  22. datefmt='%Y-%m-%d %H:%M:%S'
  23. ))
  24. self.logger.addHandler(handler) # 添加文件处理器
  25. self.logger.propagate = False # 防止传播到根记录器
  26. def write(self, address, value, datatype='UNS16', unit=2):
  27. """发送消息到Modbus设备。"""
  28. # 根据数据类型发送消息
  29. if datatype == 'UNS16':
  30. # 写入单个16位寄存器
  31. response = self.modbus_client.write_register(address - 1, value, slave=2)
  32. # 获取原始报文字节
  33. # self.logger.debug(f"发送: {response}")
  34. else:
  35. # 写入32位整数到两个连续的16位寄存器
  36. value = int.from_bytes(struct.pack('>i', value), 'big')
  37. response = self.modbus_client.write_registers(address - 1, [value >> 16, value & 0xFFFF], slave=2)
  38. if response.isError():
  39. raise RuntimeError(f"写入错误: {response}")
  40. else:
  41. # print(f"写入: 地址 {address - 1}, 值 {value}, 数据类型 {datatype}")
  42. self.logger.info(msg=f"写入: 地址 {address - 1}, 值 {value}, 数据类型 {datatype}")
  43. def read(self, address, count, datatype='UNS16', unit=2):
  44. """当GPO有输出时,会读取不到寄存器的数据"""
  45. address -= 1 # 调整为0-based地址
  46. response = self.modbus_client.read_holding_registers(address, slave=2, count=count)
  47. if response.isError():
  48. print(f"读取: {response}")
  49. else:
  50. if datatype == 'UNS16':
  51. data = hex(response.registers[0])
  52. else:
  53. data = struct.unpack('>i', struct.pack('>HH', *response.registers))[0]
  54. self.logger.info(f"读取: 地址 {address + 1}, 数据 {data}, 数据类型 {datatype}")
  55. return data
  56. def shutdown(self):
  57. # 发送失能指令
  58. self.write(61443, 6, 'UNS16')
  59. def switch_on(self):
  60. # 发送开启指令
  61. self.write(61443, 7, 'UNS16')
  62. def enable_operation(self):
  63. # 发送启动操作指令
  64. self.write(61443, 15, 'UNS16')
  65. def disable_voltage(self):
  66. # 发送禁止电压指令
  67. self.write(61443, 9, 'UNS16')
  68. def quick_stop(self):
  69. # 发送快速停止指令
  70. self.write(61443, 10, 'UNS16')
  71. def disable_operation(self):
  72. # 发送禁止操作指令
  73. self.write(61443, 5, 'UNS16')
  74. def fault_reset(self):
  75. # 发送错误重置指令
  76. self.write(61443, 128, 'UNS16')
  77. def set_position_mode(self):
  78. self.write(61450, 1, 'UNS16')
  79. def set_velocity_mode(self):
  80. # 发送设置速度模式指令
  81. self.write(61450, 3, 'UNS16')
  82. def set_homing_mode(self):
  83. # 发送回零模式指令
  84. self.write(61450, 6, 'UNS16')
  85. def set_multi_mode(self):
  86. # 发送多段运行模式指令
  87. self.write(61450, 62, 'UNS16')
  88. def set_segment(self, segment_number, loop=True):
  89. # 发送运行模式指令
  90. self.write(1072, 255, 'UNS16')
  91. # 运行段数设置
  92. self.write(1073, 0, 'UNS16') # 进入段数设置模式
  93. self.write(1075, segment_number, 'UNS32')
  94. self.write(1073, 1, 'UNS16') # 进入循环方式设置
  95. loop_var = 1 if loop else 0
  96. self.write(1075, loop_var, 'UNS32')
  97. self.write(1073, 4, 'UNS16') # 进入重新起始段设置
  98. self.write(1075, 1, 'UNS32')
  99. def set_segment_parameters(self, segment_number, position, velocity, acceleration, wait_time,
  100. target_value, mode=1, search_mode=2
  101. ):
  102. """
  103. 设置多段运行模式的运行参数,包括目标位置、速度、加速度、等待时间和搜索目标值。
  104. :param search_mode:
  105. :param mode:
  106. :param segment_number: 段号
  107. :param position: 目标位置
  108. :param velocity: 目标速度(单位:rpm)
  109. :param acceleration: 加速度(单位:rpm/s)
  110. :param wait_time: 等待时间(单位:毫秒)
  111. :param target_value: 搜索目标值
  112. """
  113. # 进入第n段设置
  114. self.write(1072, segment_number, 'UNS16') # 进入段号设置模式
  115. # 进入运行模式设置
  116. self.write(1073, 0, 'UNS16')
  117. self.write(1075, mode, 'UNS32')
  118. # 设置目标值位置
  119. self.write(1073, 1, 'UNS16') # 目标位置设置模式
  120. self.write(1075, position, 'UNS32')
  121. # 设置目标搜索模式
  122. self.write(1073, 2, 'UNS16')
  123. self.write(1075, search_mode, 'UNS32')
  124. # 设置搜索目标值
  125. self.write(1073, 3, 'UNS16') # 搜索目标值设置模式
  126. self.write(1075, target_value, 'UNS32')
  127. # 设置等待时间
  128. self.write(1073, 4, 'UNS16') # 等待时间设置模式
  129. self.write(1075, wait_time, 'UNS32')
  130. velocity = int(self.resolution * velocity / 60)
  131. # 设置目标速度
  132. self.write(1073, 6, 'UNS16') # 目标速度设置模式
  133. self.write(1075, velocity, 'UNS32')
  134. acceleration = int(self.resolution * acceleration / 60)
  135. # 设置加速度
  136. self.write(1073, 7, 'UNS16') # 加速度设置模式
  137. self.write(1075, acceleration, 'UNS32')
  138. def set_start_segment(self):
  139. self.switch_on()
  140. self.write(61443, 31)
  141. def set_homing_strategy(self):
  142. # 设置回零方式
  143. self.write(61552, 4, 'INTEGER8')
  144. def set_homing_velocity(self, speed):
  145. velocity = int(self.resolution * speed / 60)
  146. self.write(61554, velocity, 'UNS32')
  147. self.write(61556, velocity, 'UNS32')
  148. def homing(self):
  149. self.set_homing_mode()
  150. self.set_homing_velocity(15)
  151. self.set_homing_strategy()
  152. self.shutdown()
  153. self.switch_on()
  154. self.write(61443, 31)
  155. try:
  156. finished_flag = True
  157. while finished_flag:
  158. flag = self.read(61444, 1)
  159. if flag == '0x1637':
  160. finished_flag = False
  161. self.write(61443, 6)
  162. self.set_velocity_mode()
  163. else:
  164. continue
  165. finally:
  166. print('回零完成')
  167. def set_absolute_position(self):
  168. self.write(61443, 63, 'UNS16')
  169. def set_relative_position(self):
  170. self.write(61443, 127, 'UNS16')
  171. def set_target_position(self, position):
  172. address = 61488
  173. self.write(address, position, 'INTEGER32')
  174. def set_velocity(self, velocity):
  175. """设置速度值。"""
  176. address = 61659
  177. self.write(address, velocity, 'INTEGER32')
  178. def get_current_position(self):
  179. current_position = self.read(61457, 2, 'INTEGER32')
  180. return current_position
  181. def normalize_mechanical_angle(self, position):
  182. position = position if position >= 0 else position + 2147483648
  183. angle = float((position % self.resolution) / self.resolution * 360)
  184. return angle
  185. def get_status(self):
  186. status_code = self.read(4, 1)
  187. return status_code
  188. def get_velocity(self):
  189. return self.read(61470, 2, 'INTEGER32')
  190. def get_mode(self):
  191. mode_code = self.read(61451, 2, 'INTEGER8')
  192. modes_dict = {
  193. 0: "空模式",
  194. 1: "位置模式 (PPM)",
  195. 3: "速度模式 (PVM)",
  196. 4: "力矩模式 (TPM)",
  197. 6: "回零模式 (HOME)",
  198. 8: "周期同步位置模式 (CSP)",
  199. 9: "周期同步速度模式 (CSV)",
  200. 11: "力控模式",
  201. 22: "模拟量控速度模式",
  202. 23: "模拟量控力矩模式",
  203. 31: "脉冲控位置模式",
  204. 42: "PWM控速度",
  205. 60: "往复位置模式",
  206. 61: "参考信号",
  207. 62: "多段运行模式"
  208. }
  209. return modes_dict[mode_code]
  210. def close(self):
  211. """关闭Modbus连接。"""
  212. self.modbus_client.close()
  213. if __name__ == "__main__":
  214. client = MotorClient()
  215. client.setup_logger("motor.log") # 必须在程序初始化时调用
  216. a = client.set_position_mode()
  217. print(a)