From dc77208299add0e8770e84b843c84ea6fde2e67d Mon Sep 17 00:00:00 2001
From: Brad Rushworth <67445484+brushworth@users.noreply.github.com>
Date: Tue, 30 Jun 2020 13:57:49 +1000
Subject: [PATCH 1/3] RYA-534 Upgrade to RDF4J 3.2 and RYA-496 Upgrading to
Accumulo 1.9.3
---
common/rya.provenance/pom.xml | 1 +
.../rya/indexing/entity/model/Entity.java | 30 ++--
.../rya/indexing/entity/model/Property.java | 13 +-
.../rya/indexing/entity/model/Type.java | 14 +-
.../indexing/entity/model/TypedEntity.java | 20 ++-
.../sail/config/RyaAccumuloSailConfig.java | 42 +++---
extras/kafka.connect/api/pom.xml | 1 +
.../periodic.notification/twill.yarn/pom.xml | 2 +-
extras/periodic.notification/twill/pom.xml | 7 +-
.../api/conf/AccumuloMergeConfiguration.java | 2 +-
.../export/api/conf/MergeConfiguration.java | 6 +-
extras/rya.forwardchain/pom.xml | 3 +-
.../app/query/StatementPatternIdManager.java | 25 ++--
extras/rya.prospector/pom.xml | 11 ++
extras/rya.reasoning/pom.xml | 9 +-
.../queries/InMemoryQueryRepositoryTest.java | 39 +++---
.../client/command/RunQueryCommandIT.java | 27 ++--
.../kafka/KafkaRyaStreamsClientFactory.java | 22 ++-
.../streams/querymanager/QueryManager.java | 65 +++++----
.../querymanager/QueryManagerDaemon.java | 35 +++--
.../querymanager/QueryManagerTest.java | 37 ++---
.../kafka/KafkaQueryChangeLogSourceIT.java | 44 +++---
.../kafka/LocalQueryExecutorIT.java | 18 +--
.../kafka/LocalQueryExecutorTest.java | 63 +++++----
.../mr/AccumuloHDFSFileInputFormat.java | 26 ++--
.../rya/accumulo/mr/RyaInputFormat.java | 25 +++-
.../rya/accumulo/pig/AccumuloStorage.java | 31 ++--
pom.xml | 132 +++++++++++++-----
sail/pom.xml | 1 +
.../RdfCloudTripleStoreConnection.java | 9 +-
30 files changed, 437 insertions(+), 323 deletions(-)
diff --git a/common/rya.provenance/pom.xml b/common/rya.provenance/pom.xml
index 87c6e25d9..7096dba02 100644
--- a/common/rya.provenance/pom.xml
+++ b/common/rya.provenance/pom.xml
@@ -33,6 +33,7 @@ under the License.
org.eclipse.rdf4j
rdf4j-runtime
+ pom
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java
index 6874a164a..5f0497820 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java
@@ -18,7 +18,19 @@
*/
package org.apache.rya.indexing.entity.model;
-import static java.util.Objects.requireNonNull;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import jdk.nashorn.internal.ir.annotations.Immutable;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaIRI;
+import org.apache.rya.indexing.entity.storage.EntityStorage;
+import org.apache.rya.indexing.smarturi.SmartUriAdapter;
+import org.apache.rya.indexing.smarturi.SmartUriException;
+import org.eclipse.rdf4j.model.IRI;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,21 +40,7 @@
import java.util.Objects;
import java.util.Optional;
-import org.apache.http.annotation.Immutable;
-import org.apache.log4j.Logger;
-import org.apache.rya.api.domain.RyaIRI;
-import org.apache.rya.indexing.entity.storage.EntityStorage;
-import org.apache.rya.indexing.smarturi.SmartUriAdapter;
-import org.apache.rya.indexing.smarturi.SmartUriException;
-import org.eclipse.rdf4j.model.IRI;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import edu.umd.cs.findbugs.annotations.Nullable;
+import static java.util.Objects.requireNonNull;
/**
* An {@link Entity} is a named concept that has at least one defined structure
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java
index b44c52c09..0b9b77d58 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java
@@ -18,16 +18,15 @@
*/
package org.apache.rya.indexing.entity.model;
-import static java.util.Objects.requireNonNull;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import jdk.nashorn.internal.ir.annotations.Immutable;
+import org.apache.rya.api.domain.RyaIRI;
+import org.apache.rya.api.domain.RyaType;
import java.util.Objects;
-import org.apache.http.annotation.Immutable;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaIRI;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
+import static java.util.Objects.requireNonNull;
/**
* A value that has been set for an {@link TypedEntity}.
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java
index a7c988b72..1a61812bf 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java
@@ -18,18 +18,16 @@
*/
package org.apache.rya.indexing.entity.model;
-import static java.util.Objects.requireNonNull;
-
-import java.util.Objects;
-
-import org.apache.http.annotation.Immutable;
+import com.google.common.collect.ImmutableSet;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import jdk.nashorn.internal.ir.annotations.Immutable;
import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.indexing.entity.storage.TypeStorage;
-import com.google.common.collect.ImmutableSet;
+import java.util.Objects;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
+import static java.util.Objects.requireNonNull;
/**
* Defines the structure of an {@link TypedEntity}.
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java
index 816e7fa51..fdce30dc2 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java
@@ -18,23 +18,21 @@
*/
package org.apache.rya.indexing.entity.model;
-import static java.util.Objects.requireNonNull;
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableMap;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import jdk.nashorn.internal.ir.annotations.Immutable;
+import org.apache.rya.api.domain.RyaIRI;
+import org.apache.rya.api.domain.RyaType;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import org.apache.http.annotation.Immutable;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaIRI;
-
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableMap;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import edu.umd.cs.findbugs.annotations.Nullable;
+import static java.util.Objects.requireNonNull;
/**
* A {@link TypedEntity} is a view of an {@link Entity} that has had a specific
diff --git a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaAccumuloSailConfig.java b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaAccumuloSailConfig.java
index 43af1fb81..37e9b226e 100644
--- a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaAccumuloSailConfig.java
+++ b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaAccumuloSailConfig.java
@@ -20,16 +20,16 @@
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.eclipse.rdf4j.model.IRI;
-import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
-import org.eclipse.rdf4j.model.util.GraphUtil;
-import org.eclipse.rdf4j.model.util.GraphUtilException;
import org.eclipse.rdf4j.sail.config.AbstractSailImplConfig;
import org.eclipse.rdf4j.sail.config.SailConfigException;
+import java.util.Set;
+
/**
* @deprecated Use {@link AccumuloRdfConfiguration} instead.
*/
@@ -117,9 +117,7 @@ public void validate() throws SailConfigException {
public Resource export(final Model model) {
final Resource implNode = super.export(model);
- @SuppressWarnings("deprecation")
- final
- ValueFactory v = model.getValueFactory();
+ final ValueFactory v = SimpleValueFactory.getInstance();
model.add(implNode, USER, v.createLiteral(user));
model.add(implNode, PASSWORD, v.createLiteral(password));
@@ -136,27 +134,27 @@ public void parse(final Model model, final Resource implNode) throws SailConfigE
System.out.println("parsing");
try {
- final Literal userLit = GraphUtil.getOptionalObjectLiteral(model, implNode, USER);
- if (userLit != null) {
- setUser(userLit.getLabel());
+ final Set userLit = model.filter(implNode, USER, null).objects();
+ if (userLit.size() == 1) {
+ setUser(userLit.iterator().next().stringValue());
}
- final Literal pwdLit = GraphUtil.getOptionalObjectLiteral(model, implNode, PASSWORD);
- if (pwdLit != null) {
- setPassword(pwdLit.getLabel());
+ final Set pwdLit = model.filter(implNode, PASSWORD, null).objects();
+ if (pwdLit.size() == 1) {
+ setPassword(pwdLit.iterator().next().stringValue());
}
- final Literal instLit = GraphUtil.getOptionalObjectLiteral(model, implNode, INSTANCE);
- if (instLit != null) {
- setInstance(instLit.getLabel());
+ final Set instLit = model.filter(implNode, INSTANCE, null).objects();
+ if (instLit.size() == 1) {
+ setInstance(instLit.iterator().next().stringValue());
}
- final Literal zooLit = GraphUtil.getOptionalObjectLiteral(model, implNode, ZOOKEEPERS);
- if (zooLit != null) {
- setZookeepers(zooLit.getLabel());
+ final Set zooLit = model.filter(implNode, ZOOKEEPERS, null).objects();
+ if (zooLit.size() == 1) {
+ setZookeepers(zooLit.iterator().next().stringValue());
}
- final Literal mockLit = GraphUtil.getOptionalObjectLiteral(model, implNode, IS_MOCK);
- if (mockLit != null) {
- setMock(Boolean.parseBoolean(mockLit.getLabel()));
+ final Set mockLit = model.filter(implNode, IS_MOCK, null).objects();
+ if (mockLit.size() == 1) {
+ setMock(Boolean.parseBoolean(mockLit.iterator().next().stringValue()));
}
- } catch (final GraphUtilException e) {
+ } catch (final Exception e) {
throw new SailConfigException(e.getMessage(), e);
}
}
diff --git a/extras/kafka.connect/api/pom.xml b/extras/kafka.connect/api/pom.xml
index fcc6a154e..df86d3943 100644
--- a/extras/kafka.connect/api/pom.xml
+++ b/extras/kafka.connect/api/pom.xml
@@ -68,6 +68,7 @@
org.eclipse.rdf4j
rdf4j-runtime
+ pom
org.slf4j
diff --git a/extras/periodic.notification/twill.yarn/pom.xml b/extras/periodic.notification/twill.yarn/pom.xml
index 00cb1905f..46bee18ec 100644
--- a/extras/periodic.notification/twill.yarn/pom.xml
+++ b/extras/periodic.notification/twill.yarn/pom.xml
@@ -43,7 +43,7 @@
org.apache.twill
twill-yarn
- 0.12.0
+ ${twill.version}
org.apache.rya
diff --git a/extras/periodic.notification/twill/pom.xml b/extras/periodic.notification/twill/pom.xml
index a4e8f7a9e..8f6d969af 100644
--- a/extras/periodic.notification/twill/pom.xml
+++ b/extras/periodic.notification/twill/pom.xml
@@ -77,12 +77,12 @@
com.google.guava
- guava
+ guava
org.apache.twill
twill-api
- 0.12.0
+ ${twill.version}
@@ -165,6 +165,9 @@
commons-logging:commons-logging
org.slf4j:slf4j-log4j12
log4j:log4j
+
+
+ org.locationtech.spatial4j:spatial4j
diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java
index a35d5aab7..1a351147f 100644
--- a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java
+++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java
@@ -18,7 +18,7 @@
*/
package org.apache.rya.export.api.conf;
-import org.apache.http.annotation.Immutable;
+import jdk.nashorn.internal.ir.annotations.Immutable;
import org.apache.rya.export.InstanceType;
/**
diff --git a/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java b/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java
index 96f7ed678..329589bd4 100644
--- a/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java
+++ b/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java
@@ -18,12 +18,12 @@
*/
package org.apache.rya.export.api.conf;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.http.annotation.Immutable;
+import jdk.nashorn.internal.ir.annotations.Immutable;
import org.apache.rya.export.DBType;
import org.apache.rya.export.MergePolicy;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* Immutable configuration object to allow the MergeTool to connect to the parent and child
* databases for data merging.
diff --git a/extras/rya.forwardchain/pom.xml b/extras/rya.forwardchain/pom.xml
index b5c275a39..4ade411f7 100644
--- a/extras/rya.forwardchain/pom.xml
+++ b/extras/rya.forwardchain/pom.xml
@@ -33,7 +33,8 @@ under the License.
org.eclipse.rdf4j
rdf4j-runtime
-
+ pom
+
org.apache.rya
rya.api
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
index ee4c053bb..9a8505e21 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
@@ -18,23 +18,22 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.query;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import java.util.HashSet;
+import java.util.Set;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import com.google.common.hash.Hashing;
-
/**
* Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
* stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
@@ -62,7 +61,7 @@ public static void addStatementPatternIds(TransactionBase tx, Set ids) {
}
String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString();
tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString));
- tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString()));
+ tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashUnencodedChars(idString).toString()));
}
/**
@@ -84,7 +83,7 @@ public static void removeStatementPatternIds(TransactionBase tx, Set ids
storedIds.removeAll(ids);
String idString = Joiner.on(VAR_DELIM).join(ids);
tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString));
- tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString()));
+ tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashUnencodedChars(idString).toString()));
}
}
diff --git a/extras/rya.prospector/pom.xml b/extras/rya.prospector/pom.xml
index 5ec2f2e53..8ed6f8de6 100644
--- a/extras/rya.prospector/pom.xml
+++ b/extras/rya.prospector/pom.xml
@@ -55,6 +55,17 @@ under the License.
hadoop2
test
+
+ org.powermock
+ powermock-module-junit4
+ test
+
+
+ org.powermock
+ powermock-api-mockito2
+ test
+
+
diff --git a/extras/rya.reasoning/pom.xml b/extras/rya.reasoning/pom.xml
index 21efb2000..9ad3d7b32 100644
--- a/extras/rya.reasoning/pom.xml
+++ b/extras/rya.reasoning/pom.xml
@@ -58,7 +58,7 @@ under the License.
org.apache.accumulo
accumulo-minicluster
- compile
+ provided
@@ -77,6 +77,7 @@ under the License.
org.eclipse.rdf4j
rdf4j-runtime
+ pom
@@ -84,12 +85,18 @@ under the License.
mrunit
hadoop2
test
+
org.powermock
powermock-module-junit4
test
+
+ org.powermock
+ powermock-api-mockito2
+ test
+
diff --git a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
index 5a16e799a..daba34e32 100644
--- a/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
+++ b/extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
@@ -18,11 +18,11 @@
*/
package org.apache.rya.streams.api.queries;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
+import org.junit.Test;
import java.util.HashSet;
import java.util.Optional;
@@ -31,12 +31,11 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.rya.streams.api.entity.StreamsQuery;
-import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
-import org.junit.Test;
-
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Unit tests the methods of {@link InMemoryQueryRepository}.
@@ -83,7 +82,7 @@ public void initializedWithPopulatedChangeLog() throws Exception {
final QueryChangeLog changeLog = new InMemoryQueryChangeLog();
final QueryRepository queries = new InMemoryQueryRepository( changeLog, SCHEDULE );
try {
- queries.startAndWait();
+ queries.startAsync();
// Add some queries and deletes to it.
final Set expected = new HashSet<>();
expected.add( queries.add("query 1", true, true) );
@@ -98,10 +97,10 @@ public void initializedWithPopulatedChangeLog() throws Exception {
final Set stored = initializedQueries.list();
assertEquals(expected, stored);
} finally {
- queries.stop();
+ queries.stopAsync();
}
} finally {
- queries.stop();
+ queries.stopAsync();
}
}
@@ -160,7 +159,7 @@ public void updateListenerNotify() throws Exception {
// Setup a totally in memory QueryRepository.
final QueryRepository queries = new InMemoryQueryRepository( new InMemoryQueryChangeLog(), SCHEDULE );
try {
- queries.startAndWait();
+ queries.startAsync();
// Add a query to it.
final StreamsQuery query = queries.add("query 1", true, false);
@@ -179,7 +178,7 @@ public void updateListenerNotify() throws Exception {
queries.add("query 2", true, false);
} finally {
- queries.stop();
+ queries.stopAsync();
}
}
@@ -191,8 +190,8 @@ public void updateListenerNotify_multiClient() throws Exception {
final QueryRepository queries2 = new InMemoryQueryRepository( changeLog, SCHEDULE );
try {
- queries.startAndWait();
- queries2.startAndWait();
+ queries.startAsync();
+ queries2.startAsync();
//show listener on repo that query was added to is being notified of the new query.
final CountDownLatch repo1Latch = new CountDownLatch(1);
@@ -226,8 +225,8 @@ public void updateListenerNotify_multiClient() throws Exception {
assertTrue(repo2Latch.await(5, TimeUnit.SECONDS));
} catch(final InterruptedException e ) {
} finally {
- queries.stop();
- queries2.stop();
+ queries.stopAsync();
+ queries2.stopAsync();
}
}
diff --git a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
index 9fbc2c627..07eb3e898 100644
--- a/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
+++ b/extras/rya.streams/client/src/test/java/org/apache/rya/streams/client/command/RunQueryCommandIT.java
@@ -18,17 +18,8 @@
*/
package org.apache.rya.streams.client.command;
-import static org.junit.Assert.assertEquals;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -60,8 +51,16 @@
import org.junit.Rule;
import org.junit.Test;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
/**
* Integration tests the methods of {@link RunQueryCommand}.
@@ -99,7 +98,7 @@ public void setup() {
@After
public void cleanup() throws Exception {
- queryRepo.stopAndWait();
+ queryRepo.stopAsync();
stmtProducer.close();
resultConsumer.close();
}
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
index 0bf13d143..e267252fb 100644
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
@@ -18,12 +18,9 @@
*/
package org.apache.rya.streams.kafka;
-import static java.util.Objects.requireNonNull;
-
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -55,10 +52,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
+import static java.util.Objects.requireNonNull;
/**
* Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
@@ -111,7 +109,7 @@ public static RyaStreamsClient make(
@Override
public void close() {
try {
- queryRepo.stopAndWait();
+ queryRepo.stopAsync();
} catch (final Exception e) {
log.warn("Couldn't close a QueryRepository.", e);
}
@@ -123,7 +121,7 @@ public void close() {
* Create a {@link Producer} that is able to write to a topic in Kafka.
*
* @param kafkaHostname - The Kafka broker hostname. (not null)
- * @param kafkaPort - The Kafka broker port.
+ * @param kakfaPort - The Kafka broker port.
* @param keySerializerClass - Serializes the keys. (not null)
* @param valueSerializerClass - Serializes the values. (not null)
* @return A {@link Producer} that can be used to write records to a topic.
@@ -149,7 +147,7 @@ private static Producer makeProducer(
* starting at the earliest point by default.
*
* @param kafkaHostname - The Kafka broker hostname. (not null)
- * @param kafkaPort - The Kafka broker port.
+ * @param kakfaPort - The Kafka broker port.
* @param keyDeserializerClass - Deserializes the keys. (not null)
* @param valueDeserializerClass - Deserializes the values. (not null)
* @return A {@link Consumer} that can be used to read records from a topic.
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
index e6bd800e7..2d2f25249 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
@@ -18,7 +18,23 @@
*/
package org.apache.rya.streams.querymanager;
-import static java.util.Objects.requireNonNull;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -35,25 +51,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rya.streams.api.entity.StreamsQuery;
-import org.apache.rya.streams.api.queries.ChangeLogEntry;
-import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
-import org.apache.rya.streams.api.queries.QueryChange;
-import org.apache.rya.streams.api.queries.QueryChangeLog;
-import org.apache.rya.streams.api.queries.QueryChangeLogListener;
-import org.apache.rya.streams.api.queries.QueryRepository;
-import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
-import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
+import static java.util.Objects.requireNonNull;
/**
* A service for managing {@link StreamsQuery} running on a Rya Streams system.
@@ -139,10 +137,12 @@ protected void doStart() {
executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal));
// Start up the query execution framework.
- queryExecutor.startAndWait();
+ queryExecutor.startAsync();
+ queryExecutor.awaitRunning();
// Startup the source that discovers new Query Change Logs.
- changeLogSource.startAndWait();
+ changeLogSource.startAsync();
+ changeLogSource.awaitRunning();
// Subscribe the source a listener that writes to the LogEventWorker's work queue.
changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal));
@@ -176,14 +176,16 @@ protected void doStop() {
// Stop the source of new Change Logs.
try {
- changeLogSource.stopAndWait();
+ changeLogSource.stopAsync();
+ changeLogSource.awaitTerminated();
} catch(final UncheckedExecutionException e) {
log.warn("Could not stop the Change Log Source.", e);
}
// Stop the query execution framework.
try {
- queryExecutor.stopAndWait();
+ queryExecutor.stopAsync();
+ queryExecutor.awaitTerminated();
} catch(final UncheckedExecutionException e) {
log.warn("Could not stop the Query Executor", e);
}
@@ -303,7 +305,7 @@ public String toString() {
* {@link QueryChangeLogSource}.
*
* @param ryaInstance - The Rya Instance the created log is for. (not null)
- * @param log - The created {@link QueryChangeLog. (not null)
+ * @param log - The created {@link QueryChangeLog}. (not null)
* @return A {@link LogEvent} built using the provided values.
*/
public static LogEvent create(final String ryaInstance, final QueryChangeLog log) {
@@ -499,7 +501,7 @@ static class LogEventWorkGenerator implements SourceListener {
private final TimeUnit offerUnits;
/**
- * Constructs an instance of {@link QueryManagerSourceListener}.
+ * Constructs an instance of the {@link SourceListener} interface.
*
* @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null)
* @param offerValue - How long to wait when offering new work.
@@ -548,7 +550,7 @@ public void notifyDelete(final String ryaInstanceName) {
*
* Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator}
* that generates {@link QueryEvent}s based on the content and updates to the discovered
- * {@link QueryChagneLog}.
+ * {@link QueryChangeLog}.
*
* Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent}
* is written to the work queue.
@@ -620,7 +622,8 @@ public void run() {
// so that it may be shutdown later.
final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
- repo.startAndWait();
+ repo.startAsync();
+ repo.awaitRunning();
repos.put(ryaInstance, repo);
// Subscribe a worker that adds the Query Events to the queryWorkQueue queue.
@@ -659,7 +662,8 @@ public void run() {
// Shut down the query repository for the Rya instance. This ensures the listener will
// not receive any more work that needs to be done.
final QueryRepository deletedRepo = repos.remove(ryaInstance);
- deletedRepo.stopAndWait();
+ deletedRepo.stopAsync();
+ deletedRepo.awaitTerminated();
// Add work that stops all of the queries related to the instance.
final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance);
@@ -675,7 +679,8 @@ public void run() {
log.info("LogEventWorker shutting down...");
// Shutdown all of the QueryRepositories that were started.
- repos.values().forEach(repo -> repo.stopAndWait());
+ repos.values().forEach(repo -> repo.stopAsync());
+ repos.values().forEach(repo -> repo.awaitTerminated());
log.info("LogEventWorker shut down.");
}
diff --git a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
index 04a0382fb..b879eba60 100644
--- a/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
+++ b/extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
@@ -18,16 +18,12 @@
*/
package org.apache.rya.streams.querymanager;
-import static java.util.Objects.requireNonNull;
-
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.concurrent.TimeUnit;
-
-import javax.xml.bind.JAXBException;
-
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.commons.daemon.Daemon;
import org.apache.commons.daemon.DaemonContext;
import org.apache.commons.daemon.DaemonInitException;
@@ -44,13 +40,14 @@
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import javax.xml.bind.JAXBException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
+import static java.util.Objects.requireNonNull;
/**
* JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon.
@@ -122,13 +119,15 @@ public void init(final DaemonContext context) throws DaemonInitException, Except
@Override
public void start() throws Exception {
log.info("Starting the Rya Streams Query Manager Daemon.");
- manager.startAndWait();
+ manager.startAsync();
+ manager.awaitRunning();
}
@Override
public void stop() throws Exception {
log.info("Stopping the Rya Streams Query Manager Daemon.");
- manager.stopAndWait();
+ manager.stopAsync();
+ manager.awaitTerminated();
}
@Override
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
index f1c9e0f45..c2da67c63 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/QueryManagerTest.java
@@ -18,17 +18,6 @@
*/
package org.apache.rya.streams.querymanager;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.InMemoryQueryChangeLog;
import org.apache.rya.streams.api.queries.QueryChange;
@@ -36,6 +25,17 @@
import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
import org.junit.Test;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Unit tests the methods of {@link QueryManager}.
*/
@@ -75,11 +75,12 @@ public void testCreateQuery() throws Exception {
final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
try {
- qm.startAndWait();
+ qm.startAsync();
+ qm.awaitRunning();
queryStarted.await(5, TimeUnit.SECONDS);
verify(qe).startQuery(ryaInstance, query);
} finally {
- qm.stopAndWait();
+ qm.stopAsync();
}
}
@@ -129,11 +130,12 @@ public void testDeleteQuery() throws Exception {
final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
try {
- qm.startAndWait();
+ qm.startAsync();
+ qm.awaitRunning();
queryDeleted.await(5, TimeUnit.SECONDS);
verify(qe).stopQuery(query.getQueryId());
} finally {
- qm.stopAndWait();
+ qm.stopAsync();
}
}
@@ -184,11 +186,12 @@ public void testUpdateQuery() throws Exception {
final QueryManager qm = new QueryManager(qe, source, 50, TimeUnit.MILLISECONDS);
try {
- qm.startAndWait();
+ qm.startAsync();
+ qm.awaitRunning();
queryDeleted.await(10, TimeUnit.SECONDS);
verify(qe).stopQuery(query.getQueryId());
} finally {
- qm.stopAndWait();
+ qm.stopAsync();
}
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java
index 5914b789a..4bc4d3f43 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSourceIT.java
@@ -18,15 +18,7 @@
*/
package org.apache.rya.streams.querymanager.kafka;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.querymanager.QueryChangeLogSource;
@@ -36,7 +28,14 @@
import org.junit.Rule;
import org.junit.Test;
-import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/**
* Integration tests the methods of {@link KafkaQueryChangeLogSource}.
@@ -79,13 +78,14 @@ public void notifyDelete(final String ryaInstanceName) { }
try {
// Start the source.
- source.startAndWait();
+ source.startAsync();
+ source.awaitRunning();
// If the latch isn't counted down, then fail the test.
assertTrue( created.await(5, TimeUnit.SECONDS) );
} finally {
- source.stopAndWait();
+ source.stopAsync();
}
}
@@ -113,7 +113,8 @@ public void notifyDelete(final String ryaInstanceName) { }
try {
// Start the source.
- source.startAndWait();
+ source.startAsync();
+ source.awaitRunning();
// Wait twice the polling duration to ensure it iterates at least once.
Thread.sleep(200);
@@ -125,7 +126,7 @@ public void notifyDelete(final String ryaInstanceName) { }
// If the latch isn't counted down, then fail the test.
assertTrue( created.await(5, TimeUnit.SECONDS) );
} finally {
- source.stopAndWait();
+ source.stopAsync();
}
}
@@ -161,7 +162,8 @@ public void notifyDelete(final String ryaInstanceName) {
try {
// Start the source
- source.startAndWait();
+ source.startAsync();
+ source.awaitRunning();
// Wait for it to indicate the topic was created.
assertTrue( created.await(5, TimeUnit.SECONDS) );
@@ -173,7 +175,7 @@ public void notifyDelete(final String ryaInstanceName) {
assertTrue( deleted.await(5, TimeUnit.SECONDS) );
} finally {
- source.stopAndWait();
+ source.stopAsync();
}
}
@@ -205,7 +207,8 @@ public void notifyDelete(final String ryaInstanceName) { }
try {
// Start the source
- source.startAndWait();
+ source.startAsync();
+ source.awaitRunning();
// Wait for that first listener to indicate the topic was created. This means that one has been cached.
assertTrue( created.await(5, TimeUnit.SECONDS) );
@@ -226,7 +229,7 @@ public void notifyDelete(final String ryaInstanceName) { }
assertTrue( newListenerCreated.await(5, TimeUnit.SECONDS) );
} finally {
- source.stopAndWait();
+ source.stopAsync();
}
}
@@ -240,7 +243,8 @@ public void unsubscribedDoesNotReceiveNotifications() throws Exception {
try {
// Start the source.
- source.startAndWait();
+ source.startAsync();
+ source.awaitRunning();
// Create a listener that flips a boolean to true when it is notified.
final AtomicBoolean notified = new AtomicBoolean(false);
@@ -271,7 +275,7 @@ public void notifyDelete(final String ryaInstanceName) {
// Show the boolean was never flipped to true.
assertFalse(notified.get());
} finally {
- source.stopAndWait();
+ source.stopAsync();
}
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
index fcb3a4674..624ef35b1 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorIT.java
@@ -18,12 +18,7 @@
*/
package org.apache.rya.streams.querymanager.kafka;
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
+import com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -50,7 +45,11 @@
import org.junit.Rule;
import org.junit.Test;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
/**
* Integration tests the methods of {@link LocalQueryExecutor}.
@@ -124,7 +123,8 @@ public void runQuery() throws Exception {
final String kafkaServers = kafka.getKafkaHostname() + ":" + kafka.getKafkaPort();
final KafkaStreamsFactory jobFactory = new SingleThreadKafkaStreamsFactory(kafkaServers);
final QueryExecutor executor = new LocalQueryExecutor(createKafkaTopic, jobFactory);
- executor.startAndWait();
+ executor.startAsync();
+ executor.awaitRunning();
try {
// Start the query.
executor.startQuery(ryaInstance, sQuery);
@@ -144,7 +144,7 @@ public void runQuery() throws Exception {
assertEquals(expected, results);
} finally {
- executor.stopAndWait();
+ executor.stopAsync();
}
}
}
\ No newline at end of file
diff --git a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
index efbcf4bba..b58aad11a 100644
--- a/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
+++ b/extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/kafka/LocalQueryExecutorTest.java
@@ -18,17 +18,7 @@
*/
package org.apache.rya.streams.querymanager.kafka;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Set;
-import java.util.UUID;
-
+import com.google.common.collect.Sets;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.kafka.KafkaStreamsFactory;
@@ -36,7 +26,16 @@
import org.apache.rya.streams.querymanager.QueryExecutor;
import org.junit.Test;
-import com.google.common.collect.Sets;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Unit tests the methods of {@link LocalQueryExecutor}.
@@ -62,7 +61,8 @@ public void startQuery() throws Exception {
// Start the executor that will be tested.
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
- executor.startAndWait();
+ executor.startAsync();
+ executor.awaitRunning();
try {
// Tell the executor to start the query.
executor.startQuery(ryaInstance, query);
@@ -70,7 +70,7 @@ public void startQuery() throws Exception {
// Show a job was started for that query's ID.
verify(queryJob).start();
} finally {
- executor.stopAndWait();
+ executor.stopAsync();
}
}
@@ -84,12 +84,13 @@ public void stopQuery_serviceNotStarted() throws Exception {
public void stopQuery_queryNotRunning() throws Exception {
// Start an executor.
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
- executor.startAndWait();
+ executor.startAsync();
+ executor.awaitRunning();
try {
// Try to stop a query that was never stareted.
executor.stopQuery(UUID.randomUUID());
} finally {
- executor.stopAndWait();
+ executor.stopAsync();
}
}
@@ -106,7 +107,8 @@ public void stopQuery() throws Exception {
// Start the executor that will be tested.
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
- executor.startAndWait();
+ executor.startAsync();
+ executor.awaitRunning();
try {
// Tell the executor to start the query.
executor.startQuery(ryaInstance, query);
@@ -117,7 +119,7 @@ public void stopQuery() throws Exception {
// Show a job was stopped for that query's ID.
verify(queryJob).close();
} finally {
- executor.stopAndWait();
+ executor.stopAsync();
}
}
@@ -143,7 +145,8 @@ public void stopAll_noneForThatRyaInstance() throws Exception {
// Start the executor that will be tested.
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
- executor.startAndWait();
+ executor.startAsync();
+ executor.awaitRunning();
try {
// Tell the executor to start the queries.
executor.startQuery(ryaInstance, query1);
@@ -161,7 +164,7 @@ public void stopAll_noneForThatRyaInstance() throws Exception {
verify(queryJob2, never()).close();
} finally {
- executor.stopAndWait();
+ executor.stopAsync();
}
}
@@ -182,7 +185,8 @@ public void stopAll() throws Exception {
// Start the executor that will be tested.
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
- executor.startAndWait();
+ executor.startAsync();
+ executor.awaitRunning();
try {
// Tell the executor to start the queries.
executor.startQuery(ryaInstance1, query1);
@@ -200,7 +204,7 @@ public void stopAll() throws Exception {
verify(queryJob2).close();
} finally {
- executor.stopAndWait();
+ executor.stopAsync();
}
}
@@ -214,7 +218,8 @@ public void getRunningQueryIds_serviceNotStarted() throws Exception {
public void getRunningQueryIds_noneStarted() throws Exception {
// Start an executor.
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), mock(KafkaStreamsFactory.class));
- executor.startAndWait();
+ executor.startAsync();
+ executor.awaitRunning();
try {
// Get the list of running queries.
final Set runningQueries = executor.getRunningQueryIds();
@@ -222,7 +227,7 @@ public void getRunningQueryIds_noneStarted() throws Exception {
// Show no queries are reported as running.
assertTrue(runningQueries.isEmpty());
} finally {
- executor.stopAndWait();
+ executor.stopAsync();
}
}
@@ -242,7 +247,8 @@ public void getRunningQueryIds_noneStopped() throws Exception {
// Start the executor that will be tested.
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
- executor.startAndWait();
+ executor.startAsync();
+ executor.awaitRunning();
try {
// Start the queries.
executor.startQuery(ryaInstance, query1);
@@ -257,7 +263,7 @@ public void getRunningQueryIds_noneStopped() throws Exception {
assertEquals(expected, executor.getRunningQueryIds());
} finally {
- executor.stopAndWait();
+ executor.stopAsync();
}
}
@@ -277,7 +283,8 @@ public void getRunningQueryIds_stoppedNoLongerListed() throws Exception {
// Start the executor that will be tested.
final QueryExecutor executor = new LocalQueryExecutor(mock(CreateKafkaTopic.class), jobFactory);
- executor.startAndWait();
+ executor.startAsync();
+ executor.awaitRunning();
try {
// Start the queries.
executor.startQuery(ryaInstance, query1);
@@ -294,7 +301,7 @@ public void getRunningQueryIds_stoppedNoLongerListed() throws Exception {
assertEquals(expected, executor.getRunningQueryIds());
} finally {
- executor.stopAndWait();
+ executor.stopAsync();
}
}
}
\ No newline at end of file
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java
index 63929bb79..132993f56 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java
@@ -19,25 +19,18 @@
* under the License.
*/
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.base.Preconditions;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
-import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.rfile.RFileOperations;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.ArgumentChecker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -52,6 +45,12 @@
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
/**
* {@link FileInputFormat} that finds the Accumulo tablet files on the HDFS
* disk, and uses that as the input for MapReduce jobs.
@@ -68,8 +67,8 @@ public List getSplits(JobContext jobContext) throws IOException {
String user = MRUtils.AccumuloProps.getUsername(jobContext);
AuthenticationToken password = MRUtils.AccumuloProps.getPassword(jobContext);
String table = MRUtils.AccumuloProps.getTablename(jobContext);
- ArgumentChecker.notNull(instance);
- ArgumentChecker.notNull(table);
+ Preconditions.checkNotNull(instance);
+ Preconditions.checkNotNull(table);
//find the files necessary
try {
@@ -113,8 +112,11 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
FileSystem fs = file.getFileSystem(job);
Instance instance = MRUtils.AccumuloProps.getInstance(taskAttemptContext);
- fileSKVIterator = RFileOperations.getInstance().openReader(file.toString(), ALLRANGE,
- new HashSet(), false, fs, job, instance.getConfiguration());
+ fileSKVIterator = RFileOperations.getInstance().newScanReaderBuilder()
+ .forFile(file.toString(), fs, job)
+ .withTableConfiguration(instance.getConfiguration())
+ .overRange(ALLRANGE, new HashSet<>(), false)
+ .build();
}
@Override
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
index 2fc272805..521d23f65 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java
@@ -19,21 +19,18 @@
* under the License.
*/
-import java.io.IOException;
-import java.util.Map.Entry;
-
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.domain.RyaStatement;
@@ -41,6 +38,10 @@
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+
/**
* Subclass of {@link AbstractInputFormat} for reading
* {@link RyaStatementWritable}s directly from a running Rya instance.
@@ -76,6 +77,20 @@ public class RyaStatementRecordReader extends AbstractRecordReader contextIterators(TaskAttemptContext context, String tableName) {
+ return InputConfigurator.getIterators(CLASS, context.getConfiguration());
+ }
+
@Override
protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName,
RangeInputSplit split) {
diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java
index d0185a122..591d6eb25 100644
--- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java
+++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java
@@ -19,21 +19,6 @@
* under the License.
*/
-
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
@@ -69,6 +54,18 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
/**
* A LoadStoreFunc for retrieving data from and storing data to Accumulo
*
@@ -377,6 +374,10 @@ private static byte[] objToBytes(final Object o) {
public void cleanupOnFailure(final String failure, final Job job) {
}
+ @Override
+ public void cleanupOnSuccess(String s, Job job) throws IOException {
+ }
+
@Override
public WritableComparable> getSplitComparable(final InputSplit inputSplit) throws IOException {
//cannot get access to the range directly
diff --git a/pom.xml b/pom.xml
index 3e68ee5c1..37d4ac0bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,38 +69,38 @@ under the License.
web
- 2.3.1
+ 3.2.2
- 1.6.4
- 2.5.0
+ 1.9.3
+ 2.9.2
- 3.4.6
+ 3.4.14
- 0.9.2
+ 0.15.0
- 5.2.1
- 2.1
+ 5.3.1
+ 2.9.1
3.10.2
2.2.0
- 3.2.2
+ 3.2.2
- 2.6
- 1.10
- 1.6
+ 2.6
+ 1.10
+ 1.6
2.5
- 1.3
+ 1.5
- 14.0.1
+ 18.0
2.8.1
- 4.5.2
- 4.4.4
+ 4.5.10
+ 4.4.12
2.2.11
- 1.2.0
+ 1.4.0
3.4
1.7.2
1.3
@@ -110,11 +110,12 @@ under the License.
1.0.2.RELEASE
1.1.0.RELEASE
- 4.12
- 1.10.19
- 1.1.0
- 1.7.25
- 1.6.1
+ 4.12
+ 1.1.0
+ 1.7.26
+ 1.10.19
+ 2.28.2
+ 2.0.2
UTF-8
UTF-8
@@ -122,16 +123,18 @@ under the License.
3.0.4
- 1.0.0-incubating
+ 1.2.0
3.0.8
- 0.9.1
+ 0.9.3
1.2
1.60
4.0.1
1.13
+ 0.14.0
+
1.3.9-1
1.0-1
0.10.0.1
@@ -227,7 +230,7 @@ under the License.
LocationTech - SNAPSHOT
- https://repo.locationtech.org/content/repositories/snapshots/
+ https://repo.eclipse.org/content/repositories/snapshots/
false
@@ -237,7 +240,7 @@ under the License.
LocationTech - RELEASE
- https://repo.locationtech.org/content/repositories/releases/
+ https://repo.eclipse.org/content/repositories/releases/
true
@@ -247,7 +250,7 @@ under the License.
LocationTech - Third Party
- https://repo.locationtech.org/content/repositories/thirdparty/
+ https://repo.eclipse.org/content/repositories/thirdparty/
true
@@ -366,6 +369,20 @@ under the License.
+
+
+
+
+ oss.sonatype.org-snapshot
+ http://oss.sonatype.org/content/repositories/snapshots
+
+ false
+
+
+ true
+
+
+
@@ -569,7 +586,7 @@ under the License.
org.apache.rya
rya.kafka.connect.mongo
${project.version}
-
+
org.apache.accumulo
accumulo-core
@@ -582,8 +599,10 @@ under the License.
org.eclipse.rdf4j
- rdf4j-runtime
+ rdf4j-bom
${org.eclipse.rdf4j.version}
+ pom
+ import
org.eclipse.rdf4j
@@ -615,7 +634,6 @@ under the License.
rdf4j-queryresultio-text
${org.eclipse.rdf4j.version}
-
org.eclipse.rdf4j
rdf4j-rio-api
@@ -661,6 +679,12 @@ under the License.
rdf4j-queryrender
${org.eclipse.rdf4j.version}
+
+ org.eclipse.rdf4j
+ rdf4j-runtime
+ ${org.eclipse.rdf4j.version}
+ pom
+
org.eclipse.rdf4j
rdf4j-runtime-osgi
@@ -1012,7 +1036,13 @@ under the License.
org.mockito
mockito-all
- ${mockito.version}
+ ${mockito.all.version}
+ test
+
+
+ org.mockito
+ mockito-core
+ ${mockito.core.version}
test
@@ -1028,19 +1058,53 @@ under the License.
test
+
+ org.powermock
+ powermock-core
+
+
+ org.powermock
+ powermock-all
+
org.powermock
powermock-module-junit4
+
+ org.powermock
+ powermock-api-mockito
+
+
+ org.powermock
+ powermock-api-mockito2
+
+
+ org.powermock
+ powermock-core
+ ${powermock.version}
+ test
+
+
+ org.powermock
+ powermock-all
+ ${powermock.version}
+ test
+
org.powermock
powermock-module-junit4
${powermock.version}
test
-
+
+ org.powermock
+ powermock-api-mockito2
+ ${powermock.version}
+ test
+
+
org.openjdk.jmh
@@ -1142,11 +1206,11 @@ under the License.
-
+
org.codehaus.mojo
animal-sniffer-maven-plugin
- 1.15
+ 1.16
org.codehaus.mojo.signature
@@ -1500,4 +1564,4 @@ under the License.
HEAD
https://git-wip-us.apache.org/repos/asf?p=incubator-rya.git
-
\ No newline at end of file
+
diff --git a/sail/pom.xml b/sail/pom.xml
index d222e4193..b2508423c 100644
--- a/sail/pom.xml
+++ b/sail/pom.xml
@@ -67,6 +67,7 @@ under the License.
org.eclipse.rdf4j
rdf4j-runtime
+ pom
diff --git a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
index ae97fd8ab..56394883a 100644
--- a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
+++ b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java
@@ -103,18 +103,21 @@
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.helpers.AbstractSailConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RdfCloudTripleStoreConnection extends AbstractSailConnection {
+
+ private static final Logger logger = LoggerFactory.getLogger(RdfCloudTripleStoreConnection.class);
+
private final RdfCloudTripleStore store;
+ private final C conf;
private RdfEvalStatsDAO rdfEvalStatsDAO;
private SelectivityEvalDAO selectEvalDAO;
private RyaDAO ryaDAO;
private InferenceEngine inferenceEngine;
private NamespaceManager namespaceManager;
- private final C conf;
-
-
private ProvenanceCollector provenanceCollector;
public RdfCloudTripleStoreConnection(final RdfCloudTripleStore sailBase, final C conf, final ValueFactory vf)
From 77e47923998dcb70812859c17d8eae6d175ba088 Mon Sep 17 00:00:00 2001
From: Brad Rushworth <67445484+brushworth@users.noreply.github.com>
Date: Tue, 14 Jul 2020 17:16:59 +1000
Subject: [PATCH 2/3] Replacing the old @Immutable annotation with
@Contract(threading = ThreadingBehavior.IMMUTABLE) per the advice at
https://dev-aux.com/java/org-apache-http-annotation-threadsafe-class-not-found
---
extras/indexing/pom.xml | 2 +-
.../java/org/apache/rya/indexing/entity/model/Entity.java | 5 +++--
.../java/org/apache/rya/indexing/entity/model/Property.java | 5 +++--
.../main/java/org/apache/rya/indexing/entity/model/Type.java | 5 +++--
.../org/apache/rya/indexing/entity/model/TypedEntity.java | 5 +++--
.../rya/export/api/conf/AccumuloMergeConfiguration.java | 5 +++--
.../org/apache/rya/export/api/conf/MergeConfiguration.java | 5 +++--
pom.xml | 2 +-
8 files changed, 20 insertions(+), 14 deletions(-)
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
index c9efd0672..aba2d99e8 100644
--- a/extras/indexing/pom.xml
+++ b/extras/indexing/pom.xml
@@ -70,7 +70,6 @@
httpclient
-
org.apache.rya
@@ -84,6 +83,7 @@
org.apache.rya
rya.periodic.notification.api
+
org.eclipse.rdf4j
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java
index 5f0497820..e1afec64e 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Entity.java
@@ -24,7 +24,8 @@
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
-import jdk.nashorn.internal.ir.annotations.Immutable;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.indexing.entity.storage.EntityStorage;
@@ -72,7 +73,7 @@
* the {@link Type}, but nothing has explicitly indicated it is of that Type.
* Once something has done so, it is an explicitly typed Entity.
*/
-@Immutable
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
@DefaultAnnotation(NonNull.class)
public class Entity {
private static final Logger log = Logger.getLogger(Entity.class);
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java
index 0b9b77d58..cda44bb7a 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Property.java
@@ -20,7 +20,8 @@
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
-import jdk.nashorn.internal.ir.annotations.Immutable;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.api.domain.RyaType;
@@ -31,7 +32,7 @@
/**
* A value that has been set for an {@link TypedEntity}.
*/
-@Immutable
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
@DefaultAnnotation(NonNull.class)
public class Property {
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java
index 1a61812bf..23c3dbd54 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/Type.java
@@ -21,7 +21,8 @@
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
-import jdk.nashorn.internal.ir.annotations.Immutable;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.indexing.entity.storage.TypeStorage;
@@ -42,7 +43,7 @@
* <urn:nutritionalInformation>
*
*/
-@Immutable
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
@DefaultAnnotation(NonNull.class)
public class Type {
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java
index fdce30dc2..ad0f0dd62 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/model/TypedEntity.java
@@ -23,7 +23,8 @@
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
-import jdk.nashorn.internal.ir.annotations.Immutable;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.api.domain.RyaType;
@@ -38,7 +39,7 @@
* A {@link TypedEntity} is a view of an {@link Entity} that has had a specific
* {@link Type} applied to it.
*/
-@Immutable
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
@DefaultAnnotation(NonNull.class)
public class TypedEntity {
diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java
index 1a351147f..9f01b4a2f 100644
--- a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java
+++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/api/conf/AccumuloMergeConfiguration.java
@@ -18,14 +18,15 @@
*/
package org.apache.rya.export.api.conf;
-import jdk.nashorn.internal.ir.annotations.Immutable;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
import org.apache.rya.export.InstanceType;
/**
* Immutable configuration object to allow the MergeTool to connect to the parent and child
* databases for data merging.
*/
-@Immutable
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class AccumuloMergeConfiguration extends MergeConfigurationDecorator {
/**
* Information needed to connect to the parent database
diff --git a/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java b/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java
index 329589bd4..757ce63ec 100644
--- a/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java
+++ b/extras/rya.export/export.api/src/main/java/org/apache/rya/export/api/conf/MergeConfiguration.java
@@ -18,7 +18,8 @@
*/
package org.apache.rya.export.api.conf;
-import jdk.nashorn.internal.ir.annotations.Immutable;
+import org.apache.http.annotation.Contract;
+import org.apache.http.annotation.ThreadingBehavior;
import org.apache.rya.export.DBType;
import org.apache.rya.export.MergePolicy;
@@ -28,7 +29,7 @@
* Immutable configuration object to allow the MergeTool to connect to the parent and child
* databases for data merging.
*/
-@Immutable
+@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class MergeConfiguration {
/**
* Information needed to connect to the parent database
diff --git a/pom.xml b/pom.xml
index 37d4ac0bc..f4729a542 100644
--- a/pom.xml
+++ b/pom.xml
@@ -250,7 +250,7 @@ under the License.
LocationTech - Third Party
- https://repo.eclipse.org/content/repositories/thirdparty/
+ https://repo.eclipse.org/content/repositories/locationtech-thirdparty/
true
From 94dbd1627fa1215aa7eaffd828f7519e5753123d Mon Sep 17 00:00:00 2001
From: Brad Rushworth <67445484+brushworth@users.noreply.github.com>
Date: Fri, 26 Jun 2020 16:52:59 +1000
Subject: [PATCH 3/3] RYA-532 Assign specific interface types for Subject,
Predicate, Object and Context and just generally clean up the code.
---
.../rya/api/RdfCloudTripleStoreStatement.java | 74 --
.../rya/api/client/ExecuteSparqlQuery.java | 10 +-
.../org/apache/rya/api/domain/RangeIRI.java | 10 +-
.../org/apache/rya/api/domain/RyaIRI.java | 18 +-
.../apache/rya/api/domain/RyaIRIRange.java | 20 +-
.../org/apache/rya/api/domain/RyaRange.java | 4 +-
.../apache/rya/api/domain/RyaResource.java | 25 +
.../apache/rya/api/domain/RyaStatement.java | 116 +++-
.../org/apache/rya/api/domain/RyaType.java | 53 +-
.../apache/rya/api/domain/RyaTypeRange.java | 14 +-
.../org/apache/rya/api/domain/RyaValue.java | 42 ++
.../rya/api/domain/StatementMetadata.java | 42 +-
.../apache/rya/api/instance/RyaDetails.java | 29 +-
.../org/apache/rya/api/persist/RyaDAO.java | 9 +-
.../rya/api/persist/RyaDAOException.java | 3 +-
.../rya/api/persist/query/BatchRyaQuery.java | 44 +-
.../rya/api/persist/query/RyaQuery.java | 40 +-
.../rya/api/persist/query/join/HashJoin.java | 51 +-
.../api/persist/query/join/IterativeJoin.java | 64 +-
.../rya/api/persist/query/join/Join.java | 17 +-
.../rya/api/persist/query/join/MergeJoin.java | 70 +-
.../rya/api/persist/utils/RyaDAOHelper.java | 227 +++++--
.../api/persist/utils/RyaDaoQueryWrapper.java | 30 +-
.../rya/api/query/strategy/ByteRange.java | 6 +
.../query/strategy/TriplePatternStrategy.java | 10 +-
.../AbstractHashedTriplePatternStrategy.java | 15 +-
...HashedPoWholeRowTriplePatternStrategy.java | 42 +-
...ashedSpoWholeRowTriplePatternStrategy.java | 41 +-
.../NullRowTriplePatternStrategy.java | 29 +-
.../OspWholeRowTriplePatternStrategy.java | 30 +-
.../PoWholeRowTriplePatternStrategy.java | 32 +-
.../SpoWholeRowTriplePatternStrategy.java | 32 +-
.../rya/api/resolver/RdfToRyaConversions.java | 64 +-
.../apache/rya/api/resolver/RyaContext.java | 13 +-
.../rya/api/resolver/RyaToRdfConversions.java | 81 ++-
.../rya/api/resolver/RyaTripleContext.java | 5 +-
.../rya/api/resolver/RyaTypeResolver.java | 5 +-
.../resolver/impl/CustomDatatypeResolver.java | 16 +-
.../resolver/impl/RyaTypeResolverImpl.java | 16 +-
.../resolver/triple/TripleRowResolver.java | 7 +-
.../impl/WholeRowHashedTripleResolver.java | 44 +-
.../triple/impl/WholeRowTripleResolver.java | 32 +-
.../rya/api/utils/NullableStatementImpl.java | 103 ---
.../rya/api/utils/WildcardStatement.java | 137 ++++
.../apache/rya/api/domain/RyaTypeTest.java | 4 +
.../rya/api/persist/query/RyaQueryTest.java | 5 +-
...edPoWholeRowTriplePatternStrategyTest.java | 61 +-
...dSpoWholeRowTriplePatternStrategyTest.java | 109 ++-
.../NullRowTriplePatternStrategyTest.java | 28 +-
.../api/resolver/RdfToRyaConversionsTest.java | 254 ++++++-
.../api/resolver/RyaToRdfConversionsTest.java | 124 +++-
.../HashedWholeRowTripleResolverTest.java | 24 +-
.../impl/WholeRowTripleResolverTest.java | 23 +-
.../AccumuloRdfConfigurationBuilder.java | 18 +-
.../rya/accumulo/AccumuloRdfEvalStatsDAO.java | 30 +-
.../apache/rya/accumulo/AccumuloRdfUtils.java | 5 +-
.../apache/rya/accumulo/AccumuloRyaDAO.java | 73 +-
.../rya/accumulo/RyaTableKeyValues.java | 31 +-
.../accumulo/RyaTableMutationsFactory.java | 65 +-
.../query/AccumuloRyaQueryEngine.java | 33 +-
.../query/RangeBindingSetEntries.java | 21 +-
.../query/RyaStatementKeyValueIterator.java | 7 +-
.../query/ScannerBaseCloseableIterable.java | 7 +-
.../rya/accumulo/AccumuloRyaDAOTest.java | 637 +++++++++++++-----
.../AccumuloRyaDetailsRepositoryIT.java | 15 +-
.../query/RangeBindingSetEntriesTest.java | 129 ++++
dao/accumulo.rya/src/test/resources/log4j.xml | 40 ++
.../org/apache/rya/mongodb/MongoDBRyaDAO.java | 29 +-
.../AggregationPipelineQueryNode.java | 76 +--
.../dao/SimpleMongoDBStorageStrategy.java | 36 +-
.../RyaStatementBindingSetCursorIterator.java | 31 +-
.../rya/mongodb/MongoDBQueryEngineIT.java | 16 +-
.../apache/rya/mongodb/MongoDBRyaDAOIT.java | 29 +-
.../SimpleMongoDBStorageStrategyTest.java | 13 +-
.../instance/MongoRyaDetailsRepositoryIT.java | 17 +-
.../client/accumulo/AccumuloCreatePCJ.java | 16 +-
.../accumulo/AccumuloCreatePeriodicPCJ.java | 11 +-
.../apache/rya/indexing/TemporalInterval.java | 5 +-
.../AccumuloIndexingConfiguration.java | 27 +-
.../accumulo/entity/EntityCentricIndex.java | 45 +-
.../indexing/accumulo/entity/StarQuery.java | 25 +-
.../rya/indexing/entity/model/Entity.java | 13 +-
.../rya/indexing/entity/model/Property.java | 9 +-
.../indexing/entity/model/TypedEntity.java | 17 +-
.../entity/query/EntityQueryNode.java | 40 +-
.../entity/storage/EntityStorage.java | 15 +-
.../mongo/EntityDocumentConverter.java | 16 +-
.../storage/mongo/MongoEntityStorage.java | 55 +-
.../mongo/RyaTypeDocumentConverter.java | 12 +-
.../entity/update/BaseEntityIndexer.java | 47 +-
.../indexing/entity/update/EntityUpdater.java | 17 +-
.../AbstractExternalSetOptimizer.java | 11 +-
.../external/matching/BasicRater.java | 12 +-
.../rya/indexing/mongodb/MongoDbSmartUri.java | 23 +-
.../mongodb/update/MongoDocumentUpdater.java | 11 +-
.../mongodb/update/RyaObjectStorage.java | 26 +-
.../indexing/smarturi/SmartUriAdapter.java | 53 +-
.../indexing/smarturi/SmartUriStorage.java | 9 +-
.../duplication/DuplicateDataDetector.java | 37 +-
.../matching/StatementMetadataNode.java | 47 +-
.../rya/sail/config/RyaSailFactory.java | 21 +-
...DocumentIndexIntersectingIteratorTest.java | 16 +-
.../accumulo/entity/StarQueryTest.java | 15 +-
.../mongo/RyaTypeDocumentConverterTest.java | 9 +-
.../tupleSet/AccumuloIndexSetTest.java | 21 +-
.../accumulo/AccumuloRyaStatementStore.java | 24 +-
.../accumulo/util/AccumuloInstanceDriver.java | 29 +-
.../accumulo/util/AccumuloRyaUtils.java | 32 +-
.../apache/rya/export/accumulo/TestUtils.java | 9 +-
.../strategy/MongoPipelineStrategy.java | 21 +-
.../strategy/RoundRobinStrategy.java | 17 +-
.../strategy/SailExecutionStrategy.java | 12 +-
.../accumulo/AccumuloPcjSerializer.java | 32 +-
.../accumulo/BindingSetStringConverter.java | 20 +-
.../pcj/storage/accumulo/PcjTables.java | 36 +-
.../pcj/storage/accumulo/VariableOrder.java | 18 +-
.../pcj/storage/mongo/MongoPcjDocuments.java | 45 +-
.../pcj/storage/mongo/MongoPcjStorageIT.java | 26 +-
.../mongo/PcjDocumentsIntegrationTest.java | 21 +-
.../mr/merge/mappers/FileCopyToolMapper.java | 9 +-
.../mr/merge/util/AccumuloQueryRuleset.java | 24 +-
.../mr/merge/util/AccumuloRyaUtils.java | 36 +-
.../rya/accumulo/mr/merge/CopyToolTest.java | 31 +-
.../rya/accumulo/mr/merge/MergeToolTest.java | 29 +-
.../rya/accumulo/mr/merge/util/TestUtils.java | 15 +-
.../indexing/pcj/fluo/api/CreateFluoPcj.java | 45 +-
.../pcj/fluo/app/ConstructProjection.java | 20 +-
.../pcj/fluo/app/FluoStringConverter.java | 22 +-
.../KryoVisibilityBindingSetSerializer.java | 19 +-
.../app/util/BindingHashShardingFunction.java | 19 +-
.../pcj/fluo/app/ConstructGraphTest.java | 30 +-
.../pcj/fluo/app/ConstructGraphTestUtils.java | 15 +-
.../fluo/demo/FluoAndHistoricPcjsDemo.java | 18 +-
.../pcj/fluo/ConstructGraphTestUtils.java | 17 +-
.../prospector/domain/TripleValueType.java | 6 +-
.../apache/rya/prospector/mr/Prospector.java | 30 +-
.../rya/prospector/mr/ProspectorMapper.java | 9 +-
.../rya/prospector/plans/IndexWorkPlan.java | 17 +-
.../rya/prospector/plans/impl/CountPlan.java | 35 +-
.../prospector/service/ProspectorService.java | 22 +-
.../mr/JoinSelectStatisticsTest.java | 88 ++-
.../rya/prospector/mr/ProspectorTest.java | 20 +-
.../ProspectorServiceEvalStatsDAOTest.java | 21 +-
.../mr/DuplicateEliminationTest.java | 8 +-
.../accumulo/mr/GraphXEdgeInputFormat.java | 20 +-
.../rya/accumulo/mr/RyaOutputFormat.java | 25 +-
.../rya/accumulo/mr/RyaTypeWritable.java | 16 +-
.../mr/tools/AccumuloRdfCountTool.java | 19 +-
.../mr/GraphXEdgeInputFormatTest.java | 18 +-
.../rya/accumulo/mr/RyaInputFormatTest.java | 9 +-
.../rya/accumulo/mr/RyaOutputFormatTest.java | 15 +-
.../org/apache/rya/accumulo/mr/TestUtils.java | 34 +-
.../accumulo/pig/StatementPatternStorage.java | 33 +-
.../RdfCloudTripleStoreConnection.java | 41 +-
.../ParallelEvaluationStrategyImpl.java | 6 +-
.../inference/AllValuesFromVisitor.java | 14 +-
.../inference/DomainRangeVisitor.java | 12 +-
.../inference/HasValueVisitor.java | 14 +-
.../inference/InferenceEngine.java | 184 +++--
.../inference/SameAsVisitor.java | 14 +-
.../inference/SomeValuesFromVisitor.java | 14 +-
.../inference/SubClassOfVisitor.java | 10 +-
.../inference/SubPropertyOfVisitor.java | 10 +-
.../java/org/apache/rya/HashJoinTest.java | 182 ++---
.../org/apache/rya/IterativeJoinTest.java | 191 +++---
.../java/org/apache/rya/MergeJoinTest.java | 57 +-
.../evaluation/StatementPatternEvalTest.java | 127 +++-
.../inference/AllValuesFromVisitorTest.java | 25 +-
.../inference/HasValueVisitorTest.java | 33 +-
.../inference/InferenceEngineTest.java | 23 +-
.../rdftriplestore/inference/InferenceIT.java | 24 +-
.../inference/PropertyChainTest.java | 8 +-
.../inference/SomeValuesFromVisitorTest.java | 26 +-
sail/src/test/resources/log4j.xml | 40 ++
.../test/accumulo/RyaTestInstanceRule.java | 8 +-
.../cloud/rdf/web/sail/RdfControllerTest.java | 22 +-
176 files changed, 3915 insertions(+), 2749 deletions(-)
delete mode 100644 common/rya.api/src/main/java/org/apache/rya/api/RdfCloudTripleStoreStatement.java
create mode 100644 common/rya.api/src/main/java/org/apache/rya/api/domain/RyaResource.java
create mode 100644 common/rya.api/src/main/java/org/apache/rya/api/domain/RyaValue.java
delete mode 100644 common/rya.api/src/main/java/org/apache/rya/api/utils/NullableStatementImpl.java
create mode 100644 common/rya.api/src/main/java/org/apache/rya/api/utils/WildcardStatement.java
create mode 100644 dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/query/RangeBindingSetEntriesTest.java
create mode 100644 dao/accumulo.rya/src/test/resources/log4j.xml
create mode 100644 sail/src/test/resources/log4j.xml
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/RdfCloudTripleStoreStatement.java b/common/rya.api/src/main/java/org/apache/rya/api/RdfCloudTripleStoreStatement.java
deleted file mode 100644
index 662d16856..000000000
--- a/common/rya.api/src/main/java/org/apache/rya/api/RdfCloudTripleStoreStatement.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.rya.api;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.eclipse.rdf4j.model.IRI;
-import org.eclipse.rdf4j.model.Resource;
-import org.eclipse.rdf4j.model.Statement;
-import org.eclipse.rdf4j.model.Value;
-import org.eclipse.rdf4j.model.ValueFactory;
-import org.eclipse.rdf4j.model.impl.SimpleStatement;
-import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
-
-public class RdfCloudTripleStoreStatement extends SimpleStatement {
- private static final long serialVersionUID = 1L;
-
- private static final ValueFactory VF = SimpleValueFactory.getInstance();
-
- private Resource[] contexts; //TODO: no blank nodes
-
- public RdfCloudTripleStoreStatement(Resource subject, IRI predicate, Value object) {
- super(subject, predicate, object);
- }
-
- public RdfCloudTripleStoreStatement(Resource subject, IRI predicate, Value object,
- Resource... contexts) {
- super(subject, predicate, object);
- this.contexts = contexts;
- }
-
- public Resource[] getContexts() {
- return contexts;
- }
-
- public Collection getStatements() {
- Collection statements = new ArrayList();
-
- if (getContexts() != null && getContexts().length > 1) {
- for (Resource contxt : getContexts()) {
- statements.add(VF.createStatement(getSubject(),
- getPredicate(), getObject(), contxt));
- }
- } else
- statements.add(this);
-
- return statements;
- }
-
- @Override
- public Resource getContext() {
- if (contexts == null || contexts.length == 0)
- return null;
- else return contexts[0];
- }
-}
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/ExecuteSparqlQuery.java b/common/rya.api/src/main/java/org/apache/rya/api/client/ExecuteSparqlQuery.java
index 212399a41..723b9bdf1 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/ExecuteSparqlQuery.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/ExecuteSparqlQuery.java
@@ -18,13 +18,12 @@
*/
package org.apache.rya.api.client;
-import java.io.Closeable;
-
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.TupleQueryResult;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
+import java.io.Closeable;
/**
* Loads a SPARQL Query and executes the query against an instance of Rya.
@@ -41,5 +40,6 @@ public interface ExecuteSparqlQuery extends Closeable {
* @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
* @throws RyaClientException Something caused the command to fail.
*/
- public TupleQueryResult executeSparqlQuery(String ryaInstanceName, String sparqlQuery) throws InstanceDoesNotExistException, RyaClientException;
+ TupleQueryResult executeSparqlQuery(String ryaInstanceName, String sparqlQuery) throws InstanceDoesNotExistException, RyaClientException;
+
}
\ No newline at end of file
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RangeIRI.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RangeIRI.java
index 84b38d1e1..34ee3a40f 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/domain/RangeIRI.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RangeIRI.java
@@ -20,8 +20,8 @@
*/
-
import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
/**
* Created by IntelliJ IDEA.
@@ -29,14 +29,14 @@
* Time: 1:03 PM
* To change this template use File | Settings | File Templates.
*/
-public class RangeIRI extends RangeValue implements IRI {
+public class RangeIRI extends RangeValue implements IRI {
- public RangeIRI(IRI start, IRI end) {
+ public RangeIRI(Resource start, Resource end) {
super(start, end);
}
- public RangeIRI(RangeValue rangeValue) {
- super((IRI) rangeValue.getStart(), (IRI) rangeValue.getEnd());
+ public RangeIRI(RangeValue rangeValue) {
+ super(rangeValue.getStart(), rangeValue.getEnd());
}
@Override
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaIRI.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaIRI.java
index 8909b056f..b46e73209 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaIRI.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaIRI.java
@@ -19,6 +19,7 @@
* under the License.
*/
+import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.util.URIUtil;
import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
@@ -28,7 +29,7 @@
* Date: 7/16/12
* Time: 11:56 AM
*/
-public class RyaIRI extends RyaType {
+public class RyaIRI extends RyaType implements RyaResource, IRI {
public RyaIRI() {
setDataType(XMLSchema.ANYURI);
@@ -58,4 +59,19 @@ protected void validate(String data) {
URIUtil.getLocalNameIndex(data);
}
+ @Override
+ public String getNamespace() {
+ return RyaToRdfConversions.convertIRI(this).getNamespace();
+ }
+
+ @Override
+ public String getLocalName() {
+ return RyaToRdfConversions.convertIRI(this).getLocalName();
+ }
+
+ @Override
+ public String stringValue() {
+ return RyaToRdfConversions.convertIRI(this).stringValue();
+ }
+
}
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaIRIRange.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaIRIRange.java
index 177048eb5..7243c677a 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaIRIRange.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaIRIRange.java
@@ -20,6 +20,8 @@
*/
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.eclipse.rdf4j.model.Resource;
/**
* Date: 7/17/12
@@ -28,31 +30,35 @@
public class RyaIRIRange extends RyaIRI implements RyaRange {
public static final RyaIRI LAST_IRI = new RyaIRI(((char) 255) + ":#" + ((char) 255));
- private RyaIRI start;
- private RyaIRI stop;
+ private RyaResource start;
+ private RyaResource stop;
public RyaIRIRange() {
super();
}
- public RyaIRIRange(RyaIRI start, RyaIRI stop) {
+ public RyaIRIRange(Resource start, Resource stop) {
+ this(RdfToRyaConversions.convertResource(start), RdfToRyaConversions.convertResource(stop));
+ }
+
+ public RyaIRIRange(RyaResource start, RyaResource stop) {
this.start = start;
this.stop = stop;
}
- public RyaIRI getStart() {
+ public RyaResource getStart() {
return start;
}
- public void setStart(RyaIRI start) {
+ public void setStart(RyaResource start) {
this.start = start;
}
- public RyaIRI getStop() {
+ public RyaResource getStop() {
return stop;
}
- public void setStop(RyaIRI stop) {
+ public void setStop(RyaResource stop) {
this.stop = stop;
}
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaRange.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaRange.java
index 15d57161d..553e9dbb5 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaRange.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaRange.java
@@ -26,7 +26,7 @@
* Time: 10:02 AM
*/
public interface RyaRange {
- public RyaType getStart();
+ RyaValue getStart();
- public RyaType getStop();
+ RyaValue getStop();
}
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaResource.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaResource.java
new file mode 100644
index 000000000..c831dea3c
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaResource.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.domain;
+
+import org.eclipse.rdf4j.model.Resource;
+
+public interface RyaResource extends RyaValue, Resource {
+ // Empty place holder as common supertype of IRI and BNode
+}
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaStatement.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaStatement.java
index 4d870a829..ea5a013a0 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaStatement.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaStatement.java
@@ -18,6 +18,14 @@
*/
package org.apache.rya.api.domain;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.Value;
+
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -26,61 +34,95 @@
* Time: 7:20 AM
*/
public class RyaStatement {
- private RyaIRI subject;
+
+ private static final Logger logger = Logger.getLogger(RyaStatement.class);
+
+ private RyaResource subject;
private RyaIRI predicate;
- private RyaType object;
- private RyaIRI context;
+ private RyaValue object;
+ private RyaResource context;
private String qualifer;
private byte[] columnVisibility;
private byte[] value;
private Long timestamp;
public RyaStatement() {
+ this(null, null, null);
}
- public RyaStatement(final RyaIRI subject, final RyaIRI predicate, final RyaType object) {
+ public RyaStatement(Resource subject, IRI predicate, Value object) {
this(subject, predicate, object, null);
}
- public RyaStatement(final RyaIRI subject, final RyaIRI predicate, final RyaType object, final RyaIRI context) {
- this(subject, predicate, object, context, null);
+ public RyaStatement(Resource subject, IRI predicate, Value object, Resource context) {
+ this(RdfToRyaConversions.convertResource(subject), RdfToRyaConversions.convertIRI(predicate), RdfToRyaConversions.convertValue(object), RdfToRyaConversions.convertResource(context));
+ }
+
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object) {
+ this(subject, predicate, object, null, null, new StatementMetadata());
+ }
+
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final Long timestamp) {
+ this(subject, predicate, object, null, timestamp);
+ }
+
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final RyaResource context) {
+ this(subject, predicate, object, context, null, new StatementMetadata());
}
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final RyaResource context, final Long timestamp) {
+ this(subject, predicate, object, context, null, "".getBytes(StandardCharsets.UTF_8), (byte[]) null, timestamp);
+ }
- public RyaStatement(final RyaIRI subject, final RyaIRI predicate, final RyaType object, final RyaIRI context, final String qualifier) {
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final RyaResource context, final String qualifier) {
this(subject, predicate, object, context, qualifier, new StatementMetadata());
}
- public RyaStatement(final RyaIRI subject, final RyaIRI predicate, final RyaType object, final RyaIRI context, final String qualifier, final StatementMetadata metadata) {
- this(subject, predicate, object, context, qualifier, metadata, null);
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final RyaResource context, final String qualifier, final StatementMetadata metadata) {
+ this(subject, predicate, object, context, qualifier, metadata, "".getBytes(StandardCharsets.UTF_8));
}
- public RyaStatement(final RyaIRI subject, final RyaIRI predicate, final RyaType object, final RyaIRI context, final String qualifier, final StatementMetadata metadata, final byte[] columnVisibility) {
- this(subject, predicate, object, context, qualifier, columnVisibility, metadata.toBytes());
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final RyaResource context, final String qualifier, final StatementMetadata metadata, final byte[] columnVisibility) {
+ this(subject, predicate, object, context, qualifier, metadata, columnVisibility, null);
}
- @Deprecated
- public RyaStatement(final RyaIRI subject, final RyaIRI predicate, final RyaType object, final RyaIRI context, final String qualifier, final byte[] columnVisibility, final byte[] value) {
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final RyaResource context, final String qualifier, final StatementMetadata metadata, final byte[] columnVisibility, final Long timestamp) {
+ this(subject, predicate, object, context, qualifier, columnVisibility, metadata, timestamp);
+ }
+
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final RyaResource context, final String qualifier, final byte[] columnVisibility, final byte[] value) {
this(subject, predicate, object, context, qualifier, columnVisibility, value, null);
}
- @Deprecated
- public RyaStatement(final RyaIRI subject, final RyaIRI predicate, final RyaType object, final RyaIRI context, final String qualifier, final byte[] columnVisibility, final byte[] value, final Long timestamp) {
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final RyaResource context, final String qualifier, final byte[] columnVisibility, final StatementMetadata value, final Long timestamp) {
+ this(subject, predicate, object, context, qualifier, columnVisibility, value != null ? value.toBytes() : null, timestamp);
+ }
+
+ public RyaStatement(final RyaResource subject, final RyaIRI predicate, final RyaValue object, final RyaResource context, final String qualifier, final byte[] columnVisibility, final byte[] value, final Long timestamp) {
this.subject = subject;
this.predicate = predicate;
this.object = object;
this.context = context;
this.qualifer = qualifier;
this.columnVisibility = columnVisibility;
- this.value = value;
+ {
+ // Do not serialise and deserialise the value from JSON because this significantly slows down queries (~35%)
+ if (value != null) {
+ this.value = value;
+ }
+ // Never allow value to be null, because Accumulo can't tell the difference between null and new byte[0]
+ if (this.value == null) {
+ this.value = "".getBytes(StandardCharsets.UTF_8);
+ }
+ }
this.timestamp = timestamp != null ? timestamp : System.currentTimeMillis();
}
- public RyaIRI getSubject() {
+ public RyaResource getSubject() {
return subject;
}
- public void setSubject(final RyaIRI subject) {
+ public void setSubject(final RyaResource subject) {
this.subject = subject;
}
@@ -92,19 +134,19 @@ public void setPredicate(final RyaIRI predicate) {
this.predicate = predicate;
}
- public RyaType getObject() {
+ public RyaValue getObject() {
return object;
}
- public void setObject(final RyaType object) {
+ public void setObject(final RyaValue object) {
this.object = object;
}
- public RyaIRI getContext() {
+ public RyaResource getContext() {
return context;
}
- public void setContext(final RyaIRI context) {
+ public void setContext(final RyaResource context) {
this.context = context;
}
@@ -121,14 +163,18 @@ public StatementMetadata getMetadata() {
// no explicit metadata
try {
return new StatementMetadata(value);
- }
- catch (final Exception ex){
- return null;
+ } catch (final Exception ex) {
+ logger.error("Error converting value to StatementMetadata: ", ex);
+ return new StatementMetadata();
}
}
public void setStatementMetadata(final StatementMetadata metadata){
- this.value = metadata.toBytes();
+ if (metadata == null) {
+ this.value = new byte[0];
+ } else {
+ this.value = metadata.toBytes();
+ }
}
@Deprecated
@@ -233,7 +279,6 @@ public static RyaStatementBuilder builder(final RyaStatement ryaStatement) {
return new RyaStatementBuilder(ryaStatement);
}
-
//builder
public static class RyaStatementBuilder {
@@ -259,7 +304,11 @@ public RyaStatementBuilder setValue(final byte[] value) {
}
public RyaStatementBuilder setMetadata(final StatementMetadata metadata) {
- ryaStatement.setValue(metadata.toBytes());
+ if (metadata == null) {
+ ryaStatement.setValue(new byte[0]);
+ } else {
+ ryaStatement.setValue(metadata.toBytes());
+ }
return this;
}
@@ -273,12 +322,12 @@ public RyaStatementBuilder setQualifier(final String str) {
return this;
}
- public RyaStatementBuilder setContext(final RyaIRI ryaIRI) {
+ public RyaStatementBuilder setContext(final RyaResource ryaIRI) {
ryaStatement.setContext(ryaIRI);
return this;
}
- public RyaStatementBuilder setSubject(final RyaIRI ryaIRI) {
+ public RyaStatementBuilder setSubject(final RyaResource ryaIRI) {
ryaStatement.setSubject(ryaIRI);
return this;
}
@@ -288,7 +337,7 @@ public RyaStatementBuilder setPredicate(final RyaIRI ryaIRI) {
return this;
}
- public RyaStatementBuilder setObject(final RyaType ryaType) {
+ public RyaStatementBuilder setObject(final RyaValue ryaType) {
ryaStatement.setObject(ryaType);
return this;
}
@@ -297,4 +346,9 @@ public RyaStatement build() {
return ryaStatement;
}
}
+
+ public Statement toStatement() {
+ return RyaToRdfConversions.convertStatement(this);
+ }
+
}
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaType.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaType.java
index e40c8d62d..e35ba98b0 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaType.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaType.java
@@ -18,23 +18,26 @@
*/
package org.apache.rya.api.domain;
-import java.util.Objects;
-
import org.apache.commons.lang.builder.CompareToBuilder;
import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
+import java.util.Objects;
+
/**
* Base Rya Type
* Date: 7/16/12
* Time: 11:45 AM
*/
-public class RyaType implements Comparable {
+public class RyaType implements Comparable, RyaValue {
- private IRI dataType;
- private String data;
- private String language;
+ protected IRI dataType;
+ protected String data;
+ protected String language;
/**
* Creates a new instance of {@link RyaType}.
@@ -111,38 +114,34 @@ public void setLanguage(final String language) {
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("RyaType");
- sb.append("{dataType=").append(dataType);
- sb.append(", data='").append(data).append('\'');
- if (language != null) {
- sb.append(", language='").append(language).append('\'');
- }
- sb.append('}');
- return sb.toString();
+ return stringValue();
}
/**
* Determine equality based on string representations of data, datatype, and
- * language.
+ * language. Fallback to standard RDF equals if the compared object is not a RyaType.
* @param o The object to compare with
* @return {@code true} if the other object is also a RyaType and the data,
- * datatype, and language all match.
+ * datatype, and language all match. Also return true if {@param o} is a {@link Resource} with the same value.
*/
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
- if (o == null || !(o instanceof RyaType)) {
+ if (!(o instanceof Value)) {
return false;
}
- final RyaType other = (RyaType) o;
- final EqualsBuilder builder = new EqualsBuilder()
- .append(getData(), other.getData())
- .append(getDataType(), other.getDataType())
- .append(getLanguage(), other.getLanguage());
- return builder.isEquals();
+ if (o instanceof RyaType) {
+ final RyaType other = (RyaType) o;
+ final EqualsBuilder builder = new EqualsBuilder()
+ .append(getData(), other.getData())
+ .append(getDataType(), other.getDataType())
+ .append(getLanguage(), other.getLanguage());
+ return builder.isEquals();
+ }
+ // Fall back to the equals method of the Value class.
+ return (((Value) o).stringValue()).equals(this.stringValue());
}
/**
@@ -176,4 +175,10 @@ public int compareTo(final RyaType o) {
.append(getLanguage(), o.getLanguage());
return builder.toComparison();
}
+
+ @Override
+ public String stringValue() {
+ return RyaToRdfConversions.convertLiteral(this).stringValue();
+ }
+
}
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaTypeRange.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaTypeRange.java
index 38d57f5fe..c0e9a41d8 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaTypeRange.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaTypeRange.java
@@ -28,30 +28,30 @@
* Time: 9:53 AM
*/
public class RyaTypeRange extends RyaType implements RyaRange {
- private RyaType start;
- private RyaType stop;
+ private RyaValue start;
+ private RyaValue stop;
public RyaTypeRange() {
}
- public RyaTypeRange(RyaType start, RyaType stop) {
+ public RyaTypeRange(RyaValue start, RyaValue stop) {
this.start = start;
this.stop = stop;
}
- public RyaType getStart() {
+ public RyaValue getStart() {
return start;
}
- public void setStart(RyaType start) {
+ public void setStart(RyaValue start) {
this.start = start;
}
- public RyaType getStop() {
+ public RyaValue getStop() {
return stop;
}
- public void setStop(RyaType stop) {
+ public void setStop(RyaValue stop) {
this.stop = stop;
}
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaValue.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaValue.java
new file mode 100644
index 000000000..1de594c09
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/RyaValue.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.domain;
+
+import org.eclipse.rdf4j.model.BNode;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Literal;
+import org.eclipse.rdf4j.model.Value;
+
+import java.io.Serializable;
+
+public interface RyaValue extends Serializable, Value {
+
+ /**
+ * Returns the String-value of a Value object. This returns either a {@link Literal}'s label, a
+ * {@link IRI}'s URI or a {@link BNode}'s ID.
+ */
+ String stringValue();
+
+ String getData();
+
+ IRI getDataType();
+
+ String getLanguage();
+
+}
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/domain/StatementMetadata.java b/common/rya.api/src/main/java/org/apache/rya/api/domain/StatementMetadata.java
index 13e656aec..62a61fb07 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/domain/StatementMetadata.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/domain/StatementMetadata.java
@@ -18,14 +18,6 @@
*/
package org.apache.rya.api.domain;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.rya.api.persist.RdfDAOException;
-import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
-
import com.google.common.base.Preconditions;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
@@ -38,16 +30,24 @@
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
+import org.apache.rya.api.persist.RdfDAOException;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
public class StatementMetadata {
- private static Gson gson = new GsonBuilder().enableComplexMapKeySerialization()
+ private final static Gson gson = new GsonBuilder().enableComplexMapKeySerialization()
.registerTypeHierarchyAdapter(RyaType.class, new RyaTypeAdapter()).create();
public static StatementMetadata EMPTY_METADATA = new StatementMetadata();
- private Map metadataMap = new HashMap<>();
+ private Map metadataMap = new HashMap<>();
@SuppressWarnings("serial")
- private Type type = new TypeToken