Utility functions

Utility functions used in the fastai library

File Functions

Utilities (other than extensions to Pathlib.Path) for dealing with IO.


source

walk


def walk(
    path:Path \| str, # path to start searching
    symlinks:bool=True, # follow symlinks?
    keep_file:callable=<function ret_true at 0x7faa717b3490>, # function that returns True for wanted files
    keep_folder:callable=<function ret_true at 0x7faa717b3490>, # function that returns True for folders to enter
    skip_folder:callable=<function ret_false at 0x7faa717b3520>, # function that returns True for folders to skip
    func:callable=<function join at 0x7faa71f76e60>, # function to apply to each matched file
    ret_folders:bool=False, # return folders, not just files
    sort:bool=True, # sort files by name within each folder
):

Generator version of os.walk, using functions to filter files and folders


source

exttypes


def exttypes(
    types
):

Get exts for comma-separated or list types; if not found in list, return list with just types. Supported: py, js, java, c, cpp, rb, r, ex, sh, web, doc, cfg

print(exttypes('py,doc'))
print(exttypes('zig,txt'))
['ipynb', 'py', 'md', 'rst']
['zig', 'txt']

source

globtastic


def globtastic(
    path:Path \| str='.', # path to start searching
    recursive:bool=True, # search subfolders
    symlinks:bool=True, # follow symlinks?
    file_glob:str=None, # Only include files matching glob
    file_re:str=None, # Only include files matching regex
    folder_re:str=None, # Only enter folders matching regex
    skip_file_glob:str=None, # Skip files matching glob
    skip_file_re:str=None, # Skip files matching regex
    skip_folder_re:str=None, # Skip folders matching regex,
    func:callable=<function join at 0x7faa71f76e60>, # function to apply to each matched file
    ret_folders:bool=False, # return folders, not just files
    sort:bool=True, # sort files by name within each folder
    types:str \| list=None, # list or comma-separated str of ext types from: py, js, java, c, cpp, rb, r, ex, sh, web, doc, cfg
    exts:str \| list=None, # list or comma-separated str of exts to include
)->L: # Paths to matched files

A more powerful glob, including regex matches, symlink handling, and skip parameters

globtastic('.', skip_folder_re='^[_.]', folder_re='core', file_glob='*.*py*', file_re='c')
['./fastcore/basics.py', './fastcore/dispatch.py', './fastcore/docments.py', './fastcore/docscrape.py', './fastcore/script.py']
globtastic(skip_folder_re='^[_.]', folder_re='core', types='py', file_re='c', skip_file_re='^_', sort=True)
['./fastcore/all.py', './fastcore/ansi.py', './fastcore/basics.py', './fastcore/dispatch.py', './fastcore/docments.py', './fastcore/docscrape.py', './fastcore/foundation.py', './fastcore/imghdr.py', './fastcore/imports.py', './fastcore/meta.py', './fastcore/nb_imports.py', './fastcore/net.py', './fastcore/parallel.py', './fastcore/py2pyi.py', './fastcore/script.py', './fastcore/shutil.py', './fastcore/style.py', './fastcore/test.py', './fastcore/tools.py', './fastcore/transform.py', './fastcore/utils.py', './fastcore/xdg.py', './fastcore/xml.py', './fastcore/xtras.py']

source

pglob


def pglob(
    path:Path \| str='.', # path to start searching
    func:callable=<class 'pathlib.Path'>, # function to apply to each matched file
    recursive:bool=True, symlinks:bool=True, file_glob:str=None, file_re:str=None, folder_re:str=None,
    skip_file_glob:str=None, skip_file_re:str=None, skip_folder_re:str=None, ret_folders:bool=False, sort:bool=True,
    types:str \| list=None, exts:str \| list=None
)->L: # Paths to matched files

Shortcut for globtastic(..., func=Path)

pglob('..', skip_folder_re='^[_.]', types='doc', skip_file_re='^_')[:6]
[Path('../CHANGELOG.md'), Path('../CODE_OF_CONDUCT.md'), Path('../CONTRIBUTING.md'), Path('../README.md')]

source

maybe_open


def maybe_open(
    f, mode:str='r', kwargs:VAR_KEYWORD
):

Context manager: open f if it is a path (and close on exit)

This is useful for functions where you want to accept a path or file. maybe_open will not close your file handle if you pass one in.

def _f(fn):
    with maybe_open(fn) as f: return f.encoding

fname = '00_test.ipynb'
sys_encoding = 'cp1252' if sys.platform == 'win32' else 'utf-8'
test_eq(_f(fname).lower(), sys_encoding)
with open(fname) as fh: test_eq(_f(fh).lower(), sys_encoding)

For example, we can use this to reimplement imghdr.what from the Python standard library, which is written in Python 3.9 as:

from fastcore import imghdr
def what(file, h=None):
    f = None
    try:
        if h is None:
            if isinstance(file, (str,os.PathLike)):
                f = open(file, 'rb')
                h = f.read(32)
            else:
                location = file.tell()
                h = file.read(32)
                file.seek(location)
        for tf in imghdr.tests:
            res = tf(h, f)
            if res: return res
    finally:
        if f: f.close()
    return None

Here’s an example of the use of this function:

fname = 'images/puppy.jpg'
what(fname)
'jpeg'

