Compare commits
1 Commits
bash-timeo
...
rust-analy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3bc1a41fd4 |
@@ -529,132 +529,133 @@ impl Database {
|
|||||||
ancestor_channel: Option<&channel::Model>,
|
ancestor_channel: Option<&channel::Model>,
|
||||||
tx: &DatabaseTransaction,
|
tx: &DatabaseTransaction,
|
||||||
) -> Result<ChannelsForUser> {
|
) -> Result<ChannelsForUser> {
|
||||||
let mut filter = channel_member::Column::UserId
|
// let mut filter = channel_member::Column::UserId
|
||||||
.eq(user_id)
|
// .eq(user_id)
|
||||||
.and(channel_member::Column::Accepted.eq(true));
|
// .and(channel_member::Column::Accepted.eq(true));
|
||||||
|
|
||||||
if let Some(ancestor) = ancestor_channel {
|
// if let Some(ancestor) = ancestor_channel {
|
||||||
filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
|
// filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
|
||||||
}
|
// }
|
||||||
|
|
||||||
let channel_memberships = channel_member::Entity::find()
|
// let channel_memberships = channel_member::Entity::find()
|
||||||
.filter(filter)
|
// .filter(filter)
|
||||||
.all(tx)
|
// .all(tx)
|
||||||
.await?;
|
// .await?;
|
||||||
|
|
||||||
let channels = channel::Entity::find()
|
// let channels = channel::Entity::find()
|
||||||
.filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
|
// .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
|
||||||
.all(tx)
|
// .all(tx)
|
||||||
.await?;
|
// .await?;
|
||||||
|
|
||||||
let mut descendants = self
|
// let mut descendants = self
|
||||||
.get_channel_descendants_excluding_self(channels.iter(), tx)
|
// .get_channel_descendants_excluding_self(channels.iter(), tx)
|
||||||
.await?;
|
// .await?;
|
||||||
|
|
||||||
for channel in channels {
|
// for channel in channels {
|
||||||
if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
|
// if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
|
||||||
descendants.insert(ix, channel);
|
// descendants.insert(ix, channel);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
let roles_by_channel_id = channel_memberships
|
// let roles_by_channel_id = channel_memberships
|
||||||
.iter()
|
// .iter()
|
||||||
.map(|membership| (membership.channel_id, membership.role))
|
// .map(|membership| (membership.channel_id, membership.role))
|
||||||
.collect::<HashMap<_, _>>();
|
// .collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
let channels: Vec<Channel> = descendants
|
// let channels: Vec<Channel> = descendants
|
||||||
.into_iter()
|
// .into_iter()
|
||||||
.filter_map(|channel| {
|
// .filter_map(|channel| {
|
||||||
let parent_role = roles_by_channel_id.get(&channel.root_id())?;
|
// let parent_role = roles_by_channel_id.get(&channel.root_id())?;
|
||||||
if parent_role.can_see_channel(channel.visibility) {
|
// if parent_role.can_see_channel(channel.visibility) {
|
||||||
Some(Channel::from_model(channel))
|
// Some(Channel::from_model(channel))
|
||||||
} else {
|
// } else {
|
||||||
None
|
// None
|
||||||
}
|
// }
|
||||||
})
|
// })
|
||||||
.collect();
|
// .collect();
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
|
// #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
|
||||||
enum QueryUserIdsAndChannelIds {
|
// enum QueryUserIdsAndChannelIds {
|
||||||
ChannelId,
|
// ChannelId,
|
||||||
UserId,
|
// UserId,
|
||||||
}
|
// }
|
||||||
|
|
||||||
let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
|
// let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
|
||||||
{
|
// {
|
||||||
let mut rows = room_participant::Entity::find()
|
// let mut rows = room_participant::Entity::find()
|
||||||
.inner_join(room::Entity)
|
// .inner_join(room::Entity)
|
||||||
.filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
|
// .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
|
||||||
.select_only()
|
// .select_only()
|
||||||
.column(room::Column::ChannelId)
|
// .column(room::Column::ChannelId)
|
||||||
.column(room_participant::Column::UserId)
|
// .column(room_participant::Column::UserId)
|
||||||
.into_values::<_, QueryUserIdsAndChannelIds>()
|
// .into_values::<_, QueryUserIdsAndChannelIds>()
|
||||||
.stream(tx)
|
// .stream(tx)
|
||||||
.await?;
|
// .await?;
|
||||||
while let Some(row) = rows.next().await {
|
// while let Some(row) = rows.next().await {
|
||||||
let row: (ChannelId, UserId) = row?;
|
// let row: (ChannelId, UserId) = row?;
|
||||||
channel_participants.entry(row.0).or_default().push(row.1)
|
// channel_participants.entry(row.0).or_default().push(row.1)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
|
// let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
|
||||||
|
|
||||||
let mut channel_ids_by_buffer_id = HashMap::default();
|
// let mut channel_ids_by_buffer_id = HashMap::default();
|
||||||
let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
|
// let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
|
||||||
let mut rows = buffer::Entity::find()
|
// let mut rows = buffer::Entity::find()
|
||||||
.filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
|
// .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
|
||||||
.stream(tx)
|
// .stream(tx)
|
||||||
.await?;
|
// .await?;
|
||||||
while let Some(row) = rows.next().await {
|
// while let Some(row) = rows.next().await {
|
||||||
let row = row?;
|
// let row = row?;
|
||||||
channel_ids_by_buffer_id.insert(row.id, row.channel_id);
|
// channel_ids_by_buffer_id.insert(row.id, row.channel_id);
|
||||||
latest_buffer_versions.push(ChannelBufferVersion {
|
// latest_buffer_versions.push(ChannelBufferVersion {
|
||||||
channel_id: row.channel_id.0 as u64,
|
// channel_id: row.channel_id.0 as u64,
|
||||||
epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
|
// epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
|
||||||
version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
|
// version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
|
||||||
.latest_operation_lamport_timestamp
|
// .latest_operation_lamport_timestamp
|
||||||
.zip(row.latest_operation_replica_id)
|
// .zip(row.latest_operation_replica_id)
|
||||||
{
|
// {
|
||||||
vec![VectorClockEntry {
|
// vec![VectorClockEntry {
|
||||||
timestamp: latest_lamport_timestamp as u32,
|
// timestamp: latest_lamport_timestamp as u32,
|
||||||
replica_id: latest_replica_id as u32,
|
// replica_id: latest_replica_id as u32,
|
||||||
}]
|
// }]
|
||||||
} else {
|
// } else {
|
||||||
vec![]
|
// vec![]
|
||||||
},
|
// },
|
||||||
});
|
// });
|
||||||
}
|
// }
|
||||||
drop(rows);
|
// drop(rows);
|
||||||
|
|
||||||
let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
|
// let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
|
||||||
|
|
||||||
let observed_buffer_versions = self
|
// let observed_buffer_versions = self
|
||||||
.observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
|
// .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
|
||||||
.await?;
|
// .await?;
|
||||||
|
|
||||||
let observed_channel_messages = self
|
// let observed_channel_messages = self
|
||||||
.observed_channel_messages(&channel_ids, user_id, tx)
|
// .observed_channel_messages(&channel_ids, user_id, tx)
|
||||||
.await?;
|
// .await?;
|
||||||
|
|
||||||
let hosted_projects = self
|
// let hosted_projects = self
|
||||||
.get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
|
// .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
|
||||||
.await?;
|
// .await?;
|
||||||
|
|
||||||
let dev_servers = self.get_dev_servers(&channel_ids, tx).await?;
|
// let dev_servers = self.get_dev_servers(&channel_ids, tx).await?;
|
||||||
let remote_projects = self.get_remote_projects(&channel_ids, tx).await?;
|
// let remote_projects = self.get_remote_projects(&channel_ids, tx).await?;
|
||||||
|
|
||||||
Ok(ChannelsForUser {
|
// Ok(ChannelsForUser {
|
||||||
channel_memberships,
|
// channel_memberships,
|
||||||
channels,
|
// channels,
|
||||||
hosted_projects,
|
// hosted_projects,
|
||||||
dev_servers,
|
// dev_servers,
|
||||||
remote_projects,
|
// remote_projects,
|
||||||
channel_participants,
|
// channel_participants,
|
||||||
latest_buffer_versions,
|
// latest_buffer_versions,
|
||||||
latest_channel_messages,
|
// latest_channel_messages,
|
||||||
observed_buffer_versions,
|
// observed_buffer_versions,
|
||||||
observed_channel_messages,
|
// observed_channel_messages,
|
||||||
})
|
// })
|
||||||
|
Err(anyhow!("not implemented"))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets the role for the specified channel member.
|
/// Sets the role for the specified channel member.
|
||||||
|
|||||||
@@ -3,88 +3,89 @@ use super::*;
|
|||||||
impl Database {
|
impl Database {
|
||||||
/// Retrieves the contacts for the user with the given ID.
|
/// Retrieves the contacts for the user with the given ID.
|
||||||
pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
|
pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
|
||||||
#[derive(Debug, FromQueryResult)]
|
Ok(vec![])
|
||||||
struct ContactWithUserBusyStatuses {
|
// #[derive(Debug, FromQueryResult)]
|
||||||
user_id_a: UserId,
|
// struct ContactWithUserBusyStatuses {
|
||||||
user_id_b: UserId,
|
// user_id_a: UserId,
|
||||||
a_to_b: bool,
|
// user_id_b: UserId,
|
||||||
accepted: bool,
|
// a_to_b: bool,
|
||||||
user_a_busy: bool,
|
// accepted: bool,
|
||||||
user_b_busy: bool,
|
// user_a_busy: bool,
|
||||||
}
|
// user_b_busy: bool,
|
||||||
|
// }
|
||||||
|
|
||||||
self.transaction(|tx| async move {
|
// self.transaction(|tx| async move {
|
||||||
let user_a_participant = Alias::new("user_a_participant");
|
// let user_a_participant = Alias::new("user_a_participant");
|
||||||
let user_b_participant = Alias::new("user_b_participant");
|
// let user_b_participant = Alias::new("user_b_participant");
|
||||||
let mut db_contacts = contact::Entity::find()
|
// let mut db_contacts = contact::Entity::find()
|
||||||
.column_as(
|
// .column_as(
|
||||||
Expr::col((user_a_participant.clone(), room_participant::Column::Id))
|
// Expr::col((user_a_participant.clone(), room_participant::Column::Id))
|
||||||
.is_not_null(),
|
// .is_not_null(),
|
||||||
"user_a_busy",
|
// "user_a_busy",
|
||||||
)
|
// )
|
||||||
.column_as(
|
// .column_as(
|
||||||
Expr::col((user_b_participant.clone(), room_participant::Column::Id))
|
// Expr::col((user_b_participant.clone(), room_participant::Column::Id))
|
||||||
.is_not_null(),
|
// .is_not_null(),
|
||||||
"user_b_busy",
|
// "user_b_busy",
|
||||||
)
|
// )
|
||||||
.filter(
|
// .filter(
|
||||||
contact::Column::UserIdA
|
// contact::Column::UserIdA
|
||||||
.eq(user_id)
|
// .eq(user_id)
|
||||||
.or(contact::Column::UserIdB.eq(user_id)),
|
// .or(contact::Column::UserIdB.eq(user_id)),
|
||||||
)
|
// )
|
||||||
.join_as(
|
// .join_as(
|
||||||
JoinType::LeftJoin,
|
// JoinType::LeftJoin,
|
||||||
contact::Relation::UserARoomParticipant.def(),
|
// contact::Relation::UserARoomParticipant.def(),
|
||||||
user_a_participant,
|
// user_a_participant,
|
||||||
)
|
// )
|
||||||
.join_as(
|
// .join_as(
|
||||||
JoinType::LeftJoin,
|
// JoinType::LeftJoin,
|
||||||
contact::Relation::UserBRoomParticipant.def(),
|
// contact::Relation::UserBRoomParticipant.def(),
|
||||||
user_b_participant,
|
// user_b_participant,
|
||||||
)
|
// )
|
||||||
.into_model::<ContactWithUserBusyStatuses>()
|
// .into_model::<ContactWithUserBusyStatuses>()
|
||||||
.stream(&*tx)
|
// .stream(&*tx)
|
||||||
.await?;
|
// .await?;
|
||||||
|
|
||||||
let mut contacts = Vec::new();
|
// let mut contacts = Vec::new();
|
||||||
while let Some(db_contact) = db_contacts.next().await {
|
// while let Some(db_contact) = db_contacts.next().await {
|
||||||
let db_contact = db_contact?;
|
// let db_contact = db_contact?;
|
||||||
if db_contact.user_id_a == user_id {
|
// if db_contact.user_id_a == user_id {
|
||||||
if db_contact.accepted {
|
// if db_contact.accepted {
|
||||||
contacts.push(Contact::Accepted {
|
// contacts.push(Contact::Accepted {
|
||||||
user_id: db_contact.user_id_b,
|
// user_id: db_contact.user_id_b,
|
||||||
busy: db_contact.user_b_busy,
|
// busy: db_contact.user_b_busy,
|
||||||
});
|
// });
|
||||||
} else if db_contact.a_to_b {
|
// } else if db_contact.a_to_b {
|
||||||
contacts.push(Contact::Outgoing {
|
// contacts.push(Contact::Outgoing {
|
||||||
user_id: db_contact.user_id_b,
|
// user_id: db_contact.user_id_b,
|
||||||
})
|
// })
|
||||||
} else {
|
// } else {
|
||||||
contacts.push(Contact::Incoming {
|
// contacts.push(Contact::Incoming {
|
||||||
user_id: db_contact.user_id_b,
|
// user_id: db_contact.user_id_b,
|
||||||
});
|
// });
|
||||||
}
|
// }
|
||||||
} else if db_contact.accepted {
|
// } else if db_contact.accepted {
|
||||||
contacts.push(Contact::Accepted {
|
// contacts.push(Contact::Accepted {
|
||||||
user_id: db_contact.user_id_a,
|
// user_id: db_contact.user_id_a,
|
||||||
busy: db_contact.user_a_busy,
|
// busy: db_contact.user_a_busy,
|
||||||
});
|
// });
|
||||||
} else if db_contact.a_to_b {
|
// } else if db_contact.a_to_b {
|
||||||
contacts.push(Contact::Incoming {
|
// contacts.push(Contact::Incoming {
|
||||||
user_id: db_contact.user_id_a,
|
// user_id: db_contact.user_id_a,
|
||||||
});
|
// });
|
||||||
} else {
|
// } else {
|
||||||
contacts.push(Contact::Outgoing {
|
// contacts.push(Contact::Outgoing {
|
||||||
user_id: db_contact.user_id_a,
|
// user_id: db_contact.user_id_a,
|
||||||
});
|
// });
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
contacts.sort_unstable_by_key(|contact| contact.user_id());
|
// contacts.sort_unstable_by_key(|contact| contact.user_id());
|
||||||
|
|
||||||
Ok(contacts)
|
// Ok(contacts)
|
||||||
})
|
// })
|
||||||
.await
|
// .await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns whether the given user is a busy (on a call).
|
/// Returns whether the given user is a busy (on a call).
|
||||||
|
|||||||
@@ -432,53 +432,53 @@ impl Database {
|
|||||||
channel_ids: &[ChannelId],
|
channel_ids: &[ChannelId],
|
||||||
tx: &DatabaseTransaction,
|
tx: &DatabaseTransaction,
|
||||||
) -> Result<Vec<proto::ChannelMessageId>> {
|
) -> Result<Vec<proto::ChannelMessageId>> {
|
||||||
let mut values = String::new();
|
// let mut values = String::new();
|
||||||
for id in channel_ids {
|
// for id in channel_ids {
|
||||||
if !values.is_empty() {
|
// if !values.is_empty() {
|
||||||
values.push_str(", ");
|
// values.push_str(", ");
|
||||||
}
|
// }
|
||||||
write!(&mut values, "({})", id).unwrap();
|
// write!(&mut values, "({})", id).unwrap();
|
||||||
}
|
// }
|
||||||
|
|
||||||
if values.is_empty() {
|
// if values.is_empty() {
|
||||||
return Ok(Vec::default());
|
// return Ok(Vec::default());
|
||||||
}
|
// }
|
||||||
|
|
||||||
let sql = format!(
|
// let sql = format!(
|
||||||
r#"
|
// r#"
|
||||||
SELECT
|
// SELECT
|
||||||
*
|
// *
|
||||||
FROM (
|
// FROM (
|
||||||
SELECT
|
// SELECT
|
||||||
*,
|
// *,
|
||||||
row_number() OVER (
|
// row_number() OVER (
|
||||||
PARTITION BY channel_id
|
// PARTITION BY channel_id
|
||||||
ORDER BY id DESC
|
// ORDER BY id DESC
|
||||||
) as row_number
|
// ) as row_number
|
||||||
FROM channel_messages
|
// FROM channel_messages
|
||||||
WHERE
|
// WHERE
|
||||||
channel_id in ({values})
|
// channel_id in ({values})
|
||||||
) AS messages
|
// ) AS messages
|
||||||
WHERE
|
// WHERE
|
||||||
row_number = 1
|
// row_number = 1
|
||||||
"#,
|
// "#,
|
||||||
);
|
// );
|
||||||
|
|
||||||
let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
|
// let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
|
||||||
let mut last_messages = channel_message::Model::find_by_statement(stmt)
|
// let mut last_messages = channel_message::Model::find_by_statement(stmt)
|
||||||
.stream(tx)
|
// .stream(tx)
|
||||||
.await?;
|
// .await?;
|
||||||
|
|
||||||
let mut results = Vec::new();
|
// let mut results = Vec::new();
|
||||||
while let Some(result) = last_messages.next().await {
|
// while let Some(result) = last_messages.next().await {
|
||||||
let message = result?;
|
// let message = result?;
|
||||||
results.push(proto::ChannelMessageId {
|
// results.push(proto::ChannelMessageId {
|
||||||
channel_id: message.channel_id.to_proto(),
|
// channel_id: message.channel_id.to_proto(),
|
||||||
message_id: message.id.to_proto(),
|
// message_id: message.id.to_proto(),
|
||||||
});
|
// });
|
||||||
}
|
// }
|
||||||
|
|
||||||
Ok(results)
|
Ok(vec![])
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_notification_kind_id_by_name(&self, notification_kind: &str) -> Option<i32> {
|
fn get_notification_kind_id_by_name(&self, notification_kind: &str) -> Option<i32> {
|
||||||
|
|||||||
Reference in New Issue
Block a user