Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -342,8 +343,9 @@ public void visit(final State state, final RandWalkEnv env, Properties props) th
log.debug(" " + entry.getKey() + ": " + entry.getValue());
}
log.debug("State information");
for (String key : new TreeSet<>(state.getMap().keySet())) {
Object value = state.getMap().get(key);
var stateSnapshot = new TreeMap<>(state.getMap());
for (String key : stateSnapshot.keySet()) {
Object value = stateSnapshot.get(key);
String logMsg = " " + key + ": ";
if (value == null)
logMsg += "null";
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/org/apache/accumulo/testing/randomwalk/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
package org.apache.accumulo.testing.randomwalk;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
* A structure for storing state kept during a test. This class is not thread-safe.
*/
public class State {

private final HashMap<String,Object> stateMap = new HashMap<>();
private final Map<String,Object> stateMap = Collections.synchronizedMap(new HashMap<>());
private final List<String> tables = new ArrayList<>();
private final List<String> namespaces = new ArrayList<>();
private final List<String> users = new ArrayList<>();
Expand Down Expand Up @@ -80,10 +82,12 @@ public void remove(String key) {
* @throws RuntimeException if state object is not present
*/
public Object get(String key) {
if (!stateMap.containsKey(key)) {
throw new RuntimeException("State does not contain " + key);
synchronized (stateMap) {
if (!stateMap.containsKey(key)) {
throw new RuntimeException("State does not contain " + key);
}
return stateMap.get(key);
}
return stateMap.get(key);
}

public List<String> getTableNames() {
Expand Down Expand Up @@ -135,8 +139,10 @@ public Object getOkIfAbsent(String key) {
*
* @return state map
*/
HashMap<String,Object> getMap() {
return stateMap;
Map<String,Object> getMap() {
synchronized (stateMap) {
return Map.copyOf(stateMap);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ public class BulkPlusOne extends BulkImportTest {
private static final Value ONE = new Value("1".getBytes());

static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception {
String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
String markerLog = "marker:" + markerColumnQualifier;

final FileSystem fs = (FileSystem) state.get("fs");
final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID());
log.debug("Bulk loading from {}", dir);
log.debug("{} bulk loading from {}", markerLog, dir);
final int parts = env.getRandom().nextInt(10) + 1;

// The set created below should always contain 0. So its very important that zero is first in
Expand All @@ -70,17 +73,16 @@ static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value)
List<String> printRows =
startRows.stream().map(row -> String.format(FMT, row)).collect(Collectors.toList());

String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
log.debug("preparing bulk files with start rows " + printRows + " last row "
+ String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
log.debug("{} preparing bulk files with start rows {} last row {} marker ", markerLog,
printRows, String.format(FMT, LOTS - 1));

List<Integer> rows = new ArrayList<>(startRows);
rows.add(LOTS);

for (int i = 0; i < parts; i++) {
String fileName = dir + "/" + String.format("part_%d.rf", i);

log.debug("Creating {}", fileName);
log.debug("{} creating {}", markerLog, fileName);
try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
writer.startDefaultLocalityGroup();
int start = rows.get(i);
Expand All @@ -97,8 +99,7 @@ static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value)
env.getAccumuloClient().tableOperations().importDirectory(dir.toString())
.to(Setup.getTableName()).tableTime(true).load();
fs.delete(dir, true);
log.debug("Finished bulk import, start rows " + printRows + " last row "
+ String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
log.debug("{} Finished bulk import", markerLog);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,22 @@

public abstract class BulkTest extends Test {

public static final String BACKGROUND_FAILURE_KEY = "sawBackgroundFailure";

@Override
public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {

if ((Boolean) state.get(BACKGROUND_FAILURE_KEY)) {
// fail the test early because a previous background task failed
throw new IllegalArgumentException(
"One or more previous background task failed, aborting test");
}

Setup.run(state, () -> {
try {
runLater(state, env);
} catch (Throwable ex) {
state.set(BACKGROUND_FAILURE_KEY, Boolean.TRUE);
log.error(ex.toString(), ex);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.accumulo.testing.randomwalk.bulk;

import java.net.InetAddress;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.iterators.LongCombiner;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.core.util.threads.ThreadPools;
Expand All @@ -52,14 +54,17 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
IteratorSetting is = new IteratorSetting(10, SummingCombiner.class);
SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
SummingCombiner.setCombineAllColumns(is, true);
tableOps.create(getTableName(), new NewTableConfiguration().attachIterator(is));
var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000");

tableOps.create(getTableName(),
new NewTableConfiguration().attachIterator(is).setProperties(tableProps));
}
} catch (TableExistsException ex) {
// expected if there are multiple walkers
}
state.setRandom(env.getRandom());
state.set("fs", FileSystem.get(env.getHadoopConfiguration()));
state.set("bulkImportSuccess", "true");
state.set(BulkTest.BACKGROUND_FAILURE_KEY, Boolean.FALSE);
BulkPlusOne.counter.set(0L);
ThreadPoolExecutor e = ThreadPools.getServerThreadPools().getPoolBuilder("bulkImportPool")
.numCoreThreads(MAX_POOL_SIZE).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,17 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti
lastSize = size;
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
if (!"true".equals(state.get("bulkImportSuccess"))) {
log.info("Not verifying bulk import test due to import failures");
return;

boolean errorFound = false;

if ((Boolean) state.get(BulkTest.BACKGROUND_FAILURE_KEY)) {
log.error("One or more background task failed");
errorFound = true;
}

String user = env.getAccumuloClient().whoami();
Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user);
RowIterator rowIter;
boolean errorFound = false;
try (Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths)) {
scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY);
for (Entry<Key,Value> entry : scanner) {
Expand Down