From 30bf7c0bc36d0e815c2c1114a021d7398663f649 Mon Sep 17 00:00:00 2001 From: Zack Date: Sun, 1 Dec 2024 17:35:35 -0500 Subject: [PATCH] Remove usage of parallelStream in favor of stream --- .../client/request/GetFeaturesBatchRequest.java | 8 +++----- .../client/response/GetFeaturesBatchResponse.java | 6 ++---- .../tecton/client/transport/TectonHttpClient.java | 13 +++++-------- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/src/main/java/ai/tecton/client/request/GetFeaturesBatchRequest.java b/src/main/java/ai/tecton/client/request/GetFeaturesBatchRequest.java index 57966632..365f00d9 100644 --- a/src/main/java/ai/tecton/client/request/GetFeaturesBatchRequest.java +++ b/src/main/java/ai/tecton/client/request/GetFeaturesBatchRequest.java @@ -198,8 +198,7 @@ public GetFeaturesBatchRequest( // For batch requests, partition the requestDataList into n sublists of size // microBatchSize and create GetFeaturesMicroBatchRequest for each this.requestList = - ListUtils.partition(requestDataList, microBatchSize) - .parallelStream() + ListUtils.partition(requestDataList, microBatchSize).stream() .map( requestData -> new GetFeaturesMicroBatchRequest( @@ -212,8 +211,7 @@ public GetFeaturesBatchRequest( } else { // For microBatchSize=1, create a List of individual GetFeaturesRequest objects this.requestList = - requestDataList - .parallelStream() + requestDataList.stream() .map( requestData -> new GetFeaturesRequest( @@ -400,7 +398,7 @@ private static void validateParameters( if (requestDataList == null || requestDataList.isEmpty()) { throw new InvalidRequestParameterException(TectonErrorMessage.INVALID_REQUEST_DATA_LIST); } - requestDataList.parallelStream().forEach(AbstractGetFeaturesRequest::validateRequestParameters); + requestDataList.forEach(AbstractGetFeaturesRequest::validateRequestParameters); if (microBatchSize > RequestConstants.MAX_MICRO_BATCH_SIZE || microBatchSize < 1) { throw new InvalidRequestParameterException( String.format( diff --git a/src/main/java/ai/tecton/client/response/GetFeaturesBatchResponse.java b/src/main/java/ai/tecton/client/response/GetFeaturesBatchResponse.java index 2bd43b07..70762383 100644 --- a/src/main/java/ai/tecton/client/response/GetFeaturesBatchResponse.java +++ b/src/main/java/ai/tecton/client/response/GetFeaturesBatchResponse.java @@ -49,16 +49,14 @@ public GetFeaturesBatchResponse( // Serialize list of HttpResponse into list of GetFeaturesMicroBatchResponse List microBatchResponses = - httpResponseList - .parallelStream() + httpResponseList.stream() .map(httpResponse -> parseSingleHttpResponse(httpResponse, microBatchSize)) .collect(Collectors.toList()); // Concatenate list of GetFeaturesResponse objects from each microbatch into a single list // Maintain ordering this.batchResponseList = - microBatchResponses - .parallelStream() + microBatchResponses.stream() .map(microBatch -> microBatch.microBatchResponseList) .flatMap(List::stream) .collect(Collectors.toList()); diff --git a/src/main/java/ai/tecton/client/transport/TectonHttpClient.java b/src/main/java/ai/tecton/client/transport/TectonHttpClient.java index 4e4aa916..d0330c90 100644 --- a/src/main/java/ai/tecton/client/transport/TectonHttpClient.java +++ b/src/main/java/ai/tecton/client/transport/TectonHttpClient.java @@ -114,8 +114,7 @@ public List performParallelRequests( // Map request body to OkHttp Request // ordering of requests is maintained List requestList = - requestBodyList - .parallelStream() + requestBodyList.stream() .map( requestBody -> new HttpRequest(url.url().toString(), endpoint, method, apiKey, requestBody)) @@ -153,12 +152,10 @@ public void onResponse(Call call, Response response) { }; // Enqueue all calls - requestList - .parallelStream() - .forEach( - request -> { - client.newCall(request).enqueue(callback); - }); + requestList.forEach( + request -> { + client.newCall(request).enqueue(callback); + }); // Wait until A) all calls have completed or B) specified timeout has elapsed try {