Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ jobs:
unit-tests:
uses: vapor/ci/.github/workflows/run-unit-tests.yml@main
secrets: inherit
with:
with_wasm: true
6 changes: 5 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ let package = Package(
.library(name: "AsyncKit", targets: ["AsyncKit"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.61.0"),
// TODO: SM: Update swift-nio version once NIOAsyncRuntime is available from swift-nio
// .package(url: "https://github.com/apple/swift-nio.git", from: "2.89.0"),
.package(url: "https://github.com/PassiveLogic/swift-nio.git", branch: "feat/addNIOAsyncRuntimeForWasm"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.5.3"),
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.5"),
.package(url: "https://github.com/apple/swift-algorithms.git", from: "1.1.0"),
Expand All @@ -35,6 +37,8 @@ let package = Package(
name: "AsyncKitTests",
dependencies: [
.target(name: "AsyncKit"),
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
],
swiftSettings: swiftSettings
),
Expand Down
43 changes: 43 additions & 0 deletions Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#if canImport(Dispatch)
import Dispatch
#endif
import NIOConcurrencyHelpers
import NIOCore
import struct Logging.Logger
Expand Down Expand Up @@ -280,6 +282,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
}
}

#if canImport(Dispatch)
/// Closes the connection pool.
///
/// All available connections will be closed immediately. Any connections still in use will be
Expand Down Expand Up @@ -312,6 +315,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
}
}
}
#endif // canImport(Dispatch)

/// Closes the connection pool.
///
Expand All @@ -336,10 +340,17 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
guard self.lock.withLock({
// Do not initiate shutdown multiple times.
guard !self.didShutdown else {
#if canImport(Dispatch)
DispatchQueue.global().async {
self.logger.warning("Connection pool can not be shut down more than once.")
callback(ConnectionPoolError.shutdown)
}
#else
Task {
self.logger.warning("Connection pool can not be shut down more than once.")
callback(ConnectionPoolError.shutdown)
}
#endif
return false
}

Expand All @@ -356,6 +367,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
// all pools are closed, invoke the callback and provide it the first encountered error, if
// any. By design, this loosely matches the general flow used by `MultiThreadedEventLoopGroup`'s
// `shutdownGracefully(queue:_:)` implementation.
#if canImport(Dispatch)
let shutdownQueue = DispatchQueue(label: "codes.vapor.async-kit.poolShutdownGracefullyQueue")
let shutdownGroup = DispatchGroup()
var outcome: Result<Void, any Error> = .success(())
Expand All @@ -380,6 +392,37 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
callback(error)
}
}
#else
Task {
await withTaskGroup(of: Result<Void, any Error>.self) { group in
for pool in self.storage.values {
group.addTask {
let singleResult: Result<Void, any Error> = await withCheckedContinuation { continuation in
pool.close().whenComplete { result in
continuation.resume(returning: result)
}
}

return singleResult
}
}

var outcome: Result<Void, any Error> = .success(())
for await singleResult in group {
outcome = outcome.flatMap { singleResult }
}

switch outcome {
case .success:
self.logger.debug("Connection group pool finished shutdown.")
callback(nil)
case .failure(let error):
self.logger.error("Connection group pool got shutdown error (and then shut down anyway): \(error)")
callback(error)
}
}
}
#endif
}

deinit {
Expand Down
4 changes: 4 additions & 0 deletions Sources/AsyncKit/Exports.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#if canImport(NIOEmbedded) && !os(WASI)
@_documentation(visibility: internal) @_exported import class NIOEmbedded.EmbeddedEventLoop
#endif
@_documentation(visibility: internal) @_exported import protocol NIOCore.EventLoop
@_documentation(visibility: internal) @_exported import protocol NIOCore.EventLoopGroup
@_documentation(visibility: internal) @_exported import class NIOCore.EventLoopFuture
@_documentation(visibility: internal) @_exported import struct NIOCore.EventLoopPromise
#if canImport(NIOPosix) && !os(WASI)
@_documentation(visibility: internal) @_exported import class NIOPosix.MultiThreadedEventLoopGroup
#endif
1 change: 1 addition & 0 deletions Tests/AsyncKitTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Logging
import NIOConcurrencyHelpers
import NIOCore
import NIOEmbedded
import class NIOPosix.MultiThreadedEventLoopGroup
import XCTest

final class ConnectionPoolTests: AsyncKitTestCase {
Expand Down