From b41ff7d8a3789fc0aa4e8691d8bbc38f7abda55b Mon Sep 17 00:00:00 2001 From: David Thorpe Date: Fri, 7 Feb 2025 19:36:16 +0100 Subject: [PATCH] Added telegram as a test --- cmd/llm/chat2.go | 160 +++++++++++++++++++++++++++++++++++ cmd/llm/main.go | 1 + context.go | 9 +- pkg/internal/impl/session.go | 12 +++ pkg/ui/telegram/opt.go | 50 +++++++++++ pkg/ui/telegram/telegram.go | 131 +++++++++++++++++++++++++--- 6 files changed, 350 insertions(+), 13 deletions(-) create mode 100644 cmd/llm/chat2.go create mode 100644 pkg/ui/telegram/opt.go diff --git a/cmd/llm/chat2.go b/cmd/llm/chat2.go new file mode 100644 index 0000000..641e7e4 --- /dev/null +++ b/cmd/llm/chat2.go @@ -0,0 +1,160 @@ +package main + +import ( + "context" + "errors" + "log" + "sync" + "time" + + // Packages + llm "github.com/mutablelogic/go-llm" + telegram "github.com/mutablelogic/go-llm/pkg/ui/telegram" +) + +//////////////////////////////////////////////////////////////////////////////// +// TYPES + +type Chat2Cmd struct { + Model string `arg:"" help:"Model name"` + Token string `env:"TELEGRAM_TOKEN" help:"Telegram token" required:""` +} + +type Server struct { + sync.RWMutex + *telegram.Client + + // Model and toolkit + toolkit llm.ToolKit + model llm.Model + + // Map of active sessions + sessions map[string]llm.Context +} + +//////////////////////////////////////////////////////////////////////////////// +// LIFECYCLE + +func NewTelegramServer(token string, model llm.Model, toolkit llm.ToolKit, opts ...telegram.Opt) (*Server, error) { + server := new(Server) + server.sessions = make(map[string]llm.Context) + server.model = model + server.toolkit = toolkit + + // Create a new telegram client + opts = append(opts, telegram.WithCallback(server.receive)) + if telegram, err := telegram.New(token, opts...); err != nil { + return nil, err + } else { + server.Client = telegram + } + + // Return success + return server, nil +} + +//////////////////////////////////////////////////////////////////////////////// +// PUBLIC METHODS + +func (cmd *Chat2Cmd) Run(globals *Globals) error { + return run(globals, cmd.Model, func(ctx context.Context, model llm.Model) error { + server, err := NewTelegramServer(cmd.Token, model, globals.toolkit, telegram.WithDebug(globals.Debug)) + if err != nil { + return err + } + + log.Printf("Running Telegram bot %q\n", server.Client.Name()) + + var result error + var wg sync.WaitGroup + wg.Add(2) + go func(ctx context.Context) { + defer wg.Done() + if err := server.Run(ctx); err != nil { + result = errors.Join(result, err) + } + }(ctx) + go func(ctx context.Context) { + defer wg.Done() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + server.Purge() + } + } + }(ctx) + + // Wait for completion + wg.Wait() + + // Return any errors + return result + }) +} + +// ////////////////////////////////////////////////////////////////////////////// +// PRIVATE METHODS + +func (telegram *Server) Purge() { + telegram.Lock() + defer telegram.Unlock() + for user, session := range telegram.sessions { + if session.SinceLast() > 10*time.Minute { + log.Printf("Purging session for %q\n", user) + delete(telegram.sessions, user) + } + } +} + +func (telegram *Server) session(user string) llm.Context { + telegram.Lock() + defer telegram.Unlock() + if session, exists := telegram.sessions[user]; exists { + return session + } + session := telegram.model.Context( + llm.WithToolKit(telegram.toolkit), + llm.WithSystemPrompt("Please reply to messages in markdown format."), + ) + telegram.sessions[user] = session + return session +} + +func (telegram *Server) receive(ctx context.Context, msg telegram.Message) error { + // Get an active session + session := telegram.session(msg.Sender()) + + // Process the message + text := msg.Text() + text += "\n\nPlease reply in markdown format." + if err := session.FromUser(ctx, text); err != nil { + return err + } + + // Run tool calls + for { + calls := session.ToolCalls(0) + if len(calls) == 0 { + break + } + if text := session.Text(0); text != "" { + msg.Reply(ctx, text, false) + } else { + msg.Reply(ctx, "_Gathering information_", true) + } + + results, err := telegram.toolkit.Run(ctx, calls...) + if err != nil { + return err + } else if err := session.FromTool(ctx, results...); err != nil { + return err + } + } + + // Reply with the text + return msg.Reply(ctx, session.Text(0), true) +} diff --git a/cmd/llm/main.go b/cmd/llm/main.go index ce6b532..777d668 100644 --- a/cmd/llm/main.go +++ b/cmd/llm/main.go @@ -78,6 +78,7 @@ type CLI struct { // Commands Download DownloadModelCmd `cmd:"" help:"Download a model"` Chat ChatCmd `cmd:"" help:"Start a chat session"` + Chat2 Chat2Cmd `cmd:"" help:"Start a chat session (2)"` Complete CompleteCmd `cmd:"" help:"Complete a prompt"` Embedding EmbeddingCmd `cmd:"" help:"Generate an embedding"` Version VersionCmd `cmd:"" help:"Print the version of this tool"` diff --git a/context.go b/context.go index db5dd12..8f11999 100644 --- a/context.go +++ b/context.go @@ -1,6 +1,9 @@ package llm -import "context" +import ( + "context" + "time" +) ////////////////////////////////////////////////////////////////// // TYPES @@ -44,4 +47,8 @@ type Context interface { // Generate a response from a tool, passing the results // from the tool call FromTool(context.Context, ...ToolResult) error + + // Return the duration since the last completion was made + // or zero + SinceLast() time.Duration } diff --git a/pkg/internal/impl/session.go b/pkg/internal/impl/session.go index d56846e..af595a3 100644 --- a/pkg/internal/impl/session.go +++ b/pkg/internal/impl/session.go @@ -3,6 +3,7 @@ package impl import ( "context" "encoding/json" + "time" // Packages "github.com/mutablelogic/go-llm" @@ -37,6 +38,7 @@ type session struct { opts []llm.Opt // Options to apply to the session seq []llm.Completion // Sequence of messages factory MessageFactory // Factory for generating messages + last time.Time // Last completion time } var _ llm.Context = (*session)(nil) @@ -129,10 +131,20 @@ func (session *session) chat(ctx context.Context, messages ...llm.Completion) er // Append the first choice session.Append(completion.Choice(0)) + // Update the last completion time + session.last = time.Now() + // Success return nil } +func (session *session) SinceLast() time.Duration { + if len(session.seq) == 0 || session.last.IsZero() { + return 0 + } + return time.Since(session.last) +} + /////////////////////////////////////////////////////////////////////////////// // PUBLIC METHODS - COMPLETION diff --git a/pkg/ui/telegram/opt.go b/pkg/ui/telegram/opt.go new file mode 100644 index 0000000..abd0569 --- /dev/null +++ b/pkg/ui/telegram/opt.go @@ -0,0 +1,50 @@ +package telegram + +import "context" + +/////////////////////////////////////////////////////////////////////////////// +// TYPES + +// A generic option type, which can set options on an agent or session +type Opt func(*opts) error + +// set of options +type opts struct { + token string + callback CallbackFunc + debug bool +} + +type CallbackFunc func(context.Context, Message) error + +//////////////////////////////////////////////////////////////////////////////// +// LIFECYCLE + +// applyOpts returns a structure of options +func applyOpts(token string, opt ...Opt) (*opts, error) { + o := new(opts) + o.token = token + for _, opt := range opt { + if err := opt(o); err != nil { + return nil, err + } + } + return o, nil +} + +//////////////////////////////////////////////////////////////////////////////// +// PUBLIC METHODS + +func WithCallback(fn CallbackFunc) Opt { + return func(o *opts) error { + o.callback = fn + return nil + } +} + +func WithDebug(v bool) Opt { + return func(o *opts) error { + o.debug = v + return nil + } +} diff --git a/pkg/ui/telegram/telegram.go b/pkg/ui/telegram/telegram.go index 052161a..e4b11d0 100644 --- a/pkg/ui/telegram/telegram.go +++ b/pkg/ui/telegram/telegram.go @@ -2,30 +2,54 @@ package telegram import ( "context" - "fmt" + "encoding/json" + "errors" + "log" // Packages telegram "github.com/go-telegram-bot-api/telegram-bot-api/v5" + llm "github.com/mutablelogic/go-llm" ) ///////////////////////////////////////////////////////////////////// // TYPES -type t struct { +type Client struct { *telegram.BotAPI + callback CallbackFunc +} + +type Message interface { + Sender() string + Text() string + Typing() error + Reply(context.Context, string, bool) error +} + +type message struct { + client *Client + sender string + text string + chatid int64 + messageid int } ///////////////////////////////////////////////////////////////////// // LIFECYCLE -func NewTelegram(token string) (*t, error) { - bot, err := telegram.NewBotAPI(token) +func New(token string, opts ...Opt) (*Client, error) { + opt, err := applyOpts(token, opts...) + if err != nil { + return nil, err + } + bot, err := telegram.NewBotAPI(opt.token) + bot.Debug = opt.debug if err != nil { return nil, err } // Create a new telegram instance - telegram := &t{bot} + telegram := &Client{bot, opt.callback} // Return the instance return telegram, nil @@ -34,16 +58,24 @@ func NewTelegram(token string) (*t, error) { ///////////////////////////////////////////////////////////////////// // PUBLIC METHODS -func (t *t) Run(ctx context.Context) error { - updates := t.GetUpdatesChan(telegram.NewUpdate(0)) +func (t *Client) Name() string { + return t.Self.UserName +} + +func (t *Client) Run(ctx context.Context) error { + config := telegram.NewUpdate(0) + config.Timeout = 60 + updates := t.GetUpdatesChan(config) FOR_LOOP: for { select { case <-ctx.Done(): break FOR_LOOP case evt := <-updates: - if evt.Message != nil && !evt.Message.IsCommand() { - t.handleMessage(evt.Message) + if evt.Message == nil { + // NO-OP + } else if err := t.handleMessage(ctx, evt.Message); err != nil { + log.Printf("Error: %v\n", err) } } } @@ -55,7 +87,82 @@ FOR_LOOP: ///////////////////////////////////////////////////////////////////// // PRIVATE METHODS -func (t *t) handleMessage(update *telegram.Message) { - fmt.Println("Received message from", update.From.UserName) - fmt.Println(" => ", update.Text) +func (t *Client) purgeIdleSessions() { + // TODO +} + +func (t *Client) handleMessage(ctx context.Context, update *telegram.Message) error { + // Check for command + if command := update.Command(); command != "" { + return t.handleCommand(ctx, command, update.CommandArguments()) + } + + // Check callback + if t.callback == nil { + return llm.ErrInternalServerError.With("No callback") + } + + // Make a new message + message := &message{ + client: t, + sender: update.From.UserName, + text: update.Text, + chatid: update.Chat.ID, + messageid: update.MessageID, + } + + // Callback + if err := t.callback(ctx, message); err != nil { + return errors.Join(err, message.Reply(ctx, err.Error(), false)) + } + + // Return success + return nil +} + +func (t *Client) handleCommand(ctx context.Context, cmd, args string) error { + // TODO + return nil +} + +///////////////////////////////////////////////////////////////////// +// MESSAGE + +func (message message) Sender() string { + return message.sender +} + +func (message message) Text() string { + return message.text +} + +func (message message) Typing() error { + action := telegram.NewChatAction(message.chatid, telegram.ChatTyping) + _, err := message.client.Send(action) + return err +} + +func (message message) Reply(ctx context.Context, text string, markdown bool) error { + mode := telegram.ModeMarkdownV2 + text = telegram.EscapeText(mode, text) + msg := telegram.NewMessage(message.chatid, text) + msg.ReplyToMessageID = message.messageid + msg.ParseMode = mode + _, err := message.client.Send(msg) + return err +} + +func (message message) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]interface{}{ + "sender": message.sender, + "text": message.text, + }) +} + +func (message message) String() string { + data, err := json.MarshalIndent(message, "", " ") + if err != nil { + return err.Error() + } + return string(data) }