from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *
Parallel
threaded
def threaded(
process:bool=False
):
Run f in a Thread (or Process if process=True), and returns it
@threaded
def _1():
    "Thread body: pause 50ms, announce itself, and hand back 5 as the result."
    time.sleep(0.05)
    print("second")
    return 5
@threaded
def _2():
    "Thread body: pause only 10ms, so it finishes before `_1`."
    time.sleep(0.01)
    print("first")
# Calling a @threaded function starts it and returns the Thread object;
# `a` is kept so its `.result` attribute can be read once the thread finishes
a = _1()
_2()
time.sleep(0.1)
first
second
After the thread is complete, the return value is stored in the result attr.
a.result
5
startthread
def startthread(
f
):
Like threaded, but start thread immediately
@startthread
def _():
    "Started immediately: naps 50ms before reporting."
    time.sleep(0.05)
    print("second")
@startthread
def _():
    "Started immediately: naps a mere 10ms, so it prints first."
    time.sleep(0.01)
    print("first")
time.sleep(0.1)
first
second
startproc
def startproc(
f
):
Like threaded(True), but start Process immediately
@startproc
def _():
    "Launched at once in its own Process; sleeps 50ms before reporting."
    time.sleep(0.05)
    print("second")
@startproc
def _():
    "Launched at once in its own Process; sleeps 10ms, so it prints first."
    time.sleep(0.01)
    print("first")
