Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions prometheus_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ scrape_configs:
scrape_interval: 5s
static_configs:
- targets: ["p2w-attest:3000"]
- job_name: price_service
scrape_interval: 5s
static_configs:
- targets: ["pyth-price-service:8081"]
90 changes: 73 additions & 17 deletions third_party/pyth/price-service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions third_party/pyth/price-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"express-validation": "^4.0.1",
"http-status-codes": "^2.2.0",
"joi": "^17.6.0",
"lru-cache": "^7.14.1",
"morgan": "^1.10.0",
"prom-client": "^14.0.1",
"response-time": "^2.3.2",
Expand Down
3 changes: 2 additions & 1 deletion third_party/pyth/price-service/src/__tests__/rest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ function dummyPriceInfoPair(
id,
{
priceFeed: dummyPriceFeed(id),
publishTime: 0,
attestationTime: 0,
seqNum,
vaaBytes: Buffer.from(vaa, "hex").toString("binary"),
vaa: Buffer.from(vaa, "hex"),
emitterChainId: 0,
priceServiceReceiveTime: 0,
},
Expand Down
3 changes: 2 additions & 1 deletion third_party/pyth/price-service/src/__tests__/ws.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ function dummyPriceInfo(
): PriceInfo {
return {
seqNum: dummyPriceMetadataValue.sequence_number,
publishTime: 0,
attestationTime: dummyPriceMetadataValue.attestation_time,
emitterChainId: dummyPriceMetadataValue.emitter_chain,
priceFeed: dummyPriceFeed(id),
vaaBytes: Buffer.from(vaa, "hex").toString("binary"),
vaa: Buffer.from(vaa, "hex"),
priceServiceReceiveTime: dummyPriceMetadataValue.price_service_receive_time,
};
}
Expand Down
68 changes: 49 additions & 19 deletions third_party/pyth/price-service/src/listen.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import {
ChainId,
hexToUint8Array,
uint8ArrayToHex,
} from "@certusone/wormhole-sdk";
import { ChainId, uint8ArrayToHex } from "@certusone/wormhole-sdk";

import {
createSpyRPCServiceClient,
Expand All @@ -11,6 +7,8 @@ import {

import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";

import { createHash } from "crypto";

import {
getBatchSummary,
parseBatchPriceAttestation,
Expand All @@ -25,10 +23,12 @@ import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js";
import { sleep, TimestampInSec } from "./helpers";
import { logger } from "./logging";
import { PromClient } from "./promClient";
import LRUCache from "lru-cache";

export type PriceInfo = {
vaaBytes: string;
vaa: Buffer;
seqNum: number;
publishTime: TimestampInSec;
attestationTime: TimestampInSec;
priceFeed: PriceFeed;
emitterChainId: number;
Expand All @@ -52,6 +52,8 @@ type ListenerConfig = {
readiness: ListenerReadinessConfig;
};

type VaaHash = string;

export class Listener implements PriceStore {
// Mapping of Price Feed Id to Vaa
private priceFeedVaaMap = new Map<string, PriceInfo>();
Expand All @@ -61,13 +63,18 @@ export class Listener implements PriceStore {
private spyConnectionTime: TimestampInSec | undefined;
private readinessConfig: ListenerReadinessConfig;
private updateCallbacks: ((priceInfo: PriceInfo) => any)[];
private observedVaas: LRUCache<VaaHash, boolean>;

constructor(config: ListenerConfig, promClient?: PromClient) {
this.promClient = promClient;
this.spyServiceHost = config.spyServiceHost;
this.loadFilters(config.filtersRaw);
this.readinessConfig = config.readiness;
this.updateCallbacks = [];
this.observedVaas = new LRUCache({
max: 10000, // At most 10000 items
ttl: 60 * 1000, // 60 seconds
});
}

private loadFilters(filtersRaw?: string) {
Expand Down Expand Up @@ -114,7 +121,7 @@ export class Listener implements PriceStore {
);
stream = await subscribeSignedVAA(client, { filters: this.filters });

stream!.on("data", ({ vaaBytes }: { vaaBytes: string }) => {
stream!.on("data", ({ vaaBytes }: { vaaBytes: Buffer }) => {
this.processVaa(vaaBytes);
});

Expand Down Expand Up @@ -150,19 +157,29 @@ export class Listener implements PriceStore {
}
}

async processVaa(vaaBytes: string) {
async processVaa(vaa: Buffer) {
const { parse_vaa } = await importCoreWasm();
const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes));

const vaaHash: VaaHash = createHash("md5").update(vaa).digest("base64");

if (this.observedVaas.has(vaaHash)) {
return;
}

this.observedVaas.set(vaaHash, true);
this.promClient?.incReceivedVaa();

const parsedVaa = parse_vaa(vaa);

let batchAttestation;

try {
batchAttestation = await parseBatchPriceAttestation(
Buffer.from(parsedVAA.payload)
Buffer.from(parsedVaa.payload)
);
} catch (e: any) {
logger.error(e, e.stack);
logger.error("Parsing failed. Dropping vaa: %o", parsedVAA);
logger.error("Parsing failed. Dropping vaa: %o", parsedVaa);
return;
}

Expand Down Expand Up @@ -194,15 +211,30 @@ export class Listener implements PriceStore {
) {
const priceFeed = priceAttestationToPriceFeed(priceAttestation);
const priceInfo = {
seqNum: parsedVAA.sequence,
vaaBytes,
seqNum: parsedVaa.sequence,
vaa,
publishTime: priceAttestation.publishTime,
attestationTime: priceAttestation.attestationTime,
priceFeed,
emitterChainId: parsedVAA.emitter_chain,
emitterChainId: parsedVaa.emitter_chain,
priceServiceReceiveTime: Math.floor(new Date().getTime() / 1000),
};
this.priceFeedVaaMap.set(key, priceInfo);

if (lastAttestationTime !== undefined) {
this.promClient?.addPriceUpdatesAttestationTimeGap(
priceAttestation.attestationTime - lastAttestationTime
);
}

const lastPublishTime = this.priceFeedVaaMap.get(key)?.publishTime;

if (lastPublishTime !== undefined) {
this.promClient?.addPriceUpdatesPublishTimeGap(
priceAttestation.publishTime - lastPublishTime
);
}

for (const callback of this.updateCallbacks) {
callback(priceInfo);
}
Expand All @@ -211,16 +243,14 @@ export class Listener implements PriceStore {

logger.info(
"Parsed a new Batch Price Attestation: [" +
parsedVAA.emitter_chain +
parsedVaa.emitter_chain +
":" +
uint8ArrayToHex(parsedVAA.emitter_address) +
uint8ArrayToHex(parsedVaa.emitter_address) +
"], seqNum: " +
parsedVAA.sequence +
parsedVaa.sequence +
", Batch Summary: " +
getBatchSummary(batchAttestation)
);

this.promClient?.incReceivedVaa();
}

getLatestPriceInfo(priceFeedId: string): PriceInfo | undefined {
Expand Down
Loading