Source code for ev3_dc.vehicle

'''
LEGO Mindstorms EV3 direct commands - vehicle
'''

import struct
from typing import Tuple, Callable
from math import pi, cos, sin, radians, copysign, sqrt
from decimal import Decimal
from struct import unpack
from numbers import Number
from collections import namedtuple
from thread_task import Task, Repeated
from .ev3 import EV3
from .constants import (
    BLUETOOTH,
    WIFI,
    EV3_LARGE_MOTOR,
    EV3_MEDIUM_MOTOR,
    opInput_Device,
    opOutput_Step_Sync,
    opOutput_Step_Speed,
    opOutput_Start,
    opOutput_Stop,
    opOutput_Reset,
    opOutput_Clr_Count,
    opOutput_Test,
    opMove32_32,
    opMove32_F,
    opOr8,
    opJr_False,
    opJr_Eq32,
    opJr_Neq32,
    opJr_Lt32,
    opJr_Gt32,
    opJr_Lteq32,
    opJr_Gteq32,
    opJr_GteqF,
    opAdd32,
    opSub32,
    opMul32,
    opDivF,
    opMath,
    ABS,
    PORT_A,
    PORT_B,
    PORT_C,
    PORT_D,
    READY_RAW
)
from .functions import (
    LCX,
    GVX,
    LVX,
    port_motor_input
)
from .exceptions import MotorError, PortInUse


VehiclePosition = namedtuple('VehiclePosition', [
        'x',
        'y',
        'o'
])
MotorPositions = namedtuple('MotorPositions', [
        'left',
        'right'
])


class _Tracking:
    '''
    does position and orientation tracking of a vehicle
    '''
    def __init__(
            self,
            radius_wheel: Number,
            tread: Number,
            motor_pos: Tuple[int],
            *,
            polarity_left: int = 1,
            polarity_right: int = 1,
            callback=None
    ):
        '''
        Mandatory positional arguments

          radius_wheel
            radius of the wheels [m]
          tread:
            the vehicles tread [m]
          motor_pos
            current positions of left and right motor [degree]

        Optional keyword only arguments

          polarity_left
            polarity of left motor rotation, values: -1, 1 (default)
          polarity_right
            polarity of right motor rotation, values: -1, 1 (default)
        '''
        assert isinstance(radius_wheel, Number), \
            "radius_wheel needs to be a number"
        assert radius_wheel > 0, \
            "radius_wheel needs to be positive"

        assert isinstance(tread, Number), \
            "tread needs to be a number"
        assert tread > 0, \
            "tread needs to be positive"

        assert isinstance(polarity_left, int), \
            'polarity_left must be an integer'
        assert polarity_left in (-1, 1), \
            "polarity_left needs to be -1 or 1"
        assert isinstance(polarity_right, int), \
            'polarity_right must be an integer'
        assert polarity_right in (-1, 1), \
            "polarity_right needs to be -1 or 1"
        assert callback is None or isinstance(callback, Callable), \
            "callback must be a callable"

        super().__init__()

        self._radius_wheel = radius_wheel
        self._tread = tread
        self._polarity_left = polarity_left
        self._polarity_right = polarity_right

        self._motor_pos = motor_pos
        self._position = 0., 0.
        self._callback = callback


    @property
    def position(self) -> VehiclePosition:
        '''
        current vehicle position (as named tuple)

        x and x-coordinates are in meters,
        orientation is in degrees [-180 - 180]
        '''
        return VehiclePosition(
            self._position[0],
            self._position[1],
            (
                (
                    self._polarity_right * self._motor_pos[1] -
                    self._polarity_left * self._motor_pos[0]
                ) *
                self._radius_wheel / self._tread +
                180
            ) % 360 - 180
        )


    @property
    def motor_pos(self) -> MotorPositions:
        '''
        current position of the left and right motor [degree]
        '''
        return MotorPositions(*self._motor_pos)


    def update(self, motor_pos: Tuple[int]) -> None:
        '''
        update vehicle position and orientation
        '''
        if (
                motor_pos[0] == self._motor_pos[0] and
                motor_pos[1] == self._motor_pos[1]
        ):
            # no movement
            return

        steps = (
            self._polarity_left * (motor_pos[0] - self._motor_pos[0]),
            self._polarity_right * (motor_pos[1] - self._motor_pos[1])
        )
        self._motor_pos = motor_pos

        # current orientation
        orientation = (
            (
                self._polarity_right * self._motor_pos[1] -
                self._polarity_left * self._motor_pos[0]
            ) *
            self._radius_wheel / self._tread +
            180
        ) % 360 - 180

        if steps[0] == steps[1]:
            # straight
            dist = steps[0] * 2 * pi * self._radius_wheel / 360

            vehicle_pos_x, vehicle_pos_y = self._position
            vehicle_pos_x += dist * cos(radians(orientation))
            vehicle_pos_y += dist * sin(radians(orientation))
            self._position = vehicle_pos_x, vehicle_pos_y
            return

        angle = (
            (steps[1] - steps[0]) *
            self._radius_wheel / self._tread +
            180
        ) % 360 - 180
        radius_turn = (
            0.5 * self._tread * (steps[1] + steps[0]) /
            (steps[1] - steps[0])
        )
        fact = 2.0 * radius_turn * sin(radians(0.5 * angle))

        vehicle_pos_x, vehicle_pos_y = self._position
        vehicle_pos_x += (
            fact * cos(radians(orientation - 0.5 * angle))
        )
        vehicle_pos_y += (
            fact * sin(radians(orientation - 0.5 * angle))
        )
        self._position = vehicle_pos_x, vehicle_pos_y
        if self._callback is not None:
            self._callback(self.position)


