diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index f760487b7f..947d38f25d 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -25,6 +25,7 @@ import org.apache.fluss.client.table.scanner.batch.BatchScanner; import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.utils.ClientRpcMessageUtils; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.MemorySize; @@ -42,7 +43,9 @@ import org.apache.fluss.rpc.GatewayClientProxy; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.gateway.AdminGateway; +import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.ControlledShutdownRequest; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.MetadataRequest; @@ -87,7 +90,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.catchThrowable; -/** It case to test authorization of admin operation、read and write operation. */ +/** It case to test authorization of admin operation, read and write operation. */ public class FlussAuthorizationITCase { @RegisterExtension public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = @@ -732,6 +735,201 @@ void testDynamicConfigs() throws ExecutionException, InterruptedException { ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG)); } + @Test + void testControlledShutdown() throws Exception { + ControlledShutdownRequest request = + new ControlledShutdownRequest().setTabletServerId(-1).setTabletServerEpoch(-1); + + try (RpcClient rpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) { + CoordinatorGateway guestGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + rpcClient, + CoordinatorGateway.class); + + // test controlledShutdown without ALTER permission on cluster resource + assertThatThrownBy(() -> guestGateway.controlledShutdown(request).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate ALTER on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + } + + // test controlledShutdown with internal connection (FLUSS listener) + // Internal connections should bypass authorization check + CoordinatorGateway internalGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + CoordinatorGateway.class); + + // Even without any ACL permission, internal connection should succeed + // (won't throw AuthorizationException) + // The request may fail for other reasons (e.g., invalid server id), + // but it should not fail due to authorization + assertThatThrownBy(() -> internalGateway.controlledShutdown(request).get()) + .rootCause() + .isNotInstanceOf(AuthorizationException.class); + } + + @Test + void testAddServerTag() throws Exception { + // test addServerTag without ALTER permission on cluster resource + assertThatThrownBy( + () -> + guestAdmin + .addServerTag( + Collections.singletonList(0), + ServerTag.PERMANENT_OFFLINE) + .get()) + .rootCause() + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate ALTER on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // add ALTER permission to guest user on cluster resource + rootAdmin + .createAcls( + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.ALTER, + PermissionType.ALLOW)))) + .all() + .get(); + + // test addServerTag with ALTER permission should succeed + guestAdmin.addServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE).get(); + + // recover server tag + guestAdmin.removeServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE); + } + + @Test + void testRemoveServerTag() throws Exception { + // test removeServerTag without ALTER permission on cluster resource + assertThatThrownBy( + () -> + guestAdmin + .removeServerTag( + Collections.singletonList(0), + ServerTag.PERMANENT_OFFLINE) + .get()) + .rootCause() + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate ALTER on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // add ALTER permission to guest user on cluster resource + rootAdmin + .createAcls( + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.ALTER, + PermissionType.ALLOW)))) + .all() + .get(); + + // test removeServerTag with ALTER permission should succeed + guestAdmin.removeServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE).get(); + } + + @Test + void testRebalance() throws Exception { + // test rebalance without WRITE permission on cluster resource + assertThatThrownBy(() -> guestAdmin.rebalance(Collections.emptyList()).get()) + .rootCause() + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // add WRITE permission to guest user on cluster resource + rootAdmin + .createAcls( + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.WRITE, + PermissionType.ALLOW)))) + .all() + .get(); + + // test rebalance with WRITE permission should succeed + guestAdmin.rebalance(Collections.emptyList()).get(); + } + + @Test + void testListRebalanceProgress() throws Exception { + // test listRebalanceProgress without DESCRIBE permission on cluster resource + assertThatThrownBy(() -> guestAdmin.listRebalanceProgress(null).get()) + .rootCause() + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate DESCRIBE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // add DESCRIBE permission to guest user on cluster resource + rootAdmin + .createAcls( + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.DESCRIBE, + PermissionType.ALLOW)))) + .all() + .get(); + + // test listRebalanceProgress with DESCRIBE permission should succeed + guestAdmin.listRebalanceProgress(null).get(); + } + + @Test + void testCancelRebalance() throws Exception { + // test cancelRebalance without WRITE permission on cluster resource + assertThatThrownBy(() -> guestAdmin.cancelRebalance(null).get()) + .rootCause() + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // add WRITE permission to guest user on cluster resource + rootAdmin + .createAcls( + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.WRITE, + PermissionType.ALLOW)))) + .all() + .get(); + + // test cancelRebalance with WRITE permission should succeed + guestAdmin.cancelRebalance(null).get(); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index ee0092d1b8..6f60c0c9ca 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -773,6 +773,10 @@ public CompletableFuture lakeTieringHeartbeat( @Override public CompletableFuture controlledShutdown( ControlledShutdownRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster()); + } + CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -825,6 +829,10 @@ public CompletableFuture alterClusterConfigs( @Override public CompletableFuture addServerTag(AddServerTagRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster()); + } + CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -841,6 +849,10 @@ public CompletableFuture addServerTag(AddServerTagRequest @Override public CompletableFuture removeServerTag( RemoveServerTagRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster()); + } + CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -856,6 +868,10 @@ public CompletableFuture removeServerTag( @Override public CompletableFuture rebalance(RebalanceRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster()); + } + List goalsByPriority = new ArrayList<>(); Arrays.stream(request.getGoals()) .forEach(goal -> goalsByPriority.add(getGoalByType(GoalType.valueOf(goal)))); @@ -868,6 +884,10 @@ public CompletableFuture rebalance(RebalanceRequest request) @Override public CompletableFuture listRebalanceProgress( ListRebalanceProgressRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.DESCRIBE, Resource.cluster()); + } + CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -881,6 +901,10 @@ public CompletableFuture listRebalanceProgress( @Override public CompletableFuture cancelRebalance( CancelRebalanceRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster()); + } + CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get()