Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 0 additions & 29 deletions third_party/pyth/p2w_autoattest.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,34 +168,6 @@
# modules like async HTTP requests and tokio runtime logs
os.environ["RUST_LOG"] = os.environ.get("RUST_LOG", "info")

# Send the first attestation in one-shot mode for testing
first_attest_result = run_or_die(
[
"pwhac",
"--commitment",
"confirmed",
"--p2w-addr",
P2W_SOL_ADDRESS,
"--rpc-url",
SOL_RPC_URL,
"--payer",
SOL_PAYER_KEYPAIR,
"attest",
"-f",
P2W_ATTESTATION_CFG,
"--timeout",
P2W_RPC_TIMEOUT_SECS,
],
capture_output=True,
debug = True,
)

logging.info("p2w_autoattest ready to roll!")

# Let k8s know the service is up
readiness_thread = threading.Thread(target=readiness, daemon=True)
readiness_thread.start()

# Do not exit this script if a continuous attestation stops for
# whatever reason (this avoids k8s restart penalty)
while True:
Expand All @@ -214,7 +186,6 @@
"attest",
"-f",
P2W_ATTESTATION_CFG,
"-d",
"--timeout",
P2W_RPC_TIMEOUT_SECS,
]
Expand Down
2 changes: 1 addition & 1 deletion wormhole-attester/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion wormhole-attester/client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-wormhole-attester-client"
version = "2.0.0"
version = "3.0.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
21 changes: 0 additions & 21 deletions wormhole-attester/client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,6 @@ pub enum Action {
Attest {
#[clap(short = 'f', long = "--config", help = "Attestation YAML config")]
attestation_cfg: PathBuf,
#[clap(
short = 'n',
long = "--n-retries",
help = "How many times to retry send_transaction() on each batch before flagging a failure. Only active outside daemon mode",
default_value = "5"
)]
n_retries: usize,
#[clap(
short = 'i',
long = "--retry-interval",
help = "How long to wait between send_transaction
retries. Only active outside daemon mode",
default_value = "5"
)]
retry_interval_secs: u64,
#[clap(
short = 'd',
long = "--daemon",
help = "Do not stop attesting. In this mode, this program will behave more like a daemon and continuously attest the specified symbols."
)]
daemon: bool,
#[clap(
short = 't',
long = "--timeout",
Expand Down
159 changes: 2 additions & 157 deletions wormhole-attester/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,8 @@ async fn main() -> Result<(), ErrBox> {
}
Action::Attest {
ref attestation_cfg,
n_retries,
retry_interval_secs,
confirmation_timeout_secs,
metrics_bind_addr,
daemon,
} => {
// Load the attestation config yaml
let attestation_cfg: AttestationConfig =
Expand All @@ -249,26 +246,7 @@ async fn main() -> Result<(), ErrBox> {
Duration::from_millis(attestation_cfg.min_rpc_interval_ms),
));

