Merge pull request #1262 from zed-industries/user-timeline
Improve user activity tracking
This commit is contained in:
@@ -36,8 +36,12 @@ pub fn routes(rpc_server: &Arc<rpc::Server>, state: Arc<AppState>) -> Router<Bod
|
||||
.route("/panic", post(trace_panic))
|
||||
.route("/rpc_server_snapshot", get(get_rpc_server_snapshot))
|
||||
.route(
|
||||
"/project_activity_summary",
|
||||
get(get_project_activity_summary),
|
||||
"/user_activity/summary",
|
||||
get(get_top_users_activity_summary),
|
||||
)
|
||||
.route(
|
||||
"/user_activity/timeline/:user_id",
|
||||
get(get_user_activity_timeline),
|
||||
)
|
||||
.route("/project_metadata", get(get_project_metadata))
|
||||
.layer(
|
||||
@@ -264,20 +268,32 @@ async fn get_rpc_server_snapshot(
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct GetProjectActivityParams {
|
||||
struct TimePeriodParams {
|
||||
#[serde(with = "time::serde::iso8601")]
|
||||
start: OffsetDateTime,
|
||||
#[serde(with = "time::serde::iso8601")]
|
||||
end: OffsetDateTime,
|
||||
}
|
||||
|
||||
async fn get_project_activity_summary(
|
||||
Query(params): Query<GetProjectActivityParams>,
|
||||
async fn get_top_users_activity_summary(
|
||||
Query(params): Query<TimePeriodParams>,
|
||||
Extension(app): Extension<Arc<AppState>>,
|
||||
) -> Result<ErasedJson> {
|
||||
let summary = app
|
||||
.db
|
||||
.summarize_project_activity(params.start..params.end, 100)
|
||||
.get_top_users_activity_summary(params.start..params.end, 100)
|
||||
.await?;
|
||||
Ok(ErasedJson::pretty(summary))
|
||||
}
|
||||
|
||||
async fn get_user_activity_timeline(
|
||||
Path(user_id): Path<i32>,
|
||||
Query(params): Query<TimePeriodParams>,
|
||||
Extension(app): Extension<Arc<AppState>>,
|
||||
) -> Result<ErasedJson> {
|
||||
let summary = app
|
||||
.db
|
||||
.get_user_activity_timeline(params.start..params.end, UserId(user_id))
|
||||
.await?;
|
||||
Ok(ErasedJson::pretty(summary))
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{ops::Range, time::Duration};
|
||||
use std::{cmp, ops::Range, time::Duration};
|
||||
|
||||
use crate::{Error, Result};
|
||||
use anyhow::{anyhow, Context};
|
||||
@@ -7,10 +7,10 @@ use axum::http::StatusCode;
|
||||
use collections::HashMap;
|
||||
use futures::StreamExt;
|
||||
use nanoid::nanoid;
|
||||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
pub use sqlx::postgres::PgPoolOptions as DbOptions;
|
||||
use sqlx::{types::Uuid, FromRow, QueryBuilder, Row};
|
||||
use time::OffsetDateTime;
|
||||
use time::{OffsetDateTime, PrimitiveDateTime};
|
||||
|
||||
#[async_trait]
|
||||
pub trait Db: Send + Sync {
|
||||
@@ -52,7 +52,7 @@ pub trait Db: Send + Sync {
|
||||
&self,
|
||||
project_id: ProjectId,
|
||||
worktree_id: u64,
|
||||
extensions: HashMap<String, usize>,
|
||||
extensions: HashMap<String, u32>,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Get the file counts on the given project keyed by their worktree and extension.
|
||||
@@ -63,7 +63,7 @@ pub trait Db: Send + Sync {
|
||||
|
||||
/// Record which users have been active in which projects during
|
||||
/// a given period of time.
|
||||
async fn record_project_activity(
|
||||
async fn record_user_activity(
|
||||
&self,
|
||||
time_period: Range<OffsetDateTime>,
|
||||
active_projects: &[(UserId, ProjectId)],
|
||||
@@ -71,12 +71,19 @@ pub trait Db: Send + Sync {
|
||||
|
||||
/// Get the users that have been most active during the given time period,
|
||||
/// along with the amount of time they have been active in each project.
|
||||
async fn summarize_project_activity(
|
||||
async fn get_top_users_activity_summary(
|
||||
&self,
|
||||
time_period: Range<OffsetDateTime>,
|
||||
max_user_count: usize,
|
||||
) -> Result<Vec<UserActivitySummary>>;
|
||||
|
||||
/// Get the project activity for the given user and time period.
|
||||
async fn get_user_activity_timeline(
|
||||
&self,
|
||||
time_period: Range<OffsetDateTime>,
|
||||
user_id: UserId,
|
||||
) -> Result<Vec<UserActivityPeriod>>;
|
||||
|
||||
async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>>;
|
||||
async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result<bool>;
|
||||
async fn send_contact_request(&self, requester_id: UserId, responder_id: UserId) -> Result<()>;
|
||||
@@ -499,7 +506,7 @@ impl Db for PostgresDb {
|
||||
&self,
|
||||
project_id: ProjectId,
|
||||
worktree_id: u64,
|
||||
extensions: HashMap<String, usize>,
|
||||
extensions: HashMap<String, u32>,
|
||||
) -> Result<()> {
|
||||
if extensions.is_empty() {
|
||||
return Ok(());
|
||||
@@ -557,7 +564,7 @@ impl Db for PostgresDb {
|
||||
Ok(extension_counts)
|
||||
}
|
||||
|
||||
async fn record_project_activity(
|
||||
async fn record_user_activity(
|
||||
&self,
|
||||
time_period: Range<OffsetDateTime>,
|
||||
projects: &[(UserId, ProjectId)],
|
||||
@@ -586,7 +593,7 @@ impl Db for PostgresDb {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn summarize_project_activity(
|
||||
async fn get_top_users_activity_summary(
|
||||
&self,
|
||||
time_period: Range<OffsetDateTime>,
|
||||
max_user_count: usize,
|
||||
@@ -596,7 +603,7 @@ impl Db for PostgresDb {
|
||||
project_durations AS (
|
||||
SELECT user_id, project_id, SUM(duration_millis) AS project_duration
|
||||
FROM project_activity_periods
|
||||
WHERE $1 <= ended_at AND ended_at <= $2
|
||||
WHERE $1 < ended_at AND ended_at <= $2
|
||||
GROUP BY user_id, project_id
|
||||
),
|
||||
user_durations AS (
|
||||
@@ -611,7 +618,7 @@ impl Db for PostgresDb {
|
||||
WHERE
|
||||
user_durations.user_id = project_durations.user_id AND
|
||||
user_durations.user_id = users.id
|
||||
ORDER BY user_id ASC, project_duration DESC
|
||||
ORDER BY total_duration DESC, user_id ASC
|
||||
";
|
||||
|
||||
let mut rows = sqlx::query_as::<_, (UserId, String, ProjectId, i64)>(query)
|
||||
@@ -641,6 +648,91 @@ impl Db for PostgresDb {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn get_user_activity_timeline(
|
||||
&self,
|
||||
time_period: Range<OffsetDateTime>,
|
||||
user_id: UserId,
|
||||
) -> Result<Vec<UserActivityPeriod>> {
|
||||
const COALESCE_THRESHOLD: Duration = Duration::from_secs(30);
|
||||
|
||||
let query = "
|
||||
SELECT
|
||||
project_activity_periods.ended_at,
|
||||
project_activity_periods.duration_millis,
|
||||
project_activity_periods.project_id,
|
||||
worktree_extensions.extension,
|
||||
worktree_extensions.count
|
||||
FROM project_activity_periods
|
||||
LEFT OUTER JOIN
|
||||
worktree_extensions
|
||||
ON
|
||||
project_activity_periods.project_id = worktree_extensions.project_id
|
||||
WHERE
|
||||
project_activity_periods.user_id = $1 AND
|
||||
$2 < project_activity_periods.ended_at AND
|
||||
project_activity_periods.ended_at <= $3
|
||||
ORDER BY project_activity_periods.id ASC
|
||||
";
|
||||
|
||||
let mut rows = sqlx::query_as::<
|
||||
_,
|
||||
(
|
||||
PrimitiveDateTime,
|
||||
i32,
|
||||
ProjectId,
|
||||
Option<String>,
|
||||
Option<i32>,
|
||||
),
|
||||
>(query)
|
||||
.bind(user_id)
|
||||
.bind(time_period.start)
|
||||
.bind(time_period.end)
|
||||
.fetch(&self.pool);
|
||||
|
||||
let mut time_periods: HashMap<ProjectId, Vec<UserActivityPeriod>> = Default::default();
|
||||
while let Some(row) = rows.next().await {
|
||||
let (ended_at, duration_millis, project_id, extension, extension_count) = row?;
|
||||
let ended_at = ended_at.assume_utc();
|
||||
let duration = Duration::from_millis(duration_millis as u64);
|
||||
let started_at = ended_at - duration;
|
||||
let project_time_periods = time_periods.entry(project_id).or_default();
|
||||
|
||||
if let Some(prev_duration) = project_time_periods.last_mut() {
|
||||
if started_at <= prev_duration.end + COALESCE_THRESHOLD
|
||||
&& ended_at >= prev_duration.start
|
||||
{
|
||||
prev_duration.end = cmp::max(prev_duration.end, ended_at);
|
||||
} else {
|
||||
project_time_periods.push(UserActivityPeriod {
|
||||
project_id,
|
||||
start: started_at,
|
||||
end: ended_at,
|
||||
extensions: Default::default(),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
project_time_periods.push(UserActivityPeriod {
|
||||
project_id,
|
||||
start: started_at,
|
||||
end: ended_at,
|
||||
extensions: Default::default(),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some((extension, extension_count)) = extension.zip(extension_count) {
|
||||
project_time_periods
|
||||
.last_mut()
|
||||
.unwrap()
|
||||
.extensions
|
||||
.insert(extension, extension_count as usize);
|
||||
}
|
||||
}
|
||||
|
||||
let mut durations = time_periods.into_values().flatten().collect::<Vec<_>>();
|
||||
durations.sort_unstable_by_key(|duration| duration.start);
|
||||
Ok(durations)
|
||||
}
|
||||
|
||||
// contacts
|
||||
|
||||
async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
|
||||
@@ -1116,7 +1208,18 @@ impl Db for PostgresDb {
|
||||
macro_rules! id_type {
|
||||
($name:ident) => {
|
||||
#[derive(
|
||||
Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type, Serialize,
|
||||
Clone,
|
||||
Copy,
|
||||
Debug,
|
||||
Default,
|
||||
PartialEq,
|
||||
Eq,
|
||||
PartialOrd,
|
||||
Ord,
|
||||
Hash,
|
||||
sqlx::Type,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
)]
|
||||
#[sqlx(transparent)]
|
||||
#[serde(transparent)]
|
||||
@@ -1172,6 +1275,16 @@ pub struct UserActivitySummary {
|
||||
pub project_activity: Vec<(ProjectId, Duration)>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize)]
|
||||
pub struct UserActivityPeriod {
|
||||
project_id: ProjectId,
|
||||
#[serde(with = "time::serde::iso8601")]
|
||||
start: OffsetDateTime,
|
||||
#[serde(with = "time::serde::iso8601")]
|
||||
end: OffsetDateTime,
|
||||
extensions: HashMap<String, usize>,
|
||||
}
|
||||
|
||||
id_type!(OrgId);
|
||||
#[derive(FromRow)]
|
||||
pub struct Org {
|
||||
@@ -1439,29 +1552,36 @@ pub mod tests {
|
||||
let user_2 = db.create_user("user_2", None, false).await.unwrap();
|
||||
let user_3 = db.create_user("user_3", None, false).await.unwrap();
|
||||
let project_1 = db.register_project(user_1).await.unwrap();
|
||||
db.update_worktree_extensions(
|
||||
project_1,
|
||||
1,
|
||||
HashMap::from_iter([("rs".into(), 5), ("md".into(), 7)]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let project_2 = db.register_project(user_2).await.unwrap();
|
||||
let t0 = OffsetDateTime::now_utc() - Duration::from_secs(60 * 60);
|
||||
|
||||
// User 2 opens a project
|
||||
let t1 = t0 + Duration::from_secs(10);
|
||||
db.record_project_activity(t0..t1, &[(user_2, project_2)])
|
||||
db.record_user_activity(t0..t1, &[(user_2, project_2)])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let t2 = t1 + Duration::from_secs(10);
|
||||
db.record_project_activity(t1..t2, &[(user_2, project_2)])
|
||||
db.record_user_activity(t1..t2, &[(user_2, project_2)])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// User 1 joins the project
|
||||
let t3 = t2 + Duration::from_secs(10);
|
||||
db.record_project_activity(t2..t3, &[(user_2, project_2), (user_1, project_2)])
|
||||
db.record_user_activity(t2..t3, &[(user_2, project_2), (user_1, project_2)])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// User 1 opens another project
|
||||
let t4 = t3 + Duration::from_secs(10);
|
||||
db.record_project_activity(
|
||||
db.record_user_activity(
|
||||
t3..t4,
|
||||
&[
|
||||
(user_2, project_2),
|
||||
@@ -1474,7 +1594,7 @@ pub mod tests {
|
||||
|
||||
// User 3 joins that project
|
||||
let t5 = t4 + Duration::from_secs(10);
|
||||
db.record_project_activity(
|
||||
db.record_user_activity(
|
||||
t4..t5,
|
||||
&[
|
||||
(user_2, project_2),
|
||||
@@ -1488,20 +1608,25 @@ pub mod tests {
|
||||
|
||||
// User 2 leaves
|
||||
let t6 = t5 + Duration::from_secs(5);
|
||||
db.record_project_activity(t5..t6, &[(user_1, project_1), (user_3, project_1)])
|
||||
db.record_user_activity(t5..t6, &[(user_1, project_1), (user_3, project_1)])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let t7 = t6 + Duration::from_secs(60);
|
||||
let t8 = t7 + Duration::from_secs(10);
|
||||
db.record_user_activity(t7..t8, &[(user_1, project_1)])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let summary = db.summarize_project_activity(t0..t6, 10).await.unwrap();
|
||||
assert_eq!(
|
||||
summary,
|
||||
db.get_top_users_activity_summary(t0..t6, 10).await.unwrap(),
|
||||
&[
|
||||
UserActivitySummary {
|
||||
id: user_1,
|
||||
github_login: "user_1".to_string(),
|
||||
project_activity: vec![
|
||||
(project_1, Duration::from_secs(25)),
|
||||
(project_2, Duration::from_secs(30)),
|
||||
(project_1, Duration::from_secs(25))
|
||||
]
|
||||
},
|
||||
UserActivitySummary {
|
||||
@@ -1516,6 +1641,46 @@ pub mod tests {
|
||||
},
|
||||
]
|
||||
);
|
||||
assert_eq!(
|
||||
db.get_user_activity_timeline(t3..t6, user_1).await.unwrap(),
|
||||
&[
|
||||
UserActivityPeriod {
|
||||
project_id: project_1,
|
||||
start: t3,
|
||||
end: t6,
|
||||
extensions: HashMap::from_iter([("rs".to_string(), 5), ("md".to_string(), 7)]),
|
||||
},
|
||||
UserActivityPeriod {
|
||||
project_id: project_2,
|
||||
start: t3,
|
||||
end: t5,
|
||||
extensions: Default::default(),
|
||||
},
|
||||
]
|
||||
);
|
||||
assert_eq!(
|
||||
db.get_user_activity_timeline(t0..t8, user_1).await.unwrap(),
|
||||
&[
|
||||
UserActivityPeriod {
|
||||
project_id: project_2,
|
||||
start: t2,
|
||||
end: t5,
|
||||
extensions: Default::default(),
|
||||
},
|
||||
UserActivityPeriod {
|
||||
project_id: project_1,
|
||||
start: t3,
|
||||
end: t6,
|
||||
extensions: HashMap::from_iter([("rs".to_string(), 5), ("md".to_string(), 7)]),
|
||||
},
|
||||
UserActivityPeriod {
|
||||
project_id: project_1,
|
||||
start: t7,
|
||||
end: t8,
|
||||
extensions: HashMap::from_iter([("rs".to_string(), 5), ("md".to_string(), 7)]),
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
@@ -2090,7 +2255,7 @@ pub mod tests {
|
||||
background: Arc<Background>,
|
||||
pub users: Mutex<BTreeMap<UserId, User>>,
|
||||
pub projects: Mutex<BTreeMap<ProjectId, Project>>,
|
||||
pub worktree_extensions: Mutex<BTreeMap<(ProjectId, u64, String), usize>>,
|
||||
pub worktree_extensions: Mutex<BTreeMap<(ProjectId, u64, String), u32>>,
|
||||
pub orgs: Mutex<BTreeMap<OrgId, Org>>,
|
||||
pub org_memberships: Mutex<BTreeMap<(OrgId, UserId), bool>>,
|
||||
pub channels: Mutex<BTreeMap<ChannelId, Channel>>,
|
||||
@@ -2281,7 +2446,7 @@ pub mod tests {
|
||||
&self,
|
||||
project_id: ProjectId,
|
||||
worktree_id: u64,
|
||||
extensions: HashMap<String, usize>,
|
||||
extensions: HashMap<String, u32>,
|
||||
) -> Result<()> {
|
||||
self.background.simulate_random_delay().await;
|
||||
if !self.projects.lock().contains_key(&project_id) {
|
||||
@@ -2304,22 +2469,30 @@ pub mod tests {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn record_project_activity(
|
||||
async fn record_user_activity(
|
||||
&self,
|
||||
_period: Range<OffsetDateTime>,
|
||||
_time_period: Range<OffsetDateTime>,
|
||||
_active_projects: &[(UserId, ProjectId)],
|
||||
) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn summarize_project_activity(
|
||||
async fn get_top_users_activity_summary(
|
||||
&self,
|
||||
_period: Range<OffsetDateTime>,
|
||||
_time_period: Range<OffsetDateTime>,
|
||||
_limit: usize,
|
||||
) -> Result<Vec<UserActivitySummary>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn get_user_activity_timeline(
|
||||
&self,
|
||||
_time_period: Range<OffsetDateTime>,
|
||||
_user_id: UserId,
|
||||
) -> Result<Vec<UserActivityPeriod>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
// contacts
|
||||
|
||||
async fn get_contacts(&self, id: UserId) -> Result<Vec<Contact>> {
|
||||
|
||||
@@ -483,14 +483,20 @@ async fn test_offline_projects(
|
||||
deterministic: Arc<Deterministic>,
|
||||
cx_a: &mut TestAppContext,
|
||||
cx_b: &mut TestAppContext,
|
||||
cx_c: &mut TestAppContext,
|
||||
) {
|
||||
cx_a.foreground().forbid_parking();
|
||||
let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
|
||||
let client_a = server.create_client(cx_a, "user_a").await;
|
||||
let client_b = server.create_client(cx_b, "user_b").await;
|
||||
let client_c = server.create_client(cx_c, "user_c").await;
|
||||
let user_a = UserId::from_proto(client_a.user_id().unwrap());
|
||||
server
|
||||
.make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
|
||||
.make_contacts(vec![
|
||||
(&client_a, cx_a),
|
||||
(&client_b, cx_b),
|
||||
(&client_c, cx_c),
|
||||
])
|
||||
.await;
|
||||
|
||||
// Set up observers of the project and user stores. Any time either of
|
||||
@@ -584,7 +590,8 @@ async fn test_offline_projects(
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// When a project is offline, no information about it is sent to the server.
|
||||
// When a project is offline, we still create it on the server but is invisible
|
||||
// to other users.
|
||||
deterministic.run_until_parked();
|
||||
assert!(server
|
||||
.store
|
||||
@@ -592,7 +599,10 @@ async fn test_offline_projects(
|
||||
.await
|
||||
.project_metadata_for_user(user_a)
|
||||
.is_empty());
|
||||
assert!(project.read_with(cx_a, |project, _| project.remote_id().is_none()));
|
||||
project.read_with(cx_a, |project, _| {
|
||||
assert!(project.remote_id().is_some());
|
||||
assert!(!project.is_online());
|
||||
});
|
||||
assert!(client_b
|
||||
.user_store
|
||||
.read_with(cx_b, |store, _| { store.contacts()[0].projects.is_empty() }));
|
||||
@@ -666,7 +676,7 @@ async fn test_offline_projects(
|
||||
|
||||
// Build another project using a directory which was previously part of
|
||||
// an online project. Restore the project's state from the host's database.
|
||||
let project2 = cx_a.update(|cx| {
|
||||
let project2_a = cx_a.update(|cx| {
|
||||
Project::local(
|
||||
false,
|
||||
client_a.client.clone(),
|
||||
@@ -677,21 +687,21 @@ async fn test_offline_projects(
|
||||
cx,
|
||||
)
|
||||
});
|
||||
project2
|
||||
project2_a
|
||||
.update(cx_a, |p, cx| {
|
||||
p.find_or_create_local_worktree("/code/crate3", true, cx)
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
project2
|
||||
project2_a
|
||||
.update(cx_a, |project, cx| project.restore_state(cx))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// This project is now online, because its directory was previously online.
|
||||
project2.read_with(cx_a, |project, _| assert!(project.is_online()));
|
||||
project2_a.read_with(cx_a, |project, _| assert!(project.is_online()));
|
||||
deterministic.run_until_parked();
|
||||
let project2_id = project2.read_with(cx_a, |p, _| p.remote_id()).unwrap();
|
||||
let project2_id = project2_a.read_with(cx_a, |p, _| p.remote_id()).unwrap();
|
||||
client_b.user_store.read_with(cx_b, |store, _| {
|
||||
assert_eq!(
|
||||
store.contacts()[0].projects,
|
||||
@@ -714,6 +724,41 @@ async fn test_offline_projects(
|
||||
);
|
||||
});
|
||||
|
||||
let project2_b = client_b.build_remote_project(&project2_a, cx_a, cx_b).await;
|
||||
let project2_c = cx_c.foreground().spawn(Project::remote(
|
||||
project2_id,
|
||||
client_c.client.clone(),
|
||||
client_c.user_store.clone(),
|
||||
client_c.project_store.clone(),
|
||||
client_c.language_registry.clone(),
|
||||
FakeFs::new(cx_c.background()),
|
||||
cx_c.to_async(),
|
||||
));
|
||||
deterministic.run_until_parked();
|
||||
|
||||
// Taking a project offline unshares the project, rejects any pending join request and
|
||||
// disconnects existing guests.
|
||||
project2_a.update(cx_a, |project, cx| project.set_online(false, cx));
|
||||
deterministic.run_until_parked();
|
||||
project2_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
|
||||
project2_b.read_with(cx_b, |project, _| assert!(project.is_read_only()));
|
||||
project2_c.await.unwrap_err();
|
||||
|
||||
client_b.user_store.read_with(cx_b, |store, _| {
|
||||
assert_eq!(
|
||||
store.contacts()[0].projects,
|
||||
&[ProjectMetadata {
|
||||
id: project_id,
|
||||
visible_worktree_root_names: vec![
|
||||
"crate1".into(),
|
||||
"crate2".into(),
|
||||
"crate3".into()
|
||||
],
|
||||
guests: Default::default(),
|
||||
},]
|
||||
);
|
||||
});
|
||||
|
||||
cx_a.update(|cx| {
|
||||
drop(subscriptions);
|
||||
drop(view);
|
||||
|
||||
@@ -159,6 +159,7 @@ impl Server {
|
||||
.add_message_handler(Server::update_project)
|
||||
.add_message_handler(Server::register_project_activity)
|
||||
.add_request_handler(Server::update_worktree)
|
||||
.add_message_handler(Server::update_worktree_extensions)
|
||||
.add_message_handler(Server::start_language_server)
|
||||
.add_message_handler(Server::update_language_server)
|
||||
.add_message_handler(Server::update_diagnostic_summary)
|
||||
@@ -327,7 +328,7 @@ impl Server {
|
||||
let period_end = OffsetDateTime::now_utc();
|
||||
this.app_state
|
||||
.db
|
||||
.record_project_activity(period_start..period_end, &active_projects)
|
||||
.record_user_activity(period_start..period_end, &active_projects)
|
||||
.await
|
||||
.trace_err();
|
||||
period_start = period_end;
|
||||
@@ -600,9 +601,11 @@ impl Server {
|
||||
.await
|
||||
.user_id_for_connection(request.sender_id)?;
|
||||
let project_id = self.app_state.db.register_project(user_id).await?;
|
||||
self.store()
|
||||
.await
|
||||
.register_project(request.sender_id, project_id)?;
|
||||
self.store().await.register_project(
|
||||
request.sender_id,
|
||||
project_id,
|
||||
request.payload.online,
|
||||
)?;
|
||||
|
||||
response.send(proto::RegisterProjectResponse {
|
||||
project_id: project_id.to_proto(),
|
||||
@@ -948,12 +951,53 @@ impl Server {
|
||||
let guest_connection_ids = state
|
||||
.read_project(project_id, request.sender_id)?
|
||||
.guest_connection_ids();
|
||||
state.update_project(project_id, &request.payload.worktrees, request.sender_id)?;
|
||||
broadcast(request.sender_id, guest_connection_ids, |connection_id| {
|
||||
self.peer
|
||||
.forward_send(request.sender_id, connection_id, request.payload.clone())
|
||||
});
|
||||
let unshared_project = state.update_project(
|
||||
project_id,
|
||||
&request.payload.worktrees,
|
||||
request.payload.online,
|
||||
request.sender_id,
|
||||
)?;
|
||||
|
||||
if let Some(unshared_project) = unshared_project {
|
||||
broadcast(
|
||||
request.sender_id,
|
||||
unshared_project.guests.keys().copied(),
|
||||
|conn_id| {
|
||||
self.peer.send(
|
||||
conn_id,
|
||||
proto::UnregisterProject {
|
||||
project_id: project_id.to_proto(),
|
||||
},
|
||||
)
|
||||
},
|
||||
);
|
||||
for (_, receipts) in unshared_project.pending_join_requests {
|
||||
for receipt in receipts {
|
||||
self.peer.respond(
|
||||
receipt,
|
||||
proto::JoinProjectResponse {
|
||||
variant: Some(proto::join_project_response::Variant::Decline(
|
||||
proto::join_project_response::Decline {
|
||||
reason:
|
||||
proto::join_project_response::decline::Reason::Closed
|
||||
as i32,
|
||||
},
|
||||
)),
|
||||
},
|
||||
)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
broadcast(request.sender_id, guest_connection_ids, |connection_id| {
|
||||
self.peer.forward_send(
|
||||
request.sender_id,
|
||||
connection_id,
|
||||
request.payload.clone(),
|
||||
)
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
self.update_user_contacts(user_id).await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -976,9 +1020,9 @@ impl Server {
|
||||
) -> Result<()> {
|
||||
let project_id = ProjectId::from_proto(request.payload.project_id);
|
||||
let worktree_id = request.payload.worktree_id;
|
||||
let (connection_ids, metadata_changed, extension_counts) = {
|
||||
let (connection_ids, metadata_changed) = {
|
||||
let mut store = self.store().await;
|
||||
let (connection_ids, metadata_changed, extension_counts) = store.update_worktree(
|
||||
let (connection_ids, metadata_changed) = store.update_worktree(
|
||||
request.sender_id,
|
||||
project_id,
|
||||
worktree_id,
|
||||
@@ -988,12 +1032,8 @@ impl Server {
|
||||
request.payload.scan_id,
|
||||
request.payload.is_last_update,
|
||||
)?;
|
||||
(connection_ids, metadata_changed, extension_counts.clone())
|
||||
(connection_ids, metadata_changed)
|
||||
};
|
||||
self.app_state
|
||||
.db
|
||||
.update_worktree_extensions(project_id, worktree_id, extension_counts)
|
||||
.await?;
|
||||
|
||||
broadcast(request.sender_id, connection_ids, |connection_id| {
|
||||
self.peer
|
||||
@@ -1010,6 +1050,25 @@ impl Server {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_worktree_extensions(
|
||||
self: Arc<Server>,
|
||||
request: TypedEnvelope<proto::UpdateWorktreeExtensions>,
|
||||
) -> Result<()> {
|
||||
let project_id = ProjectId::from_proto(request.payload.project_id);
|
||||
let worktree_id = request.payload.worktree_id;
|
||||
let extensions = request
|
||||
.payload
|
||||
.extensions
|
||||
.into_iter()
|
||||
.zip(request.payload.counts)
|
||||
.collect();
|
||||
self.app_state
|
||||
.db
|
||||
.update_worktree_extensions(project_id, worktree_id, extensions)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_diagnostic_summary(
|
||||
self: Arc<Server>,
|
||||
request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
|
||||
|
||||
@@ -3,12 +3,7 @@ use anyhow::{anyhow, Result};
|
||||
use collections::{btree_map, hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet};
|
||||
use rpc::{proto, ConnectionId, Receipt};
|
||||
use serde::Serialize;
|
||||
use std::{
|
||||
mem,
|
||||
path::{Path, PathBuf},
|
||||
str,
|
||||
time::Duration,
|
||||
};
|
||||
use std::{mem, path::PathBuf, str, time::Duration};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::instrument;
|
||||
|
||||
@@ -32,6 +27,7 @@ struct ConnectionState {
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct Project {
|
||||
pub online: bool,
|
||||
pub host_connection_id: ConnectionId,
|
||||
pub host: Collaborator,
|
||||
pub guests: HashMap<ConnectionId, Collaborator>,
|
||||
@@ -58,8 +54,6 @@ pub struct Worktree {
|
||||
#[serde(skip)]
|
||||
pub entries: BTreeMap<u64, proto::Entry>,
|
||||
#[serde(skip)]
|
||||
pub extension_counts: HashMap<String, usize>,
|
||||
#[serde(skip)]
|
||||
pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
|
||||
pub scan_id: u64,
|
||||
pub is_complete: bool,
|
||||
@@ -89,6 +83,11 @@ pub struct LeftProject {
|
||||
pub unshare: bool,
|
||||
}
|
||||
|
||||
pub struct UnsharedProject {
|
||||
pub guests: HashMap<ConnectionId, Collaborator>,
|
||||
pub pending_join_requests: HashMap<UserId, Vec<Receipt<proto::JoinProject>>>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct Metrics {
|
||||
pub connections: usize,
|
||||
@@ -298,7 +297,7 @@ impl Store {
|
||||
let mut metadata = Vec::new();
|
||||
for project_id in project_ids {
|
||||
if let Some(project) = self.projects.get(&project_id) {
|
||||
if project.host.user_id == user_id {
|
||||
if project.host.user_id == user_id && project.online {
|
||||
metadata.push(proto::ProjectMetadata {
|
||||
id: project_id.to_proto(),
|
||||
visible_worktree_root_names: project
|
||||
@@ -324,6 +323,7 @@ impl Store {
|
||||
&mut self,
|
||||
host_connection_id: ConnectionId,
|
||||
project_id: ProjectId,
|
||||
online: bool,
|
||||
) -> Result<()> {
|
||||
let connection = self
|
||||
.connections
|
||||
@@ -333,6 +333,7 @@ impl Store {
|
||||
self.projects.insert(
|
||||
project_id,
|
||||
Project {
|
||||
online,
|
||||
host_connection_id,
|
||||
host: Collaborator {
|
||||
user_id: connection.user_id,
|
||||
@@ -354,8 +355,9 @@ impl Store {
|
||||
&mut self,
|
||||
project_id: ProjectId,
|
||||
worktrees: &[proto::WorktreeMetadata],
|
||||
online: bool,
|
||||
connection_id: ConnectionId,
|
||||
) -> Result<()> {
|
||||
) -> Result<Option<UnsharedProject>> {
|
||||
let project = self
|
||||
.projects
|
||||
.get_mut(&project_id)
|
||||
@@ -376,7 +378,33 @@ impl Store {
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
if online != project.online {
|
||||
project.online = online;
|
||||
if project.online {
|
||||
Ok(None)
|
||||
} else {
|
||||
for connection_id in project.guest_connection_ids() {
|
||||
if let Some(connection) = self.connections.get_mut(&connection_id) {
|
||||
connection.projects.remove(&project_id);
|
||||
}
|
||||
}
|
||||
|
||||
project.active_replica_ids.clear();
|
||||
project.language_servers.clear();
|
||||
for worktree in project.worktrees.values_mut() {
|
||||
worktree.diagnostic_summaries.clear();
|
||||
worktree.entries.clear();
|
||||
}
|
||||
|
||||
Ok(Some(UnsharedProject {
|
||||
guests: mem::take(&mut project.guests),
|
||||
pending_join_requests: mem::take(&mut project.join_requests),
|
||||
}))
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!("no such project"))?
|
||||
}
|
||||
@@ -482,13 +510,17 @@ impl Store {
|
||||
.projects
|
||||
.get_mut(&project_id)
|
||||
.ok_or_else(|| anyhow!("no such project"))?;
|
||||
connection.requested_projects.insert(project_id);
|
||||
project
|
||||
.join_requests
|
||||
.entry(requester_id)
|
||||
.or_default()
|
||||
.push(receipt);
|
||||
Ok(())
|
||||
if project.online {
|
||||
connection.requested_projects.insert(project_id);
|
||||
project
|
||||
.join_requests
|
||||
.entry(requester_id)
|
||||
.or_default()
|
||||
.push(receipt);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("no such project"))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deny_join_project_request(
|
||||
@@ -593,7 +625,6 @@ impl Store {
|
||||
for worktree in project.worktrees.values_mut() {
|
||||
worktree.diagnostic_summaries.clear();
|
||||
worktree.entries.clear();
|
||||
worktree.extension_counts.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -617,54 +648,28 @@ impl Store {
|
||||
updated_entries: &[proto::Entry],
|
||||
scan_id: u64,
|
||||
is_last_update: bool,
|
||||
) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
|
||||
) -> Result<(Vec<ConnectionId>, bool)> {
|
||||
let project = self.write_project(project_id, connection_id)?;
|
||||
if !project.online {
|
||||
return Err(anyhow!("project is not online"));
|
||||
}
|
||||
|
||||
let connection_ids = project.connection_ids();
|
||||
let mut worktree = project.worktrees.entry(worktree_id).or_default();
|
||||
let metadata_changed = worktree_root_name != worktree.root_name;
|
||||
worktree.root_name = worktree_root_name.to_string();
|
||||
|
||||
for entry_id in removed_entries {
|
||||
if let Some(entry) = worktree.entries.remove(&entry_id) {
|
||||
if !entry.is_ignored {
|
||||
if let Some(extension) = extension_for_entry(&entry) {
|
||||
if let Some(count) = worktree.extension_counts.get_mut(extension) {
|
||||
*count = count.saturating_sub(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
worktree.entries.remove(&entry_id);
|
||||
}
|
||||
|
||||
for entry in updated_entries {
|
||||
if let Some(old_entry) = worktree.entries.insert(entry.id, entry.clone()) {
|
||||
if !old_entry.is_ignored {
|
||||
if let Some(extension) = extension_for_entry(&old_entry) {
|
||||
if let Some(count) = worktree.extension_counts.get_mut(extension) {
|
||||
*count = count.saturating_sub(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !entry.is_ignored {
|
||||
if let Some(extension) = extension_for_entry(&entry) {
|
||||
if let Some(count) = worktree.extension_counts.get_mut(extension) {
|
||||
*count += 1;
|
||||
} else {
|
||||
worktree.extension_counts.insert(extension.into(), 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
worktree.entries.insert(entry.id, entry.clone());
|
||||
}
|
||||
|
||||
worktree.scan_id = scan_id;
|
||||
worktree.is_complete = is_last_update;
|
||||
Ok((
|
||||
connection_ids,
|
||||
metadata_changed,
|
||||
worktree.extension_counts.clone(),
|
||||
))
|
||||
Ok((connection_ids, metadata_changed))
|
||||
}
|
||||
|
||||
pub fn project_connection_ids(
|
||||
@@ -853,11 +858,3 @@ impl Channel {
|
||||
self.connection_ids.iter().copied().collect()
|
||||
}
|
||||
}
|
||||
|
||||
fn extension_for_entry(entry: &proto::Entry) -> Option<&str> {
|
||||
str::from_utf8(&entry.path)
|
||||
.ok()
|
||||
.map(Path::new)
|
||||
.and_then(|p| p.extension())
|
||||
.and_then(|e| e.to_str())
|
||||
}
|
||||
|
||||
@@ -1263,6 +1263,13 @@ mod tests {
|
||||
.detach();
|
||||
});
|
||||
|
||||
let request = server.receive::<proto::RegisterProject>().await.unwrap();
|
||||
server
|
||||
.respond(
|
||||
request.receipt(),
|
||||
proto::RegisterProjectResponse { project_id: 200 },
|
||||
)
|
||||
.await;
|
||||
let get_users_request = server.receive::<proto::GetUsers>().await.unwrap();
|
||||
server
|
||||
.respond(
|
||||
@@ -1340,6 +1347,19 @@ mod tests {
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
server
|
||||
.receive::<proto::UpdateProject>()
|
||||
.await
|
||||
.unwrap()
|
||||
.payload,
|
||||
proto::UpdateProject {
|
||||
project_id: 200,
|
||||
online: false,
|
||||
worktrees: vec![]
|
||||
},
|
||||
);
|
||||
|
||||
cx.foreground().run_until_parked();
|
||||
assert_eq!(
|
||||
cx.read(|cx| render_to_strings(&panel, cx)),
|
||||
@@ -1383,36 +1403,6 @@ mod tests {
|
||||
]
|
||||
);
|
||||
|
||||
// The server responds, assigning the project a remote id. It still appears
|
||||
// as loading, because the server hasn't yet sent out the updated contact
|
||||
// state for the current user.
|
||||
let request = server.receive::<proto::RegisterProject>().await.unwrap();
|
||||
server
|
||||
.respond(
|
||||
request.receipt(),
|
||||
proto::RegisterProjectResponse { project_id: 200 },
|
||||
)
|
||||
.await;
|
||||
cx.foreground().run_until_parked();
|
||||
assert_eq!(
|
||||
cx.read(|cx| render_to_strings(&panel, cx)),
|
||||
&[
|
||||
"v Requests",
|
||||
" incoming user_one",
|
||||
" outgoing user_two",
|
||||
"v Online",
|
||||
" the_current_user",
|
||||
" dir3",
|
||||
" 🔒 private_dir (going online...)",
|
||||
" user_four",
|
||||
" dir2",
|
||||
" user_three",
|
||||
" dir1",
|
||||
"v Offline",
|
||||
" user_five",
|
||||
]
|
||||
);
|
||||
|
||||
// The server receives the project's metadata and updates the contact metadata
|
||||
// for the current user. Now the project appears as online.
|
||||
assert_eq!(
|
||||
@@ -1420,14 +1410,22 @@ mod tests {
|
||||
.receive::<proto::UpdateProject>()
|
||||
.await
|
||||
.unwrap()
|
||||
.payload
|
||||
.worktrees,
|
||||
&[proto::WorktreeMetadata {
|
||||
id: worktree_id,
|
||||
root_name: "private_dir".to_string(),
|
||||
visible: true,
|
||||
}],
|
||||
.payload,
|
||||
proto::UpdateProject {
|
||||
project_id: 200,
|
||||
online: true,
|
||||
worktrees: vec![proto::WorktreeMetadata {
|
||||
id: worktree_id,
|
||||
root_name: "private_dir".to_string(),
|
||||
visible: true,
|
||||
}]
|
||||
},
|
||||
);
|
||||
server
|
||||
.receive::<proto::UpdateWorktreeExtensions>()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
server.send(proto::UpdateContacts {
|
||||
contacts: vec![proto::Contact {
|
||||
user_id: current_user_id,
|
||||
@@ -1492,7 +1490,19 @@ mod tests {
|
||||
|
||||
// The server receives the unregister request and updates the contact
|
||||
// metadata for the current user. The project is now offline.
|
||||
let request = server.receive::<proto::UnregisterProject>().await.unwrap();
|
||||
assert_eq!(
|
||||
server
|
||||
.receive::<proto::UpdateProject>()
|
||||
.await
|
||||
.unwrap()
|
||||
.payload,
|
||||
proto::UpdateProject {
|
||||
project_id: 200,
|
||||
online: false,
|
||||
worktrees: vec![]
|
||||
},
|
||||
);
|
||||
|
||||
server.send(proto::UpdateContacts {
|
||||
contacts: vec![proto::Contact {
|
||||
user_id: current_user_id,
|
||||
@@ -1526,28 +1536,6 @@ mod tests {
|
||||
]
|
||||
);
|
||||
|
||||
// The server responds to the unregister request.
|
||||
server.respond(request.receipt(), proto::Ack {}).await;
|
||||
cx.foreground().run_until_parked();
|
||||
assert_eq!(
|
||||
cx.read(|cx| render_to_strings(&panel, cx)),
|
||||
&[
|
||||
"v Requests",
|
||||
" incoming user_one",
|
||||
" outgoing user_two",
|
||||
"v Online",
|
||||
" the_current_user",
|
||||
" dir3",
|
||||
" 🔒 private_dir",
|
||||
" user_four",
|
||||
" dir2",
|
||||
" user_three",
|
||||
" dir1",
|
||||
"v Offline",
|
||||
" user_five",
|
||||
]
|
||||
);
|
||||
|
||||
panel.update(cx, |panel, cx| {
|
||||
panel
|
||||
.filter_editor
|
||||
|
||||
@@ -592,7 +592,7 @@ impl AsyncAppContext {
|
||||
self.0.borrow().foreground.spawn(f(self.clone()))
|
||||
}
|
||||
|
||||
pub fn read<T, F: FnOnce(&AppContext) -> T>(&mut self, callback: F) -> T {
|
||||
pub fn read<T, F: FnOnce(&AppContext) -> T>(&self, callback: F) -> T {
|
||||
callback(self.0.borrow().as_ref())
|
||||
}
|
||||
|
||||
|
||||
@@ -890,7 +890,7 @@ impl LspCommand for GetHover {
|
||||
message: Option<lsp::Hover>,
|
||||
_: ModelHandle<Project>,
|
||||
buffer: ModelHandle<Buffer>,
|
||||
mut cx: AsyncAppContext,
|
||||
cx: AsyncAppContext,
|
||||
) -> Result<Self::Response> {
|
||||
Ok(message.and_then(|hover| {
|
||||
let range = hover.range.map(|range| {
|
||||
|
||||
@@ -148,13 +148,14 @@ enum ProjectClientState {
|
||||
remote_id_rx: watch::Receiver<Option<u64>>,
|
||||
online_tx: watch::Sender<bool>,
|
||||
online_rx: watch::Receiver<bool>,
|
||||
_maintain_remote_id_task: Task<Option<()>>,
|
||||
_maintain_remote_id: Task<Option<()>>,
|
||||
_maintain_online_status: Task<Option<()>>,
|
||||
},
|
||||
Remote {
|
||||
sharing_has_stopped: bool,
|
||||
remote_id: u64,
|
||||
replica_id: ReplicaId,
|
||||
_detect_unshare_task: Task<Option<()>>,
|
||||
_detect_unshare: Task<Option<()>>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -401,17 +402,13 @@ impl Project {
|
||||
cx: &mut MutableAppContext,
|
||||
) -> ModelHandle<Self> {
|
||||
cx.add_model(|cx: &mut ModelContext<Self>| {
|
||||
let (online_tx, online_rx) = watch::channel_with(online);
|
||||
let (remote_id_tx, remote_id_rx) = watch::channel();
|
||||
let _maintain_remote_id_task = cx.spawn_weak({
|
||||
let status_rx = client.clone().status();
|
||||
let online_rx = online_rx.clone();
|
||||
let _maintain_remote_id = cx.spawn_weak({
|
||||
let mut status_rx = client.clone().status();
|
||||
move |this, mut cx| async move {
|
||||
let mut stream = Stream::map(status_rx.clone(), drop)
|
||||
.merge(Stream::map(online_rx.clone(), drop));
|
||||
while stream.recv().await.is_some() {
|
||||
while let Some(status) = status_rx.recv().await {
|
||||
let this = this.upgrade(&cx)?;
|
||||
if status_rx.borrow().is_connected() && *online_rx.borrow() {
|
||||
if status.is_connected() {
|
||||
this.update(&mut cx, |this, cx| this.register(cx))
|
||||
.await
|
||||
.log_err()?;
|
||||
@@ -425,6 +422,23 @@ impl Project {
|
||||
}
|
||||
});
|
||||
|
||||
let (online_tx, online_rx) = watch::channel_with(online);
|
||||
let _maintain_online_status = cx.spawn_weak({
|
||||
let mut online_rx = online_rx.clone();
|
||||
move |this, mut cx| async move {
|
||||
while let Some(online) = online_rx.recv().await {
|
||||
let this = this.upgrade(&cx)?;
|
||||
this.update(&mut cx, |this, cx| {
|
||||
if !online {
|
||||
this.unshared(cx);
|
||||
}
|
||||
this.metadata_changed(false, cx)
|
||||
});
|
||||
}
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
let handle = cx.weak_handle();
|
||||
project_store.update(cx, |store, cx| store.add_project(handle, cx));
|
||||
|
||||
@@ -443,7 +457,8 @@ impl Project {
|
||||
remote_id_rx,
|
||||
online_tx,
|
||||
online_rx,
|
||||
_maintain_remote_id_task,
|
||||
_maintain_remote_id,
|
||||
_maintain_online_status,
|
||||
},
|
||||
opened_buffer: (Rc::new(RefCell::new(opened_buffer_tx)), opened_buffer_rx),
|
||||
client_subscriptions: Vec::new(),
|
||||
@@ -538,7 +553,7 @@ impl Project {
|
||||
sharing_has_stopped: false,
|
||||
remote_id,
|
||||
replica_id,
|
||||
_detect_unshare_task: cx.spawn_weak(move |this, mut cx| {
|
||||
_detect_unshare: cx.spawn_weak(move |this, mut cx| {
|
||||
async move {
|
||||
let mut status = client.status();
|
||||
let is_connected =
|
||||
@@ -812,13 +827,11 @@ impl Project {
|
||||
&self.fs
|
||||
}
|
||||
|
||||
pub fn set_online(&mut self, online: bool, cx: &mut ModelContext<Self>) {
|
||||
pub fn set_online(&mut self, online: bool, _: &mut ModelContext<Self>) {
|
||||
if let ProjectClientState::Local { online_tx, .. } = &mut self.client_state {
|
||||
let mut online_tx = online_tx.borrow_mut();
|
||||
if *online_tx != online {
|
||||
*online_tx = online;
|
||||
drop(online_tx);
|
||||
self.metadata_changed(true, cx);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -869,27 +882,36 @@ impl Project {
|
||||
}
|
||||
|
||||
fn register(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
|
||||
if let ProjectClientState::Local { remote_id_rx, .. } = &self.client_state {
|
||||
if let ProjectClientState::Local {
|
||||
remote_id_rx,
|
||||
online_rx,
|
||||
..
|
||||
} = &self.client_state
|
||||
{
|
||||
if remote_id_rx.borrow().is_some() {
|
||||
return Task::ready(Ok(()));
|
||||
}
|
||||
}
|
||||
|
||||
let response = self.client.request(proto::RegisterProject {});
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
let remote_id = response.await?.project_id;
|
||||
this.update(&mut cx, |this, cx| {
|
||||
if let ProjectClientState::Local { remote_id_tx, .. } = &mut this.client_state {
|
||||
*remote_id_tx.borrow_mut() = Some(remote_id);
|
||||
}
|
||||
let response = self.client.request(proto::RegisterProject {
|
||||
online: *online_rx.borrow(),
|
||||
});
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
let remote_id = response.await?.project_id;
|
||||
this.update(&mut cx, |this, cx| {
|
||||
if let ProjectClientState::Local { remote_id_tx, .. } = &mut this.client_state {
|
||||
*remote_id_tx.borrow_mut() = Some(remote_id);
|
||||
}
|
||||
|
||||
this.metadata_changed(false, cx);
|
||||
cx.emit(Event::RemoteIdChanged(Some(remote_id)));
|
||||
this.client_subscriptions
|
||||
.push(this.client.add_model_for_remote_entity(remote_id, cx));
|
||||
Ok(())
|
||||
this.metadata_changed(false, cx);
|
||||
cx.emit(Event::RemoteIdChanged(Some(remote_id)));
|
||||
this.client_subscriptions
|
||||
.push(this.client.add_model_for_remote_entity(remote_id, cx));
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
})
|
||||
} else {
|
||||
Task::ready(Err(anyhow!("can't register a remote project")))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remote_id(&self) -> Option<u64> {
|
||||
@@ -953,21 +975,52 @@ impl Project {
|
||||
..
|
||||
} = &self.client_state
|
||||
{
|
||||
if let (Some(project_id), true) = (*remote_id_rx.borrow(), *online_rx.borrow()) {
|
||||
// Broadcast worktrees only if the project is online.
|
||||
let worktrees = if *online_rx.borrow() {
|
||||
self.worktrees
|
||||
.iter()
|
||||
.filter_map(|worktree| {
|
||||
worktree
|
||||
.upgrade(&cx)
|
||||
.map(|worktree| worktree.read(cx).as_local().unwrap().metadata_proto())
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
Default::default()
|
||||
};
|
||||
if let Some(project_id) = *remote_id_rx.borrow() {
|
||||
let online = *online_rx.borrow();
|
||||
self.client
|
||||
.send(proto::UpdateProject {
|
||||
project_id,
|
||||
worktrees: self
|
||||
.worktrees
|
||||
.iter()
|
||||
.filter_map(|worktree| {
|
||||
worktree.upgrade(&cx).map(|worktree| {
|
||||
worktree.read(cx).as_local().unwrap().metadata_proto()
|
||||
})
|
||||
})
|
||||
.collect(),
|
||||
worktrees,
|
||||
online,
|
||||
})
|
||||
.log_err();
|
||||
|
||||
if online {
|
||||
let worktrees = self.visible_worktrees(cx).collect::<Vec<_>>();
|
||||
let scans_complete =
|
||||
futures::future::join_all(worktrees.iter().filter_map(|worktree| {
|
||||
Some(worktree.read(cx).as_local()?.scan_complete())
|
||||
}));
|
||||
|
||||
let worktrees = worktrees.into_iter().map(|handle| handle.downgrade());
|
||||
cx.spawn_weak(move |_, cx| async move {
|
||||
scans_complete.await;
|
||||
cx.read(|cx| {
|
||||
for worktree in worktrees {
|
||||
if let Some(worktree) = worktree
|
||||
.upgrade(cx)
|
||||
.and_then(|worktree| worktree.read(cx).as_local())
|
||||
{
|
||||
worktree.send_extension_counts(project_id);
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.detach();
|
||||
}
|
||||
}
|
||||
|
||||
self.project_store.update(cx, |_, cx| cx.notify());
|
||||
@@ -1232,6 +1285,10 @@ impl Project {
|
||||
}
|
||||
|
||||
fn share(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
|
||||
if !self.is_online() {
|
||||
return Task::ready(Err(anyhow!("can't share an offline project")));
|
||||
}
|
||||
|
||||
let project_id;
|
||||
if let ProjectClientState::Local {
|
||||
remote_id_rx,
|
||||
@@ -1347,7 +1404,11 @@ impl Project {
|
||||
cx: &mut ModelContext<Self>,
|
||||
) {
|
||||
if let Some(project_id) = self.remote_id() {
|
||||
let share = self.share(cx);
|
||||
let share = if self.is_online() && allow {
|
||||
Some(self.share(cx))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let client = self.client.clone();
|
||||
cx.foreground()
|
||||
.spawn(async move {
|
||||
@@ -1356,7 +1417,9 @@ impl Project {
|
||||
project_id,
|
||||
allow,
|
||||
})?;
|
||||
share.await?;
|
||||
if let Some(share) = share {
|
||||
share.await?;
|
||||
}
|
||||
anyhow::Ok(())
|
||||
})
|
||||
.detach_and_log_err(cx);
|
||||
|
||||
@@ -106,6 +106,7 @@ pub struct LocalSnapshot {
|
||||
removed_entry_ids: HashMap<u64, ProjectEntryId>,
|
||||
next_entry_id: Arc<AtomicUsize>,
|
||||
snapshot: Snapshot,
|
||||
extension_counts: HashMap<OsString, usize>,
|
||||
}
|
||||
|
||||
impl Deref for LocalSnapshot {
|
||||
@@ -381,6 +382,7 @@ impl LocalWorktree {
|
||||
scan_id: 0,
|
||||
is_complete: true,
|
||||
},
|
||||
extension_counts: Default::default(),
|
||||
};
|
||||
if let Some(metadata) = metadata {
|
||||
let entry = Entry::new(
|
||||
@@ -916,6 +918,25 @@ impl LocalWorktree {
|
||||
pub fn is_shared(&self) -> bool {
|
||||
self.share.is_some()
|
||||
}
|
||||
|
||||
pub fn send_extension_counts(&self, project_id: u64) {
|
||||
let mut extensions = Vec::new();
|
||||
let mut counts = Vec::new();
|
||||
|
||||
for (extension, count) in self.extension_counts() {
|
||||
extensions.push(extension.to_string_lossy().to_string());
|
||||
counts.push(*count as u32);
|
||||
}
|
||||
|
||||
self.client
|
||||
.send(proto::UpdateWorktreeExtensions {
|
||||
project_id,
|
||||
worktree_id: self.id().to_proto(),
|
||||
extensions,
|
||||
counts,
|
||||
})
|
||||
.log_err();
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteWorktree {
|
||||
@@ -1221,6 +1242,10 @@ impl LocalSnapshot {
|
||||
&self.abs_path
|
||||
}
|
||||
|
||||
pub fn extension_counts(&self) -> &HashMap<OsString, usize> {
|
||||
&self.extension_counts
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree {
|
||||
let root_name = self.root_name.clone();
|
||||
@@ -1324,7 +1349,7 @@ impl LocalSnapshot {
|
||||
self.reuse_entry_id(&mut entry);
|
||||
self.entries_by_path.insert_or_replace(entry.clone(), &());
|
||||
let scan_id = self.scan_id;
|
||||
self.entries_by_id.insert_or_replace(
|
||||
let removed_entry = self.entries_by_id.insert_or_replace(
|
||||
PathEntry {
|
||||
id: entry.id,
|
||||
path: entry.path.clone(),
|
||||
@@ -1333,6 +1358,12 @@ impl LocalSnapshot {
|
||||
},
|
||||
&(),
|
||||
);
|
||||
|
||||
if let Some(removed_entry) = removed_entry {
|
||||
self.dec_extension_count(&removed_entry.path, removed_entry.is_ignored);
|
||||
}
|
||||
self.inc_extension_count(&entry.path, entry.is_ignored);
|
||||
|
||||
entry
|
||||
}
|
||||
|
||||
@@ -1368,6 +1399,7 @@ impl LocalSnapshot {
|
||||
|
||||
for mut entry in entries {
|
||||
self.reuse_entry_id(&mut entry);
|
||||
self.inc_extension_count(&entry.path, entry.is_ignored);
|
||||
entries_by_id_edits.push(Edit::Insert(PathEntry {
|
||||
id: entry.id,
|
||||
path: entry.path.clone(),
|
||||
@@ -1378,7 +1410,33 @@ impl LocalSnapshot {
|
||||
}
|
||||
|
||||
self.entries_by_path.edit(entries_by_path_edits, &());
|
||||
self.entries_by_id.edit(entries_by_id_edits, &());
|
||||
let removed_entries = self.entries_by_id.edit(entries_by_id_edits, &());
|
||||
|
||||
for removed_entry in removed_entries {
|
||||
self.dec_extension_count(&removed_entry.path, removed_entry.is_ignored);
|
||||
}
|
||||
}
|
||||
|
||||
fn inc_extension_count(&mut self, path: &Path, ignored: bool) {
|
||||
if !ignored {
|
||||
if let Some(extension) = path.extension() {
|
||||
if let Some(count) = self.extension_counts.get_mut(extension) {
|
||||
*count += 1;
|
||||
} else {
|
||||
self.extension_counts.insert(extension.into(), 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn dec_extension_count(&mut self, path: &Path, ignored: bool) {
|
||||
if !ignored {
|
||||
if let Some(extension) = path.extension() {
|
||||
if let Some(count) = self.extension_counts.get_mut(extension) {
|
||||
*count -= 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn reuse_entry_id(&mut self, entry: &mut Entry) {
|
||||
@@ -1408,6 +1466,7 @@ impl LocalSnapshot {
|
||||
.or_insert(entry.id);
|
||||
*removed_entry_id = cmp::max(*removed_entry_id, entry.id);
|
||||
entries_by_id_edits.push(Edit::Remove(entry.id));
|
||||
self.dec_extension_count(&entry.path, entry.is_ignored);
|
||||
}
|
||||
self.entries_by_id.edit(entries_by_id_edits, &());
|
||||
|
||||
@@ -2837,6 +2896,7 @@ mod tests {
|
||||
scan_id: 0,
|
||||
is_complete: true,
|
||||
},
|
||||
extension_counts: Default::default(),
|
||||
};
|
||||
initial_snapshot.insert_entry(
|
||||
Entry::new(
|
||||
@@ -3116,6 +3176,15 @@ mod tests {
|
||||
.entry_for_path(ignore_parent_path.join(&*GITIGNORE))
|
||||
.is_some());
|
||||
}
|
||||
|
||||
// Ensure extension counts are correct.
|
||||
let mut expected_extension_counts = HashMap::default();
|
||||
for extension in self.entries(false).filter_map(|e| e.path.extension()) {
|
||||
*expected_extension_counts
|
||||
.entry(extension.into())
|
||||
.or_insert(0) += 1;
|
||||
}
|
||||
assert_eq!(self.extension_counts, expected_extension_counts);
|
||||
}
|
||||
|
||||
fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
|
||||
|
||||
@@ -38,72 +38,73 @@ message Envelope {
|
||||
UpdateProject update_project = 30;
|
||||
RegisterProjectActivity register_project_activity = 31;
|
||||
UpdateWorktree update_worktree = 32;
|
||||
UpdateWorktreeExtensions update_worktree_extensions = 33;
|
||||
|
||||
CreateProjectEntry create_project_entry = 33;
|
||||
RenameProjectEntry rename_project_entry = 34;
|
||||
CopyProjectEntry copy_project_entry = 35;
|
||||
DeleteProjectEntry delete_project_entry = 36;
|
||||
ProjectEntryResponse project_entry_response = 37;
|
||||
CreateProjectEntry create_project_entry = 34;
|
||||
RenameProjectEntry rename_project_entry = 35;
|
||||
CopyProjectEntry copy_project_entry = 36;
|
||||
DeleteProjectEntry delete_project_entry = 37;
|
||||
ProjectEntryResponse project_entry_response = 38;
|
||||
|
||||
UpdateDiagnosticSummary update_diagnostic_summary = 38;
|
||||
StartLanguageServer start_language_server = 39;
|
||||
UpdateLanguageServer update_language_server = 40;
|
||||
UpdateDiagnosticSummary update_diagnostic_summary = 39;
|
||||
StartLanguageServer start_language_server = 40;
|
||||
UpdateLanguageServer update_language_server = 41;
|
||||
|
||||
OpenBufferById open_buffer_by_id = 41;
|
||||
OpenBufferByPath open_buffer_by_path = 42;
|
||||
OpenBufferResponse open_buffer_response = 43;
|
||||
UpdateBuffer update_buffer = 44;
|
||||
UpdateBufferFile update_buffer_file = 45;
|
||||
SaveBuffer save_buffer = 46;
|
||||
BufferSaved buffer_saved = 47;
|
||||
BufferReloaded buffer_reloaded = 48;
|
||||
ReloadBuffers reload_buffers = 49;
|
||||
ReloadBuffersResponse reload_buffers_response = 50;
|
||||
FormatBuffers format_buffers = 51;
|
||||
FormatBuffersResponse format_buffers_response = 52;
|
||||
GetCompletions get_completions = 53;
|
||||
GetCompletionsResponse get_completions_response = 54;
|
||||
ApplyCompletionAdditionalEdits apply_completion_additional_edits = 55;
|
||||
ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 56;
|
||||
GetCodeActions get_code_actions = 57;
|
||||
GetCodeActionsResponse get_code_actions_response = 58;
|
||||
GetHover get_hover = 59;
|
||||
GetHoverResponse get_hover_response = 60;
|
||||
ApplyCodeAction apply_code_action = 61;
|
||||
ApplyCodeActionResponse apply_code_action_response = 62;
|
||||
PrepareRename prepare_rename = 63;
|
||||
PrepareRenameResponse prepare_rename_response = 64;
|
||||
PerformRename perform_rename = 65;
|
||||
PerformRenameResponse perform_rename_response = 66;
|
||||
SearchProject search_project = 67;
|
||||
SearchProjectResponse search_project_response = 68;
|
||||
OpenBufferById open_buffer_by_id = 42;
|
||||
OpenBufferByPath open_buffer_by_path = 43;
|
||||
OpenBufferResponse open_buffer_response = 44;
|
||||
UpdateBuffer update_buffer = 45;
|
||||
UpdateBufferFile update_buffer_file = 46;
|
||||
SaveBuffer save_buffer = 47;
|
||||
BufferSaved buffer_saved = 48;
|
||||
BufferReloaded buffer_reloaded = 49;
|
||||
ReloadBuffers reload_buffers = 50;
|
||||
ReloadBuffersResponse reload_buffers_response = 51;
|
||||
FormatBuffers format_buffers = 52;
|
||||
FormatBuffersResponse format_buffers_response = 53;
|
||||
GetCompletions get_completions = 54;
|
||||
GetCompletionsResponse get_completions_response = 55;
|
||||
ApplyCompletionAdditionalEdits apply_completion_additional_edits = 56;
|
||||
ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 57;
|
||||
GetCodeActions get_code_actions = 58;
|
||||
GetCodeActionsResponse get_code_actions_response = 59;
|
||||
GetHover get_hover = 60;
|
||||
GetHoverResponse get_hover_response = 61;
|
||||
ApplyCodeAction apply_code_action = 62;
|
||||
ApplyCodeActionResponse apply_code_action_response = 63;
|
||||
PrepareRename prepare_rename = 64;
|
||||
PrepareRenameResponse prepare_rename_response = 65;
|
||||
PerformRename perform_rename = 66;
|
||||
PerformRenameResponse perform_rename_response = 67;
|
||||
SearchProject search_project = 68;
|
||||
SearchProjectResponse search_project_response = 69;
|
||||
|
||||
GetChannels get_channels = 69;
|
||||
GetChannelsResponse get_channels_response = 70;
|
||||
JoinChannel join_channel = 71;
|
||||
JoinChannelResponse join_channel_response = 72;
|
||||
LeaveChannel leave_channel = 73;
|
||||
SendChannelMessage send_channel_message = 74;
|
||||
SendChannelMessageResponse send_channel_message_response = 75;
|
||||
ChannelMessageSent channel_message_sent = 76;
|
||||
GetChannelMessages get_channel_messages = 77;
|
||||
GetChannelMessagesResponse get_channel_messages_response = 78;
|
||||
GetChannels get_channels = 70;
|
||||
GetChannelsResponse get_channels_response = 71;
|
||||
JoinChannel join_channel = 72;
|
||||
JoinChannelResponse join_channel_response = 73;
|
||||
LeaveChannel leave_channel = 74;
|
||||
SendChannelMessage send_channel_message = 75;
|
||||
SendChannelMessageResponse send_channel_message_response = 76;
|
||||
ChannelMessageSent channel_message_sent = 77;
|
||||
GetChannelMessages get_channel_messages = 78;
|
||||
GetChannelMessagesResponse get_channel_messages_response = 79;
|
||||
|
||||
UpdateContacts update_contacts = 79;
|
||||
UpdateInviteInfo update_invite_info = 80;
|
||||
ShowContacts show_contacts = 81;
|
||||
UpdateContacts update_contacts = 80;
|
||||
UpdateInviteInfo update_invite_info = 81;
|
||||
ShowContacts show_contacts = 82;
|
||||
|
||||
GetUsers get_users = 82;
|
||||
FuzzySearchUsers fuzzy_search_users = 83;
|
||||
UsersResponse users_response = 84;
|
||||
RequestContact request_contact = 85;
|
||||
RespondToContactRequest respond_to_contact_request = 86;
|
||||
RemoveContact remove_contact = 87;
|
||||
GetUsers get_users = 83;
|
||||
FuzzySearchUsers fuzzy_search_users = 84;
|
||||
UsersResponse users_response = 85;
|
||||
RequestContact request_contact = 86;
|
||||
RespondToContactRequest respond_to_contact_request = 87;
|
||||
RemoveContact remove_contact = 88;
|
||||
|
||||
Follow follow = 88;
|
||||
FollowResponse follow_response = 89;
|
||||
UpdateFollowers update_followers = 90;
|
||||
Unfollow unfollow = 91;
|
||||
Follow follow = 89;
|
||||
FollowResponse follow_response = 90;
|
||||
UpdateFollowers update_followers = 91;
|
||||
Unfollow unfollow = 92;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,7 +122,9 @@ message Test {
|
||||
uint64 id = 1;
|
||||
}
|
||||
|
||||
message RegisterProject {}
|
||||
message RegisterProject {
|
||||
bool online = 1;
|
||||
}
|
||||
|
||||
message RegisterProjectResponse {
|
||||
uint64 project_id = 1;
|
||||
@@ -134,6 +137,7 @@ message UnregisterProject {
|
||||
message UpdateProject {
|
||||
uint64 project_id = 1;
|
||||
repeated WorktreeMetadata worktrees = 2;
|
||||
bool online = 3;
|
||||
}
|
||||
|
||||
message RegisterProjectActivity {
|
||||
@@ -198,6 +202,13 @@ message UpdateWorktree {
|
||||
bool is_last_update = 7;
|
||||
}
|
||||
|
||||
message UpdateWorktreeExtensions {
|
||||
uint64 project_id = 1;
|
||||
uint64 worktree_id = 2;
|
||||
repeated string extensions = 3;
|
||||
repeated uint32 counts = 4;
|
||||
}
|
||||
|
||||
message CreateProjectEntry {
|
||||
uint64 project_id = 1;
|
||||
uint64 worktree_id = 2;
|
||||
|
||||
@@ -131,9 +131,9 @@ messages!(
|
||||
(PrepareRename, Background),
|
||||
(PrepareRenameResponse, Background),
|
||||
(ProjectEntryResponse, Foreground),
|
||||
(ProjectUnshared, Foreground),
|
||||
(RegisterProjectResponse, Foreground),
|
||||
(Ping, Foreground),
|
||||
(ProjectUnshared, Foreground),
|
||||
(RegisterProject, Foreground),
|
||||
(RegisterProjectActivity, Foreground),
|
||||
(ReloadBuffers, Foreground),
|
||||
@@ -163,6 +163,7 @@ messages!(
|
||||
(UpdateLanguageServer, Foreground),
|
||||
(UpdateProject, Foreground),
|
||||
(UpdateWorktree, Foreground),
|
||||
(UpdateWorktreeExtensions, Background),
|
||||
);
|
||||
|
||||
request_messages!(
|
||||
@@ -255,6 +256,7 @@ entity_messages!(
|
||||
UpdateLanguageServer,
|
||||
UpdateProject,
|
||||
UpdateWorktree,
|
||||
UpdateWorktreeExtensions,
|
||||
);
|
||||
|
||||
entity_messages!(channel_id, ChannelMessageSent);
|
||||
|
||||
@@ -6,4 +6,4 @@ pub use conn::Connection;
|
||||
pub use peer::*;
|
||||
mod macros;
|
||||
|
||||
pub const PROTOCOL_VERSION: u32 = 26;
|
||||
pub const PROTOCOL_VERSION: u32 = 27;
|
||||
|
||||
Reference in New Issue
Block a user