chore(spark): migrate SDK to kubeflow_spark_api Pydantic models #295
Conversation
🎉 Welcome to the Kubeflow SDK! 🎉 Thanks for opening your first PR! We're happy to have you as part of our community 🚀 Here's what happens next:
- Join the community
- Feel free to ask questions in the comments if you need any help or clarification!
Pull Request Test Coverage Report for Build 22043884336 (Details)
💛 - Coveralls
Pull request overview
This PR refactors the Spark SDK to use typed Pydantic models from kubeflow_spark_api instead of raw dictionaries for CRD construction. This aligns the Spark SDK with the established architecture pattern used by the Trainer SDK.
Changes:
- Added kubeflow-spark-api>=2.3.0 dependency and migrated from dict-based CRD construction to typed Pydantic models
- Updated all option implementations to work with Pydantic models instead of dictionaries
- Refactored backend methods to convert to/from Pydantic models at the Kubernetes API boundary
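As a rough sketch of that boundary pattern (helper and constant names such as SPARK_CONNECT_PLURAL are assumptions for illustration, not copied from backend.py), the typed model is serialized only at the API call, and responses are parsed straight back into a model:

    from kubeflow_spark_api import models
    from kubernetes import client

    custom_api = client.CustomObjectsApi()

    # Outbound: build a typed model, serialize only at the API boundary.
    spark_connect = utils.build_spark_connect_crd(...)  # returns SparkV1alpha1SparkConnect
    custom_api.create_namespaced_custom_object(
        group=constants.SPARK_CONNECT_GROUP,
        version=constants.SPARK_CONNECT_VERSION,
        namespace=namespace,
        plural=constants.SPARK_CONNECT_PLURAL,  # assumed constant name
        body=spark_connect.to_dict(),
    )

    # Inbound: parse the raw response dict back into a typed model.
    response = custom_api.get_namespaced_custom_object(
        constants.SPARK_CONNECT_GROUP,
        constants.SPARK_CONNECT_VERSION,
        namespace,
        constants.SPARK_CONNECT_PLURAL,  # assumed constant name
        name,
    )
    spark_connect_cr = models.SparkV1alpha1SparkConnect.from_dict(response)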
Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 1 comment.

Summary per file:
| File | Description |
|---|---|
| pyproject.toml | Added kubeflow-spark-api>=2.3.0 dependency |
| uv.lock | Lock file updates for new kubeflow-spark-api dependency |
| kubeflow/spark/backends/kubernetes/backend.py | Convert between dict and Pydantic models at API boundary using .to_dict() and .from_dict() |
| kubeflow/spark/backends/kubernetes/utils.py | Refactored build_spark_connect_crd to return Pydantic model; renamed parse_spark_connect_status to get_spark_connect_info_from_cr with Pydantic input |
| kubeflow/spark/types/options.py | Updated all option callables to accept SparkConnect Pydantic model instead of dict |
| kubeflow/spark/backends/kubernetes/backend_test.py | Enhanced mock responses to include all required fields for Pydantic model validation |
| kubeflow/spark/backends/kubernetes/utils_test.py | Updated tests to work with Pydantic models and added validation test for invalid CR |
| kubeflow/spark/types/options_test.py | Migrated tests to use spark_connect_model fixture and verify Pydantic model attributes |
| hack/Dockerfile.spark-e2e-runner | Added --pre flag to allow installation of pre-release versions |
    role_spec.template = models.IoK8sApiCoreV1PodTemplateSpec()

    # Convert existing template to dict, merge, and convert back
    existing_dict = role_spec.template.to_dict() if role_spec.template else {}
Redundant None check in ternary expression. Since role_spec.template is guaranteed to be non-None after line 193, the ternary expression role_spec.template.to_dict() if role_spec.template else {} can be simplified to role_spec.template.to_dict().

Suggested change:

    - existing_dict = role_spec.template.to_dict() if role_spec.template else {}
    + existing_dict = role_spec.template.to_dict()
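For context, the surrounding merge step might look roughly like this (a sketch: the template_overrides name is hypothetical, and from_dict on IoK8sApiCoreV1PodTemplateSpec is assumed to exist like on the other generated models; only the two lines quoted above are from the PR):

    # Ensure a template object exists before merging overrides into it.
    if role_spec.template is None:
        role_spec.template = models.IoK8sApiCoreV1PodTemplateSpec()

    # Convert existing template to dict, shallow-merge, and convert back.
    existing_dict = role_spec.template.to_dict()
    merged = {**existing_dict, **template_overrides}  # template_overrides: hypothetical dict
    role_spec.template = models.IoK8sApiCoreV1PodTemplateSpec.from_dict(merged)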
Force-pushed from 2df7db9 to 255b2ad.

/assign @Shekharrajak
andreyvelich left a comment:
Thanks @tariq-hasan!
Overall, looks great.
cc @kubeflow/kubeflow-sdk-team @Fiona-Waters @abhijeet-dhumal @jaiakash
    WORKDIR /app

    COPY pyproject.toml README.md LICENSE ./
    COPY kubeflow/ kubeflow/
Can we remove these filters and run the Spark E2E tests for every PR?
https://github.com/tariq-hasan/sdk/blob/255b2ad2e3f953b3aa78deebd4b20a137eb0667c/.github/workflows/test-spark-examples.yaml#L4-L12
It should be fine to run them on every PR, like we do for other tests. For example, we didn't trigger the Spark tests when the PySpark dependency was updated: #300
Sounds good. I have removed the paths.
    base_response = {
        "apiVersion": f"{constants.SPARK_CONNECT_GROUP}/{constants.SPARK_CONNECT_VERSION}",
        "kind": constants.SPARK_CONNECT_KIND,
        "spec": {
            "sparkVersion": constants.DEFAULT_SPARK_VERSION,
            "image": constants.DEFAULT_SPARK_IMAGE,
            "server": {
                "cores": constants.DEFAULT_DRIVER_CPU,
                "memory": constants.DEFAULT_DRIVER_MEMORY,
            },
            "executor": {
                "instances": 2,
                "cores": constants.DEFAULT_EXECUTOR_CPU,
                "memory": constants.DEFAULT_EXECUTOR_MEMORY,
            },
        },
    }
Can you refactor it to use Spark Models, like we do in Trainer: https://github.com/tariq-hasan/sdk/blob/255b2ad2e3f953b3aa78deebd4b20a137eb0667c/kubeflow/trainer/backends/kubernetes/backend_test.py#L312 ?
I have introduced the get_spark_connect function to return a typed model in place of the dict-based approach.
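A minimal sketch of what such a helper could look like, built from the fixture values in the diff above and the model's .from_dict() constructor (the exact signature of get_spark_connect in the PR isn't shown here, so treat this as illustrative):

    from kubeflow_spark_api import models

    def get_spark_connect() -> models.SparkV1alpha1SparkConnect:
        # Typed mock API response; `constants` as imported in the test module.
        return models.SparkV1alpha1SparkConnect.from_dict(
            {
                "apiVersion": f"{constants.SPARK_CONNECT_GROUP}/{constants.SPARK_CONNECT_VERSION}",
                "kind": constants.SPARK_CONNECT_KIND,
                "spec": {
                    "sparkVersion": constants.DEFAULT_SPARK_VERSION,
                    "image": constants.DEFAULT_SPARK_IMAGE,
                    "server": {
                        "cores": constants.DEFAULT_DRIVER_CPU,
                        "memory": constants.DEFAULT_DRIVER_MEMORY,
                    },
                    "executor": {
                        "instances": 2,
                        "cores": constants.DEFAULT_EXECUTOR_CPU,
                        "memory": constants.DEFAULT_EXECUTOR_MEMORY,
                    },
                },
            }
        )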
| """ | ||
| base_spec = { | ||
| "sparkVersion": constants.DEFAULT_SPARK_VERSION, | ||
| "image": constants.DEFAULT_SPARK_IMAGE, | ||
| "server": { | ||
| "cores": constants.DEFAULT_DRIVER_CPU, | ||
| "memory": constants.DEFAULT_DRIVER_MEMORY, | ||
| }, | ||
| "executor": { | ||
| "instances": 2, | ||
| "cores": constants.DEFAULT_EXECUTOR_CPU, | ||
| "memory": constants.DEFAULT_EXECUTOR_MEMORY, | ||
| }, |
The get_spark_connect function is used here now as well.
        API ExecutorSpec model.
    """
    # Determine number of instances
    if executor and executor.num_instances is not None:
Suggested change:

    - if executor and executor.num_instances is not None:
    + if executor and executor.num_instances:
I have updated the code to remove is not None.
    # Determine number of instances
    if executor and executor.num_instances is not None:
        instances = executor.num_instances
    elif num_executors is not None:
Suggested change:

    - elif num_executors is not None:
    + elif num_executors:
I have updated the code here as well to remove is not None.
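With both suggestions applied, the instance-resolution chain would read roughly like this (a sketch; the fallback of 2 is assumed from the test fixtures above, not taken from the PR itself):

    # Resolve the executor instance count, treating 0/None as unset.
    if executor and executor.num_instances:
        instances = executor.num_instances
    elif num_executors:
        instances = num_executors
    else:
        instances = 2  # assumed default, matching the fixtures above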
    @@ -99,8 +184,8 @@ def build_spark_connect_crd(
        executor: Optional[Executor] = None,
I still think we should remove driver and executor spec from the first released version, and extend it later.
@Shekharrajak Do you have any particular use-case when users want to set it?
Suggested change (remove this parameter):

    - executor: Optional[Executor] = None,
    - name=metadata.get("name", ""),
    - namespace=metadata.get("namespace", ""),
    + name=spark_connect_cr.metadata.name,
    + namespace=spark_connect_cr.metadata.namespace or "",
namespace cannot be None.

Suggested change:

    - namespace=spark_connect_cr.metadata.namespace or "",
    + namespace=spark_connect_cr.metadata.namespace,
I have made the change.
    crd = spark_connect.to_dict()
    assert crd["spec"]["executor"]["cores"] == 2
    assert crd["spec"]["executor"]["memory"] == "4g"
Can you refactor these tests to just access the object fields? You don't need to run to_dict(), e.g.:

    assert crd.spec.executor.cores == 2

Sounds good. I have made the change to use the spark_connect typed model directly and removed the to_dict() conversion.
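After that change, the assertions read directly off the typed model (a sketch; the builder's arguments are elided):

    spark_connect = build_spark_connect_cr(...)  # returns a typed SparkV1alpha1SparkConnect

    # No to_dict() round-trip: assert on model attributes directly.
    assert spark_connect.spec.executor.cores == 2
    assert spark_connect.spec.executor.memory == "4g"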
pyproject.toml (Outdated)
| "pydantic>=2.10.0", | ||
| "kubeflow-trainer-api>=2.0.0", | ||
| "kubeflow-katib-api>=0.19.0", | ||
| "kubeflow-spark-api>=2.3.0", |
Can you add this to the spark extras, alongside pyspark.
I have added it there as well.
@tariq-hasan I think we can remove it from the main deps for now.

Suggested change (remove this line):

    - "kubeflow-spark-api>=2.3.0",
Force-pushed from 255b2ad to 740fbf4.

I have rebased the PR as well to account for the changes coming from #288.
    )


    def build_spark_connect_crd(
Suggested change:

    - def build_spark_connect_crd(
    + def build_spark_connect_cr(
I have replaced all the CRD references with CR references.
    - ) -> dict[str, Any]:
    -     """Build SparkConnect CRD manifest (KEP-107 compliant).
    + ) -> models.SparkV1alpha1SparkConnect:
    +     """Build SparkConnect CRD using typed API models (KEP-107 compliant).
Suggested change:

    - """Build SparkConnect CRD using typed API models (KEP-107 compliant).
    + """Build SparkConnect CR using typed API models (KEP-107 compliant).
    Returns:
    -     SparkConnect CRD as dictionary.
    +     SparkConnect CRD as typed Pydantic model.
Suggested change:

    - SparkConnect CRD as typed Pydantic model.
    + SparkConnect CR as typed Pydantic model.
andreyvelich left a comment:

@tariq-hasan Do you want me to release the 2.4.0 version to PyPI?
@andreyvelich Sounds good with me.

@tariq-hasan I've published the 2.4.0 version to PyPI; you can update pyproject.toml and uv.lock.
| "coverage>=7.0", | ||
| "kubeflow_trainer_api@git+https://github.com/kubeflow/trainer.git@master#subdirectory=api/python_api", | ||
| "kubeflow_katib_api@git+https://github.com/kubeflow/katib.git@master#subdirectory=api/python_api", | ||
| "kubeflow_spark_api@git+https://github.com/kubeflow/spark-operator.git@master#subdirectory=api/python_api", |
@andreyvelich I have noticed that the version defined in the kubeflow-spark-api package is 2.3.0: https://github.com/kubeflow/spark-operator/blob/master/api/python_api/kubeflow_spark_api/__init__.py#L16.
Should I raise a PR to update the version in https://github.com/kubeflow/spark-operator/blob/master/api/python_api/kubeflow_spark_api/__init__.py?
Yes, we should update this to v2.4.0, and run make generate. I did it in my local branch.
https://github.com/kubeflow/spark-operator/blob/master/VERSION#L1
I have raised the PR: kubeflow/spark-operator#2853.
After it is merged I can update pyproject.toml and uv.lock in this PR to ensure sync with the 2.4.0 package.
astefanutti left a comment:
Thanks @tariq-hasan!
/lgtm
/hold for this: kubeflow/spark-operator#2853
@tariq-hasan Since this is merged, please rebase this PR so we can move forward: kubeflow/spark-operator#2853
Force-pushed from 876488d to 21991a3, then from 7919126 to 2c6201a, and finally from cea64e5 to e562676.
@andreyvelich I have rebased the PR and updated the package version for kubeflow-spark-api.
andreyvelich left a comment:
Thanks for the updates @tariq-hasan!
/lgtm
/approve
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: andreyvelich

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Approvers can indicate their approval by writing /approve in a comment.
/hold cancel

Will merge this manually.
What this PR does / why we need it:
This PR migrates the Spark SDK from constructing CRDs using raw dictionaries to using the typed Pydantic models provided by kubeflow_spark_api. There are no user-facing API changes in this PR.

What changed:
- CR construction now builds typed models from kubeflow_spark_api
- Status parsing uses .from_dict() instead of manual extraction
- The user-facing types (Driver, Executor, SparkConnectInfo) are unchanged

Why:
This aligns the Spark SDK with the established architecture pattern already used by the Trainer SDK.
Testing:
Tested against kubeflow_spark_api==2.4.0rc0.

Which issue(s) this PR fixes (optional, in Fixes #<issue number>, #<issue number>, ... format, will close the issue(s) when PR gets merged):

Fixes #271
Checklist: