From d73e0fc1d0a6523bbf58ed2e5ced7b3ce3f071f1 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 29 Aug 2025 18:14:48 +0000 Subject: [PATCH] fixes file count issue w/ bulk import random walk test Running bulk import test failed with the following error. ``` 2025-08-29T00:44:43,285 [randomwalk.bulk.BulkMinusOne] ERROR: java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28 -7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100 java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100 at org.apache.accumulo.core.clientImpl.bulk.BulkImport.computeFileToTabletMappings(BulkImport.java:591) ~[accumulo-testing-shaded.jar:?] at org.apache.accumulo.core.clientImpl.bulk.BulkImport.computeMappingFromFiles(BulkImport.java:499) ~[accumulo-testing-shaded.jar:?] at org.apache.accumulo.core.clientImpl.bulk.BulkImport.load(BulkImport.java:155) ~[accumulo-testing-shaded.jar:?] at org.apache.accumulo.testing.randomwalk.bulk.BulkPlusOne.bulkLoadLots(BulkPlusOne.java:98) ~[accumulo-testing-shaded.jar:?] at org.apache.accumulo.testing.randomwalk.bulk.BulkMinusOne.runLater(BulkMinusOne.java:34) ~[accumulo-testing-shaded.jar:?] at org.apache.accumulo.testing.randomwalk.bulk.BulkTest.lambda$visit$0(BulkTest.java:33) ~[accumulo-testing-shaded.jar:?] at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-testing-shaded.jar:?] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?] at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?] at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-testing-shaded.jar:?] 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?] at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-testing-shaded.jar:?] at java.lang.Thread.run(Thread.java:840) [?:?] Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?] at org.apache.accumulo.core.clientImpl.bulk.BulkImport.computeFileToTabletMappings(BulkImport.java:585) ~[accumulo-testing-shaded.jar:?] ... 13 more Caused by: java.lang.IllegalArgumentException: The file hdfs://localhost:8020/tmp/bulk_ab3612b1-ef53-4cd5-8c28-7dd75de47c87/part_0.rf attempted to import to 122 tablets. Max tablets allowed set to 100 at org.apache.accumulo.core.clientImpl.bulk.BulkImport.checkTabletCount(BulkImport.java:627) ~[accumulo-testing-shaded.jar:?] at org.apache.accumulo.core.clientImpl.bulk.BulkImport.lambda$computeFileToTabletMappings$7(BulkImport.java:563) ~[accumulo-testing-shaded.jar:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?] ... 5 more ``` To avoid this adjusted the tablet setting for the max number of files that can go to a single tablet to 1000. Also fixed the following problems with the test code that made this problem harder to debug. * The test schedules lots of background tasks, but keeps running when they fail. Made the test exit when it notices a background task failed. This change required making a map synchronized. 
* Some log messages related to the test bulk import code logged a marker count that correlates the bulk import w/ data in the table. This marker count is really important for tracking down bugs w/ missing data, but only a subset of log messages included it. Added the marker count to all log messages since log messages from multiple threads are interleaved in test output. Since the test did not immediately fail when the bulk import call failed, I assumed all bulk imports were successful and Accumulo lost data. Looked down the wrong path for a bit, hoping these changes help avoid that in the future. --- .../accumulo/testing/randomwalk/Module.java | 6 ++++-- .../accumulo/testing/randomwalk/State.java | 18 ++++++++++++------ .../testing/randomwalk/bulk/BulkPlusOne.java | 15 ++++++++------- .../testing/randomwalk/bulk/BulkTest.java | 10 ++++++++++ .../testing/randomwalk/bulk/Setup.java | 9 +++++++-- .../testing/randomwalk/bulk/Verify.java | 10 ++++++---- 6 files changed, 47 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java index aec30a30..bb78a5b8 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java @@ -29,6 +29,7 @@ import java.util.Properties; import java.util.Random; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -342,8 +343,9 @@ public void visit(final State state, final RandWalkEnv env, Properties props) th log.debug(" " + entry.getKey() + ": " + entry.getValue()); } log.debug("State information"); - for (String key : new TreeSet<>(state.getMap().keySet())) { - Object value = state.getMap().get(key); + var stateSnapshot = new TreeMap<>(state.getMap()); + for (String key : stateSnapshot.keySet()) { + Object value = 
stateSnapshot.get(key); String logMsg = " " + key + ": "; if (value == null) logMsg += "null"; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java index c79a7256..3125e868 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/State.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/State.java @@ -19,8 +19,10 @@ package org.apache.accumulo.testing.randomwalk; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; /** @@ -28,7 +30,7 @@ */ public class State { - private final HashMap stateMap = new HashMap<>(); + private final Map stateMap = Collections.synchronizedMap(new HashMap<>()); private final List tables = new ArrayList<>(); private final List namespaces = new ArrayList<>(); private final List users = new ArrayList<>(); @@ -80,10 +82,12 @@ public void remove(String key) { * @throws RuntimeException if state object is not present */ public Object get(String key) { - if (!stateMap.containsKey(key)) { - throw new RuntimeException("State does not contain " + key); + synchronized (stateMap) { + if (!stateMap.containsKey(key)) { + throw new RuntimeException("State does not contain " + key); + } + return stateMap.get(key); } - return stateMap.get(key); } public List getTableNames() { @@ -135,8 +139,10 @@ public Object getOkIfAbsent(String key) { * * @return state map */ - HashMap getMap() { - return stateMap; + Map getMap() { + synchronized (stateMap) { + return Map.copyOf(stateMap); + } } /** diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java index 39d3f5f9..7287da22 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java @@ -56,9 
+56,12 @@ public class BulkPlusOne extends BulkImportTest { private static final Value ONE = new Value("1".getBytes()); static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception { + String markerColumnQualifier = String.format("%07d", counter.incrementAndGet()); + String markerLog = "marker:" + markerColumnQualifier; + final FileSystem fs = (FileSystem) state.get("fs"); final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID()); - log.debug("Bulk loading from {}", dir); + log.debug("{} bulk loading from {}", markerLog, dir); final int parts = env.getRandom().nextInt(10) + 1; // The set created below should always contain 0. So its very important that zero is first in @@ -70,9 +73,8 @@ static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) List printRows = startRows.stream().map(row -> String.format(FMT, row)).collect(Collectors.toList()); - String markerColumnQualifier = String.format("%07d", counter.incrementAndGet()); - log.debug("preparing bulk files with start rows " + printRows + " last row " - + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier); + log.debug("{} preparing bulk files with start rows {} last row {} marker ", markerLog, + printRows, String.format(FMT, LOTS - 1)); List rows = new ArrayList<>(startRows); rows.add(LOTS); @@ -80,7 +82,7 @@ static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) for (int i = 0; i < parts; i++) { String fileName = dir + "/" + String.format("part_%d.rf", i); - log.debug("Creating {}", fileName); + log.debug("{} creating {}", markerLog, fileName); try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) { writer.startDefaultLocalityGroup(); int start = rows.get(i); @@ -97,8 +99,7 @@ static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) env.getAccumuloClient().tableOperations().importDirectory(dir.toString()) 
.to(Setup.getTableName()).tableTime(true).load(); fs.delete(dir, true); - log.debug("Finished bulk import, start rows " + printRows + " last row " - + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier); + log.debug("{} Finished bulk import", markerLog); } @Override diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java index 26e8c20c..f584b5d4 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkTest.java @@ -26,12 +26,22 @@ public abstract class BulkTest extends Test { + public static final String BACKGROUND_FAILURE_KEY = "sawBackgroundFailure"; + @Override public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception { + + if ((Boolean) state.get(BACKGROUND_FAILURE_KEY)) { + // fail the test early because a previous background task failed + throw new IllegalArgumentException( + "One or more previous background task failed, aborting test"); + } + Setup.run(state, () -> { try { runLater(state, env); } catch (Throwable ex) { + state.set(BACKGROUND_FAILURE_KEY, Boolean.TRUE); log.error(ex.toString(), ex); } }); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java index 3c6d46a3..3fd2d8de 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java @@ -19,6 +19,7 @@ package org.apache.accumulo.testing.randomwalk.bulk; import java.net.InetAddress; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ThreadPoolExecutor; @@ -26,6 +27,7 @@ import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import 
org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.iterators.LongCombiner; import org.apache.accumulo.core.iterators.user.SummingCombiner; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -52,14 +54,17 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti IteratorSetting is = new IteratorSetting(10, SummingCombiner.class); SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING); SummingCombiner.setCombineAllColumns(is, true); - tableOps.create(getTableName(), new NewTableConfiguration().attachIterator(is)); + var tableProps = Map.of(Property.TABLE_BULK_MAX_TABLET_FILES.getKey(), "1000"); + + tableOps.create(getTableName(), + new NewTableConfiguration().attachIterator(is).setProperties(tableProps)); } } catch (TableExistsException ex) { // expected if there are multiple walkers } state.setRandom(env.getRandom()); state.set("fs", FileSystem.get(env.getHadoopConfiguration())); - state.set("bulkImportSuccess", "true"); + state.set(BulkTest.BACKGROUND_FAILURE_KEY, Boolean.FALSE); BulkPlusOne.counter.set(0L); ThreadPoolExecutor e = ThreadPools.getServerThreadPools().getPoolBuilder("bulkImportPool") .numCoreThreads(MAX_POOL_SIZE).build(); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java index 8d611499..6db6ab5e 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java @@ -59,15 +59,17 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti lastSize = size; threadPool.awaitTermination(10, TimeUnit.SECONDS); } - if (!"true".equals(state.get("bulkImportSuccess"))) { - log.info("Not verifying bulk import test due to import failures"); - return; + + boolean errorFound = false; + + if ((Boolean) 
state.get(BulkTest.BACKGROUND_FAILURE_KEY)) { + log.error("One or more background task failed"); + errorFound = true; } String user = env.getAccumuloClient().whoami(); Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user); RowIterator rowIter; - boolean errorFound = false; try (Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths)) { scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY); for (Entry entry : scanner) {