From f77a71d58346587b2f6038cc2c0cdfcdf310d683 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Mon, 6 May 2024 18:29:17 +0100 Subject: [PATCH 1/5] add telegram integration --- .gitignore | 4 +++- poetry.lock | 16 +++++++++++++++- pyproject.toml | 1 + pyth_observer/__init__.py | 5 +++-- pyth_observer/cli.py | 15 +++++++++++++-- pyth_observer/dispatch.py | 6 +++++- pyth_observer/event.py | 39 +++++++++++++++++++++++++++++++++++++++ sample.config.yaml | 1 + sample.telegram.yaml | 4 ++++ 9 files changed, 84 insertions(+), 7 deletions(-) create mode 100644 sample.telegram.yaml diff --git a/.gitignore b/.gitignore index 54949b9..86a5a17 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,6 @@ __pycache__/ .DS_Store .envrc -.coverage \ No newline at end of file +.coverage + +.env \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 8016880..d74e01e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1563,6 +1563,20 @@ files = [ [package.dependencies] six = ">=1.5" +[[package]] +name = "python-dotenv" +version = "1.0.1" +description = "Read key-value pairs from a .env file and set them as environment variables" +optional = false +python-versions = ">=3.8" +files = [ + {file = "python-dotenv-1.0.1.tar.gz", hash = "sha256:e324ee90a023d808f1959c46bcbc04446a10ced277783dc6ee09987c37ec10ca"}, + {file = "python_dotenv-1.0.1-py3-none-any.whl", hash = "sha256:f7b63ef50f1b690dddf550d03497b66d609393b40b564ed0d674909a68ebf16a"}, +] + +[package.extras] +cli = ["click (>=5.0)"] + [[package]] name = "pytz" version = "2024.1" @@ -2024,4 +2038,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "b331982b4733cd9d3da39fc1fffdfab4999a79db63b39ca0ee837d899b0d6e9f" +content-hash = "68214551a9dc797c9890b689836f2ecdeea8536596bdbfc467b3b08fb404cca0" diff --git a/pyproject.toml b/pyproject.toml index 606476d..be8c4b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ pyyaml = "^6.0" throttler = "1.2.1" types-pyyaml = "^6.0.12" types-pytz = "^2022.4.0.0" +python-dotenv = "^1.0.1" [tool.poetry.group.dev.dependencies] diff --git a/pyth_observer/__init__.py b/pyth_observer/__init__.py index c7048ab..1a30fac 100644 --- a/pyth_observer/__init__.py +++ b/pyth_observer/__init__.py @@ -1,6 +1,6 @@ import asyncio import os -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple from base58 import b58decode from loguru import logger @@ -51,9 +51,10 @@ def __init__( config: Dict[str, Any], publishers: Dict[str, str], coingecko_mapping: Dict[str, Symbol], + telegram_mapping: Optional[Dict[str, str]] = None, ): self.config = config - self.dispatch = Dispatch(config, publishers) + self.dispatch = Dispatch(config, publishers, telegram_mapping=telegram_mapping) self.publishers = publishers self.pyth_client = PythClient( solana_endpoint=config["network"]["http_endpoint"], diff --git a/pyth_observer/cli.py b/pyth_observer/cli.py index 9cb6257..79b7be1 100644 --- a/pyth_observer/cli.py +++ b/pyth_observer/cli.py @@ -29,18 +29,29 @@ envvar="COINGECKO_MAPPING", required=True, ) +@click.option( + "--telegram-mapping", + help="Path to YAML/JSON file with publisher key-Telegram chat ID mappings", + envvar="TELEGRAM_MAPPING", + required=False, +) @click.option( "--prometheus-port", help="Port number for Prometheus metrics endpoint", envvar="PROMETHEUS_PORT", default="9001", ) -def run(config, publishers, coingecko_mapping, prometheus_port): +def run(config, publishers, coingecko_mapping, telegram_mapping, prometheus_port): config_ = yaml.safe_load(open(config, "r")) publishers_ = yaml.safe_load(open(publishers, "r")) publishers_inverted = {v: k for k, v in publishers_.items()} coingecko_mapping_ = yaml.safe_load(open(coingecko_mapping, "r")) - observer = Observer(config_, publishers_inverted, coingecko_mapping_) + telegram_mapping_ = ( + yaml.safe_load(open(telegram_mapping, "r")) if telegram_mapping else {} + ) + observer = Observer( + config_, publishers_inverted, coingecko_mapping_, telegram_mapping_ + ) start_http_server(int(prometheus_port)) diff --git a/pyth_observer/dispatch.py b/pyth_observer/dispatch.py index 21224e6..9383dbe 100644 --- a/pyth_observer/dispatch.py +++ b/pyth_observer/dispatch.py @@ -9,10 +9,12 @@ from pyth_observer.check.publisher import PUBLISHER_CHECKS, PublisherState from pyth_observer.event import DatadogEvent # Used dynamically from pyth_observer.event import LogEvent # Used dynamically +from pyth_observer.event import TelegramEvent # Used dynamically from pyth_observer.event import Event assert DatadogEvent assert LogEvent +assert TelegramEvent class Dispatch: @@ -21,9 +23,10 @@ class Dispatch: notifiers for the checks that failed. """ - def __init__(self, config, publishers): + def __init__(self, config, publishers, telegram_mapping=None): self.config = config self.publishers = publishers + self.telegram_mapping = telegram_mapping self.price_feed_check_gauge = Gauge( "price_feed_check_failed", "Price feed check failure status", @@ -52,6 +55,7 @@ async def run(self, states: List[State]): context = { "network": self.config["network"]["name"], "publishers": self.publishers, + "telegram_mapping": self.telegram_mapping or {}, } for check in failed_checks: diff --git a/pyth_observer/event.py b/pyth_observer/event.py index 2e49229..a630247 100644 --- a/pyth_observer/event.py +++ b/pyth_observer/event.py @@ -1,20 +1,25 @@ import os from typing import Dict, Literal, Protocol, TypedDict, cast +import aiohttp from datadog_api_client.api_client import AsyncApiClient as DatadogAPI from datadog_api_client.configuration import Configuration as DatadogConfig from datadog_api_client.v1.api.events_api import EventsApi as DatadogEventAPI from datadog_api_client.v1.model.event_alert_type import EventAlertType from datadog_api_client.v1.model.event_create_request import EventCreateRequest +from dotenv import load_dotenv from loguru import logger from pyth_observer.check import Check from pyth_observer.check.publisher import PublisherCheck +load_dotenv() + class Context(TypedDict): network: str publishers: Dict[str, str] + telegram_mapping: Dict[str, str] class Event(Protocol): @@ -94,3 +99,37 @@ async def send(self): level = cast(LogEventLevel, os.environ.get("LOG_EVENT_LEVEL", "INFO")) logger.log(level, text.replace("\n", ". ")) + + +class TelegramEvent(Event): + def __init__(self, check: Check, context: Context): + self.check = check + self.context = context + + async def send(self): + text = self.check.error_message() + # Extract the publisher key from the message text + publisher_key = text[text.find("(") + 1 : text.find(")")] + # Retrieve the chat ID from the telegram_mapping using the publisher key + chat_id = self.context["telegram_mapping"].get(publisher_key, None) + + if chat_id is None: + logger.warning( + f"Telegram chat ID not found for publisher key {publisher_key}" + ) + return + + telegram_api_url = f"https://api.telegram.org/bot{os.environ['TELEGRAM_BOT_TOKEN']}/sendMessage" + message_data = { + "chat_id": chat_id, + "text": text, + "parse_mode": "Markdown", + } + + async with aiohttp.ClientSession() as session: + async with session.post(telegram_api_url, json=message_data) as response: + if response.status != 200: + response_text = await response.text() + raise RuntimeError( + f"Failed to send Telegram message: {response_text}" + ) diff --git a/sample.config.yaml b/sample.config.yaml index 94de198..c0c3dd3 100644 --- a/sample.config.yaml +++ b/sample.config.yaml @@ -10,6 +10,7 @@ events: # NOTE: Uncomment to enable Datadog metrics, see README.md for datadog credential docs. # - DatadogEvent - LogEvent + - TelegramEvent checks: global: # Price feed checks diff --git a/sample.telegram.yaml b/sample.telegram.yaml new file mode 100644 index 0000000..bead637 --- /dev/null +++ b/sample.telegram.yaml @@ -0,0 +1,4 @@ +{ + "pyq7ySiH5RvKteu2vdXKC7SNyNDp9vNDkGXdHxSpPtu": "-1111111111", + "G41cdkE63eCTE2KPtFLN9AVNGkVEdnhvSDtS9Ab5XK6G": "-2222222222", +} From fc153597298e8f11445523d4b46fad360450ff18 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Tue, 7 May 2024 14:19:10 +0100 Subject: [PATCH 2/5] address comments --- pyth_observer/__init__.py | 11 ++++++----- pyth_observer/cli.py | 24 ++++++++++-------------- pyth_observer/dispatch.py | 4 +--- pyth_observer/event.py | 13 +++++++++---- pyth_observer/models.py | 16 ++++++++++++++++ sample.publishers.yaml | 19 +++++++++++++++---- sample.telegram.yaml | 4 ---- 7 files changed, 57 insertions(+), 34 deletions(-) create mode 100644 pyth_observer/models.py delete mode 100644 sample.telegram.yaml diff --git a/pyth_observer/__init__.py b/pyth_observer/__init__.py index 1a30fac..c7edfb6 100644 --- a/pyth_observer/__init__.py +++ b/pyth_observer/__init__.py @@ -1,6 +1,6 @@ import asyncio import os -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Tuple from base58 import b58decode from loguru import logger @@ -22,6 +22,7 @@ from pyth_observer.crosschain import CrosschainPrice from pyth_observer.crosschain import CrosschainPriceObserver as Crosschain from pyth_observer.dispatch import Dispatch +from pyth_observer.models import Publisher PYTHTEST_HTTP_ENDPOINT = "https://api.pythtest.pyth.network/" PYTHTEST_WS_ENDPOINT = "wss://api.pythtest.pyth.network/" @@ -49,12 +50,11 @@ class Observer: def __init__( self, config: Dict[str, Any], - publishers: Dict[str, str], + publishers: Dict[str, Publisher], coingecko_mapping: Dict[str, Symbol], - telegram_mapping: Optional[Dict[str, str]] = None, ): self.config = config - self.dispatch = Dispatch(config, publishers, telegram_mapping=telegram_mapping) + self.dispatch = Dispatch(config, publishers) self.publishers = publishers self.pyth_client = PythClient( solana_endpoint=config["network"]["http_endpoint"], @@ -135,8 +135,9 @@ async def run(self): ) for component in price_account.price_components: + pub = self.publishers.get(component.publisher_key.key, None) publisher_name = ( - self.publishers.get(component.publisher_key.key, "") + (pub.name if pub else "") + f" ({component.publisher_key.key})" ).strip() states.append( diff --git a/pyth_observer/cli.py b/pyth_observer/cli.py index 79b7be1..d559cf3 100644 --- a/pyth_observer/cli.py +++ b/pyth_observer/cli.py @@ -7,7 +7,7 @@ from loguru import logger from prometheus_client import start_http_server -from pyth_observer import Observer +from pyth_observer import Observer, Publisher @click.command() @@ -29,28 +29,24 @@ envvar="COINGECKO_MAPPING", required=True, ) -@click.option( - "--telegram-mapping", - help="Path to YAML/JSON file with publisher key-Telegram chat ID mappings", - envvar="TELEGRAM_MAPPING", - required=False, -) @click.option( "--prometheus-port", help="Port number for Prometheus metrics endpoint", envvar="PROMETHEUS_PORT", default="9001", ) -def run(config, publishers, coingecko_mapping, telegram_mapping, prometheus_port): +def run(config, publishers, coingecko_mapping, prometheus_port): config_ = yaml.safe_load(open(config, "r")) - publishers_ = yaml.safe_load(open(publishers, "r")) - publishers_inverted = {v: k for k, v in publishers_.items()} + # Load publishers YAML file and convert to dictionary of Publisher instances + publishers_raw = yaml.safe_load(open(publishers, "r")) + publishers_ = { + publisher["key"]: Publisher(**publisher) for publisher in publishers_raw + } coingecko_mapping_ = yaml.safe_load(open(coingecko_mapping, "r")) - telegram_mapping_ = ( - yaml.safe_load(open(telegram_mapping, "r")) if telegram_mapping else {} - ) observer = Observer( - config_, publishers_inverted, coingecko_mapping_, telegram_mapping_ + config_, + publishers_, + coingecko_mapping_, ) start_http_server(int(prometheus_port)) diff --git a/pyth_observer/dispatch.py b/pyth_observer/dispatch.py index 9383dbe..afae8e8 100644 --- a/pyth_observer/dispatch.py +++ b/pyth_observer/dispatch.py @@ -23,10 +23,9 @@ class Dispatch: notifiers for the checks that failed. """ - def __init__(self, config, publishers, telegram_mapping=None): + def __init__(self, config, publishers): self.config = config self.publishers = publishers - self.telegram_mapping = telegram_mapping self.price_feed_check_gauge = Gauge( "price_feed_check_failed", "Price feed check failure status", @@ -55,7 +54,6 @@ async def run(self, states: List[State]): context = { "network": self.config["network"]["name"], "publishers": self.publishers, - "telegram_mapping": self.telegram_mapping or {}, } for check in failed_checks: diff --git a/pyth_observer/event.py b/pyth_observer/event.py index a630247..5e9567a 100644 --- a/pyth_observer/event.py +++ b/pyth_observer/event.py @@ -12,14 +12,14 @@ from pyth_observer.check import Check from pyth_observer.check.publisher import PublisherCheck +from pyth_observer.models import Publisher load_dotenv() class Context(TypedDict): network: str - publishers: Dict[str, str] - telegram_mapping: Dict[str, str] + publishers: Dict[str, Publisher] class Event(Protocol): @@ -110,8 +110,13 @@ async def send(self): text = self.check.error_message() # Extract the publisher key from the message text publisher_key = text[text.find("(") + 1 : text.find(")")] - # Retrieve the chat ID from the telegram_mapping using the publisher key - chat_id = self.context["telegram_mapping"].get(publisher_key, None) + publisher = self.context["publishers"].get(publisher_key, None) + # Ensure publisher is not None and has contact_info before accessing telegram_chat_id + chat_id = ( + publisher.contact_info.telegram_chat_id + if publisher is not None and publisher.contact_info is not None + else None + ) if chat_id is None: logger.warning( diff --git a/pyth_observer/models.py b/pyth_observer/models.py new file mode 100644 index 0000000..778fefb --- /dev/null +++ b/pyth_observer/models.py @@ -0,0 +1,16 @@ +import dataclasses +from typing import Optional + + +@dataclasses.dataclass +class ContactInfo: + telegram_chat_id: Optional[str] = None + email: Optional[str] = None + slack_channel_id: Optional[str] = None + + +@dataclasses.dataclass +class Publisher: + key: str + name: str + contact_info: Optional[ContactInfo] = None diff --git a/sample.publishers.yaml b/sample.publishers.yaml index b186cf9..0e6e41d 100644 --- a/sample.publishers.yaml +++ b/sample.publishers.yaml @@ -1,4 +1,15 @@ -{ - "publisher1": "66wJmrBqyykL7m4Erj4Ud29qhsm32DHSTo23zooupJrJ", - "publisher2": "3BkoB5MBSrrnDY7qe694UAuPpeMg7zJnodwbCnayNYzC", -} +- name: publisher1 + key: "66wJmrBqyykL7m4Erj4Ud29qhsm32DHSTo23zooupJrJ" + contact_info: + # Optional fields for contact information + telegram_chat_id: + email: + slack_channel_id: + +- name: publisher2 + key: "3BkoB5MBSrrnDY7qe694UAuPpeMg7zJnodwbCnayNYzC" + contact_info: + # Optional fields for contact information + telegram_chat_id: + email: + slack_channel_id: diff --git a/sample.telegram.yaml b/sample.telegram.yaml deleted file mode 100644 index bead637..0000000 --- a/sample.telegram.yaml +++ /dev/null @@ -1,4 +0,0 @@ -{ - "pyq7ySiH5RvKteu2vdXKC7SNyNDp9vNDkGXdHxSpPtu": "-1111111111", - "G41cdkE63eCTE2KPtFLN9AVNGkVEdnhvSDtS9Ab5XK6G": "-2222222222", -} From 2e749bc1976b9fd8d510912e3e05bbda47fad877 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Tue, 7 May 2024 16:18:06 +0100 Subject: [PATCH 3/5] address comments --- pyth_observer/cli.py | 12 +++++++++++- pyth_observer/event.py | 5 ++--- sample.publishers.yaml | 8 ++++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/pyth_observer/cli.py b/pyth_observer/cli.py index d559cf3..1df6aed 100644 --- a/pyth_observer/cli.py +++ b/pyth_observer/cli.py @@ -8,6 +8,7 @@ from prometheus_client import start_http_server from pyth_observer import Observer, Publisher +from pyth_observer.models import ContactInfo @click.command() @@ -40,7 +41,16 @@ def run(config, publishers, coingecko_mapping, prometheus_port): # Load publishers YAML file and convert to dictionary of Publisher instances publishers_raw = yaml.safe_load(open(publishers, "r")) publishers_ = { - publisher["key"]: Publisher(**publisher) for publisher in publishers_raw + publisher["key"]: Publisher( + key=publisher["key"], + name=publisher["name"], + contact_info=( + ContactInfo(**publisher["contact_info"]) + if "contact_info" in publisher + else None + ), + ) + for publisher in publishers_raw } coingecko_mapping_ = yaml.safe_load(open(coingecko_mapping, "r")) observer = Observer( diff --git a/pyth_observer/event.py b/pyth_observer/event.py index 5e9567a..9759ba6 100644 --- a/pyth_observer/event.py +++ b/pyth_observer/event.py @@ -102,14 +102,13 @@ async def send(self): class TelegramEvent(Event): - def __init__(self, check: Check, context: Context): + def __init__(self, check: PublisherCheck, context: Context): self.check = check self.context = context async def send(self): text = self.check.error_message() - # Extract the publisher key from the message text - publisher_key = text[text.find("(") + 1 : text.find(")")] + publisher_key = self.check.state().public_key.key publisher = self.context["publishers"].get(publisher_key, None) # Ensure publisher is not None and has contact_info before accessing telegram_chat_id chat_id = ( diff --git a/sample.publishers.yaml b/sample.publishers.yaml index 0e6e41d..5608d03 100644 --- a/sample.publishers.yaml +++ b/sample.publishers.yaml @@ -1,15 +1,15 @@ - name: publisher1 - key: "66wJmrBqyykL7m4Erj4Ud29qhsm32DHSTo23zooupJrJ" + key: "FR19oB2ePko2haah8yP4fhTycxitxkVQTxk3tssxX1Ce" contact_info: # Optional fields for contact information - telegram_chat_id: + telegram_chat_id: -4224704640 email: slack_channel_id: - name: publisher2 - key: "3BkoB5MBSrrnDY7qe694UAuPpeMg7zJnodwbCnayNYzC" + key: "DgAK7fPveidN72LCwCF4QjFcYHchBZbtZnjEAtgU1bMX" contact_info: # Optional fields for contact information - telegram_chat_id: + telegram_chat_id: -4224704640 email: slack_channel_id: From 89fbb1025bf3b069c784e3ad3f26af3c7c44ad66 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Tue, 7 May 2024 17:11:22 +0100 Subject: [PATCH 4/5] fix check --- pyth_observer/event.py | 63 ++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/pyth_observer/event.py b/pyth_observer/event.py index 9759ba6..b1aa3ec 100644 --- a/pyth_observer/event.py +++ b/pyth_observer/event.py @@ -102,38 +102,41 @@ async def send(self): class TelegramEvent(Event): - def __init__(self, check: PublisherCheck, context: Context): + def __init__(self, check: Check, context: Context): self.check = check self.context = context async def send(self): - text = self.check.error_message() - publisher_key = self.check.state().public_key.key - publisher = self.context["publishers"].get(publisher_key, None) - # Ensure publisher is not None and has contact_info before accessing telegram_chat_id - chat_id = ( - publisher.contact_info.telegram_chat_id - if publisher is not None and publisher.contact_info is not None - else None - ) - - if chat_id is None: - logger.warning( - f"Telegram chat ID not found for publisher key {publisher_key}" + if self.check.__class__.__bases__ == (PublisherCheck,): + text = self.check.error_message() + publisher_key = self.check.state().public_key.key + publisher = self.context["publishers"].get(publisher_key, None) + # Ensure publisher is not None and has contact_info before accessing telegram_chat_id + chat_id = ( + publisher.contact_info.telegram_chat_id + if publisher is not None and publisher.contact_info is not None + else None ) - return - - telegram_api_url = f"https://api.telegram.org/bot{os.environ['TELEGRAM_BOT_TOKEN']}/sendMessage" - message_data = { - "chat_id": chat_id, - "text": text, - "parse_mode": "Markdown", - } - - async with aiohttp.ClientSession() as session: - async with session.post(telegram_api_url, json=message_data) as response: - if response.status != 200: - response_text = await response.text() - raise RuntimeError( - f"Failed to send Telegram message: {response_text}" - ) + + if chat_id is None: + logger.warning( + f"Telegram chat ID not found for publisher key {publisher_key}" + ) + return + + telegram_api_url = f"https://api.telegram.org/bot{os.environ['TELEGRAM_BOT_TOKEN']}/sendMessage" + message_data = { + "chat_id": chat_id, + "text": text, + "parse_mode": "Markdown", + } + + async with aiohttp.ClientSession() as session: + async with session.post( + telegram_api_url, json=message_data + ) as response: + if response.status != 200: + response_text = await response.text() + logger.error( + f"Failed to send Telegram message: {response_text}" + ) From f6c9f1600dce170bce6daf9d19cfd0130028eb0e Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 9 May 2024 09:31:10 +0100 Subject: [PATCH 5/5] address comments --- README.md | 15 +++++++++++++++ pyth_observer/event.py | 5 ++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a4add22..b59d761 100644 --- a/README.md +++ b/README.md @@ -30,3 +30,18 @@ Event types are configured via environment variables: - `LogEvent` - `LOG_EVENT_LEVEL` - Level to log messages at + +- `TelegramEvent` + - `TELEGRAM_BOT_TOKEN` - API token for the Telegram bot + +## Finding the Telegram Group Chat ID + +To integrate Telegram events with the Observer, you need the Telegram group chat ID. Here's how you can find it: + +1. Open [Telegram Web](https://web.telegram.org). +2. Navigate to the group chat for which you need the ID. +3. Look at the URL in the browser's address bar; it should look something like `https://web.telegram.org/a/#-1111111111`. +4. The group chat ID is the number in the URL, including the `-` sign if present (e.g., `-1111111111`). + +Use this ID in the `publishers.yaml` configuration to correctly set up Telegram events. + diff --git a/pyth_observer/event.py b/pyth_observer/event.py index b1aa3ec..0cb939a 100644 --- a/pyth_observer/event.py +++ b/pyth_observer/event.py @@ -105,6 +105,7 @@ class TelegramEvent(Event): def __init__(self, check: Check, context: Context): self.check = check self.context = context + self.telegram_bot_token = os.environ["TELEGRAM_BOT_TOKEN"] async def send(self): if self.check.__class__.__bases__ == (PublisherCheck,): @@ -124,7 +125,9 @@ async def send(self): ) return - telegram_api_url = f"https://api.telegram.org/bot{os.environ['TELEGRAM_BOT_TOKEN']}/sendMessage" + telegram_api_url = ( + f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage" + ) message_data = { "chat_id": chat_id, "text": text,