From 1ba14020e8ce826747a43d8cac5e7dce5483542b Mon Sep 17 00:00:00 2001 From: Brad Rushworth <67445484+brushworth@users.noreply.github.com> Date: Tue, 30 Jun 2020 13:57:49 +1000 Subject: [PATCH 1/4] RYA-534 Upgrade to RDF4J 3.2 and RYA-496 Upgrading to Accumulo 1.9.3 --- common/rya.provenance/pom.xml | 1 + .../rya/indexing/entity/model/Entity.java | 30 ++--- .../rya/indexing/entity/model/Property.java | 13 +- .../rya/indexing/entity/model/Type.java | 14 +- .../indexing/entity/model/TypedEntity.java | 20 ++- .../sail/config/RyaAccumuloSailConfig.java | 42 +++--- extras/kafka.connect/api/pom.xml | 1 + .../periodic.notification/twill.yarn/pom.xml | 2 +- extras/periodic.notification/twill/pom.xml | 7 +- .../api/conf/AccumuloMergeConfiguration.java | 2 +- .../export/api/conf/MergeConfiguration.java | 6 +- extras/rya.forwardchain/pom.xml | 3 +- .../app/query/StatementPatternIdManager.java | 25 ++-- extras/rya.prospector/pom.xml | 11 ++ extras/rya.reasoning/pom.xml | 9 +- .../queries/InMemoryQueryRepositoryTest.java | 39 +++--- .../client/command/RunQueryCommandIT.java | 27 ++-- .../kafka/KafkaRyaStreamsClientFactory.java | 22 ++-- .../streams/querymanager/QueryManager.java | 65 ++++----- .../querymanager/QueryManagerDaemon.java | 35 +++-- .../querymanager/QueryManagerTest.java | 37 +++--- .../kafka/KafkaQueryChangeLogSourceIT.java | 44 ++++--- .../kafka/LocalQueryExecutorIT.java | 18 +-- .../kafka/LocalQueryExecutorTest.java | 63 +++++---- .../mr/AccumuloHDFSFileInputFormat.java | 26 ++-- .../rya/accumulo/mr/RyaInputFormat.java | 25 +++- .../rya/accumulo/pig/AccumuloStorage.java | 31 ++--- pom.xml | 124 +++++++++++++----- sail/pom.xml | 1 + .../RdfCloudTripleStoreConnection.java | 9 +- 30 files changed, 433 insertions(+), 319 deletions(-) diff --git a/common/rya.provenance/pom.xml b/common/rya.provenance/pom.xml index 87c6e25d9..7096dba02 100644 --- a/common/rya.provenance/pom.xml +++ b/common/rya.provenance/pom.xml @@ -33,6 +33,7 @@ under the License. org.eclipse.rdf4j rdf4j-runtime + pom diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java index 6874a164a..5f0497820 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java @@ -18,7 +18,19 @@ */ package org.apache.rya.indexing.entity.model; -import static java.util.Objects.requireNonNull; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaIRI; +import org.apache.rya.indexing.entity.storage.EntityStorage; +import org.apache.rya.indexing.smarturi.SmartUriAdapter; +import org.apache.rya.indexing.smarturi.SmartUriException; +import org.eclipse.rdf4j.model.IRI; import java.util.ArrayList; import java.util.HashMap; @@ -28,21 +40,7 @@ import java.util.Objects; import java.util.Optional; -import org.apache.http.annotation.Immutable; -import org.apache.log4j.Logger; -import org.apache.rya.api.domain.RyaIRI; -import org.apache.rya.indexing.entity.storage.EntityStorage; -import org.apache.rya.indexing.smarturi.SmartUriAdapter; -import org.apache.rya.indexing.smarturi.SmartUriException; -import org.eclipse.rdf4j.model.IRI; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import edu.umd.cs.findbugs.annotations.Nullable; +import static java.util.Objects.requireNonNull; /** * An {@link Entity} is a named concept that has at least one defined structure diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java index b44c52c09..0b9b77d58 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java @@ -18,16 +18,15 @@ */ package org.apache.rya.indexing.entity.model; -import static java.util.Objects.requireNonNull; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.rya.api.domain.RyaIRI; +import org.apache.rya.api.domain.RyaType; import java.util.Objects; -import org.apache.http.annotation.Immutable; -import org.apache.rya.api.domain.RyaType; -import org.apache.rya.api.domain.RyaIRI; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; +import static java.util.Objects.requireNonNull; /** * A value that has been set for an {@link TypedEntity}. diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java index a7c988b72..1a61812bf 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java @@ -18,18 +18,16 @@ */ package org.apache.rya.indexing.entity.model; -import static java.util.Objects.requireNonNull; - -import java.util.Objects; - -import org.apache.http.annotation.Immutable; +import com.google.common.collect.ImmutableSet; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import jdk.nashorn.internal.ir.annotations.Immutable; import org.apache.rya.api.domain.RyaIRI; import org.apache.rya.indexing.entity.storage.TypeStorage; -import com.google.common.collect.ImmutableSet; +import java.util.Objects; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; +import static java.util.Objects.requireNonNull; /** * Defines the structure of an {@link TypedEntity}. diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java index 816e7fa51..fdce30dc2 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java @@ -18,23 +18,21 @@ */ package org.apache.rya.indexing.entity.model; -import static java.util.Objects.requireNonNull; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.rya.api.domain.RyaIRI; +import org.apache.rya.api.domain.RyaType; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; -import org.apache.http.annotation.Immutable; -import org.apache.rya.api.domain.RyaType; -import org.apache.rya.api.domain.RyaIRI; - -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableMap; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import edu.umd.cs.findbugs.annotations.Nullable; +import static java.util.Objects.requireNonNull; /** * A {@link TypedEntity} is a view of an {@link Entity} that has had a specific diff --git a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaAccumuloSailConfig.java b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaAccumuloSailConfig.java index 43af1fb81..37e9b226e 100644 --- a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaAccumuloSailConfig.java +++ b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaAccumuloSailConfig.java @@ -20,16 +20,16 @@ import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.eclipse.rdf4j.model.IRI; -import org.eclipse.rdf4j.model.Literal; import org.eclipse.rdf4j.model.Model; import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Value; import org.eclipse.rdf4j.model.ValueFactory; import org.eclipse.rdf4j.model.impl.SimpleValueFactory; -import org.eclipse.rdf4j.model.util.GraphUtil; -import org.eclipse.rdf4j.model.util.GraphUtilException; import org.eclipse.rdf4j.sail.config.AbstractSailImplConfig; import org.eclipse.rdf4j.sail.config.SailConfigException; +import java.util.Set; + /** * @deprecated Use {@link AccumuloRdfConfiguration} instead. */ @@ -117,9 +117,7 @@ public void validate() throws SailConfigException { public Resource export(final Model model) { final Resource implNode = super.export(model); - @SuppressWarnings("deprecation") - final - ValueFactory v = model.getValueFactory(); + final ValueFactory v = SimpleValueFactory.getInstance(); model.add(implNode, USER, v.createLiteral(user)); model.add(implNode, PASSWORD, v.createLiteral(password)); @@ -136,27 +134,27 @@ public void parse(final Model model, final Resource implNode) throws SailConfigE System.out.println("parsing"); try { - final Literal userLit = GraphUtil.getOptionalObjectLiteral(model, implNode, USER); - if (userLit != null) { - setUser(userLit.getLabel()); + final Set userLit = model.filter(implNode, USER, null).objects(); + if (userLit.size() == 1) { + setUser(userLit.iterator().next().stringValue()); } - final Literal pwdLit = GraphUtil.getOptionalObjectLiteral(model, implNode, PASSWORD); - if (pwdLit != null) { - setPassword(pwdLit.getLabel()); + final Set pwdLit = model.filter(implNode, PASSWORD, null).objects(); + if (pwdLit.size() == 1) { + setPassword(pwdLit.iterator().next().stringValue()); } - final Literal instLit = GraphUtil.getOptionalObjectLiteral(model, implNode, INSTANCE); - if (instLit != null) { - setInstance(instLit.getLabel()); + final Set instLit = model.filter(implNode, INSTANCE, null).objects(); + if (instLit.size() == 1) { + setInstance(instLit.iterator().next().stringValue()); } - final Literal zooLit = GraphUtil.getOptionalObjectLiteral(model, implNode, ZOOKEEPERS); - if (zooLit != null) { - setZookeepers(zooLit.getLabel()); + final Set zooLit = model.filter(implNode, ZOOKEEPERS, null).objects(); + if (zooLit.size() == 1) { + setZookeepers(zooLit.iterator().next().stringValue()); } - final Literal mockLit = GraphUtil.getOptionalObjectLiteral(model, implNode, IS_MOCK); - if (mockLit != null) { - setMock(Boolean.parseBoolean(mockLit.getLabel())); + final Set mockLit = model.filter(implNode, IS_MOCK, null).objects(); + if (mockLit.size() == 1) { + setMock(Boolean.parseBoolean(mockLit.iterator().next().stringValue())); } - } catch (final GraphUtilException e) { + } catch (final Exception e) { throw new SailConfigException(e.getMessage(), e); } } diff --git a/extras/kafka.connect/api/pom.xml b/extras/kafka.connect/api/pom.xml index fcc6a154e..df86d3943 100644 --- a/extras/kafka.connect/api/pom.xml +++ b/extras/kafka.connect/api/pom.xml @@ -68,6 +68,7 @@ org.eclipse.rdf4j rdf4j-runtime + pom org.slf4j diff --git a/extras/periodic.notification/twill.yarn/pom.xml b/extras/periodic.notification/twill.yarn/pom.xml index 00cb1905f..46bee18ec 100644 --- a/extras/periodic.notification/twill.yarn/pom.xml +++ b/extras/periodic.notification/twill.yarn/pom.xml @@ -43,7 +43,7 @@ org.apache.twill twill-yarn - 0.12.0 + ${twill.version} org.apache.rya diff --git a/extras/periodic.notification/twill/pom.xml b/extras/periodic.notification/twill/pom.xml index a4e8f7a9e..8f6d969af 100644 --- a/extras/periodic.notification/twill/pom.xml +++ b/extras/periodic.notification/twill/pom.xml @@ -77,12 +77,12 @@ com.google.guava - guava + guava org.apache.twill twill-api - 0.12.0 + ${twill.version} @@ -165,6 +165,9 @@ commons-logging:commons-logging org.slf4j:slf4j-log4j12 log4j:log4j + + + org.locationtech.spatial4j:spatial4j diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java index a35d5aab7..1a351147f 100644 --- a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java +++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java @@ -18,7 +18,7 @@ */ package org.apache.rya.export.api.conf; -import org.apache.http.annotation.Immutable; +import jdk.nashorn.internal.ir.annotations.Immutable; import org.apache.rya.export.InstanceType; /** diff --git a/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java b/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java index 96f7ed678..329589bd4 100644 --- a/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java +++ b/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java @@ -18,12 +18,12 @@ */ package org.apache.rya.export.api.conf; -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.http.annotation.Immutable; +import jdk.nashorn.internal.ir.annotations.Immutable; import org.apache.rya.export.DBType; import org.apache.rya.export.MergePolicy; +import static com.google.common.base.Preconditions.checkNotNull; + /** * Immutable configuration object to allow the MergeTool to connect to the parent and child * databases for data merging. diff --git a/extras/rya.forwardchain/pom.xml b/extras/rya.forwardchain/pom.xml index b5c275a39..4ade411f7 100644 --- a/extras/rya.forwardchain/pom.xml +++ b/extras/rya.forwardchain/pom.xml @@ -33,7 +33,8 @@ under the License. org.eclipse.rdf4j rdf4j-runtime - + pom + org.apache.rya rya.api diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java index ee4c053bb..9a8505e21 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java @@ -18,23 +18,22 @@ */ package org.apache.rya.indexing.pcj.fluo.app.query; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import java.util.HashSet; +import java.util.Set; + import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; -import java.util.HashSet; -import java.util.Set; - -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Bytes; - -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.collect.Sets; -import com.google.common.hash.Hashing; - /** * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all @@ -62,7 +61,7 @@ public static void addStatementPatternIds(TransactionBase tx, Set ids) { } String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString(); tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString)); - tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString())); + tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashUnencodedChars(idString).toString())); } /** @@ -84,7 +83,7 @@ public static void removeStatementPatternIds(TransactionBase tx, Set ids storedIds.removeAll(ids); String idString = Joiner.on(VAR_DELIM).join(ids); tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString)); - tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString())); + tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashUnencodedChars(idString).toString())); } } diff --git a/extras/rya.prospector/pom.xml b/extras/rya.prospector/pom.xml index 5ec2f2e53..8ed6f8de6 100644 --- a/extras/rya.prospector/pom.xml +++ b/extras/rya.prospector/pom.xml @@ -55,6 +55,17 @@ under the License. hadoop2 test + + org.powermock + powermock-module-junit4 + test + + + org.powermock + powermock-api-mockito2 + test + + diff --git a/extras/rya.reasoning/pom.xml b/extras/rya.reasoning/pom.xml index 21efb2000..9ad3d7b32 100644 --- a/extras/rya.reasoning/pom.xml +++ b/extras/rya.reasoning/pom.xml @@ -58,7 +58,7 @@ under the License. org.apache.accumulo accumulo-minicluster - compile + provided @@ -77,6 +77,7 @@ under the License. org.eclipse.rdf4j rdf4j-runtime + pom @@ -84,12 +85,18 @@ under the License. mrunit hadoop2 test + org.powermock powermock-module-junit4 test + + org.powermock + powermock-api-mockito2 + test + diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java index 5a16e799a..daba34e32 100644 --- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java +++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java @@ -18,11 +18,11 @@ */ package org.apache.rya.streams.api.queries; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException; +import org.junit.Test; import java.util.HashSet; import java.util.Optional; @@ -31,12 +31,11 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.rya.streams.api.entity.StreamsQuery; -import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException; -import org.junit.Test; - -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Unit tests the methods of {@link InMemoryQueryRepository}. @@ -83,7 +82,7 @@ public void initializedWithPopulatedChangeLog() throws Exception { final QueryChangeLog changeLog = new InMemoryQueryChangeLog(); final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE ); try { - queries.startAndWait(); + queries.startAsync(); // Add some queries and deletes to it. final Set expected = new HashSet<>(); expected.add( queries.add("query 1", true, true) ); @@ -98,10 +97,10 @@ public void initializedWithPopulatedChangeLog() throws Exception { final Set stored = initializedQueries.list(); assertEquals(expected, stored); } finally { - queries.stop(); + queries.stopAsync(); } } finally { - queries.stop(); + queries.stopAsync(); } } @@ -160,7 +159,7 @@ public void updateListenerNotify() throws Exception { // Setup a totally in memory QueryRepository. final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE ); try { - queries.startAndWait(); + queries.startAsync(); // Add a query to it. final StreamsQuery query = queries.add("query 1", true, false); @@ -179,7 +178,7 @@ public void updateListenerNotify() throws Exception { queries.add("query 2", true, false); } finally { - queries.stop(); + queries.stopAsync(); } } @@ -191,8 +190,8 @@ public void updateListenerNotify_multiClient() throws Exception { final QueryRepository queries2 = new InMemoryQueryRepository( changeLog, SCHEDULE ); try { - queries.startAndWait(); - queries2.startAndWait(); + queries.startAsync(); + queries2.startAsync(); //show listener on repo that query was added to is being notified of the new query. final CountDownLatch repo1Latch = new CountDownLatch(1); @@ -226,8 +225,8 @@ public void updateListenerNotify_multiClient() throws Exception { assertTrue(repo2Latch.await(5, TimeUnit.SECONDS)); } catch(final InterruptedException e ) { } finally { - queries.stop(); - queries2.stop(); + queries.stopAsync(); + queries2.stopAsync(); } } diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java index 9fbc2c627..07eb3e898 100644 --- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java +++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java @@ -18,17 +18,8 @@ */ package org.apache.rya.streams.client.command; -import static org.junit.Assert.assertEquals; - -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -60,8 +51,16 @@ import org.junit.Rule; import org.junit.Test; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; /** * Integration tests the methods of {@link RunQueryCommand}. @@ -99,7 +98,7 @@ public void setup() { @After public void cleanup() throws Exception { - queryRepo.stopAndWait(); + queryRepo.stopAsync(); stmtProducer.close(); resultConsumer.close(); } diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java index 0bf13d143..e267252fb 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java @@ -18,12 +18,9 @@ */ package org.apache.rya.streams.kafka; -import static java.util.Objects.requireNonNull; - -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -55,10 +52,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.TimeUnit; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; +import static java.util.Objects.requireNonNull; /** * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster. @@ -111,7 +109,7 @@ public static RyaStreamsClient make( @Override public void close() { try { - queryRepo.stopAndWait(); + queryRepo.stopAsync(); } catch (final Exception e) { log.warn("Couldn't close a QueryRepository.", e); } @@ -123,7 +121,7 @@ public void close() { * Create a {@link Producer} that is able to write to a topic in Kafka. * * @param kafkaHostname - The Kafka broker hostname. (not null) - * @param kafkaPort - The Kafka broker port. + * @param kakfaPort - The Kafka broker port. * @param keySerializerClass - Serializes the keys. (not null) * @param valueSerializerClass - Serializes the values. (not null) * @return A {@link Producer} that can be used to write records to a topic. @@ -149,7 +147,7 @@ private static Producer makeProducer( * starting at the earliest point by default. * * @param kafkaHostname - The Kafka broker hostname. (not null) - * @param kafkaPort - The Kafka broker port. + * @param kakfaPort - The Kafka broker port. * @param keyDeserializerClass - Deserializes the keys. (not null) * @param valueDeserializerClass - Deserializes the values. (not null) * @return A {@link Consumer} that can be used to read records from a topic. diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java index e6bd800e7..2d2f25249 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java @@ -18,7 +18,23 @@ */ package org.apache.rya.streams.querymanager; -import static java.util.Objects.requireNonNull; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -35,25 +51,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.rya.streams.api.entity.StreamsQuery; -import org.apache.rya.streams.api.queries.ChangeLogEntry; -import org.apache.rya.streams.api.queries.InMemoryQueryRepository; -import org.apache.rya.streams.api.queries.QueryChange; -import org.apache.rya.streams.api.queries.QueryChangeLog; -import org.apache.rya.streams.api.queries.QueryChangeLogListener; -import org.apache.rya.streams.api.queries.QueryRepository; -import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; -import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; -import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.UncheckedExecutionException; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; +import static java.util.Objects.requireNonNull; /** * A service for managing {@link StreamsQuery} running on a Rya Streams system. @@ -139,10 +137,12 @@ protected void doStart() { executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal)); // Start up the query execution framework. - queryExecutor.startAndWait(); + queryExecutor.startAsync(); + queryExecutor.awaitRunning(); // Startup the source that discovers new Query Change Logs. - changeLogSource.startAndWait(); + changeLogSource.startAsync(); + changeLogSource.awaitRunning(); // Subscribe the source a listener that writes to the LogEventWorker's work queue. changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal)); @@ -176,14 +176,16 @@ protected void doStop() { // Stop the source of new Change Logs. try { - changeLogSource.stopAndWait(); + changeLogSource.stopAsync(); + changeLogSource.awaitTerminated(); } catch(final UncheckedExecutionException e) { log.warn("Could not stop the Change Log Source.", e); } // Stop the query execution framework. try { - queryExecutor.stopAndWait(); + queryExecutor.stopAsync(); + queryExecutor.awaitTerminated(); } catch(final UncheckedExecutionException e) { log.warn("Could not stop the Query Executor", e); } @@ -303,7 +305,7 @@ public String toString() { * {@link QueryChangeLogSource}. * * @param ryaInstance - The Rya Instance the created log is for. (not null) - * @param log - The created {@link QueryChangeLog. (not null) + * @param log - The created {@link QueryChangeLog}. (not null) * @return A {@link LogEvent} built using the provided values. */ public static LogEvent create(final String ryaInstance, final QueryChangeLog log) { @@ -499,7 +501,7 @@ static class LogEventWorkGenerator implements SourceListener { private final TimeUnit offerUnits; /** - * Constructs an instance of {@link QueryManagerSourceListener}. + * Constructs an instance of the {@link SourceListener} interface. * * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null) * @param offerValue - How long to wait when offering new work. @@ -548,7 +550,7 @@ public void notifyDelete(final String ryaInstanceName) { *

* Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator} * that generates {@link QueryEvent}s based on the content and updates to the discovered - * {@link QueryChagneLog}. + * {@link QueryChangeLog}. *

* Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent} * is written to the work queue. @@ -620,7 +622,8 @@ public void run() { // so that it may be shutdown later. final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits); final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler); - repo.startAndWait(); + repo.startAsync(); + repo.awaitRunning(); repos.put(ryaInstance, repo); // Subscribe a worker that adds the Query Events to the queryWorkQueue queue. @@ -659,7 +662,8 @@ public void run() { // Shut down the query repository for the Rya instance. This ensures the listener will // not receive any more work that needs to be done. final QueryRepository deletedRepo = repos.remove(ryaInstance); - deletedRepo.stopAndWait(); + deletedRepo.stopAsync(); + deletedRepo.awaitTerminated(); // Add work that stops all of the queries related to the instance. final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance); @@ -675,7 +679,8 @@ public void run() { log.info("LogEventWorker shutting down..."); // Shutdown all of the QueryRepositories that were started. - repos.values().forEach(repo -> repo.stopAndWait()); + repos.values().forEach(repo -> repo.stopAsync()); + repos.values().forEach(repo -> repo.awaitTerminated()); log.info("LogEventWorker shut down."); } diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java index 04a0382fb..b879eba60 100644 --- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java +++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java @@ -18,16 +18,12 @@ */ package org.apache.rya.streams.querymanager; -import static java.util.Objects.requireNonNull; - -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.concurrent.TimeUnit; - -import javax.xml.bind.JAXBException; - +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.commons.daemon.Daemon; import org.apache.commons.daemon.DaemonContext; import org.apache.commons.daemon.DaemonInitException; @@ -44,13 +40,14 @@ import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.ParameterException; -import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import javax.xml.bind.JAXBException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; +import static java.util.Objects.requireNonNull; /** * JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon. @@ -122,13 +119,15 @@ public void init(final DaemonContext context) throws DaemonInitException, Except @Override public void start() throws Exception { log.info("Starting the Rya Streams Query Manager Daemon."); - manager.startAndWait(); + manager.startAsync(); + manager.awaitRunning(); } @Override public void stop() throws Exception { log.info("Stopping the Rya Streams Query Manager Daemon."); - manager.stopAndWait(); + manager.stopAsync(); + manager.awaitTerminated(); } @Override diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java index f1c9e0f45..c2da67c63 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java @@ -18,17 +18,6 @@ */ package org.apache.rya.streams.querymanager; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog; import org.apache.rya.streams.api.queries.QueryChange; @@ -36,6 +25,17 @@ import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; import org.junit.Test; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Unit tests the methods of {@link QueryManager}. */ @@ -75,11 +75,12 @@ public void testCreateQuery() throws Exception { final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS); try { - qm.startAndWait(); + qm.startAsync(); + qm.awaitRunning(); queryStarted.await(5, TimeUnit.SECONDS); verify(qe).startQuery(ryaInstance, query); } finally { - qm.stopAndWait(); + qm.stopAsync(); } } @@ -129,11 +130,12 @@ public void testDeleteQuery() throws Exception { final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS); try { - qm.startAndWait(); + qm.startAsync(); + qm.awaitRunning(); queryDeleted.await(5, TimeUnit.SECONDS); verify(qe).stopQuery(query.getQueryId()); } finally { - qm.stopAndWait(); + qm.stopAsync(); } } @@ -184,11 +186,12 @@ public void testUpdateQuery() throws Exception { final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS); try { - qm.startAndWait(); + qm.startAsync(); + qm.awaitRunning(); queryDeleted.await(10, TimeUnit.SECONDS); verify(qe).stopQuery(query.getQueryId()); } finally { - qm.stopAndWait(); + qm.stopAsync(); } } } \ No newline at end of file diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java index 5914b789a..4bc4d3f43 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java @@ -18,15 +18,7 @@ */ package org.apache.rya.streams.querymanager.kafka; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import org.apache.rya.streams.api.queries.QueryChangeLog; import org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.querymanager.QueryChangeLogSource; @@ -36,7 +28,14 @@ import org.junit.Rule; import org.junit.Test; -import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Integration tests the methods of {@link KafkaQueryChangeLogSource}. @@ -79,13 +78,14 @@ public void notifyDelete(final String ryaInstanceName) { } try { // Start the source. - source.startAndWait(); + source.startAsync(); + source.awaitRunning(); // If the latch isn't counted down, then fail the test. assertTrue( created.await(5, TimeUnit.SECONDS) ); } finally { - source.stopAndWait(); + source.stopAsync(); } } @@ -113,7 +113,8 @@ public void notifyDelete(final String ryaInstanceName) { } try { // Start the source. - source.startAndWait(); + source.startAsync(); + source.awaitRunning(); // Wait twice the polling duration to ensure it iterates at least once. Thread.sleep(200); @@ -125,7 +126,7 @@ public void notifyDelete(final String ryaInstanceName) { } // If the latch isn't counted down, then fail the test. assertTrue( created.await(5, TimeUnit.SECONDS) ); } finally { - source.stopAndWait(); + source.stopAsync(); } } @@ -161,7 +162,8 @@ public void notifyDelete(final String ryaInstanceName) { try { // Start the source - source.startAndWait(); + source.startAsync(); + source.awaitRunning(); // Wait for it to indicate the topic was created. assertTrue( created.await(5, TimeUnit.SECONDS) ); @@ -173,7 +175,7 @@ public void notifyDelete(final String ryaInstanceName) { assertTrue( deleted.await(5, TimeUnit.SECONDS) ); } finally { - source.stopAndWait(); + source.stopAsync(); } } @@ -205,7 +207,8 @@ public void notifyDelete(final String ryaInstanceName) { } try { // Start the source - source.startAndWait(); + source.startAsync(); + source.awaitRunning(); // Wait for that first listener to indicate the topic was created. This means that one has been cached. assertTrue( created.await(5, TimeUnit.SECONDS) ); @@ -226,7 +229,7 @@ public void notifyDelete(final String ryaInstanceName) { } assertTrue( newListenerCreated.await(5, TimeUnit.SECONDS) ); } finally { - source.stopAndWait(); + source.stopAsync(); } } @@ -240,7 +243,8 @@ public void unsubscribedDoesNotReceiveNotifications() throws Exception { try { // Start the source. - source.startAndWait(); + source.startAsync(); + source.awaitRunning(); // Create a listener that flips a boolean to true when it is notified. final AtomicBoolean notified = new AtomicBoolean(false); @@ -271,7 +275,7 @@ public void notifyDelete(final String ryaInstanceName) { // Show the boolean was never flipped to true. assertFalse(notified.get()); } finally { - source.stopAndWait(); + source.stopAsync(); } } } \ No newline at end of file diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java index fcb3a4674..624ef35b1 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java @@ -18,12 +18,7 @@ */ package org.apache.rya.streams.querymanager.kafka; -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - +import com.google.common.collect.Lists; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -50,7 +45,11 @@ import org.junit.Rule; import org.junit.Test; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; /** * Integration tests the methods of {@link LocalQueryExecutor}. @@ -124,7 +123,8 @@ public void runQuery() throws Exception { final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort(); final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers); final QueryExecutor executor = new LocalQueryExecutor(createKafkaTopic, jobFactory); - executor.startAndWait(); + executor.startAsync(); + executor.awaitRunning(); try { // Start the query. executor.startQuery(ryaInstance, sQuery); @@ -144,7 +144,7 @@ public void runQuery() throws Exception { assertEquals(expected, results); } finally { - executor.stopAndWait(); + executor.stopAsync(); } } } \ No newline at end of file diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java index efbcf4bba..b58aad11a 100644 --- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java +++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java @@ -18,17 +18,7 @@ */ package org.apache.rya.streams.querymanager.kafka; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.Set; -import java.util.UUID; - +import com.google.common.collect.Sets; import org.apache.kafka.streams.KafkaStreams; import org.apache.rya.streams.api.entity.StreamsQuery; import org.apache.rya.streams.kafka.KafkaStreamsFactory; @@ -36,7 +26,16 @@ import org.apache.rya.streams.querymanager.QueryExecutor; import org.junit.Test; -import com.google.common.collect.Sets; +import java.util.Set; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Unit tests the methods of {@link LocalQueryExecutor}. @@ -62,7 +61,8 @@ public void startQuery() throws Exception { // Start the executor that will be tested. final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); - executor.startAndWait(); + executor.startAsync(); + executor.awaitRunning(); try { // Tell the executor to start the query. executor.startQuery(ryaInstance, query); @@ -70,7 +70,7 @@ public void startQuery() throws Exception { // Show a job was started for that query's ID. verify(queryJob).start(); } finally { - executor.stopAndWait(); + executor.stopAsync(); } } @@ -84,12 +84,13 @@ public void stopQuery_serviceNotStarted() throws Exception { public void stopQuery_queryNotRunning() throws Exception { // Start an executor. final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class)); - executor.startAndWait(); + executor.startAsync(); + executor.awaitRunning(); try { // Try to stop a query that was never stareted. executor.stopQuery(UUID.randomUUID()); } finally { - executor.stopAndWait(); + executor.stopAsync(); } } @@ -106,7 +107,8 @@ public void stopQuery() throws Exception { // Start the executor that will be tested. final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); - executor.startAndWait(); + executor.startAsync(); + executor.awaitRunning(); try { // Tell the executor to start the query. executor.startQuery(ryaInstance, query); @@ -117,7 +119,7 @@ public void stopQuery() throws Exception { // Show a job was stopped for that query's ID. verify(queryJob).close(); } finally { - executor.stopAndWait(); + executor.stopAsync(); } } @@ -143,7 +145,8 @@ public void stopAll_noneForThatRyaInstance() throws Exception { // Start the executor that will be tested. final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); - executor.startAndWait(); + executor.startAsync(); + executor.awaitRunning(); try { // Tell the executor to start the queries. executor.startQuery(ryaInstance, query1); @@ -161,7 +164,7 @@ public void stopAll_noneForThatRyaInstance() throws Exception { verify(queryJob2, never()).close(); } finally { - executor.stopAndWait(); + executor.stopAsync(); } } @@ -182,7 +185,8 @@ public void stopAll() throws Exception { // Start the executor that will be tested. final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); - executor.startAndWait(); + executor.startAsync(); + executor.awaitRunning(); try { // Tell the executor to start the queries. executor.startQuery(ryaInstance1, query1); @@ -200,7 +204,7 @@ public void stopAll() throws Exception { verify(queryJob2).close(); } finally { - executor.stopAndWait(); + executor.stopAsync(); } } @@ -214,7 +218,8 @@ public void getRunningQueryIds_serviceNotStarted() throws Exception { public void getRunningQueryIds_noneStarted() throws Exception { // Start an executor. final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class)); - executor.startAndWait(); + executor.startAsync(); + executor.awaitRunning(); try { // Get the list of running queries. final Set runningQueries = executor.getRunningQueryIds(); @@ -222,7 +227,7 @@ public void getRunningQueryIds_noneStarted() throws Exception { // Show no queries are reported as running. assertTrue(runningQueries.isEmpty()); } finally { - executor.stopAndWait(); + executor.stopAsync(); } } @@ -242,7 +247,8 @@ public void getRunningQueryIds_noneStopped() throws Exception { // Start the executor that will be tested. final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); - executor.startAndWait(); + executor.startAsync(); + executor.awaitRunning(); try { // Start the queries. executor.startQuery(ryaInstance, query1); @@ -257,7 +263,7 @@ public void getRunningQueryIds_noneStopped() throws Exception { assertEquals(expected, executor.getRunningQueryIds()); } finally { - executor.stopAndWait(); + executor.stopAsync(); } } @@ -277,7 +283,8 @@ public void getRunningQueryIds_stoppedNoLongerListed() throws Exception { // Start the executor that will be tested. final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory); - executor.startAndWait(); + executor.startAsync(); + executor.awaitRunning(); try { // Start the queries. executor.startQuery(ryaInstance, query1); @@ -294,7 +301,7 @@ public void getRunningQueryIds_stoppedNoLongerListed() throws Exception { assertEquals(expected, executor.getRunningQueryIds()); } finally { - executor.stopAndWait(); + executor.stopAsync(); } } } \ No newline at end of file diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java index 63929bb79..132993f56 100644 --- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java +++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java @@ -19,25 +19,18 @@ * under the License. */ -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - +import com.google.common.base.Preconditions; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.rfile.RFileOperations; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; @@ -52,6 +45,12 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + /** * {@link FileInputFormat} that finds the Accumulo tablet files on the HDFS * disk, and uses that as the input for MapReduce jobs. @@ -68,8 +67,8 @@ public List getSplits(JobContext jobContext) throws IOException { String user = MRUtils.AccumuloProps.getUsername(jobContext); AuthenticationToken password = MRUtils.AccumuloProps.getPassword(jobContext); String table = MRUtils.AccumuloProps.getTablename(jobContext); - ArgumentChecker.notNull(instance); - ArgumentChecker.notNull(table); + Preconditions.checkNotNull(instance); + Preconditions.checkNotNull(table); //find the files necessary try { @@ -113,8 +112,11 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont FileSystem fs = file.getFileSystem(job); Instance instance = MRUtils.AccumuloProps.getInstance(taskAttemptContext); - fileSKVIterator = RFileOperations.getInstance().openReader(file.toString(), ALLRANGE, - new HashSet(), false, fs, job, instance.getConfiguration()); + fileSKVIterator = RFileOperations.getInstance().newScanReaderBuilder() + .forFile(file.toString(), fs, job) + .withTableConfiguration(instance.getConfiguration()) + .overRange(ALLRANGE, new HashSet<>(), false) + .build(); } @Override diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java index 2fc272805..521d23f65 100644 --- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java +++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java @@ -19,21 +19,18 @@ * under the License. */ -import java.io.IOException; -import java.util.Map.Entry; - +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; +import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; - import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; import org.apache.rya.api.domain.RyaStatement; @@ -41,6 +38,10 @@ import org.apache.rya.api.resolver.triple.TripleRow; import org.apache.rya.api.resolver.triple.TripleRowResolverException; +import java.io.IOException; +import java.util.List; +import java.util.Map.Entry; + /** * Subclass of {@link AbstractInputFormat} for reading * {@link RyaStatementWritable}s directly from a running Rya instance. @@ -76,6 +77,20 @@ public class RyaStatementRecordReader extends AbstractRecordReader contextIterators(TaskAttemptContext context, String tableName) { + return InputConfigurator.getIterators(CLASS, context.getConfiguration()); + } + @Override protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, RangeInputSplit split) { diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java index d0185a122..591d6eb25 100644 --- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java +++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java @@ -19,21 +19,6 @@ * under the License. */ - - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; @@ -69,6 +54,18 @@ import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + /** * A LoadStoreFunc for retrieving data from and storing data to Accumulo *

@@ -377,6 +374,10 @@ private static byte[] objToBytes(final Object o) { public void cleanupOnFailure(final String failure, final Job job) { } + @Override + public void cleanupOnSuccess(String s, Job job) throws IOException { + } + @Override public WritableComparable getSplitComparable(final InputSplit inputSplit) throws IOException { //cannot get access to the range directly diff --git a/pom.xml b/pom.xml index 5a27585f5..76767ff83 100644 --- a/pom.xml +++ b/pom.xml @@ -69,38 +69,38 @@ under the License. web - 2.3.1 + 3.2.2 - 1.6.4 - 2.5.0 + 1.9.3 + 2.9.2 - 3.4.6 + 3.4.14 - 0.9.2 + 0.15.0 - 5.2.1 - 2.1 + 5.3.1 + 2.9.1 3.10.2 2.2.0 - 3.2.2 + 3.2.2 - 2.6 - 1.10 - 1.6 + 2.6 + 1.10 + 1.6 2.5 - 1.3 + 1.5 - 14.0.1 + 18.0 2.8.1 - 4.5.2 - 4.4.4 + 4.5.10 + 4.4.12 2.2.11 - 1.2.0 + 1.4.0 3.4 1.7.2 1.3 @@ -110,11 +110,12 @@ under the License. 1.0.2.RELEASE 1.1.0.RELEASE - 4.12 - 1.10.19 - 1.1.0 - 1.7.25 - 1.6.1 + 4.12 + 1.1.0 + 1.7.26 + 1.10.19 + 2.28.2 + 2.0.2 UTF-8 UTF-8 @@ -122,16 +123,18 @@ under the License. 3.0.4 - 1.0.0-incubating + 1.2.0 3.0.8 - 0.9.1 + 0.9.3 1.2 1.60 4.0.1 1.13 + 0.14.0 + 1.3.9-1 1.0-1 0.10.0.1 @@ -355,6 +358,20 @@ under the License. + + + + + oss.sonatype.org-snapshot + http://oss.sonatype.org/content/repositories/snapshots + + false + + + true + + + @@ -558,7 +575,7 @@ under the License. org.apache.rya rya.kafka.connect.mongo ${project.version} - + org.apache.accumulo accumulo-core @@ -571,8 +588,10 @@ under the License. org.eclipse.rdf4j - rdf4j-runtime + rdf4j-bom ${org.eclipse.rdf4j.version} + pom + import org.eclipse.rdf4j @@ -604,7 +623,6 @@ under the License. rdf4j-queryresultio-text ${org.eclipse.rdf4j.version} - org.eclipse.rdf4j rdf4j-rio-api @@ -650,6 +668,12 @@ under the License. rdf4j-queryrender ${org.eclipse.rdf4j.version} + + org.eclipse.rdf4j + rdf4j-runtime + ${org.eclipse.rdf4j.version} + pom + org.eclipse.rdf4j rdf4j-runtime-osgi @@ -1001,7 +1025,13 @@ under the License. org.mockito mockito-all - ${mockito.version} + ${mockito.all.version} + test + + + org.mockito + mockito-core + ${mockito.core.version} test @@ -1017,19 +1047,53 @@ under the License. test + + org.powermock + powermock-core + + + org.powermock + powermock-all + org.powermock powermock-module-junit4 + + org.powermock + powermock-api-mockito + + + org.powermock + powermock-api-mockito2 + + + org.powermock + powermock-core + ${powermock.version} + test + + + org.powermock + powermock-all + ${powermock.version} + test + org.powermock powermock-module-junit4 ${powermock.version} test - + + org.powermock + powermock-api-mockito2 + ${powermock.version} + test + + org.openjdk.jmh @@ -1131,11 +1195,11 @@ under the License. - + org.codehaus.mojo animal-sniffer-maven-plugin - 1.15 + 1.16 org.codehaus.mojo.signature diff --git a/sail/pom.xml b/sail/pom.xml index d222e4193..b2508423c 100644 --- a/sail/pom.xml +++ b/sail/pom.xml @@ -67,6 +67,7 @@ under the License. org.eclipse.rdf4j rdf4j-runtime + pom diff --git a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java index ae97fd8ab..56394883a 100644 --- a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java +++ b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java @@ -103,18 +103,21 @@ import org.eclipse.rdf4j.query.impl.EmptyBindingSet; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.helpers.AbstractSailConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RdfCloudTripleStoreConnection extends AbstractSailConnection { + + private static final Logger logger = LoggerFactory.getLogger(RdfCloudTripleStoreConnection.class); + private final RdfCloudTripleStore store; + private final C conf; private RdfEvalStatsDAO rdfEvalStatsDAO; private SelectivityEvalDAO selectEvalDAO; private RyaDAO ryaDAO; private InferenceEngine inferenceEngine; private NamespaceManager namespaceManager; - private final C conf; - - private ProvenanceCollector provenanceCollector; public RdfCloudTripleStoreConnection(final RdfCloudTripleStore sailBase, final C conf, final ValueFactory vf) From 6bdd629f5535c5ef0b606e73f577000eb34a7d35 Mon Sep 17 00:00:00 2001 From: Brad Rushworth <67445484+brushworth@users.noreply.github.com> Date: Tue, 14 Jul 2020 17:16:59 +1000 Subject: [PATCH 2/4] Replacing the old @Immutable annotation with @Contract(threading = ThreadingBehavior.IMMUTABLE) per the advice at https://dev-aux.com/java/org-apache-http-annotation-threadsafe-class-not-found --- extras/indexing/pom.xml | 2 +- .../java/org/apache/rya/indexing/entity/model/Entity.java | 5 +++-- .../java/org/apache/rya/indexing/entity/model/Property.java | 5 +++-- .../main/java/org/apache/rya/indexing/entity/model/Type.java | 5 +++-- .../org/apache/rya/indexing/entity/model/TypedEntity.java | 5 +++-- .../rya/export/api/conf/AccumuloMergeConfiguration.java | 5 +++-- .../org/apache/rya/export/api/conf/MergeConfiguration.java | 5 +++-- pom.xml | 2 +- 8 files changed, 20 insertions(+), 14 deletions(-) diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml index c9efd0672..aba2d99e8 100644 --- a/extras/indexing/pom.xml +++ b/extras/indexing/pom.xml @@ -70,7 +70,6 @@ httpclient - org.apache.rya @@ -84,6 +83,7 @@ org.apache.rya rya.periodic.notification.api + org.eclipse.rdf4j diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java index 5f0497820..e1afec64e 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java @@ -24,7 +24,8 @@ import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; -import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; import org.apache.log4j.Logger; import org.apache.rya.api.domain.RyaIRI; import org.apache.rya.indexing.entity.storage.EntityStorage; @@ -72,7 +73,7 @@ * the {@link Type}, but nothing has explicitly indicated it is of that Type. * Once something has done so, it is an explicitly typed Entity. */ -@Immutable +@Contract(threading = ThreadingBehavior.IMMUTABLE) @DefaultAnnotation(NonNull.class) public class Entity { private static final Logger log = Logger.getLogger(Entity.class); diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java index 0b9b77d58..cda44bb7a 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java @@ -20,7 +20,8 @@ import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; -import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; import org.apache.rya.api.domain.RyaIRI; import org.apache.rya.api.domain.RyaType; @@ -31,7 +32,7 @@ /** * A value that has been set for an {@link TypedEntity}. */ -@Immutable +@Contract(threading = ThreadingBehavior.IMMUTABLE) @DefaultAnnotation(NonNull.class) public class Property { diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java index 1a61812bf..23c3dbd54 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java @@ -21,7 +21,8 @@ import com.google.common.collect.ImmutableSet; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; -import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; import org.apache.rya.api.domain.RyaIRI; import org.apache.rya.indexing.entity.storage.TypeStorage; @@ -42,7 +43,7 @@ * <urn:nutritionalInformation> * */ -@Immutable +@Contract(threading = ThreadingBehavior.IMMUTABLE) @DefaultAnnotation(NonNull.class) public class Type { diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java index fdce30dc2..ad0f0dd62 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java @@ -23,7 +23,8 @@ import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; -import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; import org.apache.rya.api.domain.RyaIRI; import org.apache.rya.api.domain.RyaType; @@ -38,7 +39,7 @@ * A {@link TypedEntity} is a view of an {@link Entity} that has had a specific * {@link Type} applied to it. */ -@Immutable +@Contract(threading = ThreadingBehavior.IMMUTABLE) @DefaultAnnotation(NonNull.class) public class TypedEntity { diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java index 1a351147f..9f01b4a2f 100644 --- a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java +++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java @@ -18,14 +18,15 @@ */ package org.apache.rya.export.api.conf; -import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; import org.apache.rya.export.InstanceType; /** * Immutable configuration object to allow the MergeTool to connect to the parent and child * databases for data merging. */ -@Immutable +@Contract(threading = ThreadingBehavior.IMMUTABLE) public class AccumuloMergeConfiguration extends MergeConfigurationDecorator { /** * Information needed to connect to the parent database diff --git a/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java b/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java index 329589bd4..757ce63ec 100644 --- a/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java +++ b/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java @@ -18,7 +18,8 @@ */ package org.apache.rya.export.api.conf; -import jdk.nashorn.internal.ir.annotations.Immutable; +import org.apache.http.annotation.Contract; +import org.apache.http.annotation.ThreadingBehavior; import org.apache.rya.export.DBType; import org.apache.rya.export.MergePolicy; @@ -28,7 +29,7 @@ * Immutable configuration object to allow the MergeTool to connect to the parent and child * databases for data merging. */ -@Immutable +@Contract(threading = ThreadingBehavior.IMMUTABLE) public class MergeConfiguration { /** * Information needed to connect to the parent database diff --git a/pom.xml b/pom.xml index 76767ff83..7187602f5 100644 --- a/pom.xml +++ b/pom.xml @@ -250,7 +250,7 @@ under the License. LocationTech - Third Party - https://repo.eclipse.org/content/repositories/thirdparty/ + https://repo.eclipse.org/content/repositories/locationtech-thirdparty/ true From dc8896d660784d19df00c47c5374b9dbf3530a2b Mon Sep 17 00:00:00 2001 From: Brad Rushworth <67445484+brushworth@users.noreply.github.com> Date: Mon, 16 Nov 2020 16:19:05 +1100 Subject: [PATCH 3/4] RYA-534 Upgrade to RDF4J 3.4.4 RYA-496 Upgrading to Accumulo 1.10.0 --- extras/periodic.notification/twill/pom.xml | 6 ++++++ extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml | 2 -- pom.xml | 12 +++++++----- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/extras/periodic.notification/twill/pom.xml b/extras/periodic.notification/twill/pom.xml index 8f6d969af..ed595698f 100644 --- a/extras/periodic.notification/twill/pom.xml +++ b/extras/periodic.notification/twill/pom.xml @@ -165,9 +165,15 @@ commons-logging:commons-logging org.slf4j:slf4j-log4j12 log4j:log4j + org.locationtech.spatial4j:spatial4j + + + com.fasterxml.jackson.core:jackson-annotations + com.fasterxml.jackson.core:jackson-core + com.fasterxml.jackson.core:jackson-databind diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml index 26fa77cda..6198f32cf 100644 --- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml @@ -42,12 +42,10 @@ under the License. com.vividsolutions jts - 1.13 org.eclipse.rdf4j rdf4j-queryalgebra-geosparql - ${org.eclipse.rdf4j.version} org.eclipse.rdf4j diff --git a/pom.xml b/pom.xml index 7187602f5..70a758c9d 100644 --- a/pom.xml +++ b/pom.xml @@ -69,9 +69,11 @@ under the License. web - 3.2.2 + 3.4.4 - 1.9.3 + 2.10.4 + + 1.10.0 2.9.2 3.4.14 @@ -110,7 +112,7 @@ under the License. 1.0.2.RELEASE 1.1.0.RELEASE - 4.12 + 4.13.1 1.1.0 1.7.26 1.10.19 @@ -152,7 +154,7 @@ under the License. geoindexing 1.3.0-m1 - 0.9.3 + 0.9.3 1.13 @@ -222,7 +224,7 @@ under the License. com.vividsolutions jts - 1.13 + ${jts.version} From 6a988ac75d2fc9eeaffed30920034b04c5e1d29e Mon Sep 17 00:00:00 2001 From: Brad Rushworth <67445484+brushworth@users.noreply.github.com> Date: Fri, 8 Jan 2021 15:12:42 +1100 Subject: [PATCH 4/4] RYA-534 The geoindexing profile needed some refactoring following its dependency upgrade. --- extras/rya.giraph/pom.xml | 5 +++ .../rya.pcj.functions.geo/pom.xml | 2 +- .../pcj/functions/geo/BufferRdf4J.java | 26 +++++++++++++++ .../pcj/functions/geo/EhEqualsRdf4J.java | 26 +++++++++++++++ .../pcj/functions/geo/RCC8EQRdf4J.java | 26 +++++++++++++++ .../pcj/functions/geo/SfEqualsRdf4J.java | 26 +++++++++++++++ .../geosparql/SpatialSupportInitializer.java | 33 ++++++++++++++----- ...query.algebra.evaluation.function.Function | 4 +++ .../kafka/processors/filter/GeoFilterIT.java | 8 ++--- pom.xml | 1 + 10 files changed, 144 insertions(+), 13 deletions(-) create mode 100644 extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/BufferRdf4J.java create mode 100644 extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/EhEqualsRdf4J.java create mode 100644 extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/RCC8EQRdf4J.java create mode 100644 extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/SfEqualsRdf4J.java diff --git a/extras/rya.giraph/pom.xml b/extras/rya.giraph/pom.xml index 75e65157a..90b4eecb7 100644 --- a/extras/rya.giraph/pom.xml +++ b/extras/rya.giraph/pom.xml @@ -105,5 +105,10 @@ under the License. + + junit + junit + test + \ No newline at end of file diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml index 603eb2a14..b72f79acb 100644 --- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml @@ -75,7 +75,7 @@ under the License. SpatialContextFactory - com.spatial4j.core.context.jts.JtsSpatialContextFactory + org.locationtech.spatial4j.context.jts.JtsSpatialContextFactory diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/BufferRdf4J.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/BufferRdf4J.java new file mode 100644 index 000000000..bd75c6849 --- /dev/null +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/BufferRdf4J.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.rya.indexing.pcj.functions.geo; + +public class BufferRdf4J extends FunctionAdapter { + public BufferRdf4J() { + super(new org.eclipse.rdf4j.query.algebra.evaluation.function.geosparql.Buffer()); + } +} diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/EhEqualsRdf4J.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/EhEqualsRdf4J.java new file mode 100644 index 000000000..b4df6eb61 --- /dev/null +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/EhEqualsRdf4J.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.rya.indexing.pcj.functions.geo; + +public class EhEqualsRdf4J extends FunctionAdapter { + public EhEqualsRdf4J() { + super(new org.eclipse.rdf4j.query.algebra.evaluation.function.geosparql.EhEquals()); + } +} diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/RCC8EQRdf4J.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/RCC8EQRdf4J.java new file mode 100644 index 000000000..cc144d9c2 --- /dev/null +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/RCC8EQRdf4J.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.rya.indexing.pcj.functions.geo; + +public class RCC8EQRdf4J extends FunctionAdapter { + public RCC8EQRdf4J() { + super(new org.eclipse.rdf4j.query.algebra.evaluation.function.geosparql.RCC8EQ()); + } +} diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/SfEqualsRdf4J.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/SfEqualsRdf4J.java new file mode 100644 index 000000000..20bd0185e --- /dev/null +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/SfEqualsRdf4J.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.rya.indexing.pcj.functions.geo; + +public class SfEqualsRdf4J extends FunctionAdapter { + public SfEqualsRdf4J() { + super(new org.eclipse.rdf4j.query.algebra.evaluation.function.geosparql.SfEquals()); + } +} diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/function/geosparql/SpatialSupportInitializer.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/function/geosparql/SpatialSupportInitializer.java index ff385ba0e..c6bbaec45 100644 --- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/function/geosparql/SpatialSupportInitializer.java +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/function/geosparql/SpatialSupportInitializer.java @@ -18,16 +18,18 @@ */ package org.eclipse.rdf4j.query.algebra.evaluation.function.geosparql; -import java.io.IOException; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.WKTWriter; +import org.locationtech.spatial4j.context.SpatialContext; +import org.locationtech.spatial4j.context.jts.JtsSpatialContext; +import org.locationtech.spatial4j.shape.Shape; -import com.spatial4j.core.context.SpatialContext; -import com.spatial4j.core.context.jts.JtsSpatialContext; -import com.spatial4j.core.shape.Shape; -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.io.WKTWriter; +import java.io.IOException; /** * See https://bitbucket.org/pulquero/sesame-geosparql-jts + * + * GeoSPARQL standard defined at https://www.ogc.org/standards/geosparql */ public class SpatialSupportInitializer extends SpatialSupport { @@ -53,6 +55,11 @@ public JtsSpatialAlgebra(JtsSpatialContext context) { this.context = context; } + @Override + public Shape buffer(Shape s, double distance) { + return context.makeShape(context.getGeometryFrom(s).buffer(distance)); + } + @Override public Shape convexHull(Shape s) { return context.makeShape(context.getGeometryFrom(s).convexHull()); @@ -94,8 +101,8 @@ public boolean relate(Shape s1, Shape s2, String intersectionPattern) { } @Override - public boolean equals(Shape s1, Shape s2) { - return context.getGeometryFrom(s1).equalsNorm(context.getGeometryFrom(s2)); + public boolean sfEquals(Shape s1, Shape s2) { + return relate(s1, s2, "TFFFTFFFT"); } @Override @@ -153,6 +160,11 @@ public boolean sfOverlaps(Shape s1, Shape s2) { } } + @Override + public boolean ehEquals(Shape s1, Shape s2) { + return relate(s1, s2, "TFFFTFFFT"); + } + @Override public boolean ehDisjoint(Shape s1, Shape s2) { return relate(s1, s2, "FF*FF****"); @@ -223,6 +235,11 @@ public boolean rcc8ntppi(Shape s1, Shape s2) { return relate(s1, s2, "TTTFFTFFT"); } + @Override + public boolean rcc8eq(Shape s1, Shape s2) { + return relate(s1, s2, "TFFFTFFFT"); + } + } static class JtsWktWriter implements WktWriter { diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/resources/META-INF/services/org.eclipse.rdf4j.query.algebra.evaluation.function.Function b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/resources/META-INF/services/org.eclipse.rdf4j.query.algebra.evaluation.function.Function index ddfb09b92..1f8f71d90 100644 --- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/resources/META-INF/services/org.eclipse.rdf4j.query.algebra.evaluation.function.Function +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/resources/META-INF/services/org.eclipse.rdf4j.query.algebra.evaluation.function.Function @@ -1,6 +1,7 @@ org.apache.rya.indexing.pcj.functions.geo.DistanceRdf4J org.apache.rya.indexing.pcj.functions.geo.ConvexHullRdf4J org.apache.rya.indexing.pcj.functions.geo.BoundaryRdf4J +org.apache.rya.indexing.pcj.functions.geo.BufferRdf4J org.apache.rya.indexing.pcj.functions.geo.EnvelopeRdf4J org.apache.rya.indexing.pcj.functions.geo.UnionRdf4J org.apache.rya.indexing.pcj.functions.geo.IntersectionRdf4J @@ -14,6 +15,7 @@ org.apache.rya.indexing.pcj.functions.geo.SfCrossesRdf4J org.apache.rya.indexing.pcj.functions.geo.SfWithinRdf4J org.apache.rya.indexing.pcj.functions.geo.SfContainsRdf4J org.apache.rya.indexing.pcj.functions.geo.SfOverlapsRdf4J +org.apache.rya.indexing.pcj.functions.geo.SfEqualsRdf4J org.apache.rya.indexing.pcj.functions.geo.EhDisjointRdf4J org.apache.rya.indexing.pcj.functions.geo.EhMeetRdf4J org.apache.rya.indexing.pcj.functions.geo.EhOverlapRdf4J @@ -21,8 +23,10 @@ org.apache.rya.indexing.pcj.functions.geo.EhCoversRdf4J org.apache.rya.indexing.pcj.functions.geo.EhCoveredByRdf4J org.apache.rya.indexing.pcj.functions.geo.EhInsideRdf4J org.apache.rya.indexing.pcj.functions.geo.EhContainsRdf4J +org.apache.rya.indexing.pcj.functions.geo.EhEqualsRdf4J org.apache.rya.indexing.pcj.functions.geo.RCC8DCRdf4J org.apache.rya.indexing.pcj.functions.geo.RCC8ECRdf4J +org.apache.rya.indexing.pcj.functions.geo.RCC8EQRdf4J org.apache.rya.indexing.pcj.functions.geo.RCC8PORdf4J org.apache.rya.indexing.pcj.functions.geo.RCC8TPPIRdf4J org.apache.rya.indexing.pcj.functions.geo.RCC8TPPRdf4J diff --git a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java index 6fb798cf6..f2503437b 100644 --- a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java +++ b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java @@ -50,10 +50,10 @@ import org.junit.Rule; import org.junit.Test; -import com.vividsolutions.jts.geom.Coordinate; -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.geom.GeometryFactory; -import com.vividsolutions.jts.io.WKTWriter; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.io.WKTWriter; /** * Integration tests the geo methods of {@link FilterProcessor}. diff --git a/pom.xml b/pom.xml index 9592ec557..6a25fa51e 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,7 @@ under the License. geoindexing + 1.3.0-m1 0.9.3