import asyncio
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *Parallel
threaded
def threaded(
process:bool=False, # Create a Process instead of a Thread?
daemon:bool=False, # Use daemon mode?
):
Run f in a Thread (or Process if process=True), and return it
@threaded
def _1():
    "Slower task: wait 50ms, announce itself, then hand back 5"
    pause = 0.05
    time.sleep(pause)
    print("second")
    return 5

@threaded
def _2():
    "Faster task: wait only 10ms, so its message appears before `_1`'s"
    pause = 0.01
    time.sleep(pause)
    print("first")
# Start the slower thread first and keep a handle so its return value can be read later
a = _1()
# Started second, but its shorter sleep makes it print first
_2()
time.sleep(0.1)first
second
After the thread completes, its return value is stored in the result attribute.
a.result5
Pass daemon=True to make the thread (or process) a daemon, so it won’t prevent the parent from exiting. This is useful for background services like webservers, where you don’t want a still-running thread to block process shutdown.
@threaded(daemon=True)
def f():
    "Trivial body; only the daemon flag of the returned thread is of interest here"
    nap = 0.01
    time.sleep(nap)
assert f().daemonstartthread
def startthread(
f:NoneType=None, daemon:bool=False
):
Like threaded, but start thread immediately
@startthread
def _():
    "Launched as soon as it is defined; the 50ms wait makes it print last"
    wait = 0.05
    time.sleep(wait)
    print("second")

@startthread
def _():
    "Also launched immediately; the shorter 10ms wait lets it print first"
    wait = 0.01
    time.sleep(wait)
    print("first")
time.sleep(0.1)first
second
@startthread(daemon=True)
def f():
    "Launched immediately as a daemon thread; the body just naps briefly"
    nap = 0.01
    time.sleep(nap)
assert f.daemonstartproc
def startproc(
f:NoneType=None, daemon:bool=False
):
Like threaded(True), but start Process immediately
@startproc
def _():
    "Runs in a Process started at definition time; the 50ms wait makes it print last"
    wait = 0.05
    time.sleep(wait)
    print("second")

@startproc
def _():
    "Also started immediately in its own Process; the 10ms wait lets it print first"
    wait = 0.01
    time.sleep(wait)
    print("first")
time.sleep(0.1)first
second
parallelable
def parallelable(
param_name, num_workers, f:NoneType=None
):
Call self as a function.
ThreadPoolExecutor
def ThreadPoolExecutor(
max_workers:int=4, on_exc:builtin_function_or_method=print, pause:int=0, kwargs:VAR_KEYWORD
):
Same as Python’s ThreadPoolExecutor, except that you can pass max_workers==0 for serial execution
ProcessPoolExecutor
def ProcessPoolExecutor(
max_workers:int=4, on_exc:builtin_function_or_method=print, pause:int=0, mp_context:NoneType=None,
initializer:NoneType=None, initargs:tuple=(), max_tasks_per_child:NoneType=None
):
Same as Python’s ProcessPoolExecutor, except that you can pass max_workers==0 for serial execution
parallel
def parallel(
f, items, args:VAR_POSITIONAL, n_workers:int=4, total:NoneType=None, progress:NoneType=None, pause:int=0,
method:NoneType=None, threadpool:bool=False, timeout:NoneType=None, chunksize:int=1,
return_exceptions:bool=False, kwargs:VAR_KEYWORD
):
Apply f to items in parallel, using n_workers workers
# _add_one is defined in a cell not shown here; judging by the expected values below
# it returns x+a with a defaulting to 1 — TODO confirm
inp,exp = range(50),range(1,51)
# Default: a process pool
test_eq(parallel(_add_one, inp, n_workers=2), exp)
# Same result with a thread pool instead of a process pool
test_eq(parallel(_add_one, inp, threadpool=True, n_workers=2), exp)
# Extra keyword arguments are forwarded to f
test_eq(parallel(_add_one, inp, n_workers=1, a=2), range(2,52))
# n_workers=0 runs serially in the current process
test_eq(parallel(_add_one, inp, n_workers=0), exp)
test_eq(parallel(_add_one, inp, n_workers=0, a=2), range(2,52))Use the pause parameter to ensure a pause of pause seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver. Set threadpool=True to use ThreadPoolExecutor instead of ProcessPoolExecutor.
from datetime import datetime

def print_time(i):
    "Print `i` with the current time after a tiny random delay, so staggered starts are visible"
    # Sub-millisecond jitter: much smaller than `pause`, so the printed gaps reflect the pause
    time.sleep(random.random()/1000)
    print(i, datetime.now())
parallel(print_time, range(5), n_workers=2, pause=0.1);0 2026-05-13 13:25:26.977693
1 2026-05-13 13:25:27.079169
2 2026-05-13 13:25:27.180193
3 2026-05-13 13:25:27.281662
4 2026-05-13 13:25:27.382139
You can also pass return_exceptions=True to catch any exceptions from parallel workers and return them instead:
def die_sometimes(x):
    "Double `x`, but raise for values strictly between 3 and 6 to mimic a failing worker"
    if not 3 < x < 6:
        return x * 2
    raise Exception(f"exc: {x}")