With maybe_open, Self, and L.map_first, we can rewrite this in a much more concise and (in our opinion) clear way:

def what(file, h=None):
    if h is None:
        with maybe_open(file, 'rb') as f: h = f.peek(32)
    return L(imghdr.tests).map_first(Self(h,file))

…and we can check that it still works:

test_eq(what(fname), 'jpeg')

…along with the version passing a file handle:

with open(fname,'rb') as f: test_eq(what(f), 'jpeg')

…along with the h parameter version:

with open(fname,'rb') as f: test_eq(what(None, h=f.read(32)), 'jpeg')

source

mkdir


def mkdir(
    path, exist_ok:bool=False, parents:bool=False, overwrite:bool=False, kwargs:VAR_KEYWORD
):

Creates and returns a directory defined by path, optionally removing previous existing directory if overwrite is True

with tempfile.TemporaryDirectory() as d:
    path = Path(os.path.join(d, 'new_dir'))
    new_dir = mkdir(path)
    assert new_dir.exists()
    test_eq(new_dir, path)
        
    # test overwrite
    with open(new_dir/'test.txt', 'w') as f: f.writelines('test')
    test_eq(len(list(walk(new_dir))), 1) # assert file is present
    new_dir = mkdir(new_dir, overwrite=True)
    test_eq(len(list(walk(new_dir))), 0) # assert file was deleted

source

image_size


def image_size(
    fn
):

Tuple of (w,h) for png, gif, or jpg; None otherwise

test_eq(image_size(fname), (1200,803))
from PIL import Image
from IPython.display import Image as IPImage
img = Image.new('RGB', (50, 50), color='red')
img


source

img_bytes


def img_bytes(
    img, fmt:str='PNG'
):
ib = img_bytes(img)
IPImage(ib)


source

detect_mime


def detect_mime(
    data
):

Get the MIME type for bytes data, covering common PDF, audio, video, and image types

detect_mime(ib)
'image/png'

source

bunzip


def bunzip(
    fn
):

bunzip fn, raising exception if output already exists

f = Path('files/test.txt')
if f.exists(): f.unlink()
bunzip('files/test.txt.bz2')
t = f.open().readlines()
test_eq(len(t),1)
test_eq(t[0], 'test\n')
f.unlink()

source

loads


def loads(
    s, kw:VAR_KEYWORD
):

Same as json.loads, but handles None


source

loads_multi


def loads_multi(
    s:str
):

Generator of >=0 decoded json dicts, possibly with non-json ignored text at start and end

tst = """
# ignored
{ "a":1 }
hello
{
"b":2
}
"""

test_eq(list(loads_multi(tst)), [{'a': 1}, {'b': 2}])

source

dumps


def dumps(
    obj, kw:VAR_KEYWORD
):

Same as json.dumps, but uses ujson if available


source

untar_dir


def untar_dir(
    fname, dest, rename:bool=False, overwrite:bool=False
):

untar file into dest, creating a directory if the root contains more than one item

def test_untar(foldername, rename=False, **kwargs):
    with tempfile.TemporaryDirectory() as d:
        nm = os.path.join(d, 'a')
        shutil.make_archive(nm, 'gztar', **kwargs)
        with tempfile.TemporaryDirectory() as d2:
            d2 = Path(d2)
            untar_dir(nm+'.tar.gz', d2, rename=rename)
            test_eq(d2.ls(), [d2/foldername])

If the contents of fname contain just one file or directory, it is placed directly in dest:

# using `base_dir` in `make_archive` results in `images` directory included in file names
test_untar('images', base_dir='images')

If rename then the directory created is named based on the archive, without extension:

test_untar('a', base_dir='images', rename=True)

If the contents of fname contain multiple files and directories, a new folder in dest is created with the same name as fname (but without extension):

# using `root_dir` in `make_archive` results in `images` directory *not* included in file names
test_untar('a', root_dir='images')

source

repo_details


def repo_details(
    url
):

Tuple of owner,name from ssh or https git repo url

test_eq(repo_details('https://github.com/fastai/fastai.git'), ['fastai', 'fastai'])
test_eq(repo_details('[email protected]:fastai/nbdev.git\n'), ['fastai', 'nbdev'])

source

shell


