Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,59 @@ public class ConfigOptions {
.stringType()
.noDefaultValue()
.withDescription(
"The directory used for storing the kv snapshot data files and remote log for log tiered storage "
+ " in a Fluss supported filesystem.");
"The directory used for storing the kv snapshot data files and remote log for log tiered storage"
+ " in a Fluss supported filesystem. "
+ "For clusters that already have this configured, do not remove it when upgrading, "
+ "even after adding `remote.data.dirs`. "
+ "For new clusters, it is recommended to use `remote.data.dirs` instead.");

public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
key("remote.data.dirs")
.stringType()
.asList()
.defaultValues()
.withDescription(
"A comma-separated list of directories in Fluss supported filesystems "
+ "for storing the kv snapshot data files and remote log files of tables/partitions. "
+ "If configured, when a new table or a new partition is created, "
+ "one of the directories from this list will be selected according to the strategy "
+ "specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). "
+ "If not configured, the system uses `"
+ REMOTE_DATA_DIR.key()
+ "` as the sole remote data directory for all data.");

public static final ConfigOption<RemoteDataDirStrategy> REMOTE_DATA_DIRS_STRATEGY =
key("remote.data.dirs.strategy")
.enumType(RemoteDataDirStrategy.class)
.defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
.withDescription(
String.format(
"The strategy for selecting the remote data directory from `%s`. "
+ "The candidate strategies are: %s, the default strategy is %s.\n"
+ "%s: this strategy employs a round-robin approach to select one from the available remote directories.\n"
+ "%s: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`.",
REMOTE_DATA_DIRS.key(),
Arrays.toString(RemoteDataDirStrategy.values()),
RemoteDataDirStrategy.ROUND_ROBIN,
RemoteDataDirStrategy.ROUND_ROBIN,
RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN));

public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS =
key("remote.data.dirs.weights")
.intType()
.asList()
.defaultValues()
.withDescription(
"The weights of the remote data directories. "
+ "This is a list of weights corresponding to the `"
+ REMOTE_DATA_DIRS.key()
+ "` in the same order. When `"
+ REMOTE_DATA_DIRS_STRATEGY.key()
+ "` is set to `"
+ RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN
+ "`, this must be configured, and its size must be equal to `"
+ REMOTE_DATA_DIRS.key()
+ "`; otherwise, it will be ignored.");

public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
key("remote.fs.write-buffer-size")
Expand Down Expand Up @@ -2066,4 +2117,10 @@ private static class ConfigOptionsHolder {
public static ConfigOption<?> getConfigOption(String key) {
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
}

/** Remote data dir select strategy for Fluss. */
public enum RemoteDataDirStrategy {
ROUND_ROBIN,
WEIGHTED_ROUND_ROBIN
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.exception.IllegalConfigurationException;
import org.apache.fluss.fs.FsPath;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Utilities of Fluss {@link ConfigOptions}. */
@Internal
Expand Down Expand Up @@ -77,4 +80,118 @@ static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
}
return options;
}

public static void validateCoordinatorConfigs(Configuration conf) {
validateServerConfigs(conf);
}

public static void validateTabletConfigs(Configuration conf) {
validateServerConfigs(conf);

Optional<Integer> serverId = conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
if (!serverId.isPresent()) {
throw new IllegalConfigurationException(
String.format(
"Configuration %s must be set.", ConfigOptions.TABLET_SERVER_ID.key()));
}
validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
}

/** Validate common server configs. */
protected static void validateServerConfigs(Configuration conf) {
// Validate remote.data.dir and remote.data.dirs
String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR);
List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null
&& conf.get(ConfigOptions.REMOTE_DATA_DIRS).isEmpty()) {
throw new IllegalConfigurationException(
String.format(
"Either %s or %s must be configured.",
ConfigOptions.REMOTE_DATA_DIR.key(),
ConfigOptions.REMOTE_DATA_DIRS.key()));
}

if (remoteDataDir != null) {
// Must validate that remote.data.dir is a valid FsPath
try {
new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
} catch (Exception e) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s.",
ConfigOptions.REMOTE_DATA_DIR.key()),
e);
}
}

// Validate remote.data.dirs
for (int i = 0; i < remoteDataDirs.size(); i++) {
String dir = remoteDataDirs.get(i);
try {
new FsPath(dir);
} catch (Exception e) {
throw new IllegalConfigurationException(
String.format(
"Invalid remote path for %s at index %d.",
ConfigOptions.REMOTE_DATA_DIRS.key(), i),
e);
}
}

