diff --git a/app/controller/chainsail/controller/run.py b/app/controller/chainsail/controller/run.py index 81d48828..74f849c4 100644 --- a/app/controller/chainsail/controller/run.py +++ b/app/controller/chainsail/controller/run.py @@ -22,6 +22,7 @@ update_nodes_mpi, ) from chainsail.grpc import Health, add_HealthServicer_to_server +from chainsail.grpc import user_code_pb2_grpc, user_code_pb2 ProcessStatus = Tuple[bool, str] @@ -54,6 +55,51 @@ def check_status(proc: Process) -> ProcessStatus: return (True, "SUCCESS") +def check_user_code_server_ready(host="localhost", port=50052, timeout=1.0): + """ + Check if the user code server is ready to handle requests within the timeout. + + Args: + host: Hostname of the user code server + port: Port of the user code server + timeout: Timeout in seconds for the health check + + Returns: + bool: True if user code server is ready, False otherwise + """ + try: + # Create a channel with timeout + channel = grpc.insecure_channel(f"{host}:{port}") + + # Use the channel with a timeout for the call + stub = user_code_pb2_grpc.HealthStub(channel) + request = user_code_pb2.HealthCheckRequest(service="") + + # Make the health check call with timeout + response = stub.Check(request, timeout=timeout) + + # Check if the response indicates the server is serving + is_ready = response.status == user_code_pb2.HealthCheckResponse.SERVING + + # Close the channel + channel.close() + + logger.debug( + f"User code server health check: status={response.status}, ready={is_ready}" + ) + return is_ready + + except grpc.RpcError as e: + logger.debug( + f"User code server health check failed with gRPC error: " + f"{e.code()} - {e.details()}" + ) + return False + except Exception as e: + logger.debug(f"User code server health check failed with error: {e}") + return False + + @click.command() @click.option("--job", required=True, type=str, help="Chainsail job id") @click.option( @@ -141,7 +187,22 @@ def wrapped_run_job(): # Gets the status of the controller process def controller_state(): - return check_status(controller_proc)[1] + # First check if the controller process itself is running + process_ready, process_status = check_status(controller_proc) + + if not process_ready: + logger.debug(f"Controller process not ready: {process_status}") + return process_status + + # If controller process is running, check if user code server is ready + user_code_ready = check_user_code_server_ready() + + if not user_code_ready: + logger.debug("User code server not ready, controller reporting NOT_SERVING") + return "NOT_SERVING" + + logger.debug("Both controller process and user code server are ready") + return process_status # Start gRPC server with futures.ThreadPoolExecutor(max_workers=config.n_threads) as ex: