Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion app/controller/chainsail/controller/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
update_nodes_mpi,
)
from chainsail.grpc import Health, add_HealthServicer_to_server
from chainsail.grpc import user_code_pb2_grpc, user_code_pb2

# Result type of check_status(): a (flag, status string) pair,
# e.g. (True, "SUCCESS").
ProcessStatus = Tuple[bool, str]

Expand Down Expand Up @@ -54,6 +55,51 @@ def check_status(proc: Process) -> ProcessStatus:
return (True, "SUCCESS")


def check_user_code_server_ready(host="localhost", port=50052, timeout=1.0):
    """
    Check if the user code server is ready to handle requests within the timeout.

    Opens a short-lived insecure gRPC channel to ``host:port``, issues a single
    health ``Check`` call and reports whether the server answered ``SERVING``.
    Any failure (gRPC error, timeout, unexpected exception) is logged at debug
    level and reported as "not ready" rather than raised.

    Args:
        host: Hostname of the user code server
        port: Port of the user code server
        timeout: Timeout in seconds for the health check

    Returns:
        bool: True if user code server is ready, False otherwise
    """
    try:
        # The context manager closes the channel on every exit path. Closing
        # it only after a successful call would leak one channel per failed
        # probe, and failed probes are the common case during startup.
        with grpc.insecure_channel(f"{host}:{port}") as channel:
            stub = user_code_pb2_grpc.HealthStub(channel)
            request = user_code_pb2.HealthCheckRequest(service="")

            # The timeout applies to the RPC itself, not to channel creation.
            response = stub.Check(request, timeout=timeout)

            is_ready = (
                response.status == user_code_pb2.HealthCheckResponse.SERVING
            )

        logger.debug(
            "User code server health check: status=%s, ready=%s",
            response.status,
            is_ready,
        )
        return is_ready

    except grpc.RpcError as e:
        logger.debug(
            "User code server health check failed with gRPC error: %s - %s",
            e.code(),
            e.details(),
        )
        return False
    except Exception as e:
        # Deliberately broad: a health probe must never take the caller down.
        logger.debug("User code server health check failed with error: %s", e)
        return False


@click.command()
@click.option("--job", required=True, type=str, help="Chainsail job id")
@click.option(
Expand Down Expand Up @@ -141,7 +187,22 @@ def wrapped_run_job():

# Gets the status of the controller process
def controller_state():
    """
    Report the controller's status string.

    Returns the status from ``check_status(controller_proc)`` unless the
    controller process is ready but the user code server is not, in which
    case ``"NOT_SERVING"`` is returned.
    """
    # First check the controller process itself.
    process_ready, process_status = check_status(controller_proc)

    if not process_ready:
        logger.debug(f"Controller process not ready: {process_status}")
        return process_status

    # The controller process is fine; only report it as such once the user
    # code server also answers its health check.
    if not check_user_code_server_ready():
        logger.debug("User code server not ready, controller reporting NOT_SERVING")
        return "NOT_SERVING"

    logger.debug("Both controller process and user code server are ready")
    return process_status

# Start gRPC server
with futures.ThreadPoolExecutor(max_workers=config.n_threads) as ex:
Expand Down