
Commit bc5f540

[autorevert] use 'tests.all_test_runs' instead of 'default.test_run_s3' for test signals (#7439)
This table is populated before the workflow finishes, providing much better TTS for autorevert.

Testing:

```
python -m pytorch_auto_revert --dry-run autorevert-checker Lint trunk pull inductor --hours 24 --hud-html
```

See also: pytorch/pytorch#166988, #7434
1 parent a7ee5a8 commit bc5f540

5 files changed: +39 −24 lines


aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md

Lines changed: 13 additions & 13 deletions
@@ -43,33 +43,33 @@ Notes
 - This preserves all runs (original + restarts) and per‑run attempts (`run_attempt`).
 - Job retries typically show up as separate job rows; names may include `Attempt #2` and have later `started_at`.
 
-## Phase B — Test Details Fetch (batched, from `default.test_run_s3`)
+## Phase B — Test Details Fetch (batched, from `tests.all_test_runs`)
 
-Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`. For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `default.test_run_s3` — this table contains one row per testcase, including successful ones (failure_count=0, error_count=0).
+Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`). For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `tests.all_test_runs` — this table contains one row per testcase and is populated earlier while jobs may still be running.
 
-Why `test_run_s3` only?
-- We need per‑test identities to build per‑test Signals; `default.test_run_s3` has them. Summary is optional and redundant for this layer.
+Why `tests.all_test_runs`?
+- We need per‑test identities to build per‑test Signals; `tests.all_test_runs` has them and is populated earlier than the final summary tables. Summary is optional and redundant for this layer.
 - Performance remains good by filtering on `job_id IN (...)` (first PK column) and grouping; limit to the time window implicitly via the selected job set from Phase A.
 
 Job selection for test track:
 - Step 1: find normalized job base names that exhibited a test‑related classification in any commit within the window.
 - Step 2: include ALL jobs across ALL commits whose normalized base is in that set (original runs, restarts; any run_id/attempt) so we can observe successes or pendings for the same test on other commits.
 
-Optimized batched test_run_s3 query (for N job_ids):
+Optimized batched all_test_runs query (for N job_ids):
 
 ```
 SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
        max(failure_count > 0) AS failing,
        max(error_count > 0) AS errored,
        max(rerun_count > 0) AS rerun_seen,
        count() AS rows
-FROM default.test_run_s3
+FROM tests.all_test_runs
 WHERE job_id IN {job_ids:Array(Int64)}
 GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
 ```
 
 Notes
-- Use `job_id IN (...)` to leverage the PK prefix `(job_id, name, classname, invoking_file, file)`.
+- Use `job_id IN (...)` to leverage the table’s primary key prefix on `job_id`.
 - We keep `workflow_run_attempt` to distinguish attempts within the same workflow run.
 
 ## Mapping to Signals
@@ -83,7 +83,7 @@ Notes
 Ordering: dicts in Python 3.7+ preserve insertion order. Phase A inserts commit keys in push‑timestamp DESC order, so iterating the mapping yields newest→older commits without extra sorting.
 
 ### Test‑track semantics
-- Source of truth for SUCCESS/FAILURE is `default.test_run_s3` per test id.
+- Source of truth for SUCCESS/FAILURE is `tests.all_test_runs` per test id.
 - When a test row exists for an attempt:
   - Emit at most one FAILURE if any failed runs exist; at most one SUCCESS if any successful runs exist.
 - When no test rows exist for an attempt and any grouped job for that attempt is pending → emit PENDING.
@@ -106,9 +106,9 @@ Event naming (for debuggability):
 
 ### Test‑track mapping
 - Build a per‑commit map `test_id -> list[SignalEvent]` by combining all relevant jobs and shards:
-  - For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `test_run_s3` rows (if any) for each candidate `test_id`:
-    - If `test_run_s3` rows exist for this `test_id` → status should reflect the found test verdict.
-    - If no `test_run_s3` rows exist and the group is still running (some jobs pending) → status = PENDING.
+  - For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `tests.all_test_runs` rows (if any) for each candidate `test_id`:
+    - If rows exist for this `test_id` → status should reflect the found test verdict.
+    - If no rows exist and the group is still running (some jobs pending) → status = PENDING.
     - Else (no rows and group completed) → missing/unknown (no event emitted).
 - Event boundaries (naturally arise from grouping):
   - Separate events for distinct workflow runs (different `wf_run_id`) on the same commit (regardless of how they were triggered).
@@ -174,7 +174,7 @@ Notes
 3) Implement selectors for test‑track pairs (Python filter on `rule`).
 4) Implement batched Phase B queries:
    - Use `(workflow_id, job_id) IN array(tuple(...))` to leverage PK prefixes.
-   - call `test_run_s3` to enumerate failing tests
+   - call `tests.all_test_runs` to enumerate failing tests
 5) Implement mapping to Signals for both tracks, emitting multiple events per commit as specified.
 6) Add unit tests:
    - Test‑track: a) failure on one commit; b) success on another; c) unknown/gap.
@@ -185,7 +185,7 @@ Notes
 
 - Keep the window small (16–32h) and deduplicate commits via push timestamps.
 - Limit the batched pairs size; chunk when necessary.
-- Align filters with primary keys: `job_id` for `test_run_s3`.
+- Align filters with primary keys: `job_id` for `tests.all_test_runs`.
 - Avoid scanning all of `workflow_job` by joining to recent pushes and filtering repo/branches.
 
 ## Open Questions
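The notes above recommend limiting the batched pairs size and chunking when necessary. A minimal sketch of that chunking in plain Python; `chunk_job_ids` and the 500-id chunk size are hypothetical choices, not names from this codebase:

```python
from typing import Iterator, List


def chunk_job_ids(job_ids: List[int], chunk_size: int = 500) -> Iterator[List[int]]:
    """Yield fixed-size chunks so each batched ClickHouse query stays bounded."""
    for start in range(0, len(job_ids), chunk_size):
        yield job_ids[start : start + chunk_size]


# 1200 ids split into chunks of 500 -> sizes 500, 500, 200
sizes = [len(c) for c in chunk_job_ids(list(range(1200)))]
```

Each chunk would then be bound to the `{job_ids:Array(Int64)}` parameter of the batched query shown earlier.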

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py

Lines changed: 7 additions & 5 deletions
@@ -82,7 +82,9 @@ def extract(self) -> List[Signal]:
         # Select jobs to participate in test-track details fetch
         test_track_job_ids, failed_job_ids = self._select_test_track_job_ids(jobs)
         test_rows = self._datasource.fetch_tests_for_job_ids(
-            test_track_job_ids, failed_job_ids=failed_job_ids
+            test_track_job_ids,
+            failed_job_ids=failed_job_ids,
+            lookback_hours=self.lookback_hours,
         )
 
         test_signals = self._build_test_signals(jobs, test_rows, commits)
@@ -218,7 +220,7 @@ def _inject_pending_workflow_events(
         return out
 
     # -----------------------------
-    # Phase B — Tests (test_run_s3 only)
+    # Phase B — Tests (tests.all_test_runs only)
     # -----------------------------
     def _select_test_track_job_ids(
         self, jobs: List[JobRow]
@@ -259,11 +261,11 @@ def _build_test_signals(
     ) -> List[Signal]:
         """Build per-test Signals across commits, scoped to job base.
 
-        We index `default.test_run_s3` rows per (wf_run_id, run_attempt, job_base) and collect
+        We index `tests.all_test_runs` rows per (wf_run_id, run_attempt, job_base) and collect
         which base(s) (by normalized job name) a test appears in. For each commit and (workflow, base),
         we compute attempt metadata (pending/completed, start time). Then, for tests that failed at least once in
         that base, we emit events per commit/attempt:
-        - If test_run_s3 rows exist → emit at most one FAILURE event if any failed runs exist,
+        - If tests.all_test_runs rows exist → emit at most one FAILURE event if any failed runs exist,
           and at most one SUCCESS event if any successful runs exist (both may be present).
         - Else if group pending → PENDING
         - Else → no event (missing)
@@ -295,7 +297,7 @@ def _build_test_signals(
             value_fn=lambda j: (j.wf_run_id, j.run_attempt),
         )
 
-        # Index test_run_s3 rows per (commit, job_base, wf_run, attempt, test_id)
+        # Index tests.all_test_runs rows per (commit, job_base, wf_run, attempt, test_id)
         # Store aggregated failure/success counts
        tests_by_group_attempt: Dict[
            Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt, TestId],

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py

Lines changed: 13 additions & 4 deletions
@@ -189,12 +189,15 @@ def fetch_tests_for_job_ids(
         job_ids: List[JobId],
         *,
         failed_job_ids: List[JobId],
+        lookback_hours: int,
     ) -> List[TestRow]:
-        """Batch fetch test verdict rows from default.test_run_s3 for given job ids.
+        """Batch fetch test verdict rows from tests.all_test_runs for given job ids.
 
         If failed_job_ids is provided, first compute the set of failed test identifiers
         (file+classname+name) from those jobs, and only fetch tests for job_ids that
         match that set. This reduces the result size significantly.
+        Additionally, constrain by the table's partition (toDate(time_inserted))
+        using NOW() and the lookback window with a 1-day margin to avoid timezone issues.
         """
         log = logging.getLogger(__name__)
         if not job_ids:
@@ -224,25 +227,31 @@ def fetch_tests_for_job_ids(
             )
             # One query with a CTE that enumerates failed test ids from failed_job_ids,
             # then filters the main selection by those ids for the current chunk.
-            # Note: success_runs explicitly excludes skipped rows via skipped_count = 0.
+            # Partition pruning: restrict toDate(time_inserted) to the lookback window
+            # with a 1 day margin using NOW() to avoid timezone handling.
             query = """
                 WITH failed_test_names AS (
                     SELECT DISTINCT concat(file, '|', classname, '|', name) AS test_id
-                    FROM default.test_run_s3
+                    FROM tests.all_test_runs
                     WHERE job_id IN {failed_job_ids:Array(Int64)}
                       AND (failure_count > 0 OR error_count > 0)
+                      AND toDate(time_inserted) >=
+                          toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1))
                 )
                 SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
                        countIf(failure_count > 0 OR error_count > 0) AS failure_runs,
                        countIf(failure_count = 0 AND error_count = 0 AND skipped_count = 0) AS success_runs
-                FROM default.test_run_s3
+                FROM tests.all_test_runs
                 WHERE job_id IN {job_ids:Array(Int64)}
                   AND concat(file, '|', classname, '|', name) IN failed_test_names
+                  AND toDate(time_inserted) >=
+                      toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1))
                 GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
             """
             params = {
                 "job_ids": [int(j) for j in chunk],
                 "failed_job_ids": [int(j) for j in failed_job_ids],
+                "lookback_hours": int(lookback_hours),
             }
 
             for attempt in RetryWithBackoff():

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py

Lines changed: 1 addition & 1 deletion
@@ -121,7 +121,7 @@ def is_test_failure(self) -> bool:
         return self.is_failure and (self.rule in DEFAULT_TEST_RULES)
 
 
-# Represents a test verdict row from the test_run_s3 table in ClickHouse
+# Represents a test verdict row from the tests.all_test_runs table in ClickHouse
 @dataclass(frozen=True)
 class TestRow:
     job_id: JobId

aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py

Lines changed: 5 additions & 1 deletion
@@ -52,7 +52,11 @@ def fetch_jobs_for_workflows(
         return list(self._jobs)
 
     def fetch_tests_for_job_ids(
-        self, job_ids: List[JobId], *, failed_job_ids: List[JobId]
+        self,
+        job_ids: List[JobId],
+        *,
+        failed_job_ids: List[JobId],
+        lookback_hours: int = 24,
     ) -> List[TestRow]:
         ids = {int(j) for j in job_ids}
         return [r for r in self._tests if int(r.job_id) in ids]
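Note that the test fake above gives `lookback_hours` a default of 24 while the real datasource requires it, so existing tests that never pass the argument keep working. A minimal illustration of that pattern with a hypothetical `fetch` stub (not code from this repo):

```python
from typing import List, Tuple


def fetch(
    job_ids: List[int],
    *,
    failed_job_ids: List[int],
    lookback_hours: int = 24,
) -> Tuple[List[int], int]:
    """Stub mirroring the fake's shape: keyword-only args, defaulted lookback."""
    return (sorted(job_ids), lookback_hours)


# Old-style call sites that omit lookback_hours still work via the default:
ids, hours = fetch([3, 1], failed_job_ids=[1])
```

Making the new parameter keyword-only also guarantees no positional caller can bind it by accident.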
