Utility functions

Utility functions used in the fastai library

File Functions

Utilities (other than extensions to pathlib.Path) for dealing with IO.


source

walk

 walk (path:pathlib.Path|str, symlinks:bool=True, keep_file:callable=ret_true,
       keep_folder:callable=ret_true, skip_folder:callable=ret_false,
       func:callable=join, ret_folders:bool=False)

Generator version of os.walk, using functions to filter files and folders

Type Default Details
path pathlib.Path | str path to start searching
symlinks bool True follow symlinks?
keep_file callable ret_true function that returns True for wanted files
keep_folder callable ret_true function that returns True for folders to enter
skip_folder callable ret_false function that returns True for folders to skip
func callable join function to apply to each matched file
ret_folders bool False return folders, not just files
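
For example, here is a minimal sketch of finding notebooks under the current directory. The keep_file variant assumes the filter callbacks receive the parent directory and the entry name; that calling convention is an assumption here, not shown above:

nbs = [f for f in walk('.') if f.endswith('.ipynb')]
assert len(nbs) > 0
# same filtering done by `walk` itself (callback args are an assumption)
nbs2 = list(walk('.', keep_file=lambda d,name: name.endswith('.ipynb')))
test_eq(set(nbs), set(nbs2))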

source

globtastic

 globtastic (path:pathlib.Path|str, recursive:bool=True, symlinks:bool=True,
             file_glob:str=None, file_re:str=None, folder_re:str=None,
             skip_file_glob:str=None, skip_file_re:str=None,
             skip_folder_re:str=None, func:callable=join, ret_folders:bool=False)

A more powerful glob, including regex matches, symlink handling, and skip parameters

Type Default Details
path pathlib.Path | str path to start searching
recursive bool True search subfolders
symlinks bool True follow symlinks?
file_glob str None Only include files matching glob
file_re str None Only include files matching regex
folder_re str None Only enter folders matching regex
skip_file_glob str None Skip files matching glob
skip_file_re str None Skip files matching regex
skip_folder_re str None Skip folders matching regex
func callable join function to apply to each matched file
ret_folders bool False return folders, not just files
Returns L Paths to matched files
globtastic('.', skip_folder_re='^[_.]', folder_re='core', file_glob='*.*py*', file_re='c')
(#5) ['./fastcore/docments.py','./fastcore/dispatch.py','./fastcore/basics.py','./fastcore/docscrape.py','./fastcore/script.py']

source

maybe_open

 maybe_open (f, mode='r', **kwargs)

Context manager: open f if it is a path (and close on exit)

This is useful for functions where you want to accept a path or file. maybe_open will not close your file handle if you pass one in.

def _f(fn):
    with maybe_open(fn) as f: return f.encoding

fname = '00_test.ipynb'
sys_encoding = 'cp1252' if sys.platform == 'win32' else 'UTF-8'
test_eq(_f(fname), sys_encoding)
with open(fname) as fh: test_eq(_f(fh), sys_encoding)

For example, we can use this to reimplement imghdr.what from the Python standard library, which is written in Python 3.9 as:

from fastcore import imghdr
def what(file, h=None):
    f = None
    try:
        if h is None:
            if isinstance(file, (str,os.PathLike)):
                f = open(file, 'rb')
                h = f.read(32)
            else:
                location = file.tell()
                h = file.read(32)
                file.seek(location)
        for tf in imghdr.tests:
            res = tf(h, f)
            if res: return res
    finally:
        if f: f.close()
    return None

Here’s an example of the use of this function:

fname = 'images/puppy.jpg'
what(fname)
'jpeg'

With maybe_open, Self, and L.map_first, we can rewrite this in a much more concise and (in our opinion) clear way:

def what(file, h=None):
    if h is None:
        with maybe_open(file, 'rb') as f: h = f.peek(32)
    return L(imghdr.tests).map_first(Self(h,file))

…and we can check that it still works:

test_eq(what(fname), 'jpeg')

…along with the version passing a file handle:

with open(fname,'rb') as f: test_eq(what(f), 'jpeg')

…along with the h parameter version:

with open(fname,'rb') as f: test_eq(what(None, h=f.read(32)), 'jpeg')

source

mkdir

 mkdir (path, exist_ok=False, parents=False, overwrite=False, **kwargs)

Creates and returns a directory defined by path, optionally removing any previously existing directory if overwrite is True

with tempfile.TemporaryDirectory() as d:
    path = Path(os.path.join(d, 'new_dir'))
    new_dir = mkdir(path)
    assert new_dir.exists()
    test_eq(new_dir, path)
        
    # test overwrite
    with open(new_dir/'test.txt', 'w') as f: f.writelines('test')
    test_eq(len(list(walk(new_dir))), 1) # assert file is present
    new_dir = mkdir(new_dir, overwrite=True)
    test_eq(len(list(walk(new_dir))), 0) # assert file was deleted

source

image_size

 image_size (fn)

Tuple of (w,h) for png, gif, or jpg; None otherwise

test_eq(image_size(fname), (1200,803))

source

bunzip

 bunzip (fn)

bunzip fn, raising exception if output already exists

f = Path('files/test.txt')
if f.exists(): f.unlink()
bunzip('files/test.txt.bz2')
t = f.open().readlines()
test_eq(len(t),1)
test_eq(t[0], 'test\n')
f.unlink()

source

loads

 loads (s, **kw)

Same as json.loads, but handles None
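
For instance (a minimal sketch; the None handling is the only difference from json.loads):

test_eq(loads('{"a": 1}'), {'a': 1})
loads(None)  # unlike json.loads, passing None does not raise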


source

loads_multi

 loads_multi (s:str)

Generator of >=0 decoded json dicts, possibly with non-json ignored text at start and end

tst = """
# ignored
{ "a":1 }
hello
{
"b":2
}
"""

test_eq(list(loads_multi(tst)), [{'a': 1}, {'b': 2}])

source

dumps

 dumps (obj, **kw)

Same as json.dumps, but uses ujson if available
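
A quick check that it round-trips with loads, whichever JSON backend is installed:

test_eq(loads(dumps({'a': 1})), {'a': 1})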


source

untar_dir

 untar_dir (fname, dest, rename=False, overwrite=False)

untar file into dest, creating a directory if the root contains more than one item

def test_untar(foldername, rename=False, **kwargs):
    with tempfile.TemporaryDirectory() as d:
        nm = os.path.join(d, 'a')
        shutil.make_archive(nm, 'gztar', **kwargs)
        with tempfile.TemporaryDirectory() as d2:
            d2 = Path(d2)
            untar_dir(nm+'.tar.gz', d2, rename=rename)
            test_eq(d2.ls(), [d2/foldername])

If the contents of fname contain just one file or directory, it is placed directly in dest:

# using `base_dir` in `make_archive` results in `images` directory included in file names
test_untar('images', base_dir='images')

If rename is set, the directory created is named after the archive, without its extension:

test_untar('a', base_dir='images', rename=True)

If the contents of fname contain multiple files and directories, a new folder in dest is created with the same name as fname (but without extension):

# using `root_dir` in `make_archive` results in `images` directory *not* included in file names
test_untar('a', root_dir='images')

source

repo_details

 repo_details (url)

Tuple of owner,name from ssh or https git repo url

test_eq(repo_details('https://github.com/fastai/fastai.git'), ['fastai', 'fastai'])
test_eq(repo_details('[email protected]:fastai/nbdev.git\n'), ['fastai', 'nbdev'])

source

run

 run (cmd, *rest, same_in_win=False, ignore_ex=False, as_bytes=False,
      stderr=False)

Pass cmd (splitting with shlex if string) to subprocess.run; return stdout; raise IOError if fails

You can pass a string (which will be split based on standard shell rules), a list, or pass args directly:

run('echo', same_in_win=True)
run('pip', '--version', same_in_win=True)
run(['pip', '--version'], same_in_win=True)
'pip 25.1.1 from /Users/jhoward/aai-ws/.venv/lib/python3.12/site-packages/pip (python 3.12)'
if sys.platform == 'win32':
    assert 'ipynb' in run('cmd /c dir /p')
    assert 'ipynb' in run(['cmd', '/c', 'dir', '/p'])
    assert 'ipynb' in run('cmd', '/c', 'dir',  '/p')
else:
    assert 'ipynb' in run('ls -ls')
    assert 'ipynb' in run(['ls', '-l'])
    assert 'ipynb' in run('ls', '-l')

Some commands fail in non-error situations, like grep. Use ignore_ex in those cases, which will return a tuple of the return code and stdout:

if sys.platform == 'win32':
    test_eq(run('cmd /c findstr asdfds 00_test.ipynb', ignore_ex=True)[0], 1)
else:
    test_eq(run('grep asdfds 00_test.ipynb', ignore_ex=True)[0], 1)

run automatically decodes returned bytes to a str. Use as_bytes to skip that:

if sys.platform == 'win32':
    test_eq(run('cmd /c echo hi'), 'hi')
else:
    test_eq(run('echo hi', as_bytes=True), b'hi\n')

source

open_file

 open_file (fn, mode='r', **kwargs)

Open a file, with optional compression if gz or bz2 suffix
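
For example, a .gz suffix selects transparent gzip compression (a minimal sketch; compressed handles are binary, so we use 'wb'/'rb'):

with tempfile.TemporaryDirectory() as d:
    fn = Path(d)/'msg.txt.gz'
    with open_file(fn, 'wb') as f: f.write(b'hello')  # written gzip-compressed on disk
    with open_file(fn, 'rb') as f: test_eq(f.read(), b'hello')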


source

save_pickle

 save_pickle (fn, o)

Save a pickle file, to a file name or opened file


source

load_pickle

 load_pickle (fn)

Load a pickle file from a file name or opened file

for suf in '.pkl','.bz2','.gz':
    # delete=False is added for Windows
    # https://stackoverflow.com/questions/23212435/permission-denied-to-write-to-my-temporary-file
    with tempfile.NamedTemporaryFile(suffix=suf, delete=False) as f:
        fn = Path(f.name)
        save_pickle(fn, 't')
        t = load_pickle(fn)
    f.close()
    test_eq(t,'t')

source

parse_env

 parse_env (s:str=None, fn:Union[str,pathlib.Path]=None)

Parse a shell-style environment string or file

testf = """# comment
   # another comment
 export FOO="bar#baz"
BAR=thing # comment "ok"
  baz='thong'
QUX=quux
export ZAP = "zip" # more comments
   FOOBAR = 42   # trailing space and comment"""

exp = dict(FOO='bar#baz', BAR='thing', baz='thong', QUX='quux', ZAP='zip', FOOBAR='42')

test_eq(parse_env(testf),  exp)

source

expand_wildcards

 expand_wildcards (code)

Expand all wildcard imports in the given code string.

inp = """from math import *
from os import *
from random import *
def func(): return sin(pi) + path.join('a', 'b') + randint(1, 10)"""

exp = """from math import pi, sin
from os import path
from random import randint
def func(): return sin(pi) + path.join('a', 'b') + randint(1, 10)"""

test_eq(expand_wildcards(inp), exp)

inp = """from itertools import *
def func(): pass"""
test_eq(expand_wildcards(inp), inp)

inp = """def outer():
    from math import *
    def inner():
        from os import *
        return sin(pi) + path.join('a', 'b')"""

exp = """def outer():
    from math import pi, sin
    def inner():
        from os import path
        return sin(pi) + path.join('a', 'b')"""

test_eq(expand_wildcards(inp), exp)

Collections


source

dict2obj

 dict2obj (d, list_func=<class 'fastcore.foundation.L'>, dict_func=<class
           'fastcore.basics.AttrDict'>)

Convert (possibly nested) dicts (or lists of dicts) to AttrDict

This is a convenience to give you “dotted” access to (possibly nested) dictionaries, e.g:

d1 = dict(a=1, b=dict(c=2,d=3))
d2 = dict2obj(d1)
test_eq(d2.b.c, 2)
test_eq(d2.b['c'], 2)

It can also be used on lists of dicts.

_list_of_dicts = [d1, d1]
ds = dict2obj(_list_of_dicts)
test_eq(ds[0].b.c, 2)

source

obj2dict

 obj2dict (d)

Convert (possibly nested) AttrDicts (or lists of AttrDicts) to dict

obj2dict can be used to reverse what is done by dict2obj:

test_eq(obj2dict(d2), d1)
test_eq(obj2dict(ds), _list_of_dicts)

source

repr_dict

 repr_dict (d)

Print nested dicts and lists, such as returned by dict2obj

print(repr_dict(d2))
- a: 1
- b: 
  - c: 2
  - d: 3

source

is_listy

 is_listy (x)

isinstance(x, (tuple,list,L,slice,Generator))

assert is_listy((1,))
assert is_listy([1])
assert is_listy(L([1]))
assert is_listy(slice(2))
assert not is_listy(array([1]))

source

mapped

 mapped (f, it)

map f over it, unless it’s not listy, in which case return f(it)

def _f(x,a=1): return x-a

test_eq(mapped(_f,1),0)
test_eq(mapped(_f,[1,2]),[0,1])
test_eq(mapped(_f,(1,)),(0,))

Extensions to pathlib.Path

The following methods are added to the standard Python library's pathlib.Path.


source

Path.readlines

 Path.readlines (hint=-1, encoding='utf8')

Read the content of self
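
A minimal sketch of its use:

with tempfile.TemporaryDirectory() as d:
    fn = Path(d)/'t.txt'
    fn.write_text('a\nb\n')
    test_eq(fn.readlines(), ['a\n', 'b\n'])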


source

Path.read_json

 Path.read_json (encoding=None, errors=None)

Same as read_text followed by loads
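
For example:

with tempfile.TemporaryDirectory() as d:
    fn = Path(d)/'conf.json'
    fn.write_text('{"a": 1}')
    test_eq(fn.read_json(), {'a': 1})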


source

Path.mk_write

 Path.mk_write (data, encoding=None, errors=None, mode=511)

Make all parent dirs of self, and write data
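
For example (a sketch based on the docstring):

with tempfile.TemporaryDirectory() as d:
    p = Path(d)/'a'/'b'/'c.txt'
    p.mk_write('hi')              # 'a/' and 'a/b/' are created automatically
    test_eq(p.read_text(), 'hi')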


source

Path.relpath

 Path.relpath (start=None)

Same as os.path.relpath, but returns a Path, and resolves symlinks

p = Path('../fastcore/').resolve()
p
Path('/Users/jhoward/aai-ws/fastcore/fastcore')
p.relpath(Path.cwd())
Path('../fastcore')

source

Path.ls

 Path.ls (n_max=None, file_type=None, file_exts=None)

Contents of path as a list

We add an ls() method to pathlib.Path which is simply defined as list(Path.iterdir()), mainly for convenience in REPL environments such as notebooks.

path = Path()
t = path.ls()
assert len(t)>0
t1 = path.ls(10)
test_eq(len(t1), 10)
t2 = path.ls(file_exts='.ipynb')
assert len(t)>len(t2)
t[0]
Path('llms.txt')

You can also pass an optional file_type MIME prefix and/or a list of file extensions.

lib_path = (path/'../fastcore')
txt_files=lib_path.ls(file_type='text')
assert len(txt_files) > 0 and txt_files[0].suffix=='.py'
ipy_files=path.ls(file_exts=['.ipynb'])
assert len(ipy_files) > 0 and ipy_files[0].suffix=='.ipynb'
txt_files[0],ipy_files[0]
(Path('../fastcore/shutil.py'), Path('000_tour.ipynb'))

source

Path.__repr__

 Path.__repr__ ()

Return repr(self).

fastai also updates the repr of Path such that, if Path.BASE_PATH is defined, all paths are printed relative to that path (as long as they are contained in Path.BASE_PATH):

t = ipy_files[0].absolute()
try:
    Path.BASE_PATH = t.parent.parent
    test_eq(repr(t), f"Path('nbs/{t.name}')")
finally: Path.BASE_PATH = None

source

Path.delete

 Path.delete ()

Delete a file, symlink, or directory tree
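
Unlike Path.unlink or Path.rmdir, this works on files and non-empty directories alike (a minimal sketch):

with tempfile.TemporaryDirectory() as d:
    p = Path(d)/'sub'
    p.mkdir()
    (p/'f.txt').write_text('x')
    p.delete()               # removes the whole tree
    assert not p.exists()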

Reindexing Collections


source

ReindexCollection

 ReindexCollection (coll, idxs=None, cache=None, tfm=<function noop>)

Reindexes collection coll with indices idxs and optional LRU cache of size cache

This is useful when constructing batches or organizing data in a particular manner (e.g. for deep learning). This class is primarily used in organizing data for language models in fastai.

You can supply a custom index upon instantiation with the idxs argument, or you can call the reindex method to supply a new index for your collection.

Here is how you can reindex a list such that the elements are reversed:

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'], idxs=[4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

Alternatively, you can use the reindex method:


source

ReindexCollection.reindex

 ReindexCollection.reindex (idxs)

Replace self.idxs with idxs

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'])
rc.reindex([4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

You can optionally specify an LRU cache, which uses functools.lru_cache upon instantiation:

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t._get.cache_info()
CacheInfo(hits=1, misses=1, maxsize=2, currsize=1)

You can optionally clear the LRU cache by calling the cache_clear method:


source

ReindexCollection.cache_clear

 ReindexCollection.cache_clear ()

Clear LRU cache

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t.cache_clear()
t._get.cache_info()
CacheInfo(hits=0, misses=0, maxsize=2, currsize=0)

source

ReindexCollection.shuffle

 ReindexCollection.shuffle ()

Randomly shuffle indices

Note that an ordered index is automatically constructed for the data structure even if one is not supplied.

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
rc.shuffle()
list(rc)
['g', 'c', 'f', 'd', 'a', 'b', 'e', 'h']
sz = 50
t = ReindexCollection(L.range(sz), cache=2)
test_eq(list(t), range(sz))
test_eq(t[sz-1], sz-1)
test_eq(t._get.cache_info().hits, 1)
t.shuffle()
test_eq(t._get.cache_info().hits, 1)
test_ne(list(t), range(sz))
test_eq(set(t), set(range(sz)))
t.cache_clear()
test_eq(t._get.cache_info().hits, 0)
test_eq(t.count(0), 1)

SaveReturn and save_iter Variants

These utilities solve a common problem in Python: how to extract additional information from generator functions beyond just the yielded values.

In Python, generator functions can yield values and also return a final value, but the return value is normally lost when you iterate over the generator:

def example_generator():
    total = 0
    for i in range(3):
        total += i
        yield i
    return total  # This gets lost!

# The return value (3) is lost
values = list(example_generator())  # [0, 1, 2]

source

SaveReturn

 SaveReturn (its)

Wrap an iterator such that the generator function’s return value is stored in .value

Exported source
class SaveReturn:
    "Wrap an iterator such that the generator function's return value is stored in `.value`"
    def __init__(self, its): self.its = its
    def __iter__(self):
        self.value = yield from self.its
        return self.value

SaveReturn is the simplest approach to solving this problem - it wraps any existing (non-async) generator and captures its return value. This works because yield from (used internally in SaveReturn) evaluates to the wrapped generator's return value.

def sum_range(n):
    total = 0
    for i in range(n):
        total += i
        yield i
    return total  # This value is returned by yield from

sr = SaveReturn(sum_range(5))
values = list(sr)  # This will consume the generator and get the return value
print(f"Values: {values}")
sr.value
Values: [0, 1, 2, 3, 4]
10

In order to provide an accurate signature for save_iter, we need a version of wraps that removes leading parameters:


source

trim_wraps

 trim_wraps (f, n=1)

Like wraps, but removes the first n parameters from the signature

trim_wraps is a decorator factory that works like functools.wraps, but removes the first n parameters from the wrapped function’s signature. This is useful when creating wrapper functions that consume some parameters internally and shouldn’t expose them in the public API.

def adder(base, x, y): return base + x + y

def make_adder(base_value):
    @trim_wraps(adder)
    def _(x, y): return adder(base_value, x, y)
    return _

add_10 = make_adder(10)
print(f"{add_10.__name__}{inspect.signature(add_10)}")
adder(x, y)

source

save_iter

 save_iter (g)

Decorator that allows a generator function to store values in the returned iterator object

save_iter modifies generator functions to store state in the iterator object itself. The generator receives an object as its first parameter, which it can use to store attributes. You can store values during iteration, not just at the end, and you can store multiple attributes if needed.

@save_iter
def sum_range(o, n):  # Note: 'o' parameter added
    total = 0
    for i in range(n):
        total += i
        yield i
    o.value = total  # Store directly on the iterator object

Because save_iter internally uses trim_wraps, the signature of sum_range correctly shows that you should not pass o to it; it’s injected by the decorating function.

print(sum_range.__signature__)
(n)
sr = sum_range(5)
print(f"Values: {list(sr)}")
print(f"Sum stored: {sr.value}")
Values: [0, 1, 2, 3, 4]
Sum stored: 10

source

asave_iter

 asave_iter (g)

Like save_iter, but for async iterators

asave_iter provides the same functionality as save_iter, but for async generator functions. Since yield from and return with a value cannot be used in async generator functions, SaveReturn can’t be used here.

@asave_iter
async def asum_range(self, n):
    total = 0
    for i in range(n):
        total += i
        yield i
    self.value = total

asr = asum_range(5)
print(f"Values: {[o async for o in asr]}")
print(f"Sum stored: {asr.value}")
Values: [0, 1, 2, 3, 4]
Sum stored: 10

Other Helpers


source

exec_eval

 exec_eval (code, g=None, l=None)

Evaluate code in g (defaults to globals()) and l (defaults to locals())

Type Default Details
code Code to exec/eval
g NoneType None Globals namespace dict
l NoneType None Locals namespace dict

This is a combination of eval and exec, which behaves like IPython and Jupyter. If the last line is an expression, it is evaluated and the result is returned:

exec_eval('''
def f(x): return x+1
f(1)
''')
2

By default, the code uses the caller’s globals and locals. For instance, here f is available since it’s been added to our symbol table:

exec_eval('print(f(2))')
3

Pass a dict as the g param in order to use an arbitrary namespace:

exec_eval('print(f)', {'f': 'Hi I am f.'})
Hi I am f.

This function helps us identify the first declared raw function of a dispatched function:

from plum import Function
def f1(x): return "Any"
def f2(x:int): return "Int"

df = Function(f1).dispatch(f1).dispatch(f2)

test_eq(_unwrapped_type_dispatch_func(df), f1)

source

truncstr

 truncstr (s:str, maxlen:int, suf:str='…', space='')

Truncate s to length maxlen, adding suffix suf if truncated

w = 'abacadabra'
test_eq(truncstr(w, 10), w)
test_eq(truncstr(w, 5), 'abac…')
test_eq(truncstr(w, 5, suf=''), 'abaca')
test_eq(truncstr(w, 11, space='_'), w+"_")
test_eq(truncstr(w, 10, space='_'), w[:-1]+'…')
test_eq(truncstr(w, 5, suf='!!'), 'aba!!')

source

sparkline

 sparkline (data, mn=None, mx=None, empty_zero=False)

Sparkline for data, with Nones (and zero, if empty_zero) shown as empty column

data = [9,6,None,1,4,0,8,15,10]
print(f'without "empty_zero": {sparkline(data, empty_zero=False)}')
print(f'   with "empty_zero": {sparkline(data, empty_zero=True )}')
without "empty_zero": ▅▂ ▁▂▁▃▇▅
   with "empty_zero": ▅▂ ▁▂ ▃▇▅

You can set a maximum and minimum for the y-axis of the sparkline with the arguments mn and mx respectively:

sparkline([1,2,3,400], mn=0, mx=3)
'▂▅▇▇'

source

modify_exception

 modify_exception (e:Exception, msg:str=None, replace:bool=False)

Modifies e with a custom message attached

Type Default Details
e Exception An exception
msg str None A custom message
replace bool False Whether to replace e.args with [msg]
Returns Exception
msg = "This is my custom message!"

test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception(), None)), contains='')
test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception(), msg)), contains=msg)
test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception("The first message"), msg)), contains="The first message This is my custom message!")
test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception("The first message"), msg, True)), contains="This is my custom message!")

source

round_multiple

 round_multiple (x, mult, round_down=False)

Round x to nearest multiple of mult

test_eq(round_multiple(63,32), 64)
test_eq(round_multiple(50,32), 64)
test_eq(round_multiple(40,32), 32)
test_eq(round_multiple( 0,32),  0)
test_eq(round_multiple(63,32, round_down=True), 32)
test_eq(round_multiple((63,40),32), (64,32))

source

set_num_threads

 set_num_threads (nt)

Get numpy (and others) to use nt threads

This sets the number of threads consistently for many tools, by:

  1. Setting the following environment variables equal to nt: OPENBLAS_NUM_THREADS, NUMEXPR_NUM_THREADS, OMP_NUM_THREADS, MKL_NUM_THREADS
  2. Setting nt threads for numpy and pytorch.
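
For example (assuming the variables are stored as strings, as environment variables must be):

set_num_threads(2)
test_eq(os.environ['OMP_NUM_THREADS'], '2')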

source

join_path_file

 join_path_file (file, path, ext='')

Return path/file if file is a string or a Path, file otherwise

path = Path.cwd()/'_tmp'/'tst'
f = join_path_file('tst.txt', path)
assert path.exists()
test_eq(f, path/'tst.txt')
with open(f, 'w') as f_: assert join_path_file(f_, path) == f_
shutil.rmtree(Path.cwd()/'_tmp')

source

autostart

 autostart (g)

Decorator that automatically starts a generator
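
A coroutine-style generator wrapped with autostart can receive values via send immediately, with no priming next call (a minimal sketch):

@autostart
def _printer():
    while True:
        x = yield
        print(x)

p = _printer()
p.send('hello')  # would raise TypeError on a freshly created, un-started generator
hello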


source

EventTimer

 EventTimer (store=5, span=60)

An event timer with history of store items of time span

Add events with add, and get the number of events (events) and their frequency (freq).

# Random wait function for testing
def _randwait(): yield from (sleep(random.random()/200) for _ in range(100))

c = EventTimer(store=5, span=0.03)
for o in _randwait(): c.add(1)
print(f'Num Events: {c.events}, Freq/sec: {c.freq:.01f}')
print('Most recent: ', sparkline(c.hist), *L(c.hist).map('{:.01f}'))
Num Events: 13, Freq/sec: 451.7
Most recent:  ▁▁▁▂▇ 264.5 257.0 278.7 293.7 363.0

source

stringfmt_names

 stringfmt_names (s:str)

Unique brace-delimited names in s

s = '/pulls/{pull_number}/reviews/{review_id}'
test_eq(stringfmt_names(s), ['pull_number','review_id'])

source

PartialFormatter

 PartialFormatter ()

A string.Formatter that doesn’t error on missing fields, and tracks missing fields and unused args


source

partial_format

 partial_format (s:str, **kwargs)

string format s, ignoring missing field errors, returning missing and extra fields

The result is a tuple of (formatted_string,missing_fields,extra_fields), e.g:

res,missing,xtra = partial_format(s, pull_number=1, foo=2)
test_eq(res, '/pulls/1/reviews/{review_id}')
test_eq(missing, ['review_id'])
test_eq(xtra, {'foo':2})

source

utc2local

 utc2local (dt:datetime.datetime)

Convert dt from UTC to local time

dt = datetime(2000,1,1,12)
print(f'{dt} UTC is {utc2local(dt)} local time')
2000-01-01 12:00:00 UTC is 2000-01-01 22:00:00+10:00 local time

source

local2utc

 local2utc (dt:datetime.datetime)

Convert dt from local to UTC time

print(f'{dt} local is {local2utc(dt)} UTC time')
2000-01-01 12:00:00 local is 2000-01-01 02:00:00+00:00 UTC time

source

trace

 trace (f)

Add set_trace to an existing function f

You can add a breakpoint to an existing function, e.g:

Path.cwd = trace(Path.cwd)
Path.cwd()

Now, when the function is called it will drop you into the debugger. Note: you must issue the s command to step into the function being traced.


source

modified_env

 modified_env (*delete, **replace)

Context manager temporarily modifying os.environ by deleting delete and replacing replace

# USER isn't in Cloud Linux Environments
env_test = 'USERNAME' if sys.platform == "win32" else 'SHELL'
oldusr = os.environ[env_test]

replace_param = {env_test: 'a'}
with modified_env('PATH', **replace_param):
    test_eq(os.environ[env_test], 'a')
    assert 'PATH' not in os.environ

assert 'PATH' in os.environ
test_eq(os.environ[env_test], oldusr)

source

ContextManagers

 ContextManagers (mgrs)

Wrapper for contextlib.ExitStack which enters a collection of context managers
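
A minimal sketch based on the docstring: all managers are entered on entry, and (via ExitStack) exited in reverse order on exit:

from contextlib import contextmanager

@contextmanager
def _loud(name):
    print('enter', name)
    try: yield
    finally: print('exit', name)

with ContextManagers([_loud('a'), _loud('b')]): pass
# expected: enter a, enter b, then exit b, exit a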


source

shufflish

 shufflish (x, pct=0.04)

Randomly relocate items of x up to pct of len(x) from their starting location
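
The result is a permutation of x in which items stay close to their starting positions (a minimal sketch; the displacement bound here is deliberately generous):

t = shufflish(list(range(100)))
test_eq(sorted(t), list(range(100)))               # same items...
assert all(abs(i-v) < 20 for i,v in enumerate(t))  # ...each near its original position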


source

console_help

 console_help (libname:str)

Show help for all console scripts from libname

Type Details
libname str name of library for console script listing

source

hl_md

 hl_md (s, lang='xml', show=True)

Syntax highlight s using lang.

When we display code in a notebook, it’s nice to highlight it, so we create a function to simplify that:

hl_md('<test><xml foo="bar">a child</xml></test>')
<test><xml foo="bar">a child</xml></test>

source

type2str

 type2str (typ:type)

Stringify typ

test_eq(type2str(Optional[float]), 'Union[float, None]')

source

dataclass_src

 dataclass_src (cls)

Source code for the dataclass cls

DC = make_dataclass('DC', [('x', int), ('y', Optional[float], None), ('z', float, None)])
print(dataclass_src(DC))
@dataclass
class DC:
    x: int
    y: Union[float, None] = None
    z: float = None

source

Unset

 Unset (value, names=None, module=None, qualname=None, type=None, start=1)

An enumeration.


source

nullable_dc

 nullable_dc (cls)

Like dataclass, but default of UNSET added to fields without defaults

@nullable_dc
class Person: name: str; age: int; city: str = "Unknown"
Person(name="Bob")
Person(name='Bob', age=UNSET, city='Unknown')

source

make_nullable

 make_nullable (clas)

@dataclass
class Person: name: str; age: int; city: str = "Unknown"

make_nullable(Person)
Person("Bob", city='NY')
Person(name='Bob', age=UNSET, city='NY')
Person(name="Bob")
Person(name='Bob', age=UNSET, city='Unknown')
Person("Bob", 34)
Person(name='Bob', age=34, city='Unknown')

source

flexiclass

 flexiclass (cls)

Convert cls into a dataclass like make_nullable. Converts in place and also returns the result.

Type Details
cls The class to convert
Returns dataclass

This can be used as a decorator…

@flexiclass
class Person: name: str; age: int; city: str = "Unknown"

bob = Person(name="Bob")
bob
Person(name='Bob', age=UNSET, city='Unknown')

…or can update the behavior of an existing class (or dataclass):

class Person: name: str; age: int; city: str = "Unknown"

flexiclass(Person)
bob = Person(name="Bob")
bob
Person(name='Bob', age=UNSET, city='Unknown')

Action occurs in-place:

class Person: name: str; age: int; city: str = "Unknown"

flexiclass(Person)
is_dataclass(Person)
True

source

asdict

 asdict (o)

Convert o to a dict, supporting dataclasses, namedtuples, iterables, and __dict__ attrs.

Any UNSET values are not included.

asdict(bob)
{'name': 'Bob', 'city': 'Unknown'}

To customise dict conversion behavior for a class, implement the _asdict method (this is used in the Python stdlib for named tuples).


source

is_typeddict

 is_typeddict (cls:type)

Check if cls is a TypedDict

class MyDict(TypedDict): name:str

assert is_typeddict(MyDict)
assert not is_typeddict({'a':1})

source

is_namedtuple

 is_namedtuple (cls)

True if cls is a namedtuple type

assert is_namedtuple(namedtuple('tst', ['a']))
assert not is_namedtuple(tuple)

source

CachedIter

 CachedIter (o)

Cache the result returned by an iterator

def f():
    yield 1
    return 2

r = CachedIter(f())
for o in r: print(o)
r.value
1
2

source

CachedAwaitable

 CachedAwaitable (o)

Cache the result from an awaitable
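
A minimal sketch: wrapping a coroutine lets it be awaited repeatedly, with the underlying work done only once:

async def _slow():
    await asyncio.sleep(0.01)
    return 'data'

ca = CachedAwaitable(_slow())
test_eq(await ca, 'data')
test_eq(await ca, 'data')  # second await returns the cached result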


source

reawaitable

 reawaitable (func:callable)

Wraps the result of an asynchronous function into an object which can be awaited more than once

CachedAwaitable and reawaitable are partly based on Python issue tracker code from Serhiy Storchaka. They allow an awaitable to be awaited multiple times.

@reawaitable
async def fetch_data():
    await asyncio.sleep(0.1)
    return "data"

r = fetch_data()
print(await r)  # "data"
print(await r)  # "data" (no delay)
data
data

source

flexicache

 flexicache (*funcs, maxsize=128)

Like lru_cache, but customisable with policy funcs

This is a flexible lru cache function that you can pass a list of functions to. Those functions define the cache eviction policy. For instance, time_policy is provided for time-based cache eviction, and mtime_policy evicts based on a file’s modified-time changing. Each policy function is passed the last value it returned (initially None), and returns a new value to indicate that the cache has expired. When the cache expires, all functions are called with None to force getting new values.


source

time_policy

 time_policy (seconds)

A flexicache policy that expires cached items after seconds have passed


source

mtime_policy

 mtime_policy (filepath)

A flexicache policy that expires cached items after filepath modified-time changes

@flexicache(time_policy(10), mtime_policy('000_tour.ipynb'))
def cached_func(x, y): return x+y

cached_func(1,2)
3
@flexicache(time_policy(10), mtime_policy('000_tour.ipynb'))
async def cached_func(x, y): return x+y

print(await cached_func(1,2))
await cached_func(1,2)
3
3

source

timed_cache

 timed_cache (seconds=60, maxsize=128)

Like lru_cache, but also with time-based eviction

# demonstrate that flexicache is LRU
@flexicache(maxsize=2)
def cached_func(x): return time()

time_1 = cached_func(1)
test_eq(time_1, cached_func(1))

time_2 = cached_func(2)
test_eq(time_1, cached_func(1))
test_eq(time_2, cached_func(2))

time_3 = cached_func(3) # Removes 1

test_eq(time_2, cached_func(2)) # cache remains
test_eq(time_3, cached_func(3)) # cache remains
test_ne(time_1, cached_func(1)) # NEQ, removes 2
test_ne(time_2, cached_func(2))  # NEQ, removes 3
test_eq(cached_func(1), cached_func(1))

This function is a small convenience wrapper for using flexicache with time_policy.

@timed_cache(seconds=0.05, maxsize=2)
def cached_func(x): return x * 2, time()

# basic caching
result1, time1 = cached_func(2)
test_eq(result1, 4)
sleep(0.001)
result2, time2 = cached_func(2)
test_eq(result2, 4)
test_eq(time1, time2)

# caching different values
result3, _ = cached_func(3)
test_eq(result3, 6)

# maxsize
_, time4 = cached_func(4)
_, time2_new = cached_func(2)
test_close(time2, time2_new, eps=0.1)
_, time3_new = cached_func(3)
test_ne(time3_new, time())

# time expiration
sleep(0.05)
_, time4_new = cached_func(4)
test_ne(time4_new, time())