def shell(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Shortcut for subprocess.run(shell=True)


source

ssh


def ssh(
    host, args:str='', user:str='ubuntu', sock:NoneType=None
):

Run SSH command with given arguments


source

rsync_multi


def rsync_multi(
    ip, files, user:str='ubuntu', persist:str='5m'
):

Transfer multiple files with rename using persistent SSH connection


source

run


def run(
    cmd, rest:VAR_POSITIONAL, same_in_win:bool=False, ignore_ex:bool=False, as_bytes:bool=False, stderr:bool=True
):

Pass cmd (splitting with shlex if string) to subprocess.run; return stdout; raise IOError if fails

You can pass a string (which will be split based on standard shell rules), a list, or pass args directly:

run('echo', same_in_win=True)
run('pip', '--version', same_in_win=True)
run(['pip', '--version'], same_in_win=True)
'pip 25.3 from /Users/jhoward/aai-ws/.venv/lib/python3.12/site-packages/pip (python 3.12)'
if sys.platform == 'win32':
    assert 'ipynb' in run('cmd /c dir /p')
    assert 'ipynb' in run(['cmd', '/c', 'dir', '/p'])
    assert 'ipynb' in run('cmd', '/c', 'dir',  '/p')
else:
    assert 'ipynb' in run('ls -ls')
    assert 'ipynb' in run(['ls', '-l'])
    assert 'ipynb' in run('ls', '-l')

Some commands fail in non-error situations, like grep. Use ignore_ex in those cases, which will return a tuple of returncode and stdout:

if sys.platform == 'win32':
    test_eq(run('cmd /c findstr asdfds 00_test.ipynb', ignore_ex=True)[0], 1)
else:
    test_eq(run('grep asdfds 00_test.ipynb', ignore_ex=True)[0], 1)

run automatically decodes returned bytes to a str. Use as_bytes to skip that:

if sys.platform == 'win32':
    test_eq(run('cmd /c echo hi'), 'hi')
else:
    test_eq(run('echo hi', as_bytes=True), b'hi\n')

source

open_file


def open_file(
    fn, mode:str='r', kwargs:VAR_KEYWORD
):

Open a file, with optional compression if gz or bz2 suffix


source

save_pickle


def save_pickle(
    fn, o
):

Save a pickle file, to a file name or opened file


source

load_pickle


def load_pickle(
    fn
):

Load a pickle file from a file name or opened file

for suf in '.pkl','.bz2','.gz':
    # delete=False is added for Windows
    # https://stackoverflow.com/questions/23212435/permission-denied-to-write-to-my-temporary-file
    with tempfile.NamedTemporaryFile(suffix=suf, delete=False) as f:
        fn = Path(f.name)
        save_pickle(fn, 't')
        t = load_pickle(fn)
    f.close()
    test_eq(t,'t')

source

parse_env


def parse_env(
    s:str=None, fn:Union[str, Path]=None
)->dict:

Parse a shell-style environment string or file

testf = """# comment
   # another comment
 export FOO="bar#baz"
BAR=thing # comment "ok"
  baz='thong'
QUX=quux
export ZAP = "zip" # more comments
   FOOBAR = 42   # trailing space and comment"""

exp = dict(FOO='bar#baz', BAR='thing', baz='thong', QUX='quux', ZAP='zip', FOOBAR='42')

test_eq(parse_env(testf),  exp)

source

expand_wildcards


def expand_wildcards(
    code
):

Expand all wildcard imports in the given code string.

inp = """from math import *
from os import *
from random import *
def func(): return sin(pi) + path.join('a', 'b') + randint(1, 10)"""

exp = """from math import pi, sin
from os import path
from random import randint
def func(): return sin(pi) + path.join('a', 'b') + randint(1, 10)"""

test_eq(expand_wildcards(inp), exp)

inp = """from itertools import *
def func(): pass"""
test_eq(expand_wildcards(inp), inp)

inp = """def outer():
    from math import *
    def inner():
        from os import *
        return sin(pi) + path.join('a', 'b')"""

exp = """def outer():
    from math import pi, sin
    def inner():
        from os import path
        return sin(pi) + path.join('a', 'b')"""

test_eq(expand_wildcards(inp), exp)

Collections


source

dict2obj


def dict2obj(
    d:NoneType=None, list_func:_L_Meta=<class 'fastcore.foundation.L'>,
    dict_func:type=<class 'fastcore.basics.AttrDict'>, kwargs:VAR_KEYWORD
):

Convert (possibly nested) dicts (or lists of dicts) to AttrDict

This is a convenience to give you “dotted” access to (possibly nested) dictionaries, e.g:

d1 = dict(a=1, b=dict(c=2,d=3))
d2 = dict2obj(d1)
test_eq(d2.b.c, 2)
test_eq(d2.b['c'], 2)

kwargs can also be used:

d3 = dict2obj(a=1, b=dict(c=2,d=3))
test_eq(d3.b.c, 2)
test_eq(d3.b['c'], 2)

It can also be used on lists of dicts.

_list_of_dicts = [d1, d1]
ds = dict2obj(_list_of_dicts)
test_eq(ds[0].b.c, 2)

source

obj2dict


def obj2dict(
    d
):

Convert (possibly nested) AttrDicts (or lists of AttrDicts) to dict

obj2dict can be used to reverse what is done by dict2obj:

test_eq(obj2dict(d2), d1)
test_eq(obj2dict(ds), _list_of_dicts)

source

repr_dict


def repr_dict(
    d
):

Print nested dicts and lists, such as returned by dict2obj

print(repr_dict(d2))
- a: 1
- b: 
  - c: 2
  - d: 3

source

is_listy


def is_listy(
    x
):

isinstance(x, (tuple,list,L,slice,Generator))

assert is_listy((1,))
assert is_listy([1])
assert is_listy(L([1]))
assert is_listy(slice(2))
assert not is_listy(array([1]))

source

mapped


def mapped(
    f, it
):

map f over it, unless it’s not listy, in which case return f(it)

def _f(x,a=1): return x-a

test_eq(mapped(_f,1),0)
test_eq(mapped(_f,[1,2]),[0,1])
test_eq(mapped(_f,(1,)),(0,))

Extensions to Pathlib.Path

The following methods are added to the standard Python library's pathlib.Path.


source

Path.readlines


def readlines(
    hint:int=-1, encoding:str='utf8'
):

Read the content of self


source

Path.read_json


def read_json(
    encoding:NoneType=None, errors:NoneType=None
):

Same as read_text followed by loads


source

Path.mk_write


def mk_write(
    data, encoding:NoneType=None, errors:NoneType=None, mode:int=511, uid:int=-1, gid:int=-1
):

Make all parent dirs of self, and write data


source

Path.write_json


def write_json(
    data, encoding:NoneType=None, errors:NoneType=None, mode:int=511, uid:int=-1, gid:int=-1, kw:VAR_KEYWORD
):

Same as dumps followed by mk_write


source

Path.relpath


def relpath(
    start:NoneType=None
):

Same as os.path.relpath, but returns a Path, and resolves symlinks

p = Path('../fastcore/').resolve()
p
Path('/Users/jhoward/aai-ws/fastcore/fastcore')
p.relpath(Path.cwd())
Path('../fastcore')

source

Path.ls


def ls(
    n_max:NoneType=None, file_type:NoneType=None, file_exts:NoneType=None
):

Contents of path as a list

We add an ls() method to pathlib.Path which is simply defined as list(Path.iterdir()), mainly for convenience in REPL environments such as notebooks.

path = Path()
t = path.ls()
assert len(t)>0
t1 = path.ls(10)
test_eq(len(t1), 10)
t2 = path.ls(file_exts='.ipynb')
assert len(t)>len(t2)
t[0]
Path('llms.txt')

You can also pass an optional file_type MIME prefix and/or a list of file extensions.

lib_path = (path/'../fastcore')
txt_files=lib_path.ls(file_type='text')
assert len(txt_files) > 0 and txt_files[0].suffix=='.py'
ipy_files=path.ls(file_exts=['.ipynb'])
assert len(ipy_files) > 0 and ipy_files[0].suffix=='.ipynb'
txt_files[0],ipy_files[0]
(Path('../fastcore/shutil.py'), Path('000_tour.ipynb'))

source

Path.normpath


def normpath(
    
):

Normalize path, eliminating double slashes, etc.

normpath normalizes a path by collapsing redundant separators and up-level references (e.g., ..).

p = Path('foo//bar/../baz')
p.normpath()
Path('foo/baz')

source

Path.__repr__


def __repr__(
    
):

Return repr(self).

fastai also updates the repr of Path such that, if Path.BASE_PATH is defined, all paths are printed relative to that path (as long as they are contained in Path.BASE_PATH:

t = ipy_files[0].absolute()
try:
    Path.BASE_PATH = t.parent.parent
    test_eq(repr(t), f"Path('nbs/{t.name}')")
finally: Path.BASE_PATH = None

source

Path.delete


def delete(
    
):

Delete a file, symlink, or directory tree

Reindexing Collections


source

ReindexCollection


def ReindexCollection(
    coll, idxs:NoneType=None, cache:NoneType=None, tfm:function=<function noop at 0x7faa71946c20>
):

Reindexes collection coll with indices idxs and optional LRU cache of size cache

This is useful when constructing batches or organizing data in a particular manner (i.e. for deep learning). This class is primarily used in organizing data for language models in fastai.

You can supply a custom index upon instantiation with the idxs argument, or you can call the reindex method to supply a new index for your collection.

Here is how you can reindex a list such that the elements are reversed:

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'], idxs=[4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

Alternatively, you can use the reindex method:


source

ReindexCollection.reindex

def reindex(
    idxs
):

Replace self.idxs with idxs

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'])
rc.reindex([4,3,2,1,0])
list(rc)
['e', 'd', 'c', 'b', 'a']

You can optionally specify a LRU cache, which uses functools.lru_cache upon instantiation:

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t._get.cache_info()
CacheInfo(hits=1, misses=1, maxsize=2, currsize=1)

You can optionally clear the LRU cache by calling the cache_clear method:


source

ReindexCollection.cache_clear

def cache_clear(
    
):

Clear LRU cache

sz = 50
t = ReindexCollection(L.range(sz), cache=2)

#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
t.cache_clear()
t._get.cache_info()
CacheInfo(hits=0, misses=0, maxsize=2, currsize=0)

source

ReindexCollection.shuffle

def shuffle(
    
):

Randomly shuffle indices

Note that an ordered index is automatically constructed for the data structure even if one is not supplied.

rc=ReindexCollection(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
rc.shuffle()
list(rc)
['b', 'a', 'g', 'h', 'd', 'e', 'c', 'f']
sz = 50
t = ReindexCollection(L.range(sz), cache=2)
test_eq(list(t), range(sz))
test_eq(t[sz-1], sz-1)
test_eq(t._get.cache_info().hits, 1)
t.shuffle()
test_eq(t._get.cache_info().hits, 1)
test_ne(list(t), range(sz))
test_eq(set(t), set(range(sz)))
t.cache_clear()
test_eq(t._get.cache_info().hits, 0)
test_eq(t.count(0), 1)

SaveReturn and save_iter Variants

These utilities solve a common problem in Python: how to extract additional information from generator functions beyond just the yielded values.

In Python, generator functions can yield values and also return a final value, but the return value is normally lost when you iterate over the generator:

def example_generator():
    total = 0
    for i in range(3):
        total += i
        yield i
    return total  # This gets lost!

# The return value (3) is lost
values = list(example_generator())  # [0, 1, 2]

source

SaveReturn


def SaveReturn(
    its
):

Wrap an iterator such that the generator function’s return value is stored in .value

Exported source
class SaveReturn:
    "Wrap an iterator such that the generator function's return value is stored in `.value`"
    def __init__(self, its): self.its = its
    def __iter__(self):
        self.value = yield from self.its
        return self.value

SaveReturn is the simplest approach to solving this problem - it wraps any existing (non-async) generator and captures its return value. This works because yield from (used internally in SaveReturn) evaluates to the value of the wrapped generator's return statement.

def sum_range(n):
    total = 0
    for i in range(n):
        total += i
        yield i
    return total  # This value is returned by yield from

sr = SaveReturn(sum_range(5))
values = list(sr)  # This will consume the generator and get the return value
print(f"Values: {values}")
sr.value
Values: [0, 1, 2, 3, 4]
10

In order to provide an accurate signature for save_iter, we need a version of wraps that removes leading parameters:


source

trim_wraps


def trim_wraps(
    f, n:int=1
):

Like wraps, but removes the first n parameters from the signature

trim_wraps is a decorator factory that works like functools.wraps, but removes the first n parameters from the wrapped function’s signature. This is useful when creating wrapper functions that consume some parameters internally and shouldn’t expose them in the public API.

def adder(base, x, y): return base + x + y

def make_adder(base_value):
    @trim_wraps(adder)
    def _(x, y): return adder(base_value, x, y)
    return _

add_10 = make_adder(10)
print(f"{add_10.__name__}{inspect.signature(add_10)}")
adder(x, y)

source

save_iter


def save_iter(
    g
):

Decorator that allows a generator function to store values in the returned iterator object

save_iter modifies generator functions to store state in the iterator object itself. The generator receives an object as its first parameter, which it can use to store attributes. You can store values during iteration, not just at the end, and you can store multiple attributes if needed.

@save_iter
def sum_range(o, n):  # Note: 'o' parameter added
    total = 0
    for i in range(n):
        total += i
        yield i
    o.value = total  # Store directly on the iterator object

Because internally save_iter uses trim_wraps, the signature of sum_range correctly shows that you should not pass o to it; it’s injected by the decorating function.

print(sum_range.__signature__)
(n)
sr = sum_range(5)
print(f"Values: {list(sr)}")
print(f"Sum stored: {sr.value}")
Values: [0, 1, 2, 3, 4]
Sum stored: 10

source

asave_iter


def asave_iter(
    g
):

Like save_iter, but for async iterators

asave_iter provides the same functionality as save_iter, but for async generator functions. yield from and return can not be used with async generator functions, so SaveReturn can’t be used here.

@asave_iter
async def asum_range(self, n):
    total = 0
    for i in range(n):
        total += i
        yield i
    self.value = total

asr = asum_range(5)
print(f"Values: {[o async for o in asr]}")
print(f"Sum stored: {asr.value}")
Values: [0, 1, 2, 3, 4]
Sum stored: 10

Other Helpers


source

unqid


def unqid(
    seeded:bool=False
):

Generate a unique id suitable for use as a Python identifier

unqid generates a random unique identifier that is safe to use as a Python variable name (starts with _, uses only alphanumeric characters and underscores). It’s based on UUID4, encoded in URL-safe base64.

If seeded=True, uses random.getrandbits which respects random.seed(), making it reproducible. Otherwise uses uuid4() which is always random.

unqid()
'_7WDcaL3JT7qV037u3Werzw'

With seeding for reproducibility:

random.seed(42)
a = unqid(seeded=True)
random.seed(42)
b = unqid(seeded=True)
test_eq(a, b)

Without seeding - always unique:

test_ne(unqid(), unqid())

source

rtoken_hex


def rtoken_hex(
    nbytes:int=16, # Number of bytes to generate
)->str: # hex string of length nbytes*2

Generate a random hex string using Python’s random module.

This is the same as secrets.token_hex, but is reproducible/seedable.

import secrets
secrets.token_hex(4),rtoken_hex(4)
('408ea190', '8c7d7247')

source

friendly_name


def friendly_name(
    levels:int=3, suffix:int=4
):

Generate a random human-readable name with customizable word levels and suffix length

friendly_name generates random, human-readable names by combining adjectives, nouns, verbs, and adverbs with a random alphanumeric suffix. This is useful for creating memorable identifiers for temporary files, test data, or user-friendly resource names.

friendly_name()  # Default: 3 word levels + 4-char suffix
'objective-forest-builds-0y6d'

Names are hyphen-separated and follow the pattern adjective-noun-verb-adverb, randomly chosen from lists of size 102, 116, 110, and 30, respectively. The levels param selects how many of the names to include:

friendly_name(2)  # 2 words + 4-char suffix
'lavender-hummingbird-divu'

suffix sets the length of the random alphanumeric ending. Each suffix item is taken from the 36 options of lowercase letters plus digits.

friendly_name(4, 6)  # All 4 word types + 6-char suffix
'elated-koala-begins-softly-zpqk51'

source

n_friendly_names


def n_friendly_names(
    levels:int=3, suffix:int=4
):

Number of possible combos for friendly_name

The number of combinations if all levels are included is:

print(f'{n_friendly_names(4):,}')
65,581,614,489,600

The default settings give:

print(f'{n_friendly_names():,}')
2,186,053,816,320

source

exec_eval


def exec_eval(
    code, # Code to exec/eval
    g:NoneType=None, # Globals namespace dict
    l:NoneType=None, # Locals namespace dict
):

Evaluate code in g (defaults to globals()) and l (defaults to locals())

This is a combination of eval and exec, which behaves like ipython and Jupyter. If the last line is an expression, it is evaluated and the result is returned:

exec_eval('''
def f(x): return x+1
f(1)
''')
2

By default, the code uses the caller’s globals and locals. For instance, here f is available since it’s been added to our symbol table:

exec_eval('print(f(2))')
3

Pass a dict as the g param in order to use an arbitrary namespace:

exec_eval('print(f)', {'f': 'Hi I am f.'})
Hi I am f.

This function helps us identify the first declared raw function of a dispatched function:

from plum import Function
def f1(x): return "Any"
def f2(x:int): return "Int"

df = Function(f1).dispatch(f1).dispatch(f2)

test_eq(_unwrapped_type_dispatch_func(df), f1)

source

sparkline


def sparkline(
    data, mn:NoneType=None, mx:NoneType=None, empty_zero:bool=False
):

Sparkline for data, with Nones (and zero, if empty_zero) shown as empty column

data = [9,6,None,1,4,0,8,15,10]
print(f'without "empty_zero": {sparkline(data, empty_zero=False)}')
print(f'   with "empty_zero": {sparkline(data, empty_zero=True )}')
without "empty_zero": ▅▂ ▁▂▁▃▇▅
   with "empty_zero": ▅▂ ▁▂ ▃▇▅

You can set a maximum and minimum for the y-axis of the sparkline with the arguments mn and mx respectively:

sparkline([1,2,3,400], mn=0, mx=3)
'▂▅▇▇'

source

modify_exception


def modify_exception(
    e:Exception, # An exception
    msg:str=None, # A custom message
    replace:bool=False, # Whether to replace e.args with [msg]
)->Exception:

Modifies e with a custom message attached

msg = "This is my custom message!"

test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception(), None)), contains='')
test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception(), msg)), contains=msg)
test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception("The first message"), msg)), contains="The first message This is my custom message!")
test_fail(lambda: (_ for _ in ()).throw(modify_exception(Exception("The first message"), msg, True)), contains="This is my custom message!")

