Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions src/_pytest/junitxml.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
from __future__ import annotations

from collections.abc import Callable
from datetime import datetime
from datetime import timezone
import functools
import os
import platform
Expand Down Expand Up @@ -636,17 +634,15 @@ def pytest_internalerror(self, excrepr: ExceptionRepr) -> None:
reporter._add_simple("error", "internal error", str(excrepr))

def pytest_sessionstart(self) -> None:
self.suite_start_time = timing.time()
self.suite_start_perf = timing.perf_counter()
self.suite_start = timing.Instant()

def pytest_sessionfinish(self) -> None:
dirname = os.path.dirname(os.path.abspath(self.logfile))
# exist_ok avoids filesystem race conditions between checking path existence and requesting creation
os.makedirs(dirname, exist_ok=True)

with open(self.logfile, "w", encoding="utf-8") as logfile:
suite_stop_perf = timing.perf_counter()
suite_time_delta = suite_stop_perf - self.suite_start_perf
duration = self.suite_start.elapsed()

numtests = (
self.stats["passed"]
Expand All @@ -664,10 +660,8 @@ def pytest_sessionfinish(self) -> None:
failures=str(self.stats["failure"]),
skipped=str(self.stats["skipped"]),
tests=str(numtests),
time=f"{suite_time_delta:.3f}",
timestamp=datetime.fromtimestamp(self.suite_start_time, timezone.utc)
.astimezone()
.isoformat(),
time=f"{duration.seconds:.3f}",
timestamp=self.suite_start.as_utc().astimezone().isoformat(),
hostname=platform.node(),
)
global_properties = self._get_global_properties_node()
Expand Down
8 changes: 4 additions & 4 deletions src/_pytest/pytester.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ def runpytest_inprocess(

if syspathinsert:
self.syspathinsert()
now = timing.perf_counter()
instant = timing.Instant()
capture = _get_multicapture("sys")
capture.start_capturing()
try:
Expand Down Expand Up @@ -1180,7 +1180,7 @@ class reprec: # type: ignore

assert reprec.ret is not None
res = RunResult(
reprec.ret, out.splitlines(), err.splitlines(), timing.perf_counter() - now
reprec.ret, out.splitlines(), err.splitlines(), instant.elapsed().seconds
)
res.reprec = reprec # type: ignore
return res
Expand Down Expand Up @@ -1408,7 +1408,7 @@ def run(
print(" in:", Path.cwd())

with p1.open("w", encoding="utf8") as f1, p2.open("w", encoding="utf8") as f2:
now = timing.perf_counter()
instant = timing.Instant()
popen = self.popen(
cmdargs,
stdin=stdin,
Expand Down Expand Up @@ -1445,7 +1445,7 @@ def handle_timeout() -> None:

with contextlib.suppress(ValueError):
ret = ExitCode(ret)
return RunResult(ret, out, err, timing.perf_counter() - now)
return RunResult(ret, out, err, instant.elapsed().seconds)

def _dump_lines(self, lines, fp):
try:
Expand Down
14 changes: 5 additions & 9 deletions src/_pytest/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,23 +339,19 @@ def from_call(
function, instead of being wrapped in the CallInfo.
"""
excinfo = None
start = timing.time()
precise_start = timing.perf_counter()
instant = timing.Instant()
try:
result: TResult | None = func()
except BaseException:
excinfo = ExceptionInfo.from_current()
if reraise is not None and isinstance(excinfo.value, reraise):
raise
result = None
# use the perf counter
precise_stop = timing.perf_counter()
duration = precise_stop - precise_start
stop = timing.time()
duration = instant.elapsed()
return cls(
start=start,
stop=stop,
duration=duration,
start=duration.start.time,
stop=duration.stop.time,
duration=duration.seconds,
when=when,
result=result,
excinfo=excinfo,
Expand Down
18 changes: 8 additions & 10 deletions src/_pytest/terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def __init__(self, config: Config, file: TextIO | None = None) -> None:
self._progress_nodeids_reported: set[str] = set()
self._timing_nodeids_reported: set[str] = set()
self._show_progress_info = self._determine_show_progress_info()
self._collect_report_last_write: float | None = None
self._collect_report_last_write = timing.Instant()
self._already_displayed_warnings: int | None = None
self._keyboardinterrupt_memo: ExceptionRepr | None = None

Expand Down Expand Up @@ -769,7 +769,6 @@ def pytest_collection(self) -> None:
if self.isatty:
if self.config.option.verbose >= 0:
self.write("collecting ... ", flush=True, bold=True)
self._collect_report_last_write = timing.perf_counter()
elif self.config.option.verbose >= 1:
self.write("collecting ... ", flush=True, bold=True)

Expand All @@ -788,14 +787,13 @@ def report_collect(self, final: bool = False) -> None:
return

if not final:
# Only write "collecting" report every 0.5s.
t = timing.perf_counter()
# Only write the "collecting" report every `REPORT_COLLECTING_RESOLUTION`.
if (
self._collect_report_last_write is not None
and self._collect_report_last_write > t - REPORT_COLLECTING_RESOLUTION
self._collect_report_last_write.elapsed().seconds
< REPORT_COLLECTING_RESOLUTION
):
return
self._collect_report_last_write = t
self._collect_report_last_write = timing.Instant()

errors = len(self.stats.get("error", []))
skipped = len(self.stats.get("skipped", []))
Expand Down Expand Up @@ -823,7 +821,7 @@ def report_collect(self, final: bool = False) -> None:
@hookimpl(trylast=True)
def pytest_sessionstart(self, session: Session) -> None:
self._session = session
self._sessionstarttime = timing.perf_counter()
self._session_start = timing.Instant()
if not self.showheader:
return
self.write_sep("=", "test session starts", bold=True)
Expand Down Expand Up @@ -1202,7 +1200,7 @@ def summary_stats(self) -> None:
if self.verbosity < -1:
return

session_duration = timing.perf_counter() - self._sessionstarttime
session_duration = self._session_start.elapsed()
(parts, main_color) = self.build_summary_stats_line()
line_parts = []

Expand All @@ -1217,7 +1215,7 @@ def summary_stats(self) -> None:
msg = ", ".join(line_parts)

main_markup = {main_color: True}
duration = f" in {format_session_duration(session_duration)}"
duration = f" in {format_session_duration(session_duration.seconds)}"
duration_with_markup = self._tw.markup(duration, **main_markup)
if display_sep:
fullwidth += len(duration_with_markup) - len(duration)
Expand Down
42 changes: 42 additions & 0 deletions src/_pytest/timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import dataclasses
from datetime import datetime
from datetime import timezone
from time import perf_counter
from time import sleep
from time import time
Expand All @@ -20,6 +21,47 @@
from pytest import MonkeyPatch


@dataclasses.dataclass(frozen=True)
class Instant:
"""
Represents an instant in time, used to both get the timestamp value and to measure
the duration of a time span.

Inspired by Rust's `std::time::Instant`.
"""

# Creation time of this instant, using time.time(), to measure actual time.
# Note: using a `lambda` to correctly get the mocked time via `MockTiming`.
time: float = dataclasses.field(default_factory=lambda: time(), init=False)

# Performance counter tick of the instant, used to measure precise elapsed time.
# Note: using a `lambda` to correctly get the mocked time via `MockTiming`.
perf_count: float = dataclasses.field(
default_factory=lambda: perf_counter(), init=False
)

def elapsed(self) -> Duration:
"""Measure the duration since `Instant` was created."""
return Duration(start=self, stop=Instant())

def as_utc(self) -> datetime:
"""Instant as UTC datetime."""
return datetime.fromtimestamp(self.time, timezone.utc)


@dataclasses.dataclass(frozen=True)
class Duration:
"""A span of time as measured by `Instant.elapsed()`."""

start: Instant
stop: Instant

@property
def seconds(self) -> float:
"""Elapsed time of the duration in seconds, measured using a performance counter for precise timing."""
return self.stop.perf_count - self.start.perf_count


@dataclasses.dataclass
class MockTiming:
"""Mocks _pytest.timing with a known object that can be used to control timing in tests
Expand Down
9 changes: 5 additions & 4 deletions testing/_py/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from py import error
from py.path import local

import _pytest.timing
import pytest


Expand Down Expand Up @@ -747,7 +746,8 @@ def test_setmtime(self):
name = tempfile.mktemp()
open(name, "w").close()
try:
mtime = int(_pytest.timing.time()) - 100
# Do not use _pytest.timing here, as we do not want time mocking to affect this test.
mtime = int(time.time()) - 100
path = local(name)
assert path.mtime() != mtime
path.setmtime(mtime)
Expand Down Expand Up @@ -1405,15 +1405,16 @@ def test_atime(self, tmpdir):
import time

path = tmpdir.ensure("samplefile")
now = _pytest.timing.perf_counter()
# Do not use _pytest.timing here, as we do not want time mocking to affect this test.
now = time.time()
atime1 = path.atime()
# we could wait here but timer resolution is very
# system dependent
path.read_binary()
time.sleep(ATIME_RESOLUTION)
atime2 = path.atime()
time.sleep(ATIME_RESOLUTION)
duration = _pytest.timing.perf_counter() - now
duration = time.time() - now
assert (atime2 - atime1) <= duration

def test_commondir(self, path1):
Expand Down
7 changes: 3 additions & 4 deletions testing/test_pytester.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,12 @@ def test_pytester_run_with_timeout(pytester: Pytester) -> None:

timeout = 120

start = _pytest.timing.perf_counter()
instant = _pytest.timing.Instant()
result = pytester.runpytest_subprocess(testfile, timeout=timeout)
end = _pytest.timing.perf_counter()
duration = end - start
duration = instant.elapsed()

assert result.ret == ExitCode.OK
assert duration < timeout
assert duration.seconds < timeout


def test_pytester_run_timeout_expires(pytester: Pytester) -> None:
Expand Down