Introduce Compactor configuration for dealing with consecutive failures #5726
dlmarion merged 21 commits into apache:2.1
Conversation
AccumuloVFSClassLoader.getContextClassLoader throws an UncheckedIOException when there is an issue getting the ClassLoader for a context name. This runtime exception escapes all the way up to the calling code in the Compactor and TabletServer, which fails the compaction and moves on to the next one. If getting the ClassLoader for a context fails once, it is likely to fail again. It's probably not safe to terminate the TabletServer in this case, but it is likely safe for the Compactor. This change catches the RuntimeException in FileCompactor.compactLocalityGroup, where the iterator stack is created, and raises a new checked exception that is handled in the calling code.
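A minimal, self-contained sketch of that wrap-and-rethrow pattern; IteratorStackCreationException and createStack are hypothetical names for illustration, not the PR's actual code:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

public class IteratorStackExample {

  // hypothetical checked exception; the PR introduces one, but its name is not shown here
  public static class IteratorStackCreationException extends Exception {
    public IteratorStackCreationException(Throwable cause) {
      super(cause);
    }
  }

  // stands in for the iterator-stack creation in FileCompactor.compactLocalityGroup
  static <T> T createStack(Supplier<T> loader) throws IteratorStackCreationException {
    try {
      return loader.get();
    } catch (RuntimeException e) {
      // the UncheckedIOException from classloading (or any other runtime failure)
      // becomes a checked exception that the calling code is forced to handle
      throw new IteratorStackCreationException(e);
    }
  }

  public static void main(String[] args) {
    try {
      createStack(() -> {
        throw new UncheckedIOException(new IOException("cannot build context classloader"));
      });
    } catch (IteratorStackCreationException e) {
      System.out.println("compaction aborted cleanly: " + e.getCause());
    }
  }
}
```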
```
.convertItersAndLoad(env.getIteratorScope(), cfsi, acuTableConf, iterators, iterEnv));
SortedKeyValueIterator<Key,Value> stack = null;
try {
  stack = IteratorConfigUtil.convertItersAndLoad(env.getIteratorScope(), cfsi, acuTableConf,
```
If a table is configured w/ an incorrect iterator class name what would happen in that case with these changes?
Looking at the code path, ClassLoaderUtil.loadClass would get the ClassLoader, but calling ClassLoader.loadClass would throw a ClassNotFoundException, which is not a RuntimeException. I think it would fail the compaction, pick up the next one, and keep failing until the configuration is fixed. Do you think we should add ReflectiveOperationException to this catch clause so that Compactors die if there is any issue loading iterator classes?
Ignore what I said above. I wrote an IT and it turns out that an iterator class in the configuration that does not exist will be caught here. IteratorConfigUtil.loadIterators catches ReflectiveOperationException and raises a RuntimeException. I wonder if we just want to catch Exception here instead of RuntimeException.
> Do you think we should add ReflectiveOperationException to this catch clause so that Compactors die if there is any issue loading iterator classes?

Personally I would not want all compactors to die if bad config w/ an incorrect class name was placed on a table's iterator settings.
Specifically in the context of compactors, I think we would want bad configuration to cause the process to die. This would only be expected at the start of a compaction with a new configuration. Since the sole responsibility of the compactor is to compact, its terminating is a very clear message that it cannot do that job. My understanding is that it would not impact existing compactions in progress. In the absence of something like this we'd need additional metrics that capture failed compactions so we could monitor that state in addition to busy/idle. As-is, we have to monitor for the impacts of failing compactions cascading across the cluster, then go back to individual compactor logs to determine who is healthy and who isn't after the fact. This is much more complex than just checking which processes are down and pulling their recent log history.
Wondering if this fix is too narrow. Maybe we want to do something more general like the following.
- Have a configurable consecutive compaction failure count that causes process death
- Do exponential backoff between failed compactions.
This would more gracefully deal with consistently failing compactions that happen for any reason. Like if a compactor fails to compact 10 times in a row after backing off between each attempt, just exit the process.
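A rough sketch of that proposal; the names, the limit of 10, and the doubling policy are all illustrative, not what the PR ended up implementing:

```java
import java.util.concurrent.TimeUnit;

public class FailureBackoffLoop {
  static final int MAX_CONSECUTIVE_FAILURES = 10; // would come from configuration
  static final long BASE_BACKOFF_MS = 1_000;      // would come from configuration

  public static void run(Iterable<Runnable> jobs) throws InterruptedException {
    int consecutiveFailures = 0;
    for (Runnable job : jobs) {
      try {
        job.run();
        consecutiveFailures = 0; // any success resets the streak
      } catch (Exception e) {
        consecutiveFailures++;
        if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
          System.exit(1); // give up: this process is clearly unable to compact
        }
        // exponential backoff, doubling per consecutive failure (shift capped at 2^6)
        long delayMs = BASE_BACKOFF_MS << Math.min(consecutiveFailures, 6);
        TimeUnit.MILLISECONDS.sleep(delayMs);
      }
    }
  }
}
```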
I don't think we should kill compactors when there is a bad compaction configuration. That compaction should certainly be aborted, though. I think that in general, the service should remain available for future compactions, if at all possible.
The code currently in this PR (as of commit 7c7ecef) no longer kills the Compactors.
The code in this PR as of commit 95ebecf conditionally kills the compactor.
7c7ecef pushes the class of the exception that occurred on the Compactor back to the Coordinator. This is used for logging and for incrementing failure counters for the queue, compactor, and table. Subsequent compaction successes will decrement the counters for the queue, compactor, and table. Using these counters we can return an empty job back to the Compactor when the current error rate is over some threshold. We could also emit metrics from the Coordinator based on these failure counts. The logic and accounting in 7c7ecef is not 100% correct. I pushed it up as-is to get feedback on if and how we should move forward with this idea.
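A hedged sketch of the accounting described above; the class and method names are made up, and the real commit tracks the queue, compactor, and table separately rather than a single key:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class FailureAccounting {
  private static final long FAILURE_THRESHOLD = 5; // would come from configuration
  private final ConcurrentHashMap<String,AtomicLong> failures = new ConcurrentHashMap<>();

  public void recordFailure(String key) {
    failures.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
  }

  public void recordSuccess(String key) {
    // successes decrement the counter, but never below zero
    failures.computeIfPresent(key, (k, v) -> {
      v.updateAndGet(n -> Math.max(0, n - 1));
      return v;
    });
  }

  // the coordinator could consult this before handing out the next job,
  // returning an empty job when the recent failure rate is too high
  public boolean overThreshold(String key) {
    AtomicLong count = failures.get(key);
    return count != null && count.get() > FAILURE_THRESHOLD;
  }
}
```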
6563aee modifies the Compactor such that it can be configured to wait progressively longer on consecutive compaction failures. The failures are reported to the Coordinator, which logs a failure summary every 5 minutes.
Created some test scripts to explore compaction failures so I can see what the current code does. I can also try running them against these changes.
I have updated the description to match what the code in this PR currently does. I'm going to work on modifying the IT to try and test some of this new code. @keith-turner - Except for changes from testing, and comments from PR review, I don't think I have any major changes planned for this PR if you want to start testing it with your new scripts. I did change the implementation since your last review, so it might be good to review this first before testing.
keith-turner left a comment
Took a look through the changes, going to try running some tests w/ these changes.
```
Set<String> allCompactorAddrs = new HashSet<>();
allCompactors.values().forEach(l -> l.forEach(c -> allCompactorAddrs.add(c.toString())));
failingCompactors.keySet().retainAll(allCompactorAddrs);
```
Another way this tracking could work is that it counts success and failure. Each time it logs, it takes a snapshot of the counts, logs them, and then deducts the snapshot it logged. This will give an indication of the successes and failures since the last time this function ran. Also could only log when there is failure, and maybe log a line per thing (interpreting large maps in the log can be difficult). Maybe something like the following.

```java
// this is not really correct, it's assuming we also get a snapshot of the values in the map,
// but that is not really true
Map<String,SuccessFailureCounts> queueSnapshot = Map.copyOf(failingQueues);
queueSnapshot.forEach((queue, counts) -> {
  if (counts.failures > 0) {
    LOG.warn("Queue {} had {} successes and {} failures in the last {}ms", queue,
        counts.successes, counts.failures, logInterval);
    // TODO decrement counts logged from failingQueues; by decrementing only the
    // counts logged we do not lose any concurrent increments made while logging
  }
});
```
Created this PR dlmarion#56 .. but I have not tested the changes
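One way to address the "not really correct" caveat above (a sketch, with SuccessFailureCounts and failingQueues assumed from that comment): hold the counts in AtomicLongs, read them, log, and subtract exactly the values read, so increments made concurrently while logging are preserved:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class SuccessFailureCounts {
  final AtomicLong successes = new AtomicLong();
  final AtomicLong failures = new AtomicLong();
}

class QueueFailureLogger {
  private final Map<String,SuccessFailureCounts> failingQueues = new ConcurrentHashMap<>();
  private final long logIntervalMs = 300_000; // the 5 minute interval mentioned above

  void logAndDeduct() {
    failingQueues.forEach((queue, counts) -> {
      long f = counts.failures.get();
      long s = counts.successes.get();
      if (f > 0) {
        System.out.printf("Queue %s had %d successes and %d failures in the last %dms%n",
            queue, s, f, logIntervalMs);
      }
      // deduct only what was read, so no concurrent update is lost
      counts.failures.addAndGet(-f);
      counts.successes.addAndGet(-s);
    });
  }
}
```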
| + " again, then it will wait 40s before starting the next compaction.", | ||
| "2.1.4"), | ||
| @Experimental | ||
| COMPACTOR_FAILURE_BACKOFF_RESET("compactor.failure.backoff.reset", "10m", |
Not recommending any changes here, was just pondering something. Another way this could work is that it could set a max backoff time instead of a reset time. Once we get to that max time we stop incrementing, but do not reset until a success is seen. Not coming up w/ any advantages for this other approach though. Wondering if there is any particular reason this reset after time approach was chosen?
I selected to reset to 0 instead of staying at the max time just as a way to try and recover quicker in the event that the issue was fixed. This was also the reason I didn't do exponential backoff. I'm assuming that the user will fix the issue.
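A sketch contrasting the two behaviors under discussion; the step and cap values and the method names are illustrative, and only compactor.failure.backoff.reset is an actual property from this PR:

```java
class BackoffPolicy {
  static final long STEP_MS = 10_000;         // grow the wait by 10s per failure
  static final long MAX_BACKOFF_MS = 60_000;  // cap for the alternative approach
  static final long RESET_AFTER_MS = 600_000; // like compactor.failure.backoff.reset=10m

  private long lastFailureTime = 0;
  private int consecutiveFailures = 0;

  // approach taken in the PR: zero the streak once failures stop for RESET_AFTER_MS,
  // so a fixed problem recovers quickly
  long nextDelayWithReset(long now) {
    if (now - lastFailureTime > RESET_AFTER_MS) {
      consecutiveFailures = 0;
    }
    lastFailureTime = now;
    return ++consecutiveFailures * STEP_MS;
  }

  // alternative pondered above: stop growing at a max, and only reset on success
  long nextDelayWithCap() {
    return Math.min(++consecutiveFailures * STEP_MS, MAX_BACKOFF_MS);
  }

  void onSuccess() {
    consecutiveFailures = 0;
  }
}
```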
```
 * @return the class loader for the given contextName
 */
ClassLoader getClassLoader(String contextName);
ClassLoader getClassLoader(String contextName) throws IOException, ReflectiveOperationException;
```
Is there a benefit to these two specific exceptions? If we want information to travel through the code via a checked exception, then it may be better to create a very specific exception related to this SPI. This allows knowing that class loader creation failed w/o trying to guess at the specific reasons/exceptions that could cause it; the specific reason should be in the cause. In general we may want to know this type of failure happened, but we probably do not care too much why it happened. Whenever it happens, for any reason, it's not good.
```java
// maybe this should extend Exception
/**
 * @since 2.1.4
 */
public static class ClassLoaderCreationFailed extends AccumuloException {
  public ClassLoaderCreationFailed(Throwable cause) {
    super(cause);
  }

  public ClassLoaderCreationFailed(String msg, Throwable cause) {
    super(msg, cause);
  }
}

ClassLoader getClassLoader(String contextName) throws ClassLoaderCreationFailed;
```

We could also leave this SPI as is and create a new internal exception that is always thrown when class loader creation fails. This allows this very specific and important information to travel in the internal code. Could do the more minimal change below in 2.1 and add the checked exception to the SPI in 4.0. Not opposed to adding a checked exception in 2.1.4 to the SPI though; would need to document the breaking change in the release notes.

```java
public class ClassLoaderUtil {
  // create this class outside of the public API... any code in the class that attempts to
  // create a classloader and fails should throw this exception
  // this could be a checked or runtime exception... not sure which is best
  public static class ClassLoaderCreationFailed extends RuntimeException {
    public ClassLoaderCreationFailed(String msg, Throwable cause) {
      super(msg, cause);
    }
  }

  public static ClassLoader getClassLoader(String context) {
    try {
      return FACTORY.getClassLoader(context);
    } catch (Exception e) {
      throw new ClassLoaderCreationFailed("Failed to create context " + context, e);
    }
  }
}
```
I don't have an opinion on this either way, except that whatever is thrown should be a checked exception so that it must be handled. Using a RuntimeException is part of the reason for this PR.
If there is no benefit to using IOException and ReflectiveOperationException, then IMO creating a new checked exception specific to the situation would be more informative for someone looking at logs or for someone reading code that throws it.
```
2:security.TCredentials credentials
3:string externalCompactionId
4:data.TKeyExtent extent
5:string exceptionClassName
```
This may be a breaking change to thrift RPC? Or maybe the new field will be null/ignored when there is a version difference, so maybe it's ok. Also, the coordinator is experimental.
If using a 2.1.4 Compactor and 2.1.3 Coordinator, then I don't think this is an issue. Not sure about the reverse, maybe it's always null?
```
errorHistory.values().stream().mapToLong(p -> p.getSecond().get()).sum();
if (totalFailures > 0) {
  LOG.warn("This Compactor has had {} consecutive failures. Failures: {}", totalFailures,
      errorHistory);
```
Wondering how this will look; it's always relogging the entire error history. Also it seems like it will only log the first exception seen for a table; is that the intent? I will know more about how this looks when I run some tests.
Yeah, I guess that I'm assuming the Throwable in Map<TableId,Pair<Throwable,AtomicLong>> would not really change in consecutive failures. I guess we could change it to List<Pair<>>, or remove the Throwable entirely and just make sure we log it. The exception class does get logged in the Coordinator when the compaction fails.
This is what the logging looks like:

```
2025-07-15T17:46:35,339 [compactor.Compactor] WARN : This Compactor has had 1 consecutive failures. Failures: {1=(java.lang.ClassNotFoundException: org.apache.accumulo.testing.continuous.ValidatingIterator,1)}
```
The issue w/ the property type caused the compactor to die, so I have not seen a count greater than one yet.
The summaries may be nice if you know to look for them. Could do something like Map<TableId, Map<ClassName, FailureCount>>.
I'm thinking that I should just remove Throwable from the errorHistory map. The specific exception that caused the compaction to fail is logged at error at Compactor.java line 594.
Ok, I missed your last comment before I posted mine. I can work on improving the error history map.
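A hedged sketch of the Map<TableId, Map<ClassName, FailureCount>> shape suggested above, with String table ids standing in for TableId and the exception class name as the key (avoiding Throwable, which does not override hashCode/equals):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class ErrorHistory {
  private final Map<String,Map<String,AtomicLong>> failures = new ConcurrentHashMap<>();

  void addFailure(String tableId, Throwable t) {
    failures.computeIfAbsent(tableId, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(t.getClass().getName(), k -> new AtomicLong()).incrementAndGet();
  }

  long totalFailures() {
    return failures.values().stream().flatMap(m -> m.values().stream())
        .mapToLong(AtomicLong::get).sum();
  }

  @Override
  public String toString() {
    return failures.toString(); // called explicitly when logging, per the fix noted below
  }
}
```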
Seeing some nice results experimenting with these changes. Ran a test w/ 100 tablets w/ continual bulk import into 20 random tablets. There were 16 compactors, 8 of which would always fail. After 15 mins saw the following counts using the default settings. Restarted the same test setting […]. My suspicion is there were more successful compactions in the first case because the avg files per tablet was higher because of the failed compactions delaying compactions. Going to do some more digging and see if that is the case.
Log4j was not invoking ErrorHistory.toString when logging, so I explicitly called toString when logging. Also, Throwable doesn't override hashCode, so the HashMap wasn't working as expected. Changed the map key from Throwable to String to fix this.
Ran the two tests again tracking average files per tablet. With […] I know it would be a change in behavior, but wondering if the default settings should make compactors back off. Could be slight, like […]
Tried running another test that did the following. For both tables below create 100 tablets.
In this case the tablets in table […]. The test has been running for a bit, seeing table […]
| .description("Number compactions that have succeeded on this compactor").register(registry); | ||
| FunctionCounter.builder(METRICS_COMPACTOR_COMPACTIONS_FAILED, this, Compactor::getFailures) | ||
| .description("Number compactions that have failed on this compactor").register(registry); | ||
| FunctionCounter.builder(METRICS_COMPACTOR_FAILURES_TERMINATION, this, Compactor::getTerminated) |
Seems like this metric will usually not be seen as 1 if the compactor exits before the metrics system polls it.
The changes in my last commits having to deal with AbstractServer.close were actually to close the ServerContext so that the MeterRegistry gets closed. Closing the MeterRegistry may end up doing a final poll on the metrics before closing down. It looks like the StatsD implementation does that anyway. But I agree, it's best effort and may not be guaranteed.
I was wondering what the close changes were for
That metric does show up in the IT FWIW
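A sketch of the race being discussed, using the Micrometer FunctionCounter API that appears in the diff above; the metric name and class here are illustrative, and whether close() triggers a final publish depends on the registry implementation:

```java
import java.util.concurrent.atomic.AtomicLong;

import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.MeterRegistry;

class TerminationMetric {
  private final AtomicLong terminations = new AtomicLong();

  TerminationMetric(MeterRegistry registry) {
    // FunctionCounter is read only when the registry polls it, so a value set just
    // before the process exits may never be observed
    FunctionCounter.builder("compactor.failures.termination", terminations, AtomicLong::get)
        .description("Times this compactor terminated due to consecutive failures")
        .register(registry);
  }

  void terminate(MeterRegistry registry) {
    terminations.incrementAndGet();
    registry.close(); // best effort: may publish the final value, depending on the registry
    System.exit(1);
  }
}
```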
Full IT build completed successfully.
This change propagates exceptions from the classloading-related code. Prior to this change, some of the exceptions raised by the classloading code would be caught early and a RuntimeException would be raised instead.
This change also modifies the Compactor to conditionally delay execution of the next compaction job when the Compactor has been failing to complete consecutive prior compactions. Four new properties control the behavior of the new delay logic.
This change also modifies the API between the Compactor and Coordinator when compactions have failed. The exception class is now relayed to the Coordinator, which tracks and periodically logs a summary of the failures.