source

round_multiple


def round_multiple(
    x, mult, round_down:bool=False
):

Round x to nearest multiple of mult

test_eq(round_multiple(63,32), 64)
test_eq(round_multiple(50,32), 64)
test_eq(round_multiple(40,32), 32)
test_eq(round_multiple( 0,32),  0)
test_eq(round_multiple(63,32, round_down=True), 32)
test_eq(round_multiple((63,40),32), (64,32))

source

set_num_threads


def set_num_threads(
    nt
):

Get numpy (and others) to use nt threads

This sets the number of threads consistently for many tools, by:

  1. Set the following environment variables equal to nt: OPENBLAS_NUM_THREADS,NUMEXPR_NUM_THREADS,OMP_NUM_THREADS,MKL_NUM_THREADS
  2. Sets nt threads for numpy and pytorch.

source

join_path_file


def join_path_file(
    file, path, ext:str=''
):

Return path/file if file is a string or a Path, file otherwise

path = Path.cwd()/'_tmp'/'tst'
f = join_path_file('tst.txt', path)
assert path.exists()
test_eq(f, path/'tst.txt')
with open(f, 'w') as f_: assert join_path_file(f_, path) == f_
shutil.rmtree(Path.cwd()/'_tmp')

source

autostart


def autostart(
    g
):

