diff --git a/src/main/java/ai/tecton/client/TectonClient.java b/src/main/java/ai/tecton/client/TectonClient.java
index e0f0e2d1..09ddd55a 100644
--- a/src/main/java/ai/tecton/client/TectonClient.java
+++ b/src/main/java/ai/tecton/client/TectonClient.java
@@ -141,7 +141,10 @@ public GetFeatureServiceMetadataResponse getFeatureServiceMetadata(
   /**
    * Makes a batch request to retrieve a list of feature vector and metadata for a given workspace
-   * and feature service
+   * and feature service.
+   *
+   * <p>Calls are queued by the constructor-provided OkHttp client's executor service, using the
+   * default one if none was provided.
    *
    * @param batchRequest The {@link GetFeaturesRequest} object with the request parameters
    * @return {@link GetFeaturesBatchResponse} object with the list of feature vector and metadata
@@ -173,7 +176,8 @@ public GetFeaturesBatchResponse getFeaturesBatch(GetFeaturesBatchRequest batchRe
             batchRequest.getEndpoint(),
             batchRequest.getMethod(),
             requestList,
-            batchRequest.getTimeout());
+            batchRequest.getTimeout(),
+            batchRequest.getUseExecutorServiceForParallelism());
     long stop = System.currentTimeMillis();
     Duration totalTime = Duration.ofMillis(stop - start);
diff --git a/src/main/java/ai/tecton/client/request/GetFeaturesBatchRequest.java b/src/main/java/ai/tecton/client/request/GetFeaturesBatchRequest.java
index 365f00d9..c4cb2c43 100644
--- a/src/main/java/ai/tecton/client/request/GetFeaturesBatchRequest.java
+++ b/src/main/java/ai/tecton/client/request/GetFeaturesBatchRequest.java
@@ -46,6 +46,7 @@ public class GetFeaturesBatchRequest {
   private List requestList;
   private final int microBatchSize;
   private final Duration timeout;
+  private final boolean useExecutorServiceForParallelism;
   private static final String BATCH_ENDPOINT = "/api/v1/feature-service/get-features-batch";
   private static JsonAdapter jsonAdapter = null;
@@ -191,8 +192,58 @@ public GetFeaturesBatchRequest(
       Set<MetadataOption> metadataOptions,
       int microBatchSize,
       Duration timeout) {
+    this(
+        workspaceName,
+        featureServiceName,
+        requestDataList,
+        metadataOptions,
+        microBatchSize,
+        timeout,
+        false);
+  }
+
+  /**
+   * Constructor that creates a new GetFeaturesBatchRequest with the specified parameters
+   *
+   * @param workspaceName Name of the workspace in which the Feature Service is defined
+   * @param featureServiceName Name of the Feature Service for which the feature vectors are being
+   *     requested
+   * @param requestDataList a {@link List} of {@link GetFeaturesRequestData} objects with a
+   *     joinKeyMap and/or requestContextMap
+   * @param metadataOptions A {@link Set} of {@link MetadataOption} for retrieving additional
+   *     metadata about the feature values. Use {@link RequestConstants#ALL_METADATA_OPTIONS} to
+   *     request all metadata and {@link RequestConstants#NONE_METADATA_OPTIONS} to request no
+   *     metadata. By default, {@link RequestConstants#DEFAULT_METADATA_OPTIONS} will be added to
+   *     each request
+   * @param microBatchSize an int value between 1 and {@value
+   *     RequestConstants#MAX_MICRO_BATCH_SIZE}. The client splits the GetFeaturesBatchRequest into
+   *     multiple micro-batches of this size and executes them in parallel. By default, the
+   *     microBatchSize is set to {@value RequestConstants#DEFAULT_MICRO_BATCH_SIZE}
+   * @param timeout The max time, as a {@link Duration}, for which the client waits for the batch
+   *     requests to complete before canceling the operation and returning the partial list of
+   *     results
+   * @param useExecutorServiceForParallelism If set to true, the client uses the OkHttp dispatcher's
+   *     ExecutorService for pre-processing work, in addition to the actual HTTP calls. This can be
+   *     useful when the client is used in a multi-threaded environment and there are many requests
+   *     to pre-process.
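+   *     <p>For example (the workspace and feature service names are placeholders, and {@code
+   *     requestDataList} is assumed to be prepared beforehand):
+   *     <pre>{@code
+   * GetFeaturesBatchRequest batchRequest =
+   *     new GetFeaturesBatchRequest(
+   *         "myWorkspace",
+   *         "myFeatureService",
+   *         requestDataList,
+   *         RequestConstants.DEFAULT_METADATA_OPTIONS,
+   *         RequestConstants.DEFAULT_MICRO_BATCH_SIZE,
+   *         RequestConstants.NONE_TIMEOUT,
+   *         true);
+   * }</pre>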
+   * @throws InvalidRequestParameterException when workspaceName or featureServiceName is empty or
+   *     null
+   * @throws InvalidRequestParameterException when requestDataList is invalid (null/empty or
+   *     contains null/empty elements)
+   * @throws InvalidRequestParameterException when the microBatchSize is out of bounds of
+   *     [1, {@value RequestConstants#MAX_MICRO_BATCH_SIZE}]
+   */
+  public GetFeaturesBatchRequest(
+      String workspaceName,
+      String featureServiceName,
+      List<GetFeaturesRequestData> requestDataList,
+      Set<MetadataOption> metadataOptions,
+      int microBatchSize,
+      Duration timeout,
+      boolean useExecutorServiceForParallelism) {
     validateParameters(workspaceName, featureServiceName, requestDataList, microBatchSize);
     this.timeout = timeout;
+    this.useExecutorServiceForParallelism = useExecutorServiceForParallelism;
     if (microBatchSize > 1 && requestDataList.size() > 1) {
       // For batch requests, partition the requestDataList into n sublists of size
@@ -242,6 +293,15 @@ public Duration getTimeout() {
     return timeout;
   }

+  /**
+   * Getter for useExecutorServiceForParallelism
+   *
+   * @return useExecutorServiceForParallelism
+   */
+  public boolean getUseExecutorServiceForParallelism() {
+    return useExecutorServiceForParallelism;
+  }
+
   /**
    * Getter for microBatchSize
    *
@@ -270,6 +330,7 @@ public static class Builder {
     private Set<MetadataOption> metadataOptionList = RequestConstants.DEFAULT_METADATA_OPTIONS;
     private int microBatchSize = RequestConstants.DEFAULT_MICRO_BATCH_SIZE;
     private Duration timeout = RequestConstants.NONE_TIMEOUT;
+    private boolean useExecutorServiceForParallelism = false;

     /** Constructs an empty Builder */
     public Builder() {
@@ -367,6 +428,18 @@ public Builder timeout(Duration timeout) {
       return this;
     }

+    /**
+     * @param useExecutorServiceForParallelism If set to true, the client uses the OkHttp
+     *     dispatcher's ExecutorService for pre-processing work, in addition to the actual HTTP
+     *     calls. This can be useful when the client is used in a multi-threaded environment and
+     *     there are many requests to pre-process.
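+     *     <p>Illustrative usage (the other builder setters are elided here):
+     *     <pre>{@code
+     * GetFeaturesBatchRequest batchRequest =
+     *     new GetFeaturesBatchRequest.Builder()
+     *         // ... workspace name, feature service name, request data list ...
+     *         .timeout(Duration.ofSeconds(5))
+     *         .useExecutorServiceForParallelism(true)
+     *         .build();
+     * }</pre>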
+     * @return this Builder
+     */
+    public Builder useExecutorServiceForParallelism(boolean useExecutorServiceForParallelism) {
+      this.useExecutorServiceForParallelism = useExecutorServiceForParallelism;
+      return this;
+    }
+
     /**
      * Returns an instance of {@link GetFeaturesBatchRequest} created from the fields set on this
      * builder
@@ -384,7 +457,8 @@ public GetFeaturesBatchRequest build() throws TectonClientException {
           requestDataList,
           metadataOptionList,
           microBatchSize,
-          timeout);
+          timeout,
+          useExecutorServiceForParallelism);
     }
   }
diff --git a/src/main/java/ai/tecton/client/transport/TectonHttpClient.java b/src/main/java/ai/tecton/client/transport/TectonHttpClient.java
index 18e071e1..7bd86f98 100644
--- a/src/main/java/ai/tecton/client/transport/TectonHttpClient.java
+++ b/src/main/java/ai/tecton/client/transport/TectonHttpClient.java
@@ -15,6 +15,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import okhttp3.Call;
@@ -105,53 +107,129 @@ public HttpResponse performRequest(String endpoint, HttpMethod method, String re
   }

   public List<HttpResponse> performParallelRequests(
-      String endpoint, HttpMethod method, List<String> requestBodyList, Duration timeout)
+      String endpoint,
+      HttpMethod method,
+      List<String> requestBodyList,
+      Duration timeout,
+      boolean useExecutorServiceForParallelism)
       throws TectonClientException {

     // Initialize response list
     ParallelCallHandler parallelCallHandler = new ParallelCallHandler(requestBodyList.size());

-    // Map request body to OkHttp Request
-    // ordering of requests is maintained
+    // Map request body to OkHttp Request. Ordering of requests is maintained.
     List<Request> requestList = new ArrayList<>();
-    for (int i = 0; i < requestBodyList.size(); i++) {
-      HttpRequest httpRequest =
-          new HttpRequest(url.url().toString(), endpoint, method, apiKey, requestBodyList.get(i));
-      requestList.add(buildRequestWithDefaultHeaders(httpRequest, i));
+    if (useExecutorServiceForParallelism) {
+      // Use the executor service to parallelize the request creation.
+      List<Future<Request>> futures = new ArrayList<>();
+
+      for (int i = 0; i < requestBodyList.size(); i++) {
+        int index = i;
+        futures.add(
+            this.client
+                .dispatcher()
+                .executorService()
+                .submit(
+                    () -> {
+                      HttpRequest httpRequest =
+                          new HttpRequest(
+                              url.url().toString(),
+                              endpoint,
+                              method,
+                              apiKey,
+                              requestBodyList.get(index));
+                      return buildRequestWithDefaultHeaders(httpRequest, index);
+                    }));
+      }
+
+      for (Future<Request> future : futures) {
+        try {
+          requestList.add(future.get());
+        } catch (InterruptedException | ExecutionException e) {
+          throw new TectonClientException(e.getMessage());
+        }
+      }
+    } else {
+      for (int i = 0; i < requestBodyList.size(); i++) {
+        HttpRequest httpRequest =
+            new HttpRequest(url.url().toString(), endpoint, method, apiKey, requestBodyList.get(i));
+        requestList.add(buildRequestWithDefaultHeaders(httpRequest, i));
+      }
     }

     // Initialize a countdown latch for numberOfCalls.
     CountDownLatch countDownLatch = new CountDownLatch(requestBodyList.size());

     // Enqueue all calls
-    for (int i = 0; i < requestList.size(); i++) {
-      int index = i;
-      client
-          .newCall(requestList.get(index))
-          .enqueue(
-              new Callback() {
-                @Override
-                public void onFailure(Call call, IOException e) {
-                  // On timeout, executor rejects all pending calls. This could lead to an
-                  // InterruptedIOException for in-flight calls, which is expected.
-                  // Only log failures for other call failures such as network issues
-                  if (!(e instanceof InterruptedIOException)) {
-                    parallelCallHandler.logCallFailure(e.getMessage());
+    if (useExecutorServiceForParallelism) {
+      for (int i = 0; i < requestList.size(); i++) {
+        int index = i;
+        Request req = requestList.get(index);
+        client
+            .dispatcher()
+            .executorService()
+            .submit(
+                () ->
+                    client
+                        .newCall(req)
+                        .enqueue(
+                            new Callback() {
+                              @Override
+                              public void onFailure(Call call, IOException e) {
+                                // On timeout, executor rejects all pending calls. This could lead
+                                // to an InterruptedIOException for in-flight calls, which is
+                                // expected. Only log other failures, such as network issues.
+                                if (!(e instanceof InterruptedIOException)) {
+                                  parallelCallHandler.logCallFailure(e.getMessage());
+                                }
+                              }
+
+                              @Override
+                              public void onResponse(Call call, Response response) {
+                                try (ResponseBody responseBody = response.body()) {
+                                  // Add response to corresponding index
+                                  parallelCallHandler.set(
+                                      index, new HttpResponse(response, responseBody));
+                                } catch (Exception e) {
+                                  throw new TectonServiceException(e.getMessage());
+                                } finally {
+                                  Objects.requireNonNull(response.body()).close();
+                                  countDownLatch.countDown();
+                                }
+                              }
+                            }));
+      }
+    } else {
+      for (int i = 0; i < requestList.size(); i++) {
+        int index = i;
+        client
+            .newCall(requestList.get(index))
+            .enqueue(
+                new Callback() {
+                  @Override
+                  public void onFailure(Call call, IOException e) {
+                    // On timeout, executor rejects all pending calls. This could lead to an
+                    // InterruptedIOException for in-flight calls, which is expected.
+                    // Only log other failures, such as network issues.
+                    if (!(e instanceof InterruptedIOException)) {
+                      parallelCallHandler.logCallFailure(e.getMessage());
+                    }
                   }
-                }
-
-                @Override
-                public void onResponse(Call call, Response response) {
-                  try (ResponseBody responseBody = response.body()) {
-                    // Add response to corresponding index
-                    parallelCallHandler.set(index, new HttpResponse(response, responseBody));
-                  } catch (Exception e) {
-                    throw new TectonServiceException(e.getMessage());
-                  } finally {
-                    Objects.requireNonNull(response.body()).close();
-                    countDownLatch.countDown();
+
+                  @Override
+                  public void onResponse(Call call, Response response) {
+                    try (ResponseBody responseBody = response.body()) {
+                      // Add response to corresponding index
+                      parallelCallHandler.set(index, new HttpResponse(response, responseBody));
+                    } catch (Exception e) {
+                      throw new TectonServiceException(e.getMessage());
+                    } finally {
+                      Objects.requireNonNull(response.body()).close();
+                      countDownLatch.countDown();
+                    }
                   }
-                }
-              });
+                });
+      }
     }

     // Wait until A) all calls have completed or B) specified timeout has elapsed
diff --git a/src/test/java/ai/tecton/client/transport/TectonHttpClientTest.java b/src/test/java/ai/tecton/client/transport/TectonHttpClientTest.java
index 1d95b2f6..0ef7423d 100644
--- a/src/test/java/ai/tecton/client/transport/TectonHttpClientTest.java
+++ b/src/test/java/ai/tecton/client/transport/TectonHttpClientTest.java
@@ -122,7 +122,7 @@ public void testParallelRequestsWithDefaultClient() {
     List<String> requestList = prepareRequests(100);
     List<HttpResponse> httpResponses =
         httpClient.performParallelRequests(
-            endpoint, method, requestList, RequestConstants.NONE_TIMEOUT);
+            endpoint, method, requestList, RequestConstants.NONE_TIMEOUT, false);

     List responseList =
         httpResponses.stream()
@@ -144,7 +144,7 @@ public void testParallelRequestsWithPartialErrorResponses() {
     requestList.addAll(Arrays.asList("", "", ""));
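+    // Pre-processing stays on the calling thread (useExecutorServiceForParallelism is false)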
     List<HttpResponse> httpResponses =
         httpClient.performParallelRequests(
-            endpoint, method, requestList, RequestConstants.NONE_TIMEOUT);
+            endpoint, method, requestList, RequestConstants.NONE_TIMEOUT, false);

     // Verify that first 10 responses are successful and last 3 responses are errors
     httpResponses.subList(0, 100).forEach(response -> Assert.assertTrue(response.isSuccessful()));
@@ -160,7 +160,8 @@ public void testParallelRequestWithTimeout() {
             this.baseUrlString, this.apiKey, new TectonClientOptions.Builder().build());
     List<String> requestList = prepareRequests(100);
     List<HttpResponse> httpResponses =
-        httpClient.performParallelRequests(endpoint, method, requestList, Duration.ofMillis(10));
+        httpClient.performParallelRequests(
+            endpoint, method, requestList, Duration.ofMillis(10), false);

     // 100 requests with a default maxParallelRequests is not expected to complete in 10 ms
     long numSuccessfulCalls = httpResponses.stream().filter(Objects::nonNull).count();
     Assert.assertTrue(numSuccessfulCalls < 100);