[docs]class TwoWheelVehicle(EV3): ''' EV3 vehicle with two drived wheels ''' def __init__( self, radius_wheel: Number, tread: Number, *, protocol: str = None, host: str = None, ev3_obj: EV3 = None, speed: int = 10, ramp_up: int = 30, ramp_down: int = 30, delta_time: Number = None, port_left: bytes = PORT_A, port_right: bytes = PORT_D, polarity_left: int = 1, polarity_right: int = 1, tracking_callback: Callable = None, verbosity: int = 0 ): ''' Establishes a connection to a LEGO EV3 device Mandatory positional arguments radius_wheel radius of the wheels [m] tread: the vehicles tread [m] Keyword only arguments (either protocol and host or ev3_obj) protocol BLUETOOTH == 'Bluetooth', USB == 'Usb', WIFI == 'WiFi' host mac-address of the LEGO EV3 (e.g. '00:16:53:42:2B:99') ev3_obj an existing EV3 object (its connections will be used) speed percentage of maximum speed [1 - 100] (default is 10) ramp_up degrees for ramp-up (default is 30) ramp_down degrees for ramp-down (default is 30) delta_time timespan between introspections [s] (default depends on protocol, USB: 0.2 sec., WIFI: 0.1 sec., USB: 0.05 sec.) port_left port of left motor (PORT_A, PORT_B, PORT_C or PORT_D) port_right port of right motor (PORT_A, PORT_B, PORT_C or PORT_D) polarity_left polarity of left motor rotation, values: -1, 1 (default) polarity_right polarity of right motor rotation, values: -1, 1 (default) tracking_callback callable, which frequently tells current position, its single argument must be of type VehiclePosition verbosity level (0, 1, 2) of verbosity (prints on stdout) ''' assert isinstance(radius_wheel, Number), \ "radius_wheel needs to be a number" assert radius_wheel > 0, \ "radius_wheel needs to be positive" assert isinstance(tread, Number), \ "tread needs to be a number" assert tread > 0, \ "tread needs to be positive" assert isinstance(speed, int), \ "speed needs to be an integer" assert speed > 0, \ "speed needs to be positive" assert speed <= 100, \ "speed needs to be lower or equal 100" assert isinstance(ramp_up, int), \ "ramp_up must be an int" assert ramp_up >= 0, \ "ramp_up must be positive" assert isinstance(ramp_down, int), \ "ramp_down must be an int" assert ramp_down >= 0, \ "ramp_down must be positive" assert delta_time is None or isinstance(delta_time, Number), \ "delta_time needs to be a number" assert delta_time is None or delta_time > 0, \ "delta_time needs to be positive" assert isinstance(port_left, int), \ "port_left needs to be an integer" assert port_left in (PORT_A, PORT_B, PORT_C, PORT_D), \ "port_left needs to be one of (PORT_A, PORT_B, PORT_C, PORT_D)" assert isinstance(port_right, int), \ "port_right needs to be an integer" assert port_right in (PORT_A, PORT_B, PORT_C, PORT_D), \ "port_right needs to be one of (PORT_A, PORT_B, PORT_C, PORT_D)" assert port_left != port_right, \ 'port_left must be different from port_right' assert isinstance(polarity_left, int), \ 'polarity_left must be an integer' assert polarity_left in (-1, 1), \ "polarity_left needs to be -1 or 1" assert isinstance(polarity_right, int), \ 'polarity_right must be an integer' assert polarity_right in (-1, 1), \ "polarity_right needs to be -1 or 1" assert ( tracking_callback is None or isinstance(tracking_callback, Callable) ), "tracking_callback must be a callable" super().__init__(protocol=protocol, host=host, ev3_obj=ev3_obj) self._radius_wheel = radius_wheel self._tread = tread self._speed = speed self._current_movement = None self._target_motor_pos = None if delta_time is not None: self._delta_time = delta_time elif self._physical_ev3._protocol == BLUETOOTH: self._delta_time = .2 elif self._physical_ev3._protocol == WIFI: self._delta_time = .1 else: self._delta_time = .05 self._port_left = port_left self._port_right = port_right self._polarity_left = polarity_left self._polarity_right = polarity_right self._ramp_up = ramp_up self._ramp_down = ramp_down if self._physical_ev3._introspection is None: self._physical_ev3.introspection(verbosity) if self.sensors_as_dict[port_motor_input(self._port_left)] not in ( EV3_LARGE_MOTOR, EV3_MEDIUM_MOTOR ): if self._port_left == PORT_A: port_str = 'PORT_A' elif self._port_left == PORT_B: port_str = 'PORT_B' elif self._port_left == PORT_C: port_str = 'PORT_C' else: port_str = 'PORT_D' raise MotorError('no motor connected at ' + port_str) if self.sensors_as_dict[port_motor_input(self._port_right)] not in ( EV3_LARGE_MOTOR, EV3_MEDIUM_MOTOR ): if self._port_right == PORT_A: port_str = 'PORT_A' elif self._port_right == PORT_B: port_str = 'PORT_B' elif self._port_right == PORT_C: port_str = 'PORT_C' else: port_str = 'PORT_D' raise MotorError('no motor connected at ' + port_str) if self._physical_ev3._introspection["sensors"] \ [port_motor_input(self._port_left)]['used_by'] is not None: if self._port_left == PORT_A: port_str = 'PORT_A' elif self._port_left == PORT_B: port_str = 'PORT_B' elif self._port_left == PORT_C: port_str = 'PORT_C' else: port_str = 'PORT_D' host_str = self._physical_ev3._host raise PortInUse(f'{port_str} of {host_str} already in use') self._physical_ev3._introspection["sensors"] \ [port_motor_input(self._port_left)]['used_by'] = self if self._physical_ev3._introspection["sensors"] \ [port_motor_input(self._port_right)]['used_by'] is not None: if self._port_right == PORT_A: port_str = 'PORT_A' elif self._port_right == PORT_B: port_str = 'PORT_B' elif self._port_right == PORT_C: port_str = 'PORT_C' else: port_str = 'PORT_D' host_str = self._physical_ev3._host raise PortInUse(f'{port_str} of {host_str} already in use') self._physical_ev3._introspection["sensors"] \ [port_motor_input(self._port_right)]['used_by'] = self self.send_direct_cmd( b''.join(( opOutput_Reset, LCX(0), # LAYER LCX(self._port_left + self._port_right), # NOS opOutput_Clr_Count, LCX(0), # LAYER LCX(self._port_left + self._port_right) # NO )) ) self._tracking = _Tracking( self._radius_wheel, self._tread, (0, 0), # motor_pos polarity_left=self._polarity_left, polarity_right=self._polarity_right, callback=tracking_callback ) def __str__(self): ''' description of the object in a str context ''' return ' '.join(( 'TwoWheelVehicle', f'of {super().__str__()}' )) def __del__(self): """ handle specific logic for deletion """ if self._physical_ev3 is not None: self._physical_ev3._introspection["sensors"] \ [port_motor_input(self._port_left)]['used_by'] = None self._physical_ev3._introspection["sensors"] \ [port_motor_input(self._port_right)]['used_by'] = None super().__del__() def __exit__(self, exc_type, exc_value, exc_traceback): """ handle specific logic when exit with block """ self._physical_ev3._introspection["sensors"] \ [port_motor_input(self._port_left)]['used_by'] = None self._physical_ev3._introspection["sensors"] \ [port_motor_input(self._port_right)]['used_by'] = None super().__exit__(exc_type, exc_value, exc_traceback) @property def speed(self) -> int: ''' speed as percentage of maximum speed [1 - 100] ''' return self._speed @speed.setter def speed(self, value: int): assert isinstance(value, int), \ "speed needs to be an integer" assert value > 0, \ "speed needs to be positive" assert value <= 100, \ "speed needs to be lower or equal 100" self._speed = value @property def ramp_up(self) -> int: ''' degrees for ramp-up ''' return self._ramp_up @ramp_up.setter def ramp_up(self, value: int): assert isinstance(value, int), \ "ramp_up must be an int" assert value >= 0, \ "ramp_up must be positive" self._ramp_up = value @property def ramp_down(self) -> int: ''' degrees for ramp-down ''' return self._ramp_down @ramp_down.setter def ramp_down(self, value: int): assert isinstance(value, int), \ "ramp_down must be an int" assert value >= 0, \ "ramp_down must be positive" self._ramp_down = value @property def polarity_left(self) -> int: ''' polarity of left motor rotation (values: -1, 1, default: 1) ''' return self._polarity_left @polarity_left.setter def polarity_left(self, value: int): assert isinstance(value, int), "polarity_left needs to be of type int" assert value in (1, -1), "allowed polarity_left values are: -1 or 1" self._polarity_left = value @property def polarity_right(self) -> int: ''' polarity of left motor rotation (values: -1, 1, default: 1) ''' return self._polarity_right @polarity_right.setter def polarity_right(self, value: int): assert isinstance(value, int), "polarity_right needs to be of type int" assert value in (1, -1), "allowed polarity_right values are: -1 or 1" self._polarity_right = value @property def port_right(self) -> bytes: ''' port of right wheel (default: PORT_A) ''' return self._port_right @port_right.setter def port_right(self, value: int): assert isinstance(value, int), "port needs to be of type int" assert value in (PORT_A, PORT_B, PORT_C, PORT_D), \ "value is not an allowed port" self._port_right = value @property def port_left(self) -> bytes: ''' port of left wheel (default: PORT_D) ''' return self._port_left @port_left.setter def port_left(self, value: int): assert isinstance(value, int), "port needs to be of type int" assert value in (PORT_A, PORT_B, PORT_C, PORT_D), \ "value is not an allowed port" self._port_left = value @property def position(self) -> VehiclePosition: ''' current vehicle position (as named tuple) x and x-coordinates are in meter, orientation is in degree [-180 - 180] ''' self._tracking.update( unpack( '<2i', self.send_direct_cmd( self._ops_pos(), global_mem=8 ) ) ) return self._tracking.position @property def motor_pos(self) -> MotorPositions: ''' current positions of left and right motor [degree] (as named tuple) ''' self._tracking.update( unpack( '<2i', self.send_direct_cmd( self._ops_pos(), global_mem=8 ) ) ) return self._tracking.motor_pos @property def busy(self) -> bool: ''' Flag if motors are currently busy ''' data = unpack( '<2i2B', self.send_direct_cmd( self._ops_pos() + self._ops_busy(global_offset=8), global_mem=10 ) ) self._tracking.update(data[:2]) return any(data[2:]) @property def tracking_callback(self) -> Callable: ''' callable, which frequently tells current vehicle position, its single argument must be of type VehiclePosition ''' return self._tracking._callback @tracking_callback.setter def tracking_callback(self, value): assert ( value is None or isinstance(value, Callable) ), "tracking_callback must be a callable" self._tracking._callback = value def _ops_busy(self, global_offset=0) -> bytes: '''reads busy state of motors (returns operations) GVX(global_offset) - busy state of left motor (DATA8) GVX(global_offset + 1) - busy state of right motor (DATA9) ''' return b''.join(( opOutput_Test, LCX(0), # LAYER LCX(self._port_left), # NOS GVX(global_offset), # BUSY (DATA8) - output opOutput_Test, LCX(0), # LAYER LCX(self._port_right), # NOS GVX(global_offset + 1) # BUSY (DATA8) - output )) def _ops_pos(self, global_offset=0) -> bytes: ''' read positions of the wheels (returns operations) GVX(global_offset) - position of left motor (DATA32) GVX(global_offset + 4) - position of right motor (DATA32) ''' return b''.join(( opInput_Device, READY_RAW, LCX(0), # LAYER port_motor_input(self._port_left), # NO LCX(7), # TYPE - EV3-Large-Motor LCX(1), # MODE - Degree LCX(1), # VALUES GVX(global_offset), # VALUE1 opInput_Device, READY_RAW, LCX(0), # LAYER port_motor_input(self._port_right), # NO LCX(7), # TYPE - EV3-Large-Motor LCX(0), # MODE - Degree LCX(1), # VALUES GVX(global_offset + 4) # VALUE1 ))
[docs] def stop(self, brake: bool = False) -> None: '''stops the current motor movements, sets or releases brake Keyword Arguments brake flag if stopping with active brake ''' assert isinstance(brake, bool), \ 'brake must be a boolean' ops_stop = b''.join(( opOutput_Stop, LCX(0), # LAYER LCX(self._port_left + self._port_right), # NOS LCX(1 if brake else 0), # BRAKE - 0 (no), 1 (yes) )) self._tracking.update( unpack( '<2i', self.send_direct_cmd( self._ops_pos() + ops_stop, global_mem=8 ) ) ) if self._current_movement is not None: self._current_movement['stopped'] = True
[docs] def cont(self) -> None: ''' continues stopped movement ''' if self._current_movement is None: # movement was already finished return assert 'stopped' in self._current_movement, \ "can't continue unstopped movement" if self._current_movement['op'] == 'Step_Speed': self._start_move_to( self._current_movement['target_motor_pos'], speed=self._current_movement['speed'], ramp_up=self._current_movement['ramp_up'], ramp_down=self._current_movement['ramp_down'], brake=self._current_movement['brake'], _control=True ) else: raise RuntimeError('unknown op in current movement')
[docs] def move( self, speed: int, turn: int ) -> None: """ Starts unlimited synchronized movement of the vehicle Mandatory positional arguments speed direction (sign) and speed of movement as percentage of maximum speed [-100 - 100] turn type of turn [-200 - 200] -200: circle right on place -100: turn right with unmoved right wheel 0 : straight 100: turn left with unmoved left wheel 200: circle left on place """ assert isinstance(turn, int), \ "turn needs to be an integer" assert -200 <= turn <= 200, \ "turn needs to be in range [-200 - 200]" assert isinstance(speed, int), \ "speed needs to be an integer" assert -100 <= speed <= 100, \ "speed needs to be in range [-100 - 100]" if self._polarity_left == -1 and self._polarity_left == -1: speed *= -1 elif self._polarity_left == -1: if turn >= 0: turn = 200 - turn else: speed *= -1 turn = -200 - turn elif self._polarity_right == -1: if turn >= 0: speed *= -1 turn = 200 - turn else: turn = -200 - turn if self._port_left < self._port_right: turn *= -1 ops = b''.join(( opOutput_Step_Sync, LCX(0), # LAYER LCX(self._port_left + self._port_right), # NOS LCX(speed), LCX(turn), LCX(0), # STEPS LCX(0), # BRAKE opOutput_Start, LCX(0), # LAYER LCX(self._port_left + self._port_right) # NOS )) data = unpack( '<2i', self.send_direct_cmd( self._ops_pos() + ops, global_mem=8 ) ) self._tracking.update(data)
def _control(self): ''' controls current movement (must be called by a Repeated) ''' def direct_cmd(): ''' reads current motor positions and busy state, then modifies the speed of the inner wheel, which needs to do a number of calculations. The speed is lowered down if: - abs(done_inner / done_outer) is too high and abs(last_inner / last_outer) also is too high. The speed is accelerated if: - abs(done_inner / done_outer) is too low and abs(last_inner / last_outer) also is too low. In both cases abs(all_inner / all_outer) is the reference. Whenever the speed changes, step2 must be recalculated and finally opOutput_Step_Speed does the change of the movement. All this is done in a single direct command, which minimizes the communication ''' ops = b''.join(( # input --> global_mem opMove32_32, LCX(speed), GVX(8), # input --> local_mem opMove32_32, # b'\x3A' LCX(start_pos_inner), LVX(0), opMove32_32, # b'\x3A' LCX(start_pos_outer), LVX(4), opMove32_32, # b'\x3A' LCX(target_pos_inner), LVX(8), opMove32_32, # b'\x3A' LCX(target_pos_outer), LVX(12), opMove32_32, # b'\x3A' LCX(last_pos_inner), LVX(16), opMove32_32, # # b'\x3A' LCX(last_pos_outer), LVX(20), opMove32_32, # b'\x3A' LCX(step1), LVX(24), opMove32_32, # b'\x3A' LCX(step2), LVX(28), opMove32_32, # b'\x3A' LCX(step3), LVX(32), opMove32_32, # b'\x3A' LCX(speed), LVX(56), # speed_orig opInput_Device, # b'\x98' READY_RAW, LCX(0), # LAYER port_motor_input(port_inner), # NO LCX(7), # TYPE - EV3-Large-Motor LCX(1), # MODE - Degree LCX(1), # VALUES GVX(0), # VALUE1 opInput_Device, # b'\x98' READY_RAW, LCX(0), # LAYER port_motor_input(port_outer), # NO LCX(7), # TYPE - EV3-Large-Motor LCX(0), # MODE - Degree LCX(1), # VALUES GVX(4), # VALUE1 opOutput_Test, # b'\xA9' LCX(0), # LAYER LCX(port_inner), # NOS GVX(12), # BUSY (DATA8) - output opOutput_Test, # b'\xA9' LCX(0), # LAYER LCX(port_outer), # NOS GVX(13), # BUSY (DATA8) - output # if not (busy_inner or busy_outer): return opOr8, # b'\x20' GVX(12), # SOURCE1 GVX(13), # SOURCE2 LVX(60), # busy_inner or busy_outer opJr_False, # b'\x41' LVX(60), # FLAG LCX(263), # OFFSET opSub32, # b'\x16' GVX(0), LVX(0), LVX(36), # done_inner = pos_inner - start_pos_inner opMove32_32, # b'\x3A' LVX(36), LVX(40), # will become abs(done_inner) opJr_Gteq32, # b'\x7A' LVX(36), # done_inner LCX(0), LCX(6), # OFFSET opMul32, # b'\x1A' LVX(36), LCX(-1), LVX(40), # abs(done_inner) # if abs(done_inner) < step1: return (still in ramp up) opJr_Lt32, # b'\x66' LVX(40), # abs(done_inner) LVX(24), # step1 LCX(235), # OFFSET # if abs(done_inner) > (step1 + step2): return (already in ramp down) opAdd32, # b'\x12' LVX(24), # step1 LVX(28), # step2 LVX(44), # step1 + step2 opJr_Gt32, # b'\x6A' LVX(40), # abs(done_inner) LVX(44), # step1 + step2 LCX(222), # OFFSET opSub32, # b'\x16' GVX(4), LVX(4), LVX(40), # done_outer = pos_outer - start_pos_outer opMove32_F, # b'\x3B' LVX(40), LVX(40), # done_outer as DATAF opMove32_F, # b'\x3B' LVX(36), LVX(44), # done_inner as DATAF opDivF, # b'\x1F' LVX(44), LVX(40), LVX(40), # done_inner / done_outer opMath, # b'\x8D' ABS, LVX(40), LVX(40), # ratio_done opSub32, # b'\x16' GVX(0), LVX(16), LVX(44), # last_inner = pos_inner - last_pos_inner opMove32_F, # b'\x3B' LVX(44), LVX(44), # last_inner as DATAF opSub32, # b'\x16' GVX(4), LVX(20), LVX(48), # last_outer = pos_outer - last_pos_outer opMove32_F, # b'\x3B' LVX(48), LVX(48), # last_outer as DATAF opDivF, # b'\x1F' LVX(44), LVX(48), LVX(44), # last_inner / last_outer opMath, # b'\x8D' ABS, LVX(44), LVX(44), # ratio_last opSub32, # b'\x16' LVX(8), LVX(0), LVX(48), # all_inner = target_pos_inner - start_pos_inner opMove32_F, # b'\x3B' LVX(48), LVX(52), # all_inner as DATAF opSub32, # b'\x16' LVX(12), LVX(4), LVX(56), # all_outer = target_pos_outer - start_pos_outer opMove32_F, # b'\x3B' LVX(56), LVX(56), # all_outer as DATAF opDivF, # b'\x1F' LVX(52), LVX(56), LVX(52), # all_inner / all_outer opMath, # b'\x8D' ABS, LVX(52), LVX(52), # ratio_all # skip slow down opJr_GteqF, # b'\x7B' LVX(52), # ratio_all LVX(44), # ratio_last LCX(22), opJr_GteqF, # b'\x7B' LVX(52), # ratio_all LVX(40), # ratio_done LCX(16), # slow down speed opJr_Lteq32, # b'\x76' GVX(8), # speed LCX(0), LCX(4), # OFFSET opSub32, # b'\x16' GVX(8), LCX(1), GVX(8), # speed -= 1 opJr_Gteq32, # b'\x7A' GVX(8), # speed LCX(0), LCX(4), opAdd32, # b'\x12' GVX(8), LCX(1), GVX(8), # speed += 1 # skip accelerate opJr_Eq32, # b'\x6E' already maximum speed GVX(8), # speed LCX(100), LCX(54), # OFFSET opJr_Eq32, # b'\x6E' already maximum speed GVX(8), # speed LCX(-100), LCX(48), # OFFSET opJr_GteqF, # b'\x7B' already compensating LVX(44), # ratio_last LVX(52), # ratio_all LCX(41), # OFFSET opJr_GteqF, # b'\x7B' ratio over target LVX(40), # ratio_done LVX(52), # ratio_all LCX(34), # OFFSET # accelerate speed opJr_Neq32, # b'\x72' GVX(8), # speed LCX(0), LCX(5), # OFFSET opJr_Lt32, # b'\x66' LVX(48), # all_inner LCX(0), LCX(9), # OFFSET opJr_Lt32, # b'\x66' GVX(8), # speed LCX(0), LCX(4), # OFFSET opAdd32, # b'\x12' GVX(8), LCX(1), GVX(8), # speed += 1 opJr_Neq32, # b'\x72' GVX(8), # speed LCX(0), LCX(5), # OFFSET opJr_Gt32, # b'\x6A' LVX(48), # all_inner LCX(0), LCX(8), # OFFSET opJr_Gt32, # b'\x6A' GVX(8), # speed LCX(0), LCX(4), # OFFSET opSub32, # b'\x16' GVX(8), LCX(1), GVX(8), # speed -= 1 # if speed == speed_orig: return opJr_Eq32, # b'\x6E' GVX(8), # speed LVX(56), LCX(34), # OFFSET # determine step2 opSub32, # b'\x16' LVX(48), LVX(36), LVX(0), # all_inner - done_inner opJr_Gteq32, # b'\x7A' LVX(0), # all_inner - done_inner LCX(0), LCX(4), # OFFSET opMul32, # b'\x1A' LVX(0), LCX(-1), LVX(0), # abs(all_inner - done_inner) opSub32, # b'\x16' LVX(0), # abs(all_inner - done_inner) LVX(32), # step3 LVX(28), # step2 # modify movement opOutput_Reset, # b'\xA2' LCX(0), # LAYER LCX(port_inner), # NOS opOutput_Step_Speed, # b'\xAE' LCX(0), # LAYER LCX(port_inner), # NOS GVX(8), # SPEED LCX(0), # STEP1 LVX(28), # STEP2 LVX(32), # STEP3 LCX(1 if cur_mov['brake'] else 0), # BRAKE opOutput_Start, # b'\xA6' LCX(0), # LAYER LCX(port_inner), # NOS )) return unpack( '<3i2B', self.send_direct_cmd( ops, global_mem=14, local_mem=61 ) ) if self._current_movement is None: # prevent needless data traffic return True if self._current_movement['op'] != 'Step_Speed': RuntimeError('unknown movement') cur_mov = self._current_movement all_left = ( round(cur_mov['target_motor_pos'][0]) - cur_mov['start_motor_pos'][0] ) all_right = ( round(cur_mov['target_motor_pos'][1]) - cur_mov['start_motor_pos'][1] ) if 'last_motor_pos' not in cur_mov: cur_mov['last_motor_pos'] = cur_mov['start_motor_pos'] if abs(all_left) < abs(all_right): # regulate left motor port_inner = self._port_left port_outer = self._port_right speed = cur_mov['speed_left'] step1 = cur_mov['step1_left'] step2 = cur_mov['step2_left'] step3 = cur_mov['step3_left'] start_pos_inner = cur_mov['start_motor_pos'][0] start_pos_outer = cur_mov['start_motor_pos'][1] last_pos_inner = cur_mov['last_motor_pos'][0] last_pos_outer = cur_mov['last_motor_pos'][1] target_pos_inner = cur_mov['target_motor_pos'][0] target_pos_outer = cur_mov['target_motor_pos'][1] else: # regulate right motor port_inner = self._port_right port_outer = self._port_left speed = cur_mov['speed_right'] step1 = cur_mov['step1_right'] step2 = cur_mov['step2_right'] step3 = cur_mov['step3_right'] start_pos_inner = cur_mov['start_motor_pos'][1] start_pos_outer = cur_mov['start_motor_pos'][0] last_pos_inner = cur_mov['last_motor_pos'][1] last_pos_outer = cur_mov['last_motor_pos'][0] target_pos_inner = cur_mov['target_motor_pos'][1] target_pos_outer = cur_mov['target_motor_pos'][0] speed_orig = speed pos_inner, pos_outer, speed, busy_inner, busy_outer = direct_cmd() # update tracking if abs(all_left) < abs(all_right): # regulate left motor motor_pos = (pos_inner, pos_outer) cur_mov['speed_left'] = speed else: # regulate right motor motor_pos = (pos_outer, pos_inner) cur_mov['speed_right'] = speed self._tracking.update(motor_pos) if not busy_inner and not busy_outer: # all done self._current_movement = None return True if speed_orig != speed: cur_mov['last_motor_pos'] = motor_pos return self._delta_time def _start_move_by( self, diff_pos: Tuple[Decimal], *, speed: int, ramp_up: int, ramp_down: int, brake: bool, _control: bool = False ) -> None: ''' starts moving the motors to given positions Mandatory positional arguments diff_positions difference in positions [degree] of left and right motor Mandatory keyword only arguments speed percentage of maximum speed [1 - 100] ramp_up degrees for ramp-up ramp_down degrees for ramp-down brake Flag if ending with floating motors (False) or active brake (True). ''' diff_left = round(diff_pos[0]) diff_right = round(diff_pos[1]) if abs(diff_left) >= abs(diff_right): # left is outer, right is inner factor = abs(diff_right) / abs(diff_left) speed_left = copysign(speed, diff_left) step2_left = abs(diff_left) - ramp_up - ramp_down if step2_left < 0: step1_left = round(abs(diff_left) * ( ramp_up / (ramp_up + ramp_down) )) step3_left = abs(diff_left) - step1_left step2_left = 0 speed_left *= min( sqrt(step1_left / ramp_up) if ramp_up > 0 else 1, sqrt(step3_left / ramp_down) if ramp_down > 0 else 1 ) speed_left = round(speed_left) else: step1_left = ramp_up step3_left = ramp_down speed_left = round(speed_left) step1_right = int(factor**2 * step1_left) step3_right = int(factor**2 * step3_left) step2_right = abs(diff_right) - step1_right - step3_right speed_right = round(copysign(factor * abs(speed_left), diff_right)) else: # right is outer, left is inner factor = abs(diff_left) / abs(diff_right) speed_right = copysign(speed, diff_right) step2_right = abs(diff_right) - ramp_up - ramp_down if step2_right < 0: step1_right = round(abs(diff_right) * ( ramp_up / (ramp_up + ramp_down) )) step2_right = 0 step3_right = abs(diff_right) - step1_right speed_right *= min( (step1_right / ramp_up)**2 if ramp_up > 0 else 1, (step3_right / ramp_down)**2 if ramp_down > 0 else 1 ) speed_right = round(speed_right) else: step1_right = ramp_up step3_right = ramp_down speed_right = round(speed_right) step1_left = int(factor**2 * step1_right) step3_left = int(factor**2 * step3_right) step2_left = abs(diff_left) - step1_left - step3_left speed_left = round(copysign(factor * abs(speed_right), diff_left)) ops_move = b''.join(( opOutput_Reset, LCX(0), # LAYER LCX(self._port_left + self._port_right), # NOS opOutput_Step_Speed, LCX(0), # LAYER LCX(self._port_left), # NOS LCX(speed_left), # SPEED LCX(step1_left), # STEP1 LCX(step2_left), # STEP2 LCX(step3_left), # STEP3 LCX(1 if brake else 0), # BRAKE - 1 (yes) or 0 (no) opOutput_Step_Speed, LCX(0), # LAYER LCX(self._port_right), # NOS LCX(speed_right), # SPEED LCX(step1_right), # STEP1 LCX(step2_right), # STEP2 LCX(step3_right), # STEP3 LCX(1 if brake else 0), # BRAKE - 1 (yes) or 0 (no) opOutput_Start, LCX(0), # LAYER LCX(self._port_left + self._port_right) # NOS )) data = unpack( '<2i', self.send_direct_cmd( self._ops_pos() + ops_move, global_mem=8 ) ) self._tracking.update(data) if _control: if self._target_motor_pos is None: self._target_motor_pos = MotorPositions( self._tracking._motor_pos[0] + diff_pos[0], self._tracking._motor_pos[1] + diff_pos[1] ) else: self._target_motor_pos = MotorPositions( self._target_motor_pos.left + diff_pos[0], self._target_motor_pos.right + diff_pos[1] ) self._current_movement = { 'op': 'Step_Speed', 'start_motor_pos': data, 'speed_left': speed_left, 'step1_left': step1_left, 'step2_left': step2_left, 'step3_left': step3_left, 'speed_right': speed_right, 'step1_right': step1_right, 'step2_right': step2_right, 'step3_right': step3_right, 'target_motor_pos': ( round(self._target_motor_pos.left), round(self._target_motor_pos.right) ), 'speed': speed, 'ramp_up': ramp_up, 'ramp_down': ramp_down, 'brake': brake } else: self._target_motor_pos = None self._current_movement = None def _start_move_to( self, target_pos: Tuple[Decimal], *, speed: int, ramp_up: int, ramp_down: int, brake: bool, _control: bool = False ) -> None: ''' starts moving the motors to given positions Mandatory positional arguments target_positions target positions [degree] of left and right motor Mandatory keyword only arguments speed percentage of maximum speed [1 - 100] ramp_up degrees for ramp-up ramp_down degrees for ramp-down brake Flag if ending with floating motors (False) or active brake (True). ''' if _control: self._target_motor_pos = MotorPositions( target_pos[0], target_pos[1] ) else: self._target_motor_pos = None current_pos = self.motor_pos diff_left = round(target_pos[0] - current_pos.left) diff_right = round(target_pos[1] - current_pos.right) if abs(diff_left) >= abs(diff_right): # left is outer, right is inner factor = abs(diff_right) / abs(diff_left) speed_left = copysign(speed, diff_left) step2_left = abs(diff_left) - ramp_up - ramp_down if step2_left < 0: step1_left = round(abs(diff_left) * ( ramp_up / (ramp_up + ramp_down) )) step3_left = abs(diff_left) - step1_left step2_left = 0 speed_left *= min( sqrt(step1_left / ramp_up) if ramp_up > 0 else 1, sqrt(step3_left / ramp_down) if ramp_down > 0 else 1 ) speed_left = round(speed_left) else: step1_left = ramp_up step3_left = ramp_down speed_left = round(speed_left) step1_right = int(factor**2 * step1_left) step3_right = int(factor**2 * step3_left) step2_right = abs(diff_right) - step1_right - step3_right speed_right = round(copysign(factor * abs(speed_left), diff_right)) else: # right is outer, left is inner factor = abs(diff_left) / abs(diff_right) speed_right = copysign(speed, diff_right) step2_right = abs(diff_right) - ramp_up - ramp_down if step2_right < 0: step1_right = round(abs(diff_right) * ( ramp_up / (ramp_up + ramp_down) )) step2_right = 0 step3_right = abs(diff_right) - step1_right speed_right *= min( (step1_right / ramp_up)**2 if ramp_up > 0 else 1, (step3_right / ramp_down)**2 if ramp_down > 0 else 1 ) speed_right = round(speed_right) else: step1_right = ramp_up step3_right = ramp_down speed_right = round(speed_right) step1_left = int(factor**2 * step1_right) step3_left = int(factor**2 * step3_right) step2_left = abs(diff_left) - step1_left - step3_left speed_left = round(copysign(factor * abs(speed_right), diff_left)) ops_move = b''.join(( opOutput_Reset, LCX(0), # LAYER LCX(self._port_left + self._port_right), # NOS opOutput_Step_Speed, LCX(0), # LAYER LCX(self._port_left), # NOS LCX(speed_left), # SPEED LCX(step1_left), # STEP1 LCX(step2_left), # STEP2 LCX(step3_left), # STEP3 LCX(1 if brake else 0), # BRAKE - 1 (yes) or 0 (no) opOutput_Step_Speed, LCX(0), # LAYER LCX(self._port_right), # NOS LCX(speed_right), # SPEED LCX(step1_right), # STEP1 LCX(step2_right), # STEP2 LCX(step3_right), # STEP3 LCX(1 if brake else 0), # BRAKE - 1 (yes) or 0 (no) opOutput_Start, LCX(0), # LAYER LCX(self._port_left + self._port_right) # NOS )) data = unpack( '<2i', self.send_direct_cmd( self._ops_pos() + ops_move, global_mem=8 ) ) self._tracking.update(data) if _control: self._current_movement = { 'op': 'Step_Speed', 'start_motor_pos': data, 'speed_left': speed_left, 'step1_left': step1_left, 'step2_left': step2_left, 'step3_left': step3_left, 'speed_right': speed_right, 'step1_right': step1_right, 'step2_right': step2_right, 'step3_right': step3_right, 'target_motor_pos': ( round(self._target_motor_pos.left), round(self._target_motor_pos.right) ), 'speed': speed, 'ramp_up': ramp_up, 'ramp_down': ramp_down, 'brake': brake } else: self._current_movement = None def _start_drive_straight( self, distance: Number, *, speed: int = None, ramp_up: int = None, ramp_down: int = None, brake: bool = False, _control: bool = False ) -> None: ''' starts driving the vehicle straight by a given distance Mandatory positional arguments distance direction (sign) and distance (meters) of straight movement Optional keyword only arguments speed percentage of maximum speed [1 - 100] ramp_up degrees for ramp-up ramp_down degrees for ramp-down brake flag if ending with floating motors (False) or active brake (True). ''' assert isinstance(distance, Number), \ 'distance needs to be a number' assert speed is None or isinstance(speed, int), \ 'speed needs to be an integer' assert speed is None or 0 < speed <= 100, \ 'speed needs to be in range [1 - 100]' assert ramp_down is None or isinstance(ramp_down, int), \ "ramp_down must be an int" assert ramp_down is None or ramp_down >= 0, \ "ramp_down must be positive" assert isinstance(brake, bool), \ 'brake needs to be a boolean' assert self._current_movement is None, \ 'concurrent movement in progress' assert ( self._current_movement is None or 'stopped' in self._current_movement ), "concurrent movement in progress" if speed is None: speed = self._speed if ramp_up is None: ramp_up = self._ramp_up if ramp_down is None: ramp_down = self._ramp_down step = distance * 360 / (2 * pi * self._radius_wheel) step_left = Decimal( self._polarity_left * step ).quantize(Decimal('1.0000')) step_right = Decimal( self._polarity_right * step ).quantize(Decimal('1.0000')) if self._target_motor_pos is None: self._start_move_by( (step_left, step_right), speed=speed, ramp_up=ramp_up, ramp_down=ramp_down, brake=brake, _control=_control ) else: self._start_move_to( ( self._target_motor_pos.left + step_left, self._target_motor_pos.right + step_right ), speed=speed, ramp_up=ramp_up, ramp_down=ramp_down, brake=brake, _control=_control )
[docs] def drive_straight( self, distance: Number, *, speed: int = None, ramp_up: int = None, ramp_down: int = None, brake: bool = False ) -> Task: ''' drives the vehicle straight by a given distance Mandatory positional arguments distance direction (sign) and distance (meters) of straight movement Optional keyword only arguments speed percentage of maximum speed [1 - 100] ramp_up degrees for ramp-up ramp_down degrees for ramp-down brake flag if ending with floating motors (False) or active brake (True). Returns Task object, which can be started, stopped and continued. ''' assert isinstance(distance, Number), \ 'distance needs to be a number' assert speed is None or isinstance(speed, int), \ 'speed needs to be an integer' assert speed is None or 0 < speed <= 100, \ 'speed needs to be in range [1 - 100]' assert ramp_down is None or isinstance(ramp_down, int), \ "ramp_down must be an int" assert ramp_down is None or ramp_down >= 0, \ "ramp_down must be positive" assert isinstance(brake, bool), \ 'brake needs to be a boolean' assert self._current_movement is None, \ 'concurrent movement in progress' assert ( self._current_movement is None or 'stopped' in self._current_movement ), "concurrent movement in progress" if speed is None: speed = self._speed if ramp_up is None: ramp_up = self._ramp_up if ramp_down is None: ramp_down = self._ramp_down return Task( self._start_drive_straight, args=(distance,), kwargs={ 'speed': speed, 'ramp_up': ramp_up, 'ramp_down': ramp_down, 'brake': brake, '_control': True }, duration=self._delta_time, action_stop=self.stop, action_cont=self.cont ) + Repeated( self._control, action_stop=self.stop, action_cont=self.cont )
def _start_drive_turn( self, angle: Number, radius: Number, *, speed: int = None, back: bool = False, ramp_up: int = None, ramp_down: int = None, brake: bool = False, _control: bool = False ) -> None: ''' starts driving the vehicle a turn by given angle and radius Mandatory positional arguments angle angle of turn (degrees), positive sign: to the left, negative sign: to the right radius radius of turn (meters) Optional keyword only arguments speed percentage of maximum speed [1 - 100] back flag if backwards ramp_up degrees for ramp-up ramp_down degrees for ramp-down brake Flag if ending with floating motors (False) or active brake (True). ''' assert isinstance(angle, Number), \ 'angle needs to be a number' assert isinstance(radius, Number) and radius >= 0, \ 'radius needs to be a positive number' assert speed is None or isinstance(speed, int), \ 'speed needs to be an integer' assert speed is None or 0 < speed <= 100, \ 'speed needs to be in range [1 - 100]' assert isinstance(back, bool), \ 'back must be a bool' assert ramp_down is None or isinstance(ramp_down, int), \ "ramp_down must be an int" assert ramp_down is None or ramp_down >= 0, \ "ramp_down must be positive" assert isinstance(brake, bool), \ 'brake needs to be a boolean' assert self._current_movement is None, \ 'concurrent movement in progress' assert ( self._current_movement is None or 'stopped' in self._current_movement ), "concurrent movement in progress" if speed is None: speed = self._speed if ramp_up is None: ramp_up = self._ramp_up if ramp_down is None: ramp_down = self._ramp_down radius_outer = radius + 0.5 * self._tread radius_inner = radius - 0.5 * self._tread ratio = radius_inner / radius_outer step_outer = ( (-1 if back else 1) * abs(angle) * radius_outer / self._radius_wheel ) step_inner = ratio * step_outer if any(( angle > 0 and not back, angle < 0 and back )): step_left = Decimal( self._polarity_left * step_inner ).quantize(Decimal('1.0000')) step_right = Decimal( self._polarity_right * step_outer ).quantize(Decimal('1.0000')) else: step_left = Decimal( self._polarity_left * step_outer, ).quantize(Decimal('1.0000')) step_right = Decimal( self._polarity_right * step_inner ).quantize(Decimal('1.0000')) if self._target_motor_pos is None: self._start_move_by( (step_left, step_right), speed=speed, ramp_up=ramp_up, ramp_down=ramp_down, brake=brake, _control=_control ) else: self._start_move_to( ( self._target_motor_pos.left + step_left, self._target_motor_pos.right + step_right ), speed=speed, ramp_up=ramp_up, ramp_down=ramp_down, brake=brake, _control=_control )
[docs] def drive_turn( self, angle: Number, radius: Number, *, speed: int = None, back: bool = False, ramp_up: int = None, ramp_down: int = None, brake: bool = False ) -> Task: ''' starts driving the vehicle a turn by given angle and radius Mandatory positional arguments angle angle of turn (degrees), positive sign: to the left, negative sign: to the right radius radius of turn (meters) Optional keyword only arguments speed percentage of maximum speed [1 - 100] back flag if backwards ramp_up degrees for ramp-up ramp_down degrees for ramp-down brake Flag if ending with floating motors (False) or active brake (True). Returns Task object, which can be started, stopped and continued. ''' assert isinstance(radius, Number) and radius >= 0, \ 'radius needs to be a positive number' assert isinstance(angle, Number), \ 'angle needs to be a number' assert speed is None or isinstance(speed, int), \ 'speed needs to be an integer' assert speed is None or 0 < speed <= 100, \ 'speed needs to be in range [1 - 100]' assert isinstance(back, bool), \ 'back must be a bool' assert ramp_down is None or isinstance(ramp_down, int), \ "ramp_down must be an int" assert ramp_down is None or ramp_down >= 0, \ "ramp_down must be positive" assert isinstance(brake, bool), \ 'brake needs to be a boolean' assert self._current_movement is None, \ 'concurrent movement in progress' assert ( self._current_movement is None or 'stopped' in self._current_movement ), "concurrent movement in progress" if speed is None: speed = self._speed if ramp_up is None: ramp_up = self._ramp_up if ramp_down is None: ramp_down = self._ramp_down return Task( self._start_drive_turn, args=(angle, radius), kwargs={ 'speed': speed, 'back': back, 'ramp_up': ramp_up, 'ramp_down': ramp_down, 'brake': brake, '_control': True }, duration=self._delta_time, action_stop=self.stop, action_cont=self.cont ) + Repeated( self._control, action_stop=self.stop, action_cont=self.cont )