Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions server/config.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
export const keywords = ['zenix', 'zenith', 'ai', 'bot', 'zen'];
export const keywordRegex = /\b(zenix|zenith|ai|bot|zen)\b/i;
export const country = 'Greece';
export const city = 'Athens';
export const timezone = 'Europe/Athens';

export const speed = {
minDelay: 5,
maxDelay: 15,
minDelay: 3,
maxDelay: 8,
speedMethod: 'divide',
speedFactor: 60,
speedFactor: 80,
};

// export const statuses = ["online", "idle", "dnd", "offline"];
Expand Down
25 changes: 14 additions & 11 deletions server/events/message-create/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { reply } from '~/utils/staggered-response';
import { keywords } from '~/config';
import { keywordRegex } from '~/config';
import logger from '~/lib/logger';
import type { WebhookChatMessage } from '~/types';
import { getMessages, getThreadMessages } from '~/lib/discourse';
import { updateStatus } from '~/lib/discourse';
import { generateResponse } from '~/utils/generate-response';
import type { GetSessionResponse } from '~~/client';
import { ratelimit, redisKeys } from '~/lib/kv';
import { requestQueue } from '~/utils/request-queue';

export const name = 'chat_message';
export const once = false;
Expand Down Expand Up @@ -34,9 +35,7 @@ export async function execute(
}

const isOwnDM = isDM && channel.id === myDMId;
const hasKeyword = keywords.some((kw) =>
content.toLowerCase().includes(kw.toLowerCase()),
);
const hasKeyword = keywordRegex.test(content);
const isMentioned = content.includes(`<@${botUser.username}>`);

logger.info(
Expand All @@ -54,11 +53,15 @@ export async function execute(
// thread_id,
// );

const messages = thread_id
? await getThreadMessages(channel.id as number, botUser, thread_id)
: await getMessages(channel.id as number, botUser);
const result = await generateResponse(messages);
await reply(result, payload?.channel.id, thread_id);

logger.info(`replied to ${payload.message.user.username}: ${result}`);
const priority = isDM ? 2 : (isMentioned ? 1 : 0);

await requestQueue.add(async () => {
const messages = thread_id
? await getThreadMessages(channel.id as number, botUser, thread_id)
: await getMessages(channel.id as number, botUser);
const result = await generateResponse(messages);
await reply(result, payload?.channel.id, thread_id);

logger.info(`replied to ${payload.message.user.username}: ${result}`);
}, priority);
}
4 changes: 2 additions & 2 deletions server/lib/discourse/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export async function getMessages(
channel_id,
},
query: {
page_size: 50,
page_size: 25,
},
});

Expand Down Expand Up @@ -69,7 +69,7 @@ export async function getThreadMessages(
thread_id: thread_id,
},
query: {
page_size: 50,
page_size: 25,
},
});

