Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a5ddec2
Cause Compactor to exit when error loading classes
dlmarion Jul 8, 2025
825df0d
Added IT
dlmarion Jul 9, 2025
8695400
Changes to propagate IOE and ROE up to classloader callers
dlmarion Jul 11, 2025
7c7ecef
Return exception class name, capture failures and successes
dlmarion Jul 11, 2025
6563aee
Moved failure account to Compactor, log summary in Coordinator
dlmarion Jul 14, 2025
65a7a35
Minor update
dlmarion Jul 14, 2025
95ebecf
Add property to terminate Compactor based on consecutive failures
dlmarion Jul 14, 2025
787a4a7
Merge branch '2.1' into compactor-die-on-cl-error
dlmarion Jul 15, 2025
2865ad2
Fix property type, move error history logic to method
dlmarion Jul 15, 2025
3d71032
Modified error history to capture all exceptions, print nice summary
dlmarion Jul 15, 2025
d45b2d6
Fixed ErrorHistory
dlmarion Jul 15, 2025
9368643
Updated IT
dlmarion Jul 15, 2025
f302e83
Updated logging, addressed PR suggestions
dlmarion Jul 16, 2025
04faeab
Fix backoff condition, was off by one
dlmarion Jul 16, 2025
052ffff
Added new metrics to Compactor to track success, cancellation, failure
dlmarion Jul 16, 2025
daf32fc
Updated MetricsProducer javadoc for new metrics
dlmarion Jul 16, 2025
d6c8aac
log success and failure counts (#56)
keith-turner Jul 16, 2025
22fe257
Created new exception class for ContextClassLoaderFactory
dlmarion Jul 16, 2025
b6af86c
Merge branch 'compactor-die-on-cl-error' of github.com:dlmarion/accum…
dlmarion Jul 16, 2025
d216bde
narrow exceptions
dlmarion Jul 16, 2025
e05434d
Fixes to get failed ITs working
dlmarion Jul 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.accumulo.core.classloader;

import java.io.IOException;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory;
import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException;
import org.apache.accumulo.core.util.ConfigurationImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,11 +77,15 @@ public static synchronized void resetContextFactoryForTests() {
}

@SuppressWarnings("deprecation")
public static ClassLoader getClassLoader(String context) {
public static ClassLoader getClassLoader(String context) throws ContextClassLoaderException {
if (context != null && !context.isEmpty()) {
return FACTORY.getClassLoader(context);
} else {
return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.getClassLoader();
try {
return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.getClassLoader();
} catch (IOException e) {
throw new ContextClassLoaderException(context, e);
}
}
}

Expand All @@ -92,7 +99,7 @@ public static boolean isValidContext(String context) {
return false;
}
return true;
} catch (RuntimeException e) {
} catch (ContextClassLoaderException e) {
LOG.debug("Context {} is not valid.", context, e);
return false;
}
Expand All @@ -103,7 +110,11 @@ public static boolean isValidContext(String context) {

public static <U> Class<? extends U> loadClass(String context, String className,
Class<U> extension) throws ClassNotFoundException {
return getClassLoader(context).loadClass(className).asSubclass(extension);
try {
return getClassLoader(context).loadClass(className).asSubclass(extension);
} catch (ContextClassLoaderException e) {
throw new ClassNotFoundException("Error loading class from context: " + context, e);
}
}

public static <U> Class<? extends U> loadClass(String className, Class<U> extension)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.util.concurrent.TimeUnit.MINUTES;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -97,8 +98,12 @@ private static void removeUnusedContexts(Set<String> contextsInUse) {

@SuppressWarnings("deprecation")
@Override
public ClassLoader getClassLoader(String contextName) {
return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader
.getContextClassLoader(contextName);
public ClassLoader getClassLoader(String contextName) throws ContextClassLoaderException {
try {
return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader
.getContextClassLoader(contextName);
} catch (RuntimeException | IOException e) {
throw new ContextClassLoaderException(contextName, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public Iterator<Entry<Key,Value>> iterator() {
IteratorBuilder.builder(tm.values()).opts(serverSideIteratorOptions).env(iterEnv).build();

skvi = IteratorConfigUtil.loadIterators(smi, ib);
} catch (IOException e) {
} catch (IOException | ReflectiveOperationException e) {
throw new RuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public Iterator<Entry<Key,Value>> iterator() {
.opts(serverSideIteratorOptions).env(iterEnv).build();
iterator = IteratorConfigUtil.loadIterators(iterator, iteratorBuilder);
}
} catch (IOException e) {
} catch (IOException | ReflectiveOperationException e) {
throw new RuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ private void ensureOpen() {
}
}

/**
 * Reports the current value of this object's {@code closed} field.
 *
 * @return the value of the {@code closed} field; presumably {@code true} once this object has
 *         been closed (the code that sets the field is not visible here, so confirm)
 */
protected boolean isClosed() {
return closed;
}

private ScanServerSelector createScanServerSelector() {
String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ public Entry<Key,Value> next() {
}
}

private void nextTablet()
throws TableNotFoundException, AccumuloException, IOException, AccumuloSecurityException {
private void nextTablet() throws TableNotFoundException, AccumuloException, IOException,
AccumuloSecurityException, ReflectiveOperationException {

Range nextRange;

Expand Down Expand Up @@ -204,8 +204,8 @@ private TabletMetadata getTabletFiles(Range nextRange) {
}

private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent,
Collection<StoredTabletFile> absFiles)
throws TableNotFoundException, AccumuloException, IOException, AccumuloSecurityException {
Collection<StoredTabletFile> absFiles) throws TableNotFoundException, AccumuloException,
IOException, AccumuloSecurityException, ReflectiveOperationException {

// possible race condition here, if table is renamed
String tableName = context.getTableName(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.accumulo.core.conf;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Objects;

Expand Down Expand Up @@ -160,7 +159,7 @@ private static void verifyValidClassName(String confOption, String className,
Class<?> requiredBaseClass) {
try {
ConfigurationTypeHelper.getClassInstance(null, className, requiredBaseClass);
} catch (IOException | ReflectiveOperationException e) {
} catch (ReflectiveOperationException e) {
fatal(confOption + " has an invalid class name: " + className);
} catch (ClassCastException e) {
fatal(confOption + " must implement " + requiredBaseClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -175,7 +174,7 @@ public static <T> T getClassInstance(String context, String clazzName, Class<T>

try {
instance = getClassInstance(context, clazzName, base);
} catch (RuntimeException | IOException | ReflectiveOperationException e) {
} catch (RuntimeException | ReflectiveOperationException e) {
log.error("Failed to load class {} in classloader context {}", clazzName, context, e);
}

Expand All @@ -196,7 +195,7 @@ public static <T> T getClassInstance(String context, String clazzName, Class<T>
* @return a new instance of the class
*/
public static <T> T getClassInstance(String context, String clazzName, Class<T> base)
throws IOException, ReflectiveOperationException {
throws ReflectiveOperationException {
T instance;

Class<? extends T> clazz = ClassLoaderUtil.loadClass(context, clazzName, base);
Expand Down
28 changes: 28 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,34 @@ public enum Property {
COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT,
"The port used for handling client connections on the compactor servers.", "2.1.0"),
@Experimental
COMPACTOR_FAILURE_BACKOFF_THRESHOLD("compactor.failure.backoff.threshold", "3",
PropertyType.COUNT,
"The number of consecutive failures that must occur before the Compactor starts to back off"
+ " processing compactions.",
"2.1.4"),
@Experimental
COMPACTOR_FAILURE_BACKOFF_INTERVAL("compactor.failure.backoff.interval", "0",
PropertyType.TIMEDURATION,
"The time basis for computing the wait time for compaction failure backoff. A value of zero disables"
+ " the backoff feature. When a non-zero value is supplied, then after compactor.failure.backoff.threshold"
+ " failures have occurred, the compactor will wait compactor.failure.backoff.interval * the number of"
+ " failures seconds before executing the next compaction. For example, if this value is 10s, then after"
+ " three failures the Compactor will wait 30s before starting the next compaction. If the compaction fails"
+ " again, then it will wait 40s before starting the next compaction.",
"2.1.4"),
@Experimental
COMPACTOR_FAILURE_BACKOFF_RESET("compactor.failure.backoff.reset", "10m",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not recommending any changes here, was just pondering something. Another way this could work is that it could set a max backoff time instead of a reset time. Once we get to that max time we stop incrementing, but do not reset until a success is seen. Not coming up w/ any advantages for this other approach though. Wondering if there is any particular reason this reset after time approach was chosen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to reset to 0 instead of staying at the max time as a way to recover more quickly in the event that the issue was fixed. This is also the reason I didn't use exponential backoff. I'm assuming that the user will fix the issue.

PropertyType.TIMEDURATION,
"The maximum amount of time that the compactor will wait before executing the next compaction. When this"
+ " time limit has been reached, the failures are cleared.",
"2.1.4"),
@Experimental
COMPACTOR_FAILURE_TERMINATION_THRESHOLD("compactor.failure.termination.threshold", "0",
PropertyType.COUNT,
"The number of consecutive failures at which the Compactor exits and the process terminates. A zero"
+ " value disables this feature.",
"2.1.4"),
@Experimental
COMPACTOR_MINTHREADS("compactor.threads.minimum", "4", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests.", "2.1.0"),
@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public static IteratorBuilder.IteratorBuilderEnv loadIterConf(IteratorScope scop
*/
public static SortedKeyValueIterator<Key,Value> convertItersAndLoad(IteratorScope scope,
SortedKeyValueIterator<Key,Value> source, AccumuloConfiguration conf,
List<IteratorSetting> iterators, IteratorEnvironment env) throws IOException {
List<IteratorSetting> iterators, IteratorEnvironment env)
throws IOException, ReflectiveOperationException {

List<IterInfo> ssiList = new ArrayList<>();
Map<String,Map<String,String>> ssio = new HashMap<>();
Expand All @@ -194,7 +195,7 @@ public static SortedKeyValueIterator<Key,Value> convertItersAndLoad(IteratorScop
*/
public static SortedKeyValueIterator<Key,Value>
loadIterators(SortedKeyValueIterator<Key,Value> source, IteratorBuilder iteratorBuilder)
throws IOException {
throws IOException, ReflectiveOperationException {
SortedKeyValueIterator<Key,Value> prev = source;
final boolean useClassLoader = iteratorBuilder.useAccumuloClassLoader;
Map<String,Class<SortedKeyValueIterator<Key,Value>>> classCache = new HashMap<>();
Expand Down Expand Up @@ -228,6 +229,7 @@ public static SortedKeyValueIterator<Key,Value> convertItersAndLoad(IteratorScop
} catch (ReflectiveOperationException e) {
log.error("Failed to load iterator {}, for table {}, from context {}", iterInfo.className,
iteratorBuilder.iteratorEnvironment.getTableId(), iteratorBuilder.context, e);
// This has to be a RuntimeException to be handled properly to fail the scan
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.accumulo.core.iteratorsImpl.conf;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -44,12 +43,12 @@ public ColumnToClassMapping() {
}

public ColumnToClassMapping(Map<String,String> objectStrings, Class<? extends K> c)
throws ReflectiveOperationException, IOException {
throws ReflectiveOperationException {
this(objectStrings, c, null);
}

public ColumnToClassMapping(Map<String,String> objectStrings, Class<? extends K> c,
String context) throws ReflectiveOperationException, IOException {
String context) throws ReflectiveOperationException {
this();

for (Entry<String,String> entry : objectStrings.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,42 @@
* <td>FunctionCounter</td>
* <td>Number of entries written by all threads performing compactions</td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@value #METRICS_COMPACTOR_COMPACTIONS_CANCELLED}</td>
* <td>FunctionCounter</td>
* <td>Number of compactions cancelled on a compactor</td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@value #METRICS_COMPACTOR_COMPACTIONS_COMPLETED}</td>
* <td>FunctionCounter</td>
* <td>Number of compactions completed on a compactor</td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@value #METRICS_COMPACTOR_COMPACTIONS_FAILED}</td>
* <td>FunctionCounter</td>
* <td>Number of compactions failed on a compactor</td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@value #METRICS_COMPACTOR_FAILURES_CONSECUTIVE}</td>
* <td>Gauge</td>
* <td>Number of consecutive compaction failures on a compactor</td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@value #METRICS_COMPACTOR_FAILURES_TERMINATION}</td>
* <td>Gauge</td>
* <td>Number of Compactors terminated due to consecutive failures. Process exits after this metric
* is incremented, so it's not guaranteed to be seen.</td>
* </tr>
* <!-- fate -->
* <tr>
* <td>currentFateOps</td>
Expand Down Expand Up @@ -625,6 +661,12 @@ public interface MetricsProducer {
String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck";
String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read";
String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written";
String METRICS_COMPACTOR_COMPACTIONS_CANCELLED = METRICS_COMPACTOR_PREFIX + "majc.cancelled";
String METRICS_COMPACTOR_COMPACTIONS_COMPLETED = METRICS_COMPACTOR_PREFIX + "majc.completed";
String METRICS_COMPACTOR_COMPACTIONS_FAILED = METRICS_COMPACTOR_PREFIX + "majc.failed";
String METRICS_COMPACTOR_FAILURES_CONSECUTIVE =
METRICS_COMPACTOR_PREFIX + "majc.failures.consecutive";
String METRICS_COMPACTOR_FAILURES_TERMINATION = METRICS_COMPACTOR_PREFIX + "terminated";

String METRICS_FATE_PREFIX = "accumulo.fate.";
String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@
*/
public interface ContextClassLoaderFactory {

/**
 * Checked exception signalling that a class loader could not be obtained for a named context.
 * Every constructor builds the exception message by prepending a fixed prefix to the supplied
 * context name.
 */
class ContextClassLoaderException extends Exception {

  private static final long serialVersionUID = 1L;
  private static final String MESSAGE_PREFIX = "Error getting classloader for context: ";

  /**
   * Creates an exception with no cause.
   *
   * @param context the name of the context whose class loader could not be obtained
   */
  public ContextClassLoaderException(String context) {
    super(describe(context));
  }

  /**
   * Creates an exception that wraps the underlying failure.
   *
   * @param context the name of the context whose class loader could not be obtained
   * @param cause the underlying failure
   */
  public ContextClassLoaderException(String context, Throwable cause) {
    super(describe(context), cause);
  }

  /**
   * Creates an exception with full control over suppression and stack trace capture; the two
   * flags are passed through unchanged to the {@link Exception} constructor.
   *
   * @param context the name of the context whose class loader could not be obtained
   * @param cause the underlying failure
   * @param enableSuppression whether suppression is enabled
   * @param writableStackTrace whether the stack trace should be writable
   */
  public ContextClassLoaderException(String context, Throwable cause, boolean enableSuppression,
      boolean writableStackTrace) {
    super(describe(context), cause, enableSuppression, writableStackTrace);
  }

  /** Builds the exception message for the given context name. */
  private static String describe(String context) {
    return MESSAGE_PREFIX + context;
  }

}

/**
* Pass the service environment to allow for additional class loader configuration
*
Expand All @@ -57,14 +77,14 @@ default void init(ContextClassLoaderEnvironment env) {}
/**
* Get the class loader for the given contextName. Callers should not cache the ClassLoader result
* as it may change if/when the ClassLoader reloads. Implementations should throw a
* RuntimeException of some type (such as IllegalArgumentException) if the provided contextName is
* not supported or fails to be constructed.
* ContextClassLoaderException if the provided contextName is not supported or fails to be
* constructed.
*
* @param contextName the name of the context that represents a class loader that is managed by
* this factory. Currently, Accumulo will only call this method for non-null and non-empty
* context. For empty or null context, Accumulo will use the system classloader without
* consulting this plugin.
* @return the class loader for the given contextName
*/
ClassLoader getClassLoader(String contextName);
ClassLoader getClassLoader(String contextName) throws ContextClassLoaderException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.accumulo.core.summary;

import java.io.IOException;

import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.client.summary.Summarizer;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
Expand All @@ -41,8 +39,7 @@ public SummarizerFactory(AccumuloConfiguration tableConfig) {
this.context = ClassLoaderUtil.tableContext(tableConfig);
}

private Summarizer newSummarizer(String classname)
throws IOException, ReflectiveOperationException {
private Summarizer newSummarizer(String classname) throws ReflectiveOperationException {
if (classloader != null) {
return classloader.loadClass(classname).asSubclass(Summarizer.class).getDeclaredConstructor()
.newInstance();
Expand All @@ -55,7 +52,7 @@ private Summarizer newSummarizer(String classname)
public Summarizer getSummarizer(SummarizerConfiguration conf) {
try {
return newSummarizer(conf.getClassName());
} catch (ReflectiveOperationException | IOException e) {
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
Expand Down
Loading