Skip to content

Commit 941b2db

Browse files
committed
solution: Ktor client for Kotlin
1 parent 24f444e commit 941b2db

File tree

12 files changed

+990
-0
lines changed

12 files changed

+990
-0
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ Structure of dependencies between modules:
2828
* `etherjar-hex`
2929
* `etherjar-rpc-emerald`
3030
* `etherjar-rpc-api`
31+
* `etherjar-rpc-ktor`
32+
* `etherjar-rpc-api`
33+
* `etherjar-rpc-json`
3134
* `etherjar-rpc-http`
3235
* `etherjar-rpc-api`
3336
* `etherjar-domain`
@@ -62,6 +65,7 @@ where
6265
* `etherjar-rpc-json` - JSON mapping to/from Java objects
6366
* `etherjar-rpc-api` - [JSON-RPC API](https://github.com/ethereum/wiki/wiki/JSON-RPC) generic
6467
implementation
68+
* `etherjar-rpc-ktor` - Kotlin with coroutines transport implementation for JSON-RPC API data-layer
6569
* `etherjar-rpc-emerald` - gRPC transport,
6670
see [Emerald Dshackle](https://github.com/emeraldpay/dshackle)
6771
* `etherjar-rpc-http` - HTTP transport implementation for JSON-RPC API data-layer

etherjar-rpc-ktor/build.gradle

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (c) 2020 EmeraldPay Inc, All Rights Reserved.
3+
* Copyright (c) 2016-2017 Infinitape Inc, All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
plugins {
19+
id 'org.jetbrains.kotlin.jvm' version '2.1.0'
20+
}
21+
22+
compileKotlin {
23+
kotlinOptions {
24+
jvmTarget = '17'
25+
}
26+
}
27+
28+
compileTestKotlin {
29+
kotlinOptions {
30+
jvmTarget = '17'
31+
}
32+
}
33+
34+
dependencies {
35+
api project(':etherjar-rpc-api')
36+
api project(':etherjar-rpc-json')
37+
38+
implementation 'org.jetbrains.kotlin:kotlin-stdlib:2.1.0'
39+
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1'
40+
implementation 'io.ktor:ktor-client-core:3.0.3'
41+
implementation 'io.ktor:ktor-client-cio:3.0.3'
42+
implementation 'io.ktor:ktor-client-content-negotiation:3.0.3'
43+
implementation 'io.ktor:ktor-serialization-jackson:3.0.3'
44+
45+
// Jackson dependencies (matching etherjar-rpc-json versions)
46+
implementation 'com.fasterxml.jackson.core:jackson-core:2.9.8'
47+
implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.8'
48+
49+
testImplementation 'io.ktor:ktor-client-mock:3.0.3'
50+
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1'
51+
testImplementation 'io.kotest:kotest-runner-junit5:5.9.1'
52+
testImplementation 'io.kotest:kotest-assertions-core:5.9.1'
53+
testImplementation 'io.mockk:mockk:1.13.12'
54+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright (c) 2025 EmeraldPay Ltd, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.emeraldpay.etherjar.rpc.ktor
18+
19+
import io.emeraldpay.etherjar.rpc.RpcCall
20+
import io.emeraldpay.etherjar.rpc.RpcCallResponse
21+
import kotlinx.coroutines.async
22+
import kotlinx.coroutines.awaitAll
23+
import kotlinx.coroutines.coroutineScope
24+
import java.util.concurrent.atomic.AtomicInteger
25+
26+
class CoroutineBatch(
27+
private val transport: CoroutineRpcTransport
28+
) {
29+
private val items = mutableListOf<CoroutineBatchItem<*, *>>()
30+
private val idGenerator = AtomicInteger(1)
31+
32+
fun <JS, RES> add(call: RpcCall<JS, RES>): CoroutineBatchItem<JS, RES> {
33+
val item = CoroutineBatchItem(call, idGenerator.getAndIncrement())
34+
items.add(item)
35+
return item
36+
}
37+
38+
suspend fun execute(): List<RpcCallResponse<*, *>> {
39+
val responses = transport.execute(items)
40+
processResponses(responses)
41+
return responses
42+
}
43+
44+
suspend fun executeAndGetResults(): List<Any?> = coroutineScope {
45+
execute()
46+
items.map { item ->
47+
async { item.getResult() }
48+
}.awaitAll()
49+
}
50+
51+
private fun processResponses(responses: List<RpcCallResponse<*, *>>) {
52+
val responseMap = responses.associateBy { response ->
53+
// We'll need to match by call content since RpcCallResponse doesn't have id
54+
// For now, match by index
55+
responses.indexOf(response)
56+
}
57+
58+
items.forEachIndexed { index, item ->
59+
val response = responseMap[index]
60+
if (response != null) {
61+
if (response.isError) {
62+
item.onError(response.error)
63+
} else {
64+
@Suppress("UNCHECKED_CAST")
65+
(item as CoroutineBatchItem<Any, Any>).onResult(response.value)
66+
}
67+
}
68+
}
69+
}
70+
71+
fun cancel() {
72+
items.forEach { it.cancel() }
73+
}
74+
75+
val size: Int
76+
get() = items.size
77+
78+
val isEmpty: Boolean
79+
get() = items.isEmpty()
80+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (c) 2025 EmeraldPay Ltd, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.emeraldpay.etherjar.rpc.ktor
18+
19+
import io.emeraldpay.etherjar.rpc.RpcCall
20+
import io.emeraldpay.etherjar.rpc.RpcException
21+
import kotlinx.coroutines.CompletableDeferred
22+
23+
class CoroutineBatchItem<JS, RES>(
24+
val call: RpcCall<JS, RES>,
25+
val id: Int
26+
) {
27+
private val deferred = CompletableDeferred<RES>()
28+
29+
fun onResult(value: RES) {
30+
deferred.complete(value)
31+
}
32+
33+
fun onError(exception: RpcException) {
34+
deferred.completeExceptionally(exception)
35+
}
36+
37+
suspend fun getResult(): RES = deferred.await()
38+
39+
fun cancel() {
40+
deferred.cancel()
41+
}
42+
43+
val isCompleted: Boolean
44+
get() = deferred.isCompleted
45+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2025 EmeraldPay Ltd, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.emeraldpay.etherjar.rpc.ktor
18+
19+
import io.emeraldpay.etherjar.rpc.RpcCall
20+
import io.emeraldpay.etherjar.rpc.RpcCallResponse
21+
import kotlinx.coroutines.flow.Flow
22+
import java.io.Closeable
23+
24+
interface CoroutineRpcClient : Closeable {
25+
suspend fun <JS, RES> execute(call: RpcCall<JS, RES>): RES
26+
27+
suspend fun execute(batch: CoroutineBatch): List<RpcCallResponse<*, *>>
28+
29+
fun <JS, RES> executeFlow(call: RpcCall<JS, RES>): Flow<RES>
30+
31+
fun executeFlow(batch: CoroutineBatch): Flow<RpcCallResponse<*, *>>
32+
33+
fun createBatch(): CoroutineBatch
34+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2025 EmeraldPay Ltd, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.emeraldpay.etherjar.rpc.ktor
18+
19+
import io.emeraldpay.etherjar.rpc.RpcCall
20+
import kotlinx.coroutines.flow.Flow
21+
import kotlinx.coroutines.flow.flow
22+
23+
suspend inline fun <JS, RES> CoroutineRpcClient.execute(
24+
callBuilder: () -> RpcCall<JS, RES>
25+
): RES = execute(callBuilder())
26+
27+
suspend inline fun CoroutineRpcClient.batch(
28+
builder: CoroutineBatch.() -> Unit
29+
): List<Any?> {
30+
val batch = createBatch()
31+
batch.builder()
32+
return batch.executeAndGetResults()
33+
}
34+
35+
fun <JS, RES> CoroutineRpcClient.executeAsFlow(call: RpcCall<JS, RES>): Flow<RES> = flow {
36+
emit(execute(call))
37+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) 2025 EmeraldPay Ltd, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.emeraldpay.etherjar.rpc.ktor
18+
19+
import io.emeraldpay.etherjar.rpc.RpcCallResponse
20+
import java.io.Closeable
21+
22+
interface CoroutineRpcTransport : Closeable {
23+
suspend fun execute(items: List<CoroutineBatchItem<*, *>>): List<RpcCallResponse<*, *>>
24+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (c) 2025 EmeraldPay Ltd, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.emeraldpay.etherjar.rpc.ktor
18+
19+
import io.emeraldpay.etherjar.rpc.RpcCall
20+
import io.emeraldpay.etherjar.rpc.RpcCallResponse
21+
import kotlinx.coroutines.flow.Flow
22+
import kotlinx.coroutines.flow.flow
23+
24+
class DefaultCoroutineRpcClient(
25+
private val transport: CoroutineRpcTransport
26+
) : CoroutineRpcClient {
27+
28+
override suspend fun <JS, RES> execute(call: RpcCall<JS, RES>): RES {
29+
val batch = createBatch()
30+
val item = batch.add(call)
31+
32+
val responses = transport.execute(listOf(item))
33+
val response = responses.first()
34+
35+
return if (response.isError) {
36+
throw response.error
37+
} else {
38+
@Suppress("UNCHECKED_CAST")
39+
response.value as RES
40+
}
41+
}
42+
43+
override suspend fun execute(batch: CoroutineBatch): List<RpcCallResponse<*, *>> {
44+
return batch.execute()
45+
}
46+
47+
override fun <JS, RES> executeFlow(call: RpcCall<JS, RES>): Flow<RES> = flow {
48+
emit(execute(call))
49+
}
50+
51+
override fun executeFlow(batch: CoroutineBatch): Flow<RpcCallResponse<*, *>> = flow {
52+
val responses = execute(batch)
53+
responses.forEach { response ->
54+
emit(response)
55+
}
56+
}
57+
58+
override fun createBatch(): CoroutineBatch {
59+
return CoroutineBatch(transport)
60+
}
61+
62+
override fun close() {
63+
transport.close()
64+
}
65+
}

0 commit comments

Comments
 (0)