Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,29 @@
*/
package org.apache.accumulo.testing.continuous;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.access.AccessExpression.unquote;
import static org.apache.accumulo.access.ParsedAccessExpression.ExpressionType.AND;
import static org.apache.accumulo.access.ParsedAccessExpression.ExpressionType.AUTHORIZATION;
import static org.apache.accumulo.testing.continuous.ContinuousIngest.genCol;
import static org.apache.accumulo.testing.continuous.ContinuousIngest.genLong;
import static org.apache.accumulo.testing.continuous.ContinuousIngest.genRow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.List;
import java.util.Random;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

import org.apache.accumulo.access.AccessExpression;
import org.apache.accumulo.access.ParsedAccessExpression;
import org.apache.accumulo.access.ParsedAccessExpression.ExpressionType;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
Expand All @@ -52,6 +59,142 @@
*/
public class ContinuousInputFormat extends InputFormat<Key,Value> {

/**
* As part of normalizing access expression this class is used to sort and dedupe sub-expressions
* in a tree set.
*/
private static class NormalizedExpression implements Comparable<NormalizedExpression> {
public final String expression;
public final ExpressionType type;

NormalizedExpression(String expression, ExpressionType type) {
this.expression = expression;
this.type = type;
}

// determines the sort order of different kinds of subexpressions.
private static int typeOrder(ExpressionType type) {
switch (type) {
case AUTHORIZATION:
return 1;
case OR:
return 2;
case AND:
return 3;
default:
throw new IllegalArgumentException("Unexpected type " + type);
}
}

@Override
public int compareTo(NormalizedExpression o) {
// Changing this comparator would significantly change how expressions are normalized.
int cmp = typeOrder(type) - typeOrder(o.type);
if (cmp == 0) {
if (type == AUTHORIZATION) {
// sort based on the unquoted and unescaped form of the authorization
cmp = unquote(expression).compareTo(unquote(o.expression));
} else {
cmp = expression.compareTo(o.expression);
}

}
return cmp;
}

@Override
public boolean equals(Object o) {
if (o instanceof NormalizedExpression) {
return compareTo((NormalizedExpression) o) == 0;
}
return false;
}

@Override
public int hashCode() {
return expression.hashCode();
}
}

/**
* This method helps with the flattening aspect of normalization by recursing down as far as
* possible the parse tree in the case when the expression type is the same. As long as the type
* is the same in the sub expression, keep using the same tree set.
*/
private static void flatten(ExpressionType parentType, ParsedAccessExpression parsed,
TreeSet<NormalizedExpression> normalizedExpressions) {
if (parsed.getType() == parentType) {
for (var child : parsed.getChildren()) {
flatten(parentType, child, normalizedExpressions);
}
} else {
// The type changed, so start again on the subexpression.
normalizedExpressions.add(normalize(parsed));
}
}

/**
* <p>
* For a given access expression this example will deduplicate, sort, flatten, and remove unneeded
* parentheses or quotes in the expressions. The following list gives examples of what each
* normalization step does.
*
* <ul>
* <li>As an example of flattening, the expression {@code A&(B&C)} flattens to {@code
* A&B&C}.</li>
* <li>As an example of sorting, the expression {@code (Z&Y)|(C&B)} sorts to {@code
* (B&C)|(Y&Z)}</li>
* <li>As an example of deduplication, the expression {@code X&Y&X} normalizes to {@code X&Y}</li>
* <li>As an example of unneeded quotes, the expression {@code "ABC"&"XYZ"} normalizes to
* {@code ABC&XYZ}</li>
* <li>As an example of unneeded parentheses, the expression {@code (((ABC)|(XYZ)))} normalizes to
* {@code ABC|XYZ}</li>
* </ul>
*
* <p>
* This algorithm attempts to have the same behavior as the one in the Accumulo 2.1
* ColumnVisibility class. However the implementation is very different.
* </p>
*/
private static NormalizedExpression normalize(ParsedAccessExpression parsed) {
if (parsed.getType() == AUTHORIZATION) {
// If the authorization is quoted and it does not need to be quoted then the following two
// lines will remove the unnecessary quoting.
String unquoted = AccessExpression.unquote(parsed.getExpression());
String quoted = AccessExpression.quote(unquoted);
return new NormalizedExpression(quoted, parsed.getType());
} else {
// The tree set does the work of sorting and deduplicating sub expressions.
TreeSet<NormalizedExpression> normalizedChildren = new TreeSet<>();
for (var child : parsed.getChildren()) {
flatten(parsed.getType(), child, normalizedChildren);
}

if (normalizedChildren.size() == 1) {
return normalizedChildren.first();
} else {
String operator = parsed.getType() == AND ? "&" : "|";
String sep = "";

StringBuilder builder = new StringBuilder();

for (var child : normalizedChildren) {
builder.append(sep);
if (child.type == AUTHORIZATION) {
builder.append(child.expression);
} else {
builder.append("(");
builder.append(child.expression);
builder.append(")");
}
sep = operator;
}

return new NormalizedExpression(builder.toString(), parsed.getType());
}
}
}

private static final String PROP_UUID = "mrbulk.uuid";
private static final String PROP_MAP_TASK = "mrbulk.map.task";
private static final String PROP_MAP_NODES = "mrbulk.map.nodes";
Expand Down Expand Up @@ -123,7 +266,7 @@ public RecordReader<Key,Value> createRecordReader(InputSplit inputSplit,
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
numNodes = job.getConfiguration().getLong(PROP_MAP_NODES, 1000000);
uuid = job.getConfiguration().get(PROP_UUID).getBytes(StandardCharsets.UTF_8);
uuid = job.getConfiguration().get(PROP_UUID).getBytes(UTF_8);

minRow = job.getConfiguration().getLong(PROP_ROW_MIN, 0);
maxRow = job.getConfiguration().getLong(PROP_ROW_MAX, Long.MAX_VALUE);
Expand All @@ -143,8 +286,8 @@ private Key genKey(CRC32 cksum) {

byte[] fam = genCol(random.nextInt(maxFam));
byte[] qual = genCol(random.nextInt(maxQual));
@SuppressWarnings("deprecation")
byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten();
byte[] cv = visibilities.get(random.nextInt(visibilities.size())).getExpression();
cv = normalize(AccessExpression.parse(cv)).expression.getBytes(UTF_8);

if (cksum != null) {
cksum.update(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
Expand Down Expand Up @@ -60,7 +61,8 @@ public Report runTest(final Environment env) throws Exception {
client.tableOperations().addSplits(TABLE_NAME, getSplits());
client.instanceOperations().waitForBalance();

int totalTabletServers = client.instanceOperations().getTabletServers().size();
int totalTabletServers =
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size();
int expectedAllocation = NUM_SPLITS / totalTabletServers;
int min = expectedAllocation - MARGIN;
int max = expectedAllocation + MARGIN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.testing.randomwalk.RandWalkEnv;
import org.apache.accumulo.testing.randomwalk.State;
import org.slf4j.Logger;
Expand All @@ -37,7 +38,7 @@ public static boolean shouldQueueOperation(State state, RandWalkEnv env) {
final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool");
long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - pool.getCompletedTaskCount();
final AccumuloClient client = env.getAccumuloClient();
int numTservers = client.instanceOperations().getTabletServers().size();
int numTservers = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size();

if (!shouldQueue(queuedThreads, numTservers)) {
log.info("Not queueing because of " + queuedThreads + " outstanding tasks");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ static Setting s(Property property, long min, long max) {
s(Property.TSERV_WAL_SORT_BUFFER_SIZE, 1024 * 1024, 1024 * 1024 * 1024L),
s(Property.TSERV_WAL_BLOCKSIZE, 1024 * 1024,1024 * 1024 * 1024 * 10L),
s(Property.MANAGER_BULK_TIMEOUT, 10, 600),
s(Property.MANAGER_FATE_THREADPOOL_SIZE, 1, 100),
s(Property.MANAGER_RECOVERY_DELAY, 0, 100),
s(Property.MANAGER_LEASE_RECOVERY_WAITING_PERIOD, 0, 10),
s(Property.MANAGER_THREADCHECK, 100, 10000),
Expand Down
Loading