diff --git a/can/io/asc.py b/can/io/asc.py index c02020e27..93ed79c35 100644 --- a/can/io/asc.py +++ b/can/io/asc.py @@ -10,7 +10,7 @@ import re from collections.abc import Generator from datetime import datetime, timezone, tzinfo -from typing import Any, Final, TextIO +from typing import Any, Final, Literal, TextIO from ..message import Message from ..typechecking import StringPathLike @@ -75,7 +75,7 @@ def __init__( self.relative_timestamp = relative_timestamp self.date: str | None = None self.start_time = 0.0 - # TODO - what is this used for? The ASC Writer only prints `absolute` + self._last_timestamp = 0.0 self.timestamps_format: str | None = None self.internal_events_logged = False @@ -294,6 +294,7 @@ def __iter__(self) -> Generator[Message, None, None]: if self.relative_timestamp else self._datetime_to_timestamp(datetime_str, self._timezone) ) + self._last_timestamp = self.start_time continue # Handle the "Start of measurement" line @@ -309,7 +310,11 @@ def __iter__(self) -> Generator[Message, None, None]: msg_kwargs: dict[str, float | bool | int] = {} try: _timestamp, channel, rest_of_message = line.split(None, 2) - timestamp = float(_timestamp) + self.start_time + if self.timestamps_format == "relative" and not self.relative_timestamp: + self._last_timestamp += float(_timestamp) + timestamp = self._last_timestamp + else: + timestamp = float(_timestamp) + self.start_time msg_kwargs["timestamp"] = timestamp if channel == "CANFD": msg_kwargs["is_fd"] = True @@ -372,6 +377,7 @@ def __init__( file: StringPathLike | TextIO, channel: int = 1, tz: tzinfo | None = _LOCAL_TZ, + timestamps_format: Literal["absolute", "relative"] = "absolute", **kwargs: Any, ) -> None: """ @@ -384,7 +390,22 @@ def __init__( have a channel set. Default is 1. :param tz: Timezone for timestamps in the log file. Defaults to local timezone. + :param timestamps_format: + the format of timestamps in the header. + Use ``"absolute"`` (default) so that readers can recover + the original wall-clock timestamps by combining the + per-message offset with the trigger-block start time. + Use ``"relative"`` when only the elapsed time from the + start of the recording matters and no absolute time + recovery is needed. + :raises ValueError: if *timestamps_format* is not ``"absolute"`` or + ``"relative"`` """ + if timestamps_format not in ("absolute", "relative"): + raise ValueError( + f"timestamps_format must be 'absolute' or 'relative', " + f"got {timestamps_format!r}" + ) if kwargs.get("append", False): raise ValueError( f"{self.__class__.__name__} is currently not equipped to " @@ -394,11 +415,12 @@ def __init__( self._timezone = tz self.channel = channel + self.timestamps_format = timestamps_format # write start of file header start_time = self._format_header_datetime(datetime.now(tz=self._timezone)) self.file.write(f"date {start_time}\n") - self.file.write("base hex timestamps absolute\n") + self.file.write(f"base hex timestamps {self.timestamps_format}\n") self.file.write("internal events logged\n") # the last part is written with the timestamp of the first message @@ -445,10 +467,17 @@ def log_event(self, message: str, timestamp: float | None = None) -> None: # Use last known timestamp if unknown if timestamp is None: timestamp = self.last_timestamp - # turn into relative timestamps if necessary - if timestamp >= self.started: - timestamp -= self.started - line = self.FORMAT_EVENT.format(timestamp=timestamp, message=message) + timestamp = max(timestamp, self.last_timestamp) + # Compute written timestamp based on configured format + if self.timestamps_format == "absolute": + # offsets from the start of measurement + written_timestamp = timestamp - self.started + else: + # deltas from the preceding event + written_timestamp = timestamp - self.last_timestamp + # Track last timestamp so the next event can compute its delta + self.last_timestamp = timestamp + line = self.FORMAT_EVENT.format(timestamp=written_timestamp, message=message) self.file.write(line) def on_message_received(self, msg: Message) -> None: diff --git a/doc/changelog.d/2022.added.md b/doc/changelog.d/2022.added.md new file mode 100644 index 000000000..b8b8cd205 --- /dev/null +++ b/doc/changelog.d/2022.added.md @@ -0,0 +1,4 @@ +Added `timestamps_format` parameter to `ASCWriter` to support configurable timestamp +format: `"absolute"` (default, timestamps are offsets from the start of measurement) +or `"relative"` (each timestamp is the delta from the preceding event), matching the +semantics described in the ASC format specification. diff --git a/test/logformats_test.py b/test/logformats_test.py index 6839c2450..42ed29b13 100644 --- a/test/logformats_test.py +++ b/test/logformats_test.py @@ -11,6 +11,7 @@ TODO: correctly set preserves_channel and adds_default_channel """ + import locale import logging import os @@ -687,6 +688,94 @@ def test_write(self): self.assertEqual(expected_file.read_text(), actual_file.read_text()) + def test_write_timestamps_format_default_is_absolute(self): + """ASCWriter should write 'timestamps absolute' in the header by default.""" + with can.ASCWriter(self.test_file_name) as writer: + pass + + content = Path(self.test_file_name).read_text() + self.assertIn("timestamps absolute", content) + + def test_write_timestamps_format_relative(self): + """ASCWriter should write 'timestamps relative' when requested.""" + with can.ASCWriter(self.test_file_name, timestamps_format="relative") as writer: + pass + + content = Path(self.test_file_name).read_text() + self.assertIn("timestamps relative", content) + self.assertNotIn("timestamps absolute", content) + + def test_write_timestamps_format_invalid(self): + """ASCWriter should raise ValueError for an unsupported timestamps_format.""" + with self.assertRaises(ValueError): + can.ASCWriter(self.test_file_name, timestamps_format="unix") + + def test_write_relative_timestamp_roundtrip(self): + """Messages written with relative format round-trip to their original timestamps.""" + msgs = [ + can.Message(timestamp=100.0, arbitration_id=0x1, data=b"\x01"), + can.Message(timestamp=100.3, arbitration_id=0x2, data=b"\x02"), + can.Message(timestamp=101.0, arbitration_id=0x3, data=b"\x03"), + ] + + with can.ASCWriter(self.test_file_name, timestamps_format="relative") as writer: + for m in msgs: + writer.on_message_received(m) + + with can.ASCReader(self.test_file_name, relative_timestamp=False) as reader: + result = list(reader) + + self.assertEqual(len(result), len(msgs)) + self.assertAlmostEqual(result[0].timestamp, 100.0, places=3) + self.assertAlmostEqual(result[1].timestamp, 100.3, places=3) + self.assertAlmostEqual(result[2].timestamp, 101.0, places=3) + + def test_write_relative_timestamps_are_per_event_deltas(self): + """With timestamps_format='relative', each written timestamp is a delta from the + preceding event (not an offset from measurement start).""" + msgs = [ + can.Message(timestamp=100.0, arbitration_id=0x1, data=b"\x01"), + can.Message(timestamp=100.3, arbitration_id=0x2, data=b"\x02"), + can.Message(timestamp=101.0, arbitration_id=0x3, data=b"\x03"), + ] + + with can.ASCWriter(self.test_file_name, timestamps_format="relative") as writer: + for m in msgs: + writer.on_message_received(m) + + with can.ASCReader(self.test_file_name, relative_timestamp=True) as reader: + result = list(reader) + + self.assertEqual(len(result), len(msgs)) + # msg1: 0.0 (delta from "Start of measurement" at same time) + # msg2: 0.3 (delta from msg1) + # msg3: 0.7 (delta from msg2 — NOT 1.0, which would be absolute offset) + self.assertAlmostEqual(result[0].timestamp, 0.0, places=5) + self.assertAlmostEqual(result[1].timestamp, 0.3, places=5) + self.assertAlmostEqual(result[2].timestamp, 0.7, places=5) + + def test_write_absolute_timestamps_are_offsets_from_start(self): + """With timestamps_format='absolute' (default), messages round-trip to their + original timestamps when read back with relative_timestamp=False.""" + msgs = [ + can.Message(timestamp=100.0, arbitration_id=0x1, data=b"\x01"), + can.Message(timestamp=100.3, arbitration_id=0x2, data=b"\x02"), + can.Message(timestamp=101.0, arbitration_id=0x3, data=b"\x03"), + ] + + with can.ASCWriter(self.test_file_name, timestamps_format="absolute") as writer: + for m in msgs: + writer.on_message_received(m) + + with can.ASCReader(self.test_file_name, relative_timestamp=False) as reader: + result = list(reader) + + self.assertEqual(len(result), len(msgs)) + # Timestamps are recovered from the triggerblock start time + file offset: + self.assertAlmostEqual(result[0].timestamp, 100.0, places=3) + self.assertAlmostEqual(result[1].timestamp, 100.3, places=3) + self.assertAlmostEqual(result[2].timestamp, 101.0, places=3) + @parameterized.expand( [ (