Expand Down
2 changes: 2 additions & 0 deletions server/lib/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ export const ratelimit = new Ratelimit({
export const redisKeys = {
messageCount: (contextId: string) => `ctx:messageCount:${contextId}`,
channelCount: (contextId: string) => `ctx:channelCount:${contextId}`,
responseCache: (messagesHash: string) => `cache:response:${messagesHash}`,
userContext: (userId: string) => `ctx:user:${userId}`,
};
18 changes: 16 additions & 2 deletions server/utils/generate-response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,21 @@ import { type CoreMessage, generateText, tool } from 'ai';
import { z } from 'zod';
import { env } from '~/env';
import { myProvider } from '~/lib/ai/providers';
import { redis, redisKeys } from '~/lib/kv';
import { createHash } from 'crypto';

export const generateResponse = async (
messages: CoreMessage[]
) => {
const messagesHash = createHash('sha256')
.update(JSON.stringify(messages.slice(-10)))
.digest('hex');

const cachedResponse = await redis.get(redisKeys.responseCache(messagesHash));
if (cachedResponse && typeof cachedResponse === 'string') {
return cachedResponse;
}

const { text } = await generateText({
model: myProvider.languageModel('chat-model'),
system: `Your real name is Zenith, but you go by Zenix, a chill, but slightly sassy guy at the age of 21.
Expand Down Expand Up @@ -79,6 +90,9 @@ export const generateResponse = async (
},
});

// Convert markdown to Slack markdown format
return text.replace(/\[(.*?)\]\((.*?)\)/g, '<$2|$1>').replace(/\*\*/g, '*');
const result = text.replace(/\[(.*?)\]\((.*?)\)/g, '<$2|$1>').replace(/\*\*/g, '*');

await redis.setex(redisKeys.responseCache(messagesHash), 300, result);

return result;
};
83 changes: 83 additions & 0 deletions server/utils/message-batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
interface BatchRequest {
channel_id: number;
thread_id?: number | null;
messages: string[];
resolve: (value: void) => void;
reject: (error: Error) => void;
}

class MessageBatcher {
private batches = new Map<string, BatchRequest[]>();
private timers = new Map<string, NodeJS.Timeout>();
private readonly BATCH_DELAY = 100;
private readonly MAX_BATCH_SIZE = 3;

async addToBatch(
channel_id: number,
thread_id: number | null | undefined,
message: string
): Promise<void> {
return new Promise((resolve, reject) => {
const key = `${channel_id}:${thread_id || 'main'}`;

if (!this.batches.has(key)) {
this.batches.set(key, []);
}

const batch = this.batches.get(key)!;
batch.push({
channel_id,
thread_id,
messages: [message],
resolve,
reject
});

if (batch.length >= this.MAX_BATCH_SIZE) {
this.processBatch(key);
} else {
if (this.timers.has(key)) {
clearTimeout(this.timers.get(key)!);
}
this.timers.set(key, setTimeout(() => this.processBatch(key), this.BATCH_DELAY));
}
});
}

private async processBatch(key: string): Promise<void> {
const batch = this.batches.get(key);
if (!batch || batch.length === 0) return;

this.batches.delete(key);
if (this.timers.has(key)) {
clearTimeout(this.timers.get(key)!);
this.timers.delete(key);
}

try {
const { sendMessage } = await import('~~/client');
const { client } = await import('~/lib/discourse/client');

const combinedMessage = batch
.flatMap(req => req.messages)
.join(' ');

await sendMessage({
client,
path: {
channel_id: batch[0].channel_id,
},
body: {
message: combinedMessage,
thread_id: batch[0].thread_id ?? undefined,
},
});

batch.forEach(req => req.resolve());
} catch (error) {
batch.forEach(req => req.reject(error as Error));
}
}
}

export const messageBatcher = new MessageBatcher();
75 changes: 75 additions & 0 deletions server/utils/request-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
interface QueueItem {
id: string;
task: () => Promise<any>;
priority: number;
timestamp: number;
}

class RequestQueue {
private queue: QueueItem[] = [];
private processing = false;
private readonly MAX_CONCURRENT = 2;
private activeCount = 0;

async add<T>(task: () => Promise<T>, priority = 0): Promise<T> {
return new Promise((resolve, reject) => {
const id = Math.random().toString(36).substring(7);

this.queue.push({
id,
task: async () => {
try {
const result = await task();
resolve(result);
return result;
} catch (error) {
reject(error);
throw error;
}
},
priority,
timestamp: Date.now()
});

this.queue.sort((a, b) => {
if (a.priority !== b.priority) {
return b.priority - a.priority;
}
return a.timestamp - b.timestamp;
});

this.process();
});
}

private async process(): Promise<void> {
if (this.processing || this.activeCount >= this.MAX_CONCURRENT || this.queue.length === 0) {
return;
}

this.processing = true;

while (this.queue.length > 0 && this.activeCount < this.MAX_CONCURRENT) {
const item = this.queue.shift();
if (item) {
this.activeCount++;
item.task().finally(() => {
this.activeCount--;
this.process();
});
}
}

this.processing = false;
}

getQueueSize(): number {
return this.queue.length;
}

getActiveCount(): number {
return this.activeCount;
}
}

export const requestQueue = new RequestQueue();
18 changes: 15 additions & 3 deletions server/utils/staggered-response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ import logger from "~/lib/logger";
import { sendMessage } from "~~/client";
import { client } from '~/lib/discourse/client';

const delayCache = new Map<string, number>();

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

function calculateDelay(text: string): number {
if (delayCache.has(text)) {
return delayCache.get(text)!;
}

const { speedMethod, speedFactor } = speedConfig;

const length = text.length;

const baseSeconds = (() => {
switch (speedMethod) {
case "multiply":
Expand All @@ -30,8 +36,14 @@ function calculateDelay(text: string): number {
.filter((w) => /[.!?]$/.test(w)).length;
const extraMs = punctuationCount * 500;

const totalMs = baseSeconds * 1000 + extraMs;
return Math.max(totalMs, 100);
const totalMs = Math.max(baseSeconds * 1000 + extraMs, 100);

if (delayCache.size > 200) {
delayCache.clear();
}
delayCache.set(text, totalMs);

return totalMs;
}

export async function reply(reply: string, channel_id: number, thread_id?: number|null): Promise<void> {
Expand Down
28 changes: 26 additions & 2 deletions server/utils/tokenize-messages.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,42 @@
import nlp from 'compromise';

const sentenceCache = new Map<string, string[]>();
const normalizeCache = new Map<string, string[]>();

export function sentences(text: string): string[] {
return nlp(text)
if (sentenceCache.has(text)) {
return sentenceCache.get(text)!;
}

const result = nlp(text)
.sentences()
.out('array')
.map((s: string) => s.trim());

if (sentenceCache.size > 1000) {
sentenceCache.clear();
}
sentenceCache.set(text, result);
return result;
}

export function normalize(input: string[]): string[] {
return input.map((s) =>
const key = input.join('|');
if (normalizeCache.has(key)) {
return normalizeCache.get(key)!;
}

const result = input.map((s) =>
s
.replace(/\b\w+(?:\s*\([^)]+\))*:\s*/gi, '')
.toLowerCase()
.trim()
.replace(/[.,!?]+$/g, ''),
);

if (normalizeCache.size > 500) {
normalizeCache.clear();
}
normalizeCache.set(key, result);
return result;
}