Compare commits

...

2 Commits

Author SHA1 Message Date
Richard Feldman
7cf2265a97 Combine duplicated code in both if and else branches 2025-03-27 09:43:24 -04:00
Richard Feldman
88e6a957ef Handle 502 errors from Anthropic 2025-03-27 07:30:36 -04:00
2 changed files with 65 additions and 67 deletions

View File

@@ -1,6 +1,6 @@
mod supported_countries;
use std::{pin::Pin, str::FromStr};
use std::{fmt::Display, pin::Pin, str};
use anyhow::{anyhow, Context as _, Result};
use chrono::{DateTime, Utc};
@@ -8,7 +8,7 @@ use futures::{io::BufReader, stream::BoxStream, AsyncBufReadExt, AsyncReadExt, S
use http_client::http::{HeaderMap, HeaderValue};
use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest};
use serde::{Deserialize, Serialize};
use strum::{EnumIter, EnumString};
use strum::{EnumIter, EnumString, FromRepr};
use thiserror::Error;
use util::ResultExt as _;
@@ -276,30 +276,21 @@ pub async fn complete(
.send(request)
.await
.context("failed to send request to Anthropic")?;
if response.status().is_success() {
let mut body = Vec::new();
response
.body_mut()
.read_to_end(&mut body)
.await
.context("failed to read response body")?;
let status = response.status();
let mut body = Vec::new();
response
.body_mut()
.read_to_end(&mut body)
.await
.context("failed to read response body")?;
if status.is_success() {
let response_message: Response =
serde_json::from_slice(&body).context("failed to deserialize response body")?;
Ok(response_message)
} else {
let mut body = Vec::new();
response
.body_mut()
.read_to_end(&mut body)
.await
.context("failed to read response body")?;
let body_str =
std::str::from_utf8(&body).context("failed to parse response body as UTF-8")?;
Err(AnthropicError::Other(anyhow!(
"Failed to connect to API: {} {}",
response.status(),
body_str
)))
let body_str = str::from_utf8(&body).context("failed to parse response body as UTF-8")?;
Err(AnthropicError::from_response(status.as_u16(), body_str))
}
}
@@ -393,7 +384,8 @@ pub async fn stream_completion_with_rate_limit_info(
.send(request)
.await
.context("failed to send request to Anthropic")?;
if response.status().is_success() {
let status = response.status();
if status.is_success() {
let rate_limits = RateLimitInfo::from_headers(response.headers());
let reader = BufReader::new(response.into_body());
let stream = reader
@@ -420,20 +412,8 @@ pub async fn stream_completion_with_rate_limit_info(
.await
.context("failed to read response body")?;
let body_str =
std::str::from_utf8(&body).context("failed to parse response body as UTF-8")?;
match serde_json::from_str::<Event>(body_str) {
Ok(Event::Error { error }) => Err(AnthropicError::ApiError(error)),
Ok(_) => Err(AnthropicError::Other(anyhow!(
"Unexpected success response while expecting an error: '{body_str}'",
))),
Err(_) => Err(AnthropicError::Other(anyhow!(
"Failed to connect to API: {} {}",
response.status(),
body_str,
))),
}
let body_str = str::from_utf8(&body).context("failed to parse response body as UTF-8")?;
Err(AnthropicError::from_response(status.as_u16(), body_str))
}
}
@@ -694,48 +674,72 @@ pub struct MessageDelta {
#[derive(Error, Debug)]
pub enum AnthropicError {
#[error("an error occurred while interacting with the Anthropic API: {error_type}: {message}", error_type = .0.error_type, message = .0.message)]
#[error("an error occurred while interacting with the Anthropic API: {error_code}: {message}", error_code = .0.code, message = .0.message)]
ApiError(ApiError),
#[error("{0}")]
Other(#[from] anyhow::Error),
}
impl AnthropicError {
pub fn from_response(status: u16, body: &str) -> Self {
let error = match serde_json::from_str::<Event>(body) {
Ok(Event::Error { error }) => Self::ApiError(error),
result => match ApiErrorCode::from_repr(status) {
Some(error_code) => Self::ApiError(ApiError {
code: error_code,
message: body.into(),
}),
None => {
if result.is_ok() {
Self::Other(anyhow!(
"Unexpected success response while expecting an error: '{body}'",
))
} else {
Self::Other(anyhow!("Failed to connect to API: {status} {body}",))
}
}
},
};
error
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ApiError {
#[serde(rename = "type")]
pub error_type: String,
pub code: ApiErrorCode,
pub message: String,
}
/// An Anthropic API error code.
/// <https://docs.anthropic.com/en/api/errors#http-errors>
#[derive(Debug, PartialEq, Eq, Clone, Copy, EnumString)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, EnumString, Serialize, Deserialize, FromRepr)]
#[serde(rename_all = "snake_case")]
#[repr(u16)]
#[strum(serialize_all = "snake_case")]
pub enum ApiErrorCode {
/// 400 - `invalid_request_error`: There was an issue with the format or content of your request.
InvalidRequestError,
InvalidRequestError = 400,
/// 401 - `authentication_error`: There's an issue with your API key.
AuthenticationError,
AuthenticationError = 401,
/// 403 - `permission_error`: Your API key does not have permission to use the specified resource.
PermissionError,
PermissionError = 403,
/// 404 - `not_found_error`: The requested resource was not found.
NotFoundError,
NotFoundError = 404,
/// 413 - `request_too_large`: Request exceeds the maximum allowed number of bytes.
RequestTooLarge,
RequestTooLarge = 413,
/// 429 - `rate_limit_error`: Your account has hit a rate limit.
RateLimitError,
RateLimitError = 429,
/// 500 - `api_error`: An unexpected error has occurred internal to Anthropic's systems.
ApiError,
ApiError = 500,
/// 502 - Bad Gateway. This is not in Anthropic's docs, but we have seen it in the wild.
BadGateway = 502,
/// 529 - `overloaded_error`: Anthropic's API is temporarily overloaded.
OverloadedError,
OverloadedError = 529,
}
impl ApiError {
pub fn code(&self) -> Option<ApiErrorCode> {
ApiErrorCode::from_str(&self.error_type).ok()
}
pub fn is_rate_limit_error(&self) -> bool {
matches!(self.error_type.as_str(), "rate_limit_error")
impl Display for ApiErrorCode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "HTTP {}", *self as u16)
}
}

View File

@@ -279,8 +279,8 @@ async fn perform_completion(
)
.await
.map_err(|err| match err {
anthropic::AnthropicError::ApiError(ref api_error) => match api_error.code() {
Some(anthropic::ApiErrorCode::RateLimitError) => {
anthropic::AnthropicError::ApiError(ref api_error) => match api_error.code {
anthropic::ApiErrorCode::RateLimitError => {
tracing::info!(
target: "upstream rate limit exceeded",
user_id = claims.user_id,
@@ -296,16 +296,10 @@ async fn perform_completion(
"Upstream Anthropic rate limit exceeded.".to_string(),
)
}
Some(anthropic::ApiErrorCode::InvalidRequestError) => {
Error::http(StatusCode::BAD_REQUEST, api_error.message.clone())
}
Some(anthropic::ApiErrorCode::OverloadedError) => {
Error::http(StatusCode::SERVICE_UNAVAILABLE, api_error.message.clone())
}
Some(_) => {
Error::http(StatusCode::INTERNAL_SERVER_ERROR, api_error.message.clone())
}
None => Error::Internal(anyhow!(err)),
code => match StatusCode::from_u16(code as u16) {
Ok(status_code) => Error::http(status_code, api_error.message.clone()),
Err(_) => Error::Internal(anyhow!(err)),
},
},
anthropic::AnthropicError::Other(err) => Error::Internal(err),
})?;