Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 27 additions & 133 deletions core/conversations/src/conversation/group_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ use de_mls::{
default_score_deltas,
};
use hashgraph_like_consensus::signing::EthereumConsensusSigner;
use openmls::key_packages::KeyPackage;
use openmls::prelude::tls_codec::Serialize as _;
use openmls::prelude::{Capabilities, ExtensionType};
use prost::Message;
use shared_traits::{IdentId, IdentIdRef};
use std::sync::Arc;
Expand All @@ -37,47 +34,6 @@ use crate::{
conversation::{ChatError, Convo, GroupConvo, Identified},
};

/// Namespace used for de-mls (GroupV2) keypackages, so they don't collide
/// with the openmls (GroupV1) keypackage registered under the bare account id.
const DEMLS_KEYPACKAGE_NAMESPACE: &str = "demls";

/// Borrows an existing `IdentityProvider` but reports a namespaced `id()`,
/// so the same identity can register multiple keypackage "flavors"
/// (e.g. openmls vs. de-mls) without colliding in the registry.
struct NamespacedIdentity<'a> {
inner: &'a dyn IdentityProvider,
id: IdentId,
}

impl<'a> NamespacedIdentity<'a> {
fn new(inner: &'a dyn IdentityProvider, namespace: &str) -> Self {
let id = IdentId::new(Self::prefix(inner.id(), namespace));
Self { inner, id }
}

fn prefix(id: &IdentId, namespace: &str) -> String {
format!("{namespace}|{id}")
}
}

impl IdentityProvider for NamespacedIdentity<'_> {
fn id(&self) -> IdentIdRef<'_> {
&self.id
}

fn display_name(&self) -> String {
self.inner.display_name()
}

fn sign(&self, payload: &[u8]) -> crypto::Ed25519Signature {
self.inner.sign(payload)
}

fn public_key(&self) -> &crypto::Ed25519VerifyingKey {
self.inner.public_key()
}
}

/// Local member id bytes — the account identity the protocol matches on,
/// shared with the MLS credential and the consensus member.
fn member_id<S: ExternalServices>(service_ctx: &ServiceContext<S>) -> Vec<u8> {
Expand Down Expand Up @@ -130,35 +86,9 @@ fn demls_config() -> ConversationConfig {
}
}

/// Joiner: mint a single-use key package into the user's shared MLS provider
/// (storing its private keys there so the matching welcome opens), and return
/// the serialized public key package.
fn mint_key_package<S: ExternalServices>(
service_ctx: &ServiceContext<S>,
) -> Result<Vec<u8>, ChatError> {
let capabilities = Capabilities::builder()
.ciphersuites(vec![CIPHER_SUITE])
.extensions(vec![ExtensionType::ApplicationId])
.build();
let bundle = KeyPackage::builder()
.leaf_node_capabilities(capabilities)
.build(
CIPHER_SUITE,
&service_ctx.mls_provider,
&service_ctx.mls_identity,
service_ctx.mls_identity.get_credential(),
)
.map_err(ChatError::generic)?;
bundle
.key_package()
.tls_serialize_detached()
.map_err(ChatError::generic)
}

pub struct GroupV2Convo {
convo_id: String,
conversation:
Option<Conversation<DefaultConsensusPlugin, DefaultPeerScoring, DefaultStewardList>>,
conversation: Conversation<DefaultConsensusPlugin, DefaultPeerScoring, DefaultStewardList>,
/// Member-ids we proposed via add_member. We forward a welcome only to joiners WE invited.
pending_invites: Vec<Vec<u8>>,
}
Expand Down Expand Up @@ -197,7 +127,7 @@ impl GroupV2Convo {
)?;
let convo = GroupV2Convo {
convo_id,
conversation: Some(conversation),
conversation,
pending_invites: vec![],
};

Expand All @@ -206,40 +136,15 @@ impl GroupV2Convo {
Ok(convo)
}

