from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *
Parallel
threaded
threaded (process=False)
Run f
in a Thread
(or Process
if process=True
), and returns it
@threaded
def _1():
    # Runs in a background Thread via `threaded`; the return value is
    # stored on the returned thread's `result` attr once it completes.
    time.sleep(0.05)
    print("second")
    return 5
@threaded
def _2():
    # Shorter sleep than `_1`, so this prints first even though it is
    # started second in the demo below.
    time.sleep(0.01)
    print("first")
a = _1()
_2()
time.sleep(0.1)
first
second
After the thread is complete, the return value is stored in the result
attr.
a.result
5
startthread
startthread (f)
Like threaded
, but start thread immediately
@startthread
def _():
    # `startthread` starts the Thread immediately at definition time.
    time.sleep(0.05)
    print("second")
@startthread
def _():
    # Shorter sleep, so this prints before the thread defined above.
    time.sleep(0.01)
    print("first")
time.sleep(0.1)
first
second
startproc
startproc (f)
Like threaded(True)
, but start Process immediately
@startproc
def _():
    # `startproc` starts a Process immediately at definition time.
    time.sleep(0.05)
    print("second")
@startproc
def _():
    # Shorter sleep, so this prints before the process defined above.
    time.sleep(0.01)
    print("first")
time.sleep(0.1)
first
second
parallelable
parallelable (param_name, num_workers, f=None)
ThreadPoolExecutor
ThreadPoolExecutor (max_workers=4, on_exc=<built-in function print>, pause=0, **kwargs)
Same as Python’s ThreadPoolExecutor, except can pass max_workers==0
for serial execution
ProcessPoolExecutor
ProcessPoolExecutor (max_workers=4, on_exc=<built-in function print>, pause=0, mp_context=None, initializer=None, initargs=())
Same as Python’s ProcessPoolExecutor, except can pass max_workers==0
for serial execution
parallel
parallel (f, items, *args, n_workers=4, total=None, progress=None, pause=0, method=None, threadpool=False, timeout=None, chunksize=1, **kwargs)
Applies func
in parallel to items
, using n_workers
inp,exp = range(50),range(1,51)
test_eq(parallel(_add_one, inp, n_workers=2), exp)
test_eq(parallel(_add_one, inp, threadpool=True, n_workers=2), exp)
test_eq(parallel(_add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(_add_one, inp, n_workers=0), exp)
test_eq(parallel(_add_one, inp, n_workers=0, a=2), range(2,52))
Use the pause
parameter to ensure a pause of pause
seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver. Set threadpool=True
to use ThreadPoolExecutor
instead of ProcessPoolExecutor
.
from datetime import datetime
def print_time(i):
    "Print `i` and the current time after a tiny random delay (demo for `pause`)."
    time.sleep(random.random()/1000)
    print(i, datetime.now())
parallel(print_time, range(5), n_workers=2, pause=0.25);
0 2024-10-11 23:06:05.920741
1 2024-10-11 23:06:06.171470
2 2024-10-11 23:06:06.431925
3 2024-10-11 23:06:06.689940
4 2024-10-11 23:06:06.937109
parallel_async
parallel_async (f, items, *args, n_workers=16, timeout=None, chunksize=1, on_exc=<built-in function print>, **kwargs)
Applies f
to items
in parallel using asyncio and a semaphore to limit concurrency.
import asyncio
async def print_time_async(i):
    "Sleep a random fraction of a second, then print `i`, the time, and the wait."
    wait = random.random()
    await asyncio.sleep(wait)
    print(i, datetime.now(), wait)
await parallel_async(print_time_async, range(6), n_workers=3);
0 2024-10-11 23:06:39.545583 0.10292732609738675
3 2024-10-11 23:06:39.900393 0.3516179734831676
4 2024-10-11 23:06:39.941094 0.03699593757956876
2 2024-10-11 23:06:39.957677 0.5148658606540902
1 2024-10-11 23:06:40.099716 0.6574035385815227
5 2024-10-11 23:06:40.654097 0.7116319667399102
run_procs
run_procs (f, f_done, args)
Call f
for each item in args
in parallel, yielding f_done
parallel_gen
parallel_gen (cls, items, n_workers=4, **kwargs)
Instantiate cls
in n_workers
procs & call each on a subset of items
in parallel.
# class _C:
# def __call__(self, o): return ((i+1) for i in o)
# items = range(5)
# res = L(parallel_gen(_C, items, n_workers=0))
# idxs,dat1 = zip(*res.sorted(itemgetter(0)))
# test_eq(dat1, range(1,6))
# res = L(parallel_gen(_C, items, n_workers=3))
# idxs,dat2 = zip(*res.sorted(itemgetter(0)))
# test_eq(dat2, dat1)
cls
is any class with __call__
. It will be passed args
and kwargs
when initialized. Note that n_workers
instances of cls
are created, one in each process. items
are then split in n_workers
batches and one is sent to each cls
. The function then returns a generator of tuples of item indices and results.
class TestSleepyBatchFunc:
    "For testing parallel processes that run at different speeds"
    def __init__(self): self.a=1
    def __call__(self, batch):
        # Yield each item incremented by `self.a`, sleeping a random
        # amount per item so worker processes finish out of order.
        for k in batch:
            time.sleep(random.random()/4)
            yield k+self.a
x = np.linspace(0,0.99,20)
res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2))
test_eq(res.sorted().itemgot(1), x+1)
# #|hide
# from subprocess import Popen, PIPE
# # test num_workers > 0 in scripts works when python process start method is spawn
# process = Popen(["python", "parallel_test.py"], stdout=PIPE)
# _, err = process.communicate(timeout=10)
# exit_code = process.wait()
# test_eq(exit_code, 0)