Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.kakao.actionbase.v2.engine.v3

import com.kakao.actionbase.core.edge.mapper.EdgeRecordMapper
import com.kakao.actionbase.core.edge.mutation.EdgeMutationBuilder
import com.kakao.actionbase.core.edge.mutation.EdgeMutationRecords
import com.kakao.actionbase.core.edge.payload.EdgeMutationStatus
import com.kakao.actionbase.core.edge.payload.MultiEdgeMutationStatus
import com.kakao.actionbase.core.edge.record.EdgeGroupRecord
Expand Down Expand Up @@ -53,18 +54,7 @@ class V3CompatibleTableBinding(
.flatMap {
findHashEdge(encodedHashEdgeKey)
}.map {
// extract the state from the encoded value
val stateValue = mapper.state.decoder.decodeValue(it)
State(
stateValue.active,
stateValue.version,
stateValue.createdAt,
stateValue.deletedAt,
stateValue.properties
.mapNotNull { (key, value) ->
codeToFieldNameMap[key]?.let { name -> name to value }
}.toMap(),
)
decodeCurrentState(it, mapper, codeToFieldNameMap)
}.switchIfEmpty(Mono.defer { Mono.just(State.initial) })
.map { before ->
val after =
Expand All @@ -76,42 +66,7 @@ class V3CompatibleTableBinding(
val beforeRecord = EdgeStateRecord.of(source, target, before, entity.id)
val afterRecord = EdgeStateRecord.of(source, target, after, entity.id)
val mutationRecords = EdgeMutationBuilder.build(beforeRecord, afterRecord, schema.direction, schema.indexes, schema.groups)
val mutations = mutableListOf<Mutation>()
val record = mapper.state.encoder.encode(mutationRecords.stateRecord)
mutations +=
Put(record.key)
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
mutations +=
mutationRecords.createIndexRecords.map {
val record = mapper.index.encoder.encode(it)
Put(record.key)
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
}
mutations +=
mutationRecords.countRecords.map {
val key = mapper.count.encoder.encodeKey(it.key)
Increment(key)
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, it.value)
}
mutations +=
mutationRecords.deleteIndexRecordKeys.map {
val key = mapper.index.encoder.encodeKey(it)
Delete(key)
}
mutations +=
mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) ->
val (key, ttl) = groupKey
val encodedKey = mapper.group.encoder.encodeKey(key)
val increment = Increment(encodedKey)
records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) ->
val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier)
increment.addColumn(Constants.DEFAULT_COLUMN_FAMILY, qualifier, mergedValue)
}
if (ttl != null && ttl != Long.MAX_VALUE && ttl > 0) {
increment.ttl = ttl
}
increment
}
val mutations = buildHBaseMutations(mutationRecords, mapper)
handleDeferredRequests(mutations)
.thenReturn(
EdgeMutationStatus(source, target, events.size, mutationRecords.status, before, after, mutationRecords.acc),
Expand Down Expand Up @@ -147,18 +102,7 @@ class V3CompatibleTableBinding(
.flatMap {
findHashEdge(encodedHashEdgeKey)
}.map {
// extract the state from the encoded value
val stateValue = mapper.state.decoder.decodeValue(it)
State(
stateValue.active,
stateValue.version,
stateValue.createdAt,
stateValue.deletedAt,
stateValue.properties
.mapNotNull { (key, value) ->
codeToFieldNameMap[key]?.let { name -> name to value }
}.toMap(),
)
decodeCurrentState(it, mapper, codeToFieldNameMap)
}.switchIfEmpty(Mono.defer { Mono.just(State.initial) })
.map { before ->
val after =
Expand All @@ -170,42 +114,7 @@ class V3CompatibleTableBinding(
val beforeRecord = EdgeStateRecord.of(key, key, before, entity.id)
val afterRecord = EdgeStateRecord.of(key, key, after, entity.id)
val mutationRecords = EdgeMutationBuilder.buildForMultiEdge(beforeRecord, afterRecord, schema.direction, schema.indexes, schema.groups)
val mutations = mutableListOf<Mutation>()
val record = mapper.state.encoder.encode(mutationRecords.stateRecord)
mutations +=
Put(record.key)
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
mutations +=
mutationRecords.createIndexRecords.map {
val record = mapper.index.encoder.encode(it)
Put(record.key)
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
}
mutations +=
mutationRecords.countRecords.map {
val key = mapper.count.encoder.encodeKey(it.key)
Increment(key)
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, it.value)
}
mutations +=
mutationRecords.deleteIndexRecordKeys.map {
val key = mapper.index.encoder.encodeKey(it)
Delete(key)
}
mutations +=
mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) ->
val (key, ttl) = groupKey
val encodedKey = mapper.group.encoder.encodeKey(key)
val increment = Increment(encodedKey)
records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) ->
val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier)
increment.addColumn(Constants.DEFAULT_COLUMN_FAMILY, qualifier, mergedValue)
}
if (ttl != null && ttl != Long.MAX_VALUE && ttl > 0) {
increment.ttl = ttl
}
increment
}
val mutations = buildHBaseMutations(mutationRecords, mapper)
handleDeferredRequests(mutations)
.thenReturn(
MultiEdgeMutationStatus(key, events.size, mutationRecords.status, before, after, mutationRecords.acc),
Expand All @@ -219,6 +128,67 @@ class V3CompatibleTableBinding(
}
}

private fun decodeCurrentState(
encodedState: ByteArray,
mapper: EdgeRecordMapper,
codeToFieldNameMap: Map<Int, String>,
): State {
val stateValue = mapper.state.decoder.decodeValue(encodedState)
return State(
stateValue.active,
stateValue.version,
stateValue.createdAt,
stateValue.deletedAt,
stateValue.properties
.mapNotNull { (key, value) ->
codeToFieldNameMap[key]?.let { name -> name to value }
}.toMap(),
)
}

private fun buildHBaseMutations(
mutationRecords: EdgeMutationRecords,
mapper: EdgeRecordMapper,
): MutableList<Mutation> {
val mutations = mutableListOf<Mutation>()
val record = mapper.state.encoder.encode(mutationRecords.stateRecord)
mutations +=
Put(record.key)
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value)
mutations +=
mutationRecords.createIndexRecords.map {
val indexRecord = mapper.index.encoder.encode(it)
Put(indexRecord.key)
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, indexRecord.value)
}
mutations +=
mutationRecords.countRecords.map {
val key = mapper.count.encoder.encodeKey(it.key)
Increment(key)
.addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, it.value)
}
mutations +=
mutationRecords.deleteIndexRecordKeys.map {
val key = mapper.index.encoder.encodeKey(it)
Delete(key)
}
mutations +=
mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) ->
val (key, ttl) = groupKey
val encodedKey = mapper.group.encoder.encodeKey(key)
val increment = Increment(encodedKey)
records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) ->
val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier)
increment.addColumn(Constants.DEFAULT_COLUMN_FAMILY, qualifier, mergedValue)
}
if (ttl != null && ttl != Long.MAX_VALUE && ttl > 0) {
increment.ttl = ttl
}
increment
}
return mutations
}

companion object {
private fun State.specialStateValueToNull(): State =
State(
Expand Down
Loading
Loading