Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
r.getTableId(),
r.getSchemaId(),
TableDescriptor.fromJsonBytes(r.getTableJson()),
// For backward compatibility, results returned by old
// clusters do not include the remote data dir
r.hasRemoteDataDir() ? r.getRemoteDataDir() : null,
r.getCreatedTime(),
r.getModifiedTime()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.concurrent.GuardedBy;

import java.time.Clock;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -95,7 +96,7 @@ public void start() throws Exception {
void startTokensUpdate() {
try {
LOG.info("Starting tokens update task");
AtomicReference<ObtainedSecurityToken> tokenContainer = new AtomicReference<>();
AtomicReference<List<ObtainedSecurityToken>> tokenContainer = new AtomicReference<>();
Optional<Long> nextRenewal = obtainSecurityTokensAndGetNextRenewal(tokenContainer);

if (tokenContainer.get() != null) {
Expand Down Expand Up @@ -133,17 +134,17 @@ void startTokensUpdate() {
}

protected Optional<Long> obtainSecurityTokensAndGetNextRenewal(
AtomicReference<ObtainedSecurityToken> tokenContainer) {
AtomicReference<List<ObtainedSecurityToken>> tokenContainer) {
try {
LOG.debug("Obtaining security token.");
ObtainedSecurityToken token = securityTokenProvider.obtainSecurityToken();
tokenContainer.set(token);
checkNotNull(token, "Obtained security tokens must not be null");
LOG.debug("Obtained security token successfully");
return token.getValidUntil();
LOG.debug("Obtaining security tokens.");
List<ObtainedSecurityToken> tokens = securityTokenProvider.obtainSecurityTokens();
tokenContainer.set(tokens);
checkNotNull(tokens, "Obtained security tokens must not be null");
LOG.debug("Obtained security tokens successfully");
return tokens.get(0).getValidUntil();
} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
LOG.error("Failed to obtain security token.", t);
LOG.error("Failed to obtain security tokens.", t);
throw new FlussRuntimeException(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;

import java.util.List;

/** A default implementation of {@link SecurityTokenProvider} to get token from server. */
public class DefaultSecurityTokenProvider implements SecurityTokenProvider {

Expand All @@ -32,10 +34,10 @@ public DefaultSecurityTokenProvider(AdminReadOnlyGateway adminReadOnlyGateway) {
}

@Override
public ObtainedSecurityToken obtainSecurityToken() throws Exception {
public List<ObtainedSecurityToken> obtainSecurityTokens() throws Exception {
return adminReadOnlyGateway
.getFileSystemSecurityToken(new GetFileSystemSecurityTokenRequest())
.thenApply(ClientRpcMessageUtils::toSecurityToken)
.thenApply(ClientRpcMessageUtils::toSecurityTokens)
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

import org.apache.fluss.fs.token.ObtainedSecurityToken;

import java.util.List;

/** Security token provider API. */
public interface SecurityTokenProvider {

/**
* Obtain security token.
* Obtain security tokens for all file systems configured in the target cluster.
*
* @return the obtained security token.
* @return the obtained security tokens.
*/
ObtainedSecurityToken obtainSecurityToken() throws Exception;
List<ObtainedSecurityToken> obtainSecurityTokens() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Consumer;
Expand Down Expand Up @@ -71,23 +72,25 @@ private Map<String, SecurityTokenReceiver> loadReceivers() {
}

/**
* Callback function when new security token obtained.
* Callback function when new security tokens obtained.
*
* @param token security token obtained. The token will be forwarded to the appropriate {@link
* SecurityTokenReceiver} based on scheme name.
* @param tokens security tokens obtained. The tokens will be forwarded to the appropriate
* {@link SecurityTokenReceiver} based on scheme name.
*/
void onNewTokensObtained(ObtainedSecurityToken token) {
String schemeName = token.getScheme();
LOG.info("New security tokens arrived, sending them to receiver");
if (!securityTokenReceivers.containsKey(schemeName)) {
throw new IllegalStateException(
"Token arrived for service but no receiver found for it: " + schemeName);
void onNewTokensObtained(List<ObtainedSecurityToken> tokens) {
for (ObtainedSecurityToken token : tokens) {
String schemeName = token.getScheme();
LOG.info("New security tokens arrived, sending them to receiver");
if (!securityTokenReceivers.containsKey(schemeName)) {
throw new IllegalStateException(
"Tokens arrived for service but no receiver found for them: " + schemeName);
}
try {
securityTokenReceivers.get(schemeName).onNewTokensObtained(token);
} catch (Exception e) {
LOG.warn("Failed to send tokens to security token receiver {}", schemeName, e);
}
LOG.info("Security tokens sent to receiver");
}
try {
securityTokenReceivers.get(schemeName).onNewTokensObtained(token);
} catch (Exception e) {
LOG.warn("Failed to send token to security token receiver {}", schemeName, e);
}
LOG.info("Security token sent to receiver");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,34 @@ public static List<FsPathAndFileName> toFsPathAndFileName(
public static ObtainedSecurityToken toSecurityToken(
GetFileSystemSecurityTokenResponse response) {
String scheme = response.getSchema();
String authority = response.hasAuthority() ? response.getAuthority() : null;
byte[] tokens = response.getToken();
Long validUntil = response.hasExpirationTime() ? response.getExpirationTime() : null;

Map<String, String> additionInfo = toKeyValueMap(response.getAdditionInfosList());
return new ObtainedSecurityToken(scheme, tokens, validUntil, additionInfo);
return new ObtainedSecurityToken(scheme, authority, tokens, validUntil, additionInfo);
}

public static List<ObtainedSecurityToken> toSecurityTokens(
GetFileSystemSecurityTokenResponse response) {
List<ObtainedSecurityToken> obtainedSecurityTokens = new ArrayList<>();

obtainedSecurityTokens.add(toSecurityToken(response));

response.getTokensList()
.forEach(
token ->
obtainedSecurityTokens.add(
new ObtainedSecurityToken(
token.getSchema(),
token.hasAuthority() ? token.getAuthority() : null,
token.getToken(),
token.hasExpirationTime()
? token.getExpirationTime()
: null,
toKeyValueMap(token.getAdditionInfosList()))));

return obtainedSecurityTokens;
}

public static MetadataRequest makeMetadataRequest(
Expand Down Expand Up @@ -555,8 +578,12 @@ public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse re
pbPartitionInfo ->
new PartitionInfo(
pbPartitionInfo.getPartitionId(),
toResolvedPartitionSpec(
pbPartitionInfo.getPartitionSpec())))
toResolvedPartitionSpec(pbPartitionInfo.getPartitionSpec()),
// For backward compatibility, results returned by old
// clusters do not include the remote data dir
pbPartitionInfo.hasRemoteDataDir()
? pbPartitionInfo.getRemoteDataDir()
: null))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static org.apache.fluss.row.BinaryString.fromString;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
Expand Down Expand Up @@ -169,6 +170,7 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception {
.distributedBy(3)
.logFormat(logFormat)
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
long fetchOffset = 0L;
Expand Down Expand Up @@ -313,6 +315,7 @@ void testComplexTypeFetch() throws Exception {
.distributedBy(3)
.logFormat(LogFormat.ARROW)
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
long fetchOffset = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ void testFetchWithSchemaChange() throws Exception {
DATA1_TABLE_INFO.getNumBuckets(),
DATA1_TABLE_INFO.getProperties(),
DATA1_TABLE_INFO.getCustomProperties(),
DATA1_TABLE_INFO.getRemoteDataDir(),
DATA1_TABLE_INFO.getComment().orElse(null),
DATA1_TABLE_INFO.getCreatedTime(),
DATA1_TABLE_INFO.getModifiedTime()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static org.apache.fluss.testutils.DataTestUtils.genLogFile;
import static org.apache.fluss.utils.FlussPaths.remoteLogSegmentDir;
Expand Down Expand Up @@ -218,6 +219,7 @@ void testProjection(String format) throws Exception {
.distributedBy(3)
.logFormat(logFormat)
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());
long fetchOffset = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;

import static java.time.Instant.ofEpochMilli;
import static org.apache.fluss.config.ConfigOptions.FILESYSTEM_SECURITY_TOKEN_RENEWAL_RETRY_BACKOFF;
Expand All @@ -37,7 +38,7 @@ class DefaultSecurityTokenManagerTest {
@Test
void startTokensUpdateShouldScheduleRenewal() {
TestingSecurityTokenProvider testingSecurityTokenProvider =
new TestingSecurityTokenProvider("token1");
new TestingSecurityTokenProvider(Arrays.asList("token1-1", "token1-2", "token1-3"));

// set small token renew backoff
Configuration configuration = new Configuration();
Expand All @@ -54,23 +55,31 @@ void startTokensUpdateShouldScheduleRenewal() {
Duration.ofMinutes(1),
() ->
assertThat(testingSecurityTokenProvider.getHistoryTokens())
.containsExactly("token1"));
.containsExactly(
Arrays.asList("token1-1", "token1-2", "token1-3")));

// token history should be token1, token2
testingSecurityTokenProvider.setCurrentToken("token2");
testingSecurityTokenProvider.setCurrentTokens(
Arrays.asList("token2-1", "token2-2", "token2-3"));
retry(
Duration.ofMinutes(1),
() ->
assertThat(testingSecurityTokenProvider.getHistoryTokens())
.containsExactly("token1", "token2"));
.containsExactly(
Arrays.asList("token1-1", "token1-2", "token1-3"),
Arrays.asList("token2-1", "token2-2", "token2-3")));

// token history should be token1, token2, token3
testingSecurityTokenProvider.setCurrentToken("token3");
testingSecurityTokenProvider.setCurrentTokens(
Arrays.asList("token3-1", "token3-2", "token3-3"));
retry(
Duration.ofMinutes(1),
() ->
assertThat(testingSecurityTokenProvider.getHistoryTokens())
.containsExactly("token1", "token2", "token3"));
.containsExactly(
Arrays.asList("token1-1", "token1-2", "token1-3"),
Arrays.asList("token2-1", "token2-2", "token2-3"),
Arrays.asList("token3-1", "token3-2", "token3-3")));

securityTokenManager.stopTokensUpdate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,65 @@

import java.time.Clock;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;

/** A {@link SecurityTokenProvider} for testing purpose. */
public class TestingSecurityTokenProvider implements SecurityTokenProvider {

private final Queue<String> historyTokens = new LinkedBlockingDeque<>();
private volatile String currentToken;
private final Queue<List<String>> historyTokens = new LinkedBlockingDeque<>();
private volatile List<String> currentTokens;

public TestingSecurityTokenProvider(String currentToken) {
this.currentToken = currentToken;
public TestingSecurityTokenProvider(List<String> currentTokens) {
this.currentTokens = currentTokens;
}

public void setCurrentToken(String currentToken) {
public void setCurrentTokens(List<String> currentTokens) {
synchronized (this) {
this.currentToken = currentToken;
this.currentTokens = currentTokens;
}
}

@Override
public ObtainedSecurityToken obtainSecurityToken() {
public List<ObtainedSecurityToken> obtainSecurityTokens() {
synchronized (this) {
String previousToken = historyTokens.peek();
List<String> previousTokens = historyTokens.peek();
long currentTime = Clock.systemDefaultZone().millis();
// we set expire time to 2s later, should be large enough for testing.
// if it's too small, DefaultSecurityTokenManager#calculateRenewalDelay will
// get a negative value by formula ‘Math.round(tokensRenewalTimeRatio * (nextRenewal -
// now))’ which causes never renewal token
// now))’ which causes never renewal tokens
long expireTime = currentTime + 2000;
if (previousToken != null && previousToken.equals(currentToken)) {
// just return the previous one token
return new ObtainedSecurityToken(
"testing", previousToken.getBytes(), expireTime, Collections.emptyMap());
if (previousTokens != null && previousTokens.equals(currentTokens)) {
// just return the previous tokens
return previousTokens.stream()
.map(
t ->
new ObtainedSecurityToken(
"testing",
t.getBytes(),
expireTime,
Collections.emptyMap()))
.collect(Collectors.toList());
} else {
// return the current token and push back to the queue
historyTokens.add(currentToken);
return new ObtainedSecurityToken(
"testing", currentToken.getBytes(), expireTime, Collections.emptyMap());
// return the current tokens and push back to the queue
historyTokens.add(currentTokens);
return currentTokens.stream()
.map(
t ->
new ObtainedSecurityToken(
"testing",
t.getBytes(),
expireTime,
Collections.emptyMap()))
.collect(Collectors.toList());
}
}
}

public Queue<String> getHistoryTokens() {
public Queue<List<String>> getHistoryTokens() {
return historyTokens;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR;
import static org.apache.fluss.testutils.DataTestUtils.indexedRow;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -96,6 +97,7 @@ class RecordAccumulatorTest {
.distributedBy(3)
.property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), "zstd")
.build(),
DEFAULT_REMOTE_DATA_DIR,
System.currentTimeMillis(),
System.currentTimeMillis());

Expand Down
Loading