Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package io.mantisrx.server.master.resourcecluster;

import io.mantisrx.server.core.domain.WorkerId;

import io.mantisrx.shaded.com.fasterxml.jackson.databind.JsonNode;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;

/**
Expand All @@ -25,6 +28,7 @@
@Getter
public class TaskExecutorTaskCancelledException extends Exception {
private static final long serialVersionUID = 1L;
private static final ObjectMapper mapper = new ObjectMapper();

private final WorkerId workerId;

Expand All @@ -38,4 +42,12 @@ public synchronized Throwable fillInStackTrace() {
// Do not include stack trace to be returned to clients
return this;
}

/**
* Serializes the full exception object including workerId to JsonNode.
* @return JsonNode representation of this exception
*/
public JsonNode toJsonNode() {
return mapper.valueToTree(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.TaskExecutorNotFoundException;
import io.mantisrx.server.master.resourcecluster.TaskExecutorTaskCancelledException;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.JsonNode;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.node.JsonNodeFactory;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.node.ObjectNode;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ser.FilterProvider;
Expand Down Expand Up @@ -304,6 +305,15 @@ protected String generateFailureResponsePayload(String errorMsg, long requestId)
return node.toString();
}

protected String generateFailureResponsePayload(JsonNode errorMsgNode, long requestId) {
ObjectNode node = JsonNodeFactory.instance.objectNode();
node.put("time", System.currentTimeMillis());
node.put("host", this.hostName);
node.set("error", errorMsgNode);
node.put("requestId", requestId);
return node.toString();
}

FilterProvider parseFilter(String fields, String target) {
if (Strings.isNullOrEmpty(fields)) {
return null;
Expand Down Expand Up @@ -351,18 +361,31 @@ protected <T> Route withFuture(CompletableFuture<T> tFuture) {
throwable -> {
if (throwable instanceof TaskExecutorNotFoundException) {
MasterApiMetrics.getInstance().incrementResp4xx();
return complete(StatusCodes.NOT_FOUND);
return complete(
StatusCodes.NOT_FOUND,
HttpEntities.create(
ContentTypes.APPLICATION_JSON,
generateFailureResponsePayload(throwable.getMessage(), -1)));
}

if (throwable instanceof RequestThrottledException) {
MasterApiMetrics.getInstance().incrementResp4xx();
MasterApiMetrics.getInstance().incrementThrottledRequestCount();
return complete(StatusCodes.TOO_MANY_REQUESTS);
return complete(
StatusCodes.TOO_MANY_REQUESTS,
HttpEntities.create(
ContentTypes.APPLICATION_JSON,
generateFailureResponsePayload(throwable.getMessage(), -1)));
}

if (throwable instanceof TaskExecutorTaskCancelledException) {
MasterApiMetrics.getInstance().incrementResp4xx();
return complete(StatusCodes.NOT_ACCEPTABLE, throwable, Jackson.marshaller() );
TaskExecutorTaskCancelledException ex = (TaskExecutorTaskCancelledException) throwable;
return complete(
StatusCodes.NOT_ACCEPTABLE,
HttpEntities.create(
ContentTypes.APPLICATION_JSON,
generateFailureResponsePayload(ex.toJsonNode(), -1)));
}

if (throwable instanceof AskTimeoutException) {
Expand All @@ -371,7 +394,11 @@ protected <T> Route withFuture(CompletableFuture<T> tFuture) {

MasterApiMetrics.getInstance().incrementResp5xx();
logger.error("withFuture error: ", throwable);
return complete(StatusCodes.INTERNAL_SERVER_ERROR, throwable, Jackson.marshaller());
return complete(
StatusCodes.INTERNAL_SERVER_ERROR,
HttpEntities.create(
ContentTypes.APPLICATION_JSON,
generateFailureResponsePayload(throwable.getMessage(), -1)));
},
r -> complete(StatusCodes.OK, r, Jackson.marshaller())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import io.mantisrx.server.master.resourcecluster.TaskExecutorReport;
import io.mantisrx.server.master.resourcecluster.TaskExecutorTaskCancelledException;
import java.io.IOException;

import io.mantisrx.shaded.com.fasterxml.jackson.databind.JsonNode;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
Expand Down Expand Up @@ -71,10 +74,21 @@ public void testGetTaskExecutorStateWithCancelledWorker() throws IOException {
);
String encoded = serializer.toJson(heartbeat);

testRoute.run(
String response = testRoute.run(
HttpRequest.POST("/api/v1/resourceClusters/myCluster/actions/heartBeatFromTaskExecutor")
.withEntity(HttpEntities.create(ContentTypes.APPLICATION_JSON, encoded)))
.assertStatusCode(StatusCodes.NOT_ACCEPTABLE)
.assertEntity(serializer.toJson(err));
.entityString();

// Parse response and verify the error field contains the exception
ObjectMapper mapper = new ObjectMapper();
JsonNode responseNode = mapper.readTree(response);
JsonNode errorNode = responseNode.get("error");

String expectedError = serializer.toJson(err);
String actualError = mapper.writeValueAsString(errorNode);

assert actualError.equals(expectedError) :
String.format("Expected error: %s, but got: %s", expectedError, actualError);
}
}
Loading