From 4cd10a03fb1b40cc6f78492646882c3555d82542 Mon Sep 17 00:00:00 2001 From: Liebing Date: Mon, 2 Mar 2026 11:30:14 +0800 Subject: [PATCH 1/4] [server] Add configuration options for multiple remote data directories --- .../apache/fluss/config/ConfigOptions.java | 54 ++++++++- .../apache/fluss/config/FlussConfigUtils.java | 109 ++++++++++++++++++ .../org/apache/fluss/utils/FlussPaths.java | 17 +++ .../fluss/config/FlussConfigUtilsTest.java | 107 +++++++++++++++++ .../server/coordinator/CoordinatorServer.java | 32 +---- .../fluss/server/tablet/TabletServer.java | 39 +------ website/docs/maintenance/configuration.md | 3 + 7 files changed, 294 insertions(+), 67 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 08742ebc2e..c00dd49905 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -97,7 +97,53 @@ public class ConfigOptions { .noDefaultValue() .withDescription( "The directory used for storing the kv snapshot data files and remote log for log tiered storage " - + " in a Fluss supported filesystem."); + + "in a Fluss supported filesystem."); + + public static final ConfigOption> REMOTE_DATA_DIRS = + key("remote.data.dirs") + .stringType() + .asList() + .defaultValues() + .withDescription( + "The directories used for storing the kv snapshot data files and remote log for log tiered storage " + + "in a Fluss supported filesystem. " + + "This should be a comma-separated list of remote URIs. " + + "If not configured, it defaults to the path specified in `" + + REMOTE_DATA_DIR.key() + + "`. 
Otherwise, one of the paths from this configuration will be used."); + + public static final ConfigOption REMOTE_DATA_DIRS_STRATEGY = + key("remote.data.dirs.strategy") + .enumType(RemoteDataDirStrategy.class) + .defaultValue(RemoteDataDirStrategy.ROUND_ROBIN) + .withDescription( + String.format( + "The strategy for selecting the remote data directory from `%s`. " + + "The candidate strategies are: %s, the default strategy is %s.\n" + + "%s: this strategy employs a round-robin approach to select one from the available remote directories.\n" + + "%s: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`.", + REMOTE_DATA_DIRS.key(), + Arrays.toString(RemoteDataDirStrategy.values()), + RemoteDataDirStrategy.ROUND_ROBIN, + RemoteDataDirStrategy.ROUND_ROBIN, + RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN)); + + public static final ConfigOption> REMOTE_DATA_DIRS_WEIGHTS = + key("remote.data.dirs.weights") + .intType() + .asList() + .defaultValues() + .withDescription( + "The weights of the remote data directories. " + + "This is a list of weights corresponding to the `" + + REMOTE_DATA_DIRS.key() + + "` in the same order. When `" + + REMOTE_DATA_DIRS_STRATEGY.key() + + "` is set to `" + + RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN + + "`, this must be configured, and its size must be equal to `" + + REMOTE_DATA_DIRS.key() + + "`; otherwise, it will be ignored."); public static final ConfigOption REMOTE_FS_WRITE_BUFFER_SIZE = key("remote.fs.write-buffer-size") @@ -2066,4 +2112,10 @@ private static class ConfigOptionsHolder { public static ConfigOption getConfigOption(String key) { return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key); } + + /** Remote data dir select strategy for Fluss. 
*/ + public enum RemoteDataDirStrategy { + ROUND_ROBIN, + WEIGHTED_ROUND_ROBIN + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index 08b97256c4..9891a58df4 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -19,12 +19,16 @@ import org.apache.fluss.annotation.Internal; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.exception.IllegalConfigurationException; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.utils.FlussPaths; import java.lang.reflect.Field; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** Utilities of Fluss {@link ConfigOptions}. */ @Internal @@ -77,4 +81,109 @@ static Map> extractConfigOptions(String prefix) { } return options; } + + public static void validateCoordinatorConfigs(Configuration conf) { + validServerConfigs(conf); + + validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1); + validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1); + validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1); + + // Validate remote.data.dirs + List remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS); + for (int i = 0; i < remoteDataDirs.size(); i++) { + String remoteDataDir = remoteDataDirs.get(i); + try { + new FsPath(remoteDataDir); + } catch (Exception e) { + throw new IllegalConfigurationException( + String.format( + "Invalid remote path for %s at index %d.", + ConfigOptions.REMOTE_DATA_DIRS.key(), i), + e); + } + } + + // Validate remote.data.dirs.strategy + ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy = + conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY); + if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) { + List weights = 
conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS); + if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) { + if (remoteDataDirs.size() != weights.size()) { + throw new IllegalConfigurationException( + String.format( + "The size of '%s' (%d) must match the size of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.", + ConfigOptions.REMOTE_DATA_DIRS.key(), + remoteDataDirs.size(), + ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), + weights.size())); + } + // Validate all weights are positive + for (int i = 0; i < weights.size(); i++) { + if (weights.get(i) < 0) { + throw new IllegalConfigurationException( + String.format( + "All weights in '%s' must be no less than 0, but found %d at index %d.", + ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), + weights.get(i), + i)); + } + } + } + } + } + + public static void validateTabletConfigs(Configuration conf) { + validServerConfigs(conf); + + Optional serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID); + if (!serverId.isPresent()) { + throw new IllegalConfigurationException( + String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID)); + } + validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0); + + validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1); + + if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be less than or equal %d bytes.", + ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE)); + } + } + + /** Validate common server configs. 
*/ + private static void validServerConfigs(Configuration conf) { + if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) { + throw new IllegalConfigurationException( + String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR)); + } else { + // Must validate that remote.data.dir is a valid FsPath + try { + FlussPaths.remoteDataDir(conf); + } catch (Exception e) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s.", + ConfigOptions.REMOTE_DATA_DIR.key()), + e); + } + } + } + + private static void validMinValue( + Configuration conf, ConfigOption option, int minValue) { + validMinValue(option, conf.get(option), minValue); + } + + private static void validMinValue(ConfigOption option, int value, int minValue) { + if (value < minValue) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be greater than or equal %d.", + option.key(), minValue)); + } + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java b/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java index 9a0659f180..d6d3978964 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java @@ -409,6 +409,23 @@ public static UUID uuidFromRemoteIndexCacheFileName(String fileName) { fileName.substring(fileName.indexOf('_') + 1, fileName.indexOf('.'))); } + // ---------------------------------------------------------------------------------------- + // Remote Data Paths + // ---------------------------------------------------------------------------------------- + + /** + * Returns the remote root directory path for storing data files. + * + *

The path contract: + * + *

+     * {$remote.data.dir}
+     * 
+ */ + public static FsPath remoteDataDir(Configuration conf) { + return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR)); + } + // ---------------------------------------------------------------------------------------- // Remote Log Paths // ---------------------------------------------------------------------------------------- diff --git a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java index e24bc121aa..7ff0d69e06 100644 --- a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java @@ -17,14 +17,20 @@ package org.apache.fluss.config; +import org.apache.fluss.exception.IllegalConfigurationException; + import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.Collections; import java.util.Map; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_OPTIONS; import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS; import static org.apache.fluss.config.FlussConfigUtils.extractConfigOptions; +import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link FlussConfigUtils}. 
*/ class FlussConfigUtilsTest { @@ -49,4 +55,105 @@ void testExtractOptions() { }); assertThat(clientOptions.size()).isEqualTo(CLIENT_OPTIONS.size()); } + + @Test + void testValidateCoordinatorConfigs() { + // Test valid configuration + Configuration validConf = new Configuration(); + validConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + validateCoordinatorConfigs(validConf); + + // Test invalid DEFAULT_REPLICATION_FACTOR + Configuration invalidReplicationConf = new Configuration(); + invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 0); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidReplicationConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key()) + .hasMessageContaining("must be greater than or equal 1"); + + // Test invalid KV_MAX_RETAINED_SNAPSHOTS + Configuration invalidSnapshotConf = new Configuration(); + invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidSnapshotConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()) + .hasMessageContaining("must be greater than or equal 1"); + + // Test invalid SERVER_IO_POOL_SIZE + Configuration invalidIoPoolConf = new Configuration(); + invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key()) + .hasMessageContaining("must be greater than or equal 1"); + + // Test REMOTE_DATA_DIR not set + Configuration noRemoteDirConf = new 
Configuration(); + assertThatThrownBy(() -> validateCoordinatorConfigs(noRemoteDirConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key()) + .hasMessageContaining("must be set"); + + // Test invalid REMOTE_DATA_DIR + Configuration invalidRemoteDirConf = new Configuration(); + invalidRemoteDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "123://invalid.com"); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key()) + .hasMessageContaining("Invalid configuration for remote.data.dir"); + + // Test REMOTE_DATA_DIRS contains invalid path + Configuration invalidRemoteDirsConf = new Configuration(); + invalidRemoteDirsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidRemoteDirsConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "123://invalid.com")); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirsConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key()) + .hasMessageContaining("Invalid remote path for remote.data.dirs"); + + // Test WEIGHTED_ROUND_ROBIN with mismatched sizes + Configuration mismatchedWeightsConf = new Configuration(); + mismatchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + mismatchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + mismatchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); + mismatchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Collections.singletonList(1)); + assertThatThrownBy(() -> validateCoordinatorConfigs(mismatchedWeightsConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key()) + 
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key()); + + // Test WEIGHTED_ROUND_ROBIN with matched sizes + Configuration matchedWeightsConf = new Configuration(); + matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + matchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + matchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); + matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(1, 2)); + validateCoordinatorConfigs(matchedWeightsConf); + + // Test negative weight + Configuration negativeWeightConf = new Configuration(); + negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + negativeWeightConf.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + negativeWeightConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); + negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(-1, 2)); + assertThatThrownBy(() -> validateCoordinatorConfigs(negativeWeightConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key()) + .hasMessageContaining( + "All weights in 'remote.data.dirs.weights' must be no less than 0"); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index ca3708bfea..77e72b7442 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -22,7 +22,6 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import 
org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.rpc.RpcClient; @@ -66,6 +65,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs; + /** * Coordinator server implementation. The coordinator server is responsible to: * @@ -154,7 +155,7 @@ public CoordinatorServer(Configuration conf) { public CoordinatorServer(Configuration conf, Clock clock) { super(conf); - validateConfigs(conf); + validateCoordinatorConfigs(conf); this.terminationFuture = new CompletableFuture<>(); this.clock = clock; } @@ -549,31 +550,4 @@ public DynamicConfigManager getDynamicConfigManager() { public RebalanceManager getRebalanceManager() { return coordinatorEventProcessor.getRebalanceManager(); } - - private static void validateConfigs(Configuration conf) { - if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 1.", - ConfigOptions.DEFAULT_REPLICATION_FACTOR.key())); - } - if (conf.get(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS) < 1) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 1.", - ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key())); - } - - if (conf.get(ConfigOptions.SERVER_IO_POOL_SIZE) < 1) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 1.", - ConfigOptions.SERVER_IO_POOL_SIZE.key())); - } - - if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) { - throw new IllegalConfigurationException( - String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR)); - } - } } diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 8eed63c844..d108a63372 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -22,7 +22,6 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.exception.InvalidServerRackInfoException; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metrics.registry.MetricRegistry; @@ -71,7 +70,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -79,6 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.fluss.config.ConfigOptions.BACKGROUND_THREADS; +import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucket; /** @@ -177,7 +176,7 @@ public TabletServer(Configuration conf) { public TabletServer(Configuration conf, Clock clock) { super(conf); - validateConfigs(conf); + validateTabletConfigs(conf); this.terminationFuture = new CompletableFuture<>(); this.serverId = conf.getInt(ConfigOptions.TABLET_SERVER_ID); this.rack = conf.getString(ConfigOptions.TABLET_SERVER_RACK); @@ -558,40 +557,6 @@ public ReplicaManager getReplicaManager() { return authorizer; } - private static void validateConfigs(Configuration conf) { - Optional serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID); - if (!serverId.isPresent()) { - throw new IllegalConfigurationException( - String.format("Configuration %s must 
be set.", ConfigOptions.TABLET_SERVER_ID)); - } - - if (serverId.get() < 0) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 0.", - ConfigOptions.TABLET_SERVER_ID.key())); - } - - if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 1.", - ConfigOptions.BACKGROUND_THREADS.key())); - } - - if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) { - throw new IllegalConfigurationException( - String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR)); - } - - if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be less than or equal %d bytes.", - ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE)); - } - } - @VisibleForTesting public RpcServer getRpcServer() { return rpcServer; diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 80d40fddbc..37100ef26a 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -36,6 +36,9 @@ during the Fluss cluster working. | default.bucket.number | Integer | 1 | The default number of buckets for a table in Fluss cluster. It's a cluster-level parameter and all the tables without specifying bucket number in the cluster will use the value as the bucket number. | | default.replication.factor | Integer | 1 | The default replication factor for the log of a table in Fluss cluster. It's a cluster-level parameter, and all the tables without specifying replication factor in the cluster will use the value as replication factor. 
| | remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. | +| remote.data.dirs | List<String> | (None) | The directories used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. This should be a comma-separated list of remote URIs. If not configured, it defaults to the path specified in `remote.data.dir`. Otherwise, one of the paths from this configuration will be used. | +| remote.data.dirs.strategy | Enum | ROUND_ROBIN | The strategy for selecting the remote data directory from `remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN, WEIGHTED_ROUND_ROBIN], the default strategy is ROUND_ROBIN.
ROUND_ROBIN: this strategy employs a round-robin approach to select one from the available remote directories.
WEIGHTED_ROUND_ROBIN: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`. | +| remote.data.dirs.weights | List<Integer>| (None) | The weights of the remote data directories. This is a list of weights corresponding to the `remote.data.dirs` in the same order. When `remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be configured, and its size must be equal to `remote.data.dirs`; otherwise, it will be ignored. | | remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. | | plugin.classloader.parent-first-patterns.additional | List<String> | (None) | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to `classloader.parent-first-patterns.default`. | | plugin.classloader.parent-first-patterns.default | String | java.,
org.apache.fluss.,
javax.annotation.,
org.slf4j,
org.apache.log4j,
org.apache.logging,
org.apache.commons.logging,
ch.qos.logback | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. | From 8a09d1c1f354acd9dc9a3bc77f903ac276f413ad Mon Sep 17 00:00:00 2001 From: Liebing Date: Thu, 5 Mar 2026 15:02:25 +0800 Subject: [PATCH 2/4] address jark's comments --- .../apache/fluss/config/ConfigOptions.java | 25 ++++-- .../apache/fluss/config/FlussConfigUtils.java | 84 +++++++++---------- .../org/apache/fluss/utils/FlussPaths.java | 17 ---- .../fluss/config/FlussConfigUtilsTest.java | 2 +- website/docs/maintenance/configuration.md | 4 +- 5 files changed, 62 insertions(+), 70 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index c00dd49905..bda7ac3a36 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -96,8 +96,14 @@ public class ConfigOptions { .stringType() .noDefaultValue() .withDescription( - "The directory used for storing the kv snapshot data files and remote log for log tiered storage " - + "in a Fluss supported filesystem."); + "The directory in a Fluss supported filesystem for remote data storage. " + + "This configuration is required. " + + "If `remote.data.dirs` is not configured, all remote data files " + + "(kv snapshots, remote log, producer offsets, kv snapshot leases, etc.) " + + "will be stored under this directory. 
" + + "If `remote.data.dirs` is configured, the kv snapshot data files and remote log files " + + "for tables/partitions will be stored in one of the directories specified by `remote.data.dirs`, " + + "while producer offsets and kv snapshot leases are always stored under this directory."); public static final ConfigOption> REMOTE_DATA_DIRS = key("remote.data.dirs") @@ -105,12 +111,17 @@ public class ConfigOptions { .asList() .defaultValues() .withDescription( - "The directories used for storing the kv snapshot data files and remote log for log tiered storage " - + "in a Fluss supported filesystem. " - + "This should be a comma-separated list of remote URIs. " - + "If not configured, it defaults to the path specified in `" + "A comma-separated list of directories in Fluss supported filesystems " + + "for storing the kv snapshot data files and remote log files of tables/partitions. " + + "This configuration is optional. " + + "If configured, when a new table or a new partition is created, " + + "one of the directories from this list will be selected according to the strategy " + + "specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). " + + "Once assigned, the table/partition will keep using the selected directory for " + + "storing the kv snapshot data files and remote log files. " + + "If not configured, the system uses `" + REMOTE_DATA_DIR.key() - + "`. 
Otherwise, one of the paths from this configuration will be used."); + + "` as the sole remote data directory for all data."); public static final ConfigOption REMOTE_DATA_DIRS_STRATEGY = key("remote.data.dirs.strategy") diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index 9891a58df4..7d66597d92 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -21,7 +21,6 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.fs.FsPath; -import org.apache.fluss.utils.FlussPaths; import java.lang.reflect.Field; import java.util.Arrays; @@ -83,11 +82,49 @@ static Map> extractConfigOptions(String prefix) { } public static void validateCoordinatorConfigs(Configuration conf) { - validServerConfigs(conf); + validateServerConfigs(conf); + } + + public static void validateTabletConfigs(Configuration conf) { + validateServerConfigs(conf); + + Optional serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID); + if (!serverId.isPresent()) { + throw new IllegalConfigurationException( + String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID)); + } + validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0); + } + + /** Validate common server configs. 
*/ + private static void validateServerConfigs(Configuration conf) { + if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) { + throw new IllegalConfigurationException( + String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR)); + } else { + // Must validate that remote.data.dir is a valid FsPath + try { + new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR)); + } catch (Exception e) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s.", + ConfigOptions.REMOTE_DATA_DIR.key()), + e); + } + } validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1); validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1); validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1); + validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1); + + if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be less than or equal %d bytes.", + ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE)); + } // Validate remote.data.dirs List remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS); @@ -109,7 +146,7 @@ public static void validateCoordinatorConfigs(Configuration conf) { conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY); if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) { List weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS); - if (!remoteDataDirs.isEmpty() && !weights.isEmpty()) { + if (!remoteDataDirs.isEmpty()) { if (remoteDataDirs.size() != weights.size()) { throw new IllegalConfigurationException( String.format( @@ -119,7 +156,7 @@ public static void validateCoordinatorConfigs(Configuration conf) { ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), weights.size())); } - // Validate all weights are positive + // Validate all weights are no less than 0 for (int i = 0; i < weights.size(); i++) { if (weights.get(i) < 0) { throw new 
IllegalConfigurationException( @@ -134,45 +171,6 @@ public static void validateCoordinatorConfigs(Configuration conf) { } } - public static void validateTabletConfigs(Configuration conf) { - validServerConfigs(conf); - - Optional serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID); - if (!serverId.isPresent()) { - throw new IllegalConfigurationException( - String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID)); - } - validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0); - - validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1); - - if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be less than or equal %d bytes.", - ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE)); - } - } - - /** Validate common server configs. */ - private static void validServerConfigs(Configuration conf) { - if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) { - throw new IllegalConfigurationException( - String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR)); - } else { - // Must validate that remote.data.dir is a valid FsPath - try { - FlussPaths.remoteDataDir(conf); - } catch (Exception e) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s.", - ConfigOptions.REMOTE_DATA_DIR.key()), - e); - } - } - } - private static void validMinValue( Configuration conf, ConfigOption option, int minValue) { validMinValue(option, conf.get(option), minValue); diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java b/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java index d6d3978964..9a0659f180 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java @@ -409,23 +409,6 @@ public static UUID 
uuidFromRemoteIndexCacheFileName(String fileName) { fileName.substring(fileName.indexOf('_') + 1, fileName.indexOf('.'))); } - // ---------------------------------------------------------------------------------------- - // Remote Data Paths - // ---------------------------------------------------------------------------------------- - - /** - * Returns the remote root directory path for storing data files. - * - *

