Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,30 +101,56 @@ public void start(String name) {
if (this.executor == null) {
return;
}

LOG.info("Starting {} workers[{}] with queue size {}...",
this.workers, name, this.queueSize);

for (int i = 0; i < this.workers; i++) {
this.runningFutures.add(
this.executor.submit(new ContextCallable<>(this::runAndDone)));
// capture submission thread context HERE
ContextCallable<Void> task = new ContextCallable<>(this::runAndDone);

// wrapper ensures latch always decremented even if ContextCallable fails
this.runningFutures.add(this.executor.submit(() -> this.safeRun(task)));
}
}

private Void safeRun(ContextCallable<Void> task) {
try {
return task.call(); // may fail before/after runAndDone()
} catch (Exception e) {
// This exception is from ContextCallable wrapper (setContext/resetContext/delegate dispatch),
// not from runAndDone() business logic (that one is handled inside runAndDone()).
if (this.exception == null) {
this.exception = e;
LOG.error("Consumer worker failed in ContextCallable wrapper", e);
} else {
LOG.warn("Additional worker failure in ContextCallable wrapper; first exception already recorded", e);
}
this.exceptionHandle(e);
} finally {
this.latch.countDown();
}
return null;
}

private Void runAndDone() {
try {
this.run();
} catch (Throwable e) {
} catch (Exception e) {
if (e instanceof StopExecution) {
this.queue.clear();
putQueueEnd();
} else {
// Only the first exception to one thread can be stored
this.exception = e;
LOG.error("Error when running task", e);
if (this.exception == null) {
this.exception = e;
LOG.error("Unhandled exception in consumer task", e);
} else {
LOG.warn("Additional exception in consumer task; first exception already recorded", e);
}
}
exceptionHandle(e);
this.exceptionHandle(e);
} finally {
this.done();
this.latch.countDown();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.unit.util;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.util.Consumers;
import org.junit.Test;

public class ConsumersTest {

@Test(timeout = 1000)
public void testStartProvideAwaitNormal() throws Throwable {
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
AtomicInteger processed = new AtomicInteger();

Consumers<Integer> consumers = new Consumers<>(executor, v -> {
processed.incrementAndGet();
});

consumers.start("test");
for (int i = 0; i < 50; i++) {
consumers.provide(i);
}
consumers.await();

Assert.assertEquals("Should process all provided elements",
50, processed.get());
} finally {
executor.shutdownNow();
}
}

/**
* Regression test for deadlock:
*
* ContextCallable fails before entering runAndDone().
* await() must still return because latch is decremented in safeRun().
*/
@Test(timeout = 1000)
public void testAwaitDoesNotHangWhenContextCallableFails() throws Throwable {
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
// Use AssertionError to bypass the inner catch(Exception) loop in runAndDone()
// This simulates a scenario where an exception escapes the task logic
// (similar to how a ContextCallable failure would behave from safeRun's perspective)
Consumers<Integer> consumers = new Consumers<>(executor, v -> {
throw new AssertionError("Simulated fatal error (OOM/StackOverflow/etc)");
});
consumers.start("test-fatal-error");
consumers.provide(1);
// Verification:
// Without the fix, the latch would never be decremented (because runAndDone crashes), causing await() to hang.
// With the fix (safeRun wrapper), the finally block ensures latch.countDown() is called.
consumers.await();

// Note: consumer.exception will be null because safeRun only catches Exception, not Error.
// This is acceptable behavior for fatal errors, as long as it doesn't deadlock.
} finally {
executor.shutdownNow();
}
}

@Test(timeout = 1000)
public void testAwaitThrowsWhenConsumerThrows() throws Throwable {
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
final String msg = "Injected exception for test";

Consumers<Integer> consumers = new Consumers<>(executor, v -> {
throw new RuntimeException(msg);
});

consumers.start("test");
consumers.provide(1);

try {
consumers.await();
Assert.fail("Expected await() to throw when consumer throws");
} catch (Throwable t) {
Throwable root = t.getCause() != null ? t.getCause() : t;
Assert.assertTrue("Expected RuntimeException, but got: " + root,
root instanceof RuntimeException);
Assert.assertTrue("Exception message should contain injected message",
root.getMessage() != null &&
root.getMessage().contains(msg));
}
} finally {
executor.shutdownNow();
}
}
}
Loading