Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,14 @@ When deployed as a Lambda function, _neptune-export_ will automatically copy the

The Lambda function expects a number of parameters, which you can supply either as [environment variables](https://docs.aws.amazon.com/lambda/latest/dg/env_variables.html) or via a JSON input parameter. Fields in the JSON input parameter override any environment variables you have set up.

| Environment Variable | JSON Field | Description ||
| ---- | ---- | ---- | ---- |
| `COMMAND` | `command` | _neptune-export_ command and command-line options: e.g. `export-pg -e <neptune_endpoint>` | Mandatory |
| `OUTPUT_S3_PATH` | `outputS3Path` | S3 location to which exported files will be written | Mandatory |
| `CONFIG_FILE_S3_PATH` | `configFileS3Path` | S3 location of a JSON config file to be used when exporting a property graph from a config file | Optional |
| `COMPLETION_FILE_S3_PATH` | `completionFileS3Path` | S3 location to which a completion file should be written once all export files have been copied to S3 | Optional |
| `SSE_KMS_KEY_ID` | `sseKmsKeyId` | ID of the customer managed AWS-KMS symmetric encryption key to used for server-side encryption when exporting to S3 | Optional |
| Environment Variable | JSON Field | Description ||
| ---- | ---- |-----------------------------------------------------------------------------------------------------------------------------------------| ---- |
| `COMMAND` | `command` | _neptune-export_ command and command-line options: e.g. `export-pg -e <neptune_endpoint>` | Mandatory |
| `OUTPUT_S3_PATH` | `outputS3Path` | S3 location to which exported files will be written | Mandatory |
| `CONFIG_FILE_S3_PATH` | `configFileS3Path` | S3 location of a JSON config file to be used when exporting a property graph from a config file | Optional |
| `COMPLETION_FILE_S3_PATH` | `completionFileS3Path` | S3 location to which a completion file should be written once all export files have been copied to S3 | Optional |
| `SSE_KMS_KEY_ID` | `sseKmsKeyId` | ID of the customer managed AWS-KMS symmetric encryption key to used for server-side encryption when exporting to S3 | Optional |
| `EXPECTED_BUCKET_OWNER` | `expectedBucketOwner` | Expected bucket owner account ID for S3 bucket verification. When provided, verifies the bucket is owned by the specified AWS account. If not provided, defaults to the account ID resolved from the credentials used for S3 operations. | Optional |

## Samples

Expand Down
9 changes: 9 additions & 0 deletions docs/export-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
[ "s3RoleExternalId" : <s3RoleExternalId>, ]
[ "s3RoleSessionName": <s3RoleSessionName>, ]
[ "sseKmsKeyId" : <sseKmsKeyId>, ]
[ "expectedBucketOwner" : <expectedBucketOwner>, ]
[ "uploadToS3OnError" : <uploadToS3OnError>, ]
}'

Expand Down Expand Up @@ -153,6 +154,14 @@
This option may occur a maximum of 1 times


"expectedBucketOwner" : <expectedBucketOwner>
Expected bucket owner account ID for S3 bucket verification. When provided,
verifies the bucket is owned by the specified AWS account. If not provided,
defaults to the account ID resolved from the credentials used for S3 operations.

This option may occur a maximum of 1 times


