77import java .net .http .HttpResponse ;
88import java .net .http .HttpResponse .BodyHandler ;
99import java .net .http .HttpResponse .BodyHandlers ;
10+ import java .net .http .HttpResponse .BodySubscribers ;
11+ import java .nio .ByteBuffer ;
1012import java .nio .charset .StandardCharsets ;
1113import java .util .HashMap ;
14+ import java .util .List ;
1215import java .util .Map ;
1316import java .util .concurrent .CompletableFuture ;
1417import java .util .concurrent .Flow ;
1518import java .util .function .Consumer ;
1619
1720public class JdkA2AHttpClient implements A2AHttpClient {
1821
22+ private static final int HTTP_OK = 200 ;
23+ private static final int HTTP_MULTIPLE_CHOICES = 300 ;
24+ private static final int HTTP_UNAUTHORIZED = 401 ;
25+ private static final int HTTP_FORBIDDEN = 403 ;
26+
1927 private final HttpClient httpClient ;
2028
2129 public JdkA2AHttpClient () {
@@ -88,6 +96,7 @@ protected CompletableFuture<Void> asyncRequest(
8896 ) {
8997 Flow .Subscriber <String > subscriber = new Flow .Subscriber <String >() {
9098 private Flow .Subscription subscription ;
99+ private volatile boolean errorRaised = false ;
91100
92101 @ Override
93102 public void onSubscribe (Flow .Subscription subscription ) {
@@ -109,25 +118,75 @@ public void onNext(String item) {
109118
110119 @ Override
111120 public void onError (Throwable throwable ) {
112- errorConsumer .accept (throwable );
113- subscription .cancel ();
121+ if (!errorRaised ) {
122+ errorRaised = true ;
123+ errorConsumer .accept (throwable );
124+ }
125+ if (subscription != null ) {
126+ subscription .cancel ();
127+ }
114128 }
115129
116130 @ Override
117131 public void onComplete () {
118- completeRunnable .run ();
119- subscription .cancel ();
132+ if (!errorRaised ) {
133+ completeRunnable .run ();
134+ }
135+ if (subscription != null ) {
136+ subscription .cancel ();
137+ }
120138 }
121139 };
122140
123- BodyHandler <Void > bodyHandler = BodyHandlers .fromLineSubscriber (subscriber );
141+ // Create a custom body handler that checks status before processing body
142+ BodyHandler <Void > bodyHandler = responseInfo -> {
143+ // Check status code first, before creating the subscriber
144+ if (!isSuccessStatus (responseInfo .statusCode ())) {
145+ final String errorMessage ;
146+ if (responseInfo .statusCode () == HTTP_UNAUTHORIZED ) {
147+ errorMessage = "Authentication failed: Client credentials are missing or invalid" ;
148+ } else if (responseInfo .statusCode () == HTTP_FORBIDDEN ) {
149+ errorMessage = "Authorization failed: Client does not have permission for the operation" ;
150+ } else {
151+ errorMessage = "Request failed with status " + responseInfo .statusCode ();
152+ }
153+ // Return a body subscriber that immediately signals error
154+ return BodySubscribers .fromSubscriber (new Flow .Subscriber <List <ByteBuffer >>() {
155+ @ Override
156+ public void onSubscribe (Flow .Subscription subscription ) {
157+ subscriber .onError (new IOException (errorMessage ));
158+ }
159+
160+ @ Override
161+ public void onNext (List <ByteBuffer > item ) {
162+ // Should not be called
163+ }
164+
165+ @ Override
166+ public void onError (Throwable throwable ) {
167+ // Should not be called
168+ }
169+
170+ @ Override
171+ public void onComplete () {
172+ // Should not be called
173+ }
174+ });
175+ } else {
176+ // Status is OK, proceed with normal line subscriber
177+ return BodyHandlers .fromLineSubscriber (subscriber ).apply (responseInfo );
178+ }
179+ };
124180
125181 // Send the response async, and let the subscriber handle the lines.
126182 return httpClient .sendAsync (request , bodyHandler )
127183 .thenAccept (response -> {
128- if (!JdkHttpResponse .success (response )) {
129- subscriber .onError (new IOException ("Request failed " + response .statusCode ()));
130- }
184+ // Status checking is now handled in the body handler
185+ })
186+ .exceptionally (throwable -> {
187+ // handle any other async errors (network issues, etc.)
188+ subscriber .onError (new IOException ("Request failed: " + throwable .getMessage (), throwable ));
189+ return null ;
131190 });
132191 }
133192 }
@@ -200,6 +259,13 @@ public A2AHttpResponse post() throws IOException, InterruptedException {
200259 .build ();
201260 HttpResponse <String > response =
202261 httpClient .send (request , BodyHandlers .ofString (StandardCharsets .UTF_8 ));
262+
263+ if (response .statusCode () == HTTP_UNAUTHORIZED ) {
264+ throw new IOException ("Authentication failed: Client credentials are missing or invalid" );
265+ } else if (response .statusCode () == HTTP_FORBIDDEN ) {
266+ throw new IOException ("Authorization failed: Client does not have permission for the operation" );
267+ }
268+
203269 return new JdkHttpResponse (response );
204270 }
205271
@@ -227,12 +293,16 @@ public boolean success() {// Send the request and get the response
227293 }
228294
229295 static boolean success (HttpResponse <?> response ) {
230- return response .statusCode () >= 200 && response .statusCode () < 300 ;
296+ return response .statusCode () >= HTTP_OK && response .statusCode () < HTTP_MULTIPLE_CHOICES ;
231297 }
232298
233299 @ Override
234300 public String body () {
235301 return response .body ();
236302 }
237303 }
304+
305+ private static boolean isSuccessStatus (int statusCode ) {
306+ return statusCode >= HTTP_OK && statusCode < HTTP_MULTIPLE_CHOICES ;
307+ }
238308}
0 commit comments