Compare commits

...

1 Commits

Author SHA1 Message Date
Conrad Irwin
3bc1a41fd4 DO NOT MERGE
Fixes rust analyzer panics
2024-04-15 14:54:45 -06:00
3 changed files with 231 additions and 229 deletions

View File

@@ -529,132 +529,133 @@ impl Database {
ancestor_channel: Option<&channel::Model>,
tx: &DatabaseTransaction,
) -> Result<ChannelsForUser> {
let mut filter = channel_member::Column::UserId
.eq(user_id)
.and(channel_member::Column::Accepted.eq(true));
// let mut filter = channel_member::Column::UserId
// .eq(user_id)
// .and(channel_member::Column::Accepted.eq(true));
if let Some(ancestor) = ancestor_channel {
filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
}
// if let Some(ancestor) = ancestor_channel {
// filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
// }
let channel_memberships = channel_member::Entity::find()
.filter(filter)
.all(tx)
.await?;
// let channel_memberships = channel_member::Entity::find()
// .filter(filter)
// .all(tx)
// .await?;
let channels = channel::Entity::find()
.filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
.all(tx)
.await?;
// let channels = channel::Entity::find()
// .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
// .all(tx)
// .await?;
let mut descendants = self
.get_channel_descendants_excluding_self(channels.iter(), tx)
.await?;
// let mut descendants = self
// .get_channel_descendants_excluding_self(channels.iter(), tx)
// .await?;
for channel in channels {
if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
descendants.insert(ix, channel);
}
}
// for channel in channels {
// if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
// descendants.insert(ix, channel);
// }
// }
let roles_by_channel_id = channel_memberships
.iter()
.map(|membership| (membership.channel_id, membership.role))
.collect::<HashMap<_, _>>();
// let roles_by_channel_id = channel_memberships
// .iter()
// .map(|membership| (membership.channel_id, membership.role))
// .collect::<HashMap<_, _>>();
let channels: Vec<Channel> = descendants
.into_iter()
.filter_map(|channel| {
let parent_role = roles_by_channel_id.get(&channel.root_id())?;
if parent_role.can_see_channel(channel.visibility) {
Some(Channel::from_model(channel))
} else {
None
}
})
.collect();
// let channels: Vec<Channel> = descendants
// .into_iter()
// .filter_map(|channel| {
// let parent_role = roles_by_channel_id.get(&channel.root_id())?;
// if parent_role.can_see_channel(channel.visibility) {
// Some(Channel::from_model(channel))
// } else {
// None
// }
// })
// .collect();
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryUserIdsAndChannelIds {
ChannelId,
UserId,
}
// #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
// enum QueryUserIdsAndChannelIds {
// ChannelId,
// UserId,
// }
let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
{
let mut rows = room_participant::Entity::find()
.inner_join(room::Entity)
.filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
.select_only()
.column(room::Column::ChannelId)
.column(room_participant::Column::UserId)
.into_values::<_, QueryUserIdsAndChannelIds>()
.stream(tx)
.await?;
while let Some(row) = rows.next().await {
let row: (ChannelId, UserId) = row?;
channel_participants.entry(row.0).or_default().push(row.1)
}
}
// let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
// {
// let mut rows = room_participant::Entity::find()
// .inner_join(room::Entity)
// .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
// .select_only()
// .column(room::Column::ChannelId)
// .column(room_participant::Column::UserId)
// .into_values::<_, QueryUserIdsAndChannelIds>()
// .stream(tx)
// .await?;
// while let Some(row) = rows.next().await {
// let row: (ChannelId, UserId) = row?;
// channel_participants.entry(row.0).or_default().push(row.1)
// }
// }
let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
// let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
let mut channel_ids_by_buffer_id = HashMap::default();
let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
let mut rows = buffer::Entity::find()
.filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
.stream(tx)
.await?;
while let Some(row) = rows.next().await {
let row = row?;
channel_ids_by_buffer_id.insert(row.id, row.channel_id);
latest_buffer_versions.push(ChannelBufferVersion {
channel_id: row.channel_id.0 as u64,
epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
.latest_operation_lamport_timestamp
.zip(row.latest_operation_replica_id)
{
vec![VectorClockEntry {
timestamp: latest_lamport_timestamp as u32,
replica_id: latest_replica_id as u32,
}]
} else {
vec![]
},
});
}
drop(rows);
// let mut channel_ids_by_buffer_id = HashMap::default();
// let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
// let mut rows = buffer::Entity::find()
// .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
// .stream(tx)
// .await?;
// while let Some(row) = rows.next().await {
// let row = row?;
// channel_ids_by_buffer_id.insert(row.id, row.channel_id);
// latest_buffer_versions.push(ChannelBufferVersion {
// channel_id: row.channel_id.0 as u64,
// epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
// version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
// .latest_operation_lamport_timestamp
// .zip(row.latest_operation_replica_id)
// {
// vec![VectorClockEntry {
// timestamp: latest_lamport_timestamp as u32,
// replica_id: latest_replica_id as u32,
// }]
// } else {
// vec![]
// },
// });
// }
// drop(rows);
let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
// let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
let observed_buffer_versions = self
.observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
.await?;
// let observed_buffer_versions = self
// .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
// .await?;
let observed_channel_messages = self
.observed_channel_messages(&channel_ids, user_id, tx)
.await?;
// let observed_channel_messages = self
// .observed_channel_messages(&channel_ids, user_id, tx)
// .await?;
let hosted_projects = self
.get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
.await?;
// let hosted_projects = self
// .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
// .await?;
let dev_servers = self.get_dev_servers(&channel_ids, tx).await?;
let remote_projects = self.get_remote_projects(&channel_ids, tx).await?;
// let dev_servers = self.get_dev_servers(&channel_ids, tx).await?;
// let remote_projects = self.get_remote_projects(&channel_ids, tx).await?;
Ok(ChannelsForUser {
channel_memberships,
channels,
hosted_projects,
dev_servers,
remote_projects,
channel_participants,
latest_buffer_versions,
latest_channel_messages,
observed_buffer_versions,
observed_channel_messages,
})
// Ok(ChannelsForUser {
// channel_memberships,
// channels,
// hosted_projects,
// dev_servers,
// remote_projects,
// channel_participants,
// latest_buffer_versions,
// latest_channel_messages,
// observed_buffer_versions,
// observed_channel_messages,
// })
Err(anyhow!("not implemented"))
}
/// Sets the role for the specified channel member.

