Compare commits
21 Commits
mcp-auth
...
feature/co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b5cc703c7c | ||
|
|
46aaa03ace | ||
|
|
f766fa2e9f | ||
|
|
977666f8bf | ||
|
|
5ca2645da6 | ||
|
|
411669f763 | ||
|
|
7211a672b6 | ||
|
|
6a9a891985 | ||
|
|
6526936f96 | ||
|
|
55ddcba3cb | ||
|
|
4e56e1b8b7 | ||
|
|
4b895e71f7 | ||
|
|
06b18ac607 | ||
|
|
3b2647f182 | ||
|
|
f9d1cc8735 | ||
|
|
1e9689b0b2 | ||
|
|
7523fb0442 | ||
|
|
551ef4b6f3 | ||
|
|
c56ead48cb | ||
|
|
08c594966f | ||
|
|
59a9ec6fb3 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3690,6 +3690,7 @@ dependencies = [
|
||||
"collections",
|
||||
"futures 0.3.31",
|
||||
"gpui",
|
||||
"http_client",
|
||||
"log",
|
||||
"net",
|
||||
"parking_lot",
|
||||
|
||||
@@ -247,37 +247,58 @@ impl AgentConnection for AcpConnection {
|
||||
let default_mode = self.default_mode.clone();
|
||||
let cwd = cwd.to_path_buf();
|
||||
let context_server_store = project.read(cx).context_server_store().read(cx);
|
||||
let mcp_servers = if project.read(cx).is_local() {
|
||||
context_server_store
|
||||
.configured_server_ids()
|
||||
.iter()
|
||||
.filter_map(|id| {
|
||||
let configuration = context_server_store.configuration_for_server(id)?;
|
||||
let command = configuration.command();
|
||||
Some(acp::McpServer::Stdio {
|
||||
name: id.0.to_string(),
|
||||
command: command.path.clone(),
|
||||
args: command.args.clone(),
|
||||
env: if let Some(env) = command.env.as_ref() {
|
||||
env.iter()
|
||||
.map(|(name, value)| acp::EnvVariable {
|
||||
name: name.clone(),
|
||||
value: value.clone(),
|
||||
meta: None,
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
},
|
||||
let mcp_servers =
|
||||
if project.read(cx).is_local() {
|
||||
context_server_store
|
||||
.configured_server_ids()
|
||||
.iter()
|
||||
.filter_map(|id| {
|
||||
let configuration = context_server_store.configuration_for_server(id)?;
|
||||
match &*configuration {
|
||||
project::context_server_store::ContextServerConfiguration::Custom {
|
||||
command,
|
||||
..
|
||||
}
|
||||
| project::context_server_store::ContextServerConfiguration::Extension {
|
||||
command,
|
||||
..
|
||||
} => Some(acp::McpServer::Stdio {
|
||||
name: id.0.to_string(),
|
||||
command: command.path.clone(),
|
||||
args: command.args.clone(),
|
||||
env: if let Some(env) = command.env.as_ref() {
|
||||
env.iter()
|
||||
.map(|(name, value)| acp::EnvVariable {
|
||||
name: name.clone(),
|
||||
value: value.clone(),
|
||||
meta: None,
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
},
|
||||
}),
|
||||
project::context_server_store::ContextServerConfiguration::Http {
|
||||
url,
|
||||
headers,
|
||||
} => Some(acp::McpServer::Http {
|
||||
name: id.0.to_string(),
|
||||
url: url.to_string(),
|
||||
headers: headers.iter().map(|(name, value)| acp::HttpHeader {
|
||||
name: name.clone(),
|
||||
value: value.clone(),
|
||||
meta: None,
|
||||
}).collect(),
|
||||
}),
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
// In SSH projects, the external agent is running on the remote
|
||||
// machine, and currently we only run MCP servers on the local
|
||||
// machine. So don't pass any MCP servers to the agent in that case.
|
||||
Vec::new()
|
||||
};
|
||||
.collect()
|
||||
} else {
|
||||
// In SSH projects, the external agent is running on the remote
|
||||
// machine, and currently we only run MCP servers on the local
|
||||
// machine. So don't pass any MCP servers to the agent in that case.
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
cx.spawn(async move |cx| {
|
||||
let response = conn
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
mod add_llm_provider_modal;
|
||||
mod configure_context_server_modal;
|
||||
pub mod configure_context_server_modal;
|
||||
mod configure_context_server_tools_modal;
|
||||
mod manage_profiles_modal;
|
||||
mod tool_picker;
|
||||
@@ -46,10 +46,12 @@ pub(crate) use configure_context_server_modal::ConfigureContextServerModal;
|
||||
pub(crate) use configure_context_server_tools_modal::ConfigureContextServerToolsModal;
|
||||
pub(crate) use manage_profiles_modal::ManageProfilesModal;
|
||||
|
||||
use crate::{
|
||||
AddContextServer,
|
||||
agent_configuration::add_llm_provider_modal::{AddLlmProviderModal, LlmCompatibleProvider},
|
||||
use crate::agent_configuration::add_llm_provider_modal::{
|
||||
AddLlmProviderModal, LlmCompatibleProvider,
|
||||
};
|
||||
use gpui::actions;
|
||||
|
||||
actions!(agent_ui, [AddRemoteContextServer]);
|
||||
|
||||
pub struct AgentConfiguration {
|
||||
fs: Arc<dyn Fs>,
|
||||
@@ -553,7 +555,14 @@ impl AgentConfiguration {
|
||||
move |window, cx| {
|
||||
Some(ContextMenu::build(window, cx, |menu, _window, _cx| {
|
||||
menu.entry("Add Custom Server", None, {
|
||||
|window, cx| window.dispatch_action(AddContextServer.boxed_clone(), cx)
|
||||
|window, cx| {
|
||||
window.dispatch_action(crate::AddContextServer.boxed_clone(), cx)
|
||||
}
|
||||
})
|
||||
.entry("Add Remote Server", None, {
|
||||
|window, cx| {
|
||||
window.dispatch_action(AddRemoteContextServer.boxed_clone(), cx)
|
||||
}
|
||||
})
|
||||
.entry("Install from Extensions", None, {
|
||||
|window, cx| {
|
||||
@@ -651,7 +660,7 @@ impl AgentConfiguration {
|
||||
let is_running = matches!(server_status, ContextServerStatus::Running);
|
||||
let item_id = SharedString::from(context_server_id.0.clone());
|
||||
// Servers without a configuration can only be provided by extensions.
|
||||
let provided_by_extension = server_configuration.is_none_or(|config| {
|
||||
let provided_by_extension = server_configuration.as_ref().is_none_or(|config| {
|
||||
matches!(
|
||||
config.as_ref(),
|
||||
ContextServerConfiguration::Extension { .. }
|
||||
@@ -707,7 +716,10 @@ impl AgentConfiguration {
|
||||
"Server is stopped.",
|
||||
),
|
||||
};
|
||||
|
||||
let is_remote = server_configuration
|
||||
.as_ref()
|
||||
.map(|config| matches!(config.as_ref(), ContextServerConfiguration::Http { .. }))
|
||||
.unwrap_or(false);
|
||||
let context_server_configuration_menu = PopoverMenu::new("context-server-config-menu")
|
||||
.trigger_with_tooltip(
|
||||
IconButton::new("context-server-config-menu", IconName::Settings)
|
||||
@@ -729,15 +741,27 @@ impl AgentConfiguration {
|
||||
let context_server_id = context_server_id.clone();
|
||||
let language_registry = language_registry.clone();
|
||||
let workspace = workspace.clone();
|
||||
let is_remote = is_remote;
|
||||
move |window, cx| {
|
||||
ConfigureContextServerModal::show_modal_for_existing_server(
|
||||
context_server_id.clone(),
|
||||
language_registry.clone(),
|
||||
workspace.clone(),
|
||||
window,
|
||||
cx,
|
||||
)
|
||||
.detach_and_log_err(cx);
|
||||
if is_remote {
|
||||
crate::agent_configuration::configure_context_server_modal::ConfigureContextServerModal::show_modal_for_existing_server(
|
||||
context_server_id.clone(),
|
||||
language_registry.clone(),
|
||||
workspace.clone(),
|
||||
window,
|
||||
cx,
|
||||
)
|
||||
.detach();
|
||||
} else {
|
||||
ConfigureContextServerModal::show_modal_for_existing_server(
|
||||
context_server_id.clone(),
|
||||
language_registry.clone(),
|
||||
workspace.clone(),
|
||||
window,
|
||||
cx,
|
||||
)
|
||||
.detach();
|
||||
}
|
||||
}
|
||||
}).when(tool_count > 0, |this| this.entry("View Tools", None, {
|
||||
let context_server_id = context_server_id.clone();
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use collections::HashMap;
|
||||
use context_server::{ContextServerCommand, ContextServerId};
|
||||
use editor::{Editor, EditorElement, EditorStyle};
|
||||
use gpui::{
|
||||
@@ -20,6 +21,7 @@ use project::{
|
||||
project_settings::{ContextServerSettings, ProjectSettings},
|
||||
worktree_store::WorktreeStore,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use settings::{Settings as _, update_settings_file};
|
||||
use theme::ThemeSettings;
|
||||
use ui::{
|
||||
@@ -33,10 +35,16 @@ use crate::AddContextServer;
|
||||
|
||||
enum ConfigurationTarget {
|
||||
New,
|
||||
NewRemote,
|
||||
Existing {
|
||||
id: ContextServerId,
|
||||
command: ContextServerCommand,
|
||||
},
|
||||
ExistingHttp {
|
||||
id: ContextServerId,
|
||||
url: String,
|
||||
headers: HashMap<String, String>,
|
||||
},
|
||||
Extension {
|
||||
id: ContextServerId,
|
||||
repository_url: Option<SharedString>,
|
||||
@@ -47,9 +55,11 @@ enum ConfigurationTarget {
|
||||
enum ConfigurationSource {
|
||||
New {
|
||||
editor: Entity<Editor>,
|
||||
is_remote: bool,
|
||||
},
|
||||
Existing {
|
||||
editor: Entity<Editor>,
|
||||
is_remote: bool,
|
||||
},
|
||||
Extension {
|
||||
id: ContextServerId,
|
||||
@@ -97,6 +107,11 @@ impl ConfigurationSource {
|
||||
match target {
|
||||
ConfigurationTarget::New => ConfigurationSource::New {
|
||||
editor: create_editor(context_server_input(None), jsonc_language, window, cx),
|
||||
is_remote: false,
|
||||
},
|
||||
ConfigurationTarget::NewRemote => ConfigurationSource::New {
|
||||
editor: create_editor(context_server_http_input(None), jsonc_language, window, cx),
|
||||
is_remote: true,
|
||||
},
|
||||
ConfigurationTarget::Existing { id, command } => ConfigurationSource::Existing {
|
||||
editor: create_editor(
|
||||
@@ -105,6 +120,20 @@ impl ConfigurationSource {
|
||||
window,
|
||||
cx,
|
||||
),
|
||||
is_remote: false,
|
||||
},
|
||||
ConfigurationTarget::ExistingHttp {
|
||||
id,
|
||||
url,
|
||||
headers: auth,
|
||||
} => ConfigurationSource::Existing {
|
||||
editor: create_editor(
|
||||
context_server_http_input(Some((id, url, auth))),
|
||||
jsonc_language,
|
||||
window,
|
||||
cx,
|
||||
),
|
||||
is_remote: true,
|
||||
},
|
||||
ConfigurationTarget::Extension {
|
||||
id,
|
||||
@@ -141,16 +170,30 @@ impl ConfigurationSource {
|
||||
|
||||
fn output(&self, cx: &mut App) -> Result<(ContextServerId, ContextServerSettings)> {
|
||||
match self {
|
||||
ConfigurationSource::New { editor } | ConfigurationSource::Existing { editor } => {
|
||||
parse_input(&editor.read(cx).text(cx)).map(|(id, command)| {
|
||||
(
|
||||
id,
|
||||
ContextServerSettings::Custom {
|
||||
enabled: true,
|
||||
command,
|
||||
},
|
||||
)
|
||||
})
|
||||
ConfigurationSource::New { editor, is_remote }
|
||||
| ConfigurationSource::Existing { editor, is_remote } => {
|
||||
if *is_remote {
|
||||
parse_http_input(&editor.read(cx).text(cx)).map(|(id, url, auth)| {
|
||||
(
|
||||
id,
|
||||
ContextServerSettings::Http {
|
||||
enabled: true,
|
||||
url,
|
||||
headers: auth,
|
||||
},
|
||||
)
|
||||
})
|
||||
} else {
|
||||
parse_input(&editor.read(cx).text(cx)).map(|(id, command)| {
|
||||
(
|
||||
id,
|
||||
ContextServerSettings::Custom {
|
||||
enabled: true,
|
||||
command,
|
||||
},
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
ConfigurationSource::Extension {
|
||||
id,
|
||||
@@ -212,6 +255,66 @@ fn context_server_input(existing: Option<(ContextServerId, ContextServerCommand)
|
||||
)
|
||||
}
|
||||
|
||||
fn context_server_http_input(
|
||||
existing: Option<(ContextServerId, String, HashMap<String, String>)>,
|
||||
) -> String {
|
||||
let (name, url, headers) = match existing {
|
||||
Some((id, url, headers)) => {
|
||||
let header = if headers.is_empty() {
|
||||
r#"// "Authorization": "Bearer <token>"#.to_string()
|
||||
} else {
|
||||
let json = serde_json::to_string_pretty(&headers).unwrap();
|
||||
let mut lines = json.split("\n").collect::<Vec<_>>();
|
||||
if lines.len() > 1 {
|
||||
lines.remove(0);
|
||||
lines.pop();
|
||||
}
|
||||
lines
|
||||
.into_iter()
|
||||
.map(|line| format!(" {}", line.to_string()))
|
||||
.collect::<String>()
|
||||
};
|
||||
(id.0.to_string(), url, header)
|
||||
}
|
||||
None => (
|
||||
"some-remote-server".to_string(),
|
||||
"https://example.com/mcp".to_string(),
|
||||
r#"// "Authorization": "Bearer <token>"#.to_string(),
|
||||
),
|
||||
};
|
||||
|
||||
format!(
|
||||
r#"{{
|
||||
/// The name of your remote MCP server
|
||||
"{name}": {{
|
||||
/// The URL of the remote MCP server
|
||||
"url": "{url}"
|
||||
"headers": {{
|
||||
/// Any headers to send along
|
||||
{headers}
|
||||
}}
|
||||
}}
|
||||
}}"#
|
||||
)
|
||||
}
|
||||
|
||||
fn parse_http_input(text: &str) -> Result<(ContextServerId, String, HashMap<String, String>)> {
|
||||
#[derive(Deserialize)]
|
||||
struct Temp {
|
||||
url: String,
|
||||
#[serde(default)]
|
||||
headers: HashMap<String, String>,
|
||||
}
|
||||
let value: HashMap<String, Temp> = serde_json_lenient::from_str(text)?;
|
||||
if value.len() != 1 {
|
||||
anyhow::bail!("Expected exactly one context server configuration");
|
||||
}
|
||||
|
||||
let (key, value) = value.into_iter().next().unwrap();
|
||||
|
||||
Ok((ContextServerId(key.into()), value.url, value.headers))
|
||||
}
|
||||
|
||||
fn resolve_context_server_extension(
|
||||
id: ContextServerId,
|
||||
worktree_store: Entity<WorktreeStore>,
|
||||
@@ -257,6 +360,20 @@ pub struct ConfigureContextServerModal {
|
||||
}
|
||||
|
||||
impl ConfigureContextServerModal {
|
||||
pub fn new_remote_server(
|
||||
project: Entity<project::Project>,
|
||||
workspace: WeakEntity<Workspace>,
|
||||
window: &mut Window,
|
||||
cx: &mut App,
|
||||
) -> Task<Result<()>> {
|
||||
let target = ConfigurationTarget::NewRemote;
|
||||
let language_registry = project.read(cx).languages().clone();
|
||||
|
||||
window.spawn(cx, async move |mut cx| {
|
||||
Self::show_modal(target, language_registry, workspace, &mut cx).await
|
||||
})
|
||||
}
|
||||
|
||||
pub fn register(
|
||||
workspace: &mut Workspace,
|
||||
language_registry: Arc<LanguageRegistry>,
|
||||
@@ -312,6 +429,15 @@ impl ConfigureContextServerModal {
|
||||
id: server_id,
|
||||
command,
|
||||
}),
|
||||
ContextServerSettings::Http {
|
||||
enabled: _,
|
||||
url,
|
||||
headers,
|
||||
} => Some(ConfigurationTarget::ExistingHttp {
|
||||
id: server_id,
|
||||
url,
|
||||
headers,
|
||||
}),
|
||||
ContextServerSettings::Extension { .. } => {
|
||||
match workspace
|
||||
.update(cx, |workspace, cx| {
|
||||
@@ -353,8 +479,9 @@ impl ConfigureContextServerModal {
|
||||
state: State::Idle,
|
||||
original_server_id: match &target {
|
||||
ConfigurationTarget::Existing { id, .. } => Some(id.clone()),
|
||||
ConfigurationTarget::ExistingHttp { id, .. } => Some(id.clone()),
|
||||
ConfigurationTarget::Extension { id, .. } => Some(id.clone()),
|
||||
ConfigurationTarget::New => None,
|
||||
ConfigurationTarget::New | ConfigurationTarget::NewRemote => None,
|
||||
},
|
||||
source: ConfigurationSource::from_target(
|
||||
target,
|
||||
@@ -481,7 +608,7 @@ impl ModalView for ConfigureContextServerModal {}
|
||||
impl Focusable for ConfigureContextServerModal {
|
||||
fn focus_handle(&self, cx: &App) -> FocusHandle {
|
||||
match &self.source {
|
||||
ConfigurationSource::New { editor } => editor.focus_handle(cx),
|
||||
ConfigurationSource::New { editor, .. } => editor.focus_handle(cx),
|
||||
ConfigurationSource::Existing { editor, .. } => editor.focus_handle(cx),
|
||||
ConfigurationSource::Extension { editor, .. } => editor
|
||||
.as_ref()
|
||||
@@ -527,9 +654,10 @@ impl ConfigureContextServerModal {
|
||||
}
|
||||
|
||||
fn render_modal_content(&self, cx: &App) -> AnyElement {
|
||||
// All variants now use single editor approach
|
||||
let editor = match &self.source {
|
||||
ConfigurationSource::New { editor } => editor,
|
||||
ConfigurationSource::Existing { editor } => editor,
|
||||
ConfigurationSource::New { editor, .. } => editor,
|
||||
ConfigurationSource::Existing { editor, .. } => editor,
|
||||
ConfigurationSource::Extension { editor, .. } => {
|
||||
let Some(editor) = editor else {
|
||||
return div().into_any_element();
|
||||
|
||||
@@ -45,7 +45,9 @@ use serde::{Deserialize, Serialize};
|
||||
use settings::{LanguageModelSelection, Settings as _, SettingsStore};
|
||||
use std::any::TypeId;
|
||||
|
||||
use crate::agent_configuration::{ConfigureContextServerModal, ManageProfilesModal};
|
||||
use crate::agent_configuration::{
|
||||
AddRemoteContextServer, ConfigureContextServerModal, ManageProfilesModal,
|
||||
};
|
||||
pub use crate::agent_panel::{AgentPanel, ConcreteAssistantPanelDelegate};
|
||||
pub use crate::inline_assistant::InlineAssistant;
|
||||
pub use agent_diff::{AgentDiffPane, AgentDiffToolbar};
|
||||
@@ -274,6 +276,19 @@ pub fn init(
|
||||
ConfigureContextServerModal::register(workspace, language_registry.clone(), window, cx)
|
||||
})
|
||||
.detach();
|
||||
cx.observe_new(move |workspace: &mut workspace::Workspace, _window, _| {
|
||||
workspace.register_action({
|
||||
move |workspace, _: &AddRemoteContextServer, window, cx| {
|
||||
crate::agent_configuration::configure_context_server_modal::ConfigureContextServerModal::new_remote_server(
|
||||
workspace.project().clone(),
|
||||
workspace.weak_handle(),
|
||||
window,
|
||||
cx,
|
||||
).detach();
|
||||
}
|
||||
});
|
||||
})
|
||||
.detach();
|
||||
cx.observe_new(ManageProfilesModal::register).detach();
|
||||
|
||||
// Update command palette filter based on AI settings
|
||||
|
||||
@@ -12,7 +12,7 @@ workspace = true
|
||||
path = "src/context_server.rs"
|
||||
|
||||
[features]
|
||||
test-support = []
|
||||
test-support = ["gpui/test-support"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
@@ -20,6 +20,7 @@ async-trait.workspace = true
|
||||
collections.workspace = true
|
||||
futures.workspace = true
|
||||
gpui.workspace = true
|
||||
http_client = { workspace = true, features = ["test-support"] }
|
||||
log.workspace = true
|
||||
net.workspace = true
|
||||
parking_lot.workspace = true
|
||||
@@ -32,3 +33,6 @@ smol.workspace = true
|
||||
tempfile.workspace = true
|
||||
url = { workspace = true, features = ["serde"] }
|
||||
util.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
gpui = { workspace = true, features = ["test-support"] }
|
||||
|
||||
@@ -6,15 +6,19 @@ pub mod test;
|
||||
pub mod transport;
|
||||
pub mod types;
|
||||
|
||||
use collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::{fmt::Display, path::PathBuf};
|
||||
|
||||
use anyhow::Result;
|
||||
use client::Client;
|
||||
use gpui::AsyncApp;
|
||||
use gpui::{App, AsyncApp};
|
||||
use parking_lot::RwLock;
|
||||
pub use settings::ContextServerCommand;
|
||||
use url::Url;
|
||||
|
||||
use crate::transport::HttpTransport;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct ContextServerId(pub Arc<str>);
|
||||
@@ -52,6 +56,25 @@ impl ContextServer {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn http(
|
||||
id: ContextServerId,
|
||||
endpoint: &Url,
|
||||
headers: HashMap<String, String>,
|
||||
cx: &App,
|
||||
) -> Result<Self> {
|
||||
let http_client = cx.http_client();
|
||||
|
||||
let transport = match endpoint.scheme() {
|
||||
"http" | "https" => {
|
||||
log::info!("Using HTTP transport for {}", endpoint);
|
||||
let transport = HttpTransport::new(http_client, endpoint.to_string(), headers, cx);
|
||||
Arc::new(transport) as _
|
||||
}
|
||||
_ => anyhow::bail!("unsupported MCP url scheme {}", endpoint.scheme()),
|
||||
};
|
||||
Ok(Self::new(id, transport))
|
||||
}
|
||||
|
||||
pub fn new(id: ContextServerId, transport: Arc<dyn crate::transport::Transport>) -> Self {
|
||||
Self {
|
||||
id,
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
pub mod http;
|
||||
mod stdio_transport;
|
||||
|
||||
use std::pin::Pin;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use futures::Stream;
|
||||
use std::pin::Pin;
|
||||
|
||||
pub use http::*;
|
||||
pub use stdio_transport::*;
|
||||
|
||||
#[async_trait]
|
||||
|
||||
271
crates/context_server/src/transport/http.rs
Normal file
271
crates/context_server/src/transport/http.rs
Normal file
@@ -0,0 +1,271 @@
|
||||
use anyhow::{Result, anyhow};
|
||||
use async_trait::async_trait;
|
||||
use collections::HashMap;
|
||||
use futures::{Stream, StreamExt};
|
||||
use gpui::{App, BackgroundExecutor};
|
||||
use http_client::{AsyncBody, HttpClient, Request, Response, http::Method};
|
||||
use parking_lot::Mutex as SyncMutex;
|
||||
use smol::channel;
|
||||
use std::{pin::Pin, sync::Arc};
|
||||
|
||||
use crate::transport::Transport;
|
||||
|
||||
// Constants from MCP spec
|
||||
const HEADER_SESSION_ID: &str = "Mcp-Session-Id";
|
||||
const EVENT_STREAM_MIME_TYPE: &str = "text/event-stream";
|
||||
const JSON_MIME_TYPE: &str = "application/json";
|
||||
|
||||
/// HTTP Transport with session management and SSE support
|
||||
pub struct HttpTransport {
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
endpoint: String,
|
||||
session_id: Arc<SyncMutex<Option<String>>>,
|
||||
executor: BackgroundExecutor,
|
||||
response_tx: channel::Sender<String>,
|
||||
response_rx: channel::Receiver<String>,
|
||||
error_tx: channel::Sender<String>,
|
||||
error_rx: channel::Receiver<String>,
|
||||
// Track if we've sent the initialize response
|
||||
initialized: Arc<SyncMutex<bool>>,
|
||||
// Authentication headers to include in requests
|
||||
headers: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl HttpTransport {
|
||||
pub fn new(
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
endpoint: String,
|
||||
headers: HashMap<String, String>,
|
||||
cx: &App,
|
||||
) -> Self {
|
||||
let (response_tx, response_rx) = channel::unbounded();
|
||||
let (error_tx, error_rx) = channel::unbounded();
|
||||
|
||||
Self {
|
||||
http_client,
|
||||
executor: cx.background_executor().clone(),
|
||||
endpoint,
|
||||
session_id: Arc::new(SyncMutex::new(None)),
|
||||
response_tx,
|
||||
response_rx,
|
||||
error_tx,
|
||||
error_rx,
|
||||
initialized: Arc::new(SyncMutex::new(false)),
|
||||
headers,
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a message and handle the response based on content type
|
||||
async fn send_message(&self, message: String) -> Result<()> {
|
||||
// Check if this is an initialize request
|
||||
let is_initialize = message.contains("\"method\":\"initialize\"");
|
||||
let is_notification =
|
||||
!message.contains("\"id\":") || message.contains("notifications/initialized");
|
||||
|
||||
let mut request_builder = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri(&self.endpoint)
|
||||
.header("Content-Type", JSON_MIME_TYPE)
|
||||
.header(
|
||||
"Accept",
|
||||
format!("{}, {}", JSON_MIME_TYPE, EVENT_STREAM_MIME_TYPE),
|
||||
);
|
||||
|
||||
for (key, value) in &self.headers {
|
||||
request_builder = request_builder.header(key.as_str(), value.as_str());
|
||||
}
|
||||
|
||||
// Add session ID if we have one (except for initialize)
|
||||
if !is_initialize {
|
||||
if let Some(ref session_id) = *self.session_id.lock() {
|
||||
request_builder = request_builder.header(HEADER_SESSION_ID, session_id.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
let request = request_builder.body(AsyncBody::from(message.into_bytes()))?;
|
||||
let mut response = self.http_client.send(request).await?;
|
||||
|
||||
// Handle different response types based on status and content-type
|
||||
match response.status() {
|
||||
status if status.is_success() => {
|
||||
// Check content type
|
||||
let content_type = response
|
||||
.headers()
|
||||
.get("content-type")
|
||||
.and_then(|v| v.to_str().ok());
|
||||
|
||||
// Extract session ID from response headers if present
|
||||
if let Some(session_id) = response
|
||||
.headers()
|
||||
.get(HEADER_SESSION_ID)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
{
|
||||
*self.session_id.lock() = Some(session_id.to_string());
|
||||
log::debug!("Session ID set: {}", session_id);
|
||||
}
|
||||
|
||||
match content_type {
|
||||
Some(ct) if ct.starts_with(JSON_MIME_TYPE) => {
|
||||
// JSON response - read and forward immediately
|
||||
let mut body = String::new();
|
||||
futures::AsyncReadExt::read_to_string(response.body_mut(), &mut body)
|
||||
.await?;
|
||||
|
||||
// Only send non-empty responses
|
||||
if !body.is_empty() {
|
||||
self.response_tx
|
||||
.send(body)
|
||||
.await
|
||||
.map_err(|_| anyhow!("Failed to send JSON response"))?;
|
||||
}
|
||||
}
|
||||
Some(ct) if ct.starts_with(EVENT_STREAM_MIME_TYPE) => {
|
||||
// SSE stream - set up streaming
|
||||
self.setup_sse_stream(response).await?;
|
||||
|
||||
// Mark as initialized after setting up the first SSE stream
|
||||
if is_initialize {
|
||||
*self.initialized.lock() = true;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// For notifications, 202 Accepted with no content type is ok
|
||||
if is_notification && status.as_u16() == 202 {
|
||||
log::debug!("Notification accepted");
|
||||
} else {
|
||||
return Err(anyhow!("Unexpected content type: {:?}", content_type));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
status if status.as_u16() == 202 => {
|
||||
// Accepted - notification acknowledged, no response needed
|
||||
log::debug!("Notification accepted");
|
||||
}
|
||||
_ => {
|
||||
let mut error_body = String::new();
|
||||
futures::AsyncReadExt::read_to_string(response.body_mut(), &mut error_body).await?;
|
||||
|
||||
self.error_tx
|
||||
.send(format!("HTTP {}: {}", response.status(), error_body))
|
||||
.await
|
||||
.map_err(|_| anyhow!("Failed to send error"))?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set up SSE streaming from the response
|
||||
async fn setup_sse_stream(&self, mut response: Response<AsyncBody>) -> Result<()> {
|
||||
let response_tx = self.response_tx.clone();
|
||||
let error_tx = self.error_tx.clone();
|
||||
|
||||
// Spawn a task to handle the SSE stream
|
||||
smol::spawn(async move {
|
||||
let reader = futures::io::BufReader::new(response.body_mut());
|
||||
let mut lines = futures::AsyncBufReadExt::lines(reader);
|
||||
|
||||
let mut data_buffer = Vec::new();
|
||||
let mut in_message = false;
|
||||
|
||||
while let Some(line_result) = lines.next().await {
|
||||
match line_result {
|
||||
Ok(line) => {
|
||||
if line.is_empty() {
|
||||
// Empty line signals end of event
|
||||
if !data_buffer.is_empty() {
|
||||
let message = data_buffer.join("\n");
|
||||
|
||||
// Filter out ping messages and empty data
|
||||
if !message.trim().is_empty() && message != "ping" {
|
||||
if let Err(e) = response_tx.send(message).await {
|
||||
log::error!("Failed to send SSE message: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
data_buffer.clear();
|
||||
}
|
||||
in_message = false;
|
||||
} else if let Some(data) = line.strip_prefix("data: ") {
|
||||
// Handle data lines
|
||||
let data = data.trim();
|
||||
if !data.is_empty() {
|
||||
// Check if this is a ping message
|
||||
if data == "ping" {
|
||||
log::trace!("Received SSE ping");
|
||||
continue;
|
||||
}
|
||||
data_buffer.push(data.to_string());
|
||||
in_message = true;
|
||||
}
|
||||
} else if line.starts_with("event:")
|
||||
|| line.starts_with("id:")
|
||||
|| line.starts_with("retry:")
|
||||
{
|
||||
// Ignore other SSE fields
|
||||
continue;
|
||||
} else if in_message {
|
||||
// Continuation of data
|
||||
data_buffer.push(line);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = error_tx.send(format!("SSE stream error: {}", e)).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Transport for HttpTransport {
|
||||
async fn send(&self, message: String) -> Result<()> {
|
||||
self.send_message(message).await
|
||||
}
|
||||
|
||||
fn receive(&self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
|
||||
Box::pin(self.response_rx.clone())
|
||||
}
|
||||
|
||||
fn receive_err(&self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
|
||||
Box::pin(self.error_rx.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HttpTransport {
|
||||
fn drop(&mut self) {
|
||||
// Try to cleanup session on drop
|
||||
let http_client = self.http_client.clone();
|
||||
let endpoint = self.endpoint.clone();
|
||||
let session_id = self.session_id.lock().clone();
|
||||
let headers = self.headers.clone();
|
||||
|
||||
if let Some(session_id) = session_id {
|
||||
self.executor
|
||||
.spawn(async move {
|
||||
let mut request_builder = Request::builder()
|
||||
.method(Method::DELETE)
|
||||
.uri(&endpoint)
|
||||
.header(HEADER_SESSION_ID, &session_id);
|
||||
|
||||
// Add authentication headers if present
|
||||
for (key, value) in headers {
|
||||
request_builder = request_builder.header(key.as_str(), value.as_str());
|
||||
}
|
||||
|
||||
let request = request_builder.body(AsyncBody::empty());
|
||||
|
||||
if let Ok(request) = request {
|
||||
let _ = http_client.send(request).await;
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -990,6 +990,9 @@ impl ExtensionImports for WasmState {
|
||||
command: None,
|
||||
settings: Some(settings),
|
||||
})?),
|
||||
project::project_settings::ContextServerSettings::Http { .. } => {
|
||||
bail!("remote context server settings not supported in 0.6.0")
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
|
||||
@@ -99,13 +99,18 @@ pub enum ContextServerConfiguration {
|
||||
command: ContextServerCommand,
|
||||
settings: serde_json::Value,
|
||||
},
|
||||
Http {
|
||||
url: url::Url,
|
||||
headers: HashMap<String, String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ContextServerConfiguration {
|
||||
pub fn command(&self) -> &ContextServerCommand {
|
||||
pub fn command(&self) -> Option<&ContextServerCommand> {
|
||||
match self {
|
||||
ContextServerConfiguration::Custom { command } => command,
|
||||
ContextServerConfiguration::Extension { command, .. } => command,
|
||||
ContextServerConfiguration::Custom { command } => Some(command),
|
||||
ContextServerConfiguration::Extension { command, .. } => Some(command),
|
||||
ContextServerConfiguration::Http { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,6 +147,14 @@ impl ContextServerConfiguration {
|
||||
}
|
||||
}
|
||||
}
|
||||
ContextServerSettings::Http {
|
||||
enabled: _,
|
||||
url,
|
||||
headers: auth,
|
||||
} => {
|
||||
let url = url::Url::parse(&url).log_err()?;
|
||||
Some(ContextServerConfiguration::Http { url, headers: auth })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -390,7 +403,7 @@ impl ContextServerStore {
|
||||
let configuration = state.configuration();
|
||||
|
||||
self.stop_server(&state.server().id(), cx)?;
|
||||
let new_server = self.create_context_server(id.clone(), configuration.clone(), cx);
|
||||
let new_server = self.create_context_server(id.clone(), configuration.clone(), cx)?;
|
||||
self.run_server(new_server, configuration, cx);
|
||||
}
|
||||
Ok(())
|
||||
@@ -479,33 +492,41 @@ impl ContextServerStore {
|
||||
id: ContextServerId,
|
||||
configuration: Arc<ContextServerConfiguration>,
|
||||
cx: &mut Context<Self>,
|
||||
) -> Arc<ContextServer> {
|
||||
let project = self.project.upgrade();
|
||||
let mut root_path = None;
|
||||
if let Some(project) = project {
|
||||
let project = project.read(cx);
|
||||
if project.is_local() {
|
||||
if let Some(path) = project.active_project_directory(cx) {
|
||||
root_path = Some(path);
|
||||
} else {
|
||||
for worktree in self.worktree_store.read(cx).visible_worktrees(cx) {
|
||||
if let Some(path) = worktree.read(cx).root_dir() {
|
||||
root_path = Some(path);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
) -> Result<Arc<ContextServer>> {
|
||||
if let Some(factory) = self.context_server_factory.as_ref() {
|
||||
factory(id, configuration)
|
||||
} else {
|
||||
Arc::new(ContextServer::stdio(
|
||||
id,
|
||||
configuration.command().clone(),
|
||||
root_path,
|
||||
))
|
||||
return Ok(factory(id, configuration));
|
||||
}
|
||||
|
||||
match configuration.as_ref() {
|
||||
ContextServerConfiguration::Http { url, headers } => Ok(Arc::new(ContextServer::http(
|
||||
id.clone(),
|
||||
url,
|
||||
headers.clone(),
|
||||
cx,
|
||||
)?)),
|
||||
_ => {
|
||||
let root_path = self
|
||||
.project
|
||||
.read_with(cx, |project, cx| project.active_project_directory(cx))
|
||||
.ok()
|
||||
.flatten()
|
||||
.or_else(|| {
|
||||
self.worktree_store.read_with(cx, |store, cx| {
|
||||
store.visible_worktrees(cx).fold(None, |acc, item| {
|
||||
if acc.is_none() {
|
||||
item.read(cx).root_dir()
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
})
|
||||
})
|
||||
});
|
||||
Ok(Arc::new(ContextServer::stdio(
|
||||
id,
|
||||
configuration.command().unwrap().clone(),
|
||||
root_path,
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -621,14 +642,16 @@ impl ContextServerStore {
|
||||
let existing_config = state.as_ref().map(|state| state.configuration());
|
||||
if existing_config.as_deref() != Some(&config) || is_stopped {
|
||||
let config = Arc::new(config);
|
||||
let server = this.create_context_server(id.clone(), config.clone(), cx);
|
||||
let server = this.create_context_server(id.clone(), config.clone(), cx)?;
|
||||
servers_to_start.push((server, config));
|
||||
if this.servers.contains_key(&id) {
|
||||
servers_to_stop.insert(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
anyhow::Ok(())
|
||||
})??;
|
||||
|
||||
this.update(cx, |this, cx| {
|
||||
for id in servers_to_stop {
|
||||
@@ -1228,6 +1251,62 @@ mod tests {
|
||||
});
|
||||
}
|
||||
|
||||
#[gpui::test]
|
||||
async fn test_remote_context_server(cx: &mut TestAppContext) {
|
||||
const SERVER_ID: &str = "remote-server";
|
||||
let server_id = ContextServerId(SERVER_ID.into());
|
||||
let server_url = "http://example.com/api";
|
||||
|
||||
let (_fs, project) = setup_context_server_test(
|
||||
cx,
|
||||
json!({ "code.rs": "" }),
|
||||
vec![(
|
||||
SERVER_ID.into(),
|
||||
ContextServerSettings::Http {
|
||||
enabled: true,
|
||||
url: server_url.to_string(),
|
||||
headers: Default::default(),
|
||||
},
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
|
||||
let executor = cx.executor();
|
||||
let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
|
||||
let store = cx.new(|cx| {
|
||||
ContextServerStore::test_maintain_server_loop(
|
||||
Box::new(move |id, config| {
|
||||
assert_eq!(id.0.as_ref(), SERVER_ID);
|
||||
match config.as_ref() {
|
||||
ContextServerConfiguration::Http { url, headers } => {
|
||||
assert_eq!(url.as_str(), server_url);
|
||||
assert!(headers.is_empty());
|
||||
}
|
||||
_ => panic!("Expected remote configuration"),
|
||||
}
|
||||
Arc::new(ContextServer::new(
|
||||
id.clone(),
|
||||
Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
|
||||
))
|
||||
}),
|
||||
registry.clone(),
|
||||
project.read(cx).worktree_store(),
|
||||
project.downgrade(),
|
||||
cx,
|
||||
)
|
||||
});
|
||||
|
||||
let _server_events = assert_server_events(
|
||||
&store,
|
||||
vec![
|
||||
(server_id.clone(), ContextServerStatus::Starting),
|
||||
(server_id.clone(), ContextServerStatus::Running),
|
||||
],
|
||||
cx,
|
||||
);
|
||||
cx.run_until_parked();
|
||||
}
|
||||
|
||||
struct ServerEvents {
|
||||
received_event_count: Rc<RefCell<usize>>,
|
||||
expected_event_count: usize,
|
||||
|
||||
@@ -135,6 +135,16 @@ pub enum ContextServerSettings {
|
||||
/// are supported.
|
||||
settings: serde_json::Value,
|
||||
},
|
||||
Http {
|
||||
/// Whether the context server is enabled.
|
||||
#[serde(default = "default_true")]
|
||||
enabled: bool,
|
||||
/// The URL of the remote context server.
|
||||
url: String,
|
||||
/// Optional authentication configuration for the remote server.
|
||||
#[serde(skip_serializing_if = "HashMap::is_empty", default)]
|
||||
headers: HashMap<String, String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<settings::ContextServerSettingsContent> for ContextServerSettings {
|
||||
@@ -146,6 +156,15 @@ impl From<settings::ContextServerSettingsContent> for ContextServerSettings {
|
||||
settings::ContextServerSettingsContent::Extension { enabled, settings } => {
|
||||
ContextServerSettings::Extension { enabled, settings }
|
||||
}
|
||||
settings::ContextServerSettingsContent::Http {
|
||||
enabled,
|
||||
url,
|
||||
headers,
|
||||
} => ContextServerSettings::Http {
|
||||
enabled,
|
||||
url,
|
||||
headers,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -158,6 +177,15 @@ impl Into<settings::ContextServerSettingsContent> for ContextServerSettings {
|
||||
ContextServerSettings::Extension { enabled, settings } => {
|
||||
settings::ContextServerSettingsContent::Extension { enabled, settings }
|
||||
}
|
||||
ContextServerSettings::Http {
|
||||
enabled,
|
||||
url,
|
||||
headers,
|
||||
} => settings::ContextServerSettingsContent::Http {
|
||||
enabled,
|
||||
url,
|
||||
headers,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -174,6 +202,7 @@ impl ContextServerSettings {
|
||||
match self {
|
||||
ContextServerSettings::Custom { enabled, .. } => *enabled,
|
||||
ContextServerSettings::Extension { enabled, .. } => *enabled,
|
||||
ContextServerSettings::Http { enabled, .. } => *enabled,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,6 +210,7 @@ impl ContextServerSettings {
|
||||
match self {
|
||||
ContextServerSettings::Custom { enabled: e, .. } => *e = enabled,
|
||||
ContextServerSettings::Extension { enabled: e, .. } => *e = enabled,
|
||||
ContextServerSettings::Http { enabled: e, .. } => *e = enabled,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,7 +196,7 @@ pub struct SessionSettingsContent {
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, JsonSchema, MergeFrom, Debug)]
|
||||
#[serde(tag = "source", rename_all = "snake_case")]
|
||||
#[serde(untagged, rename_all = "snake_case")]
|
||||
pub enum ContextServerSettingsContent {
|
||||
Custom {
|
||||
/// Whether the context server is enabled.
|
||||
@@ -206,6 +206,16 @@ pub enum ContextServerSettingsContent {
|
||||
#[serde(flatten)]
|
||||
command: ContextServerCommand,
|
||||
},
|
||||
Http {
|
||||
/// Whether the context server is enabled.
|
||||
#[serde(default = "default_true")]
|
||||
enabled: bool,
|
||||
/// The URL of the remote context server.
|
||||
url: String,
|
||||
/// Optional headers to send.
|
||||
#[serde(skip_serializing_if = "HashMap::is_empty", default)]
|
||||
headers: HashMap<String, String>,
|
||||
},
|
||||
Extension {
|
||||
/// Whether the context server is enabled.
|
||||
#[serde(default = "default_true")]
|
||||
@@ -217,19 +227,24 @@ pub enum ContextServerSettingsContent {
|
||||
settings: serde_json::Value,
|
||||
},
|
||||
}
|
||||
|
||||
impl ContextServerSettingsContent {
|
||||
pub fn set_enabled(&mut self, enabled: bool) {
|
||||
match self {
|
||||
ContextServerSettingsContent::Custom {
|
||||
enabled: custom_enabled,
|
||||
command: _,
|
||||
..
|
||||
} => {
|
||||
*custom_enabled = enabled;
|
||||
}
|
||||
ContextServerSettingsContent::Extension {
|
||||
enabled: ext_enabled,
|
||||
settings: _,
|
||||
..
|
||||
} => *ext_enabled = enabled,
|
||||
ContextServerSettingsContent::Http {
|
||||
enabled: remote_enabled,
|
||||
..
|
||||
} => *remote_enabled = enabled,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user