time.sleep(0.1)
first
second
parallelable
def parallelable(
param_name, num_workers, f:NoneType=None
):
ThreadPoolExecutor
def ThreadPoolExecutor(
max_workers:int=4, on_exc:builtin_function_or_method=print, pause:int=0, kwargs:VAR_KEYWORD
):
Same as Python’s ThreadPoolExecutor, except can pass max_workers==0 for serial execution
ProcessPoolExecutor
def ProcessPoolExecutor(
max_workers:int=4, on_exc:builtin_function_or_method=print, pause:int=0, mp_context:NoneType=None,
initializer:NoneType=None, initargs:tuple=(), max_tasks_per_child:NoneType=None
):
Same as Python’s ProcessPoolExecutor, except can pass max_workers==0 for serial execution
parallel
def parallel(
f, items, args:VAR_POSITIONAL, n_workers:int=4, total:NoneType=None, progress:NoneType=None, pause:int=0,
method:NoneType=None, threadpool:bool=False, timeout:NoneType=None, chunksize:int=1, kwargs:VAR_KEYWORD
):
Applies func in parallel to items, using n_workers
# `parallel` maps `_add_one` over `inp` across workers; extra kwargs (a=2)
# are forwarded to f, and n_workers=0 falls back to serial in-process execution
inp,exp = range(50),range(1,51)
test_eq(parallel(_add_one, inp, n_workers=2), exp)
test_eq(parallel(_add_one, inp, threadpool=True, n_workers=2), exp)
test_eq(parallel(_add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(_add_one, inp, n_workers=0), exp)
test_eq(parallel(_add_one, inp, n_workers=0, a=2), range(2,52))
Use the pause parameter to ensure a pause of pause seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver. Set threadpool=True to use ThreadPoolExecutor instead of ProcessPoolExecutor.
from datetime import datetime

def print_time(i):
    "Nap for under 1ms, then print the item index alongside the current time."
    time.sleep(random.random()/1000)
    print(i, datetime.now())
parallel(print_time, range(5), n_workers=2, pause=0.1);
0 2026-03-09 17:02:02.475188
1 2026-03-09 17:02:02.575907
2 2026-03-09 17:02:02.677131
3 2026-03-09 17:02:02.778713
4 2026-03-09 17:02:02.880124
parallel_async
async def parallel_async(
f, items, args:VAR_POSITIONAL, n_workers:int=16, pause:int=0, timeout:NoneType=None, chunksize:int=1,
on_exc:builtin_function_or_method=print, cancel_on_error:bool=False, kwargs:VAR_KEYWORD
):
Applies f to items in parallel using asyncio and a semaphore to limit concurrency.
async def print_time_async(i):
    "Capture the start time, await a random short delay, then print both timestamps."
    begun = datetime.now()
    delay = random.random()/30
    await asyncio.sleep(delay)
    print(i, begun, datetime.now(), delay)
await parallel_async(print_time_async, range(6), n_workers=3);
1 2026-03-09 17:04:29.973713 2026-03-09 17:04:29.980911 0.006317305978820205
2 2026-03-09 17:04:29.973734 2026-03-09 17:04:29.985213 0.010986495373929654
0 2026-03-09 17:04:29.973659 2026-03-09 17:04:30.001349 0.02660853512134027
5 2026-03-09 17:04:30.001397 2026-03-09 17:04:30.009049 0.006747053859902045
3 2026-03-09 17:04:29.981235 2026-03-09 17:04:30.009845 0.028491947602613043
4 2026-03-09 17:04:29.985275 2026-03-09 17:04:30.012762 0.027138033640580195
Adding pause ensures a gap between starts:
await parallel_async(print_time_async, range(6), n_workers=3, pause=0.1);
0 2026-03-09 17:04:31.418755 2026-03-09 17:04:31.432981 0.013117748051221312
1 2026-03-09 17:04:31.519680 2026-03-09 17:04:31.536720 0.016248614341927096
2 2026-03-09 17:04:31.619834 2026-03-09 17:04:31.637578 0.01662938067120506
3 2026-03-09 17:04:31.719084 2026-03-09 17:04:31.747357 0.02714251612256462
4 2026-03-09 17:04:31.819673 2026-03-09 17:04:31.838811 0.017785103040610917
5 2026-03-09 17:04:31.919186 2026-03-09 17:04:31.940911 0.020535072775403555
# With the default `cancel_on_error=False`, all tasks run and exceptions are returned
async def maybe_fail(i:int):
    "Double i unless it's 3, in which case fail"
    await asyncio.sleep(random.random()/50)
    if i == 3:
        raise ValueError(f"bad: {i}")
    return i * 2
await parallel_async(maybe_fail, range(6), n_workers=3)
[0, 2, 4, ValueError('bad: 3'), 8, 10]
With cancel_on_error=True, parallel_async cancels remaining on first failure:
try: res = await parallel_async(maybe_fail, range(6), n_workers=3, cancel_on_error=True)
except ExceptionGroup as e:
    print(f"Exception: {e}")
    print(f"Inner exceptions: {e.exceptions}")
Exception: unhandled errors in a TaskGroup (1 sub-exception)
Inner exceptions: (ValueError('bad: 3'),)
bg_task
def bg_task(
coro
):
Like asyncio.create_task but logs exceptions for fire-and-forget tasks
async def _ok():
    "Background coroutine that completes normally with 42."
    return 42
async def _fail():
    "Background coroutine that always raises, to show bg_task's exception logging."
    raise ValueError("this error will be printed")
# Fire-and-forget both coroutines; the brief sleep yields to the event loop
# so both tasks can run — `_fail`'s exception is logged by bg_task
t1 = bg_task(_ok())
t2 = bg_task(_fail())
await asyncio.sleep(0.01)
test_eq(t1.result(), 42)
Traceback (most recent call last):
File "/var/folders/51/b2_szf2945n072c0vj2cyty40000gn/T/ipykernel_35266/1179310912.py", line 2, in _fail
async def _fail(): raise ValueError("this error will be printed")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: this error will be printed
run_procs
def run_procs(
f, f_done, args
):
Call f for each item in args in parallel, yielding f_done
parallel_gen
def parallel_gen(
cls, items, n_workers:int=4, progress:bool=True, kwargs:VAR_KEYWORD
):
Instantiate cls in n_workers procs & call each on a subset of items in parallel.
cls is any class with __call__. It will be passed args and kwargs when initialized. Note that n_workers instances of cls are created, one in each process. items are then split in n_workers batches and one is sent to each cls. The function then returns a generator of tuples of item indices and results.
class TestSleepyBatchFunc:
    "For testing parallel processes that run at different speeds"
    def __init__(self):
        self.a = 1  # constant offset added to every batch element

    def __call__(self, batch):
        "Yield each element of `batch` plus the offset, after a random nap (<0.1s)."
        for item in batch:
            time.sleep(random.random()/10)
            yield item + self.a
# Run the sleepy batch function in 2 worker processes; (index, result) tuples
# arrive in completion order, so sort by index before comparing with x+1
x = np.linspace(0,0.99,20)
res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2, progress=False))
test_eq(res.sorted().itemgot(1), x+1)