// Validate remote.data.dirs.strategy
ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
if (remoteDataDirStrategy == ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
if (!remoteDataDirs.isEmpty()) {
if (remoteDataDirs.size() != weights.size()) {
throw new IllegalConfigurationException(
String.format(
"The size of '%s' (%d) must match the size of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
ConfigOptions.REMOTE_DATA_DIRS.key(),
remoteDataDirs.size(),
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
weights.size()));
}

// Validate all weights are no less than 0
for (int i = 0; i < weights.size(); i++) {
if (weights.get(i) < 0) {
throw new IllegalConfigurationException(
String.format(
"All weights in '%s' must be no less than 0, but found %d at index %d.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition weights.get(i) < 0 should be weights.get(i) <= 0, and the error message should be "must be greater than 0".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need 0. Imagine a scenario where the capacity of a remote storage has reached its limit and we don’t want to transfer any more files to it; in that case, we can set its weight to 0.

ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
weights.get(i),
i));
}
}
}
}

validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);

if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > Integer.MAX_VALUE) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be less than or equal %d bytes.",
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
}
}

private static void validMinValue(
Configuration conf, ConfigOption<Integer> option, int minValue) {
validMinValue(option, conf.get(option), minValue);
}

private static void validMinValue(ConfigOption<Integer> option, int value, int minValue) {
if (value < minValue) {
throw new IllegalConfigurationException(
String.format(
"Invalid configuration for %s, it must be greater than or equal %d.",
option.key(), minValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package org.apache.fluss.config;

import org.apache.fluss.exception.IllegalConfigurationException;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

import static org.apache.fluss.config.FlussConfigUtils.CLIENT_OPTIONS;
import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
import static org.apache.fluss.config.FlussConfigUtils.extractConfigOptions;
import static org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs;
import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link FlussConfigUtils}. */
class FlussConfigUtilsTest {
Expand All @@ -49,4 +56,123 @@ void testExtractOptions() {
});
assertThat(clientOptions.size()).isEqualTo(CLIENT_OPTIONS.size());
}

@Test
void testValidateCoordinatorConfigs() {
// Test empty configuration
Configuration emptyConf = new Configuration();
assertThatThrownBy(() -> validateCoordinatorConfigs(emptyConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key())
.hasMessageContaining("must be configured");

// Test configuration with only REMOTE_DATA_DIR set
Configuration remoteDataDirConf = new Configuration();
remoteDataDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
validateCoordinatorConfigs(remoteDataDirConf);

// Test invalid REMOTE_DATA_DIR
Configuration invalidRemoteDirConf = new Configuration();
invalidRemoteDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "123://invalid.com");
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
.hasMessageContaining("Invalid configuration for remote.data.dir");

// Test configuration with only REMOTE_DATA_DIRS set
Configuration remoteDataDirsConf = new Configuration();
remoteDataDirsConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
validateCoordinatorConfigs(remoteDataDirConf);

// Test REMOTE_DATA_DIRS contains invalid path
Configuration invalidRemoteDirsConf = new Configuration();
invalidRemoteDirsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
invalidRemoteDirsConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "123://invalid.com"));
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirsConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key())
.hasMessageContaining("Invalid remote path for");

// Test WEIGHTED_ROUND_ROBIN with mismatched sizes
Configuration mismatchedWeightsConf = new Configuration();
mismatchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
mismatchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
mismatchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
mismatchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Collections.singletonList(1));
assertThatThrownBy(() -> validateCoordinatorConfigs(mismatchedWeightsConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key());

// Test WEIGHTED_ROUND_ROBIN with matched sizes
Configuration matchedWeightsConf = new Configuration();
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
matchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
matchedWeightsConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(0, 2));
validateCoordinatorConfigs(matchedWeightsConf);

// Test negative weight
Configuration negativeWeightConf = new Configuration();
negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
negativeWeightConf.set(
ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
negativeWeightConf.set(
ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(-1, 2));
assertThatThrownBy(() -> validateCoordinatorConfigs(negativeWeightConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
.hasMessageContaining(
"All weights in 'remote.data.dirs.weights' must be no less than 0");

// Test invalid DEFAULT_REPLICATION_FACTOR
Configuration invalidReplicationConf = new Configuration();
invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 0);
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidReplicationConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key())
.hasMessageContaining("must be greater than or equal 1");

// Test invalid KV_MAX_RETAINED_SNAPSHOTS
Configuration invalidSnapshotConf = new Configuration();
invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0);
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidSnapshotConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key())
.hasMessageContaining("must be greater than or equal 1");

// Test invalid SERVER_IO_POOL_SIZE
Configuration invalidIoPoolConf = new Configuration();
invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0);
assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key())
.hasMessageContaining("must be greater than or equal 1");
}

@Test
void testValidateTabletConfigs() {
Configuration conf = new Configuration();
conf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
conf.set(ConfigOptions.TABLET_SERVER_ID, -1);
assertThatThrownBy(() -> validateTabletConfigs(conf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(ConfigOptions.TABLET_SERVER_ID.key())
.hasMessageContaining("it must be greater than or equal 0");
}
}
Loading