The path contract: - * - *

-     * {$remote.data.dir}
-     * 
- */ - public static FsPath remoteDataDir(Configuration conf) { - return new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR)); - } - // ---------------------------------------------------------------------------------------- // Remote Log Paths // ---------------------------------------------------------------------------------------- diff --git a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java index 7ff0d69e06..727dac7270 100644 --- a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java @@ -138,7 +138,7 @@ void testValidateCoordinatorConfigs() { ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); matchedWeightsConf.set( ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); - matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(1, 2)); + matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(0, 2)); validateCoordinatorConfigs(matchedWeightsConf); // Test negative weight diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 37100ef26a..b91b680867 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -35,8 +35,8 @@ during the Fluss cluster working. | `security.${protocol}.*` | String | (none) | Protocol-specific configuration properties. For example, security.sasl.jaas.config for SASL authentication settings. | | default.bucket.number | Integer | 1 | The default number of buckets for a table in Fluss cluster. It's a cluster-level parameter and all the tables without specifying bucket number in the cluster will use the value as the bucket number. | | default.replication.factor | Integer | 1 | The default replication factor for the log of a table in Fluss cluster. 
It's a cluster-level parameter, and all the tables without specifying replication factor in the cluster will use the value as replication factor. | -| remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. | -| remote.data.dirs | List<String> | (None) | The directories used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. This should be a comma-separated list of remote URIs. If not configured, it defaults to the path specified in `remote.data.dir`. Otherwise, one of the paths from this configuration will be used. | +| remote.data.dir | String | (None) | The directory in a Fluss supported filesystem for remote data storage. This configuration is required. If `remote.data.dirs` is not configured, all remote data files (kv snapshots, remote log, producer offsets, kv snapshot leases, etc.) will be stored under this directory. If `remote.data.dirs` is configured, the kv snapshot data files and remote log files for tables/partitions will be stored in one of the directories specified by `remote.data.dirs`, while producer offsets and kv snapshot leases are always stored under this directory. | +| remote.data.dirs | List<String> | (None) | A comma-separated list of directories in Fluss supported filesystems for storing the kv snapshot data files and remote log files of tables/partitions. This configuration is optional. If configured, when a new table or a new partition is created, one of the directories from this list will be selected according to the strategy specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). Once assigned, the table/partition will keep using the selected directory for storing the kv snapshot data files and remote log files. If not configured, the system uses `remote.data.dir` as the sole remote data directory for all data. 
| | remote.data.dirs.strategy | Enum | ROUND_ROBIN | The strategy for selecting the remote data directory from `remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN, WEIGHTED_ROUND_ROBIN], the default strategy is ROUND_ROBIN.
ROUND_ROBIN: this strategy employs a round-robin approach to select one from the available remote directories.
WEIGHTED_ROUND_ROBIN: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`. | | remote.data.dirs.weights | List<Integer>| (None) | The weights of the remote data directories. This is a list of weights corresponding to the `remote.data.dirs` in the same order. When `remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be configured, and its size must be equal to `remote.data.dirs`; otherwise, it will be ignored. | | remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. | From ab0b45bdac4a77cb96780962fec76b4ca84f5bf0 Mon Sep 17 00:00:00 2001 From: Liebing Date: Mon, 9 Mar 2026 18:45:15 +0800 Subject: [PATCH 3/4] address jark's comments --- .../apache/fluss/config/ConfigOptions.java | 16 +--- .../apache/fluss/config/FlussConfigUtils.java | 50 ++++++---- .../fluss/config/FlussConfigUtilsTest.java | 93 +++++++++++-------- website/docs/maintenance/configuration.md | 4 +- 4 files changed, 93 insertions(+), 70 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index bda7ac3a36..8630aca4ee 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -96,14 +96,11 @@ public class ConfigOptions { .stringType() .noDefaultValue() .withDescription( - "The directory in a Fluss supported filesystem for remote data storage. " - + "This configuration is required. " - + "If `remote.data.dirs` is not configured, all remote data files " - + "(kv snapshots, remote log, producer offsets, kv snapshot leases, etc.) " - + "will be stored under this directory. 
" - + "If `remote.data.dirs` is configured, the kv snapshot data files and remote log files " - + "for tables/partitions will be stored in one of the directories specified by `remote.data.dirs`, " - + "while producer offsets and kv snapshot leases are always stored under this directory."); + "The directory used for storing the kv snapshot data files and remote log for log tiered storage" + + " in a Fluss supported filesystem. " + + "For clusters that already have this configured, do not remove it when upgrading, " + + "even after adding `remote.data.dirs`. " + + "For new clusters, it is recommended to use `remote.data.dirs` instead."); public static final ConfigOption> REMOTE_DATA_DIRS = key("remote.data.dirs") @@ -113,12 +110,9 @@ public class ConfigOptions { .withDescription( "A comma-separated list of directories in Fluss supported filesystems " + "for storing the kv snapshot data files and remote log files of tables/partitions. " - + "This configuration is optional. " + "If configured, when a new table or a new partition is created, " + "one of the directories from this list will be selected according to the strategy " + "specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). " - + "Once assigned, the table/partition will keep using the selected directory for " - + "storing the kv snapshot data files and remote log files. 
" + "If not configured, the system uses `" + REMOTE_DATA_DIR.key() + "` as the sole remote data directory for all data."); diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index 7d66597d92..b3a1d84f92 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -91,17 +91,27 @@ public static void validateTabletConfigs(Configuration conf) { Optional serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID); if (!serverId.isPresent()) { throw new IllegalConfigurationException( - String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID)); + String.format( + "Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID.key())); } validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0); } /** Validate common server configs. */ - private static void validateServerConfigs(Configuration conf) { - if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) { + protected static void validateServerConfigs(Configuration conf) { + // Validate remote.data.dir and remote.data.dirs + String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR); + List remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS); + if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null + && conf.get(ConfigOptions.REMOTE_DATA_DIRS).isEmpty()) { throw new IllegalConfigurationException( - String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR)); - } else { + String.format( + "Either %s or %s must be configured.", + ConfigOptions.REMOTE_DATA_DIR.key(), + ConfigOptions.REMOTE_DATA_DIRS.key())); + } + + if (remoteDataDir != null) { // Must validate that remote.data.dir is a valid FsPath try { new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR)); @@ -114,24 +124,11 @@ private static void validateServerConfigs(Configuration conf) { } } - 
validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1); - validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1); - validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1); - validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1); - - if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be less than or equal %d bytes.", - ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE)); - } - // Validate remote.data.dirs - List remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS); for (int i = 0; i < remoteDataDirs.size(); i++) { - String remoteDataDir = remoteDataDirs.get(i); + String dir = remoteDataDirs.get(i); try { - new FsPath(remoteDataDir); + new FsPath(dir); } catch (Exception e) { throw new IllegalConfigurationException( String.format( @@ -156,6 +153,7 @@ private static void validateServerConfigs(Configuration conf) { ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), weights.size())); } + // Validate all weights are no less than 0 for (int i = 0; i < weights.size(); i++) { if (weights.get(i) < 0) { @@ -169,6 +167,18 @@ private static void validateServerConfigs(Configuration conf) { } } } + + validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1); + validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1); + validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1); + validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1); + + if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be less than or equal %d bytes.", + ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE)); + } } private static void validMinValue( diff --git a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java 
b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java index 727dac7270..e2690c54ba 100644 --- a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java @@ -29,6 +29,7 @@ import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS; import static org.apache.fluss.config.FlussConfigUtils.extractConfigOptions; import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs; +import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -58,44 +59,18 @@ void testExtractOptions() { @Test void testValidateCoordinatorConfigs() { - // Test valid configuration - Configuration validConf = new Configuration(); - validConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); - validateCoordinatorConfigs(validConf); - - // Test invalid DEFAULT_REPLICATION_FACTOR - Configuration invalidReplicationConf = new Configuration(); - invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); - invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 0); - assertThatThrownBy(() -> validateCoordinatorConfigs(invalidReplicationConf)) - .isInstanceOf(IllegalConfigurationException.class) - .hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key()) - .hasMessageContaining("must be greater than or equal 1"); - - // Test invalid KV_MAX_RETAINED_SNAPSHOTS - Configuration invalidSnapshotConf = new Configuration(); - invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); - invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0); - assertThatThrownBy(() -> validateCoordinatorConfigs(invalidSnapshotConf)) - .isInstanceOf(IllegalConfigurationException.class) - .hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()) - 
.hasMessageContaining("must be greater than or equal 1"); - - // Test invalid SERVER_IO_POOL_SIZE - Configuration invalidIoPoolConf = new Configuration(); - invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); - invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0); - assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf)) - .isInstanceOf(IllegalConfigurationException.class) - .hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key()) - .hasMessageContaining("must be greater than or equal 1"); - - // Test REMOTE_DATA_DIR not set - Configuration noRemoteDirConf = new Configuration(); - assertThatThrownBy(() -> validateCoordinatorConfigs(noRemoteDirConf)) + // Test empty configuration + Configuration emptyConf = new Configuration(); + assertThatThrownBy(() -> validateCoordinatorConfigs(emptyConf)) .isInstanceOf(IllegalConfigurationException.class) .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key()) - .hasMessageContaining("must be set"); + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key()) + .hasMessageContaining("must be configured"); + + // Test configuration with only REMOTE_DATA_DIR set + Configuration remoteDataDirConf = new Configuration(); + remoteDataDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + validateCoordinatorConfigs(remoteDataDirConf); // Test invalid REMOTE_DATA_DIR Configuration invalidRemoteDirConf = new Configuration(); @@ -105,6 +80,12 @@ void testValidateCoordinatorConfigs() { .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key()) .hasMessageContaining("Invalid configuration for remote.data.dir"); + // Test configuration with only REMOTE_DATA_DIRS set + Configuration remoteDataDirsConf = new Configuration(); + remoteDataDirsConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); + validateCoordinatorConfigs(remoteDataDirConf); + // Test REMOTE_DATA_DIRS contains invalid path Configuration invalidRemoteDirsConf = new 
Configuration(); invalidRemoteDirsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); @@ -113,7 +94,7 @@ void testValidateCoordinatorConfigs() { assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirsConf)) .isInstanceOf(IllegalConfigurationException.class) .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key()) - .hasMessageContaining("Invalid remote path for remote.data.dirs"); + .hasMessageContaining("Invalid remote path for"); // Test WEIGHTED_ROUND_ROBIN with mismatched sizes Configuration mismatchedWeightsConf = new Configuration(); @@ -155,5 +136,43 @@ void testValidateCoordinatorConfigs() { .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key()) .hasMessageContaining( "All weights in 'remote.data.dirs.weights' must be no less than 0"); + + // Test invalid DEFAULT_REPLICATION_FACTOR + Configuration invalidReplicationConf = new Configuration(); + invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 0); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidReplicationConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key()) + .hasMessageContaining("must be greater than or equal 1"); + + // Test invalid KV_MAX_RETAINED_SNAPSHOTS + Configuration invalidSnapshotConf = new Configuration(); + invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidSnapshotConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()) + .hasMessageContaining("must be greater than or equal 1"); + + // Test invalid SERVER_IO_POOL_SIZE + Configuration invalidIoPoolConf = new Configuration(); + invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"s3://bucket/path"); + invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key()) + .hasMessageContaining("must be greater than or equal 1"); + } + + @Test + void testValidateTabletConfigs() { + Configuration conf = new Configuration(); + conf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + conf.set(ConfigOptions.TABLET_SERVER_ID, -1); + assertThatThrownBy(() -> validateTabletConfigs(conf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.TABLET_SERVER_ID.key()) + .hasMessageContaining("it must be greater than or equal 0"); } } diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index b91b680867..5e0dad7ab0 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -35,8 +35,8 @@ during the Fluss cluster working. | `security.${protocol}.*` | String | (none) | Protocol-specific configuration properties. For example, security.sasl.jaas.config for SASL authentication settings. | | default.bucket.number | Integer | 1 | The default number of buckets for a table in Fluss cluster. It's a cluster-level parameter and all the tables without specifying bucket number in the cluster will use the value as the bucket number. | | default.replication.factor | Integer | 1 | The default replication factor for the log of a table in Fluss cluster. It's a cluster-level parameter, and all the tables without specifying replication factor in the cluster will use the value as replication factor. | -| remote.data.dir | String | (None) | The directory in a Fluss supported filesystem for remote data storage. This configuration is required. 
If `remote.data.dirs` is not configured, all remote data files (kv snapshots, remote log, producer offsets, kv snapshot leases, etc.) will be stored under this directory. If `remote.data.dirs` is configured, the kv snapshot data files and remote log files for tables/partitions will be stored in one of the directories specified by `remote.data.dirs`, while producer offsets and kv snapshot leases are always stored under this directory. | -| remote.data.dirs | List<String> | (None) | A comma-separated list of directories in Fluss supported filesystems for storing the kv snapshot data files and remote log files of tables/partitions. This configuration is optional. If configured, when a new table or a new partition is created, one of the directories from this list will be selected according to the strategy specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). Once assigned, the table/partition will keep using the selected directory for storing the kv snapshot data files and remote log files. If not configured, the system uses `remote.data.dir` as the sole remote data directory for all data. | +| remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. For clusters that already have this configured, do not remove it when upgrading, even after adding `remote.data.dirs`. For new clusters, it is recommended to use `remote.data.dirs` instead. | +| remote.data.dirs | List<String> | (None) | A comma-separated list of directories in Fluss supported filesystems for storing the kv snapshot data files and remote log files of tables/partitions. If configured, when a new table or a new partition is created, one of the directories from this list will be selected according to the strategy specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). If not configured, the system uses `remote.data.dir` as the sole remote data directory for all data. 
| | remote.data.dirs.strategy | Enum | ROUND_ROBIN | The strategy for selecting the remote data directory from `remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN, WEIGHTED_ROUND_ROBIN], the default strategy is ROUND_ROBIN.
ROUND_ROBIN: this strategy employs a round-robin approach to select one from the available remote directories.
WEIGHTED_ROUND_ROBIN: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`. | | remote.data.dirs.weights | List<Integer>| (None) | The weights of the remote data directories. This is a list of weights corresponding to the `remote.data.dirs` in the same order. When `remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be configured, and its size must be equal to `remote.data.dirs`; otherwise, it will be ignored. | | remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. | From 58befa94bcab64b4f311d89db1e0e5a4e632c0cc Mon Sep 17 00:00:00 2001 From: Liebing Date: Mon, 9 Mar 2026 21:21:33 +0800 Subject: [PATCH 4/4] fix --- .../main/java/org/apache/fluss/config/ConfigOptions.java | 6 +++--- website/docs/maintenance/configuration.md | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 8630aca4ee..362d0a28ca 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -98,9 +98,9 @@ public class ConfigOptions { .withDescription( "The directory used for storing the kv snapshot data files and remote log for log tiered storage" + " in a Fluss supported filesystem. " - + "For clusters that already have this configured, do not remove it when upgrading, " - + "even after adding `remote.data.dirs`. " - + "For new clusters, it is recommended to use `remote.data.dirs` instead."); + + "When upgrading to `remote.data.dirs`, please ensure this value is placed as the first entry in the new configuration. " + + "For new clusters, it is recommended to use `remote.data.dirs` instead. 
" + "If `remote.data.dirs` is configured, this value will be ignored."); public static final ConfigOption> REMOTE_DATA_DIRS = key("remote.data.dirs") diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 5e0dad7ab0..0de77f0a5e 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -35,7 +35,7 @@ during the Fluss cluster working. | `security.${protocol}.*` | String | (none) | Protocol-specific configuration properties. For example, security.sasl.jaas.config for SASL authentication settings. | | default.bucket.number | Integer | 1 | The default number of buckets for a table in Fluss cluster. It's a cluster-level parameter and all the tables without specifying bucket number in the cluster will use the value as the bucket number. | | default.replication.factor | Integer | 1 | The default replication factor for the log of a table in Fluss cluster. It's a cluster-level parameter, and all the tables without specifying replication factor in the cluster will use the value as replication factor. | -| remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. For clusters that already have this configured, do not remove it when upgrading, even after adding `remote.data.dirs`. For new clusters, it is recommended to use `remote.data.dirs` instead. | +| remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. When upgrading to `remote.data.dirs`, please ensure this value is placed as the first entry in the new configuration. For new clusters, it is recommended to use `remote.data.dirs` instead. If `remote.data.dirs` is configured, this value will be ignored. 
| | remote.data.dirs | List<String> | (None) | A comma-separated list of directories in Fluss supported filesystems for storing the kv snapshot data files and remote log files of tables/partitions. If configured, when a new table or a new partition is created, one of the directories from this list will be selected according to the strategy specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). If not configured, the system uses `remote.data.dir` as the sole remote data directory for all data. | | remote.data.dirs.strategy | Enum | ROUND_ROBIN | The strategy for selecting the remote data directory from `remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN, WEIGHTED_ROUND_ROBIN], the default strategy is ROUND_ROBIN.
ROUND_ROBIN: this strategy employs a round-robin approach to select one from the available remote directories.
WEIGHTED_ROUND_ROBIN: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`. | | remote.data.dirs.weights | List<Integer>| (None) | The weights of the remote data directories. This is a list of weights corresponding to the `remote.data.dirs` in the same order. When `remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be configured, and its size must be equal to `remote.data.dirs`; otherwise, it will be ignored. |