diff --git a/build.gradle b/build.gradle index 8b2c85a..9fb5f3a 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ def javaProjects = [ configure(javaProjects) { - version '3.0.0' + version '3.0.1' repositories { mavenCentral() if (project.version.endsWith('-SNAPSHOT')) { diff --git a/lcloud-udp-discovery-example/build.gradle b/lcloud-udp-discovery-example/build.gradle index a2063d0..3a0d95e 100644 --- a/lcloud-udp-discovery-example/build.gradle +++ b/lcloud-udp-discovery-example/build.gradle @@ -4,7 +4,7 @@ plugins { } group = 'org.mtbo.lcloud' -version = '3.0.0-SNAPSHOT' +version = '3.0.1-SNAPSHOT' apply plugin: 'application' mainClassName = 'ExampleService' diff --git a/lcloud-udp-discovery-example/src/main/java/ExampleService.java b/lcloud-udp-discovery-example/src/main/java/ExampleService.java index 8cc5916..9fbbca0 100644 --- a/lcloud-udp-discovery-example/src/main/java/ExampleService.java +++ b/lcloud-udp-discovery-example/src/main/java/ExampleService.java @@ -9,9 +9,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; -import org.mtbo.lcloud.discovery.DiscoveryService; -import org.mtbo.lcloud.discovery.udp.UdpConnection; -import org.mtbo.lcloud.discovery.udp.UdpDiscoveryClient; +import org.mtbo.lcloud.discovery.udp.*; /** * Demo application @@ -28,39 +26,43 @@ public class ExampleService { * @throws SocketException in case of fatal network error */ public static void main(String[] args) throws InterruptedException, SocketException { - DiscoveryService discoveryService = - new DiscoveryService( - Optional.ofNullable(System.getenv("HOSTNAME")).orElse(UUID.randomUUID().toString()), + + final int port = 8888; + var serviceConfig = + new UdpServiceConfig( "xService", - new UdpConnection(8888)); + Optional.ofNullable(System.getenv("HOSTNAME")).orElse(UUID.randomUUID().toString()), + port); + var discoveryService = new UdpDiscoveryService(serviceConfig); - discoveryService.setDaemon(true); - discoveryService.start(); + var clientConfig = new UdpClientConfig("xService", port); + var discoveryClient = new UdpDiscoveryClient(clientConfig); - var config = new UdpDiscoveryClient.UdpConfig("xService", 8888); - var discoveryClient = new UdpDiscoveryClient(config); + var serviceListener = discoveryService.listen(new UdpConnection(port)).subscribe(); - discoveryClient - .startLookup(Duration.ofSeconds(2)) - .doOnNext( - instances -> { - String joined = instances.stream().sorted().collect(Collectors.joining("\n")); + var clientListener = + discoveryClient + .startLookup(Duration.ofSeconds(2)) + .doOnNext( + instances -> { + String joined = instances.stream().sorted().collect(Collectors.joining("\n")); - System.out.println("***********************************************"); - System.out.println(config.serviceName + " instances are discovered:\n\n" + joined); - System.out.println("***********************************************\n"); - }) - .doOnError( - throwable -> { - logger.severe("Lookup Error: " + throwable.getMessage()); - logger.throwing(ExampleService.class.getName(), "main", throwable); - }) - .onErrorComplete(throwable -> !(throwable instanceof InterruptedException)) - .subscribe(); + System.out.println("***********************************************"); + System.out.println( + clientConfig.serviceName + " instances are discovered:\n\n" + joined); + System.out.println("***********************************************\n"); + }) + .doOnError( + throwable -> { + logger.severe("Lookup Error: " + throwable.getMessage()); + logger.throwing(ExampleService.class.getName(), "main", throwable); + }) + .onErrorComplete(throwable -> !(throwable instanceof InterruptedException)) + .subscribe(); Thread.sleep(10000); - - discoveryService.shutdown(); + clientListener.dispose(); + serviceListener.dispose(); } static Logger logger = Logger.getLogger(ExampleService.class.getName()); diff --git a/lcloud-udp-discovery/build.gradle b/lcloud-udp-discovery/build.gradle index e9ed915..3e8f8a6 100644 --- a/lcloud-udp-discovery/build.gradle +++ b/lcloud-udp-discovery/build.gradle @@ -4,7 +4,7 @@ plugins { } group = 'org.mtbo.lcloud' -version = '3.0.0-SNAPSHOT' +version = '3.0.1-SNAPSHOT' repositories { mavenCentral() @@ -27,24 +27,24 @@ java { publishing { repositories { - maven { - url = layout.buildDirectory.dir('staging-deploy') - } - // maven { -// name = "GitHubPackages" -// url = uri("https://maven.pkg.github.com/mtbo-org/lcloud") -// credentials { -// username = findProperty("gpr.user") ?: System.getenv("USERNAME") -// password = findProperty("gpr.token") ?: System.getenv("TOKEN") -// } -// -//// def releasesRepoUrl = layout.buildDirectory.dir('repos/releases') -//// def snapshotsRepoUrl = layout.buildDirectory.dir('repos/snapshots') -//// url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl -// -// uri('https://maven.pkg.github.com/mtbo-org/lcloud') +// url = layout.buildDirectory.dir('staging-deploy') // } + + maven { + name = "GitHubPackages" + url = uri("https://maven.pkg.github.com/mtbo-org/lcloud") + credentials { + username = findProperty("gpr.user") ?: System.getenv("USERNAME") + password = findProperty("gpr.token") ?: System.getenv("TOKEN") + } + +// def releasesRepoUrl = layout.buildDirectory.dir('repos/releases') +// def snapshotsRepoUrl = layout.buildDirectory.dir('repos/snapshots') +// url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl + + uri('https://maven.pkg.github.com/mtbo-org/lcloud') + } } publications { diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ClientConfig.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ClientConfig.java new file mode 100644 index 0000000..0258946 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ClientConfig.java @@ -0,0 +1,59 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery; + +import java.util.Objects; + +/** Discovery config */ +public abstract class ClientConfig { + /** unique service identifier */ + public final String serviceName; + + /** optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited */ + public final int endpointsCount; + + /** + * @param serviceName unique service identifier + * @param endpointsCount optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited + */ + public ClientConfig(String serviceName, int endpointsCount) { + this.serviceName = serviceName; + this.endpointsCount = endpointsCount; + } + + /** + * Constructor with defaults + * + * @param serviceName unique service identifier + */ + public ClientConfig(String serviceName) { + this(serviceName, Integer.MAX_VALUE); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (ClientConfig) obj; + return Objects.equals(this.serviceName, that.serviceName) + && this.endpointsCount == that.endpointsCount; + } + + @Override + public int hashCode() { + return Objects.hash(serviceName, endpointsCount); + } + + @Override + public String toString() { + return "Config[" + + "serviceName=" + + serviceName + + ", " + + "port=" + + ", " + + "clientsCount=" + + endpointsCount + + ']'; + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Connection.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Connection.java index 2a587c7..afab7f3 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Connection.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Connection.java @@ -2,5 +2,23 @@ package org.mtbo.lcloud.discovery; +import reactor.core.publisher.Mono; + /** Allows to send and receive packets */ -public interface Connection extends Listener, Sender {} +public interface Connection + extends Listener, Sender { + + /** + * Create unbounded socket + * + * @return created socket + */ + Mono socket(); + + /** + * Close socket. Shadow exception. + * + * @param socket to close + */ + void close(SocketType socket); +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java index 03d35b1..60c18f4 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java @@ -3,91 +3,36 @@ package org.mtbo.lcloud.discovery; import java.time.Duration; -import java.util.HashSet; -import java.util.Objects; +import java.util.Set; import java.util.logging.Logger; +import java.util.stream.Collectors; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -/** Allows to request network services named by some {@link Config#serviceName serice name} */ -public abstract class DiscoveryClient { +/** Allows to request network services named by some {@link ClientConfig#serviceName serice name} */ +public abstract class DiscoveryClient { /** Configuration parameters */ - protected final Config config; + protected final ClientConfig config; /** * Construct client with config * - * @param config configuration, including {@link Config#serviceName service name} + * @param config configuration, including {@link ClientConfig#serviceName service name} */ - public DiscoveryClient(Config config) { + public DiscoveryClient(ClientConfig config) { this.config = config; } - /** {@link DiscoveryClient discovery client's} config */ - public abstract static class Config { - /** unique service identifier */ - public final String serviceName; - - /** optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited */ - public final int clientsCount; - - /** - * @param serviceName unique service identifier - * @param clientsCount optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited - */ - public Config(String serviceName, int clientsCount) { - this.serviceName = serviceName; - this.clientsCount = clientsCount; - } - - /** - * Constructor with defaults - * - * @param serviceName unique service identifier - */ - public Config(String serviceName) { - this(serviceName, Integer.MAX_VALUE); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) return true; - if (obj == null || obj.getClass() != this.getClass()) return false; - var that = (Config) obj; - return Objects.equals(this.serviceName, that.serviceName) - && this.clientsCount == that.clientsCount; - } - - @Override - public int hashCode() { - return Objects.hash(serviceName, clientsCount); - } - - @Override - public String toString() { - return "Config[" - + "serviceName=" - + serviceName - + ", " - + "port=" - + ", " - + "clientsCount=" - + clientsCount - + ']'; - } - } - /** - * Periodically look for network services named by {@link Config#serviceName} receiveInterval is - * calculated with minus 100 millis + * Periodically look for network services named by {@link ClientConfig#serviceName} + * receiveInterval is calculated with minus 100 millis * * @param interval delay between requests * @return {@link Publisher} of service instances id's set */ - public final Flux> startLookup(Duration interval) { + public final Flux> startLookup(Duration interval) { Duration receiveTimeout = interval.minus(Duration.ofMillis(100)); if (receiveTimeout.isNegative()) { @@ -98,14 +43,14 @@ public final Flux> startLookup(Duration interval) { } /** - * Periodically look for network services named by {@link Config#serviceName} receiveInterval is - * calculated with minus 100 millis + * Periodically look for network services named by {@link ClientConfig#serviceName} + * receiveInterval is calculated with minus 100 millis * * @param interval delay between requests * @param receiveTimeout time interval to check responses from instances * @return {@link Publisher} of service instances id's set */ - public Flux> startLookup(Duration interval, Duration receiveTimeout) { + public Flux> startLookup(Duration interval, Duration receiveTimeout) { Flux mainAddresses = getMainBroadcastAddresses(); return Flux.interval(interval) @@ -120,7 +65,7 @@ public Flux> startLookup(Duration interval, Duration receiveTime * @return Single or zero instance {@link Publisher} of service instances id's set */ @SuppressWarnings("unused") - public Mono> lookupOnce(Duration receiveTimeout) { + public Mono> lookupOnce(Duration receiveTimeout) { Flux mainAddresses = getMainBroadcastAddresses(); return Mono.from(lookupOnceInternal(receiveTimeout, mainAddresses)); @@ -133,12 +78,26 @@ public Mono> lookupOnce(Duration receiveTimeout) { * @param mainAddresses cached local broadcast addresses * @return Single or zero instance {@link Publisher} of service instances id's set */ - private Flux> lookupOnceInternal( + private Flux> lookupOnceInternal( Duration receiveTimeout, Flux mainAddresses) { return Flux.from(send(mainAddresses)) - .flatMap(this::receive) - .bufferTimeout(config.clientsCount, receiveTimeout) - .map(HashSet::new); + .flatMap( + socket -> + Flux.using( + () -> socket, + this::receive, + (s) -> { + try { + s.close(); + logger.finer("Closed socket"); + } catch (Exception ignored) { + logger.warning("Failed to close socket"); + } + }, + true) + .timeout(receiveTimeout, Flux.empty())) + .bufferTimeout(config.endpointsCount, receiveTimeout) + .map(strings -> strings.stream().filter(s -> !s.isEmpty()).collect(Collectors.toSet())); } /** @@ -160,8 +119,7 @@ private Flux send(Flux mainAddresses) { Flux addresses = mainAddresses.concatWith(getAdditionalAddresses()); Flux> v = - addresses.zipWith( - createSocket().repeat(), AddressSocket::new); + addresses.zipWith(createSocket().repeat(), AddressSocket::new); var sendData = ("UDP_DISCOVERY_REQUEST " + config.serviceName).getBytes(); @@ -223,12 +181,8 @@ private Mono receiveResponse(SocketType socket) { protected abstract PacketType createReceivePacket(); private Flux receive(SocketType socket) { - return getResponses(socket); - } - - private Flux getResponses(SocketType socket) { return Flux.from(receiveResponse(socket)) - .repeat(1) + .repeat() .flatMap( receivePacket -> { var message = packetMessage(receivePacket); diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java index 7d67235..ec991c8 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java @@ -2,43 +2,84 @@ package org.mtbo.lcloud.discovery; +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.logging.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + /** Packets pull-push loop thread */ -public final class DiscoveryService extends Thread { - private final PacketSource source; - private final PacketProcessor processor; +public abstract class DiscoveryService< + SocketType extends Closeable, PacketType extends Packet> { + + private final ByteBuffer prefix; + private final byte[] sendData; /** - * Construct loop + * Construct service with config * - * @param instanceName instance name - * @param serviceName instance namespace - * @param connection to push packets + * @param config configuration, including {@link ServiceConfig#serviceName service name} */ - public DiscoveryService(String instanceName, String serviceName, Connection connection) { - source = new PacketSource(connection); - processor = new PacketProcessor(instanceName, serviceName, connection); - source.subscribe(processor); + public DiscoveryService(ServiceConfig config) { + this.prefix = + ByteBuffer.wrap( + ("UDP_DISCOVERY_REQUEST " + config.serviceName).getBytes(StandardCharsets.UTF_8)) + .asReadOnlyBuffer(); + this.sendData = + ("UDP_DISCOVERY_RESPONSE " + config.instanceName).getBytes(StandardCharsets.UTF_8); } - @Override - public void run() { - - while (!interrupted()) { - source.process(); - } + /** + * Create listen operator + * + * @param connection used for listen + * @return flux with true in case of accepted packet, else false + */ + public Flux listen(Connection connection) { + return Flux.from(connection.socket()) + .flatMap( + socket -> + Flux.using( + () -> socket, + (listenSocket) -> loop(connection, listenSocket), + connection::close, + true) + .publishOn(Schedulers.boundedElastic())); } - /** Break loop */ - public void shutdown() { - processor.shutdown(); - interrupt(); - if (!interrupted()) { - try { - join(); - } catch (InterruptedException ignored) { - } - } - - source.close(); + private Flux loop( + Connection connection, SocketType listenSocket) { + return connection + .receive(listenSocket) + .zipWith(Mono.just(listenSocket)) + .doOnNext( + tuple -> { + ByteBuffer data = tuple.getT1().data(); + logger.finer( + ">>> Packet received; packet: " + + new String(data.array(), data.arrayOffset(), data.limit())); + }) + .repeat() + .flatMap( + tuple -> { + PacketType packet = tuple.getT1(); + SocketType sendSocket = tuple.getT2(); + + if (prefix.equals(packet.data().slice(0, prefix.limit()))) { + return connection + .send(sendSocket, packet.copyWithData(ByteBuffer.wrap(sendData))) + .doOnError( + throwable -> { + logger.severe(throwable.getMessage()); + logger.throwing(DiscoveryService.class.getSimpleName(), "run", throwable); + }); + } else { + return Mono.just(false); + } + }); } + + static final Logger logger = Logger.getLogger(DiscoveryService.class.getName()); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Listener.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Listener.java index 7d0cee6..1f88a0a 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Listener.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Listener.java @@ -2,14 +2,15 @@ package org.mtbo.lcloud.discovery; -import java.util.concurrent.CompletableFuture; +import reactor.core.publisher.Mono; /** Allows to receive packets */ -public interface Listener { +public interface Listener { /** * Receive packets * + * @param socket on which to receive packet * @return packet from incoming connection */ - CompletableFuture receive(); + Mono receive(SocketType socket); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Packet.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Packet.java index c84fd00..56509c2 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Packet.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Packet.java @@ -5,7 +5,7 @@ import java.nio.ByteBuffer; /** Piece of data */ -public interface Packet { +public interface Packet { /** * @return wrapped data @@ -16,5 +16,5 @@ public interface Packet { * @param wrap new data * @return new packet with data replaced. Remaining field should be copied. */ - Packet copyWithData(ByteBuffer wrap); + PacketType copyWithData(ByteBuffer wrap); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java deleted file mode 100644 index 114d082..0000000 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java +++ /dev/null @@ -1,81 +0,0 @@ -/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ - -package org.mtbo.lcloud.discovery; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.Flow; -import java.util.logging.Logger; - -/** Process {@link Packet packets} and send responses to {@link #sender} */ -public class PacketProcessor implements Flow.Subscriber { - private final Sender sender; - - private final ByteBuffer prefix; - private final byte[] sendData; - - private Flow.Subscription subscription; - - /** - * Constructor - * - * @param instanceName service instance name - * @param serviceName service name - * @param sender downstream - */ - public PacketProcessor(String instanceName, String serviceName, Sender sender) { - this.sender = sender; - - this.prefix = - ByteBuffer.wrap(("UDP_DISCOVERY_REQUEST " + serviceName).getBytes(StandardCharsets.UTF_8)) - .asReadOnlyBuffer(); - this.sendData = ("UDP_DISCOVERY_RESPONSE " + instanceName).getBytes(StandardCharsets.UTF_8); - } - - /** Shutdown {@link #subscription} */ - public void shutdown() { - subscription.cancel(); - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } - - @Override - public void onNext(Packet item) { - ByteBuffer data = item.data(); - logger.finer( - ">>> Packet received; packet: " - + new String(data.array(), data.arrayOffset(), data.limit())); - - if (prefix.equals(data.slice(0, prefix.limit()))) { - // Send a response - sender - .send(item.copyWithData(ByteBuffer.wrap(sendData))) - .exceptionallyAsync( - throwable -> { - logger.severe(throwable.getMessage()); - logger.throwing(PacketProcessor.class.getSimpleName(), "run", throwable); - return null; - }); - } - - // Enqueue next packet processing - subscription.request(1); - } - - @Override - public void onError(Throwable throwable) { - logger.severe(throwable.getMessage()); - logger.throwing(PacketProcessor.class.getSimpleName(), "onError", throwable); - } - - @Override - public void onComplete() { - logger.severe("Stop response"); - } - - static final Logger logger = Logger.getLogger(PacketProcessor.class.getName()); -} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java deleted file mode 100644 index cba5a9d..0000000 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java +++ /dev/null @@ -1,56 +0,0 @@ -/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ - -package org.mtbo.lcloud.discovery; - -import java.util.concurrent.SubmissionPublisher; -import java.util.logging.Logger; -import org.mtbo.lcloud.discovery.exceptions.NetworkException; - -/** Incoming packet {@link java.util.concurrent.Flow.Publisher publisher} */ -public final class PacketSource extends SubmissionPublisher { - - private final Listener listener; - - /** - * Constructor - * - * @param listener incoming packets source - */ - public PacketSource(Listener listener) { - this.listener = listener; - } - - /** Network receive/send step. */ - public void process() { - listener - .receive() - .handleAsync( - (packet, throwable) -> { - if (throwable instanceof RuntimeException) { - throw (RuntimeException) throwable; - } - return offer(packet, (subscriber, packetUnused) -> false); - }) - .exceptionallyAsync( - throwable -> { - logger.severe("Unable to receive: " + throwable.getMessage()); - throwable.printStackTrace(); - logger.throwing(PacketSource.class.getName(), "process", throwable); - if (throwable instanceof NetworkException - || !(throwable instanceof RuntimeException)) { - return Integer.MAX_VALUE; - } else { - throw (RuntimeException) throwable; - } - }) - .thenAcceptAsync( - lagOrDrop -> { - if (1 != lagOrDrop) { - logger.finer("LAG OR DROP: " + lagOrDrop); - } - }) - .join(); - } - - static final Logger logger = Logger.getLogger(PacketSource.class.getName()); -} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Sender.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Sender.java index e5d95d0..2986fb9 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Sender.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Sender.java @@ -2,15 +2,16 @@ package org.mtbo.lcloud.discovery; -import java.util.concurrent.CompletableFuture; +import reactor.core.publisher.Mono; /** Allows to send packets */ -public interface Sender { +public interface Sender { /** - * Send packet + * Send packet operator * + * @param socket on which to send packet * @param packet packet with data and connection parameters - * @return void future + * @return void mono */ - CompletableFuture send(Packet packet); + Mono send(SocketType socket, PacketType packet); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ServiceConfig.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ServiceConfig.java new file mode 100644 index 0000000..b17b461 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ServiceConfig.java @@ -0,0 +1,48 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery; + +import java.util.Objects; + +/** Discovery config */ +public abstract class ServiceConfig { + /** unique service identifier */ + public final String serviceName; + + /** this endpoint name */ + public final String instanceName; + + /** + * @param serviceName unique service identifier + * @param instanceName this endpoint name + */ + public ServiceConfig(String serviceName, String instanceName) { + this.serviceName = serviceName; + this.instanceName = instanceName; + } + + @Override + public String toString() { + return "ServiceConfig{" + + "serviceName='" + + serviceName + + '\'' + + ", instanceName='" + + instanceName + + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + ServiceConfig that = (ServiceConfig) o; + return Objects.equals(serviceName, that.serviceName) + && Objects.equals(instanceName, that.instanceName); + } + + @Override + public int hashCode() { + return Objects.hash(serviceName, instanceName); + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpClientConfig.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpClientConfig.java new file mode 100644 index 0000000..68db9ed --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpClientConfig.java @@ -0,0 +1,54 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery.udp; + +import java.util.Objects; +import org.mtbo.lcloud.discovery.ClientConfig; + +/** {@link UdpDiscoveryClient UDP discovery client's} config */ +public final class UdpClientConfig extends ClientConfig { + /** UDP port */ + public final int port; + + /** + * Constructor + * + * @param serviceName unique service identifier + * @param clientsCount max instances list size (default to {@link Integer#MAX_VALUE}) + * @param port UDP port + */ + @SuppressWarnings("unused") + public UdpClientConfig(String serviceName, int clientsCount, int port) { + super(serviceName, clientsCount); + this.port = port; + } + + /** + * Constructor with defaults + * + * @param serviceName unique service identifier + * @param port UDP port + */ + public UdpClientConfig(String serviceName, int port) { + super(serviceName); + this.port = port; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + UdpClientConfig udpConfig = (UdpClientConfig) o; + return port == udpConfig.port; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), port); + } + + @Override + public String toString() { + return "UdpConfig{" + "port=" + port + '}'; + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpConnection.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpConnection.java index d9e86bd..4989548 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpConnection.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpConnection.java @@ -2,104 +2,87 @@ package org.mtbo.lcloud.discovery.udp; -import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; import java.util.logging.Logger; import org.mtbo.lcloud.discovery.Connection; -import org.mtbo.lcloud.discovery.Packet; -import org.mtbo.lcloud.discovery.exceptions.NetworkException; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** Allows to send and receive packets using UDP connection */ -public final class UdpConnection implements Connection { +public final class UdpConnection implements Connection { - private final DatagramSocket socket; + final int port; /** - * Create and bind broadcast UDP socket + * UDP connection * * @param port for incoming packets - * @throws SocketException in case of bind error */ - public UdpConnection(final int port) throws SocketException { - socket = new DatagramSocket(null); - socket.setReuseAddress(true); - socket.setBroadcast(true); - socket.bind(new InetSocketAddress(port)); + public UdpConnection(int port) { + this.port = port; } + /** Create and bind broadcast UDP socket */ @Override - public CompletableFuture receive() { - return CompletableFuture.supplyAsync( - () -> { - byte[] buffer = new byte[4096]; + public Mono socket() { + return Mono.fromCallable( + () -> { + var datagramSocket = new DatagramSocket(null); + datagramSocket.setReuseAddress(true); + datagramSocket.setBroadcast(true); + datagramSocket.bind(new InetSocketAddress(port)); + return datagramSocket; + }) + .publishOn(Schedulers.boundedElastic()); + } - DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + @Override + public void close(DatagramSocket socket) { + socket.close(); + } + + @Override + public Mono receive(DatagramSocket socket) { - try { - socket.receive(packet); - } catch (IOException e) { - throw new NetworkException(e); - } + return Mono.fromCallable( + () -> { + byte[] buffer = new byte[4096]; - logger.finest( - ">>> Discovery packet received from: " + packet.getAddress().getHostAddress()); + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); - return new UdpPacket(packet); - }); + socket.receive(packet); + return packet; + }) + .publishOn(Schedulers.boundedElastic()) + .map(UdpPacket::new); } @Override - public CompletableFuture send(Packet packet) { - return CompletableFuture.runAsync( - () -> { - assert packet instanceof UdpPacket; - final var udpPacket = (UdpPacket) packet; - final var data = udpPacket.data(); - byte[] array = data.array(); - final var sendPacket = - new DatagramPacket( - array, - data.arrayOffset(), - data.limit(), - udpPacket.packet.getAddress(), - udpPacket.packet.getPort()); - try { - socket.send(sendPacket); - } catch (IOException e) { - logger.severe("Failed to send UDP packet: " + e); - throw new RuntimeException(e); - } - logger.finer( - ">>> Sent packet to: " - + sendPacket.getAddress().getHostAddress() - + ":" - + udpPacket.packet.getPort()); - }); + public Mono send(DatagramSocket socket, UdpPacket packet) { + return Mono.fromCallable( + () -> { + final var data = packet.data(); + byte[] array = data.array(); + final var sendPacket = + new DatagramPacket( + array, + data.arrayOffset(), + data.limit(), + packet.packet().getAddress(), + packet.packet().getPort()); + socket.send(sendPacket); + logger.finer( + ">>> Sent packet to: " + + sendPacket.getAddress().getHostAddress() + + ":" + + packet.packet().getPort()); + + return true; + }) + .publishOn(Schedulers.boundedElastic()); } static final Logger logger = Logger.getLogger(UdpConnection.class.getName()); - - private record UdpPacket(DatagramPacket packet) implements Packet { - - @Override - public ByteBuffer data() { - return ByteBuffer.wrap(packet.getData(), packet.getOffset(), packet.getLength()); - } - - @Override - public Packet copyWithData(ByteBuffer wrap) { - return new UdpPacket( - new DatagramPacket( - wrap.array(), - wrap.arrayOffset(), - wrap.limit(), - packet.getAddress(), - packet.getPort())); - } - } } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java index df81534..f63a899 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java @@ -3,7 +3,6 @@ package org.mtbo.lcloud.discovery.udp; import java.net.*; -import java.util.Objects; import java.util.logging.Logger; import java.util.stream.Stream; import org.mtbo.lcloud.discovery.DiscoveryClient; @@ -18,10 +17,10 @@ public class UdpDiscoveryClient /** * Construct client with config * - * @param config configuration, including {@link Config#serviceName service name} and {@link - * UdpConfig#port} + * @param config configuration, including {@link UdpClientConfig#serviceName service name} and + * {@link UdpClientConfig#port} */ - public UdpDiscoveryClient(UdpConfig config) { + public UdpDiscoveryClient(UdpClientConfig config) { super(config); } @@ -42,7 +41,7 @@ protected Mono sendPacket(DatagramSocket socket, DatagramPacket @Override protected DatagramPacket createSendPacket(byte[] sendData, InetAddress address) { - return new DatagramPacket(sendData, sendData.length, address, ((UdpConfig) config).port); + return new DatagramPacket(sendData, sendData.length, address, ((UdpClientConfig) config).port); } @Override @@ -83,10 +82,10 @@ protected Flux getMainBroadcastAddresses() { return Flux.from( Mono.fromCallable(() -> InetAddress.getByName("255.255.255.255")) .publishOn(Schedulers.boundedElastic())) - .doOnNext(inetAddress -> logger.fine("get main address (REAL): " + inetAddress.toString())) + .doOnNext(inetAddress -> logger.finer("get main address (REAL): " + inetAddress.toString())) .cache() .doOnNext( - inetAddress -> logger.fine("get main address (CACHED): " + inetAddress.toString())); + inetAddress -> logger.finer("get main address (CACHED): " + inetAddress.toString())); } @Override @@ -124,60 +123,5 @@ private boolean canSend(NetworkInterface networkInterface) { } } - /** {@link UdpDiscoveryClient UDP discovery client's} config */ - public static final class UdpConfig extends Config { - private final int port; - - /** - * Constructor - * - * @param serviceName unique service identifier - * @param clientsCount max instances list size (default to {@link Integer#MAX_VALUE}) - * @param port UDP port - */ - @SuppressWarnings("unused") - public UdpConfig(String serviceName, int clientsCount, int port) { - super(serviceName, clientsCount); - this.port = port; - } - - /** - * Constructor with defaults - * - * @param serviceName unique service identifier - * @param port UDP port - */ - public UdpConfig(String serviceName, int port) { - super(serviceName); - this.port = port; - } - - @Override - public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - UdpConfig udpConfig = (UdpConfig) o; - return port == udpConfig.port; - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), port); - } - - @Override - public String toString() { - return "UdpConfig{" - + "port=" - + port - + ", serviceName='" - + serviceName - + '\'' - + ", clientsCount=" - + clientsCount - + '}'; - } - } - static final Logger logger = Logger.getLogger(UdpDiscoveryClient.class.getName()); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryService.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryService.java new file mode 100644 index 0000000..a83b776 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryService.java @@ -0,0 +1,20 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery.udp; + +import java.net.DatagramSocket; +import org.mtbo.lcloud.discovery.DiscoveryService; +import org.mtbo.lcloud.discovery.ServiceConfig; + +/** UDP implementation of {@link DiscoveryService} */ +public class UdpDiscoveryService extends DiscoveryService { + + /** + * Construct service with config + * + * @param config configuration, including {@link ServiceConfig#serviceName service name} + */ + public UdpDiscoveryService(UdpServiceConfig config) { + super(config); + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpPacket.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpPacket.java new file mode 100644 index 0000000..64d3ea0 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpPacket.java @@ -0,0 +1,27 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery.udp; + +import java.net.DatagramPacket; +import java.nio.ByteBuffer; +import org.mtbo.lcloud.discovery.Packet; + +/** + * UDP packet implementation of {@link Packet} + * + * @param packet + */ +public record UdpPacket(DatagramPacket packet) implements Packet { + + @Override + public ByteBuffer data() { + return ByteBuffer.wrap(packet.getData(), packet.getOffset(), packet.getLength()); + } + + @Override + public UdpPacket copyWithData(ByteBuffer wrap) { + return new UdpPacket( + new DatagramPacket( + wrap.array(), wrap.arrayOffset(), wrap.limit(), packet.getAddress(), packet.getPort())); + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpServiceConfig.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpServiceConfig.java new file mode 100644 index 0000000..d9a66de --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpServiceConfig.java @@ -0,0 +1,39 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery.udp; + +import java.util.Objects; +import org.mtbo.lcloud.discovery.ServiceConfig; + +/** UDP implementation of {@link ServiceConfig} */ +public class UdpServiceConfig extends ServiceConfig { + private final int port; + + /** + * @param serviceName unique service identifier + * @param instanceName this endpoint name + * @param port UDP port + */ + public UdpServiceConfig(String serviceName, String instanceName, int port) { + super(serviceName, instanceName); + this.port = port; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + UdpServiceConfig that = (UdpServiceConfig) o; + return port == that.port; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), port); + } + + @Override + public String toString() { + return "UdpServiceConfig{" + "port=" + port + '}'; + } +}