From 565a2dcabf63cfbf68cdd633299faaf4966ccdc7 Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Thu, 15 Jun 2017 16:28:27 -0400 Subject: [PATCH] init commit for consecutive fail job counts --- .../java/com/addthis/hydra/job/JobTask.java | 4 ++++ .../com/addthis/hydra/job/spawn/Spawn.java | 21 ++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/hydra-api/src/main/java/com/addthis/hydra/job/JobTask.java b/hydra-api/src/main/java/com/addthis/hydra/job/JobTask.java index 2e8057298..14f7a10ab 100644 --- a/hydra-api/src/main/java/com/addthis/hydra/job/JobTask.java +++ b/hydra-api/src/main/java/com/addthis/hydra/job/JobTask.java @@ -173,6 +173,10 @@ public int getErrors() { return errors; } + public void setErrors(int errors) { + this.errors = errors; + } + public int incrementErrors() { return ++errors; } diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java index 1dc0b23e7..100da9148 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java @@ -2762,7 +2762,7 @@ protected void handleMessage(CoreMessage core) { } } else if (core instanceof StatusTaskEnd) { StatusTaskEnd end = (StatusTaskEnd) core; - log.info("[task.end] :: {}/{} exit={}", end.getJobUuid(), end.getNodeID(), end.getExitCode()); + log.info("[task.end] :: {} / {} exit={}", end.getJobUuid(), end.getNodeID(), end.getExitCode()); SpawnMetrics.tasksCompletedPerHour.mark(); try { job = getJob(end.getJobUuid()); @@ -2774,7 +2774,9 @@ protected void handleMessage(CoreMessage core) { if (task.isRunning()) { taskQueuesByPriority.incrementHostAvailableSlots(end.getHostUuid()); } - handleStatusTaskEnd(job, task, end); + if(handleStatusTaskEnd(job, task, end) == 1) { + resetTaskErrors(job); + } } } } catch (Exception ex) { @@ -2785,6 +2787,14 @@ protected void handleMessage(CoreMessage core) { } } + void resetTaskErrors(Job job) { + List tasks = job.getCopyOfTasks(); + for(JobTask task : tasks) { + task.setErrors(0); + log.info("Error counter for consecutive job fails has been reset to 0 for job id = {}, host id = {}, taskId = {}", task.getJobUUID(), task.getHostUUID(), task.getTaskID()); + } + } + /** * Get a replacement host for a new task * @@ -3020,7 +3030,8 @@ private void stopTask(String jobUUID, int taskID, boolean force, boolean onlyIfQ * @param task The task to modify * @param update The message */ - private void handleStatusTaskEnd(Job job, JobTask task, StatusTaskEnd update) { + int handleStatusTaskEnd(Job job, JobTask task, StatusTaskEnd update) { + int flag = 0; TaskExitState exitState = update.getExitState(); boolean wasStopped = (exitState != null) && exitState.getWasStopped(); task.setFileCount(update.getFileCount()); @@ -3043,8 +3054,12 @@ private void handleStatusTaskEnd(Job job, JobTask task, StatusTaskEnd update) { } if (job.isFinished() && (update.getRebalanceSource() == null)) { finishJob(job, errored); + if(!errored) { + flag = 1; + } } queueJobTaskUpdateEvent(job); + return flag; } /**