Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions examples/audio_to_blog_api/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from platogram import Content
from pypandoc import convert_text
from fpdf import FPDF


def format_time(ms):
    """Convert a duration in milliseconds to an ``HH:MM:SS`` string.

    Hours are not capped at 24, so long recordings render as e.g. ``25:01:01``.
    """
    total_seconds = ms // 1000
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"


def content_to_html(content: Content) -> str:
    """Render indexed audio content as an HTML fragment.

    Emits the title, summary and passages, followed by the full
    transcript inside a collapsible ``<details>`` element.

    Args:
        content: Indexed content exposing ``title``, ``summary``,
            ``passages`` and ``transcript`` (chunks with ``time_ms``
            and ``text``) — presumably a platogram ``Content``.

    Returns:
        An HTML fragment string.
    """
    # Build the fragment as a list and join once instead of repeated
    # string concatenation.
    parts = [f"<h2>{content.title}</h2>", f"<h4>{content.summary}</h4>"]
    parts += [f"<p>{paragraph}</p>" for paragraph in content.passages]

    parts.append("<details>")
    parts.append("<summary>Expand for transcript</summary>")

    for chunk in content.transcript:
        # Bug fix: the original emitted mismatched, invalid tags
        # (<h8>...</h7>); h7/h8 do not exist in HTML, so use matched
        # <h6> headings instead.
        parts.append(f"<h6>time: {format_time(chunk.time_ms)}</h6>")
        parts.append(f"<h6>{chunk.text}</h6>")

    parts.append("</details>")
    return "\n".join(parts)


def extract_html(content: str) -> str:
    """Return the substring of *content* from the first ``<`` through the
    last ``>``; the string is returned unchanged when either bracket is
    missing. Used to strip chatter surrounding LLM-produced HTML.
    """
    first = content.find("<")
    last = content.rfind(">")
    if first >= 0 and last >= 0:
        return content[first : last + 1]
    return content


def HTML_to_format(content: str, format_type: str, dest_dir: str) -> str:
    """Convert an HTML string to *format_type* and write it under *dest_dir*.

    PDF output is rendered directly with fpdf; every other format is
    delegated to pandoc. Returns the path of the file that was written.
    """
    dest_file = f"{dest_dir}/blog.{format_type}"
    if format_type != "pdf":
        _ = convert_text(content, format_type, format="html", outputfile=dest_file)
        return dest_file
    pdf = FPDF()
    pdf.add_page()
    # Map fullwidth brackets back to ASCII braces before handing the
    # markup to fpdf's HTML writer (same substitution as the original).
    sanitized = content.replace("【", "{").replace("】", "}")
    pdf.write_html(sanitized)
    pdf.output(dest_file)
    return dest_file
78 changes: 78 additions & 0 deletions examples/audio_to_blog_api/serve.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.responses import StreamingResponse
import logging
from response import content_to_html, extract_html, HTML_to_format
from synthesize import summarize_audio, prompt_content
from tarfile import TarFile
from tempfile import SpooledTemporaryFile
from typing import List


# FastAPI application exposing the audio-to-blog endpoints below.
app = FastAPI()
# Module-level logger; handlers/levels are configured by the hosting process.
logger = logging.getLogger(__name__)


def get_src(
    urls: str | None = Form(None), files: UploadFile | None = File(None)
) -> List[str | SpooledTemporaryFile]:
    """Collect audio sources from an uploaded tar archive and/or a
    comma-separated URL list.

    Args:
        urls: Optional comma-separated list of audio URLs.
        files: Optional uploaded tar archive; each regular-file member
            becomes one source.

    Returns:
        A list of URL strings and/or readable file objects.

    Raises:
        HTTPException: 400 when neither URLs nor files were provided.
    """
    sources: List[str | SpooledTemporaryFile] = []

    if files is not None:
        # NOTE: the TarFile is intentionally not closed here — the
        # extracted ExFileObjects stream from it and would break if the
        # archive were closed before the caller reads them.
        file_tar = TarFile(fileobj=files.file, mode="r")
        # Bug fix: skip directories and other non-file members —
        # extractfile() returns None for them, which would poison the
        # source list with unreadable entries.
        sources = [
            file_tar.extractfile(member)
            for member in file_tar.getmembers()
            if member.isfile()
        ]
    if urls is not None:
        sources += urls.split(",")
    if not sources:
        raise HTTPException(status_code=400, detail="No URL or file found in request")
    return sources


