diff --git a/server/config.ts b/server/config.ts
index 6cd5be4..820f8b4 100644
--- a/server/config.ts
+++ b/server/config.ts
@@ -1,13 +1,14 @@
 export const keywords = ['zenix', 'zenith', 'ai', 'bot', 'zen'];
+export const keywordRegex = /\b(zenix|zenith|ai|bot|zen)\b/i;
 export const country = 'Greece';
 export const city = 'Athens';
 export const timezone = 'Europe/Athens';
 
 export const speed = {
-  minDelay: 5,
-  maxDelay: 15,
+  minDelay: 3,
+  maxDelay: 8,
   speedMethod: 'divide',
-  speedFactor: 60,
+  speedFactor: 80,
 };
 
 // export const statuses = ["online", "idle", "dnd", "offline"];
diff --git a/server/events/message-create/index.ts b/server/events/message-create/index.ts
index 3b8be04..f834b6c 100644
--- a/server/events/message-create/index.ts
+++ b/server/events/message-create/index.ts
@@ -1,5 +1,5 @@
 import { reply } from '~/utils/staggered-response';
-import { keywords } from '~/config';
+import { keywordRegex } from '~/config';
 import logger from '~/lib/logger';
 import type { WebhookChatMessage } from '~/types';
 import { getMessages, getThreadMessages } from '~/lib/discourse';
@@ -7,6 +7,7 @@ import { updateStatus } from '~/lib/discourse';
 import { generateResponse } from '~/utils/generate-response';
 import type { GetSessionResponse } from '~~/client';
 import { ratelimit, redisKeys } from '~/lib/kv';
+import { requestQueue } from '~/utils/request-queue';
 
 export const name = 'chat_message';
 export const once = false;
@@ -34,9 +35,7 @@ export async function execute(
   }
 
   const isOwnDM = isDM && channel.id === myDMId;
-  const hasKeyword = keywords.some((kw) =>
-    content.toLowerCase().includes(kw.toLowerCase()),
-  );
+  const hasKeyword = keywordRegex.test(content);
   const isMentioned = content.includes(`<@${botUser.username}>`);
 
   logger.info(
@@ -54,11 +53,15 @@ export async function execute(
   //   thread_id,
   // );
 
-  const messages = thread_id
-    ? await getThreadMessages(channel.id as number, botUser, thread_id)
-    : await getMessages(channel.id as number, botUser);
-  const result = await generateResponse(messages);
-  await reply(result, payload?.channel.id, thread_id);
-
-  logger.info(`replied to ${payload.message.user.username}: ${result}`);
+  const priority = isDM ? 2 : (isMentioned ? 1 : 0);
+
+  await requestQueue.add(async () => {
+    const messages = thread_id
+      ? await getThreadMessages(channel.id as number, botUser, thread_id)
+      : await getMessages(channel.id as number, botUser);
+    const result = await generateResponse(messages);
+    await reply(result, payload?.channel.id, thread_id);
+
+    logger.info(`replied to ${payload.message.user.username}: ${result}`);
+  }, priority);
 }
diff --git a/server/lib/discourse/messages.ts b/server/lib/discourse/messages.ts
index 6196419..9623fa3 100644
--- a/server/lib/discourse/messages.ts
+++ b/server/lib/discourse/messages.ts
@@ -16,7 +16,7 @@ export async function getMessages(
       channel_id,
     },
     query: {
-      page_size: 50,
+      page_size: 25,
     },
   });
 
@@ -69,7 +69,7 @@ export async function getThreadMessages(
       thread_id: thread_id,
     },
     query: {
-      page_size: 50,
+      page_size: 25,
     },
   });
 
