diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 344e30c4429e..e610aa056a7c 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -703,6 +703,12 @@ language governing permissions and limitations under the License. -->
1.7.0-SNAPSHOT
nar
+
+ org.apache.nifi
+ nifi-proxy-configuration-nar
+ 1.7.0-SNAPSHOT
+ nar
+
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml
index caa95dc74bf8..e41a921cbc11 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/pom.xml
@@ -76,6 +76,10 @@
nifi-utils
1.7.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-proxy-configuration-api
+
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index a0918128f632..285ed62975c0 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -28,6 +28,7 @@
import com.amazonaws.regions.Regions;
import java.io.File;
import java.io.IOException;
+import java.net.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -52,6 +53,8 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.credentials.provider.factory.CredentialPropertyDescriptors;
import org.apache.nifi.processors.aws.regions.AWSRegions;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService;
/**
@@ -92,6 +95,25 @@ public abstract class AbstractAWSProcessor customValidate(final ValidationContext va
problems.add(new ValidationResult.Builder().input("Proxy Host Port").valid(false).explanation("Both proxy host and port must be set").build());
}
+ ProxyConfiguration.validateProxySpec(validationContext, problems, PROXY_SPECS);
+
return problems;
}
@@ -192,11 +219,31 @@ protected ClientConfiguration createConfiguration(final ProcessContext context)
}
}
- if (context.getProperty(PROXY_HOST).isSet()) {
- String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
- config.setProxyHost(proxyHost);
- Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
- config.setProxyPort(proxyPort);
+ final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
+ if (context.getProperty(PROXY_HOST).isSet()) {
+ final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+ String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+ Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
+ String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
+ String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+ componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+ componentProxyConfig.setProxyServerHost(proxyHost);
+ componentProxyConfig.setProxyServerPort(proxyPort);
+ componentProxyConfig.setProxyUserName(proxyUsername);
+ componentProxyConfig.setProxyUserPassword(proxyPassword);
+ return componentProxyConfig;
+ }
+ return ProxyConfiguration.DIRECT_CONFIGURATION;
+ });
+
+ if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+ config.setProxyHost(proxyConfig.getProxyServerHost());
+ config.setProxyPort(proxyConfig.getProxyServerPort());
+
+ if (proxyConfig.hasCredential()) {
+ config.setProxyUsername(proxyConfig.getProxyUserName());
+ config.setProxyPassword(proxyConfig.getProxyUserPassword());
+ }
}
return config;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
index be8433018e06..f0273e5178c6 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
@@ -174,7 +174,7 @@ public ValidationResult validate(String subject, String input, ValidationContext
Collections.unmodifiableList(
Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, MAXIMUM, MINIMUM, SAMPLECOUNT, SUM, TIMESTAMP,
UNIT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE,
- TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT)
+ TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)
);
private volatile Set dynamicPropertyNames = new HashSet<>();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
index ee614a6039b0..3c9f73b45a5b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
@@ -74,7 +74,8 @@ public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor {
public static final List properties = Collections.unmodifiableList(
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
- CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+ CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
+ PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
@Override
protected List getSupportedPropertyDescriptors() {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
index 73c9f9a075c6..328dae66cfe3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
@@ -80,7 +80,8 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
public static final List properties = Collections.unmodifiableList(
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
- CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+ CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
+ PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
.description("FlowFiles are routed to not found relationship if key not found in the table").build();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
index 08a4b234118a..92e552a15937 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
@@ -84,7 +84,8 @@ public class PutDynamoDB extends AbstractWriteDynamoDBProcessor {
public static final List properties = Collections.unmodifiableList(
Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, RANGE_KEY_VALUE,
HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, DOCUMENT_CHARSET, BATCH_SIZE,
- REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+ REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
+ PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
/**
* Dyamodb max item size limit 400 kb
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
index 8c8856d66605..8110a9a47ee9 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
@@ -72,7 +72,7 @@ public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
public static final List properties = Collections.unmodifiableList(
Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
- PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE));
+ PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE));
/**
* Max buffer size 1 MB
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
index 14bdf828ca66..13aedfe46aa3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
@@ -80,7 +80,7 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor {
public static final List properties = Collections.unmodifiableList(
Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
- AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT, ENDPOINT_OVERRIDE));
+ AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE));
/** A random number generator for cases where partition key is not available */
protected Random randomParitionKeyGenerator = new Random();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java
index 6daf19b2b23d..b14836d21db9 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java
@@ -127,8 +127,8 @@ public class PutLambda extends AbstractAWSLambdaProcessor {
public static final long MAX_REQUEST_SIZE = 6 * 1000 * 1000;
public static final List properties = Collections.unmodifiableList(
- Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT
- ));
+ Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
+ PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
@Override
protected List getSupportedPropertyDescriptors() {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index f2de7946f3cb..7d2cce93eebf 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -59,7 +59,7 @@ public class DeleteS3Object extends AbstractS3Processor {
public static final List properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
- SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT));
+ SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
@Override
protected List getSupportedPropertyDescriptors() {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 61a3d0690117..122231ae5e63 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -76,7 +76,7 @@ public class FetchS3Object extends AbstractS3Processor {
public static final List properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
- SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT));
+ SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
@Override
protected List getSupportedPropertyDescriptors() {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index d988d168f6bb..0fd5ab7b6475 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -137,7 +137,8 @@ public class ListS3 extends AbstractS3Processor {
public static final List properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
- SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
+ SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
+ DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
public static final Set relationships = Collections.unmodifiableSet(
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 3dc31999e651..f856b8510994 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -209,7 +209,7 @@ public class PutS3Object extends AbstractS3Processor {
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE,
ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE,
- SERVER_SIDE_ENCRYPTION, PROXY_HOST, PROXY_HOST_PORT));
+ SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
final static String S3_BUCKET_KEY = "s3.bucket";
final static String S3_OBJECT_KEY = "s3.key";
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index 4a9324384b92..2acc38b2a4ea 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -77,7 +77,7 @@ public class PutSNS extends AbstractSNSProcessor {
public static final List properties = Collections.unmodifiableList(
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
- USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT));
+ USE_JSON_STRUCTURE, CHARACTER_ENCODING, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
public static final int MAX_SIZE = 256 * 1024;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
index 381046c29a90..2bde4e162ea1 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
@@ -22,6 +22,7 @@
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -141,7 +142,7 @@ public void testDeleteVersionFromExpressions() {
public void testGetPropertyDescriptors() throws Exception {
DeleteS3Object processor = new DeleteS3Object();
List pd = processor.getSupportedPropertyDescriptors();
- assertEquals("size should be eq", 20, pd.size());
+ assertEquals("size should be eq", 23, pd.size());
assertTrue(pd.contains(processor.ACCESS_KEY));
assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(processor.BUCKET));
@@ -160,5 +161,10 @@ public void testGetPropertyDescriptors() throws Exception {
assertTrue(pd.contains(processor.VERSION_ID));
assertTrue(pd.contains(processor.WRITE_ACL_LIST));
assertTrue(pd.contains(processor.WRITE_USER_LIST));
+ assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
+ assertTrue(pd.contains(processor.PROXY_HOST));
+ assertTrue(pd.contains(processor.PROXY_HOST_PORT));
+ assertTrue(pd.contains(processor.PROXY_USERNAME));
+ assertTrue(pd.contains(processor.PROXY_PASSWORD));
}
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 1ebf79bbd02d..bcfff236c420 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -24,6 +24,7 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -178,7 +179,7 @@ public void testGetObjectExceptionGoesToFailure() throws IOException {
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();
List pd = processor.getSupportedPropertyDescriptors();
- assertEquals("size should be eq", 14, pd.size());
+ assertEquals("size should be eq", 17, pd.size());
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(FetchS3Object.BUCKET));
@@ -191,5 +192,11 @@ public void testGetPropertyDescriptors() throws Exception {
assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(FetchS3Object.TIMEOUT));
assertTrue(pd.contains(FetchS3Object.VERSION_ID));
+ assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
+ assertTrue(pd.contains(FetchS3Object.PROXY_HOST));
+ assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT));
+ assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME));
+ assertTrue(pd.contains(FetchS3Object.PROXY_PASSWORD));
+
}
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index f5ce29122f66..48210b94d3bc 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -26,6 +26,7 @@
import org.apache.commons.lang3.time.DateUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -294,7 +295,7 @@ public void testListIgnoreByMinAge() throws IOException {
public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3();
List pd = processor.getSupportedPropertyDescriptors();
- assertEquals("size should be eq", 17, pd.size());
+ assertEquals("size should be eq", 20, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET));
@@ -305,11 +306,15 @@ public void testGetPropertyDescriptors() throws Exception {
assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
assertTrue(pd.contains(ListS3.TIMEOUT));
- assertTrue(pd.contains(ListS3.PROXY_HOST));
- assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
assertTrue(pd.contains(ListS3.DELIMITER));
assertTrue(pd.contains(ListS3.PREFIX));
assertTrue(pd.contains(ListS3.USE_VERSIONS));
+ assertTrue(pd.contains(ListS3.LIST_TYPE));
assertTrue(pd.contains(ListS3.MIN_AGE));
+ assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
+ assertTrue(pd.contains(ListS3.PROXY_HOST));
+ assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
+ assertTrue(pd.contains(ListS3.PROXY_USERNAME));
+ assertTrue(pd.contains(ListS3.PROXY_PASSWORD));
}
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index 0ee779240d52..747da00907e5 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -25,6 +25,7 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -150,7 +151,7 @@ public void testSignerOverrideOptions() {
public void testGetPropertyDescriptors() throws Exception {
PutS3Object processor = new PutS3Object();
List pd = processor.getSupportedPropertyDescriptors();
- assertEquals("size should be eq", 28, pd.size());
+ assertEquals("size should be eq", 31, pd.size());
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -172,6 +173,11 @@ public void testGetPropertyDescriptors() throws Exception {
assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST));
assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
+ assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
+ assertTrue(pd.contains(PutS3Object.PROXY_HOST));
+ assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));
+ assertTrue(pd.contains(PutS3Object.PROXY_USERNAME));
+ assertTrue(pd.contains(PutS3Object.PROXY_PASSWORD));
}
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 1bca21ae8b0a..d031bf58109a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -40,6 +40,10 @@
org.apache.nifi
nifi-record
+
+ org.apache.nifi
+ nifi-proxy-configuration-api
+
com.microsoft.azure
azure-eventhubs
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
index 5b2d8ce4a65c..2156b5654409 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -59,7 +59,8 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
- BLOB));
+ BLOB,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
private static final Set RELATIONSHIPS = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(
@@ -73,7 +74,9 @@ protected List getSupportedPropertyDescriptors() {
@Override
protected Collection customValidate(ValidationContext validationContext) {
- return AzureStorageUtils.validateCredentialProperties(validationContext);
+ final Collection results = AzureStorageUtils.validateCredentialProperties(validationContext);
+ AzureStorageUtils.validateProxySpec(validationContext, results);
+ return results;
}
@Override
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
index 2819f99ab289..a3f66d80c8f4 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
@@ -58,7 +59,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
CloudBlobContainer container = blobClient.getContainerReference(containerName);
CloudBlob blob = container.getBlockBlobReference(blobPath);
- blob.deleteIfExists();
+
+ final OperationContext operationContext = new OperationContext();
+ AzureStorageUtils.setProxy(operationContext, context);
+ blob.deleteIfExists(null, null, null, operationContext);
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
index b9bbf44bf450..2300cea472c0 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import com.microsoft.azure.storage.OperationContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -69,6 +70,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
CloudBlobContainer container = blobClient.getContainerReference(containerName);
+ final OperationContext operationContext = new OperationContext();
+ AzureStorageUtils.setProxy(operationContext, context);
+
final Map attributes = new HashMap<>();
final CloudBlob blob = container.getBlockBlobReference(blobPath);
@@ -76,7 +80,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
// distribution of download over threads, investigate
flowFile = session.write(flowFile, os -> {
try {
- blob.download(os);
+ blob.download(os, null, null, operationContext);
} catch (StorageException e) {
storedException.set(e);
throw new IOException(e);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index fd27958952c0..d9df136937ec 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
@@ -93,7 +94,8 @@ public class ListAzureBlobStorage extends AbstractListProcessor {
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
- PROP_PREFIX));
+ PROP_PREFIX,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override
protected List getSupportedPropertyDescriptors() {
@@ -102,7 +104,9 @@ protected List getSupportedPropertyDescriptors() {
@Override
protected Collection customValidate(ValidationContext validationContext) {
- return AzureStorageUtils.validateCredentialProperties(validationContext);
+ final Collection results = AzureStorageUtils.validateCredentialProperties(validationContext);
+ AzureStorageUtils.validateProxySpec(validationContext, results);
+ return results;
}
@Override
@@ -162,7 +166,10 @@ protected List performListing(final ProcessContext context, final Long
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
CloudBlobContainer container = blobClient.getContainerReference(containerName);
- for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) {
+ final OperationContext operationContext = new OperationContext();
+ AzureStorageUtils.setProxy(operationContext, context);
+
+ for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
if (blob instanceof CloudBlob) {
CloudBlob cloudBlob = (CloudBlob) blob;
BlobProperties properties = cloudBlob.getProperties();
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index abcb2ee6cbf6..f42e4b353246 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import com.microsoft.azure.storage.OperationContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -77,6 +78,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
CloudBlob blob = container.getBlockBlobReference(blobPath);
+ final OperationContext operationContext = new OperationContext();
+ AzureStorageUtils.setProxy(operationContext, context);
+
final Map attributes = new HashMap<>();
long length = flowFile.getSize();
session.read(flowFile, rawIn -> {
@@ -87,7 +91,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
try {
- blob.upload(in, length);
+ blob.upload(in, length, null, null, operationContext);
BlobProperties properties = blob.getProperties();
attributes.put("azure.container", containerName);
attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
index d6e510e31210..c3a2877b478f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.azure.storage.queue;
+import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
@@ -94,7 +95,7 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
private static final List properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
- BATCH_SIZE, VISIBILITY_TIMEOUT));
+ BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override
public List getSupportedPropertyDescriptors() {
@@ -122,7 +123,11 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
try {
cloudQueueClient = createCloudQueueClient(context, null);
cloudQueue = cloudQueueClient.getQueueReference(queue);
- retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
+
+ final OperationContext operationContext = new OperationContext();
+ AzureStorageUtils.setProxy(operationContext, context);
+
+ retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, operationContext);
} catch (URISyntaxException | StorageException e) {
getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
context.yield();
@@ -184,6 +189,8 @@ public Collection customValidate(final ValidationContext valid
.build());
}
+ AzureStorageUtils.validateProxySpec(validationContext, problems);
+
return problems;
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
index c289a746f11f..4172c8997408 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.azure.storage.queue;
+import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
@@ -70,7 +71,7 @@ public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
private static final List properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, TTL,
- QUEUE, VISIBILITY_DELAY));
+ QUEUE, VISIBILITY_DELAY, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override
public List getSupportedPropertyDescriptors() {
@@ -101,7 +102,11 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
try {
cloudQueueClient = createCloudQueueClient(context, flowFile);
cloudQueue = cloudQueueClient.getQueueReference(queue);
- cloudQueue.addMessage(message, ttl, delay, null, null);
+
+ final OperationContext operationContext = new OperationContext();
+ AzureStorageUtils.setProxy(operationContext, context);
+
+ cloudQueue.addMessage(message, ttl, delay, null, operationContext);
} catch (URISyntaxException | StorageException e) {
getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e});
flowFile = session.penalize(flowFile);
@@ -147,6 +152,8 @@ public Collection customValidate(final ValidationContext valid
}
}
+ AzureStorageUtils.validateProxySpec(validationContext, problems);
+
return problems;
}
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index 0a9696e81262..28212581889d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.azure.storage.utils;
import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.blob.CloudBlobClient;
@@ -29,6 +30,8 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
import java.net.URI;
import java.net.URISyntaxException;
@@ -162,4 +165,17 @@ public static Collection validateCredentialProperties(Validati
return results;
}
+
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS};
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
+ = ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
+
+ public static void validateProxySpec(ValidationContext context, Collection results) {
+ ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
+ }
+
+ public static void setProxy(final OperationContext operationContext, final ProcessContext processContext) {
+ final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(processContext);
+ operationContext.setProxy(proxyConfig.createProxy());
+ }
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
index 5ab046d11c80..d4157dda9195 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
@@ -56,6 +56,10 @@ language governing permissions and limitations under the License. -->
org.apache.nifi
nifi-record
+
+ org.apache.nifi
+ nifi-proxy-configuration-api
+
org.apache.commons
commons-lang3
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index b279346fc500..0c2a124db06a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -33,13 +33,14 @@
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import java.util.ArrayList;
@@ -136,25 +137,26 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String proper
.build();
}
- private static final List propertyDescriptors;
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS};
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
+ = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
+ static final List COMMON_PROPERTY_DESCRIPTORS;
static {
final List properties = new ArrayList<>();
properties.add(ES_URL);
+ properties.add(PROP_SSL_CONTEXT_SERVICE);
+ properties.add(USERNAME);
+ properties.add(PASSWORD);
+ properties.add(CONNECT_TIMEOUT);
+ properties.add(RESPONSE_TIMEOUT);
+ properties.add(PROXY_CONFIGURATION_SERVICE);
properties.add(PROXY_HOST);
properties.add(PROXY_PORT);
properties.add(PROXY_USERNAME);
properties.add(PROXY_PASSWORD);
- properties.add(RESPONSE_TIMEOUT);
- propertyDescriptors = Collections.unmodifiableList(properties);
- }
-
- @Override
- public List getSupportedPropertyDescriptors() {
- final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.addAll(propertyDescriptors);
- return properties;
+ COMMON_PROPERTY_DESCRIPTORS = Collections.unmodifiableList(properties);
}
@Override
@@ -164,28 +166,39 @@ protected void createElasticsearchClient(ProcessContext context) throws ProcessE
OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
// Add a proxy if set
- final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
- final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
- if (proxyHost != null && proxyPort != null) {
- final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
+ final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
+ final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+ final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
+ if (proxyHost != null && proxyPort != null) {
+ final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+ componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+ componentProxyConfig.setProxyServerHost(proxyHost);
+ componentProxyConfig.setProxyServerPort(proxyPort);
+ componentProxyConfig.setProxyUserName(context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue());
+ componentProxyConfig.setProxyUserPassword(context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue());
+ return componentProxyConfig;
+ }
+ return ProxyConfiguration.DIRECT_CONFIGURATION;
+ });
+
+ if (!Proxy.Type.DIRECT.equals(proxyConfig.getProxyType())) {
+ final Proxy proxy = proxyConfig.createProxy();
okHttpClient.proxy(proxy);
- }
- final String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
- final String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
-
- if (proxyUsername != null && proxyPassword != null){
- okHttpClient.proxyAuthenticator(new Authenticator() {
- @Override
- public Request authenticate(Route route, Response response) throws IOException {
- final String credential=Credentials.basic(proxyUsername, proxyPassword);
- return response.request().newBuilder()
- .header("Proxy-Authorization", credential)
- .build();
- }
- });
+ if (proxyConfig.hasCredential()){
+ okHttpClient.proxyAuthenticator(new Authenticator() {
+ @Override
+ public Request authenticate(Route route, Response response) throws IOException {
+ final String credential=Credentials.basic(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
+ return response.request().newBuilder()
+ .header("Proxy-Authorization", credential)
+ .build();
+ }
+ });
+ }
}
+
// Set timeouts
okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
@@ -208,8 +221,12 @@ protected Collection customValidate(ValidationContext validati
results.add(new ValidationResult.Builder()
.valid(false)
.explanation("Proxy Host and Proxy Port must be both set or empty")
+ .subject("Proxy server configuration")
.build());
}
+
+ ProxyConfiguration.validateProxySpec(validationContext, results, PROXY_SPECS);
+
return results;
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index e069c720b819..e782ba392afd 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -147,13 +147,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
_rels.add(REL_NOT_FOUND);
relationships = Collections.unmodifiableSet(_rels);
- final List descriptors = new ArrayList<>();
- descriptors.add(ES_URL);
- descriptors.add(PROP_SSL_CONTEXT_SERVICE);
- descriptors.add(USERNAME);
- descriptors.add(PASSWORD);
- descriptors.add(CONNECT_TIMEOUT);
- descriptors.add(RESPONSE_TIMEOUT);
+ final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(DOC_ID);
descriptors.add(INDEX);
descriptors.add(TYPE);
@@ -169,9 +163,7 @@ public Set getRelationships() {
@Override
public final List getSupportedPropertyDescriptors() {
- final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.addAll(propertyDescriptors);
- return properties;
+ return propertyDescriptors;
}
@OnScheduled
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index cd1092a0c466..ed9a51016ece 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -150,13 +150,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
_rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_rels);
- final List descriptors = new ArrayList<>();
- descriptors.add(ES_URL);
- descriptors.add(PROP_SSL_CONTEXT_SERVICE);
- descriptors.add(USERNAME);
- descriptors.add(PASSWORD);
- descriptors.add(CONNECT_TIMEOUT);
- descriptors.add(RESPONSE_TIMEOUT);
+ final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ID_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
@@ -174,9 +168,7 @@ public Set getRelationships() {
@Override
public final List getSupportedPropertyDescriptors() {
- final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.addAll(propertyDescriptors);
- return properties;
+ return propertyDescriptors;
}
@Override
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 9ea22bc9bd3b..3e796fb08df2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -189,13 +189,7 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
_rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_rels);
- final List descriptors = new ArrayList<>();
- descriptors.add(ES_URL);
- descriptors.add(PROP_SSL_CONTEXT_SERVICE);
- descriptors.add(USERNAME);
- descriptors.add(PASSWORD);
- descriptors.add(CONNECT_TIMEOUT);
- descriptors.add(RESPONSE_TIMEOUT);
+ final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(RECORD_READER);
descriptors.add(ID_RECORD_PATH);
descriptors.add(INDEX);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index b1f860f5e3f2..33eac3bc2c6d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -230,13 +230,7 @@ public enum QueryInfoRouteStrategy {
private QueryInfoRouteStrategy queryInfoRouteStrategy = QueryInfoRouteStrategy.NEVER;
static {
- final List descriptors = new ArrayList<>();
- descriptors.add(ES_URL);
- descriptors.add(PROP_SSL_CONTEXT_SERVICE);
- descriptors.add(USERNAME);
- descriptors.add(PASSWORD);
- descriptors.add(CONNECT_TIMEOUT);
- descriptors.add(RESPONSE_TIMEOUT);
+ final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(QUERY);
descriptors.add(PAGE_SIZE);
descriptors.add(INDEX);
@@ -257,9 +251,7 @@ public Set getRelationships() {
@Override
public final List getSupportedPropertyDescriptors() {
- final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.addAll(propertyDescriptors);
- return properties;
+ return propertyDescriptors;
}
@OnScheduled
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index f08e33cbea47..e90af7919dc7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -182,13 +182,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
_rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_rels);
- final List descriptors = new ArrayList<>();
- descriptors.add(ES_URL);
- descriptors.add(PROP_SSL_CONTEXT_SERVICE);
- descriptors.add(USERNAME);
- descriptors.add(PASSWORD);
- descriptors.add(CONNECT_TIMEOUT);
- descriptors.add(RESPONSE_TIMEOUT);
+ final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(QUERY);
descriptors.add(SCROLL_DURATION);
descriptors.add(PAGE_SIZE);
@@ -207,9 +201,7 @@ public Set getRelationships() {
@Override
public final List getSupportedPropertyDescriptors() {
- final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.addAll(propertyDescriptors);
- return properties;
+ return propertyDescriptors;
}
@OnScheduled
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index d3fa7f8743fe..1e7ffb49fcb8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -64,6 +64,10 @@
org.apache.nifi
nifi-record-serialization-service-api
+
+ org.apache.nifi
+ nifi-proxy-configuration-api
+
org.apache.nifi
nifi-record
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
index 488627445b52..4b4d207f1234 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
@@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -27,6 +28,8 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer;
@@ -63,6 +66,7 @@ protected List getSupportedPropertyDescriptors() {
properties.add(FTPTransfer.USE_COMPRESSION);
properties.add(FTPTransfer.CONNECTION_MODE);
properties.add(FTPTransfer.TRANSFER_MODE);
+ properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
@@ -76,4 +80,11 @@ protected List getSupportedPropertyDescriptors() {
protected FileTransfer createFileTransfer(final ProcessContext context) {
return new FTPTransfer(context, getLogger());
}
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final List results = new ArrayList<>();
+ FTPTransfer.validateProxySpec(validationContext, results);
+ return results;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 68465579f9d4..19cba94caa6a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -28,7 +29,10 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@@ -76,6 +80,12 @@ protected List getSupportedPropertyDescriptors() {
properties.add(SFTPTransfer.HOST_KEY_FILE);
properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
properties.add(SFTPTransfer.USE_COMPRESSION);
+ properties.add(SFTPTransfer.PROXY_CONFIGURATION_SERVICE);
+ properties.add(FTPTransfer.PROXY_TYPE);
+ properties.add(FTPTransfer.PROXY_HOST);
+ properties.add(FTPTransfer.PROXY_PORT);
+ properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
+ properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
return properties;
}
@@ -83,4 +93,11 @@ protected List getSupportedPropertyDescriptors() {
protected FileTransfer createFileTransfer(final ProcessContext context) {
return new SFTPTransfer(context, getLogger());
}
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final Collection results = new ArrayList<>();
+ SFTPTransfer.validateProxySpec(validationContext, results);
+ return results;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
index 1dd8dbf85d3e..79d7914a13ee 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -28,6 +29,8 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processors.standard.util.FTPTransfer;
@@ -76,6 +79,7 @@ protected void init(final ProcessorInitializationContext context) {
properties.add(FTPTransfer.MAX_SELECTS);
properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE);
properties.add(FTPTransfer.USE_NATURAL_ORDERING);
+ properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
@@ -95,4 +99,11 @@ protected List getSupportedPropertyDescriptors() {
protected FileTransfer getFileTransfer(final ProcessContext context) {
return new FTPTransfer(context, getLogger());
}
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final List results = new ArrayList<>();
+ FTPTransfer.validateProxySpec(validationContext, results);
+ return results;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index ec5fc2c19908..315bb2b011d7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@ -42,7 +42,6 @@
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
-import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -93,6 +92,7 @@
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@@ -100,6 +100,9 @@
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
+import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_HOST;
+import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_PORT;
+
@Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Fetches data from an HTTP or HTTPS URL and writes the data to the content of a FlowFile. Once the content has been fetched, the ETag and Last Modified "
@@ -195,18 +198,6 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
- public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
- .name("Proxy Host")
- .description("The fully qualified hostname or IP address of the proxy server")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
- .name("Proxy Port")
- .description("The port of the proxy server")
- .required(false)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .build();
public static final String DEFAULT_COOKIE_POLICY_STR = "default";
public static final String STANDARD_COOKIE_POLICY_STR = "standard";
@@ -268,6 +259,7 @@ protected void init(final ProcessorInitializationContext context) {
properties.add(ACCEPT_CONTENT_TYPE);
properties.add(FOLLOW_REDIRECTS);
properties.add(REDIRECT_COOKIE_POLICY);
+ properties.add(HTTPUtils.PROXY_CONFIGURATION_SERVICE);
properties.add(PROXY_HOST);
properties.add(PROXY_PORT);
this.properties = Collections.unmodifiableList(properties);
@@ -315,13 +307,7 @@ protected Collection customValidate(final ValidationContext co
.build());
}
- if (context.getProperty(PROXY_HOST).isSet() && !context.getProperty(PROXY_PORT).isSet()) {
- results.add(new ValidationResult.Builder()
- .explanation("Proxy Host was set but no Proxy Port was specified")
- .valid(false)
- .subject("Proxy server configuration")
- .build());
- }
+ HTTPUtils.validateProxyProperties(context, results);
return results;
}
@@ -456,22 +442,18 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
final String password = context.getProperty(PASSWORD).getValue();
// set the credentials if appropriate
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
if (username != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
if (password == null) {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
} else {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
}
- clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
// Set the proxy if specified
- if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) {
- final String host = context.getProperty(PROXY_HOST).getValue();
- final int port = context.getProperty(PROXY_PORT).asInteger();
- clientBuilder.setProxy(new HttpHost(host, port));
- }
+ HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
// create request
final HttpGet get = new HttpGet(url);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
index e155019eb776..6b891c7442a2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java
@@ -33,6 +33,7 @@
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@@ -81,6 +82,12 @@ protected void init(final ProcessorInitializationContext context) {
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
properties.add(SFTPTransfer.USE_COMPRESSION);
properties.add(SFTPTransfer.USE_NATURAL_ORDERING);
+ properties.add(SFTPTransfer.PROXY_CONFIGURATION_SERVICE);
+ properties.add(FTPTransfer.PROXY_TYPE);
+ properties.add(FTPTransfer.PROXY_HOST);
+ properties.add(FTPTransfer.PROXY_PORT);
+ properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
+ properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
this.properties = Collections.unmodifiableList(properties);
}
@@ -102,6 +109,8 @@ protected Collection customValidate(final ValidationContext co
.build());
}
+ SFTPTransfer.validateProxySpec(context, results);
+
return results;
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 7ec6a877d4bc..a45e93c7bde3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -59,6 +59,8 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ProxyAuthenticator;
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.stream.io.StreamUtils;
@@ -79,7 +81,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Proxy.Type;
import java.net.URL;
@@ -412,6 +413,10 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS};
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
+ = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
+
public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_METHOD,
PROP_URL,
@@ -423,6 +428,7 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_ATTRIBUTES_TO_SEND,
PROP_BASIC_AUTH_USERNAME,
PROP_BASIC_AUTH_PASSWORD,
+ PROXY_CONFIGURATION_SERVICE,
PROP_PROXY_HOST,
PROP_PROXY_PORT,
PROP_PROXY_TYPE,
@@ -565,6 +571,8 @@ protected Collection customValidate(final ValidationContext va
results.add(new ValidationResult.Builder().subject("SSL Context Service").valid(false).explanation("If Proxy Type is HTTPS, SSL Context Service must be set").build());
}
+ ProxyConfiguration.validateProxySpec(validationContext, results, PROXY_SPECS);
+
return results;
}
@@ -575,14 +583,30 @@ public void setUpClient(final ProcessContext context) throws IOException, Unreco
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
// Add a proxy if set
- final String proxyHost = context.getProperty(PROP_PROXY_HOST).evaluateAttributeExpressions().getValue();
- final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).evaluateAttributeExpressions().asInteger();
- final String proxyType = context.getProperty(PROP_PROXY_TYPE).evaluateAttributeExpressions().getValue();
- boolean isHttpsProxy = false;
- if (proxyHost != null && proxyPort != null) {
- final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
+ boolean isHttpsProxy = HTTPS.equals(context.getProperty(PROP_PROXY_TYPE).evaluateAttributeExpressions().getValue());
+ final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
+ final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+ final String proxyHost = context.getProperty(PROP_PROXY_HOST).evaluateAttributeExpressions().getValue();
+ final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).evaluateAttributeExpressions().asInteger();
+ if (proxyHost != null && proxyPort != null) {
+ componentProxyConfig.setProxyType(Type.HTTP);
+ componentProxyConfig.setProxyServerHost(proxyHost);
+ componentProxyConfig.setProxyServerPort(proxyPort);
+ final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).evaluateAttributeExpressions().getValue());
+ final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+ componentProxyConfig.setProxyUserName(proxyUsername);
+ componentProxyConfig.setProxyUserPassword(proxyPassword);
+ }
+ return componentProxyConfig;
+ });
+
+ final Proxy proxy = proxyConfig.createProxy();
+ if (!Type.DIRECT.equals(proxy.type())) {
okHttpClientBuilder.proxy(proxy);
- isHttpsProxy = HTTPS.equals(proxyType);
+ if (proxyConfig.hasCredential()) {
+ ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
+ okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
+ }
}
// configure ETag cache if enabled
@@ -691,7 +715,6 @@ private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder, SSLCo
private void setAuthenticator(OkHttpClient.Builder okHttpClientBuilder, ProcessContext context) {
final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
- final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).evaluateAttributeExpressions().getValue());
// If the username/password properties are set then check if digest auth is being used
if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
@@ -706,23 +729,8 @@ private void setAuthenticator(OkHttpClient.Builder okHttpClientBuilder, ProcessC
com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass);
final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials);
- if(!proxyUsername.isEmpty()) {
- final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
- ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyUsername, proxyPassword);
-
- okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
- }
-
okHttpClientBuilder.interceptors().add(new AuthenticationCacheInterceptor(authCache));
okHttpClientBuilder.authenticator(new CachingAuthenticatorDecorator(digestAuthenticator, authCache));
- } else {
- // Add proxy authentication only
- if(!proxyUsername.isEmpty()) {
- final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
- ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyUsername, proxyPassword);
-
- okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
- }
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
index 8deadb208275..79a4177dc684 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
@@ -29,6 +30,8 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer;
@@ -79,6 +82,7 @@ protected List getSupportedPropertyDescriptors() {
properties.add(FTPTransfer.DATA_TIMEOUT);
properties.add(FTPTransfer.CONNECTION_MODE);
properties.add(FTPTransfer.TRANSFER_MODE);
+ properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
@@ -105,4 +109,11 @@ protected Scope getStateScope(final ProcessContext context) {
// pick up where it left off, even if the Primary Node changes.
return Scope.CLUSTER;
}
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final List results = new ArrayList<>();
+ FTPTransfer.validateProxySpec(validationContext, results);
+ return results;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index b7805e9e88c5..ac1a42da34f5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -18,6 +18,7 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -30,8 +31,11 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@@ -83,6 +87,12 @@ protected List getSupportedPropertyDescriptors() {
properties.add(SFTPTransfer.DATA_TIMEOUT);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
+ properties.add(SFTPTransfer.PROXY_CONFIGURATION_SERVICE);
+ properties.add(FTPTransfer.PROXY_TYPE);
+ properties.add(FTPTransfer.PROXY_HOST);
+ properties.add(FTPTransfer.PROXY_PORT);
+ properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
+ properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
return properties;
}
@@ -102,4 +112,11 @@ protected Scope getStateScope(final ProcessContext context) {
// pick up where it left off, even if the Primary Node changes.
return Scope.CLUSTER;
}
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final Collection results = new ArrayList<>();
+ SFTPTransfer.validateProxySpec(validationContext, results);
+ return results;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
index 8bb24bffd615..ac30830df76c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java
@@ -19,7 +19,6 @@
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpException;
-import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope;
@@ -75,6 +74,7 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.ssl.SSLContextService;
@@ -127,6 +127,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_HOST;
+import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_PORT;
+
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"http", "https", "remote", "copy", "archive"})
@@ -243,18 +246,6 @@ public class PostHTTP extends AbstractProcessor {
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
- public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
- .name("Proxy Host")
- .description("The fully qualified hostname or IP address of the proxy server")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
- .name("Proxy Port")
- .description("The port of the proxy server")
- .required(false)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content-Type")
.description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. "
@@ -302,6 +293,7 @@ protected void init(final ProcessorInitializationContext context) {
properties.add(DATA_TIMEOUT);
properties.add(ATTRIBUTES_AS_HEADERS_REGEX);
properties.add(USER_AGENT);
+ properties.add(HTTPUtils.PROXY_CONFIGURATION_SERVICE);
properties.add(PROXY_HOST);
properties.add(PROXY_PORT);
properties.add(CONTENT_TYPE);
@@ -328,14 +320,6 @@ protected Collection customValidate(final ValidationContext co
.valid(false).subject("SSL Context").build());
}
- if (context.getProperty(PROXY_HOST).isSet() && !context.getProperty(PROXY_PORT).isSet()) {
- results.add(new ValidationResult.Builder()
- .explanation("Proxy Host was set but no Proxy Port was specified")
- .valid(false)
- .subject("Proxy server configuration")
- .build());
- }
-
boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
boolean chunkedSet = context.getProperty(CHUNKED_ENCODING).isSet();
@@ -345,6 +329,8 @@ protected Collection customValidate(final ValidationContext co
.explanation("if compression level is 0 and not sending as a FlowFile, then the \'" + CHUNKED_ENCODING.getName() + "\' property must be set").build());
}
+ HTTPUtils.validateProxyProperties(context, results);
+
return results;
}
@@ -535,22 +521,18 @@ public void process(final HttpResponse response, final HttpContext httpContext)
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
// set the credentials if appropriate
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
if (username != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
if (password == null) {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
} else {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
}
- clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
// Set the proxy if specified
- if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) {
- final String host = context.getProperty(PROXY_HOST).getValue();
- final int port = context.getProperty(PROXY_PORT).asInteger();
- clientBuilder.setProxy(new HttpHost(host, port));
- }
+ HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);
client = clientBuilder.build();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java
index d4b11fce5468..50ae599b6f14 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -36,6 +37,8 @@
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -89,6 +92,7 @@ protected void init(final ProcessorInitializationContext context) {
properties.add(FTPTransfer.LAST_MODIFIED_TIME);
properties.add(FTPTransfer.PERMISSIONS);
properties.add(FTPTransfer.USE_COMPRESSION);
+ properties.add(FTPTransfer.PROXY_CONFIGURATION_SERVICE);
properties.add(FTPTransfer.PROXY_TYPE);
properties.add(FTPTransfer.PROXY_HOST);
properties.add(FTPTransfer.PROXY_PORT);
@@ -163,4 +167,11 @@ private List getCommands(final List descriptors, fin
return cmds;
}
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final List results = new ArrayList<>();
+ FTPTransfer.validateProxySpec(validationContext, results);
+ return results;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
index fbaeba455ba9..35bb174a70e9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -28,8 +29,11 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
@SupportsBatching
@@ -70,6 +74,12 @@ protected void init(final ProcessorInitializationContext context) {
properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
properties.add(SFTPTransfer.USE_COMPRESSION);
+ properties.add(SFTPTransfer.PROXY_CONFIGURATION_SERVICE);
+ properties.add(FTPTransfer.PROXY_TYPE);
+ properties.add(FTPTransfer.PROXY_HOST);
+ properties.add(FTPTransfer.PROXY_PORT);
+ properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
+ properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
this.properties = Collections.unmodifiableList(properties);
}
@@ -83,4 +93,10 @@ protected SFTPTransfer getFileTransfer(final ProcessContext context) {
return new SFTPTransfer(context, getLogger());
}
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final Collection results = new ArrayList<>();
+ SFTPTransfer.validateProxySpec(validationContext, results);
+ return results;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 91a5ac296755..e2ddf6b902f9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -28,10 +28,12 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.commons.net.ftp.FTPClient;
@@ -39,6 +41,9 @@
import org.apache.commons.net.ftp.FTPHTTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -46,6 +51,8 @@
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
public class FTPTransfer implements FileTransfer {
@@ -88,22 +95,26 @@ public class FTPTransfer implements FileTransfer {
.name("Proxy Host")
.description("The fully qualified hostname or IP address of the proxy server")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
.name("Proxy Port")
.description("The port of the proxy server")
.addValidator(StandardValidators.PORT_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("Http Proxy Username")
.description("Http Proxy Username")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.build();
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("Http Proxy Password")
.description("Http Proxy Password")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.sensitive(true)
.build();
@@ -123,6 +134,10 @@ public class FTPTransfer implements FileTransfer {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS};
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
+ = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
+
private final ComponentLog logger;
private final ProcessContext ctx;
@@ -136,6 +151,10 @@ public FTPTransfer(final ProcessContext context, final ComponentLog logger) {
this.logger = logger;
}
+ public static void validateProxySpec(ValidationContext context, Collection results) {
+ ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
+ }
+
@Override
public String getProtocolName() {
return "ftp";
@@ -522,12 +541,15 @@ private FTPClient getClient(final FlowFile flowFile) throws IOException {
}
}
- final Proxy.Type proxyType = Proxy.Type.valueOf(ctx.getProperty(PROXY_TYPE).getValue());
- final String proxyHost = ctx.getProperty(PROXY_HOST).getValue();
- final Integer proxyPort = ctx.getProperty(PROXY_PORT).asInteger();
+ final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
+
+ final Proxy.Type proxyType = proxyConfig.getProxyType();
+ final String proxyHost = proxyConfig.getProxyServerHost();
+ final Integer proxyPort = proxyConfig.getProxyServerPort();
+
FTPClient client;
if (proxyType == Proxy.Type.HTTP) {
- client = new FTPHTTPClient(proxyHost, proxyPort, ctx.getProperty(HTTP_PROXY_USERNAME).getValue(), ctx.getProperty(HTTP_PROXY_PASSWORD).getValue());
+ client = new FTPHTTPClient(proxyHost, proxyPort, proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
} else {
client = new FTPClient();
if (proxyType == Proxy.Type.SOCKS) {
@@ -627,4 +649,17 @@ protected int numberPermissions(String perms) {
}
return number;
}
+
+ public static Supplier createComponentProxyConfigSupplier(final PropertyContext ctx) {
+ return () -> {
+ final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+ componentProxyConfig.setProxyType(Proxy.Type.valueOf(ctx.getProperty(PROXY_TYPE).getValue()));
+ componentProxyConfig.setProxyServerHost(ctx.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue());
+ componentProxyConfig.setProxyServerPort(ctx.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger());
+ componentProxyConfig.setProxyUserName(ctx.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue());
+ componentProxyConfig.setProxyUserPassword(ctx.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue());
+ return componentProxyConfig;
+ };
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPUtils.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPUtils.java
index d9aeb675575b..dcbad7d40a38 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPUtils.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPUtils.java
@@ -297,4 +297,5 @@ public void setTransferMode(final String val) {
}
}
+
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java
index 937554d70e6e..6916fa78dc84 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/HTTPUtils.java
@@ -16,6 +16,21 @@
*/
package org.apache.nifi.processors.standard.util;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
+
+import java.net.Proxy;
+import java.util.Collection;
import java.util.Map;
public class HTTPUtils {
@@ -39,4 +54,63 @@ public static String getURI(Map map) {
}
}
+ public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
+ .name("Proxy Host")
+ .description("The fully qualified hostname or IP address of the proxy server")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
+ .name("Proxy Port")
+ .description("The port of the proxy server")
+ .required(false)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
+ = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
+
+
+ public static void setProxy(final ProcessContext context, final HttpClientBuilder clientBuilder, final CredentialsProvider credentialsProvider) {
+ // Set the proxy if specified
+ final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
+ if (context.getProperty(PROXY_HOST).isSet() && context.getProperty(PROXY_PORT).isSet()) {
+ final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+ final String host = context.getProperty(PROXY_HOST).getValue();
+ final int port = context.getProperty(PROXY_PORT).asInteger();
+ componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+ componentProxyConfig.setProxyServerHost(host);
+ componentProxyConfig.setProxyServerPort(port);
+ return componentProxyConfig;
+ }
+ return ProxyConfiguration.DIRECT_CONFIGURATION;
+ });
+
+ if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+ final String host = proxyConfig.getProxyServerHost();
+ final int port = proxyConfig.getProxyServerPort();
+ clientBuilder.setProxy(new HttpHost(host, port));
+
+ if (proxyConfig.hasCredential()) {
+ final AuthScope proxyAuthScope = new AuthScope(host, port);
+ final UsernamePasswordCredentials proxyCredential
+ = new UsernamePasswordCredentials(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
+ credentialsProvider.setCredentials(proxyAuthScope, proxyCredential);
+ }
+ }
+ }
+
+ public static void validateProxyProperties(ValidationContext context, Collection results) {
+ if (context.getProperty(PROXY_HOST).isSet() && !context.getProperty(PROXY_PORT).isSet()) {
+ results.add(new ValidationResult.Builder()
+ .explanation("Proxy Host was set but no Proxy Port was specified")
+ .valid(false)
+ .subject("Proxy server configuration")
+ .build());
+ }
+
+ ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index c11a53b9cf6e..4fc94ec9985f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -25,6 +25,7 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Locale;
@@ -33,13 +34,18 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import com.jcraft.jsch.ProxySOCKS5;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.proxy.ProxySpec;
import org.slf4j.LoggerFactory;
import com.jcraft.jsch.ChannelSftp;
@@ -47,9 +53,12 @@
import com.jcraft.jsch.ChannelSftp.LsEntrySelector;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.ProxyHTTP;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
+import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
+
public class SFTPTransfer implements FileTransfer {
public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder()
@@ -96,6 +105,7 @@ public class SFTPTransfer implements FileTransfer {
.required(true)
.build();
+
/**
* Property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling
* {@link ChannelSftp#mkdir(String)}. In most cases, the code should call ls before mkdir, but some weird permission setups (chmod 100) on a directory would cause the 'ls' to throw a permission
@@ -116,6 +126,10 @@ public class SFTPTransfer implements FileTransfer {
.defaultValue("false")
.build();
+ private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS_AUTH};
+ public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
+ = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
+
private final ComponentLog logger;
private final ProcessContext ctx;
@@ -134,6 +148,10 @@ public SFTPTransfer(final ProcessContext processContext, final ComponentLog logg
disableDirectoryListing = disableListing == null ? false : Boolean.TRUE.equals(disableListing.asBoolean());
}
+ public static void validateProxySpec(ValidationContext context, Collection results) {
+ ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
+ }
+
@Override
public String getProtocolName() {
return "sftp";
@@ -418,6 +436,26 @@ protected ChannelSftp getChannel(final FlowFile flowFile) throws IOException {
ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
+ final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
+ switch (proxyConfig.getProxyType()) {
+ case HTTP:
+ final ProxyHTTP proxyHTTP = new ProxyHTTP(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort());
+ // Check if Username is set and populate the proxy accordingly
+ if (proxyConfig.hasCredential()) {
+ proxyHTTP.setUserPasswd(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
+ }
+ session.setProxy(proxyHTTP);
+ break;
+ case SOCKS:
+ final ProxySOCKS5 proxySOCKS5 = new ProxySOCKS5(proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort());
+ if (proxyConfig.hasCredential()) {
+ proxySOCKS5.setUserPasswd(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
+ }
+ session.setProxy(proxySOCKS5);
+ break;
+
+ }
+
final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();
if (hostKeyVal != null) {
jsch.setKnownHosts(hostKeyVal);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/pom.xml
new file mode 100644
index 000000000000..2a8ac038656c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/pom.xml
@@ -0,0 +1,43 @@
+
+
+
+
+ nifi-standard-services
+ org.apache.nifi
+ 1.7.0-SNAPSHOT
+
+ 4.0.0
+
+ nifi-proxy-configuration-api
+ jar
+
+
+
+ org.apache.nifi
+ nifi-api
+
+
+
+ org.apache.nifi
+ nifi-mock
+ 1.7.0-SNAPSHOT
+ test
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java
new file mode 100644
index 000000000000..e6d498c20c55
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.proxy;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
+import static org.apache.nifi.proxy.ProxySpec.HTTP;
+import static org.apache.nifi.proxy.ProxySpec.HTTP_AUTH;
+import static org.apache.nifi.proxy.ProxySpec.SOCKS;
+import static org.apache.nifi.proxy.ProxySpec.SOCKS_AUTH;
+
+public class ProxyConfiguration {
+
+ public static final ProxyConfiguration DIRECT_CONFIGURATION = new ProxyConfiguration();
+
+ public static PropertyDescriptor createProxyConfigPropertyDescriptor(final boolean hasComponentProxyConfigs, final ProxySpec ... _specs) {
+
+ final Set specs = getUniqueProxySpecs(_specs);
+
+ final StringBuilder description = new StringBuilder("Specifies the Proxy Configuration Controller Service to proxy network requests.");
+ if (hasComponentProxyConfigs) {
+ description.append(" If set, it supersedes proxy settings configured per component.");
+ }
+ description.append(" Supported proxies: ");
+ description.append(specs.stream().map(ProxySpec::getDisplayName).collect(Collectors.joining(", ")));
+
+ return new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)
+ .description(description.toString())
+ .build();
+ }
+
+ /**
+ * Remove redundancy. If X_AUTH is supported, then X should be supported, too.
+ * @param _specs original specs
+ * @return sorted unique specs
+ */
+ private static Set getUniqueProxySpecs(ProxySpec ... _specs) {
+ final Set specs = Arrays.stream(_specs).sorted().collect(Collectors.toSet());
+ if (specs.contains(HTTP_AUTH)) {
+ specs.remove(HTTP);
+ }
+ if (specs.contains(SOCKS_AUTH)) {
+ specs.remove(SOCKS);
+ }
+ return specs;
+ }
+
+ /**
+ * This method can be used from customValidate method of components using this Controller Service
+ * to validate the service is configured with the supported proxy types.
+ * @param context the validation context
+ * @param results if validation fails, an invalid validation result will be added to this collection
+ * @param _specs specify supported proxy specs
+ */
+ public static void validateProxySpec(ValidationContext context, Collection results, final ProxySpec ... _specs) {
+
+ final Set specs = getUniqueProxySpecs(_specs);
+ final Set supportedProxyTypes = specs.stream().map(ProxySpec::getProxyType).collect(Collectors.toSet());
+
+ if (!context.getProperty(PROXY_CONFIGURATION_SERVICE).isSet()) {
+ return;
+ }
+
+ final ProxyConfigurationService proxyService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
+ final ProxyConfiguration proxyConfiguration = proxyService.getConfiguration();
+ final Proxy.Type proxyType = proxyConfiguration.getProxyType();
+
+ if (proxyType.equals(Proxy.Type.DIRECT)) {
+ return;
+ }
+
+ if (!supportedProxyTypes.contains(proxyType)) {
+ results.add(new ValidationResult.Builder()
+ .explanation(String.format("Proxy type %s is not supported.", proxyType))
+ .valid(false)
+ .subject(PROXY_CONFIGURATION_SERVICE.getDisplayName())
+ .build());
+
+ // If the proxy type is not supported, no need to do further validation.
+ return;
+ }
+
+ if (proxyConfiguration.hasCredential()) {
+ // If credential is set, check whether the component is capable to use it.
+ if (!specs.contains(Proxy.Type.HTTP.equals(proxyType) ? HTTP_AUTH : SOCKS_AUTH)) {
+ results.add(new ValidationResult.Builder()
+ .explanation(String.format("Proxy type %s with Authentication is not supported.", proxyType))
+ .valid(false)
+ .subject(PROXY_CONFIGURATION_SERVICE.getDisplayName())
+ .build());
+ }
+ }
+
+
+ }
+
+ /**
+ * A convenient method to get ProxyConfiguration instance from a PropertyContext.
+ * @param context the process context
+ * @return The proxy configurations at Controller Service if set, or DIRECT_CONFIGURATION
+ */
+ public static ProxyConfiguration getConfiguration(PropertyContext context) {
+ return getConfiguration(context, () -> DIRECT_CONFIGURATION);
+ }
+
+ /**
+ * This method can be used by Components those originally have per component proxy configurations
+ * to implement ProxyConfiguration Controller Service with backward compatibility.
+ * @param context the process context
+ * @param perComponentSetting the function to supply ProxyConfiguration based on per component settings,
+ * only called when Proxy Configuration Service is not set
+ * @return The proxy configurations at Controller Service if set, or per component settings otherwise
+ */
+ public static ProxyConfiguration getConfiguration(PropertyContext context, Supplier perComponentSetting) {
+ if (context.getProperty(PROXY_CONFIGURATION_SERVICE).isSet()) {
+ final ProxyConfigurationService proxyService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
+ return proxyService.getConfiguration();
+ } else {
+ return perComponentSetting.get();
+ }
+ }
+
+ private Proxy.Type proxyType = Proxy.Type.DIRECT;
+ private String proxyServerHost;
+ private Integer proxyServerPort;
+ private String proxyUserName;
+ private String proxyUserPassword;
+
+ public Proxy.Type getProxyType() {
+ return proxyType;
+ }
+
+ public void setProxyType(Proxy.Type proxyType) {
+ this.proxyType = proxyType;
+ }
+
+ public String getProxyServerHost() {
+ return proxyServerHost;
+ }
+
+ public void setProxyServerHost(String proxyServerHost) {
+ this.proxyServerHost = proxyServerHost;
+ }
+
+ public Integer getProxyServerPort() {
+ return proxyServerPort;
+ }
+
+ public void setProxyServerPort(Integer proxyServerPort) {
+ this.proxyServerPort = proxyServerPort;
+ }
+
+ public boolean hasCredential() {
+ return proxyUserName != null && !proxyUserName.isEmpty();
+ }
+
+ public String getProxyUserName() {
+ return proxyUserName;
+ }
+
+ public void setProxyUserName(String proxyUserName) {
+ this.proxyUserName = proxyUserName;
+ }
+
+ public String getProxyUserPassword() {
+ return proxyUserPassword;
+ }
+
+ public void setProxyUserPassword(String proxyUserPassword) {
+ this.proxyUserPassword = proxyUserPassword;
+ }
+
+ /**
+ * Create a Proxy instance based on proxy type, proxy server host and port.
+ */
+ public Proxy createProxy() {
+ return Proxy.Type.DIRECT.equals(proxyType) ? Proxy.NO_PROXY : new Proxy(proxyType, new InetSocketAddress(proxyServerHost, proxyServerPort));
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfigurationService.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfigurationService.java
new file mode 100644
index 000000000000..8e6594bf3635
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfigurationService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.proxy;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * Provides configurations to access a Proxy server.
+ */
+public interface ProxyConfigurationService extends ControllerService {
+
+ PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new PropertyDescriptor.Builder()
+ .name("proxy-configuration-service")
+ .displayName("Proxy Configuration Service")
+ .description("Specifies the Proxy Configuration Controller Service to proxy network requests." +
+ " If set, it supersedes proxy settings configured per component.")
+ .identifiesControllerService(ProxyConfigurationService.class)
+ .required(false)
+ .build();
+
+ /**
+ * Returns proxy configurations.
+ * Implementations should return a non-null ProxyConfiguration instance which returns DIRECT proxy type instead of returning null,
+ * when underlying configuration or initialization is not done yet.
+ * @return A ProxyConfiguration instance.
+ */
+ ProxyConfiguration getConfiguration();
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxySpec.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxySpec.java
new file mode 100644
index 000000000000..06783c2cb127
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxySpec.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.proxy;
+
+import java.net.Proxy;
+
+public enum ProxySpec {
+
+ HTTP(Proxy.Type.HTTP, "HTTP"),
+ HTTP_AUTH(Proxy.Type.HTTP, "HTTP + AuthN"),
+ SOCKS(Proxy.Type.SOCKS, "SOCKS"),
+ SOCKS_AUTH(Proxy.Type.SOCKS, "SOCKS + AuthN");
+
+ private Proxy.Type proxyType;
+ private String displayName;
+
+ ProxySpec(Proxy.Type type, String displayName) {
+ this.proxyType = type;
+ this.displayName = displayName;
+ }
+
+ public Proxy.Type getProxyType() {
+ return proxyType;
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/test/java/org/apache/nifi/proxy/TestProxyConfiguration.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/test/java/org/apache/nifi/proxy/TestProxyConfiguration.java
new file mode 100644
index 000000000000..2cd4fb249f1f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/test/java/org/apache/nifi/proxy/TestProxyConfiguration.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.proxy;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.net.Proxy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.nifi.proxy.ProxyConfiguration.DIRECT_CONFIGURATION;
+import static org.apache.nifi.proxy.ProxyConfiguration.createProxyConfigPropertyDescriptor;
+import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
+import static org.apache.nifi.proxy.ProxySpec.HTTP;
+import static org.apache.nifi.proxy.ProxySpec.HTTP_AUTH;
+import static org.apache.nifi.proxy.ProxySpec.SOCKS;
+import static org.apache.nifi.proxy.ProxySpec.SOCKS_AUTH;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestProxyConfiguration {
+
+ private static class ComponentUsingProxy extends AbstractProcessor {
+
+ private ProxySpec[] proxySpecs;
+
+ private void setProxySpecs(ProxySpec ... proxySpecs) {
+ this.proxySpecs = proxySpecs;
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return Collections.singletonList(createProxyConfigPropertyDescriptor(true, proxySpecs));
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ }
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final List results = new ArrayList<>();
+ ProxyConfiguration.validateProxySpec(validationContext, results, proxySpecs);
+ return results;
+ }
+ }
+
+ private static final ProxyConfiguration HTTP_CONFIG = new ProxyConfiguration();
+ private static final ProxyConfiguration SOCKS_CONFIG = new ProxyConfiguration();
+ private static final ProxyConfiguration HTTP_AUTH_CONFIG = new ProxyConfiguration();
+ private static final ProxyConfiguration SOCKS_AUTH_CONFIG = new ProxyConfiguration();
+
+ static {
+ HTTP_CONFIG.setProxyType(Proxy.Type.HTTP);
+
+ HTTP_AUTH_CONFIG.setProxyType(Proxy.Type.HTTP);
+ HTTP_AUTH_CONFIG.setProxyUserName("proxy-user");
+ HTTP_AUTH_CONFIG.setProxyUserPassword("proxy-password");
+
+ SOCKS_CONFIG.setProxyType(Proxy.Type.SOCKS);
+
+ SOCKS_AUTH_CONFIG.setProxyType(Proxy.Type.SOCKS);
+ SOCKS_AUTH_CONFIG.setProxyUserName("proxy-user");
+ SOCKS_AUTH_CONFIG.setProxyUserPassword("proxy-password");
+ }
+
+ private void testValidateProxySpec(final boolean[] expectations, ProxySpec ... specs) throws InitializationException {
+ final String serviceId = "proxyConfigurationService";
+ final ProxyConfigurationService service = mock(ProxyConfigurationService.class);
+ when(service.getIdentifier()).thenReturn(serviceId);
+ when(service.getConfiguration()).thenReturn(DIRECT_CONFIGURATION, HTTP_CONFIG, HTTP_AUTH_CONFIG, SOCKS_CONFIG, SOCKS_AUTH_CONFIG);
+
+
+ final ComponentUsingProxy processor = new ComponentUsingProxy();
+ processor.setProxySpecs(specs);
+
+ final TestRunner testRunner = TestRunners.newTestRunner(processor);
+ testRunner.addControllerService(serviceId, service);
+ testRunner.enableControllerService(service);
+ testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
+
+ for (boolean expectation : expectations) {
+ if (expectation) {
+ testRunner.assertValid();
+ } else {
+ testRunner.assertNotValid();
+ }
+ }
+ }
+
+ @Test
+ public void testHTTP() throws Exception {
+ // DEFAULT, HTTP
+ testValidateProxySpec(new boolean[] {true, true, false, false, false}, HTTP);
+
+ }
+
+ @Test
+ public void testHTTPAuth() throws Exception {
+ // DEFAULT, HTTP, HTTP_AUTH
+ testValidateProxySpec(new boolean[] {true, true, true, false, false}, HTTP_AUTH);
+ }
+
+ @Test
+ public void testHTTP_HTTPAuth() throws Exception {
+ // DEFAULT, HTTP, HTTP_AUTH
+ testValidateProxySpec(new boolean[] {true, true, true, false, false}, HTTP, HTTP_AUTH);
+ }
+
+ @Test
+ public void testSOCKS() throws Exception {
+ // DEFAULT, SOCKS
+ testValidateProxySpec(new boolean[] {true, false, false, true, false}, SOCKS);
+ }
+
+ @Test
+ public void testSOCKSAuth() throws Exception {
+ // DEFAULT, SOCKS, SOCKS_AUTH
+ testValidateProxySpec(new boolean[] {true, false, false, true, true}, SOCKS_AUTH);
+ }
+
+ @Test
+ public void testSOCKS_SOCKSAuth() throws Exception {
+ // DEFAULT, SOCKS, SOCKS_AUTH
+ testValidateProxySpec(new boolean[] {true, false, false, true, true}, SOCKS, SOCKS_AUTH);
+ }
+
+ @Test
+ public void testHTTPAuth_SOCKS() throws Exception {
+ // DEFAULT, HTTP, HTTP_AUTH, SOCKS
+ testValidateProxySpec(new boolean[] {true, true, true, true, false}, HTTP_AUTH, SOCKS);
+ }
+
+ @Test
+ public void testHTTPAuth_SOCKSAuth() throws Exception {
+ // DEFAULT, HTTP, HTTP_AUTH, SOCKS, SOCKS_AUTH
+ testValidateProxySpec(new boolean[] {true, true, true, true, true}, HTTP_AUTH, SOCKS_AUTH);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration-nar/pom.xml
new file mode 100644
index 000000000000..6d1cab8e79f8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration-nar/pom.xml
@@ -0,0 +1,42 @@
+
+
+
+
+ nifi-proxy-configuration-bundle
+ org.apache.nifi
+ 1.7.0-SNAPSHOT
+
+ 4.0.0
+
+ nifi-proxy-configuration-nar
+ nar
+
+
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ 1.7.0-SNAPSHOT
+ nar
+
+
+ org.apache.nifi
+ nifi-proxy-configuration
+ 1.7.0-SNAPSHOT
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/pom.xml
new file mode 100644
index 000000000000..0186277b00b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+
+ nifi-proxy-configuration-bundle
+ org.apache.nifi
+ 1.7.0-SNAPSHOT
+
+ 4.0.0
+
+ nifi-proxy-configuration
+ jar
+
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-proxy-configuration-api
+
+
+ org.apache.nifi
+ nifi-standard-utils
+ 1.7.0-SNAPSHOT
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java
new file mode 100644
index 000000000000..a3b3f516b33a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.proxy;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.net.Proxy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+@CapabilityDescription("Provides a set of configurations for different NiFi components to use a proxy server.")
+@Tags({"Proxy"})
+public class StandardProxyConfigurationService extends AbstractControllerService implements ProxyConfigurationService {
+
+ static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
+ .name("proxy-type")
+ .displayName("Proxy Type")
+ .description("Proxy type.")
+ .allowableValues(Proxy.Type.values())
+ .defaultValue(Proxy.Type.DIRECT.name())
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor PROXY_SERVER_HOST = new PropertyDescriptor.Builder()
+ .name("proxy-server-host")
+ .displayName("Proxy Server Host")
+ .description("Proxy server hostname or ip-address.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor PROXY_SERVER_PORT = new PropertyDescriptor.Builder()
+ .name("proxy-server-port")
+ .displayName("Proxy Server Port")
+ .description("Proxy server port number.")
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor PROXY_USER_NAME = new PropertyDescriptor.Builder()
+ .name("proxy-user-name")
+ .displayName("Proxy User Name")
+ .description("The name of the proxy client for user authentication.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor PROXY_USER_PASSWORD = new PropertyDescriptor.Builder()
+ .name("proxy-user-password")
+ .displayName("Proxy User Password")
+ .description("The password of the proxy client for user authentication.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .sensitive(true)
+ .build();
+
+ private volatile ProxyConfiguration configuration = ProxyConfiguration.DIRECT_CONFIGURATION;
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ final List properties = new ArrayList<>();
+ properties.add(PROXY_TYPE);
+ properties.add(PROXY_SERVER_HOST);
+ properties.add(PROXY_SERVER_PORT);
+ properties.add(PROXY_USER_NAME);
+ properties.add(PROXY_USER_PASSWORD);
+ return properties;
+ }
+
+ @OnEnabled
+ public void setConfiguredValues(final ConfigurationContext context) {
+ configuration = new ProxyConfiguration();
+ configuration.setProxyType(Proxy.Type.valueOf(context.getProperty(PROXY_TYPE).getValue()));
+ configuration.setProxyServerHost(context.getProperty(PROXY_SERVER_HOST).evaluateAttributeExpressions().getValue());
+ configuration.setProxyServerPort(context.getProperty(PROXY_SERVER_PORT).evaluateAttributeExpressions().asInteger());
+ configuration.setProxyUserName(context.getProperty(PROXY_USER_NAME).evaluateAttributeExpressions().getValue());
+ configuration.setProxyUserPassword(context.getProperty(PROXY_USER_PASSWORD).evaluateAttributeExpressions().getValue());
+ }
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final Proxy.Type proxyType = Proxy.Type.valueOf(validationContext.getProperty(PROXY_TYPE).getValue());
+ if (Proxy.Type.DIRECT.equals(proxyType)) {
+ return Collections.emptyList();
+ }
+
+ final List results = new ArrayList<>();
+ if (!validationContext.getProperty(PROXY_SERVER_HOST).isSet()) {
+ results.add(new ValidationResult.Builder().subject(PROXY_SERVER_HOST.getDisplayName())
+ .explanation("required").valid(false).build());
+ }
+ if (!validationContext.getProperty(PROXY_SERVER_PORT).isSet()) {
+ results.add(new ValidationResult.Builder().subject(PROXY_SERVER_PORT.getDisplayName())
+ .explanation("required").valid(false).build());
+ }
+ return results;
+ }
+
+ @Override
+ public ProxyConfiguration getConfiguration() {
+ return configuration;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 000000000000..3d4e54c22db3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.proxy.StandardProxyConfigurationService
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/pom.xml
new file mode 100644
index 000000000000..4e96b4b228d0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/pom.xml
@@ -0,0 +1,34 @@
+
+
+
+
+ nifi-standard-services
+ org.apache.nifi
+ 1.7.0-SNAPSHOT
+
+ 4.0.0
+
+ nifi-proxy-configuration-bundle
+ pom
+
+ nifi-proxy-configuration
+ nifi-proxy-configuration-nar
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
index 67e293ae7ce0..57ce127610cc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
@@ -83,5 +83,10 @@
nifi-kerberos-credentials-service-api
compile
+
+ org.apache.nifi
+ nifi-proxy-configuration-api
+ compile
+
diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml
index 7a5c29b987cb..76ab82bf73e5 100644
--- a/nifi-nar-bundles/nifi-standard-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/pom.xml
@@ -43,5 +43,7 @@
nifi-hwx-schema-registry-bundle
nifi-kerberos-credentials-service-api
nifi-kerberos-credentials-service-bundle
+ nifi-proxy-configuration-api
+ nifi-proxy-configuration-bundle
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 80e579c08df8..a94bde7fb22f 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -239,6 +239,11 @@
nifi-http-context-map
1.7.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-proxy-configuration-api
+ 1.7.0-SNAPSHOT
+
org.apache.nifi
nifi-volatile-provenance-repository