Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/main/kotlin/org/javabom/bomscheduler/config/BomConfig.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package org.javabom.bomscheduler.config

import org.javabom.bomscheduler.job.JobCollection
import org.javabom.bomscheduler.job.MemoryJobCoordinator
import org.javabom.bomscheduler.job.JobCoordinator
import org.javabom.bomscheduler.job.JobManager
import org.javabom.bomscheduler.schedule.BomSchedule
import org.javabom.bomscheduler.schedule.BomScheduleInterceptor
import org.javabom.bomscheduler.task.TaskManager
import org.javabom.bomscheduler.task.TaskRunner
import org.springframework.aop.Advisor
import org.springframework.aop.support.DefaultPointcutAdvisor
import org.springframework.aop.support.annotation.AnnotationMatchingPointcut
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding
import org.springframework.context.annotation.Bean
Expand All @@ -12,4 +21,29 @@ import org.springframework.context.annotation.Configuration
class BomConfig{
@Bean
fun jobCoordinator(): JobCoordinator = MemoryJobCoordinator()

@Bean
fun jobManager(jobCoordinator: JobCoordinator, jobCollection: JobCollection): JobManager {
return JobManager(jobCoordinator, jobCollection)
}

@Bean
fun taskRunner(): TaskRunner {
return TaskRunner()
}

@Bean
fun TaskManager(
jobManager: JobManager,
jobCollection: JobCollection
): TaskManager {
return TaskManager(jobManager, jobCollection, taskRunner())
}

@Bean
fun bomScheduleInterceptor(): Advisor {
val interceptor = BomScheduleInterceptor(taskRunner())
val pointcut = AnnotationMatchingPointcut(BomSchedule::class.java)
return DefaultPointcutAdvisor(pointcut, interceptor)
}
}
8 changes: 7 additions & 1 deletion src/main/kotlin/org/javabom/bomscheduler/job/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,10 @@ data class Job(
val version: Int = 0,
val lastExecutionTime: LocalDateTime = LocalDateTime.now(),
val instanceExpiredTime: LocalDateTime? = null
)
) {

fun isExpired(): Boolean {
return instanceExpiredTime
?.isBefore(LocalDateTime.now())?: false
}
}
13 changes: 12 additions & 1 deletion src/main/kotlin/org/javabom/bomscheduler/job/JobCollection.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
package org.javabom.bomscheduler.job

class JobCollection(definedJobs : List<Job> = emptyList()) {
/**
* 정의된 job 에 대한 참조를 가지고 있는 클래스
*/
class JobCollection(
private var definedJobs: List<Job> = emptyList()
) {

init {
definedJobs.forEach{ println("Register Job ${it.name}")}
}

fun getDefinedJobs(): List<Job> = definedJobs

fun replaceJobs(jobs: List<Job>) {
definedJobs = jobs
}
}
73 changes: 73 additions & 0 deletions src/main/kotlin/org/javabom/bomscheduler/job/JobManager.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.javabom.bomscheduler.job

import org.javabom.bomscheduler.config.ScheduleConfig
import org.springframework.context.SmartLifecycle
import java.time.LocalDateTime

/**
* JobCoordinator 와의 통신을 담당.
* JobCoordinator 로 부터 주기적으로 다른 인스턴스 (HOST) 로 부터 빼앗아 올 수 있는 Job이 있는지 확인하여
* JobCollection에 등록한다
*/
class JobManager(
private val jobCoordinator: JobCoordinator,
private val jobCollection: JobCollection
): SmartLifecycle {

private var running = false
private var keepGoing = false

override fun start() {
running = true
keepGoing = true

Thread {
updateJob()
}.start()
}

override fun stop() {
this.keepGoing = false
}

override fun isRunning(): Boolean {
return running
}

/**
* 주기적으로 JobCoordinator 에 빼앗아 올 수 있는 Job이 있는지 확인한다.
* 빼앗아올 Job 이 있다면 Job의 정보를 변경하고 JobCoordinator 에 update 요청을 한다.
* 그 후 JobColllection에 실행 가능한 Job 교체를 요청한다.
*/
private fun updateJob() {
while (keepGoing) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이조건문 탈출하려면 어떻게 해야하는거야?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

요거 스프링이 종료될 때 SmartLifeCycle 구현한 클래스의 stop 메소드 호출하면서 while 문 탈출 할 수 있도록 돌아가는걸로 알고있어!

val (currentJobs, anotherJobs) = jobCoordinator.getDefinedJob()
.partition { it.instanceName == ScheduleConfig.HOST_NAME }

val now = LocalDateTime.now()

val exchangedJobs = anotherJobs.filter { it.isExpired() }
.map {
val ttl = it.ttl

it.copy(
instanceName = ScheduleConfig.HOST_NAME,
instanceExpiredTime = now.plusNanos(ttl)
)
}


if (exchangedJobs.isNotEmpty()) {
jobCoordinator.updateJobs(exchangedJobs)
}

jobCollection.replaceJobs(currentJobs + exchangedJobs)
}

running = false
}

fun updateJobExecutionTime(job: Job) {
jobCoordinator.updateJobs(listOf(job))
}
}
9 changes: 9 additions & 0 deletions src/main/kotlin/org/javabom/bomscheduler/job/JobMetadata.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.javabom.bomscheduler.job