if daemon {
handle_attest_daemon_mode(
rpc_cfg,
payer,
p2w_addr,
attestation_cfg,
metrics_bind_addr,
)
.await?;
} else {
handle_attest_non_daemon_mode(
attestation_cfg,
rpc_cfg,
p2w_addr,
payer,
n_retries,
Duration::from_secs(retry_interval_secs),
)
.await?;
}
handle_attest(rpc_cfg, payer, p2w_addr, attestation_cfg, metrics_bind_addr).await?;
}
Action::GetEmitter => unreachable! {}, // It is handled early in this function.
Action::SetIsActive {
Expand Down Expand Up @@ -296,7 +274,7 @@ async fn main() -> Result<(), ErrBox> {
}

/// Continuously send batch attestations for symbols of an attestation config.
async fn handle_attest_daemon_mode(
async fn handle_attest(
rpc_cfg: Arc<RLMutex<RpcCfg>>,
payer: Keypair,
p2w_addr: Pubkey,
Expand Down Expand Up @@ -463,76 +441,6 @@ async fn lock_and_make_rpc(rlmtx: &RLMutex<RpcCfg>) -> RpcClient {
RpcClient::new_with_timeout_and_commitment(url, timeout, commitment)
}

/// Non-daemon attestation scheduling
async fn handle_attest_non_daemon_mode(
attestation_cfg: AttestationConfig,
rpc_cfg: Arc<RLMutex<RpcCfg>>,
p2w_addr: Pubkey,
payer: Keypair,
n_retries: usize,
retry_interval: Duration,
) -> Result<(), ErrBox> {
let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?;

let batch_config =
attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize)
.await
.unwrap_or_else(|_| {
attestation_cfg.instantiate_batches(&[], p2w_cfg.max_batch_size as usize)
});

let batches: Vec<_> = batch_config
.into_iter()
.map(|x| BatchState::new(&x))
.collect();
let batch_count = batches.len();

// For enforcing min_msg_reuse_interval_ms, we keep a piece of
// state that creates or reuses accounts if enough time had
// passed
let message_q_mtx = Arc::new(Mutex::new(P2WMessageQueue::new(
Duration::from_millis(attestation_cfg.min_msg_reuse_interval_ms),
attestation_cfg.max_msg_accounts as usize,
)));

let retry_jobs = batches.into_iter().enumerate().map(|(idx, batch_state)| {
attestation_retry_job(AttestationRetryJobArgs {
batch_no: idx + 1,
batch_count,
group_name: batch_state.group_name,
symbols: batch_state.symbols,
n_retries,
retry_interval,
rpc_cfg: rpc_cfg.clone(),
p2w_addr,
p2w_config: p2w_cfg.clone(),
payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(),
message_q_mtx: message_q_mtx.clone(),
})
});

let results = futures::future::join_all(retry_jobs).await;

// After completing, we count any errors coming from the sched
// futs.
let errors: Vec<_> = results
.iter()
.enumerate()
.filter_map(|(idx, r)| {
r.as_ref()
.err()
.map(|e| format!("Error {}: {:?}\n", idx + 1, e))
})
.collect();

if !errors.is_empty() {
let err_lines = errors.join("\n");
let msg = format!("{} batches failed:\n{}", errors.len(), err_lines);
error!("{}", msg);
return Err(msg.into());
}
Ok(())
}

/// Generate batches to attest by retrieving the on-chain product account data and grouping it
/// according to the configuration in `attestation_cfg`.
Expand Down Expand Up @@ -692,69 +600,6 @@ async fn attestation_sched_job(args: AttestationSchedJobArgs) -> Result<(), ErrB
}
}

pub struct AttestationRetryJobArgs {
pub batch_no: usize,
pub batch_count: usize,
pub group_name: String,
pub symbols: Vec<P2WSymbol>,
pub n_retries: usize,
pub retry_interval: Duration,
pub rpc_cfg: Arc<RLMutex<RpcCfg>>,
pub p2w_addr: Pubkey,
pub p2w_config: Pyth2WormholeConfig,
pub payer: Keypair,
pub message_q_mtx: Arc<Mutex<P2WMessageQueue>>,
}

/// A future that cranks a batch up to n_retries times, pausing for
/// retry_interval in between; Used exclusively in non-daemon mode
async fn attestation_retry_job(args: AttestationRetryJobArgs) -> Result<(), ErrBoxSend> {
let AttestationRetryJobArgs {
batch_no,
batch_count,
group_name,
symbols,
n_retries,
retry_interval,
rpc_cfg,
p2w_addr,
p2w_config,
payer,
message_q_mtx,
} = args;

let mut res = Err(
"attestation_retry_job INTERNAL: Could not get a single attestation job result"
.to_string()
.into(),
);

for _i in 0..=n_retries {
res = attestation_job(AttestationJobArgs {
rlmtx: rpc_cfg.clone(),
batch_no,
batch_count,
group_name: group_name.clone(),
p2w_addr,
config: p2w_config.clone(),
payer: Keypair::from_bytes(&payer.to_bytes()).unwrap(), // Keypair has no clone
symbols: symbols.clone(),
max_jobs_sema: Arc::new(Semaphore::new(1)), // Not important for non-daemon mode
message_q_mtx: message_q_mtx.clone(),
})
.await;

// Finish early on success
if res.is_ok() {
break;
}

tokio::time::sleep(retry_interval).await;
}

res
}

/// Arguments for attestation_job(). This struct rules out same-type
/// ordering errors due to the large argument count
pub struct AttestationJobArgs {
Expand Down