diff --git a/pom.xml b/pom.xml index d519037..03c726c 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.ugcs ugcs-java-sdk UgCS Server Java SDK - 4.2 + 4.12 pom ucs-messaging @@ -13,11 +13,11 @@ ucs-client - 1.8 - 1.8 + 11 + 11 UTF-8 UTF-8 - 1.7.25 + 1.7.26 1.2.3 2.8.7 @@ -34,6 +34,11 @@ logback-classic ${logback.version} + + javax.naming + jndi + 1.2.1 + com.google.protobuf @@ -79,17 +84,23 @@ jackson-databind ${jackson.version} + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + ${jackson.version} + - junit - junit - 4.12 - test + org.junit + junit-bom + 5.7.1 + pom + import org.mockito mockito-core - 2.21.0 + 2.28.2 test @@ -166,15 +177,15 @@ - org.codehaus.mojo - findbugs-maven-plugin - 3.0.3 + com.github.spotbugs + spotbugs-maven-plugin + 3.1.12 Max Low true - ${project.build.directory} - findbugs-exclude.xml + ${project.build.directory} + spotbugs-exclude.xml diff --git a/ucs-api/pom.xml b/ucs-api/pom.xml index 61c7478..240ec9f 100644 --- a/ucs-api/pom.xml +++ b/ucs-api/pom.xml @@ -4,7 +4,7 @@ com.ugcs ugcs-java-sdk - 4.2 + 4.12 ucs-api UCS API diff --git a/ucs-api/src/main/java/com/ugcs/ucs/proto/mapping/HciMessageMapping.java b/ucs-api/src/main/java/com/ugcs/ucs/proto/mapping/HciMessageMapping.java index 1fe4596..62992be 100644 --- a/ucs-api/src/main/java/com/ugcs/ucs/proto/mapping/HciMessageMapping.java +++ b/ucs-api/src/main/java/com/ugcs/ucs/proto/mapping/HciMessageMapping.java @@ -161,5 +161,9 @@ public HciMessageMapping() { putMapping(163, GetElevationTilesResponse.class); putMapping(164, ExportRouteToKmlRequest.class); putMapping(165, ExportRouteToKmlResponse.class); + putMapping(168, GetTelemetrySnapshotRequest.class); + putMapping(169, GetTelemetrySnapshotResponse.class); + putMapping(170, SplitRouteByCorridorPointRequest.class); + putMapping(171, SplitRouteByCorridorPointResponse.class); } } diff --git a/ucs-api/src/main/proto/Domain.proto b/ucs-api/src/main/proto/Domain.proto index d7f828f..8d779b2 100644 --- a/ucs-api/src/main/proto/Domain.proto +++ b/ucs-api/src/main/proto/Domain.proto @@ -135,6 +135,7 @@ enum VehicleParameterType { enum PayloadType { PT_CAMERA = 0; PT_DATA_LINK = 1; + PT_LIDAR = 2; } enum PayloadParameterType { @@ -147,6 +148,9 @@ enum PayloadParameterType { PPT_SENSOR_HORIZONTAL_PIXELS = 6; PPT_SENSOR_VERTICAL_PIXELS = 7; PPT_MIN_TRIGGERING_INTERVAL = 8; + PPT_SCANNING_FOV = 9; + PPT_SCANNING_RANGE = 10; + PPT_SCANNING_SPEED = 11; } enum SeverityLevel { @@ -180,6 +184,11 @@ enum FailsafeAction { FA_NA = 4; } +enum RecordingState { + RS_OFF = 0; + RS_ON = 1; +} + enum TriggerState { TS_SINGLE_SHOT = 0; TS_START_RECORDING = 1; @@ -269,6 +278,7 @@ enum ActionCode { AC_REPEAT_SERVO = 15; AC_TRANSITION_FIXED = 16; AC_TRANSITION_VTOL = 17; + AC_LIDAR_RECORDING_CONTROL = 18; } enum NfzType { @@ -327,6 +337,8 @@ enum Subsystem { S_ADSB_VEHICLE = 10; S_WEATHER_STATION = 11; S_VSM = 12; + S_LIDAR = 13; + S_RANGEFINDER = 14; } enum Semantic { @@ -367,11 +379,17 @@ enum Semantic { S_FLIGHT_MODE = 35; S_LIST = 36; S_AUTOPILOT_STATUS = 37; + S_TIMESTAMP = 38; + S_ANY = 39; S_TEMPERATURE = 40; S_HUMIDITY = 41; S_PRECIPITATION = 42; - S_TIMESTAMP = 38; - S_ANY = 39; + S_DISTANCE = 43; +} + +enum KmlExportAltitudeMode { + E_AMSL = 1; + E_RELATIVE_TO_FIRST_WP = 2; } //***************************************************************************** @@ -452,10 +470,12 @@ message DomainObjectWrapper { optional SetServo setServo = 82; optional RepeatServo repeatServo = 83; optional TransitionFixed transitionFixed = 84; - optional TransitionVtol transitionVtol = 85; + optional TransitionVtol transitionVtol = 85; optional RouteUpload routeUpload = 86; optional RoutePass routePass = 87; optional Setting setting = 88; + optional LidarRecordingControlDefinition lidarRecordingControlDefinition = 89; + optional LidarRecordingControl lidarRecordingControl = 90; } //***************************************************************************** @@ -505,7 +525,6 @@ message MissionPreference { optional string tag = 7; } - message Setting { optional int32 id = 1; optional string name = 2; @@ -765,7 +784,7 @@ message Route { optional int64 scheduledTime = 6 [(ugcs_field_posix_time) = true]; optional Mission mission = 7; repeated SegmentDefinition segments = 8; - optional TrajectoryType trajectoryType = 10; + optional TrajectoryType trajectoryType = 10 [deprecated=true]; optional double safeAltitude = 11; optional double maxAltitude = 12; optional double initialSpeed = 13; @@ -935,6 +954,7 @@ message Action { optional RepeatServo repeatServo = 24; optional TransitionVtol transitionVtol = 26; optional TransitionFixed transitionFixed = 27; + optional LidarRecordingControl lidarRecordingControl = 28; optional string tag = 21; optional double estimatedTime = 25; } @@ -951,6 +971,7 @@ message Waypoint { optional WaypointTurnType turnType = 9; optional AltitudeFrame altitudeFrame = 11; optional string tag = 10; + optional double cornerRadius = 12; } message CameraControl { @@ -1009,6 +1030,14 @@ message HeadingChange { optional string tag = 6; } +// Start/Stop lidar data recording. +message LidarRecordingControl { + optional int32 id = 1; + optional int32 version = 2; + optional RecordingState targetState = 3; + optional string tag = 4; +} + message PoiChange { optional int32 id = 1; optional int32 version = 2; @@ -1123,8 +1152,9 @@ message ActionDefinition { optional PanoramaDefinition panoramaDefinition = 13; optional SetServoDefinition setServoDefinition = 14; optional RepeatServoDefinition repeatServoDefinition = 15; - optional TransitionVtolDefinition transitionVtolDefinition = 17; + optional TransitionVtolDefinition transitionVtolDefinition = 17; optional TransitionFixedDefinition transitionFixedDefinition = 18; + optional LidarRecordingControlDefinition lidarRecordingControlDefinition = 19; optional string tag = 16; } @@ -1165,6 +1195,13 @@ message CameraSeriesByDistanceDefinition { optional string tag = 8; } +message LidarRecordingControlDefinition { + optional int32 id = 1; + optional int32 version = 2; + optional RecordingState targetState = 3; + optional string tag = 4; +} + message HeadingDefinition { optional int32 id = 1; optional int32 version = 2; @@ -1474,6 +1511,12 @@ message VehicleTrack { optional Track track = 2; } +message TelemetrySnapshot { + optional Vehicle vehicle = 1 [(ugcs_field_id) = true]; + optional int64 snapshotTime = 2 [(ugcs_field_posix_time) = true]; + repeated Telemetry telemetry = 3; +} + //***************************************************************************** // Events //***************************************************************************** diff --git a/ucs-api/src/main/proto/Messages.proto b/ucs-api/src/main/proto/Messages.proto index d2d0846..9feff26 100644 --- a/ucs-api/src/main/proto/Messages.proto +++ b/ucs-api/src/main/proto/Messages.proto @@ -3,7 +3,7 @@ syntax = "proto2"; option java_package = "com.ugcs.ucs.proto"; option java_outer_classname = "MessagesProto"; option (ugcs_protocol_major_version) = 1; -option (ugcs_protocol_minor_version) = 2; +option (ugcs_protocol_minor_version) = 7; import "Options.proto"; import "Domain.proto"; @@ -85,6 +85,7 @@ message LicensePermissionsDto { required bool kmlExport = 20; optional int32 vehicleConnectionsLimit = 21; optional bool dssClient = 22 [default = false]; + repeated string forbiddenCommandCodes = 23; } message PlatformAndBooleanPairDto { @@ -154,6 +155,10 @@ message PlatformParametersDto { required PlatformParameters parameters = 2; } +message PayloadParametersMappingDto { + required PayloadType payloadType = 1; + repeated PayloadParameterType parameterTypes = 2; +} // Keys and values message PlatformAndVehicleTypePairDto { @@ -530,6 +535,7 @@ message GetRasterGridResponse { required int32 height = 3; required bool noData = 4; } + //Request for elevation tiles in specified rectangle message GetElevationTilesRequest { required int32 clientId = 1; @@ -685,6 +691,7 @@ message GetKmlRouteRepresentationRequest { required string routeName = 2; required VehicleProfile routeVehicleProfile = 3; repeated Wgs84LocationDto waypoints = 4; + required KmlExportAltitudeMode altitudeMode = 5; } //Deprecated message GetKmlRouteRepresentationResponse { @@ -694,6 +701,7 @@ message GetKmlRouteRepresentationResponse { message ExportRouteToKmlRequest { required int32 clientId = 1; optional ProcessedRoute route = 2; + required KmlExportAltitudeMode altitudeMode = 3; } message ExportRouteToKmlResponse { @@ -938,6 +946,7 @@ message GetMappingRequest { optional bool getFacadeScanPatterns = 9; optional bool getAltitudeFrames = 10; optional bool getPlatformParameters = 11; + optional bool getPayloadParameters = 12; } message GetMappingResponse { @@ -951,6 +960,7 @@ message GetMappingResponse { optional FacadeScanPatternsMappingDto facadeScanPaterrnsMapping = 9; optional AltitudeFramesMappingDto altitudeFramesMapping = 10; repeated PlatformParametersDto platformParameters = 11; + repeated PayloadParametersMappingDto payloadParameters = 12; } // Locks @@ -1069,6 +1079,17 @@ message SplitRouteByDistanceResponse { repeated Route parts = 1; } +message SplitRouteByCorridorPointRequest { + required int32 clientId = 1; + required Route route = 2; + required int32 segmentIndex = 3; + required int32 pointIndex = 4; +} + +message SplitRouteByCorridorPointResponse { + repeated Route parts = 1; +} + message JoinRouteRequest { required int32 clientId = 1; repeated Route parts = 2; @@ -1136,3 +1157,14 @@ message UnsubscribeEventRequest { message UnsubscribeEventResponse { } + +message GetTelemetrySnapshotRequest { + required int32 clientId = 1; + // list of vehicles to get snapshots for + repeated Vehicle vehicles = 2; +} + +message GetTelemetrySnapshotResponse { + // list of vehicle telemetry snapshots + repeated TelemetrySnapshot snapshots = 1; +} \ No newline at end of file diff --git a/ucs-client/pom.xml b/ucs-client/pom.xml index 6c24a5a..d189b53 100644 --- a/ucs-client/pom.xml +++ b/ucs-client/pom.xml @@ -4,7 +4,7 @@ com.ugcs ugcs-java-sdk - 4.2 + 4.12 ucs-client UCS Client diff --git a/ucs-client/src/main/java/com/ugcs/ucs/client/Client.java b/ucs-client/src/main/java/com/ugcs/ucs/client/Client.java index 52bc141..f20535a 100644 --- a/ucs-client/src/main/java/com/ugcs/ucs/client/Client.java +++ b/ucs-client/src/main/java/com/ugcs/ucs/client/Client.java @@ -45,7 +45,8 @@ public Client(SocketAddress serverAddress) { this.connector = new MinaConnector( new MessageWrapperCodecFactory(new HciMessageMapping()), 1, - 1); + 1, + null); } public void addNotificationListener(ServerNotificationListener listener) { @@ -183,4 +184,4 @@ public boolean select(Object message) { && !STATUS_MESSAGE_TYPES.contains(wrapper.getMessage().getClass()); } } -} \ No newline at end of file +} diff --git a/ucs-common/pom.xml b/ucs-common/pom.xml index b754ff2..1a82a44 100644 --- a/ucs-common/pom.xml +++ b/ucs-common/pom.xml @@ -4,7 +4,7 @@ com.ugcs ugcs-java-sdk - 4.2 + 4.12 ucs-common UCS Common Data Types and Utilities @@ -20,8 +20,13 @@ test - junit - junit + org.junit.jupiter + junit-jupiter + test + + + org.junit.vintage + junit-vintage-engine net.jodah diff --git a/ucs-common/src/main/java/com/ugcs/common/io/CircularBuffer.java b/ucs-common/src/main/java/com/ugcs/common/io/CircularBuffer.java index af6ed20..ac4cb5c 100644 --- a/ucs-common/src/main/java/com/ugcs/common/io/CircularBuffer.java +++ b/ucs-common/src/main/java/com/ugcs/common/io/CircularBuffer.java @@ -126,24 +126,32 @@ public int read(byte[] b) { return read(b, 0, b.length); } - @Override + public int read(byte[] b, int off, int len) { if (b == null) throw new NullPointerException(); if (off < 0 || len < 0 || off + len > b.length) throw new IndexOutOfBoundsException(); - + if (isEmpty()) return -1; - int n = 0; - for (int i = off; i < off + len; ++i) { - int value = read(); - if (value == -1) - break; - b[i] = (byte)value; - ++n; + int readCount = 0; + if (head > tail) { + int batchSize = Math.min(buffer.length - head, len); + System.arraycopy(buffer, head, b, off, batchSize); + readCount += batchSize; + head = head + batchSize; + if (head == buffer.length) + head = 0; + } + if (head < tail && readCount < len) { + int batchSize = len - readCount; + System.arraycopy(buffer, head, b, off+readCount, batchSize); + readCount += batchSize; + head = head + batchSize; + } - return n; + return readCount; } @Override @@ -203,22 +211,27 @@ public void write(byte[] b) { write(b, 0, b.length); } - @Override + public void write(byte[] b, int off, int len) { if (b == null) throw new NullPointerException(); if (off < 0 || len < 0 || off + len > b.length) throw new IndexOutOfBoundsException(); - + if (len == 0) return; - + reserve(length() + len); - for (int i = off; i < off + len; ++i) { - put(tail, b[i]); - ++tail; + if (tail + len <= buffer.length) { + System.arraycopy(b, off, buffer, tail, len); + tail = tail+len; if (tail == buffer.length) - tail = 0; // wrap around + tail = 0; + } else { + int firstBatchSize = buffer.length - tail; + System.arraycopy(b, off, buffer, tail, firstBatchSize); + System.arraycopy(b, off+firstBatchSize, buffer, 0, len-firstBatchSize); + tail = len-firstBatchSize; } } } diff --git a/ucs-messaging/pom.xml b/ucs-messaging/pom.xml index b09db15..9af8a40 100644 --- a/ucs-messaging/pom.xml +++ b/ucs-messaging/pom.xml @@ -4,7 +4,7 @@ com.ugcs ugcs-java-sdk - 4.2 + 4.12 ucs-messaging UCS Messaging Library diff --git a/ucs-messaging/src/main/java/com/ugcs/messaging/GroupingThreadPool.java b/ucs-messaging/src/main/java/com/ugcs/messaging/GroupingThreadPool.java index ea5abc4..01e4213 100644 --- a/ucs-messaging/src/main/java/com/ugcs/messaging/GroupingThreadPool.java +++ b/ucs-messaging/src/main/java/com/ugcs/messaging/GroupingThreadPool.java @@ -75,6 +75,14 @@ public GroupingThreadPool(int coreWorkers, int maxWorkers, TaskMapper taskMapper this(coreWorkers, maxWorkers, taskMapper, Executors.defaultThreadFactory()); } + public GroupingThreadPool(int coreWorkers, int maxWorkers, TaskMapper taskMapper, String poolName) { + this(coreWorkers, maxWorkers, taskMapper, runnable -> { + Thread thread = new Thread(runnable); + thread.setName(poolName + "-" + thread.getName()); + return thread; + }); + } + public GroupingThreadPool(int coreWorkers, int maxWorkers, TaskMapper taskMapper, ThreadFactory threadFactory) { if (coreWorkers < 0 || maxWorkers < 1 @@ -199,7 +207,7 @@ private boolean spawnWorker(boolean ignoreShutdown) { if (!ignoreShutdown && shutdown) return false; - Worker worker = null; + Worker worker; wl.lock(); try { if (workers.size() >= maxWorkers) @@ -218,7 +226,7 @@ private boolean spawnWorker(boolean ignoreShutdown) { if (!started) removeWorker(worker); } - return started; + return true; } private boolean respawnCoreWorker() { @@ -338,9 +346,9 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE @Override public String toString() { - int numQueues = 0; + int numQueues; int numTasks = 0; - int numWaitingQueues = 0; + int numWaitingQueues; long maxWaitingNanos = 0L; ql.lock(); try { @@ -358,7 +366,7 @@ public String toString() { } finally { ql.unlock(); } - int numWorkers = 0; + int numWorkers; wl.lock(); try { numWorkers = workers.size(); diff --git a/ucs-messaging/src/main/java/com/ugcs/messaging/ListenableFuture.java b/ucs-messaging/src/main/java/com/ugcs/messaging/ListenableFuture.java index 5118328..0cf205e 100644 --- a/ucs-messaging/src/main/java/com/ugcs/messaging/ListenableFuture.java +++ b/ucs-messaging/src/main/java/com/ugcs/messaging/ListenableFuture.java @@ -1,10 +1,15 @@ package com.ugcs.messaging; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public interface ListenableFuture extends Future { void addCompletionListener(CompletionListener listener); + void awaitCompletionListeners(long timeout, TimeUnit unit) + throws TimeoutException, InterruptedException; + ListenableFuture map(Mapper mapper); } diff --git a/ucs-messaging/src/main/java/com/ugcs/messaging/ListenableFutureAdapter.java b/ucs-messaging/src/main/java/com/ugcs/messaging/ListenableFutureAdapter.java index 3c2ad26..28d1e1e 100644 --- a/ucs-messaging/src/main/java/com/ugcs/messaging/ListenableFutureAdapter.java +++ b/ucs-messaging/src/main/java/com/ugcs/messaging/ListenableFutureAdapter.java @@ -18,6 +18,35 @@ public ListenableFutureAdapter(ListenableFuture future, Mapper mapper) this.mapper = mapper; } + @Override + public void addCompletionListener(final CompletionListener listener) { + Objects.requireNonNull(listener); + + CompletionListener innerListener = new CompletionListener() { + @Override + public void completed(CompletionEvent event) { + Object source = ListenableFutureAdapter.this; + if (event.getError() != null) { + // error translated from the lower level + listener.completed(new CompletionEvent(source, null, event.getError())); + } else { + R result = null; + try { + result = mapper.map(event.getResult()); + } catch (Exception e) { + // error extracted from the response value + listener.completed(new CompletionEvent(source, null, e)); + } + if (result != null) { + // ok + listener.completed(new CompletionEvent(source, result, null)); + } + } + } + }; + future.addCompletionListener(innerListener); + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { return future.cancel(mayInterruptIfRunning); @@ -53,34 +82,8 @@ public R get(long timeout, TimeUnit unit) throws InterruptedException, Execution } } - /* events */ - @Override - public void addCompletionListener(final CompletionListener listener) { - Objects.requireNonNull(listener); - - CompletionListener innerListener = new CompletionListener() { - @Override - public void completed(CompletionEvent event) { - Object source = ListenableFutureAdapter.this; - if (event.getError() != null) { - // error translated from the lower level - listener.completed(new CompletionEvent(source, null, event.getError())); - } else { - R result = null; - try { - result = mapper.map(event.getResult()); - } catch (Exception e) { - // error extracted from the response value - listener.completed(new CompletionEvent(source, null, e)); - } - if (result != null) { - // ok - listener.completed(new CompletionEvent(source, result, null)); - } - } - } - }; - future.addCompletionListener(innerListener); + public void awaitCompletionListeners(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException { + future.awaitCompletionListeners(timeout, unit); } } diff --git a/ucs-messaging/src/main/java/com/ugcs/messaging/api/MessageFuture.java b/ucs-messaging/src/main/java/com/ugcs/messaging/api/MessageFuture.java index 8730cae..74ac6bd 100644 --- a/ucs-messaging/src/main/java/com/ugcs/messaging/api/MessageFuture.java +++ b/ucs-messaging/src/main/java/com/ugcs/messaging/api/MessageFuture.java @@ -1,9 +1,9 @@ package com.ugcs.messaging.api; -import java.util.ArrayList; -import java.util.Collections; +import java.util.ArrayDeque; import java.util.List; import java.util.Objects; +import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -16,10 +16,11 @@ public class MessageFuture extends AbstractListenableFuture implements MessageListener { private final MessageSession session; - private final List> listeners = new ArrayList<>(); + private final Queue> listeners = new ArrayDeque<>(); private FutureState state; private FutureResult result; + private boolean draining; private final Object stateSync = new Object(); public MessageFuture(MessageSession session, Object message, MessageSelector selector) { @@ -43,15 +44,34 @@ public MessageFuture(MessageSession session, Object message, MessageSelector sel } } + @Override + public void addCompletionListener(CompletionListener listener) { + Objects.requireNonNull(listener); + + boolean invokeNow; + synchronized (stateSync) { + listeners.offer(listener); + invokeNow = state != FutureState.RUNNING; + } + if (invokeNow) { + // drain completion queue + drainCompletionListeners(); + } + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled; synchronized (stateSync) { if (state != FutureState.RUNNING || !mayInterruptIfRunning) return false; - - completeIfRunning(FutureState.CANCELLED, null, null); - return state == FutureState.CANCELLED; + cancelled = completeIfRunning(FutureState.CANCELLED, null, null); + } + // assert: if cancelled => state == FutureState.CANCELLED + if (cancelled) { + drainCompletionListeners(); } + return cancelled; } @Override @@ -115,7 +135,39 @@ public Object get(long timeout, TimeUnit unit) } } - private void completeIfRunning(FutureState state, Object value, Throwable error) { + // listeners still can be added after calling completion await + // in that case add listeners/await completion calls should be + // synchronized externally + @Override + public void awaitCompletionListeners(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException { + if (timeout < 0) + throw new IllegalArgumentException("Timeout must be non-negative"); + if (unit == null) + throw new IllegalArgumentException("Time unit not specified"); + + long t = System.nanoTime() + unit.toNanos(timeout); + synchronized (stateSync) { + // wait while listeners queue is not empty + while (!listeners.isEmpty()) { + long nanosTimeout = t - System.nanoTime(); + // timeout check + if (nanosTimeout <= 0) { + // fail with timeout if listeners still not invoked + if (!listeners.isEmpty()) + throw new TimeoutException(); + } + // wait + // interrupt does not cause listeners cancellation + stateSync.wait( + nanosTimeout / 1_000_000L, + (int)(nanosTimeout % 1_000_000L)); + } + } + } + + // returns true if future was completed on this call + private boolean completeIfRunning(FutureState state, Object value, Throwable error) { if (state == null || state == FutureState.RUNNING) throw new IllegalArgumentException("state"); @@ -128,10 +180,6 @@ private void completeIfRunning(FutureState state, Object value, Throwable error) this.result = new FutureResult(value, error); completed = true; - // copy completion listeners - listenersCopy = new ArrayList<>(listeners); - listeners.clear(); - // notify all waiting threads stateSync.notifyAll(); } @@ -139,9 +187,55 @@ private void completeIfRunning(FutureState state, Object value, Throwable error) } finally { if (completed) { session.removeListener(this); + } + } + return completed; + } - // fire completed event - invokeCompleted(listenersCopy); + private void drainCompletionListeners() { + synchronized (stateSync) { + if (state == FutureState.RUNNING) + throw new IllegalStateException(); + // guard from processing queue by multiple threads + // not necessary, but should be more predictive + if (draining) + return; + draining = true; + } + try { + while (true) { + CompletionListener listener; + CompletionEvent event; + synchronized (stateSync) { + listener = listeners.poll(); + if (listener == null) { + draining = false; + // notify all waiting threads and exit loop + stateSync.notifyAll(); + break; + } + + // build completion event from current state + Object value = state == FutureState.SUCCEEDED + ? result.getValue() + : null; + Throwable error = state == FutureState.FAILED + ? result.getError() + : state == FutureState.CANCELLED + ? new CancellationException() + : null; + event = new CompletionEvent<>(this, value, error); + } + + // assert: listener != null && event != null + try { + listener.completed(event); + } catch (Exception ignore) { + } + } + } catch (Exception e) { + synchronized (stateSync) { + draining = false; } } } @@ -173,27 +267,26 @@ private void throwResultException() throws ExecutionException { } } - /* MessageListener implementation */ - @Override public void messageReceived(MessageEvent event) { Objects.requireNonNull(event); - completeIfRunning(FutureState.SUCCEEDED, event.getMessage(), null); + boolean completed = completeIfRunning(FutureState.SUCCEEDED, event.getMessage(), null); + if (completed) { + drainCompletionListeners(); + } } @Override public void cancelled() { cancel(true); } - - /* inner classes */ enum FutureState { RUNNING, CANCELLED, FAILED, - SUCCEEDED, + SUCCEEDED } static class FutureResult { @@ -214,53 +307,4 @@ Throwable getError() { return error; } } - - /* events */ - - @Override - public void addCompletionListener(CompletionListener listener) { - Objects.requireNonNull(listener); - - boolean invokeNow = true; - synchronized (stateSync) { - if (state == FutureState.RUNNING) { - listeners.add(listener); - invokeNow = false; - } - } - if (invokeNow) { - // fire completion event for this listener - invokeCompleted(Collections.singletonList(listener)); - } - } - - private void invokeCompleted(List> listeners) { - if (listeners == null || listeners.isEmpty()) - return; - - Object value = null; - Throwable error = null; - - synchronized (stateSync) { - if (state == FutureState.RUNNING) - throw new IllegalStateException(); - - value = state == FutureState.SUCCEEDED - ? result.getValue() - : null; - error = state == FutureState.FAILED - ? result.getError() - : state == FutureState.CANCELLED - ? new CancellationException() - : null; - } - - CompletionEvent event = new CompletionEvent<>(this, value, error); - for (CompletionListener listener : listeners) { - try { - listener.completed(event); - } catch (Exception ignore) { - } - } - } } diff --git a/ucs-messaging/src/main/java/com/ugcs/messaging/mina/MinaAcceptor.java b/ucs-messaging/src/main/java/com/ugcs/messaging/mina/MinaAcceptor.java index c964480..763b72c 100644 --- a/ucs-messaging/src/main/java/com/ugcs/messaging/mina/MinaAcceptor.java +++ b/ucs-messaging/src/main/java/com/ugcs/messaging/mina/MinaAcceptor.java @@ -6,11 +6,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.ugcs.messaging.GroupingThreadPool; -import com.ugcs.messaging.TaskMapper; -import com.ugcs.messaging.api.Acceptor; -import com.ugcs.messaging.api.CodecFactory; -import com.ugcs.messaging.api.MessageSessionListener; +import javax.net.ssl.SSLContext; + import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; import org.apache.mina.core.service.IoProcessor; import org.apache.mina.core.service.SimpleIoProcessorPool; @@ -19,6 +16,7 @@ import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.filter.logging.LoggingFilter; +import org.apache.mina.filter.ssl.SslFilter; import org.apache.mina.transport.socket.SocketAcceptor; import org.apache.mina.transport.socket.nio.NioProcessor; import org.apache.mina.transport.socket.nio.NioSession; @@ -26,6 +24,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.ugcs.messaging.GroupingThreadPool; +import com.ugcs.messaging.TaskMapper; +import com.ugcs.messaging.api.Acceptor; +import com.ugcs.messaging.api.CodecFactory; +import com.ugcs.messaging.api.MessageSessionListener; + public class MinaAcceptor implements Acceptor { private static final Logger log = LoggerFactory.getLogger(MinaAcceptor.class); @@ -38,17 +42,18 @@ public class MinaAcceptor implements Acceptor { private final MinaAdapter minaAdapter; private final ExecutorService executor; - public MinaAcceptor(CodecFactory codecFactory) { - this(codecFactory, DEFAULT_MAX_IO_THREADS, DEFAULT_MAX_TASK_THREADS); + public MinaAcceptor(CodecFactory codecFactory, SSLContext sslContext) { + this(codecFactory, DEFAULT_MAX_IO_THREADS, DEFAULT_MAX_TASK_THREADS, sslContext); } - public MinaAcceptor(CodecFactory codecFactory, int maxIoThreads, int maxTaskThreads) { - this(codecFactory, maxIoThreads, maxTaskThreads, MinaTaskMappers.orderedByMessageTypes()); + public MinaAcceptor(CodecFactory codecFactory, int maxIoThreads, int maxTaskThreads, SSLContext sslContext) { + this(codecFactory, maxIoThreads, maxTaskThreads, MinaTaskMappers.orderedByMessageTypes(), sslContext); } - public MinaAcceptor(CodecFactory codecFactory, int maxIoThreads, int maxTaskThreads, TaskMapper taskMapper) { + public MinaAcceptor(CodecFactory codecFactory, int maxIoThreads, int maxTaskThreads, TaskMapper taskMapper, + SSLContext sslContext) { this(codecFactory, new SimpleIoProcessorPool<>(NioProcessor.class, maxIoThreads), - newExecutor(maxTaskThreads, taskMapper)); + newExecutor(maxTaskThreads, taskMapper), sslContext); log.info("Initialized acceptor {max I/O threads: {}, max task threads: {}}", maxIoThreads, maxTaskThreads > 0 @@ -56,7 +61,8 @@ public MinaAcceptor(CodecFactory codecFactory, int maxIoThreads, int maxTaskThre : "unbounded"); } - public MinaAcceptor(CodecFactory codecFactory, IoProcessor processor, ExecutorService executor) { + public MinaAcceptor(CodecFactory codecFactory, IoProcessor processor, ExecutorService executor, + SSLContext sslContext) { Objects.requireNonNull(codecFactory); Objects.requireNonNull(processor); Objects.requireNonNull(executor); @@ -64,6 +70,12 @@ public MinaAcceptor(CodecFactory codecFactory, IoProcessor processor acceptor = new NioSocketAcceptor(processor); DefaultIoFilterChainBuilder filters = acceptor.getFilterChain(); + // ssl + if (sslContext != null) { + SslFilter sslFilter = new SslFilter(sslContext); + filters.addLast("sslFilter", sslFilter); + } + // encoding filters.addLast("codec", new ProtocolCodecFilter(new MinaCodecFactory(codecFactory))); @@ -101,7 +113,7 @@ private static ExecutorService newExecutor(int maxThreads, TaskMapper taskMapper // tasks are ordered within groups maxThreads = Math.max(1, maxThreads); int coreThreads = Math.max(1, maxThreads / 2); - return new GroupingThreadPool(coreThreads, maxThreads, taskMapper); + return new GroupingThreadPool(coreThreads, maxThreads, taskMapper, "MinaAcceptorPool"); } } @@ -125,7 +137,7 @@ public void stop() { public void close() { for (IoSession session : acceptor.getManagedSessions().values()) - session.close(false); + session.closeOnFlush(); // disposing selector resources acceptor.dispose(); // stopping executor service diff --git a/ucs-messaging/src/main/java/com/ugcs/messaging/mina/MinaConnector.java b/ucs-messaging/src/main/java/com/ugcs/messaging/mina/MinaConnector.java index 6ece242..c2fe737 100644 --- a/ucs-messaging/src/main/java/com/ugcs/messaging/mina/MinaConnector.java +++ b/ucs-messaging/src/main/java/com/ugcs/messaging/mina/MinaConnector.java @@ -6,15 +6,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.ugcs.messaging.GroupingThreadPool; -import com.ugcs.messaging.TaskMapper; -import com.ugcs.messaging.api.CodecFactory; -import com.ugcs.messaging.api.ConnectListener; -import com.ugcs.messaging.api.Connector; -import com.ugcs.messaging.api.MessageSession; -import com.ugcs.messaging.api.MessageSessionErrorEvent; -import com.ugcs.messaging.api.MessageSessionEvent; -import com.ugcs.messaging.api.MessageSessionListener; +import javax.net.ssl.SSLContext; + import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.IoFutureListener; @@ -25,6 +18,7 @@ import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.filter.logging.LoggingFilter; +import org.apache.mina.filter.ssl.SslFilter; import org.apache.mina.transport.socket.SocketConnector; import org.apache.mina.transport.socket.nio.NioProcessor; import org.apache.mina.transport.socket.nio.NioSession; @@ -32,6 +26,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.ugcs.messaging.GroupingThreadPool; +import com.ugcs.messaging.TaskMapper; +import com.ugcs.messaging.api.CodecFactory; +import com.ugcs.messaging.api.ConnectListener; +import com.ugcs.messaging.api.Connector; +import com.ugcs.messaging.api.MessageSession; +import com.ugcs.messaging.api.MessageSessionErrorEvent; +import com.ugcs.messaging.api.MessageSessionEvent; +import com.ugcs.messaging.api.MessageSessionListener; + public class MinaConnector implements Connector { private static final Logger log = LoggerFactory.getLogger(MinaConnector.class); @@ -44,25 +48,27 @@ public class MinaConnector implements Connector { private final MinaAdapter minaAdapter; private final ExecutorService executor; - public MinaConnector(CodecFactory codecFactory) { - this(codecFactory, DEFAULT_MAX_IO_THREADS, DEFAULT_MAX_TASK_THREADS); + public MinaConnector(CodecFactory codecFactory, SSLContext sslContext) { + this(codecFactory, DEFAULT_MAX_IO_THREADS, DEFAULT_MAX_TASK_THREADS, sslContext); } - public MinaConnector(CodecFactory codecFactory, int maxIoThreads, int maxTaskThreads) { - this(codecFactory, maxIoThreads, maxTaskThreads, MinaTaskMappers.orderedByMessageTypes()); + public MinaConnector(CodecFactory codecFactory, int maxIoThreads, int maxTaskThreads, SSLContext sslContext) { + this(codecFactory, maxIoThreads, maxTaskThreads, MinaTaskMappers.orderedByMessageTypes(), sslContext); } - public MinaConnector(CodecFactory codecFactory, int maxIoThreads, int maxTaskThreads, TaskMapper taskMapper) { + public MinaConnector(CodecFactory codecFactory, int maxIoThreads, int maxTaskThreads, TaskMapper taskMapper, + SSLContext sslContext) { this(codecFactory, new SimpleIoProcessorPool<>(NioProcessor.class, maxIoThreads), - newExecutor(maxTaskThreads, taskMapper)); + newExecutor(maxTaskThreads, taskMapper), sslContext); log.info("Initialized connector {max I/O threads: {}, max task threads: {}}", - Integer.toString(maxIoThreads), + maxIoThreads, maxTaskThreads > 0 ? Integer.toString(maxTaskThreads) : "unbounded"); } - public MinaConnector(CodecFactory codecFactory, IoProcessor processor, ExecutorService executor) { + public MinaConnector(CodecFactory codecFactory, IoProcessor processor, ExecutorService executor, + SSLContext sslContext) { Objects.requireNonNull(codecFactory); Objects.requireNonNull(processor); Objects.requireNonNull(executor); @@ -70,6 +76,13 @@ public MinaConnector(CodecFactory codecFactory, IoProcessor processo connector = new NioSocketConnector(processor); DefaultIoFilterChainBuilder filters = connector.getFilterChain(); + // ssl + if (sslContext != null) { + SslFilter sslFilter = new SslFilter(sslContext); + sslFilter.setUseClientMode(true); + filters.addLast("sslFilter", sslFilter); + } + // encoding filters.addLast("codec", new ProtocolCodecFilter(new MinaCodecFactory(codecFactory))); @@ -103,7 +116,7 @@ private static ExecutorService newExecutor(int maxThreads, TaskMapper taskMapper // tasks are ordered within groups maxThreads = Math.max(1, maxThreads); int coreThreads = Math.max(1, maxThreads / 2); - return new GroupingThreadPool(coreThreads, maxThreads, taskMapper); + return new GroupingThreadPool(coreThreads, maxThreads, taskMapper, "MinaConnectorPool"); } } @@ -135,43 +148,40 @@ public void connectNonBlocking(SocketAddress address, final ConnectListener list ConnectFuture connectFuture = connector.connect(address); if (listener != null) { - connectFuture.addListener(new IoFutureListener() { - @Override - public void operationComplete(ConnectFuture future) { - Objects.requireNonNull(future); - - if (!future.isConnected() || future.getException() != null) { - MessageSessionErrorEvent event = new MessageSessionErrorEvent( - MinaConnector.this, - null, - future.getException()); - listener.connectError(event); - return; - } - MessageSession messageSession = null; - try { - IoSession minaSession = future.getSession(); - messageSession = minaAdapter.getMessageSession(minaSession); - } catch (Throwable e) { - MessageSessionErrorEvent event = new MessageSessionErrorEvent( - MinaConnector.this, - null, - e); - listener.connectError(event); - return; - } - MessageSessionEvent event = new MessageSessionEvent( + connectFuture.addListener((IoFutureListener)future -> { + Objects.requireNonNull(future); + + if (!future.isConnected() || future.getException() != null) { + MessageSessionErrorEvent event = new MessageSessionErrorEvent( + MinaConnector.this, + null, + future.getException()); + listener.connectError(event); + return; + } + MessageSession messageSession = null; + try { + IoSession minaSession = future.getSession(); + messageSession = minaAdapter.getMessageSession(minaSession); + } catch (Throwable e) { + MessageSessionErrorEvent event = new MessageSessionErrorEvent( MinaConnector.this, - messageSession); - listener.connected(event); + null, + e); + listener.connectError(event); + return; } + MessageSessionEvent event = new MessageSessionEvent( + MinaConnector.this, + messageSession); + listener.connected(event); }); } } public void close() { for (IoSession session : connector.getManagedSessions().values()) - session.close(false); + session.closeOnFlush(); // disposing selector resources connector.dispose(); // stopping executor service