Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 31 additions & 32 deletions src/main/java/ai/tecton/client/transport/TectonHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.internal.http.HttpMethod;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;

Expand Down Expand Up @@ -121,39 +122,37 @@ public List<HttpResponse> performParallelRequests(
// Initialize a countdown latch for numberOfCalls.
CountDownLatch countDownLatch = new CountDownLatch(requestBodyList.size());

Callback callback =
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// On timeout, executor rejects all pending calls. This could lead to an
// InterruptedIOException for in-flight calls, which is expected.
// Only log failures for other call failures such as network issues
if (!(e instanceof InterruptedIOException)) {
parallelCallHandler.logCallFailure(e.getMessage());
}
}

@Override
public void onResponse(Call call, Response response) {
try (ResponseBody responseBody = response.body()) {
// Add response to corresponding index
parallelCallHandler.set(
Integer.parseInt(call.request().header("request-index")),
new HttpResponse(response, responseBody));
} catch (Exception e) {
throw new TectonServiceException(e.getMessage());
} finally {
Objects.requireNonNull(response.body()).close();
countDownLatch.countDown();
}
}
};

// Enqueue all calls
requestList.forEach(
request -> {
client.newCall(request).enqueue(callback);
});
for (int i = 0; i < requestList.size(); i++) {
int index = i;
client
.newCall(requestList.get(index))
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// On timeout, executor rejects all pending calls. This could lead to an
// InterruptedIOException for in-flight calls, which is expected.
// Only log failures for other call failures such as network issues
if (!(e instanceof InterruptedIOException)) {
parallelCallHandler.logCallFailure(e.getMessage());
}
}

@Override
public void onResponse(Call call, Response response) {
try (ResponseBody responseBody = response.body()) {
// Add response to corresponding index
parallelCallHandler.set(index, new HttpResponse(response, responseBody));
} catch (Exception e) {
throw new TectonServiceException(e.getMessage());
} finally {
Objects.requireNonNull(response.body()).close();
countDownLatch.countDown();
}
}
});
}

// Wait until A) all calls have completed or B) specified timeout has elapsed
try {
Expand Down