PitchControl.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. import serial
  2. import time
  3. import struct
  4. import logging
  5. def calculate_bcc(data_bytes):
  6. checksum = 0
  7. for byte in data_bytes:
  8. checksum ^= byte
  9. return checksum
  10. def convert_2_to_1_byte(high_byte, low_byte):
  11. return ((high_byte & 0x7F) << 4) | (low_byte & 0x7F)
  12. def convert_and_interpret(data, data_type):
  13. # 将字节数据转换为32位无符号整数
  14. udata = data[0]
  15. for i in range(1, 5):
  16. udata <<= 7
  17. udata |= data[i] & 0x7F
  18. # 根据所需的类型解释数据
  19. if data_type == 'float':
  20. return struct.unpack('f', udata.to_bytes(4, byteorder='little'))[0]
  21. elif data_type == 'int32':
  22. return struct.unpack('i', udata.to_bytes(4, byteorder='little'))[0]
  23. elif data_type == 'uint32':
  24. return udata
  25. else:
  26. raise ValueError("Invalid data type specified.")
  27. def is_bit_zero(data, bit):
  28. return (data & (1 << bit)) == 0
  29. def calculate_spd():
  30. """
  31. 根据蜗轮蜗杆的输出速度,步进电机的步距角,细分值(micro_step)和减速比反向计算spd值。
  32. """
  33. # 使用之前的例子中的参数来计算spd的值
  34. output_speed = 15 / 360 # 之前计算出的蜗轮蜗杆的输出速度,单位为周/秒
  35. step_angle = 1.8 # 电机的步距角
  36. micro_step = 1 / 32 # 电机的细分值
  37. gear_ratio = 31 # 减速比
  38. motor_speed = output_speed * gear_ratio
  39. spd = motor_speed * (360 / step_angle / micro_step)
  40. return spd
  41. def get_position(angle):
  42. if angle > 6 or angle < -6:
  43. raise ValueError('角度超出调整范围')
  44. else:
  45. position = 31 * angle * 32 / 1.8
  46. return int(position)
  47. class MotorControlVince:
  48. def __init__(self, port='COM4', baudrate=9600):
  49. self.logger = logging.getLogger(
  50. 'pitch_motor'
  51. )
  52. self.ser = serial.Serial(port, baudrate, timeout=1)
  53. device_number, speed, position, condition = self.send_command('1 sts')
  54. self.current_position = position
  55. self.current_angle = self.current_position * 1.8 / (32 * 31)
  56. def update_angle(self):
  57. device_number, speed, position, condition = self.send_command('1 sts')
  58. self.current_position = position
  59. self.current_angle = self.current_position * 1.8 / (32 * 31)
  60. def validate_data_stream(self, data_stream):
  61. if data_stream[0] != 0xFF or data_stream[-1] != 0xFE:
  62. self.logger.error("数据流格式错误")
  63. raise ValueError(f"数据流格式错误,数据流为{data_stream}")
  64. bcc_received_high, bcc_received_low = data_stream[-3], data_stream[-2]
  65. bcc_received = convert_2_to_1_byte(bcc_received_high, bcc_received_low)
  66. bcc_calculated = calculate_bcc(data_stream[1:-3])
  67. if bcc_received != bcc_calculated:
  68. self.logger.error("校验码不匹配")
  69. raise ValueError("校验码不匹配")
  70. device_number = data_stream[1]
  71. feedback_number = data_stream[2]
  72. data_content = data_stream[3:-3]
  73. return device_number, feedback_number, data_content
  74. def send_command(self, command):
  75. for _ in range(3): # 重试3次
  76. try:
  77. self.ser.reset_input_buffer()
  78. self.ser.write((command + "\n").encode('utf-8'))
  79. self.logger.info(msg=f"发送: '{command}'")
  80. time.sleep(0.1)
  81. response = self.process_feedback() # 处理反馈
  82. if response: # 如果收到预期的回复,跳出循环
  83. return response
  84. except (ValueError, serial.SerialException, IOError) as e:
  85. self.logger.error(f"发送命令时发生错误: {e}")
  86. time.sleep(0.5) # 等待一段时间后重试
  87. self.logger.error("重试次数达到上限,未能成功发送命令")
  88. raise Exception("Command sending failed after multiple attempts")
  89. def process_feedback(self):
  90. if self.ser.in_waiting:
  91. reply = self.ser.read(self.ser.in_waiting)
  92. reply_bytes = list(reply)
  93. device_number, feedback_number, data_content = self.validate_data_stream(reply_bytes)
  94. if feedback_number == 3:
  95. string_content = bytes(data_content).decode('utf-8')
  96. self.logger.info(f"{string_content}")
  97. return f"字符串数据内容: {string_content}"
  98. else:
  99. speed = data_content[0:5]
  100. speed = convert_and_interpret(speed, 'float')
  101. position = data_content[5:10]
  102. position = convert_and_interpret(position, 'int32')
  103. condition = data_content[10:15]
  104. condition = convert_and_interpret(condition, 'uint32')
  105. self.logger.info(f"设备号:{device_number} 速度:{speed} 位置:{position} "
  106. f"角度:{position * 1.8 / (32 * 31):.1f}° 状态码:{condition}")
  107. return device_number, speed, position, condition
  108. self.logger.error("没有回复")
  109. raise ValueError('No Response')
  110. def find_zero_position(self):
  111. self.send_command('1 ena')
  112. time.sleep(2)
  113. self.logger.info('>>>>开始回零<<<<')
  114. self.move_to_limit(1, 5000, 2) # 移动到正极限
  115. self.logger.info(">>>>到达正极限<<<<")
  116. # 移动到正极限
  117. psr_position = self.current_position
  118. self.move_to_limit(1, -5000, 3) # 移动到负极限
  119. msr_position = self.current_position
  120. self.logger.info(">>>>到达负极限<<<<")
  121. zero_position = (psr_position + msr_position) / 2
  122. print(f"零点位置: {zero_position}")
  123. self.send_command(f'1 pos {zero_position}')
  124. self.wait_for_target(8)
  125. self.send_command('1 org')
  126. print('零点设置成功')
  127. self.logger.info(">>>>回零结束<<<<")
  128. return zero_position
  129. def move_to_limit(self, device_number, speed, condition_bit):
  130. self.send_command(f'{device_number} mov spd={speed}')
  131. while True:
  132. device_number, speed, position, condition = self.send_command('1 sts')
  133. if not is_bit_zero(condition, condition_bit):
  134. self.current_position = position
  135. self.update_angle()
  136. break
  137. print(f"到达极限,当前位置: {position}")
  138. def move_to_angle(self, angle):
  139. position = get_position(angle)
  140. self.reach_target(1, position, 8)
  141. def wait_for_target(self, condition_bit):
  142. while True:
  143. device_number, speed, position, condition = self.send_command('1 sts')
  144. if not is_bit_zero(condition, condition_bit):
  145. print("到达目标位置")
  146. break
  147. def reach_target(self, device_number, position, condition_bit):
  148. self.send_command(f'{device_number} pos {position}')
  149. while True:
  150. device_number, speed, position, condition = self.send_command('1 sts')
  151. if not is_bit_zero(condition, condition_bit):
  152. self.current_position = position
  153. self.update_angle()
  154. break
  155. print(f"到达位置: {position}")
  156. def increase_angle(self): # 俯仰角增加一度
  157. self.move_to_angle(self.current_angle + 1)
  158. def decrease_angle(self): # 俯仰角减小一度
  159. self.move_to_angle(self.current_angle - 1)
  160. def close(self):
  161. self.ser.close()
  162. if __name__ == "__main__":
  163. motor_control = MotorControlVince()
  164. motor_control.move_to_angle(0)