diff --git a/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/models.py b/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/models.py index 10e95562..43047c15 100644 --- a/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/models.py +++ b/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/models.py @@ -213,3 +213,670 @@ def map_fusion_environment_status(data: Any) -> FusionEnvironmentStatus: time_created=_get(data, "time_created"), details=details or None, ) + + +class AdminUserSummary(BaseModel): + """ + IDM admin credentials without password. + + Attributes: + - username: Admin username + - email_address: Admin user's email address + - first_name: Admin user's first name + - last_name: Admin user's last name + """ + + username: Optional[str] = Field(None, description="Admin username") + email_address: Optional[str] = Field(None, description="Admin user's email address") + first_name: Optional[str] = Field(None, description="Admin user's first name") + last_name: Optional[str] = Field(None, description="Admin user's last name") + + +def map_admin_user_summary(u: Any) -> Optional[AdminUserSummary]: + """Convert an OCI SDK AdminUserSummary model (or dict) into Pydantic AdminUserSummary.""" + if not u: + return None + data = _oci_to_dict(u) or {} + return AdminUserSummary( + username=getattr(u, "username", None) or data.get("username"), + email_address=getattr(u, "email_address", None) or data.get("email_address"), + first_name=getattr(u, "first_name", None) or data.get("first_name"), + last_name=getattr(u, "last_name", None) or data.get("last_name"), + ) + + +class AdminUserCollection(BaseModel): + """ + A page of AdminUserSummary objects. + """ + + items: Optional[List[AdminUserSummary]] = Field( + None, description="A page of AdminUserSummary objects." + ) + + +def map_admin_user_collection(coll: Any) -> Optional[AdminUserCollection]: + """Convert an OCI SDK AdminUserCollection model (or dict) into Pydantic AdminUserCollection.""" + if not coll: + return None + items = getattr(coll, "items", None) + if items is None and isinstance(coll, dict): + items = coll.get("items") + + result: List[AdminUserSummary] = [] + if items: + for it in items: + mapped = map_admin_user_summary(it) + if mapped: + result.append(mapped) + return AdminUserCollection(items=result) + + +class RefreshActivitySummary(BaseModel): + """ + Summary of the refresh activity. + + Attributes: + - display_name: A friendly name for the refresh activity. Can be changed later. + - id: The unique identifier (OCID) of the refresh activity. Can’t be changed after creation. + - is_data_masking_opted: Represents if the customer opted for Data Masking or not + during refreshActivity. + - lifecycle_details: A message describing the current state in more detail. + - lifecycle_state: The current state of the refresh activity. + Valid values are Scheduled, In progress, Failed, Completed. + - refresh_issue_details_list: Details of refresh investigation information, + each item represents a different issue. + - service_availability: Service availability / impact during refresh activity execution, up or down. + - source_fusion_environment_id: The OCID of the Fusion environment that is the + source environment for the refresh. + - time_accepted: The time the refresh activity record was created. RFC3339 datetime. + - time_expected_finish: The time the refresh activity is scheduled to end. RFC3339 datetime. + - time_finished: The time the refresh activity actually completed / cancelled / failed. + RFC3339 datetime. + - time_of_restoration_point: The date and time of the most recent source + environment backup used for the environment refresh. + - time_scheduled_start: The time the refresh activity is scheduled to start. RFC3339 datetime. + - time_updated: The time the refresh activity record was updated. RFC3339 datetime. + """ + + display_name: Optional[str] = Field( + None, + description="A friendly name for the refresh activity. Can be changed later.", + ) + id: Optional[str] = Field( + None, + description=( + "The unique identifier (OCID) of the refresh activity. " + "Can’t be changed after creation." + ), + ) + is_data_masking_opted: Optional[bool] = Field( + None, + description="Represents if the customer opted for Data Masking or not during refreshActivity.", + ) + lifecycle_details: Optional[str] = Field( + None, + description="A message describing the current state in more detail.", + ) + lifecycle_state: Optional[str] = Field( + None, + description=( + "The current state of the refresh activity. " + "Valid values are Scheduled, In progress, Failed, Completed." + ), + ) + refresh_issue_details_list: Optional[List[Dict[str, Any]]] = Field( + None, + description=( + "Details of refresh investigation information, each item " + "represents a different issue." + ), + ) + service_availability: Optional[str] = Field( + None, + description=( + "Service availability / impact during refresh activity execution, " + "up or down." + ), + ) + source_fusion_environment_id: Optional[str] = Field( + None, + description="The OCID of the Fusion environment that is the source environment for the refresh.", + ) + time_accepted: Optional[datetime] = Field( + None, + description="The time the refresh activity record was created. RFC3339 datetime.", + ) + time_expected_finish: Optional[datetime] = Field( + None, + description="The time the refresh activity is scheduled to end. RFC3339 datetime.", + ) + time_finished: Optional[datetime] = Field( + None, + description=( + "The time the refresh activity actually completed / cancelled / failed. " + "RFC3339 datetime." + ), + ) + time_of_restoration_point: Optional[datetime] = Field( + None, + description=( + "The date and time of the most recent source environment backup used " + "for the environment refresh." + ), + ) + time_scheduled_start: Optional[datetime] = Field( + None, + description="The time the refresh activity is scheduled to start. RFC3339 datetime.", + ) + time_updated: Optional[datetime] = Field( + None, + description="The time the refresh activity record was updated. RFC3339 datetime.", + ) + + +def map_refresh_activity_summary(obj: Any) -> Optional[RefreshActivitySummary]: + """ + Convert an OCI SDK RefreshActivitySummary (or dict) into Pydantic RefreshActivitySummary. + """ + if not obj: + return None + data = _oci_to_dict(obj) or {} + + # Map issue details to plain dicts for flexibility (we don't define the nested model here) + raw_issues = getattr(obj, "refresh_issue_details_list", None) or data.get( + "refresh_issue_details_list" + ) + issues: Optional[List[Dict[str, Any]]] = None + if raw_issues is not None: + issues = [] + for it in raw_issues: + issues.append(_oci_to_dict(it) or it) + + return RefreshActivitySummary( + id=getattr(obj, "id", None) or data.get("id"), + display_name=getattr(obj, "display_name", None) or data.get("display_name"), + source_fusion_environment_id=getattr(obj, "source_fusion_environment_id", None) + or data.get("source_fusion_environment_id"), + time_of_restoration_point=getattr(obj, "time_of_restoration_point", None) + or data.get("time_of_restoration_point"), + lifecycle_state=getattr(obj, "lifecycle_state", None) + or data.get("lifecycle_state"), + time_scheduled_start=getattr(obj, "time_scheduled_start", None) + or data.get("time_scheduled_start"), + time_expected_finish=getattr(obj, "time_expected_finish", None) + or data.get("time_expected_finish"), + time_finished=getattr(obj, "time_finished", None) or data.get("time_finished"), + service_availability=getattr(obj, "service_availability", None) + or data.get("service_availability"), + time_accepted=getattr(obj, "time_accepted", None) or data.get("time_accepted"), + time_updated=getattr(obj, "time_updated", None) or data.get("time_updated"), + is_data_masking_opted=getattr(obj, "is_data_masking_opted", None) + or data.get("is_data_masking_opted"), + lifecycle_details=getattr(obj, "lifecycle_details", None) + or data.get("lifecycle_details"), + refresh_issue_details_list=issues, + ) + + +class RefreshActivityCollection(BaseModel): + """ + Results of a refresh activity search. + + Attributes: + - items: A page of refresh activity objects. + """ + + items: Optional[List[RefreshActivitySummary]] = Field( + None, description="A page of refresh activity objects." + ) + + +def map_refresh_activity_collection(coll: Any) -> Optional[RefreshActivityCollection]: + """Convert an OCI SDK RefreshActivityCollection model (or dict) into Pydantic + RefreshActivityCollection.""" + if not coll: + return None + + items = getattr(coll, "items", None) + if items is None and isinstance(coll, dict): + items = coll.get("items") + + result: List[RefreshActivitySummary] = [] + if items: + for it in items: + mapped = map_refresh_activity_summary(it) + if mapped: + result.append(mapped) + return RefreshActivityCollection(items=result) + + +class ScheduledActivitySummary(BaseModel): + """ + Summary of the scheduled activity for a Fusion environment. + + Attributes: + - actions: List of actions + - defined_tags: Defined tags for this resource. Example: {"foo-namespace": {"bar-key": "value"}} + - delay_in_hours: Cumulative delay hours + - display_name: A friendly name for the scheduled activity. Can be changed later. + - freeform_tags: Simple key-value pair applied without predefined name. Example: {"bar-key": "value"} + - fusion_environment_id: The OCID of the Fusion environment for the scheduled activity. + - id: Unique identifier that is immutable on creation. + - lifecycle_details: Message describing the current state in more detail. + - lifecycle_state: The current state of the scheduled activity. + Valid values are Scheduled, In progress, Failed, Completed. + - run_cycle: The run cadence of this scheduled activity. + Valid values are Quarterly, Monthly, OneOff, and Vertex. + - scheduled_activity_association_id: The unique identifier that associates a + scheduled activity with others in one complete maintenance. + - scheduled_activity_phase: A property describing the phase of the scheduled activity. + - service_availability: Service availability / impact during scheduled activity execution, up down + - time_accepted: The time the scheduled activity record was created. RFC3339 datetime. + - time_expected_finish: Current time the scheduled activity is scheduled to end. RFC3339 datetime. + - time_finished: The time the scheduled activity actually completed / cancelled / failed. + RFC3339 datetime. + - time_scheduled_start: Current time the scheduled activity is scheduled to start. RFC3339 datetime. + - time_updated: The time the scheduled activity record was updated. RFC3339 datetime. + """ + + actions: Optional[List[Dict[str, Any]]] = Field(None, description="List of actions") + defined_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, + description="Defined tags for this resource. Each key is predefined and scoped to a namespace.", + ) + delay_in_hours: Optional[int] = Field(None, description="Cumulative delay hours") + display_name: Optional[str] = Field( + None, + description="A friendly name for the scheduled activity. Can be changed later.", + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description="Simple key-value pair that is applied without any predefined name, type or scope.", + ) + fusion_environment_id: Optional[str] = Field( + None, + description="The OCID of the Fusion environment for the scheduled activity.", + ) + id: Optional[str] = Field( + None, description="Unique identifier that is immutable on creation." + ) + lifecycle_details: Optional[str] = Field( + None, + description="A message describing the current state in more detail.", + ) + lifecycle_state: Optional[str] = Field( + None, + description=( + "The current state of the scheduled activity. " + "Valid values are Scheduled, In progress, Failed, Completed." + ), + ) + run_cycle: Optional[str] = Field( + None, + description=( + "The run cadence of this scheduled activity. " + "Valid values are Quarterly, Monthly, OneOff, and Vertex." + ), + ) + scheduled_activity_association_id: Optional[str] = Field( + None, + description=( + "The unique identifier that associates a scheduled activity " + "with others in one complete maintenance." + ), + ) + scheduled_activity_phase: Optional[str] = Field( + None, description="Phase of the scheduled activity." + ) + service_availability: Optional[str] = Field( + None, + description="Service availability / impact during scheduled activity execution, up down", + ) + time_accepted: Optional[datetime] = Field( + None, description="The time the scheduled activity record was created." + ) + time_expected_finish: Optional[datetime] = Field( + None, + description="Current time the scheduled activity is scheduled to end.", + ) + time_finished: Optional[datetime] = Field( + None, + description="The time the scheduled activity actually completed / cancelled / failed.", + ) + time_scheduled_start: Optional[datetime] = Field( + None, + description="Current time the scheduled activity is scheduled to start.", + ) + time_updated: Optional[datetime] = Field( + None, description="The time the scheduled activity record was updated." + ) + + +def map_scheduled_activity_summary(obj: Any) -> Optional[ScheduledActivitySummary]: + """Convert an OCI SDK ScheduledActivitySummary (or dict) into Pydantic ScheduledActivitySummary.""" + if not obj: + return None + data = _oci_to_dict(obj) or {} + + raw_actions = getattr(obj, "actions", None) or data.get("actions") + actions: Optional[List[Dict[str, Any]]] = None + if raw_actions is not None: + actions = [] + for it in raw_actions: + actions.append(_oci_to_dict(it) or it) + + return ScheduledActivitySummary( + id=getattr(obj, "id", None) or data.get("id"), + display_name=getattr(obj, "display_name", None) or data.get("display_name"), + run_cycle=getattr(obj, "run_cycle", None) or data.get("run_cycle"), + fusion_environment_id=getattr(obj, "fusion_environment_id", None) + or data.get("fusion_environment_id"), + lifecycle_state=getattr(obj, "lifecycle_state", None) + or data.get("lifecycle_state"), + actions=actions, + time_scheduled_start=getattr(obj, "time_scheduled_start", None) + or data.get("time_scheduled_start"), + time_expected_finish=getattr(obj, "time_expected_finish", None) + or data.get("time_expected_finish"), + time_finished=getattr(obj, "time_finished", None) or data.get("time_finished"), + delay_in_hours=getattr(obj, "delay_in_hours", None) + or data.get("delay_in_hours"), + service_availability=getattr(obj, "service_availability", None) + or data.get("service_availability"), + time_accepted=getattr(obj, "time_accepted", None) or data.get("time_accepted"), + time_updated=getattr(obj, "time_updated", None) or data.get("time_updated"), + lifecycle_details=getattr(obj, "lifecycle_details", None) + or data.get("lifecycle_details"), + scheduled_activity_phase=getattr(obj, "scheduled_activity_phase", None) + or data.get("scheduled_activity_phase"), + scheduled_activity_association_id=getattr( + obj, "scheduled_activity_association_id", None + ) + or data.get("scheduled_activity_association_id"), + freeform_tags=getattr(obj, "freeform_tags", None) or data.get("freeform_tags"), + defined_tags=getattr(obj, "defined_tags", None) or data.get("defined_tags"), + ) + + +class ScheduledActivityCollection(BaseModel): + """ + Results of a scheduled activity search. + + Attributes: + - items: A page of scheduled activity objects. + """ + + items: Optional[List[ScheduledActivitySummary]] = Field( + None, description="A page of scheduled activity objects." + ) + + +def map_scheduled_activity_collection( + coll: Any, +) -> Optional[ScheduledActivityCollection]: + """Convert an OCI SDK ScheduledActivityCollection model (or dict) into Pydantic + ScheduledActivityCollection.""" + if not coll: + return None + + items = getattr(coll, "items", None) + if items is None and isinstance(coll, dict): + items = coll.get("items") + + result: List[ScheduledActivitySummary] = [] + if items: + for it in items: + mapped = map_scheduled_activity_summary(it) + if mapped: + result.append(mapped) + return ScheduledActivityCollection(items=result) + + +class ScheduledActivity(BaseModel): + """ + Details of scheduled activity. + + Attributes: + - actions: List of actions + - defined_tags: Defined tags for this resource. Example: {"foo-namespace": {"bar-key": "value"}} + - delay_in_hours: Cumulative delay hours + - display_name: scheduled activity display name, can be renamed. + - freeform_tags: Simple key-value pair applied without predefined name. Example: {"bar-key": "value"} + - fusion_environment_id: FAaaS Environment Identifier. + - id: Unique identifier that is immutable on creation. + - lifecycle_details: A message describing the current state in more detail. + - lifecycle_state: The current state of the scheduledActivity. + - run_cycle: run cadence. + - scheduled_activity_association_id: The unique identifier that associates a + scheduled activity with others in one complete maintenance. + - scheduled_activity_phase: A property describing the phase of the scheduled activity. + - service_availability: Service availability / impact during scheduled activity execution up down + - time_created: The time the scheduled activity record was created. RFC3339 datetime. + - time_expected_finish: Current time the scheduled activity is scheduled to end. RFC3339 datetime. + - time_finished: The time the scheduled activity actually completed / cancelled / failed. + RFC3339 datetime. + - time_scheduled_start: Current time the scheduled activity is scheduled to start. RFC3339 datetime. + - time_updated: The time the scheduled activity record was updated. RFC3339 datetime. + """ + + actions: Optional[List[Dict[str, Any]]] = Field(None, description="List of actions") + defined_tags: Optional[Dict[str, Dict[str, Any]]] = Field( + None, + description="Defined tags for this resource. Each key is predefined and scoped to a namespace.", + ) + delay_in_hours: Optional[int] = Field(None, description="Cumulative delay hours") + display_name: Optional[str] = Field( + None, description="scheduled activity display name, can be renamed." + ) + freeform_tags: Optional[Dict[str, str]] = Field( + None, + description=( + "Simple key-value pair that is applied without any predefined " + "name, type or scope." + ), + ) + fusion_environment_id: Optional[str] = Field( + None, description="FAaaS Environment Identifier." + ) + id: Optional[str] = Field( + None, description="Unique identifier that is immutable on creation." + ) + lifecycle_details: Optional[str] = Field( + None, + description=( + "A message describing the current state in more detail. " + "For example, can be used to provide actionable information " + "for a resource in Failed state." + ), + ) + lifecycle_state: Optional[str] = Field( + None, description="The current state of the scheduledActivity." + ) + run_cycle: Optional[str] = Field(None, description="run cadence.") + scheduled_activity_association_id: Optional[str] = Field( + None, + description=( + "The unique identifier that associates a scheduled activity " + "with others in one complete maintenance." + ), + ) + scheduled_activity_phase: Optional[str] = Field( + None, description="A property describing the phase of the scheduled activity." + ) + service_availability: Optional[str] = Field( + None, + description="Service availability / impact during scheduled activity execution up down", + ) + time_created: Optional[datetime] = Field( + None, + description=( + "The time the scheduled activity record was created. " + "An RFC3339 formatted datetime string." + ), + ) + time_expected_finish: Optional[datetime] = Field( + None, + description=( + "Current time the scheduled activity is scheduled to end. " + "An RFC3339 formatted datetime string." + ), + ) + time_finished: Optional[datetime] = Field( + None, + description=( + "The time the scheduled activity actually completed / cancelled / failed. " + "An RFC3339 formatted datetime string." + ), + ) + time_scheduled_start: Optional[datetime] = Field( + None, + description=( + "Current time the scheduled activity is scheduled to start. " + "An RFC3339 formatted datetime string." + ), + ) + time_updated: Optional[datetime] = Field( + None, + description=( + "The time the scheduled activity record was updated. " + "An RFC3339 formatted datetime string." + ), + ) + + +def map_scheduled_activity(obj: Any) -> Optional[ScheduledActivity]: + """Convert an OCI SDK ScheduledActivity (or dict) into Pydantic ScheduledActivity.""" + if not obj: + return None + data = _oci_to_dict(obj) or {} + + raw_actions = getattr(obj, "actions", None) or data.get("actions") + actions: Optional[List[Dict[str, Any]]] = None + if raw_actions is not None: + actions = [] + for it in raw_actions: + actions.append(_oci_to_dict(it) or it) + + return ScheduledActivity( + id=getattr(obj, "id", None) or data.get("id"), + display_name=getattr(obj, "display_name", None) or data.get("display_name"), + run_cycle=getattr(obj, "run_cycle", None) or data.get("run_cycle"), + fusion_environment_id=getattr(obj, "fusion_environment_id", None) + or data.get("fusion_environment_id"), + lifecycle_state=getattr(obj, "lifecycle_state", None) + or data.get("lifecycle_state"), + actions=actions, + time_created=getattr(obj, "time_created", None) or data.get("time_created"), + time_scheduled_start=getattr(obj, "time_scheduled_start", None) + or data.get("time_scheduled_start"), + time_expected_finish=getattr(obj, "time_expected_finish", None) + or data.get("time_expected_finish"), + time_finished=getattr(obj, "time_finished", None) or data.get("time_finished"), + delay_in_hours=getattr(obj, "delay_in_hours", None) + or data.get("delay_in_hours"), + service_availability=getattr(obj, "service_availability", None) + or data.get("service_availability"), + time_updated=getattr(obj, "time_updated", None) or data.get("time_updated"), + lifecycle_details=getattr(obj, "lifecycle_details", None) + or data.get("lifecycle_details"), + scheduled_activity_phase=getattr(obj, "scheduled_activity_phase", None) + or data.get("scheduled_activity_phase"), + scheduled_activity_association_id=getattr( + obj, "scheduled_activity_association_id", None + ) + or data.get("scheduled_activity_association_id"), + freeform_tags=getattr(obj, "freeform_tags", None) or data.get("freeform_tags"), + defined_tags=getattr(obj, "defined_tags", None) or data.get("defined_tags"), + ) + + +class Subscription(BaseModel): + """ + Subscription information for a root compartment or tenancy. + + Attributes: + - id: OCID of the subscription details for a particular root compartment or tenancy. + - classic_subscription_id: Subscription id. + - service_name: The type of subscription, such as 'CLOUDCM'/'SAAS'/'CRM', etc. + - lifecycle_state: Lifecycle state of the subscription. + - lifecycle_details: Subscription resource intermediate states. + - skus: Stock keeping unit list. + """ + + id: Optional[str] = Field( + None, + description="OCID of the subscription details for particular root compartment or tenancy.", + ) + classic_subscription_id: Optional[str] = Field(None, description="Subscription id.") + service_name: Optional[str] = Field( + None, + description="The type of subscription, such as 'CLOUDCM'/'SAAS'/'CRM', etc.", + ) + lifecycle_state: Optional[str] = Field( + None, description="Lifecycle state of the subscription." + ) + lifecycle_details: Optional[str] = Field( + None, description="Subscription resource intermediate states." + ) + skus: Optional[List[Dict[str, Any]]] = Field( + None, description="Stock keeping unit list." + ) + + +def map_subscription(obj: Any) -> Optional[Subscription]: + """Convert an OCI SDK Subscription model (or dict) into Pydantic Subscription.""" + if not obj: + return None + data = _oci_to_dict(obj) or {} + + raw_skus = getattr(obj, "skus", None) or data.get("skus") + skus: Optional[List[Dict[str, Any]]] = None + if raw_skus is not None: + skus = [] + for it in raw_skus: + skus.append(_oci_to_dict(it) or it) + + return Subscription( + id=getattr(obj, "id", None) or data.get("id"), + classic_subscription_id=getattr(obj, "classic_subscription_id", None) + or data.get("classic_subscription_id"), + service_name=getattr(obj, "service_name", None) or data.get("service_name"), + lifecycle_state=getattr(obj, "lifecycle_state", None) + or data.get("lifecycle_state"), + lifecycle_details=getattr(obj, "lifecycle_details", None) + or data.get("lifecycle_details"), + skus=skus, + ) + + +class SubscriptionDetail(BaseModel): + """ + Detail for the FusionEnvironmentFamily subscription. + + Attributes: + - subscriptions: List of subscriptions. + """ + + subscriptions: Optional[List[Subscription]] = Field( + None, description="List of subscriptions." + ) + + +def map_subscription_detail(obj: Any) -> Optional[SubscriptionDetail]: + """Convert an OCI SDK SubscriptionDetail model (or dict) into Pydantic SubscriptionDetail.""" + if not obj: + return None + + raw_subs = getattr(obj, "subscriptions", None) + if raw_subs is None and isinstance(obj, dict): + raw_subs = obj.get("subscriptions") + + subs: List[Subscription] = [] + if raw_subs: + for it in raw_subs: + mapped = map_subscription(it) + if mapped: + subs.append(mapped) + + return SubscriptionDetail(subscriptions=subs) diff --git a/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/server.py b/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/server.py index 2b88b778..2e0b12ce 100644 --- a/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/server.py +++ b/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/server.py @@ -5,6 +5,7 @@ """ import os +from datetime import datetime from logging import Logger from typing import Any, Literal, Optional @@ -14,12 +15,22 @@ from . import __project__, __version__ from .models import ( + AdminUserSummary, FusionEnvironment, FusionEnvironmentFamily, FusionEnvironmentStatus, + RefreshActivitySummary, + ScheduledActivity, + ScheduledActivitySummary, + SubscriptionDetail, + map_admin_user_summary, map_fusion_environment, map_fusion_environment_family, map_fusion_environment_status, + map_refresh_activity_summary, + map_scheduled_activity, + map_scheduled_activity_summary, + map_subscription_detail, ) logger = Logger(__name__, level="INFO") @@ -67,6 +78,11 @@ def list_fusion_environment_families( "DELETING, DELETED, FAILED" ), ), + limit: Optional[int] = Field( + None, + description="The maximum number of resources to return. If None, there is no limit.", + ge=1, + ), ) -> list[FusionEnvironmentFamily]: client = get_faaas_client() @@ -74,10 +90,12 @@ def list_fusion_environment_families( next_page: Optional[str] = None has_next_page = True - while has_next_page: + while has_next_page and (limit is None or len(families) < limit): kwargs: dict[str, Any] = {"compartment_id": compartment_id} if next_page is not None: kwargs["page"] = next_page + if limit is not None: + kwargs["limit"] = limit if display_name is not None: kwargs["display_name"] = display_name if lifecycle_state is not None: @@ -97,6 +115,8 @@ def list_fusion_environment_families( ) for d in iterable: families.append(map_fusion_environment_family(d)) + if limit is not None and len(families) >= limit: + break # Robust pagination handling with header fallback headers = getattr(response, "headers", None) @@ -106,7 +126,9 @@ def list_fusion_environment_families( next_page = dict(headers).get("opc-next-page") except Exception: next_page = None - has_next_page = next_page is not None + has_next_page = next_page is not None and ( + limit is None or len(families) < limit + ) logger.info(f"Found {len(families)} Fusion Environment Families") return families @@ -145,6 +167,11 @@ def list_fusion_environments( "INACTIVE, DELETING, DELETED, FAILED" ), ), + limit: Optional[int] = Field( + None, + description="The maximum number of resources to return. If None, there is no limit.", + ge=1, + ), ) -> list[FusionEnvironment]: client = get_faaas_client() @@ -152,10 +179,12 @@ def list_fusion_environments( next_page: Optional[str] = None has_next_page = True - while has_next_page: + while has_next_page and (limit is None or len(environments) < limit): kwargs: dict[str, Any] = {"compartment_id": compartment_id} if next_page is not None: kwargs["page"] = next_page + if limit is not None: + kwargs["limit"] = limit if fusion_environment_family_id is not None: kwargs["fusion_environment_family_id"] = fusion_environment_family_id if display_name is not None: @@ -175,6 +204,8 @@ def list_fusion_environments( ) for d in iterable: environments.append(map_fusion_environment(d)) + if limit is not None and len(environments) >= limit: + break # Robust pagination handling with header fallback headers = getattr(response, "headers", None) @@ -184,7 +215,9 @@ def list_fusion_environments( next_page = dict(headers).get("opc-next-page") except Exception: next_page = None - has_next_page = next_page is not None + has_next_page = next_page is not None and ( + limit is None or len(environments) < limit + ) logger.info(f"Found {len(environments)} Fusion Environments") return environments @@ -216,6 +249,359 @@ def get_fusion_environment_status( return map_fusion_environment_status(response.data) +@mcp.tool(description="List all FusionEnvironment admin users") +def list_admin_users( + fusion_environment_id: str = Field( + ..., description="unique FusionEnvironment identifier" + ) +) -> list[AdminUserSummary]: + client = get_faaas_client() + + admins: list[AdminUserSummary] = [] + next_page: Optional[str] = None + has_next_page = True + + while has_next_page: + kwargs: dict[str, Any] = {"fusion_environment_id": fusion_environment_id} + if next_page is not None: + kwargs["page"] = next_page + + response: oci.response.Response = client.list_admin_users(**kwargs) + + data_obj = response.data or [] + items = getattr(data_obj, "items", None) + iterable = ( + items + if items is not None + else (data_obj if isinstance(data_obj, list) else [data_obj]) + ) + for d in iterable: + mapped = map_admin_user_summary(d) + if mapped: + admins.append(mapped) + + has_next_page = getattr(response, "has_next_page", False) + next_page = getattr(response, "next_page", None) + + return admins + + +@mcp.tool(description="List all FusionEnvironment refresh activities") +def list_refresh_activities( + fusion_environment_id: str = Field( + ..., description="unique FusionEnvironment identifier" + ), + display_name: Optional[str] = Field( + None, description="Filter to match entire display name." + ), + time_scheduled_start_greater_than_or_equal_to: Optional[datetime] = Field( + None, description="Filter: scheduled start time >= this RFC3339 datetime" + ), + time_expected_finish_less_than_or_equal_to: Optional[datetime] = Field( + None, description="Filter: expected finish time <= this RFC3339 datetime" + ), + lifecycle_state: Optional[ + Literal[ + "ACCEPTED", + "IN_PROGRESS", + "NEEDS_ATTENTION", + "FAILED", + "SUCCEEDED", + "CANCELED", + ] + ] = Field( + None, + description=( + "Filter by lifecycle state. Allowed: ACCEPTED, IN_PROGRESS, " + "NEEDS_ATTENTION, FAILED, SUCCEEDED, CANCELED." + ), + ), + limit: Optional[int] = Field( + None, description="Maximum number of results per page.", ge=1 + ), + page: Optional[str] = Field( + None, description="Pagination token from a previous call." + ), + sort_order: Optional[Literal["ASC", "DESC"]] = Field( + None, description="Sort order: ASC or DESC" + ), + sort_by: Optional[Literal["TIME_CREATED", "DISPLAY_NAME"]] = Field( + None, description="Sort by field. Allowed: TIME_CREATED, DISPLAY_NAME" + ), + opc_request_id: Optional[str] = Field( + None, description="Client request ID for tracing." + ), +) -> list[RefreshActivitySummary]: + client = get_faaas_client() + + activities: list[RefreshActivitySummary] = [] + next_page: Optional[str] = page + has_next_page = True + + while has_next_page and (limit is None or len(activities) < limit): + kwargs: dict[str, Any] = { + "fusion_environment_id": fusion_environment_id, + "page": next_page, + "limit": limit, + } + if display_name: + kwargs["display_name"] = display_name + if time_scheduled_start_greater_than_or_equal_to: + kwargs["time_scheduled_start_greater_than_or_equal_to"] = ( + time_scheduled_start_greater_than_or_equal_to + ) + if time_expected_finish_less_than_or_equal_to: + kwargs["time_expected_finish_less_than_or_equal_to"] = ( + time_expected_finish_less_than_or_equal_to + ) + if lifecycle_state: + kwargs["lifecycle_state"] = lifecycle_state + if sort_order: + kwargs["sort_order"] = sort_order + if sort_by: + kwargs["sort_by"] = sort_by + if opc_request_id: + kwargs["opc_request_id"] = opc_request_id + + response: oci.response.Response = client.list_refresh_activities(**kwargs) + + data_obj = response.data or [] + items = getattr(data_obj, "items", None) + iterable = ( + items + if items is not None + else (data_obj if isinstance(data_obj, list) else [data_obj]) + ) + for d in iterable: + mapped = map_refresh_activity_summary(d) + if mapped: + activities.append(mapped) + if limit is not None and len(activities) >= limit: + break + + has_next_page = getattr(response, "has_next_page", False) + next_page = getattr(response, "next_page", None) + + return activities + + +@mcp.tool(description="List all scheduled activities for a FusionEnvironment") +def list_scheduled_activities( + fusion_environment_id: str = Field( + ..., description="unique FusionEnvironment identifier" + ), + display_name: Optional[str] = Field( + None, + description="A filter to return only resources that match the entire display name given.", + ), + time_scheduled_start_greater_than_or_equal_to: Optional[datetime] = Field( + None, + description="A filter that returns all resources that are scheduled after this date", + ), + time_expected_finish_less_than_or_equal_to: Optional[datetime] = Field( + None, + description="A filter that returns all resources that end before this date", + ), + run_cycle: Optional[Literal["QUARTERLY", "MONTHLY", "ONEOFF", "VERTEX"]] = Field( + None, description="Filter by run cycle." + ), + lifecycle_state: Optional[ + Literal["ACCEPTED", "IN_PROGRESS", "FAILED", "SUCCEEDED", "CANCELED"] + ] = Field(None, description="Filter by lifecycle state."), + scheduled_activity_association_id: Optional[str] = Field( + None, description="Filter by scheduledActivityAssociationId." + ), + scheduled_activity_phase: Optional[ + Literal["PRE_MAINTENANCE", "MAINTENANCE", "POST_MAINTENANCE"] + ] = Field(None, description="Filter by scheduled activity phase."), + limit: Optional[int] = Field( + None, description="The maximum number of items to return.", ge=1 + ), + page: Optional[str] = Field( + None, + description="The page token representing the page at which to start retrieving results.", + ), + sort_order: Optional[Literal["ASC", "DESC"]] = Field( + None, description="The sort order to use, either 'ASC' or 'DESC'." + ), + sort_by: Optional[Literal["TIME_CREATED", "DISPLAY_NAME"]] = Field( + None, + description=("The field to sort by. Allowed: TIME_CREATED, DISPLAY_NAME"), + ), + opc_request_id: Optional[str] = Field( + None, description="The client request ID for tracing." + ), + retry_strategy: Optional[Literal["none", "default"]] = Field( + None, + description=( + "Retry strategy to use. Use 'none' to explicitly disable retries " + "(sets NoneRetryStrategy)." + ), + ), + allow_control_chars: Optional[bool] = Field( + None, + description="Whether to allow control characters in the response object.", + ), +) -> list[ScheduledActivitySummary]: + client = get_faaas_client() + + activities: list[ScheduledActivitySummary] = [] + next_page: Optional[str] = page + has_next_page = True + + while has_next_page and (limit is None or len(activities) < limit): + kwargs: dict[str, Any] = { + "fusion_environment_id": fusion_environment_id, + "page": next_page, + "limit": limit, + } + if display_name: + kwargs["display_name"] = display_name + if time_scheduled_start_greater_than_or_equal_to: + kwargs["time_scheduled_start_greater_than_or_equal_to"] = ( + time_scheduled_start_greater_than_or_equal_to + ) + if time_expected_finish_less_than_or_equal_to: + kwargs["time_expected_finish_less_than_or_equal_to"] = ( + time_expected_finish_less_than_or_equal_to + ) + if run_cycle: + kwargs["run_cycle"] = run_cycle + if lifecycle_state: + kwargs["lifecycle_state"] = lifecycle_state + if scheduled_activity_association_id: + kwargs["scheduled_activity_association_id"] = ( + scheduled_activity_association_id + ) + if scheduled_activity_phase: + kwargs["scheduled_activity_phase"] = scheduled_activity_phase + if sort_order: + kwargs["sort_order"] = sort_order + if sort_by: + kwargs["sort_by"] = sort_by + if opc_request_id: + kwargs["opc_request_id"] = opc_request_id + if allow_control_chars is not None: + kwargs["allow_control_chars"] = allow_control_chars + if retry_strategy == "none": + try: + kwargs["retry_strategy"] = oci.retry.NoneRetryStrategy() + except Exception: + pass + + response: oci.response.Response = client.list_scheduled_activities(**kwargs) + + data_obj = response.data or [] + items = getattr(data_obj, "items", None) + iterable = ( + items + if items is not None + else (data_obj if isinstance(data_obj, list) else [data_obj]) + ) + for d in iterable: + mapped = map_scheduled_activity_summary(d) + if mapped: + activities.append(mapped) + if limit is not None and len(activities) >= limit: + break + + has_next_page = getattr(response, "has_next_page", False) + next_page = getattr(response, "next_page", None) + + return activities + + +@mcp.tool(description="Gets a ScheduledActivity by identifier") +def get_scheduled_activity( + fusion_environment_id: str = Field( + ..., description="unique FusionEnvironment identifier" + ), + scheduled_activity_id: str = Field( + ..., description="Unique ScheduledActivity identifier." + ), + opc_request_id: Optional[str] = Field( + None, description="The client request ID for tracing." + ), + retry_strategy: Optional[Literal["none", "default"]] = Field( + None, + description=( + "A retry strategy to apply to this operation. " + "Use 'none' to explicitly disable retries (sets NoneRetryStrategy). " + "This operation uses DEFAULT_RETRY_STRATEGY as default if no retry strategy is provided." + ), + ), + allow_control_chars: Optional[bool] = Field( + None, + description=( + "Whether to allow control characters in the response object. " + "By default, control characters are not allowed." + ), + ), +) -> ScheduledActivity: + client = get_faaas_client() + + kwargs = {} + if opc_request_id: + kwargs["opc_request_id"] = opc_request_id + if allow_control_chars is not None: + kwargs["allow_control_chars"] = allow_control_chars + if retry_strategy == "none": + try: + kwargs["retry_strategy"] = oci.retry.NoneRetryStrategy() + except Exception: + # Retry module may not be available in some environments; ignore if unavailable. + pass + + response: oci.response.Response = client.get_scheduled_activity( + fusion_environment_id=fusion_environment_id, + scheduled_activity_id=scheduled_activity_id, + **kwargs, + ) + return map_scheduled_activity(response.data) + + +@mcp.tool(description="Gets the subscription details of a Fusion Environment Family.") +def get_fusion_environment_family_subscription_detail( + fusion_environment_family_id: str = Field( + ..., description="The unique identifier (OCID) of the FusionEnvironmentFamily." + ), + opc_request_id: Optional[str] = Field( + None, description="The client request ID for tracing." + ), + retry_strategy: Optional[Literal["none", "default"]] = Field( + None, + description=( + "Retry strategy to use. Use 'none' to explicitly disable retries (sets NoneRetryStrategy). " + "If omitted or 'default', the SDK default/client-level retry strategy is used." + ), + ), + allow_control_chars: Optional[bool] = Field( + None, + description="Whether to allow control characters in the response object.", + ), +) -> SubscriptionDetail: + client = get_faaas_client() + + kwargs = {} + if opc_request_id: + kwargs["opc_request_id"] = opc_request_id + if allow_control_chars is not None: + kwargs["allow_control_chars"] = allow_control_chars + if retry_strategy == "none": + try: + kwargs["retry_strategy"] = oci.retry.NoneRetryStrategy() + except Exception: + # If retry module not available, skip to maintain compatibility in test environments + pass + + response: oci.response.Response = ( + client.get_fusion_environment_family_subscription_detail( + fusion_environment_family_id=fusion_environment_family_id, **kwargs + ) + ) + return map_subscription_detail(response.data) + + def main(): mcp.run() diff --git a/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/tests/test_faaas_tools.py b/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/tests/test_faaas_tools.py index 65bc7e23..382ae8a6 100644 --- a/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/tests/test_faaas_tools.py +++ b/src/oci-faaas-mcp-server/oracle/oci_faaas_mcp_server/tests/test_faaas_tools.py @@ -6,6 +6,7 @@ import sys import types +from datetime import datetime from unittest.mock import MagicMock, create_autospec, patch import pytest @@ -63,6 +64,7 @@ async def test_list_fusion_environment_families(self, mock_get_client): ) result = call_tool_result.structured_content["result"] + assert isinstance(result, list) assert len(result) == 1 assert result[0]["id"] == "family1" @@ -95,6 +97,7 @@ async def test_list_fusion_environments(self, mock_get_client): ) result = call_tool_result.structured_content["result"] + assert isinstance(result, list) assert len(result) == 1 assert result[0]["id"] == "env1" @@ -119,6 +122,7 @@ async def test_get_fusion_environment(self, mock_get_client): ) result = call_tool_result.structured_content + assert isinstance(result, dict) assert result["id"] == "env1" assert result["display_name"] == "Env 1" @@ -143,6 +147,7 @@ async def test_get_fusion_environment_status(self, mock_get_client): ) result = call_tool_result.structured_content + assert isinstance(result, dict) assert result["fusion_environment_id"] == "env1" assert result["status"] == "ACTIVE" @@ -178,3 +183,602 @@ def test_get_faaas_client_initializes_client(self, tmp_path): assert "additional_user_agent" in cfg # Signer constructed using token file contents and key mock_signer.assert_called_once() + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_get_fusion_environment_family_subscription_detail( + self, mock_get_client + ): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + mock_get_response = create_autospec(oci.response.Response) + mock_get_response.data = { + "subscriptions": [ + { + "id": "ocid1.subscription.oc1..exampleuniqueID", + "classic_subscription_id": "classic-1", + "service_name": "SAAS", + "lifecycle_state": "ACTIVE", + "skus": [], + } + ] + } + mock_client.get_fusion_environment_family_subscription_detail.return_value = ( + mock_get_response + ) + + async with Client(mcp) as client: + call_tool_result = await client.call_tool( + "get_fusion_environment_family_subscription_detail", + { + "fusion_environment_family_id": "ocid1.fusionenvironmentfamily.oc1..example" + }, + ) + result = call_tool_result.structured_content + + assert "subscriptions" in result + assert isinstance(result["subscriptions"], list) + assert result["subscriptions"][0]["service_name"] == "SAAS" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_scheduled_activities_minimal(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + mock_list_response = create_autospec(oci.response.Response) + mock_list_response.data = [ + { + "id": "sa1", + "display_name": "Activity 1", + "run_cycle": "MONTHLY", + "fusion_environment_id": "env1", + "lifecycle_state": "ACCEPTED", + "time_scheduled_start": "2025-01-01T00:00:00Z", + "time_expected_finish": "2025-01-01T02:00:00Z", + "scheduled_activity_phase": "MAINTENANCE", + "scheduled_activity_association_id": "assoc1", + } + ] + mock_list_response.has_next_page = False + mock_list_response.next_page = None + mock_client.list_scheduled_activities.return_value = mock_list_response + + async with Client(mcp) as client: + call_tool_result = await client.call_tool( + "list_scheduled_activities", + {"fusion_environment_id": "env1"}, + ) + result = call_tool_result.structured_content["result"] + + assert isinstance(result, list) + assert len(result) == 1 + assert result[0]["id"] == "sa1" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_scheduled_activities_with_filters(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + mock_list_response = create_autospec(oci.response.Response) + mock_list_response.data = [] + mock_list_response.has_next_page = False + mock_list_response.next_page = None + mock_client.list_scheduled_activities.return_value = mock_list_response + + dt_start = datetime(2025, 1, 1, 0, 0, 0) + dt_end = datetime(2025, 1, 2, 0, 0, 0) + + async with Client(mcp) as client: + await client.call_tool( + "list_scheduled_activities", + { + "fusion_environment_id": "env1", + "display_name": "ZDT", + "time_scheduled_start_greater_than_or_equal_to": dt_start, + "time_expected_finish_less_than_or_equal_to": dt_end, + "run_cycle": "MONTHLY", + "lifecycle_state": "SUCCEEDED", + "scheduled_activity_association_id": "assoc1", + "scheduled_activity_phase": "POST_MAINTENANCE", + "limit": 10, + "page": "token123", + "sort_order": "ASC", + "sort_by": "DISPLAY_NAME", + "opc_request_id": "req-1", + "allow_control_chars": True, + "retry_strategy": "none", + }, + ) + + # Verify mapped kwargs passed to OCI client + _, kwargs = mock_client.list_scheduled_activities.call_args + assert kwargs["fusion_environment_id"] == "env1" + assert kwargs["display_name"] == "ZDT" + assert kwargs["time_scheduled_start_greater_than_or_equal_to"] == dt_start + assert kwargs["time_expected_finish_less_than_or_equal_to"] == dt_end + assert kwargs["run_cycle"] == "MONTHLY" + assert kwargs["lifecycle_state"] == "SUCCEEDED" + assert kwargs["scheduled_activity_association_id"] == "assoc1" + assert kwargs["scheduled_activity_phase"] == "POST_MAINTENANCE" + assert kwargs["limit"] == 10 + assert kwargs["page"] == "token123" + assert kwargs["sort_order"] == "ASC" + assert kwargs["sort_by"] == "DISPLAY_NAME" + assert kwargs["opc_request_id"] == "req-1" + assert kwargs["allow_control_chars"] is True + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_get_scheduled_activity_minimal(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + mock_get_response = create_autospec(oci.response.Response) + mock_get_response.data = { + "id": "sa1", + "display_name": "Activity 1", + "run_cycle": "MONTHLY", + "fusion_environment_id": "env1", + "lifecycle_state": "ACCEPTED", + "time_scheduled_start": "2025-01-01T00:00:00Z", + "time_expected_finish": "2025-01-01T02:00:00Z", + "scheduled_activity_phase": "MAINTENANCE", + "scheduled_activity_association_id": "assoc1", + } + mock_client.get_scheduled_activity.return_value = mock_get_response + + async with Client(mcp) as client: + call_tool_result = await client.call_tool( + "get_scheduled_activity", + { + "fusion_environment_id": "env1", + "scheduled_activity_id": "sa1", + }, + ) + result = call_tool_result.structured_content + assert result["id"] == "sa1" + assert result["display_name"] == "Activity 1" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_get_scheduled_activity_with_options(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + mock_get_response = create_autospec(oci.response.Response) + mock_get_response.data = {"id": "sa2"} + mock_client.get_scheduled_activity.return_value = mock_get_response + + async with Client(mcp) as client: + await client.call_tool( + "get_scheduled_activity", + { + "fusion_environment_id": "env1", + "scheduled_activity_id": "sa2", + "opc_request_id": "req-2", + "allow_control_chars": True, + "retry_strategy": "none", + }, + ) + + _, kwargs = mock_client.get_scheduled_activity.call_args + assert kwargs["fusion_environment_id"] == "env1" + assert kwargs["scheduled_activity_id"] == "sa2" + assert kwargs["opc_request_id"] == "req-2" + assert kwargs["allow_control_chars"] is True + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_admin_users_minimal(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp = create_autospec(oci.response.Response) + resp.data = [ + { + "username": "admin1", + "email_address": "admin1@example.com", + "first_name": "A", + "last_name": "One", + } + ] + resp.has_next_page = False + resp.next_page = None + mock_client.list_admin_users.return_value = resp + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_admin_users", {"fusion_environment_id": "env1"} + ) + result = call_res.structured_content["result"] + + assert isinstance(result, list) + assert len(result) == 1 + assert result[0]["username"] == "admin1" + assert result[0]["email_address"] == "admin1@example.com" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_admin_users_pagination(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp1 = create_autospec(oci.response.Response) + resp1.data = [{"username": "admin1"}] + resp1.has_next_page = True + resp1.next_page = "page2" + + resp2 = create_autospec(oci.response.Response) + resp2.data = [{"username": "admin2"}] + resp2.has_next_page = False + resp2.next_page = None + + mock_client.list_admin_users.side_effect = [resp1, resp2] + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_admin_users", {"fusion_environment_id": "env1"} + ) + result = call_res.structured_content["result"] + + assert [u["username"] for u in result] == ["admin1", "admin2"] + + # Verify pagination behavior: second call uses page token + assert mock_client.list_admin_users.call_count == 2 + _, first_kwargs = mock_client.list_admin_users.call_args_list[0] + _, second_kwargs = mock_client.list_admin_users.call_args_list[1] + assert "page" not in first_kwargs or first_kwargs["page"] is None + assert second_kwargs["page"] == "page2" + assert second_kwargs["fusion_environment_id"] == "env1" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_refresh_activities_minimal(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp = create_autospec(oci.response.Response) + resp.data = [ + { + "id": "ra1", + "display_name": "Refresh 1", + "lifecycle_state": "SUCCEEDED", + "time_scheduled_start": "2025-01-01T01:00:00Z", + } + ] + resp.has_next_page = False + resp.next_page = None + mock_client.list_refresh_activities.return_value = resp + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_refresh_activities", {"fusion_environment_id": "env1"} + ) + result = call_res.structured_content["result"] + assert isinstance(result, list) + assert len(result) == 1 + assert result[0]["id"] == "ra1" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_refresh_activities_with_filters_and_limit( + self, mock_get_client + ): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp1 = create_autospec(oci.response.Response) + resp1.data = [{"id": "ra1", "display_name": "Refresh 1"}] + resp1.has_next_page = True + resp1.next_page = "p2" + mock_client.list_refresh_activities.side_effect = [resp1] + + dt_start = datetime(2025, 1, 1, 0, 0, 0) + dt_end = datetime(2025, 1, 2, 0, 0, 0) + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_refresh_activities", + { + "fusion_environment_id": "env1", + "display_name": "Ref1", + "time_scheduled_start_greater_than_or_equal_to": dt_start, + "time_expected_finish_less_than_or_equal_to": dt_end, + "lifecycle_state": "SUCCEEDED", + "limit": 1, + "page": "p1", + "sort_order": "DESC", + "sort_by": "TIME_CREATED", + "opc_request_id": "reqX", + }, + ) + result = call_res.structured_content["result"] + assert len(result) == 1 + assert result[0]["id"] == "ra1" + + # With limit=1 and has_next_page True, only one SDK call should be made + assert mock_client.list_refresh_activities.call_count == 1 + _, kwargs = mock_client.list_refresh_activities.call_args + assert kwargs["fusion_environment_id"] == "env1" + assert kwargs["display_name"] == "Ref1" + assert kwargs["time_scheduled_start_greater_than_or_equal_to"] == dt_start + assert kwargs["time_expected_finish_less_than_or_equal_to"] == dt_end + assert kwargs["lifecycle_state"] == "SUCCEEDED" + assert kwargs["limit"] == 1 + assert kwargs["page"] == "p1" + assert kwargs["sort_order"] == "DESC" + assert kwargs["sort_by"] == "TIME_CREATED" + assert kwargs["opc_request_id"] == "reqX" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_fusion_environment_families_pagination_header_fallback( + self, mock_get_client + ): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp1 = create_autospec(oci.response.Response) + resp1.data = [{"id": "family1", "display_name": "Family 1"}] + resp1.next_page = None + # Simulate header-provided next page token + resp1.headers = {"opc-next-page": "nxtToken"} + + resp2 = create_autospec(oci.response.Response) + resp2.data = [{"id": "family2", "display_name": "Family 2"}] + resp2.next_page = None + resp2.headers = {} + + mock_client.list_fusion_environment_families.side_effect = [resp1, resp2] + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_fusion_environment_families", + {"compartment_id": "compartmentA"}, + ) + result = call_res.structured_content["result"] + + assert [f["id"] for f in result] == ["family1", "family2"] + + # First call without page, second call with header-derived page + assert mock_client.list_fusion_environment_families.call_count == 2 + _, first_kwargs = mock_client.list_fusion_environment_families.call_args_list[0] + _, second_kwargs = mock_client.list_fusion_environment_families.call_args_list[ + 1 + ] + assert first_kwargs["compartment_id"] == "compartmentA" + assert "page" not in first_kwargs or first_kwargs["page"] is None + assert second_kwargs["page"] == "nxtToken" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_fusion_environments_pagination_header_fallback( + self, mock_get_client + ): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp1 = create_autospec(oci.response.Response) + resp1.data = [{"id": "env1", "display_name": "Env 1"}] + resp1.next_page = None + resp1.headers = {"opc-next-page": "tok2"} + + resp2 = create_autospec(oci.response.Response) + resp2.data = [{"id": "env2", "display_name": "Env 2"}] + resp2.next_page = None + resp2.headers = {} + + mock_client.list_fusion_environments.side_effect = [resp1, resp2] + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_fusion_environments", + { + "compartment_id": "compartmentB", + "fusion_environment_family_id": "fam1", + }, + ) + result = call_res.structured_content["result"] + assert [e["id"] for e in result] == ["env1", "env2"] + + assert mock_client.list_fusion_environments.call_count == 2 + _, first_kwargs = mock_client.list_fusion_environments.call_args_list[0] + _, second_kwargs = mock_client.list_fusion_environments.call_args_list[1] + assert first_kwargs["compartment_id"] == "compartmentB" + assert first_kwargs["fusion_environment_family_id"] == "fam1" + assert "page" not in first_kwargs or first_kwargs["page"] is None + assert second_kwargs["page"] == "tok2" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_fusion_environments_limit_enforced(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp1 = create_autospec(oci.response.Response) + resp1.data = [{"id": "env1"}] + resp1.next_page = None + resp1.headers = {"opc-next-page": "tok2"} # would paginate but limit stops it + + mock_client.list_fusion_environments.side_effect = [resp1] + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_fusion_environments", + {"compartment_id": "compartmentB", "limit": 1}, + ) + result = call_res.structured_content["result"] + assert len(result) == 1 + assert result[0]["id"] == "env1" + + # Only one SDK call due to limit=1 + assert mock_client.list_fusion_environments.call_count == 1 + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_families_handles_items_container(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp = create_autospec(oci.response.Response) + resp.data = types.SimpleNamespace( + items=[{"id": "familyX", "display_name": "Fam X"}] + ) + resp.has_next_page = False + resp.next_page = None + mock_client.list_fusion_environment_families.return_value = resp + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_fusion_environment_families", + {"compartment_id": "compartmentC"}, + ) + result = call_res.structured_content["result"] + assert [f["id"] for f in result] == ["familyX"] + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_admin_users_handles_items_container(self, mock_get_client): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp = create_autospec(oci.response.Response) + resp.data = types.SimpleNamespace(items=[{"username": "container-admin"}]) + resp.has_next_page = False + resp.next_page = None + mock_client.list_admin_users.return_value = resp + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_admin_users", {"fusion_environment_id": "envC"} + ) + result = call_res.structured_content["result"] + assert result[0]["username"] == "container-admin" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_list_refresh_activities_paginates_and_accumulates( + self, mock_get_client + ): + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp1 = create_autospec(oci.response.Response) + resp1.data = [{"id": "ra1"}] + resp1.has_next_page = True + resp1.next_page = "nextPage" + + resp2 = create_autospec(oci.response.Response) + resp2.data = [{"id": "ra2"}] + resp2.has_next_page = False + resp2.next_page = None + + mock_client.list_refresh_activities.side_effect = [resp1, resp2] + + async with Client(mcp) as client: + call_res = await client.call_tool( + "list_refresh_activities", {"fusion_environment_id": "envR"} + ) + result = call_res.structured_content["result"] + assert [r["id"] for r in result] == ["ra1", "ra2"] + + # Verify that the page token from the first response was used on the second call + _, first_kwargs = mock_client.list_refresh_activities.call_args_list[0] + _, second_kwargs = mock_client.list_refresh_activities.call_args_list[1] + assert "page" not in first_kwargs or first_kwargs["page"] is None + assert second_kwargs["page"] == "nextPage" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_retry_strategy_none_list_scheduled_activities(self, mock_get_client): + # Provide a retry stub so server can set kwargs['retry_strategy'] + oci.retry = types.SimpleNamespace(NoneRetryStrategy=lambda: "NONE") # type: ignore[attr-defined] + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp = create_autospec(oci.response.Response) + resp.data = [] + resp.has_next_page = False + resp.next_page = None + mock_client.list_scheduled_activities.return_value = resp + + async with Client(mcp) as client: + await client.call_tool( + "list_scheduled_activities", + { + "fusion_environment_id": "envRS", + "retry_strategy": "none", + }, + ) + + _, kwargs = mock_client.list_scheduled_activities.call_args + assert kwargs["retry_strategy"] == "NONE" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_retry_strategy_none_get_scheduled_activity(self, mock_get_client): + # Provide a retry stub so server can set kwargs['retry_strategy'] + oci.retry = types.SimpleNamespace(NoneRetryStrategy=lambda: "NONE") # type: ignore[attr-defined] + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp = create_autospec(oci.response.Response) + resp.data = {"id": "sa-retry"} + mock_client.get_scheduled_activity.return_value = resp + + async with Client(mcp) as client: + await client.call_tool( + "get_scheduled_activity", + { + "fusion_environment_id": "envRS", + "scheduled_activity_id": "sa-retry", + "retry_strategy": "none", + }, + ) + + _, kwargs = mock_client.get_scheduled_activity.call_args + assert kwargs["retry_strategy"] == "NONE" + assert kwargs["fusion_environment_id"] == "envRS" + assert kwargs["scheduled_activity_id"] == "sa-retry" + + @pytest.mark.asyncio + @patch("oracle.oci_faaas_mcp_server.server.get_faaas_client") + async def test_retry_strategy_none_get_subscription_detail_with_options( + self, mock_get_client + ): + # Provide a retry stub so server can set kwargs['retry_strategy'] + oci.retry = types.SimpleNamespace(NoneRetryStrategy=lambda: "NONE") # type: ignore[attr-defined] + + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + resp = create_autospec(oci.response.Response) + resp.data = { + "subscriptions": [{"id": "sub1", "service_name": "SAAS", "skus": []}] + } + mock_client.get_fusion_environment_family_subscription_detail.return_value = ( + resp + ) + + async with Client(mcp) as client: + await client.call_tool( + "get_fusion_environment_family_subscription_detail", + { + "fusion_environment_family_id": "ocid1.fam.oc1..example", + "opc_request_id": "r-123", + "allow_control_chars": True, + "retry_strategy": "none", + }, + ) + + _, kwargs = ( + mock_client.get_fusion_environment_family_subscription_detail.call_args + ) + assert kwargs["opc_request_id"] == "r-123" + assert kwargs["allow_control_chars"] is True + assert kwargs["retry_strategy"] == "NONE"