From ec58d98425dff7114ed6916fc32b44f132925274 Mon Sep 17 00:00:00 2001 From: Minseok Kim Date: Mon, 9 Feb 2026 17:56:44 +0900 Subject: [PATCH 1/2] chore: initialize issue 195 development plan From 9908c4c3babdce0bc92ec73e6e54ca65aefacc41 Mon Sep 17 00:00:00 2001 From: Minseok Kim Date: Mon, 9 Feb 2026 18:07:20 +0900 Subject: [PATCH 2/2] refactor: deduplicate v3 mutation paths and lock handling --- .../v2/engine/v3/V3CompatibleTableBinding.kt | 162 +++---- .../v2/engine/v3/V3MutationService.kt | 423 +++++++++++------- 2 files changed, 317 insertions(+), 268 deletions(-) diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3CompatibleTableBinding.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3CompatibleTableBinding.kt index 1f323d33..0d33cc0e 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3CompatibleTableBinding.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3CompatibleTableBinding.kt @@ -2,6 +2,7 @@ package com.kakao.actionbase.v2.engine.v3 import com.kakao.actionbase.core.edge.mapper.EdgeRecordMapper import com.kakao.actionbase.core.edge.mutation.EdgeMutationBuilder +import com.kakao.actionbase.core.edge.mutation.EdgeMutationRecords import com.kakao.actionbase.core.edge.payload.EdgeMutationStatus import com.kakao.actionbase.core.edge.payload.MultiEdgeMutationStatus import com.kakao.actionbase.core.edge.record.EdgeGroupRecord @@ -53,18 +54,7 @@ class V3CompatibleTableBinding( .flatMap { findHashEdge(encodedHashEdgeKey) }.map { - // extract the state from the encoded value - val stateValue = mapper.state.decoder.decodeValue(it) - State( - stateValue.active, - stateValue.version, - stateValue.createdAt, - stateValue.deletedAt, - stateValue.properties - .mapNotNull { (key, value) -> - codeToFieldNameMap[key]?.let { name -> name to value } - }.toMap(), - ) + decodeCurrentState(it, mapper, codeToFieldNameMap) }.switchIfEmpty(Mono.defer { Mono.just(State.initial) }) .map { before -> val after = @@ -76,42 +66,7 @@ class V3CompatibleTableBinding( val beforeRecord = EdgeStateRecord.of(source, target, before, entity.id) val afterRecord = EdgeStateRecord.of(source, target, after, entity.id) val mutationRecords = EdgeMutationBuilder.build(beforeRecord, afterRecord, schema.direction, schema.indexes, schema.groups) - val mutations = mutableListOf() - val record = mapper.state.encoder.encode(mutationRecords.stateRecord) - mutations += - Put(record.key) - .addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value) - mutations += - mutationRecords.createIndexRecords.map { - val record = mapper.index.encoder.encode(it) - Put(record.key) - .addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value) - } - mutations += - mutationRecords.countRecords.map { - val key = mapper.count.encoder.encodeKey(it.key) - Increment(key) - .addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, it.value) - } - mutations += - mutationRecords.deleteIndexRecordKeys.map { - val key = mapper.index.encoder.encodeKey(it) - Delete(key) - } - mutations += - mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) -> - val (key, ttl) = groupKey - val encodedKey = mapper.group.encoder.encodeKey(key) - val increment = Increment(encodedKey) - records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) -> - val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier) - increment.addColumn(Constants.DEFAULT_COLUMN_FAMILY, qualifier, mergedValue) - } - if (ttl != null && ttl != Long.MAX_VALUE && ttl > 0) { - increment.ttl = ttl - } - increment - } + val mutations = buildHBaseMutations(mutationRecords, mapper) handleDeferredRequests(mutations) .thenReturn( EdgeMutationStatus(source, target, events.size, mutationRecords.status, before, after, mutationRecords.acc), @@ -147,18 +102,7 @@ class V3CompatibleTableBinding( .flatMap { findHashEdge(encodedHashEdgeKey) }.map { - // extract the state from the encoded value - val stateValue = mapper.state.decoder.decodeValue(it) - State( - stateValue.active, - stateValue.version, - stateValue.createdAt, - stateValue.deletedAt, - stateValue.properties - .mapNotNull { (key, value) -> - codeToFieldNameMap[key]?.let { name -> name to value } - }.toMap(), - ) + decodeCurrentState(it, mapper, codeToFieldNameMap) }.switchIfEmpty(Mono.defer { Mono.just(State.initial) }) .map { before -> val after = @@ -170,42 +114,7 @@ class V3CompatibleTableBinding( val beforeRecord = EdgeStateRecord.of(key, key, before, entity.id) val afterRecord = EdgeStateRecord.of(key, key, after, entity.id) val mutationRecords = EdgeMutationBuilder.buildForMultiEdge(beforeRecord, afterRecord, schema.direction, schema.indexes, schema.groups) - val mutations = mutableListOf() - val record = mapper.state.encoder.encode(mutationRecords.stateRecord) - mutations += - Put(record.key) - .addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value) - mutations += - mutationRecords.createIndexRecords.map { - val record = mapper.index.encoder.encode(it) - Put(record.key) - .addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value) - } - mutations += - mutationRecords.countRecords.map { - val key = mapper.count.encoder.encodeKey(it.key) - Increment(key) - .addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, it.value) - } - mutations += - mutationRecords.deleteIndexRecordKeys.map { - val key = mapper.index.encoder.encodeKey(it) - Delete(key) - } - mutations += - mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) -> - val (key, ttl) = groupKey - val encodedKey = mapper.group.encoder.encodeKey(key) - val increment = Increment(encodedKey) - records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) -> - val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier) - increment.addColumn(Constants.DEFAULT_COLUMN_FAMILY, qualifier, mergedValue) - } - if (ttl != null && ttl != Long.MAX_VALUE && ttl > 0) { - increment.ttl = ttl - } - increment - } + val mutations = buildHBaseMutations(mutationRecords, mapper) handleDeferredRequests(mutations) .thenReturn( MultiEdgeMutationStatus(key, events.size, mutationRecords.status, before, after, mutationRecords.acc), @@ -219,6 +128,67 @@ class V3CompatibleTableBinding( } } + private fun decodeCurrentState( + encodedState: ByteArray, + mapper: EdgeRecordMapper, + codeToFieldNameMap: Map, + ): State { + val stateValue = mapper.state.decoder.decodeValue(encodedState) + return State( + stateValue.active, + stateValue.version, + stateValue.createdAt, + stateValue.deletedAt, + stateValue.properties + .mapNotNull { (key, value) -> + codeToFieldNameMap[key]?.let { name -> name to value } + }.toMap(), + ) + } + + private fun buildHBaseMutations( + mutationRecords: EdgeMutationRecords, + mapper: EdgeRecordMapper, + ): MutableList { + val mutations = mutableListOf() + val record = mapper.state.encoder.encode(mutationRecords.stateRecord) + mutations += + Put(record.key) + .addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, record.value) + mutations += + mutationRecords.createIndexRecords.map { + val indexRecord = mapper.index.encoder.encode(it) + Put(indexRecord.key) + .addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, indexRecord.value) + } + mutations += + mutationRecords.countRecords.map { + val key = mapper.count.encoder.encodeKey(it.key) + Increment(key) + .addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER, it.value) + } + mutations += + mutationRecords.deleteIndexRecordKeys.map { + val key = mapper.index.encoder.encodeKey(it) + Delete(key) + } + mutations += + mutationRecords.groupRecords.groupBy { it.key to it.ttl }.map { (groupKey, records) -> + val (key, ttl) = groupKey + val encodedKey = mapper.group.encoder.encodeKey(key) + val increment = Increment(encodedKey) + records.mergeQualifiers().forEach { (mergedQualifier, mergedValue) -> + val qualifier = mapper.group.encoder.encodeQualifier(mergedQualifier) + increment.addColumn(Constants.DEFAULT_COLUMN_FAMILY, qualifier, mergedValue) + } + if (ttl != null && ttl != Long.MAX_VALUE && ttl > 0) { + increment.ttl = ttl + } + increment + } + return mutations + } + companion object { private fun State.specialStateValueToNull(): State = State( diff --git a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3MutationService.kt b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3MutationService.kt index 91613239..51f1ce6b 100644 --- a/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3MutationService.kt +++ b/engine/src/main/kotlin/com/kakao/actionbase/v2/engine/v3/V3MutationService.kt @@ -65,111 +65,90 @@ class V3MutationService( lock: Boolean = true, sync: MutationMode? = null, requestContext: RequestContext = RequestContext.DEFAULT, - ): Mono { - val aliasEntityName = EntityName(database, alias) - val label = graph.getLabel(aliasEntityName) - if (label !is HBaseIndexedLabel) { - return Mono.error(UnsupportedOperationException("This Label (${label.entity.fullName}, ${label.javaClass}) is not indexed or not supported for edge mutation")) - } - - val mutationMode = MutationModeContext.of(label.entity.mode, sync) - - val tableBinding = label.v3TableBinding - val audit = Audit(requestContext.actor) - val requestId = requestContext.requestId - - return Flux - .fromIterable(request.mutations) - .map { it.createEvent(tableBinding.schema as ModelSchema.Edge) } // ensureType - .flatMap { edgeEvent -> - val edge = edgeEvent.toTraceEdge() - val operation = edgeEvent.event.type.toV2() - graph.wal - .write(aliasEntityName, label.name, edge, operation, audit, requestId, mutationMode) - .thenReturn(edgeEvent) - }.groupBy { it.source to it.target } - .flatMap { groupedFlux -> - val key = groupedFlux.key() - if (mutationMode.queue) { - groupedFlux - .collectList() - .flatMap { group -> - Mono.just( - EdgeMutationStatus( - source = key.first, - target = key.second, - count = group.size, - status = EdgeOperationStatus.QUEUED.name, - before = State.initial, - after = State.initial, - acc = 0, - ), - ) - } - } else { - groupedFlux - .collectList() - .flatMap { group -> - val sortedGroup = group.sortedBy { it.event.version } - tableBinding - .mutateEdge(key, sortedGroup.map { it.event }, lock, encoder, tableBinding.schema.codeToName) - .doOnNext { status -> - val last = sortedGroup.last() - val edge = last.toTraceEdge() - val cdcMessage = - CdcContext( - label = label.entity.name, - edge = edge, - op = last.event.type.toV2(), - status = EdgeOperationStatus.valueOf(status.status), - before = status.before.toHashEdge(key.first, key.second), - after = status.after.toHashEdge(key.first, key.second), - acc = status.acc, - alias = if (aliasEntityName == label.entity.name) null else aliasEntityName, - audit = Audit(requestContext.actor), - requestId = requestContext.requestId, - ) - graph.cdc - .write(cdcMessage) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe() - }.onErrorResume { - if (it is LockAcquisitionFailedException) { - label - .findStaleLockAndClear(it.lockEdge, graph.lockTimeout) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe() - } + ): Mono = + resolveMutationContext(database, alias, sync, requestContext, includeLabelClassInError = true) + .flatMap { mutationContext -> + Flux + .fromIterable(request.mutations) + .map { it.createEvent(mutationContext.tableBinding.schema as ModelSchema.Edge) } // ensureType + .flatMap { edgeEvent -> + val edge = edgeEvent.toTraceEdge() + val operation = edgeEvent.event.type.toV2() + graph.wal + .write( + mutationContext.aliasEntityName, + mutationContext.label.name, + edge, + operation, + mutationContext.audit, + mutationContext.requestId, + mutationContext.mutationMode, + ).thenReturn(edgeEvent) + }.groupBy { it.source to it.target } + .flatMap { groupedFlux -> + val key = groupedFlux.key() + if (mutationContext.mutationMode.queue) { + groupedFlux + .collectList() + .flatMap { group -> Mono.just( EdgeMutationStatus( source = key.first, target = key.second, - count = 0, - status = EdgeOperationStatus.ERROR.name, + count = group.size, + status = EdgeOperationStatus.QUEUED.name, before = State.initial, after = State.initial, acc = 0, ), ) } - }.subscribeOn(Schedulers.boundedElastic()) - } - }.collectList() - .map { - EdgeMutationResponse( - it - .map { item -> - EdgeMutationResponse.Item( - source = item.source, - target = item.target, - count = item.count, - status = item.status, - ) - }.sortedBy { item -> "${item.source}:${item.target}" }, - ) - }.timeout(Duration.ofMillis(graph.mutationRequestTimeout)) - .runEvenIfCancelled() - } + } else { + groupedFlux + .collectList() + .flatMap { group -> + val sortedGroup = group.sortedBy { it.event.version } + mutationContext.tableBinding + .mutateEdge( + key, + sortedGroup.map { it.event }, + lock, + encoder, + mutationContext.tableBinding.schema.codeToName, + ).doOnNext { status -> + val last = sortedGroup.last() + writeCdc(mutationContext, last, status, key) + }.onErrorResume { error -> + handleMutationError(error, mutationContext.label, swallowNonLockErrors = true) { + EdgeMutationStatus( + source = key.first, + target = key.second, + count = 0, + status = EdgeOperationStatus.ERROR.name, + before = State.initial, + after = State.initial, + acc = 0, + ) + } + } + }.subscribeOn(Schedulers.boundedElastic()) + } + }.collectList() + .map { + EdgeMutationResponse( + it + .map { item -> + EdgeMutationResponse.Item( + source = item.source, + target = item.target, + count = item.count, + status = item.status, + ) + }.sortedBy { item -> "${item.source}:${item.target}" }, + ) + }.timeout(Duration.ofMillis(graph.mutationRequestTimeout)) + .runEvenIfCancelled() + } fun mutateMultiEdge( database: String, @@ -178,89 +157,180 @@ class V3MutationService( lock: Boolean = true, sync: MutationMode? = null, requestContext: RequestContext = RequestContext.DEFAULT, - ): Mono { + ): Mono = + resolveMutationContext(database, alias, sync, requestContext, includeLabelClassInError = false) + .flatMap { mutationContext -> + Flux + .fromIterable(request.mutations) + .map { it.createEvent(mutationContext.tableBinding.schema as ModelSchema.MultiEdge) } // ensureType + .flatMap { multiEdgeEvent -> + val edge = multiEdgeEvent.toTraceEdge() + val operation = multiEdgeEvent.event.type.toV2() + graph.wal + .write( + mutationContext.aliasEntityName, + mutationContext.label.name, + edge, + operation, + mutationContext.audit, + mutationContext.requestId, + mutationContext.mutationMode, + ).thenReturn(multiEdgeEvent) + }.groupBy { it.id } + .flatMap { groupedFlux -> + val key = groupedFlux.key() + if (mutationContext.mutationMode.queue) { + groupedFlux + .collectList() + .flatMap { group -> + Mono.just( + MultiEdgeMutationStatus( + id = key, + count = group.size, + status = EdgeOperationStatus.QUEUED.name, + before = State.initial, + after = State.initial, + acc = 0, + ), + ) + } + } else { + groupedFlux + .collectList() + .flatMap { group -> + val sortedGroup = group.sortedBy { it.event.version } + mutationContext.tableBinding + .mutateMultiEdge( + key, + sortedGroup.map { it.event }, + lock, + encoder, + mutationContext.tableBinding.schema.codeToName, + ).doOnNext { status -> + val last = sortedGroup.last() + writeCdc(mutationContext, last, status) + }.onErrorResume { error -> + handleMutationError(error, mutationContext.label, swallowNonLockErrors = false) { + MultiEdgeMutationStatus( + id = key, + count = 0, + status = EdgeOperationStatus.ERROR.name, + before = State.initial, + after = State.initial, + acc = 0, + ) + } + } + }.subscribeOn(Schedulers.boundedElastic()) + } + }.collectList() + .map { + MultiEdgeMutationResponse( + it + .map { item -> + MultiEdgeMutationResponse.Item( + id = item.id, + count = item.count, + status = item.status, + ) + }.sortedBy { it.toString() }, + ) + }.timeout(Duration.ofMillis(graph.mutationRequestTimeout)) + .runEvenIfCancelled() + } + + private fun resolveMutationContext( + database: String, + alias: String, + sync: MutationMode?, + requestContext: RequestContext, + includeLabelClassInError: Boolean, + ): Mono { val aliasEntityName = EntityName(database, alias) val label = graph.getLabel(aliasEntityName) if (label !is HBaseIndexedLabel) { - return Mono.error(UnsupportedOperationException("This Label (${label.entity.fullName}) is not indexed or not supported for edge mutation")) + val detail = if (includeLabelClassInError) ", ${label.javaClass}" else "" + return Mono.error(UnsupportedOperationException("This Label (${label.entity.fullName}$detail) is not indexed or not supported for edge mutation")) } - val mutationMode = MutationModeContext.of(label.entity.mode, sync) + return Mono.just( + MutationContext( + aliasEntityName = aliasEntityName, + label = label, + mutationMode = MutationModeContext.of(label.entity.mode, sync), + tableBinding = label.v3TableBinding, + audit = Audit(requestContext.actor), + requestId = requestContext.requestId, + ), + ) + } - val tableBinding = label.v3TableBinding - val audit = Audit(requestContext.actor) - val requestId = requestContext.requestId + private fun writeCdc( + mutationContext: MutationContext, + last: EdgeEvent, + status: EdgeMutationStatus, + key: Pair, + ) { + val cdcMessage = + CdcContext( + label = mutationContext.label.entity.name, + edge = last.toTraceEdge(), + op = last.event.type.toV2(), + status = EdgeOperationStatus.valueOf(status.status), + before = status.before.toHashEdge(key.first, key.second), + after = status.after.toHashEdge(key.first, key.second), + acc = status.acc, + alias = if (mutationContext.aliasEntityName == mutationContext.label.entity.name) null else mutationContext.aliasEntityName, + audit = mutationContext.audit, + requestId = mutationContext.requestId, + ) + graph.cdc + .write(cdcMessage) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() + } - return Flux - .fromIterable(request.mutations) - .map { it.createEvent(tableBinding.schema as ModelSchema.MultiEdge) } // ensureType - .flatMap { multiEdgeEvent -> - val edge = multiEdgeEvent.toTraceEdge() - val operation = multiEdgeEvent.event.type.toV2() - graph.wal - .write(aliasEntityName, label.name, edge, operation, audit, requestId, mutationMode) - .thenReturn(multiEdgeEvent) - }.groupBy { it.id } - .flatMap { groupedFlux -> - val key = groupedFlux.key() - if (mutationMode.queue) { - groupedFlux - .collectList() - .flatMap { group -> - Mono.just( - MultiEdgeMutationStatus( - id = key, - count = group.size, - status = EdgeOperationStatus.QUEUED.name, - before = State.initial, - after = State.initial, - acc = 0, - ), - ) - } - } else { - groupedFlux - .collectList() - .flatMap { group -> - val sortedGroup = group.sortedBy { it.event.version } - tableBinding - .mutateMultiEdge(key, sortedGroup.map { it.event }, lock, encoder, tableBinding.schema.codeToName) - .doOnNext { status -> - val last = sortedGroup.last() - val edge = last.toTraceEdge() - val cdcMessage = - CdcContext( - label = label.entity.name, - edge = edge, - op = last.event.type.toV2(), - status = EdgeOperationStatus.valueOf(status.status), - before = status.before.toHashEdge(source = status.before.getMultiEdgeSource(), target = status.before.getMultiEdgeTarget()), - after = status.after.toHashEdge(source = status.after.getMultiEdgeSource(), target = status.after.getMultiEdgeTarget()), - acc = status.acc, - alias = if (aliasEntityName == label.entity.name) null else aliasEntityName, - audit = Audit(requestContext.actor), - requestId = requestContext.requestId, - ) - graph.cdc - .write(cdcMessage) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe() - } - }.subscribeOn(Schedulers.boundedElastic()) - } - }.collectList() - .map { - MultiEdgeMutationResponse( - it - .map { item -> - MultiEdgeMutationResponse.Item( - id = item.id, - count = item.count, - status = item.status, - ) - }.sortedBy { it.toString() }, - ) - }.timeout(Duration.ofMillis(graph.mutationRequestTimeout)) - .runEvenIfCancelled() + private fun writeCdc( + mutationContext: MutationContext, + last: MultiEdgeEvent, + status: MultiEdgeMutationStatus, + ) { + val cdcMessage = + CdcContext( + label = mutationContext.label.entity.name, + edge = last.toTraceEdge(), + op = last.event.type.toV2(), + status = EdgeOperationStatus.valueOf(status.status), + before = status.before.toHashEdge(source = status.before.getMultiEdgeSource(), target = status.before.getMultiEdgeTarget()), + after = status.after.toHashEdge(source = status.after.getMultiEdgeSource(), target = status.after.getMultiEdgeTarget()), + acc = status.acc, + alias = if (mutationContext.aliasEntityName == mutationContext.label.entity.name) null else mutationContext.aliasEntityName, + audit = mutationContext.audit, + requestId = mutationContext.requestId, + ) + graph.cdc + .write(cdcMessage) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() + } + + private fun handleMutationError( + error: Throwable, + label: HBaseIndexedLabel, + swallowNonLockErrors: Boolean, + fallback: () -> T, + ): Mono { + if (error is LockAcquisitionFailedException) { + label + .findStaleLockAndClear(error.lockEdge, graph.lockTimeout) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() + return Mono.just(fallback()) + } + return if (swallowNonLockErrors) { + Mono.just(fallback()) + } else { + Mono.error(error) + } } private fun State.toHashEdge( @@ -283,6 +353,15 @@ class V3MutationService( companion object { private const val DEFAULT_PRIMITIVE_VALUE = "0" + private data class MutationContext( + val aliasEntityName: EntityName, + val label: HBaseIndexedLabel, + val mutationMode: MutationModeContext, + val tableBinding: V3CompatibleTableBinding, + val audit: Audit, + val requestId: String, + ) + private fun EventType.toV2(): EdgeOperation = when (this) { EventType.INSERT -> EdgeOperation.INSERT