Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ public GetFeaturesBatchRequest(
// For batch requests, partition the requestDataList into n sublists of size
// microBatchSize and create GetFeaturesMicroBatchRequest for each
this.requestList =
ListUtils.partition(requestDataList, microBatchSize)
.parallelStream()
ListUtils.partition(requestDataList, microBatchSize).stream()
.map(
requestData ->
new GetFeaturesMicroBatchRequest(
Expand All @@ -212,8 +211,7 @@ public GetFeaturesBatchRequest(
} else {
// For microBatchSize=1, create a List of individual GetFeaturesRequest objects
this.requestList =
requestDataList
.parallelStream()
requestDataList.stream()
.map(
requestData ->
new GetFeaturesRequest(
Expand Down Expand Up @@ -400,7 +398,7 @@ private static void validateParameters(
if (requestDataList == null || requestDataList.isEmpty()) {
throw new InvalidRequestParameterException(TectonErrorMessage.INVALID_REQUEST_DATA_LIST);
}
requestDataList.parallelStream().forEach(AbstractGetFeaturesRequest::validateRequestParameters);
requestDataList.forEach(AbstractGetFeaturesRequest::validateRequestParameters);
if (microBatchSize > RequestConstants.MAX_MICRO_BATCH_SIZE || microBatchSize < 1) {
throw new InvalidRequestParameterException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,14 @@ public GetFeaturesBatchResponse(

// Serialize list of HttpResponse into list of GetFeaturesMicroBatchResponse
List<GetFeaturesMicroBatchResponse> microBatchResponses =
httpResponseList
.parallelStream()
httpResponseList.stream()
.map(httpResponse -> parseSingleHttpResponse(httpResponse, microBatchSize))
.collect(Collectors.toList());

// Concatenate list of GetFeaturesResponse objects from each microbatch into a single list
// Maintain ordering
this.batchResponseList =
microBatchResponses
.parallelStream()
microBatchResponses.stream()
.map(microBatch -> microBatch.microBatchResponseList)
.flatMap(List::stream)
.collect(Collectors.toList());
Expand Down
13 changes: 5 additions & 8 deletions src/main/java/ai/tecton/client/transport/TectonHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ public List<HttpResponse> performParallelRequests(
// Map request body to OkHttp Request
// ordering of requests is maintained
List<Request> requestList =
requestBodyList
.parallelStream()
requestBodyList.stream()
.map(
requestBody ->
new HttpRequest(url.url().toString(), endpoint, method, apiKey, requestBody))
Expand Down Expand Up @@ -153,12 +152,10 @@ public void onResponse(Call call, Response response) {
};

// Enqueue all calls
requestList
.parallelStream()
.forEach(
request -> {
client.newCall(request).enqueue(callback);
});
requestList.forEach(
request -> {
client.newCall(request).enqueue(callback);
});

// Wait until A) all calls have completed or B) specified timeout has elapsed
try {
Expand Down