diff --git a/src/_pytest/junitxml.py b/src/_pytest/junitxml.py index 06cab961ef5..ec9922ee7bd 100644 --- a/src/_pytest/junitxml.py +++ b/src/_pytest/junitxml.py @@ -11,8 +11,6 @@ from __future__ import annotations from collections.abc import Callable -from datetime import datetime -from datetime import timezone import functools import os import platform @@ -636,8 +634,7 @@ def pytest_internalerror(self, excrepr: ExceptionRepr) -> None: reporter._add_simple("error", "internal error", str(excrepr)) def pytest_sessionstart(self) -> None: - self.suite_start_time = timing.time() - self.suite_start_perf = timing.perf_counter() + self.suite_start = timing.Instant() def pytest_sessionfinish(self) -> None: dirname = os.path.dirname(os.path.abspath(self.logfile)) @@ -645,8 +642,7 @@ def pytest_sessionfinish(self) -> None: os.makedirs(dirname, exist_ok=True) with open(self.logfile, "w", encoding="utf-8") as logfile: - suite_stop_perf = timing.perf_counter() - suite_time_delta = suite_stop_perf - self.suite_start_perf + duration = self.suite_start.elapsed() numtests = ( self.stats["passed"] @@ -664,10 +660,8 @@ def pytest_sessionfinish(self) -> None: failures=str(self.stats["failure"]), skipped=str(self.stats["skipped"]), tests=str(numtests), - time=f"{suite_time_delta:.3f}", - timestamp=datetime.fromtimestamp(self.suite_start_time, timezone.utc) - .astimezone() - .isoformat(), + time=f"{duration.seconds:.3f}", + timestamp=self.suite_start.as_utc().astimezone().isoformat(), hostname=platform.node(), ) global_properties = self._get_global_properties_node() diff --git a/src/_pytest/pytester.py b/src/_pytest/pytester.py index bb5f6a5787e..11127a88bb8 100644 --- a/src/_pytest/pytester.py +++ b/src/_pytest/pytester.py @@ -1150,7 +1150,7 @@ def runpytest_inprocess( if syspathinsert: self.syspathinsert() - now = timing.perf_counter() + instant = timing.Instant() capture = _get_multicapture("sys") capture.start_capturing() try: @@ -1180,7 +1180,7 @@ class reprec: # type: ignore assert reprec.ret is not None res = RunResult( - reprec.ret, out.splitlines(), err.splitlines(), timing.perf_counter() - now + reprec.ret, out.splitlines(), err.splitlines(), instant.elapsed().seconds ) res.reprec = reprec # type: ignore return res @@ -1408,7 +1408,7 @@ def run( print(" in:", Path.cwd()) with p1.open("w", encoding="utf8") as f1, p2.open("w", encoding="utf8") as f2: - now = timing.perf_counter() + instant = timing.Instant() popen = self.popen( cmdargs, stdin=stdin, @@ -1445,7 +1445,7 @@ def handle_timeout() -> None: with contextlib.suppress(ValueError): ret = ExitCode(ret) - return RunResult(ret, out, err, timing.perf_counter() - now) + return RunResult(ret, out, err, instant.elapsed().seconds) def _dump_lines(self, lines, fp): try: diff --git a/src/_pytest/runner.py b/src/_pytest/runner.py index f0543289267..26e4e838b77 100644 --- a/src/_pytest/runner.py +++ b/src/_pytest/runner.py @@ -339,8 +339,7 @@ def from_call( function, instead of being wrapped in the CallInfo. """ excinfo = None - start = timing.time() - precise_start = timing.perf_counter() + instant = timing.Instant() try: result: TResult | None = func() except BaseException: @@ -348,14 +347,11 @@ def from_call( if reraise is not None and isinstance(excinfo.value, reraise): raise result = None - # use the perf counter - precise_stop = timing.perf_counter() - duration = precise_stop - precise_start - stop = timing.time() + duration = instant.elapsed() return cls( - start=start, - stop=stop, - duration=duration, + start=duration.start.time, + stop=duration.stop.time, + duration=duration.seconds, when=when, result=result, excinfo=excinfo, diff --git a/src/_pytest/terminal.py b/src/_pytest/terminal.py index 8415912e9ac..d5ccc4e4900 100644 --- a/src/_pytest/terminal.py +++ b/src/_pytest/terminal.py @@ -391,7 +391,7 @@ def __init__(self, config: Config, file: TextIO | None = None) -> None: self._progress_nodeids_reported: set[str] = set() self._timing_nodeids_reported: set[str] = set() self._show_progress_info = self._determine_show_progress_info() - self._collect_report_last_write: float | None = None + self._collect_report_last_write = timing.Instant() self._already_displayed_warnings: int | None = None self._keyboardinterrupt_memo: ExceptionRepr | None = None @@ -769,7 +769,6 @@ def pytest_collection(self) -> None: if self.isatty: if self.config.option.verbose >= 0: self.write("collecting ... ", flush=True, bold=True) - self._collect_report_last_write = timing.perf_counter() elif self.config.option.verbose >= 1: self.write("collecting ... ", flush=True, bold=True) @@ -788,14 +787,13 @@ def report_collect(self, final: bool = False) -> None: return if not final: - # Only write "collecting" report every 0.5s. - t = timing.perf_counter() + # Only write the "collecting" report every `REPORT_COLLECTING_RESOLUTION`. if ( - self._collect_report_last_write is not None - and self._collect_report_last_write > t - REPORT_COLLECTING_RESOLUTION + self._collect_report_last_write.elapsed().seconds + < REPORT_COLLECTING_RESOLUTION ): return - self._collect_report_last_write = t + self._collect_report_last_write = timing.Instant() errors = len(self.stats.get("error", [])) skipped = len(self.stats.get("skipped", [])) @@ -823,7 +821,7 @@ def report_collect(self, final: bool = False) -> None: @hookimpl(trylast=True) def pytest_sessionstart(self, session: Session) -> None: self._session = session - self._sessionstarttime = timing.perf_counter() + self._session_start = timing.Instant() if not self.showheader: return self.write_sep("=", "test session starts", bold=True) @@ -1202,7 +1200,7 @@ def summary_stats(self) -> None: if self.verbosity < -1: return - session_duration = timing.perf_counter() - self._sessionstarttime + session_duration = self._session_start.elapsed() (parts, main_color) = self.build_summary_stats_line() line_parts = [] @@ -1217,7 +1215,7 @@ def summary_stats(self) -> None: msg = ", ".join(line_parts) main_markup = {main_color: True} - duration = f" in {format_session_duration(session_duration)}" + duration = f" in {format_session_duration(session_duration.seconds)}" duration_with_markup = self._tw.markup(duration, **main_markup) if display_sep: fullwidth += len(duration_with_markup) - len(duration) diff --git a/src/_pytest/timing.py b/src/_pytest/timing.py index 4422037a9d9..221eeffc4fd 100644 --- a/src/_pytest/timing.py +++ b/src/_pytest/timing.py @@ -10,6 +10,7 @@ import dataclasses from datetime import datetime +from datetime import timezone from time import perf_counter from time import sleep from time import time @@ -20,6 +21,47 @@ from pytest import MonkeyPatch +@dataclasses.dataclass(frozen=True) +class Instant: + """ + Represents an instant in time, used to both get the timestamp value and to measure + the duration of a time span. + + Inspired by Rust's `std::time::Instant`. + """ + + # Creation time of this instant, using time.time(), to measure actual time. + # Note: using a `lambda` to correctly get the mocked time via `MockTiming`. + time: float = dataclasses.field(default_factory=lambda: time(), init=False) + + # Performance counter tick of the instant, used to measure precise elapsed time. + # Note: using a `lambda` to correctly get the mocked time via `MockTiming`. + perf_count: float = dataclasses.field( + default_factory=lambda: perf_counter(), init=False + ) + + def elapsed(self) -> Duration: + """Measure the duration since `Instant` was created.""" + return Duration(start=self, stop=Instant()) + + def as_utc(self) -> datetime: + """Instant as UTC datetime.""" + return datetime.fromtimestamp(self.time, timezone.utc) + + +@dataclasses.dataclass(frozen=True) +class Duration: + """A span of time as measured by `Instant.elapsed()`.""" + + start: Instant + stop: Instant + + @property + def seconds(self) -> float: + """Elapsed time of the duration in seconds, measured using a performance counter for precise timing.""" + return self.stop.perf_count - self.start.perf_count + + @dataclasses.dataclass class MockTiming: """Mocks _pytest.timing with a known object that can be used to control timing in tests diff --git a/testing/_py/test_local.py b/testing/_py/test_local.py index e4b011a9727..7064d1daa9b 100644 --- a/testing/_py/test_local.py +++ b/testing/_py/test_local.py @@ -12,7 +12,6 @@ from py import error from py.path import local -import _pytest.timing import pytest @@ -747,7 +746,8 @@ def test_setmtime(self): name = tempfile.mktemp() open(name, "w").close() try: - mtime = int(_pytest.timing.time()) - 100 + # Do not use _pytest.timing here, as we do not want time mocking to affect this test. + mtime = int(time.time()) - 100 path = local(name) assert path.mtime() != mtime path.setmtime(mtime) @@ -1405,7 +1405,8 @@ def test_atime(self, tmpdir): import time path = tmpdir.ensure("samplefile") - now = _pytest.timing.perf_counter() + # Do not use _pytest.timing here, as we do not want time mocking to affect this test. + now = time.time() atime1 = path.atime() # we could wait here but timer resolution is very # system dependent @@ -1413,7 +1414,7 @@ def test_atime(self, tmpdir): time.sleep(ATIME_RESOLUTION) atime2 = path.atime() time.sleep(ATIME_RESOLUTION) - duration = _pytest.timing.perf_counter() - now + duration = time.time() - now assert (atime2 - atime1) <= duration def test_commondir(self, path1): diff --git a/testing/test_pytester.py b/testing/test_pytester.py index 555d73f9eaa..721e8c19d8b 100644 --- a/testing/test_pytester.py +++ b/testing/test_pytester.py @@ -451,13 +451,12 @@ def test_pytester_run_with_timeout(pytester: Pytester) -> None: timeout = 120 - start = _pytest.timing.perf_counter() + instant = _pytest.timing.Instant() result = pytester.runpytest_subprocess(testfile, timeout=timeout) - end = _pytest.timing.perf_counter() - duration = end - start + duration = instant.elapsed() assert result.ret == ExitCode.OK - assert duration < timeout + assert duration.seconds < timeout def test_pytester_run_timeout_expires(pytester: Pytester) -> None: