Utility functions used in the fastai library
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *

Collections

dict2obj[source]

dict2obj(d)

Convert (possibly nested) dicts (or lists of dicts) to AttrDict

This is a convenience to give you "dotted" access to (possibly nested) dictionaries, e.g:

d1 = dict(a=1, b=dict(c=2,d=3))
d2 = dict2obj(d1)
test_eq(d2.b.c, 2)
test_eq(d2.b['c'], 2)

It can also be used on lists of dicts.

ds = dict2obj(L(d1, d1))
test_eq(ds[0].b.c, 2)

repr_dict[source]

repr_dict(d)

Print nested dicts and lists, such as returned by dict2obj

print(repr_dict(d2))
- a: 1
- b: 
  - c: 2
  - d: 3

repr_dict is used to display AttrDict both with repr and in Jupyter Notebooks:

AttrDict.__repr__[source]

AttrDict.__repr__()

Return repr(self).

print(repr(d2))
- a: 1
- b: 
  - c: 2
  - d: 3
d2
  • a: 1
  • b:
    • c: 2
    • d: 3

is_listy[source]

is_listy(x)

isinstance(x, (tuple,list,L,slice,Generator))

assert is_listy((1,))
assert is_listy([1])
assert is_listy(L([1]))
assert is_listy(slice(2))
assert not is_listy(array([1]))

shufflish[source]

shufflish(x, pct=0.04)

Randomly relocate items of x up to pct of len(x) from their starting location

mapped[source]

mapped(f, it)

map f over it, unless it's not listy, in which case return f(it)

def _f(x, a=1):
    "Return `x` reduced by `a` (1 by default)"
    difference = x - a
    return difference

test_eq(mapped(_f,1),0)
test_eq(mapped(_f,[1,2]),[0,1])
test_eq(mapped(_f,(1,)),(0,))

Reindexing Collections

class ReindexCollection[source]

ReindexCollection(coll, idxs=None, cache=None, tfm=noop) :: GetAttr

Reindexes collection coll with indices idxs and optional LRU cache of size cache

This is useful when constructing batches or organizing data in a particular manner (e.g. for deep learning). This class is primarily used in organizing data for language models in fastai.

You can supply a custom index upon instantiation with the idxs argument, or you can call the reindex method to supply a new index for your collection.

Here is how you can reindex a list such that the elements are reversed:

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'], idxs=[4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

Alternatively, you can use the reindex method:

ReindexCollection.reindex[source]

ReindexCollection.reindex(idxs)

