fix(providers): add circuit breaker for Responses API fallback

When the Responses API fails repeatedly (3 consecutive compatibility
errors), skip it and fall back directly to Chat Completions.  Unlike a
permanent disable, the circuit re-probes after 5 minutes so recovery
is automatic when the API comes back.  Success resets the counter.

Keyed per (model, reasoning_effort) so a failure with one model does
not affect others.
This commit is contained in:
Mohamed Elkholy 2026-04-16 00:11:43 -04:00 committed by chengyongru
parent 9b9e0964a2
commit 1ced8d4420
2 changed files with 121 additions and 3 deletions

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio import asyncio
import json import json
import hashlib import hashlib
import time
import importlib.util import importlib.util
import os import os
import secrets import secrets
@ -143,6 +144,10 @@ def _uses_openrouter_attribution(spec: "ProviderSpec | None", api_base: str | No
return bool(api_base and "openrouter" in api_base.lower()) return bool(api_base and "openrouter" in api_base.lower())
# Number of recorded Responses API failures for one "model:reasoning_effort"
# key at which the circuit opens and requests skip the Responses API.
# A recorded success clears the count for that key.
_RESPONSES_FAILURE_THRESHOLD = 3
# Seconds (measured with time.monotonic) that an open circuit keeps skipping
# the Responses API before a request is allowed to probe it again.
_RESPONSES_PROBE_INTERVAL_S = 300 # 5 minutes
def _is_direct_openai_base(api_base: str | None) -> bool: def _is_direct_openai_base(api_base: str | None) -> bool:
"""Return True for direct OpenAI endpoints, not generic OpenAI-compatible gateways.""" """Return True for direct OpenAI endpoints, not generic OpenAI-compatible gateways."""
if not api_base: if not api_base:
@ -189,6 +194,11 @@ class OpenAICompatProvider(LLMProvider):
max_retries=0, max_retries=0,
) )
# Responses API circuit breaker: skip after repeated failures,
# probe again after _RESPONSES_PROBE_INTERVAL_S seconds.
self._responses_failures: dict[str, int] = {}
self._responses_tripped_at: dict[str, float] = {}
def _setup_env(self, api_key: str, api_base: str | None) -> None: def _setup_env(self, api_key: str, api_base: str | None) -> None:
"""Set environment variables based on provider spec.""" """Set environment variables based on provider spec."""
spec = self._spec spec = self._spec
@ -414,9 +424,41 @@ class OpenAICompatProvider(LLMProvider):
return False return False
model_name = (model or self.default_model).lower() model_name = (model or self.default_model).lower()
wants = False
if reasoning_effort and reasoning_effort.lower() != "none": if reasoning_effort and reasoning_effort.lower() != "none":
wants = True
elif any(token in model_name for token in ("gpt-5", "o1", "o3", "o4")):
wants = True
if not wants:
return False
# Circuit breaker: skip after repeated failures, probe periodically.
key = f"{model_name}:{reasoning_effort or ''}"
failures = self._responses_failures.get(key, 0)
if failures >= _RESPONSES_FAILURE_THRESHOLD:
tripped = self._responses_tripped_at.get(key, 0.0)
if (time.monotonic() - tripped) < _RESPONSES_PROBE_INTERVAL_S:
return False
# Half-open: allow one probe attempt
return True return True
return any(token in model_name for token in ("gpt-5", "o1", "o3", "o4"))
def _record_responses_failure(self, model: str | None, reasoning_effort: str | None) -> None:
    """Register one Responses API failure for a model/effort combination.

    The counter is stored under ``"<lowercased model>:<reasoning_effort>"``,
    using ``self.default_model`` when *model* is falsy and an empty string
    when *reasoning_effort* is falsy.  When the counter reaches
    ``_RESPONSES_FAILURE_THRESHOLD`` (or is already past it), the trip time
    is (re)stamped with ``time.monotonic()`` and a warning is logged.
    """
    model_name = (model or self.default_model).lower()
    circuit_key = f"{model_name}:{reasoning_effort or ''}"

    failures = self._responses_failures.get(circuit_key, 0) + 1
    self._responses_failures[circuit_key] = failures

    # Below the threshold the circuit stays closed; nothing more to do.
    if failures < _RESPONSES_FAILURE_THRESHOLD:
        return

    # Every failure at or past the threshold refreshes the trip timestamp,
    # so a failed probe restarts the waiting period.
    self._responses_tripped_at[circuit_key] = time.monotonic()
    from loguru import logger
    logger.warning(
        "Responses API circuit open for {} — falling back to Chat Completions",
        circuit_key,
    )
def _record_responses_success(self, model: str | None, reasoning_effort: str | None) -> None:
    """Reset the circuit breaker state for a model/effort combination.

    Removes both the failure counter and the trip timestamp stored under
    ``"<lowercased model>:<reasoning_effort>"`` (``self.default_model`` is
    used when *model* is falsy, an empty string when *reasoning_effort* is
    falsy).  Keys that were never recorded are ignored.
    """
    model_name = (model or self.default_model).lower()
    effort_part = reasoning_effort or ""
    circuit_key = f"{model_name}:{effort_part}"
    for state in (self._responses_failures, self._responses_tripped_at):
        state.pop(circuit_key, None)
