Compare commits
1 Commits
vim-mrnugg
...
rust-analy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3bc1a41fd4 |
@@ -529,132 +529,133 @@ impl Database {
|
||||
ancestor_channel: Option<&channel::Model>,
|
||||
tx: &DatabaseTransaction,
|
||||
) -> Result<ChannelsForUser> {
|
||||
let mut filter = channel_member::Column::UserId
|
||||
.eq(user_id)
|
||||
.and(channel_member::Column::Accepted.eq(true));
|
||||
// let mut filter = channel_member::Column::UserId
|
||||
// .eq(user_id)
|
||||
// .and(channel_member::Column::Accepted.eq(true));
|
||||
|
||||
if let Some(ancestor) = ancestor_channel {
|
||||
filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
|
||||
}
|
||||
// if let Some(ancestor) = ancestor_channel {
|
||||
// filter = filter.and(channel_member::Column::ChannelId.eq(ancestor.root_id()));
|
||||
// }
|
||||
|
||||
let channel_memberships = channel_member::Entity::find()
|
||||
.filter(filter)
|
||||
.all(tx)
|
||||
.await?;
|
||||
// let channel_memberships = channel_member::Entity::find()
|
||||
// .filter(filter)
|
||||
// .all(tx)
|
||||
// .await?;
|
||||
|
||||
let channels = channel::Entity::find()
|
||||
.filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
|
||||
.all(tx)
|
||||
.await?;
|
||||
// let channels = channel::Entity::find()
|
||||
// .filter(channel::Column::Id.is_in(channel_memberships.iter().map(|m| m.channel_id)))
|
||||
// .all(tx)
|
||||
// .await?;
|
||||
|
||||
let mut descendants = self
|
||||
.get_channel_descendants_excluding_self(channels.iter(), tx)
|
||||
.await?;
|
||||
// let mut descendants = self
|
||||
// .get_channel_descendants_excluding_self(channels.iter(), tx)
|
||||
// .await?;
|
||||
|
||||
for channel in channels {
|
||||
if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
|
||||
descendants.insert(ix, channel);
|
||||
}
|
||||
}
|
||||
// for channel in channels {
|
||||
// if let Err(ix) = descendants.binary_search_by_key(&channel.path(), |c| c.path()) {
|
||||
// descendants.insert(ix, channel);
|
||||
// }
|
||||
// }
|
||||
|
||||
let roles_by_channel_id = channel_memberships
|
||||
.iter()
|
||||
.map(|membership| (membership.channel_id, membership.role))
|
||||
.collect::<HashMap<_, _>>();
|
||||
// let roles_by_channel_id = channel_memberships
|
||||
// .iter()
|
||||
// .map(|membership| (membership.channel_id, membership.role))
|
||||
// .collect::<HashMap<_, _>>();
|
||||
|
||||
let channels: Vec<Channel> = descendants
|
||||
.into_iter()
|
||||
.filter_map(|channel| {
|
||||
let parent_role = roles_by_channel_id.get(&channel.root_id())?;
|
||||
if parent_role.can_see_channel(channel.visibility) {
|
||||
Some(Channel::from_model(channel))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
// let channels: Vec<Channel> = descendants
|
||||
// .into_iter()
|
||||
// .filter_map(|channel| {
|
||||
// let parent_role = roles_by_channel_id.get(&channel.root_id())?;
|
||||
// if parent_role.can_see_channel(channel.visibility) {
|
||||
// Some(Channel::from_model(channel))
|
||||
// } else {
|
||||
// None
|
||||
// }
|
||||
// })
|
||||
// .collect();
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
|
||||
enum QueryUserIdsAndChannelIds {
|
||||
ChannelId,
|
||||
UserId,
|
||||
}
|
||||
// #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
|
||||
// enum QueryUserIdsAndChannelIds {
|
||||
// ChannelId,
|
||||
// UserId,
|
||||
// }
|
||||
|
||||
let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
|
||||
{
|
||||
let mut rows = room_participant::Entity::find()
|
||||
.inner_join(room::Entity)
|
||||
.filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
|
||||
.select_only()
|
||||
.column(room::Column::ChannelId)
|
||||
.column(room_participant::Column::UserId)
|
||||
.into_values::<_, QueryUserIdsAndChannelIds>()
|
||||
.stream(tx)
|
||||
.await?;
|
||||
while let Some(row) = rows.next().await {
|
||||
let row: (ChannelId, UserId) = row?;
|
||||
channel_participants.entry(row.0).or_default().push(row.1)
|
||||
}
|
||||
}
|
||||
// let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
|
||||
// {
|
||||
// let mut rows = room_participant::Entity::find()
|
||||
// .inner_join(room::Entity)
|
||||
// .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
|
||||
// .select_only()
|
||||
// .column(room::Column::ChannelId)
|
||||
// .column(room_participant::Column::UserId)
|
||||
// .into_values::<_, QueryUserIdsAndChannelIds>()
|
||||
// .stream(tx)
|
||||
// .await?;
|
||||
// while let Some(row) = rows.next().await {
|
||||
// let row: (ChannelId, UserId) = row?;
|
||||
// channel_participants.entry(row.0).or_default().push(row.1)
|
||||
// }
|
||||
// }
|
||||
|
||||
let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
|
||||
// let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
|
||||
|
||||
let mut channel_ids_by_buffer_id = HashMap::default();
|
||||
let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
|
||||
let mut rows = buffer::Entity::find()
|
||||
.filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
|
||||
.stream(tx)
|
||||
.await?;
|
||||
while let Some(row) = rows.next().await {
|
||||
let row = row?;
|
||||
channel_ids_by_buffer_id.insert(row.id, row.channel_id);
|
||||
latest_buffer_versions.push(ChannelBufferVersion {
|
||||
channel_id: row.channel_id.0 as u64,
|
||||
epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
|
||||
version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
|
||||
.latest_operation_lamport_timestamp
|
||||
.zip(row.latest_operation_replica_id)
|
||||
{
|
||||
vec![VectorClockEntry {
|
||||
timestamp: latest_lamport_timestamp as u32,
|
||||
replica_id: latest_replica_id as u32,
|
||||
}]
|
||||
} else {
|
||||
vec![]
|
||||
},
|
||||
});
|
||||
}
|
||||
drop(rows);
|
||||
// let mut channel_ids_by_buffer_id = HashMap::default();
|
||||
// let mut latest_buffer_versions: Vec<ChannelBufferVersion> = vec![];
|
||||
// let mut rows = buffer::Entity::find()
|
||||
// .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
|
||||
// .stream(tx)
|
||||
// .await?;
|
||||
// while let Some(row) = rows.next().await {
|
||||
// let row = row?;
|
||||
// channel_ids_by_buffer_id.insert(row.id, row.channel_id);
|
||||
// latest_buffer_versions.push(ChannelBufferVersion {
|
||||
// channel_id: row.channel_id.0 as u64,
|
||||
// epoch: row.latest_operation_epoch.unwrap_or_default() as u64,
|
||||
// version: if let Some((latest_lamport_timestamp, latest_replica_id)) = row
|
||||
// .latest_operation_lamport_timestamp
|
||||
// .zip(row.latest_operation_replica_id)
|
||||
// {
|
||||
// vec![VectorClockEntry {
|
||||
// timestamp: latest_lamport_timestamp as u32,
|
||||
// replica_id: latest_replica_id as u32,
|
||||
// }]
|
||||
// } else {
|
||||
// vec![]
|
||||
// },
|
||||
// });
|
||||
// }
|
||||
// drop(rows);
|
||||
|
||||
let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
|
||||
// let latest_channel_messages = self.latest_channel_messages(&channel_ids, tx).await?;
|
||||
|
||||
let observed_buffer_versions = self
|
||||
.observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
|
||||
.await?;
|
||||
// let observed_buffer_versions = self
|
||||
// .observed_channel_buffer_changes(&channel_ids_by_buffer_id, user_id, tx)
|
||||
// .await?;
|
||||
|
||||
let observed_channel_messages = self
|
||||
.observed_channel_messages(&channel_ids, user_id, tx)
|
||||
.await?;
|
||||
// let observed_channel_messages = self
|
||||
// .observed_channel_messages(&channel_ids, user_id, tx)
|
||||
// .await?;
|
||||
|
||||
let hosted_projects = self
|
||||
.get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
|
||||
.await?;
|
||||
// let hosted_projects = self
|
||||
// .get_hosted_projects(&channel_ids, &roles_by_channel_id, tx)
|
||||
// .await?;
|
||||
|
||||
let dev_servers = self.get_dev_servers(&channel_ids, tx).await?;
|
||||
let remote_projects = self.get_remote_projects(&channel_ids, tx).await?;
|
||||
// let dev_servers = self.get_dev_servers(&channel_ids, tx).await?;
|
||||
// let remote_projects = self.get_remote_projects(&channel_ids, tx).await?;
|
||||
|
||||
Ok(ChannelsForUser {
|
||||
channel_memberships,
|
||||
channels,
|
||||
hosted_projects,
|
||||
dev_servers,
|
||||
remote_projects,
|
||||
channel_participants,
|
||||
latest_buffer_versions,
|
||||
latest_channel_messages,
|
||||
observed_buffer_versions,
|
||||
observed_channel_messages,
|
||||
})
|
||||
// Ok(ChannelsForUser {
|
||||
// channel_memberships,
|
||||
// channels,
|
||||
// hosted_projects,
|
||||
// dev_servers,
|
||||
// remote_projects,
|
||||
// channel_participants,
|
||||
// latest_buffer_versions,
|
||||
// latest_channel_messages,
|
||||
// observed_buffer_versions,
|
||||
// observed_channel_messages,
|
||||
// })
|
||||
Err(anyhow!("not implemented"))
|
||||
}
|
||||
|
||||
/// Sets the role for the specified channel member.
|
||||
|
||||
@@ -3,88 +3,89 @@ use super::*;
|
||||
impl Database {
|
||||
/// Retrieves the contacts for the user with the given ID.
|
||||
pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
|
||||
#[derive(Debug, FromQueryResult)]
|
||||
struct ContactWithUserBusyStatuses {
|
||||
user_id_a: UserId,
|
||||
user_id_b: UserId,
|
||||
a_to_b: bool,
|
||||
accepted: bool,
|
||||
user_a_busy: bool,
|
||||
user_b_busy: bool,
|
||||
}
|
||||
Ok(vec![])
|
||||
// #[derive(Debug, FromQueryResult)]
|
||||
// struct ContactWithUserBusyStatuses {
|
||||
// user_id_a: UserId,
|
||||
// user_id_b: UserId,
|
||||
// a_to_b: bool,
|
||||
// accepted: bool,
|
||||
// user_a_busy: bool,
|
||||
// user_b_busy: bool,
|
||||
// }
|
||||
|
||||
self.transaction(|tx| async move {
|
||||
let user_a_participant = Alias::new("user_a_participant");
|
||||
let user_b_participant = Alias::new("user_b_participant");
|
||||
let mut db_contacts = contact::Entity::find()
|
||||
.column_as(
|
||||
Expr::col((user_a_participant.clone(), room_participant::Column::Id))
|
||||
.is_not_null(),
|
||||
"user_a_busy",
|
||||
)
|
||||
.column_as(
|
||||
Expr::col((user_b_participant.clone(), room_participant::Column::Id))
|
||||
.is_not_null(),
|
||||
"user_b_busy",
|
||||
)
|
||||
.filter(
|
||||
contact::Column::UserIdA
|
||||
.eq(user_id)
|
||||
.or(contact::Column::UserIdB.eq(user_id)),
|
||||
)
|
||||
.join_as(
|
||||
JoinType::LeftJoin,
|
||||
contact::Relation::UserARoomParticipant.def(),
|
||||
user_a_participant,
|
||||
)
|
||||
.join_as(
|
||||
JoinType::LeftJoin,
|
||||
contact::Relation::UserBRoomParticipant.def(),
|
||||
user_b_participant,
|
||||
)
|
||||
.into_model::<ContactWithUserBusyStatuses>()
|
||||
.stream(&*tx)
|
||||
.await?;
|
||||
// self.transaction(|tx| async move {
|
||||
// let user_a_participant = Alias::new("user_a_participant");
|
||||
// let user_b_participant = Alias::new("user_b_participant");
|
||||
// let mut db_contacts = contact::Entity::find()
|
||||
// .column_as(
|
||||
// Expr::col((user_a_participant.clone(), room_participant::Column::Id))
|
||||
// .is_not_null(),
|
||||
// "user_a_busy",
|
||||
// )
|
||||
// .column_as(
|
||||
// Expr::col((user_b_participant.clone(), room_participant::Column::Id))
|
||||
// .is_not_null(),
|
||||
// "user_b_busy",
|
||||
// )
|
||||
// .filter(
|
||||
// contact::Column::UserIdA
|
||||
// .eq(user_id)
|
||||
// .or(contact::Column::UserIdB.eq(user_id)),
|
||||
// )
|
||||
// .join_as(
|
||||
// JoinType::LeftJoin,
|
||||
// contact::Relation::UserARoomParticipant.def(),
|
||||
// user_a_participant,
|
||||
// )
|
||||
// .join_as(
|
||||
// JoinType::LeftJoin,
|
||||
// contact::Relation::UserBRoomParticipant.def(),
|
||||
// user_b_participant,
|
||||
// )
|
||||
// .into_model::<ContactWithUserBusyStatuses>()
|
||||
// .stream(&*tx)
|
||||
// .await?;
|
||||
|
||||
let mut contacts = Vec::new();
|
||||
while let Some(db_contact) = db_contacts.next().await {
|
||||
let db_contact = db_contact?;
|
||||
if db_contact.user_id_a == user_id {
|
||||
if db_contact.accepted {
|
||||
contacts.push(Contact::Accepted {
|
||||
user_id: db_contact.user_id_b,
|
||||
busy: db_contact.user_b_busy,
|
||||
});
|
||||
} else if db_contact.a_to_b {
|
||||
contacts.push(Contact::Outgoing {
|
||||
user_id: db_contact.user_id_b,
|
||||
})
|
||||
} else {
|
||||
contacts.push(Contact::Incoming {
|
||||
user_id: db_contact.user_id_b,
|
||||
});
|
||||
}
|
||||
} else if db_contact.accepted {
|
||||
contacts.push(Contact::Accepted {
|
||||
user_id: db_contact.user_id_a,
|
||||
busy: db_contact.user_a_busy,
|
||||
});
|
||||
} else if db_contact.a_to_b {
|
||||
contacts.push(Contact::Incoming {
|
||||
user_id: db_contact.user_id_a,
|
||||
});
|
||||
} else {
|
||||
contacts.push(Contact::Outgoing {
|
||||
user_id: db_contact.user_id_a,
|
||||
});
|
||||
}
|
||||
}
|
||||
// let mut contacts = Vec::new();
|
||||
// while let Some(db_contact) = db_contacts.next().await {
|
||||
// let db_contact = db_contact?;
|
||||
// if db_contact.user_id_a == user_id {
|
||||
// if db_contact.accepted {
|
||||
// contacts.push(Contact::Accepted {
|
||||
// user_id: db_contact.user_id_b,
|
||||
// busy: db_contact.user_b_busy,
|
||||
// });
|
||||
// } else if db_contact.a_to_b {
|
||||
// contacts.push(Contact::Outgoing {
|
||||
// user_id: db_contact.user_id_b,
|
||||
// })
|
||||
// } else {
|
||||
// contacts.push(Contact::Incoming {
|
||||
// user_id: db_contact.user_id_b,
|
||||
// });
|
||||
// }
|
||||
// } else if db_contact.accepted {
|
||||
// contacts.push(Contact::Accepted {
|
||||
// user_id: db_contact.user_id_a,
|
||||
// busy: db_contact.user_a_busy,
|
||||
// });
|
||||
// } else if db_contact.a_to_b {
|
||||
// contacts.push(Contact::Incoming {
|
||||
// user_id: db_contact.user_id_a,
|
||||
// });
|
||||
// } else {
|
||||
// contacts.push(Contact::Outgoing {
|
||||
// user_id: db_contact.user_id_a,
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
|
||||
contacts.sort_unstable_by_key(|contact| contact.user_id());
|
||||
// contacts.sort_unstable_by_key(|contact| contact.user_id());
|
||||
|
||||
Ok(contacts)
|
||||
})
|
||||
.await
|
||||
// Ok(contacts)
|
||||
// })
|
||||
// .await
|
||||
}
|
||||
|
||||
/// Returns whether the given user is a busy (on a call).
|
||||
|
||||
@@ -432,53 +432,53 @@ impl Database {
|
||||
channel_ids: &[ChannelId],
|
||||
tx: &DatabaseTransaction,
|
||||
) -> Result<Vec<proto::ChannelMessageId>> {
|
||||
let mut values = String::new();
|
||||
for id in channel_ids {
|
||||
if !values.is_empty() {
|
||||
values.push_str(", ");
|
||||
}
|
||||
write!(&mut values, "({})", id).unwrap();
|
||||
}
|
||||
// let mut values = String::new();
|
||||
// for id in channel_ids {
|
||||
// if !values.is_empty() {
|
||||
// values.push_str(", ");
|
||||
// }
|
||||
// write!(&mut values, "({})", id).unwrap();
|
||||
// }
|
||||
|
||||
if values.is_empty() {
|
||||
return Ok(Vec::default());
|
||||
}
|
||||
// if values.is_empty() {
|
||||
// return Ok(Vec::default());
|
||||
// }
|
||||
|
||||
let sql = format!(
|
||||
r#"
|
||||
SELECT
|
||||
*
|
||||
FROM (
|
||||
SELECT
|
||||
*,
|
||||
row_number() OVER (
|
||||
PARTITION BY channel_id
|
||||
ORDER BY id DESC
|
||||
) as row_number
|
||||
FROM channel_messages
|
||||
WHERE
|
||||
channel_id in ({values})
|
||||
) AS messages
|
||||
WHERE
|
||||
row_number = 1
|
||||
"#,
|
||||
);
|
||||
// let sql = format!(
|
||||
// r#"
|
||||
// SELECT
|
||||
// *
|
||||
// FROM (
|
||||
// SELECT
|
||||
// *,
|
||||
// row_number() OVER (
|
||||
// PARTITION BY channel_id
|
||||
// ORDER BY id DESC
|
||||
// ) as row_number
|
||||
// FROM channel_messages
|
||||
// WHERE
|
||||
// channel_id in ({values})
|
||||
// ) AS messages
|
||||
// WHERE
|
||||
// row_number = 1
|
||||
// "#,
|
||||
// );
|
||||
|
||||
let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
|
||||
let mut last_messages = channel_message::Model::find_by_statement(stmt)
|
||||
.stream(tx)
|
||||
.await?;
|
||||
// let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
|
||||
// let mut last_messages = channel_message::Model::find_by_statement(stmt)
|
||||
// .stream(tx)
|
||||
// .await?;
|
||||
|
||||
let mut results = Vec::new();
|
||||
while let Some(result) = last_messages.next().await {
|
||||
let message = result?;
|
||||
results.push(proto::ChannelMessageId {
|
||||
channel_id: message.channel_id.to_proto(),
|
||||
message_id: message.id.to_proto(),
|
||||
});
|
||||
}
|
||||
// let mut results = Vec::new();
|
||||
// while let Some(result) = last_messages.next().await {
|
||||
// let message = result?;
|
||||
// results.push(proto::ChannelMessageId {
|
||||
// channel_id: message.channel_id.to_proto(),
|
||||
// message_id: message.id.to_proto(),
|
||||
// });
|
||||
// }
|
||||
|
||||
Ok(results)
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
fn get_notification_kind_id_by_name(&self, notification_kind: &str) -> Option<i32> {
|
||||
|
||||
Reference in New Issue
Block a user