View File

@@ -3,88 +3,89 @@ use super::*;
impl Database {
/// Retrieves the contacts for the user with the given ID.
pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
#[derive(Debug, FromQueryResult)]
struct ContactWithUserBusyStatuses {
user_id_a: UserId,
user_id_b: UserId,
a_to_b: bool,
accepted: bool,
user_a_busy: bool,
user_b_busy: bool,
}
Ok(vec![])
// #[derive(Debug, FromQueryResult)]
// struct ContactWithUserBusyStatuses {
// user_id_a: UserId,
// user_id_b: UserId,
// a_to_b: bool,
// accepted: bool,
// user_a_busy: bool,
// user_b_busy: bool,
// }
self.transaction(|tx| async move {
let user_a_participant = Alias::new("user_a_participant");
let user_b_participant = Alias::new("user_b_participant");
let mut db_contacts = contact::Entity::find()
.column_as(
Expr::col((user_a_participant.clone(), room_participant::Column::Id))
.is_not_null(),
"user_a_busy",
)
.column_as(
Expr::col((user_b_participant.clone(), room_participant::Column::Id))
.is_not_null(),
"user_b_busy",
)
.filter(
contact::Column::UserIdA
.eq(user_id)
.or(contact::Column::UserIdB.eq(user_id)),
)
.join_as(
JoinType::LeftJoin,
contact::Relation::UserARoomParticipant.def(),
user_a_participant,
)
.join_as(
JoinType::LeftJoin,
contact::Relation::UserBRoomParticipant.def(),
user_b_participant,
)
.into_model::<ContactWithUserBusyStatuses>()
.stream(&*tx)
.await?;
// self.transaction(|tx| async move {
// let user_a_participant = Alias::new("user_a_participant");
// let user_b_participant = Alias::new("user_b_participant");
// let mut db_contacts = contact::Entity::find()
// .column_as(
// Expr::col((user_a_participant.clone(), room_participant::Column::Id))
// .is_not_null(),
// "user_a_busy",
// )
// .column_as(
// Expr::col((user_b_participant.clone(), room_participant::Column::Id))
// .is_not_null(),
// "user_b_busy",
// )
// .filter(
// contact::Column::UserIdA
// .eq(user_id)
// .or(contact::Column::UserIdB.eq(user_id)),
// )
// .join_as(
// JoinType::LeftJoin,
// contact::Relation::UserARoomParticipant.def(),
// user_a_participant,
// )
// .join_as(
// JoinType::LeftJoin,
// contact::Relation::UserBRoomParticipant.def(),
// user_b_participant,
// )
// .into_model::<ContactWithUserBusyStatuses>()
// .stream(&*tx)
// .await?;
let mut contacts = Vec::new();
while let Some(db_contact) = db_contacts.next().await {
let db_contact = db_contact?;
if db_contact.user_id_a == user_id {
if db_contact.accepted {
contacts.push(Contact::Accepted {
user_id: db_contact.user_id_b,
busy: db_contact.user_b_busy,
});
} else if db_contact.a_to_b {
contacts.push(Contact::Outgoing {
user_id: db_contact.user_id_b,
})
} else {
contacts.push(Contact::Incoming {
user_id: db_contact.user_id_b,
});
}
} else if db_contact.accepted {
contacts.push(Contact::Accepted {
user_id: db_contact.user_id_a,
busy: db_contact.user_a_busy,
});
} else if db_contact.a_to_b {
contacts.push(Contact::Incoming {
user_id: db_contact.user_id_a,
});
} else {
contacts.push(Contact::Outgoing {
user_id: db_contact.user_id_a,
});
}
}
// let mut contacts = Vec::new();
// while let Some(db_contact) = db_contacts.next().await {
// let db_contact = db_contact?;
// if db_contact.user_id_a == user_id {
// if db_contact.accepted {
// contacts.push(Contact::Accepted {
// user_id: db_contact.user_id_b,
// busy: db_contact.user_b_busy,
// });
// } else if db_contact.a_to_b {
// contacts.push(Contact::Outgoing {
// user_id: db_contact.user_id_b,
// })
// } else {
// contacts.push(Contact::Incoming {
// user_id: db_contact.user_id_b,
// });
// }
// } else if db_contact.accepted {
// contacts.push(Contact::Accepted {
// user_id: db_contact.user_id_a,
// busy: db_contact.user_a_busy,
// });
// } else if db_contact.a_to_b {
// contacts.push(Contact::Incoming {
// user_id: db_contact.user_id_a,
// });
// } else {
// contacts.push(Contact::Outgoing {
// user_id: db_contact.user_id_a,
// });
// }
// }
contacts.sort_unstable_by_key(|contact| contact.user_id());
// contacts.sort_unstable_by_key(|contact| contact.user_id());
Ok(contacts)
})
.await
// Ok(contacts)
// })
// .await
}
/// Returns whether the given user is a busy (on a call).

