Source code for ev3_dc.ev3

#!/usr/bin/env python3
'''
LEGO EV3 direct commands - ev3
'''

import re
import socket
import struct
from collections import namedtuple
from time import sleep
from datetime import datetime, timedelta
from threading import Lock

import platform
if platform.system() == 'Darwin':
    import hid
else:
    import usb.util

from .exceptions import NoEV3, DirCmdError, SysCmdError
from .constants import (
    _ID_VENDOR_LEGO,
    _ID_PRODUCT_EV3,
    _EP_IN,
    _EP_OUT,
    _DIRECT_COMMAND_REPLY,
    _DIRECT_COMMAND_NO_REPLY,
    _DIRECT_REPLY,
    _SYSTEM_COMMAND_REPLY,
    _SYSTEM_COMMAND_NO_REPLY,
    _SYSTEM_REPLY,
    WIFI,
    BLUETOOTH,
    USB,
    STD,
    ASYNC,
    SYNC,

    # introspection
    opInfo,
    opInput_Device,
    opMemory_Usage,
    opCom_Get,
    opCom_Set,
    opUI_Read,
    GET_TYPEMODE,
    GET_VOLUME,
    GET_MINUTES,
    GET_VBATT,
    GET_IBATT,
    GET_LBATT,
    GET_BRICKNAME,
    GET_OS_VERS,
    GET_OS_BUILD,
    GET_HW_VERS,
    GET_FW_BUILD,
    GET_FW_VERS,
    GET_NETWORK,
    SET_BRICKNAME,
    SET_VOLUME,
    SET_MINUTES,
    PORT_1,
    PORT_2,
    PORT_3,
    PORT_4,
    PORT_A_SENSOR,
    PORT_B_SENSOR,
    PORT_C_SENSOR,
    PORT_D_SENSOR
)
from .functions import LCX, GVX, LCS

Battery = namedtuple('Battery', [
        'voltage',
        'current',
        'percentage'
])

Memory = namedtuple('Memory', [
        'total',
        'free'
])

Sensors = namedtuple('Sensors', [
        'Port_1',
        'Port_2',
        'Port_3',
        'Port_4',
        'Port_A',
        'Port_B',
        'Port_C',
        'Port_D'
])

Network = namedtuple('Network', [
        'name',
        'ip_adr',
        'mac_adr'
])

System = namedtuple('System', [
        'os_version',
        'os_build',
        'fw_version',
        'fw_build',
        'hw_version'
])

