
Conversation

next-n commented Jan 30, 2026

Description
Fixes #673
Fix a bug where a fan-in node in a DAG workflow could be executed more than once when multiple predecessor nodes enqueue it concurrently.
This change ensures that only a single designated predecessor is allowed to trigger fan-in execution once all dependencies are complete.
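
Conceptually, the guarantee is equivalent to an atomic pending-dependency counter where only the predecessor that observes the count reach zero enqueues the fan-in. The snippet below is a simplified, self-contained sketch of that idea, not the literal diff:

use std::sync::atomic::{AtomicUsize, Ordering};

// Each predecessor calls this when it finishes. fetch_sub returns the
// previous value, so exactly one caller ever observes 1 and becomes the
// designated predecessor allowed to trigger the fan-in node.
fn on_predecessor_done(pending: &AtomicUsize) -> bool {
    pending.fetch_sub(1, Ordering::AcqRel) == 1
}

fn main() {
    let pending = AtomicUsize::new(2); // a fan-in node with two predecessors
    assert!(!on_predecessor_done(&pending)); // first predecessor: dependencies still pending
    assert!(on_predecessor_done(&pending)); // second predecessor: triggers fan-in exactly once
}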

Type of Change

Bug fix (non-breaking change which fixes an issue)

Testing

I have added tests that prove my fix is effective or that my feature works

I have run the existing tests and they pass

I have run cargo fmt and cargo clippy

Checklist

My code follows the code style of this project

I have performed a self-review of my own code

I have commented my code, particularly in hard-to-understand areas

My changes generate no new warnings

I have added tests that prove my fix is effective or that my feature works

New and existing unit tests pass locally with my changes

Additional Notes

This fix addresses duplicate enqueue caused specifically by concurrent fan-in scheduling.

next-n requested a review from geofmureithi as a code owner on January 30, 2026 at 13:31
geofmureithi left a comment


Left some thoughts

.await?;
// TODO(bug): The done check is not a good one, as it can be triggered more than once if the jobs are executed too quickly
if results.iter().all(|s| matches!(s.status, Status::Done)) {
// ===== FIX START =====

Remove this line

Suggested change
// ===== FIX START =====

pending_dependencies: dependency_task_ids,
});
}
// ===== FIX END =====

Remove this line too

Suggested change
// ===== FIX END =====


next-n commented Feb 2, 2026

I accidentally deleted the test conversation; here is the test I added:

#[tokio::test]
    async fn test_fan_in_runs_once_with_multiple_predecessors() {
        let counter = Arc::new(AtomicUsize::new(0));
        let barrier = Arc::new(Barrier::new(2));

        let dag = DagFlow::new("fan-in-single-exec-workflow");

        let a = {
            let barrier = barrier.clone();
            dag.add_node("a", task_fn(move |task: u32| {
                let barrier = barrier.clone();
                async move {
                    barrier.wait().await;
                    task
                }
            }))
        };

        let b = {
            let barrier = barrier.clone();
            dag.add_node("b", task_fn(move |task: u32| {
                let barrier = barrier.clone();
                async move {
                    barrier.wait().await;
                    task
                }
            }))
        };

        let counter_clone = Arc::clone(&counter);
        let _fan_in = dag
            .add_node(
                "fan_in",
                task_fn(move |task: (u32, u32), worker: WorkerContext| {
                    let counter = Arc::clone(&counter_clone);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        worker.stop().unwrap();
                        task.0 + task.1
                    }
                }),
            )
            .depends_on((&a, &b));

        let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
        backend.start_fan_out((1u32, 2u32)).await.unwrap();

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .build(dag);

        worker.run().await.unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

I have not pushed the code yet. Does this coverage look sufficient to you, or would you prefer a different structure?
And should I also delete the TODO comment?

@geofmureithi

Your approach is not ideal, specifically because it's not a practical use-case scenario.

You possibly need to use TestWorker

Set up the dag without passing the barrier (just normal nodes), execute it with TestWorker, then collect the results and assert that they match what we expect.


next-n commented Feb 2, 2026

I tried TestWorker, but for this fan-in flow with JsonStorage it doesn't preserve the dependency-completion behavior needed by check_status, so it can loop in the waiting state. I switched to a normal-worker test to validate the actual runtime behavior: a multi-predecessor fan-in executes once and produces the expected result. Can you check the following code and give feedback?

#[tokio::test]
async fn test_fan_in_runs_once_with_multiple_predecessors() {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    let counter = Arc::new(AtomicUsize::new(0));
    let sum = Arc::new(AtomicUsize::new(0));
    let dag = DagFlow::new("fan-in-single-exec-workflow");

    let a = dag.add_node("a", task_fn(|task: u32| async move { task }));
    let b = dag.add_node("b", task_fn(|task: u32| async move { task }));

    let counter_clone = Arc::clone(&counter);
    let sum_clone = Arc::clone(&sum);
    let _fan_in = dag
        .add_node(
            "fan_in",
            task_fn(move |task: (u32, u32), worker: WorkerContext| {
                let counter = Arc::clone(&counter_clone);
                let sum = Arc::clone(&sum_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    sum.store((task.0 + task.1) as usize, Ordering::SeqCst);
                    worker.stop().unwrap();
                    task.0 + task.1
                }
            }),
        )
        .depends_on((&a, &b));

    let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
    backend.start_fan_out((1u32, 2u32)).await.unwrap();

    let worker = WorkerBuilder::new("rango-tango")
        .backend(backend)
        .on_event(|ctx, ev| {
            if matches!(ev, Event::Error(_)) {
                ctx.stop().unwrap();
            }
        })
        .build(dag);
    worker.run().await.unwrap();

    assert_eq!(counter.load(Ordering::SeqCst), 1);
    assert_eq!(sum.load(Ordering::SeqCst), 3);
}

@geofmureithi

I tried TestWorker, but for this fan-in flow with JsonStorage it doesn’t preserve the dependency-completion behavior needed by check_status, so it can loop in waiting state

Could you elaborate? Your approach is now better, but if TestWorker is not working then I would want to fix that, as it would signify an underlying problem.

@geofmureithi

If it is an underlying problem with JsonStorage let me know so that we can separate it from this issue.


next-n commented Feb 2, 2026

I investigated this further and it’s not specific to JsonStorage.

The issue is with TestWorker semantics for fan-in flows.

Fan-in scheduling relies on backend.check_status() seeing all predecessors as Done.
TestWorker emits task results immediately when the service future completes, before backend state used by check_status() is guaranteed to be visible.

Because of that, fan-in can stay in WaitingForDependencies and the test can loop/hang even though predecessor results were emitted.

A real worker run does not have this issue, which is why the normal worker test passes.

This looks like either:

a known limitation of TestWorker for dependency-driven workflows, or

a TestWorker bug (it should emit only after backend completion state is observable).

Happy to open a separate issue/PR for TestWorker once we agree on expected behavior.

@geofmureithi

That doesn't sound right, as TestWorker just wraps a regular worker and pipes results into a channel.
Can you provide the code you are using for TestWorker?


next-n commented Feb 2, 2026

The issue is the point at which TestWorker emits task completion.

In TestEmitService::call():

let res = fut.await;
tx.send((task_id, Ok(res))).await.unwrap();

The result is emitted immediately after the service future resolves.

Fan-in scheduling, however, relies on:

backend.check_status(dependency_task_ids)

which assumes that dependency completion is already visible in the backend.

With TestWorker, there is no synchronization point between “service finished” and “backend status observable by check_status”. So execute_next() can see predecessor results while the backend still reports them as incomplete, leaving the fan-in node in WaitingForDependencies.

This doesn’t occur with a normal worker, where backend state and scheduling advance together.


geofmureithi commented Feb 2, 2026

Can you share the code you are using with TestWorker?


next-n commented Feb 2, 2026

#[tokio::test]
async fn test_fan_in_runs_once_with_multiple_predecessors_testworker() {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use apalis_file_storage::JsonStorage;
    use serde_json::Value;
    use apalis_core::worker::test_worker::{TestWorker, ExecuteNext};
    use apalis_core::task_fn::task_fn;

    let counter = Arc::new(AtomicUsize::new(0));
    let dag = DagFlow::new("fan-in-testworker");

    let a = dag.add_node("a", task_fn(|task: u32| async move { task }));
    let b = dag.add_node("b", task_fn(|task: u32| async move { task }));

    let counter_clone = Arc::clone(&counter);
    let _fan_in = dag
        .add_node(
            "fan_in",
            task_fn(move |task: (u32, u32)| {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    task.0 + task.1
                }
            }),
        )
        .depends_on((&a, &b));

    let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
    backend.start_fan_out((1u32, 2u32)).await.unwrap();

    let mut worker = TestWorker::new(backend, dag);

    // This loop never reaches fan_in completion
    while let Some(item) = worker.execute_next().await {
        let (_task_id, resp) = item.unwrap();
        let _ = resp.unwrap();
    }

    // This assertion is never reached
    assert_eq!(counter.load(Ordering::SeqCst), 1);
}

@geofmureithi

Yeah, you are doing it wrong.

Since worker.execute_next() returns a Stream, use .take(4) and assert that the resp returned is what we expect.

It should possibly be something like vec![EnqueuedNext {..}, EnqueuedNext {..}, WaitingForDependencies {..}, Complete {..}].

Also remove let counter = Arc::clone(&counter_clone);. TestWorker already handles this.
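
In spirit, something like this (a generic, self-contained sketch with stand-in types, since the real stream and response types aren't shown here):

use futures::StreamExt;

// Stand-in for the worker's response type; the real one is DagExecutionResponse.
#[derive(Debug, PartialEq)]
enum Resp {
    EnqueuedNext,
    WaitingForDependencies,
    Complete,
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Pretend this is the stream of results coming out of the worker.
    let results = futures::stream::iter(vec![
        Resp::EnqueuedNext,
        Resp::EnqueuedNext,
        Resp::WaitingForDependencies,
        Resp::Complete,
    ]);

    // Take the first four responses and assert they match the expected sequence.
    let responses: Vec<Resp> = results.take(4).collect().await;
    assert_eq!(
        responses,
        vec![
            Resp::EnqueuedNext,
            Resp::EnqueuedNext,
            Resp::WaitingForDependencies,
            Resp::Complete,
        ]
    );
}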


geofmureithi commented Feb 2, 2026

Ah, I see the problem: TestWorker does not expose the stream:

 TestWorker {
            stream,

You can still work around this by exiting the while loop on Complete or using a loop and breaking at the 4th iteration. The key goal is to use resp rather than the counter.

I will add an API improvement to expose the stream

Let me know if you need help.


next-n commented Feb 2, 2026

I tried implementing the TestWorker-based test as suggested.
However, the fan-in node never reaches Complete under TestWorker — it remains in WaitingForDependencies, so complete == 0.

This happens because fan-in relies on backend.check_status, and with TestWorker the emitted results are observable before dependency completion is visible to check_status. As a result, the fan-in gate never opens.
This is the code I used:

#[tokio::test]
async fn test_fan_in_runs_once_with_multiple_predecessors() {
    use apalis_core::{
        task_fn::task_fn,
        worker::test_worker::{ExecuteNext, TestWorker},
    };
    use apalis_file_storage::JsonStorage;
    use serde_json::Value;

    let dag = DagFlow::new("fan-in-testworker");

    let a = dag.add_node("a", task_fn(|task: u32| async move { task }));
    let b = dag.add_node("b", task_fn(|task: u32| async move { task }));

    let _fan_in = dag
        .add_node(
            "fan_in",
            task_fn(|task: (u32, u32)| async move { task.0 + task.1 }),
        )
        .depends_on((&a, &b));

    let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
    backend.start_fan_out((1u32, 2u32)).await.unwrap();

    let mut worker = TestWorker::new(backend, dag);

    use super::response::DagExecutionResponse::*;

    let mut enqueued_next = 0;
    let mut waiting = 0;
    let mut complete = 0;
    let mut complete_value = None;

    // bounded loop to avoid hangs
    for _ in 0..50 {
        let Some(item) = worker.execute_next().await else { break };
        let (_task_id, resp) = item.unwrap();
        let resp = resp.unwrap();

        match resp {
            EnqueuedNext { .. } => enqueued_next += 1,
            WaitingForDependencies { .. } => waiting += 1,
            Complete { result } => {
                complete += 1;
                complete_value = Some(result);
                break; // stop after first completion
            }
            _ => {}
        }
    }

    // Assertions focus on semantics, not order
    assert_eq!(enqueued_next, 2, "both predecessors must enqueue once");
    assert!(waiting >= 1, "fan-in must wait at least once");
    assert_eq!(complete, 1, "fan-in must execute exactly once");
    assert_eq!(complete_value, Some(Value::from(3u32)));
}

@geofmureithi

The TestWorkerService wraps all other services and should work appropriately (hence this is not the issue you are describing).

My suspicion is that there is an extra bug here:

let req = req.map(|_| encoded_input); // Replace args with fan-in input
let response = executor.call(req).await?;
(response, context)

It doesn't look obvious, but the executor will return ExecuteNext, and since there is no Next and we don't check for that, it doesn't return the correct result, which should be Complete.
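
Roughly the kind of check I mean, as a standalone sketch (stand-in types; the actual enum, fields, and call site differ):

// Stand-in for the DAG response type, used only for this sketch.
#[derive(Debug, PartialEq)]
enum DagResponse {
    EnqueuedNext { result: u32 },
    Complete { result: u32 },
}

// After the executor resolves, a node with no successors should be reported
// as Complete instead of being passed through as EnqueuedNext.
fn normalize(resp: DagResponse, has_next: bool) -> DagResponse {
    match resp {
        DagResponse::EnqueuedNext { result } if !has_next => DagResponse::Complete { result },
        other => other,
    }
}

fn main() {
    let fan_in_resp = DagResponse::EnqueuedNext { result: 3 };
    // The fan-in node has no successors, so it should normalize to Complete.
    assert_eq!(normalize(fan_in_resp, false), DagResponse::Complete { result: 3 });
}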

Can you confirm this by debugging the first 4 results and posting them here?


next-n commented Feb 2, 2026

[0] task_id=TaskId(RandomId("4yzi9nwccm4")) resp=Ok(EntryFanOut { node_task_ids: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[1] task_id=TaskId(RandomId("4yzi9pmhn5p")) resp=Ok(EnqueuedNext { result: Number(2) })
[2] task_id=TaskId(RandomId("4yzi9pwccm4")) resp=Ok(EnqueuedNext { result: Number(1) })
[3] task_id=TaskId(RandomId("4yzi9qyqw4k")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[4] task_id=TaskId(RandomId("4yzi9re6m5-")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[5] task_id=TaskId(RandomId("4yzi9te6m5-")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[6] task_id=TaskId(RandomId("4yzi9trzjjw")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[7] task_id=TaskId(RandomId("4yzi9ukxq95")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })

I captured the first few results from TestWorker (see logs above).

Both predecessor nodes run successfully — each emits EnqueuedNext.
However, the fan-in node never transitions to Complete.


next-n commented Feb 2, 2026

From this trace, dependency execution is not the issue — both predecessors emit EnqueuedNext successfully.

The problem is probably that the fan-in path never reaches Complete and keeps returning WaitingForDependencies with the same task IDs. This suggests the executor is not advancing from ExecuteNext → Complete once dependencies are satisfied (likely around the executor.call(req) path you pointed out).

@geofmureithi

This doesn't look right; why would there be more than 5 results?

Anyway, let me look at it and get back to you.


Successfully merging this pull request may close these issues.

DAG fan‑in node can execute more than once when dependencies finish close together
