Compare commits

...

9 Commits

Author SHA1 Message Date

Antonio Scandurra  f48ef7bbef
zed 0.67.1
2022-12-13 15:11:22 +01:00

Antonio Scandurra  8761fe9e1d
Merge pull request #1965 from zed-industries/preserve-calls-during-server-restarts
Automatically re-join call when server is restarted
2022-12-13 15:10:37 +01:00

Mikayla Maki  26a6316e02
Merge pull request #1963 from zed-industries/fix-workspace-corner-cases
Fix small workspace deserialization corner cases
2022-12-13 15:10:34 +01:00

Max Brunsfeld  08fd5cc968
Merge pull request #1962 from zed-industries/scrolling-breaks-follow
Avoid breaking follow when syncing leader's scroll position
2022-12-13 15:10:27 +01:00

Mikayla Maki  f8647c7c23
Merge pull request #1961 from zed-industries/fix-next-screen-bug
Fixed issue where the NextScreen action would never have an effect
2022-12-13 15:10:18 +01:00

Mikayla Maki  b13a60735b
Merge pull request #1959 from zed-industries/serializing-bug-fixes
Add check for whether the user wants a blank workspace when deserializing
2022-12-13 15:10:11 +01:00

Antonio Scandurra  4b612882a3
Merge pull request #1950 from zed-industries/reconnect-to-room
Automatically re-join call when client connection drops
2022-12-13 15:10:02 +01:00

Mikayla Maki  e108c16333
Merge pull request #1951 from zed-industries/dock-bugfix
Fix infinite loop in dock position when deserializing
# Conflicts:
#	crates/workspace/src/workspace.rs
2022-12-13 15:09:40 +01:00

Max Brunsfeld  ec3835b989
v0.67.x preview
2022-12-08 15:32:48 -08:00
32 changed files with 1384 additions and 439 deletions

Cargo.lock generated
View File

@@ -8101,7 +8101,7 @@ checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "zed"
version = "0.67.0"
version = "0.67.1"
dependencies = [
"activity_indicator",
"anyhow",

View File

@@ -94,12 +94,18 @@ impl ActiveCall {
async fn handle_call_canceled(
this: ModelHandle<Self>,
_: TypedEnvelope<proto::CallCanceled>,
envelope: TypedEnvelope<proto::CallCanceled>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |this, _| {
*this.incoming_call.0.borrow_mut() = None;
let mut incoming_call = this.incoming_call.0.borrow_mut();
if incoming_call
.as_ref()
.map_or(false, |call| call.room_id == envelope.payload.room_id)
{
incoming_call.take();
}
});
Ok(())
}
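
The hunk above fixes a race in ActiveCall::handle_call_canceled: CallCanceled now names the room it refers to, and the client clears its pending incoming call only when the room ids match, so a stale cancellation for an older call can no longer dismiss a newer one. A minimal dependency-free sketch of the guard, assuming a simplified IncomingCall stand-in for the watch-channel contents:

struct IncomingCall {
    room_id: u64,
}

fn handle_call_canceled(slot: &mut Option<IncomingCall>, canceled_room_id: u64) {
    // Clear the pending call only if it belongs to the canceled room;
    // a cancellation for some other (older) room is ignored.
    if slot
        .as_ref()
        .map_or(false, |call| call.room_id == canceled_room_id)
    {
        slot.take();
    }
}

fn main() {
    let mut slot = Some(IncomingCall { room_id: 7 });
    handle_call_canceled(&mut slot, 3); // stale cancellation: ignored
    assert!(slot.is_some());
    handle_call_canceled(&mut slot, 7); // matching room: cleared
    assert!(slot.is_none());
}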

View File

@@ -4,7 +4,7 @@ use collections::HashMap;
use gpui::WeakModelHandle;
pub use live_kit_client::Frame;
use project::Project;
use std::sync::Arc;
use std::{fmt, sync::Arc};
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ParticipantLocation {
@@ -36,7 +36,7 @@ pub struct LocalParticipant {
pub active_project: Option<WeakModelHandle<Project>>,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct RemoteParticipant {
pub user: Arc<User>,
pub projects: Vec<proto::ParticipantProject>,
@@ -49,6 +49,12 @@ pub struct RemoteVideoTrack {
pub(crate) live_kit_track: Arc<live_kit_client::RemoteVideoTrack>,
}
impl fmt::Debug for RemoteVideoTrack {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RemoteVideoTrack").finish()
}
}
impl RemoteVideoTrack {
pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
self.live_kit_track.frames()

View File

@@ -5,14 +5,18 @@ use crate::{
use anyhow::{anyhow, Result};
use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
use collections::{BTreeMap, HashSet};
use futures::StreamExt;
use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
use futures::{FutureExt, StreamExt};
use gpui::{
AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
};
use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
use postage::stream::Stream;
use project::Project;
use std::{mem, sync::Arc};
use std::{mem, sync::Arc, time::Duration};
use util::{post_inc, ResultExt};
pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
ParticipantLocationChanged {
@@ -46,6 +50,7 @@ pub struct Room {
user_store: ModelHandle<UserStore>,
subscriptions: Vec<client::Subscription>,
pending_room_update: Option<Task<()>>,
maintain_connection: Option<Task<Result<()>>>,
}
impl Entity for Room {
@@ -66,21 +71,6 @@ impl Room {
user_store: ModelHandle<UserStore>,
cx: &mut ModelContext<Self>,
) -> Self {
let mut client_status = client.status();
cx.spawn_weak(|this, mut cx| async move {
let is_connected = client_status
.next()
.await
.map_or(false, |s| s.is_connected());
// Even if we're initially connected, any future change of the status means we momentarily disconnected.
if !is_connected || client_status.next().await.is_some() {
if let Some(this) = this.upgrade(&cx) {
let _ = this.update(&mut cx, |this, cx| this.leave(cx));
}
}
})
.detach();
let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
let room = live_kit_client::Room::new();
let mut status = room.status();
@@ -131,6 +121,9 @@ impl Room {
None
};
let maintain_connection =
cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx));
Self {
id,
live_kit: live_kit_room,
@@ -145,6 +138,7 @@ impl Room {
pending_room_update: None,
client,
user_store,
maintain_connection: Some(maintain_connection),
}
}
@@ -241,10 +235,96 @@ impl Room {
self.participant_user_ids.clear();
self.subscriptions.clear();
self.live_kit.take();
self.pending_room_update.take();
self.maintain_connection.take();
self.client.send(proto::LeaveRoom {})?;
Ok(())
}
async fn maintain_connection(
this: WeakModelHandle<Self>,
client: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
let mut client_status = client.status();
loop {
let is_connected = client_status
.next()
.await
.map_or(false, |s| s.is_connected());
// Even if we're initially connected, any future change of the status means we momentarily disconnected.
if !is_connected || client_status.next().await.is_some() {
let room_id = this
.upgrade(&cx)
.ok_or_else(|| anyhow!("room was dropped"))?
.update(&mut cx, |this, cx| {
this.status = RoomStatus::Rejoining;
cx.notify();
this.id
});
// Wait for client to re-establish a connection to the server.
{
let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
let client_reconnection = async {
let mut remaining_attempts = 3;
while remaining_attempts > 0 {
if let Some(status) = client_status.next().await {
if status.is_connected() {
let rejoin_room = async {
let response =
client.request(proto::JoinRoom { id: room_id }).await?;
let room_proto =
response.room.ok_or_else(|| anyhow!("invalid room"))?;
this.upgrade(&cx)
.ok_or_else(|| anyhow!("room was dropped"))?
.update(&mut cx, |this, cx| {
this.status = RoomStatus::Online;
this.apply_room_update(room_proto, cx)
})?;
anyhow::Ok(())
};
if rejoin_room.await.is_ok() {
return true;
} else {
remaining_attempts -= 1;
}
}
} else {
return false;
}
}
false
}
.fuse();
futures::pin_mut!(client_reconnection);
futures::select_biased! {
reconnected = client_reconnection => {
if reconnected {
// If we successfully joined the room, go back around the loop
// waiting for future connection status changes.
continue;
}
}
_ = reconnection_timeout => {}
}
}
// The client failed to re-establish a connection to the server
// or an error occurred while trying to re-join the room. Either way
// we leave the room and return an error.
if let Some(this) = this.upgrade(&cx) {
let _ = this.update(&mut cx, |this, cx| this.leave(cx));
}
return Err(anyhow!(
"can't reconnect to room: client failed to re-establish connection"
));
}
}
}
pub fn id(&self) -> u64 {
self.id
}
@@ -325,9 +405,11 @@ impl Room {
}
if let Some(participants) = remote_participants.log_err() {
let mut participant_peer_ids = HashSet::default();
for (participant, user) in room.participants.into_iter().zip(participants) {
let peer_id = PeerId(participant.peer_id);
this.participant_user_ids.insert(participant.user_id);
participant_peer_ids.insert(peer_id);
let old_projects = this
.remote_participants
@@ -394,8 +476,8 @@ impl Room {
}
}
this.remote_participants.retain(|_, participant| {
if this.participant_user_ids.contains(&participant.user.id) {
this.remote_participants.retain(|peer_id, participant| {
if participant_peer_ids.contains(peer_id) {
true
} else {
for project in &participant.projects {
@@ -477,10 +559,12 @@ impl Room {
{
for participant in self.remote_participants.values() {
assert!(self.participant_user_ids.contains(&participant.user.id));
assert_ne!(participant.user.id, self.client.user_id().unwrap());
}
for participant in &self.pending_participants {
assert!(self.participant_user_ids.contains(&participant.id));
assert_ne!(participant.id, self.client.user_id().unwrap());
}
assert_eq!(
@@ -751,6 +835,7 @@ impl Default for ScreenTrack {
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
Online,
Rejoining,
Offline,
}
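
Taken together, the room.rs changes replace the old "leave on any status change" task with a maintain_connection loop: on a connection-status change the room enters the new Rejoining state, waits for the client to reconnect, and retries proto::JoinRoom up to three times, with the whole wait bounded by RECONNECT_TIMEOUT; only if that fails does the room fall back to leaving. A dependency-free sketch of the retry budget, with a hypothetical try_rejoin callback standing in for the JoinRoom request:

fn rejoin_with_budget(mut try_rejoin: impl FnMut() -> bool) -> bool {
    let mut remaining_attempts = 3;
    while remaining_attempts > 0 {
        if try_rejoin() {
            // Rejoined: go back to watching the connection status.
            return true;
        }
        remaining_attempts -= 1;
    }
    // Budget exhausted: the caller leaves the room and reports an error.
    false
}

fn main() {
    let mut calls = 0;
    assert!(rejoin_with_budget(|| {
        calls += 1;
        calls == 2 // succeed on the second attempt
    }));
    assert_eq!(calls, 2);
}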

View File

@@ -1,5 +1,5 @@
CREATE TABLE "users" (
"id" INTEGER PRIMARY KEY,
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"github_login" VARCHAR,
"admin" BOOLEAN,
"email_address" VARCHAR(255) DEFAULT NULL,
@@ -17,14 +17,14 @@ CREATE INDEX "index_users_on_email_address" ON "users" ("email_address");
CREATE INDEX "index_users_on_github_user_id" ON "users" ("github_user_id");
CREATE TABLE "access_tokens" (
"id" INTEGER PRIMARY KEY,
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"user_id" INTEGER REFERENCES users (id),
"hash" VARCHAR(128)
);
CREATE INDEX "index_access_tokens_user_id" ON "access_tokens" ("user_id");
CREATE TABLE "contacts" (
"id" INTEGER PRIMARY KEY,
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"user_id_a" INTEGER REFERENCES users (id) NOT NULL,
"user_id_b" INTEGER REFERENCES users (id) NOT NULL,
"a_to_b" BOOLEAN NOT NULL,
@@ -35,12 +35,12 @@ CREATE UNIQUE INDEX "index_contacts_user_ids" ON "contacts" ("user_id_a", "user_
CREATE INDEX "index_contacts_user_id_b" ON "contacts" ("user_id_b");
CREATE TABLE "rooms" (
"id" INTEGER PRIMARY KEY,
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"live_kit_room" VARCHAR NOT NULL
);
CREATE TABLE "projects" (
"id" INTEGER PRIMARY KEY,
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"room_id" INTEGER REFERENCES rooms (id) NOT NULL,
"host_user_id" INTEGER REFERENCES users (id) NOT NULL,
"host_connection_id" INTEGER NOT NULL,
@@ -99,7 +99,7 @@ CREATE TABLE "language_servers" (
CREATE INDEX "index_language_servers_on_project_id" ON "language_servers" ("project_id");
CREATE TABLE "project_collaborators" (
"id" INTEGER PRIMARY KEY,
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
"connection_id" INTEGER NOT NULL,
"connection_epoch" TEXT NOT NULL,
@@ -110,13 +110,16 @@ CREATE TABLE "project_collaborators" (
CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id");
CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id");
CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch");
CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id");
CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch");
CREATE TABLE "room_participants" (
"id" INTEGER PRIMARY KEY,
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"room_id" INTEGER NOT NULL REFERENCES rooms (id),
"user_id" INTEGER NOT NULL REFERENCES users (id),
"answering_connection_id" INTEGER,
"answering_connection_epoch" TEXT,
"answering_connection_lost" BOOLEAN NOT NULL,
"location_kind" INTEGER,
"location_project_id" INTEGER,
"initial_project_id" INTEGER,
@@ -125,5 +128,8 @@ CREATE TABLE "room_participants" (
"calling_connection_epoch" TEXT NOT NULL
);
CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id");
CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id");
CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch");
CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch");
CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id");
CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch");

View File

@@ -0,0 +1,7 @@
ALTER TABLE "room_participants"
ADD "answering_connection_lost" BOOLEAN NOT NULL DEFAULT FALSE;
CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id");
CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch");
CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id");
CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch");

View File

@@ -0,0 +1 @@
CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id");
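
In the test schema above, primary keys become INTEGER PRIMARY KEY AUTOINCREMENT; in SQLite that forbids rowid reuse, which presumably matters now that stale rows are deleted and recreated across server restarts. The two short migrations then bring existing databases up to date: one adds the answering_connection_lost column (defaulting to FALSE) plus the connection-id and epoch indexes, the other adds the room_id index that backs the reconnection and cleanup queries in db.rs below.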

View File

@@ -21,6 +21,7 @@ use dashmap::DashMap;
use futures::StreamExt;
use hyper::StatusCode;
use rpc::{proto, ConnectionId};
use sea_orm::Condition;
pub use sea_orm::ConnectOptions;
use sea_orm::{
entity::prelude::*, ActiveValue, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
@@ -47,7 +48,7 @@ pub struct Database {
background: Option<std::sync::Arc<gpui::executor::Background>>,
#[cfg(test)]
runtime: Option<tokio::runtime::Runtime>,
epoch: Uuid,
epoch: parking_lot::RwLock<Uuid>,
}
impl Database {
@@ -60,10 +61,20 @@ impl Database {
background: None,
#[cfg(test)]
runtime: None,
epoch: Uuid::new_v4(),
epoch: parking_lot::RwLock::new(Uuid::new_v4()),
})
}
#[cfg(test)]
pub fn reset(&self) {
self.rooms.clear();
*self.epoch.write() = Uuid::new_v4();
}
fn epoch(&self) -> Uuid {
*self.epoch.read()
}
pub async fn migrate(
&self,
migrations_path: &Path,
@@ -105,34 +116,14 @@ impl Database {
Ok(new_migrations)
}
pub async fn clear_stale_data(&self) -> Result<()> {
pub async fn delete_stale_projects(&self) -> Result<()> {
self.transaction(|tx| async move {
project_collaborator::Entity::delete_many()
.filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch))
.exec(&*tx)
.await?;
room_participant::Entity::delete_many()
.filter(
room_participant::Column::AnsweringConnectionEpoch
.ne(self.epoch)
.or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch)),
)
.filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch()))
.exec(&*tx)
.await?;
project::Entity::delete_many()
.filter(project::Column::HostConnectionEpoch.ne(self.epoch))
.exec(&*tx)
.await?;
room::Entity::delete_many()
.filter(
room::Column::Id.not_in_subquery(
Query::select()
.column(room_participant::Column::RoomId)
.from(room_participant::Entity)
.distinct()
.to_owned(),
),
)
.filter(project::Column::HostConnectionEpoch.ne(self.epoch()))
.exec(&*tx)
.await?;
Ok(())
@@ -140,6 +131,74 @@ impl Database {
.await
}
pub async fn outdated_room_ids(&self) -> Result<Vec<RoomId>> {
self.transaction(|tx| async move {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
RoomId,
}
Ok(room_participant::Entity::find()
.select_only()
.column(room_participant::Column::RoomId)
.distinct()
.filter(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()))
.into_values::<_, QueryAs>()
.all(&*tx)
.await?)
})
.await
}
pub async fn refresh_room(&self, room_id: RoomId) -> Result<RoomGuard<RefreshedRoom>> {
self.room_transaction(|tx| async move {
let stale_participant_filter = Condition::all()
.add(room_participant::Column::RoomId.eq(room_id))
.add(room_participant::Column::AnsweringConnectionId.is_not_null())
.add(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()));
let stale_participant_user_ids = room_participant::Entity::find()
.filter(stale_participant_filter.clone())
.all(&*tx)
.await?
.into_iter()
.map(|participant| participant.user_id)
.collect::<Vec<_>>();
// Delete participants who failed to reconnect.
room_participant::Entity::delete_many()
.filter(stale_participant_filter)
.exec(&*tx)
.await?;
let room = self.get_room(room_id, &tx).await?;
let mut canceled_calls_to_user_ids = Vec::new();
// Delete the room if it becomes empty and cancel pending calls.
if room.participants.is_empty() {
canceled_calls_to_user_ids.extend(
room.pending_participants
.iter()
.map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
);
room_participant::Entity::delete_many()
.filter(room_participant::Column::RoomId.eq(room_id))
.exec(&*tx)
.await?;
room::Entity::delete_by_id(room_id).exec(&*tx).await?;
}
Ok((
room_id,
RefreshedRoom {
room,
stale_participant_user_ids,
canceled_calls_to_user_ids,
},
))
})
.await
}
// users
pub async fn create_user(
@@ -1033,10 +1092,11 @@ impl Database {
room_id: ActiveValue::set(room_id),
user_id: ActiveValue::set(user_id),
answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
answering_connection_lost: ActiveValue::set(false),
calling_user_id: ActiveValue::set(user_id),
calling_connection_id: ActiveValue::set(connection_id.0 as i32),
calling_connection_epoch: ActiveValue::set(self.epoch),
calling_connection_epoch: ActiveValue::set(self.epoch()),
..Default::default()
}
.insert(&*tx)
@@ -1060,9 +1120,10 @@ impl Database {
room_participant::ActiveModel {
room_id: ActiveValue::set(room_id),
user_id: ActiveValue::set(called_user_id),
answering_connection_lost: ActiveValue::set(false),
calling_user_id: ActiveValue::set(calling_user_id),
calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
calling_connection_epoch: ActiveValue::set(self.epoch),
calling_connection_epoch: ActiveValue::set(self.epoch()),
initial_project_id: ActiveValue::set(initial_project_id),
..Default::default()
}
@@ -1172,14 +1233,23 @@ impl Database {
self.room_transaction(|tx| async move {
let result = room_participant::Entity::update_many()
.filter(
room_participant::Column::RoomId
.eq(room_id)
.and(room_participant::Column::UserId.eq(user_id))
.and(room_participant::Column::AnsweringConnectionId.is_null()),
Condition::all()
.add(room_participant::Column::RoomId.eq(room_id))
.add(room_participant::Column::UserId.eq(user_id))
.add(
Condition::any()
.add(room_participant::Column::AnsweringConnectionId.is_null())
.add(room_participant::Column::AnsweringConnectionLost.eq(true))
.add(
room_participant::Column::AnsweringConnectionEpoch
.ne(self.epoch()),
),
),
)
.set(room_participant::ActiveModel {
answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)),
answering_connection_epoch: ActiveValue::set(Some(self.epoch)),
answering_connection_epoch: ActiveValue::set(Some(self.epoch())),
answering_connection_lost: ActiveValue::set(false),
..Default::default()
})
.exec(&*tx)
@@ -1197,7 +1267,7 @@ impl Database {
pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
self.room_transaction(|tx| async move {
let leaving_participant = room_participant::Entity::find()
.filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
.filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
.one(&*tx)
.await?;
@@ -1240,7 +1310,7 @@ impl Database {
project_collaborator::Column::ProjectId,
QueryProjectIds::ProjectId,
)
.filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
.filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
.into_values::<_, QueryProjectIds>()
.all(&*tx)
.await?;
@@ -1277,7 +1347,7 @@ impl Database {
// Leave projects.
project_collaborator::Entity::delete_many()
.filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
.filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
.exec(&*tx)
.await?;
@@ -1286,7 +1356,7 @@ impl Database {
.filter(
project::Column::RoomId
.eq(room_id)
.and(project::Column::HostConnectionId.eq(connection_id.0)),
.and(project::Column::HostConnectionId.eq(connection_id.0 as i32)),
)
.exec(&*tx)
.await?;
@@ -1344,11 +1414,9 @@ impl Database {
}
let result = room_participant::Entity::update_many()
.filter(
room_participant::Column::RoomId
.eq(room_id)
.and(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)),
)
.filter(room_participant::Column::RoomId.eq(room_id).and(
room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32),
))
.set(room_participant::ActiveModel {
location_kind: ActiveValue::set(Some(location_kind)),
location_project_id: ActiveValue::set(location_project_id),
@@ -1367,6 +1435,66 @@ impl Database {
.await
}
pub async fn connection_lost(
&self,
connection_id: ConnectionId,
) -> Result<RoomGuard<Vec<LeftProject>>> {
self.room_transaction(|tx| async move {
let participant = room_participant::Entity::find()
.filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("not a participant in any room"))?;
let room_id = participant.room_id;
room_participant::Entity::update(room_participant::ActiveModel {
answering_connection_lost: ActiveValue::set(true),
..participant.into_active_model()
})
.exec(&*tx)
.await?;
let collaborator_on_projects = project_collaborator::Entity::find()
.find_also_related(project::Entity)
.filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
.all(&*tx)
.await?;
project_collaborator::Entity::delete_many()
.filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
.exec(&*tx)
.await?;
let mut left_projects = Vec::new();
for (_, project) in collaborator_on_projects {
if let Some(project) = project {
let collaborators = project
.find_related(project_collaborator::Entity)
.all(&*tx)
.await?;
let connection_ids = collaborators
.into_iter()
.map(|collaborator| ConnectionId(collaborator.connection_id as u32))
.collect();
left_projects.push(LeftProject {
id: project.id,
host_user_id: project.host_user_id,
host_connection_id: ConnectionId(project.host_connection_id as u32),
connection_ids,
});
}
}
project::Entity::delete_many()
.filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
.exec(&*tx)
.await?;
Ok((room_id, left_projects))
})
.await
}
fn build_incoming_call(
room: &proto::Room,
called_user_id: UserId,
@@ -1514,7 +1642,7 @@ impl Database {
) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
self.room_transaction(|tx| async move {
let participant = room_participant::Entity::find()
.filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
.filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("could not find participant"))?;
@@ -1526,7 +1654,7 @@ impl Database {
room_id: ActiveValue::set(participant.room_id),
host_user_id: ActiveValue::set(participant.user_id),
host_connection_id: ActiveValue::set(connection_id.0 as i32),
host_connection_epoch: ActiveValue::set(self.epoch),
host_connection_epoch: ActiveValue::set(self.epoch()),
..Default::default()
}
.insert(&*tx)
@@ -1551,7 +1679,7 @@ impl Database {
project_collaborator::ActiveModel {
project_id: ActiveValue::set(project.id),
connection_id: ActiveValue::set(connection_id.0 as i32),
connection_epoch: ActiveValue::set(self.epoch),
connection_epoch: ActiveValue::set(self.epoch()),
user_id: ActiveValue::set(participant.user_id),
replica_id: ActiveValue::set(ReplicaId(0)),
is_host: ActiveValue::set(true),
@@ -1600,7 +1728,7 @@ impl Database {
) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
self.room_transaction(|tx| async move {
let project = project::Entity::find_by_id(project_id)
.filter(project::Column::HostConnectionId.eq(connection_id.0))
.filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
@@ -1654,7 +1782,7 @@ impl Database {
// Ensure the update comes from the host.
let project = project::Entity::find_by_id(project_id)
.filter(project::Column::HostConnectionId.eq(connection_id.0))
.filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
@@ -1837,7 +1965,7 @@ impl Database {
) -> Result<RoomGuard<(Project, ReplicaId)>> {
self.room_transaction(|tx| async move {
let participant = room_participant::Entity::find()
.filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
.filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("must join a room first"))?;
@@ -1865,7 +1993,7 @@ impl Database {
let new_collaborator = project_collaborator::ActiveModel {
project_id: ActiveValue::set(project_id),
connection_id: ActiveValue::set(connection_id.0 as i32),
connection_epoch: ActiveValue::set(self.epoch),
connection_epoch: ActiveValue::set(self.epoch()),
user_id: ActiveValue::set(participant.user_id),
replica_id: ActiveValue::set(replica_id),
is_host: ActiveValue::set(false),
@@ -1974,7 +2102,7 @@ impl Database {
.filter(
project_collaborator::Column::ProjectId
.eq(project_id)
.and(project_collaborator::Column::ConnectionId.eq(connection_id.0)),
.and(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)),
)
.exec(&*tx)
.await?;
@@ -2488,6 +2616,12 @@ pub struct LeftRoom {
pub canceled_calls_to_user_ids: Vec<UserId>,
}
pub struct RefreshedRoom {
pub room: proto::Room,
pub stale_participant_user_ids: Vec<UserId>,
pub canceled_calls_to_user_ids: Vec<UserId>,
}
pub struct Project {
pub collaborators: Vec<project_collaborator::Model>,
pub worktrees: BTreeMap<u64, Worktree>,
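
The epoch now lives behind a parking_lot::RwLock so tests can reset it, but the idea is unchanged: each server process generates a fresh Uuid, stamps it onto the rows it writes, and treats rows carrying a different epoch as leftovers from a previous process. outdated_room_ids and refresh_room use that to find rooms whose participants never reconnected after a restart, and join_room's widened filter re-admits a participant whose previous connection was lost or belonged to an older epoch. A dependency-free sketch of the staleness test, with simple stand-ins for the Uuid and the SeaORM row:

#[derive(Clone, Copy, PartialEq, Eq)]
struct Epoch(u128); // stand-in for the server's Uuid

struct Participant {
    answering_epoch: Option<Epoch>,
}

fn is_stale(p: &Participant, current: Epoch) -> bool {
    // Answered under a previous server epoch and never re-answered since.
    p.answering_epoch.map_or(false, |e| e != current)
}

fn main() {
    let current = Epoch(2);
    assert!(is_stale(&Participant { answering_epoch: Some(Epoch(1)) }, current));
    assert!(!is_stale(&Participant { answering_epoch: Some(Epoch(2)) }, current));
    assert!(!is_stale(&Participant { answering_epoch: None }, current));
}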

View File

@@ -10,6 +10,7 @@ pub struct Model {
pub user_id: UserId,
pub answering_connection_id: Option<i32>,
pub answering_connection_epoch: Option<Uuid>,
pub answering_connection_lost: bool,
pub location_kind: Option<i32>,
pub location_project_id: Option<ProjectId>,
pub initial_project_id: Option<ProjectId>,

View File

@@ -0,0 +1,36 @@
use std::{future::Future, time::Duration};
#[derive(Clone)]
pub enum Executor {
Production,
#[cfg(test)]
Deterministic(std::sync::Arc<gpui::executor::Background>),
}
impl Executor {
pub fn spawn_detached<F>(&self, future: F)
where
F: 'static + Send + Future<Output = ()>,
{
match self {
Executor::Production => {
tokio::spawn(future);
}
#[cfg(test)]
Executor::Deterministic(background) => {
background.spawn(future).detach();
}
}
}
pub fn sleep(&self, duration: Duration) -> impl Future<Output = ()> {
let this = self.clone();
async move {
match this {
Executor::Production => tokio::time::sleep(duration).await,
#[cfg(test)]
Executor::Deterministic(background) => background.timer(duration).await,
}
}
}
}
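
This new collab::executor::Executor enum replaces the Executor trait and RealExecutor type previously defined in the rpc module (removed further down): production code runs on tokio, while tests inject the deterministic gpui background executor so timers like RECONNECT_TIMEOUT can be fast-forwarded. A hypothetical stand-alone usage sketch, assuming a tokio runtime and the Executor enum above in scope; it mirrors how Server::start schedules the stale-room sweep:

use std::time::Duration;

#[tokio::main]
async fn main() {
    let executor = Executor::Production;
    // Sleep for the grace period, then run a sweep in a detached task.
    let timeout = executor.sleep(Duration::from_millis(10));
    executor.spawn_detached(async move {
        timeout.await;
        println!("sweeping stale rooms");
    });
    // Give the detached task a moment to run before the runtime shuts down.
    tokio::time::sleep(Duration::from_millis(20)).await;
}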

File diff suppressed because it is too large

View File

@@ -2,6 +2,7 @@ pub mod api;
pub mod auth;
pub mod db;
pub mod env;
pub mod executor;
#[cfg(test)]
mod integration_tests;
pub mod rpc;

View File

@@ -1,6 +1,6 @@
use anyhow::anyhow;
use axum::{routing::get, Router};
use collab::{db, env, AppState, Config, MigrateConfig, Result};
use collab::{db, env, executor::Executor, AppState, Config, MigrateConfig, Result};
use db::Database;
use std::{
env::args,
@@ -52,12 +52,12 @@ async fn main() -> Result<()> {
init_tracing(&config);
let state = AppState::new(config).await?;
state.db.clear_stale_data().await?;
let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port))
.expect("failed to bind TCP listener");
let rpc_server = collab::rpc::Server::new(state.clone());
let rpc_server = collab::rpc::Server::new(state.clone(), Executor::Production);
rpc_server.start().await?;
let app = collab::api::routes(rpc_server.clone(), state.clone())
.merge(collab::rpc::routes(rpc_server.clone()))

View File

@@ -3,6 +3,7 @@ mod connection_pool;
use crate::{
auth,
db::{self, Database, ProjectId, RoomId, User, UserId},
executor::Executor,
AppState, Result,
};
use anyhow::anyhow;
@@ -52,13 +53,12 @@ use std::{
},
time::Duration,
};
use tokio::{
sync::{Mutex, MutexGuard},
time::Sleep,
};
use tokio::sync::watch;
use tower::ServiceBuilder;
use tracing::{info_span, instrument, Instrument};
pub const RECONNECT_TIMEOUT: Duration = rpc::RECEIVE_TIMEOUT;
lazy_static! {
static ref METRIC_CONNECTIONS: IntGauge =
register_int_gauge!("connections", "number of connections").unwrap();
@@ -90,14 +90,14 @@ impl<R: RequestMessage> Response<R> {
struct Session {
user_id: UserId,
connection_id: ConnectionId,
db: Arc<Mutex<DbHandle>>,
db: Arc<tokio::sync::Mutex<DbHandle>>,
peer: Arc<Peer>,
connection_pool: Arc<Mutex<ConnectionPool>>,
connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
}
impl Session {
async fn db(&self) -> MutexGuard<DbHandle> {
async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
#[cfg(test)]
tokio::task::yield_now().await;
let guard = self.db.lock().await;
@@ -109,9 +109,7 @@ impl Session {
async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
#[cfg(test)]
tokio::task::yield_now().await;
let guard = self.connection_pool.lock().await;
#[cfg(test)]
tokio::task::yield_now().await;
let guard = self.connection_pool.lock();
ConnectionPoolGuard {
guard,
_not_send: PhantomData,
@@ -140,22 +138,15 @@ impl Deref for DbHandle {
pub struct Server {
peer: Arc<Peer>,
pub(crate) connection_pool: Arc<Mutex<ConnectionPool>>,
pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
app_state: Arc<AppState>,
executor: Executor,
handlers: HashMap<TypeId, MessageHandler>,
teardown: watch::Sender<()>,
}
pub trait Executor: Send + Clone {
type Sleep: Send + Future;
fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
fn sleep(&self, duration: Duration) -> Self::Sleep;
}
#[derive(Clone)]
pub struct RealExecutor;
pub(crate) struct ConnectionPoolGuard<'a> {
guard: MutexGuard<'a, ConnectionPool>,
guard: parking_lot::MutexGuard<'a, ConnectionPool>,
_not_send: PhantomData<Rc<()>>,
}
@@ -176,12 +167,14 @@ where
}
impl Server {
pub fn new(app_state: Arc<AppState>) -> Arc<Self> {
pub fn new(app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
let mut server = Self {
peer: Peer::new(),
app_state,
executor,
connection_pool: Default::default(),
handlers: Default::default(),
teardown: watch::channel(()).0,
};
server
@@ -244,6 +237,99 @@ impl Server {
Arc::new(server)
}
pub async fn start(&self) -> Result<()> {
self.app_state.db.delete_stale_projects().await?;
let db = self.app_state.db.clone();
let peer = self.peer.clone();
let timeout = self.executor.sleep(RECONNECT_TIMEOUT);
let pool = self.connection_pool.clone();
let live_kit_client = self.app_state.live_kit_client.clone();
self.executor.spawn_detached(async move {
timeout.await;
if let Some(room_ids) = db.outdated_room_ids().await.trace_err() {
for room_id in room_ids {
let mut contacts_to_update = HashSet::default();
let mut canceled_calls_to_user_ids = Vec::new();
let mut live_kit_room = String::new();
let mut delete_live_kit_room = false;
if let Ok(mut refreshed_room) = db.refresh_room(room_id).await {
room_updated(&refreshed_room.room, &peer);
contacts_to_update
.extend(refreshed_room.stale_participant_user_ids.iter().copied());
contacts_to_update
.extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
canceled_calls_to_user_ids =
mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
delete_live_kit_room = refreshed_room.room.participants.is_empty();
}
{
let pool = pool.lock();
for canceled_user_id in canceled_calls_to_user_ids {
for connection_id in pool.user_connection_ids(canceled_user_id) {
peer.send(
connection_id,
proto::CallCanceled {
room_id: room_id.to_proto(),
},
)
.trace_err();
}
}
}
for user_id in contacts_to_update {
let busy = db.is_user_busy(user_id).await.trace_err();
let contacts = db.get_contacts(user_id).await.trace_err();
if let Some((busy, contacts)) = busy.zip(contacts) {
let pool = pool.lock();
let updated_contact = contact_for_user(user_id, false, busy, &pool);
for contact in contacts {
if let db::Contact::Accepted {
user_id: contact_user_id,
..
} = contact
{
for contact_conn_id in pool.user_connection_ids(contact_user_id)
{
peer.send(
contact_conn_id,
proto::UpdateContacts {
contacts: vec![updated_contact.clone()],
remove_contacts: Default::default(),
incoming_requests: Default::default(),
remove_incoming_requests: Default::default(),
outgoing_requests: Default::default(),
remove_outgoing_requests: Default::default(),
},
)
.trace_err();
}
}
}
}
}
if let Some(live_kit) = live_kit_client.as_ref() {
if delete_live_kit_room {
live_kit.delete_room(live_kit_room).await.trace_err();
}
}
}
}
});
Ok(())
}
#[cfg(test)]
pub fn teardown(&self) {
self.peer.reset();
self.connection_pool.lock().reset();
let _ = self.teardown.send(());
}
fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
where
F: 'static + Send + Sync + Fn(TypedEnvelope<M>, Session) -> Fut,
@@ -330,29 +416,25 @@ impl Server {
})
}
pub fn handle_connection<E: Executor>(
pub fn handle_connection(
self: &Arc<Self>,
connection: Connection,
address: String,
user: User,
mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
executor: E,
executor: Executor,
) -> impl Future<Output = Result<()>> {
let this = self.clone();
let user_id = user.id;
let login = user.github_login;
let span = info_span!("handle connection", %user_id, %login, %address);
let mut teardown = self.teardown.subscribe();
async move {
let (connection_id, handle_io, mut incoming_rx) = this
.peer
.add_connection(connection, {
let executor = executor.clone();
move |duration| {
let timer = executor.sleep(duration);
async move {
timer.await;
}
}
move |duration| executor.sleep(duration)
});
tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
@@ -374,7 +456,7 @@ impl Server {
).await?;
{
let mut pool = this.connection_pool.lock().await;
let mut pool = this.connection_pool.lock();
pool.add_connection(connection_id, user_id, user.admin);
this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
@@ -393,7 +475,7 @@ impl Server {
let session = Session {
user_id,
connection_id,
db: Arc::new(Mutex::new(DbHandle(this.app_state.db.clone()))),
db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
peer: this.peer.clone(),
connection_pool: this.connection_pool.clone(),
live_kit_client: this.app_state.live_kit_client.clone()
@@ -416,6 +498,7 @@ impl Server {
let next_message = incoming_rx.next().fuse();
futures::pin_mut!(next_message);
futures::select_biased! {
_ = teardown.changed().fuse() => return Ok(()),
result = handle_io => {
if let Err(error) = result {
tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
@@ -452,7 +535,7 @@ impl Server {
drop(foreground_message_handlers);
tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
if let Err(error) = sign_out(session).await {
if let Err(error) = sign_out(session, teardown, executor).await {
tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
}
@@ -467,7 +550,7 @@ impl Server {
) -> Result<()> {
if let Some(user) = self.app_state.db.get_user_by_id(inviter_id).await? {
if let Some(code) = &user.invite_code {
let pool = self.connection_pool.lock().await;
let pool = self.connection_pool.lock();
let invitee_contact = contact_for_user(invitee_id, true, false, &pool);
for connection_id in pool.user_connection_ids(inviter_id) {
self.peer.send(
@@ -493,7 +576,7 @@ impl Server {
pub async fn invite_count_updated(self: &Arc<Self>, user_id: UserId) -> Result<()> {
if let Some(user) = self.app_state.db.get_user_by_id(user_id).await? {
if let Some(invite_code) = &user.invite_code {
let pool = self.connection_pool.lock().await;
let pool = self.connection_pool.lock();
for connection_id in pool.user_connection_ids(user_id) {
self.peer.send(
connection_id,
@@ -514,7 +597,7 @@ impl Server {
pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
ServerSnapshot {
connection_pool: ConnectionPoolGuard {
guard: self.connection_pool.lock().await,
guard: self.connection_pool.lock(),
_not_send: PhantomData,
},
peer: &self.peer,
@@ -543,18 +626,6 @@ impl<'a> Drop for ConnectionPoolGuard<'a> {
}
}
impl Executor for RealExecutor {
type Sleep = Sleep;
fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
tokio::task::spawn(future);
}
fn sleep(&self, duration: Duration) -> Self::Sleep {
tokio::time::sleep(duration)
}
}
fn broadcast<F>(
sender_id: ConnectionId,
receiver_ids: impl IntoIterator<Item = ConnectionId>,
@@ -636,7 +707,7 @@ pub async fn handle_websocket_request(
let connection = Connection::new(Box::pin(socket));
async move {
server
.handle_connection(connection, socket_address, user, None, RealExecutor)
.handle_connection(connection, socket_address, user, None, Executor::Production)
.await
.log_err();
}
@@ -647,7 +718,6 @@ pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result
let connections = server
.connection_pool
.lock()
.await
.connections()
.filter(|connection| !connection.admin)
.count();
@@ -665,30 +735,48 @@ pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result
Ok(encoded_metrics)
}
#[instrument(err)]
async fn sign_out(session: Session) -> Result<()> {
#[instrument(err, skip(executor))]
async fn sign_out(
session: Session,
mut teardown: watch::Receiver<()>,
executor: Executor,
) -> Result<()> {
session.peer.disconnect(session.connection_id);
let decline_calls = {
let mut pool = session.connection_pool().await;
pool.remove_connection(session.connection_id)?;
let mut connections = pool.user_connection_ids(session.user_id);
connections.next().is_none()
};
session
.connection_pool()
.await
.remove_connection(session.connection_id)?;
leave_room_for_session(&session).await.trace_err();
if decline_calls {
if let Some(room) = session
.db()
.await
.decline_call(None, session.user_id)
.await
.trace_err()
{
room_updated(&room, &session);
if let Some(mut left_projects) = session
.db()
.await
.connection_lost(session.connection_id)
.await
.trace_err()
{
for left_project in mem::take(&mut *left_projects) {
project_left(&left_project, &session);
}
}
update_user_contacts(session.user_id, &session).await?;
futures::select_biased! {
_ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
leave_room_for_session(&session).await.trace_err();
if !session
.connection_pool()
.await
.is_user_online(session.user_id)
{
let db = session.db().await;
if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() {
room_updated(&room, &session.peer);
}
}
update_user_contacts(session.user_id, &session).await?;
}
_ = teardown.changed().fuse() => {}
}
Ok(())
}
@@ -750,17 +838,14 @@ async fn join_room(
response: Response<proto::JoinRoom>,
session: Session,
) -> Result<()> {
let room_id = RoomId::from_proto(request.id);
let room = {
let room = session
.db()
.await
.join_room(
RoomId::from_proto(request.id),
session.user_id,
session.connection_id,
)
.join_room(room_id, session.user_id, session.connection_id)
.await?;
room_updated(&room, &session);
room_updated(&room, &session.peer);
room.clone()
};
@@ -771,7 +856,12 @@ async fn join_room(
{
session
.peer
.send(connection_id, proto::CallCanceled {})
.send(
connection_id,
proto::CallCanceled {
room_id: room_id.to_proto(),
},
)
.trace_err();
}
@@ -835,7 +925,7 @@ async fn call(
initial_project_id,
)
.await?;
room_updated(&room, &session);
room_updated(&room, &session.peer);
mem::take(incoming_call)
};
update_user_contacts(called_user_id, &session).await?;
@@ -865,7 +955,7 @@ async fn call(
.await
.call_failed(room_id, called_user_id)
.await?;
room_updated(&room, &session);
room_updated(&room, &session.peer);
}
update_user_contacts(called_user_id, &session).await?;
@@ -885,7 +975,7 @@ async fn cancel_call(
.await
.cancel_call(Some(room_id), session.connection_id, called_user_id)
.await?;
room_updated(&room, &session);
room_updated(&room, &session.peer);
}
for connection_id in session
@@ -895,7 +985,12 @@ async fn cancel_call(
{
session
.peer
.send(connection_id, proto::CallCanceled {})
.send(
connection_id,
proto::CallCanceled {
room_id: room_id.to_proto(),
},
)
.trace_err();
}
response.send(proto::Ack {})?;
@@ -912,7 +1007,7 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<(
.await
.decline_call(Some(room_id), session.user_id)
.await?;
room_updated(&room, &session);
room_updated(&room, &session.peer);
}
for connection_id in session
@@ -922,7 +1017,12 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<(
{
session
.peer
.send(connection_id, proto::CallCanceled {})
.send(
connection_id,
proto::CallCanceled {
room_id: room_id.to_proto(),
},
)
.trace_err();
}
update_user_contacts(session.user_id, &session).await?;
@@ -943,7 +1043,7 @@ async fn update_participant_location(
.await
.update_room_participant_location(room_id, session.connection_id, location)
.await?;
room_updated(&room, &session);
room_updated(&room, &session.peer);
response.send(proto::Ack {})?;
Ok(())
}
@@ -965,7 +1065,7 @@ async fn share_project(
response.send(proto::ShareProjectResponse {
project_id: project_id.to_proto(),
})?;
room_updated(&room, &session);
room_updated(&room, &session.peer);
Ok(())
}
@@ -984,7 +1084,7 @@ async fn unshare_project(message: proto::UnshareProject, session: Session) -> Re
guest_connection_ids.iter().copied(),
|conn_id| session.peer.send(conn_id, message.clone()),
);
room_updated(&room, &session);
room_updated(&room, &session.peer);
Ok(())
}
@@ -1118,20 +1218,7 @@ async fn leave_project(request: proto::LeaveProject, session: Session) -> Result
host_connection_id = %project.host_connection_id,
"leave project"
);
broadcast(
sender_id,
project.connection_ids.iter().copied(),
|conn_id| {
session.peer.send(
conn_id,
proto::RemoveProjectCollaborator {
project_id: project_id.to_proto(),
peer_id: sender_id.0,
},
)
},
);
project_left(&project, &session);
Ok(())
}
@@ -1156,7 +1243,7 @@ async fn update_project(
.forward_send(session.connection_id, connection_id, request.clone())
},
);
room_updated(&room, &session);
room_updated(&room, &session.peer);
response.send(proto::Ack {})?;
Ok(())
@@ -1803,17 +1890,15 @@ fn contact_for_user(
}
}
fn room_updated(room: &proto::Room, session: &Session) {
fn room_updated(room: &proto::Room, peer: &Peer) {
for participant in &room.participants {
session
.peer
.send(
ConnectionId(participant.peer_id),
proto::RoomUpdated {
room: Some(room.clone()),
},
)
.trace_err();
peer.send(
ConnectionId(participant.peer_id),
proto::RoomUpdated {
room: Some(room.clone()),
},
)
.trace_err();
}
}
@@ -1854,6 +1939,7 @@ async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()>
async fn leave_room_for_session(session: &Session) -> Result<()> {
let mut contacts_to_update = HashSet::default();
let room_id;
let canceled_calls_to_user_ids;
let live_kit_room;
let delete_live_kit_room;
@@ -1862,43 +1948,11 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
contacts_to_update.insert(session.user_id);
for project in left_room.left_projects.values() {
for connection_id in &project.connection_ids {
if project.host_user_id == session.user_id {
session
.peer
.send(
*connection_id,
proto::UnshareProject {
project_id: project.id.to_proto(),
},
)
.trace_err();
} else {
session
.peer
.send(
*connection_id,
proto::RemoveProjectCollaborator {
project_id: project.id.to_proto(),
peer_id: session.connection_id.0,
},
)
.trace_err();
}
}
session
.peer
.send(
session.connection_id,
proto::UnshareProject {
project_id: project.id.to_proto(),
},
)
.trace_err();
project_left(project, session);
}
room_updated(&left_room.room, &session);
room_updated(&left_room.room, &session.peer);
room_id = RoomId::from_proto(left_room.room.id);
canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
live_kit_room = mem::take(&mut left_room.room.live_kit_room);
delete_live_kit_room = left_room.room.participants.is_empty();
@@ -1910,7 +1964,12 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
for connection_id in pool.user_connection_ids(canceled_user_id) {
session
.peer
.send(connection_id, proto::CallCanceled {})
.send(
connection_id,
proto::CallCanceled {
room_id: room_id.to_proto(),
},
)
.trace_err();
}
contacts_to_update.insert(canceled_user_id);
@@ -1935,6 +1994,43 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
Ok(())
}
fn project_left(project: &db::LeftProject, session: &Session) {
for connection_id in &project.connection_ids {
if project.host_user_id == session.user_id {
session
.peer
.send(
*connection_id,
proto::UnshareProject {
project_id: project.id.to_proto(),
},
)
.trace_err();
} else {
session
.peer
.send(
*connection_id,
proto::RemoveProjectCollaborator {
project_id: project.id.to_proto(),
peer_id: session.connection_id.0,
},
)
.trace_err();
}
}
session
.peer
.send(
session.connection_id,
proto::UnshareProject {
project_id: project.id.to_proto(),
},
)
.trace_err();
}
pub trait ResultExt {
type Ok;
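
The rpc.rs changes are the server half of the reconnection story: sign_out no longer tears the session's room state down immediately. It marks the connection lost (the new db connection_lost flags answering_connection_lost and removes the connection from shared projects), then races a RECONNECT_TIMEOUT sleep against server teardown before actually leaving the room, so a client that rejoins in time keeps its seat. A minimal sketch of that race, assuming the futures crate:

use futures::{executor::block_on, future::FutureExt, select_biased};
use std::future::Future;

async fn sign_out(
    reconnect_timeout: impl Future<Output = ()> + Unpin,
    teardown: impl Future<Output = ()> + Unpin,
) {
    select_biased! {
        _ = reconnect_timeout.fuse() => {
            // Grace period elapsed and the client never rejoined:
            // leave the room and cancel this user's pending calls.
            println!("leave room for session");
        }
        _ = teardown.fuse() => {
            // Server is shutting down; skip cleanup and let the next
            // process's stale-room sweep reconcile the database.
        }
    }
}

fn main() {
    // The timeout fires immediately here; teardown never does.
    block_on(sign_out(futures::future::ready(()), futures::future::pending()));
}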

View File

@@ -23,6 +23,11 @@ pub struct Connection {
}
impl ConnectionPool {
pub fn reset(&mut self) {
self.connections.clear();
self.connected_users.clear();
}
#[instrument(skip(self))]
pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) {
self.connections

View File

@@ -139,9 +139,7 @@ impl<V: View> DragAndDrop<V> {
region_offset,
region,
}) => {
if (dbg!(event.position) - (dbg!(region.origin() + region_offset))).length()
> DEAD_ZONE
{
if (event.position - (region.origin() + region_offset)).length() > DEAD_ZONE {
this.currently_dragged = Some(State::Dragging {
window_id,
region_offset,

View File

@@ -184,7 +184,6 @@ actions!(
Paste,
Undo,
Redo,
NextScreen,
MoveUp,
PageUp,
MoveDown,

View File

@@ -4983,9 +4983,11 @@ fn test_following(cx: &mut gpui::MutableAppContext) {
|cx| build_editor(buffer.clone(), cx),
);
let is_still_following = Rc::new(RefCell::new(true));
let pending_update = Rc::new(RefCell::new(None));
follower.update(cx, {
let update = pending_update.clone();
let is_still_following = is_still_following.clone();
|_, cx| {
cx.subscribe(&leader, move |_, leader, event, cx| {
leader
@@ -4993,6 +4995,13 @@ fn test_following(cx: &mut gpui::MutableAppContext) {
.add_event_to_update_proto(event, &mut *update.borrow_mut(), cx);
})
.detach();
cx.subscribe(&follower, move |_, _, event, cx| {
if Editor::should_unfollow_on_event(event, cx) {
*is_still_following.borrow_mut() = false;
}
})
.detach();
}
});
@@ -5006,6 +5015,7 @@ fn test_following(cx: &mut gpui::MutableAppContext) {
.unwrap();
});
assert_eq!(follower.read(cx).selections.ranges(cx), vec![1..1]);
assert_eq!(*is_still_following.borrow(), true);
// Update the scroll position only
leader.update(cx, |leader, cx| {
@@ -5020,6 +5030,7 @@ fn test_following(cx: &mut gpui::MutableAppContext) {
follower.update(cx, |follower, cx| follower.scroll_position(cx)),
vec2f(1.5, 3.5)
);
assert_eq!(*is_still_following.borrow(), true);
// Update the selections and scroll position
leader.update(cx, |leader, cx| {
@@ -5036,6 +5047,7 @@ fn test_following(cx: &mut gpui::MutableAppContext) {
assert!(follower.scroll_manager.has_autoscroll_request());
});
assert_eq!(follower.read(cx).selections.ranges(cx), vec![0..0]);
assert_eq!(*is_still_following.borrow(), true);
// Creating a pending selection that precedes another selection
leader.update(cx, |leader, cx| {
@@ -5048,6 +5060,7 @@ fn test_following(cx: &mut gpui::MutableAppContext) {
.unwrap();
});
assert_eq!(follower.read(cx).selections.ranges(cx), vec![0..0, 1..1]);
assert_eq!(*is_still_following.borrow(), true);
// Extend the pending selection so that it surrounds another selection
leader.update(cx, |leader, cx| {
@@ -5059,6 +5072,19 @@ fn test_following(cx: &mut gpui::MutableAppContext) {
.unwrap();
});
assert_eq!(follower.read(cx).selections.ranges(cx), vec![0..2]);
// Scrolling locally breaks the follow
follower.update(cx, |follower, cx| {
let top_anchor = follower.buffer().read(cx).read(cx).anchor_after(0);
follower.set_scroll_anchor(
ScrollAnchor {
top_anchor,
offset: vec2f(0.0, 0.5),
},
cx,
);
});
assert_eq!(*is_still_following.borrow(), false);
}
#[test]

View File

@@ -88,7 +88,7 @@ impl FollowableItem for Editor {
}
if let Some(anchor) = state.scroll_top_anchor {
editor.set_scroll_anchor_internal(
editor.set_scroll_anchor_remote(
ScrollAnchor {
top_anchor: Anchor {
buffer_id: Some(state.buffer_id as usize),
@@ -98,7 +98,6 @@ impl FollowableItem for Editor {
},
offset: vec2f(state.scroll_x, state.scroll_y),
},
false,
cx,
);
}
@@ -213,7 +212,7 @@ impl FollowableItem for Editor {
self.set_selections_from_remote(selections, cx);
self.request_autoscroll_remotely(Autoscroll::newest(), cx);
} else if let Some(anchor) = message.scroll_top_anchor {
self.set_scroll_anchor(
self.set_scroll_anchor_remote(
ScrollAnchor {
top_anchor: Anchor {
buffer_id: Some(buffer_id),

View File

@@ -284,17 +284,17 @@ impl Editor {
}
pub fn set_scroll_anchor(&mut self, scroll_anchor: ScrollAnchor, cx: &mut ViewContext<Self>) {
self.set_scroll_anchor_internal(scroll_anchor, true, cx);
hide_hover(self, cx);
self.scroll_manager.set_anchor(scroll_anchor, true, cx);
}
pub(crate) fn set_scroll_anchor_internal(
pub(crate) fn set_scroll_anchor_remote(
&mut self,
scroll_anchor: ScrollAnchor,
local: bool,
cx: &mut ViewContext<Self>,
) {
hide_hover(self, cx);
self.scroll_manager.set_anchor(scroll_anchor, local, cx);
self.scroll_manager.set_anchor(scroll_anchor, false, cx);
}
pub fn scroll_screen(&mut self, amount: &ScrollAmount, cx: &mut ViewContext<Self>) {
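
These scroll changes close the loop on the "scrolling breaks follow" fix: set_scroll_anchor_internal is renamed set_scroll_anchor_remote and hard-codes local to false, so only genuinely user-initiated set_scroll_anchor calls count as local scrolls. Combined with the items.rs change above, leader-driven scroll syncs no longer trip the unfollow logic. A dependency-free sketch of the split, with a hypothetical following flag in place of the real event plumbing:

struct ScrollManager {
    following: bool,
}

impl ScrollManager {
    fn set_anchor(&mut self, local: bool) {
        if local {
            // A deliberate user scroll breaks "follow" mode...
            self.following = false;
        }
        // ...while remote updates replayed from the leader keep it intact.
    }
}

fn main() {
    let mut manager = ScrollManager { following: true };
    manager.set_anchor(false); // leader sync arrives
    assert!(manager.following);
    manager.set_anchor(true); // user scrolls locally
    assert!(!manager.following);
}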

View File

@@ -64,15 +64,15 @@ impl Editor {
return None;
}
self.context_menu.as_mut()?;
if self.mouse_context_menu.read(cx).visible() {
return None;
}
if matches!(self.mode, EditorMode::SingleLine) {
cx.propagate_action();
return None;
}
self.request_autoscroll(Autoscroll::Next, cx);
Some(())
}

View File

@@ -1431,8 +1431,8 @@ impl MutableAppContext {
true
}
// Returns an iterator over all of the view ids from the passed view up to the root of the window
// Includes the passed view itself
/// Returns an iterator over all of the view ids from the passed view up to the root of the window
/// Includes the passed view itself
fn ancestors(&self, window_id: usize, mut view_id: usize) -> impl Iterator<Item = usize> + '_ {
std::iter::once(view_id)
.into_iter()
@@ -3695,6 +3695,7 @@ impl<'a, T: View> ViewContext<'a, T> {
return false;
}
self.ancestors(view.window_id, view.view_id)
.skip(1) // Skip self id
.any(|parent| parent == self.view_id)
}
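
The gpui change formalizes that ancestors() yields the starting view itself: the comment is promoted to a doc comment, and is_child now skips that first element so a view is no longer considered its own child. The workspace.rs hunk further down compensates at the call site by checking view.id() == cx.view_id() || cx.is_child(view) when deciding which pane received focus.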

View File

@@ -212,7 +212,9 @@ message IncomingCall {
optional ParticipantProject initial_project = 4;
}
message CallCanceled {}
message CallCanceled {
uint64 room_id = 1;
}
message CancelCall {
uint64 room_id = 1;

View File

@@ -6,4 +6,4 @@ pub use conn::Connection;
pub use peer::*;
mod macros;
pub const PROTOCOL_VERSION: u32 = 40;
pub const PROTOCOL_VERSION: u32 = 41;
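
Because CallCanceled gained a field, the shape of a message both sides must agree on has changed, so PROTOCOL_VERSION is bumped from 40 to 41 to keep peers that speak the old shape from being paired with the new one.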

View File

@@ -453,14 +453,26 @@ impl StatusItemView for ToggleDockButton {
#[cfg(test)]
mod tests {
use std::ops::{Deref, DerefMut};
use std::{
ops::{Deref, DerefMut},
path::PathBuf,
};
use gpui::{AppContext, TestAppContext, UpdateView, ViewContext};
use project::{FakeFs, Project};
use settings::Settings;
use super::*;
use crate::{item::test::TestItem, sidebar::Sidebar, ItemHandle, Workspace};
use crate::{
dock,
item::test::TestItem,
persistence::model::{
SerializedItem, SerializedPane, SerializedPaneGroup, SerializedWorkspace,
},
register_deserializable_item,
sidebar::Sidebar,
ItemHandle, Workspace,
};
pub fn default_item_factory(
_workspace: &mut Workspace,
@@ -469,6 +481,51 @@ mod tests {
Box::new(cx.add_view(|_| TestItem::new()))
}
#[gpui::test]
async fn test_dock_workspace_infinite_loop(cx: &mut TestAppContext) {
cx.foreground().forbid_parking();
Settings::test_async(cx);
cx.update(|cx| {
register_deserializable_item::<TestItem>(cx);
});
let serialized_workspace = SerializedWorkspace {
id: 0,
location: Vec::<PathBuf>::new().into(),
dock_position: dock::DockPosition::Shown(DockAnchor::Expanded),
center_group: SerializedPaneGroup::Pane(SerializedPane {
active: false,
children: vec![],
}),
dock_pane: SerializedPane {
active: true,
children: vec![SerializedItem {
active: true,
item_id: 0,
kind: "test".into(),
}],
},
left_sidebar_open: false,
};
let fs = FakeFs::new(cx.background());
let project = Project::test(fs, [], cx).await;
let (_, _workspace) = cx.add_window(|cx| {
Workspace::new(
Some(serialized_workspace),
0,
project.clone(),
default_item_factory,
cx,
)
});
cx.foreground().run_until_parked();
//Should terminate
}
#[gpui::test]
async fn test_dock_hides_when_pane_empty(cx: &mut TestAppContext) {
let mut cx = DockTestContext::new(cx).await;

View File

@@ -681,6 +681,7 @@ pub(crate) mod test {
use super::{Item, ItemEvent};
pub struct TestItem {
pub workspace_id: WorkspaceId,
pub state: String,
pub label: String,
pub save_count: usize,
@@ -716,6 +717,7 @@ pub(crate) mod test {
nav_history: None,
tab_descriptions: None,
tab_detail: Default::default(),
workspace_id: self.workspace_id,
}
}
}
@@ -736,9 +738,16 @@ pub(crate) mod test {
nav_history: None,
tab_descriptions: None,
tab_detail: Default::default(),
workspace_id: 0,
}
}
pub fn new_deserialized(id: WorkspaceId) -> Self {
let mut this = Self::new();
this.workspace_id = id;
this
}
pub fn with_label(mut self, state: &str) -> Self {
self.label = state.to_string();
self
@@ -893,11 +902,12 @@ pub(crate) mod test {
fn deserialize(
_project: ModelHandle<Project>,
_workspace: WeakViewHandle<Workspace>,
_workspace_id: WorkspaceId,
workspace_id: WorkspaceId,
_item_id: ItemId,
_cx: &mut ViewContext<Pane>,
cx: &mut ViewContext<Pane>,
) -> Task<anyhow::Result<ViewHandle<Self>>> {
unreachable!("Cannot deserialize test item")
let view = cx.add_view(|_cx| Self::new_deserialized(workspace_id));
Task::Ready(Some(anyhow::Ok(view)))
}
}

View File

@@ -8,7 +8,7 @@ use anyhow::{anyhow, bail, Context, Result};
use db::{define_connection, query, sqlez::connection::Connection, sqlez_macros::sql};
use gpui::Axis;
use util::{iife, unzip_option, ResultExt};
use util::{ unzip_option, ResultExt};
use crate::dock::DockPosition;
use crate::WorkspaceId;
@@ -96,22 +96,16 @@ impl WorkspaceDb {
WorkspaceLocation,
bool,
DockPosition,
) = iife!({
if worktree_roots.len() == 0 {
self.select_row(sql!(
SELECT workspace_id, workspace_location, left_sidebar_open, dock_visible, dock_anchor
FROM workspaces
ORDER BY timestamp DESC LIMIT 1))?()?
} else {
self.select_row_bound(sql!(
SELECT workspace_id, workspace_location, left_sidebar_open, dock_visible, dock_anchor
FROM workspaces
WHERE workspace_location = ?))?(&workspace_location)?
}
) =
self.select_row_bound(sql!{
SELECT workspace_id, workspace_location, left_sidebar_open, dock_visible, dock_anchor
FROM workspaces
WHERE workspace_location = ?
})
.and_then(|mut prepared_statement| (prepared_statement)(&workspace_location))
.context("No workspaces found")
})
.warn_on_err()
.flatten()?;
.warn_on_err()
.flatten()?;
Some(SerializedWorkspace {
id: workspace_id,
@@ -205,11 +199,21 @@ impl WorkspaceDb {
}
}
query! {
pub fn last_workspace() -> Result<Option<WorkspaceLocation>> {
SELECT workspace_location
FROM workspaces
WHERE workspace_location IS NOT NULL
ORDER BY timestamp DESC
LIMIT 1
}
}
fn get_center_pane_group(&self, workspace_id: WorkspaceId) -> Result<SerializedPaneGroup> {
self.get_pane_group(workspace_id, None)?
Ok(self.get_pane_group(workspace_id, None)?
.into_iter()
.next()
.context("No center pane group")
.unwrap_or_else(|| SerializedPaneGroup::Pane(SerializedPane { active: true, children: vec![] })))
}
fn get_pane_group(
@@ -263,7 +267,7 @@ impl WorkspaceDb {
// Filter out panes and pane groups which don't have any children or items
.filter(|pane_group| match pane_group {
Ok(SerializedPaneGroup::Group { children, .. }) => !children.is_empty(),
Ok(SerializedPaneGroup::Pane(pane)) => !pane.children.is_empty(),
Ok(SerializedPaneGroup::Pane(pane)) => !pane.children.is_empty(),
_ => true,
})
.collect::<Result<_>>()
@@ -371,6 +375,15 @@ impl WorkspaceDb {
Ok(())
}
query!{
pub async fn update_timestamp(workspace_id: WorkspaceId) -> Result<()> {
UPDATE workspaces
SET timestamp = CURRENT_TIMESTAMP
WHERE workspace_id = ?
}
}
}
#[cfg(test)]

View File

@@ -106,7 +106,6 @@ impl SerializedPaneGroup {
.await
{
members.push(new_member);
current_active_pane = current_active_pane.or(active_pane);
}
}
@@ -115,6 +114,10 @@ impl SerializedPaneGroup {
return None;
}
if members.len() == 1 {
return Some((members.remove(0), current_active_pane));
}
Some((
Member::Axis(PaneAxis {
axis: *axis,
@@ -130,9 +133,10 @@ impl SerializedPaneGroup {
.deserialize_to(project, &pane, workspace_id, workspace, cx)
.await;
if pane.read_with(cx, |pane, _| pane.items().next().is_some()) {
if pane.read_with(cx, |pane, _| pane.items_len() != 0) {
Some((Member::Pane(pane.clone()), active.then(|| pane)))
} else {
workspace.update(cx, |workspace, cx| workspace.remove_pane(pane, cx));
None
}
}
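
Two deserialization corner cases are handled here: an axis group left with exactly one member after filtering collapses to that member instead of nesting a single-child group, and a deserialized pane that ends up with no items is removed from the workspace via remove_pane rather than left behind empty.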

View File

@@ -176,6 +176,7 @@ pub fn init(app_state: Arc<AppState>, cx: &mut MutableAppContext) {
}
}
});
cx.add_global_action({
let app_state = Arc::downgrade(&app_state);
move |_: &NewWindow, cx: &mut MutableAppContext| {
@@ -2181,7 +2182,11 @@ impl Workspace {
}
pub fn on_window_activation_changed(&mut self, active: bool, cx: &mut ViewContext<Self>) {
if !active {
if active {
cx.background()
.spawn(persistence::DB.update_timestamp(self.database_id()))
.detach();
} else {
for pane in &self.panes {
pane.update(cx, |pane, cx| {
if let Some(item) = pane.active_item() {
@@ -2295,6 +2300,9 @@ impl Workspace {
}
if let Some(location) = self.location(cx) {
// Load bearing special case:
// - with_local_workspace() relies on this to not have other stuff open
// when you open your log
if !location.paths().is_empty() {
let dock_pane = serialize_pane_handle(self.dock.pane(), cx);
let center_group = build_serialized_pane_group(&self.center.root, cx);
@@ -2322,9 +2330,14 @@ impl Workspace {
) {
cx.spawn(|mut cx| async move {
if let Some(workspace) = workspace.upgrade(&cx) {
let (project, dock_pane_handle) = workspace.read_with(&cx, |workspace, _| {
(workspace.project().clone(), workspace.dock_pane().clone())
});
let (project, dock_pane_handle, old_center_pane) =
workspace.read_with(&cx, |workspace, _| {
(
workspace.project().clone(),
workspace.dock_pane().clone(),
workspace.last_active_center_pane.clone(),
)
});
serialized_workspace
.dock_pane
@@ -2360,18 +2373,26 @@ impl Workspace {
cx.focus(workspace.panes.last().unwrap().clone());
}
} else {
cx.focus_self();
let old_center_handle = old_center_pane.and_then(|weak| weak.upgrade(cx));
if let Some(old_center_handle) = old_center_handle {
cx.focus(old_center_handle)
} else {
cx.focus_self()
}
}
// Note, if this is moved after 'set_dock_position'
// it causes an infinite loop.
if workspace.left_sidebar().read(cx).is_open()
!= serialized_workspace.left_sidebar_open
{
workspace.toggle_sidebar(SidebarSide::Left, cx);
}
// Dock::set_dock_position(workspace, serialized_workspace.dock_position, cx);
// Note that without after_window, the focus_self() and
// the focus the dock generates start generating alternating
// focus due to the deferred execution each triggering each other
cx.after_window_update(move |workspace, cx| {
Dock::set_dock_position(workspace, serialized_workspace.dock_position, cx);
});
cx.notify();
});
@@ -2537,7 +2558,7 @@ impl View for Workspace {
} else {
for pane in self.panes() {
let view = view.clone();
if pane.update(cx, |_, cx| cx.is_child(view)) {
if pane.update(cx, |_, cx| view.id() == cx.view_id() || cx.is_child(view)) {
self.handle_pane_focused(pane.clone(), cx);
break;
}
@@ -2613,6 +2634,10 @@ pub fn activate_workspace_for_project(
None
}
pub fn last_opened_workspace_paths() -> Option<WorkspaceLocation> {
DB.last_workspace().log_err().flatten()
}
#[allow(clippy::type_complexity)]
pub fn open_paths(
abs_paths: &[PathBuf],

View File

@@ -3,7 +3,7 @@ authors = ["Nathan Sobo <nathansobo@gmail.com>"]
description = "The fast, collaborative code editor."
edition = "2021"
name = "zed"
version = "0.67.0"
version = "0.67.1"
[lib]
name = "zed"

View File

@@ -1 +1 @@
dev
preview

View File

@@ -167,7 +167,7 @@ fn main() {
cx.platform().activate(true);
let paths = collect_path_args();
if paths.is_empty() {
cx.dispatch_global_action(NewFile);
restore_or_create_workspace(cx);
} else {
cx.dispatch_global_action(OpenPaths { paths });
}
@@ -176,7 +176,7 @@ fn main() {
cx.spawn(|cx| handle_cli_connection(connection, app_state.clone(), cx))
.detach();
} else {
cx.dispatch_global_action(NewFile);
restore_or_create_workspace(cx);
}
cx.spawn(|cx| async move {
while let Some(connection) = cli_connections_rx.next().await {
@@ -200,6 +200,16 @@ fn main() {
});
}
fn restore_or_create_workspace(cx: &mut gpui::MutableAppContext) {
if let Some(location) = workspace::last_opened_workspace_paths() {
cx.dispatch_global_action(OpenPaths {
paths: location.paths().as_ref().clone(),
})
} else {
cx.dispatch_global_action(NewFile);
}
}
fn init_paths() {
std::fs::create_dir_all(&*util::paths::CONFIG_DIR).expect("could not create config path");
std::fs::create_dir_all(&*util::paths::LANGUAGES_DIR).expect("could not create languages path");
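
Together with the last_workspace query added to WorkspaceDb above, this makes a bare launch of Zed (no CLI paths) restore the most recently used workspace by dispatching OpenPaths for its recorded location, falling back to NewFile only when no previous workspace exists.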