From 0fe288ee93d570c4ce1d7f82451b5670e9fa6f32 Mon Sep 17 00:00:00 2001 From: Zack Date: Mon, 2 Dec 2024 00:16:03 -0500 Subject: [PATCH] Pass index into callback --- .../client/transport/TectonHttpClient.java | 63 +++++++++---------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/src/main/java/ai/tecton/client/transport/TectonHttpClient.java b/src/main/java/ai/tecton/client/transport/TectonHttpClient.java index eb71ccbe..18e071e1 100644 --- a/src/main/java/ai/tecton/client/transport/TectonHttpClient.java +++ b/src/main/java/ai/tecton/client/transport/TectonHttpClient.java @@ -27,6 +27,7 @@ import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; +import okhttp3.internal.http.HttpMethod; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; @@ -121,39 +122,37 @@ public List performParallelRequests( // Initialize a countdown latch for numberOfCalls. CountDownLatch countDownLatch = new CountDownLatch(requestBodyList.size()); - Callback callback = - new Callback() { - @Override - public void onFailure(Call call, IOException e) { - // On timeout, executor rejects all pending calls. This could lead to an - // InterruptedIOException for in-flight calls, which is expected. - // Only log failures for other call failures such as network issues - if (!(e instanceof InterruptedIOException)) { - parallelCallHandler.logCallFailure(e.getMessage()); - } - } - - @Override - public void onResponse(Call call, Response response) { - try (ResponseBody responseBody = response.body()) { - // Add response to corresponding index - parallelCallHandler.set( - Integer.parseInt(call.request().header("request-index")), - new HttpResponse(response, responseBody)); - } catch (Exception e) { - throw new TectonServiceException(e.getMessage()); - } finally { - Objects.requireNonNull(response.body()).close(); - countDownLatch.countDown(); - } - } - }; - // Enqueue all calls - requestList.forEach( - request -> { - client.newCall(request).enqueue(callback); - }); + for (int i = 0; i < requestList.size(); i++) { + int index = i; + client + .newCall(requestList.get(index)) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + // On timeout, executor rejects all pending calls. This could lead to an + // InterruptedIOException for in-flight calls, which is expected. + // Only log failures for other call failures such as network issues + if (!(e instanceof InterruptedIOException)) { + parallelCallHandler.logCallFailure(e.getMessage()); + } + } + + @Override + public void onResponse(Call call, Response response) { + try (ResponseBody responseBody = response.body()) { + // Add response to corresponding index + parallelCallHandler.set(index, new HttpResponse(response, responseBody)); + } catch (Exception e) { + throw new TectonServiceException(e.getMessage()); + } finally { + Objects.requireNonNull(response.body()).close(); + countDownLatch.countDown(); + } + } + }); + } // Wait until A) all calls have completed or B) specified timeout has elapsed try {