123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- import serial
- import time
- import struct
- import logging
- def calculate_bcc(data_bytes):
- checksum = 0
- for byte in data_bytes:
- checksum ^= byte
- return checksum
- def convert_2_to_1_byte(high_byte, low_byte):
- return ((high_byte & 0x7F) << 4) | (low_byte & 0x7F)
- def convert_and_interpret(data, data_type):
- # 将字节数据转换为32位无符号整数
- udata = data[0]
- for i in range(1, 5):
- udata <<= 7
- udata |= data[i] & 0x7F
- # 根据所需的类型解释数据
- if data_type == 'float':
- return struct.unpack('f', udata.to_bytes(4, byteorder='little'))[0]
- elif data_type == 'int32':
- return struct.unpack('i', udata.to_bytes(4, byteorder='little'))[0]
- elif data_type == 'uint32':
- return udata
- else:
- raise ValueError("Invalid data type specified.")
- def is_bit_zero(data, bit):
- return (data & (1 << bit)) == 0
- def calculate_spd():
- """
- 根据蜗轮蜗杆的输出速度,步进电机的步距角,细分值(micro_step)和减速比反向计算spd值。
- """
- # 使用之前的例子中的参数来计算spd的值
- output_speed = 15 / 360 # 之前计算出的蜗轮蜗杆的输出速度,单位为周/秒
- step_angle = 1.8 # 电机的步距角
- micro_step = 1 / 32 # 电机的细分值
- gear_ratio = 31 # 减速比
- motor_speed = output_speed * gear_ratio
- spd = motor_speed * (360 / step_angle / micro_step)
- return spd
- def get_position(angle):
- if angle > 6 or angle < -6:
- raise ValueError('角度超出调整范围')
- else:
- position = 31 * angle * 32 / 1.8
- return int(position)
- class MotorControlVince:
- def __init__(self, port='COM4', baudrate=9600):
- self.logger = logging.getLogger(
- 'pitch_motor'
- )
- self.ser = serial.Serial(port, baudrate, timeout=1)
- device_number, speed, position, condition = self.send_command('1 sts')
- self.current_position = position
- self.current_angle = self.current_position * 1.8 / (32 * 31)
- def update_angle(self):
- device_number, speed, position, condition = self.send_command('1 sts')
- self.current_position = position
- self.current_angle = self.current_position * 1.8 / (32 * 31)
- def validate_data_stream(self, data_stream):
- if data_stream[0] != 0xFF or data_stream[-1] != 0xFE:
- self.logger.error("数据流格式错误")
- raise ValueError(f"数据流格式错误,数据流为{data_stream}")
- bcc_received_high, bcc_received_low = data_stream[-3], data_stream[-2]
- bcc_received = convert_2_to_1_byte(bcc_received_high, bcc_received_low)
- bcc_calculated = calculate_bcc(data_stream[1:-3])
- if bcc_received != bcc_calculated:
- self.logger.error("校验码不匹配")
- raise ValueError("校验码不匹配")
- device_number = data_stream[1]
- feedback_number = data_stream[2]
- data_content = data_stream[3:-3]
- return device_number, feedback_number, data_content
- def send_command(self, command):
- for _ in range(3): # 重试3次
- try:
- self.ser.reset_input_buffer()
- self.ser.write((command + "\n").encode('utf-8'))
- self.logger.info(msg=f"发送: '{command}'")
- time.sleep(0.1)
- response = self.process_feedback() # 处理反馈
- if response: # 如果收到预期的回复,跳出循环
- return response
- except (ValueError, serial.SerialException, IOError) as e:
- self.logger.error(f"发送命令时发生错误: {e}")
- time.sleep(0.5) # 等待一段时间后重试
- self.logger.error("重试次数达到上限,未能成功发送命令")
- raise Exception("Command sending failed after multiple attempts")
- def process_feedback(self):
- if self.ser.in_waiting:
- reply = self.ser.read(self.ser.in_waiting)
- reply_bytes = list(reply)
- device_number, feedback_number, data_content = self.validate_data_stream(reply_bytes)
- if feedback_number == 3:
- string_content = bytes(data_content).decode('utf-8')
- self.logger.info(f"{string_content}")
- return f"字符串数据内容: {string_content}"
- else:
- speed = data_content[0:5]
- speed = convert_and_interpret(speed, 'float')
- position = data_content[5:10]
- position = convert_and_interpret(position, 'int32')
- condition = data_content[10:15]
- condition = convert_and_interpret(condition, 'uint32')
- self.logger.info(f"设备号:{device_number} 速度:{speed} 位置:{position} "
- f"角度:{position * 1.8 / (32 * 31):.1f}° 状态码:{condition}")
- return device_number, speed, position, condition
- self.logger.error("没有回复")
- raise ValueError('No Response')
- def find_zero_position(self):
- self.send_command('1 ena')
- time.sleep(2)
- self.logger.info('>>>>开始回零<<<<')
- self.move_to_limit(1, 5000, 2) # 移动到正极限
- self.logger.info(">>>>到达正极限<<<<")
- # 移动到正极限
- psr_position = self.current_position
- self.move_to_limit(1, -5000, 3) # 移动到负极限
- msr_position = self.current_position
- self.logger.info(">>>>到达负极限<<<<")
- zero_position = (psr_position + msr_position) / 2
- print(f"零点位置: {zero_position}")
- self.send_command(f'1 pos {zero_position}')
- self.wait_for_target(8)
- self.send_command('1 org')
- print('零点设置成功')
- self.logger.info(">>>>回零结束<<<<")
- return zero_position
- def move_to_limit(self, device_number, speed, condition_bit):
- self.send_command(f'{device_number} mov spd={speed}')
- while True:
- device_number, speed, position, condition = self.send_command('1 sts')
- if not is_bit_zero(condition, condition_bit):
- self.current_position = position
- self.update_angle()
- break
- print(f"到达极限,当前位置: {position}")
- def move_to_angle(self, angle):
- position = get_position(angle)
- self.reach_target(1, position, 8)
- def wait_for_target(self, condition_bit):
- while True:
- device_number, speed, position, condition = self.send_command('1 sts')
- if not is_bit_zero(condition, condition_bit):
- print("到达目标位置")
- break
- def reach_target(self, device_number, position, condition_bit):
- self.send_command(f'{device_number} pos {position}')
- while True:
- device_number, speed, position, condition = self.send_command('1 sts')
- if not is_bit_zero(condition, condition_bit):
- self.current_position = position
- self.update_angle()
- break
- print(f"到达位置: {position}")
- def increase_angle(self): # 俯仰角增加一度
- self.move_to_angle(self.current_angle + 1)
- def decrease_angle(self): # 俯仰角减小一度
- self.move_to_angle(self.current_angle - 1)
- def close(self):
- self.ser.close()
- if __name__ == "__main__":
- motor_control = MotorControlVince()
- motor_control.move_to_angle(0)
|