Decorator that automatically starts a generator


source

EventTimer


def EventTimer(
    store:int=5, span:int=60
):

An event timer with history of store items of time span

Add events with add, and get number of events and their frequency (freq).

# Random wait function for testing
def _randwait(): yield from (sleep(random.random()/200) for _ in range(100))

c = EventTimer(store=5, span=0.03)
for o in _randwait(): c.add(1)
print(f'Num Events: {c.events}, Freq/sec: {c.freq:.01f}')
print('Most recent: ', sparkline(c.hist), *L(c.hist).map('{:.01f}'))
Num Events: 1, Freq/sec: 73.6
Most recent:  ▁▁▂▅▇ 33.0 26.9 54.1 89.8 120.6

source

stringfmt_names


def stringfmt_names(
    s:str
)->list:

Unique brace-delimited names in s

s = '/pulls/{pull_number}/reviews/{review_id}'
test_eq(stringfmt_names(s), ['pull_number','review_id'])

source

PartialFormatter


def PartialFormatter(
    
):

A string.Formatter that doesn’t error on missing fields, and tracks missing fields and unused args


source

partial_format


def partial_format(
    s:str, kwargs:VAR_KEYWORD
):

string format s, ignoring missing field errors, returning missing and extra fields

The result is a tuple of (formatted_string,missing_fields,extra_fields), e.g:

