diff --git a/src/main/kotlin/org/javabom/bomscheduler/config/BomConfig.kt b/src/main/kotlin/org/javabom/bomscheduler/config/BomConfig.kt index ebedb52..1c54dac 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/config/BomConfig.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/config/BomConfig.kt @@ -1,7 +1,16 @@ package org.javabom.bomscheduler.config +import org.javabom.bomscheduler.job.JobCollection import org.javabom.bomscheduler.job.MemoryJobCoordinator import org.javabom.bomscheduler.job.JobCoordinator +import org.javabom.bomscheduler.job.JobManager +import org.javabom.bomscheduler.schedule.BomSchedule +import org.javabom.bomscheduler.schedule.BomScheduleInterceptor +import org.javabom.bomscheduler.task.TaskManager +import org.javabom.bomscheduler.task.TaskRunner +import org.springframework.aop.Advisor +import org.springframework.aop.support.DefaultPointcutAdvisor +import org.springframework.aop.support.annotation.AnnotationMatchingPointcut import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.ConstructorBinding import org.springframework.context.annotation.Bean @@ -12,4 +21,29 @@ import org.springframework.context.annotation.Configuration class BomConfig{ @Bean fun jobCoordinator(): JobCoordinator = MemoryJobCoordinator() + + @Bean + fun jobManager(jobCoordinator: JobCoordinator, jobCollection: JobCollection): JobManager { + return JobManager(jobCoordinator, jobCollection) + } + + @Bean + fun taskRunner(): TaskRunner { + return TaskRunner() + } + + @Bean + fun TaskManager( + jobManager: JobManager, + jobCollection: JobCollection + ): TaskManager { + return TaskManager(jobManager, jobCollection, taskRunner()) + } + + @Bean + fun bomScheduleInterceptor(): Advisor { + val interceptor = BomScheduleInterceptor(taskRunner()) + val pointcut = AnnotationMatchingPointcut(BomSchedule::class.java) + return DefaultPointcutAdvisor(pointcut, interceptor) + } } diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/Job.kt b/src/main/kotlin/org/javabom/bomscheduler/job/Job.kt index 6db4b2d..a372d58 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/job/Job.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/job/Job.kt @@ -9,4 +9,10 @@ data class Job( val version: Int = 0, val lastExecutionTime: LocalDateTime = LocalDateTime.now(), val instanceExpiredTime: LocalDateTime? = null -) +) { + + fun isExpired(): Boolean { + return instanceExpiredTime + ?.isBefore(LocalDateTime.now())?: false + } +} diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt b/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt index ca6e42c..e4cf2ed 100644 --- a/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt +++ b/src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt @@ -1,8 +1,19 @@ package org.javabom.bomscheduler.job -class JobCollection(definedJobs : List = emptyList()) { +/** + * 정의된 job 에 대한 참조를 가지고 있는 클래스 + */ +class JobCollection( + private var definedJobs: List = emptyList() +) { init { definedJobs.forEach{ println("Register Job ${it.name}")} } + + fun getDefinedJobs(): List = definedJobs + + fun replaceJobs(jobs: List) { + definedJobs = jobs + } } diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt b/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt new file mode 100644 index 0000000..ec1726e --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt @@ -0,0 +1,73 @@ +package org.javabom.bomscheduler.job + +import org.javabom.bomscheduler.config.ScheduleConfig +import org.springframework.context.SmartLifecycle +import java.time.LocalDateTime + +/** + * JobCoordinator 와의 통신을 담당. + * JobCoordinator 로 부터 주기적으로 다른 인스턴스 (HOST) 로 부터 빼앗아 올 수 있는 Job이 있는지 확인하여 + * JobCollection에 등록한다 + */ +class JobManager( + private val jobCoordinator: JobCoordinator, + private val jobCollection: JobCollection +): SmartLifecycle { + + private var running = false + private var keepGoing = false + + override fun start() { + running = true + keepGoing = true + + Thread { + updateJob() + }.start() + } + + override fun stop() { + this.keepGoing = false + } + + override fun isRunning(): Boolean { + return running + } + + /** + * 주기적으로 JobCoordinator 에 빼앗아 올 수 있는 Job이 있는지 확인한다. + * 빼앗아올 Job 이 있다면 Job의 정보를 변경하고 JobCoordinator 에 update 요청을 한다. + * 그 후 JobColllection에 실행 가능한 Job 교체를 요청한다. + */ + private fun updateJob() { + while (keepGoing) { + val (currentJobs, anotherJobs) = jobCoordinator.getDefinedJob() + .partition { it.instanceName == ScheduleConfig.HOST_NAME } + + val now = LocalDateTime.now() + + val exchangedJobs = anotherJobs.filter { it.isExpired() } + .map { + val ttl = it.ttl + + it.copy( + instanceName = ScheduleConfig.HOST_NAME, + instanceExpiredTime = now.plusNanos(ttl) + ) + } + + + if (exchangedJobs.isNotEmpty()) { + jobCoordinator.updateJobs(exchangedJobs) + } + + jobCollection.replaceJobs(currentJobs + exchangedJobs) + } + + running = false + } + + fun updateJobExecutionTime(job: Job) { + jobCoordinator.updateJobs(listOf(job)) + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/javabom/bomscheduler/job/JobMetadata.kt b/src/main/kotlin/org/javabom/bomscheduler/job/JobMetadata.kt new file mode 100644 index 0000000..d301ad1 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/job/JobMetadata.kt @@ -0,0 +1,9 @@ +package org.javabom.bomscheduler.job + +import java.time.LocalDateTime + +data class JobMetadata( + val jobName: String, + val lastExecutionTime: LocalDateTime, + val ttl: Long +) diff --git a/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt b/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt new file mode 100644 index 0000000..7477dad --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/schedule/BomScheduleInterceptor.kt @@ -0,0 +1,27 @@ +package org.javabom.bomscheduler.schedule + +import org.aopalliance.intercept.MethodInterceptor +import org.aopalliance.intercept.MethodInvocation +import org.javabom.bomscheduler.task.TaskManager +import org.javabom.bomscheduler.task.TaskRunner + +/** + * @BomSchedule 이 등록되어있는 Schedule 들이 실행 되기전 TaskRunner에 소비 가능한 Task가 있는지 (jobName 기준) 확인 후 + * 있을 경우에만 task 를 소비하고 정의된 행동을 수행한다. + */ +class BomScheduleInterceptor( + private val taskRunner: TaskRunner +) : MethodInterceptor { + override fun invoke(invocation: MethodInvocation): Any? { + val jobName = invocation.method.getAnnotation(BomSchedule::class.java).jobName + + val task = taskRunner.findTaskByJobName(jobName) + + if (task != null) { + task.consuming() + return invocation.proceed() + } + + return null + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/javabom/bomscheduler/task/Task.kt b/src/main/kotlin/org/javabom/bomscheduler/task/Task.kt new file mode 100644 index 0000000..ab335a0 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/task/Task.kt @@ -0,0 +1,11 @@ +package org.javabom.bomscheduler.task + +data class Task ( + val jobName: String, + val callback: () -> Unit +) { + + fun consuming() { + callback.invoke() + } +} diff --git a/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt b/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt new file mode 100644 index 0000000..c9a26c5 --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt @@ -0,0 +1,70 @@ +package org.javabom.bomscheduler.task + +import org.javabom.bomscheduler.config.ScheduleConfig +import org.javabom.bomscheduler.job.JobCollection +import org.javabom.bomscheduler.job.JobManager +import org.springframework.context.SmartLifecycle +import java.time.LocalDateTime + +/** + * 주기적으로 JobCollection 에서 실행가능한 Job을 기반으로 Task를 생성해 TaskRunner 에 전달하는 클래스 + * JobManager를 의존하고있는 이유는 Task가 소비될 때 callback으로 Jobmanager에게 해당 job의 실행 시간 update 요청을 하기 위함 + */ +class TaskManager( + private val jobManager: JobManager, + private val jobCollection: JobCollection, + private val taskRunner: TaskRunner +): SmartLifecycle { + private var running = false + private var keepGoing = false + + + override fun start() { + running = true + keepGoing = true + + Thread { + createExecutableTasks() + }.start() + } + + override fun stop() { + keepGoing = false + } + + override fun isRunning(): Boolean { + return running + } + + /** + * Task를 생성하는 메소드 + * 생성 후 TaskRunner에 전달한다 + */ + private fun createExecutableTasks() { + val executableTasks = jobCollection.getDefinedJobs() + .filter { it.instanceName == ScheduleConfig.HOST_NAME } + .map { + Task( + jobName = it.name, + callback = { + val executionTime = LocalDateTime.now() + val ttl = it.ttl + + taskRunner.consumeTask(it.name) + jobManager.updateJobExecutionTime( + it.copy( + lastExecutionTime = executionTime, + instanceExpiredTime = executionTime.plusNanos(ttl) + ) + ) + } + ) + } + + while (keepGoing) { + taskRunner.putTasks(executableTasks) + } + + running = false + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt b/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt new file mode 100644 index 0000000..c33608a --- /dev/null +++ b/src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt @@ -0,0 +1,23 @@ +package org.javabom.bomscheduler.task + +import org.javabom.bomscheduler.job.Job +import org.javabom.bomscheduler.job.JobMetadata +import org.springframework.stereotype.Component + +/** + * TaskRunner 현재 실행 가능한 Task 를 가지고 있는 클래스 + * Task가 소비될 시 해당 Task를 제거한다 + */ +class TaskRunner { + private val executableTasks: MutableMap = mutableMapOf() + + fun putTasks(tasks: List) { + tasks.forEach { + executableTasks.putIfAbsent(it.jobName, it) + } + } + + fun findTaskByJobName(jobName: String) = executableTasks[jobName] + + fun consumeTask(jobName: String) = executableTasks.remove(jobName) +} \ No newline at end of file