Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.client.table.writer.AppendWriter;
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
Expand All @@ -42,7 +43,9 @@
import org.apache.fluss.rpc.GatewayClientProxy;
import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.rpc.gateway.AdminGateway;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
import org.apache.fluss.rpc.messages.InitWriterRequest;
import org.apache.fluss.rpc.messages.InitWriterResponse;
import org.apache.fluss.rpc.messages.MetadataRequest;
Expand Down Expand Up @@ -87,7 +90,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.catchThrowable;

/** It case to test authorization of admin operationread and write operation. */
/** It case to test authorization of admin operation, read and write operation. */
public class FlussAuthorizationITCase {
@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
Expand Down Expand Up @@ -732,6 +735,201 @@ void testDynamicConfigs() throws ExecutionException, InterruptedException {
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG));
}

@Test
void testControlledShutdown() throws Exception {
ControlledShutdownRequest request =
new ControlledShutdownRequest().setTabletServerId(-1).setTabletServerEpoch(-1);

try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
CoordinatorGateway guestGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// test controlledShutdown without ALTER permission on cluster resource
assertThatThrownBy(() -> guestGateway.controlledShutdown(request).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate ALTER on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));
}

// test controlledShutdown with internal connection (FLUSS listener)
// Internal connections should bypass authorization check
CoordinatorGateway internalGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
CoordinatorGateway.class);

// Even without any ACL permission, internal connection should succeed
// (won't throw AuthorizationException)
// The request may fail for other reasons (e.g., invalid server id),
// but it should not fail due to authorization
assertThatThrownBy(() -> internalGateway.controlledShutdown(request).get())
.rootCause()
.isNotInstanceOf(AuthorizationException.class);
}

@Test
void testAddServerTag() throws Exception {
// test addServerTag without ALTER permission on cluster resource
assertThatThrownBy(
() ->
guestAdmin
.addServerTag(
Collections.singletonList(0),
ServerTag.PERMANENT_OFFLINE)
.get())
.rootCause()
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate ALTER on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// add ALTER permission to guest user on cluster resource
rootAdmin
.createAcls(
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.ALTER,
PermissionType.ALLOW))))
.all()
.get();

// test addServerTag with ALTER permission should succeed
guestAdmin.addServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE).get();

// recover server tag
guestAdmin.removeServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE);
}

@Test
void testRemoveServerTag() throws Exception {
// test removeServerTag without ALTER permission on cluster resource
assertThatThrownBy(
() ->
guestAdmin
.removeServerTag(
Collections.singletonList(0),
ServerTag.PERMANENT_OFFLINE)
.get())
.rootCause()
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate ALTER on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// add ALTER permission to guest user on cluster resource
rootAdmin
.createAcls(
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.ALTER,
PermissionType.ALLOW))))
.all()
.get();

// test removeServerTag with ALTER permission should succeed
guestAdmin.removeServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE).get();
}

@Test
void testRebalance() throws Exception {
// test rebalance without WRITE permission on cluster resource
assertThatThrownBy(() -> guestAdmin.rebalance(Collections.emptyList()).get())
.rootCause()
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// add WRITE permission to guest user on cluster resource
rootAdmin
.createAcls(
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.WRITE,
PermissionType.ALLOW))))
.all()
.get();

// test rebalance with WRITE permission should succeed
guestAdmin.rebalance(Collections.emptyList()).get();
}

@Test
void testListRebalanceProgress() throws Exception {
// test listRebalanceProgress without DESCRIBE permission on cluster resource
assertThatThrownBy(() -> guestAdmin.listRebalanceProgress(null).get())
.rootCause()
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate DESCRIBE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// add DESCRIBE permission to guest user on cluster resource
rootAdmin
.createAcls(
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.DESCRIBE,
PermissionType.ALLOW))))
.all()
.get();

// test listRebalanceProgress with DESCRIBE permission should succeed
guestAdmin.listRebalanceProgress(null).get();
}

@Test
void testCancelRebalance() throws Exception {
// test cancelRebalance without WRITE permission on cluster resource
assertThatThrownBy(() -> guestAdmin.cancelRebalance(null).get())
.rootCause()
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// add WRITE permission to guest user on cluster resource
rootAdmin
.createAcls(
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.WRITE,
PermissionType.ALLOW))))
.all()
.get();

// test cancelRebalance with WRITE permission should succeed
guestAdmin.cancelRebalance(null).get();
}

private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,10 @@ public CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
@Override
public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
ControlledShutdownRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster());
}

CompletableFuture<ControlledShutdownResponse> response = new CompletableFuture<>();
eventManagerSupplier
.get()
Expand Down Expand Up @@ -825,6 +829,10 @@ public CompletableFuture<AlterClusterConfigsResponse> alterClusterConfigs(

@Override
public CompletableFuture<AddServerTagResponse> addServerTag(AddServerTagRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster());
}

CompletableFuture<AddServerTagResponse> response = new CompletableFuture<>();
eventManagerSupplier
.get()
Expand All @@ -841,6 +849,10 @@ public CompletableFuture<AddServerTagResponse> addServerTag(AddServerTagRequest
@Override
public CompletableFuture<RemoveServerTagResponse> removeServerTag(
RemoveServerTagRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster());
}

CompletableFuture<RemoveServerTagResponse> response = new CompletableFuture<>();
eventManagerSupplier
.get()
Expand All @@ -856,6 +868,10 @@ public CompletableFuture<RemoveServerTagResponse> removeServerTag(

@Override
public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster());
}

List<Goal> goalsByPriority = new ArrayList<>();
Arrays.stream(request.getGoals())
.forEach(goal -> goalsByPriority.add(getGoalByType(GoalType.valueOf(goal))));
Expand All @@ -868,6 +884,10 @@ public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request)
@Override
public CompletableFuture<ListRebalanceProgressResponse> listRebalanceProgress(
ListRebalanceProgressRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), OperationType.DESCRIBE, Resource.cluster());
}

CompletableFuture<ListRebalanceProgressResponse> response = new CompletableFuture<>();
eventManagerSupplier
.get()
Expand All @@ -881,6 +901,10 @@ public CompletableFuture<ListRebalanceProgressResponse> listRebalanceProgress(
@Override
public CompletableFuture<CancelRebalanceResponse> cancelRebalance(
CancelRebalanceRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster());
}

CompletableFuture<CancelRebalanceResponse> response = new CompletableFuture<>();
eventManagerSupplier
.get()
Expand Down