From e7bd0175aa5384eb196ba3e7df3f1f3bba32fbfc Mon Sep 17 00:00:00 2001 From: Will Killian Date: Thu, 25 Jun 2026 13:16:21 -0400 Subject: [PATCH] feat(plugin): load rust_dynamic plugins in relay Signed-off-by: Will Killian --- Cargo.lock | 4 + crates/cli/src/config.rs | 4 +- crates/cli/src/launcher.rs | 42 +- crates/cli/src/main.rs | 8 +- crates/cli/src/plugins/lifecycle.rs | 55 +- crates/cli/src/server.rs | 104 +- crates/cli/tests/coverage/launcher_tests.rs | 62 + crates/cli/tests/coverage/main_tests.rs | 22 +- .../tests/coverage/plugins_lifecycle_tests.rs | 193 +- crates/cli/tests/coverage/server_tests.rs | 70 + crates/core/Cargo.toml | 8 + crates/core/src/api/runtime.rs | 2 +- crates/core/src/api/runtime/scope_stack.rs | 30 + .../src/api/runtime/subscriber_dispatcher.rs | 22 +- crates/core/src/plugin/dynamic.rs | 4 + crates/core/src/plugin/dynamic/native.rs | 2207 +++++++++++++++++ .../tests/fixtures/native_plugin/Cargo.toml | 18 + .../tests/fixtures/native_plugin/src/lib.rs | 470 ++++ .../tests/integration/native_plugin_tests.rs | 1145 +++++++++ 19 files changed, 4433 insertions(+), 37 deletions(-) create mode 100644 crates/core/src/plugin/dynamic/native.rs create mode 100644 crates/core/tests/fixtures/native_plugin/Cargo.toml create mode 100644 crates/core/tests/fixtures/native_plugin/src/lib.rs create mode 100644 crates/core/tests/integration/native_plugin_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 9c38de959..c44679e4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1337,6 +1337,8 @@ dependencies = [ "futures-util", "getrandom 0.3.4", "js-sys", + "libloading", + "nemo-relay-plugin", "nemo-relay-types", "object_store", "openinference-semantic-conventions", @@ -1347,8 +1349,10 @@ dependencies = [ "reqwest", "rustls", "schemars", + "semver", "serde", "serde_json", + "sha2", "strum", "tempfile", "thiserror 2.0.18", diff --git a/crates/cli/src/config.rs b/crates/cli/src/config.rs index 118202745..ca2b3c84a 100644 --- a/crates/cli/src/config.rs +++ b/crates/cli/src/config.rs @@ -775,7 +775,9 @@ pub(crate) fn resolve_run_config( resolved.gateway.bind = "127.0.0.1:0" .parse() .expect("valid transparent bind address"); - enforce_required_dynamic_plugin_startup(config, &resolved)?; + if !command.dry_run { + enforce_required_dynamic_plugin_startup(config, &resolved)?; + } Ok(resolved) } diff --git a/crates/cli/src/launcher.rs b/crates/cli/src/launcher.rs index 25bdfd144..6de268af8 100644 --- a/crates/cli/src/launcher.rs +++ b/crates/cli/src/launcher.rs @@ -22,6 +22,7 @@ use crate::error::CliError; use crate::installer::{ generated_hooks, hook_forward_command, merge_hermes_config, merge_hooks, read_json_file, }; +use crate::plugins::lifecycle::ActiveDynamicPluginComponent; use crate::server; /// Runs a child coding-agent command behind an ephemeral local gateway. @@ -87,6 +88,7 @@ struct TransparentRun { agent: CodingAgent, prepared: PreparedRun, resolved: ResolvedConfig, + dynamic_plugins: Vec, listener: TcpListener, gateway_url: String, dry_run: bool, @@ -99,7 +101,16 @@ impl TransparentRun { async fn new(command: RunCommand, inherited: Option<&ServerArgs>) -> Result { let dry_run = command.dry_run; let print = command.print; + let explicit_config = command + .config + .as_ref() + .or_else(|| inherited.and_then(|args| args.config.as_ref())); let mut resolved = resolve_run_config(&command, inherited)?; + let dynamic_plugins = if dry_run { + Vec::new() + } else { + crate::plugins::lifecycle::active_dynamic_plugin_components(explicit_config, &resolved)? + }; let (agent, argv) = resolve_agent_and_argv(&command, &resolved.agents)?; let listener = TcpListener::bind("127.0.0.1:0").await?; let address = listener.local_addr()?; @@ -111,6 +122,7 @@ impl TransparentRun { agent, prepared, resolved, + dynamic_plugins, listener, gateway_url, dry_run, @@ -134,9 +146,10 @@ impl TransparentRun { } self.prepared .print_live_status(self.agent, &self.gateway_url, &self.resolved); - execute_live_run( + execute_live_run_with_dynamic( self.listener, self.resolved.gateway, + self.dynamic_plugins, &self.gateway_url, self.prepared, ) @@ -146,13 +159,24 @@ impl TransparentRun { // Starts the gateway, waits for readiness, runs the child command, restores temporary state, and then // maps the child process status to the launcher's exit code. +#[cfg(test)] async fn execute_live_run( listener: TcpListener, gateway_config: GatewayConfig, gateway_url: &str, prepared: PreparedRun, ) -> Result { - let running_server = RunningGateway::start(listener, gateway_config); + execute_live_run_with_dynamic(listener, gateway_config, Vec::new(), gateway_url, prepared).await +} + +async fn execute_live_run_with_dynamic( + listener: TcpListener, + gateway_config: GatewayConfig, + dynamic_plugins: Vec, + gateway_url: &str, + prepared: PreparedRun, +) -> Result { + let running_server = RunningGateway::start(listener, gateway_config, dynamic_plugins); if let Err(error) = wait_for_health(gateway_url).await { let restore = prepared.restore(); let server_result = running_server.stop().await; @@ -269,10 +293,20 @@ struct RunningGateway { impl RunningGateway { // Starts the gateway listener on a background task and keeps the shutdown sender paired with the // task handle so health failures and normal exits use identical cleanup semantics. - fn start(listener: TcpListener, config: crate::config::GatewayConfig) -> Self { + fn start( + listener: TcpListener, + config: crate::config::GatewayConfig, + dynamic_plugins: Vec, + ) -> Self { let (shutdown_tx, shutdown_rx) = oneshot::channel(); let task = tokio::spawn(async move { - server::serve_listener(listener, config, Some(shutdown_rx)).await + server::serve_listener_with_dynamic( + listener, + config, + dynamic_plugins, + Some(shutdown_rx), + ) + .await }); Self { shutdown_tx, task } } diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 41349982f..fc3813018 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -196,8 +196,12 @@ async fn run_default(server_args: &ServerArgs) -> Result Resu Ok(()) } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct ActiveDynamicPluginComponent { + pub(crate) plugin_id: String, + pub(crate) kind: DynamicPluginKind, + pub(crate) manifest_ref: Option, + pub(crate) config: Map, +} + +pub(crate) fn active_dynamic_plugin_components( + explicit: Option<&PathBuf>, + resolved: &ResolvedConfig, +) -> Result, CliError> { + let scopes = load_and_hydrate_scopes(explicit, resolved)?; + let host_config_by_id = host_config_by_id(resolved); + let mut components = Vec::new(); + + for resolved_plugin in &resolved.dynamic_plugins { + let Some(entry) = find_record_by_id(&scopes, &resolved_plugin.plugin_id)? else { + return Err(CliError::Config(format!( + "dynamic plugin '{}' is present in resolved config but not lifecycle state", + resolved_plugin.plugin_id + ))); + }; + if entry.record.is_tombstoned() || !entry.record.spec.enabled { + continue; + } + let host_config = host_config_by_id + .get(&entry.record.metadata.id) + .ok_or_else(|| { + CliError::Config(format!( + "dynamic plugin '{}' is enabled but has no resolved host config", + entry.record.metadata.id + )) + })?; + components.push(ActiveDynamicPluginComponent { + plugin_id: entry.record.metadata.id.clone(), + kind: entry.record.metadata.kind, + manifest_ref: match entry.record.metadata.kind { + DynamicPluginKind::RustDynamic => Some(manifest_ref_from_record(&entry.record)?), + DynamicPluginKind::Worker => entry.record.source.manifest_ref.clone(), + }, + config: host_config.config.clone(), + }); + } + + Ok(components) +} + fn mutate_enabled_state( plugin_id: String, server: &ServerArgs, diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index 4329a4aec..9765cddd4 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -9,7 +9,12 @@ use axum::extract::{DefaultBodyLimit, State}; use axum::http::HeaderMap; use axum::routing::{get, post}; use axum::{Json, Router}; -use nemo_relay::plugin::{PluginConfig, clear_plugin_configuration, initialize_plugins_exact}; +use nemo_relay::plugin::dynamic::{ + DynamicPluginKind, NativePluginActivation, NativePluginLoadSpec, load_native_plugins, +}; +use nemo_relay::plugin::{ + PluginComponentSpec, PluginConfig, clear_plugin_configuration, initialize_plugins_exact, +}; use nemo_relay_adaptive::plugin_component::register_adaptive_component; use nemo_relay_pii_redaction::component::register_pii_redaction_component; use reqwest::Client; @@ -21,6 +26,7 @@ use crate::adapters::{claude_code, codex, cursor, hermes}; use crate::config::GatewayConfig; use crate::error::CliError; use crate::gateway; +use crate::plugins::lifecycle::ActiveDynamicPluginComponent; use crate::session::SessionManager; const HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(30); @@ -35,11 +41,11 @@ pub(crate) struct AppState { pub(crate) last_activity: Arc>, } -/// Binds the configured address and serves until the process is stopped. -/// -/// Tests and transparent run mode use `serve_listener` directly so they can supply an already -/// bound ephemeral listener and optional shutdown channel. -pub(crate) async fn serve(config: GatewayConfig) -> Result<(), CliError> { +/// Binds the configured address and activates enabled dynamic plugins before serving. +pub(crate) async fn serve_with_dynamic( + config: GatewayConfig, + dynamic_plugins: Vec, +) -> Result<(), CliError> { let listener = TcpListener::bind(config.bind).await.map_err(|err| { // Translate the common bind-failure (port already in use) into an actionable message. // Plain `io error: Address already in use (os error 48)` is unhelpful; the friendly @@ -58,19 +64,31 @@ pub(crate) async fn serve(config: GatewayConfig) -> Result<(), CliError> { CliError::Io(err) } })?; - serve_listener(listener, config, None).await + serve_listener_with_dynamic(listener, config, dynamic_plugins, None).await } /// Serves the gateway router on a caller-owned listener with optional graceful shutdown. /// /// A provided shutdown receiver is best-effort: the send side may be dropped after the child agent /// exits, and either receiving or channel closure is enough to let Axum drain the listener. +#[cfg(test)] pub(crate) async fn serve_listener( listener: TcpListener, config: GatewayConfig, shutdown: Option>, ) -> Result<(), CliError> { - let plugin_activation = PluginActivation::initialize(config.plugin_config.clone()).await?; + serve_listener_with_dynamic(listener, config, Vec::new(), shutdown).await +} + +/// Serves the gateway router and activates enabled dynamic plugin components. +pub(crate) async fn serve_listener_with_dynamic( + listener: TcpListener, + config: GatewayConfig, + dynamic_plugins: Vec, + shutdown: Option>, +) -> Result<(), CliError> { + let plugin_activation = + PluginActivation::initialize(config.plugin_config.clone(), dynamic_plugins).await?; let state = AppState::new(config); let sessions = state.sessions.clone(); let last_activity = state.last_activity.clone(); @@ -197,12 +215,19 @@ async fn idle_shutdown_future( struct PluginActivation { active: bool, + native: Option, } impl PluginActivation { - async fn initialize(config: Option) -> Result { - let Some(config) = config else { - return Ok(Self { active: false }); + async fn initialize( + config: Option, + dynamic_plugins: Vec, + ) -> Result { + if config.is_none() && dynamic_plugins.is_empty() { + return Ok(Self { + active: false, + native: None, + }); }; register_adaptive_component().map_err(|error| { CliError::Config(format!("adaptive plugin registration failed: {error}")) @@ -210,22 +235,67 @@ impl PluginActivation { register_pii_redaction_component().map_err(|error| { CliError::Config(format!("PII redaction plugin registration failed: {error}")) })?; + let native_specs = dynamic_plugins + .iter() + .filter(|plugin| plugin.kind == DynamicPluginKind::RustDynamic) + .map(|plugin| { + let manifest_ref = plugin.manifest_ref.clone().ok_or_else(|| { + CliError::Config(format!( + "native dynamic plugin '{}' has no manifest_ref in lifecycle state", + plugin.plugin_id + )) + })?; + Ok(NativePluginLoadSpec { + plugin_id: plugin.plugin_id.clone(), + manifest_ref, + }) + }) + .collect::, CliError>>()?; + let native = + if native_specs.is_empty() { + None + } else { + Some(load_native_plugins(native_specs).map_err(|error| { + CliError::Config(format!("native plugin load failed: {error}")) + })?) + }; // Gateway already resolved its config; activate exactly (no re-discovery). - let plugin_config: PluginConfig = serde_json::from_value(config) - .map_err(|error| CliError::Config(format!("invalid plugin config: {error}")))?; + let mut plugin_config: PluginConfig = match config { + Some(config) => serde_json::from_value(config) + .map_err(|error| CliError::Config(format!("invalid plugin config: {error}")))?, + None => PluginConfig::default(), + }; + plugin_config + .components + .extend( + dynamic_plugins + .into_iter() + .map(|plugin| PluginComponentSpec { + kind: plugin.plugin_id, + enabled: true, + config: plugin.config, + }), + ); initialize_plugins_exact(plugin_config) .await .map_err(|error| CliError::Config(format!("plugin activation failed: {error}")))?; - Ok(Self { active: true }) + Ok(Self { + active: true, + native, + }) } fn clear(mut self) -> Result<(), CliError> { - if self.active { + let result = if self.active { self.active = false; clear_plugin_configuration() .map_err(|error| CliError::Config(format!("plugin teardown failed: {error}")))?; - } - Ok(()) + Ok(()) + } else { + Ok(()) + }; + self.native.take(); + result } } diff --git a/crates/cli/tests/coverage/launcher_tests.rs b/crates/cli/tests/coverage/launcher_tests.rs index f81468164..fefa81add 100644 --- a/crates/cli/tests/coverage/launcher_tests.rs +++ b/crates/cli/tests/coverage/launcher_tests.rs @@ -1137,6 +1137,68 @@ async fn dry_run_does_not_spawn_agent() { assert_eq!(code, ExitCode::SUCCESS); } +#[tokio::test] +async fn dry_run_does_not_hydrate_dynamic_plugin_lifecycle_state() { + let temp = tempfile::tempdir().unwrap(); + let plugin_dir = temp.path().join("plugins/acme"); + std::fs::create_dir_all(&plugin_dir).unwrap(); + let manifest_path = plugin_dir.join("relay-plugin.toml"); + std::fs::write( + &manifest_path, + format!( + r#" +manifest_version = 1 + +[plugin] +id = "acme.worker" +kind = "worker" + +[compat] +relay = "={version}" +worker_protocol = "grpc-v1" + +[capabilities] +items = ["plugin_worker"] + +[defaults] + +[load] +runtime = "python" +entrypoint = "acme.worker:create_plugin" +"#, + version = env!("CARGO_PKG_VERSION"), + ), + ) + .unwrap(); + let config_path = temp.path().join("config.toml"); + std::fs::write(&config_path, "").unwrap(); + std::fs::write( + temp.path().join("plugins.toml"), + format!( + "[[plugins.dynamic]]\nmanifest = {:?}\n", + manifest_path.to_string_lossy() + ), + ) + .unwrap(); + + let command = RunCommand { + agent: Some(CodingAgent::Codex), + config: Some(config_path), + openai_base_url: None, + anthropic_base_url: None, + session_metadata: None, + plugin_config: None, + dry_run: true, + print: false, + command: vec!["codex".into()], + }; + + let code = run(command, None).await.unwrap(); + + assert_eq!(code, ExitCode::SUCCESS); + assert!(!temp.path().join(".dynamic-plugins.json").exists()); +} + #[tokio::test] async fn wait_for_health_reports_unready_gateway() { let error = wait_for_health("http://127.0.0.1:1") diff --git a/crates/cli/tests/coverage/main_tests.rs b/crates/cli/tests/coverage/main_tests.rs index 9dd10b5bd..9572dfa2d 100644 --- a/crates/cli/tests/coverage/main_tests.rs +++ b/crates/cli/tests/coverage/main_tests.rs @@ -87,6 +87,10 @@ fn completions_helper_reports_missing_shell_and_generates_requested_shell() { fn safe_dispatch_helpers_cover_completions_and_plugins_paths() { let temp = tempfile::tempdir().unwrap(); let _env = EnvScope::hermetic(&temp); + let server = ServerArgs { + config: Some(temp.path().join("config.toml")), + ..ServerArgs::default() + }; assert_eq!( run_completions(CompletionsCommand { @@ -101,7 +105,7 @@ fn safe_dispatch_helpers_cover_completions_and_plugins_paths() { PluginsCommand { command: PluginsSubcommand::Edit(PluginsEditCommand::default()), }, - &ServerArgs::default(), + &server, ) .unwrap_err() .to_string(); @@ -112,7 +116,7 @@ fn safe_dispatch_helpers_cover_completions_and_plugins_paths() { PluginsCommand { command: PluginsSubcommand::List(PluginsListCommand::default()), }, - &ServerArgs::default() + &server ) .unwrap(), ExitCode::SUCCESS @@ -126,7 +130,7 @@ fn safe_dispatch_helpers_cover_completions_and_plugins_paths() { json: false, }), }, - &ServerArgs::default(), + &server, ) .unwrap(), ExitCode::from(2) @@ -140,7 +144,7 @@ fn safe_dispatch_helpers_cover_completions_and_plugins_paths() { json: false, }), }, - &ServerArgs::default(), + &server, ) .unwrap(), ExitCode::from(2) @@ -154,7 +158,7 @@ fn safe_dispatch_helpers_cover_completions_and_plugins_paths() { json: false, }), }, - &ServerArgs::default() + &server ) .unwrap(), ExitCode::SUCCESS @@ -165,6 +169,10 @@ fn safe_dispatch_helpers_cover_completions_and_plugins_paths() { fn safe_dispatch_plugin_json_errors_return_exit_codes() { let temp = tempfile::tempdir().unwrap(); let _env = EnvScope::hermetic(&temp); + let server = ServerArgs { + config: Some(temp.path().join("config.toml")), + ..ServerArgs::default() + }; assert_eq!( run_plugins( @@ -174,7 +182,7 @@ fn safe_dispatch_plugin_json_errors_return_exit_codes() { json: true, }), }, - &ServerArgs::default(), + &server, ) .unwrap(), ExitCode::from(2) @@ -188,7 +196,7 @@ fn safe_dispatch_plugin_json_errors_return_exit_codes() { json: true, }), }, - &ServerArgs::default(), + &server, ) .unwrap(), ExitCode::from(2) diff --git a/crates/cli/tests/coverage/plugins_lifecycle_tests.rs b/crates/cli/tests/coverage/plugins_lifecycle_tests.rs index 4b2dfccf4..5e57969c0 100644 --- a/crates/cli/tests/coverage/plugins_lifecycle_tests.rs +++ b/crates/cli/tests/coverage/plugins_lifecycle_tests.rs @@ -125,7 +125,7 @@ kind = "worker" [compat] relay = "0.5" -worker_protocol = "1" +worker_protocol = "grpc-v1" [defaults] enabled = false @@ -303,6 +303,54 @@ fn trust_evaluation_short_circuits_when_policy_is_blocked() { assert!(trust.failure().is_none()); } +fn write_native_dynamic_manifest(dir: &Path, plugin_id: &str) -> PathBuf { + let artifact_body = b"native plugin fixture"; + std::fs::write(dir.join("libfixture_native.so"), artifact_body).unwrap(); + let digest = format!( + "sha256:{}", + Sha256::digest(artifact_body) + .iter() + .map(|byte| format!("{byte:02x}")) + .collect::() + ); + let manifest_path = dir.join("relay-plugin.toml"); + std::fs::write( + &manifest_path, + format!( + r#" +manifest_version = 1 + +[plugin] +id = "{plugin_id}" +kind = "rust_dynamic" + +[compat] +relay = "0.5" +native_api = "1" + +[defaults] +enabled = false + +[capabilities] +items = ["plugin_native"] + +[source] +artifact = "libfixture_native.so" + +[integrity] +sha256 = "{digest}" + +[load] +library = "libfixture_native.so" +symbol = "nemo_relay_fixture_native_plugin" +"#, + digest = digest, + ), + ) + .unwrap(); + manifest_path +} + #[test] fn add_registers_dynamic_plugin_in_project_plugins_toml() { let temp = tempfile::tempdir().unwrap(); @@ -334,6 +382,149 @@ fn add_registers_dynamic_plugin_in_project_plugins_toml() { assert_eq!(resolved.dynamic_plugins[0].plugin_id, "acme.guardrail"); } +#[test] +fn active_dynamic_plugin_components_project_enabled_native_records_only() { + let temp = tempfile::tempdir().unwrap(); + let _env = EnvScope::hermetic(&temp); + let _cwd = CurrentDirGuard::enter(temp.path()); + let plugin_dir = temp.path().join("plugins").join("native"); + std::fs::create_dir_all(&plugin_dir).unwrap(); + write_native_dynamic_manifest(&plugin_dir, "acme.native"); + let server = crate::config::ServerArgs::default(); + + add( + PluginsAddCommand { + scope: PluginsScopeArgs { + project: true, + ..PluginsScopeArgs::default() + }, + path: plugin_dir, + }, + &server, + ) + .unwrap(); + + let resolved = resolve_plugins_config(None).unwrap(); + let inactive = active_dynamic_plugin_components(None, &resolved).unwrap(); + assert!(inactive.is_empty()); + + enable( + PluginsEnableCommand { + id: "acme.native".into(), + }, + &server, + ) + .unwrap(); + let resolved = resolve_plugins_config(None).unwrap(); + let active = active_dynamic_plugin_components(None, &resolved).unwrap(); + assert_eq!(active.len(), 1); + assert_eq!(active[0].plugin_id, "acme.native"); + assert_eq!(active[0].kind, DynamicPluginKind::RustDynamic); + assert!( + active[0] + .manifest_ref + .as_deref() + .is_some_and(|manifest_ref| manifest_ref.contains("relay-plugin.toml")) + ); + assert!(active[0].config.is_empty()); +} + +#[test] +fn active_dynamic_plugin_components_accept_enabled_worker_records() { + let temp = tempfile::tempdir().unwrap(); + let _env = EnvScope::hermetic(&temp); + let _cwd = CurrentDirGuard::enter(temp.path()); + let plugin_dir = temp.path().join("plugins").join("worker"); + std::fs::create_dir_all(&plugin_dir).unwrap(); + write_dynamic_manifest(&plugin_dir, "acme.worker"); + let server = crate::config::ServerArgs::default(); + + add( + PluginsAddCommand { + scope: PluginsScopeArgs { + project: true, + ..PluginsScopeArgs::default() + }, + path: plugin_dir, + }, + &server, + ) + .unwrap(); + enable( + PluginsEnableCommand { + id: "acme.worker".into(), + }, + &server, + ) + .unwrap(); + + let resolved = resolve_plugins_config(None).unwrap(); + let active = active_dynamic_plugin_components(None, &resolved).unwrap(); + assert_eq!(active.len(), 1); + assert_eq!(active[0].plugin_id, "acme.worker"); + assert_eq!(active[0].kind, DynamicPluginKind::Worker); + assert!( + active[0] + .manifest_ref + .as_deref() + .is_some_and(|manifest_ref| manifest_ref.contains("relay-plugin.toml")) + ); + assert!(active[0].config.is_empty()); +} + +#[test] +fn active_dynamic_plugin_components_accept_worker_records_without_manifest_ref() { + let temp = tempfile::tempdir().unwrap(); + let _env = EnvScope::hermetic(&temp); + let _cwd = CurrentDirGuard::enter(temp.path()); + let plugin_dir = temp.path().join("plugins").join("worker"); + std::fs::create_dir_all(&plugin_dir).unwrap(); + write_dynamic_manifest(&plugin_dir, "acme.worker"); + let server = crate::config::ServerArgs::default(); + + add( + PluginsAddCommand { + scope: PluginsScopeArgs { + project: true, + ..PluginsScopeArgs::default() + }, + path: plugin_dir, + }, + &server, + ) + .unwrap(); + enable( + PluginsEnableCommand { + id: "acme.worker".into(), + }, + &server, + ) + .unwrap(); + + let mut scopes = load_scoped_registries(server.config.as_ref()).unwrap(); + let scope = scopes + .iter_mut() + .find(|scope| scope.registry.get("acme.worker").is_some()) + .expect("worker record should exist"); + let mut records = scope.registry.cloned_records(true); + records + .iter_mut() + .find(|record| record.metadata.id == "acme.worker") + .expect("worker record should exist") + .source + .manifest_ref = None; + scope.registry = nemo_relay::plugin::dynamic::DynamicPluginRegistry::from_records(records) + .expect("registry should accept worker without manifest_ref"); + scope.save().unwrap(); + + let resolved = resolve_plugins_config(None).unwrap(); + let active = active_dynamic_plugin_components(None, &resolved).unwrap(); + assert_eq!(active.len(), 1); + assert_eq!(active[0].plugin_id, "acme.worker"); + assert_eq!(active[0].kind, DynamicPluginKind::Worker); + assert_eq!(active[0].manifest_ref, None); +} + #[test] fn add_rejects_duplicate_dynamic_plugin_ids() { let temp = tempfile::tempdir().unwrap(); diff --git a/crates/cli/tests/coverage/server_tests.rs b/crates/cli/tests/coverage/server_tests.rs index 42e507b7f..1b32b050e 100644 --- a/crates/cli/tests/coverage/server_tests.rs +++ b/crates/cli/tests/coverage/server_tests.rs @@ -18,6 +18,7 @@ use nemo_relay::api::registry::{ deregister_tool_conditional_execution_guardrail, register_tool_conditional_execution_guardrail, }; use nemo_relay::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; +use nemo_relay::plugin::dynamic::DynamicPluginKind; use nemo_relay::plugin::{ ConfigDiagnostic, Plugin, PluginRegistration, PluginRegistrationContext, deregister_plugin, register_plugin, @@ -30,6 +31,7 @@ use tower::ServiceExt; use super::*; use crate::error::CliError; +use crate::plugins::lifecycle::ActiveDynamicPluginComponent; use crate::test_support::PLUGIN_CONFIG_TEST_LOCK; const GENERIC_TEST_PLUGIN_KIND: &str = "cli-test-generic-plugin"; @@ -168,6 +170,44 @@ fn test_config() -> GatewayConfig { } } +fn write_missing_native_plugin_manifest( + dir: &std::path::Path, + plugin_id: &str, +) -> std::path::PathBuf { + let missing_library = dir.join("missing-native-plugin"); + let manifest_ref = dir.join("relay-plugin.toml"); + let plugin_id = serde_json::to_string(plugin_id).unwrap(); + let library = serde_json::to_string(&missing_library.to_string_lossy()).unwrap(); + std::fs::write( + &manifest_ref, + format!( + r#"manifest_version = 1 + +[plugin] +id = {plugin_id} +kind = "rust_dynamic" + +[compat] +relay = "={version}" +native_api = "1" + +[defaults] +enabled = false + +[capabilities] +items = ["plugin_native"] + +[load] +library = {library} +symbol = "nemo_relay_missing_native_plugin" +"#, + version = env!("CARGO_PKG_VERSION"), + ), + ) + .unwrap(); + manifest_ref +} + fn find_scope_event<'a>( events: &'a [Value], name: &str, @@ -1678,6 +1718,36 @@ async fn serve_listener_rejects_invalid_plugin_config() { assert!(nemo_relay::plugin::active_plugin_report().is_none()); } +#[tokio::test] +async fn serve_listener_with_dynamic_reports_native_load_errors() { + let _guard = PLUGIN_CONFIG_TEST_LOCK.lock().await; + let _ = nemo_relay::plugin::clear_plugin_configuration(); + + let temp = tempfile::tempdir().unwrap(); + let manifest_ref = write_missing_native_plugin_manifest(temp.path(), "cli.missing-native"); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + drop(shutdown_tx); + let error = serve_listener_with_dynamic( + listener, + test_config(), + vec![ActiveDynamicPluginComponent { + plugin_id: "cli.missing-native".into(), + kind: DynamicPluginKind::RustDynamic, + manifest_ref: Some(manifest_ref.to_string_lossy().into_owned()), + config: Map::new(), + }], + Some(shutdown_rx), + ) + .await + .unwrap_err(); + + let error = error.to_string(); + assert!(error.contains("native plugin load failed"), "{error}"); + assert!(error.contains("does not exist"), "{error}"); + assert!(nemo_relay::plugin::active_plugin_report().is_none()); +} + #[tokio::test] async fn serve_listener_rejects_invalid_pii_redaction_plugin_config() { let _guard = PLUGIN_CONFIG_TEST_LOCK.lock().await; diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index aebca07f8..97f668690 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -116,6 +116,10 @@ path = "tests/integration/codec_tests.rs" name = "context_isolation_integration" path = "tests/integration/context_isolation_tests.rs" +[[test]] +name = "native_plugin_integration" +path = "tests/integration/native_plugin_tests.rs" + [[test]] name = "middleware_integration" path = "tests/integration/middleware_tests.rs" @@ -149,6 +153,10 @@ required-features = ["object-store"] opentelemetry-otlp = { version = "0.31.1", default-features = false, features = ["trace", "http-proto"], optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] +nemo-relay-plugin.workspace = true +libloading = "0.8" +semver = "1" +sha2 = "0.11" opentelemetry-otlp = { version = "0.31.1", default-features = false, features = ["trace", "http-proto", "reqwest-blocking-client", "grpc-tonic"], optional = true } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots-no-provider", "stream"], optional = true } rustls = { version = "0.23", default-features = false, features = ["ring", "std", "tls12"], optional = true } diff --git a/crates/core/src/api/runtime.rs b/crates/core/src/api/runtime.rs index 7cb1f8868..2351ae352 100644 --- a/crates/core/src/api/runtime.rs +++ b/crates/core/src/api/runtime.rs @@ -20,7 +20,7 @@ pub use scope_stack::{ ScopeStack, ScopeStackHandle, TASK_SCOPE_STACK, ThreadScopeStackBinding, capture_thread_scope_stack, create_scope_stack, current_scope_stack, propagate_scope_to_thread, restore_thread_scope_stack, scope_stack_active, set_thread_scope_stack, - sync_thread_scope_stack, task_scope_push, task_scope_remove, task_scope_top, + sync_thread_scope_stack, task_scope_push, task_scope_remove, task_scope_top, with_scope_stack, }; pub use state::NemoRelayContextState; pub use subscriber_dispatcher::flush_subscribers; diff --git a/crates/core/src/api/runtime/scope_stack.rs b/crates/core/src/api/runtime/scope_stack.rs index 413407455..ca70e93cb 100644 --- a/crates/core/src/api/runtime/scope_stack.rs +++ b/crates/core/src/api/runtime/scope_stack.rs @@ -257,6 +257,9 @@ tokio::task_local! { } thread_local! { + /// Synchronous override used by native plugin callbacks that need to run a + /// bounded block with an isolated stack even inside a task-local context. + static SCOPE_STACK_OVERRIDE: RefCell> = const { RefCell::new(None) }; /// Thread-local fallback scope stack for non-task contexts. static THREAD_SCOPE_STACK: RefCell = RefCell::new(create_scope_stack()); /// Whether the current thread explicitly owns a scope stack. @@ -275,11 +278,35 @@ thread_local! { /// When no explicit thread-local stack has been installed yet, the default /// per-thread root-only stack is returned. pub fn current_scope_stack() -> ScopeStackHandle { + if let Some(stack) = SCOPE_STACK_OVERRIDE.with(|stack| stack.borrow().clone()) { + return stack; + } TASK_SCOPE_STACK .try_with(|stack| stack.clone()) .unwrap_or_else(|_| THREAD_SCOPE_STACK.with(|stack| stack.borrow().clone())) } +/// Run a synchronous callback with `handle` as the visible scope stack. +/// +/// This override takes precedence over task-local and thread-local stacks for +/// the duration of the callback and is restored even when the callback panics. +pub fn with_scope_stack(handle: ScopeStackHandle, f: impl FnOnce() -> T) -> T { + struct OverrideGuard { + previous: Option, + } + + impl Drop for OverrideGuard { + fn drop(&mut self) { + let previous = self.previous.take(); + SCOPE_STACK_OVERRIDE.with(|stack| *stack.borrow_mut() = previous); + } + } + + let previous = SCOPE_STACK_OVERRIDE.with(|stack| stack.replace(Some(handle))); + let _guard = OverrideGuard { previous }; + f() +} + /// Install an explicit scope stack for the current thread. /// /// This replaces the thread-local scope stack handle and marks the current @@ -356,6 +383,9 @@ pub fn sync_thread_scope_stack(handle: ScopeStackHandle) { /// A synchronized thread-local stack does not count as explicit unless it was /// installed through [`set_thread_scope_stack`]. pub fn scope_stack_active() -> bool { + if SCOPE_STACK_OVERRIDE.with(|stack| stack.borrow().is_some()) { + return true; + } TASK_SCOPE_STACK .try_with(|_| true) .unwrap_or_else(|_| THREAD_SCOPE_STACK_EXPLICIT.with(|flag| flag.get())) diff --git a/crates/core/src/api/runtime/subscriber_dispatcher.rs b/crates/core/src/api/runtime/subscriber_dispatcher.rs index 2bc799f8d..747ba692f 100644 --- a/crates/core/src/api/runtime/subscriber_dispatcher.rs +++ b/crates/core/src/api/runtime/subscriber_dispatcher.rs @@ -97,8 +97,28 @@ mod native { fn run_dispatcher(rx: Receiver) { while let Ok(message) = rx.recv() { - handle_message(message); + match message { + DispatcherMessage::Flush { done } => { + let pending_flushes = drain_pending_messages(&rx); + let _ = done.send(()); + for pending in pending_flushes { + let _ = pending.send(()); + } + } + message => handle_message(message), + } + } + } + + fn drain_pending_messages(rx: &Receiver) -> Vec> { + let mut pending_flushes = Vec::new(); + while let Ok(message) = rx.try_recv() { + match message { + DispatcherMessage::Flush { done } => pending_flushes.push(done), + message => handle_message(message), + } } + pending_flushes } fn handle_message(message: DispatcherMessage) { diff --git a/crates/core/src/plugin/dynamic.rs b/crates/core/src/plugin/dynamic.rs index 639714538..cd86e72fd 100644 --- a/crates/core/src/plugin/dynamic.rs +++ b/crates/core/src/plugin/dynamic.rs @@ -19,9 +19,13 @@ pub type DynamicPluginId = String; pub const DYNAMIC_PLUGIN_MANIFEST_FILENAME: &str = "relay-plugin.toml"; mod manifest; +#[cfg(not(target_arch = "wasm32"))] +mod native; mod registry; pub use manifest::*; +#[cfg(not(target_arch = "wasm32"))] +pub use native::*; pub use registry::*; /// Plugin execution lane. diff --git a/crates/core/src/plugin/dynamic/native.rs b/crates/core/src/plugin/dynamic/native.rs new file mode 100644 index 000000000..0f1c14444 --- /dev/null +++ b/crates/core/src/plugin/dynamic/native.rs @@ -0,0 +1,2207 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Native dynamic plugin loader and host-side ABI adapter. + +use std::cell::RefCell; +use std::ffi::c_void; +use std::future::Future; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::ptr; +use std::sync::{Arc, Mutex, OnceLock}; +use std::task::{Context, Poll}; + +use chrono::{DateTime, Utc}; +use libloading::{Library, Symbol}; +use nemo_relay_plugin::{ + NEMO_RELAY_NATIVE_ABI_VERSION, NemoRelayNativeEventSubscriberCb, NemoRelayNativeFreeFn, + NemoRelayNativeHostApiV1, NemoRelayNativeJsonCb, NemoRelayNativeLlmConditionalCb, + NemoRelayNativeLlmExecutionCb, NemoRelayNativeLlmRequestCb, + NemoRelayNativeLlmRequestInterceptCb, NemoRelayNativeLlmStreamExecutionCb, + NemoRelayNativeLlmStreamV1, NemoRelayNativePluginContext, NemoRelayNativePluginEntry, + NemoRelayNativePluginV1, NemoRelayNativeScopeHandle, NemoRelayNativeScopeStack, + NemoRelayNativeScopeStackBinding, NemoRelayNativeScopeType, NemoRelayNativeString, + NemoRelayNativeToolConditionalCb, NemoRelayNativeToolExecutionCb, NemoRelayNativeToolJsonCb, + NemoRelayNativeWithScopeStackCb, NemoRelayStatus, +}; +use semver::{Version, VersionReq}; +use serde_json::{Map, Value as Json}; +use sha2::{Digest, Sha256}; +use tokio::runtime::Runtime; +use tokio_stream::{Stream, StreamExt}; + +use crate::api::event::Event; +use crate::api::llm::LlmRequest; +use crate::api::runtime::{ + EventSubscriberFn, LlmConditionalFn, LlmExecutionFn, LlmExecutionNextFn, LlmJsonStream, + LlmRequestInterceptFn, LlmSanitizeRequestFn, LlmSanitizeResponseFn, LlmStreamExecutionFn, + LlmStreamExecutionNextFn, ToolConditionalFn, ToolExecutionFn, ToolExecutionNextFn, + ToolInterceptFn, ToolSanitizeFn, +}; +use crate::api::runtime::{ + ScopeStackHandle, ThreadScopeStackBinding, capture_thread_scope_stack, create_scope_stack, + current_scope_stack, restore_thread_scope_stack, scope_stack_active, set_thread_scope_stack, + with_scope_stack, +}; +use crate::api::scope::{ + EmitMarkEventParams, PopScopeParams, PushScopeParams, ScopeAttributes, ScopeHandle, ScopeType, +}; +use crate::api::scope::{event as emit_scope_mark, get_handle, pop_scope, push_scope}; +use crate::codec::request::AnnotatedLlmRequest; +use crate::error::{FlowError, Result as FlowResult}; +use crate::plugin::{ + ConfigDiagnostic, DiagnosticLevel, Plugin, PluginError, PluginRegistrationContext, + deregister_plugin, register_plugin, +}; + +use super::{DynamicPluginKind, DynamicPluginManifest, DynamicPluginManifestLoad}; + +/// Native plugin load request derived from host dynamic-plugin state. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct NativePluginLoadSpec { + /// Expected plugin kind. + pub plugin_id: String, + /// Path to the authored `relay-plugin.toml`. + pub manifest_ref: String, +} + +/// Owns native dynamic libraries registered into the plugin registry. +/// +/// Dropping this value deregisters the native plugin kinds before unloading +/// their libraries. Clear active plugin configuration before dropping it so +/// runtime callbacks cannot outlive their code. +pub struct NativePluginActivation { + plugins: Vec>, + plugin_kinds: Vec, +} + +impl NativePluginActivation { + /// Returns `true` when no native plugins were loaded. + pub fn is_empty(&self) -> bool { + self.plugins.is_empty() + } + + /// Consumes the activation and deregisters loaded plugin kinds. + pub fn clear(self) {} +} + +impl Drop for NativePluginActivation { + fn drop(&mut self) { + for plugin_kind in self.plugin_kinds.iter().rev() { + let _ = deregister_plugin(plugin_kind); + } + } +} + +/// Loads native dynamic plugins and registers their plugin kinds. +/// +/// The returned activation must be kept alive until after active plugin +/// configuration has been cleared. +pub fn load_native_plugins(specs: I) -> crate::plugin::Result +where + I: IntoIterator, +{ + let mut activation = NativePluginActivation { + plugins: Vec::new(), + plugin_kinds: Vec::new(), + }; + for spec in specs { + let instance = load_one_native_plugin(&spec)?; + let plugin_kind = instance.plugin_kind.clone(); + register_plugin(Arc::new(NativePluginAdapter { + plugin_kind: plugin_kind.clone(), + allows_multiple_components: instance.allows_multiple_components, + instance: instance.clone(), + }))?; + activation.plugins.push(instance); + activation.plugin_kinds.push(plugin_kind); + } + Ok(activation) +} + +struct NativePluginAdapter { + plugin_kind: String, + allows_multiple_components: bool, + instance: Arc, +} + +impl Plugin for NativePluginAdapter { + fn plugin_kind(&self) -> &str { + &self.plugin_kind + } + + fn allows_multiple_components(&self) -> bool { + self.allows_multiple_components + } + + fn validate(&self, plugin_config: &Map) -> Vec { + let plugin = self + .instance + .plugin + .lock() + .expect("native plugin lock poisoned"); + let Some(validate) = plugin.validate else { + return vec![]; + }; + clear_native_last_error(); + let Some(config_json) = native_string_from_json(&Json::Object(plugin_config.clone())) + else { + return vec![native_error_diagnostic( + &self.plugin_kind, + "plugin.native_validate_failed", + "failed to serialize plugin config", + )]; + }; + let mut out = ptr::null_mut(); + let status = unsafe { validate(plugin.user_data, config_json, &mut out) }; + unsafe { native_string_free(config_json) }; + if status != NemoRelayStatus::Ok { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + let message = native_last_error_message() + .unwrap_or_else(|| format!("native validate callback returned {status:?}")); + return vec![native_error_diagnostic( + &self.plugin_kind, + "plugin.native_validate_failed", + &message, + )]; + } + if out.is_null() { + return vec![]; + } + let diagnostics = read_native_string(out) + .ok() + .and_then(|text| serde_json::from_str::>(&text).ok()) + .unwrap_or_else(|| { + vec![native_error_diagnostic( + &self.plugin_kind, + "plugin.native_validate_failed", + "native validate callback returned invalid diagnostics JSON", + )] + }); + unsafe { native_string_free(out) }; + diagnostics + } + + fn register<'a>( + &'a self, + plugin_config: &Map, + ctx: &'a mut PluginRegistrationContext, + ) -> Pin> + Send + 'a>> { + let plugin_config = plugin_config.clone(); + Box::pin(async move { + let plugin = self.instance.plugin.lock().map_err(|err| { + PluginError::Internal(format!("native plugin lock poisoned: {err}")) + })?; + let register = plugin.register.ok_or_else(|| { + PluginError::RegistrationFailed(format!( + "native plugin '{}' did not return a register callback", + self.plugin_kind + )) + })?; + clear_native_last_error(); + let config_json = + native_string_from_json(&Json::Object(plugin_config)).ok_or_else(|| { + PluginError::RegistrationFailed("failed to serialize plugin config".into()) + })?; + let mut native_ctx = NativeHostPluginContext { + ctx: ctx as *mut _, + instance: self.instance.clone(), + }; + let status = unsafe { + register( + plugin.user_data, + config_json, + &mut native_ctx as *mut _ as *mut NemoRelayNativePluginContext, + ) + }; + unsafe { native_string_free(config_json) }; + if status == NemoRelayStatus::Ok { + Ok(()) + } else { + let message = native_last_error_message() + .unwrap_or_else(|| format!("native register callback returned {status:?}")); + Err(PluginError::RegistrationFailed(message)) + } + }) + } +} + +fn native_error_diagnostic(plugin_kind: &str, code: &str, message: &str) -> ConfigDiagnostic { + ConfigDiagnostic { + level: DiagnosticLevel::Error, + code: code.into(), + component: Some(plugin_kind.into()), + field: None, + message: message.into(), + } +} + +struct NativePluginInstance { + plugin_kind: String, + allows_multiple_components: bool, + plugin: Mutex, + _library: Library, +} + +unsafe impl Send for NativePluginInstance {} +unsafe impl Sync for NativePluginInstance {} + +impl Drop for NativePluginInstance { + fn drop(&mut self) { + if let Ok(mut plugin) = self.plugin.lock() { + drop_native_plugin_descriptor(&mut plugin); + } + } +} + +fn drop_native_plugin_descriptor(plugin: &mut NemoRelayNativePluginV1) { + if let Some(drop_fn) = plugin.drop.take() { + unsafe { drop_fn(plugin.user_data) }; + plugin.user_data = ptr::null_mut(); + } + if !plugin.plugin_kind.is_null() { + unsafe { native_string_free(plugin.plugin_kind) }; + plugin.plugin_kind = ptr::null_mut(); + } +} + +fn load_one_native_plugin( + spec: &NativePluginLoadSpec, +) -> crate::plugin::Result> { + let (manifest, manifest_ref) = DynamicPluginManifest::load_from_path(&spec.manifest_ref)?; + if manifest.plugin.id.trim() != spec.plugin_id { + return Err(PluginError::InvalidConfig(format!( + "dynamic plugin manifest id '{}' does not match expected id '{}'", + manifest.plugin.id, spec.plugin_id + ))); + } + if manifest.plugin.kind != DynamicPluginKind::RustDynamic { + return Err(PluginError::InvalidConfig(format!( + "dynamic plugin '{}' is kind {}; native loader only supports rust_dynamic", + spec.plugin_id, manifest.plugin.kind + ))); + } + validate_relay_compatibility(manifest.compat.relay.as_deref())?; + if manifest.compat.native_api.as_deref().map(str::trim) != Some("1") { + return Err(PluginError::InvalidConfig(format!( + "dynamic plugin '{}' declares unsupported compat.native_api '{}'; expected 1", + spec.plugin_id, + manifest.compat.native_api.as_deref().unwrap_or("") + ))); + } + let DynamicPluginManifestLoad::RustDynamic(load) = &manifest.load else { + return Err(PluginError::InvalidConfig(format!( + "dynamic plugin '{}' has invalid rust_dynamic load contract", + spec.plugin_id + ))); + }; + let manifest_path = PathBuf::from(&manifest_ref); + let library_path = resolve_manifest_relative_path( + &manifest_path, + load.library + .as_deref() + .ok_or_else(|| PluginError::InvalidConfig("load.library is required".into()))?, + ); + if !library_path.exists() { + return Err(PluginError::NotFound(format!( + "native plugin library '{}' does not exist", + library_path.display() + ))); + } + if let Some(expected_digest) = manifest + .integrity + .as_ref() + .and_then(|integrity| integrity.sha256.as_deref()) + { + verify_sha256(&library_path, expected_digest)?; + } + let symbol = load + .symbol + .as_deref() + .ok_or_else(|| PluginError::InvalidConfig("load.symbol is required".into()))?; + + let library = unsafe { Library::new(&library_path) }.map_err(|err| { + PluginError::Internal(format!( + "failed to load native plugin library '{}': {err}", + library_path.display() + )) + })?; + let mut plugin = NemoRelayNativePluginV1::default(); + unsafe { + let entry: Symbol = + library.get(symbol.as_bytes()).map_err(|err| { + PluginError::NotFound(format!( + "native plugin symbol '{symbol}' not found in '{}': {err}", + library_path.display() + )) + })?; + let status = entry(native_host_api(), &mut plugin); + if status != NemoRelayStatus::Ok { + drop_native_plugin_descriptor(&mut plugin); + return Err(PluginError::RegistrationFailed(format!( + "native plugin entry symbol '{symbol}' failed: {}", + native_last_error_message().unwrap_or_else(|| format!("{status:?}")) + ))); + } + } + if let Err(err) = validate_plugin_descriptor(&spec.plugin_id, &plugin) { + drop_native_plugin_descriptor(&mut plugin); + return Err(err); + } + let plugin_kind = match read_native_string(plugin.plugin_kind) { + Ok(plugin_kind) => plugin_kind, + Err(err) => { + drop_native_plugin_descriptor(&mut plugin); + return Err(err); + } + }; + if plugin_kind != spec.plugin_id { + drop_native_plugin_descriptor(&mut plugin); + return Err(PluginError::InvalidConfig(format!( + "native plugin returned kind '{plugin_kind}' but manifest id is '{}'", + spec.plugin_id + ))); + } + Ok(Arc::new(NativePluginInstance { + plugin_kind, + allows_multiple_components: plugin.allows_multiple_components, + plugin: Mutex::new(plugin), + _library: library, + })) +} + +fn validate_relay_compatibility(relay: Option<&str>) -> crate::plugin::Result<()> { + let relay = relay + .map(str::trim) + .filter(|value| !value.is_empty()) + .ok_or_else(|| PluginError::InvalidConfig("compat.relay is required".into()))?; + let req = VersionReq::parse(relay).map_err(|err| { + PluginError::InvalidConfig(format!("invalid compat.relay version requirement: {err}")) + })?; + let version = Version::parse(env!("CARGO_PKG_VERSION")) + .map_err(|err| PluginError::Internal(format!("failed to parse host version: {err}")))?; + if req.matches(&version) { + Ok(()) + } else { + Err(PluginError::InvalidConfig(format!( + "native plugin requires relay '{relay}' but host version is {version}" + ))) + } +} + +fn validate_plugin_descriptor( + plugin_id: &str, + plugin: &NemoRelayNativePluginV1, +) -> crate::plugin::Result<()> { + if plugin.struct_size < std::mem::size_of::() { + return Err(PluginError::InvalidConfig(format!( + "native plugin '{plugin_id}' returned incompatible plugin descriptor size {}", + plugin.struct_size + ))); + } + if plugin.plugin_kind.is_null() { + return Err(PluginError::InvalidConfig(format!( + "native plugin '{plugin_id}' returned a null plugin_kind" + ))); + } + if plugin.register.is_none() { + return Err(PluginError::InvalidConfig(format!( + "native plugin '{plugin_id}' returned no register callback" + ))); + } + Ok(()) +} + +fn resolve_manifest_relative_path(manifest_path: &Path, value: &str) -> PathBuf { + let path = PathBuf::from(value); + if path.is_absolute() { + path + } else { + manifest_path + .parent() + .map(|parent| parent.join(&path)) + .unwrap_or(path) + } +} + +fn verify_sha256(path: &Path, expected: &str) -> crate::plugin::Result<()> { + let expected = expected + .trim() + .strip_prefix("sha256:") + .unwrap_or(expected.trim()); + let bytes = std::fs::read(path).map_err(|err| { + PluginError::Internal(format!("failed to read '{}': {err}", path.display())) + })?; + let actual = hex_digest(Sha256::digest(bytes)); + if actual.eq_ignore_ascii_case(expected) { + Ok(()) + } else { + Err(PluginError::InvalidConfig(format!( + "native plugin library '{}' sha256 mismatch", + path.display() + ))) + } +} + +fn hex_digest(bytes: impl AsRef<[u8]>) -> String { + const HEX: &[u8; 16] = b"0123456789abcdef"; + let bytes = bytes.as_ref(); + let mut out = String::with_capacity(bytes.len() * 2); + for byte in bytes { + out.push(HEX[(byte >> 4) as usize] as char); + out.push(HEX[(byte & 0x0f) as usize] as char); + } + out +} + +#[repr(C)] +struct NativeHostPluginContext { + ctx: *mut PluginRegistrationContext, + instance: Arc, +} + +struct NativeHostString(Vec); + +struct NativeHostScopeHandle(ScopeHandle); + +struct NativeHostScopeStack(ScopeStackHandle); + +struct NativeHostScopeStackBinding(ThreadScopeStackBinding); + +thread_local! { + static NATIVE_LAST_ERROR: RefCell> = const { RefCell::new(None) }; +} + +fn set_native_last_error(message: impl Into) { + NATIVE_LAST_ERROR.with(|cell| *cell.borrow_mut() = Some(message.into())); +} + +fn clear_native_last_error() { + NATIVE_LAST_ERROR.with(|cell| *cell.borrow_mut() = None); +} + +fn native_last_error_message() -> Option { + NATIVE_LAST_ERROR.with(|cell| cell.borrow().clone()) +} + +unsafe extern "C" fn native_string_new( + data: *const u8, + len: usize, + out: *mut *mut NemoRelayNativeString, +) -> NemoRelayStatus { + if out.is_null() { + set_native_last_error("out string pointer is null"); + return NemoRelayStatus::NullPointer; + } + unsafe { *out = ptr::null_mut() }; + if data.is_null() && len > 0 { + set_native_last_error("string data pointer is null"); + return NemoRelayStatus::NullPointer; + } + let bytes: &[u8] = if len == 0 { + &[] + } else { + unsafe { std::slice::from_raw_parts(data, len) } + }; + if let Err(err) = std::str::from_utf8(bytes) { + set_native_last_error(format!("string data is not valid UTF-8: {err}")); + return NemoRelayStatus::InvalidUtf8; + } + let handle = Box::new(NativeHostString(bytes.to_vec())); + unsafe { *out = Box::into_raw(handle) as *mut NemoRelayNativeString }; + NemoRelayStatus::Ok +} + +unsafe extern "C" fn native_string_data(value: *const NemoRelayNativeString) -> *const u8 { + if value.is_null() { + return ptr::null(); + } + let value = unsafe { &*(value as *const NativeHostString) }; + value.0.as_ptr() +} + +unsafe extern "C" fn native_string_len(value: *const NemoRelayNativeString) -> usize { + if value.is_null() { + return 0; + } + let value = unsafe { &*(value as *const NativeHostString) }; + value.0.len() +} + +unsafe extern "C" fn native_string_free(value: *mut NemoRelayNativeString) { + if !value.is_null() { + drop(unsafe { Box::from_raw(value as *mut NativeHostString) }); + } +} + +unsafe extern "C" fn native_last_error_clear() { + clear_native_last_error(); +} + +unsafe extern "C" fn native_last_error_set(message: *const NemoRelayNativeString) { + match read_native_string(message) { + Ok(message) => set_native_last_error(message), + Err(err) => set_native_last_error(err.to_string()), + } +} + +fn native_host_api() -> *const NemoRelayNativeHostApiV1 { + static HOST_API: OnceLock = OnceLock::new(); + static RELAY_VERSION: &[u8] = concat!(env!("CARGO_PKG_VERSION"), "\0").as_bytes(); + HOST_API.get_or_init(|| NemoRelayNativeHostApiV1 { + abi_version: NEMO_RELAY_NATIVE_ABI_VERSION, + struct_size: std::mem::size_of::(), + relay_version: RELAY_VERSION.as_ptr().cast(), + string_new: native_string_new, + string_data: native_string_data, + string_len: native_string_len, + string_free: native_string_free, + last_error_clear: native_last_error_clear, + last_error_set: native_last_error_set, + plugin_context_register_subscriber: native_plugin_context_register_subscriber, + plugin_context_register_tool_sanitize_request_guardrail: + native_plugin_context_register_tool_sanitize_request_guardrail, + plugin_context_register_tool_sanitize_response_guardrail: + native_plugin_context_register_tool_sanitize_response_guardrail, + plugin_context_register_tool_conditional_execution_guardrail: + native_plugin_context_register_tool_conditional_execution_guardrail, + plugin_context_register_tool_request_intercept: + native_plugin_context_register_tool_request_intercept, + plugin_context_register_tool_execution_intercept: + native_plugin_context_register_tool_execution_intercept, + plugin_context_register_llm_sanitize_request_guardrail: + native_plugin_context_register_llm_sanitize_request_guardrail, + plugin_context_register_llm_sanitize_response_guardrail: + native_plugin_context_register_llm_sanitize_response_guardrail, + plugin_context_register_llm_conditional_execution_guardrail: + native_plugin_context_register_llm_conditional_execution_guardrail, + plugin_context_register_llm_request_intercept: + native_plugin_context_register_llm_request_intercept, + plugin_context_register_llm_execution_intercept: + native_plugin_context_register_llm_execution_intercept, + plugin_context_register_llm_stream_execution_intercept: + native_plugin_context_register_llm_stream_execution_intercept, + scope_handle_free: native_scope_handle_free, + scope_get_current: native_scope_get_current, + scope_push: native_scope_push, + scope_pop: native_scope_pop, + emit_mark: native_emit_mark, + scope_stack_create: native_scope_stack_create, + scope_stack_free: native_scope_stack_free, + scope_stack_set_thread: native_scope_stack_set_thread, + scope_stack_capture_thread: native_scope_stack_capture_thread, + scope_stack_restore_thread: native_scope_stack_restore_thread, + scope_stack_binding_free: native_scope_stack_binding_free, + scope_stack_active: native_scope_stack_active, + scope_stack_with_current: native_scope_stack_with_current, + }) as *const _ +} + +fn read_native_string(value: *const NemoRelayNativeString) -> crate::plugin::Result { + if value.is_null() { + return Ok(String::new()); + } + let value = unsafe { &*(value as *const NativeHostString) }; + std::str::from_utf8(&value.0) + .map(str::to_owned) + .map_err(|err| { + PluginError::InvalidConfig(format!("native string is not valid UTF-8: {err}")) + }) +} + +fn native_string_from_str(value: &str) -> Option<*mut NemoRelayNativeString> { + let mut out = ptr::null_mut(); + let status = unsafe { native_string_new(value.as_ptr(), value.len(), &mut out) }; + (status == NemoRelayStatus::Ok).then_some(out) +} + +fn native_string_from_json(value: &Json) -> Option<*mut NemoRelayNativeString> { + serde_json::to_string(value) + .ok() + .and_then(|value| native_string_from_str(&value)) +} + +fn json_from_native_string(value: *mut NemoRelayNativeString, fallback: &str) -> FlowResult { + if value.is_null() { + return Err(FlowError::Internal( + native_last_error_message().unwrap_or_else(|| fallback.into()), + )); + } + let text = read_native_string(value).map_err(|err| FlowError::Internal(err.to_string()))?; + serde_json::from_str(&text).map_err(|err| FlowError::Internal(format!("invalid JSON: {err}"))) +} + +fn take_native_string(value: *mut NemoRelayNativeString) -> FlowResult { + let result = read_native_string(value).map_err(|err| FlowError::Internal(err.to_string())); + unsafe { native_string_free(value) }; + result +} + +fn take_json_from_native_string( + value: *mut NemoRelayNativeString, + fallback: &str, +) -> FlowResult { + let result = json_from_native_string(value, fallback); + unsafe { native_string_free(value) }; + result +} + +fn optional_json_from_native_string( + value: *const NemoRelayNativeString, + field: &str, +) -> Result, NemoRelayStatus> { + if value.is_null() { + return Ok(None); + } + let text = read_native_string(value).map_err(|err| { + set_native_last_error(err.to_string()); + NemoRelayStatus::InvalidUtf8 + })?; + serde_json::from_str(&text).map(Some).map_err(|err| { + set_native_last_error(format!("{field} is not valid JSON: {err}")); + NemoRelayStatus::InvalidJson + }) +} + +fn optional_timestamp_from_native( + timestamp_unix_micros: *const i64, +) -> Result>, NemoRelayStatus> { + if timestamp_unix_micros.is_null() { + return Ok(None); + } + DateTime::::from_timestamp_micros(unsafe { ptr::read(timestamp_unix_micros) }) + .map(Some) + .ok_or_else(|| { + set_native_last_error("timestamp unix microseconds are outside supported range"); + NemoRelayStatus::InvalidArg + }) +} + +fn native_scope_type_to_core(scope_type: NemoRelayNativeScopeType) -> ScopeType { + match scope_type { + NemoRelayNativeScopeType::Agent => ScopeType::Agent, + NemoRelayNativeScopeType::Function => ScopeType::Function, + NemoRelayNativeScopeType::Tool => ScopeType::Tool, + NemoRelayNativeScopeType::Llm => ScopeType::Llm, + NemoRelayNativeScopeType::Retriever => ScopeType::Retriever, + NemoRelayNativeScopeType::Embedder => ScopeType::Embedder, + NemoRelayNativeScopeType::Reranker => ScopeType::Reranker, + NemoRelayNativeScopeType::Guardrail => ScopeType::Guardrail, + NemoRelayNativeScopeType::Evaluator => ScopeType::Evaluator, + NemoRelayNativeScopeType::Custom => ScopeType::Custom, + NemoRelayNativeScopeType::Unknown => ScopeType::Unknown, + } +} + +fn native_scope_ref<'a>(handle: *const NemoRelayNativeScopeHandle) -> Option<&'a ScopeHandle> { + if handle.is_null() { + return None; + } + Some(&unsafe { &*(handle as *const NativeHostScopeHandle) }.0) +} + +unsafe extern "C" fn native_scope_handle_free(handle: *mut NemoRelayNativeScopeHandle) { + if !handle.is_null() { + drop(unsafe { Box::from_raw(handle as *mut NativeHostScopeHandle) }); + } +} + +unsafe extern "C" fn native_scope_get_current( + out: *mut *mut NemoRelayNativeScopeHandle, +) -> NemoRelayStatus { + clear_native_last_error(); + if out.is_null() { + set_native_last_error("out scope handle pointer is null"); + return NemoRelayStatus::NullPointer; + } + unsafe { *out = ptr::null_mut() }; + match get_handle() { + Ok(handle) => { + unsafe { *out = Box::into_raw(Box::new(NativeHostScopeHandle(handle))).cast() }; + NemoRelayStatus::Ok + } + Err(err) => status_from_flow_error(err), + } +} + +unsafe extern "C" fn native_scope_push( + name: *const NemoRelayNativeString, + scope_type: NemoRelayNativeScopeType, + parent: *const NemoRelayNativeScopeHandle, + attributes: u32, + data_json: *const NemoRelayNativeString, + metadata_json: *const NemoRelayNativeString, + input_json: *const NemoRelayNativeString, + timestamp_unix_micros: *const i64, + out: *mut *mut NemoRelayNativeScopeHandle, +) -> NemoRelayStatus { + clear_native_last_error(); + if out.is_null() { + set_native_last_error("out scope handle pointer is null"); + return NemoRelayStatus::NullPointer; + } + unsafe { *out = ptr::null_mut() }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + let data = match optional_json_from_native_string(data_json, "scope data") { + Ok(data) => data, + Err(status) => return status, + }; + let metadata = match optional_json_from_native_string(metadata_json, "scope metadata") { + Ok(metadata) => metadata, + Err(status) => return status, + }; + let input = match optional_json_from_native_string(input_json, "scope input") { + Ok(input) => input, + Err(status) => return status, + }; + let timestamp = match optional_timestamp_from_native(timestamp_unix_micros) { + Ok(timestamp) => timestamp, + Err(status) => return status, + }; + let parent_ref = native_scope_ref(parent); + match push_scope( + PushScopeParams::builder() + .name(&name) + .scope_type(native_scope_type_to_core(scope_type)) + .parent_opt(parent_ref) + .attributes(ScopeAttributes::from_bits_truncate(attributes)) + .data_opt(data) + .metadata_opt(metadata) + .input_opt(input) + .timestamp_opt(timestamp) + .build(), + ) { + Ok(handle) => { + unsafe { *out = Box::into_raw(Box::new(NativeHostScopeHandle(handle))).cast() }; + NemoRelayStatus::Ok + } + Err(err) => status_from_flow_error(err), + } +} + +unsafe extern "C" fn native_scope_pop( + handle: *const NemoRelayNativeScopeHandle, + output_json: *const NemoRelayNativeString, + metadata_json: *const NemoRelayNativeString, + timestamp_unix_micros: *const i64, +) -> NemoRelayStatus { + clear_native_last_error(); + if handle.is_null() { + set_native_last_error("scope handle is null"); + return NemoRelayStatus::NullPointer; + } + let output = match optional_json_from_native_string(output_json, "scope output") { + Ok(output) => output, + Err(status) => return status, + }; + let metadata = match optional_json_from_native_string(metadata_json, "scope metadata") { + Ok(metadata) => metadata, + Err(status) => return status, + }; + let timestamp = match optional_timestamp_from_native(timestamp_unix_micros) { + Ok(timestamp) => timestamp, + Err(status) => return status, + }; + let handle = unsafe { &*(handle as *const NativeHostScopeHandle) }; + match pop_scope( + PopScopeParams::builder() + .handle_uuid(&handle.0.uuid) + .output_opt(output) + .metadata_opt(metadata) + .timestamp_opt(timestamp) + .build(), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_flow_error(err), + } +} + +unsafe extern "C" fn native_emit_mark( + name: *const NemoRelayNativeString, + parent: *const NemoRelayNativeScopeHandle, + data_json: *const NemoRelayNativeString, + metadata_json: *const NemoRelayNativeString, + timestamp_unix_micros: *const i64, +) -> NemoRelayStatus { + clear_native_last_error(); + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + let data = match optional_json_from_native_string(data_json, "mark data") { + Ok(data) => data, + Err(status) => return status, + }; + let metadata = match optional_json_from_native_string(metadata_json, "mark metadata") { + Ok(metadata) => metadata, + Err(status) => return status, + }; + let timestamp = match optional_timestamp_from_native(timestamp_unix_micros) { + Ok(timestamp) => timestamp, + Err(status) => return status, + }; + let parent_ref = native_scope_ref(parent); + match emit_scope_mark( + EmitMarkEventParams::builder() + .name(&name) + .parent_opt(parent_ref) + .data_opt(data) + .metadata_opt(metadata) + .timestamp_opt(timestamp) + .build(), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_flow_error(err), + } +} + +unsafe extern "C" fn native_scope_stack_create( + out: *mut *mut NemoRelayNativeScopeStack, +) -> NemoRelayStatus { + clear_native_last_error(); + if out.is_null() { + set_native_last_error("out scope stack pointer is null"); + return NemoRelayStatus::NullPointer; + } + unsafe { + *out = Box::into_raw(Box::new(NativeHostScopeStack(create_scope_stack()))).cast(); + } + NemoRelayStatus::Ok +} + +unsafe extern "C" fn native_scope_stack_free(stack: *mut NemoRelayNativeScopeStack) { + if !stack.is_null() { + drop(unsafe { Box::from_raw(stack as *mut NativeHostScopeStack) }); + } +} + +unsafe extern "C" fn native_scope_stack_set_thread( + stack: *const NemoRelayNativeScopeStack, +) -> NemoRelayStatus { + clear_native_last_error(); + if stack.is_null() { + set_native_last_error("scope stack is null"); + return NemoRelayStatus::NullPointer; + } + let stack = unsafe { &*(stack as *const NativeHostScopeStack) }; + set_thread_scope_stack(stack.0.clone()); + NemoRelayStatus::Ok +} + +unsafe extern "C" fn native_scope_stack_capture_thread( + out: *mut *mut NemoRelayNativeScopeStackBinding, +) -> NemoRelayStatus { + clear_native_last_error(); + if out.is_null() { + set_native_last_error("out scope stack binding pointer is null"); + return NemoRelayStatus::NullPointer; + } + unsafe { + *out = Box::into_raw(Box::new(NativeHostScopeStackBinding( + capture_thread_scope_stack(), + ))) + .cast(); + } + NemoRelayStatus::Ok +} + +unsafe extern "C" fn native_scope_stack_restore_thread( + binding: *mut NemoRelayNativeScopeStackBinding, +) -> NemoRelayStatus { + clear_native_last_error(); + if binding.is_null() { + set_native_last_error("scope stack binding is null"); + return NemoRelayStatus::NullPointer; + } + let binding = unsafe { Box::from_raw(binding as *mut NativeHostScopeStackBinding) }; + restore_thread_scope_stack(binding.0); + NemoRelayStatus::Ok +} + +unsafe extern "C" fn native_scope_stack_binding_free( + binding: *mut NemoRelayNativeScopeStackBinding, +) { + if !binding.is_null() { + drop(unsafe { Box::from_raw(binding as *mut NativeHostScopeStackBinding) }); + } +} + +unsafe extern "C" fn native_scope_stack_active() -> bool { + scope_stack_active() +} + +unsafe extern "C" fn native_scope_stack_with_current( + stack: *const NemoRelayNativeScopeStack, + cb: NemoRelayNativeWithScopeStackCb, + user_data: *mut c_void, +) -> NemoRelayStatus { + clear_native_last_error(); + if stack.is_null() { + set_native_last_error("scope stack is null"); + return NemoRelayStatus::NullPointer; + } + let stack = unsafe { &*(stack as *const NativeHostScopeStack) }; + let status = with_scope_stack(stack.0.clone(), || unsafe { cb(user_data) }); + if status != NemoRelayStatus::Ok && native_last_error_message().is_none() { + set_native_last_error(format!("native scope-stack callback returned {status:?}")); + } + status +} + +fn flow_error_from_status(status: NemoRelayStatus, fallback: &str) -> FlowError { + let message = native_last_error_message().unwrap_or_else(|| format!("{fallback}: {status:?}")); + match status { + NemoRelayStatus::AlreadyExists => FlowError::AlreadyExists(message), + NemoRelayStatus::NotFound => FlowError::NotFound(message), + NemoRelayStatus::ScopeStackEmpty => FlowError::ScopeStackEmpty, + NemoRelayStatus::GuardrailRejected => FlowError::GuardrailRejected(message), + NemoRelayStatus::InvalidArg => FlowError::InvalidArgument(message), + _ => FlowError::Internal(message), + } +} + +fn status_from_plugin_error(err: PluginError) -> NemoRelayStatus { + set_native_last_error(err.to_string()); + match err { + PluginError::NotFound(_) => NemoRelayStatus::NotFound, + PluginError::Conflict(_) => NemoRelayStatus::AlreadyExists, + PluginError::InvalidConfig(_) | PluginError::Serialization(_) => { + NemoRelayStatus::InvalidArg + } + PluginError::Internal(_) | PluginError::RegistrationFailed(_) => NemoRelayStatus::Internal, + } +} + +fn status_from_flow_error(err: FlowError) -> NemoRelayStatus { + set_native_last_error(err.to_string()); + match err { + FlowError::AlreadyExists(_) => NemoRelayStatus::AlreadyExists, + FlowError::NotFound(_) => NemoRelayStatus::NotFound, + FlowError::InvalidArgument(_) => NemoRelayStatus::InvalidArg, + FlowError::ScopeStackEmpty => NemoRelayStatus::ScopeStackEmpty, + FlowError::GuardrailRejected(_) => NemoRelayStatus::GuardrailRejected, + FlowError::Internal(_) => NemoRelayStatus::Internal, + } +} + +fn native_runtime() -> &'static Runtime { + static RUNTIME: OnceLock = OnceLock::new(); + RUNTIME.get_or_init(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("native plugin runtime should build") + }) +} + +fn spawn_with_current_scope(f: impl FnOnce() -> T + Send + 'static) -> std::thread::JoinHandle +where + T: Send + 'static, +{ + let binding = capture_thread_scope_stack(); + let visible_stack = scope_stack_active().then(current_scope_stack); + std::thread::spawn(move || { + restore_thread_scope_stack(binding); + if let Some(stack) = visible_stack { + with_scope_stack(stack, f) + } else { + f() + } + }) +} + +struct NativeCallbackUserData { + ptr: *mut c_void, + free_fn: NemoRelayNativeFreeFn, + _instance: Arc, +} + +unsafe impl Send for NativeCallbackUserData {} +unsafe impl Sync for NativeCallbackUserData {} + +impl Drop for NativeCallbackUserData { + fn drop(&mut self) { + if let Some(free_fn) = self.free_fn { + unsafe { free_fn(self.ptr) }; + } + } +} + +fn make_user_data( + instance: Arc, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> Arc { + Arc::new(NativeCallbackUserData { + ptr: user_data, + free_fn, + _instance: instance, + }) +} + +fn host_ctx_mut<'a>( + ctx: *mut NemoRelayNativePluginContext, +) -> Result<&'a mut NativeHostPluginContext, NemoRelayStatus> { + if ctx.is_null() { + set_native_last_error("plugin context is null"); + return Err(NemoRelayStatus::NullPointer); + } + let ctx = unsafe { &mut *(ctx as *mut NativeHostPluginContext) }; + if ctx.ctx.is_null() { + set_native_last_error("plugin context inner pointer is null"); + return Err(NemoRelayStatus::NullPointer); + } + Ok(ctx) +} + +fn read_name(name: *const NemoRelayNativeString) -> Result { + read_native_string(name).map_err(|err| { + set_native_last_error(err.to_string()); + NemoRelayStatus::InvalidUtf8 + }) +} + +unsafe extern "C" fn native_plugin_context_register_subscriber( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + cb: NemoRelayNativeEventSubscriberCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_subscriber( + &name, + wrap_event_subscriber(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +macro_rules! native_tool_json_context_register { + ($fn_name:ident, $ctx_method:ident) => { + unsafe extern "C" fn $fn_name( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + cb: NemoRelayNativeToolJsonCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, + ) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.$ctx_method( + &name, + priority, + wrap_tool_json_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } + } + }; +} + +native_tool_json_context_register!( + native_plugin_context_register_tool_sanitize_request_guardrail, + register_tool_sanitize_request_guardrail +); +native_tool_json_context_register!( + native_plugin_context_register_tool_sanitize_response_guardrail, + register_tool_sanitize_response_guardrail +); + +unsafe extern "C" fn native_plugin_context_register_tool_conditional_execution_guardrail( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + cb: NemoRelayNativeToolConditionalCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_tool_conditional_execution_guardrail( + &name, + priority, + wrap_tool_conditional_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +unsafe extern "C" fn native_plugin_context_register_tool_request_intercept( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + break_chain: bool, + cb: NemoRelayNativeToolJsonCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_tool_request_intercept( + &name, + priority, + break_chain, + wrap_tool_intercept_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +unsafe extern "C" fn native_plugin_context_register_tool_execution_intercept( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + cb: NemoRelayNativeToolExecutionCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_tool_execution_intercept( + &name, + priority, + wrap_tool_execution_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +unsafe extern "C" fn native_plugin_context_register_llm_sanitize_request_guardrail( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + cb: NemoRelayNativeLlmRequestCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_llm_sanitize_request_guardrail( + &name, + priority, + wrap_llm_request_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +unsafe extern "C" fn native_plugin_context_register_llm_sanitize_response_guardrail( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + cb: NemoRelayNativeJsonCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_llm_sanitize_response_guardrail( + &name, + priority, + wrap_json_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +unsafe extern "C" fn native_plugin_context_register_llm_conditional_execution_guardrail( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + cb: NemoRelayNativeLlmConditionalCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_llm_conditional_execution_guardrail( + &name, + priority, + wrap_llm_conditional_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +unsafe extern "C" fn native_plugin_context_register_llm_request_intercept( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + break_chain: bool, + cb: NemoRelayNativeLlmRequestInterceptCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_llm_request_intercept( + &name, + priority, + break_chain, + wrap_llm_request_intercept_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +unsafe extern "C" fn native_plugin_context_register_llm_execution_intercept( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + cb: NemoRelayNativeLlmExecutionCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_llm_execution_intercept( + &name, + priority, + wrap_llm_execution_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +unsafe extern "C" fn native_plugin_context_register_llm_stream_execution_intercept( + ctx: *mut NemoRelayNativePluginContext, + name: *const NemoRelayNativeString, + priority: i32, + cb: NemoRelayNativeLlmStreamExecutionCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> NemoRelayStatus { + clear_native_last_error(); + let host_ctx = match host_ctx_mut(ctx) { + Ok(ctx) => ctx, + Err(status) => return status, + }; + let instance = host_ctx.instance.clone(); + let ctx = unsafe { &mut *host_ctx.ctx }; + let name = match read_name(name) { + Ok(name) => name, + Err(status) => return status, + }; + match ctx.register_llm_stream_execution_intercept( + &name, + priority, + wrap_llm_stream_execution_fn(instance, cb, user_data, free_fn), + ) { + Ok(()) => NemoRelayStatus::Ok, + Err(err) => status_from_plugin_error(err), + } +} + +fn wrap_event_subscriber( + instance: Arc, + cb: NemoRelayNativeEventSubscriberCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> EventSubscriberFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |event: &Event| { + let event_json = serde_json::to_value(event).unwrap_or(Json::Null); + if let Some(event_string) = native_string_from_json(&event_json) { + let status = unsafe { cb(user_data.ptr, event_string) }; + if status != NemoRelayStatus::Ok { + set_native_last_error(format!("native subscriber callback returned {status:?}")); + } + unsafe { native_string_free(event_string) }; + } + }) +} + +fn wrap_tool_json_fn( + instance: Arc, + cb: NemoRelayNativeToolJsonCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> ToolSanitizeFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |name, payload| { + call_tool_json_callback(cb, user_data.ptr, name, &payload).unwrap_or(Json::Null) + }) +} + +fn wrap_tool_intercept_fn( + instance: Arc, + cb: NemoRelayNativeToolJsonCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> ToolInterceptFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |name, payload| call_tool_json_callback(cb, user_data.ptr, name, &payload)) +} + +fn call_tool_json_callback( + cb: NemoRelayNativeToolJsonCb, + user_data: *mut c_void, + name: &str, + payload: &Json, +) -> FlowResult { + clear_native_last_error(); + let name = native_string_from_str(name) + .ok_or_else(|| FlowError::Internal("failed to allocate native name".into()))?; + let payload = native_string_from_json(payload) + .ok_or_else(|| FlowError::Internal("failed to allocate native payload".into()))?; + let mut out = ptr::null_mut(); + let status = unsafe { cb(user_data, name, payload, &mut out) }; + unsafe { + native_string_free(name); + native_string_free(payload); + } + if status != NemoRelayStatus::Ok { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + return Err(flow_error_from_status( + status, + "native JSON callback failed", + )); + } + take_json_from_native_string(out, "native JSON callback returned null") +} + +fn wrap_tool_conditional_fn( + instance: Arc, + cb: NemoRelayNativeToolConditionalCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> ToolConditionalFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |name, args| { + clear_native_last_error(); + let name_string = native_string_from_str(name) + .ok_or_else(|| FlowError::Internal("failed to allocate native name".into()))?; + let args_string = native_string_from_json(args) + .ok_or_else(|| FlowError::Internal("failed to allocate native args".into()))?; + let mut out = ptr::null_mut(); + let status = unsafe { cb(user_data.ptr, name_string, args_string, &mut out) }; + unsafe { + native_string_free(name_string); + native_string_free(args_string); + } + if status != NemoRelayStatus::Ok { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + return Err(flow_error_from_status( + status, + "native tool conditional failed", + )); + } + if out.is_null() { + Ok(None) + } else { + let reason = take_native_string(out)?; + Ok(Some(reason)) + } + }) +} + +fn wrap_tool_execution_fn( + instance: Arc, + cb: NemoRelayNativeToolExecutionCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> ToolExecutionFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |name, args, next| { + let name = name.to_owned(); + let user_data = user_data.clone(); + Box::pin(async move { + clear_native_last_error(); + let name_string = native_string_from_str(&name) + .ok_or_else(|| FlowError::Internal("failed to allocate native name".into()))?; + let args_string = native_string_from_json(&args) + .ok_or_else(|| FlowError::Internal("failed to allocate native args".into()))?; + let next_ctx = Box::into_raw(Box::new(next)) as *mut c_void; + let mut out = ptr::null_mut(); + let status = unsafe { + cb( + user_data.ptr, + name_string, + args_string, + native_tool_next, + next_ctx, + &mut out, + ) + }; + unsafe { + drop(Box::from_raw(next_ctx as *mut ToolExecutionNextFn)); + native_string_free(name_string); + native_string_free(args_string); + } + if status != NemoRelayStatus::Ok { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + return Err(flow_error_from_status( + status, + "native tool execution failed", + )); + } + take_json_from_native_string(out, "native tool execution returned null") + }) + }) +} + +unsafe extern "C" fn native_tool_next( + args_json: *const NemoRelayNativeString, + next_ctx: *mut c_void, + out_json: *mut *mut NemoRelayNativeString, +) -> NemoRelayStatus { + if next_ctx.is_null() || out_json.is_null() { + set_native_last_error("native tool next received null pointer"); + return NemoRelayStatus::NullPointer; + } + let args = match parse_json_arg(args_json, "native tool next args") { + Ok(args) => args, + Err(status) => return status, + }; + let next = unsafe { (*(next_ctx as *const ToolExecutionNextFn)).clone() }; + let result = spawn_with_current_scope(move || native_runtime().block_on(next(args))).join(); + match result { + Ok(Ok(result)) => write_native_json(&result, out_json), + Ok(Err(err)) => status_from_flow_error(err), + Err(_) => { + set_native_last_error("native tool next panicked"); + NemoRelayStatus::Internal + } + } +} + +fn wrap_llm_request_fn( + instance: Arc, + cb: NemoRelayNativeLlmRequestCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> LlmSanitizeRequestFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |request| { + call_llm_request_callback(cb, user_data.ptr, &request).unwrap_or_else(|_| LlmRequest { + headers: Map::new(), + content: Json::Null, + }) + }) +} + +fn call_llm_request_callback( + cb: NemoRelayNativeLlmRequestCb, + user_data: *mut c_void, + request: &LlmRequest, +) -> FlowResult { + clear_native_last_error(); + let request_json = serde_json::to_value(request) + .map_err(|err| FlowError::Internal(format!("failed to serialize LLM request: {err}")))?; + let request_string = native_string_from_json(&request_json) + .ok_or_else(|| FlowError::Internal("failed to allocate native LLM request".into()))?; + let mut out = ptr::null_mut(); + let status = unsafe { cb(user_data, request_string, &mut out) }; + unsafe { native_string_free(request_string) }; + if status != NemoRelayStatus::Ok { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + return Err(flow_error_from_status( + status, + "native LLM request callback failed", + )); + } + let result_json = + take_json_from_native_string(out, "native LLM request callback returned null")?; + serde_json::from_value(result_json) + .map_err(|err| FlowError::Internal(format!("invalid LLM request JSON: {err}"))) +} + +fn wrap_json_fn( + instance: Arc, + cb: NemoRelayNativeJsonCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> LlmSanitizeResponseFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |payload| { + clear_native_last_error(); + let payload_string = native_string_from_json(&payload); + let Some(payload_string) = payload_string else { + return Json::Null; + }; + let mut out = ptr::null_mut(); + let status = unsafe { cb(user_data.ptr, payload_string, &mut out) }; + unsafe { native_string_free(payload_string) }; + if status != NemoRelayStatus::Ok { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + return Json::Null; + } + take_json_from_native_string(out, "native JSON callback returned null") + .unwrap_or(Json::Null) + }) +} + +fn wrap_llm_conditional_fn( + instance: Arc, + cb: NemoRelayNativeLlmConditionalCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> LlmConditionalFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |request| { + clear_native_last_error(); + let request_json = serde_json::to_value(request).map_err(|err| { + FlowError::Internal(format!("failed to serialize LLM request: {err}")) + })?; + let request_string = native_string_from_json(&request_json) + .ok_or_else(|| FlowError::Internal("failed to allocate native LLM request".into()))?; + let mut out = ptr::null_mut(); + let status = unsafe { cb(user_data.ptr, request_string, &mut out) }; + unsafe { native_string_free(request_string) }; + if status != NemoRelayStatus::Ok { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + return Err(flow_error_from_status( + status, + "native LLM conditional failed", + )); + } + if out.is_null() { + Ok(None) + } else { + let reason = take_native_string(out)?; + Ok(Some(reason)) + } + }) +} + +fn wrap_llm_request_intercept_fn( + instance: Arc, + cb: NemoRelayNativeLlmRequestInterceptCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> LlmRequestInterceptFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |name, request, annotated| { + clear_native_last_error(); + let name_string = native_string_from_str(name) + .ok_or_else(|| FlowError::Internal("failed to allocate native name".into()))?; + let request_json = serde_json::to_value(&request).map_err(|err| { + FlowError::Internal(format!("failed to serialize LLM request: {err}")) + })?; + let request_string = native_string_from_json(&request_json) + .ok_or_else(|| FlowError::Internal("failed to allocate native LLM request".into()))?; + let annotated_string = match &annotated { + Some(annotated) => { + let value = serde_json::to_value(annotated).map_err(|err| { + FlowError::Internal(format!("failed to serialize annotated request: {err}")) + })?; + native_string_from_json(&value).ok_or_else(|| { + FlowError::Internal("failed to allocate annotated request".into()) + })? + } + None => ptr::null_mut(), + }; + let mut out_request = ptr::null_mut(); + let mut out_annotated = ptr::null_mut(); + let status = unsafe { + cb( + user_data.ptr, + name_string, + request_string, + annotated_string, + &mut out_request, + &mut out_annotated, + ) + }; + unsafe { + native_string_free(name_string); + native_string_free(request_string); + native_string_free(annotated_string); + } + if status != NemoRelayStatus::Ok { + unsafe { + native_string_free(out_request); + native_string_free(out_annotated); + } + return Err(flow_error_from_status( + status, + "native LLM request intercept failed", + )); + } + let request_json = json_from_native_string( + out_request, + "native LLM request intercept returned null request", + ); + let annotated_json = if out_annotated.is_null() { + Ok(None) + } else { + json_from_native_string(out_annotated, "invalid annotated request").map(Some) + }; + unsafe { + native_string_free(out_request); + native_string_free(out_annotated); + } + let request_json = request_json?; + let annotated_json = annotated_json?; + let request: LlmRequest = serde_json::from_value(request_json) + .map_err(|err| FlowError::Internal(format!("invalid LLM request JSON: {err}")))?; + let annotated = annotated_json + .map(|annotated_json| { + serde_json::from_value::(annotated_json).map_err(|err| { + FlowError::Internal(format!("invalid annotated request JSON: {err}")) + }) + }) + .transpose()?; + Ok((request, annotated)) + }) +} + +fn wrap_llm_execution_fn( + instance: Arc, + cb: NemoRelayNativeLlmExecutionCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> LlmExecutionFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |name, request, next| { + let name = name.to_owned(); + let user_data = user_data.clone(); + Box::pin(async move { call_llm_execution_callback(cb, &user_data, &name, &request, next) }) + }) +} + +fn call_llm_execution_callback( + cb: NemoRelayNativeLlmExecutionCb, + user_data: &NativeCallbackUserData, + name: &str, + request: &LlmRequest, + next: LlmExecutionNextFn, +) -> FlowResult { + clear_native_last_error(); + let name_string = native_string_from_str(name) + .ok_or_else(|| FlowError::Internal("failed to allocate native name".into()))?; + let request_json = serde_json::to_value(request) + .map_err(|err| FlowError::Internal(format!("failed to serialize LLM request: {err}")))?; + let request_string = native_string_from_json(&request_json) + .ok_or_else(|| FlowError::Internal("failed to allocate native LLM request".into()))?; + let next_ctx = Box::into_raw(Box::new(next)) as *mut c_void; + let mut out = ptr::null_mut(); + let status = unsafe { + cb( + user_data.ptr, + name_string, + request_string, + native_llm_next, + next_ctx, + &mut out, + ) + }; + unsafe { + drop(Box::from_raw(next_ctx as *mut LlmExecutionNextFn)); + native_string_free(name_string); + native_string_free(request_string); + } + if status != NemoRelayStatus::Ok { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + return Err(flow_error_from_status( + status, + "native LLM execution failed", + )); + } + take_json_from_native_string(out, "native LLM execution returned null") +} + +unsafe extern "C" fn native_llm_next( + request_json: *const NemoRelayNativeString, + next_ctx: *mut c_void, + out_json: *mut *mut NemoRelayNativeString, +) -> NemoRelayStatus { + if next_ctx.is_null() || out_json.is_null() { + set_native_last_error("native LLM next received null pointer"); + return NemoRelayStatus::NullPointer; + } + let request = match parse_llm_request_arg(request_json, "native LLM next request") { + Ok(request) => request, + Err(status) => return status, + }; + let next = unsafe { (*(next_ctx as *const LlmExecutionNextFn)).clone() }; + let result = spawn_with_current_scope(move || native_runtime().block_on(next(request))).join(); + match result { + Ok(Ok(result)) => write_native_json(&result, out_json), + Ok(Err(err)) => status_from_flow_error(err), + Err(_) => { + set_native_last_error("native LLM next panicked"); + NemoRelayStatus::Internal + } + } +} +fn wrap_llm_stream_execution_fn( + instance: Arc, + cb: NemoRelayNativeLlmStreamExecutionCb, + user_data: *mut c_void, + free_fn: NemoRelayNativeFreeFn, +) -> LlmStreamExecutionFn { + let user_data = make_user_data(instance, user_data, free_fn); + Arc::new(move |name, request, next| { + let name = name.to_owned(); + let user_data = user_data.clone(); + Box::pin( + async move { call_llm_stream_execution_callback(cb, user_data, &name, &request, next) }, + ) + }) +} + +fn call_llm_stream_execution_callback( + cb: NemoRelayNativeLlmStreamExecutionCb, + user_data: Arc, + name: &str, + request: &LlmRequest, + next: LlmStreamExecutionNextFn, +) -> FlowResult { + clear_native_last_error(); + let name_string = native_string_from_str(name) + .ok_or_else(|| FlowError::Internal("failed to allocate native name".into()))?; + let request_json = serde_json::to_value(request) + .map_err(|err| FlowError::Internal(format!("failed to serialize LLM request: {err}")))?; + let request_string = native_string_from_json(&request_json) + .ok_or_else(|| FlowError::Internal("failed to allocate native LLM request".into()))?; + let next_ctx = NativeStreamNextContext::new(Box::into_raw(Box::new(next)) as *mut c_void); + let mut out = NemoRelayNativeLlmStreamV1::default(); + let status = unsafe { + cb( + user_data.ptr, + name_string, + request_string, + native_llm_stream_next, + next_ctx.ptr, + &mut out, + ) + }; + unsafe { + native_string_free(name_string); + native_string_free(request_string); + } + if status != NemoRelayStatus::Ok { + drop_native_stream(out); + return Err(flow_error_from_status( + status, + "native LLM stream execution failed", + )); + } + native_stream_to_relay_stream(out, Some(next_ctx), Some(user_data)) +} + +unsafe extern "C" fn native_llm_stream_next( + request_json: *const NemoRelayNativeString, + next_ctx: *mut c_void, + out_stream: *mut NemoRelayNativeLlmStreamV1, +) -> NemoRelayStatus { + if next_ctx.is_null() || out_stream.is_null() { + set_native_last_error("native LLM stream next received null pointer"); + return NemoRelayStatus::NullPointer; + } + unsafe { *out_stream = NemoRelayNativeLlmStreamV1::default() }; + let request = match parse_llm_request_arg(request_json, "native LLM stream next request") { + Ok(request) => request, + Err(status) => return status, + }; + let next = unsafe { (*(next_ctx as *const LlmStreamExecutionNextFn)).clone() }; + let result = spawn_with_current_scope(move || native_runtime().block_on(next(request))).join(); + match result { + Ok(Ok(stream)) => { + unsafe { *out_stream = relay_stream_to_native_stream(stream) }; + NemoRelayStatus::Ok + } + Ok(Err(err)) => status_from_flow_error(err), + Err(_) => { + set_native_last_error("native LLM stream next panicked"); + NemoRelayStatus::Internal + } + } +} + +struct NativeRelayLlmStream { + raw: NemoRelayNativeLlmStreamV1, + finished: bool, + _next_ctx: Option, + _callback_user_data: Option>, +} + +unsafe impl Send for NativeRelayLlmStream {} + +impl NativeRelayLlmStream { + fn from_raw( + raw: NemoRelayNativeLlmStreamV1, + next_ctx: Option, + callback_user_data: Option>, + ) -> FlowResult { + if raw.struct_size != std::mem::size_of::() { + let struct_size = raw.struct_size; + drop_native_stream(raw); + return Err(FlowError::Internal(format!( + "unsupported native LLM stream struct size: {}", + struct_size + ))); + } + if raw.next.is_none() { + drop_native_stream(raw); + return Err(FlowError::Internal( + "native LLM stream next callback was null".into(), + )); + } + Ok(Self { + raw, + finished: false, + _next_ctx: next_ctx, + _callback_user_data: callback_user_data, + }) + } + + fn finish(&mut self) { + self.finished = true; + if let Some(drop_fn) = self.raw.drop.take() { + unsafe { drop_fn(self.raw.user_data) }; + } + self.raw.user_data = ptr::null_mut(); + } +} + +impl Stream for NativeRelayLlmStream { + type Item = FlowResult; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if self.finished { + return Poll::Ready(None); + } + let Some(next) = self.raw.next else { + self.finish(); + return Poll::Ready(Some(Err(FlowError::Internal( + "native LLM stream next callback was null".into(), + )))); + }; + let mut out = ptr::null_mut(); + let status = unsafe { next(self.raw.user_data, &mut out) }; + match status { + NemoRelayStatus::Ok => { + if out.is_null() { + let error = FlowError::Internal( + native_last_error_message() + .unwrap_or_else(|| "native LLM stream returned null chunk".into()), + ); + self.finish(); + return Poll::Ready(Some(Err(error))); + } + let result = + take_json_from_native_string(out, "native LLM stream returned null chunk"); + if result.is_err() { + self.finish(); + } + Poll::Ready(Some(result)) + } + NemoRelayStatus::StreamEnd => { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + self.finish(); + Poll::Ready(None) + } + status => { + if !out.is_null() { + unsafe { native_string_free(out) }; + } + let error = flow_error_from_status(status, "native LLM stream poll failed"); + self.finish(); + Poll::Ready(Some(Err(error))) + } + } + } +} + +impl Drop for NativeRelayLlmStream { + fn drop(&mut self) { + if !self.finished + && let Some(cancel) = self.raw.cancel + { + let _ = unsafe { cancel(self.raw.user_data) }; + } + self.finish(); + } +} + +fn native_stream_to_relay_stream( + raw: NemoRelayNativeLlmStreamV1, + next_ctx: Option, + callback_user_data: Option>, +) -> FlowResult { + Ok(Box::pin(NativeRelayLlmStream::from_raw( + raw, + next_ctx, + callback_user_data, + )?) as LlmJsonStream) +} + +fn drop_native_stream(mut raw: NemoRelayNativeLlmStreamV1) { + if let Some(drop_fn) = raw.drop.take() { + unsafe { drop_fn(raw.user_data) }; + } +} + +struct NativeHostLlmStream { + stream: Arc>>, +} + +struct NativeStreamNextContext { + ptr: *mut c_void, +} + +unsafe impl Send for NativeStreamNextContext {} + +impl NativeStreamNextContext { + fn new(ptr: *mut c_void) -> Self { + Self { ptr } + } +} + +impl Drop for NativeStreamNextContext { + fn drop(&mut self) { + if !self.ptr.is_null() { + drop(unsafe { Box::from_raw(self.ptr as *mut LlmStreamExecutionNextFn) }); + self.ptr = ptr::null_mut(); + } + } +} + +fn relay_stream_to_native_stream(stream: LlmJsonStream) -> NemoRelayNativeLlmStreamV1 { + let state = Box::new(NativeHostLlmStream { + stream: Arc::new(Mutex::new(Some(stream))), + }); + NemoRelayNativeLlmStreamV1 { + struct_size: std::mem::size_of::(), + user_data: Box::into_raw(state).cast(), + next: Some(poll_relay_llm_stream), + cancel: Some(cancel_relay_llm_stream), + drop: Some(drop_relay_llm_stream), + } +} + +unsafe extern "C" fn poll_relay_llm_stream( + user_data: *mut c_void, + out_json: *mut *mut NemoRelayNativeString, +) -> NemoRelayStatus { + if user_data.is_null() || out_json.is_null() { + set_native_last_error("native host LLM stream poll received null pointer"); + return NemoRelayStatus::NullPointer; + } + unsafe { *out_json = ptr::null_mut() }; + let state = unsafe { &*(user_data as *const NativeHostLlmStream) }; + let stream = state.stream.clone(); + let result = spawn_with_current_scope(move || { + native_runtime().block_on(async move { + let Some(mut current) = stream + .lock() + .map_err(|_| FlowError::Internal("native host LLM stream lock poisoned".into()))? + .take() + else { + return Ok(None); + }; + match current.next().await { + Some(Ok(chunk)) => { + *stream.lock().map_err(|_| { + FlowError::Internal("native host LLM stream lock poisoned".into()) + })? = Some(current); + Ok(Some(chunk)) + } + Some(Err(err)) => Err(err), + None => Ok(None), + } + }) + }) + .join(); + match result { + Ok(Ok(Some(chunk))) => write_native_json(&chunk, out_json), + Ok(Ok(None)) => NemoRelayStatus::StreamEnd, + Ok(Err(err)) => status_from_flow_error(err), + Err(_) => { + set_native_last_error("native host LLM stream poll panicked"); + NemoRelayStatus::Internal + } + } +} + +unsafe extern "C" fn cancel_relay_llm_stream(user_data: *mut c_void) -> NemoRelayStatus { + if user_data.is_null() { + set_native_last_error("native host LLM stream cancel received null pointer"); + return NemoRelayStatus::NullPointer; + } + let state = unsafe { &*(user_data as *const NativeHostLlmStream) }; + match state.stream.lock() { + Ok(mut stream) => { + stream.take(); + NemoRelayStatus::Ok + } + Err(_) => { + set_native_last_error("native host LLM stream lock poisoned"); + NemoRelayStatus::Internal + } + } +} + +unsafe extern "C" fn drop_relay_llm_stream(user_data: *mut c_void) { + if !user_data.is_null() { + drop(unsafe { Box::from_raw(user_data as *mut NativeHostLlmStream) }); + } +} + +fn parse_json_arg( + value: *const NemoRelayNativeString, + label: &str, +) -> Result { + let text = match read_native_string(value) { + Ok(text) => text, + Err(err) => { + set_native_last_error(err.to_string()); + return Err(NemoRelayStatus::InvalidUtf8); + } + }; + serde_json::from_str(&text).map_err(|err| { + set_native_last_error(format!("{label} was invalid JSON: {err}")); + NemoRelayStatus::InvalidJson + }) +} + +fn parse_llm_request_arg( + value: *const NemoRelayNativeString, + label: &str, +) -> Result { + let value = parse_json_arg(value, label)?; + serde_json::from_value(value).map_err(|err| { + set_native_last_error(format!("{label} was not an LLM request: {err}")); + NemoRelayStatus::InvalidJson + }) +} + +fn write_native_json(value: &Json, out: *mut *mut NemoRelayNativeString) -> NemoRelayStatus { + if out.is_null() { + set_native_last_error("out JSON pointer is null"); + return NemoRelayStatus::NullPointer; + } + let Some(handle) = native_string_from_json(value) else { + set_native_last_error("failed to serialize native JSON output"); + return NemoRelayStatus::Internal; + }; + unsafe { *out = handle }; + NemoRelayStatus::Ok +} diff --git a/crates/core/tests/fixtures/native_plugin/Cargo.toml b/crates/core/tests/fixtures/native_plugin/Cargo.toml new file mode 100644 index 000000000..5b8356fcd --- /dev/null +++ b/crates/core/tests/fixtures/native_plugin/Cargo.toml @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +[workspace] + +[package] +name = "nemo-relay-plugin-fixture" +version = "0.0.0" +edition = "2024" +license = "Apache-2.0" +publish = false + +[lib] +crate-type = ["cdylib"] + +[dependencies] +nemo-relay-plugin = { path = "../../../../plugin" } +serde_json = "1" diff --git a/crates/core/tests/fixtures/native_plugin/src/lib.rs b/crates/core/tests/fixtures/native_plugin/src/lib.rs new file mode 100644 index 000000000..9fc58bd41 --- /dev/null +++ b/crates/core/tests/fixtures/native_plugin/src/lib.rs @@ -0,0 +1,470 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use std::ffi::c_void; +use std::ptr; + +use nemo_relay_plugin::{ + ConfigDiagnostic, DiagnosticLevel, Event, Json, LlmJsonStream, LlmRequest, + NemoRelayNativeHostApiV1, NemoRelayNativePluginContext, NemoRelayNativePluginV1, + NemoRelayNativeString, NemoRelayStatus, NativePlugin, PluginContext, PluginRuntime, + ScopeCategory, ScopeType, +}; +use serde_json::{Map, json}; + +struct FixtureNativePlugin; + +impl NativePlugin for FixtureNativePlugin { + fn plugin_kind(&self) -> &str { + "fixture_native" + } + + fn validate(&self, plugin_config: &Map) -> Vec { + if plugin_config + .get("reject") + .and_then(Json::as_bool) + .unwrap_or(false) + { + return vec![ConfigDiagnostic { + level: DiagnosticLevel::Error, + code: "fixture.rejected".into(), + component: Some("fixture_native".into()), + field: Some("reject".into()), + message: "fixture rejection requested".into(), + }]; + } + vec![] + } + + fn register( + &mut self, + _plugin_config: &Map, + ctx: &mut PluginContext<'_>, + ) -> nemo_relay_plugin::Result<()> { + let runtime = ctx.runtime(); + ctx.register_subscriber("fixture_subscriber", { + let runtime = runtime.clone(); + move |event| subscriber_mark(&runtime, event) + })?; + + ctx.register_tool_sanitize_request_guardrail( + "fixture_tool_sanitize_request", + 0, + |_name, args| mark_json(args, "native_plugin_tool_sanitize_request"), + )?; + ctx.register_tool_sanitize_response_guardrail( + "fixture_tool_sanitize_response", + 0, + |_name, result| mark_json(result, "native_plugin_tool_sanitize_response"), + )?; + ctx.register_tool_conditional_execution_guardrail( + "fixture_tool_conditional", + 0, + |_name, _args| Ok(None), + )?; + ctx.register_tool_request_intercept("fixture_rewrite_args", 0, false, { + let runtime = runtime.clone(); + move |_name, args| { + emit_runtime_events(&runtime)?; + Ok(mark_json(args, "native_plugin")) + } + })?; + ctx.register_tool_execution_intercept("fixture_tool_execution", 0, { + let runtime = runtime.clone(); + move |_name, args, next| { + let args = mark_json(args, "native_plugin_tool_execution_request"); + let result = if args + .get("use_isolated_next") + .and_then(Json::as_bool) + .unwrap_or(false) + { + let isolated = runtime.create_scope_stack()?; + let mut result = None; + isolated.with_current(|| { + let mut scope = runtime.scope( + "fixture.native.isolated.next", + ScopeType::Custom, + None, + None, + Some(&Json::String("isolated-next-input".into())), + )?; + result = Some(next.call(args)?); + scope.close(Some(&Json::String("isolated-next-output".into())), None) + })?; + result.ok_or_else(|| "isolated next did not produce a result".to_string())? + } else { + next.call(args)? + }; + Ok(mark_json(result, "native_plugin_tool_execution")) + } + })?; + + ctx.register_llm_sanitize_request_guardrail( + "fixture_llm_sanitize_request", + 0, + |request| mark_llm_request(request, "native_plugin_llm_sanitize_request"), + )?; + ctx.register_llm_sanitize_response_guardrail( + "fixture_llm_sanitize_response", + 0, + |response| mark_json(response, "native_plugin_llm_sanitize_response"), + )?; + ctx.register_llm_conditional_execution_guardrail( + "fixture_llm_conditional", + 0, + |_request| Ok(None), + )?; + ctx.register_llm_request_intercept( + "fixture_llm_request_intercept", + 0, + false, + |_name, request, annotated| { + Ok(( + mark_llm_request(request, "native_plugin_llm_request_intercept"), + annotated, + )) + }, + )?; + ctx.register_llm_execution_intercept("fixture_llm_execution", 0, |_name, request, next| { + let response = next.call(mark_llm_request( + request, + "native_plugin_llm_execution_request", + ))?; + Ok(mark_json(response, "native_plugin_llm_execution")) + })?; + ctx.register_llm_stream_execution_intercept( + "fixture_llm_stream_execution", + 0, + |_name, request, next| { + let stream = next.call(mark_llm_request( + request, + "native_plugin_llm_stream_execution_request", + ))?; + let stream: LlmJsonStream = Box::new(stream.map(|chunk| { + chunk.map(|chunk| mark_json(chunk, "native_plugin_llm_stream_execution")) + })); + Ok(stream) + }, + )?; + + Ok(()) + } +} + +fn subscriber_mark(runtime: &PluginRuntime, event: &Event) { + if event.name() == "native-plugin-test-outer" + && event.scope_category() == Some(ScopeCategory::Start) + { + let _ = runtime.emit_mark( + "fixture.native.subscriber.mark", + Some(&Json::String("subscriber".into())), + None, + ); + } +} + +fn emit_runtime_events(runtime: &PluginRuntime) -> nemo_relay_plugin::Result<()> { + runtime.emit_mark( + "fixture.native.mark", + Some(&Json::String("current".into())), + None, + )?; + let scope = runtime.push_scope( + "fixture.native.scope", + ScopeType::Custom, + None, + None, + Some(&Json::String("current-scope-input".into())), + )?; + runtime.emit_mark( + "fixture.native.scope.mark", + Some(&Json::String("inside-current-scope".into())), + None, + )?; + runtime.pop_scope( + &scope, + Some(&Json::String("current-scope-output".into())), + None, + )?; + + let thread_stack = runtime.create_scope_stack()?; + { + let _thread_guard = runtime.bind_scope_stack_thread(&thread_stack)?; + runtime.emit_mark( + "fixture.native.thread_stack.mark", + Some(&Json::String("thread-stack".into())), + None, + )?; + } + + let isolated = runtime.create_scope_stack()?; + isolated.with_current(|| { + runtime.emit_mark( + "fixture.native.isolated.mark", + Some(&Json::String("isolated".into())), + None, + )?; + let scope = runtime.push_scope( + "fixture.native.isolated.scope", + ScopeType::Custom, + None, + None, + Some(&Json::String("isolated-scope-input".into())), + )?; + runtime.pop_scope( + &scope, + Some(&Json::String("isolated-scope-output".into())), + None, + ) + }) +} + +fn mark_llm_request(mut request: LlmRequest, key: &str) -> LlmRequest { + request.content = mark_json(request.content, key); + request +} + +fn mark_json(mut value: Json, key: &str) -> Json { + if let Json::Object(object) = &mut value { + object.insert(key.into(), json!(true)); + } + value +} + +nemo_relay_plugin::nemo_relay_plugin!(nemo_relay_fixture_native_plugin, || FixtureNativePlugin); + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_fixture_entry_error( + host: *const NemoRelayNativeHostApiV1, + _out: *mut NemoRelayNativePluginV1, +) -> NemoRelayStatus { + unsafe { set_raw_last_error(host, "fixture entry failed") }; + NemoRelayStatus::Internal +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_fixture_small_descriptor( + host: *const NemoRelayNativeHostApiV1, + out: *mut NemoRelayNativePluginV1, +) -> NemoRelayStatus { + unsafe { + write_raw_descriptor( + host, + out, + "fixture_native", + Some(0), + None, + Some(raw_noop_register), + ) + } +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_fixture_null_kind( + host: *const NemoRelayNativeHostApiV1, + out: *mut NemoRelayNativePluginV1, +) -> NemoRelayStatus { + let status = + unsafe { write_raw_descriptor(host, out, "", None, None, Some(raw_noop_register)) }; + if status != NemoRelayStatus::Ok { + return status; + } + unsafe { + if !(*out).plugin_kind.is_null() { + let host = &*host; + (host.string_free)((*out).plugin_kind); + } + (*out).plugin_kind = ptr::null_mut(); + } + NemoRelayStatus::Ok +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_fixture_no_register( + host: *const NemoRelayNativeHostApiV1, + out: *mut NemoRelayNativePluginV1, +) -> NemoRelayStatus { + unsafe { write_raw_descriptor(host, out, "fixture_native", None, None, None) } +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_fixture_validate_error( + host: *const NemoRelayNativeHostApiV1, + out: *mut NemoRelayNativePluginV1, +) -> NemoRelayStatus { + unsafe { + write_raw_descriptor( + host, + out, + "fixture_native", + None, + Some(raw_validate_error), + Some(raw_noop_register), + ) + } +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_fixture_invalid_diagnostics( + host: *const NemoRelayNativeHostApiV1, + out: *mut NemoRelayNativePluginV1, +) -> NemoRelayStatus { + unsafe { + write_raw_descriptor( + host, + out, + "fixture_native", + None, + Some(raw_invalid_diagnostics_validate), + Some(raw_noop_register), + ) + } +} + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn nemo_relay_fixture_register_error( + host: *const NemoRelayNativeHostApiV1, + out: *mut NemoRelayNativePluginV1, +) -> NemoRelayStatus { + unsafe { + write_raw_descriptor( + host, + out, + "fixture_native", + None, + None, + Some(raw_register_error), + ) + } +} + +type RawValidate = unsafe extern "C" fn( + *mut c_void, + *const NemoRelayNativeString, + *mut *mut NemoRelayNativeString, +) -> NemoRelayStatus; + +type RawRegister = unsafe extern "C" fn( + *mut c_void, + *const NemoRelayNativeString, + *mut NemoRelayNativePluginContext, +) -> NemoRelayStatus; + +unsafe fn write_raw_descriptor( + host: *const NemoRelayNativeHostApiV1, + out: *mut NemoRelayNativePluginV1, + kind: &str, + struct_size: Option, + validate: Option, + register: Option, +) -> NemoRelayStatus { + if host.is_null() || out.is_null() { + return NemoRelayStatus::NullPointer; + } + let host = unsafe { *host }; + let mut plugin = NemoRelayNativePluginV1::default(); + plugin.struct_size = struct_size.unwrap_or(std::mem::size_of::()); + plugin.plugin_kind = unsafe { raw_host_string(&host, kind) }; + if plugin.plugin_kind.is_null() && !kind.is_empty() { + return NemoRelayStatus::Internal; + } + plugin.user_data = Box::into_raw(Box::new(host)).cast(); + plugin.validate = validate; + plugin.register = register; + plugin.drop = Some(raw_drop_host); + unsafe { *out = plugin }; + NemoRelayStatus::Ok +} + +unsafe extern "C" fn raw_validate_error( + user_data: *mut c_void, + _plugin_config_json: *const NemoRelayNativeString, + _out_diagnostics_json: *mut *mut NemoRelayNativeString, +) -> NemoRelayStatus { + unsafe { set_raw_last_error_from_user_data(user_data, "fixture validate failed") }; + NemoRelayStatus::InvalidArg +} + +unsafe extern "C" fn raw_invalid_diagnostics_validate( + user_data: *mut c_void, + _plugin_config_json: *const NemoRelayNativeString, + out_diagnostics_json: *mut *mut NemoRelayNativeString, +) -> NemoRelayStatus { + if out_diagnostics_json.is_null() { + return NemoRelayStatus::NullPointer; + } + let host = unsafe { raw_host_from_user_data(user_data) }; + let Some(host) = host else { + return NemoRelayStatus::NullPointer; + }; + unsafe { + *out_diagnostics_json = raw_host_string(host, "not-json"); + } + NemoRelayStatus::Ok +} + +unsafe extern "C" fn raw_noop_register( + _user_data: *mut c_void, + _plugin_config_json: *const NemoRelayNativeString, + _ctx: *mut NemoRelayNativePluginContext, +) -> NemoRelayStatus { + NemoRelayStatus::Ok +} + +unsafe extern "C" fn raw_register_error( + user_data: *mut c_void, + _plugin_config_json: *const NemoRelayNativeString, + _ctx: *mut NemoRelayNativePluginContext, +) -> NemoRelayStatus { + unsafe { set_raw_last_error_from_user_data(user_data, "fixture register failed") }; + NemoRelayStatus::Internal +} + +unsafe extern "C" fn raw_drop_host(user_data: *mut c_void) { + if !user_data.is_null() { + drop(unsafe { Box::from_raw(user_data as *mut NemoRelayNativeHostApiV1) }); + } +} + +unsafe fn raw_host_from_user_data<'a>( + user_data: *mut c_void, +) -> Option<&'a NemoRelayNativeHostApiV1> { + if user_data.is_null() { + None + } else { + Some(unsafe { &*(user_data as *const NemoRelayNativeHostApiV1) }) + } +} + +unsafe fn set_raw_last_error_from_user_data(user_data: *mut c_void, message: &str) { + if let Some(host) = unsafe { raw_host_from_user_data(user_data) } { + unsafe { set_raw_last_error(host as *const _, message) }; + } +} + +unsafe fn set_raw_last_error(host: *const NemoRelayNativeHostApiV1, message: &str) { + if host.is_null() { + return; + } + let host = unsafe { &*host }; + let message = unsafe { raw_host_string(host, message) }; + if !message.is_null() { + unsafe { + (host.last_error_set)(message); + (host.string_free)(message); + } + } +} + +unsafe fn raw_host_string( + host: &NemoRelayNativeHostApiV1, + value: &str, +) -> *mut NemoRelayNativeString { + let mut out = ptr::null_mut(); + let status = unsafe { (host.string_new)(value.as_ptr(), value.len(), &mut out) }; + if status == NemoRelayStatus::Ok { + out + } else { + ptr::null_mut() + } +} diff --git a/crates/core/tests/integration/native_plugin_tests.rs b/crates/core/tests/integration/native_plugin_tests.rs new file mode 100644 index 000000000..c209a553e --- /dev/null +++ b/crates/core/tests/integration/native_plugin_tests.rs @@ -0,0 +1,1145 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +#![cfg(not(target_arch = "wasm32"))] + +//! Integration coverage for SDK-built native dynamic plugins. + +use std::path::{Path, PathBuf}; +use std::process::Command; +use std::sync::{Arc, Mutex}; + +use nemo_relay::api::event::{Event, ScopeCategory}; +use nemo_relay::api::llm::{ + LlmCallEndParams, LlmCallExecuteParams, LlmCallParams, LlmRequest, LlmStreamCallExecuteParams, + llm_call, llm_call_end, llm_call_execute, llm_stream_call_execute, +}; +use nemo_relay::api::runtime::{ + LlmJsonStream, TASK_SCOPE_STACK, ThreadScopeStackBinding, capture_thread_scope_stack, + create_scope_stack, restore_thread_scope_stack, set_thread_scope_stack, +}; +use nemo_relay::api::scope::{ + EmitMarkEventParams, PopScopeParams, PushScopeParams, ScopeType, event as emit_scope_mark, + pop_scope, push_scope, +}; +use nemo_relay::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; +use nemo_relay::api::tool::{ToolCallExecuteParams, tool_call_execute, tool_request_intercepts}; +use nemo_relay::codec::response::AnnotatedLlmResponse; +use nemo_relay::plugin::dynamic::{NativePluginLoadSpec, load_native_plugins}; +use nemo_relay::plugin::{ + PluginComponentSpec, PluginConfig, clear_plugin_configuration, initialize_plugins_exact, +}; +use serde_json::{Map, Value as Json, json}; +use sha2::{Digest, Sha256}; +use tempfile::TempDir; +use tokio_stream::StreamExt; +use uuid::Uuid; + +static NATIVE_PLUGIN_TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(()); + +struct ThreadScopeStackRestore(Option); + +impl ThreadScopeStackRestore { + fn capture() -> Self { + Self(Some(capture_thread_scope_stack())) + } +} + +impl Drop for ThreadScopeStackRestore { + fn drop(&mut self) { + if let Some(binding) = self.0.take() { + restore_thread_scope_stack(binding); + } + } +} + +struct NativePluginTestCleanup { + subscriber: Option<&'static str>, + plugin_configuration_active: bool, +} + +impl NativePluginTestCleanup { + fn new() -> Self { + Self { + subscriber: None, + plugin_configuration_active: false, + } + } + + fn mark_plugin_configuration_active(&mut self) { + self.plugin_configuration_active = true; + } + + fn mark_subscriber_registered(&mut self, name: &'static str) { + self.subscriber = Some(name); + } +} + +impl Drop for NativePluginTestCleanup { + fn drop(&mut self) { + if let Some(name) = self.subscriber.take() { + let _ = deregister_subscriber(name); + } + if self.plugin_configuration_active { + let _ = clear_plugin_configuration(); + } + } +} + +#[tokio::test] +async fn sdk_cdylib_registers_tool_request_intercept() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.lock().await; + let fixture = build_fixture_plugin(); + let manifest_ref = write_manifest(&fixture); + + let activation = load_native_plugins([NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: manifest_ref.to_string_lossy().into_owned(), + }]) + .expect("native plugin should load"); + let mut cleanup = NativePluginTestCleanup::new(); + + let mut plugin_config = PluginConfig::default(); + plugin_config.components.push(PluginComponentSpec { + kind: "fixture_native".into(), + enabled: true, + config: Map::new(), + }); + initialize_plugins_exact(plugin_config) + .await + .expect("native plugin should initialize"); + cleanup.mark_plugin_configuration_active(); + + let events = Arc::new(Mutex::new(Vec::::new())); + let captured = events.clone(); + register_subscriber( + "native_plugin_fixture_events", + Arc::new(move |event| { + captured.lock().unwrap().push(event.clone()); + }), + ) + .expect("test subscriber should register"); + cleanup.mark_subscriber_registered("native_plugin_fixture_events"); + + let stack = create_scope_stack(); + let (outer_uuid, rewritten, tool_result) = TASK_SCOPE_STACK + .scope(stack, async { + let outer = push_scope( + PushScopeParams::builder() + .name("native-plugin-test-outer") + .scope_type(ScopeType::Agent) + .build(), + ) + .expect("outer scope should push"); + let outer_uuid = outer.uuid; + let rewritten = tool_request_intercepts("demo_tool", json!({ "input": "value" })) + .expect("native request intercept should run"); + let tool_result = tool_call_execute( + ToolCallExecuteParams::builder() + .name("native-fixture-tool") + .args(json!({ "input": "execute" })) + .func(Arc::new(|args| { + Box::pin(async move { Ok(json!({ "tool_callback": true, "args": args })) }) + })) + .build(), + ) + .await + .expect("native tool middleware should run"); + pop_scope(PopScopeParams::builder().handle_uuid(&outer.uuid).build()) + .expect("outer scope should pop"); + (outer_uuid, rewritten, tool_result) + }) + .await; + assert_eq!(rewritten["input"], "value"); + assert_eq!(rewritten["native_plugin"], true); + assert_eq!(tool_result["tool_callback"], true); + assert_eq!(tool_result["native_plugin_tool_execution"], true); + assert_eq!( + tool_result["args"]["native_plugin_tool_execution_request"], + true + ); + + flush_subscribers().expect("native fixture events should flush"); + let first_events = events.lock().unwrap().clone(); + find_event(&first_events, "fixture.native.subscriber.mark", None); + assert_parent(&first_events, "fixture.native.mark", None, Some(outer_uuid)); + assert_parent( + &first_events, + "fixture.native.scope", + Some(ScopeCategory::Start), + Some(outer_uuid), + ); + assert_not_parent( + &first_events, + "fixture.native.isolated.mark", + None, + outer_uuid, + ); + assert_not_parent( + &first_events, + "fixture.native.isolated.scope", + Some(ScopeCategory::Start), + outer_uuid, + ); + let tool_start = find_event( + &first_events, + "native-fixture-tool", + Some(ScopeCategory::Start), + ); + assert_eq!( + tool_start.input().unwrap()["native_plugin_tool_sanitize_request"], + true + ); + let tool_end = find_event( + &first_events, + "native-fixture-tool", + Some(ScopeCategory::End), + ); + assert_eq!( + tool_end.output().unwrap()["native_plugin_tool_sanitize_response"], + true + ); + + events.lock().unwrap().clear(); + let isolated_next_stack = create_scope_stack(); + let isolated_next_outer_uuid = TASK_SCOPE_STACK + .scope(isolated_next_stack, async { + let outer = push_scope( + PushScopeParams::builder() + .name("native-plugin-test-isolated-next-outer") + .scope_type(ScopeType::Agent) + .build(), + ) + .expect("isolated next outer scope should push"); + let outer_uuid = outer.uuid; + let result = tool_call_execute( + ToolCallExecuteParams::builder() + .name("native-fixture-tool-isolated-next") + .args(json!({ + "input": "isolated-next", + "use_isolated_next": true + })) + .func(Arc::new(|_args| { + Box::pin(async move { + emit_scope_mark( + EmitMarkEventParams::builder() + .name("native-fixture-tool-callback-mark") + .build(), + )?; + Ok(json!({ "tool_callback": true })) + }) + })) + .build(), + ) + .await + .expect("native isolated next middleware should run"); + assert_eq!(result["tool_callback"], true); + assert_eq!(result["native_plugin_tool_execution"], true); + pop_scope(PopScopeParams::builder().handle_uuid(&outer.uuid).build()) + .expect("isolated next outer scope should pop"); + outer_uuid + }) + .await; + flush_subscribers().expect("isolated next native fixture events should flush"); + let isolated_next_events = events.lock().unwrap().clone(); + let isolated_next_scope = find_event( + &isolated_next_events, + "fixture.native.isolated.next", + Some(ScopeCategory::Start), + ); + let callback_mark = find_event( + &isolated_next_events, + "native-fixture-tool-callback-mark", + None, + ); + assert_eq!( + callback_mark.parent_uuid(), + Some(isolated_next_scope.uuid()) + ); + assert_ne!( + callback_mark.parent_uuid(), + Some(isolated_next_outer_uuid), + "native next callback should use the plugin-selected isolated stack" + ); + + events.lock().unwrap().clear(); + { + let thread_stack = create_scope_stack(); + let _thread_stack_restore = ThreadScopeStackRestore::capture(); + set_thread_scope_stack(thread_stack); + let thread_outer = push_scope( + PushScopeParams::builder() + .name("native-plugin-test-thread-outer") + .scope_type(ScopeType::Agent) + .build(), + ) + .expect("thread outer scope should push"); + let thread_outer_uuid = thread_outer.uuid; + let rewritten = tool_request_intercepts("demo_tool", json!({ "input": "thread" })) + .expect("native request intercept should run with thread stack"); + assert_eq!(rewritten["native_plugin"], true); + pop_scope( + PopScopeParams::builder() + .handle_uuid(&thread_outer.uuid) + .build(), + ) + .expect("thread outer scope should pop"); + flush_subscribers().expect("thread-stack native fixture events should flush"); + let thread_events = events.lock().unwrap().clone(); + assert_parent( + &thread_events, + "fixture.native.mark", + None, + Some(thread_outer_uuid), + ); + assert_not_parent( + &thread_events, + "fixture.native.thread_stack.mark", + None, + thread_outer_uuid, + ); + } + + events.lock().unwrap().clear(); + let llm_execute_response = llm_call_execute( + LlmCallExecuteParams::builder() + .name("native-fixture-llm-execute") + .request(LlmRequest { + headers: Map::new(), + content: json!({ "prompt": "managed" }), + }) + .func(Arc::new(|request| { + Box::pin(async move { + Ok(json!({ + "id": "managed-response", + "request": request.content, + "llm_callback": true + })) + }) + })) + .build(), + ) + .await + .expect("native LLM middleware should run"); + assert_eq!(llm_execute_response["llm_callback"], true); + assert_eq!(llm_execute_response["native_plugin_llm_execution"], true); + assert_eq!( + llm_execute_response["request"]["native_plugin_llm_execution_request"], + true + ); + flush_subscribers().expect("managed LLM native fixture events should flush"); + let managed_llm_events = events.lock().unwrap().clone(); + let llm_start = find_event( + &managed_llm_events, + "native-fixture-llm-execute", + Some(ScopeCategory::Start), + ); + assert_eq!( + llm_start.input().unwrap()["content"]["native_plugin_llm_sanitize_request"], + true + ); + assert_eq!( + llm_start.input().unwrap()["content"]["native_plugin_llm_request_intercept"], + true + ); + let llm_end = find_event( + &managed_llm_events, + "native-fixture-llm-execute", + Some(ScopeCategory::End), + ); + assert_eq!( + llm_end.output().unwrap()["native_plugin_llm_sanitize_response"], + true + ); + assert!(llm_end.annotated_response().is_none()); + + events.lock().unwrap().clear(); + let collected_stream_chunks = Arc::new(Mutex::new(Vec::::new())); + let collector_chunks = collected_stream_chunks.clone(); + let finalizer_chunks = collected_stream_chunks.clone(); + let mut stream = llm_stream_call_execute( + LlmStreamCallExecuteParams::builder() + .name("native-fixture-llm-stream") + .request(LlmRequest { + headers: Map::new(), + content: json!({ "prompt": "stream" }), + }) + .func(Arc::new(|request| { + Box::pin(async move { + Ok(Box::pin(tokio_stream::iter(vec![ + Ok(json!({ + "stream_chunk": 1, + "request": request.content, + })), + Ok(json!({ "stream_chunk": 2 })), + ])) as LlmJsonStream) + }) + })) + .collector(Box::new(move |chunk| { + collector_chunks.lock().unwrap().push(chunk); + Ok(()) + })) + .finalizer(Box::new(move || { + Json::Array(finalizer_chunks.lock().unwrap().clone()) + })) + .build(), + ) + .await + .expect("native LLM stream middleware should run"); + let mut stream_chunks = Vec::new(); + while let Some(chunk) = stream.next().await { + stream_chunks.push(chunk.expect("native stream chunk should succeed")); + } + assert_eq!(stream_chunks.len(), 2); + assert_eq!( + stream_chunks[0]["request"]["native_plugin_llm_stream_execution_request"], + true + ); + assert_eq!(stream_chunks[0]["native_plugin_llm_stream_execution"], true); + assert_eq!(stream_chunks[1]["native_plugin_llm_stream_execution"], true); + assert_eq!(*collected_stream_chunks.lock().unwrap(), stream_chunks); + flush_subscribers().expect("stream native fixture events should flush"); + let stream_events = events.lock().unwrap().clone(); + let stream_end = find_event( + &stream_events, + "native-fixture-llm-stream", + Some(ScopeCategory::End), + ); + assert_eq!( + stream_end.output().unwrap()[0]["native_plugin_llm_stream_execution"], + true + ); + + events.lock().unwrap().clear(); + let llm_request = LlmRequest { + headers: Map::new(), + content: json!({ "prompt": "hello" }), + }; + let handle = llm_call( + LlmCallParams::builder() + .name("native-fixture-llm") + .request(&llm_request) + .build(), + ) + .expect("llm start should emit"); + let mut extra = Map::new(); + extra.insert("preexisting_annotation".into(), json!("kept")); + llm_call_end( + LlmCallEndParams::builder() + .handle(&handle) + .response(json!({ "id": "response-from-test", "content": "done" })) + .annotated_response(Arc::new(AnnotatedLlmResponse { + id: Some("annotation-before-plugin".into()), + model: None, + message: None, + tool_calls: None, + finish_reason: None, + usage: None, + api_specific: None, + extra, + })) + .build(), + ) + .expect("llm end should emit"); + flush_subscribers().expect("llm response annotation event should flush"); + let llm_events = events.lock().unwrap().clone(); + let llm_end = find_event(&llm_events, "native-fixture-llm", Some(ScopeCategory::End)); + let annotated = llm_end + .annotated_response() + .expect("native plugin should preserve response annotation"); + assert_eq!(annotated.id.as_deref(), Some("annotation-before-plugin")); + assert_eq!(annotated.extra["preexisting_annotation"], json!("kept")); + assert!(annotated.extra.get("native_plugin_annotation").is_none()); + + drop(cleanup); + activation.clear(); +} + +#[tokio::test] +async fn native_validation_diagnostics_prevent_initialization() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.lock().await; + let fixture = build_fixture_plugin(); + let manifest_ref = write_manifest(&fixture); + + let activation = load_native_plugins([NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: manifest_ref.to_string_lossy().into_owned(), + }]) + .expect("native plugin should load"); + + let mut plugin_config = PluginConfig::default(); + plugin_config.components.push(PluginComponentSpec { + kind: "fixture_native".into(), + enabled: true, + config: Map::from_iter([("reject".into(), json!(true))]), + }); + let error = initialize_plugins_exact(plugin_config) + .await + .expect_err("validation diagnostics should prevent initialization") + .to_string(); + assert!(error.contains("fixture rejection requested"), "{error}"); + + clear_plugin_configuration().expect("native plugin config should clear"); + activation.clear(); +} + +#[test] +fn native_loader_rejects_missing_library() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.blocking_lock(); + let manifest_dir = TempDir::new().expect("manifest dir"); + let missing_library = manifest_dir.path().join("libmissing_native_plugin.so"); + let manifest_ref = write_manifest_text(ManifestOptions { + manifest_dir: manifest_dir.path(), + plugin_id: "fixture_native", + relay: &format!("={}", env!("CARGO_PKG_VERSION")), + library: &missing_library.to_string_lossy(), + symbol: "nemo_relay_fixture_native_plugin", + integrity: None, + }); + + let error = expect_native_load_error( + NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: manifest_ref.to_string_lossy().into_owned(), + }, + "missing library should fail", + ); + assert!(error.contains("does not exist"), "{error}"); +} + +#[test] +fn native_loader_returns_empty_activation_for_empty_specs() { + let activation = + load_native_plugins(std::iter::empty::()).expect("empty load"); + assert!(activation.is_empty()); +} + +#[test] +fn native_activation_clear_deregisters_plugin_kind() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.blocking_lock(); + let fixture = build_fixture_plugin(); + let manifest_ref = write_manifest(&fixture); + + let activation = load_native_plugins([load_spec("fixture_native", &manifest_ref)]) + .expect("native plugin should load"); + assert!(!activation.is_empty()); + activation.clear(); + + let activation = load_native_plugins([load_spec("fixture_native", &manifest_ref)]) + .expect("native plugin should reload after activation clear"); + activation.clear(); +} + +#[test] +fn native_loader_resolves_manifest_directory_and_relative_library_paths() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.blocking_lock(); + let fixture = build_fixture_plugin(); + let relative_dir = fixture.manifest_dir.path().join("lib"); + std::fs::create_dir_all(&relative_dir).expect("relative lib dir"); + let relative_library = Path::new("lib").join(fixture_library_name()); + std::fs::copy( + &fixture.library_path, + fixture.manifest_dir.path().join(&relative_library), + ) + .expect("copy fixture library"); + write_manifest_text(ManifestOptions { + manifest_dir: fixture.manifest_dir.path(), + plugin_id: "fixture_native", + relay: &format!("={}", env!("CARGO_PKG_VERSION")), + library: &relative_library.to_string_lossy(), + symbol: "nemo_relay_fixture_native_plugin", + integrity: None, + }); + + let activation = load_native_plugins([NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: fixture.manifest_dir.path().to_string_lossy().into_owned(), + }]) + .expect("native plugin should load from manifest directory"); + activation.clear(); +} + +#[test] +fn native_loader_rolls_back_partially_loaded_plugins() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.blocking_lock(); + let fixture = build_fixture_plugin(); + let valid_manifest = write_manifest(&fixture); + let missing_manifest_dir = TempDir::new().expect("missing manifest dir"); + let missing_library = missing_manifest_dir + .path() + .join("libmissing_native_plugin.so"); + let missing_manifest = write_manifest_text(ManifestOptions { + manifest_dir: missing_manifest_dir.path(), + plugin_id: "fixture_native_missing", + relay: &format!("={}", env!("CARGO_PKG_VERSION")), + library: &missing_library.to_string_lossy(), + symbol: "nemo_relay_fixture_native_plugin", + integrity: None, + }); + + let error = expect_native_load_error_from_specs( + [ + load_spec("fixture_native", &valid_manifest), + load_spec("fixture_native_missing", &missing_manifest), + ], + "partial load failure should fail", + ); + assert!(error.contains("does not exist"), "{error}"); + + let activation = load_native_plugins([load_spec("fixture_native", &valid_manifest)]) + .expect("first plugin kind should be deregistered after rollback"); + activation.clear(); +} + +#[test] +fn native_loader_rejects_unsupported_relay_requirement_before_loading() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.blocking_lock(); + let manifest_dir = TempDir::new().expect("manifest dir"); + let manifest_ref = write_manifest_text(ManifestOptions { + manifest_dir: manifest_dir.path(), + plugin_id: "fixture_native", + relay: ">=1.0,<2.0", + library: "libdoes-not-need-to-exist.so", + symbol: "nemo_relay_fixture_native_plugin", + integrity: None, + }); + + let error = expect_native_load_error( + NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: manifest_ref.to_string_lossy().into_owned(), + }, + "unsupported relay requirement should fail", + ); + assert!(error.contains("requires relay"), "{error}"); +} + +#[test] +fn native_loader_rejects_manifest_contract_errors_before_loading_library() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.blocking_lock(); + let manifest_dir = TempDir::new().expect("manifest dir"); + + let mismatched_id = write_raw_manifest( + manifest_dir.path(), + &native_manifest_text( + "fixture_manifest_id", + &format!("={}", env!("CARGO_PKG_VERSION")), + "1", + "libdoes-not-need-to-exist.so", + "nemo_relay_fixture_native_plugin", + ), + ); + let error = expect_native_load_error( + NativePluginLoadSpec { + plugin_id: "fixture_expected_id".into(), + manifest_ref: mismatched_id.to_string_lossy().into_owned(), + }, + "manifest id mismatch should fail", + ); + assert!(error.contains("does not match expected id"), "{error}"); + + let invalid_relay = write_raw_manifest( + manifest_dir.path(), + &native_manifest_text( + "fixture_native", + "not a version requirement", + "1", + "libdoes-not-need-to-exist.so", + "nemo_relay_fixture_native_plugin", + ), + ); + let error = expect_native_load_error( + NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: invalid_relay.to_string_lossy().into_owned(), + }, + "invalid relay requirement should fail", + ); + assert!( + error.contains("invalid compat.relay version requirement"), + "{error}" + ); + + let unsupported_native_api = write_raw_manifest( + manifest_dir.path(), + &native_manifest_text( + "fixture_native", + &format!("={}", env!("CARGO_PKG_VERSION")), + "2", + "libdoes-not-need-to-exist.so", + "nemo_relay_fixture_native_plugin", + ), + ); + let error = expect_native_load_error( + NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: unsupported_native_api.to_string_lossy().into_owned(), + }, + "unsupported native API should fail", + ); + assert!(error.contains("unsupported compat.native_api"), "{error}"); + + let worker_manifest = write_raw_manifest( + manifest_dir.path(), + r#" +manifest_version = 1 + +[plugin] +id = "fixture_worker" +kind = "worker" + +[compat] +relay = ">=0.5,<1.0" +worker_protocol = "grpc-v1" + +[defaults] +enabled = false + +[capabilities] +items = ["plugin_worker"] + +[load] +runtime = "python" +entrypoint = "fixture.worker:create_plugin" +"#, + ); + let error = expect_native_load_error( + NativePluginLoadSpec { + plugin_id: "fixture_worker".into(), + manifest_ref: worker_manifest.to_string_lossy().into_owned(), + }, + "worker manifest should fail native loading", + ); + assert!(error.contains("only supports rust_dynamic"), "{error}"); +} + +#[test] +fn native_manifest_writer_escapes_toml_strings() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.blocking_lock(); + let manifest_dir = TempDir::new().expect("manifest dir"); + let windows_style_library = + r"C:\Users\RUNNER~1\AppData\Local\Temp\.tmpPath\debug\nemo_relay_plugin_fixture.dll"; + let manifest_ref = write_manifest_text(ManifestOptions { + manifest_dir: manifest_dir.path(), + plugin_id: "fixture_native", + relay: &format!("={}", env!("CARGO_PKG_VERSION")), + library: windows_style_library, + symbol: "nemo_relay_fixture_native_plugin", + integrity: Some(r"sha256:abc\def"), + }); + + let manifest = std::fs::read_to_string(manifest_ref).expect("read relay-plugin.toml"); + let parsed: toml::Value = toml::from_str(&manifest).expect("manifest should parse"); + assert_eq!( + parsed["load"]["library"].as_str(), + Some(windows_style_library) + ); + assert_eq!( + parsed["integrity"]["sha256"].as_str(), + Some(r"sha256:abc\def") + ); +} + +#[test] +fn native_loader_rejects_missing_symbol_digest_mismatch_and_kind_mismatch() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.blocking_lock(); + let fixture = build_fixture_plugin(); + + let missing_symbol = write_manifest_with_symbol(&fixture, "missing_native_symbol"); + let error = expect_native_load_error( + NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: missing_symbol.to_string_lossy().into_owned(), + }, + "missing symbol should fail", + ); + assert!(error.contains("symbol"), "{error}"); + + let digest_match = write_manifest_with_integrity(&fixture, &sha256(&fixture.library_path)); + let activation = load_native_plugins([NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: digest_match.to_string_lossy().into_owned(), + }]) + .expect("matching digest should load"); + activation.clear(); + + let digest_mismatch = write_manifest_with_integrity(&fixture, "sha256:deadbeef"); + let error = expect_native_load_error( + NativePluginLoadSpec { + plugin_id: "fixture_native".into(), + manifest_ref: digest_mismatch.to_string_lossy().into_owned(), + }, + "digest mismatch should fail", + ); + assert!(error.contains("sha256 mismatch"), "{error}"); + + let wrong_kind = write_manifest_with_plugin_id(&fixture, "fixture_native_mismatch"); + let error = expect_native_load_error( + NativePluginLoadSpec { + plugin_id: "fixture_native_mismatch".into(), + manifest_ref: wrong_kind.to_string_lossy().into_owned(), + }, + "plugin kind mismatch should fail", + ); + assert!(error.contains("returned kind"), "{error}"); +} + +#[test] +fn native_loader_rejects_entry_and_descriptor_failures() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.blocking_lock(); + let fixture = build_fixture_plugin(); + + for (symbol, expected) in [ + ("nemo_relay_fixture_entry_error", "fixture entry failed"), + ( + "nemo_relay_fixture_small_descriptor", + "incompatible plugin descriptor size", + ), + ("nemo_relay_fixture_null_kind", "null plugin_kind"), + ("nemo_relay_fixture_no_register", "no register callback"), + ] { + let manifest_ref = write_manifest_with_symbol(&fixture, symbol); + let error = expect_native_load_error( + load_spec("fixture_native", &manifest_ref), + "invalid native descriptor should fail", + ); + assert!( + error.contains(expected), + "expected {expected:?} in error: {error}" + ); + } +} + +#[tokio::test] +async fn native_validate_and_register_callback_errors_are_reported() { + let _guard = NATIVE_PLUGIN_TEST_LOCK.lock().await; + let fixture = build_fixture_plugin(); + + for (symbol, expected) in [ + ( + "nemo_relay_fixture_validate_error", + "fixture validate failed", + ), + ( + "nemo_relay_fixture_invalid_diagnostics", + "invalid diagnostics JSON", + ), + ( + "nemo_relay_fixture_register_error", + "fixture register failed", + ), + ] { + let manifest_ref = write_manifest_with_symbol(&fixture, symbol); + let activation = load_native_plugins([load_spec("fixture_native", &manifest_ref)]) + .expect("native plugin should load"); + let error = initialize_fixture_native(Map::new()) + .await + .expect_err("native plugin initialization should fail") + .to_string(); + assert!( + error.contains(expected), + "expected {expected:?} in error: {error}" + ); + clear_plugin_configuration().expect("native plugin config should clear"); + activation.clear(); + } +} + +async fn initialize_fixture_native(config: Map) -> nemo_relay::plugin::Result<()> { + let mut plugin_config = PluginConfig::default(); + plugin_config.components.push(PluginComponentSpec { + kind: "fixture_native".into(), + enabled: true, + config, + }); + initialize_plugins_exact(plugin_config).await.map(|_| ()) +} + +fn expect_native_load_error(spec: NativePluginLoadSpec, message: &str) -> String { + expect_native_load_error_from_specs([spec], message) +} + +fn expect_native_load_error_from_specs(specs: I, message: &str) -> String +where + I: IntoIterator, +{ + match load_native_plugins(specs) { + Ok(activation) => { + activation.clear(); + panic!("{message}"); + } + Err(error) => error.to_string(), + } +} + +fn load_spec(plugin_id: &str, manifest_ref: &Path) -> NativePluginLoadSpec { + NativePluginLoadSpec { + plugin_id: plugin_id.into(), + manifest_ref: manifest_ref.to_string_lossy().into_owned(), + } +} + +fn assert_parent( + events: &[Event], + name: &str, + scope_category: Option, + expected_parent: Option, +) { + let event = find_event(events, name, scope_category); + assert_eq!( + event.parent_uuid(), + expected_parent, + "{name} parent mismatch" + ); +} + +fn assert_not_parent( + events: &[Event], + name: &str, + scope_category: Option, + unexpected_parent: Uuid, +) { + let event = find_event(events, name, scope_category); + assert_ne!( + event.parent_uuid(), + Some(unexpected_parent), + "{name} should be emitted on an isolated stack" + ); +} + +fn find_event<'a>( + events: &'a [Event], + name: &str, + scope_category: Option, +) -> &'a Event { + events + .iter() + .find(|event| event.name() == name && event.scope_category() == scope_category) + .unwrap_or_else(|| panic!("missing event {name} with scope category {scope_category:?}")) +} + +struct BuiltFixture { + _source_dir: TempDir, + _target_dir: TempDir, + manifest_dir: TempDir, + library_path: PathBuf, +} + +fn build_fixture_plugin() -> BuiltFixture { + let source_dir = TempDir::new().expect("fixture source dir"); + let fixture_dir = source_dir.path().join("native_plugin"); + let fixture_src_dir = fixture_dir.join("src"); + std::fs::create_dir_all(&fixture_src_dir).expect("fixture src dir"); + let native_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../plugin"); + let fixture_manifest = std::fs::read_to_string( + Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/native_plugin/Cargo.toml"), + ) + .expect("fixture Cargo.toml template") + .replace( + r#"nemo-relay-plugin = { path = "../../../../plugin" }"#, + &format!("nemo-relay-plugin = {{ path = {native_path:?} }}"), + ); + std::fs::write(fixture_dir.join("Cargo.toml"), fixture_manifest).expect("fixture Cargo.toml"); + std::fs::copy( + Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/native_plugin/src/lib.rs"), + fixture_src_dir.join("lib.rs"), + ) + .expect("fixture lib.rs"); + let target_dir = TempDir::new().expect("fixture target dir"); + let manifest_dir = TempDir::new().expect("fixture manifest dir"); + let cargo = std::env::var("CARGO").unwrap_or_else(|_| "cargo".into()); + let status = Command::new(cargo) + .arg("build") + .arg("--quiet") + .arg("--manifest-path") + .arg(fixture_dir.join("Cargo.toml")) + .arg("--target-dir") + .arg(target_dir.path()) + .status() + .expect("fixture cargo build should start"); + assert!(status.success(), "fixture cargo build failed: {status}"); + + let library_path = target_dir.path().join("debug").join(fixture_library_name()); + assert!( + library_path.exists(), + "fixture library missing at {}", + library_path.display() + ); + + BuiltFixture { + _source_dir: source_dir, + _target_dir: target_dir, + manifest_dir, + library_path, + } +} + +fn write_manifest(fixture: &BuiltFixture) -> PathBuf { + write_manifest_text(ManifestOptions { + manifest_dir: fixture.manifest_dir.path(), + plugin_id: "fixture_native", + relay: &format!("={}", env!("CARGO_PKG_VERSION")), + library: &fixture.library_path.to_string_lossy(), + symbol: "nemo_relay_fixture_native_plugin", + integrity: None, + }) +} + +fn write_manifest_with_symbol(fixture: &BuiltFixture, symbol: &str) -> PathBuf { + write_manifest_text(ManifestOptions { + manifest_dir: fixture.manifest_dir.path(), + plugin_id: "fixture_native", + relay: &format!("={}", env!("CARGO_PKG_VERSION")), + library: &fixture.library_path.to_string_lossy(), + symbol, + integrity: None, + }) +} + +fn write_manifest_with_plugin_id(fixture: &BuiltFixture, plugin_id: &str) -> PathBuf { + write_manifest_text(ManifestOptions { + manifest_dir: fixture.manifest_dir.path(), + plugin_id, + relay: &format!("={}", env!("CARGO_PKG_VERSION")), + library: &fixture.library_path.to_string_lossy(), + symbol: "nemo_relay_fixture_native_plugin", + integrity: None, + }) +} + +fn write_manifest_with_integrity(fixture: &BuiltFixture, integrity: &str) -> PathBuf { + write_manifest_text(ManifestOptions { + manifest_dir: fixture.manifest_dir.path(), + plugin_id: "fixture_native", + relay: &format!("={}", env!("CARGO_PKG_VERSION")), + library: &fixture.library_path.to_string_lossy(), + symbol: "nemo_relay_fixture_native_plugin", + integrity: Some(integrity), + }) +} + +struct ManifestOptions<'a> { + manifest_dir: &'a Path, + plugin_id: &'a str, + relay: &'a str, + library: &'a str, + symbol: &'a str, + integrity: Option<&'a str>, +} + +fn write_manifest_text(options: ManifestOptions<'_>) -> PathBuf { + let manifest_ref = options.manifest_dir.join("relay-plugin.toml"); + let integrity = options + .integrity + .map(|sha256| format!("\n[integrity]\nsha256 = {}\n", toml_string(sha256))) + .unwrap_or_default(); + let plugin_id = toml_string(options.plugin_id); + let relay = toml_string(options.relay); + let library = toml_string(options.library); + let symbol = toml_string(options.symbol); + let manifest = format!( + r#" +manifest_version = 1 + +[plugin] +id = {plugin_id} +kind = "rust_dynamic" + +[compat] +relay = {relay} +native_api = "1" + +[defaults] +enabled = false + +[capabilities] +items = ["plugin_native"] + +[load] +library = {library} +symbol = {symbol} +{integrity} +"#, + plugin_id = plugin_id, + relay = relay, + library = library, + symbol = symbol, + integrity = integrity, + ); + std::fs::write(&manifest_ref, manifest).expect("write relay-plugin.toml"); + manifest_ref +} + +fn write_raw_manifest(manifest_dir: &Path, manifest: &str) -> PathBuf { + let manifest_ref = manifest_dir.join("relay-plugin.toml"); + std::fs::write(&manifest_ref, manifest).expect("write relay-plugin.toml"); + manifest_ref +} + +fn native_manifest_text( + plugin_id: &str, + relay: &str, + native_api: &str, + library: &str, + symbol: &str, +) -> String { + format!( + r#" +manifest_version = 1 + +[plugin] +id = {plugin_id} +kind = "rust_dynamic" + +[compat] +relay = {relay} +native_api = {native_api} + +[defaults] +enabled = false + +[capabilities] +items = ["plugin_native"] + +[load] +library = {library} +symbol = {symbol} +"#, + plugin_id = toml_string(plugin_id), + relay = toml_string(relay), + native_api = toml_string(native_api), + library = toml_string(library), + symbol = toml_string(symbol), + ) +} + +fn toml_string(value: &str) -> String { + serde_json::to_string(value).expect("TOML-compatible string escape should succeed") +} + +fn sha256(path: &Path) -> String { + let bytes = std::fs::read(path).expect("read file for digest"); + let digest = Sha256::digest(bytes); + format!("sha256:{}", hex_digest(digest)) +} + +fn hex_digest(bytes: impl AsRef<[u8]>) -> String { + const HEX: &[u8; 16] = b"0123456789abcdef"; + let bytes = bytes.as_ref(); + let mut out = String::with_capacity(bytes.len() * 2); + for byte in bytes { + out.push(HEX[(byte >> 4) as usize] as char); + out.push(HEX[(byte & 0x0f) as usize] as char); + } + out +} + +fn fixture_library_name() -> &'static str { + if cfg!(target_os = "windows") { + "nemo_relay_plugin_fixture.dll" + } else if cfg!(target_os = "macos") { + "libnemo_relay_plugin_fixture.dylib" + } else { + "libnemo_relay_plugin_fixture.so" + } +}