Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ public class ExportPropertyGraphFromGremlinQueries extends NeptuneExportCommand
@Once
private boolean includeTypeDefinitions = false;

@Option(name = {"--strict-cardinality"}, description = "Use strict cardinality (optional, default 'false').")
@Once
private boolean strictCardinality = false;

@Option(name = {"--timeout-millis"}, description = "Query timeout in milliseconds (optional).")
@Once
private Long timeoutMillis = null;
Expand Down Expand Up @@ -121,7 +125,7 @@ public void run() {
JsonResource<ExportStats, GraphSchema> statsFileResource = directories.statsFileResource();

CsvPrinterOptions csvPrinterOptions = CsvPrinterOptions.builder().setIncludeTypeDefinitions(includeTypeDefinitions).build();
JsonPrinterOptions jsonPrinterOptions = JsonPrinterOptions.builder().setStrictCardinality(true).build();
JsonPrinterOptions jsonPrinterOptions = JsonPrinterOptions.builder().setStrictCardinality(strictCardinality).build();

PropertyGraphTargetConfig targetConfig = target.config(directories, new PrinterOptions(csvPrinterOptions, jsonPrinterOptions));
NamedQueriesCollection namedQueries = getNamedQueriesCollection(queries, queriesFile, queriesResource);
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/amazonaws/services/neptune/io/KinesisConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;

public class KinesisConfig {

private final Stream stream;
Expand Down Expand Up @@ -50,6 +52,14 @@ public boolean isEnableAggregation() {
}

public KinesisConfig(AbstractTargetModule targetModule) {
// Reject Windows up front: the Kinesis Producer Library (KPL) is not supported there.
// https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-supported-plats.html
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
throw new UnsupportedOperationException(
"Kinesis streaming is not supported on Windows due to KPL limitations. " +
"Please use a Linux environment or export to files instead.");
}

if (StringUtils.isNotEmpty(targetModule.getRegion()) && StringUtils.isNotEmpty(targetModule.getStreamName())) {
logger.trace("Constructing new KinesisConfig for stream name: {}, in region: {}, with LargeStreamRecordHandlingStrategy: {} and AggregationEnabled={}",
targetModule.getStreamName(), targetModule.getRegion(), targetModule.getLargeStreamRecordHandlingStrategy(), targetModule.isEnableAggregation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,27 @@ public void printProperties(Map<?, ?> properties) throws IOException {
}

/**
 * Writes a single property as a field of the object currently being written, using the
 * supplied schema for the field name.
 *
 * <p>A {@link Map} value is written under the property's key as a nested row whose entries
 * are printed recursively; any other value is delegated to the typed overload together with
 * the schema's data type and multi-value flag.
 *
 * @param value the property value to write
 * @param propertySchema schema supplying the field name, data type and multi-value flag
 * @throws IOException if the underlying generator fails to write
 */
private void printProperty(Object value, PropertySchema propertySchema) throws IOException {
    String formattedKey = propertySchema.nameWithoutDataType();

    if (isMap(value)) {
        // Nested map: write the key, then open a row, recurse over the entries, and close it.
        generator.writeFieldName(formattedKey);
        printStartRow();
        printNestedProperties((Map<?, ?>) value);
        printEndRow();
    } else {
        // Data type and multi-value flag are only meaningful for non-map values.
        DataType dataType = propertySchema.dataType();
        boolean isMultiValue = propertySchema.isMultiValue();

        printProperty(value, dataType, formattedKey, isMultiValue);
    }
}

/**
 * Prints every entry of a nested map as a property.
 *
 * <p>For each entry a new {@code PropertySchema} is created from the entry's key, shown the
 * entry's value via {@code accept(value, true)}, and then used to print that value.
 *
 * @param value the nested map whose entries are printed
 * @throws IOException if printing a nested property fails
 */
private void printNestedProperties(Map<?, ?> value) throws IOException {
    for (Map.Entry<?, ?> entry : value.entrySet()) {
        PropertySchema nestedSchema = new PropertySchema(entry.getKey());
        Object nestedValue = entry.getValue();

        nestedSchema.accept(nestedValue, true);
        printProperty(nestedValue, nestedSchema);
    }
}

private void printProperty(Object value, DataType dataType, String formattedKey, boolean forceMultiValue) throws IOException {
Expand Down Expand Up @@ -210,4 +225,8 @@ public void close() throws Exception {
/** Returns {@code true} when {@code value} is a {@link List}; {@code null} yields {@code false}. */
private boolean isList(Object value) {
    return List.class.isInstance(value);
}

/** Returns {@code true} when {@code value} is a {@link Map}; {@code null} yields {@code false}. */
private boolean isMap(Object value) {
    return Map.class.isInstance(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public String format(Object value, boolean escapeNewline) {
}

/**
 * Escapes line-break characters in {@code value}.
 *
 * <p>Each carriage return is replaced by the two characters backslash + {@code r} and each
 * line feed by backslash + {@code n}. Escaping both means a Windows-style CRLF sequence
 * does not leave a raw carriage return behind.
 *
 * @param value the text to escape
 * @return {@code value} with every CR and LF replaced by its escaped form
 */
private String escapeNewlineChar(String value) {
    return value.replace("\r", "\\r").replace("\n", "\\n");
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,13 @@ public File createDownloadFile(String parent) {
}

/**
 * Returns a new {@code S3ObjectInfo} in the same bucket whose key is this key followed by
 * {@code suffix}.
 *
 * <p>The key is assembled with plain string operations and an explicit {@code '/'}
 * rather than {@code java.io.File}, whose path separator is platform dependent and would
 * put backslashes into the key on Windows. Exactly one {@code '/'} is inserted when the
 * current key is non-empty and does not already end with one; an empty key yields
 * {@code suffix} on its own.
 *
 * @param suffix the segment to append to the current key
 * @return a new object info for {@code s3://bucket/<key>/<suffix>}
 */
public S3ObjectInfo withNewKeySuffix(String suffix) {
    final String newKey;
    if (StringUtils.isNotEmpty(key)) {
        newKey = key.endsWith("/") ? key + suffix : key + "/" + suffix;
    } else {
        newKey = suffix;
    }
    return new S3ObjectInfo(String.format("s3://%s/%s", bucket, newKey));
}

/**
 * Returns a new {@code S3ObjectInfo} in the same bucket whose key is derived from this key.
 *
 * <p>If the key contains {@code placeholder}, every occurrence is replaced with
 * {@code ifPresent}. Otherwise {@code ifAbsent} is appended after the key, separated by
 * exactly one {@code '/'} (an empty key therefore becomes {@code "/" + ifAbsent}).
 * The key is assembled with plain string operations rather than {@code java.io.File},
 * whose path separator is platform dependent and would put backslashes into the key on
 * Windows.
 *
 * @param placeholder the text to look for in the current key
 * @param ifPresent replacement for {@code placeholder} when it occurs in the key
 * @param ifAbsent segment appended to the key when {@code placeholder} does not occur
 * @return a new object info for the resulting key
 */
public S3ObjectInfo replaceOrAppendKey(String placeholder, String ifPresent, String ifAbsent) {
    final String finalKey;
    if (key.contains(placeholder)) {
        finalKey = key.replace(placeholder, ifPresent);
    } else {
        String prefix = key.endsWith("/") ? key : key + "/";
        finalKey = prefix + ifAbsent;
    }
    return new S3ObjectInfo(String.format("s3://%s/%s", bucket, finalKey));
}

public S3ObjectInfo replaceOrAppendKey(String placeholder, String ifPresent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void createsDigestFilePathsForVeryLongFilenames() throws IOException {
Directories directories = Directories.createFor(DirectoryStructure.PropertyGraph, new File("home"), "export-id", "", "");
Path filePath = directories.createFilePath(path, longName, PropertyGraphExportFormat.csv);

assertEquals("/export/8044f12c352773b7ff400ef524da6e90db419e4a.csv", filePath.toString());
assertEquals(File.separator + "export" + File.separator + "8044f12c352773b7ff400ef524da6e90db419e4a.csv", filePath.toString());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/* Note that the Kinesis Producer Library (KPL) is not supported on Windows in versions
 * later than v0.14.0, and that older version is not compatible with the AWS Java SDK v2.
 * Since replacing the KPL with something else would require significant effort, these
 * tests do not exercise Kinesis on Windows; there they only assert that constructing a
 * KinesisConfig throws UnsupportedOperationException. The export to Kinesis functionality
 * does not currently work on Windows.
 */
public class KinesisConfigTest {

private AbstractTargetModule target;
Expand All @@ -40,47 +46,65 @@ public void resetTargetModule() {
public void shouldCreateStreamIfNameAndRegionAreProvided() {
    when(target.getStreamName()).thenReturn("test");
    when(target.getRegion()).thenReturn("us-west-2");

    // KinesisConfig's constructor rejects Windows, so construction must stay inside the OS guard.
    if (System.getProperty("os.name").toLowerCase().contains("windows")) {
        assertThrows(UnsupportedOperationException.class, () -> new KinesisConfig(target));
    } else {
        KinesisConfig config = new KinesisConfig(target);
        assertNotNull(config.stream());
    }
}

@Test
public void shouldNotCreateStreamIfNameNotProvided() {
    when(target.getStreamName()).thenReturn("");
    when(target.getRegion()).thenReturn("us-west-2");

    // KinesisConfig's constructor rejects Windows, so construction must stay inside the OS guard.
    if (System.getProperty("os.name").toLowerCase().contains("windows")) {
        assertThrows(UnsupportedOperationException.class, () -> new KinesisConfig(target));
    } else {
        KinesisConfig config = new KinesisConfig(target);
        Throwable t = assertThrows(IllegalArgumentException.class, () -> config.stream());
        assertEquals("You must supply an AWS Region and Amazon Kinesis Data Stream name", t.getMessage());
    }
}

@Test
public void shouldNotCreateStreamIfRegionNotProvided() {
    when(target.getStreamName()).thenReturn("test");
    when(target.getRegion()).thenReturn("");

    // KinesisConfig's constructor rejects Windows, so construction must stay inside the OS guard.
    if (System.getProperty("os.name").toLowerCase().contains("windows")) {
        assertThrows(UnsupportedOperationException.class, () -> new KinesisConfig(target));
    } else {
        KinesisConfig config = new KinesisConfig(target);
        Throwable t = assertThrows(IllegalArgumentException.class, () -> config.stream());
        assertEquals("You must supply an AWS Region and Amazon Kinesis Data Stream name", t.getMessage());
    }
}

@Test
public void shouldUseProvidedCredentialsProvider() throws InterruptedException {
when(target.getStreamName()).thenReturn("test");
when(target.getRegion()).thenReturn("us-west-2");
AwsCredentialsProvider credentialsProvider = spy(AnonymousCredentialsProvider.create());
when(target.getCredentialsProvider()).thenReturn(credentialsProvider);

KinesisConfig config = new KinesisConfig(target);
try {
config.stream().publish("test");
} catch (Exception e) {
// expected to fail as anonymous credentials are unauthorized.

if (System.getProperty("os.name").toLowerCase().contains("windows")) {
assertThrows(UnsupportedOperationException.class, () -> new KinesisConfig(target));
} else {
AwsCredentialsProvider credentialsProvider = spy(AnonymousCredentialsProvider.create());
when(target.getCredentialsProvider()).thenReturn(credentialsProvider);

KinesisConfig config = new KinesisConfig(target);
try {
config.stream().publish("test");
} catch (Exception e) {
// expected to fail as anonymous credentials are unauthorized.
}

java.lang.Thread.sleep(100); // Kinesis writing is done asynchronously. Wait to ensure credentials first have time to resolve.

verify(credentialsProvider, Mockito.atLeast(1)).resolveCredentials();
}

java.lang.Thread.sleep(100); // Kinesis writing is done asynchronously. Wait to ensure credentials first have time to resolve.

verify(credentialsProvider, Mockito.atLeast(1)).resolveCredentials();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,24 @@ public void testExportStats() throws JsonProcessingException {
String formattedStats = stats.formatStats(schema);

String expectedStats =
"Source:\n" +
" Nodes: 0\n" +
" Edges: 0\n" +
"Export:\n" +
" Nodes: 2\n" +
" Edges: 2\n" +
" Properties: 0\n" +
"Details:\n" +
" Nodes: \n" +
" node1: 1\n" +
" |_ prop1 {propertyCount=0, minCardinality=-1, maxCardinality=-1, recordCount=0, dataTypeCounts=[]}\n" +
" |_ prop2 {propertyCount=0, minCardinality=-1, maxCardinality=-1, recordCount=0, dataTypeCounts=[]}\n" +
" node2: 1\n" +
" Edges: \n" +
" edge2: 1\n" +
" edge1: 1\n" +
" |_ prop1 {propertyCount=0, minCardinality=-1, maxCardinality=-1, recordCount=0, dataTypeCounts=[]}\n" +
" |_ prop2 {propertyCount=0, minCardinality=-1, maxCardinality=-1, recordCount=0, dataTypeCounts=[]}\n";
"Source:" + System.lineSeparator() +
" Nodes: 0" + System.lineSeparator() +
" Edges: 0" + System.lineSeparator() +
"Export:" + System.lineSeparator() +
" Nodes: 2" + System.lineSeparator() +
" Edges: 2" + System.lineSeparator() +
" Properties: 0" + System.lineSeparator() +
"Details:" + System.lineSeparator() +
" Nodes: " + System.lineSeparator() +
" node1: 1" + System.lineSeparator() +
" |_ prop1 {propertyCount=0, minCardinality=-1, maxCardinality=-1, recordCount=0, dataTypeCounts=[]}" + System.lineSeparator() +
" |_ prop2 {propertyCount=0, minCardinality=-1, maxCardinality=-1, recordCount=0, dataTypeCounts=[]}" + System.lineSeparator() +
" node2: 1" + System.lineSeparator() +
" Edges: " + System.lineSeparator() +
" edge2: 1" + System.lineSeparator() +
" edge1: 1" + System.lineSeparator() +
" |_ prop1 {propertyCount=0, minCardinality=-1, maxCardinality=-1, recordCount=0, dataTypeCounts=[]}" + System.lineSeparator() +
" |_ prop2 {propertyCount=0, minCardinality=-1, maxCardinality=-1, recordCount=0, dataTypeCounts=[]}" + System.lineSeparator();

assertEquals(expectedStats, formattedStats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void shouldNotEscapeNewlineCharAfterPrintPropertiesToCSVAndRewrite() thro

@Test
public void shouldNotEscapeNewlineAfterPrintPropertiesToCSVAndRewrite() throws Exception {
testEscapeCharacterAfterPrintPropertiesAndRewrite("A" + System.lineSeparator() + "B", "\"A\nB\"",
testEscapeCharacterAfterPrintPropertiesAndRewrite("A" + System.lineSeparator() + "B", "\"A" + System.lineSeparator() + "B\"",
new PrinterOptions(CsvPrinterOptions.builder().build()));
}

Expand All @@ -158,8 +158,11 @@ public void shouldEscapeNewlineCharSetTrueAfterPrintPropertiesToCSVAndRewrite()

@Test
public void shouldEscapeNewlineSetTrueAfterPrintPropertiesToCSVAndRewrite() throws Exception {
testEscapeCharacterAfterPrintPropertiesAndRewrite("A" + System.lineSeparator() + "B",
"\"A\\nB\"",
String lineSep = System.lineSeparator();
String expectedEscaped = lineSep.equals("\r\n") ? "\"A\\r\\nB\"" : "\"A\\nB\"";

testEscapeCharacterAfterPrintPropertiesAndRewrite("A" + lineSep + "B",
expectedEscaped,
new PrinterOptions(CsvPrinterOptions.builder().setEscapeNewline(true).build()));
}

Expand Down Expand Up @@ -213,7 +216,7 @@ private void testEscapeCharacterAfterPrintPropertiesAndRewrite(String originalVa
// what CSVFormat read in from printed CSV should be the original value
if (printerOptions.csv().escapeNewline()){
// parsed record will contain escaped newline, to compare to original we have to unescape it
assertEquals(originalValue, record.get("property1").replace("\\n", "\n"));
assertEquals(originalValue, record.get("property1").replace("\\n", "\n").replace("\\r","\r"));
} else {
assertEquals(originalValue, record.get("property1"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,30 @@ public void shouldPrintMultiValueListAsArrayIrrespectiveOfWhetherMultiValueIsTru
stringWriter.toString());
}

@Test
public void shouldPrintNestedPropertiesMapAsJsonObject() throws Exception {
    // Register a schema entry for "property" on the "Entity" label.
    LabelSchema entitySchema = new LabelSchema(new Label("Entity"));
    PropertySchema declaredSchema =
            new PropertySchema("property", false, DataType.String, true, EnumSet.noneOf(DataType.class));
    entitySchema.put("property", declaredSchema);

    // The value supplied for "property" is itself a map of two entries.
    Map<?, ?> row = map(entry("property", map(
            entry("nestedProperty1", "value1"),
            entry("nestedProperty2", "value2"))));

    StringWriter json = new StringWriter();
    try (PropertyGraphPrinter printer = PropertyGraphExportFormat.json.createPrinter(
            new PrintOutputWriter("outputId", json), entitySchema, PrinterOptions.NULL_OPTIONS)) {
        printer.printStartRow();
        printer.printProperties(row);
        printer.printEndRow();
    }

    // The nested map must come out as a nested JSON object under "property".
    String expected = "{\"property\":{\"nestedProperty1\":\"value1\",\"nestedProperty2\":\"value2\"}}";
    assertEquals(expected, json.toString());
}

@Test
public void appendsPreviouslyUnseenValuesToObjectWhenInferringSchema() throws IOException {

Expand All @@ -197,10 +221,10 @@ public void appendsPreviouslyUnseenValuesToObjectWhenInferringSchema() throws IO
map(entry("fname", "fname5"), entry("lname", "lname5"), entry("age", 50))
);

String expectedOutput = "{\"fname\":\"fname1\"}\n" +
"{\"fname\":\"fname2\",\"lname\":\"lname2\"}\n" +
"{\"fname\":\"fname3\",\"age\":30}\n" +
"{\"lname\":\"lname4\",\"age\":40}\n" +
String expectedOutput = "{\"fname\":\"fname1\"}" + System.lineSeparator() +
"{\"fname\":\"fname2\",\"lname\":\"lname2\"}" + System.lineSeparator() +
"{\"fname\":\"fname3\",\"age\":30}" + System.lineSeparator() +
"{\"lname\":\"lname4\",\"age\":40}" + System.lineSeparator() +
"{\"fname\":\"fname5\",\"lname\":\"lname5\",\"age\":50}";

assertEquals(expectedOutput, stringWriter.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ public void appendsPreviouslyUnseenColumnsToEndOfRow() throws IOException {
map(entry("fname", "fname5"), entry("lname", "lname5"), entry("age", 50))
);

String expectedOutput = "\"fname1\"\n" +
"\"fname2\",\"lname2\"\n" +
"\"fname3\",,30\n" +
",\"lname4\",40\n" +
"\"fname5\",\"lname5\",50\n";
String expectedOutput = "\"fname1\"" + System.lineSeparator() +
"\"fname2\",\"lname2\"" + System.lineSeparator() +
"\"fname3\",,30" + System.lineSeparator() +
",\"lname4\",40" + System.lineSeparator() +
"\"fname5\",\"lname5\",50" + System.lineSeparator();

assertEquals(expectedOutput, stringWriter.toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void shouldNotEscapeNewlineChar(){
@Test
public void shouldNotEscapeNewline(){
    String separator = System.lineSeparator();
    String result = DataType.String.format("A" + separator + "B");
    // Without escaping, the platform line separator must pass through unchanged;
    // a hard-coded "\n" expectation would fail where the separator is CRLF.
    assertEquals("\"A" + separator + "B\"", result);
}

@Test
Expand All @@ -88,8 +88,11 @@ public void shouldEscapeNewlineCharIfEscapeNewlineSetToTrue(){

@Test
public void shouldEscapeNewlineIfEscapeNewlineSetToTrue(){
    // Fixed inputs instead of System.lineSeparator() so both styles are checked on every platform.
    String result1 = DataType.String.format("A\r\nB", true);
    assertEquals("\"A\\r\\nB\"", result1);

    String result2 = DataType.String.format("A\nB", true);
    assertEquals("\"A\\nB\"", result2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void testExecuteTupleQuerySelectAll() throws Exception {
// Test data does not have named graphs but a ?g binding is required by TupleQueryHandler
client.executeTupleQuery("SELECT * WHERE { BIND(<http://aws.amazon.com/neptune/csv2rdf/graph/version> AS ?g) ?s ?p ?o }", getMockTargetConfig(outputWriter));

assertEquals(testDataNTriples, outputWriter.toString());
assertEquals(testDataNTriples, outputWriter.toString().replace("\\r\\n", "\\n").replace("\\r", "\\n"));
}

@Test
Expand Down
Loading