Utility functions

Utility functions used in the fastai library

File Functions

Utilities (other than extensions to Pathlib.Path) for dealing with IO.


source

walk

 walk (path:Union[pathlib.Path,str], symlinks:bool=True,
       keep_file:callable=ret_true, keep_folder:callable=ret_true,
       skip_folder:callable=ret_false, func:callable=join,
       ret_folders:bool=False)

Generator version of os.walk, using functions to filter files and folders

Type Default Details
path Path | str path to start searching
symlinks bool True follow symlinks?
keep_file callable ret_true function that returns True for wanted files
keep_folder callable ret_true function that returns True for folders to enter
skip_folder callable ret_false function that returns True for folders to skip
func callable join function to apply to each matched file
ret_folders bool False return folders, not just files
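
For example, here is a minimal sketch (assuming, like the other examples on this page, that it is run from a directory with notebooks somewhere beneath it) that walks the current directory and keeps only the notebook files:

nbs = [f for f in walk('.') if f.endswith('.ipynb')]
assert len(nbs) > 0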

source

globtastic

 globtastic (path:Union[pathlib.Path,str], recursive:bool=True,
             symlinks:bool=True, file_glob:str=None, file_re:str=None,
             folder_re:str=None, skip_file_glob:str=None,
             skip_file_re:str=None, skip_folder_re:str=None,
             func:callable=join, ret_folders:bool=False)

A more powerful glob, including regex matches, symlink handling, and skip parameters

Type Default Details
path Path | str path to start searching
recursive bool True search subfolders
symlinks bool True follow symlinks?
file_glob str None Only include files matching glob
file_re str None Only include files matching regex
folder_re str None Only enter folders matching regex
skip_file_glob str None Skip files matching glob
skip_file_re str None Skip files matching regex
skip_folder_re str None Skip folders matching regex
func callable join function to apply to each matched file
ret_folders bool False return folders, not just files
Returns L Paths to matched files

globtastic('.', skip_folder_re='^[_.]', folder_re='core', file_glob='*.*py*', file_re='c')
(#5) ['./fastcore/docments.py','./fastcore/dispatch.py','./fastcore/basics.py','./fastcore/docscrape.py','./fastcore/script.py']

source

maybe_open

 maybe_open (f, mode='r', **kwargs)

Context manager: open f if it is a path (and close on exit)

This is useful for functions where you want to accept a path or file. maybe_open will not close your file handle if you pass one in.

def _f(fn):
    with maybe_open(fn) as f: return f.encoding

fname = '00_test.ipynb'
sys_encoding = 'cp1252' if sys.platform == 'win32' else 'UTF-8'
test_eq(_f(fname), sys_encoding)
with open(fname) as fh: test_eq(_f(fh), sys_encoding)

For example, we can use this to reimplement imghdr.what from the Python standard library, which in Python 3.9 is written as:

import imghdr
def what(file, h=None):
    f = None
    try:
        if h is None:
            if isinstance(file, (str,os.PathLike)):
                f = open(file, 'rb')
                h = f.read(32)
            else:
                location = file.tell()
                h = file.read(32)
                file.seek(location)
        for tf in imghdr.tests:
            res = tf(h, f)
            if res: return res
    finally:
        if f: f.close()
    return None

Here’s an example of the use of this function:

fname = 'images/puppy.jpg'
what(fname)
'jpeg'

With maybe_open, Self, and L.map_first, we can rewrite this in a much more concise and (in our opinion) clear way:

def what(file, h=None):
    if h is None:
        with maybe_open(file, 'rb') as f: h = f.peek(32)
    return L(imghdr.tests).map_first(Self(h,file))

…and we can check that it still works:

test_eq(what(fname), 'jpeg')

…along with the version passing a file handle:

with open(fname,'rb') as f: test_eq(what(f), 'jpeg')

…along with the h parameter version:

with open(fname,'rb') as f: test_eq(what(None, h=f.read(32)), 'jpeg')

source

mkdir

 mkdir (path, exist_ok=False, parents=False, overwrite=False, **kwargs)

Creates and returns a directory defined by path, optionally removing previous existing directory if overwrite is True

with tempfile.TemporaryDirectory() as d:
    path = Path(os.path.join(d, 'new_dir'))
    new_dir = mkdir(path)
    assert new_dir.exists()
    test_eq(new_dir, path)
        
    # test overwrite
    with open(new_dir/'test.txt', 'w') as f: f.writelines('test')
    test_eq(len(list(walk(new_dir))), 1) # assert file is present
    new_dir = mkdir(new_dir, overwrite=True)
    test_eq(len(list(walk(new_dir))), 0) # assert file was deleted

source

image_size

 image_size (fn)

Tuple of (w,h) for png, gif, or jpg; None otherwise

test_eq(image_size(fname), (1200,803))

source

bunzip

 bunzip (fn)

bunzip fn, raising exception if output already exists

f = Path('files/test.txt')
if f.exists(): f.unlink()
bunzip('files/test.txt.bz2')
t = f.open().readlines()
test_eq(len(t),1)
test_eq(t[0], 'test\n')
f.unlink()

source

loads

 loads (s, **kw)

Same as json.loads, but handles None
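
For instance, a minimal sketch: it behaves like json.loads on ordinary JSON strings, while tolerating a None input rather than raising:

test_eq(loads('{"a": 1}'), {'a': 1})
loads(None)  # unlike json.loads, this does not raise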


source

loads_multi

 loads_multi (s:str)

Generator of >=0 decoded json dicts, possibly with non-json ignored text at start and end

tst = """
# ignored
{ "a":1 }
hello
{
"b":2
}
"""

test_eq(list(loads_multi(tst)), [{'a': 1}, {'b': 2}])

source

dumps

 dumps (obj, **kw)

Same as json.dumps, but uses ujson if available
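
A minimal round-trip sketch using loads from above:

test_eq(loads(dumps({'a': 1})), {'a': 1})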


source

untar_dir

 untar_dir (fname, dest, rename=False, overwrite=False)

untar file into dest, creating a directory if the root contains more than one item

def test_untar(foldername, rename=False, **kwargs):
    with tempfile.TemporaryDirectory() as d:
        nm = os.path.join(d, 'a')
        shutil.make_archive(nm, 'gztar', **kwargs)
        with tempfile.TemporaryDirectory() as d2:
            d2 = Path(d2)
            untar_dir(nm+'.tar.gz', d2, rename=rename)
            test_eq(d2.ls(), [d2/foldername])

If the contents of fname contain just one file or directory, it is placed directly in dest:

# using `base_dir` in `make_archive` results in `images` directory included in file names
test_untar('images', base_dir='images')

If rename then the directory created is named based on the archive, without extension:

test_untar('a', base_dir='images', rename=True)

If the contents of fname contain multiple files and directories, a new folder in dest is created with the same name as fname (but without extension):

# using `root_dir` in `make_archive` results in `images` directory *not* included in file names
test_untar('a', root_dir='images')

source

repo_details

 repo_details (url)

Tuple of owner,name from ssh or https git repo url

test_eq(repo_details('https://github.com/fastai/fastai.git'), ['fastai', 'fastai'])
test_eq(repo_details('[email protected]:fastai/nbdev.git\n'), ['fastai', 'nbdev'])

source

run

 run (cmd, *rest, same_in_win=False, ignore_ex=False, as_bytes=False,
      stderr=False)

Pass cmd (splitting with shlex if string) to subprocess.run; return stdout; raise IOError if fails

You can pass a string (which will be split based on standard shell rules), a list, or pass args directly:

run('echo', same_in_win=True)
run('pip', '--version', same_in_win=True)
run(['pip', '--version'], same_in_win=True)
'pip 22.1.2 from /home/lgvaz/miniconda3/envs/fastai/lib/python3.7/site-packages/pip (python 3.7)'
if sys.platform == 'win32':
    assert 'ipynb' in run('cmd /c dir /p')
    assert 'ipynb' in run(['cmd', '/c', 'dir', '/p'])
    assert 'ipynb' in run('cmd', '/c', 'dir',  '/p')
else:
    assert 'ipynb' in run('ls -ls')
    assert 'ipynb' in run(['ls', '-l'])
    assert 'ipynb' in run('ls', '-l')

Some commands fail in non-error situations, like grep when it finds no matches. Use ignore_ex in those cases, which will return a tuple of the return code and stdout:

if sys.platform == 'win32':
    test_eq(run('cmd /c findstr asdfds 00_test.ipynb', ignore_ex=True)[0], 1)
else:
    test_eq(run('grep asdfds 00_test.ipynb', ignore_ex=True)[0], 1)

run automatically decodes returned bytes to a str. Use as_bytes to skip that:

if sys.platform == 'win32':
    test_eq(run('cmd /c echo hi'), 'hi')
else:
    test_eq(run('echo hi', as_bytes=True), b'hi\n')

source

open_file

 open_file (fn, mode='r', **kwargs)

Open a file, with optional compression if gz or bz2 suffix
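
For example, a minimal sketch (binary mode is used here on the assumption that the compressed streams are byte-oriented) showing that compression is applied transparently based on the suffix:

with tempfile.TemporaryDirectory() as d:
    fn = Path(d)/'data.txt.gz'
    with open_file(fn, 'wb') as f: f.write(b'hello')
    with open_file(fn, 'rb') as f: test_eq(f.read(), b'hello')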


source

save_pickle

 save_pickle (fn, o)

Save a pickle file, to a file name or opened file


source

load_pickle

 load_pickle (fn)

Load a pickle file from a file name or opened file

for suf in '.pkl','.bz2','.gz':
    # delete=False is added for Windows
    # https://stackoverflow.com/questions/23212435/permission-denied-to-write-to-my-temporary-file
    with tempfile.NamedTemporaryFile(suffix=suf, delete=False) as f:
        fn = Path(f.name)
        save_pickle(fn, 't')
        t = load_pickle(fn)
    f.close()
    test_eq(t,'t')

Collections


source

dict2obj

 dict2obj (d, list_func=<class 'fastcore.foundation.L'>, dict_func=<class
           'fastcore.basics.AttrDict'>)

Convert (possibly nested) dicts (or lists of dicts) to AttrDict

This is a convenience to give you “dotted” access to (possibly nested) dictionaries, e.g.:

d1 = dict(a=1, b=dict(c=2,d=3))
d2 = dict2obj(d1)
test_eq(d2.b.c, 2)
test_eq(d2.b['c'], 2)

It can also be used on lists of dicts.

_list_of_dicts = [d1, d1]
ds = dict2obj(_list_of_dicts)
test_eq(ds[0].b.c, 2)

source

obj2dict

 obj2dict (d)

Convert (possibly nested) AttrDicts (or lists of AttrDicts) to dict

obj2dict can be used to reverse what is done by dict2obj:

test_eq(obj2dict(d2), d1)
test_eq(obj2dict(ds), _list_of_dicts)

source

repr_dict

 repr_dict (d)

Print nested dicts and lists, such as returned by dict2obj

print(repr_dict(d2))
- a: 1
- b: 
  - c: 2
  - d: 3

source

is_listy

 is_listy (x)

isinstance(x, (tuple,list,L,slice,Generator))

assert is_listy((1,))
assert is_listy([1])
assert is_listy(L([1]))
assert is_listy(slice(2))
assert not is_listy(array([1]))

source

mapped

 mapped (f, it)

map f over it, unless it’s not listy, in which case return f(it)

def _f(x,a=1): return x-a

test_eq(mapped(_f,1),0)
test_eq(mapped(_f,[1,2]),[0,1])
test_eq(mapped(_f,(1,)),(0,))

Extensions to Pathlib.Path

The following methods are added to the standard Python library's pathlib.Path.


source

Path.readlines

 Path.readlines (hint=-1, encoding='utf8')

Read the content of self
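
A minimal sketch, writing a temporary file and reading it back line by line:

with tempfile.TemporaryDirectory() as d:
    p = Path(d)/'t.txt'
    p.write_text('a\nb\n')
    test_eq(len(p.readlines()), 2)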


source

Path.read_json

 Path.read_json (encoding=None, errors=None)

Same as read_text followed by loads
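
A minimal sketch, reading a small JSON file back as a dict:

with tempfile.TemporaryDirectory() as d:
    p = Path(d)/'t.json'
    p.write_text('{"a": 1}')
    test_eq(p.read_json(), {'a': 1})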


source

Path.mk_write

 Path.mk_write (data, encoding=None, errors=None, mode=511)

Make all parent dirs of self, and write data
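
A minimal sketch: the parent directories are created automatically before the data is written:

with tempfile.TemporaryDirectory() as d:
    p = Path(d)/'a'/'b'/'c.txt'
    p.mk_write('hello')
    test_eq(p.read_text(), 'hello')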


source

Path.relpath

 Path.relpath (start=None)

Same as os.path.relpath, but returns a Path, and resolves symlinks

p = Path('../fastcore/').resolve()
p
Path('/home/lgvaz/git/fastcore/fastcore')
p.relpath(Path.cwd())
Path('../fastcore')

source

Path.ls

 Path.ls (n_max=None, file_type=None, file_exts=None)

Contents of path as a list

We add an ls() method to pathlib.Path which is simply defined as list(Path.iterdir()), mainly for convenience in REPL environments such as notebooks.

path = Path()
t = path.ls()
assert len(t)>0
t1 = path.ls(10)
test_eq(len(t1), 10)
t2 = path.ls(file_exts='.ipynb')
assert len(t)>len(t2)
t[0]
Path('05_transform.ipynb')

You can also pass an optional file_type MIME prefix and/or a list of file extensions.

lib_path = (path/'../fastcore')
txt_files=lib_path.ls(file_type='text')
assert len(txt_files) > 0 and txt_files[0].suffix=='.py'
ipy_files=path.ls(file_exts=['.ipynb'])
assert len(ipy_files) > 0 and ipy_files[0].suffix=='.ipynb'
txt_files[0],ipy_files[0]
(Path('../fastcore/all.py'), Path('05_transform.ipynb'))

fastcore also updates the repr of Path such that, if Path.BASE_PATH is defined, all paths are printed relative to that path (as long as they are contained in Path.BASE_PATH):

t = ipy_files[0].absolute()
try:
    Path.BASE_PATH = t.parent.parent
    test_eq(repr(t), f"Path('nbs/{t.name}')")
finally: Path.BASE_PATH = None

source

Path.delete

 Path.delete ()

Delete a file, symlink, or directory tree
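
A minimal sketch, deleting a directory tree that contains a file:

with tempfile.TemporaryDirectory() as d:
    p = Path(d)/'sub'
    p.mkdir()
    (p/'t.txt').write_text('x')
    p.delete()
    assert not p.exists()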

Reindexing Collections


source

ReindexCollection

 ReindexCollection (coll, idxs=None, cache=None, tfm=<function noop>)

Reindexes collection coll with indices idxs and optional LRU cache of size cache

This is useful when constructing batches or organizing data in a particular manner (e.g. for deep learning). This class is primarily used for organizing data for language models in fastai.

You can supply a custom index upon instantiation with the idxs argument, or you can call the reindex method to supply a new index for your collection.

Here is how you can reindex a list such that the elements are reversed:

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'], idxs=[4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

Alternatively, you can use the reindex method:


source

ReindexCollection.reindex
 ReindexCollection.reindex (idxs)

Replace self.idxs with idxs

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'])
rc.reindex([4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

You can optionally specify an LRU cache, which uses functools.lru_cache upon instantiation:

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t._get.cache_info()
CacheInfo(hits=1, misses=1, maxsize=2, currsize=1)

You can optionally clear the LRU cache by calling the cache_clear method:


source

ReindexCollection.cache_clear
 ReindexCollection.cache_clear ()

Clear LRU cache

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t.cache_clear()
t._get.cache_info()
CacheInfo(hits=0, misses=0, maxsize=2, currsize=0)

source

ReindexCollection.shuffle
 ReindexCollection.shuffle ()

Randomly shuffle indices

Note that an ordered index is automatically constructed for the data structure even if one is not supplied.

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
rc.shuffle()
list(rc)
['d', 'f', 'g', 'a', 'h', 'e', 'b', 'c']
sz = 50
t = ReindexCollection(L.range(sz), cache=2)
test_eq(list(t), range(sz))
test_eq(t[sz-1], sz-1)
test_eq(t._get.cache_info().hits, 1)
t.shuffle()
test_eq(t._get.cache_info().hits, 1)
test_ne(list(t), range(sz))
test_eq(set(t), set(range(sz)))
t.cache_clear()
test_eq(t._get.cache_info().hits, 0)
test_eq(t.count(0), 1)

Other Helpers


source

truncstr

 truncstr (s:str, maxlen:int, suf:str='…', space='')

Truncate s to length maxlen, adding suffix suf if truncated

w = 'abacadabra'
test_eq(truncstr(w, 10), w)
test_eq(truncstr(w, 5), 'abac…')
test_eq(truncstr(w, 5, suf=''), 'abaca')
test_eq(truncstr(w, 11, space='_'), w+"_")
test_eq(truncstr(w, 10, space='_'), w[:-1]+'…')
test_eq(truncstr(w, 5, suf='!!'), 'aba!!')

source

sparkline

 sparkline (data, mn=None, mx=None, empty_zero=False)

Sparkline for data, with Nones (and zero, if empty_zero) shown as empty column

data = [9,6,None,1,4,0,8,15,10]
print(f'without "empty_zero": {sparkline(data, empty_zero=False)}')
print(f'   with "empty_zero": {sparkline(data, empty_zero=True )}')
without "empty_zero": ▅▂ ▁▂▁▃▇▅
   with "empty_zero": ▅▂ ▁▂ ▃▇▅

You can set a maximum and minimum for the y-axis of the sparkline with the arguments mn and mx respectively:

sparkline([1,2,3,400], mn=0, mx=3)
'▂▅▇▇'

source

modify_exception

 modify_exception (e:Exception, msg:str=None, replace:bool=False)

Modifies e with a custom message attached

Type Default Details
e Exception An exception
msg str None A custom message
replace bool False Whether to replace e.args with [msg]
Returns Exception

msg = "This is my custom message!"

test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception(), None)), contains='')
test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception(), msg)), contains=msg)
test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception("The first message"), msg)), contains="The first message This is my custom message!")
test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception("The first message"), msg, True)), contains="This is my custom message!")

source

round_multiple

 round_multiple (x, mult, round_down=False)

Round x to nearest multiple of mult

test_eq(round_multiple(63,32), 64)
test_eq(round_multiple(50,32), 64)
test_eq(round_multiple(40,32), 32)
test_eq(round_multiple( 0,32),  0)
test_eq(round_multiple(63,32, round_down=True), 32)
test_eq(round_multiple((63,40),32), (64,32))

source

set_num_threads

 set_num_threads (nt)

Get numpy (and others) to use nt threads

This sets the number of threads consistently for many tools, by:

  1. Setting the following environment variables equal to nt: OPENBLAS_NUM_THREADS, NUMEXPR_NUM_THREADS, OMP_NUM_THREADS, MKL_NUM_THREADS
  2. Setting nt threads for numpy and pytorch.
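
For example, a minimal sketch (that the environment variables hold the string form of nt is an assumption):

set_num_threads(2)
test_eq(os.environ['OMP_NUM_THREADS'], '2')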

source

join_path_file

 join_path_file (file, path, ext='')

Return path/file if file is a string or a Path, file otherwise

path = Path.cwd()/'_tmp'/'tst'
f = join_path_file('tst.txt', path)
assert path.exists()
test_eq(f, path/'tst.txt')
with open(f, 'w') as f_: assert join_path_file(f_, path) == f_
shutil.rmtree(Path.cwd()/'_tmp')

Other Helpers


source

autostart

 autostart (g)

Decorator that automatically starts a generator
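
A minimal sketch, assuming the decorator advances the generator to its first yield so that you can send to it immediately:

@autostart
def _printer():
    while True: print((yield))

g = _printer()
g.send('hello')   # no initial next(g) call needed
hello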


source

EventTimer

 EventTimer (store=5, span=60)

An event timer with history of store items of time span

Add events with add, and get number of events and their frequency (freq).

# Random wait function for testing
def _randwait(): yield from (sleep(random.random()/200) for _ in range(100))

c = EventTimer(store=5, span=0.03)
for o in _randwait(): c.add(1)
print(f'Num Events: {c.events}, Freq/sec: {c.freq:.01f}')
print('Most recent: ', sparkline(c.hist), *L(c.hist).map('{:.01f}'))
Num Events: 3, Freq/sec: 242.1
Most recent:  ▇▁▂▃▁ 358.8 267.5 296.5 315.5 266.4

source

stringfmt_names

 stringfmt_names (s:str)

Unique brace-delimited names in s

s = '/pulls/{pull_number}/reviews/{review_id}'
test_eq(stringfmt_names(s), ['pull_number','review_id'])

source

PartialFormatter

 PartialFormatter ()

A string.Formatter that doesn’t error on missing fields, and tracks missing fields and unused args


source

partial_format

 partial_format (s:str, **kwargs)

string format s, ignoring missing field errors, returning missing and extra fields

The result is a tuple of (formatted_string,missing_fields,extra_fields), e.g.:

res,missing,xtra = partial_format(s, pull_number=1, foo=2)
test_eq(res, '/pulls/1/reviews/{review_id}')
test_eq(missing, ['review_id'])
test_eq(xtra, {'foo':2})

source

utc2local

 utc2local (dt:datetime.datetime)

Convert dt from UTC to local time

dt = datetime(2000,1,1,12)
print(f'{dt} UTC is {utc2local(dt)} local time')
2000-01-01 12:00:00 UTC is 2000-01-01 17:30:00+05:30 local time

source

local2utc

 local2utc (dt:datetime.datetime)

Convert dt from local to UTC time

print(f'{dt} local is {local2utc(dt)} UTC time')
2000-01-01 12:00:00 local is 2000-01-01 06:30:00+00:00 UTC time

source

trace

 trace (f)

Add set_trace to an existing function f

You can add a breakpoint to an existing function, e.g.:

Path.cwd = trace(Path.cwd)
Path.cwd()

Now, when the function is called it will drop you into the debugger. Note: you must issue the s command when the debugger first opens in order to step into the function being traced.


source

modified_env

 modified_env (*delete, **replace)

Context manager temporarily modifying os.environ by deleting delete and replacing replace

# USER isn't in Cloud Linux Environments
env_test = 'USERNAME' if sys.platform == "win32" else 'SHELL'
oldusr = os.environ[env_test]

replace_param = {env_test: 'a'}
with modified_env('PATH', **replace_param):
    test_eq(os.environ[env_test], 'a')
    assert 'PATH' not in os.environ

assert 'PATH' in os.environ
test_eq(os.environ[env_test], oldusr)

source

ContextManagers

 ContextManagers (mgrs)

Wrapper for contextlib.ExitStack which enters a collection of context managers
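
A minimal sketch (assuming the wrapper is itself used as a context manager), reusing modified_env from above to enter several context managers in a single with block:

with ContextManagers([modified_env('PATH'), tempfile.TemporaryDirectory()]):
    assert 'PATH' not in os.environ
assert 'PATH' in os.environ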


source

shufflish

 shufflish (x, pct=0.04)

Randomly relocate items of x up to pct of len(x) from their starting location
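
A minimal sketch: the result is a permutation of the input in which each item has only moved a small distance:

t = list(range(100))
s = shufflish(t)
test_eq(sorted(s), t)   # same items, order only slightly perturbed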


source

console_help

 console_help (libname:str)

Show help for all console scripts from libname

Type Details
libname str name of library for console script listing