import java.time.LocalDateTime

data class JobMetadata(
val jobName: String,
val lastExecutionTime: LocalDateTime,
val ttl: Long
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.javabom.bomscheduler.schedule

import org.aopalliance.intercept.MethodInterceptor
import org.aopalliance.intercept.MethodInvocation
import org.javabom.bomscheduler.task.TaskManager
import org.javabom.bomscheduler.task.TaskRunner

/**
* @BomSchedule 이 등록되어있는 Schedule 들이 실행 되기전 TaskRunner에 소비 가능한 Task가 있는지 (jobName 기준) 확인 후
* 있을 경우에만 task 를 소비하고 정의된 행동을 수행한다.
*/
class BomScheduleInterceptor(
private val taskRunner: TaskRunner
) : MethodInterceptor {
override fun invoke(invocation: MethodInvocation): Any? {
val jobName = invocation.method.getAnnotation(BomSchedule::class.java).jobName

val task = taskRunner.findTaskByJobName(jobName)

if (task != null) {
task.consuming()
return invocation.proceed()
}

return null
}
}
11 changes: 11 additions & 0 deletions src/main/kotlin/org/javabom/bomscheduler/task/Task.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.javabom.bomscheduler.task

data class Task (
val jobName: String,
val callback: () -> Unit
) {

fun consuming() {
callback.invoke()
}
}
70 changes: 70 additions & 0 deletions src/main/kotlin/org/javabom/bomscheduler/task/TaskManager.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.javabom.bomscheduler.task

import org.javabom.bomscheduler.config.ScheduleConfig
import org.javabom.bomscheduler.job.JobCollection
import org.javabom.bomscheduler.job.JobManager
import org.springframework.context.SmartLifecycle
import java.time.LocalDateTime

/**
* 주기적으로 JobCollection 에서 실행가능한 Job을 기반으로 Task를 생성해 TaskRunner 에 전달하는 클래스
* JobManager를 의존하고있는 이유는 Task가 소비될 때 callback으로 Jobmanager에게 해당 job의 실행 시간 update 요청을 하기 위함
*/
class TaskManager(
private val jobManager: JobManager,
private val jobCollection: JobCollection,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

요거 잡 매니저 통해서 사용하면 이상한가?????

private val taskRunner: TaskRunner
): SmartLifecycle {
private var running = false
private var keepGoing = false


override fun start() {
running = true
keepGoing = true

Thread {
createExecutableTasks()
}.start()
}

override fun stop() {
keepGoing = false
}

override fun isRunning(): Boolean {
return running
}

/**
* Task를 생성하는 메소드
* 생성 후 TaskRunner에 전달한다
*/
private fun createExecutableTasks() {
val executableTasks = jobCollection.getDefinedJobs()
.filter { it.instanceName == ScheduleConfig.HOST_NAME }
.map {
Task(
jobName = it.name,
callback = {
val executionTime = LocalDateTime.now()
val ttl = it.ttl

taskRunner.consumeTask(it.name)
jobManager.updateJobExecutionTime(
it.copy(
lastExecutionTime = executionTime,
instanceExpiredTime = executionTime.plusNanos(ttl)
)
)
}
)
}

while (keepGoing) {
taskRunner.putTasks(executableTasks)
}

running = false
}
}
23 changes: 23 additions & 0 deletions src/main/kotlin/org/javabom/bomscheduler/task/TaskRunner.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.javabom.bomscheduler.task

import org.javabom.bomscheduler.job.Job
import org.javabom.bomscheduler.job.JobMetadata
import org.springframework.stereotype.Component

/**
* TaskRunner 현재 실행 가능한 Task 를 가지고 있는 클래스
* Task가 소비될 시 해당 Task를 제거한다
*/
class TaskRunner {
private val executableTasks: MutableMap<String, Task> = mutableMapOf()

fun putTasks(tasks: List<Task>) {
tasks.forEach {
executableTasks.putIfAbsent(it.jobName, it)
}
}

fun findTaskByJobName(jobName: String) = executableTasks[jobName]

fun consumeTask(jobName: String) = executableTasks.remove(jobName)
}