res,missing,xtra = partial_format(s, pull_number=1, foo=2)
test_eq(res, '/pulls/1/reviews/{review_id}')
test_eq(missing, ['review_id'])
test_eq(xtra, {'foo':2})

source

truncstr


def truncstr(
    s:str, maxlen:int, suf:str='…', space:str='', sizevar:str=None
)->str:

Truncate s to length maxlen, adding suffix suf if truncated

w = 'abacadabra'
test_eq(truncstr(w, 10), w)
test_eq(truncstr(w, 5), 'abac…')
test_eq(truncstr(w, 5, suf=''), 'abaca')
test_eq(truncstr(w, 11, space='_'), w+"_")
test_eq(truncstr(w, 10, space='_'), w[:-1]+'…')
test_eq(truncstr(w, 5, suf='!!'), 'aba!!')

sizevar lets you include the original string length in your suffix. E.g when you set sizevar='_n_', any {_n_} in your suffix gets replaced with the actual length of the string before truncation. For instance, here the (11) tells you the original string was 11 characters long:

test_eq(truncstr('hello world', 8, suf='…({_n_})', sizevar='_n_'), 'hel…(11)')

source

utc2local


def utc2local(
    dt:datetime
)->datetime:

Convert dt from UTC to local time

dt = datetime(2000,1,1,12)
print(f'{dt} UTC is {utc2local(dt)} local time')
2000-01-01 12:00:00 UTC is 2000-01-01 22:00:00+10:00 local time

