Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,9 @@ pub(crate) fn resolve_run_config(
resolved.gateway.bind = "127.0.0.1:0"
.parse()
.expect("valid transparent bind address");
enforce_required_dynamic_plugin_startup(config, &resolved)?;
if !command.dry_run {
enforce_required_dynamic_plugin_startup(config, &resolved)?;
}
Ok(resolved)
}

Expand Down
42 changes: 38 additions & 4 deletions crates/cli/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::error::CliError;
use crate::installer::{
generated_hooks, hook_forward_command, merge_hermes_config, merge_hooks, read_json_file,
};
use crate::plugins::lifecycle::ActiveDynamicPluginComponent;
use crate::server;

/// Runs a child coding-agent command behind an ephemeral local gateway.
Expand Down Expand Up @@ -87,6 +88,7 @@ struct TransparentRun {
agent: CodingAgent,
prepared: PreparedRun,
resolved: ResolvedConfig,
dynamic_plugins: Vec<ActiveDynamicPluginComponent>,
listener: TcpListener,
gateway_url: String,
dry_run: bool,
Expand All @@ -99,7 +101,16 @@ impl TransparentRun {
async fn new(command: RunCommand, inherited: Option<&ServerArgs>) -> Result<Self, CliError> {
let dry_run = command.dry_run;
let print = command.print;
let explicit_config = command
.config
.as_ref()
.or_else(|| inherited.and_then(|args| args.config.as_ref()));
let mut resolved = resolve_run_config(&command, inherited)?;
let dynamic_plugins = if dry_run {
Vec::new()
} else {
crate::plugins::lifecycle::active_dynamic_plugin_components(explicit_config, &resolved)?
};
let (agent, argv) = resolve_agent_and_argv(&command, &resolved.agents)?;
let listener = TcpListener::bind("127.0.0.1:0").await?;
let address = listener.local_addr()?;
Expand All @@ -111,6 +122,7 @@ impl TransparentRun {
agent,
prepared,
resolved,
dynamic_plugins,
listener,
gateway_url,
dry_run,
Expand All @@ -134,9 +146,10 @@ impl TransparentRun {
}
self.prepared
.print_live_status(self.agent, &self.gateway_url, &self.resolved);
execute_live_run(
execute_live_run_with_dynamic(
self.listener,
self.resolved.gateway,
self.dynamic_plugins,
&self.gateway_url,
self.prepared,
)
Expand All @@ -146,13 +159,24 @@ impl TransparentRun {

// Starts the gateway, waits for readiness, runs the child command, restores temporary state, and then
// maps the child process status to the launcher's exit code.
#[cfg(test)]
async fn execute_live_run(
listener: TcpListener,
gateway_config: GatewayConfig,
gateway_url: &str,
prepared: PreparedRun,
) -> Result<ExitCode, CliError> {
let running_server = RunningGateway::start(listener, gateway_config);
execute_live_run_with_dynamic(listener, gateway_config, Vec::new(), gateway_url, prepared).await
}

async fn execute_live_run_with_dynamic(
listener: TcpListener,
gateway_config: GatewayConfig,
dynamic_plugins: Vec<ActiveDynamicPluginComponent>,
gateway_url: &str,
prepared: PreparedRun,
) -> Result<ExitCode, CliError> {
let running_server = RunningGateway::start(listener, gateway_config, dynamic_plugins);
if let Err(error) = wait_for_health(gateway_url).await {
let restore = prepared.restore();
let server_result = running_server.stop().await;
Expand Down Expand Up @@ -269,10 +293,20 @@ struct RunningGateway {
impl RunningGateway {
// Starts the gateway listener on a background task and keeps the shutdown sender paired with the
// task handle so health failures and normal exits use identical cleanup semantics.
fn start(listener: TcpListener, config: crate::config::GatewayConfig) -> Self {
fn start(
listener: TcpListener,
config: crate::config::GatewayConfig,
dynamic_plugins: Vec<ActiveDynamicPluginComponent>,
) -> Self {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move {
server::serve_listener(listener, config, Some(shutdown_rx)).await
server::serve_listener_with_dynamic(
listener,
config,
dynamic_plugins,
Some(shutdown_rx),
)
.await
});
Self { shutdown_tx, task }
}
Expand Down
8 changes: 6 additions & 2 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,12 @@ async fn run_default(server_args: &ServerArgs) -> Result<ExitCode, error::CliErr
// exists. Once configured, bare `nemo-relay` becomes a quick health check; explicit
// `nemo-relay config` remains the reconfiguration path.
if server_args.requested_daemon_mode() {
let config = config::resolve_server_config(server_args)?;
server::serve(config.gateway).await?;
let resolved = config::resolve_server_config(server_args)?;
let dynamic_plugins = plugins::lifecycle::active_dynamic_plugin_components(
server_args.config.as_ref(),
&resolved,
)?;
server::serve_with_dynamic(resolved.gateway, dynamic_plugins).await?;
Ok(ExitCode::SUCCESS)
} else if config::any_config_file_exists() {
doctor::run_doctor(None, false).await
Expand Down
55 changes: 52 additions & 3 deletions crates/cli/src/plugins/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use std::path::PathBuf;
use std::process::ExitCode;

use nemo_relay::plugin::dynamic::{
DynamicPluginCheckState, DynamicPluginCompatibility, DynamicPluginLoadContract,
DynamicPluginManifest, DynamicPluginRecord, DynamicPluginValidationStatus,
DynamicPluginCheckState, DynamicPluginCompatibility, DynamicPluginKind,
DynamicPluginLoadContract, DynamicPluginManifest, DynamicPluginRecord,
DynamicPluginValidationStatus,
};
use serde_json::Value;
use serde_json::{Map, Value};

use crate::config::{
PluginsAddCommand, PluginsDisableCommand, PluginsEnableCommand, PluginsInspectCommand,
Expand Down Expand Up @@ -339,6 +340,54 @@ pub(crate) fn remove(command: PluginsRemoveCommand, server: &ServerArgs) -> Resu
Ok(())
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ActiveDynamicPluginComponent {
pub(crate) plugin_id: String,
pub(crate) kind: DynamicPluginKind,
pub(crate) manifest_ref: Option<String>,
pub(crate) config: Map<String, Value>,
}

pub(crate) fn active_dynamic_plugin_components(
explicit: Option<&PathBuf>,
resolved: &ResolvedConfig,
) -> Result<Vec<ActiveDynamicPluginComponent>, CliError> {
let scopes = load_and_hydrate_scopes(explicit, resolved)?;
let host_config_by_id = host_config_by_id(resolved);
let mut components = Vec::new();

for resolved_plugin in &resolved.dynamic_plugins {
let Some(entry) = find_record_by_id(&scopes, &resolved_plugin.plugin_id)? else {
return Err(CliError::Config(format!(
"dynamic plugin '{}' is present in resolved config but not lifecycle state",
resolved_plugin.plugin_id
)));
};
if entry.record.is_tombstoned() || !entry.record.spec.enabled {
continue;
}
let host_config = host_config_by_id
.get(&entry.record.metadata.id)
.ok_or_else(|| {
CliError::Config(format!(
"dynamic plugin '{}' is enabled but has no resolved host config",
entry.record.metadata.id
))
})?;
components.push(ActiveDynamicPluginComponent {
plugin_id: entry.record.metadata.id.clone(),
kind: entry.record.metadata.kind,
manifest_ref: match entry.record.metadata.kind {
DynamicPluginKind::RustDynamic => Some(manifest_ref_from_record(&entry.record)?),
DynamicPluginKind::Worker => entry.record.source.manifest_ref.clone(),
},
config: host_config.config.clone(),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
});
}

Ok(components)
}

fn mutate_enabled_state(
plugin_id: String,
server: &ServerArgs,
Expand Down
104 changes: 87 additions & 17 deletions crates/cli/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ use axum::extract::{DefaultBodyLimit, State};
use axum::http::HeaderMap;
use axum::routing::{get, post};
use axum::{Json, Router};
use nemo_relay::plugin::{PluginConfig, clear_plugin_configuration, initialize_plugins_exact};
use nemo_relay::plugin::dynamic::{
DynamicPluginKind, NativePluginActivation, NativePluginLoadSpec, load_native_plugins,
};
use nemo_relay::plugin::{
PluginComponentSpec, PluginConfig, clear_plugin_configuration, initialize_plugins_exact,
};
use nemo_relay_adaptive::plugin_component::register_adaptive_component;
use nemo_relay_pii_redaction::component::register_pii_redaction_component;
use reqwest::Client;
Expand All @@ -21,6 +26,7 @@ use crate::adapters::{claude_code, codex, cursor, hermes};
use crate::config::GatewayConfig;
use crate::error::CliError;
use crate::gateway;
use crate::plugins::lifecycle::ActiveDynamicPluginComponent;
use crate::session::SessionManager;

const HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
Expand All @@ -35,11 +41,11 @@ pub(crate) struct AppState {
pub(crate) last_activity: Arc<Mutex<Instant>>,
}

/// Binds the configured address and serves until the process is stopped.
///
/// Tests and transparent run mode use `serve_listener` directly so they can supply an already
/// bound ephemeral listener and optional shutdown channel.
pub(crate) async fn serve(config: GatewayConfig) -> Result<(), CliError> {
/// Binds the configured address and activates enabled dynamic plugins before serving.
pub(crate) async fn serve_with_dynamic(
config: GatewayConfig,
dynamic_plugins: Vec<ActiveDynamicPluginComponent>,
) -> Result<(), CliError> {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
let listener = TcpListener::bind(config.bind).await.map_err(|err| {
// Translate the common bind-failure (port already in use) into an actionable message.
// Plain `io error: Address already in use (os error 48)` is unhelpful; the friendly
Expand All @@ -58,19 +64,31 @@ pub(crate) async fn serve(config: GatewayConfig) -> Result<(), CliError> {
CliError::Io(err)
}
})?;
serve_listener(listener, config, None).await
serve_listener_with_dynamic(listener, config, dynamic_plugins, None).await
}

/// Serves the gateway router on a caller-owned listener with optional graceful shutdown.
///
/// A provided shutdown receiver is best-effort: the send side may be dropped after the child agent
/// exits, and either receiving or channel closure is enough to let Axum drain the listener.
#[cfg(test)]
pub(crate) async fn serve_listener(
listener: TcpListener,
config: GatewayConfig,
shutdown: Option<oneshot::Receiver<()>>,
) -> Result<(), CliError> {
let plugin_activation = PluginActivation::initialize(config.plugin_config.clone()).await?;
serve_listener_with_dynamic(listener, config, Vec::new(), shutdown).await
}

/// Serves the gateway router and activates enabled dynamic plugin components.
pub(crate) async fn serve_listener_with_dynamic(
listener: TcpListener,
config: GatewayConfig,
dynamic_plugins: Vec<ActiveDynamicPluginComponent>,
shutdown: Option<oneshot::Receiver<()>>,
) -> Result<(), CliError> {
let plugin_activation =
PluginActivation::initialize(config.plugin_config.clone(), dynamic_plugins).await?;
let state = AppState::new(config);
let sessions = state.sessions.clone();
let last_activity = state.last_activity.clone();
Expand Down Expand Up @@ -197,35 +215,87 @@ async fn idle_shutdown_future(

struct PluginActivation {
active: bool,
native: Option<NativePluginActivation>,
}

impl PluginActivation {
async fn initialize(config: Option<Value>) -> Result<Self, CliError> {
let Some(config) = config else {
return Ok(Self { active: false });
async fn initialize(
config: Option<Value>,
dynamic_plugins: Vec<ActiveDynamicPluginComponent>,
) -> Result<Self, CliError> {
if config.is_none() && dynamic_plugins.is_empty() {
return Ok(Self {
active: false,
native: None,
});
};
register_adaptive_component().map_err(|error| {
CliError::Config(format!("adaptive plugin registration failed: {error}"))
})?;
register_pii_redaction_component().map_err(|error| {
CliError::Config(format!("PII redaction plugin registration failed: {error}"))
})?;
let native_specs = dynamic_plugins
.iter()
.filter(|plugin| plugin.kind == DynamicPluginKind::RustDynamic)
.map(|plugin| {
let manifest_ref = plugin.manifest_ref.clone().ok_or_else(|| {
CliError::Config(format!(
"native dynamic plugin '{}' has no manifest_ref in lifecycle state",
plugin.plugin_id
))
})?;
Ok(NativePluginLoadSpec {
plugin_id: plugin.plugin_id.clone(),
manifest_ref,
})
})
.collect::<Result<Vec<_>, CliError>>()?;
let native =
if native_specs.is_empty() {
None
} else {
Some(load_native_plugins(native_specs).map_err(|error| {
CliError::Config(format!("native plugin load failed: {error}"))
})?)
};
// Gateway already resolved its config; activate exactly (no re-discovery).
let plugin_config: PluginConfig = serde_json::from_value(config)
.map_err(|error| CliError::Config(format!("invalid plugin config: {error}")))?;
let mut plugin_config: PluginConfig = match config {
Some(config) => serde_json::from_value(config)
.map_err(|error| CliError::Config(format!("invalid plugin config: {error}")))?,
None => PluginConfig::default(),
};
plugin_config
.components
.extend(
dynamic_plugins
.into_iter()
.map(|plugin| PluginComponentSpec {
kind: plugin.plugin_id,
enabled: true,
config: plugin.config,
}),
);
initialize_plugins_exact(plugin_config)
.await
.map_err(|error| CliError::Config(format!("plugin activation failed: {error}")))?;
Ok(Self { active: true })
Ok(Self {
active: true,
native,
})
}

fn clear(mut self) -> Result<(), CliError> {
if self.active {
let result = if self.active {
self.active = false;
clear_plugin_configuration()
.map_err(|error| CliError::Config(format!("plugin teardown failed: {error}")))?;
}
Ok(())
Ok(())
} else {
Ok(())
};
self.native.take();
result
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}

Expand Down
Loading
Loading