diff --git a/pkg/controller/batch/taskqueue_controller.go b/pkg/controller/batch/taskqueue_controller.go index 0dd9373c..2822151b 100644 --- a/pkg/controller/batch/taskqueue_controller.go +++ b/pkg/controller/batch/taskqueue_controller.go @@ -126,6 +126,9 @@ func (r *TaskQueueReconciler) syncTaskQueueStatus(ctx context.Context, tq *queue } } } + if r.getInProgressTaskCount(tq) < tq.Spec.MaxConcurrentTasks { // In case of fewer tasks than max concurrent, we should processPendingTasks + shouldRequeue = false + } return shouldRequeue, utilerrors.NewAggregate(errs) } diff --git a/pkg/controller/batch/watcher.go b/pkg/controller/batch/watcher.go index 22fde3e1..6140ea7b 100644 --- a/pkg/controller/batch/watcher.go +++ b/pkg/controller/batch/watcher.go @@ -98,7 +98,7 @@ func (r *TaskQueueReconciler) handleResourcesUpdate(ctx context.Context, obj int return nil } if _, err := r.syncTaskQueueStatus(ctx, tq, func(key string) bool { - return key == newObj.GetName() + return key == fmt.Sprintf("%s/%s", newObj.GetNamespace(), newObj.GetName()) }); err != nil { return fmt.Errorf("failed to sync task status: %w", err) }