Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 140 additions & 9 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::borrow::Cow;
use std::num::NonZeroU8;
use std::str::FromStr;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::{env, io};

use crate::auth::{
Expand All @@ -24,7 +24,6 @@ use axum_extra::TypedHeader;
use derive_more::From;
use futures::TryStreamExt;
use http::StatusCode;
use log::{info, warn};
use serde::Deserialize;
use spacetimedb::auth::identity::ConnectionAuthCtx;
use spacetimedb::database_logger::DatabaseLogger;
Expand All @@ -49,6 +48,7 @@ use spacetimedb_schema::auto_migrate::{
use tokio::sync::oneshot;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use tracing::{info, warn};

use super::subscribe::{handle_websocket, HasWebSocketOptions};

Expand Down Expand Up @@ -838,6 +838,15 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
};

let schema_migration_policy = schema_migration_policy(policy, token)?;
let publish_start = Instant::now();
info!(
database = %database_identity,
op = ?publish_op,
host_type = ?host_type,
num_replicas = ?num_replicas,
confirmation_timeout = ?confirmation_timeout,
"publishing database"
);
let maybe_updated = ctx
.publish_database(
&auth.claims.identity,
Expand All @@ -853,6 +862,13 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
)
.await
.map_err(log_and_500)?;
info!(
database = %database_identity,
op = ?publish_op,
result = update_database_result_name(&maybe_updated),
elapsed = ?publish_start.elapsed(),
"publish_database returned"
);

let success = || {
axum::Json(PublishResult::Success {
Expand All @@ -879,24 +895,139 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate + Authorization>(
durable_offset,
},
) => {
timeout(confirmation_timeout.min(MAX_UPDATE_CONFIRMATION_TIMEOUT), async {
let tx_offset = tx_offset.await?;
let confirmation_timeout = confirmation_timeout.min(MAX_UPDATE_CONFIRMATION_TIMEOUT);
let confirmation_start = Instant::now();
let initial_durable_offset = durable_offset.as_ref().and_then(|offset| offset.last_seen());
info!(
database = %database_identity,
op = ?publish_op,
confirmation_timeout = ?confirmation_timeout,
has_durable_offset = durable_offset.is_some(),
initial_durable_offset = ?initial_durable_offset,
"waiting for database update confirmation"
);
let confirmation_result = timeout(confirmation_timeout, async {
let tx_wait_start = Instant::now();
let tx_offset = match tx_offset.await {
Ok(tx_offset) => {
info!(
database = %database_identity,
op = ?publish_op,
tx_offset,
elapsed = ?tx_wait_start.elapsed(),
"database update tx offset confirmed"
);
tx_offset
}
Err(err) => {
warn!(
database = %database_identity,
op = ?publish_op,
elapsed = ?tx_wait_start.elapsed(),
error = %err,
"database update tx offset wait was cancelled"
);
return Err(UpdateConfirmationError::Cancelled(err));
}
};
if let Some(mut durable_offset) = durable_offset {
durable_offset.wait_for(tx_offset).await?;
let durable_wait_start = Instant::now();
let last_seen_before_wait = durable_offset.last_seen();
info!(
database = %database_identity,
op = ?publish_op,
tx_offset,
last_seen_durable_offset = ?last_seen_before_wait,
"waiting for database update durable offset"
);
match durable_offset.wait_for(tx_offset).await {
Ok(confirmed_durable_offset) => {
info!(
database = %database_identity,
op = ?publish_op,
tx_offset,
confirmed_durable_offset,
elapsed = ?durable_wait_start.elapsed(),
"database update durable offset confirmed"
);
}
Err(err) => {
warn!(
database = %database_identity,
op = ?publish_op,
tx_offset,
last_seen_durable_offset = ?durable_offset.last_seen(),
elapsed = ?durable_wait_start.elapsed(),
error = %err,
"database update durable offset wait failed"
);
return Err(UpdateConfirmationError::Crashed(err));
}
}
} else {
info!(
database = %database_identity,
op = ?publish_op,
tx_offset,
"database update has no durable offset to wait for"
);
}

Ok::<_, UpdateConfirmationError>(())
})
.await
.map_err(Into::into)
.flatten()?;
.await;

match confirmation_result {
Ok(Ok(())) => {
info!(
database = %database_identity,
op = ?publish_op,
elapsed = ?confirmation_start.elapsed(),
"database update confirmation completed"
);
}
Ok(Err(err)) => {
warn!(
database = %database_identity,
op = ?publish_op,
elapsed = ?confirmation_start.elapsed(),
error = ?err,
"database update confirmation failed"
);
return Err(err.into());
}
Err(err) => {
warn!(
database = %database_identity,
op = ?publish_op,
confirmation_timeout = ?confirmation_timeout,
elapsed = ?confirmation_start.elapsed(),
initial_durable_offset = ?initial_durable_offset,
"database update confirmation timed out"
);
return Err(UpdateConfirmationError::Timeout(err).into());
}
}

Ok(success())
}
}
}

#[derive(From)]
/// Maps the outcome of `publish_database` to a short, stable label
/// suitable for use as a structured log field value.
fn update_database_result_name(result: &Option<UpdateDatabaseResult>) -> &'static str {
    match result {
        // `None` means no pre-existing database was updated, i.e. a fresh create.
        None => "created",
        Some(update) => match update {
            UpdateDatabaseResult::NoUpdateNeeded => "no_update_needed",
            UpdateDatabaseResult::UpdatePerformed { .. } => "update_performed",
            UpdateDatabaseResult::UpdatePerformedWithClientDisconnect { .. } => {
                "update_performed_with_client_disconnect"
            }
            UpdateDatabaseResult::AutoMigrateError(_) => "auto_migrate_error",
            UpdateDatabaseResult::ErrorExecutingMigration(_) => "error_executing_migration",
        },
    }
}

#[derive(Debug, From)]
enum UpdateConfirmationError {
Cancelled(oneshot::error::RecvError),
Crashed(DurabilityExited),
Expand Down
63 changes: 62 additions & 1 deletion crates/smoketests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::process::{Command, Output, Stdio};
use std::sync::OnceLock;
use std::time::Instant;
use std::time::{Duration, Instant};
use which::which;

/// Returns the remote server URL if running against a remote server.
Expand Down Expand Up @@ -1166,6 +1166,12 @@ log = "0.4"
// Now publish with --bin-path to skip rebuild
let publish_start = Instant::now();
let mut args = vec!["publish", "--server", &self.server_url, "--bin-path", &wasm_path_str];
eprintln!(
"[SMOKETEST] publish start: server={} module={} target={}",
self.server_url,
self.module_name,
name.unwrap_or("<new>")
);

if opts.force {
args.push("--yes");
Expand Down Expand Up @@ -1223,6 +1229,10 @@ log = "0.4"
/// Arguments are passed directly to the CLI as strings.
pub fn call(&self, name: &str, args: &[&str]) -> Result<String> {
let identity = self.database_identity.as_ref().context("No database published")?;
eprintln!(
"[SMOKETEST] call start: server={} database={} reducer={} args={:?}",
self.server_url, identity, name, args
);

let mut cmd_args = vec!["call", "--server", &self.server_url, "--", identity.as_str(), name];
cmd_args.extend(args);
Expand Down Expand Up @@ -1282,13 +1292,21 @@ log = "0.4"
/// Executes a SQL query against the database.
pub fn sql(&self, query: &str) -> Result<String> {
let identity = self.database_identity.as_ref().context("No database published")?;
eprintln!(
"[SMOKETEST] sql start: server={} database={} query={}",
self.server_url, identity, query
);

self.spacetime(&["sql", "--server", &self.server_url, identity.as_str(), query])
}

/// Executes a SQL query with the --confirmed flag.
pub fn sql_confirmed(&self, query: &str) -> Result<String> {
let identity = self.database_identity.as_ref().context("No database published")?;
eprintln!(
"[SMOKETEST] confirmed sql start: server={} database={} query={}",
self.server_url, identity, query
);

self.spacetime(&[
"sql",
Expand Down Expand Up @@ -1317,6 +1335,49 @@ log = "0.4"
);
}

/// Asserts that a SQL query eventually produces the expected output.
///
/// Use this only for read-only queries after operations that can briefly
/// leave the database worker unavailable, such as publishing an update.
///
/// Polls roughly every 200ms for up to 10 seconds, and always makes one
/// final attempt after the deadline so a success that lands right at the
/// boundary is not missed. Panics with the last observed (whitespace-
/// normalized) output, or the last error if no query ever succeeded.
pub fn assert_sql_eventually(&self, query: &str, expected: &str) {
    const POLL_INTERVAL: Duration = Duration::from_millis(200);
    const TIMEOUT: Duration = Duration::from_secs(10);

    let expected_normalized = normalize_whitespace(expected);
    let deadline = Instant::now() + TIMEOUT;
    let mut last_actual = None;
    let mut last_error = None;

    loop {
        match self.sql(query) {
            Ok(actual) => {
                let actual_normalized = normalize_whitespace(&actual);
                if actual_normalized == expected_normalized {
                    return;
                }
                last_actual = Some(actual_normalized);
                // A successful-but-mismatched query supersedes any earlier error.
                last_error = None;
            }
            Err(err) => {
                last_error = Some(err.to_string());
            }
        }

        // Check the deadline *after* each attempt so at least one query runs
        // past the deadline instead of the loop exiting mid-sleep.
        if Instant::now() >= deadline {
            break;
        }
        std::thread::sleep(POLL_INTERVAL);
    }

    if let Some(actual_normalized) = last_actual {
        // Values are known to be unequal here; assert_eq! is used purely for
        // its standard mismatch diagnostics.
        assert_eq!(
            actual_normalized, expected_normalized,
            "SQL output mismatch for query after retry: {}\n\nExpected:\n{}\n\nActual:\n{}",
            query, expected_normalized, actual_normalized
        );
        unreachable!("assert_eq! above must fail: values were compared unequal");
    }

    panic!(
        "SQL query failed after retry: {}\n\nLast error:\n{}",
        query,
        last_error.unwrap_or_else(|| "no attempts completed".to_string())
    );
}

/// Fetches the last N log entries from the database.
pub fn logs(&self, n: usize) -> Result<Vec<String>> {
let records = self.log_records(n)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/smoketests/tests/smoketests/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ fn test_views_auto_migration() {
test.use_precompiled_module("views-auto-migrate-updated");
test.publish_module_clear(false).unwrap();

test.assert_sql(
test.assert_sql_eventually(
"SELECT * FROM player",
r#" id | level
----+-------
Expand Down
Loading