class _PhysicalEV3:
    '''
    exists once per physical EV3 device
    '''
    _devices = set()  # all connected devices as class attribute

    def __init__(
            self,
            protocol: str,
            host: str = None,
    ):
        assert isinstance(protocol, str), \
            'protocol needs to be of type str'
        assert protocol in (BLUETOOTH, WIFI, USB), \
            'Protocol ' + protocol + 'is not valid'
        assert host is None or isinstance(host, str), \
            "host needs to be of type str"
        assert host is not None or protocol != BLUETOOTH, \
            "in case of protocol BLUETOOTH, host needs to be set"
        assert host is None or host.upper() not in self._devices, \
            f"host {host} already connected, call with argument ev3_obj"

        self._msg_cnt = 41
        self._lock = Lock()
        self._reply_buffer = {}
        self._protocol = protocol
        if host is not None:
            self._host = host.upper()
        else:
            self._host = host
        self._device = None
        self._socket = None
        self._name = None
        self._ip_adr = None
        self._introspection = None

        if protocol == BLUETOOTH:
            self.connect_bluetooth()
        elif protocol == WIFI:
            self.connect_wifi()
        else:
            self.connect_usb()

        self._devices.add(self._host)

    @classmethod
    @property
    def devices(cls):
        '''
        all connected devices as a set of mac_addresses
        '''
        return cls._devices

    def __del__(self):
        '''
        closes the connection to the LEGO EV3
        '''
        if (
                self._socket is not None and
                isinstance(self._socket, socket.socket) and
                self._host in self._devices
        ):
            self._socket.close()
            self._devices.remove(self._host)

    def next_msg_cnt(self) -> int:
        '''
        determines next message counter
        '''
        if self._msg_cnt < 65535:
            self._msg_cnt += 1
        else:
            self._msg_cnt = 1
        return self._msg_cnt

    def put_to_reply_buffer(self, msg_cnt: bytes, reply: bytes) -> None:
        '''
        put a reply to the stack
        '''
        if msg_cnt in self._reply_buffer:
            raise ValueError(
                'reply with msg_cnt ' +
                ':'.join('{:02X}'.format(byte) for byte in msg_cnt) +
                ' already exists'
            )
        self._reply_buffer[msg_cnt] = reply


    def connect_bluetooth(self) -> int:
        '''
        Create a socket, that holds a bluetooth-connection to an EV3
        '''
        self._socket = socket.socket(
            socket.AF_BLUETOOTH,
            socket.SOCK_STREAM,
            socket.BTPROTO_RFCOMM
        )
        try:
            self._socket.connect((self._host, 1))
        except OSError:
            raise NoEV3('No EV3 device found') from None


    def connect_wifi(self) -> int:
        '''
        Create a socket, that holds a WiFi-connection to an EV3
        '''

        # listen on port 3015 for a UDP broadcast from the EV3
        started_at = datetime.now()
        while True:
            UDPSock = socket.socket(
                socket.AF_INET,
                socket.SOCK_DGRAM
            )
            UDPSock.settimeout(10)
            UDPSock.bind(('', 3015))
            try:
                data, addr = UDPSock.recvfrom(1024)
            except socket.timeout:
                raise NoEV3('No EV3 device found') from None

            # pick serial number, port, name and protocol
            # from the broadcast message
            matcher = re.search(
                r'^Serial-Number: (\w*)\s\n' +
                r'Port: (\d{4,4})\s\n' +
                r'Name: (\w+)\s\n' +
                r'Protocol: (\w+)',
                data.decode('utf-8')
            )
            tmp = matcher.group(1).upper()
            serial_number = ':'.join(
                (tmp[i:i + 2] for i in range(0, len(tmp), 2))
            )
            port = matcher.group(2)
            name = matcher.group(3)
            protocol = matcher.group(4)

            # test if correct mac-addr
            if self._host is not None:
                if serial_number == self._host.upper():
                    break
            else:
                self._host = serial_number
                break

            if datetime.now() - started_at > timedelta(seconds=10):
                raise NoEV3(
                    f'found EV3 device {serial_number}, but not {self._host}'
                )

        assert self._host not in self._devices, \
            f"host {self._host} already connected, call with argument ev3_obj"

        # Send an UDP message back to the EV3
        # to make it accept a TCP/IP connection
        UDPSock.sendto(' '.encode('utf-8'), (addr[0], int(port)))
        UDPSock.close()

        # Establish a TCP/IP connection with EV3's address and port
        self._socket = socket.socket(
            socket.AF_INET,
            socket.SOCK_STREAM
        )
        self._socket.connect((addr[0], int(port)))

        # Send an unlock message to the EV3 over TCP/IP
        msg = 'GET /target?sn=' + serial_number + 'VMTP1.0\n' + \
              'Protocol: ' + protocol
        self._socket.send(msg.encode('utf-8'))
        reply = self._socket.recv(16).decode('utf-8')
        if not reply.startswith('Accept:EV340'):
            raise RuntimeError(
                'No WiFi connection to ' +
                name +
                ' established'
            )


    def connect_usb(self) -> int:
        '''
        Create a device, that holds an USB connection to an EV3
        '''
        if platform.system() == 'Darwin':
            # hidapi
            devices = hid.enumerate(_ID_VENDOR_LEGO, _ID_PRODUCT_EV3)
            if len(devices) == 0:
                raise NoEV3('No EV3 device found')
            if self._host is None and len(devices) > 1:
                raise NoEV3('multiple EV3 found, you need to set argument host')
            if self._host is None and len(devices) == 1:
                self._device = hid.device()
                self._device.open_path(devices[0]['path'])
            else:
                # multiple devices found, match with host
                hosts = []
                for dev in devices:
                    tmp = dev['serial_number'].upper()
                    mac_addr = ':'.join((tmp[i:i + 2] for i in range(0, len(tmp), 2)))
                    if mac_addr == self._host:
                        self._device = hid.device()
                        self._device.open_path(dev['path'])
                        break
                    hosts.append(mac_addr)
                if self._device is None:
                    raise NoEV3(
                        f'found EV3 devices: {hosts} but not {self._host}'
                    )
        else:
            # pyusb
            mac_addr = None
            hosts = []
            for dev in usb.core.find(
                    find_all=True,
                    idVendor=_ID_VENDOR_LEGO,
                    idProduct=_ID_PRODUCT_EV3
            ):
                tmp = usb.util.get_string(dev, dev.iSerialNumber).upper()
                mac_addr = ':'.join((tmp[i:i + 2] for i in range(0, len(tmp), 2)))
                hosts.append(mac_addr)
                if self._device is not None:
                    raise NoEV3(
                        'multiple EV3 found, you need to set argument host'
                    )
                if self._host is not None:
                    if mac_addr == self._host.upper():
                        self._device = dev
                        break
                else:
                    self._device = dev
                    self._host = mac_addr

            if self._device is None:
                if mac_addr is None:
                    raise NoEV3('No EV3 device found')
                raise NoEV3(
                    f'found EV3 devices: {hosts} but not {self._host}'
                )

            assert self._host not in self._devices, \
                f"host {self._host} already connected, call with argument ev3_obj"
    
            # handle interfaces
            for i in self._device.configurations()[0].interfaces():
                try:
                    if self._device.is_kernel_driver_active(i.index):
                        self._device.detach_kernel_driver(i.index)
                except NotImplementedError:
                    pass
            self._device.set_configuration()

            # initial read
            try:
                self._device.read(_EP_IN, 1024, 100)
            except:
                pass


    def introspection(self, verbosity: int) -> None:
        '''
        reads informations about itself
        '''
        ports = (
            PORT_1,
            PORT_2,
            PORT_3,
            PORT_4,
            PORT_A_SENSOR,
            PORT_B_SENSOR,
            PORT_C_SENSOR,
            PORT_D_SENSOR
        )
        ops = b''
        for i in range(8):
            ops += b''.join((
                opInput_Device,  # operation
                GET_TYPEMODE,  # CMD
                LCX(0),  # LAYER
                ports[i],  # NO
                GVX(2*i),  # TYPE (output)
                GVX(2*i + 1)  # MODE (output)
            ))
        ops += b''.join((
            opMemory_Usage,
            GVX(16),  # TOTAL (out)
            GVX(20),  # FREE (out)
            opInfo,
            GET_VOLUME,  # CMD
            GVX(24), # VALUE
            opInfo,
            GET_MINUTES,  # CMD
            GVX(25), # VALUE
            opCom_Get,
            GET_BRICKNAME,  # CMD
            LCX(32),  # LENGTH
            GVX(26),  # NAME (out)
            opUI_Read,
            GET_OS_VERS,  # CMD
            LCX(16),  # LENGTH
            GVX(58),  # NAME (out)
            opUI_Read,
            GET_HW_VERS,  # CMD
            LCX(8),  # LENGTH
            GVX(74),  # NAME (out)
            opUI_Read,
            GET_FW_VERS,  # CMD
            LCX(8),  # LENGTH
            GVX(82),  # NAME (out)
            opUI_Read,
            GET_OS_BUILD,  # CMD
            LCX(12),  # LENGTH
            GVX(90),  # NAME (out)
            opUI_Read,
            GET_FW_BUILD,  # CMD
            LCX(12),  # LENGTH
            GVX(102)  # NAME (out)
        ))
        reply = self.send_direct_cmd(
            ops,
            global_mem=114,
            verbosity=verbosity,
            sync_mode=SYNC
        )

        self._introspection = {}
        self._introspection["sensors"] = {}
        for i in range(8):
            sensor_type = struct.unpack(
                '<B',
                reply[2*i:2*i+1]
            )[0]
            if sensor_type == 126:
                self._introspection["sensors"][ports[i]] = {
                        'type': None,
                        'used_by': None
                }
            else:
                self._introspection["sensors"][ports[i]] = {
                        'type': sensor_type,
                        'used_by': None
                }
        (
            self._introspection["mem_total"],
            self._introspection["mem_free"],
            self._introspection["volume"],
            self._introspection["sleep"],
            name,
            os_vers,
            hw_vers,
            fw_vers,
            os_build,
            fw_build
        ) = struct.unpack(
            '<2i2B32s16s8s8s12s12s',
            reply[16:]
        )

        self._introspection["name"] = name.split(b'\x00')[0].decode("utf8")
        self._introspection["os_vers"] = os_vers.split(b'\x00')[0].decode("utf8")
        self._introspection["os_build"] = os_build.split(b'\x00')[0].decode("utf8")
        self._introspection["fw_vers"] = fw_vers.split(b'\x00')[0].decode("utf8")
        self._introspection["fw_build"] = fw_build.split(b'\x00')[0].decode("utf8")
        self._introspection["hw_vers"] = hw_vers.split(b'\x00')[0].decode("utf8")

        if self._protocol == WIFI:
            ops = b''.join((
                opCom_Get,
                GET_NETWORK,
                LCX(3),  # HARDWARE
                LCX(42),  # LENGTH
                GVX(0),  # NAME
                GVX(42),  # MAC
                GVX(84)  # IP
            ))
            (
                self._introspection["network_name"],
                self._introspection["network_mac_adr"],
                self._introspection["network_ip_adr"]
            ) = [
                value.split(b'\x00')[0].decode("utf8")
                for value in struct.unpack(
                    '42s42s42s',
                    self.send_direct_cmd(
                        ops,
                        global_mem=126,
                        verbosity=verbosity,
                        sync_mode=SYNC
                    )
                )
            ]
            self._introspection["network_mac_adr"] = ':'.join(
                self._introspection["network_mac_adr"][i:i+2]
                for i in range(6)
            )

    def send_direct_cmd(
            self,
            ops: bytes,
            *,
            verbosity: int,
            local_mem: int = 0,
            global_mem: int = 0,
            sync_mode: str = None
    ) -> bytes:
        '''
        Send a direct command to the LEGO EV3
        '''

        if sync_mode is None:
            if self._protocol == USB:
                sync_mode = SYNC
            else:
                sync_mode = STD

        self._lock.acquire()

        if (
                global_mem > 0 or
                sync_mode == SYNC
        ):
            cmd_type = _DIRECT_COMMAND_REPLY
        else:
            cmd_type = _DIRECT_COMMAND_NO_REPLY

        msg_cnt = self.next_msg_cnt()

        cmd = b''.join((
            struct.pack('<HH', len(ops) + 5, msg_cnt),
            cmd_type,
            struct.pack('<H', local_mem * 1024 + global_mem),
            ops
        ))

        if verbosity is not None and verbosity > 0:
            print(
                datetime.now().strftime('%H:%M:%S.%f') +
                ' Sent 0x|' +
                ':'.join('{:02X}'.format(byte) for byte in cmd[0:2]) + '|' +
                ':'.join('{:02X}'.format(byte) for byte in cmd[2:4]) + '|' +
                ':'.join('{:02X}'.format(byte) for byte in cmd[4:5]) + '|' +
                ':'.join('{:02X}'.format(byte) for byte in cmd[5:7]) + '|' +
                ':'.join('{:02X}'.format(byte) for byte in cmd[7:]) + '|'
            )

        if self._protocol in (BLUETOOTH, WIFI):
            self._socket.send(cmd)
        else:
            if platform.system() == 'Darwin':
                self._device.write(cmd)
            else:
                self._device.write(_EP_OUT, cmd, 100)

        msg_cnt = cmd[2:4]
        if (
                cmd[4:5] == _DIRECT_COMMAND_NO_REPLY or
                sync_mode == ASYNC
        ):
            self._lock.release()
            return msg_cnt
        return self.wait_for_reply(
            msg_cnt,
            verbosity=verbosity,
            _locked=True
        )

    def wait_for_reply(
            self,
            msg_cnt: bytes,
            verbosity: int = None,
            _locked: bool = False
    ) -> bytes:
        '''
        Ask the LEGO EV3 for a reply and wait until it is received
        '''

        if not _locked:
            self._lock.acquire()

        # reply already in buffer?
        reply = self._reply_buffer.pop(msg_cnt, None)
        if reply is not None:
            self._lock.release()
            if reply[4:5] == _DIRECT_REPLY:
                return reply[5:]
            raise DirCmdError(
                "direct command {:02X}:{:02X} replied error".format(
                    reply[2],
                    reply[3]
                )
            )

        #  get replies from EV3 device until msg_cnt fits
        while True:
            if self._protocol in (BLUETOOTH, WIFI):
                reply = self._socket.recv(1024)
            else:
                if platform.system() == 'Darwin':
                    reply = bytes(self._device.read(1024, 0))
                else:
                    reply = bytes(self._device.read(_EP_IN, 1024, 0))
            len_data = struct.unpack('<H', reply[:2])[0] + 2
            msg_cnt_reply = reply[2:4]
            if verbosity is not None and verbosity > 0:
                print(
                    datetime.now().strftime('%H:%M:%S.%f') +
                    ' Recv 0x|' +
                    ':'.join('{:02X}'.format(byte) for byte in reply[0:2]) +
                    '|' +
                    ':'.join('{:02X}'.format(byte) for byte in reply[2:4]) +
                    '|' +
                    ':'.join('{:02X}'.format(byte) for byte in reply[4:5]) +
                    '|', end=''
                )
                if len_data > 5:
                    dat = ':'.join(
                        '{:02X}'.format(byte) for byte in reply[5:len_data]
                    )
                    print(dat + '|')
                else:
                    print()

            if msg_cnt != msg_cnt_reply:
                # does not fit, put reply into buffer
                self.put_to_reply_buffer(
                    msg_cnt_reply,
                    reply[:len_data]
                )
            else:
                self._lock.release()
                if reply[4:5] == _DIRECT_REPLY:
                    return reply[5:len_data]
                raise DirCmdError(
                    "direct command {:02X}:{:02X} replied error".format(
                        reply[2],
                        reply[3]
                    )
                )

    def send_system_cmd(
            self,
            cmd: bytes,
            *,
            verbosity: int,
            reply: bool = True
    ) -> bytes:
        '''
        Send a system command to the LEGO EV3
        '''

        self._lock.acquire()

        if reply:
            cmd_type = _SYSTEM_COMMAND_REPLY
        else:
            cmd_type = _SYSTEM_COMMAND_NO_REPLY
        msg_cnt = self.next_msg_cnt()
        cmd = b''.join((
            struct.pack('<hh', len(cmd) + 3, msg_cnt),
            cmd_type,
            cmd
        ))
        if verbosity >= 1:
            print(
                datetime.now().strftime('%H:%M:%S.%f') +
                ' Sent 0x|' +
                ':'.join('{:02X}'.format(byte) for byte in cmd[0:2]) + '|' +
                ':'.join('{:02X}'.format(byte) for byte in cmd[2:4]) + '|' +
                ':'.join('{:02X}'.format(byte) for byte in cmd[4:5]) + '|' +
                ':'.join('{:02X}'.format(byte) for byte in cmd[5:]) + '|'
            )
        if self._protocol in (BLUETOOTH, WIFI):
            self._socket.send(cmd)
        else:
            if platform.system() == 'Darwin':
                self._device.write(cmd)
            else:
                self._device.write(_EP_OUT, cmd, 100)

        msg_cnt = cmd[2:4]
        if not reply:
            self._lock.release()
            return msg_cnt
        return self._wait_for_system_reply(
            msg_cnt,
            verbosity=verbosity,
            _locked=True
        )

    def _wait_for_system_reply(
            self,
            msg_cnt: bytes,
            *,
            verbosity: int,
            _locked=False
    ) -> bytes:
        '''
        Ask the LEGO EV3 for a system command reply and wait until received
        '''

        if not _locked:
            self._lock.acquire()

        # reply already in buffer?
        reply = self._reply_buffer.pop(msg_cnt, None)
        if reply is not None:
            self._lock.release()
            if reply[4:5] == _SYSTEM_REPLY:
                return reply[6:]
            raise SysCmdError(
                "system command {:02X}:{:02X} replied error".format(
                    reply[2],
                    reply[3]
                )
            )

        #  get replies from EV3 device until msg_cnt fits
        while True:
            if self._protocol == BLUETOOTH:
                sleep(0.1)
            if self._protocol in (BLUETOOTH, WIFI):
                reply = self._socket.recv(1024)
            else:
                if platform.system() == 'Darwin':
                    reply = bytes(self._device.read(1024, 0))
                else:
                    reply = bytes(self._device.read(_EP_IN, 1024, 0))
            len_data = struct.unpack('<H', reply[:2])[0] + 2
            reply_msg_cnt = reply[2:4]

            if verbosity > 0:
                print(
                    datetime.now().strftime('%H:%M:%S.%f') +
                    ' Recv 0x|' +
                    ':'.join('{:02X}'.format(byte) for byte in reply[0:2]) +
                    '|' +
                    ':'.join('{:02X}'.format(byte) for byte in reply[2:4]) +
                    '|' +
                    ':'.join('{:02X}'.format(byte) for byte in reply[4:5]) +
                    '|' +
                    ':'.join('{:02X}'.format(byte) for byte in reply[5:6]) +
                    '|' +
                    ':'.join('{:02X}'.format(byte) for byte in reply[6:7]) +
                    '|',
                    end=''
                )
                if len_data > 7:
                    dat = ':'.join(
                        '{:02X}'.format(byte) for byte in reply[7:len_data]
                    )
                    print(dat + '|')
                else:
                    print()

            if msg_cnt != reply_msg_cnt:
                self.put_to_reply_buffer(
                    reply_msg_cnt,
                    reply[:len_data]
                )
            else:
                self._lock.release()
                if reply[4:5] == _SYSTEM_REPLY:
                    return reply[6:len_data]
                raise SysCmdError(
                    "system command {:02X}:{:02X} replied error".format(
                        reply[2],
                        reply[3]
                    )
                )


