Fix launcher job scheduling directives when unsuspending #772
Conversation
@GonzaloSaez Could you sign the DCO?
@GonzaloSaez Could you keep the current mechanism (creating a batch/v1 Job even when the MPIJob is suspended)?
You can follow the steps at https://github.com/kubeflow/mpi-operator/pull/772/checks?check_run_id=63645778871 to sign the DCO.
Force-pushed 880261d to fe8d324
tenzen-y left a comment
@GonzaloSaez Thank you for working on this problem.
Basically, LGTM.
Additionally, could you add an integration test case to https://github.com/kubeflow/mpi-operator/blob/master/test/integration/mpi_job_controller_test.go?
pkg/controller/mpi_job_controller.go (Outdated)
// so we must clear it first via a status sub-resource update (consistent with JobSet).
if launcher.Status.StartTime != nil {
	launcher.Status.StartTime = nil
	if _, err := c.kubeClient.BatchV1().Jobs(namespace).UpdateStatus(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
Suggested change:
-	if _, err := c.kubeClient.BatchV1().Jobs(namespace).UpdateStatus(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
+	var err error
+	if launcher, err = c.kubeClient.BatchV1().Jobs(namespace).UpdateStatus(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
Could you update launcher with the object returned by the startTime status update to avoid a conflict during the subsequent scheduling-directive update? (The status update bumps the resourceVersion, so reusing the stale object would make the follow-up update fail.)
pkg/controller/mpi_job_controller.go (Outdated)
// syncLauncherSchedulingDirectives updates the mutable scheduling directives (as per KEP-2926) on
// the launcher Job's pod template to match the desired template.
func syncLauncherSchedulingDirectives(launcher *batchv1.Job, desired *corev1.PodTemplateSpec) {
	if launcher.Spec.Template.Labels == nil {
Suggested change:
-	if launcher.Spec.Template.Labels == nil {
+	if desired.Labels != nil && launcher.Spec.Template.Labels == nil {
It would be better to optimize the initialization so the map is only allocated when there are labels to copy.
I went ahead and reused some of the JobSet code; let me know what you think, please.
The idea sounds reasonable.
I left a comment for improvement: #772 (comment)
pkg/controller/mpi_job_controller.go (Outdated)
// the launcher Job's pod template to match the desired template.
func syncLauncherSchedulingDirectives(launcher *batchv1.Job, desired *corev1.PodTemplateSpec) {
	if launcher.Spec.Template.Labels == nil {
		launcher.Spec.Template.Labels = make(map[string]string)
Suggested change:
-		launcher.Spec.Template.Labels = make(map[string]string)
+		launcher.Spec.Template.Labels = make(map[string]string, len(desired.Labels))
pkg/controller/mpi_job_controller.go (Outdated)
	if desired.Annotations != nil {
		if launcher.Spec.Template.Annotations == nil {
			launcher.Spec.Template.Annotations = make(map[string]string)
		}
		for k, v := range desired.Annotations {
			launcher.Spec.Template.Annotations[k] = v
		}
	}
Suggested change:
-	if desired.Annotations != nil {
-		if launcher.Spec.Template.Annotations == nil {
-			launcher.Spec.Template.Annotations = make(map[string]string)
-		}
-		for k, v := range desired.Annotations {
-			launcher.Spec.Template.Annotations[k] = v
-		}
-	}
+	if desired.Annotations != nil && launcher.Spec.Template.Annotations == nil {
+		launcher.Spec.Template.Annotations = make(map[string]string)
+	}
+	for k, v := range desired.Annotations {
+		launcher.Spec.Template.Annotations[k] = v
+	}
The range loop only executes when desired.Annotations is non-nil, so the outer nil check around it is unnecessary.
Force-pushed fe8d324 to 9228f9b
pkg/controller/mpi_job_controller.go (Outdated)
	mergeMaps := func(old, new map[string]string) map[string]string {
		merged := make(map[string]string, max(len(old), len(new)))
		maps.Copy(merged, old)
		maps.Copy(merged, new)
		return merged
	}
Could you implement mergeMaps[K comparable, V any](a, b map[K]V) map[K]V as a separate named function instead of an anonymous function?

func mergeMaps[K comparable, V any](a, b map[K]V) map[K]V {
	merged := make(map[K]V, max(len(a), len(b)))
	maps.Copy(merged, a)
	maps.Copy(merged, b)
	return merged
}
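For illustration, a minimal usage sketch of such a helper inside the launcher sync; the field names come from this PR's diff, but these exact call sites are an assumption, not the merged code:

	// Hypothetical call sites: the second argument wins on key collisions,
	// so the desired (MPIJob) template takes precedence over the existing launcher values.
	launcher.Spec.Template.Labels = mergeMaps(launcher.Spec.Template.Labels, desired.Labels)
	launcher.Spec.Template.Annotations = mergeMaps(launcher.Spec.Template.Annotations, desired.Annotations)
	launcher.Spec.Template.Spec.NodeSelector = mergeMaps(launcher.Spec.Template.Spec.NodeSelector, desired.Spec.NodeSelector)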
@GonzaloSaez, some of the CI jobs failed. Please take a look.
| "kueue.x-k8s.io/workload": "my-workload", | ||
| } | ||
| launcherTemplate.Spec.NodeSelector = map[string]string{ | ||
| "cloud.google.com/gke-accelerator": "nvidia-tesla-t4", |
| "cloud.google.com/gke-accelerator": "nvidia-tesla-t4", | |
| "example.com/accelerator": "example-model", |
Could you avoid the vendor-specific one?
	// launcher Job gets the updated scheduling directives on second resume.
	mpiJobLauncherTemplate := &mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template
	mpiJobLauncherTemplate.ObjectMeta.Labels["foo"] = "baz"
	mpiJobLauncherTemplate.Spec.NodeSelector["cloud.google.com/gke-accelerator"] = "nvidia-tesla-t4-v2"
Suggested change:
-	mpiJobLauncherTemplate.Spec.NodeSelector["cloud.google.com/gke-accelerator"] = "nvidia-tesla-t4-v2"
+	mpiJobLauncherTemplate.Spec.NodeSelector["example.com/accelerator"] = "example-model"
ditto
Signed-off-by: GonzaloSaez <11050889+GonzaloSaez@users.noreply.github.com>
Force-pushed 9228f9b to 3e448c5
I tried running the Kueue e2e test from kubernetes-sigs/kueue#9253, but it still fails. However, I see the nodeSelector, scheduling gates, etc. being propagated to the launcher Job, so I think it may be related to the job configuration, or we are missing something else with respect to the separation between launcher and worker pods in Kueue. I can also take a look at it if needed.
Thank you for verifying the TAS test. Yes, ideally we would like to confirm that test case, but let us try that separately from this enhancement. I will also check whether anything is missing.
}

func mergeMaps[K comparable, V any](a, b map[K]V) map[K]V {
	merged := make(map[K]V, max(len(a), len(b)))
Suggested change:
-	merged := make(map[K]V, max(len(a), len(b)))
+	merged := make(map[K]V, len(a)+len(b))
Sorry for the confusion. As I check this code again, shouldn't the capacity be the sum of len(a) and len(b)?
It depends: if a and b have the same or very similar key sets, then we would be over-allocating. Let me know what you prefer.
I believe the callers of mergeMaps should not have to consider its internal implementation, which means the case where the lengths of a and b are quite different should also be handled.
Sure, in the worst case (a and b are mostly the same and both are very large), it would allocate a lot of redundant memory.
Alright, both approaches (max and sum) have different trade-offs, and I don't want to spend time on a trivial discussion, so I will approve your current approach.
tenzen-y left a comment
@GonzaloSaez Thank you for addressing the comments.
Otherwise LGTM.
[APPROVALNOTIFIER] This PR is APPROVED.
This pull-request has been approved by: tenzen-y
The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing …
I manually checked the E2E case, and the verification succeeded. One thing: the current expected result is not correct; the correct one is the following. This is expected because the MPIJob resource requirements are mixed across roles.
This should address #770.
If an MPIJob is suspended and then unsuspended (i.e., as Kueue would do during workload creation or when preemption occurs), the launcher Job does not end up with the correct scheduling directives after it is unsuspended. We need to perform the same operations as JobSet does: https://github.com/kubernetes-sigs/jobset/blob/f1bbaaef64b2a56c4721843b1d83750d21227948/pkg/controllers/jobset_controller.go#L537
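For context, here is a minimal Go sketch of that pattern. The startTime reset, the UpdateStatus call, and the mergeMaps helper mirror this PR's diff; the receiver type, the function name resumeLauncher, and the exact set of synced fields are assumptions for illustration, not the merged implementation.

	package controller

	import (
		"context"
		"maps"

		batchv1 "k8s.io/api/batch/v1"
		corev1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/client-go/kubernetes"
		"k8s.io/utils/ptr"
	)

	// launcherSyncer is a hypothetical minimal holder for the client used below.
	type launcherSyncer struct {
		kubeClient kubernetes.Interface
	}

	// resumeLauncher (hypothetical name) clears the launcher Job's startTime via the
	// status sub-resource, copies the mutable scheduling directives (KEP-2926) from
	// the desired pod template, and unsuspends the Job.
	func (c *launcherSyncer) resumeLauncher(ctx context.Context, namespace string, launcher *batchv1.Job, desired *corev1.PodTemplateSpec) (*batchv1.Job, error) {
		if launcher.Status.StartTime != nil {
			launcher.Status.StartTime = nil
			var err error
			// Keep the returned object so the spec update below carries the fresh resourceVersion.
			if launcher, err = c.kubeClient.BatchV1().Jobs(namespace).UpdateStatus(ctx, launcher, metav1.UpdateOptions{}); err != nil {
				return nil, err
			}
		}
		// Fields that Kubernetes allows mutating on a suspended Job's pod template.
		launcher.Spec.Template.Labels = mergeMaps(launcher.Spec.Template.Labels, desired.Labels)
		launcher.Spec.Template.Annotations = mergeMaps(launcher.Spec.Template.Annotations, desired.Annotations)
		launcher.Spec.Template.Spec.NodeSelector = mergeMaps(launcher.Spec.Template.Spec.NodeSelector, desired.Spec.NodeSelector)
		launcher.Spec.Template.Spec.Tolerations = desired.Spec.Tolerations
		launcher.Spec.Template.Spec.SchedulingGates = desired.Spec.SchedulingGates
		launcher.Spec.Suspend = ptr.To(false)
		return c.kubeClient.BatchV1().Jobs(namespace).Update(ctx, launcher, metav1.UpdateOptions{})
	}

	// mergeMaps copies a and then b into a new map, so b wins on key collisions.
	func mergeMaps[K comparable, V any](a, b map[K]V) map[K]V {
		merged := make(map[K]V, max(len(a), len(b)))
		maps.Copy(merged, a)
		maps.Copy(merged, b)
		return merged
	}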