11import asyncio
2+ import inspect
3+ import sys
24from collections import deque , defaultdict
35from datetime import timedelta
46from itertools import chain
@@ -705,8 +707,30 @@ def __init__(self, upstream, func, *args, **kwargs):
705707 stream_name = kwargs .pop ('stream_name' , None )
706708 self .kwargs = kwargs
707709 self .args = args
710+ if inspect .iscoroutinefunction (func ) or gen .is_coroutine_function (func ):
711+ self .work_queue = asyncio .Queue ()
712+ else :
713+ self .work_queue = None
708714
709715 Stream .__init__ (self , upstream , stream_name = stream_name )
716+ if self .work_queue :
717+ self .cb_task = self .loop .asyncio_loop .create_task (self .cb ())
718+ else :
719+ self .cb_task = None
720+
721+ async def cb (self ):
722+ while True :
723+ if not self .work_queue :
724+ assert self .work_queue , f"Async mapping engine should not have been started on { self } "
725+ await asyncio .sleep (sys .float_info .max ) # Sleep forever in case assertion are turned off
726+ coro , metadata = await self .work_queue .get ()
727+ try :
728+ result = await coro
729+ except Exception as e :
730+ logger .exception (e )
731+ raise
732+ else :
733+ return self ._emit (result , metadata = metadata )
710734
711735 def update (self , x , who = None , metadata = None ):
712736 try :
@@ -715,7 +739,10 @@ def update(self, x, who=None, metadata=None):
715739 logger .exception (e )
716740 raise
717741 else :
718- return self ._emit (result , metadata = metadata )
742+ if self .work_queue :
743+ return self .work_queue .put_nowait ((result , metadata ))
744+ else :
745+ return self ._emit (result , metadata = metadata )
719746
720747
721748@Stream .register_api ()
0 commit comments