Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 53 additions & 57 deletions apps/rowboat/app/lib/agents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { Message, AssistantMessage, AssistantMessageWithToolCalls, ToolMessage }
// Make everything available as a promise
const PROVIDER_API_KEY = process.env.PROVIDER_API_KEY || process.env.OPENAI_API_KEY || '';
const PROVIDER_BASE_URL = process.env.PROVIDER_BASE_URL || undefined;
const MODEL = process.env.PROVIDER_DEFAULT_MODEL || 'gpt-4o';
const MODEL = process.env.PROVIDER_DEFAULT_MODEL || 'gpt-5';

const openai = createOpenAI({
apiKey: PROVIDER_API_KEY,
Expand Down Expand Up @@ -517,10 +517,10 @@ ${config.description}

## About You

${config.outputVisibility === 'user_facing'
? CONVERSATION_TYPE_INSTRUCTIONS()
: config.type === 'pipeline'
? PIPELINE_TYPE_INSTRUCTIONS()
${config.outputVisibility === 'user_facing'
? CONVERSATION_TYPE_INSTRUCTIONS()
: config.type === 'pipeline'
? PIPELINE_TYPE_INSTRUCTIONS()
: TASK_TYPE_INSTRUCTIONS()}

## Instructions
Expand All @@ -535,12 +535,12 @@ ${CHILD_TRANSFER_RELATED_INSTRUCTIONS}
`;

let { sanitized, entities } = sanitizeTextWithMentions(instructions, workflow, config);

// Remove agent transfer instructions for pipeline agents
if (config.type === 'pipeline') {
sanitized = sanitized.replace(CHILD_TRANSFER_RELATED_INSTRUCTIONS, '');
}

agentLogger.log(`instructions: ${JSON.stringify(sanitized)}`);
agentLogger.log(`mentions: ${JSON.stringify(entities)}`);

Expand All @@ -561,11 +561,7 @@ ${CHILD_TRANSFER_RELATED_INSTRUCTIONS}
name: config.name,
instructions: sanitized,
tools: agentTools,
model: aisdk(openai(config.model)),
// model: config.model,
modelSettings: {
temperature: 0.0,
}
model: aisdk(openai(config.model))
});
agentLogger.log(`created agent`);

Expand Down Expand Up @@ -626,7 +622,7 @@ function getStartOfTurnAgentName(
}
}
return stack;
}
}

logger = logger.child(`getStartOfTurnAgentName`);
const startAgentStack = createAgentCallStack(messages);
Expand All @@ -640,7 +636,7 @@ function getStartOfTurnAgentName(
logger.log(`last agent ${lastAgentName} not found in agent config, returning start agent: ${workflow.startAgent}`);
return workflow.startAgent;
}

// For other agents, check control type
switch (lastAgentConfig.controlType) {
case 'retain':
Expand Down Expand Up @@ -757,7 +753,7 @@ function ensureSystemMessage(logger: PrefixLogger, messages: z.infer<typeof Mess

Basic context:
- The date-time right now is ${new Date().toISOString()}`;

messages[0].content = defaultContext;
logger.log(`updated system message with default context: ${messages[0].content}`);
}
Expand All @@ -781,12 +777,12 @@ function mapConfig(workflow: z.infer<typeof Workflow>): {
...acc,
[prompt.name]: prompt
}), {});

const pipelineConfig: Record<string, z.infer<typeof WorkflowPipeline>> = (workflow.pipelines || []).reduce((acc, pipeline) => ({
...acc,
[pipeline.name]: pipeline
}), {});

return { agentConfig, toolConfig, promptConfig, pipelineConfig };
}

