From a42a5f8ba8c8ada8339943012bb9d16392c7dc6f Mon Sep 17 00:00:00 2001 From: Rajat Hande Date: Thu, 22 Jan 2026 23:56:08 -0500 Subject: [PATCH] Expand ocm-migration support to migration plans --- .../oracle/oci_migration_mcp_server/models.py | 493 +++++++++++++++++- .../oracle/oci_migration_mcp_server/server.py | 91 +++- 2 files changed, 582 insertions(+), 2 deletions(-) diff --git a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/models.py b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/models.py index 39eba853..73fdfe9d 100644 --- a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/models.py +++ b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/models.py @@ -5,7 +5,7 @@ """ from datetime import datetime -from typing import Dict, Optional +from typing import Any, Dict, List, Literal, Optional import oci from pydantic import BaseModel, Field @@ -118,6 +118,397 @@ def map_migration(migration_data: oci.cloud_migrations.models.Migration) -> Migr # endregion +# region ResourceAssessmentStrategy + + +# ResourceAssessmentStrategy and its subtype fields +class ResourceAssessmentStrategy(BaseModel): + resource_type: Optional[Literal["CPU", "MEMORY", "ALL", "UNKNOWN_ENUM_VALUE"]] = ( + Field(None, description="The type of resource.") + ) + strategy_type: Optional[ + Literal["AS_IS", "AVERAGE", "PEAK", "PERCENTILE", "UNKNOWN_ENUM_VALUE"] + ] = Field(None, description="The type of strategy used for migration.") + # Subtype-specific, all optional to cover union of subtypes + adjustment_multiplier: Optional[float] = Field( + None, description="Multiplier applied to usage before recommendation." + ) + metric_type: Optional[ + Literal["AUTO", "HISTORICAL", "RUNTIME", "UNKNOWN_ENUM_VALUE"] + ] = Field(None, description="Metric source type for assessment.") + metric_time_window: Optional[Literal["1d", "7d", "30d", "UNKNOWN_ENUM_VALUE"]] = ( + Field(None, description="Time window over which metrics are evaluated.") + ) + percentile: Optional[Literal["P50", "P90", "P95", "P99", "UNKNOWN_ENUM_VALUE"]] = ( + Field(None, description="Percentile value (for percentile strategy).") + ) + + +def map_resource_assessment_strategy(obj) -> ResourceAssessmentStrategy | None: + if not obj: + return None + return ResourceAssessmentStrategy( + resource_type=getattr(obj, "resource_type", None), + strategy_type=getattr(obj, "strategy_type", None), + adjustment_multiplier=getattr(obj, "adjustment_multiplier", None), + metric_type=getattr(obj, "metric_type", None), + metric_time_window=getattr(obj, "metric_time_window", None), + percentile=getattr(obj, "percentile", None), + ) + + +def map_resource_assessment_strategies( + items, +) -> list[ResourceAssessmentStrategy] | None: + if not items: + return None + result: list[ResourceAssessmentStrategy] = [] + for it in items: + mapped = map_resource_assessment_strategy(it) + if mapped is not None: + result.append(mapped) + return result or None + + +# endregion + + +# CostEstimation stack +# region ComputeCostEstimation +class ComputeCostEstimation(BaseModel): + ocpu_per_hour: Optional[float] = None + ocpu_per_hour_by_subscription: Optional[float] = None + memory_gb_per_hour: Optional[float] = None + memory_gb_per_hour_by_subscription: Optional[float] = None + gpu_per_hour: Optional[float] = None + gpu_per_hour_by_subscription: Optional[float] = None + total_per_hour: Optional[float] = None + total_per_hour_by_subscription: Optional[float] = None + ocpu_count: Optional[float] = None + memory_amount_gb: Optional[float] = None + gpu_count: Optional[float] = None + + +# endregion + + +# region StorageCostEstimation +class StorageCostEstimation(BaseModel): + volumes: Optional[List[Dict[str, Any]]] = Field( + None, description="List of volume cost estimations (as dicts)." + ) + total_gb_per_month: Optional[float] = None + total_gb_per_month_by_subscription: Optional[float] = None + + +# endregion + + +# region OsImageEstimation +class OsImageEstimation(BaseModel): + total_per_hour: Optional[float] = None + total_per_hour_by_subscription: Optional[float] = None + + +# endregion + + +# region CostEstimation +class CostEstimation(BaseModel): + compute: Optional[ComputeCostEstimation] = None + storage: Optional[StorageCostEstimation] = None + os_image: Optional[OsImageEstimation] = None + currency_code: Optional[str] = None + total_estimation_per_month: Optional[float] = None + total_estimation_per_month_by_subscription: Optional[float] = None + subscription_id: Optional[str] = None + + +# endregion + + +def map_compute_cost_estimation(obj) -> ComputeCostEstimation | None: + if not obj: + return None + return ComputeCostEstimation( + ocpu_per_hour=getattr(obj, "ocpu_per_hour", None), + ocpu_per_hour_by_subscription=getattr( + obj, "ocpu_per_hour_by_subscription", None + ), + memory_gb_per_hour=getattr(obj, "memory_gb_per_hour", None), + memory_gb_per_hour_by_subscription=getattr( + obj, "memory_gb_per_hour_by_subscription", None + ), + gpu_per_hour=getattr(obj, "gpu_per_hour", None), + gpu_per_hour_by_subscription=getattr(obj, "gpu_per_hour_by_subscription", None), + total_per_hour=getattr(obj, "total_per_hour", None), + total_per_hour_by_subscription=getattr( + obj, "total_per_hour_by_subscription", None + ), + ocpu_count=getattr(obj, "ocpu_count", None), + memory_amount_gb=getattr(obj, "memory_amount_gb", None), + gpu_count=getattr(obj, "gpu_count", None), + ) + + +def map_storage_cost_estimation(obj) -> StorageCostEstimation | None: + if not obj: + return None + vols = getattr(obj, "volumes", None) + if vols is not None: + try: + vols = [_oci_to_dict(v) for v in vols] + except Exception: + vols = None + return StorageCostEstimation( + volumes=vols, + total_gb_per_month=getattr(obj, "total_gb_per_month", None), + total_gb_per_month_by_subscription=getattr( + obj, "total_gb_per_month_by_subscription", None + ), + ) + + +def map_os_image_estimation(obj) -> OsImageEstimation | None: + if not obj: + return None + return OsImageEstimation( + total_per_hour=getattr(obj, "total_per_hour", None), + total_per_hour_by_subscription=getattr( + obj, "total_per_hour_by_subscription", None + ), + ) + + +def map_cost_estimation(obj) -> CostEstimation | None: + if not obj: + return None + return CostEstimation( + compute=map_compute_cost_estimation(getattr(obj, "compute", None)), + storage=map_storage_cost_estimation(getattr(obj, "storage", None)), + os_image=map_os_image_estimation(getattr(obj, "os_image", None)), + currency_code=getattr(obj, "currency_code", None), + total_estimation_per_month=getattr(obj, "total_estimation_per_month", None), + total_estimation_per_month_by_subscription=getattr( + obj, "total_estimation_per_month_by_subscription", None + ), + subscription_id=getattr(obj, "subscription_id", None), + ) + + +# region MigrationPlanStats +class MigrationPlanStats(BaseModel): + total_estimated_cost: Optional[CostEstimation] = None + time_updated: Optional[datetime] = Field( + None, description="The time when the migration plan was calculated. RFC3339." + ) + vm_count: Optional[int] = Field( + None, description="The total count of VMs in migration" + ) + + +# endregion + + +def map_migration_plan_stats(obj) -> MigrationPlanStats | None: + if not obj: + return None + return MigrationPlanStats( + total_estimated_cost=map_cost_estimation( + getattr(obj, "total_estimated_cost", None) + ), + time_updated=getattr(obj, "time_updated", None), + vm_count=getattr(obj, "vm_count", None), + ) + + +# region TargetEnvironment +# TargetEnvironment and VM subtype fields flattened +class TargetEnvironment(BaseModel): + target_compartment_id: Optional[str] = Field( + None, description="Target compartment identifier" + ) + target_environment_type: Optional[ + Literal["VM_TARGET_ENV", "UNKNOWN_ENUM_VALUE"] + ] = Field(None, description="The type of target environment.") + # VM-specific optional fields + availability_domain: Optional[str] = None + fault_domain: Optional[str] = None + vcn: Optional[str] = None + subnet: Optional[str] = None + dedicated_vm_host: Optional[str] = None + ms_license: Optional[str] = None + preferred_shape_type: Optional[str] = None + + +# endregion + + +def map_target_environment(obj) -> TargetEnvironment | None: + if not obj: + return None + return TargetEnvironment( + target_compartment_id=getattr(obj, "target_compartment_id", None), + target_environment_type=getattr(obj, "target_environment_type", None), + availability_domain=getattr(obj, "availability_domain", None), + fault_domain=getattr(obj, "fault_domain", None), + vcn=getattr(obj, "vcn", None), + subnet=getattr(obj, "subnet", None), + dedicated_vm_host=getattr(obj, "dedicated_vm_host", None), + ms_license=getattr(obj, "ms_license", None), + preferred_shape_type=getattr(obj, "preferred_shape_type", None), + ) + + +def map_target_environments(items) -> list[TargetEnvironment] | None: + if not items: + return None + result: list[TargetEnvironment] = [] + for it in items: + mapped = map_target_environment(it) + if mapped is not None: + result.append(mapped) + return result or None + + +# region MigrationPlan + + +class MigrationPlan(BaseModel): + """ + Pydantic model mirroring the fields of oci.cloud_migrations.models.MigrationPlan. + Nested OCI types (strategies, migration_plan_stats, target_environments) are represented + as plain dicts/lists converted via _oci_to_dict for portability. + """ + + id: Optional[str] = Field( + None, description="The unique Oracle ID (OCID) that is immutable on creation." + ) + compartment_id: Optional[str] = Field( + None, description="The OCID of the compartment containing the migration plan." + ) + display_name: Optional[str] = Field( + None, + description=( + "A user-friendly name. Does not have to be unique, and it's changeable. " + "Avoid entering confidential information." + ), + ) + time_created: Optional[datetime] = Field( + None, + description="The time when the migration plan was created. RFC3339 datetime.", + ) + time_updated: Optional[datetime] = Field( + None, + description="The time when the migration plan was updated. RFC3339 datetime.", + ) + lifecycle_state: Optional[ + Literal[ + "CREATING", + "UPDATING", + "NEEDS_ATTENTION", + "ACTIVE", + "DELETING", + "DELETED", + "FAILED", + "UNKNOWN_ENUM_VALUE", + ] + ] = Field(None, description="The current state of the migration plan.") + lifecycle_details: Optional[str] = Field( + None, + description=( + "A message describing the current state in more detail. For example, it " + "can be used to provide actionable information for a resource in Failed state." + ), + ) + migration_id: Optional[str] = Field( + None, description="The OCID of the associated migration." + ) + + # Nested collections + strategies: Optional[List[ResourceAssessmentStrategy]] = Field( + None, description="List of strategies for the resources to be migrated." + ) + migration_plan_stats: Optional[MigrationPlanStats] = Field( + None, description="Statistics/details for the migration plan." + ) + calculated_limits: Optional[Dict[str, int]] = Field( + None, + description=( + "Limits of the resources that are needed for migration. " + 'Example: {"BlockVolume": 2, "VCN": 1}' + ), + ) + target_environments: Optional[List[TargetEnvironment]] = Field( + None, description="List of target environments." + ) + reference_to_rms_stack: Optional[str] = Field( + None, description="OCID of the referenced ORM job." + ) + source_migration_plan_id: Optional[str] = Field( + None, description="Source migration plan ID to be cloned." + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description=( + "Simple key-value pair that is applied without any predefined name, type or scope." + ), + ) + defined_tags: Optional[Dict[str, Dict[str, object]]] = Field( + None, + description=( + "Defined tags for this resource. Each key is predefined and scoped to a namespace." + ), + ) + system_tags: Optional[Dict[str, Dict[str, object]]] = Field( + None, + description=( + "Usage of system tag keys. These predefined keys are scoped to namespaces." + ), + ) + + +def map_migration_plan( + mp: "oci.cloud_migrations.models.MigrationPlan", +) -> MigrationPlan: + """ + Convert an oci.cloud_migrations.models.MigrationPlan to + oracle.oci_migration_mcp_server.models.MigrationPlan. + Nested OCI SDK models are coerced to dicts for transport. + """ + if mp is None: + return None # type: ignore[return-value] + + strategies = map_resource_assessment_strategies(getattr(mp, "strategies", None)) + + target_envs = map_target_environments(getattr(mp, "target_environments", None)) + + return MigrationPlan( + id=getattr(mp, "id", None), + compartment_id=getattr(mp, "compartment_id", None), + display_name=getattr(mp, "display_name", None), + time_created=getattr(mp, "time_created", None), + time_updated=getattr(mp, "time_updated", None), + lifecycle_state=getattr(mp, "lifecycle_state", None), + lifecycle_details=getattr(mp, "lifecycle_details", None), + migration_id=getattr(mp, "migration_id", None), + strategies=strategies, + migration_plan_stats=map_migration_plan_stats( + getattr(mp, "migration_plan_stats", None) + ), + calculated_limits=getattr(mp, "calculated_limits", None), + target_environments=target_envs, + reference_to_rms_stack=getattr(mp, "reference_to_rms_stack", None), + source_migration_plan_id=getattr(mp, "source_migration_plan_id", None), + freeform_tags=getattr(mp, "freeform_tags", None), + defined_tags=getattr(mp, "defined_tags", None), + system_tags=getattr(mp, "system_tags", None), + ) + + +# endregion + + # region MigrationSummary @@ -196,3 +587,103 @@ def map_migration_summary( # endregion + + +# region MigrationPlanSummary + + +class MigrationPlanSummary(BaseModel): + """ + Pydantic model mirroring the fields of oci.cloud_migrations.models.MigrationPlanSummary. + This summary model includes the common top-level fields of a migration plan. + """ + + id: Optional[str] = Field( + None, description="The unique Oracle ID (OCID) that is immutable on creation." + ) + compartment_id: Optional[str] = Field( + None, description="The OCID of the compartment containing the migration plan." + ) + display_name: Optional[str] = Field( + None, + description=( + "A user-friendly name. Does not have to be unique, and it's changeable. " + "Avoid entering confidential information." + ), + ) + time_created: Optional[datetime] = Field( + None, + description="The time when the migration plan was created. RFC3339 datetime.", + ) + time_updated: Optional[datetime] = Field( + None, + description="The time when the migration plan was updated. RFC3339 datetime.", + ) + lifecycle_state: Optional[ + Literal[ + "CREATING", + "UPDATING", + "NEEDS_ATTENTION", + "ACTIVE", + "DELETING", + "DELETED", + "FAILED", + "UNKNOWN_ENUM_VALUE", + ] + ] = Field(None, description="The current state of the migration plan.") + lifecycle_details: Optional[str] = Field( + None, + description=( + "A message describing the current state in more detail. For example, it " + "can be used to provide actionable information for a resource in Failed state." + ), + ) + migration_id: Optional[str] = Field( + None, description="The OCID of the associated migration." + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description=( + "Simple key-value pair that is applied without any predefined name, type or scope." + ), + ) + defined_tags: Optional[Dict[str, Dict[str, object]]] = Field( + None, + description=( + "Defined tags for this resource. Each key is predefined and scoped to a namespace." + ), + ) + system_tags: Optional[Dict[str, Dict[str, object]]] = Field( + None, + description=( + "Usage of system tag keys. These predefined keys are scoped to namespaces." + ), + ) + + +def map_migration_plan_summary( + mps: "oci.cloud_migrations.models.MigrationPlanSummary", +) -> MigrationPlanSummary: + """ + Convert an oci.cloud_migrations.models.MigrationPlanSummary to + oracle.oci_migration_mcp_server.models.MigrationPlanSummary. + """ + if mps is None: # type: ignore[unreachable] + return None # type: ignore[return-value] + + return MigrationPlanSummary( + id=getattr(mps, "id", None), + compartment_id=getattr(mps, "compartment_id", None), + display_name=getattr(mps, "display_name", None), + time_created=getattr(mps, "time_created", None), + time_updated=getattr(mps, "time_updated", None), + lifecycle_state=getattr(mps, "lifecycle_state", None), + lifecycle_details=getattr(mps, "lifecycle_details", None), + migration_id=getattr(mps, "migration_id", None), + freeform_tags=getattr(mps, "freeform_tags", None), + defined_tags=getattr(mps, "defined_tags", None), + system_tags=getattr(mps, "system_tags", None), + ) + + +# endregion diff --git a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/server.py b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/server.py index 7ded0f87..5d650bb2 100644 --- a/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/server.py +++ b/src/oci-migration-mcp-server/oracle/oci_migration_mcp_server/server.py @@ -12,8 +12,12 @@ from fastmcp import FastMCP from oracle.oci_migration_mcp_server.models import ( Migration, + MigrationPlan, + MigrationPlanSummary, MigrationSummary, map_migration, + map_migration_plan, + map_migration_plan_summary, map_migration_summary, ) from pydantic import Field @@ -80,7 +84,7 @@ def list_migrations( ] ] = Field(None, description="The lifecycle state of the migration to filter on"), ) -> list[MigrationSummary]: - migrations: list[Migration] = [] + migrations: list[MigrationSummary] = [] try: client = get_migration_client() @@ -117,6 +121,91 @@ def list_migrations( raise e +@mcp.tool( + description="List Migration Plans for a compartment," + "optionally filtered by lifecycle state or migration id", +) +def list_migration_plans( + compartment_id: str = Field(..., description="The OCID of the compartment"), + migration_id: Optional[str] = Field( + None, description="Filter by the OCID of the associated migration" + ), + limit: Optional[int] = Field( + None, + description="The maximum amount of migration plans to return. If None, there is no limit.", + ge=1, + ), + lifecycle_state: Optional[ + Literal[ + "CREATING", + "UPDATING", + "NEEDS_ATTENTION", + "ACTIVE", + "DELETING", + "DELETED", + "FAILED", + ] + ] = Field( + None, description="The lifecycle state of the migration plan to filter on" + ), +) -> list[MigrationPlanSummary]: + plans: list[MigrationPlanSummary] = [] + + try: + client = get_migration_client() + + response: oci.response.Response = None # type: ignore[assignment] + has_next_page = True + next_page: Optional[str] = None + + while has_next_page and (limit is None or len(plans) < limit): + kwargs = { + "compartment_id": compartment_id, + "page": next_page, + "limit": limit, + } + if lifecycle_state is not None: + kwargs["lifecycle_state"] = lifecycle_state + if migration_id is not None: + kwargs["migration_id"] = migration_id + + response = client.list_migration_plans(**kwargs) + has_next_page = response.has_next_page + next_page = response.next_page if hasattr(response, "next_page") else None + + data: list[oci.cloud_migrations.models.MigrationPlanSummary] = ( + response.data.items + ) + for d in data: + plans.append(map_migration_plan_summary(d)) + if limit is not None and len(plans) >= limit: + break + + logger.info(f"Found {len(plans)} Migration Plans") + return plans + + except Exception as e: + logger.error(f"Error in list_migration_plans tool: {str(e)}") + raise e + + +@mcp.tool(description="Get details for a specific Migration Plan by OCID") +def get_migration_plan( + migration_plan_id: str = Field(..., description="OCID of the migration plan") +) -> MigrationPlan: + try: + client = get_migration_client() + + response: oci.response.Response = client.get_migration_plan(migration_plan_id) + data: oci.cloud_migrations.models.MigrationPlan = response.data + logger.info("Found Migration Plan") + return map_migration_plan(data) + + except Exception as e: + logger.error(f"Error in get_migration_plan tool: {str(e)}") + raise e + + def main(): host = os.getenv("ORACLE_MCP_HOST")