diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 08742ebc2e..362d0a28ca 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -96,8 +96,59 @@ public class ConfigOptions { .stringType() .noDefaultValue() .withDescription( - "The directory used for storing the kv snapshot data files and remote log for log tiered storage " - + " in a Fluss supported filesystem."); + "The directory used for storing the kv snapshot data files and remote log for log tiered storage" + + " in a Fluss supported filesystem. " + + "When upgrading to `remote.data.dirs`, please ensure this value is placed as the first entry in the new configuration." + + "For new clusters, it is recommended to use `remote.data.dirs` instead. " + + "If `remote.data.dirs` is configured, this value will be ignored."); + + public static final ConfigOption> REMOTE_DATA_DIRS = + key("remote.data.dirs") + .stringType() + .asList() + .defaultValues() + .withDescription( + "A comma-separated list of directories in Fluss supported filesystems " + + "for storing the kv snapshot data files and remote log files of tables/partitions. " + + "If configured, when a new table or a new partition is created, " + + "one of the directories from this list will be selected according to the strategy " + + "specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). " + + "If not configured, the system uses `" + + REMOTE_DATA_DIR.key() + + "` as the sole remote data directory for all data."); + + public static final ConfigOption REMOTE_DATA_DIRS_STRATEGY = + key("remote.data.dirs.strategy") + .enumType(RemoteDataDirStrategy.class) + .defaultValue(RemoteDataDirStrategy.ROUND_ROBIN) + .withDescription( + String.format( + "The strategy for selecting the remote data directory from `%s`. " + + "The candidate strategies are: %s, the default strategy is %s.\n" + + "%s: this strategy employs a round-robin approach to select one from the available remote directories.\n" + + "%s: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`.", + REMOTE_DATA_DIRS.key(), + Arrays.toString(RemoteDataDirStrategy.values()), + RemoteDataDirStrategy.ROUND_ROBIN, + RemoteDataDirStrategy.ROUND_ROBIN, + RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN)); + + public static final ConfigOption> REMOTE_DATA_DIRS_WEIGHTS = + key("remote.data.dirs.weights") + .intType() + .asList() + .defaultValues() + .withDescription( + "The weights of the remote data directories. " + + "This is a list of weights corresponding to the `" + + REMOTE_DATA_DIRS.key() + + "` in the same order. When `" + + REMOTE_DATA_DIRS_STRATEGY.key() + + "` is set to `" + + RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN + + "`, this must be configured, and its size must be equal to `" + + REMOTE_DATA_DIRS.key() + + "`; otherwise, it will be ignored."); public static final ConfigOption REMOTE_FS_WRITE_BUFFER_SIZE = key("remote.fs.write-buffer-size") @@ -2066,4 +2117,10 @@ private static class ConfigOptionsHolder { public static ConfigOption getConfigOption(String key) { return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key); } + + /** Remote data dir select strategy for Fluss. */ + public enum RemoteDataDirStrategy { + ROUND_ROBIN, + WEIGHTED_ROUND_ROBIN + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index 08b97256c4..b3a1d84f92 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -19,12 +19,15 @@ import org.apache.fluss.annotation.Internal; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.exception.IllegalConfigurationException; +import org.apache.fluss.fs.FsPath; import java.lang.reflect.Field; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** Utilities of Fluss {@link ConfigOptions}. */ @Internal @@ -77,4 +80,118 @@ static Map> extractConfigOptions(String prefix) { } return options; } + + public static void validateCoordinatorConfigs(Configuration conf) { + validateServerConfigs(conf); + } + + public static void validateTabletConfigs(Configuration conf) { + validateServerConfigs(conf); + + Optional serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID); + if (!serverId.isPresent()) { + throw new IllegalConfigurationException( + String.format( + "Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID.key())); + } + validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0); + } + + /** Validate common server configs. */ + protected static void validateServerConfigs(Configuration conf) { + // Validate remote.data.dir and remote.data.dirs + String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR); + List remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS); + if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null + && conf.get(ConfigOptions.REMOTE_DATA_DIRS).isEmpty()) { + throw new IllegalConfigurationException( + String.format( + "Either %s or %s must be configured.", + ConfigOptions.REMOTE_DATA_DIR.key(), + ConfigOptions.REMOTE_DATA_DIRS.key())); + } + + if (remoteDataDir != null) { + // Must validate that remote.data.dir is a valid FsPath + try { + new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR)); + } catch (Exception e) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s.", + ConfigOptions.REMOTE_DATA_DIR.key()), + e); + } + } + + // Validate remote.data.dirs + for (int i = 0; i < remoteDataDirs.size(); i++) { + String dir = remoteDataDirs.get(i); + try { + new FsPath(dir); + } catch (Exception e) { + throw new IllegalConfigurationException( + String.format( + "Invalid remote path for %s at index %d.", + ConfigOptions.REMOTE_DATA_DIRS.key(), i), + e); + } + } + + // Validate remote.data.dirs.strategy + ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy = + conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY); + if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) { + List weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS); + if (!remoteDataDirs.isEmpty()) { + if (remoteDataDirs.size() != weights.size()) { + throw new IllegalConfigurationException( + String.format( + "The size of '%s' (%d) must match the size of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.", + ConfigOptions.REMOTE_DATA_DIRS.key(), + remoteDataDirs.size(), + ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), + weights.size())); + } + + // Validate all weights are no less than 0 + for (int i = 0; i < weights.size(); i++) { + if (weights.get(i) < 0) { + throw new IllegalConfigurationException( + String.format( + "All weights in '%s' must be no less than 0, but found %d at index %d.", + ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), + weights.get(i), + i)); + } + } + } + } + + validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1); + validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1); + validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1); + validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1); + + if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be less than or equal %d bytes.", + ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE)); + } + } + + private static void validMinValue( + Configuration conf, ConfigOption option, int minValue) { + validMinValue(option, conf.get(option), minValue); + } + + private static void validMinValue(ConfigOption option, int value, int minValue) { + if (value < minValue) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s, it must be greater than or equal %d.", + option.key(), minValue)); + } + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java index e24bc121aa..e2690c54ba 100644 --- a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java @@ -17,14 +17,21 @@ package org.apache.fluss.config; +import org.apache.fluss.exception.IllegalConfigurationException; + import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.Collections; import java.util.Map; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_OPTIONS; import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS; import static org.apache.fluss.config.FlussConfigUtils.extractConfigOptions; +import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs; +import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link FlussConfigUtils}. */ class FlussConfigUtilsTest { @@ -49,4 +56,123 @@ void testExtractOptions() { }); assertThat(clientOptions.size()).isEqualTo(CLIENT_OPTIONS.size()); } + + @Test + void testValidateCoordinatorConfigs() { + // Test empty configuration + Configuration emptyConf = new Configuration(); + assertThatThrownBy(() -> validateCoordinatorConfigs(emptyConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key()) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key()) + .hasMessageContaining("must be configured"); + + // Test configuration with only REMOTE_DATA_DIR set + Configuration remoteDataDirConf = new Configuration(); + remoteDataDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + validateCoordinatorConfigs(remoteDataDirConf); + + // Test invalid REMOTE_DATA_DIR + Configuration invalidRemoteDirConf = new Configuration(); + invalidRemoteDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "123://invalid.com"); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key()) + .hasMessageContaining("Invalid configuration for remote.data.dir"); + + // Test configuration with only REMOTE_DATA_DIRS set + Configuration remoteDataDirsConf = new Configuration(); + remoteDataDirsConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); + validateCoordinatorConfigs(remoteDataDirConf); + + // Test REMOTE_DATA_DIRS contains invalid path + Configuration invalidRemoteDirsConf = new Configuration(); + invalidRemoteDirsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidRemoteDirsConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "123://invalid.com")); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirsConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key()) + .hasMessageContaining("Invalid remote path for"); + + // Test WEIGHTED_ROUND_ROBIN with mismatched sizes + Configuration mismatchedWeightsConf = new Configuration(); + mismatchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + mismatchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + mismatchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); + mismatchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Collections.singletonList(1)); + assertThatThrownBy(() -> validateCoordinatorConfigs(mismatchedWeightsConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key()) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key()); + + // Test WEIGHTED_ROUND_ROBIN with matched sizes + Configuration matchedWeightsConf = new Configuration(); + matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + matchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + matchedWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); + matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(0, 2)); + validateCoordinatorConfigs(matchedWeightsConf); + + // Test negative weight + Configuration negativeWeightConf = new Configuration(); + negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + negativeWeightConf.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + negativeWeightConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); + negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(-1, 2)); + assertThatThrownBy(() -> validateCoordinatorConfigs(negativeWeightConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key()) + .hasMessageContaining( + "All weights in 'remote.data.dirs.weights' must be no less than 0"); + + // Test invalid DEFAULT_REPLICATION_FACTOR + Configuration invalidReplicationConf = new Configuration(); + invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 0); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidReplicationConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key()) + .hasMessageContaining("must be greater than or equal 1"); + + // Test invalid KV_MAX_RETAINED_SNAPSHOTS + Configuration invalidSnapshotConf = new Configuration(); + invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidSnapshotConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()) + .hasMessageContaining("must be greater than or equal 1"); + + // Test invalid SERVER_IO_POOL_SIZE + Configuration invalidIoPoolConf = new Configuration(); + invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0); + assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key()) + .hasMessageContaining("must be greater than or equal 1"); + } + + @Test + void testValidateTabletConfigs() { + Configuration conf = new Configuration(); + conf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); + conf.set(ConfigOptions.TABLET_SERVER_ID, -1); + assertThatThrownBy(() -> validateTabletConfigs(conf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(ConfigOptions.TABLET_SERVER_ID.key()) + .hasMessageContaining("it must be greater than or equal 0"); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index ca3708bfea..77e72b7442 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -22,7 +22,6 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.rpc.RpcClient; @@ -66,6 +65,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs; + /** * Coordinator server implementation. The coordinator server is responsible to: * @@ -154,7 +155,7 @@ public CoordinatorServer(Configuration conf) { public CoordinatorServer(Configuration conf, Clock clock) { super(conf); - validateConfigs(conf); + validateCoordinatorConfigs(conf); this.terminationFuture = new CompletableFuture<>(); this.clock = clock; } @@ -549,31 +550,4 @@ public DynamicConfigManager getDynamicConfigManager() { public RebalanceManager getRebalanceManager() { return coordinatorEventProcessor.getRebalanceManager(); } - - private static void validateConfigs(Configuration conf) { - if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 1.", - ConfigOptions.DEFAULT_REPLICATION_FACTOR.key())); - } - if (conf.get(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS) < 1) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 1.", - ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key())); - } - - if (conf.get(ConfigOptions.SERVER_IO_POOL_SIZE) < 1) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 1.", - ConfigOptions.SERVER_IO_POOL_SIZE.key())); - } - - if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) { - throw new IllegalConfigurationException( - String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR)); - } - } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 8eed63c844..d108a63372 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -22,7 +22,6 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.exception.InvalidServerRackInfoException; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metrics.registry.MetricRegistry; @@ -71,7 +70,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -79,6 +77,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.fluss.config.ConfigOptions.BACKGROUND_THREADS; +import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucket; /** @@ -177,7 +176,7 @@ public TabletServer(Configuration conf) { public TabletServer(Configuration conf, Clock clock) { super(conf); - validateConfigs(conf); + validateTabletConfigs(conf); this.terminationFuture = new CompletableFuture<>(); this.serverId = conf.getInt(ConfigOptions.TABLET_SERVER_ID); this.rack = conf.getString(ConfigOptions.TABLET_SERVER_RACK); @@ -558,40 +557,6 @@ public ReplicaManager getReplicaManager() { return authorizer; } - private static void validateConfigs(Configuration conf) { - Optional serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID); - if (!serverId.isPresent()) { - throw new IllegalConfigurationException( - String.format("Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID)); - } - - if (serverId.get() < 0) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 0.", - ConfigOptions.TABLET_SERVER_ID.key())); - } - - if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be greater than or equal 1.", - ConfigOptions.BACKGROUND_THREADS.key())); - } - - if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) { - throw new IllegalConfigurationException( - String.format("Configuration %s must be set.", ConfigOptions.REMOTE_DATA_DIR)); - } - - if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s, it must be less than or equal %d bytes.", - ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE)); - } - } - @VisibleForTesting public RpcServer getRpcServer() { return rpcServer; diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 80d40fddbc..0de77f0a5e 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -35,7 +35,10 @@ during the Fluss cluster working. | `security.${protocol}.*` | String | (none) | Protocol-specific configuration properties. For example, security.sasl.jaas.config for SASL authentication settings. | | default.bucket.number | Integer | 1 | The default number of buckets for a table in Fluss cluster. It's a cluster-level parameter and all the tables without specifying bucket number in the cluster will use the value as the bucket number. | | default.replication.factor | Integer | 1 | The default replication factor for the log of a table in Fluss cluster. It's a cluster-level parameter, and all the tables without specifying replication factor in the cluster will use the value as replication factor. | -| remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. | +| remote.data.dir | String | (None) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. When upgrading to `remote.data.dirs`, please ensure this value is placed as the first entry in the new configuration.For new clusters, it is recommended to use `remote.data.dirs` instead. If `remote.data.dirs` is configured, this value will be ignored. | +| remote.data.dirs | List<String> | (None) | A comma-separated list of directories in Fluss supported filesystems for storing the kv snapshot data files and remote log files of tables/partitions. If configured, when a new table or a new partition is created, one of the directories from this list will be selected according to the strategy specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). If not configured, the system uses `remote.data.dir` as the sole remote data directory for all data. | +| remote.data.dirs.strategy | Enum | ROUND_ROBIN | The strategy for selecting the remote data directory from `remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN, WEIGHTED_ROUND_ROBIN], the default strategy is ROUND_ROBIN.
ROUND_ROBIN: this strategy employs a round-robin approach to select one from the available remote directories.
WEIGHTED_ROUND_ROBIN: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`. | +| remote.data.dirs.weights | List<Integer>| (None) | The weights of the remote data directories. This is a list of weights corresponding to the `remote.data.dirs` in the same order. When `remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be configured, and its size must be equal to `remote.data.dirs`; otherwise, it will be ignored. | | remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. | | plugin.classloader.parent-first-patterns.additional | List<String> | (None) | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to `classloader.parent-first-patterns.default`. | | plugin.classloader.parent-first-patterns.default | String | java.,
org.apache.fluss.,
javax.annotation.,
org.slf4j,
org.apache.log4j,
org.apache.logging,
org.apache.commons.logging,
ch.qos.logback | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. |