Compare commits

...

21 Commits

Author SHA1 Message Date
Conrad Irwin
b5cc703c7c de-de-bug 2025-11-17 22:18:31 -07:00
Conrad Irwin
46aaa03ace tidy up 2025-11-17 22:17:36 -07:00
Conrad Irwin
f766fa2e9f Merge branch 'main' into feature/context-server-transports 2025-11-17 21:10:22 -07:00
Conrad Irwin
977666f8bf Simplify header handling 2025-11-17 21:10:17 -07:00
Conrad Irwin
5ca2645da6 Merge branch 'main' into feature/context-server-transports 2025-11-10 22:44:32 -07:00
Artur Shirokov
411669f763 Merge branch 'main' into feature/context-server-transports 2025-10-06 20:34:10 +01:00
Artur Shirokov
7211a672b6 Removed sse placeholder. 2025-10-05 18:41:38 +01:00
Artur Shirokov
6a9a891985 Bearer token support is done, tested with github mcp server. 2025-10-05 18:38:38 +01:00
Artur Shirokov
6526936f96 configure MCP modal now uses the same json editor for both local and remote. 2025-10-05 10:40:10 +01:00
Artur Shirokov
55ddcba3cb Now using gpui executor instead smol::spawn. 2025-09-30 18:58:18 +01:00
Artur Shirokov
4e56e1b8b7 First working version. 2025-09-27 13:27:46 +01:00
Artur Shirokov
4b895e71f7 Added more detailed loggin to client.rs. 2025-09-27 13:27:46 +01:00
Artur Shirokov
06b18ac607 Added proper headers. 2025-09-27 13:27:46 +01:00
Artur Shirokov
3b2647f182 Right now it at least shows connected but isn't recieving the tools. 2025-09-27 13:27:46 +01:00
Artur Shirokov
f9d1cc8735 Improving the logging. 2025-09-27 13:27:46 +01:00
Artur Shirokov
1e9689b0b2 Added agent_ui support for remote server, ignore the fact that the actual button is off a bit, ill fix tat later. 2025-09-27 13:27:46 +01:00
Artur Shirokov
7523fb0442 Removed the custom modal, checking if the rest is working. 2025-09-27 13:27:46 +01:00
Artur Shirokov
551ef4b6f3 Solving the errors from the new code. 2025-09-27 13:27:46 +01:00
Artur Shirokov
c56ead48cb wip 2025-09-27 13:27:46 +01:00
Artur Shirokov
08c594966f feat(context_server): Add HTTP/SSE transports and settings
This commit introduces HTTP and SSE transport implementations for the
`context_server` crate, allowing Zed to communicate with remote context
providers.

The new transports are built on the existing `http_client` and `gpui`
executor, adhering to the project's architectural patterns.

To support configuration of these new transports, the settings system
has been updated:

- A `Remote` variant has been added to the `ContextServerSettings`
  enum, allowing users to configure remote servers in `settings.json`
  with a URL.
- The project logic has been updated to initialize remote context
  servers using the new `ContextServer::from_url` constructor.
- The agent configuration UI now includes an "Add Remote Server"
  option and a dedicated modal for adding and editing remote server
  configurations.

Unit tests have been added for the new transports and for the logic
that handles remote server configurations.
2025-09-27 13:27:37 +01:00
Artur Shirokov
59a9ec6fb3 feat(context_server): Add HTTP and SSE transport implementations
This commit adds HTTP and SSE transport implementations to the `context_server` crate, allowing it to communicate with remote context providers over these protocols.

The new transports are:
- `HttpTransport`: A simple request/response transport over HTTP.
- `SseTransport`: A transport for receiving Server-Sent Events (SSE).

Both transports are built on top of the existing `http_client` crate and use `gpui::spawn` to perform network operations on a background thread, as is the pattern in the rest of the Zed application. This avoids introducing a new Tokio runtime and adheres to the project's architecture.

A `build_transport` function has been added to construct the appropriate transport based on the URL scheme.

Unit tests using `FakeHttpClient` have been added for both transports to ensure their correctness.
2025-09-27 13:27:22 +01:00
13 changed files with 714 additions and 99 deletions

1
Cargo.lock generated
View File

