diff --git a/crates/iceberg/src/encryption/crypto.rs b/crates/iceberg/src/encryption/crypto.rs index 0b34580db8..0f6a9eff43 100644 --- a/crates/iceberg/src/encryption/crypto.rs +++ b/crates/iceberg/src/encryption/crypto.rs @@ -43,7 +43,7 @@ use crate::{Error, ErrorKind, Result}; /// containing `SensitiveBytes` can safely derive or implement `Debug` /// without risk of leaking key material. #[derive(Clone, PartialEq, Eq)] -struct SensitiveBytes(Zeroizing>); +pub struct SensitiveBytes(Zeroizing>); impl SensitiveBytes { /// Wraps the given bytes as sensitive material. @@ -57,13 +57,11 @@ impl SensitiveBytes { } /// Returns the number of bytes. - #[allow(dead_code)] // Encryption work is ongoing so currently unused pub fn len(&self) -> usize { self.0.len() } /// Returns `true` if the byte slice is empty. - #[allow(dead_code)] // Encryption work is ongoing so currently unused pub fn is_empty(&self) -> bool { self.0.is_empty() } @@ -85,9 +83,10 @@ impl fmt::Display for SensitiveBytes { /// /// The Iceberg spec supports 128, 192, and 256-bit keys for AES-GCM. /// See: -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] pub enum AesKeySize { - /// 128-bit AES key (16 bytes) + /// 128-bit AES key (16 bytes). Default per the Iceberg spec. + #[default] Bits128 = 128, /// 192-bit AES key (24 bytes) Bits192 = 192, diff --git a/crates/iceberg/src/encryption/kms/client.rs b/crates/iceberg/src/encryption/kms/client.rs new file mode 100644 index 0000000000..85cd511758 --- /dev/null +++ b/crates/iceberg/src/encryption/kms/client.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Key management client trait for encryption key operations. +//! +//! Mirrors the Java `KeyManagementClient` interface from the Apache Iceberg spec. + +use async_trait::async_trait; + +use crate::Result; +use crate::encryption::SensitiveBytes; + +/// Result of a server-side key generation operation. +/// +/// Returned by [`KeyManagementClient::generate_key`] when the KMS supports +/// atomic key generation and wrapping. +pub struct GeneratedKey { + key: SensitiveBytes, + wrapped_key: Vec, +} + +impl GeneratedKey { + /// Creates a new `GeneratedKey` from plaintext key bytes and wrapped key bytes. + pub fn new(key: SensitiveBytes, wrapped_key: Vec) -> Self { + Self { key, wrapped_key } + } + + /// Returns the plaintext key bytes. Zeroized on drop, redacted in Debug. + pub fn key(&self) -> &SensitiveBytes { + &self.key + } + + /// Returns the wrapped (encrypted) key bytes. + pub fn wrapped_key(&self) -> &[u8] { + &self.wrapped_key + } +} + +/// Pluggable interface for key management systems (AWS KMS, Azure Key Vault, etc.). +#[async_trait] +pub trait KeyManagementClient: Send + Sync + std::fmt::Debug { + /// Wrap (encrypt) a key using a wrapping key managed by the KMS. + async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result>; + + /// Unwrap (decrypt) a previously wrapped key. + async fn unwrap_key(&self, wrapped_key: &[u8], wrapping_key_id: &str) + -> Result; + + /// Whether this KMS supports server-side key generation. + /// + /// If `true`, callers can use [`generate_key`](Self::generate_key) for atomic + /// key generation and wrapping, which is more secure than generating a key + /// locally and then wrapping it. + fn supports_key_generation(&self) -> bool; + + /// Generate a new key and wrap it atomically on the server side. + /// + /// This is only supported when [`supports_key_generation`](Self::supports_key_generation) + /// returns `true`. + async fn generate_key(&self, wrapping_key_id: &str) -> Result; +} + +#[async_trait] +impl + Send + Sync + std::fmt::Debug> KeyManagementClient for T { + async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result> { + self.as_ref().wrap_key(key, wrapping_key_id).await + } + + async fn unwrap_key( + &self, + wrapped_key: &[u8], + wrapping_key_id: &str, + ) -> Result { + self.as_ref().unwrap_key(wrapped_key, wrapping_key_id).await + } + + fn supports_key_generation(&self) -> bool { + self.as_ref().supports_key_generation() + } + + async fn generate_key(&self, wrapping_key_id: &str) -> Result { + self.as_ref().generate_key(wrapping_key_id).await + } +} diff --git a/crates/iceberg/src/encryption/kms/memory.rs b/crates/iceberg/src/encryption/kms/memory.rs new file mode 100644 index 0000000000..d4c9c8cedf --- /dev/null +++ b/crates/iceberg/src/encryption/kms/memory.rs @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! In-memory KMS implementation for testing and development. +//! +//! **WARNING**: This implementation is NOT suitable for production use. +//! Keys are stored in memory only and will be lost when the process exits. + +use std::collections::HashMap; +use std::fmt; +use std::sync::{Arc, RwLock}; + +use async_trait::async_trait; + +use super::KeyManagementClient; +use crate::encryption::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; +use crate::error::lock_error; +use crate::{Error, ErrorKind, Result}; + +/// In-memory KMS for testing. Not suitable for production use. +/// +/// ``` +/// use iceberg::encryption::KeyManagementClient; +/// use iceberg::encryption::kms::MemoryKeyManagementClient; +/// +/// # async fn example() -> iceberg::Result<()> { +/// let kms = MemoryKeyManagementClient::new(); +/// kms.add_master_key("my-master-key")?; +/// +/// let dek = vec![0u8; 16]; +/// let wrapped = kms.wrap_key(&dek, "my-master-key").await?; +/// let unwrapped = kms.unwrap_key(&wrapped, "my-master-key").await?; +/// assert_eq!(dek.as_slice(), unwrapped.as_bytes()); +/// # Ok(()) +/// # } +/// ``` +#[derive(Clone, Default)] +pub struct MemoryKeyManagementClient { + master_keys: Arc>>, + master_key_size: AesKeySize, +} + +impl fmt::Debug for MemoryKeyManagementClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MemoryKeyManagementClient") + .field("master_key_size", &self.master_key_size) + .field("key_count", &self.key_count()) + .finish() + } +} + +impl MemoryKeyManagementClient { + /// Creates a new in-memory KMS with 128-bit AES keys. + pub fn new() -> Self { + Self::default() + } + + /// Creates a new in-memory KMS with the specified master key size. + pub fn with_master_key_size(master_key_size: AesKeySize) -> Self { + Self { + master_keys: Arc::new(RwLock::new(HashMap::new())), + master_key_size, + } + } + + /// Adds a randomly generated master key with the given ID. + pub fn add_master_key(&self, key_id: impl Into) -> Result<()> { + let key = SecureKey::generate(self.master_key_size); + self.insert_key(key_id.into(), SensitiveBytes::new(key.as_bytes())) + } + + /// Adds a master key with explicit key bytes. + /// + /// Use this to seed the KMS with known key material, e.g. for + /// cross-language integration tests where both Java and Rust must + /// share the same master key bytes. + pub fn add_master_key_bytes( + &self, + key_id: impl Into, + key_bytes: SensitiveBytes, + ) -> Result<()> { + let _ = SecureKey::new(key_bytes.as_bytes())?; + self.insert_key(key_id.into(), key_bytes) + } + + fn insert_key(&self, key_id: String, key: SensitiveBytes) -> Result<()> { + let mut keys = self.master_keys.write().map_err(lock_error)?; + + if keys.contains_key(&key_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Master key already exists: {key_id}"), + )); + } + + keys.insert(key_id, key); + Ok(()) + } + + fn get_master_key(&self, key_id: &str) -> Result { + let keys = self.master_keys.read().map_err(lock_error)?; + + keys.get(key_id).cloned().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Master key not found: {key_id}"), + ) + }) + } + + /// Number of master keys stored. + pub fn key_count(&self) -> usize { + self.master_keys.read().map(|keys| keys.len()).unwrap_or(0) + } + + /// Whether a master key with the given ID exists. + pub fn has_key(&self, key_id: &str) -> bool { + self.master_keys + .read() + .map(|keys| keys.contains_key(key_id)) + .unwrap_or(false) + } +} + +#[async_trait] +impl KeyManagementClient for MemoryKeyManagementClient { + async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result> { + let master_key_bytes = self.get_master_key(wrapping_key_id)?; + let master_key = SecureKey::new(master_key_bytes.as_bytes())?; + let cipher = AesGcmCipher::new(master_key); + + cipher.encrypt(key, None) + } + + async fn unwrap_key( + &self, + wrapped_key: &[u8], + wrapping_key_id: &str, + ) -> Result { + let master_key_bytes = self.get_master_key(wrapping_key_id)?; + let master_key = SecureKey::new(master_key_bytes.as_bytes())?; + let cipher = AesGcmCipher::new(master_key); + + Ok(SensitiveBytes::new(cipher.decrypt(wrapped_key, None)?)) + } + + fn supports_key_generation(&self) -> bool { + false + } + + async fn generate_key(&self, _wrapping_key_id: &str) -> Result { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "MemoryKeyManagementClient does not support server-side key generation", + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_wrap_unwrap_roundtrip() { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + let dek = vec![0u8; 16]; + + let wrapped = kms.wrap_key(&dek, "master-1").await.unwrap(); + let unwrapped = kms.unwrap_key(&wrapped, "master-1").await.unwrap(); + assert_eq!(unwrapped.as_bytes(), dek.as_slice()); + } + + #[tokio::test] + async fn test_wrap_unknown_key_fails() { + let kms = MemoryKeyManagementClient::new(); + let dek = vec![0u8; 16]; + + let result = kms.wrap_key(&dek, "nonexistent").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_wrong_master_key_fails_unwrap() { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + kms.add_master_key("master-2").unwrap(); + let dek = vec![0u8; 16]; + + let wrapped = kms.wrap_key(&dek, "master-1").await.unwrap(); + + let result = kms.unwrap_key(&wrapped, "master-2").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_does_not_support_key_generation() { + let kms = MemoryKeyManagementClient::new(); + assert!(!kms.supports_key_generation()); + + let result = kms.generate_key("master-1").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_multiple_master_keys() { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + kms.add_master_key("master-2").unwrap(); + let dek1 = vec![1u8; 16]; + let dek2 = vec![2u8; 16]; + + let wrapped1 = kms.wrap_key(&dek1, "master-1").await.unwrap(); + let wrapped2 = kms.wrap_key(&dek2, "master-2").await.unwrap(); + + let unwrapped1 = kms.unwrap_key(&wrapped1, "master-1").await.unwrap(); + let unwrapped2 = kms.unwrap_key(&wrapped2, "master-2").await.unwrap(); + + assert_eq!(unwrapped1.as_bytes(), dek1.as_slice()); + assert_eq!(unwrapped2.as_bytes(), dek2.as_slice()); + } + + #[tokio::test] + async fn test_add_master_key() { + let kms = MemoryKeyManagementClient::new(); + + kms.add_master_key("my-key").unwrap(); + assert!(kms.has_key("my-key")); + assert_eq!(kms.key_count(), 1); + + let result = kms.add_master_key("my-key"); + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_add_master_key_bytes() { + let kms = MemoryKeyManagementClient::new(); + let key_bytes = SensitiveBytes::new([42u8; 16]); + + kms.add_master_key_bytes("my-key", key_bytes).unwrap(); + assert!(kms.has_key("my-key")); + + let dek = vec![7u8; 16]; + let wrapped = kms.wrap_key(&dek, "my-key").await.unwrap(); + let unwrapped = kms.unwrap_key(&wrapped, "my-key").await.unwrap(); + assert_eq!(unwrapped.as_bytes(), dek.as_slice()); + } + + #[tokio::test] + async fn test_add_master_key_bytes_invalid_length() { + let kms = MemoryKeyManagementClient::new(); + + let result = kms.add_master_key_bytes("my-key", SensitiveBytes::new([0u8; 7])); + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_with_master_key_size() { + let kms = MemoryKeyManagementClient::with_master_key_size(AesKeySize::Bits256); + kms.add_master_key("master-256").unwrap(); + + let dek = vec![0u8; 16]; + let wrapped = kms.wrap_key(&dek, "master-256").await.unwrap(); + let unwrapped = kms.unwrap_key(&wrapped, "master-256").await.unwrap(); + assert_eq!(unwrapped.as_bytes(), dek.as_slice()); + } + + #[tokio::test] + async fn test_clone_shares_state() { + let kms1 = MemoryKeyManagementClient::new(); + let kms2 = kms1.clone(); + + kms1.add_master_key("shared-key").unwrap(); + assert!(kms2.has_key("shared-key")); + } +} diff --git a/crates/iceberg/src/encryption/kms/mod.rs b/crates/iceberg/src/encryption/kms/mod.rs new file mode 100644 index 0000000000..160e692550 --- /dev/null +++ b/crates/iceberg/src/encryption/kms/mod.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Key Management System trait and implementations. +//! +//! This module provides the [`KeyManagementClient`] trait for pluggable KMS +//! integration and implementations for different key management systems. + +mod client; +mod memory; + +pub use client::{GeneratedKey, KeyManagementClient}; +pub use memory::MemoryKeyManagementClient; diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index 9888a153c7..38edb72f53 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -17,15 +17,17 @@ //! Encryption module for Apache Iceberg. //! -//! This module provides core cryptographic primitives for encrypting -//! and decrypting data in Iceberg tables. +//! This module provides core cryptographic primitives and key management +//! for encrypting and decrypting data in Iceberg tables. mod crypto; mod file_decryptor; mod file_encryptor; +pub mod kms; mod stream; -pub use crypto::{AesGcmCipher, AesKeySize, SecureKey}; +pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; pub use file_decryptor::AesGcmFileDecryptor; pub use file_encryptor::AesGcmFileEncryptor; +pub use kms::{GeneratedKey, KeyManagementClient}; pub use stream::{AesGcmFileRead, AesGcmFileWrite}; diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index a0399a8082..02c3eee8fc 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -18,6 +18,7 @@ use std::backtrace::{Backtrace, BacktraceStatus}; use std::fmt; use std::fmt::{Debug, Display, Formatter}; +use std::sync::PoisonError; use chrono::{DateTime, TimeZone as _, Utc}; @@ -422,6 +423,11 @@ define_from_err!( define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed"); +/// Converts a [`PoisonError`] from a poisoned lock into an [`Error`]. +pub(crate) fn lock_error(e: PoisonError) -> Error { + Error::new(ErrorKind::Unexpected, format!("Lock poisoned: {e}")) +} + /// Converts a timestamp in milliseconds to `DateTime`, handling errors. /// /// # Arguments