Expand Down Expand Up @@ -815,12 +811,12 @@ function createTools(
): Record<string, Tool> {
const tools: Record<string, Tool> = {};
const toolLogger = logger.child('createTools');

toolLogger.log(`=== CREATING ${Object.keys(toolConfig).length} TOOLS ===`);

for (const [toolName, config] of Object.entries(toolConfig)) {
toolLogger.log(`creating tool: ${toolName} (type: ${config.mockTool ? 'mock' : config.isMcp ? 'mcp' : config.isComposio ? 'composio' : 'webhook'})`);

if (config.mockTool) {
tools[toolName] = createMockTool(logger, config);
toolLogger.log(`✓ created mock tool: ${toolName}`);
Expand All @@ -835,7 +831,7 @@ function createTools(
toolLogger.log(`✓ created webhook tool: ${toolName} (fallback)`);
}
}

toolLogger.log(`=== TOOL CREATION COMPLETE ===`);
return tools;
}
Expand Down Expand Up @@ -869,12 +865,12 @@ function createAgents(
// create agents
for (const [agentName, config] of Object.entries(agentConfig)) {
agentsLogger.log(`creating agent: ${agentName} (type: ${config.outputVisibility}, control: ${config.controlType})`);

// Pipeline agents get special handling:
// - Different instruction template (PIPELINE_TYPE_INSTRUCTIONS)
// - Filtered mentions (tools only, no agents)
// - No agent transfer instructions

const { agent, entities } = createAgent(
logger,
projectId,
Expand All @@ -884,7 +880,7 @@ function createAgents(
promptConfig,
);
agents[agentName] = agent;

// Add pipeline entities to the agent's available mentions (unless it's a pipeline agent itself)
// Pipeline agents cannot reference other agents or pipelines, only tools
let agentEntities = entities;
Expand All @@ -894,7 +890,7 @@ function createAgents(
} else {
agentsLogger.log(`${agentName} (pipeline agent) can reference: ${entities.length} entities only`);
}

mentions[agentName] = agentEntities;
originalInstructions[agentName] = agent.instructions as string;
// handoffs will be set after all agents are created
Expand All @@ -906,17 +902,17 @@ function createAgents(
for (const [agentName, agent] of Object.entries(agents)) {
const connectedAgentNames = (mentions[agentName] || []).filter(e => e.type === 'agent').map(e => e.name);
const connectedPipelineNames = (mentions[agentName] || []).filter(e => e.type === 'pipeline').map(e => e.name);

// Pipeline agents have no agent handoffs (filtered out in validatePipelineAgentMentions)
// They only have tool connections, no agent transfers allowed

// Filter out pipeline agents from being handoff targets
// Only allow handoffs to non-pipeline agents
const validAgentNames = connectedAgentNames.filter(name => {
const targetConfig = agentConfig[name];
return targetConfig && targetConfig.type !== 'pipeline';
});

// Convert pipeline mentions to handoffs to the first agent in each pipeline
const pipelineFirstAgents: string[] = [];
for (const pipelineName of connectedPipelineNames) {
Expand All @@ -929,10 +925,10 @@ function createAgents(
}
}
}

// Combine regular agent handoffs with pipeline first agents
const allHandoffTargets = [...validAgentNames, ...pipelineFirstAgents];

// Only store Agent objects in handoffs (filter out Handoff if present)
const agentHandoffs = allHandoffTargets.map(e => agents[e]).filter(Boolean) as Agent[];
agent.handoffs = agentHandoffs;
Expand All @@ -944,30 +940,30 @@ function createAgents(
agentsLogger.log(`=== SETTING UP PIPELINE CHAINS ===`);
for (const [pipelineName, pipeline] of Object.entries(pipelineConfig)) {
agentsLogger.log(`setting up pipeline chain: ${pipelineName} -> [${pipeline.agents.join(' -> ')}]`);

for (let i = 0; i < pipeline.agents.length; i++) {
const currentAgentName = pipeline.agents[i];
const currentAgent = agents[currentAgentName];

if (!currentAgent) {
agentsLogger.log(`warning: pipeline agent ${currentAgentName} not found in agent config`);
continue;
}

// Pipeline agents have NO handoffs - they just execute once
currentAgent.handoffs = [];

// Add pipeline metadata to the agent for easy lookup
(currentAgent as any).pipelineName = pipelineName;
(currentAgent as any).pipelineIndex = i;
(currentAgent as any).isLastInPipeline = i === pipeline.agents.length - 1;

// Update originalHandoffs to reflect the final pipeline state
originalHandoffs[currentAgentName] = [];

agentsLogger.log(`pipeline agent ${currentAgentName} has no handoffs (will be controlled by pipeline controller)`);
agentsLogger.log(`pipeline agent ${currentAgentName} metadata: pipeline=${pipelineName}, index=${i}, isLast=${i === pipeline.agents.length - 1}`);

// Configure pipeline agents to relinquish control after completing their task
const agentConfigObj = agentConfig[currentAgentName];
if (agentConfigObj && agentConfigObj.type === 'pipeline') {
Expand Down Expand Up @@ -1023,13 +1019,13 @@ function maybeInjectGiveUpControlInstructions(
injectLogger.log(`isInternal: ${isInternal}`);
injectLogger.log(`isPipeline: ${isPipeline}`);
injectLogger.log(`isRetain: ${isRetain}`);

// For pipeline agents, they should continue pipeline execution, so no need to inject give up control
if (isPipeline) {
injectLogger.log(`Pipeline agent ${childAgentName} continues pipeline execution, no give up control needed`);
return;
}

if (!isInternal && isRetain) {
// inject give up control instructions
agents[childAgentName].instructions = getGiveUpControlInstructions(agents[childAgentName], parentAgentName, injectLogger);
Expand All @@ -1056,20 +1052,20 @@ function handlePipelineAgentExecution(
const pipelineName = (currentAgent as any).pipelineName;
const pipelineIndex = (currentAgent as any).pipelineIndex;
const isLastInPipeline = (currentAgent as any).isLastInPipeline;

if (!pipelineName || pipelineIndex === undefined) {
logger.log(`warning: pipeline agent ${currentAgentName} missing pipeline metadata`);
return { nextAgentName: null, shouldContinue: false };
}

const pipeline = pipelineConfig[pipelineName];
if (!pipeline) {
logger.log(`warning: pipeline ${pipelineName} not found in config`);
return { nextAgentName: null, shouldContinue: false };
}

let nextAgentName: string | null = null;

if (!isLastInPipeline) {
// Not the last agent - continue to next agent in pipeline
nextAgentName = pipeline.agents[pipelineIndex + 1];
Expand All @@ -1079,24 +1075,24 @@ function handlePipelineAgentExecution(
nextAgentName = stack.pop()!;
logger.log(`-- pipeline controller: ${currentAgentName} -> ${nextAgentName} (pipeline ${pipelineName} complete, returning to caller)`);
}

if (nextAgentName) {
// Create transfer events for pipeline continuation
const transferEvents = createTransferEvents(currentAgentName, nextAgentName);
const [transferStart, transferComplete] = transferEvents;

// Add messages to turn
turnMsgs.push(transferStart);
turnMsgs.push(transferComplete);

// Update transfer counter
transferCounter.increment(currentAgentName, nextAgentName);

logger.log(`switched to agent: ${nextAgentName} || reason: pipeline controller transfer`);

return { nextAgentName, shouldContinue: true, transferEvents };
}

return { nextAgentName: null, shouldContinue: false };
}

Expand Down Expand Up @@ -1133,7 +1129,7 @@ export async function* streamResponse(
logger.log(`pipelines: ${Object.keys(pipelineConfig).length} (${Object.keys(pipelineConfig).join(', ')})`);
logger.log(`start agent: ${workflow.startAgent}`);
logger.log(`=== END CONFIGURATION ===`);

const stack: string[] = [];
logger.log(`initialized stack: ${JSON.stringify(stack)}`);

Expand All @@ -1149,15 +1145,15 @@ export async function* streamResponse(
// get the agent that should be starting this turn
const startOfTurnAgentName = getStartOfTurnAgentName(logger, messages, agentConfig, workflow);
logger.log(`🎯 START AGENT DECISION: ${startOfTurnAgentName}`);

let agentName = startOfTurnAgentName;

// start the turn loop
const usageTracker = new UsageTracker();
const turnMsgs: z.infer<typeof Message>[] = [...messages];

logger.log('🎬 STARTING AGENT TURN');

// stack-based agent execution loop
let iter = 0;
const MAXTURNITERATIONS = 10;
Expand Down Expand Up @@ -1243,7 +1239,7 @@ export async function* streamResponse(
// handle handoff event
if (event.name === 'handoff_occurred' && event.item.type === 'handoff_output_item') {
eventLogger.log(`🔄 HANDOFF EVENT: ${agentName} -> ${event.item.targetAgent.name}`);

// skip if its the same agent
if (agentName === event.item.targetAgent.name) {
eventLogger.log(`⚠️ SKIPPING: handoff to same agent: ${agentName}`);
Expand Down Expand Up @@ -1298,10 +1294,10 @@ export async function* streamResponse(
loopLogger.log(`📚 STACK PUSH: ${agentName} (new agent ${newAgentName} is internal/pipeline)`);
loopLogger.log(`📚 STACK NOW: [${stack.join(' -> ')}]`);
}

// set this as the new agent name
agentName = newAgentName;

}

// handle tool call result
Expand Down Expand Up @@ -1365,14 +1361,14 @@ export async function* streamResponse(
transferCounter,
createTransferEvents
);

// Emit transfer events if they exist
if (result.transferEvents) {
const [transferStart, transferComplete] = result.transferEvents;
yield* emitEvent(eventLogger, transferStart);
yield* emitEvent(eventLogger, transferComplete);
}

if (result.shouldContinue) {
agentName = result.nextAgentName!;
// Run the turn from the next agent
Expand All @@ -1388,7 +1384,7 @@ export async function* streamResponse(
agentName = workflow.startAgent;
loopLogger.log(`-- using start agent: ${agentName} || reason: ${current} is an internal agent, it put out a message and it has a control type of ${currentAgentConfig?.controlType}, hence the flow of control needs to return to the start agent`);
}

// Only emit transfer events if we're actually changing agents
if (agentName !== current) {
loopLogger.log(`-- stack is now: ${JSON.stringify(stack)}`);
Expand Down
6 changes: 3 additions & 3 deletions apps/rowboat/app/lib/copilot/copilot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import { composio, getTool } from "../composio/composio";

const PROVIDER_API_KEY = process.env.PROVIDER_API_KEY || process.env.OPENAI_API_KEY || '';
const PROVIDER_BASE_URL = process.env.PROVIDER_BASE_URL || undefined;
const COPILOT_MODEL = process.env.PROVIDER_COPILOT_MODEL || 'gpt-4.1';
const AGENT_MODEL = process.env.PROVIDER_DEFAULT_MODEL || 'gpt-4.1';
const COPILOT_MODEL = process.env.PROVIDER_COPILOT_MODEL || 'gpt-5';
const AGENT_MODEL = process.env.PROVIDER_DEFAULT_MODEL || 'gpt-5';

const WORKFLOW_SCHEMA = JSON.stringify(zodToJsonSchema(Workflow));

Expand Down Expand Up @@ -138,7 +138,7 @@ async function searchRelevantTools(query: string): Promise<string> {
}));

// Format the response
const toolConfigs = workflowTools.map(tool =>
const toolConfigs = workflowTools.map(tool =>
`**${tool.name}**:\n\`\`\`json\n${JSON.stringify(tool, null, 2)}\n\`\`\``
).join('\n\n');

Expand Down
Loading