From ea595e7ee2dcbe5b4d73c4c0bbc33cb3f1eb7713 Mon Sep 17 00:00:00 2001 From: "vladimir.koltunov" Date: Sun, 18 May 2025 12:34:55 +0300 Subject: [PATCH 1/6] send reactor --- build.gradle | 9 +- lcloud-udp-discovery-example/build.gradle | 2 +- .../src/main/java/ExampleService.java | 50 +++--- lcloud-udp-discovery/build.gradle | 2 +- .../lcloud/discovery/udp/DiscoveryClient.java | 160 +++++++++++------- 5 files changed, 133 insertions(+), 90 deletions(-) diff --git a/build.gradle b/build.gradle index 3bfe914..8b2c85a 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ def javaProjects = [ configure(javaProjects) { - version '2.0.0' + version '3.0.0' repositories { mavenCentral() if (project.version.endsWith('-SNAPSHOT')) { @@ -24,6 +24,13 @@ configure(javaProjects) { } } + apply plugin: 'java-library' + + dependencies { + implementation "io.projectreactor:reactor-core:3.8.0-M3" + testImplementation "io.projectreactor:reactor-test:3.8.0-M3" + } + apply plugin: 'com.diffplug.spotless' spotless { diff --git a/lcloud-udp-discovery-example/build.gradle b/lcloud-udp-discovery-example/build.gradle index 5a212f1..a2063d0 100644 --- a/lcloud-udp-discovery-example/build.gradle +++ b/lcloud-udp-discovery-example/build.gradle @@ -4,7 +4,7 @@ plugins { } group = 'org.mtbo.lcloud' -version = '2.0.0-SNAPSHOT' +version = '3.0.0-SNAPSHOT' apply plugin: 'application' mainClassName = 'ExampleService' diff --git a/lcloud-udp-discovery-example/src/main/java/ExampleService.java b/lcloud-udp-discovery-example/src/main/java/ExampleService.java index 66907be..4fcf8df 100644 --- a/lcloud-udp-discovery-example/src/main/java/ExampleService.java +++ b/lcloud-udp-discovery-example/src/main/java/ExampleService.java @@ -2,7 +2,6 @@ import java.net.SocketException; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.logging.ConsoleHandler; import java.util.logging.Handler; @@ -39,46 +38,37 @@ public static void main(String[] args) throws InterruptedException, SocketExcept DiscoveryClient discoveryClient = new DiscoveryClient("xService", 8888); - var th = getThread(discoveryClient); + discoveryClient + .lookup() + .doOnNext( + instances -> { + String joined = instances.stream().sorted().collect(Collectors.joining("\n")); - th.join(5000); + System.out.println("***********************************************"); + System.out.println("Instances Discovered:\n\n" + joined); + System.out.println("***********************************************\n"); + }) + .doOnError( + throwable -> { + System.out.println("Lookup Error: " + throwable.getMessage()); + throwable.printStackTrace(); + }) + .onErrorComplete(throwable -> !(throwable instanceof InterruptedException)) + .subscribe(); - th.interrupt(); + Thread.sleep(5000); discoveryService.shutdown(); } - private static Thread getThread(DiscoveryClient discoveryClient) { - var th = - new Thread( - () -> { - while (true) { - Set instances = null; - try { - instances = discoveryClient.lookup(); - } catch (InterruptedException | SocketException e) { - break; - } - - String joined = instances.stream().sorted().collect(Collectors.joining("\n")); - - System.out.println("***********************************************"); - System.out.println("Instances Discovered:\n\n" + joined); - System.out.println("***********************************************\n"); - } - }); - - th.start(); - return th; - } - // static Logger logger = Logger.getLogger(ExampleService.class.getName()); static { Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); + Level level = Level.FINE; + handler.setLevel(level); Logger logger1 = Logger.getLogger(""); - logger1.setLevel(Level.FINE); + logger1.setLevel(level); logger1.addHandler(handler); } } diff --git a/lcloud-udp-discovery/build.gradle b/lcloud-udp-discovery/build.gradle index 801739f..e9ed915 100644 --- a/lcloud-udp-discovery/build.gradle +++ b/lcloud-udp-discovery/build.gradle @@ -4,7 +4,7 @@ plugins { } group = 'org.mtbo.lcloud' -version = '2.0.0-SNAPSHOT' +version = '3.0.0-SNAPSHOT' repositories { mavenCentral() diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java index 190a080..4b4797f 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java @@ -4,9 +4,13 @@ import java.io.IOException; import java.net.*; -import java.util.HashSet; +import java.time.Duration; import java.util.Set; import java.util.logging.Logger; +import java.util.stream.Stream; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class DiscoveryClient { @@ -19,65 +23,107 @@ public DiscoveryClient(String serviceName, int port) { this.port = port; } - public Set lookup() throws InterruptedException, SocketException { - try (DatagramSocket socket = new DatagramSocket()) { - // Open a random port to send the package - socket.setBroadcast(true); - - var sendData = ("UDP_DISCOVERY_REQUEST " + serviceName).getBytes(); - - // Try the 255.255.255.255 first - try { - var sendPacket = - new DatagramPacket( - sendData, sendData.length, InetAddress.getByName("255.255.255.255"), port); - socket.send(sendPacket); - logger.finer(">>> Request packet sent to: 255.255.255.255 (DEFAULT)"); - } catch (Exception ignored) { - } - - // Broadcast the message over all the network interfaces - var interfaces = NetworkInterface.getNetworkInterfaces(); - - while (interfaces.hasMoreElements()) { - var networkInterface = interfaces.nextElement(); - - if (networkInterface.isLoopback() || !networkInterface.isUp()) { - continue; // Don't want to broadcast to the loopback interface - } - - for (var interfaceAddress : networkInterface.getInterfaceAddresses()) { - var broadcast = interfaceAddress.getBroadcast(); - if (broadcast == null) { - continue; - } - - // Send the broadcast package! - try { - var sendPacket = new DatagramPacket(sendData, sendData.length, broadcast, port); - socket.send(sendPacket); - } catch (Exception ignored) { - } - - logger.finer( - ">>> Request packet sent to: " - + broadcast.getHostAddress() - + "; Interface: " - + networkInterface.getDisplayName()); - } - } - - logger.finer(">>> Done looping over all network interfaces. Now waiting for a reply!"); - - var result = new HashSet(); - - var th = getThread(socket, result); - th.join(1000); - - return result; + private static boolean canSend(NetworkInterface networkInterface) { + try { + return !networkInterface.isLoopback() && networkInterface.isUp(); + } catch (SocketException e) { + // TODO + return false; } } + public Flux> lookup() { + Flux mainAddresses = getMainAddresses(); + return Flux.interval(Duration.ofMillis(1000)) + .flatMap(aLong -> Flux.from(send(mainAddresses)).doOnEach( + signal -> { + System.out.println("-----------------------"); + System.out.println("[" + signal + "]"); + System.out.println("-----------------------"); + }).flatMap(count -> Flux.from(receive()))); + } + + /** + * Wrap address to socket pair. + * + * @param inetAddress + * @param socket + */ + public record AddressSocket(InetAddress inetAddress, DatagramSocket socket) {} + + private Mono send(Flux mainAddresses) { + + // Try the main first + Flux addresses = mainAddresses.concatWith(getAdditionalAddresses()); + + Flux v = + addresses.zipWith( + Mono.fromCallable(DatagramSocket::new).publishOn(Schedulers.boundedElastic()).repeat(), + AddressSocket::new); + + + var sendData = ("UDP_DISCOVERY_REQUEST " + serviceName).getBytes(); + + return v.flatMap( + addressWithSocket -> { + InetAddress address = addressWithSocket.inetAddress; + DatagramSocket socket = addressWithSocket.socket; + + logger.fine("Try to send: " + address.getHostAddress() + " " + socket.isClosed()); + + var sendPacket = new DatagramPacket(sendData, sendData.length, address, port); + + return Flux.from( + Mono.fromCallable( + () -> { + socket.send(sendPacket); + return 0; + }) + .publishOn(Schedulers.boundedElastic()) + .doOnError(throwable -> logger.warning("send error: " + throwable)) + .onErrorResume((throwable) -> Mono.empty()) + .doOnNext( + o -> + logger.finer( + ">>> Request packet sent to: " + address.getHostAddress()))); + }) + .count(); + } + + private Mono> receive() { + return Mono.just(Set.of()); + } + + /** + * Get IPv4 network broadcast address + * + * @return cached local broadcast address + */ + public static Flux getMainAddresses() { + return Flux.from( + Mono.fromCallable(() -> InetAddress.getByName("255.255.255.255")) + .publishOn(Schedulers.boundedElastic())) + .doOnNext(inetAddress -> logger.fine("get main address (REAL): " + inetAddress.toString())) + .cache() + .doOnNext( + inetAddress -> logger.fine("get main address (CACHED): " + inetAddress.toString())); + } + + public static Flux getAdditionalAddresses() { + return getNetworkInterfaces() + .flatMap(Flux::fromStream) + .filter(DiscoveryClient::canSend) + .flatMapIterable(NetworkInterface::getInterfaceAddresses) + .mapNotNull(InterfaceAddress::getBroadcast); + } + + private static Flux> getNetworkInterfaces() { + return Flux.from( + Mono.fromCallable(NetworkInterface::networkInterfaces) + .publishOn(Schedulers.boundedElastic()) + .onErrorResume(e -> Mono.just(Stream.empty()))); + } + private Thread getThread(DatagramSocket socket, Set result) { result.clear(); From ab0d9fca4a23a552efcb9f83f61658bf6d4424b3 Mon Sep 17 00:00:00 2001 From: "vladimir.koltunov" Date: Sun, 18 May 2025 19:40:07 +0300 Subject: [PATCH 2/6] Reactor client implementation --- .../src/main/java/ExampleService.java | 18 +- .../lcloud/discovery/DiscoveryService.java | 27 +- .../lcloud/discovery/PacketProcessor.java | 9 + .../mtbo/lcloud/discovery/PacketSource.java | 22 +- .../exceptions/NetworkException.java | 8 + .../lcloud/discovery/udp/DiscoveryClient.java | 257 +++++++++++------- 6 files changed, 215 insertions(+), 126 deletions(-) diff --git a/lcloud-udp-discovery-example/src/main/java/ExampleService.java b/lcloud-udp-discovery-example/src/main/java/ExampleService.java index 4fcf8df..19f9272 100644 --- a/lcloud-udp-discovery-example/src/main/java/ExampleService.java +++ b/lcloud-udp-discovery-example/src/main/java/ExampleService.java @@ -1,6 +1,7 @@ /* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ import java.net.SocketException; +import java.time.Duration; import java.util.Optional; import java.util.UUID; import java.util.logging.ConsoleHandler; @@ -36,36 +37,37 @@ public static void main(String[] args) throws InterruptedException, SocketExcept discoveryService.setDaemon(true); discoveryService.start(); - DiscoveryClient discoveryClient = new DiscoveryClient("xService", 8888); + DiscoveryClient.Config config = new DiscoveryClient.Config("xService", 8888); + DiscoveryClient discoveryClient = new DiscoveryClient(config); discoveryClient - .lookup() + .startLookup(Duration.ofSeconds(2)) .doOnNext( instances -> { String joined = instances.stream().sorted().collect(Collectors.joining("\n")); System.out.println("***********************************************"); - System.out.println("Instances Discovered:\n\n" + joined); + System.out.println(config.serviceName() + " instances are discovered:\n\n" + joined); System.out.println("***********************************************\n"); }) .doOnError( throwable -> { - System.out.println("Lookup Error: " + throwable.getMessage()); - throwable.printStackTrace(); + logger.severe("Lookup Error: " + throwable.getMessage()); + logger.throwing(ExampleService.class.getName(), "main", throwable); }) .onErrorComplete(throwable -> !(throwable instanceof InterruptedException)) .subscribe(); - Thread.sleep(5000); + Thread.sleep(10000); discoveryService.shutdown(); } - // static Logger logger = Logger.getLogger(ExampleService.class.getName()); + static Logger logger = Logger.getLogger(ExampleService.class.getName()); static { Handler handler = new ConsoleHandler(); - Level level = Level.FINE; + Level level = Level.INFO; handler.setLevel(level); Logger logger1 = Logger.getLogger(""); logger1.setLevel(level); diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java index 096ceb6..7d67235 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java @@ -2,8 +2,6 @@ package org.mtbo.lcloud.discovery; -import java.util.concurrent.ExecutionException; - /** Packets pull-push loop thread */ public final class DiscoveryService extends Thread { private final PacketSource source; @@ -26,30 +24,21 @@ public DiscoveryService(String instanceName, String serviceName, Connection conn public void run() { while (!interrupted()) { - try { - source.process().get(); - } catch (InterruptedException e) { - break; - } catch (ExecutionException e) { - throw new RuntimeException(e); - } + source.process(); } } - /** - * Break loop - * - */ + /** Break loop */ public void shutdown() { - source.close(); processor.shutdown(); interrupt(); if (!interrupted()) { - try { - join(); - } catch (InterruptedException ignored) { - - } + try { + join(); + } catch (InterruptedException ignored) { + } } + + source.close(); } } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java index f0cde39..114d082 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java @@ -7,6 +7,7 @@ import java.util.concurrent.Flow; import java.util.logging.Logger; +/** Process {@link Packet packets} and send responses to {@link #sender} */ public class PacketProcessor implements Flow.Subscriber { private final Sender sender; @@ -15,6 +16,13 @@ public class PacketProcessor implements Flow.Subscriber { private Flow.Subscription subscription; + /** + * Constructor + * + * @param instanceName service instance name + * @param serviceName service name + * @param sender downstream + */ public PacketProcessor(String instanceName, String serviceName, Sender sender) { this.sender = sender; @@ -24,6 +32,7 @@ public PacketProcessor(String instanceName, String serviceName, Sender sender) { this.sendData = ("UDP_DISCOVERY_RESPONSE " + instanceName).getBytes(StandardCharsets.UTF_8); } + /** Shutdown {@link #subscription} */ public void shutdown() { subscription.cancel(); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java index 8f8ef63..1e256fb 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java @@ -2,22 +2,27 @@ package org.mtbo.lcloud.discovery; -import org.mtbo.lcloud.discovery.exceptions.NetworkException; - -import java.util.concurrent.CompletableFuture; import java.util.concurrent.SubmissionPublisher; import java.util.logging.Logger; +import org.mtbo.lcloud.discovery.exceptions.NetworkException; +/** Incoming packet {@link java.util.concurrent.Flow.Publisher publisher} */ public final class PacketSource extends SubmissionPublisher { private final Listener listener; + /** + * Constructor + * + * @param listener incoming packets source + */ public PacketSource(Listener listener) { this.listener = listener; } - public CompletableFuture process() { - return listener + /** Network receive/send step. */ + public void process() { + listener .receive() .handleAsync( (packet, throwable) -> { @@ -28,9 +33,9 @@ public CompletableFuture process() { }) .exceptionallyAsync( throwable -> { - - throwable.printStackTrace(); logger.severe("Unable to receive: " + throwable.getMessage()); + throwable.printStackTrace(); + logger.throwing(PacketSource.class.getName(), "process", throwable); if (throwable instanceof NetworkException || !(throwable instanceof RuntimeException)) { return Integer.MAX_VALUE; @@ -38,7 +43,8 @@ public CompletableFuture process() { throw (RuntimeException) throwable; } }) - .thenAcceptAsync(lagOrDrop -> logger.finer("LAG OR DROP: " + lagOrDrop)); + .thenAcceptAsync(lagOrDrop -> logger.finer("LAG OR DROP: " + lagOrDrop)) + .join(); } static final Logger logger = Logger.getLogger(PacketSource.class.getName()); diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/exceptions/NetworkException.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/exceptions/NetworkException.java index 2e4fd77..3039c34 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/exceptions/NetworkException.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/exceptions/NetworkException.java @@ -1,6 +1,14 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + package org.mtbo.lcloud.discovery.exceptions; +/** Unchecked network error to throw from reactive operators */ public class NetworkException extends RuntimeException { + /** + * Throwable wrapping constructor + * + * @param cause wrapped throwable + */ public NetworkException(Throwable cause) { super(cause); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java index 4b4797f..fbdefcf 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java @@ -2,96 +2,197 @@ package org.mtbo.lcloud.discovery.udp; -import java.io.IOException; import java.net.*; import java.time.Duration; -import java.util.Set; +import java.util.HashSet; import java.util.logging.Logger; import java.util.stream.Stream; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +/** Allows to request network services named by some {@link Config#serviceName serice name} */ public class DiscoveryClient { - private final String serviceName; + private final Config config; - private final int port; + /** + * Construct client with config + * + * @param config configuration, including {@link Config#serviceName service name} and {@link + * Config#port port} + */ + public DiscoveryClient(Config config) { + this.config = config; + } - public DiscoveryClient(String serviceName, int port) { - this.serviceName = serviceName; - this.port = port; + /** + * {@link DiscoveryClient discovery client's} config + * + * @param serviceName unique service identifier + * @param port UDP port + * @param clientsCount optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited + */ + public record Config(String serviceName, int port, int clientsCount) { + /** + * Constructor with defaults + * + * @param serviceName unique service identifier + * @param port UDP port + */ + public Config(String serviceName, int port) { + this(serviceName, port, Integer.MAX_VALUE); + } } - private static boolean canSend(NetworkInterface networkInterface) { - try { - return !networkInterface.isLoopback() && networkInterface.isUp(); - } catch (SocketException e) { - // TODO - return false; + /** + * Periodically look for network services named by {@link Config#serviceName} receiveInterval is + * calculated with minus 100 millis + * + * @param interval delay between requests + * @return {@link Publisher} of service instances id's set + */ + public Flux> startLookup(Duration interval) { + Duration receiveTimeout = interval.minus(Duration.ofMillis(100)); + + if (receiveTimeout.isNegative()) { + receiveTimeout = interval; } + + return startLookup(interval, receiveTimeout); + } + + /** + * Periodically look for network services named by {@link Config#serviceName} receiveInterval is + * calculated with minus 100 millis + * + * @param interval delay between requests + * @param receiveTimeout time interval to check responses from instances + * @return {@link Publisher} of service instances id's set + */ + public Flux> startLookup(Duration interval, Duration receiveTimeout) { + Flux mainAddresses = getMainAddresses(); + + return Flux.interval(interval) + .flatMap(unused -> lookupOnceInternal(receiveTimeout, mainAddresses)) + .repeat(); } - public Flux> lookup() { + /** + * Lookup services once + * + * @param receiveTimeout time interval to check responses from instances + * @return Single or zero instance {@link Publisher} of service instances id's set + */ + public Mono> lookupOnce(Duration receiveTimeout) { Flux mainAddresses = getMainAddresses(); - return Flux.interval(Duration.ofMillis(1000)) - .flatMap(aLong -> Flux.from(send(mainAddresses)).doOnEach( - signal -> { - System.out.println("-----------------------"); - System.out.println("[" + signal + "]"); - System.out.println("-----------------------"); - }).flatMap(count -> Flux.from(receive()))); + + return Mono.from(lookupOnceInternal(receiveTimeout, mainAddresses)); + } + + /** + * Internal method reuses cached mainAddresses + * + * @param receiveTimeout + * @param mainAddresses + * @return + */ + private Flux> lookupOnceInternal( + Duration receiveTimeout, Flux mainAddresses) { + return Flux.from(send(mainAddresses)) + .flatMap(this::receive) + .bufferTimeout(128, receiveTimeout) + .map(HashSet::new); } /** * Wrap address to socket pair. * - * @param inetAddress - * @param socket + * @param inetAddress internet address + * @param socket UDP socket */ - public record AddressSocket(InetAddress inetAddress, DatagramSocket socket) {} + private record AddressSocket(InetAddress inetAddress, DatagramSocket socket) {} - private Mono send(Flux mainAddresses) { + private Flux send(Flux mainAddresses) { // Try the main first - Flux addresses = mainAddresses.concatWith(getAdditionalAddresses()); + Flux addresses = mainAddresses.concatWith(getAdditionalAddresses()); - Flux v = + Flux v = addresses.zipWith( Mono.fromCallable(DatagramSocket::new).publishOn(Schedulers.boundedElastic()).repeat(), AddressSocket::new); - - var sendData = ("UDP_DISCOVERY_REQUEST " + serviceName).getBytes(); + var sendData = ("UDP_DISCOVERY_REQUEST " + config.serviceName).getBytes(); return v.flatMap( - addressWithSocket -> { - InetAddress address = addressWithSocket.inetAddress; - DatagramSocket socket = addressWithSocket.socket; - - logger.fine("Try to send: " + address.getHostAddress() + " " + socket.isClosed()); - - var sendPacket = new DatagramPacket(sendData, sendData.length, address, port); - - return Flux.from( - Mono.fromCallable( - () -> { - socket.send(sendPacket); - return 0; - }) - .publishOn(Schedulers.boundedElastic()) - .doOnError(throwable -> logger.warning("send error: " + throwable)) - .onErrorResume((throwable) -> Mono.empty()) - .doOnNext( - o -> - logger.finer( - ">>> Request packet sent to: " + address.getHostAddress()))); + addressWithSocket -> { + InetAddress address = addressWithSocket.inetAddress; + DatagramSocket socket = addressWithSocket.socket; + + logger.fine("Try to send: " + address.getHostAddress() + " " + socket.isClosed()); + + var sendPacket = new DatagramPacket(sendData, sendData.length, address, config.port); + + return Flux.from( + Mono.fromCallable( + () -> { + socket.send(sendPacket); + return socket; + }) + .publishOn(Schedulers.boundedElastic()) + .doOnError(throwable -> logger.warning("send error: " + throwable)) + .onErrorResume((throwable) -> Mono.empty()) + .doOnNext( + o -> + logger.finer(">>> Request packet sent to: " + address.getHostAddress()))); + }); + } + + private Mono receiveResponse(DatagramSocket socket) { + var buffer = new byte[15000]; + + var receivePacket = new DatagramPacket(buffer, buffer.length); + + return Mono.fromCallable( + () -> { + socket.receive(receivePacket); + return receivePacket; }) - .count(); + .publishOn(Schedulers.boundedElastic()) + .onErrorContinue((throwable, o) -> logger.warning("receive error: " + throwable)); } - private Mono> receive() { - return Mono.just(Set.of()); + private Flux receive(DatagramSocket socket) { + return getResponses(socket); + } + + private Flux getResponses(DatagramSocket socket) { + return Flux.from(receiveResponse(socket)) + .repeat(1) + .flatMap( + receivePacket -> { + var message = + new String( + receivePacket.getData(), + receivePacket.getOffset(), + receivePacket.getLength()); + + logger.fine( + ">>> Broadcast response from: " + + receivePacket.getAddress().getHostAddress() + + ", [" + + message + + "]"); + + if (message.startsWith("UDP_DISCOVERY_RESPONSE ")) { + return Mono.just(message.substring("UDP_DISCOVERY_RESPONSE ".length())); + } else { + return Mono.empty(); + } + }) + .doOnEach(stringSignal -> logger.fine("XXX: " + stringSignal)); } /** @@ -109,6 +210,11 @@ public static Flux getMainAddresses() { inetAddress -> logger.fine("get main address (CACHED): " + inetAddress.toString())); } + /** + * Enumerate network interfaces broadcast addresses able to transmit and receive packets + * + * @return addresses Reactive Streams {@link Publisher} + */ public static Flux getAdditionalAddresses() { return getNetworkInterfaces() .flatMap(Flux::fromStream) @@ -124,44 +230,13 @@ private static Flux> getNetworkInterfaces() { .onErrorResume(e -> Mono.just(Stream.empty()))); } - private Thread getThread(DatagramSocket socket, Set result) { - result.clear(); - - var th = - new Thread( - () -> { - var buffer = new byte[15000]; - - while (!Thread.interrupted()) { - var receivePacket = new DatagramPacket(buffer, buffer.length); - - try { - socket.receive(receivePacket); - } catch (IOException e) { - break; - } - - var message = - new String( - receivePacket.getData(), - receivePacket.getOffset(), - receivePacket.getLength()); - - logger.fine( - ">>> Broadcast response from: " - + receivePacket.getAddress().getHostAddress() - + ", [" - + message - + "]"); - - if (message.startsWith("UDP_DISCOVERY_RESPONSE ")) { - result.add(message.substring("UDP_DISCOVERY_RESPONSE ".length())); - } - } - }); - - th.start(); - return th; + private static boolean canSend(NetworkInterface networkInterface) { + try { + return !networkInterface.isLoopback() && networkInterface.isUp(); + } catch (SocketException e) { + logger.warning("interface is not allowed to send packets: " + networkInterface.getName()); + return false; + } } static final Logger logger = Logger.getLogger(DiscoveryClient.class.getName()); From 9ce1758964193b40e3cc230259f9f93b10b11c75 Mon Sep 17 00:00:00 2001 From: "vladimir.koltunov" Date: Sun, 18 May 2025 21:15:33 +0300 Subject: [PATCH 3/6] javadoc --- .../src/main/java/ExampleService.java | 10 +- .../mtbo/lcloud/discovery/AddressSocket.java | 11 + .../lcloud/discovery/DiscoveryClient.java | 282 ++++++++++++++++++ .../mtbo/lcloud/discovery/PacketSource.java | 7 +- .../lcloud/discovery/udp/DiscoveryClient.java | 243 --------------- .../discovery/udp/UdpDiscoveryClient.java | 183 ++++++++++++ 6 files changed, 487 insertions(+), 249 deletions(-) create mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/AddressSocket.java create mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java delete mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java create mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java diff --git a/lcloud-udp-discovery-example/src/main/java/ExampleService.java b/lcloud-udp-discovery-example/src/main/java/ExampleService.java index 19f9272..8cc5916 100644 --- a/lcloud-udp-discovery-example/src/main/java/ExampleService.java +++ b/lcloud-udp-discovery-example/src/main/java/ExampleService.java @@ -10,8 +10,8 @@ import java.util.logging.Logger; import java.util.stream.Collectors; import org.mtbo.lcloud.discovery.DiscoveryService; -import org.mtbo.lcloud.discovery.udp.DiscoveryClient; import org.mtbo.lcloud.discovery.udp.UdpConnection; +import org.mtbo.lcloud.discovery.udp.UdpDiscoveryClient; /** * Demo application @@ -37,8 +37,8 @@ public static void main(String[] args) throws InterruptedException, SocketExcept discoveryService.setDaemon(true); discoveryService.start(); - DiscoveryClient.Config config = new DiscoveryClient.Config("xService", 8888); - DiscoveryClient discoveryClient = new DiscoveryClient(config); + var config = new UdpDiscoveryClient.UdpConfig("xService", 8888); + var discoveryClient = new UdpDiscoveryClient(config); discoveryClient .startLookup(Duration.ofSeconds(2)) @@ -47,7 +47,7 @@ public static void main(String[] args) throws InterruptedException, SocketExcept String joined = instances.stream().sorted().collect(Collectors.joining("\n")); System.out.println("***********************************************"); - System.out.println(config.serviceName() + " instances are discovered:\n\n" + joined); + System.out.println(config.serviceName + " instances are discovered:\n\n" + joined); System.out.println("***********************************************\n"); }) .doOnError( @@ -67,7 +67,7 @@ public static void main(String[] args) throws InterruptedException, SocketExcept static { Handler handler = new ConsoleHandler(); - Level level = Level.INFO; + Level level = Level.FINE; handler.setLevel(level); Logger logger1 = Logger.getLogger(""); logger1.setLevel(level); diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/AddressSocket.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/AddressSocket.java new file mode 100644 index 0000000..2a952c7 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/AddressSocket.java @@ -0,0 +1,11 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery; + +/** + * Wrap address to socket pair. + * + * @param address address from which send + * @param socket socket through which send + */ +public record AddressSocket(AddressType address, SocketType socket) {} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java new file mode 100644 index 0000000..0356774 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java @@ -0,0 +1,282 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery; + +import java.time.Duration; +import java.util.HashSet; +import java.util.Objects; +import java.util.logging.Logger; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +/** Allows to request network services named by some {@link Config#serviceName serice name} */ +public abstract class DiscoveryClient { + + /** Configuration parameters */ + protected final Config config; + + /** + * Construct client with config + * + * @param config configuration, including {@link Config#serviceName service name} + */ + public DiscoveryClient(Config config) { + this.config = config; + } + + /** {@link DiscoveryClient discovery client's} config */ + public abstract static class Config { + /** unique service identifier */ + public final String serviceName; + + /** optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited */ + public final int clientsCount; + + /** + * @param serviceName unique service identifier + * @param clientsCount optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited + */ + public Config(String serviceName, int clientsCount) { + this.serviceName = serviceName; + this.clientsCount = clientsCount; + } + + /** + * Constructor with defaults + * + * @param serviceName unique service identifier + */ + public Config(String serviceName) { + this(serviceName, Integer.MAX_VALUE); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (Config) obj; + return Objects.equals(this.serviceName, that.serviceName) + && this.clientsCount == that.clientsCount; + } + + @Override + public int hashCode() { + return Objects.hash(serviceName, clientsCount); + } + + @Override + public String toString() { + return "Config[" + + "serviceName=" + + serviceName + + ", " + + "port=" + + ", " + + "clientsCount=" + + clientsCount + + ']'; + } + } + + /** + * Periodically look for network services named by {@link Config#serviceName} receiveInterval is + * calculated with minus 100 millis + * + * @param interval delay between requests + * @return {@link Publisher} of service instances id's set + */ + public final Flux> startLookup(Duration interval) { + Duration receiveTimeout = interval.minus(Duration.ofMillis(100)); + + if (receiveTimeout.isNegative()) { + receiveTimeout = interval; + } + + return startLookup(interval, receiveTimeout); + } + + /** + * Periodically look for network services named by {@link Config#serviceName} receiveInterval is + * calculated with minus 100 millis + * + * @param interval delay between requests + * @param receiveTimeout time interval to check responses from instances + * @return {@link Publisher} of service instances id's set + */ + public Flux> startLookup(Duration interval, Duration receiveTimeout) { + Flux mainAddresses = getMainBroadcastAddresses(); + + return Flux.interval(interval) + .flatMap(unused -> lookupOnceInternal(receiveTimeout, mainAddresses)) + .repeat(); + } + + /** + * Lookup services once + * + * @param receiveTimeout time interval to check responses from instances + * @return Single or zero instance {@link Publisher} of service instances id's set + */ + @SuppressWarnings("unused") + public Mono> lookupOnce(Duration receiveTimeout) { + Flux mainAddresses = getMainBroadcastAddresses(); + + return Mono.from(lookupOnceInternal(receiveTimeout, mainAddresses)); + } + + /** + * Internal method reuses cached mainAddresses + * + * @param receiveTimeout time interval to check responses from instances + * @param mainAddresses cached local broadcast addresses + * @return Single or zero instance {@link Publisher} of service instances id's set + */ + private Flux> lookupOnceInternal( + Duration receiveTimeout, Flux mainAddresses) { + return Flux.from(send(mainAddresses)) + .flatMap(this::receive) + .bufferTimeout(config.clientsCount, receiveTimeout) + .map(HashSet::new); + } + + /** + * Create socket operator + * + * @return socket + */ + protected abstract Mono createSocket(); + + /** + * Send broadcast via addresses operator + * + * @param mainAddresses precached addresses. May be appended with additional ones. + * @return sockets on which sending was performed. Can be used to receive responses. + */ + private Flux send(Flux mainAddresses) { + + // Try the main first + Flux addresses = mainAddresses.concatWith(getAdditionalAddresses()); + + Flux> v = + addresses.zipWith( + createSocket().publishOn(Schedulers.boundedElastic()).repeat(), AddressSocket::new); + + var sendData = ("UDP_DISCOVERY_REQUEST " + config.serviceName).getBytes(); + + return v.flatMap( + addressWithSocket -> { + AddressType address = addressWithSocket.address(); + SocketType socket = addressWithSocket.socket(); + + var sendPacket = createSendPacket(sendData, address); + + return Flux.from( + sendPacket(socket, sendPacket) + .doOnError(throwable -> logger.warning("send error: " + throwable)) + .onErrorResume((throwable) -> Mono.empty()) + .doOnNext(o -> logger.finer(">>> Request packet sent to: " + address))); + }); + } + + /** + * Send packet operator + * + * @param socket on which to send + * @param sendPacket packet to send + * @return mono operator + */ + protected abstract Mono sendPacket(SocketType socket, PacketType sendPacket); + + /** + * Create packet for send to address + * + * @param sendData data bytes + * @param address address to send + * @return packet + */ + protected abstract PacketType createSendPacket(byte[] sendData, AddressType address); + + private Mono receiveResponse(SocketType socket) { + + var receivePacket = createReceivePacket(); + + return receivePacket(socket, receivePacket) + .onErrorContinue((throwable, o) -> logger.warning("receive error: " + throwable)); + } + + /** + * Process packer receiving operator + * + * @param socket from which packet will be received + * @param receivePacket packet r/w + * @return mono operator + */ + protected abstract Mono receivePacket(SocketType socket, PacketType receivePacket); + + /** + * Create packet for receive + * + * @return packet + */ + protected abstract PacketType createReceivePacket(); + + private Flux receive(SocketType socket) { + return getResponses(socket); + } + + private Flux getResponses(SocketType socket) { + return Flux.from(receiveResponse(socket)) + .repeat(1) + .flatMap( + receivePacket -> { + var message = packetMessage(receivePacket); + + logger.fine( + ">>> Broadcast response from: " + + packetAddress(receivePacket) + + ", [" + + message + + "]"); + + if (message.startsWith("UDP_DISCOVERY_RESPONSE ")) { + return Mono.just(message.substring("UDP_DISCOVERY_RESPONSE ".length())); + } else { + return Mono.empty(); + } + }); + } + + /** + * Human's readable address name + * + * @param packet contains address + * @return display name + */ + protected abstract String packetAddress(PacketType packet); + + /** + * Parse packet for message + * + * @param receivedPacket received packet + * @return parsed message + */ + protected abstract String packetMessage(PacketType receivedPacket); + + /** + * Get IPv4 network broadcast address + * + * @return cached local broadcast addresses + */ + protected abstract Flux getMainBroadcastAddresses(); + + /** + * Enumerate network interfaces broadcast addresses able to transmit and receive packets + * + * @return addresses Reactive Streams {@link Publisher} + */ + protected abstract Flux getAdditionalAddresses(); + + static final Logger logger = Logger.getLogger(DiscoveryClient.class.getName()); +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java index 1e256fb..cba5a9d 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java @@ -43,7 +43,12 @@ public void process() { throw (RuntimeException) throwable; } }) - .thenAcceptAsync(lagOrDrop -> logger.finer("LAG OR DROP: " + lagOrDrop)) + .thenAcceptAsync( + lagOrDrop -> { + if (1 != lagOrDrop) { + logger.finer("LAG OR DROP: " + lagOrDrop); + } + }) .join(); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java deleted file mode 100644 index fbdefcf..0000000 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/DiscoveryClient.java +++ /dev/null @@ -1,243 +0,0 @@ -/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ - -package org.mtbo.lcloud.discovery.udp; - -import java.net.*; -import java.time.Duration; -import java.util.HashSet; -import java.util.logging.Logger; -import java.util.stream.Stream; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -/** Allows to request network services named by some {@link Config#serviceName serice name} */ -public class DiscoveryClient { - - private final Config config; - - /** - * Construct client with config - * - * @param config configuration, including {@link Config#serviceName service name} and {@link - * Config#port port} - */ - public DiscoveryClient(Config config) { - this.config = config; - } - - /** - * {@link DiscoveryClient discovery client's} config - * - * @param serviceName unique service identifier - * @param port UDP port - * @param clientsCount optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited - */ - public record Config(String serviceName, int port, int clientsCount) { - /** - * Constructor with defaults - * - * @param serviceName unique service identifier - * @param port UDP port - */ - public Config(String serviceName, int port) { - this(serviceName, port, Integer.MAX_VALUE); - } - } - - /** - * Periodically look for network services named by {@link Config#serviceName} receiveInterval is - * calculated with minus 100 millis - * - * @param interval delay between requests - * @return {@link Publisher} of service instances id's set - */ - public Flux> startLookup(Duration interval) { - Duration receiveTimeout = interval.minus(Duration.ofMillis(100)); - - if (receiveTimeout.isNegative()) { - receiveTimeout = interval; - } - - return startLookup(interval, receiveTimeout); - } - - /** - * Periodically look for network services named by {@link Config#serviceName} receiveInterval is - * calculated with minus 100 millis - * - * @param interval delay between requests - * @param receiveTimeout time interval to check responses from instances - * @return {@link Publisher} of service instances id's set - */ - public Flux> startLookup(Duration interval, Duration receiveTimeout) { - Flux mainAddresses = getMainAddresses(); - - return Flux.interval(interval) - .flatMap(unused -> lookupOnceInternal(receiveTimeout, mainAddresses)) - .repeat(); - } - - /** - * Lookup services once - * - * @param receiveTimeout time interval to check responses from instances - * @return Single or zero instance {@link Publisher} of service instances id's set - */ - public Mono> lookupOnce(Duration receiveTimeout) { - Flux mainAddresses = getMainAddresses(); - - return Mono.from(lookupOnceInternal(receiveTimeout, mainAddresses)); - } - - /** - * Internal method reuses cached mainAddresses - * - * @param receiveTimeout - * @param mainAddresses - * @return - */ - private Flux> lookupOnceInternal( - Duration receiveTimeout, Flux mainAddresses) { - return Flux.from(send(mainAddresses)) - .flatMap(this::receive) - .bufferTimeout(128, receiveTimeout) - .map(HashSet::new); - } - - /** - * Wrap address to socket pair. - * - * @param inetAddress internet address - * @param socket UDP socket - */ - private record AddressSocket(InetAddress inetAddress, DatagramSocket socket) {} - - private Flux send(Flux mainAddresses) { - - // Try the main first - Flux addresses = mainAddresses.concatWith(getAdditionalAddresses()); - - Flux v = - addresses.zipWith( - Mono.fromCallable(DatagramSocket::new).publishOn(Schedulers.boundedElastic()).repeat(), - AddressSocket::new); - - var sendData = ("UDP_DISCOVERY_REQUEST " + config.serviceName).getBytes(); - - return v.flatMap( - addressWithSocket -> { - InetAddress address = addressWithSocket.inetAddress; - DatagramSocket socket = addressWithSocket.socket; - - logger.fine("Try to send: " + address.getHostAddress() + " " + socket.isClosed()); - - var sendPacket = new DatagramPacket(sendData, sendData.length, address, config.port); - - return Flux.from( - Mono.fromCallable( - () -> { - socket.send(sendPacket); - return socket; - }) - .publishOn(Schedulers.boundedElastic()) - .doOnError(throwable -> logger.warning("send error: " + throwable)) - .onErrorResume((throwable) -> Mono.empty()) - .doOnNext( - o -> - logger.finer(">>> Request packet sent to: " + address.getHostAddress()))); - }); - } - - private Mono receiveResponse(DatagramSocket socket) { - var buffer = new byte[15000]; - - var receivePacket = new DatagramPacket(buffer, buffer.length); - - return Mono.fromCallable( - () -> { - socket.receive(receivePacket); - return receivePacket; - }) - .publishOn(Schedulers.boundedElastic()) - .onErrorContinue((throwable, o) -> logger.warning("receive error: " + throwable)); - } - - private Flux receive(DatagramSocket socket) { - return getResponses(socket); - } - - private Flux getResponses(DatagramSocket socket) { - return Flux.from(receiveResponse(socket)) - .repeat(1) - .flatMap( - receivePacket -> { - var message = - new String( - receivePacket.getData(), - receivePacket.getOffset(), - receivePacket.getLength()); - - logger.fine( - ">>> Broadcast response from: " - + receivePacket.getAddress().getHostAddress() - + ", [" - + message - + "]"); - - if (message.startsWith("UDP_DISCOVERY_RESPONSE ")) { - return Mono.just(message.substring("UDP_DISCOVERY_RESPONSE ".length())); - } else { - return Mono.empty(); - } - }) - .doOnEach(stringSignal -> logger.fine("XXX: " + stringSignal)); - } - - /** - * Get IPv4 network broadcast address - * - * @return cached local broadcast address - */ - public static Flux getMainAddresses() { - return Flux.from( - Mono.fromCallable(() -> InetAddress.getByName("255.255.255.255")) - .publishOn(Schedulers.boundedElastic())) - .doOnNext(inetAddress -> logger.fine("get main address (REAL): " + inetAddress.toString())) - .cache() - .doOnNext( - inetAddress -> logger.fine("get main address (CACHED): " + inetAddress.toString())); - } - - /** - * Enumerate network interfaces broadcast addresses able to transmit and receive packets - * - * @return addresses Reactive Streams {@link Publisher} - */ - public static Flux getAdditionalAddresses() { - return getNetworkInterfaces() - .flatMap(Flux::fromStream) - .filter(DiscoveryClient::canSend) - .flatMapIterable(NetworkInterface::getInterfaceAddresses) - .mapNotNull(InterfaceAddress::getBroadcast); - } - - private static Flux> getNetworkInterfaces() { - return Flux.from( - Mono.fromCallable(NetworkInterface::networkInterfaces) - .publishOn(Schedulers.boundedElastic()) - .onErrorResume(e -> Mono.just(Stream.empty()))); - } - - private static boolean canSend(NetworkInterface networkInterface) { - try { - return !networkInterface.isLoopback() && networkInterface.isUp(); - } catch (SocketException e) { - logger.warning("interface is not allowed to send packets: " + networkInterface.getName()); - return false; - } - } - - static final Logger logger = Logger.getLogger(DiscoveryClient.class.getName()); -} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java new file mode 100644 index 0000000..1226c2e --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java @@ -0,0 +1,183 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery.udp; + +import java.net.*; +import java.util.Objects; +import java.util.logging.Logger; +import java.util.stream.Stream; +import org.mtbo.lcloud.discovery.DiscoveryClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +/** UDP DiscoveryClient implementation */ +public class UdpDiscoveryClient + extends DiscoveryClient { + + /** + * Construct client with config + * + * @param config configuration, including {@link Config#serviceName service name} and {@link + * UdpConfig#port} + */ + public UdpDiscoveryClient(UdpConfig config) { + super(config); + } + + @Override + protected Mono createSocket() { + return Mono.fromCallable(DatagramSocket::new); + } + + @Override + protected Mono sendPacket(DatagramSocket socket, DatagramPacket sendPacket) { + return Mono.fromCallable( + () -> { + socket.send(sendPacket); + return socket; + }) + .publishOn(Schedulers.boundedElastic()); + } + + @Override + protected DatagramPacket createSendPacket(byte[] sendData, InetAddress address) { + return new DatagramPacket(sendData, sendData.length, address, ((UdpConfig) config).port); + } + + @Override + protected Mono receivePacket( + DatagramSocket socket, DatagramPacket receivePacket) { + return Mono.fromCallable( + () -> { + socket.receive(receivePacket); + return receivePacket; + }) + .publishOn(Schedulers.boundedElastic()); + } + + @Override + protected DatagramPacket createReceivePacket() { + var buffer = new byte[4096]; + return new DatagramPacket(buffer, buffer.length); + } + + @Override + protected String packetMessage(DatagramPacket receivedPacket) { + return new String( + receivedPacket.getData(), receivedPacket.getOffset(), receivedPacket.getLength()); + } + + @Override + protected String packetAddress(DatagramPacket packet) { + return packet.getAddress().getHostAddress(); + } + + /** + * Get IPv4 network broadcast addresses + * + * @return cached local broadcast address + */ + @Override + protected Flux getMainBroadcastAddresses() { + return Flux.from( + Mono.fromCallable(() -> InetAddress.getByName("255.255.255.255")) + .publishOn(Schedulers.boundedElastic())) + .doOnNext(inetAddress -> logger.fine("get main address (REAL): " + inetAddress.toString())) + .cache() + .doOnNext( + inetAddress -> logger.fine("get main address (CACHED): " + inetAddress.toString())); + } + + @Override + protected Flux getAdditionalAddresses() { + return getNetworkInterfaces() + .flatMap(Flux::fromStream) + .filter(this::canSend) + .flatMapIterable(NetworkInterface::getInterfaceAddresses) + .mapNotNull(InterfaceAddress::getBroadcast); + } + + /** + * Network interfaces + * + * @return network interfaces as {@link Stream} + */ + private Flux> getNetworkInterfaces() { + return Flux.from( + Mono.fromCallable(NetworkInterface::networkInterfaces) + .publishOn(Schedulers.boundedElastic()) + .onErrorResume(e -> Mono.just(Stream.empty()))); + } + + /** + * Is network interface able to send packets + * + * @return able to send + */ + private boolean canSend(NetworkInterface networkInterface) { + try { + return !networkInterface.isLoopback() && networkInterface.isUp(); + } catch (SocketException e) { + logger.warning("interface is not allowed to send packets: " + networkInterface.getName()); + return false; + } + } + + /** {@link UdpDiscoveryClient UDP discovery client's} config */ + public static final class UdpConfig extends Config { + private final int port; + + /** + * Constructor + * + * @param serviceName unique service identifier + * @param clientsCount max instances list size (default to {@link Integer#MAX_VALUE}) + * @param port UDP port + */ + @SuppressWarnings("unused") + public UdpConfig(String serviceName, int clientsCount, int port) { + super(serviceName, clientsCount); + this.port = port; + } + + /** + * Constructor with defaults + * + * @param serviceName unique service identifier + * @param port UDP port + */ + public UdpConfig(String serviceName, int port) { + super(serviceName); + this.port = port; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + UdpConfig udpConfig = (UdpConfig) o; + return port == udpConfig.port; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), port); + } + + @Override + public String toString() { + return "UdpConfig{" + + "port=" + + port + + ", serviceName='" + + serviceName + + '\'' + + ", clientsCount=" + + clientsCount + + '}'; + } + } + + static final Logger logger = Logger.getLogger(UdpDiscoveryClient.class.getName()); +} From 2af0f577dca12ec9a97bda1388f09d23593f10a0 Mon Sep 17 00:00:00 2001 From: "vladimir.koltunov" Date: Sun, 18 May 2025 21:19:35 +0300 Subject: [PATCH 4/6] javadoc --- .../main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java | 2 +- .../java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java index 0356774..03d35b1 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java @@ -161,7 +161,7 @@ private Flux send(Flux mainAddresses) { Flux> v = addresses.zipWith( - createSocket().publishOn(Schedulers.boundedElastic()).repeat(), AddressSocket::new); + createSocket().repeat(), AddressSocket::new); var sendData = ("UDP_DISCOVERY_REQUEST " + config.serviceName).getBytes(); diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java index 1226c2e..df81534 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java @@ -27,7 +27,7 @@ public UdpDiscoveryClient(UdpConfig config) { @Override protected Mono createSocket() { - return Mono.fromCallable(DatagramSocket::new); + return Mono.fromCallable(DatagramSocket::new).publishOn(Schedulers.boundedElastic()); } @Override From bab0178f9dbf64b58e8e8dac8222083e2625904b Mon Sep 17 00:00:00 2001 From: "vladimir.koltunov" Date: Sun, 18 May 2025 21:50:42 +0300 Subject: [PATCH 5/6] publishing settings --- lcloud-udp-discovery/build.gradle | 32 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/lcloud-udp-discovery/build.gradle b/lcloud-udp-discovery/build.gradle index e9ed915..b2bd0a5 100644 --- a/lcloud-udp-discovery/build.gradle +++ b/lcloud-udp-discovery/build.gradle @@ -27,24 +27,24 @@ java { publishing { repositories { - maven { - url = layout.buildDirectory.dir('staging-deploy') - } - // maven { -// name = "GitHubPackages" -// url = uri("https://maven.pkg.github.com/mtbo-org/lcloud") -// credentials { -// username = findProperty("gpr.user") ?: System.getenv("USERNAME") -// password = findProperty("gpr.token") ?: System.getenv("TOKEN") -// } -// -//// def releasesRepoUrl = layout.buildDirectory.dir('repos/releases') -//// def snapshotsRepoUrl = layout.buildDirectory.dir('repos/snapshots') -//// url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl -// -// uri('https://maven.pkg.github.com/mtbo-org/lcloud') +// url = layout.buildDirectory.dir('staging-deploy') // } + + maven { + name = "GitHubPackages" + url = uri("https://maven.pkg.github.com/mtbo-org/lcloud") + credentials { + username = findProperty("gpr.user") ?: System.getenv("USERNAME") + password = findProperty("gpr.token") ?: System.getenv("TOKEN") + } + +// def releasesRepoUrl = layout.buildDirectory.dir('repos/releases') +// def snapshotsRepoUrl = layout.buildDirectory.dir('repos/snapshots') +// url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl + + uri('https://maven.pkg.github.com/mtbo-org/lcloud') + } } publications { From 7bf19f672a5b1894a484d2142bb6201d9a8a0cdb Mon Sep 17 00:00:00 2001 From: "vladimir.koltunov" Date: Mon, 19 May 2025 01:45:11 +0300 Subject: [PATCH 6/6] service implemented using Reactor --- build.gradle | 2 +- lcloud-udp-discovery-example/build.gradle | 2 +- .../src/main/java/ExampleService.java | 60 ++++---- lcloud-udp-discovery/build.gradle | 2 +- .../mtbo/lcloud/discovery/ClientConfig.java | 59 ++++++++ .../org/mtbo/lcloud/discovery/Connection.java | 20 ++- .../lcloud/discovery/DiscoveryClient.java | 114 +++++---------- .../lcloud/discovery/DiscoveryService.java | 99 +++++++++---- .../org/mtbo/lcloud/discovery/Listener.java | 7 +- .../org/mtbo/lcloud/discovery/Packet.java | 4 +- .../lcloud/discovery/PacketProcessor.java | 81 ----------- .../mtbo/lcloud/discovery/PacketSource.java | 56 -------- .../org/mtbo/lcloud/discovery/Sender.java | 11 +- .../mtbo/lcloud/discovery/ServiceConfig.java | 48 +++++++ .../lcloud/discovery/udp/UdpClientConfig.java | 54 ++++++++ .../lcloud/discovery/udp/UdpConnection.java | 131 ++++++++---------- .../discovery/udp/UdpDiscoveryClient.java | 68 +-------- .../discovery/udp/UdpDiscoveryService.java | 20 +++ .../mtbo/lcloud/discovery/udp/UdpPacket.java | 27 ++++ .../discovery/udp/UdpServiceConfig.java | 39 ++++++ 20 files changed, 479 insertions(+), 425 deletions(-) create mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ClientConfig.java delete mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java delete mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java create mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ServiceConfig.java create mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpClientConfig.java create mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryService.java create mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpPacket.java create mode 100644 lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpServiceConfig.java diff --git a/build.gradle b/build.gradle index 8b2c85a..9fb5f3a 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ def javaProjects = [ configure(javaProjects) { - version '3.0.0' + version '3.0.1' repositories { mavenCentral() if (project.version.endsWith('-SNAPSHOT')) { diff --git a/lcloud-udp-discovery-example/build.gradle b/lcloud-udp-discovery-example/build.gradle index a2063d0..3a0d95e 100644 --- a/lcloud-udp-discovery-example/build.gradle +++ b/lcloud-udp-discovery-example/build.gradle @@ -4,7 +4,7 @@ plugins { } group = 'org.mtbo.lcloud' -version = '3.0.0-SNAPSHOT' +version = '3.0.1-SNAPSHOT' apply plugin: 'application' mainClassName = 'ExampleService' diff --git a/lcloud-udp-discovery-example/src/main/java/ExampleService.java b/lcloud-udp-discovery-example/src/main/java/ExampleService.java index 8cc5916..9fbbca0 100644 --- a/lcloud-udp-discovery-example/src/main/java/ExampleService.java +++ b/lcloud-udp-discovery-example/src/main/java/ExampleService.java @@ -9,9 +9,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; -import org.mtbo.lcloud.discovery.DiscoveryService; -import org.mtbo.lcloud.discovery.udp.UdpConnection; -import org.mtbo.lcloud.discovery.udp.UdpDiscoveryClient; +import org.mtbo.lcloud.discovery.udp.*; /** * Demo application @@ -28,39 +26,43 @@ public class ExampleService { * @throws SocketException in case of fatal network error */ public static void main(String[] args) throws InterruptedException, SocketException { - DiscoveryService discoveryService = - new DiscoveryService( - Optional.ofNullable(System.getenv("HOSTNAME")).orElse(UUID.randomUUID().toString()), + + final int port = 8888; + var serviceConfig = + new UdpServiceConfig( "xService", - new UdpConnection(8888)); + Optional.ofNullable(System.getenv("HOSTNAME")).orElse(UUID.randomUUID().toString()), + port); + var discoveryService = new UdpDiscoveryService(serviceConfig); - discoveryService.setDaemon(true); - discoveryService.start(); + var clientConfig = new UdpClientConfig("xService", port); + var discoveryClient = new UdpDiscoveryClient(clientConfig); - var config = new UdpDiscoveryClient.UdpConfig("xService", 8888); - var discoveryClient = new UdpDiscoveryClient(config); + var serviceListener = discoveryService.listen(new UdpConnection(port)).subscribe(); - discoveryClient - .startLookup(Duration.ofSeconds(2)) - .doOnNext( - instances -> { - String joined = instances.stream().sorted().collect(Collectors.joining("\n")); + var clientListener = + discoveryClient + .startLookup(Duration.ofSeconds(2)) + .doOnNext( + instances -> { + String joined = instances.stream().sorted().collect(Collectors.joining("\n")); - System.out.println("***********************************************"); - System.out.println(config.serviceName + " instances are discovered:\n\n" + joined); - System.out.println("***********************************************\n"); - }) - .doOnError( - throwable -> { - logger.severe("Lookup Error: " + throwable.getMessage()); - logger.throwing(ExampleService.class.getName(), "main", throwable); - }) - .onErrorComplete(throwable -> !(throwable instanceof InterruptedException)) - .subscribe(); + System.out.println("***********************************************"); + System.out.println( + clientConfig.serviceName + " instances are discovered:\n\n" + joined); + System.out.println("***********************************************\n"); + }) + .doOnError( + throwable -> { + logger.severe("Lookup Error: " + throwable.getMessage()); + logger.throwing(ExampleService.class.getName(), "main", throwable); + }) + .onErrorComplete(throwable -> !(throwable instanceof InterruptedException)) + .subscribe(); Thread.sleep(10000); - - discoveryService.shutdown(); + clientListener.dispose(); + serviceListener.dispose(); } static Logger logger = Logger.getLogger(ExampleService.class.getName()); diff --git a/lcloud-udp-discovery/build.gradle b/lcloud-udp-discovery/build.gradle index b2bd0a5..3e8f8a6 100644 --- a/lcloud-udp-discovery/build.gradle +++ b/lcloud-udp-discovery/build.gradle @@ -4,7 +4,7 @@ plugins { } group = 'org.mtbo.lcloud' -version = '3.0.0-SNAPSHOT' +version = '3.0.1-SNAPSHOT' repositories { mavenCentral() diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ClientConfig.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ClientConfig.java new file mode 100644 index 0000000..0258946 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ClientConfig.java @@ -0,0 +1,59 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery; + +import java.util.Objects; + +/** Discovery config */ +public abstract class ClientConfig { + /** unique service identifier */ + public final String serviceName; + + /** optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited */ + public final int endpointsCount; + + /** + * @param serviceName unique service identifier + * @param endpointsCount optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited + */ + public ClientConfig(String serviceName, int endpointsCount) { + this.serviceName = serviceName; + this.endpointsCount = endpointsCount; + } + + /** + * Constructor with defaults + * + * @param serviceName unique service identifier + */ + public ClientConfig(String serviceName) { + this(serviceName, Integer.MAX_VALUE); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (ClientConfig) obj; + return Objects.equals(this.serviceName, that.serviceName) + && this.endpointsCount == that.endpointsCount; + } + + @Override + public int hashCode() { + return Objects.hash(serviceName, endpointsCount); + } + + @Override + public String toString() { + return "Config[" + + "serviceName=" + + serviceName + + ", " + + "port=" + + ", " + + "clientsCount=" + + endpointsCount + + ']'; + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Connection.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Connection.java index 2a587c7..afab7f3 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Connection.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Connection.java @@ -2,5 +2,23 @@ package org.mtbo.lcloud.discovery; +import reactor.core.publisher.Mono; + /** Allows to send and receive packets */ -public interface Connection extends Listener, Sender {} +public interface Connection + extends Listener, Sender { + + /** + * Create unbounded socket + * + * @return created socket + */ + Mono socket(); + + /** + * Close socket. Shadow exception. + * + * @param socket to close + */ + void close(SocketType socket); +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java index 03d35b1..60c18f4 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryClient.java @@ -3,91 +3,36 @@ package org.mtbo.lcloud.discovery; import java.time.Duration; -import java.util.HashSet; -import java.util.Objects; +import java.util.Set; import java.util.logging.Logger; +import java.util.stream.Collectors; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -/** Allows to request network services named by some {@link Config#serviceName serice name} */ -public abstract class DiscoveryClient { +/** Allows to request network services named by some {@link ClientConfig#serviceName serice name} */ +public abstract class DiscoveryClient { /** Configuration parameters */ - protected final Config config; + protected final ClientConfig config; /** * Construct client with config * - * @param config configuration, including {@link Config#serviceName service name} + * @param config configuration, including {@link ClientConfig#serviceName service name} */ - public DiscoveryClient(Config config) { + public DiscoveryClient(ClientConfig config) { this.config = config; } - /** {@link DiscoveryClient discovery client's} config */ - public abstract static class Config { - /** unique service identifier */ - public final String serviceName; - - /** optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited */ - public final int clientsCount; - - /** - * @param serviceName unique service identifier - * @param clientsCount optional clients buffer size, {@link Integer#MAX_VALUE} for unlimited - */ - public Config(String serviceName, int clientsCount) { - this.serviceName = serviceName; - this.clientsCount = clientsCount; - } - - /** - * Constructor with defaults - * - * @param serviceName unique service identifier - */ - public Config(String serviceName) { - this(serviceName, Integer.MAX_VALUE); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) return true; - if (obj == null || obj.getClass() != this.getClass()) return false; - var that = (Config) obj; - return Objects.equals(this.serviceName, that.serviceName) - && this.clientsCount == that.clientsCount; - } - - @Override - public int hashCode() { - return Objects.hash(serviceName, clientsCount); - } - - @Override - public String toString() { - return "Config[" - + "serviceName=" - + serviceName - + ", " - + "port=" - + ", " - + "clientsCount=" - + clientsCount - + ']'; - } - } - /** - * Periodically look for network services named by {@link Config#serviceName} receiveInterval is - * calculated with minus 100 millis + * Periodically look for network services named by {@link ClientConfig#serviceName} + * receiveInterval is calculated with minus 100 millis * * @param interval delay between requests * @return {@link Publisher} of service instances id's set */ - public final Flux> startLookup(Duration interval) { + public final Flux> startLookup(Duration interval) { Duration receiveTimeout = interval.minus(Duration.ofMillis(100)); if (receiveTimeout.isNegative()) { @@ -98,14 +43,14 @@ public final Flux> startLookup(Duration interval) { } /** - * Periodically look for network services named by {@link Config#serviceName} receiveInterval is - * calculated with minus 100 millis + * Periodically look for network services named by {@link ClientConfig#serviceName} + * receiveInterval is calculated with minus 100 millis * * @param interval delay between requests * @param receiveTimeout time interval to check responses from instances * @return {@link Publisher} of service instances id's set */ - public Flux> startLookup(Duration interval, Duration receiveTimeout) { + public Flux> startLookup(Duration interval, Duration receiveTimeout) { Flux mainAddresses = getMainBroadcastAddresses(); return Flux.interval(interval) @@ -120,7 +65,7 @@ public Flux> startLookup(Duration interval, Duration receiveTime * @return Single or zero instance {@link Publisher} of service instances id's set */ @SuppressWarnings("unused") - public Mono> lookupOnce(Duration receiveTimeout) { + public Mono> lookupOnce(Duration receiveTimeout) { Flux mainAddresses = getMainBroadcastAddresses(); return Mono.from(lookupOnceInternal(receiveTimeout, mainAddresses)); @@ -133,12 +78,26 @@ public Mono> lookupOnce(Duration receiveTimeout) { * @param mainAddresses cached local broadcast addresses * @return Single or zero instance {@link Publisher} of service instances id's set */ - private Flux> lookupOnceInternal( + private Flux> lookupOnceInternal( Duration receiveTimeout, Flux mainAddresses) { return Flux.from(send(mainAddresses)) - .flatMap(this::receive) - .bufferTimeout(config.clientsCount, receiveTimeout) - .map(HashSet::new); + .flatMap( + socket -> + Flux.using( + () -> socket, + this::receive, + (s) -> { + try { + s.close(); + logger.finer("Closed socket"); + } catch (Exception ignored) { + logger.warning("Failed to close socket"); + } + }, + true) + .timeout(receiveTimeout, Flux.empty())) + .bufferTimeout(config.endpointsCount, receiveTimeout) + .map(strings -> strings.stream().filter(s -> !s.isEmpty()).collect(Collectors.toSet())); } /** @@ -160,8 +119,7 @@ private Flux send(Flux mainAddresses) { Flux addresses = mainAddresses.concatWith(getAdditionalAddresses()); Flux> v = - addresses.zipWith( - createSocket().repeat(), AddressSocket::new); + addresses.zipWith(createSocket().repeat(), AddressSocket::new); var sendData = ("UDP_DISCOVERY_REQUEST " + config.serviceName).getBytes(); @@ -223,12 +181,8 @@ private Mono receiveResponse(SocketType socket) { protected abstract PacketType createReceivePacket(); private Flux receive(SocketType socket) { - return getResponses(socket); - } - - private Flux getResponses(SocketType socket) { return Flux.from(receiveResponse(socket)) - .repeat(1) + .repeat() .flatMap( receivePacket -> { var message = packetMessage(receivePacket); diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java index 7d67235..ec991c8 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/DiscoveryService.java @@ -2,43 +2,84 @@ package org.mtbo.lcloud.discovery; +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.logging.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + /** Packets pull-push loop thread */ -public final class DiscoveryService extends Thread { - private final PacketSource source; - private final PacketProcessor processor; +public abstract class DiscoveryService< + SocketType extends Closeable, PacketType extends Packet> { + + private final ByteBuffer prefix; + private final byte[] sendData; /** - * Construct loop + * Construct service with config * - * @param instanceName instance name - * @param serviceName instance namespace - * @param connection to push packets + * @param config configuration, including {@link ServiceConfig#serviceName service name} */ - public DiscoveryService(String instanceName, String serviceName, Connection connection) { - source = new PacketSource(connection); - processor = new PacketProcessor(instanceName, serviceName, connection); - source.subscribe(processor); + public DiscoveryService(ServiceConfig config) { + this.prefix = + ByteBuffer.wrap( + ("UDP_DISCOVERY_REQUEST " + config.serviceName).getBytes(StandardCharsets.UTF_8)) + .asReadOnlyBuffer(); + this.sendData = + ("UDP_DISCOVERY_RESPONSE " + config.instanceName).getBytes(StandardCharsets.UTF_8); } - @Override - public void run() { - - while (!interrupted()) { - source.process(); - } + /** + * Create listen operator + * + * @param connection used for listen + * @return flux with true in case of accepted packet, else false + */ + public Flux listen(Connection connection) { + return Flux.from(connection.socket()) + .flatMap( + socket -> + Flux.using( + () -> socket, + (listenSocket) -> loop(connection, listenSocket), + connection::close, + true) + .publishOn(Schedulers.boundedElastic())); } - /** Break loop */ - public void shutdown() { - processor.shutdown(); - interrupt(); - if (!interrupted()) { - try { - join(); - } catch (InterruptedException ignored) { - } - } - - source.close(); + private Flux loop( + Connection connection, SocketType listenSocket) { + return connection + .receive(listenSocket) + .zipWith(Mono.just(listenSocket)) + .doOnNext( + tuple -> { + ByteBuffer data = tuple.getT1().data(); + logger.finer( + ">>> Packet received; packet: " + + new String(data.array(), data.arrayOffset(), data.limit())); + }) + .repeat() + .flatMap( + tuple -> { + PacketType packet = tuple.getT1(); + SocketType sendSocket = tuple.getT2(); + + if (prefix.equals(packet.data().slice(0, prefix.limit()))) { + return connection + .send(sendSocket, packet.copyWithData(ByteBuffer.wrap(sendData))) + .doOnError( + throwable -> { + logger.severe(throwable.getMessage()); + logger.throwing(DiscoveryService.class.getSimpleName(), "run", throwable); + }); + } else { + return Mono.just(false); + } + }); } + + static final Logger logger = Logger.getLogger(DiscoveryService.class.getName()); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Listener.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Listener.java index 7d0cee6..1f88a0a 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Listener.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Listener.java @@ -2,14 +2,15 @@ package org.mtbo.lcloud.discovery; -import java.util.concurrent.CompletableFuture; +import reactor.core.publisher.Mono; /** Allows to receive packets */ -public interface Listener { +public interface Listener { /** * Receive packets * + * @param socket on which to receive packet * @return packet from incoming connection */ - CompletableFuture receive(); + Mono receive(SocketType socket); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Packet.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Packet.java index c84fd00..56509c2 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Packet.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Packet.java @@ -5,7 +5,7 @@ import java.nio.ByteBuffer; /** Piece of data */ -public interface Packet { +public interface Packet { /** * @return wrapped data @@ -16,5 +16,5 @@ public interface Packet { * @param wrap new data * @return new packet with data replaced. Remaining field should be copied. */ - Packet copyWithData(ByteBuffer wrap); + PacketType copyWithData(ByteBuffer wrap); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java deleted file mode 100644 index 114d082..0000000 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketProcessor.java +++ /dev/null @@ -1,81 +0,0 @@ -/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ - -package org.mtbo.lcloud.discovery; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.Flow; -import java.util.logging.Logger; - -/** Process {@link Packet packets} and send responses to {@link #sender} */ -public class PacketProcessor implements Flow.Subscriber { - private final Sender sender; - - private final ByteBuffer prefix; - private final byte[] sendData; - - private Flow.Subscription subscription; - - /** - * Constructor - * - * @param instanceName service instance name - * @param serviceName service name - * @param sender downstream - */ - public PacketProcessor(String instanceName, String serviceName, Sender sender) { - this.sender = sender; - - this.prefix = - ByteBuffer.wrap(("UDP_DISCOVERY_REQUEST " + serviceName).getBytes(StandardCharsets.UTF_8)) - .asReadOnlyBuffer(); - this.sendData = ("UDP_DISCOVERY_RESPONSE " + instanceName).getBytes(StandardCharsets.UTF_8); - } - - /** Shutdown {@link #subscription} */ - public void shutdown() { - subscription.cancel(); - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } - - @Override - public void onNext(Packet item) { - ByteBuffer data = item.data(); - logger.finer( - ">>> Packet received; packet: " - + new String(data.array(), data.arrayOffset(), data.limit())); - - if (prefix.equals(data.slice(0, prefix.limit()))) { - // Send a response - sender - .send(item.copyWithData(ByteBuffer.wrap(sendData))) - .exceptionallyAsync( - throwable -> { - logger.severe(throwable.getMessage()); - logger.throwing(PacketProcessor.class.getSimpleName(), "run", throwable); - return null; - }); - } - - // Enqueue next packet processing - subscription.request(1); - } - - @Override - public void onError(Throwable throwable) { - logger.severe(throwable.getMessage()); - logger.throwing(PacketProcessor.class.getSimpleName(), "onError", throwable); - } - - @Override - public void onComplete() { - logger.severe("Stop response"); - } - - static final Logger logger = Logger.getLogger(PacketProcessor.class.getName()); -} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java deleted file mode 100644 index cba5a9d..0000000 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/PacketSource.java +++ /dev/null @@ -1,56 +0,0 @@ -/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ - -package org.mtbo.lcloud.discovery; - -import java.util.concurrent.SubmissionPublisher; -import java.util.logging.Logger; -import org.mtbo.lcloud.discovery.exceptions.NetworkException; - -/** Incoming packet {@link java.util.concurrent.Flow.Publisher publisher} */ -public final class PacketSource extends SubmissionPublisher { - - private final Listener listener; - - /** - * Constructor - * - * @param listener incoming packets source - */ - public PacketSource(Listener listener) { - this.listener = listener; - } - - /** Network receive/send step. */ - public void process() { - listener - .receive() - .handleAsync( - (packet, throwable) -> { - if (throwable instanceof RuntimeException) { - throw (RuntimeException) throwable; - } - return offer(packet, (subscriber, packetUnused) -> false); - }) - .exceptionallyAsync( - throwable -> { - logger.severe("Unable to receive: " + throwable.getMessage()); - throwable.printStackTrace(); - logger.throwing(PacketSource.class.getName(), "process", throwable); - if (throwable instanceof NetworkException - || !(throwable instanceof RuntimeException)) { - return Integer.MAX_VALUE; - } else { - throw (RuntimeException) throwable; - } - }) - .thenAcceptAsync( - lagOrDrop -> { - if (1 != lagOrDrop) { - logger.finer("LAG OR DROP: " + lagOrDrop); - } - }) - .join(); - } - - static final Logger logger = Logger.getLogger(PacketSource.class.getName()); -} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Sender.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Sender.java index e5d95d0..2986fb9 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Sender.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/Sender.java @@ -2,15 +2,16 @@ package org.mtbo.lcloud.discovery; -import java.util.concurrent.CompletableFuture; +import reactor.core.publisher.Mono; /** Allows to send packets */ -public interface Sender { +public interface Sender { /** - * Send packet + * Send packet operator * + * @param socket on which to send packet * @param packet packet with data and connection parameters - * @return void future + * @return void mono */ - CompletableFuture send(Packet packet); + Mono send(SocketType socket, PacketType packet); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ServiceConfig.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ServiceConfig.java new file mode 100644 index 0000000..b17b461 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/ServiceConfig.java @@ -0,0 +1,48 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery; + +import java.util.Objects; + +/** Discovery config */ +public abstract class ServiceConfig { + /** unique service identifier */ + public final String serviceName; + + /** this endpoint name */ + public final String instanceName; + + /** + * @param serviceName unique service identifier + * @param instanceName this endpoint name + */ + public ServiceConfig(String serviceName, String instanceName) { + this.serviceName = serviceName; + this.instanceName = instanceName; + } + + @Override + public String toString() { + return "ServiceConfig{" + + "serviceName='" + + serviceName + + '\'' + + ", instanceName='" + + instanceName + + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + ServiceConfig that = (ServiceConfig) o; + return Objects.equals(serviceName, that.serviceName) + && Objects.equals(instanceName, that.instanceName); + } + + @Override + public int hashCode() { + return Objects.hash(serviceName, instanceName); + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpClientConfig.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpClientConfig.java new file mode 100644 index 0000000..68db9ed --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpClientConfig.java @@ -0,0 +1,54 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery.udp; + +import java.util.Objects; +import org.mtbo.lcloud.discovery.ClientConfig; + +/** {@link UdpDiscoveryClient UDP discovery client's} config */ +public final class UdpClientConfig extends ClientConfig { + /** UDP port */ + public final int port; + + /** + * Constructor + * + * @param serviceName unique service identifier + * @param clientsCount max instances list size (default to {@link Integer#MAX_VALUE}) + * @param port UDP port + */ + @SuppressWarnings("unused") + public UdpClientConfig(String serviceName, int clientsCount, int port) { + super(serviceName, clientsCount); + this.port = port; + } + + /** + * Constructor with defaults + * + * @param serviceName unique service identifier + * @param port UDP port + */ + public UdpClientConfig(String serviceName, int port) { + super(serviceName); + this.port = port; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + UdpClientConfig udpConfig = (UdpClientConfig) o; + return port == udpConfig.port; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), port); + } + + @Override + public String toString() { + return "UdpConfig{" + "port=" + port + '}'; + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpConnection.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpConnection.java index d9e86bd..4989548 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpConnection.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpConnection.java @@ -2,104 +2,87 @@ package org.mtbo.lcloud.discovery.udp; -import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; import java.util.logging.Logger; import org.mtbo.lcloud.discovery.Connection; -import org.mtbo.lcloud.discovery.Packet; -import org.mtbo.lcloud.discovery.exceptions.NetworkException; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** Allows to send and receive packets using UDP connection */ -public final class UdpConnection implements Connection { +public final class UdpConnection implements Connection { - private final DatagramSocket socket; + final int port; /** - * Create and bind broadcast UDP socket + * UDP connection * * @param port for incoming packets - * @throws SocketException in case of bind error */ - public UdpConnection(final int port) throws SocketException { - socket = new DatagramSocket(null); - socket.setReuseAddress(true); - socket.setBroadcast(true); - socket.bind(new InetSocketAddress(port)); + public UdpConnection(int port) { + this.port = port; } + /** Create and bind broadcast UDP socket */ @Override - public CompletableFuture receive() { - return CompletableFuture.supplyAsync( - () -> { - byte[] buffer = new byte[4096]; + public Mono socket() { + return Mono.fromCallable( + () -> { + var datagramSocket = new DatagramSocket(null); + datagramSocket.setReuseAddress(true); + datagramSocket.setBroadcast(true); + datagramSocket.bind(new InetSocketAddress(port)); + return datagramSocket; + }) + .publishOn(Schedulers.boundedElastic()); + } - DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + @Override + public void close(DatagramSocket socket) { + socket.close(); + } + + @Override + public Mono receive(DatagramSocket socket) { - try { - socket.receive(packet); - } catch (IOException e) { - throw new NetworkException(e); - } + return Mono.fromCallable( + () -> { + byte[] buffer = new byte[4096]; - logger.finest( - ">>> Discovery packet received from: " + packet.getAddress().getHostAddress()); + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); - return new UdpPacket(packet); - }); + socket.receive(packet); + return packet; + }) + .publishOn(Schedulers.boundedElastic()) + .map(UdpPacket::new); } @Override - public CompletableFuture send(Packet packet) { - return CompletableFuture.runAsync( - () -> { - assert packet instanceof UdpPacket; - final var udpPacket = (UdpPacket) packet; - final var data = udpPacket.data(); - byte[] array = data.array(); - final var sendPacket = - new DatagramPacket( - array, - data.arrayOffset(), - data.limit(), - udpPacket.packet.getAddress(), - udpPacket.packet.getPort()); - try { - socket.send(sendPacket); - } catch (IOException e) { - logger.severe("Failed to send UDP packet: " + e); - throw new RuntimeException(e); - } - logger.finer( - ">>> Sent packet to: " - + sendPacket.getAddress().getHostAddress() - + ":" - + udpPacket.packet.getPort()); - }); + public Mono send(DatagramSocket socket, UdpPacket packet) { + return Mono.fromCallable( + () -> { + final var data = packet.data(); + byte[] array = data.array(); + final var sendPacket = + new DatagramPacket( + array, + data.arrayOffset(), + data.limit(), + packet.packet().getAddress(), + packet.packet().getPort()); + socket.send(sendPacket); + logger.finer( + ">>> Sent packet to: " + + sendPacket.getAddress().getHostAddress() + + ":" + + packet.packet().getPort()); + + return true; + }) + .publishOn(Schedulers.boundedElastic()); } static final Logger logger = Logger.getLogger(UdpConnection.class.getName()); - - private record UdpPacket(DatagramPacket packet) implements Packet { - - @Override - public ByteBuffer data() { - return ByteBuffer.wrap(packet.getData(), packet.getOffset(), packet.getLength()); - } - - @Override - public Packet copyWithData(ByteBuffer wrap) { - return new UdpPacket( - new DatagramPacket( - wrap.array(), - wrap.arrayOffset(), - wrap.limit(), - packet.getAddress(), - packet.getPort())); - } - } } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java index df81534..f63a899 100644 --- a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryClient.java @@ -3,7 +3,6 @@ package org.mtbo.lcloud.discovery.udp; import java.net.*; -import java.util.Objects; import java.util.logging.Logger; import java.util.stream.Stream; import org.mtbo.lcloud.discovery.DiscoveryClient; @@ -18,10 +17,10 @@ public class UdpDiscoveryClient /** * Construct client with config * - * @param config configuration, including {@link Config#serviceName service name} and {@link - * UdpConfig#port} + * @param config configuration, including {@link UdpClientConfig#serviceName service name} and + * {@link UdpClientConfig#port} */ - public UdpDiscoveryClient(UdpConfig config) { + public UdpDiscoveryClient(UdpClientConfig config) { super(config); } @@ -42,7 +41,7 @@ protected Mono sendPacket(DatagramSocket socket, DatagramPacket @Override protected DatagramPacket createSendPacket(byte[] sendData, InetAddress address) { - return new DatagramPacket(sendData, sendData.length, address, ((UdpConfig) config).port); + return new DatagramPacket(sendData, sendData.length, address, ((UdpClientConfig) config).port); } @Override @@ -83,10 +82,10 @@ protected Flux getMainBroadcastAddresses() { return Flux.from( Mono.fromCallable(() -> InetAddress.getByName("255.255.255.255")) .publishOn(Schedulers.boundedElastic())) - .doOnNext(inetAddress -> logger.fine("get main address (REAL): " + inetAddress.toString())) + .doOnNext(inetAddress -> logger.finer("get main address (REAL): " + inetAddress.toString())) .cache() .doOnNext( - inetAddress -> logger.fine("get main address (CACHED): " + inetAddress.toString())); + inetAddress -> logger.finer("get main address (CACHED): " + inetAddress.toString())); } @Override @@ -124,60 +123,5 @@ private boolean canSend(NetworkInterface networkInterface) { } } - /** {@link UdpDiscoveryClient UDP discovery client's} config */ - public static final class UdpConfig extends Config { - private final int port; - - /** - * Constructor - * - * @param serviceName unique service identifier - * @param clientsCount max instances list size (default to {@link Integer#MAX_VALUE}) - * @param port UDP port - */ - @SuppressWarnings("unused") - public UdpConfig(String serviceName, int clientsCount, int port) { - super(serviceName, clientsCount); - this.port = port; - } - - /** - * Constructor with defaults - * - * @param serviceName unique service identifier - * @param port UDP port - */ - public UdpConfig(String serviceName, int port) { - super(serviceName); - this.port = port; - } - - @Override - public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - UdpConfig udpConfig = (UdpConfig) o; - return port == udpConfig.port; - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), port); - } - - @Override - public String toString() { - return "UdpConfig{" - + "port=" - + port - + ", serviceName='" - + serviceName - + '\'' - + ", clientsCount=" - + clientsCount - + '}'; - } - } - static final Logger logger = Logger.getLogger(UdpDiscoveryClient.class.getName()); } diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryService.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryService.java new file mode 100644 index 0000000..a83b776 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpDiscoveryService.java @@ -0,0 +1,20 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery.udp; + +import java.net.DatagramSocket; +import org.mtbo.lcloud.discovery.DiscoveryService; +import org.mtbo.lcloud.discovery.ServiceConfig; + +/** UDP implementation of {@link DiscoveryService} */ +public class UdpDiscoveryService extends DiscoveryService { + + /** + * Construct service with config + * + * @param config configuration, including {@link ServiceConfig#serviceName service name} + */ + public UdpDiscoveryService(UdpServiceConfig config) { + super(config); + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpPacket.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpPacket.java new file mode 100644 index 0000000..64d3ea0 --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpPacket.java @@ -0,0 +1,27 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery.udp; + +import java.net.DatagramPacket; +import java.nio.ByteBuffer; +import org.mtbo.lcloud.discovery.Packet; + +/** + * UDP packet implementation of {@link Packet} + * + * @param packet + */ +public record UdpPacket(DatagramPacket packet) implements Packet { + + @Override + public ByteBuffer data() { + return ByteBuffer.wrap(packet.getData(), packet.getOffset(), packet.getLength()); + } + + @Override + public UdpPacket copyWithData(ByteBuffer wrap) { + return new UdpPacket( + new DatagramPacket( + wrap.array(), wrap.arrayOffset(), wrap.limit(), packet.getAddress(), packet.getPort())); + } +} diff --git a/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpServiceConfig.java b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpServiceConfig.java new file mode 100644 index 0000000..d9a66de --- /dev/null +++ b/lcloud-udp-discovery/src/main/java/org/mtbo/lcloud/discovery/udp/UdpServiceConfig.java @@ -0,0 +1,39 @@ +/* (C) 2025 Vladimir E. Koltunov (mtbo.org) */ + +package org.mtbo.lcloud.discovery.udp; + +import java.util.Objects; +import org.mtbo.lcloud.discovery.ServiceConfig; + +/** UDP implementation of {@link ServiceConfig} */ +public class UdpServiceConfig extends ServiceConfig { + private final int port; + + /** + * @param serviceName unique service identifier + * @param instanceName this endpoint name + * @param port UDP port + */ + public UdpServiceConfig(String serviceName, String instanceName, int port) { + super(serviceName, instanceName); + this.port = port; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + UdpServiceConfig that = (UdpServiceConfig) o; + return port == that.port; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), port); + } + + @Override + public String toString() { + return "UdpServiceConfig{" + "port=" + port + '}'; + } +}