import serial import time import struct import logging def calculate_bcc(data_bytes): checksum = 0 for byte in data_bytes: checksum ^= byte return checksum def convert_2_to_1_byte(high_byte, low_byte): return ((high_byte & 0x7F) << 4) | (low_byte & 0x7F) def convert_and_interpret(data, data_type): # 将字节数据转换为32位无符号整数 udata = data[0] for i in range(1, 5): udata <<= 7 udata |= data[i] & 0x7F # 根据所需的类型解释数据 if data_type == 'float': return struct.unpack('f', udata.to_bytes(4, byteorder='little'))[0] elif data_type == 'int32': return struct.unpack('i', udata.to_bytes(4, byteorder='little'))[0] elif data_type == 'uint32': return udata else: raise ValueError("Invalid data type specified.") def is_bit_zero(data, bit): return (data & (1 << bit)) == 0 def calculate_spd(): """ 根据蜗轮蜗杆的输出速度,步进电机的步距角,细分值(micro_step)和减速比反向计算spd值。 """ # 使用之前的例子中的参数来计算spd的值 output_speed = 15 / 360 # 之前计算出的蜗轮蜗杆的输出速度,单位为周/秒 step_angle = 1.8 # 电机的步距角 micro_step = 1 / 32 # 电机的细分值 gear_ratio = 31 # 减速比 motor_speed = output_speed * gear_ratio spd = motor_speed * (360 / step_angle / micro_step) return spd def get_position(angle): if angle > 6 or angle < -6: raise ValueError('角度超出调整范围') else: position = 31 * angle * 32 / 1.8 return int(position) class MotorControlVince: def __init__(self, port='COM4', baudrate=9600): self.logger = logging.getLogger( 'pitch_motor' ) self.ser = serial.Serial(port, baudrate, timeout=1) device_number, speed, position, condition = self.send_command('1 sts') self.current_position = position self.current_angle = self.current_position * 1.8 / (32 * 31) def update_angle(self): device_number, speed, position, condition = self.send_command('1 sts') self.current_position = position self.current_angle = self.current_position * 1.8 / (32 * 31) def validate_data_stream(self, data_stream): if data_stream[0] != 0xFF or data_stream[-1] != 0xFE: self.logger.error("数据流格式错误") raise ValueError(f"数据流格式错误,数据流为{data_stream}") bcc_received_high, bcc_received_low = data_stream[-3], data_stream[-2] bcc_received = convert_2_to_1_byte(bcc_received_high, bcc_received_low) bcc_calculated = calculate_bcc(data_stream[1:-3]) if bcc_received != bcc_calculated: self.logger.error("校验码不匹配") raise ValueError("校验码不匹配") device_number = data_stream[1] feedback_number = data_stream[2] data_content = data_stream[3:-3] return device_number, feedback_number, data_content def send_command(self, command): for _ in range(3): # 重试3次 try: self.ser.reset_input_buffer() self.ser.write((command + "\n").encode('utf-8')) self.logger.info(msg=f"发送: '{command}'") time.sleep(0.1) response = self.process_feedback() # 处理反馈 if response: # 如果收到预期的回复,跳出循环 return response except (ValueError, serial.SerialException, IOError) as e: self.logger.error(f"发送命令时发生错误: {e}") time.sleep(0.5) # 等待一段时间后重试 self.logger.error("重试次数达到上限,未能成功发送命令") raise Exception("Command sending failed after multiple attempts") def process_feedback(self): if self.ser.in_waiting: reply = self.ser.read(self.ser.in_waiting) reply_bytes = list(reply) device_number, feedback_number, data_content = self.validate_data_stream(reply_bytes) if feedback_number == 3: string_content = bytes(data_content).decode('utf-8') self.logger.info(f"{string_content}") return f"字符串数据内容: {string_content}" else: speed = data_content[0:5] speed = convert_and_interpret(speed, 'float') position = data_content[5:10] position = convert_and_interpret(position, 'int32') condition = data_content[10:15] condition = convert_and_interpret(condition, 'uint32') self.logger.info(f"设备号:{device_number} 速度:{speed} 位置:{position} " f"角度:{position * 1.8 / (32 * 31):.1f}° 状态码:{condition}") return device_number, speed, position, condition self.logger.error("没有回复") raise ValueError('No Response') def find_zero_position(self): self.send_command('1 ena') time.sleep(2) self.logger.info('>>>>开始回零<<<<') self.move_to_limit(1, 5000, 2) # 移动到正极限 self.logger.info(">>>>到达正极限<<<<") # 移动到正极限 psr_position = self.current_position self.move_to_limit(1, -5000, 3) # 移动到负极限 msr_position = self.current_position self.logger.info(">>>>到达负极限<<<<") zero_position = (psr_position + msr_position) / 2 print(f"零点位置: {zero_position}") self.send_command(f'1 pos {zero_position}') self.wait_for_target(8) self.send_command('1 org') print('零点设置成功') self.logger.info(">>>>回零结束<<<<") return zero_position def move_to_limit(self, device_number, speed, condition_bit): self.send_command(f'{device_number} mov spd={speed}') while True: device_number, speed, position, condition = self.send_command('1 sts') if not is_bit_zero(condition, condition_bit): self.current_position = position self.update_angle() break print(f"到达极限,当前位置: {position}") def move_to_angle(self, angle): position = get_position(angle) self.reach_target(1, position, 8) def wait_for_target(self, condition_bit): while True: device_number, speed, position, condition = self.send_command('1 sts') if not is_bit_zero(condition, condition_bit): print("到达目标位置") break def reach_target(self, device_number, position, condition_bit): self.send_command(f'{device_number} pos {position}') while True: device_number, speed, position, condition = self.send_command('1 sts') if not is_bit_zero(condition, condition_bit): self.current_position = position self.update_angle() break print(f"到达位置: {position}") def increase_angle(self): # 俯仰角增加一度 self.move_to_angle(self.current_angle + 1) def decrease_angle(self): # 俯仰角减小一度 self.move_to_angle(self.current_angle - 1) def close(self): self.ser.close() if __name__ == "__main__": motor_control = MotorControlVince() motor_control.move_to_angle(0)