Parallel

Threading and multiprocessing functions
from fastcore.parallel import *
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *
import time,random

source

threaded

 threaded (process=False)

Run f in a Thread (or Process if process=True), and return it

@threaded
def _1():
    time.sleep(0.05)
    print("second")
    return 5

@threaded
def _2():
    time.sleep(0.01)
    print("first")

a = _1()
_2()
time.sleep(0.1)
first
second

After the thread completes, the return value is stored in its result attribute.

a.result
5
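Pass process=True to run the function in a separate Process instead. A minimal sketch: note that a Process has its own memory, so the result attribute is not populated in the parent, and this assumes a start method (such as fork) that can run a locally-defined function.

@threaded(process=True)
def _3():
    print("hello from a child process")

p = _3()
p.join()  # wait for the child process to finish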

source

startthread

 startthread (f)

Like threaded, but starts the thread immediately

@startthread
def _():
    time.sleep(0.05)
    print("second")

@startthread
def _():
    time.sleep(0.01)
    print("first")

time.sleep(0.1)
first
second

source

startproc

 startproc (f)

Like threaded(True), but starts the Process immediately

@startproc
def _():
    time.sleep(0.05)
    print("second")

@startproc
def _():
    time.sleep(0.01)
    print("first")

time.sleep(0.1)
first
second

source

parallelable

 parallelable (param_name, num_workers, f=None)

Check whether num_workers processes can be used in the current environment; if not (for example, multiprocessing from a notebook on Windows), print a warning explaining that param_name must be set to 0, and return False.

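parallelable is used as a guard before spawning workers. A sketch of the typical call pattern (the n_workers variable here is illustrative):

n_workers = 4
if not parallelable('n_workers', n_workers): n_workers = 0  # fall back to serial execution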
source

ThreadPoolExecutor

 ThreadPoolExecutor (max_workers=4, on_exc=<built-in function print>,
                     pause=0, **kwargs)

Same as Python’s ThreadPoolExecutor, except you can pass max_workers=0 for serial execution
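A quick check, as a minimal sketch: with max_workers=0 the map runs in the calling thread, which keeps tracebacks simple when debugging.

def _double(x): return x*2

with ThreadPoolExecutor(max_workers=0) as ex:
    test_eq(list(ex.map(_double, range(3))), [0,2,4])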


source

ProcessPoolExecutor

 ProcessPoolExecutor (max_workers=4, on_exc=<built-in function print>,
                      pause=0, mp_context=None, initializer=None,
                      initargs=())

Same as Python’s ProcessPoolExecutor, except you can pass max_workers=0 for serial execution
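Serial execution also means nothing is pickled and sent to a child process, so even a lambda works; a handy sketch for debugging in notebooks:

with ProcessPoolExecutor(max_workers=0) as ex:
    test_eq(list(ex.map(lambda x: x+1, range(3))), [1,2,3])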


source

parallel

 parallel (f, items, *args, n_workers=4, total=None, progress=None,
           pause=0, method=None, threadpool=False, timeout=None,
           chunksize=1, **kwargs)

Applies f in parallel to items, using n_workers

# helper used in the tests below (definition shown for completeness)
def _add_one(x, a=1): return x+a

inp,exp = range(50),range(1,51)

test_eq(parallel(_add_one, inp, n_workers=2), exp)
test_eq(parallel(_add_one, inp, threadpool=True, n_workers=2), exp)
test_eq(parallel(_add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(_add_one, inp, n_workers=0), exp)
test_eq(parallel(_add_one, inp, n_workers=0, a=2), range(2,52))

Use the pause parameter to insert a pause of pause seconds between the start of each process. This helps avoid race conditions when processes start up, or lets you stagger their start times, for example when making many requests to a webserver. Set threadpool=True to use ThreadPoolExecutor instead of ProcessPoolExecutor.

from datetime import datetime
def print_time(i): 
    time.sleep(random.random()/1000)
    print(i, datetime.now())

parallel(print_time, range(5), n_workers=2, pause=0.25);
0 2024-10-11 23:06:05.920741
1 2024-10-11 23:06:06.171470
2 2024-10-11 23:06:06.431925
3 2024-10-11 23:06:06.689940
4 2024-10-11 23:06:06.937109

source

parallel_async

 parallel_async (f, items, *args, n_workers=16, timeout=None, chunksize=1,
                 on_exc=<built-in function print>, **kwargs)

Applies f to items in parallel using asyncio and a semaphore to limit concurrency.

import asyncio
async def print_time_async(i): 
    wait = random.random()
    await asyncio.sleep(wait)
    print(i, datetime.now(), wait)

await parallel_async(print_time_async, range(6), n_workers=3);
0 2024-10-11 23:06:39.545583 0.10292732609738675
3 2024-10-11 23:06:39.900393 0.3516179734831676
4 2024-10-11 23:06:39.941094 0.03699593757956876
2 2024-10-11 23:06:39.957677 0.5148658606540902
1 2024-10-11 23:06:40.099716 0.6574035385815227
5 2024-10-11 23:06:40.654097 0.7116319667399102

source

run_procs

 run_procs (f, f_done, args)

Call f for each item in args in parallel, yielding f_done
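run_procs is the low-level helper that parallel_gen is built on. Below is a minimal sketch of calling it directly, with hypothetical helpers: each element of args is the argument tuple for one Process, f pushes results onto a shared queue, and f_done drains the queue. It assumes a start method (such as fork) that can run locally-defined functions.

from multiprocessing import Queue

q = Queue()

def _double_into(batch, q):
    # target f: run once per element of `args`, each in its own process
    for o in batch: q.put(o*2)

def _drain():
    # f_done: yield results until all six have arrived
    for _ in range(6): yield q.get()

res = sorted(run_procs(_double_into, _drain, [(range(3), q), (range(3,6), q)]))
test_eq(res, [0,2,4,6,8,10])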


source

parallel_gen

 parallel_gen (cls, items, n_workers=4, **kwargs)

Instantiate cls in n_workers procs & call each on a subset of items in parallel.

from operator import itemgetter
from fastcore.foundation import L

class _C:
    def __call__(self, o): return ((i+1) for i in o)

items = range(5)

res = L(parallel_gen(_C, items, n_workers=0))
idxs,dat1 = zip(*res.sorted(itemgetter(0)))
test_eq(dat1, range(1,6))

res = L(parallel_gen(_C, items, n_workers=3))
idxs,dat2 = zip(*res.sorted(itemgetter(0)))
test_eq(dat2, dat1)

cls is any class with __call__. It is passed args and kwargs when instantiated. Note that n_workers instances of cls are created, one in each process. items are then split into n_workers batches, and one batch is sent to each instance. The function returns a generator of tuples of item indices and results.

class TestSleepyBatchFunc:
    "For testing parallel processes that run at different speeds"
    def __init__(self): self.a=1
    def __call__(self, batch):
        for k in batch:
            time.sleep(random.random()/4)
            yield k+self.a

x = np.linspace(0,0.99,20)

res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2))
test_eq(res.sorted().itemgot(1), x+1)