From a4ae6ab01e2562bcf333c0a5b28e7e0ad2cbecb2 Mon Sep 17 00:00:00 2001 From: Gobukgol Date: Fri, 2 Sep 2022 19:15:45 +0900 Subject: [PATCH 1/3] =?UTF-8?q?=EC=8B=A4=ED=96=89=EB=8B=A8=EA=B3=84=20?= =?UTF-8?q?=EC=B5=9C=EC=B4=88=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bomscheduler/config/JobManagerConfig.kt | 44 +++++++++++++ .../org/javabom/bomscheduler/job/Job.kt | 8 ++- .../javabom/bomscheduler/job/JobCollection.kt | 10 ++- .../javabom/bomscheduler/job/JobManager.kt | 63 +++++++++++++++++++ .../javabom/bomscheduler/job/JobMetadata.kt | 9 +++ .../schedule/BomScheduleInterceptor.kt | 23 +++++++ .../org/javabom/bomscheduler/task/Task.kt | 11 ++++ .../javabom/bomscheduler/task/TaskManager.kt | 62 ++++++++++++++++++ .../javabom/bomscheduler/task/TaskRunner.kt | 19 ++++++ 9 files changed, 247 insertions(+), 2 deletions(-) create mode 100644 src/main/kotlin/org/javabom/bomscheduler/config/JobManagerConfig.kt create mode 100644 src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt create mode 100644 src/main/kotlin/org/javabom/bomscheduler/job/JobMetadata.kt create mode 100644 src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt create mode 100644 src/main/kotlin/org/javabom/bomscheduler/task/Task.kt create mode 100644 src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt create mode 100644 src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt diff --git a/src/main/kotlin/org/javabom/bomscheduler/config/JobManagerConfig.kt b/src/main/kotlin/org/javabom/bomscheduler/config/JobManagerConfig.kt new file mode 100644 index 0000000..fbe3e82 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/config/JobManagerConfig.kt @@ -0,0 +1,44 @@ +package org.javabom.bomscheduler.config + +import org.javabom.bomscheduler.job.JobCollection +import org.javabom.bomscheduler.job.JobCoordinator +import org.javabom.bomscheduler.job.JobManager +import org.javabom.bomscheduler.schedule.BomSchedule +import org.javabom.bomscheduler.schedule.BomScheduleInterceptor +import org.javabom.bomscheduler.task.TaskManager +import org.javabom.bomscheduler.task.TaskRunner +import org.springframework.aop.Advisor +import org.springframework.aop.support.DefaultPointcutAdvisor +import org.springframework.aop.support.annotation.AnnotationMatchingPointcut +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import + +@Configuration +class JobManagerConfig { + + @Bean + fun jobManager(jobCoordinator: JobCoordinator, jobCollection: JobCollection): JobManager { + return JobManager(jobCoordinator, jobCollection) + } + + @Bean + fun taskRunner(): TaskRunner { + return TaskRunner() + } + + @Bean + fun TaskManager( + jobManager: JobManager, + jobCollection: JobCollection + ): TaskManager { + return TaskManager(jobManager, jobCollection, taskRunner()) + } + + @Bean + fun bomScheduleInterceptor(): Advisor { + val interceptor = BomScheduleInterceptor(taskRunner()) + val pointcut = AnnotationMatchingPointcut(BomSchedule::class.java) + return DefaultPointcutAdvisor(pointcut, interceptor) + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/Job.kt b/src/main/kotlin/org/javabom/bomscheduler/job/Job.kt index 6db4b2d..a372d58 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/job/Job.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/job/Job.kt @@ -9,4 +9,10 @@ data class Job( val version: Int = 0, val lastExecutionTime: LocalDateTime = LocalDateTime.now(), val instanceExpiredTime: LocalDateTime? = null -) +) { + + fun isExpired(): Boolean { + return instanceExpiredTime + ?.isBefore(LocalDateTime.now())?: false + } +} diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt b/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt index ca6e42c..d47fd56 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt @@ -1,8 +1,16 @@ package org.javabom.bomscheduler.job -class JobCollection(definedJobs : List = emptyList()) { +class JobCollection( + private var definedJobs: List = emptyList() +) { init { definedJobs.forEach{ println("Register Job ${it.name}")} } + + fun getDefinedJobs(): List = definedJobs + + fun replaceJobs(jobs: List) { + definedJobs = jobs + } } diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt b/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt new file mode 100644 index 0000000..7658689 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt @@ -0,0 +1,63 @@ +package org.javabom.bomscheduler.job + +import org.javabom.bomscheduler.config.ScheduleConfig +import org.springframework.context.SmartLifecycle +import java.time.LocalDateTime + +class JobManager( + private val jobCoordinator: JobCoordinator, + private val jobCollection: JobCollection +): SmartLifecycle { + + private var running = false + private var keepGoing = false + + override fun start() { + running = true + keepGoing = true + + Thread { + updateJob() + }.start() + } + + override fun stop() { + this.keepGoing = false + } + + override fun isRunning(): Boolean { + return running + } + + private fun updateJob() { + while (keepGoing) { + val (currentJobs, anotherJobs) = jobCoordinator.getDefinedJob() + .partition { it.instanceName == ScheduleConfig.HOST_NAME } + + val now = LocalDateTime.now() + + val exchangedJobs = anotherJobs.filter { it.isExpired() } + .map { + val ttl = it.ttl + + it.copy( + instanceName = ScheduleConfig.HOST_NAME, + instanceExpiredTime = now.plusNanos(ttl) + ) + } + + + if (exchangedJobs.isNotEmpty()) { + jobCoordinator.updateJobs(exchangedJobs) + } + + jobCollection.replaceJobs(currentJobs + exchangedJobs) + } + + running = false + } + + fun updateJobExecutionTime(job: Job) { + jobCoordinator.updateJobs(listOf(job)) + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/JobMetadata.kt b/src/main/kotlin/org/javabom/bomscheduler/job/JobMetadata.kt new file mode 100644 index 0000000..d301ad1 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/job/JobMetadata.kt @@ -0,0 +1,9 @@ +package org.javabom.bomscheduler.job + +import java.time.LocalDateTime + +data class JobMetadata( + val jobName: String, + val lastExecutionTime: LocalDateTime, + val ttl: Long +) diff --git a/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt b/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt new file mode 100644 index 0000000..e1eb7e9 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt @@ -0,0 +1,23 @@ +package org.javabom.bomscheduler.schedule + +import org.aopalliance.intercept.MethodInterceptor +import org.aopalliance.intercept.MethodInvocation +import org.javabom.bomscheduler.task.TaskManager +import org.javabom.bomscheduler.task.TaskRunner + +class BomScheduleInterceptor( + private val taskRunner: TaskRunner +) : MethodInterceptor { + override fun invoke(invocation: MethodInvocation): Any? { + val jobName = invocation.method.getAnnotation(BomSchedule::class.java).jobName + + val task = taskRunner.findTaskByJobName(jobName) + + if (task != null) { + task.consuming() + return invocation.proceed() + } + + return null + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/javabom/bomscheduler/task/Task.kt b/src/main/kotlin/org/javabom/bomscheduler/task/Task.kt new file mode 100644 index 0000000..ab335a0 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/task/Task.kt @@ -0,0 +1,11 @@ +package org.javabom.bomscheduler.task + +data class Task ( + val jobName: String, + val callback: () -> Unit +) { + + fun consuming() { + callback.invoke() + } +} diff --git a/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt b/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt new file mode 100644 index 0000000..54b3cf4 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt @@ -0,0 +1,62 @@ +package org.javabom.bomscheduler.task + +import org.javabom.bomscheduler.config.ScheduleConfig +import org.javabom.bomscheduler.job.JobCollection +import org.javabom.bomscheduler.job.JobManager +import org.springframework.context.SmartLifecycle +import java.time.LocalDateTime + +class TaskManager( + private val jobManager: JobManager, + private val jobCollection: JobCollection, + private val taskRunner: TaskRunner +): SmartLifecycle { + private var running = false + private var keepGoing = false + + + override fun start() { + running = true + keepGoing = true + + Thread { + createExecutableTasks() + }.start() + } + + override fun stop() { + keepGoing = false + } + + override fun isRunning(): Boolean { + return running + } + + private fun createExecutableTasks() { + val executableTasks = jobCollection.getDefinedJobs() + .filter { it.instanceName == ScheduleConfig.HOST_NAME } + .map { + Task( + jobName = it.name, + callback = { + val executionTime = LocalDateTime.now() + val ttl = it.ttl + + taskRunner.consumeTask(it.name) + jobManager.updateJobExecutionTime( + it.copy( + lastExecutionTime = executionTime, + instanceExpiredTime = executionTime.plusNanos(ttl) + ) + ) + } + ) + } + + while (keepGoing) { + taskRunner.putTasks(executableTasks) + } + + running = false + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt b/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt new file mode 100644 index 0000000..d8a1f62 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt @@ -0,0 +1,19 @@ +package org.javabom.bomscheduler.task + +import org.javabom.bomscheduler.job.Job +import org.javabom.bomscheduler.job.JobMetadata +import org.springframework.stereotype.Component + +class TaskRunner { + private val executableTasks: MutableMap = mutableMapOf() + + fun putTasks(tasks: List) { + tasks.forEach { + executableTasks.putIfAbsent(it.jobName, it) + } + } + + fun findTaskByJobName(jobName: String) = executableTasks[jobName] + + fun consumeTask(jobName: String) = executableTasks.remove(jobName) +} \ No newline at end of file From a9b24bf9fc5be14d06dd1179cfc27fa05bcdda09 Mon Sep 17 00:00:00 2001 From: Gobukgol Date: Sat, 10 Sep 2022 16:16:06 +0900 Subject: [PATCH 2/3] =?UTF-8?q?JobMangerConfig=20BomConfig=20=EB=B3=91?= =?UTF-8?q?=ED=95=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../javabom/bomscheduler/config/BomConfig.kt | 34 ++++++++++++++ .../bomscheduler/config/JobManagerConfig.kt | 44 ------------------- 2 files changed, 34 insertions(+), 44 deletions(-) delete mode 100644 src/main/kotlin/org/javabom/bomscheduler/config/JobManagerConfig.kt diff --git a/src/main/kotlin/org/javabom/bomscheduler/config/BomConfig.kt b/src/main/kotlin/org/javabom/bomscheduler/config/BomConfig.kt index ebedb52..1c54dac 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/config/BomConfig.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/config/BomConfig.kt @@ -1,7 +1,16 @@ package org.javabom.bomscheduler.config +import org.javabom.bomscheduler.job.JobCollection import org.javabom.bomscheduler.job.MemoryJobCoordinator import org.javabom.bomscheduler.job.JobCoordinator +import org.javabom.bomscheduler.job.JobManager +import org.javabom.bomscheduler.schedule.BomSchedule +import org.javabom.bomscheduler.schedule.BomScheduleInterceptor +import org.javabom.bomscheduler.task.TaskManager +import org.javabom.bomscheduler.task.TaskRunner +import org.springframework.aop.Advisor +import org.springframework.aop.support.DefaultPointcutAdvisor +import org.springframework.aop.support.annotation.AnnotationMatchingPointcut import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.ConstructorBinding import org.springframework.context.annotation.Bean @@ -12,4 +21,29 @@ import org.springframework.context.annotation.Configuration class BomConfig{ @Bean fun jobCoordinator(): JobCoordinator = MemoryJobCoordinator() + + @Bean + fun jobManager(jobCoordinator: JobCoordinator, jobCollection: JobCollection): JobManager { + return JobManager(jobCoordinator, jobCollection) + } + + @Bean + fun taskRunner(): TaskRunner { + return TaskRunner() + } + + @Bean + fun TaskManager( + jobManager: JobManager, + jobCollection: JobCollection + ): TaskManager { + return TaskManager(jobManager, jobCollection, taskRunner()) + } + + @Bean + fun bomScheduleInterceptor(): Advisor { + val interceptor = BomScheduleInterceptor(taskRunner()) + val pointcut = AnnotationMatchingPointcut(BomSchedule::class.java) + return DefaultPointcutAdvisor(pointcut, interceptor) + } } diff --git a/src/main/kotlin/org/javabom/bomscheduler/config/JobManagerConfig.kt b/src/main/kotlin/org/javabom/bomscheduler/config/JobManagerConfig.kt deleted file mode 100644 index fbe3e82..0000000 --- a/src/main/kotlin/org/javabom/bomscheduler/config/JobManagerConfig.kt +++ /dev/null @@ -1,44 +0,0 @@ -package org.javabom.bomscheduler.config - -import org.javabom.bomscheduler.job.JobCollection -import org.javabom.bomscheduler.job.JobCoordinator -import org.javabom.bomscheduler.job.JobManager -import org.javabom.bomscheduler.schedule.BomSchedule -import org.javabom.bomscheduler.schedule.BomScheduleInterceptor -import org.javabom.bomscheduler.task.TaskManager -import org.javabom.bomscheduler.task.TaskRunner -import org.springframework.aop.Advisor -import org.springframework.aop.support.DefaultPointcutAdvisor -import org.springframework.aop.support.annotation.AnnotationMatchingPointcut -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.context.annotation.Import - -@Configuration -class JobManagerConfig { - - @Bean - fun jobManager(jobCoordinator: JobCoordinator, jobCollection: JobCollection): JobManager { - return JobManager(jobCoordinator, jobCollection) - } - - @Bean - fun taskRunner(): TaskRunner { - return TaskRunner() - } - - @Bean - fun TaskManager( - jobManager: JobManager, - jobCollection: JobCollection - ): TaskManager { - return TaskManager(jobManager, jobCollection, taskRunner()) - } - - @Bean - fun bomScheduleInterceptor(): Advisor { - val interceptor = BomScheduleInterceptor(taskRunner()) - val pointcut = AnnotationMatchingPointcut(BomSchedule::class.java) - return DefaultPointcutAdvisor(pointcut, interceptor) - } -} \ No newline at end of file From e74a3c4f94c2824c18ca018bbe5ae82908d39fd5 Mon Sep 17 00:00:00 2001 From: Gobukgol Date: Mon, 26 Sep 2022 19:29:15 +0900 Subject: [PATCH 3/3] =?UTF-8?q?=EA=B5=AC=ED=98=84=20=ED=81=B4=EB=9E=98?= =?UTF-8?q?=EC=8A=A4=EB=93=A4=EC=97=90=20=EC=A3=BC=EC=84=9D=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/javabom/bomscheduler/job/JobCollection.kt | 3 +++ .../kotlin/org/javabom/bomscheduler/job/JobManager.kt | 10 ++++++++++ .../bomscheduler/schedule/BomScheduleInterceptor.kt | 4 ++++ .../org/javabom/bomscheduler/task/TaskManager.kt | 8 ++++++++ .../kotlin/org/javabom/bomscheduler/task/TaskRunner.kt | 4 ++++ 5 files changed, 29 insertions(+) diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt b/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt index d47fd56..e4cf2ed 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt @@ -1,5 +1,8 @@ package org.javabom.bomscheduler.job +/** + * 정의된 job 에 대한 참조를 가지고 있는 클래스 + */ class JobCollection( private var definedJobs: List = emptyList() ) { diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt b/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt index 7658689..ec1726e 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt @@ -4,6 +4,11 @@ import org.javabom.bomscheduler.config.ScheduleConfig import org.springframework.context.SmartLifecycle import java.time.LocalDateTime +/** + * JobCoordinator 와의 통신을 담당. + * JobCoordinator 로 부터 주기적으로 다른 인스턴스 (HOST) 로 부터 빼앗아 올 수 있는 Job이 있는지 확인하여 + * JobCollection에 등록한다 + */ class JobManager( private val jobCoordinator: JobCoordinator, private val jobCollection: JobCollection @@ -29,6 +34,11 @@ class JobManager( return running } + /** + * 주기적으로 JobCoordinator 에 빼앗아 올 수 있는 Job이 있는지 확인한다. + * 빼앗아올 Job 이 있다면 Job의 정보를 변경하고 JobCoordinator 에 update 요청을 한다. + * 그 후 JobColllection에 실행 가능한 Job 교체를 요청한다. + */ private fun updateJob() { while (keepGoing) { val (currentJobs, anotherJobs) = jobCoordinator.getDefinedJob() diff --git a/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt b/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt index e1eb7e9..7477dad 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt @@ -5,6 +5,10 @@ import org.aopalliance.intercept.MethodInvocation import org.javabom.bomscheduler.task.TaskManager import org.javabom.bomscheduler.task.TaskRunner +/** + * @BomSchedule 이 등록되어있는 Schedule 들이 실행 되기전 TaskRunner에 소비 가능한 Task가 있는지 (jobName 기준) 확인 후 + * 있을 경우에만 task 를 소비하고 정의된 행동을 수행한다. + */ class BomScheduleInterceptor( private val taskRunner: TaskRunner ) : MethodInterceptor { diff --git a/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt b/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt index 54b3cf4..c9a26c5 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt @@ -6,6 +6,10 @@ import org.javabom.bomscheduler.job.JobManager import org.springframework.context.SmartLifecycle import java.time.LocalDateTime +/** + * 주기적으로 JobCollection 에서 실행가능한 Job을 기반으로 Task를 생성해 TaskRunner 에 전달하는 클래스 + * JobManager를 의존하고있는 이유는 Task가 소비될 때 callback으로 Jobmanager에게 해당 job의 실행 시간 update 요청을 하기 위함 + */ class TaskManager( private val jobManager: JobManager, private val jobCollection: JobCollection, @@ -32,6 +36,10 @@ class TaskManager( return running } + /** + * Task를 생성하는 메소드 + * 생성 후 TaskRunner에 전달한다 + */ private fun createExecutableTasks() { val executableTasks = jobCollection.getDefinedJobs() .filter { it.instanceName == ScheduleConfig.HOST_NAME } diff --git a/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt b/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt index d8a1f62..c33608a 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt @@ -4,6 +4,10 @@ import org.javabom.bomscheduler.job.Job import org.javabom.bomscheduler.job.JobMetadata import org.springframework.stereotype.Component +/** + * TaskRunner 현재 실행 가능한 Task 를 가지고 있는 클래스 + * Task가 소비될 시 해당 Task를 제거한다 + */ class TaskRunner { private val executableTasks: MutableMap = mutableMapOf()