diff --git a/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java b/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java index 7af94627b0c..31e69e0a20e 100644 --- a/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java @@ -18,9 +18,12 @@ */ package org.apache.accumulo.core.classloader; +import java.io.IOException; + import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException; import org.apache.accumulo.core.util.ConfigurationImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,11 +77,15 @@ public static synchronized void resetContextFactoryForTests() { } @SuppressWarnings("deprecation") - public static ClassLoader getClassLoader(String context) { + public static ClassLoader getClassLoader(String context) throws ContextClassLoaderException { if (context != null && !context.isEmpty()) { return FACTORY.getClassLoader(context); } else { - return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.getClassLoader(); + try { + return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.getClassLoader(); + } catch (IOException e) { + throw new ContextClassLoaderException(context, e); + } } } @@ -92,7 +99,7 @@ public static boolean isValidContext(String context) { return false; } return true; - } catch (RuntimeException e) { + } catch (ContextClassLoaderException e) { LOG.debug("Context {} is not valid.", context, e); return false; } @@ -103,7 +110,11 @@ public static boolean isValidContext(String context) { public static Class loadClass(String context, String className, Class extension) throws ClassNotFoundException { - return getClassLoader(context).loadClass(className).asSubclass(extension); + try { + return getClassLoader(context).loadClass(className).asSubclass(extension); + } catch (ContextClassLoaderException e) { + throw new ClassNotFoundException("Error loading class from context: " + context, e); + } } public static Class loadClass(String className, Class extension) diff --git a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java index 1bd5fcb6701..ee5634c714b 100644 --- a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java @@ -20,6 +20,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; +import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledFuture; @@ -97,8 +98,12 @@ private static void removeUnusedContexts(Set contextsInUse) { @SuppressWarnings("deprecation") @Override - public ClassLoader getClassLoader(String contextName) { - return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader - .getContextClassLoader(contextName); + public ClassLoader getClassLoader(String contextName) throws ContextClassLoaderException { + try { + return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader + .getContextClassLoader(contextName); + } catch (RuntimeException | IOException e) { + throw new ContextClassLoaderException(contextName, e); + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java index 8a646219511..0209e8deece 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java @@ -242,7 +242,7 @@ public Iterator> iterator() { IteratorBuilder.builder(tm.values()).opts(serverSideIteratorOptions).env(iterEnv).build(); skvi = IteratorConfigUtil.loadIterators(smi, ib); - } catch (IOException e) { + } catch (IOException | ReflectiveOperationException e) { throw new RuntimeException(e); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index e34730ff360..9266a53e510 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -320,7 +320,7 @@ public Iterator> iterator() { .opts(serverSideIteratorOptions).env(iterEnv).build(); iterator = IteratorConfigUtil.loadIterators(iterator, iteratorBuilder); } - } catch (IOException e) { + } catch (IOException | ReflectiveOperationException e) { throw new RuntimeException(e); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index c2bcabe01e6..e5b059a80f7 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -163,6 +163,10 @@ private void ensureOpen() { } } + protected boolean isClosed() { + return closed; + } + private ScanServerSelector createScanServerSelector() { String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties()); try { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java index d64e0181487..7ec99bd9c49 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java @@ -137,8 +137,8 @@ public Entry next() { } } - private void nextTablet() - throws TableNotFoundException, AccumuloException, IOException, AccumuloSecurityException { + private void nextTablet() throws TableNotFoundException, AccumuloException, IOException, + AccumuloSecurityException, ReflectiveOperationException { Range nextRange; @@ -204,8 +204,8 @@ private TabletMetadata getTabletFiles(Range nextRange) { } private SortedKeyValueIterator createIterator(KeyExtent extent, - Collection absFiles) - throws TableNotFoundException, AccumuloException, IOException, AccumuloSecurityException { + Collection absFiles) throws TableNotFoundException, AccumuloException, + IOException, AccumuloSecurityException, ReflectiveOperationException { // possible race condition here, if table is renamed String tableName = context.getTableName(tableId); diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigCheckUtil.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigCheckUtil.java index eba5cc420f3..db5cb305a6d 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigCheckUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigCheckUtil.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.core.conf; -import java.io.IOException; import java.util.Map.Entry; import java.util.Objects; @@ -160,7 +159,7 @@ private static void verifyValidClassName(String confOption, String className, Class requiredBaseClass) { try { ConfigurationTypeHelper.getClassInstance(null, className, requiredBaseClass); - } catch (IOException | ReflectiveOperationException e) { + } catch (ReflectiveOperationException e) { fatal(confOption + " has an invalid class name: " + className); } catch (ClassCastException e) { fatal(confOption + " must implement " + requiredBaseClass diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationTypeHelper.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationTypeHelper.java index cd286c1cd71..ae0d4088e5c 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationTypeHelper.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationTypeHelper.java @@ -24,7 +24,6 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import java.io.IOException; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -175,7 +174,7 @@ public static T getClassInstance(String context, String clazzName, Class try { instance = getClassInstance(context, clazzName, base); - } catch (RuntimeException | IOException | ReflectiveOperationException e) { + } catch (RuntimeException | ReflectiveOperationException e) { log.error("Failed to load class {} in classloader context {}", clazzName, context, e); } @@ -196,7 +195,7 @@ public static T getClassInstance(String context, String clazzName, Class * @return a new instance of the class */ public static T getClassInstance(String context, String clazzName, Class base) - throws IOException, ReflectiveOperationException { + throws ReflectiveOperationException { T instance; Class clazz = ClassLoaderUtil.loadClass(context, clazzName, base); diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index d9d94cb3677..7c0375aa666 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1556,6 +1556,34 @@ public enum Property { COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT, "The port used for handling client connections on the compactor servers.", "2.1.0"), @Experimental + COMPACTOR_FAILURE_BACKOFF_THRESHOLD("compactor.failure.backoff.threshold", "3", + PropertyType.COUNT, + "The number of consecutive failures that must occur before the Compactor starts to back off" + + " processing compactions.", + "2.1.4"), + @Experimental + COMPACTOR_FAILURE_BACKOFF_INTERVAL("compactor.failure.backoff.interval", "0", + PropertyType.TIMEDURATION, + "The time basis for computing the wait time for compaction failure backoff. A value of zero disables" + + " the backoff feature. When a non-zero value is supplied, then after compactor.failure.backoff.threshold" + + " failures have occurred, the compactor will wait compactor.failure.backoff.interval * the number of" + + " failures seconds before executing the next compaction. For example, if this value is 10s, then after" + + " three failures the Compactor will wait 30s before starting the next compaction. If the compaction fails" + + " again, then it will wait 40s before starting the next compaction.", + "2.1.4"), + @Experimental + COMPACTOR_FAILURE_BACKOFF_RESET("compactor.failure.backoff.reset", "10m", + PropertyType.TIMEDURATION, + "The maximum amount of time that the compactor will wait before executing the next compaction. When this" + + " time limit has been reached, the failures are cleared.", + "2.1.4"), + @Experimental + COMPACTOR_FAILURE_TERMINATION_THRESHOLD("compactor.failure.termination.threshold", "0", + PropertyType.COUNT, + "The number of consecutive failures at which the Compactor exits and the process terminates. A zero" + + " value disables this feature.", + "2.1.4"), + @Experimental COMPACTOR_MINTHREADS("compactor.threads.minimum", "4", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests.", "2.1.0"), @Experimental diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtil.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtil.java index 93c098d0efd..5758411e04c 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtil.java @@ -174,7 +174,8 @@ public static IteratorBuilder.IteratorBuilderEnv loadIterConf(IteratorScope scop */ public static SortedKeyValueIterator convertItersAndLoad(IteratorScope scope, SortedKeyValueIterator source, AccumuloConfiguration conf, - List iterators, IteratorEnvironment env) throws IOException { + List iterators, IteratorEnvironment env) + throws IOException, ReflectiveOperationException { List ssiList = new ArrayList<>(); Map> ssio = new HashMap<>(); @@ -194,7 +195,7 @@ public static SortedKeyValueIterator convertItersAndLoad(IteratorScop */ public static SortedKeyValueIterator loadIterators(SortedKeyValueIterator source, IteratorBuilder iteratorBuilder) - throws IOException { + throws IOException, ReflectiveOperationException { SortedKeyValueIterator prev = source; final boolean useClassLoader = iteratorBuilder.useAccumuloClassLoader; Map>> classCache = new HashMap<>(); @@ -228,6 +229,7 @@ public static SortedKeyValueIterator convertItersAndLoad(IteratorScop } catch (ReflectiveOperationException e) { log.error("Failed to load iterator {}, for table {}, from context {}", iterInfo.className, iteratorBuilder.iteratorEnvironment.getTableId(), iteratorBuilder.context, e); + // This has to be a RuntimeException to be handled properly to fail the scan throw new RuntimeException(e); } } diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/conf/ColumnToClassMapping.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/conf/ColumnToClassMapping.java index 1e6ff268b0f..419aecc5cbf 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/conf/ColumnToClassMapping.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/conf/ColumnToClassMapping.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.core.iteratorsImpl.conf; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -44,12 +43,12 @@ public ColumnToClassMapping() { } public ColumnToClassMapping(Map objectStrings, Class c) - throws ReflectiveOperationException, IOException { + throws ReflectiveOperationException { this(objectStrings, c, null); } public ColumnToClassMapping(Map objectStrings, Class c, - String context) throws ReflectiveOperationException, IOException { + String context) throws ReflectiveOperationException { this(); for (Entry entry : objectStrings.entrySet()) { diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index c98a2f5c61f..7ce7a023df6 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -73,6 +73,42 @@ * FunctionCounter * Number of entries written by all threads performing compactions * + * + * N/A + * N/A + * {@value #METRICS_COMPACTOR_COMPACTIONS_CANCELLED} + * FunctionCounter + * Number of compactions cancelled on a compactor + * + * + * N/A + * N/A + * {@value #METRICS_COMPACTOR_COMPACTIONS_COMPLETED} + * FunctionCounter + * Number of compactions completed on a compactor + * + * + * N/A + * N/A + * {@value #METRICS_COMPACTOR_COMPACTIONS_FAILED} + * FunctionCounter + * Number of compactions failed on a compactor + * + * + * N/A + * N/A + * {@value #METRICS_COMPACTOR_FAILURES_CONSECUTIVE} + * Gauge + * Number of consecutive compaction failures on a compactor + * + * + * N/A + * N/A + * {@value #METRICS_COMPACTOR_FAILURES_TERMINATION} + * Gauge + * Number of Compactors terminated due to consecutive failures. Process exits after this metric + * is incremented, so it's not guaranteed to be seen. + * * * * currentFateOps @@ -625,6 +661,12 @@ public interface MetricsProducer { String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read"; String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written"; + String METRICS_COMPACTOR_COMPACTIONS_CANCELLED = METRICS_COMPACTOR_PREFIX + "majc.cancelled"; + String METRICS_COMPACTOR_COMPACTIONS_COMPLETED = METRICS_COMPACTOR_PREFIX + "majc.completed"; + String METRICS_COMPACTOR_COMPACTIONS_FAILED = METRICS_COMPACTOR_PREFIX + "majc.failed"; + String METRICS_COMPACTOR_FAILURES_CONSECUTIVE = + METRICS_COMPACTOR_PREFIX + "majc.failures.consecutive"; + String METRICS_COMPACTOR_FAILURES_TERMINATION = METRICS_COMPACTOR_PREFIX + "terminated"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type"; diff --git a/core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java b/core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java index 3d9c18683f5..950b0a59e1c 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java @@ -47,6 +47,26 @@ */ public interface ContextClassLoaderFactory { + class ContextClassLoaderException extends Exception { + + private static final long serialVersionUID = 1L; + private static final String msg = "Error getting classloader for context: "; + + public ContextClassLoaderException(String context, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(msg + context, cause, enableSuppression, writableStackTrace); + } + + public ContextClassLoaderException(String context, Throwable cause) { + super(msg + context, cause); + } + + public ContextClassLoaderException(String context) { + super(msg + context); + } + + } + /** * Pass the service environment to allow for additional class loader configuration * @@ -57,8 +77,8 @@ default void init(ContextClassLoaderEnvironment env) {} /** * Get the class loader for the given contextName. Callers should not cache the ClassLoader result * as it may change if/when the ClassLoader reloads. Implementations should throw a - * RuntimeException of some type (such as IllegalArgumentException) if the provided contextName is - * not supported or fails to be constructed. + * ContextClassLoaderException if the provided contextName is not supported or fails to be + * constructed. * * @param contextName the name of the context that represents a class loader that is managed by * this factory. Currently, Accumulo will only call this method for non-null and non-empty @@ -66,5 +86,5 @@ default void init(ContextClassLoaderEnvironment env) {} * consulting this plugin. * @return the class loader for the given contextName */ - ClassLoader getClassLoader(String contextName); + ClassLoader getClassLoader(String contextName) throws ContextClassLoaderException; } diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java b/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java index 48b123b3a7f..2ab51b6323d 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java @@ -18,8 +18,6 @@ */ package org.apache.accumulo.core.summary; -import java.io.IOException; - import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.client.summary.Summarizer; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; @@ -41,8 +39,7 @@ public SummarizerFactory(AccumuloConfiguration tableConfig) { this.context = ClassLoaderUtil.tableContext(tableConfig); } - private Summarizer newSummarizer(String classname) - throws IOException, ReflectiveOperationException { + private Summarizer newSummarizer(String classname) throws ReflectiveOperationException { if (classloader != null) { return classloader.loadClass(classname).asSubclass(Summarizer.class).getDeclaredConstructor() .newInstance(); @@ -55,7 +52,7 @@ private Summarizer newSummarizer(String classname) public Summarizer getSummarizer(SummarizerConfiguration conf) { try { return newSummarizer(conf.getClassName()); - } catch (ReflectiveOperationException | IOException e) { + } catch (ReflectiveOperationException e) { throw new RuntimeException(e); } } diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java index ac8e15f6e5d..a59cec5ae91 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java @@ -35,7 +35,7 @@ public interface Iface { public void updateCompactionStatus(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, TCompactionStatusUpdate status, long timestamp) throws org.apache.thrift.TException; - public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent) throws org.apache.thrift.TException; + public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String exceptionClassName) throws org.apache.thrift.TException; public TExternalCompactionList getRunningCompactions(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws org.apache.thrift.TException; @@ -53,7 +53,7 @@ public interface AsyncIface { public void updateCompactionStatus(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, TCompactionStatusUpdate status, long timestamp, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; - public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String exceptionClassName, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; public void getRunningCompactions(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; @@ -164,19 +164,20 @@ public void recv_updateCompactionStatus() throws org.apache.thrift.TException } @Override - public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent) throws org.apache.thrift.TException + public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String exceptionClassName) throws org.apache.thrift.TException { - send_compactionFailed(tinfo, credentials, externalCompactionId, extent); + send_compactionFailed(tinfo, credentials, externalCompactionId, extent, exceptionClassName); recv_compactionFailed(); } - public void send_compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent) throws org.apache.thrift.TException + public void send_compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String exceptionClassName) throws org.apache.thrift.TException { compactionFailed_args args = new compactionFailed_args(); args.setTinfo(tinfo); args.setCredentials(credentials); args.setExternalCompactionId(externalCompactionId); args.setExtent(extent); + args.setExceptionClassName(exceptionClassName); sendBase("compactionFailed", args); } @@ -423,9 +424,9 @@ public Void getResult() throws org.apache.thrift.TException { } @Override - public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String exceptionClassName, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); - compactionFailed_call method_call = new compactionFailed_call(tinfo, credentials, externalCompactionId, extent, resultHandler, this, ___protocolFactory, ___transport); + compactionFailed_call method_call = new compactionFailed_call(tinfo, credentials, externalCompactionId, extent, exceptionClassName, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -435,12 +436,14 @@ public static class compactionFailed_call extends org.apache.thrift.async.TAsync private org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; private java.lang.String externalCompactionId; private org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent; - public compactionFailed_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private java.lang.String exceptionClassName; + public compactionFailed_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String exceptionClassName, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.tinfo = tinfo; this.credentials = credentials; this.externalCompactionId = externalCompactionId; this.extent = extent; + this.exceptionClassName = exceptionClassName; } @Override @@ -451,6 +454,7 @@ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apa args.setCredentials(credentials); args.setExternalCompactionId(externalCompactionId); args.setExtent(extent); + args.setExceptionClassName(exceptionClassName); args.write(prot); prot.writeMessageEnd(); } @@ -715,7 +719,7 @@ protected boolean rethrowUnhandledExceptions() { @Override public compactionFailed_result getResult(I iface, compactionFailed_args args) throws org.apache.thrift.TException { compactionFailed_result result = new compactionFailed_result(); - iface.compactionFailed(args.tinfo, args.credentials, args.externalCompactionId, args.extent); + iface.compactionFailed(args.tinfo, args.credentials, args.externalCompactionId, args.extent, args.exceptionClassName); return result; } } @@ -1088,7 +1092,7 @@ protected boolean isOneway() { @Override public void start(I iface, compactionFailed_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { - iface.compactionFailed(args.tinfo, args.credentials, args.externalCompactionId, args.extent,resultHandler); + iface.compactionFailed(args.tinfo, args.credentials, args.externalCompactionId, args.extent, args.exceptionClassName,resultHandler); } } @@ -4652,6 +4656,7 @@ public static class compactionFailed_args implements org.apache.thrift.TBase byName = new java.util.HashMap(); @@ -4690,6 +4697,8 @@ public static _Fields findByThriftId(int fieldId) { return EXTERNAL_COMPACTION_ID; case 4: // EXTENT return EXTENT; + case 5: // EXCEPTION_CLASS_NAME + return EXCEPTION_CLASS_NAME; default: return null; } @@ -4744,6 +4753,8 @@ public java.lang.String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.EXTENT, new org.apache.thrift.meta_data.FieldMetaData("extent", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent.class))); + tmpMap.put(_Fields.EXCEPTION_CLASS_NAME, new org.apache.thrift.meta_data.FieldMetaData("exceptionClassName", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(compactionFailed_args.class, metaDataMap); } @@ -4755,13 +4766,15 @@ public compactionFailed_args( org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, java.lang.String externalCompactionId, - org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent) + org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, + java.lang.String exceptionClassName) { this(); this.tinfo = tinfo; this.credentials = credentials; this.externalCompactionId = externalCompactionId; this.extent = extent; + this.exceptionClassName = exceptionClassName; } /** @@ -4780,6 +4793,9 @@ public compactionFailed_args(compactionFailed_args other) { if (other.isSetExtent()) { this.extent = new org.apache.accumulo.core.dataImpl.thrift.TKeyExtent(other.extent); } + if (other.isSetExceptionClassName()) { + this.exceptionClassName = other.exceptionClassName; + } } @Override @@ -4793,6 +4809,7 @@ public void clear() { this.credentials = null; this.externalCompactionId = null; this.extent = null; + this.exceptionClassName = null; } @org.apache.thrift.annotation.Nullable @@ -4895,6 +4912,31 @@ public void setExtentIsSet(boolean value) { } } + @org.apache.thrift.annotation.Nullable + public java.lang.String getExceptionClassName() { + return this.exceptionClassName; + } + + public compactionFailed_args setExceptionClassName(@org.apache.thrift.annotation.Nullable java.lang.String exceptionClassName) { + this.exceptionClassName = exceptionClassName; + return this; + } + + public void unsetExceptionClassName() { + this.exceptionClassName = null; + } + + /** Returns true if field exceptionClassName is set (has been assigned a value) and false otherwise */ + public boolean isSetExceptionClassName() { + return this.exceptionClassName != null; + } + + public void setExceptionClassNameIsSet(boolean value) { + if (!value) { + this.exceptionClassName = null; + } + } + @Override public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { @@ -4930,6 +4972,14 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable } break; + case EXCEPTION_CLASS_NAME: + if (value == null) { + unsetExceptionClassName(); + } else { + setExceptionClassName((java.lang.String)value); + } + break; + } } @@ -4949,6 +4999,9 @@ public java.lang.Object getFieldValue(_Fields field) { case EXTENT: return getExtent(); + case EXCEPTION_CLASS_NAME: + return getExceptionClassName(); + } throw new java.lang.IllegalStateException(); } @@ -4969,6 +5022,8 @@ public boolean isSet(_Fields field) { return isSetExternalCompactionId(); case EXTENT: return isSetExtent(); + case EXCEPTION_CLASS_NAME: + return isSetExceptionClassName(); } throw new java.lang.IllegalStateException(); } @@ -5022,6 +5077,15 @@ public boolean equals(compactionFailed_args that) { return false; } + boolean this_present_exceptionClassName = true && this.isSetExceptionClassName(); + boolean that_present_exceptionClassName = true && that.isSetExceptionClassName(); + if (this_present_exceptionClassName || that_present_exceptionClassName) { + if (!(this_present_exceptionClassName && that_present_exceptionClassName)) + return false; + if (!this.exceptionClassName.equals(that.exceptionClassName)) + return false; + } + return true; } @@ -5045,6 +5109,10 @@ public int hashCode() { if (isSetExtent()) hashCode = hashCode * 8191 + extent.hashCode(); + hashCode = hashCode * 8191 + ((isSetExceptionClassName()) ? 131071 : 524287); + if (isSetExceptionClassName()) + hashCode = hashCode * 8191 + exceptionClassName.hashCode(); + return hashCode; } @@ -5096,6 +5164,16 @@ public int compareTo(compactionFailed_args other) { return lastComparison; } } + lastComparison = java.lang.Boolean.compare(isSetExceptionClassName(), other.isSetExceptionClassName()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetExceptionClassName()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.exceptionClassName, other.exceptionClassName); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -5151,6 +5229,14 @@ public java.lang.String toString() { sb.append(this.extent); } first = false; + if (!first) sb.append(", "); + sb.append("exceptionClassName:"); + if (this.exceptionClassName == null) { + sb.append("null"); + } else { + sb.append(this.exceptionClassName); + } + first = false; sb.append(")"); return sb.toString(); } @@ -5240,6 +5326,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, compactionFailed_ar org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // EXCEPTION_CLASS_NAME + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.exceptionClassName = iprot.readString(); + struct.setExceptionClassNameIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -5276,6 +5370,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, compactionFailed_a struct.extent.write(oprot); oprot.writeFieldEnd(); } + if (struct.exceptionClassName != null) { + oprot.writeFieldBegin(EXCEPTION_CLASS_NAME_FIELD_DESC); + oprot.writeString(struct.exceptionClassName); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -5307,7 +5406,10 @@ public void write(org.apache.thrift.protocol.TProtocol prot, compactionFailed_ar if (struct.isSetExtent()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.isSetExceptionClassName()) { + optionals.set(4); + } + oprot.writeBitSet(optionals, 5); if (struct.isSetTinfo()) { struct.tinfo.write(oprot); } @@ -5320,12 +5422,15 @@ public void write(org.apache.thrift.protocol.TProtocol prot, compactionFailed_ar if (struct.isSetExtent()) { struct.extent.write(oprot); } + if (struct.isSetExceptionClassName()) { + oprot.writeString(struct.exceptionClassName); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, compactionFailed_args struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(4); + java.util.BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo(); struct.tinfo.read(iprot); @@ -5345,6 +5450,10 @@ public void read(org.apache.thrift.protocol.TProtocol prot, compactionFailed_arg struct.extent.read(iprot); struct.setExtentIsSet(true); } + if (incoming.get(4)) { + struct.exceptionClassName = iprot.readString(); + struct.setExceptionClassNameIsSet(true); + } } } diff --git a/core/src/main/thrift/compaction-coordinator.thrift b/core/src/main/thrift/compaction-coordinator.thrift index cbf3ac0d1fc..ae6a7cce6b9 100644 --- a/core/src/main/thrift/compaction-coordinator.thrift +++ b/core/src/main/thrift/compaction-coordinator.thrift @@ -112,6 +112,7 @@ service CompactionCoordinatorService { 2:security.TCredentials credentials 3:string externalCompactionId 4:data.TKeyExtent extent + 5:string exceptionClassName ) /* diff --git a/core/src/test/java/org/apache/accumulo/core/classloader/ContextClassLoaderFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/classloader/ContextClassLoaderFactoryTest.java index 83b3821640e..84ee2f02844 100644 --- a/core/src/test/java/org/apache/accumulo/core/classloader/ContextClassLoaderFactoryTest.java +++ b/core/src/test/java/org/apache/accumulo/core/classloader/ContextClassLoaderFactoryTest.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.WithTestNames; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -64,7 +65,7 @@ public void setup() throws Exception { } @Test - public void differentContexts() { + public void differentContexts() throws ContextClassLoaderException { ConfigurationCopy cc = new ConfigurationCopy(); cc.set(Property.GENERAL_CONTEXT_CLASSLOADER_FACTORY.getKey(), diff --git a/core/src/test/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtilTest.java b/core/src/test/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtilTest.java index 251a3441b36..3042a9a0a53 100644 --- a/core/src/test/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtilTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtilTest.java @@ -136,14 +136,15 @@ public Value getTopValue() { } private SortedKeyValueIterator createIter(IteratorScope scope, - SortedMapIterator source, AccumuloConfiguration conf) throws IOException { + SortedMapIterator source, AccumuloConfiguration conf) + throws IOException, ReflectiveOperationException { var ibEnv = IteratorConfigUtil.loadIterConf(scope, EMPTY_ITERS, new HashMap<>(), conf); var iteratorBuilder = ibEnv.env(ClientIteratorEnvironment.DEFAULT).useClassLoader(null).build(); return IteratorConfigUtil.loadIterators(source, iteratorBuilder); } @Test - public void test1() throws IOException { + public void test1() throws IOException, ReflectiveOperationException { ConfigurationCopy conf = new ConfigurationCopy(); // create an iterator that adds 1 and then squares @@ -177,7 +178,7 @@ public void test1() throws IOException { } @Test - public void test4() throws IOException { + public void test4() throws IOException, ReflectiveOperationException { // try loading for a different scope AccumuloConfiguration conf = new ConfigurationCopy(); @@ -209,7 +210,7 @@ public void test4() throws IOException { } @Test - public void test3() throws IOException { + public void test3() throws IOException, ReflectiveOperationException { // change the load order, so it squares and then adds ConfigurationCopy conf = new ConfigurationCopy(); @@ -245,7 +246,7 @@ public void test3() throws IOException { } @Test - public void test2() throws IOException { + public void test2() throws IOException, ReflectiveOperationException { ConfigurationCopy conf = new ConfigurationCopy(); @@ -284,7 +285,7 @@ public void test2() throws IOException { } @Test - public void test5() throws IOException { + public void test5() throws IOException, ReflectiveOperationException { ConfigurationCopy conf = new ConfigurationCopy(); // create an iterator that ages off diff --git a/pom.xml b/pom.xml index c8aa687bbb3..6c2548370fd 100644 --- a/pom.xml +++ b/pom.xml @@ -116,7 +116,7 @@ ${project.version} false - --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.management/java.lang.management=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED + --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.management/java.lang.management=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/java.util.stream=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED false 1 diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index cd5de7c442c..b2122caafd9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -322,7 +322,11 @@ public void startServiceLockVerificationThread() { } @Override - public void close() {} + public void close() { + if (context != null) { + context.close(); + } + } protected void waitForUpgrade() throws InterruptedException { while (AccumuloDataVersion.getCurrentVersion(getContext()) < AccumuloDataVersion.get()) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java index a2a1f93fd47..30543aac24a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java @@ -84,6 +84,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * Provides a server context for Accumulo server components that operate with the system credentials * and have access to the system files and configuration. @@ -461,6 +463,7 @@ public MetricsInfo getMetricsInfo() { @Override public void close() { + Preconditions.checkState(!isClosed(), "ServerContext.close was already called."); getMetricsInfo().close(); super.close(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServiceEnvironmentImpl.java b/server/base/src/main/java/org/apache/accumulo/server/ServiceEnvironmentImpl.java index 0f68a11d487..46c6fdb7bd5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServiceEnvironmentImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServiceEnvironmentImpl.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.server; -import java.io.IOException; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.classloader.ClassLoaderUtil; @@ -62,14 +61,13 @@ public String getTableName(TableId tableId) throws TableNotFoundException { } @Override - public T instantiate(String className, Class base) - throws ReflectiveOperationException, IOException { + public T instantiate(String className, Class base) throws ReflectiveOperationException { return ConfigurationTypeHelper.getClassInstance(null, className, base); } @Override public T instantiate(TableId tableId, String className, Class base) - throws ReflectiveOperationException, IOException { + throws ReflectiveOperationException { String ctx = ClassLoaderUtil.tableContext(context.getTableConfiguration(tableId)); return ConfigurationTypeHelper.getClassInstance(ctx, className, base); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 1f655dd3040..c9352473db2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -312,8 +312,8 @@ protected Map> getLocalityGroups(AccumuloConfiguration } @Override - public CompactionStats call() - throws IOException, CompactionCanceledException, InterruptedException { + public CompactionStats call() throws IOException, CompactionCanceledException, + InterruptedException, ReflectiveOperationException { FileSKVWriter mfw = null; @@ -539,7 +539,8 @@ private List> openMapDataFiles( private void compactLocalityGroup(String lgName, Set columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats, - EnumSet dropCacheFilePrefixes) throws IOException, CompactionCanceledException { + EnumSet dropCacheFilePrefixes) + throws IOException, CompactionCanceledException, ReflectiveOperationException { ArrayList readers = new ArrayList<>(filesToCompact.size()); Span compactSpan = TraceUtil.startSpan(this.getClass(), "compact"); try (Scope span = compactSpan.makeCurrent()) { @@ -562,7 +563,6 @@ private void compactLocalityGroup(String lgName, Set columnFamilie SortedKeyValueIterator itr = iterEnv.getTopLevelIterator(IteratorConfigUtil .convertItersAndLoad(env.getIteratorScope(), cfsi, acuTableConf, iterators, iterEnv)); - itr.seek(extent.toDataRange(), columnFamilies, inclusive); if (inclusive) { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index add5d607565..873ce2abd9b 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -55,6 +55,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; @@ -95,6 +96,7 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -130,6 +132,37 @@ public class CompactionCoordinator extends AbstractServer implements /* Map of queue name to last time compactor called to get a compaction job */ private static final Map TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); + static class FailureCounts { + long failures; + long successes; + + FailureCounts(long failures, long successes) { + this.failures = failures; + this.successes = successes; + } + + static FailureCounts incrementFailure(Object key, FailureCounts counts) { + if (counts == null) { + return new FailureCounts(1, 0); + } + counts.failures++; + return counts; + } + + static FailureCounts incrementSuccess(Object key, FailureCounts counts) { + if (counts == null) { + return new FailureCounts(0, 1); + } + counts.successes++; + return counts; + } + } + + private final ConcurrentHashMap failingQueues = new ConcurrentHashMap<>(); + private final ConcurrentHashMap failingCompactors = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap failingTables = new ConcurrentHashMap<>(); + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); protected AuditedSecurityOperation security; protected final AccumuloConfiguration aconf; @@ -312,6 +345,7 @@ public void run() { tserverSet.startListeningForTabletServerChanges(); startDeadCompactionDetector(); + startFailureSummaryLogging(); LOG.info("Starting loop to check tservers for compaction summaries"); while (!isShutdownRequested()) { @@ -352,6 +386,7 @@ public void run() { if (coordinatorAddress.server != null) { coordinatorAddress.server.stop(); } + super.close(); getShutdownComplete().set(true); LOG.info("stop requested. exiting ... "); try { @@ -531,6 +566,7 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials prioTserver = QUEUE_SUMMARIES.getNextTserver(queue); continue; } + // It is possible that by the time this added that the tablet has already canceled the // compaction or the compactor that made this request is dead. In these cases the compaction // is not actually running. @@ -602,6 +638,7 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, LOG.debug("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats, extent); final var ecid = ExternalCompactionId.of(externalCompactionId); + captureSuccess(ecid, extent); compactionFinalizer.commitCompaction(ecid, extent, stats.fileSize, stats.entriesWritten); // It's possible that RUNNING might not have an entry for this ecid in the case // of a coordinator restart when the Coordinator can't find the TServer for the @@ -611,18 +648,89 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, @Override public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, - TKeyExtent extent) throws ThriftSecurityException { + TKeyExtent extent, String exceptionClassName) throws ThriftSecurityException { // do not expect users to call this directly, expect other tservers to call this method if (!security.canPerformSystemActions(credentials)) { throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent); - LOG.info("Compaction failed: id: {}, extent: {}", externalCompactionId, fromThriftExtent); + LOG.info("Compaction failed: id: {}, extent: {}, compactor exception:{}", externalCompactionId, + fromThriftExtent, exceptionClassName); final var ecid = ExternalCompactionId.of(externalCompactionId); + if (exceptionClassName != null) { + captureFailure(ecid, fromThriftExtent); + } compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); } + private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) { + var rc = RUNNING_CACHE.get(ecid); + if (rc != null) { + final String queue = rc.getQueueName(); + failingQueues.compute(queue, FailureCounts::incrementFailure); + final String compactor = rc.getCompactorAddress(); + failingCompactors.compute(compactor, FailureCounts::incrementFailure); + } + failingTables.compute(extent.tableId(), FailureCounts::incrementFailure); + } + + protected void startFailureSummaryLogging() { + ScheduledFuture future = getContext().getScheduledExecutor() + .scheduleWithFixedDelay(this::printStats, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + private void printStats(String logPrefix, ConcurrentHashMap failureCounts, + boolean logSuccessAtTrace) { + for (var key : failureCounts.keySet()) { + failureCounts.compute(key, (k, counts) -> { + if (counts != null) { + Level level; + if (counts.failures > 0) { + level = Level.WARN; + } else if (logSuccessAtTrace) { + level = Level.TRACE; + } else { + level = Level.DEBUG; + } + + LOG.atLevel(level).log("{} {} failures:{} successes:{} since last time this was logged ", + logPrefix, k, counts.failures, counts.successes); + } + + // clear the counts so they can start building up for the next logging if this key is ever + // used again + return null; + }); + } + } + + private void printStats() { + + // Remove down compactors from failing list + Map> allCompactors = + ExternalCompactionUtil.getCompactorAddrs(getContext()); + Set allCompactorAddrs = new HashSet<>(); + allCompactors.values().forEach(l -> l.forEach(c -> allCompactorAddrs.add(c.toString()))); + failingCompactors.keySet().retainAll(allCompactorAddrs); + + printStats("Queue", failingQueues, false); + printStats("Table", failingTables, false); + printStats("Compactor", failingCompactors, true); + } + + private void captureSuccess(ExternalCompactionId ecid, KeyExtent extent) { + var rc = RUNNING_CACHE.get(ecid); + if (rc != null) { + final String queue = rc.getQueueName(); + failingQueues.compute(queue, FailureCounts::incrementSuccess); + final String compactor = rc.getCompactorAddress(); + failingCompactors.compute(compactor, FailureCounts::incrementSuccess); + } + failingTables.compute(extent.tableId(), FailureCounts::incrementSuccess); + } + void compactionFailed(Map compactions) { compactionFinalizer.failCompactions(compactions); compactions.forEach((k, v) -> recordCompletion(k)); diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 8b320aac341..a38bcb61a0b 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -117,6 +117,9 @@ protected TestCoordinator(CompactionFinalizer finalizer, LiveTServerSet tservers @Override protected void startDeadCompactionDetector() {} + @Override + protected void startFailureSummaryLogging() {} + @Override protected long getTServerCheckInterval() { gracefulShutdown(null); @@ -188,7 +191,7 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials, @Override public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, - TKeyExtent extent) throws ThriftSecurityException {} + TKeyExtent extent, String exceptionClassName) throws ThriftSecurityException {} void setMetadataCompactionIds(Set mci) { metadataCompactionIds = mci; diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index ce18a0ef9ea..a317525b50f 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -28,6 +28,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,6 +39,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Supplier; @@ -124,6 +126,7 @@ import com.google.common.base.Preconditions; import io.micrometer.core.instrument.FunctionCounter; +import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.LongTaskTimer; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; @@ -145,6 +148,51 @@ public interface FileCompactorRunnable extends Runnable { private static final SecureRandom random = new SecureRandom(); + private static class ConsecutiveErrorHistory extends HashMap> { + + private static final long serialVersionUID = 1L; + + public long getTotalFailures() { + long total = 0; + for (TableId tid : keySet()) { + total += getTotalTableFailures(tid); + } + return total; + } + + public long getTotalTableFailures(TableId tid) { + long total = 0; + for (AtomicLong failures : get(tid).values()) { + total += failures.get(); + } + return total; + } + + /** + * Add error for table + * + * @param tid table id + * @param error exception + */ + public void addError(TableId tid, Throwable error) { + computeIfAbsent(tid, t -> new HashMap()) + .computeIfAbsent(error.toString(), e -> new AtomicLong(0)).incrementAndGet(); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + for (TableId tid : keySet()) { + buf.append("\nTable: ").append(tid); + for (Entry error : get(tid).entrySet()) { + buf.append("\n\tException: ").append(error.getKey()).append(", count: ") + .append(error.getValue().get()); + } + } + return buf.toString(); + } + } + public static class CompactorServerOpts extends ServerOpts { @Parameter(required = true, names = {"-q", "--queue"}, description = "compaction queue name") private String queueName = null; @@ -171,6 +219,11 @@ public String getQueueName() { private ServerAddress compactorAddress = null; private final AtomicBoolean compactionRunning = new AtomicBoolean(false); + private final ConsecutiveErrorHistory errorHistory = new ConsecutiveErrorHistory(); + private final AtomicLong completed = new AtomicLong(0); + private final AtomicLong cancelled = new AtomicLong(0); + private final AtomicLong failed = new AtomicLong(0); + private final AtomicLong terminated = new AtomicLong(0); protected Compactor(CompactorServerOpts opts, String[] args) { super("compactor", opts, args); @@ -186,6 +239,26 @@ private long getTotalEntriesWritten() { return FileCompactor.getTotalEntriesWritten(); } + private double getConsecutiveFailures() { + return errorHistory.getTotalFailures(); + } + + private double getCancellations() { + return cancelled.get(); + } + + private double getCompletions() { + return completed.get(); + } + + private double getFailures() { + return failed.get(); + } + + private double getTerminated() { + return terminated.get(); + } + @Override public void registerMetrics(MeterRegistry registry) { super.registerMetrics(registry); @@ -196,6 +269,22 @@ public void registerMetrics(MeterRegistry registry) { .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this, Compactor::getTotalEntriesWritten) .description("Number of entries written by all compactions that have run on this compactor") .register(registry); + FunctionCounter + .builder(METRICS_COMPACTOR_COMPACTIONS_CANCELLED, this, Compactor::getCancellations) + .description("Number compactions that have been cancelled on this compactor") + .register(registry); + FunctionCounter + .builder(METRICS_COMPACTOR_COMPACTIONS_COMPLETED, this, Compactor::getCompletions) + .description("Number compactions that have succeeded on this compactor").register(registry); + FunctionCounter.builder(METRICS_COMPACTOR_COMPACTIONS_FAILED, this, Compactor::getFailures) + .description("Number compactions that have failed on this compactor").register(registry); + FunctionCounter.builder(METRICS_COMPACTOR_FAILURES_TERMINATION, this, Compactor::getTerminated) + .description("Will report 1 if the Compactor terminates due to consecutive failure, else 0") + .register(registry); + Gauge.builder(METRICS_COMPACTOR_FAILURES_CONSECUTIVE, this, Compactor::getConsecutiveFailures) + .description( + "Number of consecutive compaction failures. Resets to zero on a successful compaction") + .register(registry); LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK) .description("Number and duration of stuck major compactions").register(registry); CompactionWatcher.setTimer(timer); @@ -395,16 +484,17 @@ protected void updateCompactionState(TExternalCompactionJob job, TCompactionStat * Notify the CompactionCoordinator the job failed * * @param job current compaction job + * @param exception cause of failure * @throws RetriesExceededException thrown when retries have been exceeded */ - protected void updateCompactionFailed(TExternalCompactionJob job) + protected void updateCompactionFailed(TExternalCompactionJob job, Throwable exception) throws RetriesExceededException { RetryableThriftCall thriftCall = new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 25, () -> { Client coordinatorClient = getCoordinatorClient(); try { coordinatorClient.compactionFailed(TraceUtil.traceInfo(), getContext().rpcCreds(), - job.getExternalCompactionId(), job.extent); + job.getExternalCompactionId(), job.extent, exception.getClass().getName()); return ""; } finally { ThriftUtil.returnClient(coordinatorClient, getContext()); @@ -666,6 +756,48 @@ protected Collection getServiceTags(HostAndPort clientAddress) { clientAddress, queueName); } + private void performFailureProcessing(ConsecutiveErrorHistory errorHistory) + throws InterruptedException { + // consecutive failure processing + final long totalFailures = errorHistory.getTotalFailures(); + if (totalFailures > 0) { + LOG.warn("This Compactor has had {} consecutive failures. Failures: {}", totalFailures, + errorHistory.toString()); // ErrorHistory.toString not invoked without .toString + final long failureThreshold = + getConfiguration().getCount(Property.COMPACTOR_FAILURE_TERMINATION_THRESHOLD); + if (failureThreshold > 0 && totalFailures >= failureThreshold) { + LOG.error( + "Consecutive failures ({}) has met or exceeded failure threshold ({}), exiting...", + totalFailures, failureThreshold); + terminated.incrementAndGet(); + throw new InterruptedException( + "Consecutive failures has exceeded failure threshold, exiting..."); + } + if (totalFailures + >= getConfiguration().getCount(Property.COMPACTOR_FAILURE_BACKOFF_THRESHOLD)) { + final long interval = + getConfiguration().getTimeInMillis(Property.COMPACTOR_FAILURE_BACKOFF_INTERVAL); + if (interval > 0) { + final long max = + getConfiguration().getTimeInMillis(Property.COMPACTOR_FAILURE_BACKOFF_RESET); + final long backoffMS = Math.min(max, interval * totalFailures); + LOG.warn( + "Not starting next compaction for {}ms due to consecutive failures. Check the log and address any issues.", + backoffMS); + if (backoffMS == max) { + errorHistory.clear(); + } + Thread.sleep(backoffMS); + } else if (interval == 0) { + LOG.info( + "This Compactor has had {} consecutive failures and failure backoff is disabled.", + totalFailures); + errorHistory.clear(); + } + } + } + } + @Override public void run() { @@ -712,6 +844,8 @@ public void run() { err.set(null); JOB_HOLDER.reset(); + performFailureProcessing(errorHistory); + TExternalCompactionJob job; try { TNextCompactionJob next = getNextJob(getNextId()); @@ -809,14 +943,15 @@ public void run() { new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", -1, -1, -1, fcr.getCompactionAge().toNanos()); updateCompactionState(job, update); - updateCompactionFailed(job); + updateCompactionFailed(job, null); + cancelled.incrementAndGet(); } catch (RetriesExceededException e) { LOG.error("Error updating coordinator with compaction cancellation.", e); } finally { currentCompactionId.set(null); } } else if (err.get() != null) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + final KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); try { LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", job.getExternalCompactionId(), fromThriftExtent); @@ -824,7 +959,9 @@ public void run() { TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1, fcr.getCompactionAge().toNanos()); updateCompactionState(job, update); - updateCompactionFailed(job); + updateCompactionFailed(job, err.get()); + failed.incrementAndGet(); + errorHistory.addError(fromThriftExtent.tableId(), err.get()); } catch (RetriesExceededException e) { LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", job.getExternalCompactionId(), fromThriftExtent, e); @@ -835,6 +972,9 @@ public void run() { try { LOG.trace("Updating coordinator with compaction completion."); updateCompactionCompleted(job, JOB_HOLDER.getStats()); + completed.incrementAndGet(); + // job completed successfully, clear the error history + errorHistory.clear(); } catch (RetriesExceededException e) { LOG.error( "Error updating coordinator with compaction completion, cancelling compaction.", @@ -896,6 +1036,7 @@ public void run() { } gcLogger.logGCInfo(getConfiguration()); + super.close(); getShutdownComplete().set(true); LOG.info("stop requested. exiting ... "); try { diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index 3489e8fef85..228b6911a55 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -272,7 +272,7 @@ protected void updateCompactionState(TExternalCompactionJob job, TCompactionStat } @Override - protected void updateCompactionFailed(TExternalCompactionJob job) + protected void updateCompactionFailed(TExternalCompactionJob job, Throwable exception) throws RetriesExceededException { failedCalled = true; } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index a7cfca69b8d..902642a6c03 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -344,6 +344,7 @@ public void run() { gracefulShutdown(getContext().rpcCreds()); } } + super.close(); getShutdownComplete().set(true); log.info("stop requested. exiting ... "); try { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index c4914fc0ff6..a2abec19929 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1498,6 +1498,7 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception waiting on watcher", e); } } + super.close(); getShutdownComplete().set(true); log.info("stop requested. exiting ... "); try { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java index f395c4134ad..cb98c24d35b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java @@ -86,7 +86,7 @@ private static class MergedIterConfig { } SortedKeyValueIterator buildIterator(SortedKeyValueIterator systemIter, - TCondition tc) throws IOException { + TCondition tc) throws IOException, ReflectiveOperationException { ArrayByteSequence key = new ArrayByteSequence(tc.iterators); MergedIterConfig mic = mergedIterCache.get(key); @@ -111,7 +111,7 @@ SortedKeyValueIterator buildIterator(SortedKeyValueIterator systemIter, - ServerConditionalMutation scm) throws IOException { + ServerConditionalMutation scm) throws IOException, ReflectiveOperationException { boolean add = true; for (TCondition tc : scm.getConditions()) { @@ -157,7 +157,8 @@ public ConditionChecker(List conditionsToCheck, this.results = results; } - public void check(SortedKeyValueIterator systemIter) throws IOException { + public void check(SortedKeyValueIterator systemIter) + throws IOException, ReflectiveOperationException { checkArgument(!checked, "check() method should only be called once"); checked = true; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 78ddee2b855..ec89e668af9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -463,6 +463,7 @@ public void run() { } gcLogger.logGCInfo(getConfiguration()); + super.close(); getShutdownComplete().set(true); LOG.info("stop requested. exiting ... "); try { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 97dad09224b..571acb5a3fd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -751,7 +751,7 @@ private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId) private void checkConditions(Map> updates, ArrayList results, ConditionalSession cs, List symbols) - throws IOException { + throws IOException, ReflectiveOperationException { Iterator>> iter = updates.entrySet().iterator(); final CompressedIterators compressedIters = new CompressedIterators(symbols); @@ -905,7 +905,7 @@ private void addMutationsAsTCMResults(final List list, private Map> conditionalUpdate(ConditionalSession cs, Map> updates, ArrayList results, - List symbols) throws IOException { + List symbols) throws IOException, ReflectiveOperationException { // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is // more efficient and detect duplicate rows. ConditionalMutationSet.sortConditionalMutations(updates); @@ -1023,7 +1023,7 @@ public List conditionalUpdate(TInfo tinfo, long sessID, } return results; - } catch (IOException ioe) { + } catch (IOException | ReflectiveOperationException ioe) { throw new TException(ioe); } catch (Exception e) { log.warn("Exception returned for conditionalUpdate {}", e); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 0cf5c2ec473..e66f6ae2d4a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -1044,7 +1044,7 @@ public void run() { } gcLogger.logGCInfo(getConfiguration()); - + super.close(); getShutdownComplete().set(true); log.info("TServerInfo: stop requested. exiting ... "); try { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index 56a2006456c..be4672107ad 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@ -278,7 +278,7 @@ static T newInstance(AccumuloConfiguration tableConfig, String className, String context = ClassLoaderUtil.tableContext(tableConfig); try { return ConfigurationTypeHelper.getClassInstance(context, className, baseClass); - } catch (IOException | ReflectiveOperationException e) { + } catch (ReflectiveOperationException e) { throw new RuntimeException(e); } } @@ -559,8 +559,8 @@ private static AccumuloConfiguration getCompactionConfig(TableConfiguration tabl */ static CompactionStats compact(Tablet tablet, CompactionJob job, CompactableImpl.CompactionInfo cInfo, CompactionEnv cenv, - Map compactFiles, TabletFile tmpFileName) - throws IOException, CompactionCanceledException, InterruptedException { + Map compactFiles, TabletFile tmpFileName) throws IOException, + CompactionCanceledException, InterruptedException, ReflectiveOperationException { TableConfiguration tableConf = tablet.getTableConfiguration(); AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java index f265056c6a5..93f73dbd2ed 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java @@ -115,7 +115,7 @@ public CompactionStats call() { } return ret; - } catch (IOException | UnsatisfiedLinkError e) { + } catch (IOException | ReflectiveOperationException | UnsatisfiedLinkError e) { log.warn("MinC failed ({}) to create {} retrying ...", e.getMessage(), outputFileName); ProblemReports.getInstance(tabletServer.getContext()).report( new ProblemReport(getExtent().tableId(), ProblemType.FILE_WRITE, outputFileName, e)); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 85f355f7c93..0f30f7673ee 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -125,12 +125,17 @@ public boolean isCurrent() { @Override public SortedKeyValueIterator iterator() throws IOException { if (iter == null) { - iter = createIterator(); + try { + iter = createIterator(); + } catch (ReflectiveOperationException e) { + throw new IOException("Error creating iterator", e); + } } return iter; } - private SortedKeyValueIterator createIterator() throws IOException { + private SortedKeyValueIterator createIterator() + throws IOException, ReflectiveOperationException { Map files; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 8e296d400dc..71f4efca1c5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -457,7 +457,7 @@ private void removeOldTemporaryFiles( } public void checkConditions(ConditionChecker checker, Authorizations authorizations, - AtomicBoolean iFlag) throws IOException { + AtomicBoolean iFlag) throws IOException, ReflectiveOperationException { ScanParameters scanParams = new ScanParameters(-1, authorizations, Collections.emptySet(), null, null, false, null, -1, null); @@ -469,7 +469,7 @@ public void checkConditions(ConditionChecker checker, Authorizations authorizati try { SortedKeyValueIterator iter = new SourceSwitchingIterator(dataSource); checker.check(iter); - } catch (IOException | RuntimeException e) { + } catch (IOException | RuntimeException | ReflectiveOperationException e) { sawException = true; throw e; } finally { diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java index 5e7a84a02ff..0eb3214bda2 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java +++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java @@ -67,6 +67,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.thrift.TConstraintViolationSummary; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException; import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; import org.apache.accumulo.core.util.BadArgumentException; import org.apache.accumulo.core.util.format.DefaultFormatter; @@ -469,7 +470,8 @@ public AccumuloClient getAccumuloClient() { } public ClassLoader getClassLoader(final CommandLine cl, final Shell shellState) - throws AccumuloException, TableNotFoundException, AccumuloSecurityException { + throws AccumuloException, TableNotFoundException, AccumuloSecurityException, + ContextClassLoaderException { boolean tables = cl.hasOption(OptUtil.tableOpt().getOpt()) || !shellState.getTableName().isEmpty(); diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java index 6e4eed888f7..f9e8be1f562 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java @@ -41,6 +41,7 @@ import org.apache.accumulo.core.iterators.user.ReqVisFilter; import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException; import org.apache.accumulo.shell.Shell; import org.apache.accumulo.shell.Shell.Command; import org.apache.accumulo.shell.ShellCommandException; @@ -60,7 +61,7 @@ public class SetIterCommand extends Command { @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException, - ShellCommandException { + ShellCommandException, ContextClassLoaderException { boolean tables = cl.hasOption(OptUtil.tableOpt().getOpt()) || !shellState.getTableName().isEmpty(); diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/SetScanIterCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/SetScanIterCommand.java index 2cb2295a8d2..e5c67291ac4 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/SetScanIterCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/SetScanIterCommand.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException; import org.apache.accumulo.shell.Shell; import org.apache.accumulo.shell.ShellCommandException; import org.apache.commons.cli.CommandLine; @@ -43,7 +44,7 @@ public class SetScanIterCommand extends SetIterCommand { @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException, - ShellCommandException { + ShellCommandException, ContextClassLoaderException { Shell.log.warn("Deprecated, use {}", new SetShellIterCommand().getName()); return super.execute(fullCommand, cl, shellState); } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/SetShellIterCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/SetShellIterCommand.java index f10c13c8698..336ece806f5 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/SetShellIterCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/SetShellIterCommand.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException; import org.apache.accumulo.shell.Shell; import org.apache.accumulo.shell.ShellCommandException; import org.apache.commons.cli.CommandLine; @@ -38,7 +39,7 @@ public class SetShellIterCommand extends SetIterCommand { @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException, - ShellCommandException { + ShellCommandException, ContextClassLoaderException { return super.execute(fullCommand, cl, shellState); } diff --git a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java index c75b931b87c..f1f17795f92 100644 --- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java +++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java @@ -194,12 +194,8 @@ private static ReloadingClassLoader createDynamicClassloader(final ClassLoader p return new AccumuloReloadingVFSClassLoader(dynamicCPath, generateVfs(), wrapper, 1000, true); } - public static ClassLoader getClassLoader() { - try { - return getClassLoader_Internal(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + public static ClassLoader getClassLoader() throws IOException { + return getClassLoader_Internal(); } private static ClassLoader getClassLoader_Internal() throws IOException { @@ -417,19 +413,25 @@ public static void removeUnusedContexts(Set contextsInUse) { } } - public static ClassLoader getContextClassLoader(String contextName) { - try { - return getContextManager().getClassLoader(contextName); - } catch (IOException e) { - throw new UncheckedIOException( - "Error getting context class loader for context: " + contextName, e); - } + public static ClassLoader getContextClassLoader(String contextName) throws IOException { + return getContextManager().getClassLoader(contextName); } public static synchronized ContextManager getContextManager() throws IOException { if (contextManager == null) { getClassLoader(); - contextManager = new ContextManager(generateVfs(), AccumuloVFSClassLoader::getClassLoader); + try { + contextManager = new ContextManager(generateVfs(), () -> { + try { + return getClassLoader(); + } catch (IOException e) { + // throw runtime, then unwrap it. + throw new UncheckedIOException(e); + } + }); + } catch (UncheckedIOException uioe) { + throw uioe.getCause(); + } } return contextManager; diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ClassLoaderContextCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ClassLoaderContextCompactionIT.java new file mode 100644 index 00000000000..15a539bf5ce --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ClassLoaderContextCompactionIT.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.compaction; + +import static org.apache.accumulo.core.conf.Property.TABLE_FILE_MAX; +import static org.apache.accumulo.core.conf.Property.TABLE_MAJC_RATIO; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ReadWriteIT; +import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; +import org.apache.accumulo.test.metrics.TestStatsDSink; +import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClassLoaderContextCompactionIT extends AccumuloClusterHarness { + + private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderContextCompactionIT.class); + private static TestStatsDSink sink; + + @BeforeAll + public static void before() throws Exception { + sink = new TestStatsDSink(); + } + + @AfterAll + public static void after() throws Exception { + sink.close(); + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite); + // After 1 failure start backing off by 5s. + // After 3 failures, terminate the Compactor + cfg.setProperty(Property.COMPACTOR_FAILURE_BACKOFF_THRESHOLD, "1"); + cfg.setProperty(Property.COMPACTOR_FAILURE_BACKOFF_INTERVAL, "5s"); + cfg.setProperty(Property.COMPACTOR_FAILURE_BACKOFF_RESET, "10m"); + cfg.setProperty(Property.COMPACTOR_FAILURE_TERMINATION_THRESHOLD, "3"); + cfg.setNumCompactors(2); + // Tell the server processes to use a StatsDMeterRegistry and the simple logging registry + // that will be configured to push all metrics to the sink we started. + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true"); + cfg.setProperty("general.custom.metrics.opts.logging.step", "1s"); + String clazzList = LoggingMeterRegistryFactory.class.getName() + "," + + TestStatsDRegistryFactory.class.getName(); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList); + Map sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); + } + + @Test + public void testClassLoaderContextErrorKillsCompactor() throws Exception { + + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + final AtomicLong cancellations = new AtomicLong(0); + final AtomicLong completions = new AtomicLong(0); + final AtomicLong failures = new AtomicLong(0); + final AtomicLong consecutive = new AtomicLong(0); + final AtomicLong terminations = new AtomicLong(0); + + final Thread thread = Threads.createNonCriticalThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_COMPACTIONS_CANCELLED)) { + Metric m = TestStatsDSink.parseStatsDMetric(s); + LOG.info("{}", m); + cancellations.set(Long.parseLong(m.getValue())); + } else if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_COMPACTIONS_COMPLETED)) { + Metric m = TestStatsDSink.parseStatsDMetric(s); + LOG.info("{}", m); + completions.set(Long.parseLong(m.getValue())); + } else if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_COMPACTIONS_FAILED)) { + Metric m = TestStatsDSink.parseStatsDMetric(s); + LOG.info("{}", m); + failures.set(Long.parseLong(m.getValue())); + } else if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_FAILURES_TERMINATION)) { + Metric m = TestStatsDSink.parseStatsDMetric(s); + LOG.info("{}", m); + terminations.set(Long.parseLong(m.getValue())); + } else if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_FAILURES_CONSECUTIVE)) { + Metric m = TestStatsDSink.parseStatsDMetric(s); + LOG.info("{}", m); + consecutive.set(Long.parseLong(m.getValue())); + } + + } + } + }); + thread.start(); + + final String table1 = this.getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class); + getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); + Wait.waitFor( + () -> ExternalCompactionUtil.countCompactors(QUEUE1, (ClientContext) client) == 1); + List compactors = + ExternalCompactionUtil.getCompactorAddrs((ClientContext) client).get(QUEUE1); + assertEquals(1, compactors.size()); + final HostAndPort compactorAddr = compactors.get(0); + createTable(client, table1, "cs1"); + client.tableOperations().setProperty(table1, TABLE_FILE_MAX.getKey(), "1001"); + client.tableOperations().setProperty(table1, TABLE_MAJC_RATIO.getKey(), "1001"); + TableId tid = TableId.of(client.tableOperations().tableIdMap().get(table1)); + + ReadWriteIT.ingest(client, 1000, 1, 1, 0, "colf", table1, 20); + + Ample ample = ((ClientContext) client).getAmple(); + try ( + TabletsMetadata tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) { + TabletMetadata tm = tms.iterator().next(); + assertEquals(50, tm.getFiles().size()); + } + + final MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) getCluster(); + final FileSystem fs = cluster.getFileSystem(); + + // Create the context directory in HDFS + final org.apache.hadoop.fs.Path contextDir = fs.makeQualified(new org.apache.hadoop.fs.Path( + cluster.getConfig().getAccumuloDir().toString(), "classpath")); + assertTrue(fs.mkdirs(contextDir)); + + // Copy the FooFilter.jar to the context dir + final org.apache.hadoop.fs.Path src = new org.apache.hadoop.fs.Path( + System.getProperty("java.io.tmpdir") + "/classes/org/apache/accumulo/test/FooFilter.jar"); + final org.apache.hadoop.fs.Path dst = new org.apache.hadoop.fs.Path(contextDir, "Test.jar"); + fs.copyFromLocalFile(src, dst); + assertTrue(fs.exists(dst)); + + // Define a classloader context that references Test.jar + @SuppressWarnings("removal") + final Property p = Property.VFS_CONTEXT_CLASSPATH_PROPERTY; + client.instanceOperations().setProperty(p.getKey() + "undefined", dst.toUri().toString()); + + // Force the classloader to look in the context jar first, don't delegate to the parent first + client.instanceOperations().setProperty("general.vfs.context.classpath.undefined.delegation", + "post"); + + // Set the context on the table + client.tableOperations().setProperty(table1, Property.TABLE_CLASSLOADER_CONTEXT.getKey(), + "undefined"); + + final IteratorSetting cfg = + new IteratorSetting(101, "FooFilter", "org.apache.accumulo.test.FooFilter"); + client.tableOperations().attachIterator(table1, cfg, EnumSet.of(IteratorScope.majc)); + + // delete Test.jar, so that the classloader will fail + assertTrue(fs.delete(dst, false)); + + assertEquals(0, cancellations.get()); + assertEquals(0, completions.get()); + assertEquals(0, failures.get()); + assertEquals(0, terminations.get()); + assertEquals(0, consecutive.get()); + + // Start a compaction. The missing jar should cause a failure + client.tableOperations().compact(table1, new CompactionConfig().setWait(false)); + Wait.waitFor( + () -> ExternalCompactionUtil.getRunningCompaction(compactorAddr, (ClientContext) client) + == null); + assertEquals(1, ExternalCompactionUtil.countCompactors(QUEUE1, (ClientContext) client)); + Wait.waitFor(() -> failures.get() == 1); + Wait.waitFor(() -> consecutive.get() == 1); + + Wait.waitFor(() -> failures.get() == 0); + client.tableOperations().compact(table1, new CompactionConfig().setWait(false)); + Wait.waitFor( + () -> ExternalCompactionUtil.getRunningCompaction(compactorAddr, (ClientContext) client) + == null); + assertEquals(1, ExternalCompactionUtil.countCompactors(QUEUE1, (ClientContext) client)); + Wait.waitFor(() -> failures.get() == 1); + Wait.waitFor(() -> consecutive.get() == 2); + + Wait.waitFor(() -> failures.get() == 0); + client.tableOperations().compact(table1, new CompactionConfig().setWait(false)); + Wait.waitFor( + () -> ExternalCompactionUtil.getRunningCompaction(compactorAddr, (ClientContext) client) + == null); + assertEquals(1, ExternalCompactionUtil.countCompactors(QUEUE1, (ClientContext) client)); + Wait.waitFor(() -> failures.get() == 1); + Wait.waitFor(() -> consecutive.get() == 3); + + // Three failures have occurred, Compactor should shut down. + Wait.waitFor( + () -> ExternalCompactionUtil.countCompactors(QUEUE1, (ClientContext) client) == 0); + Wait.waitFor(() -> terminations.get() == 1); + assertEquals(0, cancellations.get()); + assertEquals(0, completions.get()); + + } finally { + shutdownTailer.set(true); + thread.join(); + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR); + } + + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index b7cefe3c8b6..026f392a010 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -103,6 +103,11 @@ public void confirmMetricsPublished() throws Exception { // @formatter:off Set unexpectedMetrics = Set.of(METRICS_COMPACTOR_MAJC_STUCK, + METRICS_COMPACTOR_COMPACTIONS_CANCELLED, + METRICS_COMPACTOR_COMPACTIONS_COMPLETED, + METRICS_COMPACTOR_COMPACTIONS_FAILED, + METRICS_COMPACTOR_FAILURES_CONSECUTIVE, + METRICS_COMPACTOR_FAILURES_TERMINATION, METRICS_REPLICATION_QUEUE, METRICS_SCAN_YIELDS, METRICS_UPDATE_ERRORS); diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java index bba4b355bc0..62e034bbef5 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java @@ -429,7 +429,7 @@ private static SortedKeyValueIterator createScanIterator(KeyExtent ke Collection> mapfiles, Authorizations authorizations, byte[] defaultLabels, HashSet columnSet, List ssiList, Map> ssio, boolean useTableIterators, TableConfiguration conf, - ServerContext context) throws IOException { + ServerContext context) throws IOException, ReflectiveOperationException { SortedMapIterator smi = new SortedMapIterator(new TreeMap<>());