Utility functions used in the fastai library
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *
from time import sleep

Collections

dict2obj[source]

dict2obj(d)

Convert (possibly nested) dicts (or lists of dicts) to AttrDict

This is a convenience to give you "dotted" access to (possibly nested) dictionaries, e.g:

d1 = dict(a=1, b=dict(c=2,d=3))
d2 = dict2obj(d1)
test_eq(d2.b.c, 2)
test_eq(d2.b['c'], 2)

It can also be used on lists of dicts.

_list_of_dicts = [d1, d1]
ds = dict2obj(_list_of_dicts)
test_eq(ds[0].b.c, 2)

obj2dict[source]

obj2dict(d)

Convert (possibly nested) AttrDicts (or lists of AttrDicts) to dict

obj2dict can be used to reverse what is done by dict2obj:

test_eq(obj2dict(d2), d1)
test_eq(obj2dict(ds), _list_of_dicts) 

repr_dict[source]

repr_dict(d)

Print nested dicts and lists, such as returned by dict2obj

print(repr_dict(d2))
- a: 1
- b: 
  - c: 2
  - d: 3

repr_dict is used to display AttrDict both with repr and in Jupyter Notebooks:

AttrDict.__repr__[source]

AttrDict.__repr__()

Return repr(self).

print(repr(d2))
- a: 1
- b: 
  - c: 2
  - d: 3
d2
  • a: 1
  • b:
    • c: 2
    • d: 3

is_listy[source]

is_listy(x)

isinstance(x, (tuple,list,L,slice,Generator))

assert is_listy((1,))
assert is_listy([1])
assert is_listy(L([1]))
assert is_listy(slice(2))
assert not is_listy(array([1]))

shufflish[source]

shufflish(x, pct=0.04)

Randomly relocate items of x up to pct of len(x) from their starting location

mapped[source]

mapped(f, it)

map f over it, unless it's not listy, in which case return f(it)

def _f(x, a=1):
    "Return `x` reduced by `a` (toy function used to exercise `mapped`)"
    result = x - a
    return result

test_eq(mapped(_f,1),0)
test_eq(mapped(_f,[1,2]),[0,1])
test_eq(mapped(_f,(1,)),(0,))

Reindexing Collections

class ReindexCollection[source]

ReindexCollection(coll, idxs=None, cache=None, tfm=noop) :: GetAttr

Reindexes collection coll with indices idxs and optional LRU cache of size cache

This is useful when constructing batches or organizing data in a particular manner (i.e. for deep learning). This class is primarily used in organizing data for language models in fastai.

You can supply a custom index upon instantiation with the idxs argument, or you can call the reindex method to supply a new index for your collection.

Here is how you can reindex a list such that the elements are reversed:

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'], idxs=[4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

Alternatively, you can use the reindex method:

ReindexCollection.reindex[source]

ReindexCollection.reindex(idxs)