[docs]class EV3: ''' communicates with a LEGO EV3 using direct or system commands ''' def __init__( self, *, protocol: str = None, host: str = None, ev3_obj: 'EV3' = None, sync_mode: str = None, verbosity=0 ): ''' Establish a connection to a LEGO EV3 device Keyword arguments (either protocol and host or ev3_obj) protocol 'Bluetooth', 'USB' or 'WiFi' host MAC-address of the LEGO EV3 (e.g. '00:16:53:42:2B:99') ev3_obj existing EV3 object (its connections will be used) sync mode (standard, asynchronous, synchronous) STD - if reply then use DIRECT_COMMAND_REPLY and wait for reply. ASYNC - if reply then use DIRECT_COMMAND_REPLY, but never wait for reply (it's the task of the calling program). SYNC - Always use DIRECT_COMMAND_REPLY and wait for reply, which may be empty. verbosity level (0, 1, 2) of verbosity (prints on stdout). ''' assert ev3_obj or protocol, \ 'Either protocol or ev3_obj needs to be given' if ev3_obj: assert isinstance(ev3_obj, EV3), \ 'ev3_obj needs to be instance of EV3' assert ev3_obj._physical_ev3 is not None, \ 'ev3_obj needs to be active' self._conn_owner = False self._physical_ev3 = ev3_obj._physical_ev3 else: self._conn_owner = True try: self._physical_ev3 = _PhysicalEV3(protocol, host) except: self._physical_ev3 = None raise assert isinstance(verbosity, int), \ "verbosity needs to be of type int" assert 0 <= verbosity <= 2, \ "allowed verbosity values are: 0, 1 or 2" self._verbosity = int(verbosity) assert sync_mode is None or isinstance(sync_mode, str), \ "sync_mode needs to be of type str" assert sync_mode is None or sync_mode in (STD, SYNC, ASYNC), \ "value of sync_mode: " + sync_mode + " is invalid" if sync_mode is None and self._physical_ev3._protocol == USB: self._sync_mode = SYNC else: self._sync_mode = STD def __enter__(self): ''' allows to code 'with EV3 as my_ev3:' ''' return self def __exit__(self, exc_type, exc_value, exc_traceback): ''' allows a clean exit from with block ''' if self._conn_owner: self._physical_ev3.__del__() self._physical_ev3 = None def __del__(self): ''' handles deletions ''' if self._conn_owner and self._physical_ev3 is not None: self._physical_ev3.__del__() self._physical_ev3 = None def __str__(self): '''representation of the object in a str context''' return ' '.join(( f'{self.protocol}', f'connected EV3 {self.host}', f'({self.name})' )) @property def battery(self) -> Battery: ''' battery voltage [V], current [A] and percentage (as named tuple) ''' reply = self._physical_ev3.send_direct_cmd( b''.join(( opUI_Read, GET_VBATT, GVX(0), opUI_Read, GET_IBATT, GVX(4), opUI_Read, GET_LBATT, GVX(8) )), global_mem=9, verbosity=self._verbosity, sync_mode=self._sync_mode ) return Battery( *struct.unpack('<2fB', reply) ) @property def host(self) -> str: ''' mac address of EV3 device ''' return self._physical_ev3._host @property def memory(self) -> Memory: ''' total and free memory [kB] (as named tuple) ''' if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) return Memory( self._physical_ev3._introspection["mem_total"], self._physical_ev3._introspection["mem_free"] ) @property def name(self) -> str: ''' name of EV3 device ''' if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) return self._physical_ev3._introspection["name"] @name.setter def name(self, value: str): assert isinstance(value, str), "name needs to be of type str" assert value != '', "name may not be an empty string" if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) if value != self._physical_ev3._introspection["name"]: self._physical_ev3.send_direct_cmd( b''.join(( opCom_Set, # operation SET_BRICKNAME, # CMD LCS(value) # NAME )), verbosity=self._verbosity, sync_mode=self._sync_mode ) self._physical_ev3._introspection["name"] = value @property def network(self) -> str: ''' name, ip_adr and mac_adr of the EV3 device (as named tuple) available only for WiFi connected devices, mac_adr is the address of the WiFi dongle ''' assert self._physical_ev3._protocol == WIFI, \ 'available only for WiFi connected devices' if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) return Network( self._physical_ev3._introspection["network_name"], self._physical_ev3._introspection["network_ip_adr"], self._physical_ev3._introspection["network_mac_adr"] ) @property def protocol(self) -> str: ''' connection type ''' return self._physical_ev3._protocol @property def sensors(self) -> Sensors: ''' all connected sensors and motors at all ports (as named tuple Sensors) You can address a single one by e.g.: ev3_dc.EV3.sensors.Port_3 or ev3_dc.EV3.sensors.Port_C ''' if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) return Sensors( self._physical_ev3._introspection["sensors"][PORT_1]["type"], self._physical_ev3._introspection["sensors"][PORT_2]["type"], self._physical_ev3._introspection["sensors"][PORT_3]["type"], self._physical_ev3._introspection["sensors"][PORT_4]["type"], self._physical_ev3._introspection["sensors"][PORT_A_SENSOR]["type"], self._physical_ev3._introspection["sensors"][PORT_B_SENSOR]["type"], self._physical_ev3._introspection["sensors"][PORT_C_SENSOR]["type"], self._physical_ev3._introspection["sensors"][PORT_D_SENSOR]["type"] ) @property def sensors_as_dict(self) -> dict: ''' all connected sensors and motors at all ports (as dict) You can address a single one by e.g.: ev3_dc.EV3.sensors_as_dict[ev3_dc.PORT_1] or ev3_dc.EV3.sensors_as_dict[ev3_dc.PORT_A_SENSOR] ''' if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) return { key: sensor["type"] for key, sensor in self._physical_ev3._introspection["sensors"].items() } @property def sleep(self) -> int: ''' idle minutes until EV3 shuts down, values from 0 to 120 value 0 says: never shut down ''' if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) return self._physical_ev3._introspection["sleep"] @sleep.setter def sleep(self, value: int): assert isinstance(value, int), "sleep needs to be of type int" assert 0 <= value <= 120, "sleep must be between 0 and 120" if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) if value != self._physical_ev3._introspection["sleep"]: self._physical_ev3.send_direct_cmd( b''.join(( opInfo, # operation SET_MINUTES, # CMD LCX(value) # NAME )), verbosity=self._verbosity, sync_mode=self._sync_mode ) self._physical_ev3._introspection["sleep"] = value @property def sync_mode(self) -> str: ''' sync mode (standard, asynchronous, synchronous) STD use DIRECT_COMMAND_REPLY only if global_mem > 0, wait for reply if there is one. ASYNC use DIRECT_COMMAND_REPLY only if global_mem > 0, never wait for reply (it's the task of the calling program). SYNC always use DIRECT_COMMAND_REPLY and wait for reply. The idea is ASYNC - if there is a reply, it must explicitly be asked for. Control directly comes back. SYNC - EV3 device is blocked and control comes back, when direct command is finished, which means synchronicity of program and EV3 device. STD - direct commands with no reply are handled like ASYNC, direct commands with reply are handled like SYNC. ''' return self._sync_mode @sync_mode.setter def sync_mode(self, value: str): assert isinstance(value, str), \ "sync_mode needs to be of type str" assert value in (STD, SYNC, ASYNC), \ "value of sync_mode: " + value + " is invalid" self._sync_mode = value @property def system(self) -> str: ''' system versions and build numbers (as named tuple System) os_version operating system version os_build operating system build number fw_version firmware version fw_build firmware build number hw_version hardware version ''' if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) return System( self._physical_ev3._introspection["os_vers"], self._physical_ev3._introspection["os_build"], self._physical_ev3._introspection["fw_vers"], self._physical_ev3._introspection["fw_build"], self._physical_ev3._introspection["hw_vers"] ) @property def verbosity(self) -> int: ''' level of verbosity (prints on stdout), values 0, 1 or 2 ''' return self._verbosity @verbosity.setter def verbosity(self, value: int): assert isinstance(value, int), \ "verbosity needs to be of type int" assert 0 <= value <= 2, \ "allowed verbosity values are: 0, 1 or 2" self._verbosity = int(value) @property def volume(self) -> int: ''' sound volume [%], values from 0 to 100 ''' if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) return self._physical_ev3._introspection["volume"] @volume.setter def volume(self, value: int): assert isinstance(value, int), "volume needs to be of type int" assert 0 <= value <= 100, "volume must be between 0 and 100" if self._physical_ev3._introspection is None: self._physical_ev3.introspection(self._verbosity) if value != self._physical_ev3._introspection["volume"]: self._physical_ev3.send_direct_cmd( b''.join(( opInfo, # operation SET_VOLUME, # CMD LCX(value) # NAME )), verbosity=self._verbosity, sync_mode=self._sync_mode ) self._physical_ev3._introspection["volume"] = value
[docs] def send_direct_cmd( self, ops: bytes, *, local_mem: int = 0, global_mem: int = 0, sync_mode: str = None, verbosity: int = None ) -> bytes: ''' Send a direct command to the LEGO EV3 Mandatory positional arguments ops holds netto data only (operations), these fields are added: length: 2 bytes, little endian msg_cnt: 2 bytes, little endian type: 1 byte, DIRECT_COMMAND_REPLY or DIRECT_COMMAND_NO_REPLY header: 2 bytes, holds sizes of local and global memory Optional keyword only arguments local_mem size of the local memory global_mem size of the global memory sync_mode synchronization mode (STD, SYNC, ASYNC) verbosity level (0, 1, 2) of verbosity (prints on stdout). Returns STD (sync_mode) if global_mem > 0 then reply else message counter ASYNC (sync_mode) message counter SYNC (sync_mode) reply of the LEGO EV3 ''' assert isinstance(ops, bytes), \ "ops needs to be of type bytes" assert len(ops) > 0, \ "ops must not be empty" assert isinstance(local_mem, int), \ "local_mem needs to be an integer" assert local_mem >= 0, \ "local_mem needs to be positive" assert local_mem <= 63, \ "local_mem has a maximum of 63" assert isinstance(global_mem, int), \ "global_mem needs to be an integer" assert global_mem >= 0, \ "global_mem needs to be positive" assert local_mem <= 1019, \ "global_mem has a maximum of 1019" assert sync_mode is None or isinstance(sync_mode, str), \ "sync_mode needs to be of type str" assert sync_mode is None or sync_mode in (STD, SYNC, ASYNC), \ "value of sync_mode: " + sync_mode + " is invalid" assert verbosity is None or isinstance(verbosity, int), \ "verbosity needs to be of type int" assert verbosity is None or 0 <= verbosity <= 2, \ "allowed verbosity values are: 0, 1 or 2" if sync_mode is None: sync_mode = self._sync_mode if verbosity is None: verbosity = self._verbosity return self._physical_ev3.send_direct_cmd( ops, local_mem=local_mem, global_mem=global_mem, sync_mode=sync_mode, verbosity=verbosity )
[docs] def wait_for_reply( self, msg_cnt: bytes, *, verbosity: int = None ) -> bytes: ''' Ask the LEGO EV3 for a reply and wait until it is received Mandatory positional arguments msg_cnt is the message counter of the corresponding send_direct_cmd Optional keyword only arguments verbosity level (0, 1, 2) of verbosity (prints on stdout). Returns reply to the direct command (without len, msg_cnt and return status) ''' assert isinstance(msg_cnt, bytes), \ "msg_cnt needs to be of type bytes" assert len(msg_cnt) == 2, \ "msg_cnt must be 2 bytes long" assert verbosity is None or isinstance(verbosity, int), \ "verbosity needs to be of type int" assert verbosity is None or 0 <= verbosity <= 2, \ "allowed verbosity values are: 0, 1 or 2" if verbosity is None: verbosity = self._verbosity return self._physical_ev3.wait_for_reply( msg_cnt, verbosity=verbosity )
[docs] def send_system_cmd( self, cmd: bytes, *, reply: bool = True, verbosity: int = None ) -> bytes: ''' Send a system command to the LEGO EV3 Mandatory positional arguments cmd holds netto data only (cmd and arguments), these fields are added: length: 2 bytes, little endian msg_cnt: 2 bytes, little endian type: 1 byte, SYSTEM_COMMAND_REPLY or SYSTEM_COMMAND_NO_REPLY Optional keyword only arguments reply flag if with reply verbosity level (0, 1, 2) of verbosity (prints on stdout). Returns reply (in case of SYSTEM_COMMAND_NO_REPLY: msg_cnt) ''' assert isinstance(cmd, bytes), \ "cmd needs to be of type bytes" assert isinstance(reply, bool), \ "reply needs to be of type bool" assert verbosity is None or isinstance(verbosity, int), \ "verbosity needs to be of type int" assert verbosity is None or 0 <= verbosity <= 2, \ "allowed verbosity values are: 0, 1 or 2" if verbosity is None: verbosity = self._verbosity return self._physical_ev3.send_system_cmd( cmd, reply=reply, verbosity=verbosity )
def _wait_for_system_reply( self, msg_cnt: bytes, *, verbosity: int = None ) -> bytes: ''' Ask the LEGO EV3 for a system command reply and wait until received Mandatory positional arguments msg_cnt is the message counter of the corresponding send_system_cmd Optional keyword only arguments verbosity level (0, 1, 2) of verbosity (prints on stdout). Returns reply to the system command ''' assert isinstance(msg_cnt, bytes), \ "msg_cnt needs to be of type bytes" assert len(msg_cnt) == 2, \ "msg_cnt must be 2 bytes long" assert verbosity is None or isinstance(verbosity, int), \ "verbosity needs to be of type int" assert verbosity is None or 0 <= verbosity <= 2, \ "allowed verbosity values are: 0, 1 or 2" if verbosity is None: verbosity = self._verbosity return self._physical_ev3._wait_for_system_reply( msg_cnt, verbosity=verbosity )