diff --git a/lib/ruby_llm/providers/openai/streaming.rb b/lib/ruby_llm/providers/openai/streaming.rb
index ba3134475..36d05a5db 100644
--- a/lib/ruby_llm/providers/openai/streaming.rb
+++ b/lib/ruby_llm/providers/openai/streaming.rb
@@ -21,6 +21,20 @@ def build_chunk(data)
             output_tokens: data.dig('usage', 'completion_tokens')
           )
         end
+
+        def parse_streaming_error(data)
+          error_data = JSON.parse(data)
+          return unless error_data['error']
+
+          case error_data.dig('error', 'type')
+          when 'server_error'
+            [500, error_data['error']['message']]
+          when 'rate_limit_exceeded', 'insufficient_quota'
+            [429, error_data['error']['message']]
+          else
+            [400, error_data['error']['message']]
+          end
+        end
       end
     end
   end
diff --git a/lib/ruby_llm/streaming.rb b/lib/ruby_llm/streaming.rb
index 30501a375..b524aebd8 100644
--- a/lib/ruby_llm/streaming.rb
+++ b/lib/ruby_llm/streaming.rb
@@ -55,13 +55,13 @@ def create_stream_processor(parser, buffer, &)
       end
     end

-    def process_stream_chunk(chunk, parser, _env, &)
+    def process_stream_chunk(chunk, parser, env, &)
       RubyLLM.logger.debug "Received chunk: #{chunk}"

       if error_chunk?(chunk)
-        handle_error_chunk(chunk, nil)
+        handle_error_chunk(chunk, env)
       else
-        yield handle_sse(chunk, parser, nil, &)
+        yield handle_sse(chunk, parser, env, &)
       end
     end

@@ -88,7 +88,16 @@ def error_chunk?(chunk)
     def handle_error_chunk(chunk, env)
       error_data = chunk.split("\n")[1].delete_prefix('data: ')
       status, _message = parse_streaming_error(error_data)
-      error_response = env.merge(body: JSON.parse(error_data), status: status)
+      parsed_data = JSON.parse(error_data)
+
+      # Create a response-like object that works for both Faraday v1 and v2
+      error_response = if env
+                         env.merge(body: parsed_data, status: status)
+                       else
+                         # For Faraday v1, create a simple object that responds to .status and .body
+                         Struct.new(:body, :status).new(parsed_data, status)
+                       end
+
       ErrorMiddleware.parse_error(provider: self, response: error_response)
     rescue JSON::ParserError => e
       RubyLLM.logger.debug "Failed to parse error chunk: #{e.message}"
@@ -122,7 +131,16 @@ def handle_data(data)

     def handle_error_event(data, env)
       status, _message = parse_streaming_error(data)
-      error_response = env.merge(body: JSON.parse(data), status: status)
+      parsed_data = JSON.parse(data)
+
+      # Create a response-like object that works for both Faraday v1 and v2
+      error_response = if env
+                         env.merge(body: parsed_data, status: status)
+                       else
+                         # For Faraday v1, create a simple object that responds to .status and .body
+                         Struct.new(:body, :status).new(parsed_data, status)
+                       end
+
       ErrorMiddleware.parse_error(provider: self, response: error_response)
     rescue JSON::ParserError => e
       RubyLLM.logger.debug "Failed to parse error event: #{e.message}"
diff --git a/spec/ruby_llm/chat_streaming_spec.rb b/spec/ruby_llm/chat_streaming_spec.rb
index a2de23ad5..262951753 100644
--- a/spec/ruby_llm/chat_streaming_spec.rb
+++ b/spec/ruby_llm/chat_streaming_spec.rb
@@ -4,6 +4,7 @@

 RSpec.describe RubyLLM::Chat do
   include_context 'with configured RubyLLM'
+  include StreamingErrorHelpers

   describe 'streaming responses' do
     CHAT_MODELS.each do |model_info|
@@ -47,4 +48,83 @@
       end
     end
   end
+
+  describe 'Error handling' do
+    CHAT_MODELS.each do |model_info|
+      model = model_info[:model]
+      provider = model_info[:provider]
+
+      context "with #{provider}/#{model}" do
+        let(:chat) { RubyLLM.chat(model: model, provider: provider) }
+
+        describe 'Faraday version 1' do # rubocop:disable RSpec/NestedGroups
+          before do
+            stub_const('Faraday::VERSION', '1.10.0')
+          end
+
+          it "#{provider}/#{model} supports handling streaming error chunks" do # rubocop:disable RSpec/ExampleLength
+            skip('Error handling not implemented yet') unless error_handling_supported?(provider)
+
+            stub_error_response(provider, :chunk)
+
+            chunks = []
+
+            expect do
+              chat.ask('Count from 1 to 3') do |chunk|
+                chunks << chunk
+              end
+            end.to raise_error(expected_error_for(provider))
+          end
+
+          it "#{provider}/#{model} supports handling streaming error events" do # rubocop:disable RSpec/ExampleLength
+            skip('Error handling not implemented yet') unless error_handling_supported?(provider)
+
+            stub_error_response(provider, :event)
+
+            chunks = []
+
+            expect do
+              chat.ask('Count from 1 to 3') do |chunk|
+                chunks << chunk
+              end
+            end.to raise_error(expected_error_for(provider))
+          end
+        end
+
+        describe 'Faraday version 2' do # rubocop:disable RSpec/NestedGroups
+          before do
+            stub_const('Faraday::VERSION', '2.0.0')
+          end
+
+          it "#{provider}/#{model} supports handling streaming error chunks" do # rubocop:disable RSpec/ExampleLength
+            skip('Error handling not implemented yet') unless error_handling_supported?(provider)
+
+            stub_error_response(provider, :chunk)
+
+            chunks = []
+
+            expect do
+              chat.ask('Count from 1 to 3') do |chunk|
+                chunks << chunk
+              end
+            end.to raise_error(expected_error_for(provider))
+          end
+
+          it "#{provider}/#{model} supports handling streaming error events" do # rubocop:disable RSpec/ExampleLength
+            skip('Error handling not implemented yet') unless error_handling_supported?(provider)
+
+            stub_error_response(provider, :event)
+
+            chunks = []
+
+            expect do
+              chat.ask('Count from 1 to 3') do |chunk|
+                chunks << chunk
+              end
+            end.to raise_error(expected_error_for(provider))
+          end
+        end
+      end
+    end
+  end
 end
diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb
index c8286fd69..abc64e1b4 100644
--- a/spec/spec_helper.rb
+++ b/spec/spec_helper.rb
@@ -42,6 +42,7 @@
 require 'fileutils'
 require 'ruby_llm'
 require 'webmock/rspec'
+require_relative 'support/streaming_error_helpers'

 # VCR Configuration
 VCR.configure do |config|
diff --git a/spec/support/streaming_error_helpers.rb b/spec/support/streaming_error_helpers.rb
new file mode 100644
index 000000000..54d102734
--- /dev/null
+++ b/spec/support/streaming_error_helpers.rb
@@ -0,0 +1,111 @@
+# frozen_string_literal: true
+
+module StreamingErrorHelpers
+  ERROR_HANDLING_CONFIGS = {
+    anthropic: {
+      url: 'https://api.anthropic.com/v1/messages',
+      error_response: {
+        type: 'error',
+        error: {
+          type: 'overloaded_error',
+          message: 'Overloaded'
+        }
+      },
+      chunk_status: 529,
+      expected_error: RubyLLM::OverloadedError
+    },
+    openai: {
+      url: 'https://api.openai.com/v1/chat/completions',
+      error_response: {
+        error: {
+          message: 'The server is temporarily overloaded. Please try again later.',
+          type: 'server_error',
+          param: nil,
+          code: nil
+        }
+      },
+      chunk_status: 500,
+      expected_error: RubyLLM::ServerError
+    },
+    gemini: {
+      url: 'https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:streamGenerateContent?alt=sse',
+      error_response: {
+        error: {
+          code: 529,
+          message: 'Service overloaded - please try again later',
+          status: 'RESOURCE_EXHAUSTED'
+        }
+      },
+      chunk_status: 529,
+      expected_error: RubyLLM::OverloadedError
+    },
+    deepseek: {
+      url: 'https://api.deepseek.com/chat/completions',
+      error_response: {
+        error: {
+          message: 'Service overloaded - please try again later',
+          type: 'server_error',
+          param: nil,
+          code: nil
+        }
+      },
+      chunk_status: 500,
+      expected_error: RubyLLM::ServerError
+    },
+    openrouter: {
+      url: 'https://openrouter.ai/api/v1/chat/completions',
+      error_response: {
+        error: {
+          message: 'Service overloaded - please try again later',
+          type: 'server_error',
+          param: nil,
+          code: nil
+        }
+      },
+      chunk_status: 500,
+      expected_error: RubyLLM::ServerError
+    },
+    ollama: {
+      url: 'http://localhost:11434/v1/chat/completions',
+      error_response: {
+        error: {
+          message: 'Service overloaded - please try again later',
+          type: 'server_error',
+          param: nil,
+          code: nil
+        }
+      },
+      chunk_status: 500,
+      expected_error: RubyLLM::ServerError
+    }
+  }.freeze
+
+  def error_handling_supported?(provider)
+    ERROR_HANDLING_CONFIGS.key?(provider)
+  end
+
+  def expected_error_for(provider)
+    ERROR_HANDLING_CONFIGS[provider][:expected_error]
+  end
+
+  def stub_error_response(provider, type)
+    config = ERROR_HANDLING_CONFIGS[provider]
+    return unless config
+
+    body = case type
+           when :chunk
+             "#{config[:error_response].to_json}\n\n"
+           when :event
+             "event: error\ndata: #{config[:error_response].to_json}\n\n"
+           end
+
+    status = type == :chunk ? config[:chunk_status] : 200
+
+    stub_request(:post, config[:url])
+      .to_return(
+        status: status,
+        body: body,
+        headers: { 'Content-Type' => 'text/event-stream' }
+      )
+  end
+end
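
For reference, a minimal standalone sketch of the fallback branch added to handle_error_chunk and handle_error_event: when no Faraday env is available (as with Faraday v1 streaming), a Struct stands in for the response object, since ErrorMiddleware.parse_error only needs something responding to #body and #status. The build_error_response name below is hypothetical and exists only for illustration; it is not part of the gem's API.

    require 'json'

    # Hypothetical helper mirroring the conditional introduced in the diff above.
    def build_error_response(env, parsed_data, status)
      if env
        # Faraday v2 path: reuse the real request env as the response object
        env.merge(body: parsed_data, status: status)
      else
        # Faraday v1 path: a simple stand-in that responds to .body and .status
        Struct.new(:body, :status).new(parsed_data, status)
      end
    end

    response = build_error_response(nil, JSON.parse('{"error":{"message":"Overloaded"}}'), 529)
    response.status # => 529
    response.body   # => {"error"=>{"message"=>"Overloaded"}}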