View File

@@ -432,53 +432,53 @@ impl Database {
channel_ids: &[ChannelId],
tx: &DatabaseTransaction,
) -> Result<Vec<proto::ChannelMessageId>> {
let mut values = String::new();
for id in channel_ids {
if !values.is_empty() {
values.push_str(", ");
}
write!(&mut values, "({})", id).unwrap();
}
// let mut values = String::new();
// for id in channel_ids {
// if !values.is_empty() {
// values.push_str(", ");
// }
// write!(&mut values, "({})", id).unwrap();
// }
if values.is_empty() {
return Ok(Vec::default());
}
// if values.is_empty() {
// return Ok(Vec::default());
// }
let sql = format!(
r#"
SELECT
*
FROM (
SELECT
*,
row_number() OVER (
PARTITION BY channel_id
ORDER BY id DESC
) as row_number
FROM channel_messages
WHERE
channel_id in ({values})
) AS messages
WHERE
row_number = 1
"#,
);
// let sql = format!(
// r#"
// SELECT
// *
// FROM (
// SELECT
// *,
// row_number() OVER (
// PARTITION BY channel_id
// ORDER BY id DESC
// ) as row_number
// FROM channel_messages
// WHERE
// channel_id in ({values})
// ) AS messages
// WHERE
// row_number = 1
// "#,
// );
let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
let mut last_messages = channel_message::Model::find_by_statement(stmt)
.stream(tx)
.await?;
// let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
// let mut last_messages = channel_message::Model::find_by_statement(stmt)
// .stream(tx)
// .await?;
let mut results = Vec::new();
while let Some(result) = last_messages.next().await {
let message = result?;
results.push(proto::ChannelMessageId {
channel_id: message.channel_id.to_proto(),
message_id: message.id.to_proto(),
});
}
// let mut results = Vec::new();
// while let Some(result) = last_messages.next().await {
// let message = result?;
// results.push(proto::ChannelMessageId {
// channel_id: message.channel_id.to_proto(),
// message_id: message.id.to_proto(),
// });
// }
Ok(results)
Ok(vec![])
}
fn get_notification_kind_id_by_name(&self, notification_kind: &str) -> Option<i32> {