diff --git a/can/interfaces/gs_usb.py b/can/interfaces/gs_usb.py index 6297fc1f5..2169ded7a 100644 --- a/can/interfaces/gs_usb.py +++ b/can/interfaces/gs_usb.py @@ -1,4 +1,5 @@ import logging +from typing import Any import usb from gs_usb.constants import CAN_EFF_FLAG, CAN_ERR_FLAG, CAN_MAX_DLC, CAN_RTR_FLAG @@ -12,17 +13,46 @@ logger = logging.getLogger(__name__) +def _find_gs_usb_devices( + bus: int | None = None, address: int | None = None +) -> list[usb.core.Device]: + """Find raw USB devices for gs_usb using auto-detected backend. + + Unlike :meth:`GsUsb.scan`, this does not force the ``libusb1`` backend, + allowing ``pyusb`` to auto-detect the best available backend. This enables + support for WinUSB on Windows in addition to libusbK. + + :param bus: number of the bus that the device is connected to + :param address: address of the device on the bus it is connected to + :return: a list of found raw USB devices + """ + kwargs = {} + if bus is not None: + kwargs["bus"] = bus + if address is not None: + kwargs["address"] = address + + return list( + usb.core.find( + find_all=True, + custom_match=GsUsb.is_gs_usb_device, + **kwargs, + ) + or [] + ) + + class GsUsbBus(can.BusABC): def __init__( self, - channel, + channel: can.typechecking.Channel, bitrate: int = 500_000, - index=None, - bus=None, - address=None, - can_filters=None, - **kwargs, - ): + index: int | None = None, + bus: int | None = None, + address: int | None = None, + can_filters: can.typechecking.CanFilters | None = None, + **kwargs: Any, + ) -> None: """ :param channel: usb device name :param index: device number if using automatic scan, starting from 0. @@ -32,31 +62,41 @@ def __init__( :param can_filters: not supported :param bitrate: CAN network bandwidth (bits/s) """ - self._is_shutdown = False if (index is not None) and ((bus or address) is not None): raise CanInitializationError( "index and bus/address cannot be used simultaneously" ) if index is None and address is None and bus is None: - index = channel + _index: Any = channel + else: + _index = index - self._index = None - if index is not None: - devs = GsUsb.scan() - if len(devs) <= index: + self._index: int | None = None + if _index is not None: + if not isinstance(_index, int): + try: + _index = int(_index) + except (ValueError, TypeError): + raise CanInitializationError( + f"index must be an integer, but got {type(_index).__name__} ({_index})" + ) from None + + devs = _find_gs_usb_devices() + if len(devs) <= _index: raise CanInitializationError( - f"Cannot find device {index}. Devices found: {len(devs)}" + f"Cannot find device {_index}. Devices found: {len(devs)}" ) - gs_usb = devs[index] - self._index = index + gs_usb_dev = devs[_index] + self._index = _index else: - gs_usb = GsUsb.find(bus=bus, address=address) - if not gs_usb: + devs = _find_gs_usb_devices(bus=bus, address=address) + if not devs: raise CanInitializationError(f"Cannot find device {channel}") + gs_usb_dev = devs[0] - self.gs_usb = gs_usb - self.channel_info = channel + self.gs_usb = GsUsb(gs_usb_dev) + self.channel_info = str(channel) self._can_protocol = can.CanProtocol.CAN_20 bit_timing = can.BitTiming.from_sample_point( @@ -81,7 +121,7 @@ def __init__( **kwargs, ) - def send(self, msg: can.Message, timeout: float | None = None): + def send(self, msg: can.Message, timeout: float | None = None) -> None: """Transmit a message to the CAN bus. :param Message msg: A message object. @@ -139,7 +179,7 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo frame = GsUsbFrame() # Do not set timeout as None or zero here to avoid blocking - timeout_ms = round(timeout * 1000) if timeout else 1 + timeout_ms = round(timeout * 1000) if timeout else 0 if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms): return None, False @@ -158,21 +198,22 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo return msg, False def shutdown(self): - if self._is_shutdown: + already_shutdown = self._is_shutdown + super().shutdown() + if already_shutdown: return - super().shutdown() self.gs_usb.stop() if self._index is not None: - # Avoid errors on subsequent __init() by repeating the .scan() and .start() that would otherwise fail - # the next time the device is opened in __init__() - devs = GsUsb.scan() + # Avoid errors on subsequent __init() by repeating the .scan() and + # .start() that would otherwise fail the next time the device is + # opened in __init__() + devs = _find_gs_usb_devices() if self._index < len(devs): - gs_usb = devs[self._index] + gs_usb = GsUsb(devs[self._index]) try: gs_usb.set_bitrate(self._bitrate) gs_usb.start() gs_usb.stop() except usb.core.USBError: pass - self._is_shutdown = True diff --git a/doc/changelog.d/2031.changed.md b/doc/changelog.d/2031.changed.md new file mode 100644 index 000000000..9d6bf79aa --- /dev/null +++ b/doc/changelog.d/2031.changed.md @@ -0,0 +1 @@ +make gs_usb use pyusb (allows WinUSB instead of requiring libusbK on windows) also timeout=None means foreever diff --git a/doc/interfaces/gs_usb.rst b/doc/interfaces/gs_usb.rst index 8bab07c6f..580a994fc 100755 --- a/doc/interfaces/gs_usb.rst +++ b/doc/interfaces/gs_usb.rst @@ -52,8 +52,10 @@ Windows, Linux and Mac. The backend driver depends on `pyusb `_ so a ``pyusb`` backend driver library such as ``libusb`` must be installed. - On Windows a tool such as `Zadig `_ can be used to set the USB device driver to - ``libusbK``. + On Windows, WinUSB and libusbK are both supported. Devices with WCID (Windows Compatible ID) descriptors, + such as candleLight firmware, will automatically use WinUSB without any additional driver installation. + Alternatively, a tool such as `Zadig `_ can be used to set the USB device driver to + either ``WinUSB`` or ``libusbK``. Supplementary Info diff --git a/pyproject.toml b/pyproject.toml index 9eb4a41cd..8ed86e611 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ neovi = ["filelock", "python-ics>=2.12"] canalystii = ["canalystii>=0.1.0"] cantact = ["cantact>=0.0.7"] cvector = ["python-can-cvector"] -gs-usb = ["gs-usb>=0.2.1"] +gs-usb = ["gs-usb>=0.2.1", "pyusb>=1.0.2"] nixnet = ["nixnet>=0.3.2"] pcan = ["uptime~=3.0.1"] remote = ["python-can-remote"] diff --git a/test/test_interface_gs_usb.py b/test/test_interface_gs_usb.py new file mode 100644 index 000000000..9f0e35534 --- /dev/null +++ b/test/test_interface_gs_usb.py @@ -0,0 +1,67 @@ +"""Tests for the gs_usb interface.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from can.interfaces.gs_usb import ( + GsUsbBus, + _find_gs_usb_devices, +) + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_find_devices_does_not_force_backend(mock_find): + """Verify that _find_gs_usb_devices does not pass a backend argument, + allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.).""" + mock_find.return_value = [] + + _find_gs_usb_devices() + + mock_find.assert_called_once() + call_kwargs = mock_find.call_args[1] + assert ( + "backend" not in call_kwargs + ), "backend should not be specified so pyusb can auto-detect" + assert call_kwargs["find_all"] is True + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_find_devices_with_args_does_not_force_backend(mock_find): + """Verify that _find_gs_usb_devices with bus/address does not pass a backend argument.""" + mock_find.return_value = [] + + _find_gs_usb_devices(bus=1, address=2) + + mock_find.assert_called_once() + call_kwargs = mock_find.call_args[1] + assert ( + "backend" not in call_kwargs + ), "backend should not be specified so pyusb can auto-detect" + assert call_kwargs["bus"] == 1 + assert call_kwargs["address"] == 2 + assert call_kwargs["find_all"] is True + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_find_devices_returns_raw_usb_devices(mock_find): + """Verify that _find_gs_usb_devices returns the raw USB devices.""" + mock_dev1 = MagicMock() + mock_dev2 = MagicMock() + mock_find.return_value = [mock_dev1, mock_dev2] + + devices = _find_gs_usb_devices() + + assert len(devices) == 2 + assert devices[0] is mock_dev1 + assert devices[1] is mock_dev2 + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_find_devices_returns_empty_list_when_no_devices(mock_find): + """Verify that _find_gs_usb_devices returns an empty list when no devices are found.""" + mock_find.return_value = [] + + devices = _find_gs_usb_devices() + + assert devices == [] diff --git a/tox.ini b/tox.ini index e635ec000..2e695f9e4 100644 --- a/tox.ini +++ b/tox.ini @@ -14,6 +14,7 @@ dependency_groups = test extras = canalystii + gs-usb mf4 multicast pywin32 @@ -27,6 +28,8 @@ extras = canalystii mf4 multicast + gs-usb + serial pywin32 serial # still no windows-curses for py314 @@ -34,6 +37,7 @@ extras = [testenv:{py313t,py314t,pypy310,pypy311}] extras = canalystii + gs-usb serial [testenv:docs]