Skip to content

Commit ddb476d

Browse files
[App] Application logs in CLI (#13634)
1 parent dc8ff5e commit ddb476d

17 files changed

+398
-4
lines changed

src/lightning_app/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
1313
- Add support to run Lightning apps on Lightning AI BYOC clusters ([#13894](https://github.com/Lightning-AI/lightning/pull/13894))
1414
- Add support for listing Lightning AI apps ([#13987](https://github.com/Lightning-AI/lightning/pull/13987))
1515
- Adds `LightningTrainingComponent`. `LightningTrainingComponent` orchestrates multi-node training in the cloud ([#13830](https://github.com/Lightning-AI/lightning/pull/13830))
16+
- Add support for printing application logs using CLI `lightning show logs <app_name> [components]` ([#13634](https://github.com/Lightning-AI/lightning/pull/13634))
17+
1618

1719
### Changed
1820

src/lightning_app/cli/lightning_cli.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
import click
1010
import requests
11+
import rich
1112
from requests.exceptions import ConnectionError
13+
from rich.color import ANSI_COLOR_NAMES
1214

1315
from lightning_app import __version__ as ver
1416
from lightning_app.cli import cmd_init, cmd_install, cmd_pl_init, cmd_react_ui_init
@@ -18,12 +20,15 @@
1820
from lightning_app.core.constants import get_lightning_cloud_url, LOCAL_LAUNCH_ADMIN_VIEW
1921
from lightning_app.runners.runtime import dispatch
2022
from lightning_app.runners.runtime_type import RuntimeType
23+
from lightning_app.utilities.app_logs import _app_logs_reader
2124
from lightning_app.utilities.cli_helpers import (
2225
_format_input_env_variables,
2326
_retrieve_application_url_and_available_commands,
2427
)
28+
from lightning_app.utilities.cloud import _get_project
2529
from lightning_app.utilities.install_components import register_all_external_components
2630
from lightning_app.utilities.login import Auth
31+
from lightning_app.utilities.network import LightningClient
2732
from lightning_app.utilities.state import headers_for
2833

2934
logger = logging.getLogger(__name__)
@@ -50,9 +55,93 @@ def main():
5055
@click.version_option(ver)
5156
def _main():
5257
register_all_external_components()
58+
59+
60+
# Parent command group for the `lightning show <resource>` sub-commands.
# The docstring below is rendered by click as the group's help text.
@_main.group()
def show():
    """Show given resource."""
5464

5565

66+
@show.command()
@click.argument("app_name", required=False)
@click.argument("components", nargs=-1, required=False)
@click.option("-f", "--follow", required=False, is_flag=True, help="Wait for new logs, to exit use CTRL+C.")
def logs(app_name: str, components: List[str], follow: bool) -> None:
    """Show cloud application logs. By default prints logs for all currently available components.

    Example uses:

        Print all application logs:

            $ lightning show logs my-application


        Print logs only from the flow (no work):

            $ lightning show logs my-application flow


        Print logs only from selected works:

            $ lightning show logs my-application root.work_a root.work_b
    """
    # NOTE: the docstring above is what click prints for `--help`, so it stays user-facing;
    # implementation notes live in comments instead.

    # Imported locally because only this command needs to escape rich console markup.
    from rich.markup import escape

    client = LightningClient()
    project = _get_project(client)

    # Index the project's cloud app instances by name for validation and lookup.
    apps = {
        app.name: app
        for app in client.lightningapp_instance_service_list_lightningapp_instances(project.project_id).lightningapps
    }

    if not apps:
        raise click.ClickException(
            "You don't have any application in the cloud. Please, run an application first with `--cloud`."
        )

    if not app_name:
        raise click.ClickException(
            f"You have not specified any Lightning App. Please select one of available: [{', '.join(apps.keys())}]"
        )

    if app_name not in apps:
        raise click.ClickException(
            f"The Lightning App '{app_name}' does not exist. Please select one of following: [{', '.join(apps.keys())}]"
        )

    selected_app = apps[app_name]

    # Fetch all lightning works from given application.
    # The 'flow' component is somewhat implicit, only one for the whole app,
    # and not listed in the lightningwork API - so we add it directly to the list.
    works = client.lightningwork_service_list_lightningwork(
        project_id=project.project_id, app_id=selected_app.id
    ).lightningworks
    app_component_names = (
        ["flow"] + [f.name for f in selected_app.spec.flow_servers] + [w.name for w in works]
    )

    # No explicit selection means "every component of the app".
    if not components:
        components = app_component_names

    for component in components:
        if component not in app_component_names:
            raise click.ClickException(f"Component '{component}' does not exist in app {app_name}.")

    log_reader = _app_logs_reader(
        client=client,
        project_id=project.project_id,
        app_id=selected_app.id,
        component_names=components,
        follow=follow,
    )

    # One color per component. The offset of one is kept so existing color assignments do not
    # change; the modulo wraps around instead of raising IndexError when there are more
    # components than palette entries.
    rich_colors = list(ANSI_COLOR_NAMES)
    colors = {c: rich_colors[(i + 1) % len(rich_colors)] for i, c in enumerate(components)}

    for component_name, log_event in log_reader:
        date = log_event.timestamp.strftime("%m/%d/%Y %H:%M:%S")
        color = colors[component_name]
        # Log messages are arbitrary user output: escape them so square brackets are printed
        # literally instead of being interpreted (or rejected) as rich console markup.
        rich.print(f"[{color}]{component_name}[/{color}] {date} {escape(log_event.message)}")
143+
144+
56145
@_main.command()
57146
def login():
58147
"""Log in to your Lightning.ai account."""

src/lightning_app/testing/testing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ def fetch_logs() -> str:
318318
)
319319

320320
try:
321-
yield admin_page, view_page, fetch_logs
321+
yield admin_page, view_page, fetch_logs, name
322322
except KeyboardInterrupt:
323323
pass
324324
finally:
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import json
2+
import queue
3+
import sys
4+
from dataclasses import dataclass
5+
from datetime import datetime, timedelta
6+
from json import JSONDecodeError
7+
from threading import Thread
8+
from typing import Iterator, List, Optional, Tuple
9+
10+
import dateutil.parser
11+
from websocket import WebSocketApp
12+
13+
from lightning_app.utilities.logs_socket_api import _LightningLogsSocketAPI
14+
from lightning_app.utilities.network import LightningClient
15+
16+
17+
@dataclass
18+
class _LogEventLabels:
19+
app: str
20+
container: str
21+
filename: str
22+
job: str
23+
namespace: str
24+
node_name: str
25+
pod: str
26+
stream: Optional[str] = None
27+
28+
29+
@dataclass
class _LogEvent:
    """A single log line together with its timestamp and metadata labels.

    Events are placed on a ``queue.PriorityQueue`` as the last element of
    ``(timestamp, component_name, event)`` tuples. When two entries tie on timestamp and
    component name, tuple comparison falls through to the events themselves, so events must
    support ``<``; otherwise the queue raises ``TypeError``.
    """

    message: str
    timestamp: datetime
    labels: _LogEventLabels

    def __lt__(self, other: "_LogEvent") -> bool:
        """Order events chronologically so they can be used as priority-queue tie-breakers."""
        if not isinstance(other, _LogEvent):
            return NotImplemented
        return self.timestamp < other.timestamp
34+
35+
36+
def _push_logevents_to_read_queue_callback(component_name: str, read_queue: queue.PriorityQueue):
    """Build the websocket message handler that feeds ``read_queue``.

    The returned function is meant to be passed as ``on_message_callback`` of
    ``websocket.WebSocketApp``. For every message that carries a ``"message"`` key it puts a
    ``(timestamp, component_name, _LogEvent)`` tuple on ``read_queue``.

    Arguments:
        component_name: Name stored alongside every event pushed by the returned handler.
        read_queue: Queue that receives the parsed events.

    Returns:
        The message handler, taking the websocket app and the raw message string.
    """

    def _on_message(ws_app: WebSocketApp, msg: str):
        # The payload is trusted to follow the API contract: no validation is done here,
        # so a malformed message raises from json / dict access / dataclass construction.
        payload = json.loads(msg)
        # Labels are parsed for every message, including those without a "message" key.
        parsed_labels = _LogEventLabels(**payload["labels"])
        if "message" not in payload:
            return

        parsed_event = _LogEvent(
            message=payload["message"],
            timestamp=dateutil.parser.isoparse(payload["timestamp"]),
            labels=parsed_labels,
        )
        # The timestamp comes first so the priority queue hands events out chronologically.
        read_queue.put((parsed_event.timestamp, component_name, parsed_event))

    return _on_message
55+
56+
57+
def _error_callback(ws_app: WebSocketApp, error: Exception):
    """Report a log-reading error on stderr and close the websocket.

    Arguments:
        ws_app: The websocket app on which the error occurred; it is closed before returning.
        error: The exception to report. Only its exact type is used to pick the description.
    """
    # Lookup is by exact exception type (subclasses are not matched), hence the separate
    # entry for JSONDecodeError even though it derives from ValueError.
    known_causes = {
        KeyError: "Malformed log message, missing key",
        JSONDecodeError: "Malformed log message",
        TypeError: "Malformed log format",
        ValueError: "Malformed date format",
    }
    cause = known_causes.get(type(error), "Unknown")
    print(f"Error while reading logs ({cause})", file=sys.stderr)
    ws_app.close()
66+
67+
68+
def _app_logs_reader(
    client: LightningClient, project_id: str, app_id: str, component_names: List[str], follow: bool
) -> Iterator[Tuple[str, _LogEvent]]:
    """Yield log events of the given app components in timestamp order.

    One websocket is opened per component, each served by its own thread that pushes parsed
    events to a shared priority queue. This generator drains the queue and yields events once
    the user section marker ``<<< BEGIN USER_RUN_FLOW SECTION >>>`` has been seen; only events
    stamped more than half a second after the marker are yielded.

    Arguments:
        client: Lightning client whose ``api_client`` is used to create the log sockets.
        project_id: Project ID.
        app_id: Application ID.
        component_names: Components to read logs from, one socket each.
        follow: When ``False``, stop once no event arrived for one second. When ``True``, keep
            waiting for new events for as long as at least one socket thread is still running.

    Yields:
        ``(component_name, log_event)`` tuples.
    """
    read_queue = queue.PriorityQueue()
    logs_api_client = _LightningLogsSocketAPI(client.api_client)

    # We will use a socket per component
    log_sockets = [
        logs_api_client.create_lightning_logs_socket(
            project_id=project_id,
            app_id=app_id,
            component=component_name,
            on_message_callback=_push_logevents_to_read_queue_callback(component_name, read_queue),
            on_error_callback=_error_callback,
        )
        for component_name in component_names
    ]

    # And each socket on a separate thread pushing log events to the read queue.
    # run_forever() will run until we close() the connection from outside.
    log_threads = [Thread(target=log_socket.run_forever) for log_socket in log_sockets]

    # Establish connections and begin pushing logs to the read queue
    for th in log_threads:
        th.start()

    user_log_start = "<<< BEGIN USER_RUN_FLOW SECTION >>>"
    start_timestamp = None

    # Hand out log events from the queue as they become available
    try:
        while True:
            try:
                # Always poll with a timeout: a blocking get() would wait forever in follow
                # mode once every socket thread has stopped (e.g. closed after an error).
                _, component_name, log_event = read_queue.get(timeout=1.0)
            except queue.Empty:
                # Nothing arrived within the timeout. Without follow we are done; with follow
                # we keep waiting only while some socket can still deliver events.
                if follow and any(th.is_alive() for th in log_threads):
                    continue
                break

            if user_log_start in log_event.message:
                start_timestamp = log_event.timestamp + timedelta(seconds=0.5)

            if start_timestamp and log_event.timestamp > start_timestamp:
                yield component_name, log_event

    except KeyboardInterrupt:
        # User pressed CTRL+C to exit, we should respect that
        pass

    finally:
        # Close connections - it will cause run_forever() to finish -> threads finish as well
        for log_socket in log_sockets:
            log_socket.close()

        # Because all sockets were closed, we can just wait for the threads to finish.
        for th in log_threads:
            th.join()
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from typing import Callable, Optional
2+
from urllib.parse import urlparse
3+
4+
from lightning_cloud.openapi import ApiClient, AuthServiceApi, V1LoginRequest
5+
from websocket import WebSocketApp
6+
7+
from lightning_app.utilities.login import Auth
8+
9+
10+
class _LightningLogsSocketAPI:
    """Factory of websocket apps that stream the logs of a Lightning app component."""

    def __init__(self, api_client: ApiClient):
        """Authenticate the user and prepare the auth service used to obtain socket tokens.

        Arguments:
            api_client: API client whose configured host is used for the log sockets.
        """
        self.api_client = api_client
        self._auth = Auth()
        self._auth.authenticate()
        self._auth_service = AuthServiceApi(api_client)

    def _get_api_token(self) -> str:
        """Log in with the stored username and API key and return the resulting token."""
        token_resp = self._auth_service.auth_service_login(
            body=V1LoginRequest(
                username=self._auth.username,
                api_key=self._auth.api_key,
            )
        )
        return token_resp.token

    @staticmethod
    def _socket_url(host: str, project_id: str, app_id: str, token: str, component: str) -> str:
        """Build the ``wss://`` URL of the logs endpoint for one component.

        Query values are percent-encoded, so a token or component name containing characters
        such as ``&``, ``=`` or ``+`` cannot corrupt the query string. Values made only of
        URL-safe characters produce exactly the same URL as plain concatenation.
        """
        # Imported locally: this is the only place that needs it.
        from urllib.parse import urlencode

        query = urlencode({"token": token, "component": component, "follow": "true"})
        return f"wss://{host}/v1/projects/{project_id}/appinstances/{app_id}/logs?{query}"

    def create_lightning_logs_socket(
        self,
        project_id: str,
        app_id: str,
        component: str,
        on_message_callback: Callable[[WebSocketApp, str], None],
        on_error_callback: Optional[Callable[[WebSocketApp, Exception], None]] = None,
    ) -> WebSocketApp:
        """Creates and returns WebSocketApp to listen to lightning app logs.

        .. code-block:: python
            # Synchronous reading, run_forever() is blocking


            def print_log_msg(ws_app, msg):
                print(msg)


            flow_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "flow", print_log_msg)
            flow_logs_socket.run_forever()

        .. code-block:: python
            # Asynchronous reading (with Threads)


            def print_log_msg(ws_app, msg):
                print(msg)


            flow_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "flow", print_log_msg)
            work_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "work_1", print_log_msg)

            flow_logs_thread = Thread(target=flow_logs_socket.run_forever)
            work_logs_thread = Thread(target=work_logs_socket.run_forever)

            flow_logs_thread.start()
            work_logs_thread.start()
            # .......

            flow_logs_socket.close()
            work_logs_socket.close()

        Arguments:
            project_id: Project ID.
            app_id: Application ID.
            component: Component name eg flow.
            on_message_callback: Callback object which is called when received data.
                It receives the websocket app and the raw message.
            on_error_callback: Callback object which is called when we get error.
                It receives the websocket app and the exception.

        Returns:
            WebSocketApp of the wanted socket
        """
        # A token is requested for every socket that is created.
        _token = self._get_api_token()
        # Only the network location of the configured API host is reused; the scheme is wss.
        clean_ws_host = urlparse(self.api_client.configuration.host).netloc
        socket_url = self._socket_url(
            host=clean_ws_host,
            project_id=project_id,
            app_id=app_id,
            token=_token,
            component=component,
        )

        return WebSocketApp(socket_url, on_message=on_message_callback, on_error=on_error_callback)

0 commit comments

Comments
 (0)