Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions price_pusher/src/controller.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { UnixTimestamp } from "@pythnetwork/pyth-evm-js";
import { DurationInSeconds, sleep } from "./utils";
import { ChainPricePusher, PriceListener } from "./interface";
import { ChainPricePusher, IPriceListener } from "./interface";
import { PriceConfig, shouldUpdate } from "./price-config";

export class Controller {
private cooldownDuration: DurationInSeconds;
constructor(
private priceConfigs: PriceConfig[],
private sourcePriceListener: PriceListener,
private targetPriceListener: PriceListener,
private sourcePriceListener: IPriceListener,
private targetPriceListener: IPriceListener,
private targetChainPricePusher: ChainPricePusher,
config: {
cooldownDuration: DurationInSeconds;
Expand Down
53 changes: 9 additions & 44 deletions price_pusher/src/evm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
} from "@pythnetwork/pyth-evm-js";
import { Contract, EventData } from "web3-eth-contract";
import { PriceConfig } from "./price-config";
import { ChainPricePusher, PriceInfo, PriceListener } from "./interface";
import { ChainPricePusher, PriceInfo, ChainPriceListener } from "./interface";
import { TransactionReceipt } from "ethereum-protocol";
import { addLeading0x, DurationInSeconds, removeLeading0x } from "./utils";
import AbstractPythAbi from "@pythnetwork/pyth-sdk-solidity/abis/AbstractPyth.json";
Expand All @@ -14,30 +14,27 @@ import { Provider } from "web3/providers";
import Web3 from "web3";
import { isWsEndpoint } from "./utils";

export class EvmPriceListener implements PriceListener {
export class EvmPriceListener extends ChainPriceListener {
private pythContractFactory: PythContractFactory;
private pythContract: Contract;
private latestPriceInfo: Map<HexString, PriceInfo>;
private priceIds: HexString[];
private priceIdToAlias: Map<HexString, string>;

private pollingFrequency: DurationInSeconds;

constructor(
pythContractFactory: PythContractFactory,
priceConfigs: PriceConfig[],
config: {
pollingFrequency: DurationInSeconds;
}
) {
this.latestPriceInfo = new Map();
this.priceIds = priceConfigs.map((priceConfig) => priceConfig.id);
super(
"Evm",
config.pollingFrequency,
priceConfigs.map((priceConfig) => priceConfig.id)
);
this.priceIdToAlias = new Map(
priceConfigs.map((priceConfig) => [priceConfig.id, priceConfig.alias])
);

this.pollingFrequency = config.pollingFrequency;

this.pythContractFactory = pythContractFactory;
this.pythContract = this.pythContractFactory.createPythContract();
}
Expand All @@ -55,10 +52,8 @@ export class EvmPriceListener implements PriceListener {
);
}

console.log(`Polling the prices every ${this.pollingFrequency} seconds...`);
setInterval(this.pollPrices.bind(this), this.pollingFrequency * 1000);

await this.pollPrices();
// base class for polling
await super.start();
}

private async startSubscription() {
Expand Down Expand Up @@ -97,20 +92,6 @@ export class EvmPriceListener implements PriceListener {
this.updateLatestPriceInfo(priceId, priceInfo);
}

private async pollPrices() {
console.log("Polling evm prices...");
for (const priceId of this.priceIds) {
const currentPriceInfo = await this.getOnChainPriceInfo(priceId);
if (currentPriceInfo !== undefined) {
this.updateLatestPriceInfo(priceId, currentPriceInfo);
}
}
}

getLatestPriceInfo(priceId: string): PriceInfo | undefined {
return this.latestPriceInfo.get(priceId);
}

async getOnChainPriceInfo(
priceId: HexString
): Promise<PriceInfo | undefined> {
Expand All @@ -131,22 +112,6 @@ export class EvmPriceListener implements PriceListener {
publishTime: Number(priceRaw.publishTime),
};
}

private updateLatestPriceInfo(priceId: HexString, observedPrice: PriceInfo) {
const cachedLatestPriceInfo = this.getLatestPriceInfo(priceId);

// Ignore the observed price if the cache already has newer
// price. This could happen because we are using polling and
// subscription at the same time.
if (
cachedLatestPriceInfo !== undefined &&
cachedLatestPriceInfo.publishTime > observedPrice.publishTime
) {
return;
}

this.latestPriceInfo.set(priceId, observedPrice);
}
}

export class EvmPricePusher implements ChainPricePusher {
Expand Down
56 changes: 7 additions & 49 deletions price_pusher/src/injective.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { HexString, PriceServiceConnection } from "@pythnetwork/pyth-common-js";
import { ChainPricePusher, PriceInfo, PriceListener } from "./interface";
import { ChainPricePusher, PriceInfo, ChainPriceListener } from "./interface";
import { DurationInSeconds } from "./utils";
import { PriceConfig } from "./price-config";
import {
Expand Down Expand Up @@ -32,13 +32,7 @@ type UpdateFeeResponse = {
};

// this use price without leading 0x
// FIXME: implement common methods in the parent class
export class InjectivePriceListener implements PriceListener {
private latestPriceInfo: Map<HexString, PriceInfo>;
private priceIds: HexString[];

private pollingFrequency: DurationInSeconds;

export class InjectivePriceListener extends ChainPriceListener {
constructor(
private contractAddress: string,
private grpcEndpoint: string,
Expand All @@ -47,27 +41,11 @@ export class InjectivePriceListener implements PriceListener {
pollingFrequency: DurationInSeconds;
}
) {
this.latestPriceInfo = new Map();
this.priceIds = priceConfigs.map((priceConfig) => priceConfig.id);

this.pollingFrequency = config.pollingFrequency;
}

async start() {
console.log(`Polling the prices every ${this.pollingFrequency} seconds...`);
setInterval(this.pollPrices.bind(this), this.pollingFrequency * 1000);

await this.pollPrices();
}

private async pollPrices() {
console.log("Polling injective prices...");
for (const priceId of this.priceIds) {
const currentPriceInfo = await this.getOnChainPriceInfo(priceId);
if (currentPriceInfo !== undefined) {
this.updateLatestPriceInfo(priceId, currentPriceInfo);
}
}
super(
"Injective",
config.pollingFrequency,
priceConfigs.map((priceConfig) => priceConfig.id)
);
}

async getOnChainPriceInfo(
Expand Down Expand Up @@ -95,26 +73,6 @@ export class InjectivePriceListener implements PriceListener {
publishTime: priceQueryResponse.price_feed.price.publish_time,
};
}

private updateLatestPriceInfo(priceId: HexString, observedPrice: PriceInfo) {
const cachedLatestPriceInfo = this.getLatestPriceInfo(priceId);

// Ignore the observed price if the cache already has newer
// price. This could happen because we are using polling and
// subscription at the same time.
if (
cachedLatestPriceInfo !== undefined &&
cachedLatestPriceInfo.publishTime > observedPrice.publishTime
) {
return;
}

this.latestPriceInfo.set(priceId, observedPrice);
}

getLatestPriceInfo(priceId: string): PriceInfo | undefined {
return this.latestPriceInfo.get(priceId);
}
}

export class InjectivePricePusher implements ChainPricePusher {
Expand Down
63 changes: 60 additions & 3 deletions price_pusher/src/interface.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,71 @@
import { HexString, UnixTimestamp } from "@pythnetwork/pyth-evm-js";
import { HexString, UnixTimestamp } from "@pythnetwork/pyth-common-js";
import { DurationInSeconds } from "./utils";

export type PriceInfo = {
price: string;
conf: string;
publishTime: UnixTimestamp;
};

export interface PriceListener {
export interface IPriceListener {
getLatestPriceInfo(priceId: string): PriceInfo | undefined;
}

export abstract class ChainPriceListener implements IPriceListener {
private latestPriceInfo: Map<HexString, PriceInfo>;

constructor(
private chain: string,
private pollingFrequency: DurationInSeconds,
protected priceIds: HexString[]
) {
this.latestPriceInfo = new Map();
}

async start() {
console.log(`Polling the prices every ${this.pollingFrequency} seconds...`);
setInterval(this.pollPrices.bind(this), this.pollingFrequency * 1000);

await this.pollPrices();
}

private async pollPrices() {
console.log(`Polling ${this.chain} prices...`);
for (const priceId of this.priceIds) {
const currentPriceInfo = await this.getOnChainPriceInfo(priceId);
if (currentPriceInfo !== undefined) {
this.updateLatestPriceInfo(priceId, currentPriceInfo);
}
}
}

protected updateLatestPriceInfo(
priceId: HexString,
observedPrice: PriceInfo
) {
const cachedLatestPriceInfo = this.getLatestPriceInfo(priceId);

// Ignore the observed price if the cache already has newer
// price. This could happen because we are using polling and
// subscription at the same time.
if (
cachedLatestPriceInfo !== undefined &&
cachedLatestPriceInfo.publishTime > observedPrice.publishTime
) {
return;
}

this.latestPriceInfo.set(priceId, observedPrice);
}

// Should return undefined only when the price does not exist.
getLatestPriceInfo(priceId: HexString): undefined | PriceInfo;
getLatestPriceInfo(priceId: string): PriceInfo | undefined {
return this.latestPriceInfo.get(priceId);
}

abstract getOnChainPriceInfo(
priceId: HexString
): Promise<PriceInfo | undefined>;
}

export interface ChainPricePusher {
Expand Down
2 changes: 1 addition & 1 deletion price_pusher/src/price-config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { HexString } from "@pythnetwork/pyth-evm-js";
import { HexString } from "@pythnetwork/pyth-common-js";
import Joi from "joi";
import YAML from "yaml";
import fs from "fs";
Expand Down
11 changes: 7 additions & 4 deletions price_pusher/src/pyth-price-listener.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { HexString, PriceFeed } from "@pythnetwork/pyth-evm-js";
import { PriceServiceConnection } from "@pythnetwork/pyth-common-js";
import {
PriceServiceConnection,
HexString,
PriceFeed,
} from "@pythnetwork/pyth-common-js";
import { PriceConfig } from "./price-config";
import { PriceInfo, PriceListener } from "./interface";
import { PriceInfo, IPriceListener } from "./interface";

export class PythPriceListener implements PriceListener {
export class PythPriceListener implements IPriceListener {
private connection: PriceServiceConnection;
private priceIds: HexString[];
private priceIdToAlias: Map<HexString, string>;
Expand Down