source

local2utc


def local2utc(
    dt:datetime
)->datetime:

Convert dt from local to UTC time

print(f'{dt} local is {local2utc(dt)} UTC time')
2000-01-01 12:00:00 local is 2000-01-01 02:00:00+00:00 UTC time

source

trace


def trace(
    f
):

Add set_trace to an existing function f

You can add a breakpoint to an existing function, e.g:

Path.cwd = trace(Path.cwd)
Path.cwd()

Now, when the function is called it will drop you into the debugger. Note, you must issue the s command when you begin to step into the function that is being traced.


source

modified_env


def modified_env(
    delete:VAR_POSITIONAL, replace:VAR_KEYWORD
):

Context manager temporarily modifying os.environ by deleting delete and replacing replace

# USER isn't in Cloud Linux Environments
env_test = 'USERNAME' if sys.platform == "win32" else 'SHELL'
oldusr = os.environ[env_test]

replace_param = {env_test: 'a'}
with modified_env('PATH', **replace_param):
    test_eq(os.environ[env_test], 'a')
    assert 'PATH' not in os.environ

assert 'PATH' in os.environ
test_eq(os.environ[env_test], oldusr)

source

ContextManagers


def ContextManagers(
    mgrs
):

Wrapper for contextlib.ExitStack which enters a collection of context managers


source

shufflish


def shufflish(
    x, pct:float=0.04
):

Randomly relocate items of x up to pct of len(x) from their starting location


source

console_help


def console_help(
    libname:str, # name of library for console script listing
):

Show help for all console scripts from libname


source

hl_md


def hl_md(
    s, lang:str='html', show:bool=True
):

Syntax highlight s using lang.

When we display code in a notebook, it’s nice to highlight it, so we create a function to simplify that:

hl_md('<test><xml foo="bar">a child</xml></test>')
<test><xml foo="bar">a child</xml></test>

source

type2str


def type2str(
    typ:type
)->str:

Stringify typ

test_eq(type2str(Optional[float]), 'Union[float, None]')

source

dataclass_src


def dataclass_src(
    cls
):
DC = make_dataclass('DC', [('x', int), ('y', Optional[float], None), ('z', float, None)])
print(dataclass_src(DC))
@dataclass
class DC:
    x: int
    y: Union[float, None] = None
    z: float = None

source

Unset


