Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion price_service/server/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/price-service-server",
"version": "2.3.4",
"version": "2.3.5",
"description": "Pyth price pervice server",
"main": "index.js",
"scripts": {
Expand Down
47 changes: 29 additions & 18 deletions price_service/server/src/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ export class Listener implements PriceStore {
this.promClient = promClient;
this.spyServiceHost = config.spyServiceHost;
this.loadFilters(config.filtersRaw);
// Don't store any prices received from wormhole that are over 1 hour old.
this.ignorePricesOlderThanSecs = 60 * 60;
// Don't store any prices received from wormhole that are over 5 minutes old.
this.ignorePricesOlderThanSecs = 5 * 60;
this.readinessConfig = config.readiness;
this.updateCallbacks = [];
this.observedVaas = new LRUCache({
max: 10000, // At most 10000 items
ttl: 60 * 1000, // 60 seconds
max: 100000, // At most 100000 items
ttl: 6 * 60 * 1000, // 6 minutes which is longer than ignorePricesOlderThanSecs
});
this.vaasCache = new VaaCache(
config.cacheTtl,
Expand Down Expand Up @@ -255,16 +255,6 @@ export class Listener implements PriceStore {
cachedInfo: PriceInfo | undefined,
observedInfo: PriceInfo
): boolean {
// Sometimes we get old VAAs from wormhole (for unknown reasons). These VAAs can include price feeds that
// were deleted and haven't been updated in a long time. This check filters out such feeds so they don't trigger
// the stale feeds check.
if (
observedInfo.attestationTime <
this.currentTimeInSeconds() - this.ignorePricesOlderThanSecs
) {
return false;
}

if (cachedInfo === undefined) {
return true;
}
Expand All @@ -289,19 +279,17 @@ export class Listener implements PriceStore {
const vaaEmitterAddressHex = Buffer.from(parsedVaa.emitterAddress).toString(
"hex"
);

const observedVaasKey: VaaKey = `${parsedVaa.emitterChain}#${vaaEmitterAddressHex}#${parsedVaa.sequence}`;

if (this.observedVaas.has(observedVaasKey)) {
return;
}

this.observedVaas.set(observedVaasKey, true);
this.promClient?.incReceivedVaa();

let batchAttestation;

try {
batchAttestation = await parseBatchPriceAttestation(
batchAttestation = parseBatchPriceAttestation(
Buffer.from(parsedVaa.payload)
);
} catch (e: any) {
Expand All @@ -310,6 +298,29 @@ export class Listener implements PriceStore {
return;
}

if (batchAttestation.priceAttestations.length === 0) {
return;
}

// Attestation time is the same in all feeds in the batch.
// Return early if an attestation is old to exclude it from
// the counter metric.
if (
batchAttestation.priceAttestations[0].attestationTime <
this.currentTimeInSeconds() - this.ignorePricesOlderThanSecs
) {
return;
}

// There is no `await` clause to release the current thread since the previous check
// but this is here to ensure this is correct as the code evolves.
if (this.observedVaas.has(observedVaasKey)) {
return;
} else {
this.observedVaas.set(observedVaasKey, true);
this.promClient?.incReceivedVaa();
}

for (const priceAttestation of batchAttestation.priceAttestations) {
const key = priceAttestation.priceId;

Expand Down