diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java index a6fa97d1..430f7d69 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java @@ -18,6 +18,10 @@ */ package org.apache.accumulo.testing.continuous; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.access.AccessExpression.unquote; +import static org.apache.accumulo.access.ParsedAccessExpression.ExpressionType.AND; +import static org.apache.accumulo.access.ParsedAccessExpression.ExpressionType.AUTHORIZATION; import static org.apache.accumulo.testing.continuous.ContinuousIngest.genCol; import static org.apache.accumulo.testing.continuous.ContinuousIngest.genLong; import static org.apache.accumulo.testing.continuous.ContinuousIngest.genRow; @@ -25,15 +29,18 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.security.SecureRandom; import java.util.List; import java.util.Random; +import java.util.TreeSet; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.zip.CRC32; import java.util.zip.Checksum; +import org.apache.accumulo.access.AccessExpression; +import org.apache.accumulo.access.ParsedAccessExpression; +import org.apache.accumulo.access.ParsedAccessExpression.ExpressionType; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; @@ -52,6 +59,142 @@ */ public class ContinuousInputFormat extends InputFormat { + /** + * As part of normalizing access expression this class is used to sort and dedupe sub-expressions + * in a tree set. + */ + private static class NormalizedExpression implements Comparable { + public final String expression; + public final ExpressionType type; + + NormalizedExpression(String expression, ExpressionType type) { + this.expression = expression; + this.type = type; + } + + // determines the sort order of different kinds of subexpressions. + private static int typeOrder(ExpressionType type) { + switch (type) { + case AUTHORIZATION: + return 1; + case OR: + return 2; + case AND: + return 3; + default: + throw new IllegalArgumentException("Unexpected type " + type); + } + } + + @Override + public int compareTo(NormalizedExpression o) { + // Changing this comparator would significantly change how expressions are normalized. + int cmp = typeOrder(type) - typeOrder(o.type); + if (cmp == 0) { + if (type == AUTHORIZATION) { + // sort based on the unquoted and unescaped form of the authorization + cmp = unquote(expression).compareTo(unquote(o.expression)); + } else { + cmp = expression.compareTo(o.expression); + } + + } + return cmp; + } + + @Override + public boolean equals(Object o) { + if (o instanceof NormalizedExpression) { + return compareTo((NormalizedExpression) o) == 0; + } + return false; + } + + @Override + public int hashCode() { + return expression.hashCode(); + } + } + + /** + * This method helps with the flattening aspect of normalization by recursing down as far as + * possible the parse tree in the case when the expression type is the same. As long as the type + * is the same in the sub expression, keep using the same tree set. + */ + private static void flatten(ExpressionType parentType, ParsedAccessExpression parsed, + TreeSet normalizedExpressions) { + if (parsed.getType() == parentType) { + for (var child : parsed.getChildren()) { + flatten(parentType, child, normalizedExpressions); + } + } else { + // The type changed, so start again on the subexpression. + normalizedExpressions.add(normalize(parsed)); + } + } + + /** + *

+ * For a given access expression this example will deduplicate, sort, flatten, and remove unneeded + * parentheses or quotes in the expressions. The following list gives examples of what each + * normalization step does. + * + *

    + *
  • As an example of flattening, the expression {@code A&(B&C)} flattens to {@code + * A&B&C}.
  • + *
  • As an example of sorting, the expression {@code (Z&Y)|(C&B)} sorts to {@code + * (B&C)|(Y&Z)}
  • + *
  • As an example of deduplication, the expression {@code X&Y&X} normalizes to {@code X&Y}
  • + *
  • As an example of unneeded quotes, the expression {@code "ABC"&"XYZ"} normalizes to + * {@code ABC&XYZ}
  • + *
  • As an example of unneeded parentheses, the expression {@code (((ABC)|(XYZ)))} normalizes to + * {@code ABC|XYZ}
  • + *
+ * + *

+ * This algorithm attempts to have the same behavior as the one in the Accumulo 2.1 + * ColumnVisibility class. However the implementation is very different. + *

+ */ + private static NormalizedExpression normalize(ParsedAccessExpression parsed) { + if (parsed.getType() == AUTHORIZATION) { + // If the authorization is quoted and it does not need to be quoted then the following two + // lines will remove the unnecessary quoting. + String unquoted = AccessExpression.unquote(parsed.getExpression()); + String quoted = AccessExpression.quote(unquoted); + return new NormalizedExpression(quoted, parsed.getType()); + } else { + // The tree set does the work of sorting and deduplicating sub expressions. + TreeSet normalizedChildren = new TreeSet<>(); + for (var child : parsed.getChildren()) { + flatten(parsed.getType(), child, normalizedChildren); + } + + if (normalizedChildren.size() == 1) { + return normalizedChildren.first(); + } else { + String operator = parsed.getType() == AND ? "&" : "|"; + String sep = ""; + + StringBuilder builder = new StringBuilder(); + + for (var child : normalizedChildren) { + builder.append(sep); + if (child.type == AUTHORIZATION) { + builder.append(child.expression); + } else { + builder.append("("); + builder.append(child.expression); + builder.append(")"); + } + sep = operator; + } + + return new NormalizedExpression(builder.toString(), parsed.getType()); + } + } + } + private static final String PROP_UUID = "mrbulk.uuid"; private static final String PROP_MAP_TASK = "mrbulk.map.task"; private static final String PROP_MAP_NODES = "mrbulk.map.nodes"; @@ -123,7 +266,7 @@ public RecordReader createRecordReader(InputSplit inputSplit, @Override public void initialize(InputSplit inputSplit, TaskAttemptContext job) { numNodes = job.getConfiguration().getLong(PROP_MAP_NODES, 1000000); - uuid = job.getConfiguration().get(PROP_UUID).getBytes(StandardCharsets.UTF_8); + uuid = job.getConfiguration().get(PROP_UUID).getBytes(UTF_8); minRow = job.getConfiguration().getLong(PROP_ROW_MIN, 0); maxRow = job.getConfiguration().getLong(PROP_ROW_MAX, Long.MAX_VALUE); @@ -143,8 +286,8 @@ private Key genKey(CRC32 cksum) { byte[] fam = genCol(random.nextInt(maxFam)); byte[] qual = genCol(random.nextInt(maxQual)); - @SuppressWarnings("deprecation") - byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten(); + byte[] cv = visibilities.get(random.nextInt(visibilities.size())).getExpression(); + cv = normalize(AccessExpression.parse(cv)).expression.getBytes(UTF_8); if (cksum != null) { cksum.update(row); diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java index 48d79b18..1ab20a60 100644 --- a/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java +++ b/src/main/java/org/apache/accumulo/testing/performance/tests/SplitBalancingPT.java @@ -25,6 +25,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -60,7 +61,8 @@ public Report runTest(final Environment env) throws Exception { client.tableOperations().addSplits(TABLE_NAME, getSplits()); client.instanceOperations().waitForBalance(); - int totalTabletServers = client.instanceOperations().getTabletServers().size(); + int totalTabletServers = + client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size(); int expectedAllocation = NUM_SPLITS / totalTabletServers; int min = expectedAllocation - MARGIN; int max = expectedAllocation + MARGIN; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java index 8ef6567b..186cfbb1 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java @@ -21,6 +21,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; import org.apache.accumulo.testing.randomwalk.State; import org.slf4j.Logger; @@ -37,7 +38,7 @@ public static boolean shouldQueueOperation(State state, RandWalkEnv env) { final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool"); long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - pool.getCompletedTaskCount(); final AccumuloClient client = env.getAccumuloClient(); - int numTservers = client.instanceOperations().getTabletServers().size(); + int numTservers = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size(); if (!shouldQueue(queuedThreads, numTservers)) { log.info("Not queueing because of " + queuedThreads + " outstanding tasks"); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java index 01125cf3..c6978519 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java @@ -78,7 +78,6 @@ static Setting s(Property property, long min, long max) { s(Property.TSERV_WAL_SORT_BUFFER_SIZE, 1024 * 1024, 1024 * 1024 * 1024L), s(Property.TSERV_WAL_BLOCKSIZE, 1024 * 1024,1024 * 1024 * 1024 * 10L), s(Property.MANAGER_BULK_TIMEOUT, 10, 600), - s(Property.MANAGER_FATE_THREADPOOL_SIZE, 1, 100), s(Property.MANAGER_RECOVERY_DELAY, 0, 100), s(Property.MANAGER_LEASE_RECOVERY_WAITING_PERIOD, 0, 10), s(Property.MANAGER_THREADCHECK, 100, 10000),