Replace self.idxs with idxs

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'])
rc.reindex([4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

You can optionally specify an LRU cache (which uses functools.lru_cache) upon instantiation:

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t._get.cache_info()
CacheInfo(hits=1, misses=1, maxsize=2, currsize=1)

You can optionally clear the LRU cache by calling the cache_clear method:

ReindexCollection.cache_clear[source]

ReindexCollection.cache_clear()

Clear LRU cache

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t.cache_clear()
t._get.cache_info()
CacheInfo(hits=0, misses=0, maxsize=2, currsize=0)
ReindexCollection.shuffle[source]

ReindexCollection.shuffle()

Randomly shuffle indices

Note that an ordered index is automatically constructed for the data structure even if one is not supplied.

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
rc.shuffle()
list(rc)
['c', 'f', 'e', 'g', 'h', 'b', 'd', 'a']
sz = 50
t = ReindexCollection(L.range(sz), cache=2)
test_eq(list(t), range(sz))
test_eq(t[sz-1], sz-1)
test_eq(t._get.cache_info().hits, 1)
t.shuffle()
test_eq(t._get.cache_info().hits, 1)
test_ne(list(t), range(sz))
test_eq(set(t), set(range(sz)))
t.cache_clear()
test_eq(t._get.cache_info().hits, 0)
test_eq(t.count(0), 1)

File Functions

Utilities (other than extensions to pathlib.Path) for dealing with IO.

maybe_open[source]

maybe_open(f, mode='r', **kwargs)

Context manager: open f if it is a path (and close on exit)

This is useful for functions where you want to accept a path or file. maybe_open will not close your file handle if you pass one in.

def _f(fn):
    "Encoding of the text file obtained by passing `fn` (a path or an open file) to `maybe_open`"
    with maybe_open(fn) as handle:
        enc = handle.encoding
    return enc

fname = '00_test.ipynb'
sys_encoding = 'cp1252' if sys.platform == 'win32' else 'UTF-8'
test_eq(_f(fname), sys_encoding)
with open(fname) as fh: test_eq(_f(fh), sys_encoding)

For example, we can use this to reimplement imghdr.what from the Python standard library, which is written in Python 3.9 as:

def what(file, h=None):
    "First truthy result from `imghdr.tests` for `file` (a path or open binary file) or for header bytes `h`; `None` if no test matches"
    f = None
    try:
        if h is None:
            if isinstance(file, (str,os.PathLike)):
                # A path was given: open it here and keep the handle in `f` so `finally` can close it
                f = open(file, 'rb')
                h = f.read(32)
            else:
                # An open file was given: read the header, then put the position back where it was
                location = file.tell()
                h = file.read(32)
                file.seek(location)
        for tf in imghdr.tests:
            res = tf(h, f)
            if res: return res
    finally:
        # `f` is only set when this function opened the file itself, so a caller's handle is never closed
        if f: f.close()
    return None

Here's an example of the use of this function:

fname = 'images/puppy.jpg'
what(fname)
'jpeg'

With maybe_open, Self, and L.map_first, we can rewrite this in a much more concise and (in our opinion) clear way:

def what(file, h=None):
    "Image type of `file` (a path or open binary file), or of the header bytes `h` if they are passed"
    if h is None:
        # `peek` rather than `read` — presumably so a caller-supplied handle's position is not advanced (TODO confirm)
        with maybe_open(file, 'rb') as f: h = f.peek(32)
    # NOTE(review): appears to call each `imghdr.tests` entry with (h, file) and return the first truthy result — confirm against `Self`/`map_first`
    return L(imghdr.tests).map_first(Self(h,file))

...and we can check that it still works:

test_eq(what(fname), 'jpeg')

...along with the version passing a file handle:

with open(fname,'rb') as f: test_eq(what(f), 'jpeg')

...along with the h parameter version:

with open(fname,'rb') as f: test_eq(what(None, h=f.read(32)), 'jpeg')
def _jpg_size(f):
    "(w,h) of the JPEG in binary file `f`, which must be positioned at the start of the image"
    # Start-of-frame markers are 0xC0-0xCF *except* 0xC4 (DHT), 0xC8 (JPG extension) and 0xCC (DAC):
    # those share the numeric range but are not frame headers, so they must be skipped like any other segment
    non_frame = (0xc4, 0xc8, 0xcc)
    size,ftype = 2,0  # the first skip jumps over the 2-byte SOI marker
    while not 0xc0 <= ftype <= 0xcf or ftype in non_frame:
        f.seek(size, 1)  # skip the payload of the previous segment
        byte = f.read(1)
        while ord(byte) == 0xff: byte = f.read(1)  # consume the 0xFF prefix (and any fill bytes) before the marker code
        ftype = ord(byte)
        size = struct.unpack('>H', f.read(2))[0] - 2  # the stored length counts its own 2 bytes
    f.seek(1, 1)  # `precision'
    h,w = struct.unpack('>HH', f.read(4))
    return w,h

def _gif_size(f):
    "(w,h) of the GIF in binary file `f`, read from the current position (expected to be the start of the file)"
    # The 6-byte signature is followed by little-endian 16-bit width and height
    head = f.read(10)
    return struct.unpack('<HH', head[6:10])

def _png_size(f):
    "(w,h) of the PNG in binary file `f`, read from the current position (expected to be the start of the file)"
    # 8-byte signature + 4-byte chunk length + b'IHDR' + big-endian 32-bit width and height = 24 bytes
    head = f.read(24)
    # Bytes 4-8 of the PNG signature are b'\r\n\x1a\n'
    assert struct.unpack('>i', head[4:8])[0]==0x0d0a1a0a
    return struct.unpack('>ii', head[16:24])

image_size[source]

image_size(fn)

Tuple of (w,h) for png, gif, or jpg; None otherwise

test_eq(image_size(fname), (1200,803))

bunzip[source]

bunzip(fn)

bunzip fn, raising exception if output already exists

# Round-trip check for `bunzip`: decompress the fixture, verify its contents, then clean up
f = Path('files/test.txt')
if f.exists(): f.unlink()  # `bunzip` raises if the output already exists
bunzip('files/test.txt.bz2')
# Read through a context manager so the handle is closed before the file is deleted
with f.open() as fh: t = fh.readlines()
test_eq(len(t),1)
test_eq(t[0], 'test\n')
f.unlink()

join_path_file[source]

join_path_file(file, path, ext='')

Return path/file if file is a string or a Path, file otherwise

path = Path.cwd()/'_tmp'/'tst'
f = join_path_file('tst.txt', path)
assert path.exists()
test_eq(f, path/'tst.txt')
with open(f, 'w') as f_: assert join_path_file(f_, path) == f_
shutil.rmtree(Path.cwd()/'_tmp')

loads[source]

loads(s, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw)

Same as json.loads, but handles None

loads_multi[source]

loads_multi(s:str)

Generator of >=0 decoded json dicts, possibly with non-json ignored text at start and end

tst = """
# ignored
{ "a":1 }
hello
{
"b":2
}
"""

test_eq(list(loads_multi(tst)), [{'a': 1}, {'b': 2}])

untar_dir[source]

untar_dir(file, dest)

repo_details[source]

repo_details(url)

Tuple of owner,name from ssh or https git repo url

test_eq(repo_details('https://github.com/fastai/fastai.git'), ['fastai', 'fastai'])
test_eq(repo_details('git@github.com:fastai/nbdev.git\n'), ['fastai', 'nbdev'])

run[source]

run(cmd, *rest, ignore_ex=False, as_bytes=False, stderr=False)

Pass cmd (splitting with shlex if string) to subprocess.run; return stdout; raise IOError if fails

You can pass a string (which will be split based on standard shell rules), a list, or pass args directly:

if sys.platform == 'win32':
    assert 'ipynb' in run('cmd /c dir /p')
    assert 'ipynb' in run(['cmd', '/c', 'dir', '/p'])
    assert 'ipynb' in run('cmd', '/c', 'dir',  '/p')
else:
    assert 'ipynb' in run('ls -ls')
    assert 'ipynb' in run(['ls', '-l'])
    assert 'ipynb' in run('ls', '-l')

Some commands fail in non-error situations, like grep. Use ignore_ex in those cases, which will return a tuple of (returncode, stdout):

if sys.platform == 'win32':
    test_eq(run('cmd /c findstr asdfds 00_test.ipynb', ignore_ex=True)[0], 1)
else:
    test_eq(run('grep asdfds 00_test.ipynb', ignore_ex=True)[0], 1)

run automatically decodes returned bytes to a str. Use as_bytes to skip that:

if sys.platform == 'win32':
    # as_bytes is not exercised on Windows, because every time nbdev_clean_nbs runs it rewrites the expected \n as \nn
    test_eq(run('cmd /c echo hi'), 'hi')
else:
    test_eq(run('echo hi', as_bytes=True), b'hi\n')

open_file[source]

open_file(fn, mode='r', **kwargs)

Open a file, with optional compression if gz or bz2 suffix

save_pickle[source]

save_pickle(fn, o)

Save a pickle file, to a file name or opened file

load_pickle[source]

load_pickle(fn)

Load a pickle file from a file name or opened file

# Round-trip a value through `save_pickle`/`load_pickle` for each supported suffix
for suf in '.pkl','.bz2','.gz':
    # delete=False is added for Windows. https://stackoverflow.com/questions/23212435/permission-denied-to-write-to-my-temporary-file
    with tempfile.NamedTemporaryFile(suffix=suf, delete=False) as f:
        fn = Path(f.name)
        save_pickle(fn, 't')
        t = load_pickle(fn)
    # The `with` block has already closed `f`; with delete=False the file itself must be removed by hand
    fn.unlink()
    test_eq(t,'t')

Extensions to Pathlib.Path

The following methods are added to the standard Python library's pathlib.Path.

Path.readlines[source]

Path.readlines(hint=-1, encoding='utf8')

Read the content of self

Path.read_json[source]

Path.read_json(encoding=None, errors=None)

Same as read_text followed by loads

Path.mk_write[source]

Path.mk_write(data, encoding=None, errors=None, mode=511)

Make all parent dirs of self, and write data

Path.ls[source]

Path.ls(n_max=None, file_type=None, file_exts=None)

Contents of path as a list

We add an ls() method to pathlib.Path which is simply defined as list(Path.iterdir()), mainly for convenience in REPL environments such as notebooks.

path = Path()
t = path.ls()
assert len(t)>0
t1 = path.ls(10)
test_eq(len(t1), 10)
t2 = path.ls(file_exts='.ipynb')
assert len(t)>len(t2)
t[0]
Path('.gitattributes')

You can also pass an optional file_type MIME prefix and/or a list of file extensions.

lib_path = (path/'../fastcore')
txt_files=lib_path.ls(file_type='text')
assert len(txt_files) > 0 and txt_files[0].suffix=='.py'
ipy_files=path.ls(file_exts=['.ipynb'])
assert len(ipy_files) > 0 and ipy_files[0].suffix=='.ipynb'
txt_files[0],ipy_files[0]
(Path('../fastcore/__init__.py'), Path('01_basics.ipynb'))

Path.__repr__[source]

Path.__repr__()

Return repr(self).

fastai also updates the repr of Path such that, if Path.BASE_PATH is defined, all paths are printed relative to that path (as long as they are contained in Path.BASE_PATH):

t = ipy_files[0].absolute()
try:
    Path.BASE_PATH = t.parent.parent
    test_eq(repr(t), f"Path('nbs/{t.name}')")
finally: Path.BASE_PATH = None

Other Helpers

truncstr[source]

truncstr(s:str, maxlen:int, suf:str='…', space='')

Truncate s to length maxlen, adding suffix suf if truncated

w = 'abacadabra'
test_eq(truncstr(w, 10), w)
test_eq(truncstr(w, 5), 'abac…')
test_eq(truncstr(w, 5, suf=''), 'abaca')
test_eq(truncstr(w, 11, space='_'), w+"_")
test_eq(truncstr(w, 10, space='_'), w[:-1]+'…')
test_eq(truncstr(w, 5, suf='!!'), 'aba!!')

sparkline[source]

sparkline(data, mn=None, mx=None, empty_zero=False)

Sparkline for data, with Nones (and zero, if empty_zero) shown as empty column

data = [9,6,None,1,4,0,8,15,10]
print(f'without "empty_zero": {sparkline(data, empty_zero=False)}')
print(f'   with "empty_zero": {sparkline(data, empty_zero=True )}')
without "empty_zero": ▅▂ ▁▂▁▃▇▅
   with "empty_zero": ▅▂ ▁▂ ▃▇▅

You can set a maximum and minimum for the y-axis of the sparkline with the arguments mn and mx respectively:

sparkline([1,2,3,400], mn=0, mx=3)
'▂▅▇▇'

autostart[source]

autostart(g)

Decorator that automatically starts a generator

class EventTimer[source]

EventTimer(store=5, span=60)

An event timer with history of store items of time span

Add events with add, and get number of events and their frequency (freq).

def _randwait():
    "Generator of 100 items, sleeping a random interval of up to 1/200 second before yielding each"
    for _ in range(100):
        pause = random.random()/200
        yield sleep(pause)

c = EventTimer(store=5, span=0.03)
for o in _randwait(): c.add(1)
print(f'Num Events: {c.events}, Freq/sec: {c.freq:.01f}')
print('Most recent: ', sparkline(c.hist), *L(c.hist).map('{:.01f}'))
Num Events: 8, Freq/sec: 423.0
Most recent:  ▂▂▁▁▇ 318.5 319.0 266.9 275.6 427.7

stringfmt_names[source]

stringfmt_names(s:str)

Unique brace-delimited names in s

s = '/pulls/{pull_number}/reviews/{review_id}'
test_eq(stringfmt_names(s), ['pull_number','review_id'])

class PartialFormatter[source]

PartialFormatter() :: Formatter

A string.Formatter that doesn't error on missing fields, and tracks missing fields and unused args

partial_format[source]

partial_format(s:str, **kwargs)

string format s, ignoring missing field errors, returning missing and extra fields

The result is a tuple of (formatted_string,missing_fields,extra_fields), e.g:

res,missing,xtra = partial_format(s, pull_number=1, foo=2)
test_eq(res, '/pulls/1/reviews/{review_id}')
test_eq(missing, ['review_id'])
test_eq(xtra, {'foo':2})

utc2local[source]

utc2local(dt:datetime)

Convert dt from UTC to local time

dt = datetime(2000,1,1,12)
print(f'{dt} UTC is {utc2local(dt)} local time')
2000-01-01 12:00:00 UTC is 2000-01-01 12:00:00+00:00 local time

local2utc[source]

local2utc(dt:datetime)

Convert dt from local to UTC time

print(f'{dt} local is {local2utc(dt)} UTC time')
2000-01-01 12:00:00 local is 2000-01-01 12:00:00+00:00 UTC time

trace[source]

trace(f)

Add set_trace to an existing function f

You can add a breakpoint to an existing function, e.g:

Path.cwd = trace(Path.cwd)
Path.cwd()

Now, when the function is called it will drop you into the debugger. Note, you must issue the s command when you begin to step into the function that is being traced.

round_multiple[source]

round_multiple(x, mult, round_down=False)

Round x to nearest multiple of mult

test_eq(round_multiple(63,32), 64)
test_eq(round_multiple(50,32), 64)
test_eq(round_multiple(40,32), 32)
test_eq(round_multiple( 0,32),  0)
test_eq(round_multiple(63,32, round_down=True), 32)
test_eq(round_multiple((63,40),32), (64,32))

modified_env[source]

modified_env(*delete, **replace)

Context manager temporarily modifying os.environ by deleting delete and replacing replace

env_test = 'USERNAME' if sys.platform == "win32" else 'SHELL'
oldusr = os.environ[env_test]

replace_param = {env_test: 'a'}
with modified_env('PATH', **replace_param):
    test_eq(os.environ[env_test], 'a')
    assert 'PATH' not in os.environ

assert 'PATH' in os.environ
test_eq(os.environ[env_test], oldusr)

class ContextManagers[source]

ContextManagers(mgrs) :: GetAttr

Wrapper for contextlib.ExitStack which enters a collection of context managers

str2bool[source]

str2bool(s)

Case-insensitive convert string s to a bool (y,yes,t,true,on,1->True)

for o in "y YES t True on 1".split(): assert str2bool(o)
for o in "n no FALSE off 0".split(): assert not str2bool(o)
for o in 0,None,'',False: assert not str2bool(o)
for o in 1,True: assert str2bool(o)

sort_by_run[source]

sort_by_run(fs)