diff --git a/package-lock.json b/package-lock.json index 8dbe6c67aa..ba8c433c18 100644 --- a/package-lock.json +++ b/package-lock.json @@ -46331,7 +46331,7 @@ }, "price_service/server": { "name": "@pythnetwork/price-service-server", - "version": "2.3.4", + "version": "2.3.5", "license": "Apache-2.0", "dependencies": { "@certusone/wormhole-sdk": "^0.9.9", diff --git a/price_service/server/package.json b/price_service/server/package.json index c577298d22..633ad428d0 100644 --- a/price_service/server/package.json +++ b/price_service/server/package.json @@ -1,6 +1,6 @@ { "name": "@pythnetwork/price-service-server", - "version": "2.3.4", + "version": "2.3.5", "description": "Pyth price pervice server", "main": "index.js", "scripts": { diff --git a/price_service/server/src/listen.ts b/price_service/server/src/listen.ts index 36167ab0ab..bf26c0d1ea 100644 --- a/price_service/server/src/listen.ts +++ b/price_service/server/src/listen.ts @@ -157,13 +157,13 @@ export class Listener implements PriceStore { this.promClient = promClient; this.spyServiceHost = config.spyServiceHost; this.loadFilters(config.filtersRaw); - // Don't store any prices received from wormhole that are over 1 hour old. - this.ignorePricesOlderThanSecs = 60 * 60; + // Don't store any prices received from wormhole that are over 5 minutes old. + this.ignorePricesOlderThanSecs = 5 * 60; this.readinessConfig = config.readiness; this.updateCallbacks = []; this.observedVaas = new LRUCache({ - max: 10000, // At most 10000 items - ttl: 60 * 1000, // 60 seconds + max: 100000, // At most 100000 items + ttl: 6 * 60 * 1000, // 6 minutes which is longer than ignorePricesOlderThanSecs }); this.vaasCache = new VaaCache( config.cacheTtl, @@ -255,16 +255,6 @@ export class Listener implements PriceStore { cachedInfo: PriceInfo | undefined, observedInfo: PriceInfo ): boolean { - // Sometimes we get old VAAs from wormhole (for unknown reasons). These VAAs can include price feeds that - // were deleted and haven't been updated in a long time. This check filters out such feeds so they don't trigger - // the stale feeds check. - if ( - observedInfo.attestationTime < - this.currentTimeInSeconds() - this.ignorePricesOlderThanSecs - ) { - return false; - } - if (cachedInfo === undefined) { return true; } @@ -289,19 +279,17 @@ export class Listener implements PriceStore { const vaaEmitterAddressHex = Buffer.from(parsedVaa.emitterAddress).toString( "hex" ); + const observedVaasKey: VaaKey = `${parsedVaa.emitterChain}#${vaaEmitterAddressHex}#${parsedVaa.sequence}`; if (this.observedVaas.has(observedVaasKey)) { return; } - this.observedVaas.set(observedVaasKey, true); - this.promClient?.incReceivedVaa(); - let batchAttestation; try { - batchAttestation = await parseBatchPriceAttestation( + batchAttestation = parseBatchPriceAttestation( Buffer.from(parsedVaa.payload) ); } catch (e: any) { @@ -310,6 +298,29 @@ export class Listener implements PriceStore { return; } + if (batchAttestation.priceAttestations.length === 0) { + return; + } + + // Attestation time is the same in all feeds in the batch. + // Return early if an attestation is old to exclude it from + // the counter metric. + if ( + batchAttestation.priceAttestations[0].attestationTime < + this.currentTimeInSeconds() - this.ignorePricesOlderThanSecs + ) { + return; + } + + // There is no `await` clause to release the current thread since the previous check + // but this is here to ensure this is correct as the code evolves. + if (this.observedVaas.has(observedVaasKey)) { + return; + } else { + this.observedVaas.set(observedVaasKey, true); + this.promClient?.incReceivedVaa(); + } + for (const priceAttestation of batchAttestation.priceAttestations) { const key = priceAttestation.priceId;