def Unset(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

An enumeration.


source

nullable_dc


def nullable_dc(
    cls
):

Like dataclass, but a default of UNSET is added to fields without defaults

@nullable_dc
class Person: name: str; age: int; city: str = "Unknown"
Person(name="Bob")
Person(name='Bob', age=UNSET, city='Unknown')

source

make_nullable


def make_nullable(
    clas
):
@dataclass
class Person: name: str; age: int; city: str = "Unknown"

make_nullable(Person)
Person("Bob", city='NY')
Person(name='Bob', age=UNSET, city='NY')
Person(name="Bob")
Person(name='Bob', age=UNSET, city='Unknown')
Person("Bob", 34)
Person(name='Bob', age=34, city='Unknown')

source

flexiclass


def flexiclass(
    cls, # The class to convert
)->dataclass:

Convert cls into a dataclass like make_nullable. Converts in place and also returns the result.

This can be used as a decorator…

@flexiclass
class Person: name: str; age: int; city: str = "Unknown"

bob = Person(name="Bob")
bob
Person(name='Bob', age=UNSET, city='Unknown')

…or can update the behavior of an existing class (or dataclass):

class Person: name: str; age: int; city: str = "Unknown"

flexiclass(Person)
bob = Person(name="Bob")
bob
Person(name='Bob', age=UNSET, city='Unknown')

Action occurs in-place:

class Person: name: str; age: int; city: str = "Unknown"

flexiclass(Person)
is_dataclass(Person)
True

source

asdict


def asdict(
    o
)->dict:

Convert o to a dict, supporting dataclasses, namedtuples, iterables, and __dict__ attrs.

Any UNSET values are not included.

asdict(bob)
{'name': 'Bob', 'city': 'Unknown'}

Set the optional __flds__ parameter to customise the field list, and the optional __skip__ parameter to skip some names.

class CustomObj:
    def __init__(self): self.a,self.b,self.c,self.d = 1,2,3,4
    __flds__ = ['a','b','c','d']
    __skip__ = ['b']

obj = CustomObj()
test_eq(asdict(obj), {'a': 1, 'c': 3, 'd': 4})

To customise dict conversion behavior for a class, implement the _asdict method (this is used in the Python stdlib for named tuples).


source

vars_pub


def vars_pub(
    x
):

Get public non-skipped vars

The vars_pub function returns a list of public (non-underscore-prefixed) variable names from an object, excluding any names listed in the object’s optional __skip__ attribute.

class TestObj:
    def __init__(self): self.pub_attr,self._priv_attr,self.another_pub,self.skip_me = 1,2,3,4
    __skip__ = ['skip_me']

obj = TestObj()
test_eq(vars_pub(obj), ['pub_attr', 'another_pub'])

Without __skip__, all pub vars are returned

class SimpleObj:
    def __init__(self): self.a,self._b,self.c = 1,2,3

simple = SimpleObj()
test_eq(vars_pub(simple), ['a', 'c'])

source

is_typeddict


def is_typeddict(
    cls:type
)->bool:

Check if cls is a TypedDict

class MyDict(TypedDict): name:str

assert is_typeddict(MyDict)
assert not is_typeddict({'a':1})

source

is_namedtuple


def is_namedtuple(
    cls
):

True if cls is a namedtuple type

assert is_namedtuple(namedtuple('tst', ['a']))
assert not is_namedtuple(tuple)

source

CachedIter


def CachedIter(
    o
):

Cache the result returned by an iterator

def f():
    yield 1
    return 2

r = CachedIter(f())
for o in r: print(o)
r.value
1
2

source

CachedAwaitable


def CachedAwaitable(
    o
):

Cache the result from an awaitable


source

reawaitable


def reawaitable(
    func:callable
):

Wraps the result of an asynchronous function into an object which can be awaited more than once

CachedAwaitable and reawaitable are partly based on Python issue tracker code from Serhiy Storchaka. They allow an awaitable to be awaited more than once.

@reawaitable
async def fetch_data():
    await asyncio.sleep(0.1)
    return "data"

r = fetch_data()
print(await r)  # "data"
print(await r)  # "data" (no delay)
data
data

source

flexicache


def flexicache(
    funcs:VAR_POSITIONAL, maxsize:int=128
):

Like lru_cache, but customisable with policy funcs

This is a flexible lru cache function that you can pass a list of functions to. Those functions define the cache eviction policy. For instance, time_policy is provided for time-based cache eviction, and mtime_policy evicts based on a file’s modified-time changing. Each policy function is passed the last value it returned (initially None), and returns a new value to indicate the cache has expired. When the cache expires, all functions are called with None to force getting new values.


source

time_policy


def time_policy(
    seconds
):

A flexicache policy that expires cached items after seconds have passed


source

mtime_policy


def mtime_policy(
    filepath
):

A flexicache policy that expires cached items after filepath modified-time changes

@flexicache(time_policy(10), mtime_policy('000_tour.ipynb'))
def cached_func(x, y): return x+y

cached_func(1,2)
3
@flexicache(time_policy(10), mtime_policy('000_tour.ipynb'))
async def cached_func(x, y): return x+y

print(await cached_func(1,2))
await cached_func(1,2)
3
3

source

timed_cache


def timed_cache(
    seconds:int=60, maxsize:int=128
):

Like lru_cache, but also with time-based eviction

# demonstrate that flexicache is LRU
@flexicache(maxsize=2)
def cached_func(x): return time()

time_1 = cached_func(1)
test_eq(time_1, cached_func(1))

time_2 = cached_func(2)
test_eq(time_1, cached_func(1))
test_eq(time_2, cached_func(2))

time_3 = cached_func(3) # Removes 1

test_eq(time_2, cached_func(2)) # cache remains
test_eq(time_3, cached_func(3)) # cache remains
test_ne(time_1, cached_func(1)) # NEQ, removes 2
test_ne(time_2, cached_func(2))  # NEQ, removes 3
test_eq(cached_func(1), cached_func(1))

This function is a small convenience wrapper for using flexicache with time_policy.

@timed_cache(seconds=0.05, maxsize=2)
def cached_func(x): return x * 2, time()

# basic caching
result1, time1 = cached_func(2)
test_eq(result1, 4)
sleep(0.001)
result2, time2 = cached_func(2)
test_eq(result2, 4)
test_eq(time1, time2)

# caching different values
result3, _ = cached_func(3)
test_eq(result3, 6)

# maxsize
_, time4 = cached_func(4)
_, time2_new = cached_func(2)
test_close(time2, time2_new, eps=0.1)
_, time3_new = cached_func(3)
test_ne(time3_new, time())

# time expiration
sleep(0.05)
_, time4_new = cached_func(4)
test_ne(time4_new, time())