diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
index daa54ee958..f4a7671f35 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
@@ -101,30 +101,56 @@ public void start(String name) {
         if (this.executor == null) {
             return;
         }
+
         LOG.info("Starting {} workers[{}] with queue size {}...",
                  this.workers, name, this.queueSize);
+
         for (int i = 0; i < this.workers; i++) {
-            this.runningFutures.add(
-                    this.executor.submit(new ContextCallable<>(this::runAndDone)));
+            // capture submission thread context HERE
+            ContextCallable<Void> task = new ContextCallable<>(this::runAndDone);
+
+            // wrapper ensures latch always decremented even if ContextCallable fails
+            this.runningFutures.add(this.executor.submit(() -> this.safeRun(task)));
+        }
+    }
+
+    private Void safeRun(ContextCallable<Void> task) {
+        try {
+            return task.call(); // may fail before/after runAndDone()
+        } catch (Throwable e) {
+            // Reached only for failures that runAndDone() did not handle itself: the
+            // ContextCallable wrapper failing, or done() throwing from its finally block.
+            if (this.exception == null) {
+                this.exception = e;
+                LOG.error("Consumer worker failed in ContextCallable wrapper", e);
+            } else {
+                LOG.warn("Additional worker failure in ContextCallable wrapper; first exception already recorded", e);
+            }
+            this.exceptionHandle(e);
+        } finally {
+            this.latch.countDown();
         }
+        return null;
     }
 
     private Void runAndDone() {
         try {
             this.run();
         } catch (Throwable e) {
             if (e instanceof StopExecution) {
                 this.queue.clear();
                 putQueueEnd();
             } else {
-                // Only the first exception to one thread can be stored
-                this.exception = e;
-                LOG.error("Error when running task", e);
+                if (this.exception == null) {
+                    this.exception = e;
+                    LOG.error("Unhandled exception in consumer task", e);
+                } else {
+                    LOG.warn("Additional exception in consumer task; first exception already recorded", e);
+                }
             }
-            exceptionHandle(e);
+            this.exceptionHandle(e);
         } finally {
             this.done();
-            this.latch.countDown();
         }
         return null;
     }
diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/ConsumersTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/ConsumersTest.java
new file mode 100644
index 0000000000..1bb152ec75
--- /dev/null
+++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/util/ConsumersTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.unit.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hugegraph.testutil.Assert;
+import org.apache.hugegraph.util.Consumers;
+import org.junit.Test;
+
+public class ConsumersTest {
+
+    @Test(timeout = 1000)
+    public void testStartProvideAwaitNormal() throws Throwable {
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            AtomicInteger processed = new AtomicInteger();
+
+            Consumers<Integer> consumers = new Consumers<>(executor, v -> {
+                processed.incrementAndGet();
+            });
+
+            consumers.start("test");
+            for (int i = 0; i < 50; i++) {
+                consumers.provide(i);
+            }
+            consumers.await();
+
+            Assert.assertEquals("Should process all provided elements",
+                                50, processed.get());
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Regression test for Error handling in workers:
+     *
+     * An Error thrown by the consumer callback must be recorded like any
+     * other failure, so await() neither hangs on the latch nor returns
+     * normally as if every element had been processed.
+     */
+    @Test(timeout = 1000)
+    public void testAwaitDoesNotHangWhenContextCallableFails() throws Throwable {
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            // AssertionError stands in for any java.lang.Error escaping the task logic
+            Consumers<Integer> consumers = new Consumers<>(executor, v -> {
+                throw new AssertionError("Simulated fatal error (OOM/StackOverflow/etc)");
+            });
+            consumers.start("test-fatal-error");
+            consumers.provide(1);
+
+            // Capture instead of calling Assert.fail() inside the try: fail() itself throws
+            // AssertionError, which would be indistinguishable from the expected one.
+            Throwable thrown = null;
+            try {
+                consumers.await();
+            } catch (Throwable t) {
+                thrown = t;
+            }
+
+            Assert.assertTrue("Expected await() to rethrow the worker's Error",
+                              thrown != null);
+            Throwable root = thrown.getCause() != null ? thrown.getCause() : thrown;
+            Assert.assertTrue("Expected AssertionError, but got: " + root,
+                              root instanceof AssertionError);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test(timeout = 1000)
+    public void testAwaitThrowsWhenConsumerThrows() throws Throwable {
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            final String msg = "Injected exception for test";
+
+            Consumers<Integer> consumers = new Consumers<>(executor, v -> {
+                throw new RuntimeException(msg);
+            });
+
+            consumers.start("test");
+            consumers.provide(1);
+
+            try {
+                consumers.await();
+                Assert.fail("Expected await() to throw when consumer throws");
+            } catch (Throwable t) {
+                Throwable root = t.getCause() != null ? t.getCause() : t;
+                Assert.assertTrue("Expected RuntimeException, but got: " + root,
+                                  root instanceof RuntimeException);
+                Assert.assertTrue("Exception message should contain injected message",
+                                  root.getMessage() != null &&
+                                  root.getMessage().contains(msg));
+            }
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+}