Description
Hello, I am using httpx to make asynchronous requests in a generator function. However, I ran into a problem when I tried to cancel the generator with generator.close() or asyncio.gather(*tasks, return_exceptions=True): the generator did not exit gracefully, and instead of returning it raised an asyncio.CancelledError. This caused some unwanted side effects and made it difficult to handle the cancellation properly.
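The behaviour can be reproduced without httpx: if the consuming task is cancelled while the generator is suspended at an await inside its body, the CancelledError is raised inside the generator itself. A minimal sketch using plain asyncio (the stream/consumer names are hypothetical stand-ins for an httpx response iterator):

```python
import asyncio

events = []

async def stream():
    # Stand-in for a network byte stream (hypothetical).
    try:
        while True:
            await asyncio.sleep(0.01)  # suspension point inside the generator
            yield b"chunk"
    except asyncio.CancelledError:
        # The cancellation surfaces here, inside the generator body.
        events.append("CancelledError inside generator")
        raise

async def consumer():
    async for _ in stream():
        pass

async def main():
    task = asyncio.create_task(consumer())
    await asyncio.sleep(0.005)
    task.cancel()
    # return_exceptions=True collects the CancelledError instead of raising it.
    await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())
print(events)
```

Here the exception escapes the generator and cancels the consuming task, rather than letting the generator finish with a normal return.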
I looked into the source code of httpx and found that the problem is in the aiter_raw and aiter_bytes functions in the httpx/_client.py module. These functions use async for to iterate over the response stream, but they do not catch the asyncio.CancelledError that may be raised when the stream is cancelled. According to the documentation for asyncio.CancelledError, this exception should be caught and either re-raised or suppressed. In this case, I think it would make sense to treat it like a GeneratorExit, which is the expected exception for generator termination.
I suggest adding a try-except block around the async for loop in the aiter_raw and aiter_bytes functions, like this:
async def aiter_raw(
    self, chunk_size: typing.Optional[int] = None
) -> typing.AsyncIterator[bytes]:
    """
    A byte-iterator over the raw response content.
    """
    # Note: this change also needs `import asyncio` at the top of the module.
    if self.is_stream_consumed:
        raise StreamConsumed()
    if self.is_closed:
        raise StreamClosed()
    if not isinstance(self.stream, AsyncByteStream):
        raise RuntimeError("Attempted to call an async iterator on an sync stream.")
    try:
        self.is_stream_consumed = True
        self._num_bytes_downloaded = 0
        chunker = ByteChunker(chunk_size=chunk_size)
        with request_context(request=self._request):
            async for raw_stream_bytes in self.stream:
                self._num_bytes_downloaded += len(raw_stream_bytes)
                for chunk in chunker.decode(raw_stream_bytes):
                    await asyncio.sleep(0)
                    yield chunk
        for chunk in chunker.flush():
            await asyncio.sleep(0)
            yield chunk
    except asyncio.CancelledError:
        # Swallow the cancellation so the generator ends with a normal return.
        return
    finally:
        await self.aclose()
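For comparison, here is a minimal sketch (plain asyncio, hypothetical names, not httpx itself) of how an except asyncio.CancelledError: return clause changes the behaviour: the generator terminates like a normal end-of-stream, its finally cleanup still runs, and the consuming loop exits without an exception:

```python
import asyncio

events = []

async def stream():
    # Mimics the proposed fix: catch CancelledError and return,
    # so the generator ends as if the stream were exhausted.
    try:
        while True:
            await asyncio.sleep(0.01)
            yield b"chunk"
    except asyncio.CancelledError:
        return
    finally:
        events.append("cleanup ran")

async def consumer():
    async for _ in stream():
        pass
    # With the fix, cancellation no longer propagates out of the loop.
    events.append("loop exited normally")

async def main():
    task = asyncio.create_task(consumer())
    await asyncio.sleep(0.005)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())
print(events)
```

One caveat with this approach: swallowing CancelledError means the consuming task keeps running after the cancellation is delivered, so callers that rely on the task actually being cancelled would need to handle that themselves.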