40 changes: 12 additions & 28 deletions aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md
@@ -43,33 +43,33 @@ Notes
- This preserves all runs (original + restarts) and per‑run attempts (`run_attempt`).
- Job retries typically show up as separate job rows; names may include `Attempt #2` and have later `started_at`.
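
As a minimal illustration of keeping those runs and attempts distinct, a grouping sketch (the job-row fields here are assumptions for the example, not the exact schema):

```
from collections import defaultdict

# Keep each (workflow run, attempt) separate: restarts arrive with a new
# wf_run_id, retries with a higher run_attempt on the same wf_run_id.
attempts = defaultdict(list)
for job in jobs:  # `jobs` = rows from Phase A; field names are illustrative
    attempts[(job.wf_run_id, job.run_attempt)].append(job)
```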

## Phase B — Test Details Fetch (batched, from `tests.all_test_runs`)
## Phase B — Test Details Fetch (batched, from `default.test_run_s3`)

Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`). For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `tests.all_test_runs` — this table contains one row per testcase and is populated earlier while jobs may still be running.
Decide in Python which jobs belong to the test‑track (e.g., `rule IN ('pytest failure','Python unittest failure')`). For those (job_id, run_id[, run_attempt]) triples, fetch per‑test rows directly from `default.test_run_s3` — this table contains one row per testcase, including successful ones (failure_count=0, error_count=0).

Why `tests.all_test_runs`?
- We need per‑test identities to build per‑test Signals; `tests.all_test_runs` has them and is populated earlier than the final summary tables. Summary is optional and redundant for this layer.
Why `test_run_s3` only?
- We need per‑test identities to build per‑test Signals; `default.test_run_s3` has them. Summary is optional and redundant for this layer.
- Performance remains good by filtering on `job_id IN (...)` (first PK column) and grouping; limit to the time window implicitly via the selected job set from Phase A.

Job selection for test track:
- Step 1: find normalized job base names that exhibited a test‑related classification in any commit within the window.
- Step 2: include ALL jobs across ALL commits whose normalized base is in that set (original runs, restarts; any run_id/attempt) so we can observe successes or pending runs for the same test on other commits.
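
A minimal sketch of this two‑step selection (the `normalize_job_name` helper, rule set, and job‑row fields are illustrative assumptions, not the repo's exact API):

```
import re

TEST_RULES = {"pytest failure", "Python unittest failure"}

def normalize_job_name(name: str) -> str:
    # Illustrative normalization: drop retry markers and shard indices,
    # e.g. "test (default, 2, 3, linux) (Attempt #2)" -> "test (default, linux)".
    name = re.sub(r"\s*\(Attempt #\d+\)", "", name)
    name = re.sub(r",\s*\d+,\s*\d+", "", name)
    return name.strip()

# Step 1: bases that showed a test-related classification anywhere in the window.
test_bases = {normalize_job_name(j.name) for j in jobs if j.rule in TEST_RULES}

# Step 2: every job on every commit (any run_id/attempt) sharing one of those bases.
test_track_job_ids = [j.job_id for j in jobs if normalize_job_name(j.name) in test_bases]
```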

Optimized batched all_test_runs query (for N job_ids):
Optimized batched test_run_s3 query (for N job_ids):

```
SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
max(failure_count > 0) AS failing,
max(error_count > 0) AS errored,
max(rerun_count > 0) AS rerun_seen,
count() AS rows
FROM tests.all_test_runs
FROM default.test_run_s3
WHERE job_id IN {job_ids:Array(Int64)}
GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
```

Notes
- Use `job_id IN (...)` to leverage the table’s primary key prefix on `job_id`.
- Use `job_id IN (...)` to leverage the PK prefix `(job_id, name, classname, invoking_file, file)`.
- We keep `workflow_run_attempt` to distinguish attempts within the same workflow run.

## Mapping to Signals
@@ -82,22 +82,6 @@ Notes
- Each commit holds a list of `SignalEvent`s (time‑ordered by `started_at`).
Ordering: dicts in Python 3.7+ preserve insertion order. Phase A inserts commit keys in push‑timestamp DESC order, so iterating the mapping yields newest→oldest commits without extra sorting.
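
A tiny illustration of that ordering guarantee (synthetic shas and timestamps):

```
# Phase A inserts commit keys in push-timestamp DESC order; plain dict
# iteration then yields newest -> oldest without any extra sort.
commits_by_sha = {}
for sha, pushed_at in [("c3", 300), ("c2", 200), ("c1", 100)]:  # already DESC
    commits_by_sha[sha] = {"pushed_at": pushed_at, "events": []}

assert list(commits_by_sha) == ["c3", "c2", "c1"]  # guaranteed on Python 3.7+
```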

### Test‑track semantics
- Source of truth for SUCCESS/FAILURE is `tests.all_test_runs` per test id.
- When a test row exists for an attempt:
- Emit at most one FAILURE if any failed runs exist; at most one SUCCESS if any successful runs exist.
- When no test rows exist for an attempt and any grouped job for that attempt is pending → emit PENDING.
- Otherwise (no test rows and not pending) → no event for that attempt.

### Job‑track semantics (non‑test)
- Build per normalized job base across commits; aggregate shards by `(wf_run_id, run_attempt)`.
- Event mapping per attempt uses aggregated job meta with test‑failure filtering:
- FAILURE only when the attempt had non‑test failures (e.g. infra‑related).
- PENDING when the attempt is still running.
- SUCCESS otherwise, including when failures are exclusively test‑caused (these are handled by test‑track).
- Cancelled attempts are treated as missing (no event).
- Emit a job‑track Signal only when at least one attempt/commit shows a non‑test (infra) failure within the window.

Event naming (for debuggability):
- Consistent key=value format: `wf=<workflow> kind=<test|job> id=<test_id|job_base> run=<wf_run_id> attempt=<run_attempt>`
- Examples:
@@ -106,9 +90,9 @@

### Test‑track mapping
- Build a per‑commit map `test_id -> list[SignalEvent]` by combining all relevant jobs and shards:
- For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `tests.all_test_runs` rows (if any) for each candidate `test_id`:
- If rows exist for this `test_id` → status should reflect the found test verdict.
- If no rows exist and the group is still running (some jobs pending) → status = PENDING.
- For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `test_run_s3` rows (if any) for each candidate `test_id`:
- If `test_run_s3` rows exist for this `test_id` → status should reflect the found test verdict.
- If no `test_run_s3` rows exist and the group is still running (some jobs pending) → status = PENDING.
- Else (no rows and group completed) → missing/unknown (no event emitted).
- Event boundaries (naturally arise from grouping):
- Separate events for distinct workflow runs (different `wf_run_id`) on the same commit (regardless of how they were triggered).
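
A compact sketch of the per‑attempt decision above; the row and status shapes are simplified stand‑ins for the repo's actual `TestRow`/`SignalEvent` types:

```
def events_for_attempt(test_rows, group_pending):
    # test_rows: aggregated rows for one (wf_run_id, run_attempt, job_base, test_id),
    # each carrying failure_runs / success_runs counts; returns simplified labels.
    if test_rows:
        events = []
        if any(r["failure_runs"] > 0 for r in test_rows):
            events.append("FAILURE")  # at most one FAILURE per attempt
        if any(r["success_runs"] > 0 for r in test_rows):
            events.append("SUCCESS")  # reruns can produce both in one attempt
        return events
    if group_pending:
        return ["PENDING"]  # no rows yet and some jobs still running
    return []               # completed with no rows: missing, no event
```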
@@ -174,7 +158,7 @@ Notes
3) Implement selectors for test‑track pairs (Python filter on `rule`).
4) Implement batched Phase B queries:
- Use `(workflow_id, job_id) IN array(tuple(...))` to leverage PK prefixes.
- call `tests.all_test_runs` to enumerate failing tests
- call `test_run_s3` to enumerate failing tests
5) Implement mapping to Signals for both tracks, emitting multiple events per commit as specified.
6) Add unit tests:
- Test‑track: a) failure on one commit; b) success on another; c) unknown/gap.
@@ -185,7 +169,7 @@ Notes

- Keep the window small (16–32h) and deduplicate commits via push timestamps.
- Limit the batched pairs size; chunk when necessary (see the sketch after this list).
- Align filters with primary keys: `job_id` for `tests.all_test_runs`.
- Align filters with primary keys: `job_id` for `test_run_s3`.
- Avoid scanning all of `workflow_job` by joining to recent pushes and filtering repo/branches.
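
For the chunking note above, a rough sketch of batching job ids before the Phase B fetch (the free‑standing `fetch_tests_for_job_ids` call and batch size are illustrative; the real datasource method lives on a class and already chunks internally):

```
def chunked(seq, size=500):
    # Yield fixed-size slices so each query keeps a bounded job_id IN-list.
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

test_rows = []
for batch in chunked(test_track_job_ids):  # ids selected for the test track
    # each batch becomes one `WHERE job_id IN {job_ids:Array(Int64)}` query
    test_rows.extend(fetch_tests_for_job_ids(batch, failed_job_ids=failed_job_ids))
```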

## Open Questions
@@ -60,7 +60,7 @@ def __init__(self):
self.secret_store_name = os.environ.get("SECRET_STORE_NAME", "")
self.workflows = os.environ.get(
"WORKFLOWS",
",".join(["Lint", "trunk", "pull", "inductor", "linux-aarch64", "slow"]),
",".join(["Lint", "trunk", "pull", "inductor", "linux-aarch64"]),
).split(",")

def to_autorevert_v2_params(
33 changes: 0 additions & 33 deletions aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py
@@ -137,10 +137,6 @@ def has_success(self) -> bool:
def has_failure(self) -> bool:
return SignalStatus.FAILURE in self.statuses

def count_by_status(self, status: SignalStatus) -> int:
"""Get the count of events with the specified status."""
return self.statuses.get(status, 0)

def events_by_status(self, status: SignalStatus) -> List[SignalEvent]:
"""Get all events with the specified status."""
return [event for event in self.events if event.status == status]
@@ -274,13 +270,6 @@ class InfraCheckResult(Enum):
RESTART_FAILURE = "restart_failure" # no failure after any success


class SignalSource(Enum):
"""Origin of a Signal: test-track or job-track."""

TEST = "test"
JOB = "job"


class Signal:
"""A refined, column-like view of raw CI data for pattern detection.

@@ -296,15 +285,12 @@ def __init__(
workflow_name: str,
commits: List[SignalCommit],
job_base_name: Optional[str] = None,
source: SignalSource = SignalSource.TEST,
):
self.key = key
self.workflow_name = workflow_name
# commits are ordered from newest to oldest
self.commits = commits
self.job_base_name = job_base_name
# Track the origin of the signal (test-track or job-track).
self.source = source

def detect_fixed(self) -> bool:
"""
@@ -465,16 +451,6 @@ def process_valid_autorevert_pattern(
):
restart_commits.add(partition.successful[0].head_sha)

# Job-track specific requirement: when there is no gap (unknown empty),
# require a failed rerun on the first failing commit to increase confidence.
if (
not partition.unknown
and self.source == SignalSource.JOB
and not partition.failed[-1].has_pending
and len(partition.failed[-1].events) < 2
):
restart_commits.add(partition.failed[-1].head_sha)

if restart_commits:
return RestartCommits(commit_shas=restart_commits)

@@ -496,15 +472,6 @@ def process_valid_autorevert_pattern(
f"not enough successes to make call: {partition.success_events_count()}",
)

if (
self.source == SignalSource.JOB
and partition.failed[-1].count_by_status(SignalStatus.FAILURE) < 2
):
return Ineligible(
IneligibleReason.INSUFFICIENT_FAILURES,
"job-track signal requires at least 2 failures on the first failing commit",
)

if partition.unknown:
# there are still pending/missing commits in the unknown partition
unknown_shas = ", ".join(c.head_sha for c in partition.unknown)
@@ -12,7 +12,7 @@
from typing import Dict, Iterable, List, Optional, Set, Tuple

from .job_agg_index import JobAggIndex, JobMeta, SignalStatus as AggStatus
from .signal import Signal, SignalCommit, SignalEvent, SignalSource, SignalStatus
from .signal import Signal, SignalCommit, SignalEvent, SignalStatus
from .signal_extraction_datasource import SignalExtractionDatasource
from .signal_extraction_types import (
JobBaseName,
@@ -82,9 +82,7 @@ def extract(self) -> List[Signal]:
# Select jobs to participate in test-track details fetch
test_track_job_ids, failed_job_ids = self._select_test_track_job_ids(jobs)
test_rows = self._datasource.fetch_tests_for_job_ids(
test_track_job_ids,
failed_job_ids=failed_job_ids,
lookback_hours=self.lookback_hours,
test_track_job_ids, failed_job_ids=failed_job_ids
)

test_signals = self._build_test_signals(jobs, test_rows, commits)
@@ -129,7 +127,6 @@ def _dedup_signal_events(self, signals: List[Signal]) -> List[Signal]:
workflow_name=s.workflow_name,
commits=new_commits,
job_base_name=s.job_base_name,
source=s.source,
)
)
return deduped
@@ -214,13 +211,12 @@ def _inject_pending_workflow_events(
workflow_name=s.workflow_name,
commits=new_commits,
job_base_name=s.job_base_name,
source=s.source,
)
)
return out

# -----------------------------
# Phase B — Tests (tests.all_test_runs only)
# Phase B — Tests (test_run_s3 only)
# -----------------------------
def _select_test_track_job_ids(
self, jobs: List[JobRow]
@@ -261,11 +257,11 @@ def _build_test_signals(
) -> List[Signal]:
"""Build per-test Signals across commits, scoped to job base.

We index `tests.all_test_runs` rows per (wf_run_id, run_attempt, job_base) and collect
We index `default.test_run_s3` rows per (wf_run_id, run_attempt, job_base) and collect
which base(s) (by normalized job name) a test appears in. For each commit and (workflow, base),
we compute attempt metadata (pending/completed, start time). Then, for tests that failed at least once in
that base, we emit events per commit/attempt:
- If tests.all_test_runs rows exist → emit at most one FAILURE event if any failed runs exist,
- If test_run_s3 rows exist → emit at most one FAILURE event if any failed runs exist,
and at most one SUCCESS event if any successful runs exist (both may be present).
- Else if group pending → PENDING
- Else → no event (missing)
@@ -297,7 +293,7 @@ def _build_test_signals(
value_fn=lambda j: (j.wf_run_id, j.run_attempt),
)

# Index tests.all_test_runs rows per (commit, job_base, wf_run, attempt, test_id)
# Index test_run_s3 rows per (commit, job_base, wf_run, attempt, test_id)
# Store aggregated failure/success counts
tests_by_group_attempt: Dict[
Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt, TestId],
@@ -436,7 +432,6 @@ def _build_test_signals(
workflow_name=wf_name,
commits=commit_objs,
job_base_name=str(job_base_name),
source=SignalSource.TEST,
)
)

@@ -497,15 +492,16 @@ def _build_non_test_signals(
# Map aggregation verdict to outer SignalStatus
if meta.status is None:
continue
if meta.status == AggStatus.FAILURE and meta.has_non_test_failures:
if meta.status == AggStatus.FAILURE:
# mark presence of non-test failures (relevant for job track)
has_relevant_failures = True
if meta.has_non_test_failures:
has_relevant_failures = True

ev_status = SignalStatus.FAILURE
elif meta.status == AggStatus.PENDING:
ev_status = SignalStatus.PENDING
else:
# Note: when all failures are caused by tests, we do NOT emit job-level failures
elif meta.status == AggStatus.SUCCESS:
ev_status = SignalStatus.SUCCESS
else:
ev_status = SignalStatus.PENDING

# Extract wf_run_id/run_attempt from the attempt key
_, _, _, wf_run_id, run_attempt = akey
@@ -543,7 +539,6 @@ def _build_non_test_signals(
workflow_name=wf_name,
commits=commit_objs,
job_base_name=str(base_name),
source=SignalSource.JOB,
)
)

@@ -189,15 +189,12 @@ def fetch_tests_for_job_ids(
job_ids: List[JobId],
*,
failed_job_ids: List[JobId],
lookback_hours: int,
) -> List[TestRow]:
"""Batch fetch test verdict rows from tests.all_test_runs for given job ids.
"""Batch fetch test verdict rows from default.test_run_s3 for given job ids.

If failed_job_ids is provided, first compute the set of failed test identifiers
(file+classname+name) from those jobs, and only fetch tests for job_ids that
match that set. This reduces the result size significantly.
Additionally, constrain by the table's partition (toDate(time_inserted))
using NOW() and the lookback window with a 1-day margin to avoid timezone issues.
"""
log = logging.getLogger(__name__)
if not job_ids:
@@ -227,31 +224,25 @@
)
# One query with a CTE that enumerates failed test ids from failed_job_ids,
# then filters the main selection by those ids for the current chunk.
# Partition pruning: restrict toDate(time_inserted) to the lookback window
# with a 1 day margin using NOW() to avoid timezone handling.
# Note: success_runs explicitly excludes skipped rows via skipped_count = 0.
query = """
WITH failed_test_names AS (
SELECT DISTINCT concat(file, '|', classname, '|', name) AS test_id
FROM tests.all_test_runs
FROM default.test_run_s3
WHERE job_id IN {failed_job_ids:Array(Int64)}
AND (failure_count > 0 OR error_count > 0)
AND toDate(time_inserted) >=
toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1))
)
SELECT job_id, workflow_id, workflow_run_attempt, file, classname, name,
countIf(failure_count > 0 OR error_count > 0) AS failure_runs,
countIf(failure_count = 0 AND error_count = 0 AND skipped_count = 0) AS success_runs
FROM tests.all_test_runs
FROM default.test_run_s3
WHERE job_id IN {job_ids:Array(Int64)}
AND concat(file, '|', classname, '|', name) IN failed_test_names
AND toDate(time_inserted) >=
toDate(NOW() - toIntervalHour({lookback_hours:Int32}) - toIntervalDay(1))
GROUP BY job_id, workflow_id, workflow_run_attempt, file, classname, name
"""
params = {
"job_ids": [int(j) for j in chunk],
"failed_job_ids": [int(j) for j in failed_job_ids],
"lookback_hours": int(lookback_hours),
}

for attempt in RetryWithBackoff():
@@ -121,7 +121,7 @@ def is_test_failure(self) -> bool:
return self.is_failure and (self.rule in DEFAULT_TEST_RULES)


# Represents a test verdict row from the tests.all_test_runs table in ClickHouse
# Represents a test verdict row from the test_run_s3 table in ClickHouse
@dataclass(frozen=True)
class TestRow:
job_id: JobId