diff --git a/server/lib/kv.ts b/server/lib/kv.ts
index 02258a2..9560a26 100644
--- a/server/lib/kv.ts
+++ b/server/lib/kv.ts
@@ -17,4 +17,6 @@ export const ratelimit = new Ratelimit({
 export const redisKeys = {
   messageCount: (contextId: string) => `ctx:messageCount:${contextId}`,
   channelCount: (contextId: string) => `ctx:channelCount:${contextId}`,
+  responseCache: (messagesHash: string) => `cache:response:${messagesHash}`,
+  userContext: (userId: string) => `ctx:user:${userId}`,
 };
\ No newline at end of file
diff --git a/server/utils/generate-response.ts b/server/utils/generate-response.ts
index 3c6ad38..220cf5d 100644
--- a/server/utils/generate-response.ts
+++ b/server/utils/generate-response.ts
@@ -3,10 +3,21 @@ import { type CoreMessage, generateText, tool } from 'ai';
 import { z } from 'zod';
 import { env } from '~/env';
 import { myProvider } from '~/lib/ai/providers';
+import { redis, redisKeys } from '~/lib/kv';
+import { createHash } from 'crypto';
 
 export const generateResponse = async (
   messages: CoreMessage[]
 ) => {
+  const messagesHash = createHash('sha256')
+    .update(JSON.stringify(messages.slice(-10)))
+    .digest('hex');
+
+  const cachedResponse = await redis.get(redisKeys.responseCache(messagesHash));
+  if (cachedResponse && typeof cachedResponse === 'string') {
+    return cachedResponse;
+  }
+
   const { text } = await generateText({
     model: myProvider.languageModel('chat-model'),
     system: `Your real name is Zenith, but you go by Zenix, a chill, but slightly sassy guy at the age of 21.
@@ -79,6 +90,9 @@ export const generateResponse = async (
     },
   });
 
-  // Convert markdown to Slack markdown format
-  return text.replace(/\[(.*?)\]\((.*?)\)/g, '<$2|$1>').replace(/\*\*/g, '*');
+  const result = text.replace(/\[(.*?)\]\((.*?)\)/g, '<$2|$1>').replace(/\*\*/g, '*');
+
+  await redis.setex(redisKeys.responseCache(messagesHash), 300, result);
+
+  return result;
 };
diff --git a/server/utils/message-batch.ts b/server/utils/message-batch.ts
new file mode 100644
index 0000000..e91d1a2
--- /dev/null
+++ b/server/utils/message-batch.ts
@@ -0,0 +1,83 @@
+interface BatchRequest {
+  channel_id: number;
+  thread_id?: number | null;
+  messages: string[];
+  resolve: (value: void) => void;
+  reject: (error: Error) => void;
+}
+
+class MessageBatcher {
+  private batches = new Map<string, BatchRequest[]>();
+  private timers = new Map<string, NodeJS.Timeout>();
+  private readonly BATCH_DELAY = 100;
+  private readonly MAX_BATCH_SIZE = 3;
+
+  async addToBatch(
+    channel_id: number,
+    thread_id: number | null | undefined,
+    message: string
+  ): Promise<void> {
+    return new Promise<void>((resolve, reject) => {
+      const key = `${channel_id}:${thread_id || 'main'}`;
+
+      if (!this.batches.has(key)) {
+        this.batches.set(key, []);
+      }
+
+      const batch = this.batches.get(key)!;
+      batch.push({
+        channel_id,
+        thread_id,
+        messages: [message],
+        resolve,
+        reject
+      });
+
+      if (batch.length >= this.MAX_BATCH_SIZE) {
+        this.processBatch(key);
+      } else {
+        if (this.timers.has(key)) {
+          clearTimeout(this.timers.get(key)!);
+        }
+        this.timers.set(key, setTimeout(() => this.processBatch(key), this.BATCH_DELAY));
+      }
+    });
+  }
+
+  private async processBatch(key: string): Promise<void> {
+    const batch = this.batches.get(key);
+    if (!batch || batch.length === 0) return;
+
+    this.batches.delete(key);
+    if (this.timers.has(key)) {
+      clearTimeout(this.timers.get(key)!);
+      this.timers.delete(key);
+    }
+
+    try {
+      const { sendMessage } = await import('~~/client');
+      const { client } = await import('~/lib/discourse/client');
+
+      const combinedMessage = batch
+        .flatMap(req => req.messages)
+        .join(' ');
+
+      await sendMessage({
+        client,
+        path: {
+          channel_id: batch[0].channel_id,
+        },
+        body: {
+          message: combinedMessage,
+          thread_id: batch[0].thread_id ?? undefined,
+        },
+      });
+
+      batch.forEach(req => req.resolve());
+    } catch (error) {
+      batch.forEach(req => req.reject(error as Error));
+    }
+  }
+}
+
+export const messageBatcher = new MessageBatcher();
\ No newline at end of file
diff --git a/server/utils/request-queue.ts b/server/utils/request-queue.ts
new file mode 100644
index 0000000..44c908c
--- /dev/null
+++ b/server/utils/request-queue.ts
@@ -0,0 +1,75 @@
+interface QueueItem {
+  id: string;
+  task: () => Promise<unknown>;
+  priority: number;
+  timestamp: number;
+}
+
+class RequestQueue {
+  private queue: QueueItem[] = [];
+  private processing = false;
+  private readonly MAX_CONCURRENT = 2;
+  private activeCount = 0;
+
+  async add<T>(task: () => Promise<T>, priority = 0): Promise<T> {
+    return new Promise<T>((resolve, reject) => {
+      const id = Math.random().toString(36).substring(7);
+
+      this.queue.push({
+        id,
+        task: async () => {
+          try {
+            const result = await task();
+            resolve(result);
+            return result;
+          } catch (error) {
+            reject(error);
+            throw error;
+          }
+        },
+        priority,
+        timestamp: Date.now()
+      });
+
+      this.queue.sort((a, b) => {
+        if (a.priority !== b.priority) {
+          return b.priority - a.priority;
+        }
+        return a.timestamp - b.timestamp;
+      });
+
+      this.process();
+    });
+  }
+
+  private async process(): Promise<void> {
+    if (this.processing || this.activeCount >= this.MAX_CONCURRENT || this.queue.length === 0) {
+      return;
+    }
+
+    this.processing = true;
+
+    while (this.queue.length > 0 && this.activeCount < this.MAX_CONCURRENT) {
+      const item = this.queue.shift();
+      if (item) {
+        this.activeCount++;
+        item.task().finally(() => {
+          this.activeCount--;
+          this.process();
+        });
+      }
+    }
+
+    this.processing = false;
+  }
+
+  getQueueSize(): number {
+    return this.queue.length;
+  }
+
+  getActiveCount(): number {
+    return this.activeCount;
+  }
+}
+
+export const requestQueue = new RequestQueue();
\ No newline at end of file
diff --git a/server/utils/staggered-response.ts b/server/utils/staggered-response.ts
index e7bfa83..e4c042b 100644
--- a/server/utils/staggered-response.ts
+++ b/server/utils/staggered-response.ts
@@ -4,12 +4,18 @@ import logger from "~/lib/logger";
 import { sendMessage } from "~~/client";
 import { client } from '~/lib/discourse/client';
 
+const delayCache = new Map<string, number>();
+
 const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
 
 function calculateDelay(text: string): number {
+  if (delayCache.has(text)) {
+    return delayCache.get(text)!;
+  }
+
   const { speedMethod, speedFactor } = speedConfig;
-  const length = text.length;
+
   const baseSeconds = (() => {
     switch (speedMethod) {
       case "multiply":
@@ -30,8 +36,14 @@ function calculateDelay(text: string): number {
     .filter((w) => /[.!?]$/.test(w)).length;
   const extraMs = punctuationCount * 500;
 
-  const totalMs = baseSeconds * 1000 + extraMs;
-  return Math.max(totalMs, 100);
+  const totalMs = Math.max(baseSeconds * 1000 + extraMs, 100);
+
+  if (delayCache.size > 200) {
+    delayCache.clear();
+  }
+  delayCache.set(text, totalMs);
+
+  return totalMs;
 }
 
 export async function reply(reply: string, channel_id: number, thread_id?: number|null): Promise<void> {
diff --git a/server/utils/tokenize-messages.ts b/server/utils/tokenize-messages.ts
index 6cb711c..45bfc00 100644
--- a/server/utils/tokenize-messages.ts
+++ b/server/utils/tokenize-messages.ts
@@ -1,18 +1,42 @@
 import nlp from 'compromise';
 
+const sentenceCache = new Map<string, string[]>();
+const normalizeCache = new Map<string, string[]>();
+
 export function sentences(text: string): string[] {
-  return nlp(text)
+  if (sentenceCache.has(text)) {
+    return sentenceCache.get(text)!;
+  }
+
+  const result = nlp(text)
     .sentences()
     .out('array')
     .map((s: string) => s.trim());
+
+  if (sentenceCache.size > 1000) {
+    sentenceCache.clear();
+  }
+  sentenceCache.set(text, result);
+  return result;
 }
 
 export function normalize(input: string[]): string[] {
-  return input.map((s) =>
+  const key = input.join('|');
+  if (normalizeCache.has(key)) {
+    return normalizeCache.get(key)!;
+  }
+
+  const result = input.map((s) =>
     s
       .replace(/\b\w+(?:\s*\([^)]+\))*:\s*/gi, '')
       .toLowerCase()
      .trim()
       .replace(/[.,!?]+$/g, ''),
  );
+
+  if (normalizeCache.size > 500) {
+    normalizeCache.clear();
+  }
+  normalizeCache.set(key, result);
+  return result;
 }