Replace self.idxs with idxs

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'])
rc.reindex([4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

You can optionally specify an LRU cache upon instantiation, which uses functools.lru_cache:

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t._get.cache_info()
CacheInfo(hits=1, misses=1, maxsize=2, currsize=1)

You can optionally clear the LRU cache by calling the cache_clear method:

ReindexCollection.cache_clear[source]

ReindexCollection.cache_clear()

Clear LRU cache

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t.cache_clear()
t._get.cache_info()
CacheInfo(hits=0, misses=0, maxsize=2, currsize=0)
ReindexCollection.shuffle[source]

ReindexCollection.shuffle()

Randomly shuffle indices

Note that an ordered index is automatically constructed for the data structure even if one is not supplied.

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
rc.shuffle()
list(rc)
['f', 'e', 'a', 'd', 'h', 'g', 'b', 'c']
sz = 50
t = ReindexCollection(L.range(sz), cache=2)
test_eq(list(t), range(sz))
test_eq(t[sz-1], sz-1)
test_eq(t._get.cache_info().hits, 1)
t.shuffle()
test_eq(t._get.cache_info().hits, 1)
test_ne(list(t), range(sz))
test_eq(set(t), set(range(sz)))
t.cache_clear()
test_eq(t._get.cache_info().hits, 0)
test_eq(t.count(0), 1)

Extensions to Pathlib.Path

An extension of the standard Python library pathlib.Path. These extensions are accomplished by monkey patching additional methods onto pathlib.Path.

Path.readlines[source]

Path.readlines(hint=-1, encoding='utf8')

Read the content of self

Path.mk_write[source]

Path.mk_write(data, encoding=None, errors=None, mode=511)

Make all parent dirs of self

Path.ls[source]

Path.ls(n_max=None, file_type=None, file_exts=None)

Contents of path as a list

We add an ls() method to pathlib.Path which is simply defined as list(Path.iterdir()), mainly for convenience in REPL environments such as notebooks.

path = Path()
t = path.ls()
assert len(t)>0
t1 = path.ls(10)
test_eq(len(t1), 10)
t2 = path.ls(file_exts='.ipynb')
assert len(t)>len(t2)
t[0]
Path('.ipynb_checkpoints')

You can also pass an optional file_type MIME prefix and/or a list of file extensions.

lib_path = (path/'../fastcore')
txt_files=lib_path.ls(file_type='text')
assert len(txt_files) > 0 and txt_files[0].suffix=='.py'
ipy_files=path.ls(file_exts=['.ipynb'])
assert len(ipy_files) > 0 and ipy_files[0].suffix=='.ipynb'
txt_files[0],ipy_files[0]
(Path('../fastcore/utils.py'), Path('03a.ipynb'))

open_file[source]

open_file(fn, mode='r', **kwargs)

Open a file, with optional compression if gz or bz2 suffix

save_pickle[source]

save_pickle(fn, o)

Save a pickle file, to a file name or opened file

load_pickle[source]

load_pickle(fn)

Load a pickle file from a file name or opened file

for suf in '.pkl','.bz2','.gz':
    with tempfile.NamedTemporaryFile(suffix=suf) as f:
        fn = Path(f.name)
        save_pickle(fn, 't')
        t = load_pickle(fn)
    test_eq(t,'t')

Path.__repr__[source]

Path.__repr__()

Return repr(self).

fastai also updates the repr of Path such that, if Path.BASE_PATH is defined, all paths are printed relative to that path (as long as they are contained in Path.BASE_PATH):

t = ipy_files[0].absolute()
try:
    Path.BASE_PATH = t.parent.parent
    test_eq(repr(t), f"Path('nbs/{t.name}')")
finally: Path.BASE_PATH = None

File Functions

Utilities (other than extensions to Pathlib.Path) for dealing with IO.

maybe_open[source]

maybe_open(f, mode='r', **kwargs)

Context manager: open f if it is a path (and close on exit)

This is useful for functions where you want to accept a path or file. maybe_open will not close your file handle if you pass one in.

def _f(fn):
    "Return the `encoding` attribute of `fn` (a path or an already open file) as seen through `maybe_open`"
    with maybe_open(fn) as handle:
        enc = handle.encoding
    return enc

fname = '00_test.ipynb'
test_eq(_f(fname), 'UTF-8')
with open(fname) as fh: test_eq(_f(fh), 'UTF-8')

For example, we can use this to reimplement imghdr.what from the Python standard library, which is written in Python 3.9 as:

def what(file, h=None):
    # Return the first truthy result produced by the functions in `imghdr.tests`,
    # or None when none of them matches.
    # `file` is a path (str / os.PathLike) or an open binary file object;
    # `h`, when given, is used as the header bytes and `file` is not read.
    f = None
    try:
        if h is None:
            if isinstance(file, (str,os.PathLike)):
                # A path was given: open it here; it is closed in `finally`
                f = open(file, 'rb')
                h = f.read(32)
            else:
                # A file object was given: read the header, then restore the caller's position
                location = file.tell()
                h = file.read(32)
                file.seek(location)
        for tf in imghdr.tests:
            # `f` is still None here unless this function opened the file itself
            res = tf(h, f)
            if res: return res
    finally:
        # Only close what was opened here; a caller-supplied file object is left open
        if f: f.close()
    return None

Here's an example of the use of this function:

fname = 'images/puppy.jpg'
what(fname)
'jpeg'

With maybe_open, Self, and L.map_first, we can rewrite this in a much more concise and (in our opinion) clear way:

def what(file, h=None):
    # `h`, when given, is used as the header bytes and `file` is not read
    if h is None:
        # NOTE(review): `peek` presumably leaves a caller-supplied handle's position untouched,
        # replacing the tell/read/seek dance; it may return more than 32 bytes - confirm
        with maybe_open(file, 'rb') as f: h = f.peek(32)
    # Presumably calls each test as `tf(h, file)` and returns the first truthy result - confirm
    return L(imghdr.tests).map_first(Self(h,file))

...and we can check that it still works:

test_eq(what(fname), 'jpeg')

...along with the version passing a file handle:

with open(fname,'rb') as f: test_eq(what(f), 'jpeg')

...along with the h parameter version:

with open(fname,'rb') as f: test_eq(what(None, h=f.read(32)), 'jpeg')
def _jpg_size(f):
    "Return `(w,h)` of the JPEG in seekable binary file `f`, which must be positioned at the start of the file"
    # DHT (0xc4), JPG (0xc8) and DAC (0xcc) sit inside the 0xc0-0xcf range but are
    # not start-of-frame markers, so they must be skipped like any other segment
    non_sof = (0xc4, 0xc8, 0xcc)
    size,ftype = 2,0  # the first seek skips the 2-byte SOI marker
    while not 0xc0 <= ftype <= 0xcf or ftype in non_sof:
        f.seek(size, 1)  # skip the remainder of the previous segment
        byte = f.read(1)
        while ord(byte) == 0xff: byte = f.read(1)  # skip the 0xff marker prefix (and any fill bytes)
        ftype = ord(byte)
        size = struct.unpack('>H', f.read(2))[0] - 2  # stored length includes its own 2 bytes
    f.seek(1, 1)  # `precision'
    h,w = struct.unpack('>HH', f.read(4))  # frame header stores height before width
    return w,h

def _gif_size(f):
    "Return `(w,h)` of the GIF in binary file `f`, which must be positioned at the start of the file"
    # Read the header from `f` itself (the previous body referenced an undefined `head`);
    # bytes 6-10 hold width and height as little-endian unsigned shorts
    return struct.unpack('<HH', f.read(10)[6:10])

def _png_size(f):
    "Return `(w,h)` of the PNG in binary file `f`, which must be positioned at the start of the file"
    # Read the header from `f` itself (the previous body referenced an undefined `head`):
    # 8-byte signature, 4-byte chunk length, 4-byte chunk type, then width and height
    head = f.read(24)
    # Bytes 4-8 must be the tail of the PNG signature (CR LF 0x1a LF)
    assert struct.unpack('>i', head[4:8])[0]==0x0d0a1a0a
    return struct.unpack('>ii', head[16:24])

image_size[source]

image_size(fn)

Tuple of (w,h) for png, gif, or jpg; None otherwise

test_eq(image_size(fname), (1200,803))

bunzip[source]

bunzip(fn)

bunzip fn, raising exception if output already exists

f = Path('files/test.txt')
if f.exists(): f.unlink()
bunzip('files/test.txt.bz2')
t = f.open().readlines()
test_eq(len(t),1)
test_eq(t[0], 'test\n')
f.unlink()

join_path_file[source]

join_path_file(file, path, ext='')

Return path/file if file is a string or a Path, file otherwise

path = Path.cwd()/'_tmp'/'tst'
f = join_path_file('tst.txt', path)
assert path.exists()
test_eq(f, path/'tst.txt')
with open(f, 'w') as f_: assert join_path_file(f_, path) == f_
shutil.rmtree(Path.cwd()/'_tmp')

loads[source]

loads(s, encoding=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw)

Same as json.loads, but handles None

untar_dir[source]

untar_dir(file, dest)

repo_details[source]

repo_details(url)

Tuple of owner,name from ssh or https git repo url

test_eq(repo_details('https://github.com/fastai/fastai.git'), ['fastai', 'fastai'])
test_eq(repo_details('[email protected]:fastai/nbdev.git\n'), ['fastai', 'nbdev'])

run[source]

run(cmd, *rest, ignore_ex=False, as_bytes=False)

Pass cmd (splitting with shlex if string) to subprocess.run; return stdout; raise IOError if fails

You can pass a string (which will be split based on standard shell rules), a list, or pass args directly:

assert 'ipynb' in run('ls -l')
assert 'ipynb' in run(['ls', '-l'])
assert 'ipynb' in run('ls', '-l')

Some commands, like grep, return a non-zero exit code in non-error situations. Use ignore_ex in those cases, which will return a tuple of returncode and stdout:

test_eq(run('grep asdfds 00_test.ipynb', ignore_ex=True)[0], 1)

run automatically decodes returned bytes to a str. Use as_bytes to skip that:

test_eq(run('echo hi', as_bytes=True), b'hi\n')

Other Helpers

stringfmt_names[source]

stringfmt_names(s:str)

Unique brace-delimited names in s

s = '/pulls/{pull_number}/reviews/{review_id}'
test_eq(stringfmt_names(s), ['pull_number','review_id'])

class PartialFormatter[source]

PartialFormatter() :: Formatter

partial_format[source]

partial_format(s:str, **kwargs)

string format s, ignoring missing field errors, returning missing and extra fields

The result is a tuple of (formatted_string,missing_fields,extra_fields), e.g:

res,missing,xtra = partial_format(s, pull_number=1, foo=2)
test_eq(res, '/pulls/1/reviews/{review_id}')
test_eq(missing, ['review_id'])
test_eq(xtra, {'foo':2})

trace[source]

trace(f)

Add set_trace to an existing function f

You can add a breakpoint to an existing function, e.g:

Path.cwd = trace(Path.cwd)
Path.cwd()

Now, when the function is called it will drop you into the debugger. Note, you must issue the s command when you begin to step into the function that is being traced.

round_multiple[source]

round_multiple(x, mult, round_down=False)

Round x to nearest multiple of mult

test_eq(round_multiple(63,32), 64)
test_eq(round_multiple(50,32), 64)
test_eq(round_multiple(40,32), 32)
test_eq(round_multiple( 0,32),  0)
test_eq(round_multiple(63,32, round_down=True), 32)
test_eq(round_multiple((63,40),32), (64,32))

modified_env[source]

modified_env(*delete, **replace)

Context manager temporarily modifying os.environ by deleting delete and replacing replace

oldsh = os.environ['SHELL']

with modified_env('PATH', SHELL='a'):
    test_eq(os.environ['SHELL'], 'a')
    assert 'PATH' not in os.environ

assert 'PATH' in os.environ
test_eq(os.environ['SHELL'], oldsh)

class ContextManagers[source]

ContextManagers(mgrs) :: GetAttr

Wrapper for contextlib.ExitStack which enters a collection of context managers

str2bool[source]

str2bool(s)

Case-insensitive convert string s to a bool (y,yes,t,true,on,1->True)

for o in "y YES t True on 1".split(): assert str2bool(o)
for o in "n no FALSE off 0".split(): assert not str2bool(o)
for o in 0,None,'',False: assert not str2bool(o)
for o in 1,True: assert str2bool(o)

sort_by_run[source]

sort_by_run(fs)

Transforms and callbacks will have run_after/run_before attributes; this function will sort them to respect those requirements (if it's possible). Also, sometimes we want a transform/callback to be run at the end, but still be able to use run_after/run_before behaviors. For those, the function checks for a toward_end attribute (that needs to be True).

# Tst1 asks to run before Tst; Tst2 asks to run before Tst and after Tst1,
# so the expected order is Tst1, Tst2, Tst
class Tst(): pass    
class Tst1(): run_before=[Tst]
class Tst2():
    run_before=Tst
    run_after=Tst1
    
tsts = [Tst(), Tst1(), Tst2()]
test_eq(sort_by_run(tsts), [tsts[1], tsts[2], tsts[0]])

# Contradictory constraints (Tst2 before Tst1 but after Tst, while Tst1 is before Tst)
# cannot be satisfied, so sorting is expected to fail
Tst2.run_before,Tst2.run_after = Tst1,Tst
test_fail(lambda: sort_by_run([Tst(), Tst1(), Tst2()]))

# A plain function can carry `run_before` as an attribute too
def tst1(x): return x
tst1.run_before = Tst
test_eq(sort_by_run([tsts[0], tst1]), [tst1, tsts[0]])
    
# Both redefined classes set `toward_end`, so Tst comes first;
# Tst2 still precedes Tst1 because of its `run_before`
class Tst1():
    toward_end=True
class Tst2():
    toward_end=True
    run_before=Tst1
tsts = [Tst(), Tst1(), Tst2()]
test_eq(sort_by_run(tsts), [tsts[0], tsts[2], tsts[1]])