/// Joiner side: register a fresh key package under the account name,
/// but do NOT start a conversation. `convo_id` stays empty until
/// [`Self::accept_welcome`] fills it.
pub fn new_pending<S: ExternalServices>(
service_ctx: &mut ServiceContext<S>,
) -> Result<Self, ChatError> {
let kp_bytes = mint_key_package(service_ctx)?;

// Namespace the key package so it doesn't collide with the GroupV1
// key package the registry keys under the bare account id.
let namespaced =
NamespacedIdentity::new(&*service_ctx.mls_identity, DEMLS_KEYPACKAGE_NAMESPACE);
service_ctx
.registry
.register(&namespaced, kp_bytes)
.map_err(ChatError::generic)?;

Ok(GroupV2Convo {
convo_id: String::new(),
conversation: None,
pending_invites: vec![],
})
}

/// Joiner side: ingest a de-mls welcome handed over the InboxV2 1-1
/// channel. `from_welcome` attaches MLS and applies the bundled
/// `ConversationSync` in one call; we then subscribe to the
/// conversation address and flush the join broadcast.
#[instrument(name = "groupv2.accept_welcome", skip_all, fields(user_id = %service_ctx.mls_identity.display_name()))]
pub fn accept_welcome<S: ExternalServices>(
&mut self,
#[instrument(name = "groupv2.new_from_welcome", skip_all, fields(user_id = %service_ctx.mls_identity.display_name()))]
pub fn new_from_welcome<S: ExternalServices>(
service_ctx: &mut ServiceContext<S>,
welcome: &MemberWelcome,
) -> Result<(), ChatError> {
) -> Result<Self, ChatError> {
let member = member_id(service_ctx);
let Some(conv) = Conversation::join(
&service_ctx.mls_provider,
Expand All @@ -256,11 +161,17 @@ impl GroupV2Convo {
else {
return Err(ChatError::generic("welcome not addressed to this member"));
};
self.convo_id = conv.id().to_string();
self.conversation = Some(conv);
self.init(service_ctx)?; // subscribe
self.after_op(service_ctx)?; // flush join broadcast + schedule wakeup
Ok(())

let mut convo = GroupV2Convo {
convo_id: conv.id().to_string(),
conversation: conv,
pending_invites: vec![],
};

convo.init(service_ctx)?; // subscribe
convo.after_op(service_ctx)?; // flush join broadcast + schedule wakeup

Ok(convo)
}

fn delivery_address_from_id(convo_id: &str) -> String {
Expand Down Expand Up @@ -304,11 +215,7 @@ where
service_ctx: &mut super::ServiceContext<S>,
content: &[u8],
) -> Result<(), ChatError> {
let conv = self
.conversation
.as_mut()
.ok_or_else(|| ChatError::generic("conversation not found"))?;
conv.send_message(
self.conversation.send_message(
&service_ctx.mls_provider,
content.to_vec(),
&service_ctx.mls_identity,
Expand All @@ -335,17 +242,14 @@ where
_ => return Ok(ConvoOutcome::empty(self.convo_id.clone())),
};

let conv = self
.conversation
.as_mut()
.ok_or_else(|| ChatError::generic("no conversation"))?;
conv.process_inbound(
self.conversation.process_inbound(
&service_ctx.mls_provider,
&frame.sender_app_id,
&inner,
&service_ctx.mls_identity,
)?;
conv.poll(&service_ctx.mls_provider, &service_ctx.mls_identity);
self.conversation
.poll(&service_ctx.mls_provider, &service_ctx.mls_identity);
let events = self.after_op(service_ctx)?; // route + publish + re-arm, returns events

match self.events_to_content(&events) {
Expand All @@ -360,10 +264,8 @@ where
#[instrument(name = "groupv2.wakeup", skip_all, fields(user_id = %ctx.mls_identity.display_name()))]
fn wakeup(&mut self, ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
info!(convo = %self.convo_id, "Wakeup");
let Some(conv) = self.conversation.as_mut() else {
return Ok(()); // pending joiner: no deadlines exist yet
};
let outcome = conv.poll(&ctx.mls_provider, &ctx.mls_identity);

let outcome = self.conversation.poll(&ctx.mls_provider, &ctx.mls_identity);
if outcome.leave_requested {
// Commit ejected us (or join expired). Real handling - drops
// this convo from its map;
Expand All @@ -389,23 +291,18 @@ where
// member-id is the invitee's id bytes).
let mut kps = Vec::with_capacity(members.len());
for member in members {
let device_id = NamespacedIdentity::prefix(member, DEMLS_KEYPACKAGE_NAMESPACE);
let kp_bytes = service_ctx
.registry
.retrieve(&device_id)
.retrieve(member.as_str())
.map_err(ChatError::generic)?
.ok_or_else(|| ChatError::generic("No key package"))?;
self.pending_invites
.push(member.as_str().as_bytes().to_vec());
kps.push(kp_bytes);
}

let conv = self
.conversation
.as_mut()
.ok_or_else(|| ChatError::generic("no conversation"))?;
for kp_bytes in &kps {
conv.add_member(
self.conversation.add_member(
&service_ctx.mls_provider,
kp_bytes,
&service_ctx.mls_identity,
Expand All @@ -429,13 +326,10 @@ impl GroupV2Convo {
&mut self,
service_ctx: &mut ServiceContext<S>,
) -> Result<Vec<ConversationEvent>, ChatError> {
let Some(conv) = self.conversation.as_ref() else {
return Ok(Vec::new()); // still pending join — nothing buffered
};
// Pull everything first (these are &self, take-all):
let events = conv.drain_events();
let outbound = conv.drain_outbound(); // Vec<de_mls::session::Outbound>
let wakeup = conv.next_wakeup_in();
let events = self.conversation.drain_events();
let outbound = self.conversation.drain_outbound(); // Vec<de_mls::session::Outbound>
let wakeup = self.conversation.next_wakeup_in();

// 1. Route welcomes for joiners WE invited (event fires on every member now).
for evt in &events {
Expand Down
20 changes: 2 additions & 18 deletions core/conversations/src/inbox_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use de_mls::protos::de_mls::messages::v1::MemberWelcome;
use openmls::prelude::tls_codec::Serialize;
use openmls::prelude::*;
use prost::{Message, Oneof};
use std::cell::RefCell;
use storage::{ConversationKind, ConversationMeta, ConversationStore};
use tracing::info;
use tracing::instrument;
Expand Down Expand Up @@ -82,15 +81,11 @@ pub fn invite_user_v2<DS: DeliveryService>(
pub struct InboxV2 {
// Account_id field is an owned value, so it can be returned via reference.
ident_id: IdentId,
pending_demls: RefCell<Option<GroupV2Convo>>,
}

impl InboxV2 {
pub fn new(ident_id: IdentId) -> Self {
Self {
ident_id,
pending_demls: RefCell::new(None),
}
Self { ident_id }
}

pub fn ident_id(&self) -> IdentIdRef<'_> {
Expand All @@ -110,12 +105,6 @@ impl InboxV2 {
.register(&cx.mls_identity, keypackage_bytes)
.map_err(ChatError::generic)?;

// de-mls (GroupV2) joiner: build a conversation-less User and register
// its de-mls key package under the same account name. This shadows the
// OpenMLS key package above in the registry; GroupV2 is the path the
// de-mls integration exercises.
*self.pending_demls.borrow_mut() = Some(GroupV2Convo::new_pending(cx)?);

Ok(())
}

Expand Down Expand Up @@ -150,14 +139,9 @@ impl InboxV2 {
}
InviteType::GroupV2(welcome_bytes) => {
info!("Process WelcomeMessage");
let mut convo = self
.pending_demls
.borrow_mut()
.take()
.ok_or_else(|| ChatError::generic("no pending de-mls convo"))?;
let mw =
MemberWelcome::decode(welcome_bytes.as_slice()).map_err(ChatError::generic)?;
convo.accept_welcome(service_ctx, &mw)?;
let convo = GroupV2Convo::new_from_welcome(service_ctx, &mw)?;
Ok(Some(Box::new(convo)))
}
}
Expand Down
Loading