diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java index aec30a30..bb78a5b8 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java @@ -29,6 +29,7 @@ import java.util.Properties; import java.util.Random; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -342,8 +343,9 @@ public void visit(final State state, final RandWalkEnv env, Properties props) th log.debug(" " + entry.getKey() + ": " + entry.getValue()); } log.debug("State information"); - for (String key : new TreeSet<>(state.getMap().keySet())) { - Object value = state.getMap().get(key); + var stateSnapshot = new TreeMap<>(state.getMap()); + for (String key : stateSnapshot.keySet()) { + Object value = stateSnapshot.get(key); String logMsg = " " + key + ": "; if (value == null) logMsg += "null"; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java index c79a7256..3125e868 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java @@ -19,8 +19,10 @@ package org.apache.accumulo.testing.randomwalk; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; /** @@ -28,7 +30,7 @@ */ public class State { - private final HashMap stateMap = new HashMap<>(); + private final Map stateMap = Collections.synchronizedMap(new HashMap<>()); private final List tables = new ArrayList<>(); private final List namespaces = new ArrayList<>(); private final List users = new ArrayList<>(); @@ -80,10 +82,12 @@ public void remove(String key) { * @throws RuntimeException if state object is not present */ public Object get(String key) { - if (!stateMap.containsKey(key)) { - throw new RuntimeException("State does not contain " + key); + synchronized (stateMap) { + if (!stateMap.containsKey(key)) { + throw new RuntimeException("State does not contain " + key); + } + return stateMap.get(key); } - return stateMap.get(key); } public List getTableNames() { @@ -135,8 +139,10 @@ public Object getOkIfAbsent(String key) { * * @return state map */ - HashMap getMap() { - return stateMap; + Map getMap() { + synchronized (stateMap) { + return Map.copyOf(stateMap); + } } /** diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java index 39d3f5f9..7287da22 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java @@ -56,9 +56,12 @@ public class BulkPlusOne extends BulkImportTest { private static final Value ONE = new Value("1".getBytes()); static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception { + String markerColumnQualifier = String.format("%07d", counter.incrementAndGet()); + String markerLog = "marker:" + markerColumnQualifier; + final FileSystem fs = (FileSystem) state.get("fs"); final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID()); - log.debug("Bulk loading from {}", dir); + log.debug("{} bulk loading from {}", markerLog, dir); final int parts = env.getRandom().nextInt(10) + 1; // The set created below should always contain 0. So its very important that zero is first in @@ -70,9 +73,8 @@ static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) List printRows = startRows.stream().map(row -> String.format(FMT, row)).collect(Collectors.toList()); - String markerColumnQualifier = String.format("%07d", counter.incrementAndGet()); - log.debug("preparing bulk files with start rows " + printRows + " last row " - + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier); + log.debug("{} preparing bulk files with start rows {} last row {} marker ", markerLog, + printRows, String.format(FMT, LOTS - 1)); List rows = new ArrayList<>(startRows); rows.add(LOTS); @@ -80,7 +82,7 @@ static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) for (int i = 0; i < parts; i++) { String fileName = dir + "/" + String.format("part_%d.rf", i); - log.debug("Creating {}", fileName); + log.debug("{} creating {}", markerLog, fileName); try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) { writer.startDefaultLocalityGroup(); int start = rows.get(i); @@ -97,8 +99,7 @@ static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) env.getAccumuloClient().tableOperations().importDirectory(dir.toString()) .to(Setup.getTableName()).tableTime(true).load(); fs.delete(dir, true); - log.debug("Finished bulk import, start rows " + printRows + " last row " - + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier); + log.debug("{} Finished bulk import", markerLog); } @Override diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java index 26e8c20c..f584b5d4 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java @@ -26,12 +26,22 @@ public abstract class BulkTest extends Test { + public static final String BACKGROUND_FAILURE_KEY = "sawBackgroundFailure"; + @Override public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception { + + if ((Boolean) state.get(BACKGROUND_FAILURE_KEY)) { + // fail the test early because a previous background task failed + throw new IllegalArgumentException( + "One or more previous background task failed, aborting test"); + } + Setup.run(state, () -> { try { runLater(state, env); } catch (Throwable ex) { + state.set(BACKGROUND_FAILURE_KEY, Boolean.TRUE); log.error(ex.toString(), ex); } }); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java index 3c6d46a3..3fd2d8de 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java @@ -19,6 +19,7 @@ package org.apache.accumulo.testing.randomwalk.bulk; import java.net.InetAddress; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ThreadPoolExecutor; @@ -26,6 +27,7 @@ import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.iterators.LongCombiner; import org.apache.accumulo.core.iterators.user.SummingCombiner; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -52,14 +54,17 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti IteratorSetting is = new IteratorSetting(10, SummingCombiner.class); SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING); SummingCombiner.setCombineAllColumns(is, true); - tableOps.create(getTableName(), new NewTableConfiguration().attachIterator(is)); + var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000"); + + tableOps.create(getTableName(), + new NewTableConfiguration().attachIterator(is).setProperties(tableProps)); } } catch (TableExistsException ex) { // expected if there are multiple walkers } state.setRandom(env.getRandom()); state.set("fs", FileSystem.get(env.getHadoopConfiguration())); - state.set("bulkImportSuccess", "true"); + state.set(BulkTest.BACKGROUND_FAILURE_KEY, Boolean.FALSE); BulkPlusOne.counter.set(0L); ThreadPoolExecutor e = ThreadPools.getServerThreadPools().getPoolBuilder("bulkImportPool") .numCoreThreads(MAX_POOL_SIZE).build(); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java index 8d611499..6db6ab5e 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java @@ -59,15 +59,17 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti lastSize = size; threadPool.awaitTermination(10, TimeUnit.SECONDS); } - if (!"true".equals(state.get("bulkImportSuccess"))) { - log.info("Not verifying bulk import test due to import failures"); - return; + + boolean errorFound = false; + + if ((Boolean) state.get(BulkTest.BACKGROUND_FAILURE_KEY)) { + log.error("One or more background task failed"); + errorFound = true; } String user = env.getAccumuloClient().whoami(); Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user); RowIterator rowIter; - boolean errorFound = false; try (Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths)) { scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY); for (Entry entry : scanner) {