Parallel

Threading and multiprocessing functions
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *

source

threaded


def threaded(
    process:bool=False
):

Run f in a Thread (or Process if process=True), and return it

@threaded
def _1():
    time.sleep(0.05)
    print("second")
    return 5

@threaded
def _2():
    time.sleep(0.01)
    print("first")

a = _1()
_2()
time.sleep(0.1)
first
second

After the thread is complete, the return value is stored in the result attr.

a.result
5

source

startthread


def startthread(
    f
):

Like threaded, but start thread immediately

@startthread
def _():
    time.sleep(0.05)
    print("second")

@startthread
def _():
    time.sleep(0.01)
    print("first")

time.sleep(0.1)
first
second

source

startproc


def startproc(
    f
):

Like threaded(True), but start Process immediately

@startproc
def _():
    time.sleep(0.05)
    print("second")

@startproc
def _():
    time.sleep(0.01)
    print("first")

time.sleep(0.1)
first
second

source

parallelable


def parallelable(
    param_name, num_workers, f:NoneType=None
):

source

ThreadPoolExecutor


def ThreadPoolExecutor(
    max_workers:int=4, on_exc:builtin_function_or_method=print, pause:int=0, kwargs:VAR_KEYWORD
):

Same as Python’s ThreadPoolExecutor, except you can pass max_workers==0 for serial execution


source

ProcessPoolExecutor


def ProcessPoolExecutor(
    max_workers:int=4, on_exc:builtin_function_or_method=print, pause:int=0, mp_context:NoneType=None,
    initializer:NoneType=None, initargs:tuple=(), max_tasks_per_child:NoneType=None
):

Same as Python’s ProcessPoolExecutor, except you can pass max_workers==0 for serial execution


source

parallel


def parallel(
    f, items, args:VAR_POSITIONAL, n_workers:int=4, total:NoneType=None, progress:NoneType=None, pause:int=0,
    method:NoneType=None, threadpool:bool=False, timeout:NoneType=None, chunksize:int=1, kwargs:VAR_KEYWORD
):

Applies f in parallel to items, using n_workers

inp,exp = range(50),range(1,51)

test_eq(parallel(_add_one, inp, n_workers=2), exp)
test_eq(parallel(_add_one, inp, threadpool=True, n_workers=2), exp)
test_eq(parallel(_add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(_add_one, inp, n_workers=0), exp)
test_eq(parallel(_add_one, inp, n_workers=0, a=2), range(2,52))

Use the pause parameter to ensure a pause of pause seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver. Set threadpool=True to use ThreadPoolExecutor instead of ProcessPoolExecutor.

from datetime import datetime
def print_time(i): 
    time.sleep(random.random()/1000)
    print(i, datetime.now())

parallel(print_time, range(5), n_workers=2, pause=0.1);
0 2026-03-09 17:02:02.475188
1 2026-03-09 17:02:02.575907
2 2026-03-09 17:02:02.677131
3 2026-03-09 17:02:02.778713
4 2026-03-09 17:02:02.880124

source

parallel_async


async def parallel_async(
    f, items, args:VAR_POSITIONAL, n_workers:int=16, pause:int=0, timeout:NoneType=None, chunksize:int=1,
    on_exc:builtin_function_or_method=print, cancel_on_error:bool=False, kwargs:VAR_KEYWORD
):

Applies f to items in parallel using asyncio and a semaphore to limit concurrency.

async def print_time_async(i): 
    start =datetime.now()
    wait = random.random()/30
    await asyncio.sleep(wait)
    print(i, start, datetime.now(), wait)

await parallel_async(print_time_async, range(6), n_workers=3);
1 2026-03-09 17:04:29.973713 2026-03-09 17:04:29.980911 0.006317305978820205
2 2026-03-09 17:04:29.973734 2026-03-09 17:04:29.985213 0.010986495373929654
0 2026-03-09 17:04:29.973659 2026-03-09 17:04:30.001349 0.02660853512134027
5 2026-03-09 17:04:30.001397 2026-03-09 17:04:30.009049 0.006747053859902045
3 2026-03-09 17:04:29.981235 2026-03-09 17:04:30.009845 0.028491947602613043
4 2026-03-09 17:04:29.985275 2026-03-09 17:04:30.012762 0.027138033640580195

Adding pause ensures a gap between starts:

await parallel_async(print_time_async, range(6), n_workers=3, pause=0.1);
0 2026-03-09 17:04:31.418755 2026-03-09 17:04:31.432981 0.013117748051221312
1 2026-03-09 17:04:31.519680 2026-03-09 17:04:31.536720 0.016248614341927096
2 2026-03-09 17:04:31.619834 2026-03-09 17:04:31.637578 0.01662938067120506
3 2026-03-09 17:04:31.719084 2026-03-09 17:04:31.747357 0.02714251612256462
4 2026-03-09 17:04:31.819673 2026-03-09 17:04:31.838811 0.017785103040610917
5 2026-03-09 17:04:31.919186 2026-03-09 17:04:31.940911 0.020535072775403555
# With the default `cancel_on_error=False`, all tasks run and exceptions are returned
async def maybe_fail(i:int):
    "Double i unless it's 3, in which case fail"
    await asyncio.sleep(random.random()/50)
    if i==3: raise ValueError(f"bad: {i}")
    return i*2

await parallel_async(maybe_fail, range(6), n_workers=3)
[0, 2, 4, ValueError('bad: 3'), 8, 10]

With cancel_on_error=True, parallel_async cancels remaining on first failure:

try: res = await parallel_async(maybe_fail, range(6), n_workers=3, cancel_on_error=True)
except ExceptionGroup as e:
    print(f"Exception: {e}")
    print(f"Inner exceptions: {e.exceptions}")
Exception: unhandled errors in a TaskGroup (1 sub-exception)
Inner exceptions: (ValueError('bad: 3'),)

source

bg_task


def bg_task(
    coro
):

Like asyncio.create_task but logs exceptions for fire-and-forget tasks

async def _ok(): return 42
async def _fail(): raise ValueError("this error will be printed")

t1 = bg_task(_ok())
t2 = bg_task(_fail())
await asyncio.sleep(0.01)
test_eq(t1.result(), 42)
Traceback (most recent call last):
  File "/var/folders/51/b2_szf2945n072c0vj2cyty40000gn/T/ipykernel_35266/1179310912.py", line 2, in _fail
    async def _fail(): raise ValueError("this error will be printed")
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: this error will be printed

source

run_procs


def run_procs(
    f, f_done, args
):

Call f for each item in args in parallel, yielding f_done


source

parallel_gen


def parallel_gen(
    cls, items, n_workers:int=4, progress:bool=True, kwargs:VAR_KEYWORD
):

Instantiate cls in n_workers procs & call each on a subset of items in parallel.

cls is any class with __call__. It will be passed args and kwargs when initialized. Note that n_workers instances of cls are created, one in each process. items are then split in n_workers batches and one is sent to each cls. The function then returns a generator of tuples of item indices and results.

class TestSleepyBatchFunc:
    "For testing parallel processes that run at different speeds"
    def __init__(self): self.a=1
    def __call__(self, batch):
        for k in batch:
            time.sleep(random.random()/10)
            yield k+self.a

x = np.linspace(0,0.99,20)

res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2, progress=False))
test_eq(res.sorted().itemgot(1), x+1)