Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cb233c6
Added cloud app logs to CLI commands
adam-lightning Jul 12, 2022
fc6d578
Removed overly descriptive comments. Minor PR improvements.
adam-lightning Jul 12, 2022
4048fd5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 13, 2022
e952ed1
Merge branch 'master' into app-logs-in-cli
adam-lightning Jul 18, 2022
624441c
Merge branch 'app-logs-in-cli' of github.com:Lightning-AI/lightning i…
adam-lightning Jul 18, 2022
86926a1
Merge branch 'master' into app-logs-in-cli
adam-lightning Jul 26, 2022
1c8102f
up
adam-lightning Jul 26, 2022
30c71c4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 26, 2022
4d73bba
update
adam-lightning Jul 27, 2022
b5ad673
Merge branch 'app-logs-in-cli' of github.com:Lightning-AI/lightning i…
adam-lightning Jul 27, 2022
bb6fa12
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 27, 2022
01a2b24
update
adam-lightning Jul 27, 2022
fca96a1
Merge branch 'app-logs-in-cli' of github.com:Lightning-AI/lightning i…
adam-lightning Jul 27, 2022
265a0f6
update tests
adam-lightning Jul 27, 2022
1312101
fix test
adam-lightning Jul 27, 2022
d7b76fc
Merge branch 'master' into app-logs-in-cli
adam-lightning Aug 4, 2022
5701216
Merge branch 'master' into app-logs-in-cli
tchaton Aug 5, 2022
4b01eaa
Update src/lightning_app/cli/lightning_cli.py
adam-lightning Aug 5, 2022
0482951
Update src/lightning_app/cli/lightning_cli.py
adam-lightning Aug 5, 2022
4d36b70
Update src/lightning_app/cli/lightning_cli.py
adam-lightning Aug 5, 2022
ad6ebb4
Update src/lightning_app/cli/lightning_cli.py
adam-lightning Aug 5, 2022
da1b699
up
adam-lightning Aug 5, 2022
cdee98c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 5, 2022
054ebdb
update
tchaton Aug 10, 2022
7e01a75
Merge branch 'master' into app-logs-in-cli
tchaton Aug 10, 2022
8095aa1
update
tchaton Aug 10, 2022
688c370
update
tchaton Aug 10, 2022
6a86216
Merge branch 'master' into app-logs-in-cli
tchaton Aug 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/lightning_app/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Add support to run Lightning apps on Lightning AI BYOC clusters ([#13894](https://github.com/Lightning-AI/lightning/pull/13894))
- Add support for listing Lightning AI apps ([#13987](https://github.com/Lightning-AI/lightning/pull/13987))
- Adds `LightningTrainingComponent`. `LightningTrainingComponent` orchestrates multi-node training in the cloud ([#13830](https://github.com/Lightning-AI/lightning/pull/13830))
- Add support for printing application logs using CLI `lightning show logs <app_name> [components]` ([#13634](https://github.com/Lightning-AI/lightning/pull/13634))


### Changed

Expand Down
89 changes: 89 additions & 0 deletions src/lightning_app/cli/lightning_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

import click
import requests
import rich
from requests.exceptions import ConnectionError
from rich.color import ANSI_COLOR_NAMES

from lightning_app import __version__ as ver
from lightning_app.cli import cmd_init, cmd_install, cmd_pl_init, cmd_react_ui_init
Expand All @@ -18,12 +20,15 @@
from lightning_app.core.constants import get_lightning_cloud_url, LOCAL_LAUNCH_ADMIN_VIEW
from lightning_app.runners.runtime import dispatch
from lightning_app.runners.runtime_type import RuntimeType
from lightning_app.utilities.app_logs import _app_logs_reader
from lightning_app.utilities.cli_helpers import (
_format_input_env_variables,
_retrieve_application_url_and_available_commands,
)
from lightning_app.utilities.cloud import _get_project
from lightning_app.utilities.install_components import register_all_external_components
from lightning_app.utilities.login import Auth
from lightning_app.utilities.network import LightningClient
from lightning_app.utilities.state import headers_for

logger = logging.getLogger(__name__)
Expand All @@ -50,9 +55,93 @@ def main():
@click.version_option(ver)
def _main():
register_all_external_components()


@_main.group()
def show():
"""Show given resource."""
pass


@show.command()
@click.argument("app_name", required=False)
@click.argument("components", nargs=-1, required=False)
@click.option("-f", "--follow", required=False, is_flag=True, help="Wait for new logs, to exit use CTRL+C.")
def logs(app_name: str, components: List[str], follow: bool) -> None:
"""Show cloud application logs. By default prints logs for all currently available components.

Example uses:

Print all application logs:

$ lightning show logs my-application


Print logs only from the flow (no work):

$ lightning show logs my-application flow


Print logs only from selected works:

$ lightning show logs my-application root.work_a root.work_b
"""

client = LightningClient()
project = _get_project(client)

apps = {
app.name: app
for app in client.lightningapp_instance_service_list_lightningapp_instances(project.project_id).lightningapps
}

if not apps:
raise click.ClickException(
"You don't have any application in the cloud. Please, run an application first with `--cloud`."
)

if not app_name:
raise click.ClickException(
f"You have not specified any Lightning App. Please select one of available: [{', '.join(apps.keys())}]"
)

if app_name not in apps:
raise click.ClickException(
f"The Lightning App '{app_name}' does not exist. Please select one of following: [{', '.join(apps.keys())}]"
)

# Fetch all lightning works from given application
# 'Flow' component is somewhat implicit, only one for whole app,
# and not listed in lightningwork API - so we add it directly to the list
works = client.lightningwork_service_list_lightningwork(
project_id=project.project_id, app_id=apps[app_name].id
).lightningworks
app_component_names = ["flow"] + [f.name for f in apps[app_name].spec.flow_servers] + [w.name for w in works]

if not components:
components = app_component_names

for component in components:
if component not in app_component_names:
raise click.ClickException(f"Component '{component}' does not exist in app {app_name}.")

log_reader = _app_logs_reader(
client=client,
project_id=project.project_id,
app_id=apps[app_name].id,
component_names=components,
follow=follow,
)

rich_colors = list(ANSI_COLOR_NAMES)
colors = {c: rich_colors[i + 1] for i, c in enumerate(components)}

for component_name, log_event in log_reader:
date = log_event.timestamp.strftime("%m/%d/%Y %H:%M:%S")
color = colors[component_name]
rich.print(f"[{color}]{component_name}[/{color}] {date} {log_event.message}")


@_main.command()
def login():
"""Log in to your Lightning.ai account."""
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/testing/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def fetch_logs() -> str:
)

try:
yield admin_page, view_page, fetch_logs
yield admin_page, view_page, fetch_logs, name
except KeyboardInterrupt:
pass
finally:
Expand Down
125 changes: 125 additions & 0 deletions src/lightning_app/utilities/app_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import json
import queue
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta
from json import JSONDecodeError
from threading import Thread
from typing import Iterator, List, Optional, Tuple

import dateutil.parser
from websocket import WebSocketApp

from lightning_app.utilities.logs_socket_api import _LightningLogsSocketAPI
from lightning_app.utilities.network import LightningClient


@dataclass
class _LogEventLabels:
app: str
container: str
filename: str
job: str
namespace: str
node_name: str
pod: str
stream: Optional[str] = None


@dataclass
class _LogEvent:
message: str
timestamp: datetime
labels: _LogEventLabels


def _push_logevents_to_read_queue_callback(component_name: str, read_queue: queue.PriorityQueue):
"""Pushes _LogEvents from websocket to read_queue.

Returns callback function used with `on_message_callback` of websocket.WebSocketApp.
"""

def callback(ws_app: WebSocketApp, msg: str):
# We strongly trust that the contract on API will hold atm :D
event_dict = json.loads(msg)
labels = _LogEventLabels(**event_dict["labels"])
if "message" in event_dict:
event = _LogEvent(
message=event_dict["message"],
timestamp=dateutil.parser.isoparse(event_dict["timestamp"]),
labels=labels,
)
read_queue.put((event.timestamp, component_name, event))

return callback


def _error_callback(ws_app: WebSocketApp, error: Exception):
errors = {
KeyError: "Malformed log message, missing key",
JSONDecodeError: "Malformed log message",
TypeError: "Malformed log format",
ValueError: "Malformed date format",
}
print(f"Error while reading logs ({errors.get(type(error), 'Unknown')})", file=sys.stderr)
ws_app.close()


def _app_logs_reader(
client: LightningClient, project_id: str, app_id: str, component_names: List[str], follow: bool
) -> Iterator[Tuple[str, _LogEvent]]:

read_queue = queue.PriorityQueue()
logs_api_client = _LightningLogsSocketAPI(client.api_client)

# We will use a socket per component
log_sockets = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably works well for some apps, and is a good start. I just want to point out that an app doesn't need to have a constant amount of "components". These could come and go during runtime of the app. So if you "follow" the logs, that's probably useful to know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is definitely a skateboard version and needs to be improved upon.

logs_api_client.create_lightning_logs_socket(
project_id=project_id,
app_id=app_id,
component=component_name,
on_message_callback=_push_logevents_to_read_queue_callback(component_name, read_queue),
on_error_callback=_error_callback,
)
for component_name in component_names
]

# And each socket on separate thread pushing log event to print queue
# run_forever() will run until we close() the connection from outside
log_threads = [Thread(target=work.run_forever) for work in log_sockets]

# Establish connection and begin pushing logs to the print queue
for th in log_threads:
th.start()

user_log_start = "<<< BEGIN USER_RUN_FLOW SECTION >>>"
start_timestamp = None

# Print logs from queue when log event is available
try:
while True:
_, component_name, log_event = read_queue.get(timeout=None if follow else 1.0)
log_event: _LogEvent

if user_log_start in log_event.message:
start_timestamp = log_event.timestamp + timedelta(seconds=0.5)

if start_timestamp and log_event.timestamp > start_timestamp:
yield component_name, log_event

except queue.Empty:
# Empty is raised by queue.get if timeout is reached. Follow = False case.
pass

except KeyboardInterrupt:
# User pressed CTRL+C to exit, we sould respect that
pass

finally:
# Close connections - it will cause run_forever() to finish -> thread as finishes aswell
for socket in log_sockets:
socket.close()

# Because all socket were closed, we can just wait for threads to finish.
for th in log_threads:
th.join()
95 changes: 95 additions & 0 deletions src/lightning_app/utilities/logs_socket_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from typing import Callable, Optional
from urllib.parse import urlparse

from lightning_cloud.openapi import ApiClient, AuthServiceApi, V1LoginRequest
from websocket import WebSocketApp

from lightning_app.utilities.login import Auth


class _LightningLogsSocketAPI:
def __init__(self, api_client: ApiClient):
self.api_client = api_client
self._auth = Auth()
self._auth.authenticate()
self._auth_service = AuthServiceApi(api_client)

def _get_api_token(self) -> str:
token_resp = self._auth_service.auth_service_login(
body=V1LoginRequest(
username=self._auth.username,
api_key=self._auth.api_key,
)
)
return token_resp.token

@staticmethod
def _socket_url(host: str, project_id: str, app_id: str, token: str, component: str) -> str:
return (
f"wss://{host}/v1/projects/{project_id}/appinstances/{app_id}/logs?"
f"token={token}&component={component}&follow=true"
)

def create_lightning_logs_socket(
self,
project_id: str,
app_id: str,
component: str,
on_message_callback: Callable[[WebSocketApp, str], None],
on_error_callback: Optional[Callable[[Exception, str], None]] = None,
) -> WebSocketApp:
"""Creates and returns WebSocketApp to listen to lightning app logs.

.. code-block:: python
# Synchronous reading, run_forever() is blocking


def print_log_msg(ws_app, msg):
print(msg)


flow_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "flow", print_log_msg)
flow_socket.run_forever()

.. code-block:: python
# Asynchronous reading (with Threads)


def print_log_msg(ws_app, msg):
print(msg)


flow_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "flow", print_log_msg)
work_logs_socket = client.create_lightning_logs_socket("project_id", "app_id", "work_1", print_log_msg)

flow_logs_thread = Thread(target=flow_logs_socket.run_forever)
work_logs_thread = Thread(target=work_logs_socket.run_forever)

flow_logs_thread.start()
work_logs_thread.start()
# .......

flow_logs_socket.close()
work_logs_thread.close()

Arguments:
project_id: Project ID.
app_id: Application ID.
component: Component name eg flow.
on_message_callback: Callback object which is called when received data.
on_error_callback: Callback object which is called when we get error.

Returns:
WebSocketApp of the wanted socket
"""
_token = self._get_api_token()
clean_ws_host = urlparse(self.api_client.configuration.host).netloc
socket_url = self._socket_url(
host=clean_ws_host,
project_id=project_id,
app_id=app_id,
token=_token,
component=component,
)

return WebSocketApp(socket_url, on_message=on_message_callback, on_error=on_error_callback)
Loading