diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 0bc25865a..05ad12487 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -61,6 +61,11 @@ jobs: files: ./coverage/coverage.xml fail_ci_if_error: false + - name: Run the test suite with lower Faraday versions + run: FARADAY_VERSION=1.10.3 bundle install && bundle exec rspec + env: + OLLAMA_API_BASE: http://localhost:11434/v1 # dummy + publish: name: Build + Publish needs: test diff --git a/lib/ruby_llm/streaming.rb b/lib/ruby_llm/streaming.rb index d328e7b8d..b7017896e 100644 --- a/lib/ruby_llm/streaming.rb +++ b/lib/ruby_llm/streaming.rb @@ -12,9 +12,18 @@ def stream_response(connection, payload, &block) accumulator = StreamAccumulator.new connection.post stream_url, payload do |req| - req.options.on_data = handle_stream do |chunk| - accumulator.add chunk - block.call chunk + if req.options.respond_to?(:on_data) + # Handle Faraday 2.x streaming with on_data method + req.options.on_data = handle_stream do |chunk| + accumulator.add chunk + block.call chunk + end + else + # Handle Faraday 1.x streaming with :on_data key + req.options[:on_data] = handle_stream do |chunk| + accumulator.add chunk + block.call chunk + end end end @@ -29,19 +38,45 @@ def handle_stream(&block) private - def to_json_stream(&block) + def to_json_stream(&) buffer = String.new parser = EventStreamParser::Parser.new - proc do |chunk, _bytes, env| - RubyLLM.logger.debug "Received chunk: #{chunk}" + create_stream_processor(parser, buffer, &) + end - if error_chunk?(chunk) - handle_error_chunk(chunk, env) - elsif env&.status != 200 - handle_failed_response(chunk, buffer, env) + def create_stream_processor(parser, buffer, &) + if Faraday::VERSION.start_with?('1') + # Faraday 1.x: on_data receives (chunk, size) + legacy_stream_processor(parser, &) + else + # Faraday 2.x: on_data receives (chunk, bytes, env) + stream_processor(parser, buffer, &) + end + end + + def process_stream_chunk(chunk, parser, _env, &) + RubyLLM.logger.debug "Received chunk: #{chunk}" + + if error_chunk?(chunk) + handle_error_chunk(chunk, nil) + else + yield handle_sse(chunk, parser, nil, &) + end + end + + def legacy_stream_processor(parser, &block) + proc do |chunk, _size| + process_stream_chunk(chunk, parser, nil, &block) + end + end + + def stream_processor(parser, buffer, &block) + proc do |chunk, _bytes, env| + if env&.status == 200 + process_stream_chunk(chunk, parser, env, &block) else - yield handle_sse(chunk, parser, env, &block) + handle_failed_response(chunk, buffer, env) end end end diff --git a/ruby_llm.gemspec b/ruby_llm.gemspec index d45d97161..fbb1bbd6b 100644 --- a/ruby_llm.gemspec +++ b/ruby_llm.gemspec @@ -36,10 +36,10 @@ Gem::Specification.new do |spec| # Runtime dependencies spec.add_dependency 'base64' spec.add_dependency 'event_stream_parser', '~> 1' - spec.add_dependency 'faraday', '~> 2' - spec.add_dependency 'faraday-multipart', '~> 1' - spec.add_dependency 'faraday-net_http', '~> 3' - spec.add_dependency 'faraday-retry', '~> 2' + spec.add_dependency 'faraday', ENV['FARADAY_VERSION'] || '>= 1.10.0' + spec.add_dependency 'faraday-multipart', '>= 1' + spec.add_dependency 'faraday-net_http', '>= 1' + spec.add_dependency 'faraday-retry', '>= 1' spec.add_dependency 'marcel', '~> 1.0' spec.add_dependency 'zeitwerk', '~> 2' end