@@ -3690,6 +3690,7 @@ dependencies = [
"collections",
"futures 0.3.31",
"gpui",
"http_client",
"log",
"net",
"parking_lot",

View File

@@ -247,37 +247,58 @@ impl AgentConnection for AcpConnection {
let default_mode = self.default_mode.clone();
let cwd = cwd.to_path_buf();
let context_server_store = project.read(cx).context_server_store().read(cx);
let mcp_servers = if project.read(cx).is_local() {
context_server_store
.configured_server_ids()
.iter()
.filter_map(|id| {
let configuration = context_server_store.configuration_for_server(id)?;
let command = configuration.command();
Some(acp::McpServer::Stdio {
name: id.0.to_string(),
command: command.path.clone(),
args: command.args.clone(),
env: if let Some(env) = command.env.as_ref() {
env.iter()
.map(|(name, value)| acp::EnvVariable {
name: name.clone(),
value: value.clone(),
meta: None,
})
.collect()
} else {
vec![]
},
let mcp_servers =
if project.read(cx).is_local() {
context_server_store
.configured_server_ids()
.iter()
.filter_map(|id| {
let configuration = context_server_store.configuration_for_server(id)?;
match &*configuration {
project::context_server_store::ContextServerConfiguration::Custom {
command,
..
}
| project::context_server_store::ContextServerConfiguration::Extension {
command,
..
} => Some(acp::McpServer::Stdio {
name: id.0.to_string(),
command: command.path.clone(),
args: command.args.clone(),
env: if let Some(env) = command.env.as_ref() {
env.iter()
.map(|(name, value)| acp::EnvVariable {
name: name.clone(),
value: value.clone(),
meta: None,
})
.collect()
} else {
vec![]
},
}),
project::context_server_store::ContextServerConfiguration::Http {
url,
headers,
} => Some(acp::McpServer::Http {
name: id.0.to_string(),
url: url.to_string(),
headers: headers.iter().map(|(name, value)| acp::HttpHeader {
name: name.clone(),
value: value.clone(),
meta: None,
}).collect(),
}),
}
})
})
.collect()
} else {
// In SSH projects, the external agent is running on the remote
// machine, and currently we only run MCP servers on the local
// machine. So don't pass any MCP servers to the agent in that case.
Vec::new()
};
.collect()
} else {
// In SSH projects, the external agent is running on the remote
// machine, and currently we only run MCP servers on the local
// machine. So don't pass any MCP servers to the agent in that case.
Vec::new()
};
cx.spawn(async move |cx| {
let response = conn

View File

@@ -1,5 +1,5 @@
mod add_llm_provider_modal;
mod configure_context_server_modal;
pub mod configure_context_server_modal;
mod configure_context_server_tools_modal;
mod manage_profiles_modal;
mod tool_picker;
@@ -46,10 +46,12 @@ pub(crate) use configure_context_server_modal::ConfigureContextServerModal;
pub(crate) use configure_context_server_tools_modal::ConfigureContextServerToolsModal;
pub(crate) use manage_profiles_modal::ManageProfilesModal;
use crate::{
AddContextServer,
agent_configuration::add_llm_provider_modal::{AddLlmProviderModal, LlmCompatibleProvider},
use crate::agent_configuration::add_llm_provider_modal::{
AddLlmProviderModal, LlmCompatibleProvider,
};
use gpui::actions;
actions!(agent_ui, [AddRemoteContextServer]);
pub struct AgentConfiguration {
fs: Arc<dyn Fs>,
@@ -553,7 +555,14 @@ impl AgentConfiguration {
move |window, cx| {
Some(ContextMenu::build(window, cx, |menu, _window, _cx| {
menu.entry("Add Custom Server", None, {
|window, cx| window.dispatch_action(AddContextServer.boxed_clone(), cx)
|window, cx| {
window.dispatch_action(crate::AddContextServer.boxed_clone(), cx)
}
})
.entry("Add Remote Server", None, {
|window, cx| {
window.dispatch_action(AddRemoteContextServer.boxed_clone(), cx)
}
})
.entry("Install from Extensions", None, {
|window, cx| {
@@ -651,7 +660,7 @@ impl AgentConfiguration {
let is_running = matches!(server_status, ContextServerStatus::Running);
let item_id = SharedString::from(context_server_id.0.clone());
// Servers without a configuration can only be provided by extensions.
let provided_by_extension = server_configuration.is_none_or(|config| {
let provided_by_extension = server_configuration.as_ref().is_none_or(|config| {
matches!(
config.as_ref(),
ContextServerConfiguration::Extension { .. }
@@ -707,7 +716,10 @@ impl AgentConfiguration {
"Server is stopped.",
),
};
let is_remote = server_configuration
.as_ref()
.map(|config| matches!(config.as_ref(), ContextServerConfiguration::Http { .. }))
.unwrap_or(false);
let context_server_configuration_menu = PopoverMenu::new("context-server-config-menu")
.trigger_with_tooltip(
IconButton::new("context-server-config-menu", IconName::Settings)
@@ -729,15 +741,27 @@ impl AgentConfiguration {
let context_server_id = context_server_id.clone();
let language_registry = language_registry.clone();
let workspace = workspace.clone();
let is_remote = is_remote;
move |window, cx| {
ConfigureContextServerModal::show_modal_for_existing_server(
context_server_id.clone(),
language_registry.clone(),
workspace.clone(),
window,
cx,
)
.detach_and_log_err(cx);
if is_remote {
crate::agent_configuration::configure_context_server_modal::ConfigureContextServerModal::show_modal_for_existing_server(
context_server_id.clone(),
language_registry.clone(),
workspace.clone(),
window,
cx,
)
.detach();
} else {
ConfigureContextServerModal::show_modal_for_existing_server(
context_server_id.clone(),
language_registry.clone(),
workspace.clone(),
window,
cx,
)
.detach();
}
}
}).when(tool_count > 0, |this| this.entry("View Tools", None, {
let context_server_id = context_server_id.clone();

View File

@@ -4,6 +4,7 @@ use std::{
};
use anyhow::{Context as _, Result};
use collections::HashMap;
use context_server::{ContextServerCommand, ContextServerId};
use editor::{Editor, EditorElement, EditorStyle};
use gpui::{
@@ -20,6 +21,7 @@ use project::{
project_settings::{ContextServerSettings, ProjectSettings},
worktree_store::WorktreeStore,
};
use serde::Deserialize;
use settings::{Settings as _, update_settings_file};
use theme::ThemeSettings;
use ui::{
@@ -33,10 +35,16 @@ use crate::AddContextServer;
enum ConfigurationTarget {
New,
NewRemote,
Existing {
id: ContextServerId,
command: ContextServerCommand,
},
ExistingHttp {
id: ContextServerId,
url: String,
headers: HashMap<String, String>,
},
Extension {
id: ContextServerId,
repository_url: Option<SharedString>,
@@ -47,9 +55,11 @@ enum ConfigurationTarget {
enum ConfigurationSource {
New {
editor: Entity<Editor>,
is_remote: bool,
},
Existing {
editor: Entity<Editor>,
is_remote: bool,
},
Extension {
id: ContextServerId,
@@ -97,6 +107,11 @@ impl ConfigurationSource {
match target {
ConfigurationTarget::New => ConfigurationSource::New {
editor: create_editor(context_server_input(None), jsonc_language, window, cx),
is_remote: false,
},
ConfigurationTarget::NewRemote => ConfigurationSource::New {
editor: create_editor(context_server_http_input(None), jsonc_language, window, cx),
is_remote: true,
},
ConfigurationTarget::Existing { id, command } => ConfigurationSource::Existing {
editor: create_editor(
@@ -105,6 +120,20 @@ impl ConfigurationSource {
window,
cx,
),
is_remote: false,
},
ConfigurationTarget::ExistingHttp {
id,
url,
headers: auth,
} => ConfigurationSource::Existing {
editor: create_editor(
context_server_http_input(Some((id, url, auth))),
jsonc_language,
window,
cx,
),
is_remote: true,
},
ConfigurationTarget::Extension {
id,
@@ -141,16 +170,30 @@ impl ConfigurationSource {
fn output(&self, cx: &mut App) -> Result<(ContextServerId, ContextServerSettings)> {
match self {
ConfigurationSource::New { editor } | ConfigurationSource::Existing { editor } => {
parse_input(&editor.read(cx).text(cx)).map(|(id, command)| {
(
id,
ContextServerSettings::Custom {
enabled: true,
command,
},
)
})
ConfigurationSource::New { editor, is_remote }
| ConfigurationSource::Existing { editor, is_remote } => {
if *is_remote {
parse_http_input(&editor.read(cx).text(cx)).map(|(id, url, auth)| {
(
id,
ContextServerSettings::Http {
enabled: true,
url,
headers: auth,
},
)
})
} else {
parse_input(&editor.read(cx).text(cx)).map(|(id, command)| {
(
id,
ContextServerSettings::Custom {
enabled: true,
command,
},
)
})
}
}
ConfigurationSource::Extension {
id,
@@ -212,6 +255,66 @@ fn context_server_input(existing: Option<(ContextServerId, ContextServerCommand)
)
}
fn context_server_http_input(
existing: Option<(ContextServerId, String, HashMap<String, String>)>,
) -> String {
let (name, url, headers) = match existing {
Some((id, url, headers)) => {
let header = if headers.is_empty() {
r#"// "Authorization": "Bearer <token>"#.to_string()
} else {
let json = serde_json::to_string_pretty(&headers).unwrap();
let mut lines = json.split("\n").collect::<Vec<_>>();
if lines.len() > 1 {
lines.remove(0);
lines.pop();
}
lines
.into_iter()
.map(|line| format!(" {}", line.to_string()))
.collect::<String>()
};
(id.0.to_string(), url, header)
}
None => (
"some-remote-server".to_string(),
"https://example.com/mcp".to_string(),
r#"// "Authorization": "Bearer <token>"#.to_string(),
),
};
format!(
r#"{{
/// The name of your remote MCP server
"{name}": {{
/// The URL of the remote MCP server
"url": "{url}"
"headers": {{
/// Any headers to send along
{headers}
}}
}}
}}"#
)
}
fn parse_http_input(text: &str) -> Result<(ContextServerId, String, HashMap<String, String>)> {
#[derive(Deserialize)]
struct Temp {
url: String,
#[serde(default)]
headers: HashMap<String, String>,
}
let value: HashMap<String, Temp> = serde_json_lenient::from_str(text)?;
if value.len() != 1 {
anyhow::bail!("Expected exactly one context server configuration");
}
let (key, value) = value.into_iter().next().unwrap();
Ok((ContextServerId(key.into()), value.url, value.headers))
}
fn resolve_context_server_extension(
id: ContextServerId,
worktree_store: Entity<WorktreeStore>,
@@ -257,6 +360,20 @@ pub struct ConfigureContextServerModal {
}
impl ConfigureContextServerModal {
pub fn new_remote_server(
project: Entity<project::Project>,
workspace: WeakEntity<Workspace>,
window: &mut Window,
cx: &mut App,
) -> Task<Result<()>> {
let target = ConfigurationTarget::NewRemote;
let language_registry = project.read(cx).languages().clone();
window.spawn(cx, async move |mut cx| {
Self::show_modal(target, language_registry, workspace, &mut cx).await
})
}
pub fn register(
workspace: &mut Workspace,
language_registry: Arc<LanguageRegistry>,
@@ -312,6 +429,15 @@ impl ConfigureContextServerModal {
id: server_id,
command,
}),
ContextServerSettings::Http {
enabled: _,
url,
headers,
} => Some(ConfigurationTarget::ExistingHttp {
id: server_id,
url,
headers,
}),
ContextServerSettings::Extension { .. } => {
match workspace
.update(cx, |workspace, cx| {
@@ -353,8 +479,9 @@ impl ConfigureContextServerModal {
state: State::Idle,
original_server_id: match &target {
ConfigurationTarget::Existing { id, .. } => Some(id.clone()),
ConfigurationTarget::ExistingHttp { id, .. } => Some(id.clone()),
ConfigurationTarget::Extension { id, .. } => Some(id.clone()),
ConfigurationTarget::New => None,
ConfigurationTarget::New | ConfigurationTarget::NewRemote => None,
},
source: ConfigurationSource::from_target(
target,
@@ -481,7 +608,7 @@ impl ModalView for ConfigureContextServerModal {}
impl Focusable for ConfigureContextServerModal {
fn focus_handle(&self, cx: &App) -> FocusHandle {
match &self.source {
ConfigurationSource::New { editor } => editor.focus_handle(cx),
ConfigurationSource::New { editor, .. } => editor.focus_handle(cx),
ConfigurationSource::Existing { editor, .. } => editor.focus_handle(cx),
ConfigurationSource::Extension { editor, .. } => editor
.as_ref()
@@ -527,9 +654,10 @@ impl ConfigureContextServerModal {
}
fn render_modal_content(&self, cx: &App) -> AnyElement {
// All variants now use single editor approach
let editor = match &self.source {
ConfigurationSource::New { editor } => editor,
ConfigurationSource::Existing { editor } => editor,
ConfigurationSource::New { editor, .. } => editor,
ConfigurationSource::Existing { editor, .. } => editor,
ConfigurationSource::Extension { editor, .. } => {
let Some(editor) = editor else {
return div().into_any_element();

View File

@@ -45,7 +45,9 @@ use serde::{Deserialize, Serialize};
use settings::{LanguageModelSelection, Settings as _, SettingsStore};
use std::any::TypeId;
use crate::agent_configuration::{ConfigureContextServerModal, ManageProfilesModal};
use crate::agent_configuration::{
AddRemoteContextServer, ConfigureContextServerModal, ManageProfilesModal,
};
pub use crate::agent_panel::{AgentPanel, ConcreteAssistantPanelDelegate};
pub use crate::inline_assistant::InlineAssistant;
pub use agent_diff::{AgentDiffPane, AgentDiffToolbar};
@@ -274,6 +276,19 @@ pub fn init(
ConfigureContextServerModal::register(workspace, language_registry.clone(), window, cx)
})
.detach();
cx.observe_new(move |workspace: &mut workspace::Workspace, _window, _| {
workspace.register_action({
move |workspace, _: &AddRemoteContextServer, window, cx| {
crate::agent_configuration::configure_context_server_modal::ConfigureContextServerModal::new_remote_server(
workspace.project().clone(),
workspace.weak_handle(),
window,
cx,
).detach();
}
});
})
.detach();
cx.observe_new(ManageProfilesModal::register).detach();
// Update command palette filter based on AI settings

View File

@@ -12,7 +12,7 @@ workspace = true
path = "src/context_server.rs"
[features]
test-support = []
test-support = ["gpui/test-support"]
[dependencies]
anyhow.workspace = true
@@ -20,6 +20,7 @@ async-trait.workspace = true
collections.workspace = true
futures.workspace = true
gpui.workspace = true
http_client = { workspace = true, features = ["test-support"] }
log.workspace = true
net.workspace = true
parking_lot.workspace = true
@@ -32,3 +33,6 @@ smol.workspace = true
tempfile.workspace = true
url = { workspace = true, features = ["serde"] }
util.workspace = true
[dev-dependencies]
gpui = { workspace = true, features = ["test-support"] }

View File

@@ -6,15 +6,19 @@ pub mod test;
pub mod transport;
pub mod types;
use collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::{fmt::Display, path::PathBuf};
use anyhow::Result;
use client::Client;
use gpui::AsyncApp;
use gpui::{App, AsyncApp};
use parking_lot::RwLock;
pub use settings::ContextServerCommand;
use url::Url;
use crate::transport::HttpTransport;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ContextServerId(pub Arc<str>);
@@ -52,6 +56,25 @@ impl ContextServer {
}
}
pub fn http(
id: ContextServerId,
endpoint: &Url,
headers: HashMap<String, String>,
cx: &App,
) -> Result<Self> {
let http_client = cx.http_client();
let transport = match endpoint.scheme() {
"http" | "https" => {
log::info!("Using HTTP transport for {}", endpoint);
let transport = HttpTransport::new(http_client, endpoint.to_string(), headers, cx);
Arc::new(transport) as _
}
_ => anyhow::bail!("unsupported MCP url scheme {}", endpoint.scheme()),
};
Ok(Self::new(id, transport))
}
pub fn new(id: ContextServerId, transport: Arc<dyn crate::transport::Transport>) -> Self {
Self {
id,

View File

@@ -1,11 +1,12 @@
pub mod http;
mod stdio_transport;
use std::pin::Pin;
use anyhow::Result;
use async_trait::async_trait;
use futures::Stream;
use std::pin::Pin;
pub use http::*;
pub use stdio_transport::*;
#[async_trait]

View File

@@ -0,0 +1,271 @@
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use collections::HashMap;
use futures::{Stream, StreamExt};
use gpui::{App, BackgroundExecutor};
use http_client::{AsyncBody, HttpClient, Request, Response, http::Method};
use parking_lot::Mutex as SyncMutex;
use smol::channel;
use std::{pin::Pin, sync::Arc};
use crate::transport::Transport;
// Constants from MCP spec
const HEADER_SESSION_ID: &str = "Mcp-Session-Id";
const EVENT_STREAM_MIME_TYPE: &str = "text/event-stream";
const JSON_MIME_TYPE: &str = "application/json";
/// HTTP Transport with session management and SSE support
pub struct HttpTransport {
http_client: Arc<dyn HttpClient>,
endpoint: String,
session_id: Arc<SyncMutex<Option<String>>>,
executor: BackgroundExecutor,
response_tx: channel::Sender<String>,
response_rx: channel::Receiver<String>,
error_tx: channel::Sender<String>,
error_rx: channel::Receiver<String>,
// Track if we've sent the initialize response
initialized: Arc<SyncMutex<bool>>,
// Authentication headers to include in requests
headers: HashMap<String, String>,
}
impl HttpTransport {
pub fn new(
http_client: Arc<dyn HttpClient>,
endpoint: String,
headers: HashMap<String, String>,
cx: &App,
) -> Self {
let (response_tx, response_rx) = channel::unbounded();
let (error_tx, error_rx) = channel::unbounded();
Self {
http_client,
executor: cx.background_executor().clone(),
endpoint,
session_id: Arc::new(SyncMutex::new(None)),
response_tx,
response_rx,
error_tx,
error_rx,
initialized: Arc::new(SyncMutex::new(false)),
headers,
}
}
/// Send a message and handle the response based on content type
async fn send_message(&self, message: String) -> Result<()> {
// Check if this is an initialize request
let is_initialize = message.contains("\"method\":\"initialize\"");
let is_notification =
!message.contains("\"id\":") || message.contains("notifications/initialized");
let mut request_builder = Request::builder()
.method(Method::POST)
.uri(&self.endpoint)
.header("Content-Type", JSON_MIME_TYPE)
.header(
"Accept",
format!("{}, {}", JSON_MIME_TYPE, EVENT_STREAM_MIME_TYPE),
);
for (key, value) in &self.headers {
request_builder = request_builder.header(key.as_str(), value.as_str());
}
// Add session ID if we have one (except for initialize)
if !is_initialize {
if let Some(ref session_id) = *self.session_id.lock() {
request_builder = request_builder.header(HEADER_SESSION_ID, session_id.as_str());
}
}
let request = request_builder.body(AsyncBody::from(message.into_bytes()))?;
let mut response = self.http_client.send(request).await?;
// Handle different response types based on status and content-type
match response.status() {
status if status.is_success() => {
// Check content type
let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok());
// Extract session ID from response headers if present
if let Some(session_id) = response
.headers()
.get(HEADER_SESSION_ID)
.and_then(|v| v.to_str().ok())
{
*self.session_id.lock() = Some(session_id.to_string());
log::debug!("Session ID set: {}", session_id);
}
match content_type {
Some(ct) if ct.starts_with(JSON_MIME_TYPE) => {
// JSON response - read and forward immediately
let mut body = String::new();
futures::AsyncReadExt::read_to_string(response.body_mut(), &mut body)
.await?;
// Only send non-empty responses
if !body.is_empty() {
self.response_tx
.send(body)
.await
.map_err(|_| anyhow!("Failed to send JSON response"))?;
}
}
Some(ct) if ct.starts_with(EVENT_STREAM_MIME_TYPE) => {
// SSE stream - set up streaming
self.setup_sse_stream(response).await?;
// Mark as initialized after setting up the first SSE stream
if is_initialize {
*self.initialized.lock() = true;
}
}
_ => {
// For notifications, 202 Accepted with no content type is ok
if is_notification && status.as_u16() == 202 {
log::debug!("Notification accepted");
} else {
return Err(anyhow!("Unexpected content type: {:?}", content_type));
}
}
}
}
status if status.as_u16() == 202 => {
// Accepted - notification acknowledged, no response needed
log::debug!("Notification accepted");
}
_ => {
let mut error_body = String::new();
futures::AsyncReadExt::read_to_string(response.body_mut(), &mut error_body).await?;
self.error_tx
.send(format!("HTTP {}: {}", response.status(), error_body))
.await
.map_err(|_| anyhow!("Failed to send error"))?;
}
}
Ok(())
}
/// Set up SSE streaming from the response
async fn setup_sse_stream(&self, mut response: Response<AsyncBody>) -> Result<()> {
let response_tx = self.response_tx.clone();
let error_tx = self.error_tx.clone();
// Spawn a task to handle the SSE stream
smol::spawn(async move {
let reader = futures::io::BufReader::new(response.body_mut());
let mut lines = futures::AsyncBufReadExt::lines(reader);
let mut data_buffer = Vec::new();
let mut in_message = false;
while let Some(line_result) = lines.next().await {
match line_result {
Ok(line) => {
if line.is_empty() {
// Empty line signals end of event
if !data_buffer.is_empty() {
let message = data_buffer.join("\n");
// Filter out ping messages and empty data
if !message.trim().is_empty() && message != "ping" {
if let Err(e) = response_tx.send(message).await {
log::error!("Failed to send SSE message: {}", e);
break;
}
}
data_buffer.clear();
}
in_message = false;
} else if let Some(data) = line.strip_prefix("data: ") {
// Handle data lines
let data = data.trim();
if !data.is_empty() {
// Check if this is a ping message
if data == "ping" {
log::trace!("Received SSE ping");
continue;
}
data_buffer.push(data.to_string());
in_message = true;
}
} else if line.starts_with("event:")
|| line.starts_with("id:")
|| line.starts_with("retry:")
{
// Ignore other SSE fields
continue;
} else if in_message {
// Continuation of data
data_buffer.push(line);
}
}
Err(e) => {
let _ = error_tx.send(format!("SSE stream error: {}", e)).await;
break;
}
}
}
})
.detach();
Ok(())
}
}
#[async_trait]
impl Transport for HttpTransport {
async fn send(&self, message: String) -> Result<()> {
self.send_message(message).await
}
fn receive(&self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
Box::pin(self.response_rx.clone())
}
fn receive_err(&self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
Box::pin(self.error_rx.clone())
}
}
impl Drop for HttpTransport {
fn drop(&mut self) {
// Try to cleanup session on drop
let http_client = self.http_client.clone();
let endpoint = self.endpoint.clone();
let session_id = self.session_id.lock().clone();
let headers = self.headers.clone();
if let Some(session_id) = session_id {
self.executor
.spawn(async move {
let mut request_builder = Request::builder()
.method(Method::DELETE)
.uri(&endpoint)
.header(HEADER_SESSION_ID, &session_id);
// Add authentication headers if present
for (key, value) in headers {
request_builder = request_builder.header(key.as_str(), value.as_str());
}
let request = request_builder.body(AsyncBody::empty());
if let Ok(request) = request {
let _ = http_client.send(request).await;
}
})
.detach();
}
}
}

View File

@@ -990,6 +990,9 @@ impl ExtensionImports for WasmState {
command: None,
settings: Some(settings),
})?),
project::project_settings::ContextServerSettings::Http { .. } => {
bail!("remote context server settings not supported in 0.6.0")
}
}
}
_ => {

View File

@@ -99,13 +99,18 @@ pub enum ContextServerConfiguration {
command: ContextServerCommand,
settings: serde_json::Value,
},
Http {
url: url::Url,
headers: HashMap<String, String>,
},
}
impl ContextServerConfiguration {
pub fn command(&self) -> &ContextServerCommand {
pub fn command(&self) -> Option<&ContextServerCommand> {
match self {
ContextServerConfiguration::Custom { command } => command,
ContextServerConfiguration::Extension { command, .. } => command,
ContextServerConfiguration::Custom { command } => Some(command),
ContextServerConfiguration::Extension { command, .. } => Some(command),
ContextServerConfiguration::Http { .. } => None,
}
}
@@ -142,6 +147,14 @@ impl ContextServerConfiguration {
}
}
}
ContextServerSettings::Http {
enabled: _,
url,
headers: auth,
} => {
let url = url::Url::parse(&url).log_err()?;
Some(ContextServerConfiguration::Http { url, headers: auth })
}
}
}
}
@@ -390,7 +403,7 @@ impl ContextServerStore {
let configuration = state.configuration();
self.stop_server(&state.server().id(), cx)?;
let new_server = self.create_context_server(id.clone(), configuration.clone(), cx);
let new_server = self.create_context_server(id.clone(), configuration.clone(), cx)?;
self.run_server(new_server, configuration, cx);
}
Ok(())
@@ -479,33 +492,41 @@ impl ContextServerStore {
id: ContextServerId,
configuration: Arc<ContextServerConfiguration>,
cx: &mut Context<Self>,
) -> Arc<ContextServer> {
let project = self.project.upgrade();
let mut root_path = None;
if let Some(project) = project {
let project = project.read(cx);
if project.is_local() {
if let Some(path) = project.active_project_directory(cx) {
root_path = Some(path);
} else {
for worktree in self.worktree_store.read(cx).visible_worktrees(cx) {
if let Some(path) = worktree.read(cx).root_dir() {
root_path = Some(path);
break;
}
}
}
}
};
) -> Result<Arc<ContextServer>> {
if let Some(factory) = self.context_server_factory.as_ref() {
factory(id, configuration)
} else {
Arc::new(ContextServer::stdio(
id,
configuration.command().clone(),
root_path,
))
return Ok(factory(id, configuration));
}
match configuration.as_ref() {
ContextServerConfiguration::Http { url, headers } => Ok(Arc::new(ContextServer::http(
id.clone(),
url,
headers.clone(),
cx,
)?)),
_ => {
let root_path = self
.project
.read_with(cx, |project, cx| project.active_project_directory(cx))
.ok()
.flatten()
.or_else(|| {
self.worktree_store.read_with(cx, |store, cx| {
store.visible_worktrees(cx).fold(None, |acc, item| {
if acc.is_none() {
item.read(cx).root_dir()
} else {
acc
}
})
})
});
Ok(Arc::new(ContextServer::stdio(
id,
configuration.command().unwrap().clone(),
root_path,
)))
}
}
}
@@ -621,14 +642,16 @@ impl ContextServerStore {
let existing_config = state.as_ref().map(|state| state.configuration());
if existing_config.as_deref() != Some(&config) || is_stopped {
let config = Arc::new(config);
let server = this.create_context_server(id.clone(), config.clone(), cx);
let server = this.create_context_server(id.clone(), config.clone(), cx)?;
servers_to_start.push((server, config));
if this.servers.contains_key(&id) {
servers_to_stop.insert(id);
}
}
}
})?;
anyhow::Ok(())
})??;
this.update(cx, |this, cx| {
for id in servers_to_stop {
@@ -1228,6 +1251,62 @@ mod tests {
});
}
#[gpui::test]
async fn test_remote_context_server(cx: &mut TestAppContext) {
const SERVER_ID: &str = "remote-server";
let server_id = ContextServerId(SERVER_ID.into());
let server_url = "http://example.com/api";
let (_fs, project) = setup_context_server_test(
cx,
json!({ "code.rs": "" }),
vec![(
SERVER_ID.into(),
ContextServerSettings::Http {
enabled: true,
url: server_url.to_string(),
headers: Default::default(),
},
)],
)
.await;
let executor = cx.executor();
let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
let store = cx.new(|cx| {
ContextServerStore::test_maintain_server_loop(
Box::new(move |id, config| {
assert_eq!(id.0.as_ref(), SERVER_ID);
match config.as_ref() {
ContextServerConfiguration::Http { url, headers } => {
assert_eq!(url.as_str(), server_url);
assert!(headers.is_empty());
}
_ => panic!("Expected remote configuration"),
}
Arc::new(ContextServer::new(
id.clone(),
Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
))
}),
registry.clone(),
project.read(cx).worktree_store(),
project.downgrade(),
cx,
)
});
let _server_events = assert_server_events(
&store,
vec![
(server_id.clone(), ContextServerStatus::Starting),
(server_id.clone(), ContextServerStatus::Running),
],
cx,
);
cx.run_until_parked();
}
struct ServerEvents {
received_event_count: Rc<RefCell<usize>>,
expected_event_count: usize,

View File

@@ -135,6 +135,16 @@ pub enum ContextServerSettings {
/// are supported.
settings: serde_json::Value,
},
Http {
/// Whether the context server is enabled.
#[serde(default = "default_true")]
enabled: bool,
/// The URL of the remote context server.
url: String,
/// Optional authentication configuration for the remote server.
#[serde(skip_serializing_if = "HashMap::is_empty", default)]
headers: HashMap<String, String>,
},
}
impl From<settings::ContextServerSettingsContent> for ContextServerSettings {
@@ -146,6 +156,15 @@ impl From<settings::ContextServerSettingsContent> for ContextServerSettings {
settings::ContextServerSettingsContent::Extension { enabled, settings } => {
ContextServerSettings::Extension { enabled, settings }
}
settings::ContextServerSettingsContent::Http {
enabled,
url,
headers,
} => ContextServerSettings::Http {
enabled,
url,
headers,
},
}
}
}
@@ -158,6 +177,15 @@ impl Into<settings::ContextServerSettingsContent> for ContextServerSettings {
ContextServerSettings::Extension { enabled, settings } => {
settings::ContextServerSettingsContent::Extension { enabled, settings }
}
ContextServerSettings::Http {
enabled,
url,
headers,
} => settings::ContextServerSettingsContent::Http {
enabled,
url,
headers,
},
}
}
}
@@ -174,6 +202,7 @@ impl ContextServerSettings {
match self {
ContextServerSettings::Custom { enabled, .. } => *enabled,
ContextServerSettings::Extension { enabled, .. } => *enabled,
ContextServerSettings::Http { enabled, .. } => *enabled,
}
}
@@ -181,6 +210,7 @@ impl ContextServerSettings {
match self {
ContextServerSettings::Custom { enabled: e, .. } => *e = enabled,
ContextServerSettings::Extension { enabled: e, .. } => *e = enabled,
ContextServerSettings::Http { enabled: e, .. } => *e = enabled,
}
}
}

View File

@@ -196,7 +196,7 @@ pub struct SessionSettingsContent {
}
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, JsonSchema, MergeFrom, Debug)]
#[serde(tag = "source", rename_all = "snake_case")]
#[serde(untagged, rename_all = "snake_case")]
pub enum ContextServerSettingsContent {
Custom {
/// Whether the context server is enabled.
@@ -206,6 +206,16 @@ pub enum ContextServerSettingsContent {
#[serde(flatten)]
command: ContextServerCommand,
},
Http {
/// Whether the context server is enabled.
#[serde(default = "default_true")]
enabled: bool,
/// The URL of the remote context server.
url: String,
/// Optional headers to send.
#[serde(skip_serializing_if = "HashMap::is_empty", default)]
headers: HashMap<String, String>,
},
Extension {
/// Whether the context server is enabled.
#[serde(default = "default_true")]
@@ -217,19 +227,24 @@ pub enum ContextServerSettingsContent {
settings: serde_json::Value,
},
}
impl ContextServerSettingsContent {
pub fn set_enabled(&mut self, enabled: bool) {
match self {
ContextServerSettingsContent::Custom {
enabled: custom_enabled,
command: _,
..
} => {
*custom_enabled = enabled;
}
ContextServerSettingsContent::Extension {
enabled: ext_enabled,
settings: _,
..
} => *ext_enabled = enabled,
ContextServerSettingsContent::Http {
enabled: remote_enabled,
..
} => *remote_enabled = enabled,
}
}
}