Make a task's lambda invocation concurrent #26

@rsyring

The core of invoking a task is a lambda invoke:

result = self.response = client().invoke(

A basic threading queue pattern probably works fine here:

import threading
from queue import Queue

def worker(q: Queue):
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        try:
            # process item
            pass
        finally:
            q.task_done()

# TODO: make maxsize configurable
q = Queue(maxsize=100)
workers = []

# TODO: make worker count configurable
for _ in range(8):
    # TODO: daemon may not be appropriate here, but we do need to think about how running threads get terminated.
    # We don't want the lambda to keep running because the task submission threads are still running. They should
    # only exist long enough to drain the queue, and then everything should shut down.
    t = threading.Thread(target=worker, args=(q,), daemon=True)
    t.start()
    workers.append(t)

# producers
for item in range(1000):
    q.put(item)

# shutdown
for _ in workers:
    q.put(None)
q.join()
for t in workers:
    t.join()
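
For reference, a minimal sketch of how the pattern above might wrap the actual invoke call and hand results back. `invoke_concurrently` and `fake_invoke` are made-up names for illustration; `fake_invoke` only stands in for the real `client().invoke(...)` call, whose arguments aren't shown here.

```python
import threading
from queue import Queue


def invoke_concurrently(invoke, payloads, worker_count=8, maxsize=100):
    """Call invoke(payload) for each payload on worker threads.

    Returns the results in input order. If a call raises, the exception
    object is stored in that slot instead of a result.
    """
    payloads = list(payloads)
    results = [None] * len(payloads)
    q = Queue(maxsize=maxsize)

    def worker():
        while True:
            item = q.get()
            try:
                if item is None:  # sentinel: no more work for this thread
                    return
                index, payload = item
                try:
                    results[index] = invoke(payload)
                except Exception as exc:
                    results[index] = exc
            finally:
                q.task_done()

    workers = [threading.Thread(target=worker, daemon=True) for _ in range(worker_count)]
    for t in workers:
        t.start()

    # Producer: blocks when the queue is full, which bounds memory use.
    for item in enumerate(payloads):
        q.put(item)

    # Shutdown: one sentinel per worker, then wait for everything to drain.
    for _ in workers:
        q.put(None)
    q.join()
    for t in workers:
        t.join()
    return results


def fake_invoke(payload):
    # Hypothetical stand-in for the real lambda invoke call.
    return {"echo": payload}


responses = invoke_concurrently(fake_invoke, ["a", "b", "c"], worker_count=2)
```

Each slot in `results` is written by exactly one worker, so no extra locking is needed for the result list in this sketch.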

Claude suggested:

import queue
import threading
import time

class AutoScalingThreadPool:
    def __init__(self, max_workers=4, idle_timeout=5):
        self.work_queue = queue.Queue()
        self.max_workers = max_workers
        self.idle_timeout = idle_timeout  # seconds a worker idles before exiting
        self.active_workers = 0
        self.threads = []  # started workers, joined in wait_completion()
        self.lock = threading.Lock()
        self.shutdown_flag = False

    def worker(self):
        while True:
            try:
                task = self.work_queue.get(timeout=self.idle_timeout)
            except queue.Empty:
                with self.lock:
                    # submit() enqueues before taking the lock, so re-check here
                    # to avoid stranding a task that arrived as we timed out.
                    if not self.work_queue.empty():
                        continue
                    # Idle timeout reached, exit this worker
                    self.active_workers -= 1
                    print(f"Worker exiting due to idle timeout. Active workers: {self.active_workers}")
                return

            # Process the task
            try:
                task()
            except Exception as e:
                print(f"Error processing task: {e}")
            finally:
                self.work_queue.task_done()

    def submit(self, fn, *args, **kwargs):
        """Submit work to the queue, spinning up workers if needed."""
        if self.shutdown_flag:
            raise RuntimeError("Pool is shut down")

        # Wrap function with args
        task = lambda: fn(*args, **kwargs)
        self.work_queue.put(task)

        # Spin up workers if needed
        with self.lock:
            if self.active_workers < self.max_workers:
                self.active_workers += 1
                t = threading.Thread(target=self.worker, daemon=True)
                t.start()
                self.threads.append(t)
                print(f"Started new worker. Active workers: {self.active_workers}")

    def wait_completion(self):
        """Wait for all queued tasks to complete and for the workers to exit."""
        self.work_queue.join()

        # Then join all threads to ensure they've fully exited.
        # Each worker exits once it has been idle for idle_timeout seconds.
        with self.lock:
            threads, self.threads = self.threads, []
        for t in threads:
            t.join()

    def shutdown(self):
        """Gracefully shut down the pool: refuse new work, then drain."""
        # The flag only blocks new submissions. Workers keep draining the queue,
        # so join() cannot hang on tasks that no thread is left to process.
        self.shutdown_flag = True
        self.wait_completion()


# Usage example
if __name__ == "__main__":
    def task(n):
        print(f"Processing task {n} on thread {threading.current_thread().name}")
        time.sleep(1)
        print(f"Completed task {n}")
    
    pool = AutoScalingThreadPool(max_workers=4, idle_timeout=3)
    
    # Submit some work
    print("Submitting 10 tasks...")
    for i in range(10):
        pool.submit(task, i)
    
    # Wait for completion
    pool.wait_completion()
    print("All tasks completed")
    
    # Workers will auto-exit after 3 seconds of idling
    print("Waiting for workers to exit...")
    time.sleep(4)
    
    # Submit more work later - workers will spin up again
    print("\nSubmitting 3 more tasks...")
    for i in range(10, 13):
        pool.submit(task, i)
    
    pool.wait_completion()
    print("Done!")

As I think about it a bit more, we don't want a time-based system for deciding when the queue is empty. Instead, after our handler calls the primary function (which will return), it should then call the AutoScalingThreadPool instance's shutdown(). That call should return immediately if no tasks were submitted. If tasks were submitted, it should put a None on the queue for each worker, call q.join(), and then join each thread.
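
A rough sketch of that shutdown behavior, for discussion. The class name `DrainOnShutdownPool` is hypothetical, and it assumes a fresh pool is created per invocation (once shut down, it rejects further submissions) and that an unbounded queue is acceptable.

```python
import queue
import threading


class DrainOnShutdownPool:
    """Sketch: workers exit on a None sentinel instead of an idle timeout."""

    def __init__(self, max_workers=4):
        self.work_queue = queue.Queue()  # unbounded, so put() never blocks
        self.max_workers = max_workers
        self.threads = []
        self.lock = threading.Lock()
        self.closed = False

    def _worker(self):
        while True:
            task = self.work_queue.get()
            try:
                if task is None:  # sentinel from shutdown()
                    return
                task()
            except Exception as e:
                print(f"Error processing task: {e}")
            finally:
                self.work_queue.task_done()

    def submit(self, fn, *args, **kwargs):
        with self.lock:
            if self.closed:
                raise RuntimeError("Pool is shut down")
            self.work_queue.put(lambda: fn(*args, **kwargs))
            # Start workers lazily, one per submission, up to max_workers.
            if len(self.threads) < self.max_workers:
                # daemon=True so a missed shutdown() cannot keep the process alive
                t = threading.Thread(target=self._worker, daemon=True)
                t.start()
                self.threads.append(t)

    def shutdown(self):
        with self.lock:
            self.closed = True
            threads, self.threads = self.threads, []
        if not threads:
            return  # no tasks were ever submitted, nothing to wait for
        # Sentinels are queued after all real tasks, one per worker.
        for _ in threads:
            self.work_queue.put(None)
        self.work_queue.join()
        for t in threads:
            t.join()


done = []
pool = DrainOnShutdownPool(max_workers=3)
for i in range(10):
    pool.submit(done.append, i)
pool.shutdown()  # returns only after all 10 tasks have run
```

Because `submit()` and the `closed` flag share one lock, no task can land on the queue behind the sentinels, so every worker sees exactly one `None` after the real work is gone.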

Note: it's possible to leave a thread running when the lambda handler returns, but it will get "paused" (and probably killed?) when Lambda pauses the execution environment. That is, a thread can't keep running after the invocation ends. This seemingly remains true even though a warm lambda will reuse the same process for a new invocation: https://chatgpt.com/share/68f6f341-81b0-8008-8a39-7eb95431816e

Actually, it seems it's possible the threads would resume after the environment is unpaused for the next invocation:

Background processes or callbacks that were initiated by your Lambda function and did not complete when the function ended resume if Lambda reuses the execution environment. Make sure that any background processes or callbacks in your code are complete before the code exits.

/cc @calebsyring
