Viewing File: /home/ubuntu/.local/lib/python3.10/site-packages/fastcore/parallel.py
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/03a_parallel.ipynb.
# %% auto 0
__all__ = ['threaded', 'startthread', 'startproc', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'parallel',
'add_one', 'run_procs', 'parallel_gen']
# %% ../nbs/03a_parallel.ipynb 1
from .imports import *
from .basics import *
from .foundation import *
from .meta import *
from .xtras import *
from functools import wraps
import concurrent.futures,time
from multiprocessing import Process,Queue,Manager,set_start_method,get_all_start_methods,get_context
from threading import Thread
try:
if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method("fork")
except: pass
# %% ../nbs/03a_parallel.ipynb 4
def threaded(process=False):
"Run `f` in a `Thread` (or `Process` if `process=True`), and returns it"
def _r(f):
def g(_obj_td, *args, **kwargs):
res = f(*args, **kwargs)
_obj_td.result = res
@wraps(f)
def _f(*args, **kwargs):
res = (Thread,Process)[process](target=g, args=args, kwargs=kwargs)
res._args = (res,)+res._args
res.start()
return res
return _f
if callable(process):
o = process
process = False
return _r(o)
return _r
# %% ../nbs/03a_parallel.ipynb 8
def startthread(f):
"Like `threaded`, but start thread immediately"
return threaded(f)()
# %% ../nbs/03a_parallel.ipynb 10
def startproc(f):
"Like `threaded(True)`, but start Process immediately"
return threaded(True)(f)()
# %% ../nbs/03a_parallel.ipynb 12
def _call(lock, pause, n, g, item):
l = False
if pause:
try:
l = lock.acquire(timeout=pause*(n+2))
time.sleep(pause)
finally:
if l: lock.release()
return g(item)
# %% ../nbs/03a_parallel.ipynb 13
def parallelable(param_name, num_workers, f=None):
f_in_main = f == None or sys.modules[f.__module__].__name__ == "__main__"
if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0 and f_in_main:
print("Due to IPython and Windows limitation, python multiprocessing isn't available now.")
print(f"So `{param_name}` has to be changed to 0 to avoid getting stuck")
return False
return True
# %% ../nbs/03a_parallel.ipynb 14
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
if max_workers is None: max_workers=defaults.cpus
store_attr()
self.not_parallel = max_workers==0
if self.not_parallel: max_workers=1
super().__init__(max_workers, **kwargs)
def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
if self.not_parallel == False: self.lock = Manager().Lock()
g = partial(f, *args, **kwargs)
if self.not_parallel: return map(g, items)
_g = partial(_call, self.lock, self.pause, self.max_workers, g)
try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
except Exception as e: self.on_exc(e)
# %% ../nbs/03a_parallel.ipynb 16
@delegates()
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
if max_workers is None: max_workers=defaults.cpus
store_attr()
self.not_parallel = max_workers==0
if self.not_parallel: max_workers=1
super().__init__(max_workers, **kwargs)
def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0
self.not_parallel = self.max_workers==0
if self.not_parallel: self.max_workers=1
if self.not_parallel == False: self.lock = Manager().Lock()
g = partial(f, *args, **kwargs)
if self.not_parallel: return map(g, items)
_g = partial(_call, self.lock, self.pause, self.max_workers, g)
try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
except Exception as e: self.on_exc(e)
# %% ../nbs/03a_parallel.ipynb 18
try: from fastprogress import progress_bar
except: progress_bar = None
# %% ../nbs/03a_parallel.ipynb 19
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
method=None, threadpool=False, timeout=None, chunksize=1, **kwargs):
"Applies `func` in parallel to `items`, using `n_workers`"
kwpool = {}
if threadpool: pool = ThreadPoolExecutor
else:
if not method and sys.platform == 'darwin': method='fork'
if method: kwpool['mp_context'] = get_context(method)
pool = ProcessPoolExecutor
with pool(n_workers, pause=pause, **kwpool) as ex:
r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
if progress and progress_bar:
if total is None: total = len(items)
r = progress_bar(r, total=total, leave=False)
return L(r)
# %% ../nbs/03a_parallel.ipynb 20
def add_one(x, a=1):
# this import is necessary for multiprocessing in notebook on windows
import random
time.sleep(random.random()/80)
return x+a
# %% ../nbs/03a_parallel.ipynb 26
def run_procs(f, f_done, args):
"Call `f` for each item in `args` in parallel, yielding `f_done`"
processes = L(args).map(Process, args=arg0, target=f)
for o in processes: o.start()
yield from f_done()
processes.map(Self.join())
# %% ../nbs/03a_parallel.ipynb 27
def _f_pg(obj, queue, batch, start_idx):
for i,b in enumerate(obj(batch)): queue.put((start_idx+i,b))
def _done_pg(queue, items): return (queue.get() for _ in items)
# %% ../nbs/03a_parallel.ipynb 28
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
"Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
if not parallelable('n_workers', n_workers): n_workers = 0
if n_workers==0:
yield from enumerate(list(cls(**kwargs)(items)))
return
batches = L(chunked(items, n_chunks=n_workers))
idx = L(itertools.accumulate(0 + batches.map(len)))
queue = Queue()
if progress_bar: items = progress_bar(items, leave=False)
f=partial(_f_pg, cls(**kwargs), queue)
done=partial(_done_pg, queue, items)
yield from run_procs(f, done, L(batches,idx).zip())
Back to Directory
File Manager