-
Notifications
You must be signed in to change notification settings - Fork 666
gs-usb WinUSB support and timeout=none is forever #2031
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6c4d115
03d1b16
713736f
15a4978
a0371b0
e128a3d
e066590
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,42 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _scan_gs_usb_devices() -> list[GsUsb]: | ||
| """Scan for gs_usb devices using auto-detected backend. | ||
|
|
||
| Unlike :meth:`GsUsb.scan`, this does not force the ``libusb1`` backend, | ||
| allowing ``pyusb`` to auto-detect the best available backend. This enables | ||
| support for WinUSB on Windows in addition to libusbK. | ||
| """ | ||
| return [ | ||
| GsUsb(dev) | ||
| for dev in ( | ||
| usb.core.find( | ||
| find_all=True, | ||
| custom_match=GsUsb.is_gs_usb_device, | ||
| ) | ||
| or [] | ||
| ) | ||
| ] | ||
|
|
||
|
|
||
| def _find_gs_usb_device(bus: int, address: int) -> GsUsb | None: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think |
||
| """Find a specific gs_usb device using auto-detected backend. | ||
|
|
||
| Unlike :meth:`GsUsb.find`, this does not force the ``libusb1`` backend, | ||
| allowing ``pyusb`` to auto-detect the best available backend. This enables | ||
| support for WinUSB on Windows in addition to libusbK. | ||
| """ | ||
| dev = usb.core.find( | ||
| custom_match=GsUsb.is_gs_usb_device, | ||
| bus=bus, | ||
| address=address, | ||
| ) | ||
| if dev: | ||
| return GsUsb(dev) | ||
| return None | ||
|
|
||
|
|
||
| class GsUsbBus(can.BusABC): | ||
| def __init__( | ||
| self, | ||
|
|
@@ -32,7 +68,6 @@ def __init__( | |
| :param can_filters: not supported | ||
| :param bitrate: CAN network bandwidth (bits/s) | ||
| """ | ||
| self._is_shutdown = False | ||
| if (index is not None) and ((bus or address) is not None): | ||
| raise CanInitializationError( | ||
| "index and bus/address cannot be used simultaneously" | ||
|
|
@@ -43,15 +78,15 @@ def __init__( | |
|
|
||
| self._index = None | ||
| if index is not None: | ||
| devs = GsUsb.scan() | ||
| devs = _scan_gs_usb_devices() | ||
| if len(devs) <= index: | ||
| raise CanInitializationError( | ||
| f"Cannot find device {index}. Devices found: {len(devs)}" | ||
| ) | ||
| gs_usb = devs[index] | ||
| self._index = index | ||
| else: | ||
| gs_usb = GsUsb.find(bus=bus, address=address) | ||
| gs_usb = _find_gs_usb_device(bus=bus, address=address) | ||
| if not gs_usb: | ||
| raise CanInitializationError(f"Cannot find device {channel}") | ||
|
|
||
|
|
@@ -139,7 +174,7 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo | |
| frame = GsUsbFrame() | ||
|
|
||
| # Do not set timeout as None or zero here to avoid blocking | ||
| timeout_ms = round(timeout * 1000) if timeout else 1 | ||
| timeout_ms = round(timeout * 1000) if timeout else 0 | ||
| if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms): | ||
| return None, False | ||
|
|
||
|
|
@@ -158,15 +193,16 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo | |
| return msg, False | ||
|
|
||
| def shutdown(self): | ||
| if self._is_shutdown: | ||
| already_shutdown = self._is_shutdown | ||
| super().shutdown() | ||
| if already_shutdown: | ||
| return | ||
|
|
||
| super().shutdown() | ||
| self.gs_usb.stop() | ||
| if self._index is not None: | ||
| # Avoid errors on subsequent __init() by repeating the .scan() and .start() that would otherwise fail | ||
| # the next time the device is opened in __init__() | ||
| devs = GsUsb.scan() | ||
| devs = _scan_gs_usb_devices() | ||
| if self._index < len(devs): | ||
| gs_usb = devs[self._index] | ||
| try: | ||
|
|
@@ -175,4 +211,3 @@ def shutdown(self): | |
| gs_usb.stop() | ||
| except usb.core.USBError: | ||
| pass | ||
| self._is_shutdown = True | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| * make gs_usb use WinUSB instead of requiring libusbK. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pyusb works on all platforms though, right? Btw: The news fragment should be a sentence or a text, not a list. |
||
| * gs_usb: timeout=None means foreever | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,7 +64,7 @@ neovi = ["filelock", "python-ics>=2.12"] | |
| canalystii = ["canalystii>=0.1.0"] | ||
| cantact = ["cantact>=0.0.7"] | ||
| cvector = ["python-can-cvector"] | ||
| gs-usb = ["gs-usb>=0.2.1"] | ||
| gs-usb = ["gs-usb>=0.2.1", "pyusb>=1.0.2"] | ||
| nixnet = ["nixnet>=0.3.2"] | ||
| pcan = ["uptime~=3.0.1"] | ||
| remote = ["python-can-remote"] | ||
|
|
@@ -103,6 +103,7 @@ test = [ | |
| "coverage==7.13.*", | ||
| "hypothesis==6.*", | ||
| "parameterized==0.9.*", | ||
| "gs-usb==0.3.*", | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can add it to tox.ini instead. |
||
| ] | ||
| dev = [ | ||
| {include-group = "docs"}, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| """Tests for the gs_usb interface.""" | ||
|
|
||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| import pytest | ||
|
|
||
| from can.interfaces.gs_usb import ( | ||
| GsUsbBus, | ||
| _find_gs_usb_device, | ||
| _scan_gs_usb_devices, | ||
| ) | ||
|
|
||
|
|
||
| @patch("can.interfaces.gs_usb.usb.core.find") | ||
| def test_scan_does_not_force_backend(mock_find): | ||
| """Verify that _scan_gs_usb_devices does not pass a backend argument, | ||
| allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.).""" | ||
| mock_find.return_value = [] | ||
|
|
||
| _scan_gs_usb_devices() | ||
|
|
||
| mock_find.assert_called_once() | ||
| call_kwargs = mock_find.call_args[1] | ||
| assert ( | ||
| "backend" not in call_kwargs | ||
| ), "backend should not be specified so pyusb can auto-detect" | ||
| assert call_kwargs["find_all"] is True | ||
|
|
||
|
|
||
| @patch("can.interfaces.gs_usb.usb.core.find") | ||
| def test_find_does_not_force_backend(mock_find): | ||
| """Verify that _find_gs_usb_device does not pass a backend argument, | ||
| allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.).""" | ||
| mock_find.return_value = None | ||
|
|
||
| _find_gs_usb_device(bus=1, address=2) | ||
|
|
||
| mock_find.assert_called_once() | ||
| call_kwargs = mock_find.call_args[1] | ||
| assert ( | ||
| "backend" not in call_kwargs | ||
| ), "backend should not be specified so pyusb can auto-detect" | ||
| assert call_kwargs["bus"] == 1 | ||
| assert call_kwargs["address"] == 2 | ||
|
|
||
|
|
||
| @patch("can.interfaces.gs_usb.usb.core.find") | ||
| def test_scan_returns_gs_usb_devices(mock_find): | ||
| """Verify that _scan_gs_usb_devices wraps found USB devices in GsUsb objects.""" | ||
| mock_dev1 = MagicMock() | ||
| mock_dev2 = MagicMock() | ||
| mock_find.return_value = [mock_dev1, mock_dev2] | ||
|
|
||
| devices = _scan_gs_usb_devices() | ||
|
|
||
| assert len(devices) == 2 | ||
| assert devices[0].gs_usb is mock_dev1 | ||
| assert devices[1].gs_usb is mock_dev2 | ||
|
|
||
|
|
||
| @patch("can.interfaces.gs_usb.usb.core.find") | ||
| def test_find_returns_gs_usb_device(mock_find): | ||
| """Verify that _find_gs_usb_device wraps the found USB device in a GsUsb object.""" | ||
| mock_dev = MagicMock() | ||
| mock_find.return_value = mock_dev | ||
|
|
||
| device = _find_gs_usb_device(bus=1, address=2) | ||
|
|
||
| assert device is not None | ||
| assert device.gs_usb is mock_dev | ||
|
|
||
|
|
||
| @patch("can.interfaces.gs_usb.usb.core.find") | ||
| def test_find_returns_none_when_no_device(mock_find): | ||
| """Verify that _find_gs_usb_device returns None when no device is found.""" | ||
| mock_find.return_value = None | ||
|
|
||
| device = _find_gs_usb_device(bus=1, address=2) | ||
|
|
||
| assert device is None | ||
|
|
||
|
|
||
| @patch("can.interfaces.gs_usb.usb.core.find") | ||
| def test_scan_returns_empty_list_when_no_devices(mock_find): | ||
| """Verify that _scan_gs_usb_devices returns an empty list when no devices are found.""" | ||
| mock_find.return_value = [] | ||
|
|
||
| devices = _scan_gs_usb_devices() | ||
|
|
||
| assert devices == [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it make sense to instantiate
GsUsbfor every found dev?