@staticmethod @staticmethod
def _should_fallback_from_responses_error(e: Exception) -> bool: def _should_fallback_from_responses_error(e: Exception) -> bool:
@ -915,10 +957,13 @@ class OpenAICompatProvider(LLMProvider):
messages, tools, model, max_tokens, temperature, messages, tools, model, max_tokens, temperature,
reasoning_effort, tool_choice, reasoning_effort, tool_choice,
) )
return parse_response_output(await self._client.responses.create(**body)) result = parse_response_output(await self._client.responses.create(**body))
self._record_responses_success(model, reasoning_effort)
return result
except Exception as responses_error: except Exception as responses_error:
if not self._should_fallback_from_responses_error(responses_error): if not self._should_fallback_from_responses_error(responses_error):
raise raise
self._record_responses_failure(model, reasoning_effort)
kwargs = self._build_kwargs( kwargs = self._build_kwargs(
messages, tools, model, max_tokens, temperature, messages, tools, model, max_tokens, temperature,
@ -965,6 +1010,7 @@ class OpenAICompatProvider(LLMProvider):
_timed_stream(), _timed_stream(),
on_content_delta, on_content_delta,
) )
self._record_responses_success(model, reasoning_effort)
return LLMResponse( return LLMResponse(
content=content or None, content=content or None,
tool_calls=tool_calls, tool_calls=tool_calls,
@ -975,6 +1021,7 @@ class OpenAICompatProvider(LLMProvider):
except Exception as responses_error: except Exception as responses_error:
if not self._should_fallback_from_responses_error(responses_error): if not self._should_fallback_from_responses_error(responses_error):
raise raise
self._record_responses_failure(model, reasoning_effort)
kwargs = self._build_kwargs( kwargs = self._build_kwargs(
messages, tools, model, max_tokens, temperature, messages, tools, model, max_tokens, temperature,

View File

@ -0,0 +1,71 @@
"""Tests for Responses API circuit breaker in OpenAICompatProvider."""
import time
import pytest
from nanobot.providers.openai_compat_provider import (
OpenAICompatProvider,
_RESPONSES_FAILURE_THRESHOLD,
_RESPONSES_PROBE_INTERVAL_S,
)
@pytest.fixture()
def provider():
    """Build a provider for circuit-breaker tests without running ``__init__``.

    The instance is created with ``__new__`` and only the attributes set
    below are populated: a "gpt-5" default model, a spec object named
    "openai", the api.openai.com base URL, and empty circuit-breaker state.
    """
    instance = OpenAICompatProvider.__new__(OpenAICompatProvider)
    attributes = {
        "default_model": "gpt-5",
        "_spec": type("Spec", (), {"name": "openai"})(),
        "_effective_base": "https://api.openai.com/v1",
        "_responses_failures": {},
        "_responses_tripped_at": {},
    }
    for attr_name, attr_value in attributes.items():
        setattr(instance, attr_name, attr_value)
    return instance
def test_responses_api_available_by_default(provider):
    """With no recorded failures, gpt-5 is routed to the Responses API."""
    use_responses = provider._should_use_responses_api("gpt-5", None)
    assert use_responses is True
def test_circuit_opens_after_threshold(provider):
    """Recording the threshold number of failures makes the provider skip the Responses API."""
    recorded = 0
    while recorded < _RESPONSES_FAILURE_THRESHOLD:
        provider._record_responses_failure("gpt-5", None)
        recorded += 1
    use_responses = provider._should_use_responses_api("gpt-5", None)
    assert use_responses is False
def test_circuit_does_not_affect_other_models(provider):
    """An open circuit for one model leaves a different model untouched."""
    tripped_model, other_model = "gpt-5", "o4-mini"
    for _attempt in range(_RESPONSES_FAILURE_THRESHOLD):
        provider._record_responses_failure(tripped_model, None)
    still_enabled = provider._should_use_responses_api(other_model, None)
    assert still_enabled is True
def test_success_resets_circuit(provider):
    """A recorded success closes a circuit that had been opened by failures."""
    model, effort = "gpt-5", None

    # Open the circuit first.
    for _attempt in range(_RESPONSES_FAILURE_THRESHOLD):
        provider._record_responses_failure(model, effort)
    assert provider._should_use_responses_api(model, effort) is False

    # One success clears the state and re-enables the Responses API.
    provider._record_responses_success(model, effort)
    assert provider._should_use_responses_api(model, effort) is True
def test_probe_after_interval(provider, monkeypatch):
    """After the probe interval elapses, an open circuit allows the Responses API again.

    The clock is advanced by patching ``time.monotonic`` through the
    ``monkeypatch`` fixture, so the test does not reach into the provider's
    private trip-time dict or depend on the exact format of its keys.
    """
    for _ in range(_RESPONSES_FAILURE_THRESHOLD):
        provider._record_responses_failure("gpt-5", None)
    assert provider._should_use_responses_api("gpt-5", None) is False

    # Fast-forward past the probe interval: every later monotonic() reading
    # is shifted forward by the interval plus one second.  monkeypatch
    # restores the real clock when the test finishes.
    real_monotonic = time.monotonic
    monkeypatch.setattr(
        time,
        "monotonic",
        lambda: real_monotonic() + _RESPONSES_PROBE_INTERVAL_S + 1,
    )
    assert provider._should_use_responses_api("gpt-5", None) is True
def test_below_threshold_still_allows(provider):
    """Two recorded failures are not enough to open the circuit."""
    for _attempt in range(2):
        provider._record_responses_failure("gpt-5", None)
    use_responses = provider._should_use_responses_api("gpt-5", None)
    assert use_responses is True
def test_reasoning_effort_keyed_separately(provider):
    """Failures recorded for one reasoning effort do not trip another effort of the same model."""
    model = "o3"
    for _attempt in range(_RESPONSES_FAILURE_THRESHOLD):
        provider._record_responses_failure(model, "high")

    high_allowed = provider._should_use_responses_api(model, "high")
    low_allowed = provider._should_use_responses_api(model, "low")
    assert high_allowed is False
    assert low_allowed is True