diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 35fd90a..50cfcd1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,3 +14,5 @@ jobs: unit-tests: uses: vapor/ci/.github/workflows/run-unit-tests.yml@main secrets: inherit + with: + with_wasm: true diff --git a/Package.swift b/Package.swift index c6ef132..228a887 100644 --- a/Package.swift +++ b/Package.swift @@ -13,7 +13,9 @@ let package = Package( .library(name: "AsyncKit", targets: ["AsyncKit"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", from: "2.61.0"), + // TODO: SM: Update swift-nio version once NIOAsyncRuntime is available from swift-nio + // .package(url: "https://github.com/apple/swift-nio.git", from: "2.89.0"), + .package(url: "https://github.com/PassiveLogic/swift-nio.git", branch: "feat/addNIOAsyncRuntimeForWasm"), .package(url: "https://github.com/apple/swift-log.git", from: "1.5.3"), .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.5"), .package(url: "https://github.com/apple/swift-algorithms.git", from: "1.1.0"), @@ -35,6 +37,8 @@ let package = Package( name: "AsyncKitTests", dependencies: [ .target(name: "AsyncKit"), + .product(name: "NIOEmbedded", package: "swift-nio"), + .product(name: "NIOPosix", package: "swift-nio"), ], swiftSettings: swiftSettings ), diff --git a/Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift b/Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift index 3c79b8f..9d649f5 100644 --- a/Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift +++ b/Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift @@ -1,4 +1,6 @@ +#if canImport(Dispatch) import Dispatch +#endif import NIOConcurrencyHelpers import NIOCore import struct Logging.Logger @@ -280,6 +282,7 @@ public final class EventLoopGroupConnectionPool where Source: Connection } } + #if canImport(Dispatch) /// Closes the connection pool. /// /// All available connections will be closed immediately. Any connections still in use will be @@ -312,6 +315,7 @@ public final class EventLoopGroupConnectionPool where Source: Connection } } } + #endif // canImport(Dispatch) /// Closes the connection pool. /// @@ -336,10 +340,17 @@ public final class EventLoopGroupConnectionPool where Source: Connection guard self.lock.withLock({ // Do not initiate shutdown multiple times. guard !self.didShutdown else { + #if canImport(Dispatch) DispatchQueue.global().async { self.logger.warning("Connection pool can not be shut down more than once.") callback(ConnectionPoolError.shutdown) } + #else + Task { + self.logger.warning("Connection pool can not be shut down more than once.") + callback(ConnectionPoolError.shutdown) + } + #endif return false } @@ -356,6 +367,7 @@ public final class EventLoopGroupConnectionPool where Source: Connection // all pools are closed, invoke the callback and provide it the first encountered error, if // any. By design, this loosely matches the general flow used by `MultiThreadedEventLoopGroup`'s // `shutdownGracefully(queue:_:)` implementation. + #if canImport(Dispatch) let shutdownQueue = DispatchQueue(label: "codes.vapor.async-kit.poolShutdownGracefullyQueue") let shutdownGroup = DispatchGroup() var outcome: Result = .success(()) @@ -380,6 +392,37 @@ public final class EventLoopGroupConnectionPool where Source: Connection callback(error) } } + #else + Task { + await withTaskGroup(of: Result.self) { group in + for pool in self.storage.values { + group.addTask { + let singleResult: Result = await withCheckedContinuation { continuation in + pool.close().whenComplete { result in + continuation.resume(returning: result) + } + } + + return singleResult + } + } + + var outcome: Result = .success(()) + for await singleResult in group { + outcome = outcome.flatMap { singleResult } + } + + switch outcome { + case .success: + self.logger.debug("Connection group pool finished shutdown.") + callback(nil) + case .failure(let error): + self.logger.error("Connection group pool got shutdown error (and then shut down anyway): \(error)") + callback(error) + } + } + } + #endif } deinit { diff --git a/Sources/AsyncKit/Exports.swift b/Sources/AsyncKit/Exports.swift index ebee4eb..13ed550 100644 --- a/Sources/AsyncKit/Exports.swift +++ b/Sources/AsyncKit/Exports.swift @@ -1,6 +1,10 @@ +#if canImport(NIOEmbedded) && !os(WASI) @_documentation(visibility: internal) @_exported import class NIOEmbedded.EmbeddedEventLoop +#endif @_documentation(visibility: internal) @_exported import protocol NIOCore.EventLoop @_documentation(visibility: internal) @_exported import protocol NIOCore.EventLoopGroup @_documentation(visibility: internal) @_exported import class NIOCore.EventLoopFuture @_documentation(visibility: internal) @_exported import struct NIOCore.EventLoopPromise +#if canImport(NIOPosix) && !os(WASI) @_documentation(visibility: internal) @_exported import class NIOPosix.MultiThreadedEventLoopGroup +#endif diff --git a/Tests/AsyncKitTests/ConnectionPoolTests.swift b/Tests/AsyncKitTests/ConnectionPoolTests.swift index 8f04016..339677f 100644 --- a/Tests/AsyncKitTests/ConnectionPoolTests.swift +++ b/Tests/AsyncKitTests/ConnectionPoolTests.swift @@ -4,6 +4,7 @@ import Logging import NIOConcurrencyHelpers import NIOCore import NIOEmbedded +import class NIOPosix.MultiThreadedEventLoopGroup import XCTest final class ConnectionPoolTests: AsyncKitTestCase {