Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,4 @@ engine/target/
# Native library build artifacts
native/src/c/*.dylib
native/src/c/*.so
native/slatedb/
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class BaseConventionsPlugin : Plugin<Project> {
project.version = project.rootProject.version

// Configure repositories
project.repositories.mavenLocal()
project.repositories.mavenCentral()

// Configure dependencies
Expand Down
3 changes: 3 additions & 0 deletions engine/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ dependencies {
implementation(Dependencies.Logging.SLF4J_API)
implementation(Dependencies.Logging.LOGBACK_CLASSIC)

// SlateDB
implementation("io.slatedb:slatedb:0.1.0-SNAPSHOT")

// HBase
implementation(Dependencies.HBase.CLIENT)
implementation(Dependencies.HBase.MAPREDUCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ import com.kakao.actionbase.v2.engine.sql.toRowFlux
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseConnections
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseOptions
import com.kakao.actionbase.v2.engine.storage.jdbc.MetadataTable
import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbConnections
import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbOptions
import com.kakao.actionbase.v2.engine.util.getLogger
import com.kakao.actionbase.v2.engine.wal.Wal
import com.kakao.actionbase.v2.engine.wal.WalFactory
Expand Down Expand Up @@ -616,6 +618,10 @@ class Graph(
val options = storage.materialize().options as HBaseOptions
options.checkConnection()
}
StorageType.SLATEDB -> {
val options = storage.materialize().options as SlateDbOptions
options.checkConnection()
}
else -> Mono.just(false)
}

Expand Down Expand Up @@ -892,6 +898,7 @@ class Graph(
intervalDisposable?.dispose()
log.info("Disposed Flux.interval for reloading metastore - {}", intervalDisposable)
HBaseConnections.closeConnections().block()
SlateDbConnections.closeConnections().block()
DefaultHBaseCluster.INSTANCE.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import com.kakao.actionbase.v2.engine.label.hbase.HBaseIndexedLabel
import com.kakao.actionbase.v2.engine.label.metastore.JdbcHashLabel
import com.kakao.actionbase.v2.engine.label.metastore.LocalBackedJdbcHashLabel
import com.kakao.actionbase.v2.engine.label.nil.NilLabel
import com.kakao.actionbase.v2.engine.label.slatedb.SlateDbHashLabel
import com.kakao.actionbase.v2.engine.label.slatedb.SlateDbIndexedLabel
import com.kakao.actionbase.v2.engine.service.ddl.LabelCreateRequest
import com.kakao.actionbase.v2.engine.sql.RowWithSchema
import com.kakao.actionbase.v2.engine.storage.DatastoreStorage
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseStorage
import com.kakao.actionbase.v2.engine.storage.jdbc.JdbcStorage
import com.kakao.actionbase.v2.engine.storage.local.LocalStorage
import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbStorage
import com.kakao.actionbase.v2.engine.util.getLogger

import org.slf4j.Logger
Expand Down Expand Up @@ -79,6 +82,7 @@ data class LabelEntity(
is JdbcStorage -> JdbcHashLabel.create(this, graph, storage, block)
is HBaseStorage -> HBaseHashLabel.create(this, graph, storage)
is DatastoreStorage -> DatastoreHashLabel.create(this, graph, block)
is SlateDbStorage -> SlateDbHashLabel.create(this, graph, storage)
else -> {
logger.error(
"{} supports only Local, Jdbc, HBase storage types. {} is not supported. Fallback to NilLabel",
Expand All @@ -99,9 +103,10 @@ data class LabelEntity(
when (storage) {
is HBaseStorage -> HBaseIndexedLabel.create(this, graph, storage)
is DatastoreStorage -> DatastoreIndexedLabel.create(this, graph, block)
is SlateDbStorage -> SlateDbIndexedLabel.create(this, graph, storage)
else -> {
logger.error(
"{} supports only Jdbc, HBase storage types. {} is not supported. Fallback to NilLabel",
"{} supports only Jdbc, HBase, SlateDb storage types. {} is not supported. Fallback to NilLabel",
type,
storage,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.kakao.actionbase.v2.engine.storage.hbase.HBaseStorage
import com.kakao.actionbase.v2.engine.storage.jdbc.JdbcStorage
import com.kakao.actionbase.v2.engine.storage.local.LocalStorage
import com.kakao.actionbase.v2.engine.storage.nil.NilStorage
import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbStorage
import com.kakao.actionbase.v2.engine.util.getLogger

import org.slf4j.Logger
Expand Down Expand Up @@ -41,6 +42,9 @@ data class StorageEntity(
StorageType.HBASE -> {
HBaseStorage(this)
}
StorageType.SLATEDB -> {
SlateDbStorage(this)
}
StorageType.DATASTORE -> {
DatastoreStorage
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
package com.kakao.actionbase.v2.engine.label.slatedb

import com.kakao.actionbase.v2.core.code.EdgeEncoder
import com.kakao.actionbase.v2.core.code.EncodedKey
import com.kakao.actionbase.v2.core.code.IdEdgeEncoder
import com.kakao.actionbase.v2.core.code.KeyFieldValue
import com.kakao.actionbase.v2.core.code.KeyValue
import com.kakao.actionbase.v2.core.edge.Edge
import com.kakao.actionbase.v2.core.edge.SchemaEdge
import com.kakao.actionbase.v2.core.metadata.Direction
import com.kakao.actionbase.v2.engine.GraphDefaults
import com.kakao.actionbase.v2.engine.edge.decodeByteArray
import com.kakao.actionbase.v2.engine.edge.toRow
import com.kakao.actionbase.v2.engine.entity.LabelEntity
import com.kakao.actionbase.v2.engine.label.AbstractLabel
import com.kakao.actionbase.v2.engine.label.LabelFactory
import com.kakao.actionbase.v2.engine.sql.DataFrame
import com.kakao.actionbase.v2.engine.sql.Row
import com.kakao.actionbase.v2.engine.sql.StatKey
import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbStorage
import com.kakao.actionbase.v2.engine.storage.slatedb.SlateDbTable

import java.nio.ByteBuffer
import java.util.Arrays

import reactor.core.publisher.Mono

open class SlateDbHashLabel(
entity: LabelEntity,
coder: EdgeEncoder<ByteArray>,
private val table: Mono<SlateDbTable>,
) : AbstractLabel<ByteArray>(entity, coder) {
override fun findHashEdge(keyField: EncodedKey<ByteArray>): Mono<ByteArray> {
require(keyField.field == null) { "field must be null" }
return table.flatMap { it.get(keyField.key) }
}

override fun create(
keyField: EncodedKey<ByteArray>,
value: ByteArray,
): Mono<List<Any>> {
require(keyField.field == null) { "field must be null" }
return table
.flatMap { it.put(keyField.key, value) }
.thenReturn(emptyList())
}

override fun update(
keyField: EncodedKey<ByteArray>,
value: ByteArray,
): Mono<List<Any>> = create(keyField, value)

override fun delete(keyField: EncodedKey<ByteArray>): Mono<List<Any>> {
require(keyField.field == null) { "field must be null" }
return table
.flatMap { it.delete(keyField.key) }
.thenReturn(emptyList())
}

override fun setnx(
keyField: EncodedKey<ByteArray>,
value: ByteArray,
): Mono<Boolean> {
require(keyField.field == null) { "field must be null" }
return table.flatMap { tbl ->
tbl
.get(keyField.key)
.hasElement()
.flatMap { exists ->
if (exists) {
Mono.just(false)
} else {
tbl.put(keyField.key, value).thenReturn(true)
}
}
}
}

override fun cad(
keyField: EncodedKey<ByteArray>,
value: ByteArray,
): Mono<Long> {
require(keyField.field == null) { "field must be null" }
return table.flatMap { tbl ->
tbl
.get(keyField.key)
.flatMap { existingValue ->
if (Arrays.equals(existingValue, value)) {
tbl.delete(keyField.key).thenReturn(1L)
} else {
Mono.just(0L)
}
}.defaultIfEmpty(0L)
}
}

override fun findLockValue(keyField: EncodedKey<ByteArray>): Mono<ByteArray> {
require(keyField.field == null) { "field must be null" }
return table.flatMap { it.get(keyField.key) }
}

override fun incrby(
key: ByteArray,
acc: Long,
): Mono<List<Any>> =
table.flatMap { tbl ->
tbl
.get(key)
.map { bytes -> ByteBuffer.wrap(bytes).getLong() }
.defaultIfEmpty(0L)
.flatMap { current ->
val newValue = ByteBuffer.allocate(8).putLong(current + acc).array()
tbl.put(key, newValue).thenReturn(emptyList<Any>())
}
}

override fun scanStorage(
prefix: EncodedKey<ByteArray>,
limit: Int,
start: EncodedKey<ByteArray>?,
end: EncodedKey<ByteArray>?,
): Mono<List<KeyFieldValue<ByteArray>>> =
table
.flatMap { it.scanPrefix(prefix.key, limit + 1) }
.map { results ->
results
// Filter by start key (exclusive)
.dropWhile { (key, _) ->
start?.key?.let { startKey -> Arrays.compareUnsigned(startKey, key) >= 0 } ?: false
}
// Filter by end key (exclusive)
.dropLastWhile { (key, _) ->
end?.key?.let { endKey -> Arrays.compareUnsigned(endKey, key) < 0 } ?: false
}.take(limit)
.map { (key, value) -> KeyFieldValue(key, value) }
}

override fun encodedEdgeToSchemaEdge(keyFieldValue: KeyFieldValue<ByteArray>): SchemaEdge = entity.schema.decodeByteArray(keyFieldValue)

override fun deleteOnLock(keyField: KeyValue<ByteArray>): Mono<Boolean> = cad(EncodedKey(keyField.key), keyField.value).map { it > 0 }

override fun getSelf(
src: List<Any>,
stats: Set<StatKey>,
idEdgeEncoder: IdEdgeEncoder,
): Mono<DataFrame> {
val withAll = stats.contains(StatKey.WITH_ALL)
val withEdgeId = withAll || stats.contains(StatKey.EDGE_ID)

val keysMono =
Mono.just(
src.map {
val edge = Edge(0L, it, it).ensureType(entity.schema)
coder.encodeHashEdgeKey(edge, entity.id)
},
)

return keysMono
.flatMap { keys ->
table.flatMap { tbl ->
Mono.zip(
keys.map { key -> tbl.get(key.key).map { key to it } },
) { results ->
results
.filterIsInstance<Pair<EncodedKey<ByteArray>, ByteArray>>()
.mapNotNull { (key, value) ->
try {
encodedEdgeToSchemaEdge(KeyFieldValue(key.key, value))
} catch (e: Exception) {
null
}
}.filter { withAll || it.isActive }
.map {
if (withEdgeId) {
it.toRow(withAll, idEdgeEncoder)
} else {
it.toRow(withAll, null)
}
}
}
}
}.map { rows ->
DataFrame(
rows,
if (withAll) {
entity.schema.allStructType
} else if (withEdgeId) {
entity.schema.edgeIdStructType
} else {
entity.schema.structType
},
)
}.defaultIfEmpty(DataFrame.empty(entity.schema.allStructType))
}

override fun get(
src: Any,
tgt: List<Any>,
dir: Direction,
stats: Set<StatKey>,
idEdgeEncoder: IdEdgeEncoder,
): Mono<DataFrame> {
val withAll = stats.contains(StatKey.WITH_ALL)
val withEdgeId = withAll || stats.contains(StatKey.EDGE_ID)

val keys =
tgt.map {
val edge = Edge(0L, src, it).ensureType(entity.schema)
coder.encodeHashEdgeKey(edge, entity.id)
}

return table
.flatMap { tbl ->
if (keys.isEmpty()) {
Mono.just(emptyList())
} else {
Mono.zip(
keys.map { key -> tbl.get(key.key).map { key to it }.defaultIfEmpty(key to ByteArray(0)) },
) { results ->
results
.filterIsInstance<Pair<EncodedKey<ByteArray>, ByteArray>>()
.filter { it.second.isNotEmpty() }
.mapNotNull { (key, value) ->
try {
encodedEdgeToSchemaEdge(KeyFieldValue(key.key, value))
} catch (e: Exception) {
null
}
}.filter { withAll || it.isActive }
.map {
if (withEdgeId) {
it.toRow(withAll, idEdgeEncoder, isMultiEdge)
} else {
it.toRow(withAll, null, isMultiEdge)
}
}
}
}
}.map { rows ->
DataFrame(
rows,
if (withAll) {
entity.schema.allStructType
} else if (withEdgeId) {
entity.schema.edgeIdStructType
} else {
entity.schema.structType
},
)
}.defaultIfEmpty(DataFrame.empty(entity.schema.allStructType))
}

override fun getCountRows(
srcAndKeys: List<Pair<Any, ByteArray>>,
dir: Direction,
): Mono<List<Row>> =
table.flatMap { tbl ->
if (srcAndKeys.isEmpty()) {
Mono.just(emptyList())
} else {
Mono.zip(
srcAndKeys.map { (src, key) ->
tbl
.get(key)
.map { bytes -> ByteBuffer.wrap(bytes).getLong() }
.defaultIfEmpty(0L)
.map { count -> Row(arrayOf(src, count, dir)) }
},
) { results -> results.filterIsInstance<Row>() }
}
}

companion object : LabelFactory<SlateDbHashLabel, SlateDbStorage> {
override fun create(
entity: LabelEntity,
graph: GraphDefaults,
storage: SlateDbStorage,
block: SlateDbHashLabel.() -> Unit,
): SlateDbHashLabel {
val table = storage.options.getTable()
return SlateDbHashLabel(
entity = entity,
coder = graph.edgeEncoderFactory.bytesKeyValueEncoder,
table = table,
).apply(block)
}
}
}
Loading
Loading