8 changes: 6 additions & 2 deletions src/main/java/ai/tecton/client/TectonClient.java
@@ -141,7 +141,10 @@ public GetFeatureServiceMetadataResponse getFeatureServiceMetadata(

/**
* Makes a batch request to retrieve a list of feature vector and metadata for a given workspace
* and feature service.
*
* <p>Calls are queued on the executor service of the OkHttp client provided to the constructor,
* or on the default client's executor service if none was provided.
*
* @param batchRequest The {@link GetFeaturesRequest} object with the request parameters
* @return {@link GetFeaturesBatchResponse} object with the list of feature vector and metadata
@@ -173,7 +176,8 @@ public GetFeaturesBatchResponse getFeaturesBatch(GetFeaturesBatchRequest batchRe
batchRequest.getEndpoint(),
batchRequest.getMethod(),
requestList,
batchRequest.getTimeout(),
batchRequest.getUseExecutorServiceForParallelism());
long stop = System.currentTimeMillis();
Duration totalTime = Duration.ofMillis(stop - start);

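A rough usage sketch (not part of this PR) of how a caller could opt into the new flag through the seven-argument constructor added in this change. The TectonClient constructor, the request/response package paths, and GetFeaturesRequestData.addJoinKey are assumed from the library's public API; the URL, API key, workspace, feature service, and join key values are placeholders.

import ai.tecton.client.TectonClient;
import ai.tecton.client.request.GetFeaturesBatchRequest;
import ai.tecton.client.request.GetFeaturesRequestData;
import ai.tecton.client.request.RequestConstants;
import ai.tecton.client.response.GetFeaturesBatchResponse;
import java.util.ArrayList;
import java.util.List;

public class BatchUsageSketch {
  public static void main(String[] args) {
    // Placeholder cluster URL and API key.
    TectonClient tectonClient = new TectonClient("https://example.tecton.ai", "my-api-key");

    // One GetFeaturesRequestData per feature vector in the batch (join key values are placeholders).
    List<GetFeaturesRequestData> requestDataList = new ArrayList<>();
    requestDataList.add(new GetFeaturesRequestData().addJoinKey("user_id", "C1000262126"));

    // The last constructor argument is the new useExecutorServiceForParallelism flag.
    GetFeaturesBatchRequest batchRequest =
        new GetFeaturesBatchRequest(
            "prod",
            "fraud_detection_feature_service",
            requestDataList,
            RequestConstants.DEFAULT_METADATA_OPTIONS,
            RequestConstants.DEFAULT_MICRO_BATCH_SIZE,
            RequestConstants.NONE_TIMEOUT,
            true);

    // The batch call below then enqueues its micro-batches as described in the Javadoc above.
    GetFeaturesBatchResponse batchResponse = tectonClient.getFeaturesBatch(batchRequest);
  }
}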
@@ -46,6 +46,7 @@ public class GetFeaturesBatchRequest {
private List<? extends AbstractGetFeaturesRequest> requestList;
private final int microBatchSize;
private final Duration timeout;
private final boolean useExecutorServiceForParallelism;
private static final String BATCH_ENDPOINT = "/api/v1/feature-service/get-features-batch";
private static JsonAdapter<GetFeaturesMicroBatchRequest.GetFeaturesRequestBatchJson> jsonAdapter =
null;
@@ -191,8 +192,58 @@ public GetFeaturesBatchRequest(
Set<MetadataOption> metadataOptions,
int microBatchSize,
Duration timeout) {
this(
workspaceName,
featureServiceName,
requestDataList,
metadataOptions,
microBatchSize,
timeout,
false);
}

/**
* Constructor that creates a new GetFeaturesBatchRequest with the specified parameters
*
* @param workspaceName Name of the workspace in which the Feature Service is defined
* @param featureServiceName Name of the Feature Service for which the feature vectors are being
* requested
* @param requestDataList a {@link List} of {@link GetFeaturesRequestData} object with joinKeyMap
* and/or requestContextMap
* @param metadataOptions A {@link Set} of {@link MetadataOption} for retrieving
* additional metadata about the feature values. Use {@link
* RequestConstants#ALL_METADATA_OPTIONS} to request all metadata and {@link
* RequestConstants#NONE_METADATA_OPTIONS} to request no metadata respectively. By default,
* {@link RequestConstants#DEFAULT_METADATA_OPTIONS} will be added to each request
* @param microBatchSize an int value between 1 and {@value
* RequestConstants#MAX_MICRO_BATCH_SIZE}. The client splits the GetFeaturesBatchRequest into
multiple micro batches of this size and executes them in parallel. By default, the
* microBatchSize is set to {@value RequestConstants#DEFAULT_MICRO_BATCH_SIZE}
* @param timeout The max time in {@link Duration} for which the client waits for the batch
* requests to complete before canceling the operation and returning the partial list of
* results.
* @param useExecutorServiceForParallelism If set to true, the client uses the OkHttp dispatcher's
* ExecutorService for request pre-processing work in addition to the actual HTTP calls. This
* can be useful when the client runs in a multi-threaded environment and many requests need
* to be pre-processed.
* @throws InvalidRequestParameterException when workspaceName or featureServiceName is empty or
* null
* @throws InvalidRequestParameterException when requestDataList is invalid (null/empty or
* contains null/empty elements)
* @throws InvalidRequestParameterException when the microBatchSize is out of bounds of [ 1,
* {@value RequestConstants#MAX_MICRO_BATCH_SIZE} ]
*/
public GetFeaturesBatchRequest(
String workspaceName,
String featureServiceName,
List<GetFeaturesRequestData> requestDataList,
Set<MetadataOption> metadataOptions,
int microBatchSize,
Duration timeout,
boolean useExecutorServiceForParallelism) {
validateParameters(workspaceName, featureServiceName, requestDataList, microBatchSize);
this.timeout = timeout;
this.useExecutorServiceForParallelism = useExecutorServiceForParallelism;

if (microBatchSize > 1 && requestDataList.size() > 1) {
// For batch requests, partition the requestDataList into n sublists of size
@@ -242,6 +293,15 @@ public Duration getTimeout() {
return timeout;
}

/**
* Getter for useExecutorServiceForParallelism
*
* @return useExecutorServiceForParallelism
*/
public boolean getUseExecutorServiceForParallelism() {
return useExecutorServiceForParallelism;
}

/**
* Getter for microBatchSize
*
@@ -270,6 +330,7 @@ public static class Builder {
private Set<MetadataOption> metadataOptionList = RequestConstants.DEFAULT_METADATA_OPTIONS;
private int microBatchSize = RequestConstants.DEFAULT_MICRO_BATCH_SIZE;
private Duration timeout = RequestConstants.NONE_TIMEOUT;
private boolean useExecutorServiceForParallelism = false;

/** Constructs an empty Builder */
public Builder() {
@@ -367,6 +428,18 @@ public Builder timeout(Duration timeout) {
return this;
}

/**
* @param useExecutorServiceForParallelism If set to true, the client uses the OkHttp
*     dispatcher's ExecutorService for request pre-processing work in addition to the actual
*     HTTP calls. This can be useful when the client runs in a multi-threaded environment and
*     many requests need to be pre-processed.
* @return this Builder
*/
public Builder useExecutorServiceForParallelism(boolean useExecutorServiceForParallelism) {
this.useExecutorServiceForParallelism = useExecutorServiceForParallelism;
return this;
}

/**
* Returns an instance of {@link GetFeaturesBatchRequest} created from the fields set on this
* builder
@@ -384,7 +457,8 @@ public GetFeaturesBatchRequest build() throws TectonClientException {
requestDataList,
metadataOptionList,
microBatchSize,
timeout,
useExecutorServiceForParallelism);
}
}

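The same flag can also be set through the Builder method added above. A minimal fragment, reusing requestDataList from the previous sketch; the workspaceName, featureServiceName, requestDataList, and microBatchSize setter names are assumed from the existing Builder API, while timeout(...) and useExecutorServiceForParallelism(...) are the methods visible in this diff.

// Assumed setters are marked; Duration is java.time.Duration.
GetFeaturesBatchRequest batchRequest =
    new GetFeaturesBatchRequest.Builder()
        .workspaceName("prod") // assumed setter, placeholder workspace
        .featureServiceName("fraud_detection_feature_service") // assumed setter, placeholder name
        .requestDataList(requestDataList) // assumed setter, list from the previous sketch
        .microBatchSize(RequestConstants.DEFAULT_MICRO_BATCH_SIZE) // assumed setter
        .timeout(Duration.ofSeconds(2))
        .useExecutorServiceForParallelism(true) // new option from this PR
        .build();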
146 changes: 112 additions & 34 deletions src/main/java/ai/tecton/client/transport/TectonHttpClient.java
@@ -15,6 +15,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.Call;
@@ -105,53 +107,129 @@ public HttpResponse performRequest(String endpoint, HttpMethod method, String re
}

public List<HttpResponse> performParallelRequests(
String endpoint,
HttpMethod method,
List<String> requestBodyList,
Duration timeout,
boolean useExecutorServiceForParallelism)
throws TectonClientException {
// Initialize response list
ParallelCallHandler parallelCallHandler = new ParallelCallHandler(requestBodyList.size());

// Map request body to OkHttp Request. Ordering of requests is maintained.
List<Request> requestList = new ArrayList<>();
if (useExecutorServiceForParallelism) {
// Use the executor service to parallelize the request creation.
List<Future<Request>> futures = new ArrayList<>();

for (int i = 0; i < requestBodyList.size(); i++) {
int index = i;
futures.add(
this.client
.dispatcher()
.executorService()
.submit(
() -> {
HttpRequest httpRequest =
new HttpRequest(
url.url().toString(),
endpoint,
method,
apiKey,
requestBodyList.get(index));
return buildRequestWithDefaultHeaders(httpRequest, index);
}));
}

for (Future<Request> future : futures) {
try {
requestList.add(future.get());
} catch (InterruptedException | ExecutionException e) {
throw new TectonClientException(e.getMessage());
}
}
} else {
for (int i = 0; i < requestBodyList.size(); i++) {
HttpRequest httpRequest =
new HttpRequest(url.url().toString(), endpoint, method, apiKey, requestBodyList.get(i));
requestList.add(buildRequestWithDefaultHeaders(httpRequest, i));
}
}

// Initialize a countdown latch for numberOfCalls.
CountDownLatch countDownLatch = new CountDownLatch(requestBodyList.size());

// Enqueue all calls
if (useExecutorServiceForParallelism) {
for (int i = 0; i < requestList.size(); i++) {
int index = i;
Request req = requestList.get(index);
client
.dispatcher()
.executorService()
.submit(
() ->
client
.newCall(req)
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// On timeout, executor rejects all pending calls. This could lead
// to an
// InterruptedIOException for in-flight calls, which is expected.
// Only log failures for other call failures such as network issues
if (!(e instanceof InterruptedIOException)) {
parallelCallHandler.logCallFailure(e.getMessage());
}
}

@Override
public void onResponse(Call call, Response response) {
try (ResponseBody responseBody = response.body()) {
// Add response to corresponding index
parallelCallHandler.set(
index, new HttpResponse(response, responseBody));
} catch (Exception e) {
throw new TectonServiceException(e.getMessage());
} finally {
Objects.requireNonNull(response.body()).close();
countDownLatch.countDown();
}
}
}));
}
} else {
for (int i = 0; i < requestList.size(); i++) {
int index = i;
client
.newCall(requestList.get(index))
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// On timeout, executor rejects all pending calls. This could lead to an
// InterruptedIOException for in-flight calls, which is expected.
// Only log failures for other call failures such as network issues
if (!(e instanceof InterruptedIOException)) {
parallelCallHandler.logCallFailure(e.getMessage());
}
}
@Override
public void onResponse(Call call, Response response) {
try (ResponseBody responseBody = response.body()) {
// Add response to corresponding index
parallelCallHandler.set(index, new HttpResponse(response, responseBody));
} catch (Exception e) {
throw new TectonServiceException(e.getMessage());
} finally {
Objects.requireNonNull(response.body()).close();
countDownLatch.countDown();
}
}
}
});
}
}

// Wait until A) all calls have completed or B) specified timeout has elapsed
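The new code paths above route both request construction and call enqueueing through the OkHttp dispatcher's executor service. Below is a stripped-down, standalone sketch of the pre-processing half of that pattern, not the TectonHttpClient implementation itself; the endpoint URL is a placeholder.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;

public class DispatcherPreprocessingSketch {
  public static List<Request> buildInParallel(OkHttpClient client, List<String> bodies) {
    // OkHttp exposes the thread pool backing its async calls via the dispatcher.
    ExecutorService executor = client.dispatcher().executorService();

    // Submit one request-building task per body; each task runs on the dispatcher's pool.
    List<Future<Request>> futures = new ArrayList<>();
    for (String body : bodies) {
      futures.add(
          executor.submit(
              () ->
                  new Request.Builder()
                      .url("https://example.tecton.ai/api/v1/feature-service/get-features-batch")
                      .post(RequestBody.create(MediaType.parse("application/json"), body))
                      .build()));
    }

    // Collect results in submission order so request ordering is preserved.
    List<Request> requests = new ArrayList<>();
    for (Future<Request> future : futures) {
      try {
        requests.add(future.get());
      } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
      }
    }
    return requests;
  }
}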
@@ -122,7 +122,7 @@ public void testParallelRequestsWithDefaultClient() {
List<String> requestList = prepareRequests(100);
List<HttpResponse> httpResponses =
httpClient.performParallelRequests(
endpoint, method, requestList, RequestConstants.NONE_TIMEOUT, false);

List<String> responseList =
httpResponses.stream()
@@ -144,7 +144,7 @@ public void testParallelRequestsWithPartialErrorResponses() {
requestList.addAll(Arrays.asList("", "", ""));
List<HttpResponse> httpResponses =
httpClient.performParallelRequests(
endpoint, method, requestList, RequestConstants.NONE_TIMEOUT, false);

// Verify that first 10 responses are successful and last 3 responses are errors
httpResponses.subList(0, 100).forEach(response -> Assert.assertTrue(response.isSuccessful()));
@@ -160,7 +160,8 @@ public void testParallelRequestWithTimeout() {
this.baseUrlString, this.apiKey, new TectonClientOptions.Builder().build());
List<String> requestList = prepareRequests(100);
List<HttpResponse> httpResponses =
httpClient.performParallelRequests(
endpoint, method, requestList, Duration.ofMillis(10), false);
// 100 requests with a default maxParallelRequests are not expected to complete in 10 ms
long numSuccessfulCalls = httpResponses.stream().filter(Objects::nonNull).count();
Assert.assertTrue(numSuccessfulCalls < 100);
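A hypothetical companion test (not included in this PR) that would exercise the executor-service path with the flag set to true, mirroring testParallelRequestsWithDefaultClient and reusing the same fixtures (httpClient, endpoint, method, prepareRequests).

@Test
public void testParallelRequestsWithExecutorServicePreprocessing() {
  List<String> requestList = prepareRequests(100);
  List<HttpResponse> httpResponses =
      httpClient.performParallelRequests(
          endpoint, method, requestList, RequestConstants.NONE_TIMEOUT, true);

  // Expect one response per request, in request order, all successful.
  Assert.assertEquals(100, httpResponses.size());
  httpResponses.forEach(response -> Assert.assertTrue(response.isSuccessful()));
}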