@app.post("/post")
async def generate_post(
    url: str | None = Form(None),
    file: UploadFile | None = File(None),
    prompt: str | None = Form(None),
) -> dict:
    """Generate an HTML blog post from one or more audio sources.

    With a prompt, every source is summarized and the LLM is queried
    twice (answer, then HTML re-rendering of that answer); without one,
    only the first source's summary is rendered by the local template.

    Returns:
        ``{"html": <html fragment>}``.

    Raises:
        HTTPException: 400 when no source was given; 500 on any other
        failure.
    """
    try:
        sources = get_src(url, file)

        if prompt is not None:
            content = [await summarize_audio(src) for src in sources]

            response = await prompt_content(content, prompt)

            # Second pass: ask the model to reformat its own answer as
            # HTML. (Fixed misspelling "acuratelly" in the original prompt.)
            prompt = f"return only html to accurately represent a blog post on the following text:\n{response}"
            response = await prompt_content(content, prompt)

            html_content = extract_html(response)
        else:
            # No prompt: only the first source is rendered.
            content = await summarize_audio(sources[0])
            html_content = content_to_html(content)

        return {"html": html_content}
    except HTTPException:
        # Bug fix: let get_src's 400 propagate instead of being
        # swallowed by the broad handler and re-raised as a 500.
        raise
    except Exception as e:
        logger.exception(e)
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/post/convertHTML")
async def convert_HTML(
    content: str = Form(...), dest_type: str = Form(...)
) -> StreamingResponse:
    """Convert an HTML fragment to *dest_type* and stream the file back.

    Raises:
        HTTPException: 500 when the conversion fails.
    """
    from pathlib import Path

    try:
        # Ensure the cache directory exists — only summarize_audio
        # creates it otherwise, so a fresh deployment would fail here.
        Path(".platogram-cache").mkdir(exist_ok=True)
        # Bug fix: convert eagerly so failures surface as a 500. The
        # original converted inside the generator, where exceptions are
        # raised mid-stream — after headers are sent — and the
        # try/except around the return could never catch them.
        converted_result = HTML_to_format(content, dest_type, ".platogram-cache")
    except Exception as e:
        logger.exception(e)
        raise HTTPException(status_code=500, detail=str(e))

    async def result_generator():
        # Stream the converted file in 1 KiB chunks.
        with open(converted_result, "rb") as fd:
            while chunk := fd.read(1024):
                yield chunk

    return StreamingResponse(result_generator())
90 changes: 90 additions & 0 deletions examples/audio_to_blog_api/synthesize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from hashlib import sha256
from pathlib import Path
import platogram as plato
from platogram.types import Content
import os
from tarfile import ExFileObject
from tempfile import TemporaryDirectory, SpooledTemporaryFile
from typing import Literal
from urllib.parse import urlparse


# Directory where indexed Content is cached as JSON, keyed by source hash.
CACHE_DIR = Path("./.platogram-cache")
# TODO: handle cache rotation/eviction — the cache currently grows without bound.


def make_file_name(src: str | SpooledTemporaryFile | ExFileObject) -> str:
if not isinstance(src, str):
src = src.read()
else:
src = src.encode()
hash = sha256(src).hexdigest()
return hash


def is_uri(src: str) -> bool:
    """True when *src* parses as a URI with a scheme, host, and
    non-empty path; False otherwise (including on parse errors)."""
    try:
        parsed = urlparse(src)
    except Exception:
        return False
    return bool(parsed.scheme and parsed.netloc and parsed.path)


async def get_audio_url(
src: str | SpooledTemporaryFile | ExFileObject, temp_dir: str | None
) -> str:
if isinstance(src, str) and is_uri(src):
return src
else:
dest_file = f"{temp_dir}/{sha256(src.read()).hexdigest()}"
src.seek(0)
with open(dest_file, "wb") as content:
content.write(src.read())
return f"file://{os.path.abspath(dest_file)}"


async def summarize_audio(
    src: str | SpooledTemporaryFile,
    anthropic_model: str = "claude-3-5-sonnet",
    assembly_ai_model: str = "best",
) -> Content:
    """Transcribe and index an audio source, caching the result as JSON.

    Args:
        src: Audio URL or readable file object.
        anthropic_model: Anthropic model used for indexing.
        assembly_ai_model: AssemblyAI model used for transcription.

    Returns:
        The indexed Content (served from CACHE_DIR when available).

    Requires the ANTHROPIC_API_KEY and ASSEMBLYAI_API_KEY environment
    variables.
    """
    llm = plato.llm.get_model(
        f"anthropic/{anthropic_model}", os.environ["ANTHROPIC_API_KEY"]
    )
    asr = plato.asr.get_model(
        f"assembly-ai/{assembly_ai_model}", os.environ["ASSEMBLYAI_API_KEY"]
    )

    CACHE_DIR.mkdir(exist_ok=True)

    cache_file = CACHE_DIR / f"{make_file_name(src)}.json"
    # Defensive rewind: hashing may have consumed a file-like source,
    # which would make the later upload/spill read empty bytes.
    if not isinstance(src, str):
        src.seek(0)

    if cache_file.exists():
        # Path.read_text opens and closes the handle — the original
        # used bare open(...).read() and leaked the file object.
        content = Content.model_validate_json(cache_file.read_text())
    else:
        with TemporaryDirectory() as temp_dir:
            url = await get_audio_url(src, temp_dir)
            transcript = plato.extract_transcript(url, asr)

            content = plato.index(transcript, llm)
        # write_text likewise closes the handle (original leaked it).
        cache_file.write_text(content.model_dump_json(indent=2))

    return content


async def prompt_content(
    content: list[Content],
    prompt: str,
    anthropic_model: str = "claude-3-5-sonnet",
    context_size: Literal["small", "medium", "large"] = "small",
) -> str:
    """Run *prompt* against the given Content context via an Anthropic
    model and return the model's text response.

    Requires the ANTHROPIC_API_KEY environment variable.
    """
    model_id = f"anthropic/{anthropic_model}"
    llm = plato.llm.get_model(model_id, os.environ["ANTHROPIC_API_KEY"])
    return llm.prompt(prompt=prompt, context=content, context_size=context_size)