26 changes: 13 additions & 13 deletions aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md
@@ -43,33 +43,33 @@ Notes
- This preserves all runs (original + restarts) and per‑run attempts (`run_attempt`).
- Job retries typically show up as separate job rows; names may include `Attempt #2` and have later `started_at`.

## Phase B — Test Details Fetch (batched, from `default.test_run_s3`)
## Phase B — Test Details Fetch (batched, from `tests.all_test_runs`)

Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`. For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `default.test_run_s3` — this table contains one row per testcase, including successful ones (failure_count=0, error_count=0).
Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`). For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `tests.all_test_runs` — this table contains one row per testcase and is populated earlier while jobs may still be running.

Why `test_run_s3` only?
- We need per‑test identities to build per‑test Signals; `default.test_run_s3` has them. Summary is optional and redundant for this layer.
Why `tests.all_test_runs`?
- We need per‑test identities to build per‑test Signals; `tests.all_test_runs` has them and is populated earlier than the final summary tables. The summary tables are optional and redundant for this layer.
- Performance remains good by filtering on `job_id IN (...)` (first PK column) and grouping; limit to the time window implicitly via the selected job set from Phase A.

Job selection for the test track (see the sketch after this list):
- Step 1: find normalized job base names that exhibited a test‑related classification in any commit within the window.
- Step 2: include ALL jobs across ALL commits whose normalized base is in that set (original runs, restarts; any run_id/attempt) so we can observe successes or pendings for the same test on other commits.
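
A minimal Python sketch of this two-step selection, assuming a `JobRow`-like object with `name`, `rule`, and `job_id` fields and a hypothetical `normalize_job_name` helper (the real normalization lives in the extractor):

```
TEST_RULES = {"pytest failure", "Python unittest failure"}

def normalize_job_name(name: str) -> str:
    # Illustrative only: strip shard/attempt suffixes such as
    # "build-test (shard 2, ...)" or "... Attempt #2" down to a base name.
    return name.split(" (")[0].split(" Attempt #")[0].strip()

def select_test_track_job_ids(jobs):
    # Step 1: base names that showed a test-related classification in the window.
    test_bases = {normalize_job_name(j.name) for j in jobs if j.rule in TEST_RULES}
    # Step 2: ALL jobs (any commit, run, attempt) whose base is in that set.
    return [j.job_id for j in jobs if normalize_job_name(j.name) in test_bases]
```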

Optimized batched test_run_s3 query (for N job_ids):
Optimized batched all_test_runs query (for N job_ids):

```
SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
max(failure_count > 0) AS failing,
max(error_count > 0) AS errored,
max(rerun_count > 0) AS rerun_seen,
count() AS rows
FROM default.test_run_s3
FROM tests.all_test_runs
WHERE job_id IN {job_ids:Array(Int64)}
GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
```

Notes
- Use `job_id IN (...)` to leverage the PK prefix `(job_id, name, classname, invoking_file, file)`.
- Use `job_id IN (...)` to leverage the table’s primary key prefix on `job_id`.
- We keep `workflow_run_attempt` to distinguish attempts within the same workflow run.
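
For illustration, the batched query above could be executed with server-side parameter binding roughly as follows; this assumes the `clickhouse_connect` driver and is not necessarily how the project's own ClickHouse client wrapper works:

```
import clickhouse_connect  # assumption: the driver choice here is illustrative

client = clickhouse_connect.get_client(host="<clickhouse-host>")  # connection details elided

batched_sql = """
SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
       max(failure_count > 0) AS failing,
       max(error_count > 0) AS errored,
       max(rerun_count > 0) AS rerun_seen,
       count() AS rows
FROM tests.all_test_runs
WHERE job_id IN {job_ids:Array(Int64)}
GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
"""

job_ids = [1234567890]  # example: the job ids selected in Phase A
result = client.query(batched_sql, parameters={"job_ids": [int(j) for j in job_ids]})
per_test_rows = result.result_rows  # one aggregated row per (job, attempt, test id)
```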

## Mapping to Signals
@@ -83,7 +83,7 @@ Notes
Ordering: dicts in Python 3.7+ preserve insertion order. Phase A inserts commit keys in push‑timestamp DESC order, so iterating the mapping yields newest→older commits without extra sorting.
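
A tiny self-contained illustration of that ordering guarantee (plain Python behaviour, nothing project-specific):

```
# Keys inserted newest-first are iterated newest-first; no extra sort is needed.
by_commit = {}
for sha in ["sha_newest", "sha_mid", "sha_oldest"]:  # Phase A insertion order (push timestamp DESC)
    by_commit[sha] = []
assert list(by_commit) == ["sha_newest", "sha_mid", "sha_oldest"]
```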

### Test‑track semantics
- Source of truth for SUCCESS/FAILURE is `default.test_run_s3` per test id.
- Source of truth for SUCCESS/FAILURE is `tests.all_test_runs` per test id.
- When a test row exists for an attempt:
- Emit at most one FAILURE if any failed runs exist; at most one SUCCESS if any successful runs exist.
- When no test rows exist for an attempt and any grouped job for that attempt is pending → emit PENDING.
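
A hedged sketch of the per-attempt emission rule above; `SignalStatus` and the row fields (`failure_runs`, `success_runs`) are stand-ins for the project's actual types:

```
from enum import Enum
from typing import List

class SignalStatus(Enum):  # stand-in for the real event status type
    SUCCESS = "success"
    FAILURE = "failure"
    PENDING = "pending"

def statuses_for_attempt(test_rows, group_is_pending: bool) -> List[SignalStatus]:
    # test_rows: aggregated rows for one (run, attempt, base, test_id); may be empty.
    if test_rows:
        out = []
        if any(r.failure_runs > 0 for r in test_rows):
            out.append(SignalStatus.FAILURE)  # at most one FAILURE
        if any(r.success_runs > 0 for r in test_rows):
            out.append(SignalStatus.SUCCESS)  # at most one SUCCESS (both may coexist)
        return out
    if group_is_pending:
        return [SignalStatus.PENDING]
    return []  # completed attempt with no rows -> missing/unknown, no event
```
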
@@ -106,9 +106,9 @@ Event naming (for debuggability):

### Test‑track mapping
- Build a per‑commit map `test_id -> list[SignalEvent]` by combining all relevant jobs and shards:
- For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `test_run_s3` rows (if any) for each candidate `test_id`:
- If `test_run_s3` rows exist for this `test_id` → status should reflect the found test verdict.
- If no `test_run_s3` rows exist and the group is still running (some jobs pending) → status = PENDING.
- For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `tests.all_test_runs` rows (if any) for each candidate `test_id`:
- If rows exist for this `test_id` → status should reflect the found test verdict.
- If no rows exist and the group is still running (some jobs pending) → status = PENDING.
- Else (no rows and group completed) → missing/unknown (no event emitted).
- Event boundaries (naturally arise from grouping):
- Separate events for distinct workflow runs (different `wf_run_id`) on the same commit (regardless of how they were triggered).
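
A rough sketch of the grouping that yields these boundaries, reusing the illustrative `normalize_job_name` from above; only `wf_run_id` and `run_attempt` mirror real field names, the rest are assumptions:

```
from collections import defaultdict

# One group (and hence one candidate event) per distinct
# (workflow, job base, wf_run_id, run_attempt) tuple within a commit.
groups = defaultdict(list)
for j in jobs_for_commit:  # illustrative iterable of job rows for a single commit
    key = (j.workflow_name, normalize_job_name(j.name), j.wf_run_id, j.run_attempt)
    groups[key].append(j)
```
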
@@ -174,7 +174,7 @@ Notes
3) Implement selectors for test‑track pairs (Python filter on `rule`).
4) Implement batched Phase B queries:
- Use `(workflow_id, job_id) IN array(tuple(...))` to leverage PK prefixes.
- call `test_run_s3` to enumerate failing tests
- call `tests.all_test_runs` to enumerate failing tests
5) Implement mapping to Signals for both tracks, emitting multiple events per commit as specified.
6) Add unit tests:
- Test‑track: a) failure on one commit; b) success on another; c) unknown/gap.
@@ -185,7 +185,7 @@ Notes

- Keep the window small (16–32h) and deduplicate commits via push timestamps.
- Limit the size of each batched pair list; chunk when necessary (see the sketch below).
- Align filters with primary keys: `job_id` for `test_run_s3`.
- Align filters with primary keys: `job_id` for `tests.all_test_runs`.
- Avoid scanning all of `workflow_job` by joining to recent pushes and filtering repo/branches.
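
For example, a generic chunking helper of the kind this implies (the datasource below already appears to chunk internally; the size here is arbitrary):

```
def chunked(items, size=500):
    # Yield fixed-size slices so each batched IN (...) list stays bounded.
    for i in range(0, len(items), size):
        yield items[i : i + size]

# e.g. run the batched Phase B query once per slice and accumulate the rows:
#   for batch in chunked(test_track_job_ids): ...
```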

## Open Questions
@@ -82,7 +82,9 @@ def extract(self) -> List[Signal]:
# Select jobs to participate in test-track details fetch
test_track_job_ids, failed_job_ids = self._select_test_track_job_ids(jobs)
test_rows = self._datasource.fetch_tests_for_job_ids(
test_track_job_ids, failed_job_ids=failed_job_ids
test_track_job_ids,
failed_job_ids=failed_job_ids,
lookback_hours=self.lookback_hours,
)

test_signals = self._build_test_signals(jobs, test_rows, commits)
@@ -218,7 +220,7 @@ def _inject_pending_workflow_events(
return out

# -----------------------------
# Phase B — Tests (test_run_s3 only)
# Phase B — Tests (tests.all_test_runs only)
# -----------------------------
def _select_test_track_job_ids(
self, jobs: List[JobRow]
@@ -259,11 +261,11 @@ def _build_test_signals(
) -> List[Signal]:
"""Build per-test Signals across commits, scoped to job base.

We index `default.test_run_s3` rows per (wf_run_id, run_attempt, job_base) and collect
We index `tests.all_test_runs` rows per (wf_run_id, run_attempt, job_base) and collect
which base(s) (by normalized job name) a test appears in. For each commit and (workflow, base),
we compute attempt metadata (pending/completed, start time). Then, for tests that failed at least once in
that base, we emit events per commit/attempt:
- If test_run_s3 rows exist → emit at most one FAILURE event if any failed runs exist,
- If tests.all_test_runs rows exist → emit at most one FAILURE event if any failed runs exist,
and at most one SUCCESS event if any successful runs exist (both may be present).
- Else if group pending → PENDING
- Else → no event (missing)
Expand Down Expand Up @@ -295,7 +297,7 @@ def _build_test_signals(
value_fn=lambda j: (j.wf_run_id, j.run_attempt),
)

# Index test_run_s3 rows per (commit, job_base, wf_run, attempt, test_id)
# Index tests.all_test_runs rows per (commit, job_base, wf_run, attempt, test_id)
# Store aggregated failure/success counts
tests_by_group_attempt: Dict[
Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt, TestId],
@@ -189,12 +189,15 @@ def fetch_tests_for_job_ids(
job_ids: List[JobId],
*,
failed_job_ids: List[JobId],
lookback_hours: int,
) -> List[TestRow]:
"""Batch fetch test verdict rows from default.test_run_s3 for given job ids.
"""Batch fetch test verdict rows from tests.all_test_runs for given job ids.

If failed_job_ids is provided, first compute the set of failed test identifiers
(file+classname+name) from those jobs, and only fetch tests for job_ids that
match that set. This reduces the result size significantly.
Additionally, constrain by the table's partition (toDate(time_inserted))
using NOW() and the lookback window with a 1-day margin to avoid timezone issues.
"""
log = logging.getLogger(__name__)
if not job_ids:
@@ -224,25 +227,31 @@
)
# One query with a CTE that enumerates failed test ids from failed_job_ids,
# then filters the main selection by those ids for the current chunk.
# Note: success_runs explicitly excludes skipped rows via skipped_count = 0.
# Partition pruning: restrict toDate(time_inserted) to the lookback window
# with a 1 day margin using NOW() to avoid timezone handling.
query = """
WITH failed_test_names AS (
SELECT DISTINCT concat(file, '|', classname, '|', name) AS test_id
FROM default.test_run_s3
FROM tests.all_test_runs
WHERE job_id IN {failed_job_ids:Array(Int64)}
AND (failure_count > 0 OR error_count > 0)
AND toDate(time_inserted) >=
toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1))
)
SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
countIf(failure_count > 0 OR error_count > 0) AS failure_runs,
countIf(failure_count = 0 AND error_count = 0 AND skipped_count = 0) AS success_runs
FROM default.test_run_s3
FROM tests.all_test_runs
WHERE job_id IN {job_ids:Array(Int64)}
AND concat(file, '|', classname, '|', name) IN failed_test_names
AND toDate(time_inserted) >=
toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1))
GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
"""
params = {
"job_ids": [int(j) for j in chunk],
"failed_job_ids": [int(j) for j in failed_job_ids],
"lookback_hours": int(lookback_hours),
}

for attempt in RetryWithBackoff():
@@ -121,7 +121,7 @@ def is_test_failure(self) -> bool:
return self.is_failure and (self.rule in DEFAULT_TEST_RULES)


# Represents a test verdict row from the test_run_s3 table in ClickHouse
# Represents a test verdict row from the tests.all_test_runs table in ClickHouse
@dataclass(frozen=True)
class TestRow:
job_id: JobId
@@ -52,7 +52,11 @@ def fetch_jobs_for_workflows(
return list(self._jobs)

def fetch_tests_for_job_ids(
self, job_ids: List[JobId], *, failed_job_ids: List[JobId]
self,
job_ids: List[JobId],
*,
failed_job_ids: List[JobId],
lookback_hours: int = 24,
) -> List[TestRow]:
ids = {int(j) for j in job_ids}
return [r for r in self._tests if int(r.job_id) in ids]