"uploadToS3OnError" : <uploadToS3OnError>
Set as True to upload partial results to Amazon S3 if the export job fails

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public static Tagging createObjectTags(Collection<String> profiles) {
private final AtomicReference<S3ObjectInfo> result = new AtomicReference<>();
private static final Pattern STATUS_CODE_5XX_PATTERN = Pattern.compile("Status Code: (5\\d+)");
private final String sseKmsKeyId;
private final String expectedBucketOwner;
private final AwsCredentialsProvider s3CredentialsProvider;

public ExportToS3NeptuneExportEventHandler(String localOutputPath,
Expand All @@ -128,6 +129,7 @@ public ExportToS3NeptuneExportEventHandler(String localOutputPath,
Collection<String> profiles,
Collection<CompletionFileWriter> completionFileWriters,
String sseKmsKeyId,
String expectedBucketOwner,
AwsCredentialsProvider s3CredentialsProvider) {
this.localOutputPath = localOutputPath;
this.outputS3Path = outputS3Path;
Expand All @@ -139,6 +141,7 @@ public ExportToS3NeptuneExportEventHandler(String localOutputPath,
this.profiles = profiles;
this.completionFileWriters = completionFileWriters;
this.sseKmsKeyId = sseKmsKeyId;
this.expectedBucketOwner = expectedBucketOwner;
this.s3CredentialsProvider = s3CredentialsProvider;
}

Expand Down Expand Up @@ -238,9 +241,8 @@ private void uploadGcLogToS3(S3TransferManager transferManager,
S3ObjectInfo gcLogS3ObjectInfo = outputS3ObjectInfo.withNewKeySuffix("gc.log");

try {

UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
.putObjectRequest(configureServerSideEncryption(PutObjectRequest.builder(), sseKmsKeyId)
.putObjectRequest(configureServerSideEncryption(PutObjectRequest.builder(), sseKmsKeyId, expectedBucketOwner)
.bucket(gcLogS3ObjectInfo.bucket())
.key(gcLogS3ObjectInfo.key())
.tagging(createObjectTags(profiles))
Expand Down Expand Up @@ -316,6 +318,7 @@ private void uploadCompletionFileToS3(S3TransferManager transferManager,
.putObjectRequest(PutObjectRequest.builder()
.bucket(completionFileS3ObjectInfo.bucket())
.key(completionFileS3ObjectInfo.key())
.expectedBucketOwner(expectedBucketOwner)
.metadata(S3ObjectInfo.createObjectMetadata(completionFile.length(), sseKmsKeyId))
.tagging(createObjectTags(profiles))
.build())
Expand Down Expand Up @@ -344,15 +347,15 @@ private void uploadExportFilesToS3(S3TransferManager transferManager, File direc
while (allowRetry){
try {
logger.info("Uploading export files to {}", outputS3ObjectInfo.toString());

UploadDirectoryRequest uploadRequest = UploadDirectoryRequest.builder()
.source(directory.toPath())
.bucket(outputS3ObjectInfo.bucket())
.s3Prefix(outputS3ObjectInfo.key())
.uploadFileRequestTransformer(builder -> {
UploadFileRequest built = builder.build();
PutObjectRequest.Builder newBuilder = built.putObjectRequest().toBuilder();
newBuilder = configureServerSideEncryption(newBuilder, sseKmsKeyId).tagging(createObjectTags(profiles));
newBuilder = configureServerSideEncryption(newBuilder, sseKmsKeyId, expectedBucketOwner).tagging(createObjectTags(profiles));
builder.putObjectRequest(newBuilder.build());
})
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
json.path("sseKmsKeyId").textValue() :
EnvironmentVariableUtils.getOptionalEnv("SSE_KMS_KEY_ID", "");

String expectedBucketOwner = json.has("expectedBucketOwner") ?
json.path("expectedBucketOwner").textValue() :
EnvironmentVariableUtils.getOptionalEnv("EXPECTED_BUCKET_OWNER", "");

boolean createExportSubdirectory = Boolean.parseBoolean(
json.has("createExportSubdirectory") ?
json.path("createExportSubdirectory").toString() :
Expand Down Expand Up @@ -134,6 +138,10 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co

AwsCredentialsProvider s3CredentialsProvider = getS3CredentialsProvider(json, params, s3Region);

String resolvedExpectedBucketOwner = StringUtils.isNotBlank(expectedBucketOwner) ?
expectedBucketOwner :
s3CredentialsProvider.resolveCredentials().accountId().orElse("");

logger.log("cmd : " + cmd);
logger.log("params : " + params.toPrettyString());
logger.log("outputS3Path : " + outputS3Path);
Expand All @@ -145,6 +153,7 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
logger.log("completionFileS3Path : " + completionFileS3Path);
logger.log("s3Region : " + s3Region);
logger.log("sseKmsKeyId : " + maskedKeyId);
logger.log("expectedBucketOwner : " + resolvedExpectedBucketOwner);
logger.log("completionFilePayload : " + completionFilePayload.toPrettyString());
logger.log("additionalParams : " + additionalParams.toPrettyString());
logger.log("maxFileDescriptorCount : " + maxFileDescriptorCount);
Expand Down Expand Up @@ -172,6 +181,7 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
s3Region,
maxFileDescriptorCount,
sseKmsKeyId,
resolvedExpectedBucketOwner,
s3CredentialsProvider);

S3ObjectInfo outputS3ObjectInfo = neptuneExportService.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class NeptuneExportService {
private final String s3Region;
private final int maxFileDescriptorCount;
private final String sseKmsKeyId;
private final String expectedBucketOwner;
private final AwsCredentialsProvider s3CredentialsProvider;

public NeptuneExportService(String cmd,
Expand All @@ -84,6 +85,7 @@ public NeptuneExportService(String cmd,
String s3Region,
int maxFileDescriptorCount,
String sseKmsKeyId,
String expectedBucketOwner,
AwsCredentialsProvider s3CredentialsProvider) {
this.cmd = cmd;
this.localOutputPath = localOutputPath;
Expand All @@ -101,6 +103,7 @@ public NeptuneExportService(String cmd,
this.s3Region = s3Region;
this.maxFileDescriptorCount = maxFileDescriptorCount;
this.sseKmsKeyId = sseKmsKeyId;
this.expectedBucketOwner = expectedBucketOwner;
this.s3CredentialsProvider = s3CredentialsProvider;
}

Expand Down Expand Up @@ -184,6 +187,7 @@ public S3ObjectInfo execute() throws IOException {
profiles,
completionFileWriters,
sseKmsKeyId,
expectedBucketOwner,
s3CredentialsProvider);

eventHandlerCollection.addHandler(exportToS3EventHandler);
Expand All @@ -207,6 +211,7 @@ public S3ObjectInfo execute() throws IOException {
args,
profiles,
sseKmsKeyId,
expectedBucketOwner,
s3CredentialsProvider);
eventHandlerCollection.addHandler(neptuneMlEventHandler);
} else {
Expand All @@ -219,6 +224,7 @@ public S3ObjectInfo execute() throws IOException {
args,
profiles,
sseKmsKeyId,
expectedBucketOwner,
s3CredentialsProvider);
eventHandlerCollection.addHandler(neptuneMlEventHandler);
}
Expand Down Expand Up @@ -256,6 +262,7 @@ private void checkS3OutputIsEmpty() {
.bucket(s3ObjectInfo.bucket())
.prefix(s3ObjectInfo.key())
.maxKeys(1)
.expectedBucketOwner(expectedBucketOwner)
.build()
);

Expand Down Expand Up @@ -294,6 +301,7 @@ private File downloadFile(S3TransferManager transferManager, String s3Path) {
.getObjectRequest(GetObjectRequest.builder()
.bucket(configFileS3ObjectInfo.bucket())
.key(configFileS3ObjectInfo.key())
.expectedBucketOwner(expectedBucketOwner)
.build())
.destination(file.toPath())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class NeptuneMachineLearningExportEventHandlerV1 implements NeptuneExport
private final boolean createExportSubdirectory;
private final PrinterOptions printerOptions;
private final String sseKmsKeyId;
private final String expectedBucketOwner;
private final AwsCredentialsProvider s3CredentialsProvider;

public NeptuneMachineLearningExportEventHandlerV1(String outputS3Path,
Expand All @@ -76,6 +77,7 @@ public NeptuneMachineLearningExportEventHandlerV1(String outputS3Path,
Args args,
Collection<String> profiles,
String sseKmsKeyId,
String expectedBucketOwner,
AwsCredentialsProvider s3CredentialsProvider) {
logger.info("Adding neptune_ml event handler");

Expand All @@ -95,6 +97,7 @@ public NeptuneMachineLearningExportEventHandlerV1(String outputS3Path,
this.profiles = profiles;
this.printerOptions = new PrinterOptions(csvPrinterOptions, jsonPrinterOptions);
this.sseKmsKeyId = sseKmsKeyId;
this.expectedBucketOwner = expectedBucketOwner;
this.s3CredentialsProvider = s3CredentialsProvider;
}

Expand Down Expand Up @@ -230,9 +233,10 @@ private void uploadTrainingJobConfigurationFileToS3(String filename,
try {
UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
.source(trainingJobConfigurationFile)
.putObjectRequest(configureServerSideEncryption(PutObjectRequest.builder(), sseKmsKeyId)
.putObjectRequest(configureServerSideEncryption(PutObjectRequest.builder(), sseKmsKeyId, expectedBucketOwner)
.bucket(s3ObjectInfo.bucket())
.key(s3ObjectInfo.key())
.expectedBucketOwner(expectedBucketOwner)
.tagging(ExportToS3NeptuneExportEventHandler.createObjectTags(profiles))
.build())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class NeptuneMachineLearningExportEventHandlerV2 implements NeptuneExport
private final PrinterOptions printerOptions;
private final boolean includeEdgeFeatures;
private final String sseKmsKeyId;
private final String expectedBucketOwner;
private final AwsCredentialsProvider s3CredentialsProvider;

public NeptuneMachineLearningExportEventHandlerV2(String outputS3Path,
Expand All @@ -79,6 +80,7 @@ public NeptuneMachineLearningExportEventHandlerV2(String outputS3Path,
Args args,
Collection<String> profiles,
String sseKmsKeyId,
String expectedBucketOwner,
AwsCredentialsProvider s3CredentialsProvider) {
logger.info("Adding neptune_ml event handler");

Expand All @@ -100,6 +102,7 @@ public NeptuneMachineLearningExportEventHandlerV2(String outputS3Path,
this.printerOptions = new PrinterOptions(csvPrinterOptions, jsonPrinterOptions);
this.includeEdgeFeatures = shouldIncludeEdgeFeatures(additionalParams);
this.sseKmsKeyId = sseKmsKeyId;
this.expectedBucketOwner = expectedBucketOwner;
this.s3CredentialsProvider = s3CredentialsProvider;
}

Expand Down Expand Up @@ -231,9 +234,10 @@ private void uploadTrainingJobConfigurationFileToS3(String filename,
try {
UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
.source(trainingJobConfigurationFile)
.putObjectRequest(configureServerSideEncryption(PutObjectRequest.builder(), sseKmsKeyId)
.putObjectRequest(configureServerSideEncryption(PutObjectRequest.builder(), sseKmsKeyId, expectedBucketOwner)
.bucket(s3ObjectInfo.bucket())
.key(s3ObjectInfo.key())
.expectedBucketOwner(expectedBucketOwner)
.tagging(ExportToS3NeptuneExportEventHandler.createObjectTags(profiles))
.build())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ private JsonNode getFromFile() throws IOException {
private JsonNode getFromS3() throws IOException {
S3ObjectInfo s3ObjectInfo = new S3ObjectInfo(resourcePath.toString());
S3Client s3 = S3Client.create();
try (InputStream stream = s3.getObject(GetObjectRequest.builder().bucket(s3ObjectInfo.bucket()).key(s3ObjectInfo.key()).build())){

try (InputStream stream = s3.getObject(GetObjectRequest.builder()
.bucket(s3ObjectInfo.bucket())
.key(s3ObjectInfo.key())
.build())){
return new ObjectMapper().readTree(stream);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ public String key() {
}

public static PutObjectRequest.Builder configureServerSideEncryption(PutObjectRequest.Builder putObjectRequestBuilder, String sseKmsKeyId) {
return configureServerSideEncryption(putObjectRequestBuilder, sseKmsKeyId, null);
}

public static PutObjectRequest.Builder configureServerSideEncryption(PutObjectRequest.Builder putObjectRequestBuilder, String sseKmsKeyId, String expectedBucketOwner) {
if (!StringUtils.isBlank(expectedBucketOwner)) {
putObjectRequestBuilder = putObjectRequestBuilder.expectedBucketOwner(expectedBucketOwner);
}
if (!StringUtils.isBlank(sseKmsKeyId)) {
return putObjectRequestBuilder
.serverSideEncryption(ServerSideEncryption.AWS_KMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void shouldThrowErrorIfDirectoryMissing() {
Collections.EMPTY_SET,
Collections.EMPTY_SET,
"",
"",
mock(AwsCredentialsProvider.class)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private NeptuneMachineLearningExportEventHandlerV1 createEmptyHandler() {
new Args(new String[]{}),
Collections.EMPTY_SET,
"",
"",
mock(AwsCredentialsProvider.class)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private NeptuneMachineLearningExportEventHandlerV2 createEmptyHandler() {
new Args(new String[]{}),
Collections.EMPTY_SET,
"",
"",
mock(AwsCredentialsProvider.class)
);
}
Expand Down
Loading
Loading