@@ -83,20 +83,26 @@ export class SuiPriceListener extends ChainPriceListener {
8383
8484export class SuiPricePusher implements IPricePusher {
8585 private readonly signer : RawSigner ;
86+ // Sui transactions can error if they're sent concurrently. This flag tracks whether an update is in-flight,
87+ // so we can skip sending another update at the same time.
88+ private isAwaitingTx : boolean ;
89+
8690 constructor (
8791 private priceServiceConnection : PriceServiceConnection ,
8892 private pythPackageId : string ,
8993 private pythStateId : string ,
9094 private wormholePackageId : string ,
9195 private wormholeStateId : string ,
9296 private priceFeedToPriceInfoObjectTableId : string ,
97+ private maxVaasPerPtb : number ,
9398 endpoint : string ,
9499 mnemonic : string
95100 ) {
96101 this . signer = new RawSigner (
97102 Ed25519Keypair . deriveKeypair ( mnemonic ) ,
98103 new JsonRpcProvider ( new Connection ( { fullnode : endpoint } ) )
99104 ) ;
105+ this . isAwaitingTx = false ;
100106 }
101107
102108 async updatePriceFeed (
@@ -110,10 +116,64 @@ export class SuiPricePusher implements IPricePusher {
110116 if ( priceIds . length !== pubTimesToPush . length )
111117 throw new Error ( "Invalid arguments" ) ;
112118
113- const tx = new TransactionBlock ( ) ;
119+ if ( this . isAwaitingTx ) {
120+ console . log (
121+ "Skipping update: previous price update transaction(s) have not completed."
122+ ) ;
123+ return ;
124+ }
125+
126+ const priceFeeds = await this . priceServiceConnection . getLatestPriceFeeds (
127+ priceIds
128+ ) ;
129+ if ( priceFeeds === undefined ) {
130+ console . log ( "Failed to fetch price updates. Skipping push." ) ;
131+ return ;
132+ }
114133
115- const vaas = await this . priceServiceConnection . getLatestVaas ( priceIds ) ;
134+ const vaaToPriceFeedIds : Map < string , string [ ] > = new Map ( ) ;
135+ for ( const priceFeed of priceFeeds ) {
136+ // The ! will succeed as long as the priceServiceConnection is configured to return binary vaa data (which it is).
137+ const vaa = priceFeed . getVAA ( ) ! ;
138+ if ( ! vaaToPriceFeedIds . has ( vaa ) ) {
139+ vaaToPriceFeedIds . set ( vaa , [ ] ) ;
140+ }
141+ vaaToPriceFeedIds . get ( vaa ) ! . push ( priceFeed . id ) ;
142+ }
143+
144+ const txs = [ ] ;
145+ let currentBatchVaas = [ ] ;
146+ let currentBatchPriceFeedIds = [ ] ;
147+ for ( const [ vaa , priceFeedIds ] of Object . entries ( vaaToPriceFeedIds ) ) {
148+ currentBatchVaas . push ( vaa ) ;
149+ currentBatchPriceFeedIds . push ( ...priceFeedIds ) ;
150+ if ( currentBatchVaas . length >= this . maxVaasPerPtb ) {
151+ const tx = await this . createPriceUpdateTransaction (
152+ currentBatchVaas ,
153+ currentBatchPriceFeedIds
154+ ) ;
155+ if ( tx !== undefined ) {
156+ txs . push ( tx ) ;
157+ }
116158
159+ currentBatchVaas = [ ] ;
160+ currentBatchPriceFeedIds = [ ] ;
161+ }
162+ }
163+
164+ try {
165+ this . isAwaitingTx = true ;
166+ await this . sendTransactionBlocks ( txs ) ;
167+ } finally {
168+ this . isAwaitingTx = false ;
169+ }
170+ }
171+
172+ private async createPriceUpdateTransaction (
173+ vaas : string [ ] ,
174+ priceIds : string [ ]
175+ ) : Promise < TransactionBlock | undefined > {
176+ const tx = new TransactionBlock ( ) ;
117177 // Parse our batch price attestation VAA bytes using Wormhole.
118178 // Check out the Wormhole cross-chain bridge and generic messaging protocol here:
119179 // https://github.com/wormhole-foundation/wormhole
@@ -158,7 +218,7 @@ export class SuiPricePusher implements IPricePusher {
158218 } catch ( e ) {
159219 console . log ( "Error fetching price info object id for " , priceId ) ;
160220 console . error ( e ) ;
161- return ;
221+ return undefined ;
162222 }
163223 const coin = tx . splitCoins ( tx . gas , [ tx . pure ( 1 ) ] ) ;
164224 [ price_updates_hot_potato ] = tx . moveCall ( {
@@ -181,30 +241,36 @@ export class SuiPricePusher implements IPricePusher {
181241 typeArguments : [ `${ this . pythPackageId } ::price_info::PriceInfo` ] ,
182242 } ) ;
183243
184- try {
185- const result = await this . signer . signAndExecuteTransactionBlock ( {
186- transactionBlock : tx ,
187- options : {
188- showInput : true ,
189- showEffects : true ,
190- showEvents : true ,
191- showObjectChanges : true ,
192- showBalanceChanges : true ,
193- } ,
194- } ) ;
244+ return tx ;
245+ }
195246
196- console . log (
197- "Successfully updated price with transaction digest " ,
198- result . digest
199- ) ;
200- } catch ( e ) {
201- console . log ( "Error when signAndExecuteTransactionBlock" ) ;
202- if ( String ( e ) . includes ( "GasBalanceTooLow" ) ) {
203- console . log ( "Insufficient Gas Amount. Please top up your account" ) ;
204- process . exit ( ) ;
247+ /** Send every transaction in txs sequentially, returning when all transactions have completed. */
248+ private async sendTransactionBlocks ( txs : TransactionBlock [ ] ) : Promise < void > {
249+ for ( const tx of txs ) {
250+ try {
251+ const result = await this . signer . signAndExecuteTransactionBlock ( {
252+ transactionBlock : tx ,
253+ options : {
254+ showInput : true ,
255+ showEffects : true ,
256+ showEvents : true ,
257+ showObjectChanges : true ,
258+ showBalanceChanges : true ,
259+ } ,
260+ } ) ;
261+
262+ console . log (
263+ "Successfully updated price with transaction digest " ,
264+ result . digest
265+ ) ;
266+ } catch ( e ) {
267+ console . log ( "Error when signAndExecuteTransactionBlock" ) ;
268+ if ( String ( e ) . includes ( "GasBalanceTooLow" ) ) {
269+ console . log ( "Insufficient Gas Amount. Please top up your account" ) ;
270+ process . exit ( ) ;
271+ }
272+ console . error ( e ) ;
205273 }
206- console . error ( e ) ;
207- return ;
208274 }
209275 }
210276}
0 commit comments