parallel(die_sometimes, range(8), return_exceptions=True)[0, 2, 4, 6, Exception('exc: 4'), Exception('exc: 5'), 12, 14]
parallel_async
async def parallel_async(
f, items, args:VAR_POSITIONAL, n_workers:int=16, pause:int=0, timeout:NoneType=None, chunksize:int=1,
on_exc:builtin_function_or_method=print, cancel_on_error:bool=False, return_exceptions:bool=False,
kwargs:VAR_KEYWORD
):
Apply f to items in parallel, using asyncio and a semaphore to limit concurrency.
async def print_time_async(i):
    "Wait a random fraction of 1/30s, print start/end times and the wait, then return `i` (raises for i == 5)"
    began = datetime.now()
    delay = random.random() / 30
    await asyncio.sleep(delay)
    print(i, began, datetime.now(), delay)
    if i != 5:
        return i
    raise Exception(f"exc {i}")
# i==5 raises inside its task; with return_exceptions=True the exception comes back as a value
res = await parallel_async(print_time_async, range(6), n_workers=3, return_exceptions=True)
# Results keep input order even though the tasks finish out of order (see printed output)
test_eq(res[:5], [0, 1, 2, 3, 4])
test_eq(type(res[5]), Exception)2 2026-05-13 13:25:27.564974 2026-05-13 13:25:27.569803 0.004192140760309271
1 2026-05-13 13:25:27.564969 2026-05-13 13:25:27.569908 0.004733583519831327
4 2026-05-13 13:25:27.570059 2026-05-13 13:25:27.575066 0.004417952090144302
3 2026-05-13 13:25:27.570047 2026-05-13 13:25:27.577781 0.007399881148787446
0 2026-05-13 13:25:27.564949 2026-05-13 13:25:27.580303 0.015048152922976935
5 2026-05-13 13:25:27.575107 2026-05-13 13:25:27.580550 0.005396135128260119
Adding pause staggers the task starts by pause seconds:
await parallel_async(print_time_async, range(6), n_workers=3, pause=0.1, return_exceptions=True);0 2026-05-13 13:25:27.603860 2026-05-13 13:25:27.617834 0.012897349937030123
1 2026-05-13 13:25:27.704777 2026-05-13 13:25:27.735915 0.02999927582214811
2 2026-05-13 13:25:27.804897 2026-05-13 13:25:27.830147 0.024032506917510476
3 2026-05-13 13:25:27.904851 2026-05-13 13:25:27.911237 0.005580292386023334
4 2026-05-13 13:25:28.004162 2026-05-13 13:25:28.006516 0.001997318249937352
5 2026-05-13 13:25:28.104945 2026-05-13 13:25:28.117438 0.011197564731975381
With cancel_on_error=True, parallel_async cancels the remaining tasks on the first failure and raises an ExceptionGroup:
async def maybe_fail(i:int):
"Double i unless it's 3, in which case fail"
await asyncio.sleep(random.random()/50)
if i==3: raise ValueError(f"bad: {i}")
return i*2try: res = await parallel_async(maybe_fail, range(6), n_workers=3, cancel_on_error=True)
except ExceptionGroup as e:
print(f"Exception: {e}")
print(f"Inner exceptions: {e.exceptions}")Exception: unhandled errors in a TaskGroup (1 sub-exception)
Inner exceptions: (ValueError('bad: 3'),)
With return_exceptions=False (the default), the worker's exception is raised instead of being returned:
with expect_fail(ValueError): await parallel_async(maybe_fail, range(6), n_workers=3)bg_task
def bg_task(
coro
):
Like asyncio.create_task but logs exceptions for fire-and-forget tasks
async def _ok():
    "Finish normally, returning 42"
    result = 42
    return result

async def _fail():
    "Always raise, to show that bg_task reports errors from fire-and-forget tasks"
    raise ValueError("this error will be printed")
# bg_task returns the task, so its result can still be inspected afterwards
t1 = bg_task(_ok())
# This task's exception is printed rather than silently lost
t2 = bg_task(_fail())
# Give both background tasks a chance to run
await asyncio.sleep(0.01)
test_eq(t1.result(), 42)Traceback (most recent call last):
File "/var/folders/51/b2_szf2945n072c0vj2cyty40000gn/T/ipykernel_62209/1179310912.py", line 2, in _fail
async def _fail(): raise ValueError("this error will be printed")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: this error will be printed
run_procs
def run_procs(
f, f_done, args
):
Call f for each item in args in parallel, yielding f_done
parallel_gen
def parallel_gen(
cls, items, n_workers:int=4, progress:bool=False, kwargs:VAR_KEYWORD
):
Instantiate cls in n_workers procs & call each on a subset of items in parallel.
cls is any class with __call__. It will be passed kwargs when initialized. Note that n_workers instances of cls are created, one in each process. items are then split into n_workers batches and one batch is sent to each cls instance. The function then returns a generator of tuples of item indices and results.
class TestSleepyBatchFunc:
    "Callable batch processor for testing parallel processes that run at different speeds"
    def __init__(self):
        # Amount added to every item; read afresh on each yield
        self.a = 1

    def __call__(self, batch):
        "Yield each item of `batch` plus `self.a`, pausing a random 0-100ms before each one"
        for item in batch:
            nap = random.random() / 10
            time.sleep(nap)
            yield item + self.a
# 20 evenly spaced floats; each worker adds self.a (1) to the items of its batch
x = np.linspace(0,0.99,20)
# parallel_gen yields (index, result) tuples; workers sleep randomly so they may arrive out of order
res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2, progress=False))
# Sorting by index restores input order before comparing results
test_eq(res.sorted().itemgot(1), x+1)