Threading and multiprocessing functions
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *
@threaded
def _1():
    "Sleep briefly, then print — runs in its own thread via `@threaded`."
    # Longer nap than _2, so this prints after it despite being started first.
    nap = 0.05
    time.sleep(nap)
    print("second")
@threaded
def _2():
    "Sleep briefly, then print — runs in its own thread via `@threaded`."
    # Shorter nap than _1, so "first" is printed before "second".
    nap = 0.01
    time.sleep(nap)
    print("first")
# Start both threads; _2 sleeps less so "first" prints before "second".
_1()
_2()
# Give both threads time to finish before moving on.
time.sleep(0.1)
@startthread
def _():
    "Started immediately by `@startthread`; prints after the shorter sleeper."
    delay = 0.05
    time.sleep(delay)
    print("second")
@startthread
def _():
    "Started immediately by `@startthread`; shorter sleep, so prints first."
    delay = 0.01
    time.sleep(delay)
    print("first")
time.sleep(0.1)
This sets the number of threads consistently for many tools, by:

1. Setting the environment variables `OPENBLAS_NUM_THREADS`, `NUMEXPR_NUM_THREADS`, `OMP_NUM_THREADS`, and `MKL_NUM_THREADS` equal to `nt`
2. Setting `nt` threads for numpy and pytorch.
def add_one(x, a=1):
    "Return `x+a` after a tiny random delay (exercises `parallel` scheduling)."
    time.sleep(random.random()/80)
    total = x + a
    return total
# `parallel` maps `add_one` over `inp`; extra kwargs (e.g. `a`) are forwarded
# to the function. `n_workers=0` runs serially in the current process, and
# `threadpool=True` swaps ProcessPoolExecutor for ThreadPoolExecutor.
inp,exp = range(50),range(1,51)
test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, threadpool=True, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(add_one, inp, n_workers=0), exp)
test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))
Use the `pause` parameter to ensure a pause of `pause` seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver. Set `threadpool=True` to use `ThreadPoolExecutor` instead of `ProcessPoolExecutor`.
from datetime import datetime
def print_time(i):
    "Print `i` together with the current timestamp after a tiny random delay."
    nap = random.random()/1000
    time.sleep(nap)
    print(i, datetime.now())
parallel(print_time, range(5), n_workers=2, pause=0.25);
Note that `f` should accept a collection of items.
class _C:
def __call__(self, o): return ((i+1) for i in o)
# Run `_C` over `items` serially (n_workers=0) and with 3 workers; results come
# back as (index, result) tuples, so sort by index before comparing values.
items = range(5)
res = L(parallel_gen(_C, items, n_workers=0))
idxs,dat1 = zip(*res.sorted(itemgetter(0)))
test_eq(dat1, range(1,6))
res = L(parallel_gen(_C, items, n_workers=3))
idxs,dat2 = zip(*res.sorted(itemgetter(0)))
test_eq(dat2, dat1)
`cls` is any class with `__call__`. It will be passed `args` and `kwargs` when initialized. Note that `n_workers` instances of `cls` are created, one in each process. `items` are then split into `n_workers` batches and one is sent to each `cls`. The function then returns a generator of tuples of item indices and results.
class TestSleepyBatchFunc:
    "For testing parallel processes that run at different speeds"
    def __init__(self): self.a = 1
    def __call__(self, batch):
        # Random per-item delay makes workers finish in unpredictable order.
        for item in batch:
            time.sleep(random.random()/4)
            yield item + self.a
# Batches finish at random speeds; sorting the (index, result) pairs recovers
# input order, and each result should equal its input plus `self.a` (1).
x = np.linspace(0,0.99,20)
res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2))
test_eq(res.sorted().itemgot(1), x+1)
import sys
from subprocess import Popen, PIPE, TimeoutExpired

# Test that `parallel` with n_workers > 0 works in scripts even when the
# multiprocessing start method is "spawn" (the default on Windows/macOS).
# Use sys.executable rather than a bare "python" so the child runs under the
# same interpreter as this process (a bare "python" may resolve differently).
process = Popen([sys.executable, "parallel_test.py"], stdout=PIPE)
try:
    _, err = process.communicate(timeout=5)
except TimeoutExpired:
    # Don't leak a hung child: kill it, reap its output, then re-raise.
    process.kill()
    process.communicate()
    raise
exit_code = process.wait()
test_eq(exit_code, 0)