from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *
from time import sleep
This is a convenience to give you "dotted" access to (possibly nested) dictionaries, e.g:
# A nested plain dict, and the converted object built from it
d1 = {'a': 1, 'b': {'c': 2, 'd': 3}}
d2 = dict2obj(d1)
# The nested value is reachable both by attribute and by key
test_eq(d2.b.c, 2)
test_eq(d2.b['c'], 2)
It can also be used on lists of dicts.
# Lists of dicts are converted element by element
_list_of_dicts = [d1]*2
ds = dict2obj(_list_of_dicts)
test_eq(ds[0].b.c, 2)
# `obj2dict` undoes the conversion, for single objects and for lists
for converted,original in ((d2, d1), (ds, _list_of_dicts)):
    test_eq(obj2dict(converted), original)
print(repr_dict(d2))
# Tuples, lists, `L` and slices are all treated as list-like...
for listy in ((1,), [1], L([1]), slice(2)):
    assert is_listy(listy)
# ...whereas an `array` is not
assert not is_listy(array([1]))
def _f(x, a=1):
    "Subtract `a` (1 by default) from `x`"
    return x - a
# `mapped` is checked on a scalar, a list and a tuple; each container test expects the same container type back
for given,expected in ((1, 0), ([1,2], [0,1]), ((1,), (0,))):
    test_eq(mapped(_f,given), expected)
This is useful when constructing batches or organizing data in a particular manner (e.g. for deep learning). This class is primarily used in organizing data for language models in fastai.
You can supply a custom index upon instantiation with the idxs
argument, or you can call the reindex
method to supply a new index for your collection.
Here is how you can reindex a list such that the elements are reversed:
# Pass `idxs` at construction; here the index reverses the five items
rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'], idxs=[4,3,2,1,0])
list(rc)
Alternatively, you can use the reindex
method:
# Build the collection first, then supply a reversing index with `reindex`
rc=ReindexCollection(['a', 'b', 'c', 'd', 'e'])
rc.reindex([4,3,2,1,0])
list(rc)
You can optionally specify an LRU cache upon instantiation, which uses functools.lru_cache:
sz = 50
# `cache=2` turns on the LRU cache for item lookups (presumably with a maximum size of 2 -- TODO confirm)
t = ReindexCollection(L.range(sz), cache=2)
#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
# show the cache statistics of the cached getter
t._get.cache_info()
You can optionally clear the LRU cache by calling the cache_clear
method:
sz = 50
t = ReindexCollection(L.range(sz), cache=2)
#trigger a cache hit by indexing into the same element multiple times
t[0], t[0]
# clear the cache, then show its statistics again
t.cache_clear()
t._get.cache_info()
Note that an ordered index is automatically constructed for the data structure even if one is not supplied.
# No `idxs` supplied here; `shuffle` then reorders the collection
rc=ReindexCollection(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
rc.shuffle()
list(rc)
sz = 50
t = ReindexCollection(L.range(sz), cache=2)
# without a custom index, iteration yields the items in their original order
test_eq(list(t), range(sz))
test_eq(t[sz-1], sz-1)
# exactly one hit so far -- presumably the repeat lookup of the last element just above (TODO confirm)
test_eq(t._get.cache_info().hits, 1)
t.shuffle()
# shuffling leaves the hit counter unchanged
test_eq(t._get.cache_info().hits, 1)
# the order changes, the contents do not
test_ne(list(t), range(sz))
test_eq(set(t), set(range(sz)))
t.cache_clear()
# clearing the cache also resets the hit counter
test_eq(t._get.cache_info().hits, 0)
test_eq(t.count(0), 1)
Utilities (other than extensions to Pathlib.Path) for dealing with IO.
# Search from the current directory, combining folder filters (`skip_folder_re`, `folder_re`) with file filters (`file_glob`, `file_re`)
globtastic('.', skip_folder_re='^[_.]', folder_re='core', file_glob='*.*py*', file_re='c')
This is useful for functions where you want to accept a path or file. maybe_open
will not close your file handle if you pass one in.
def _f(fn):
    "Return the text encoding reported for `fn`, which may be a path or an already-open file"
    with maybe_open(fn) as handle:
        enc = handle.encoding
    return enc

fname = '00_test.ipynb'
# Expected default text encoding for the platform the notebook runs on
if sys.platform == 'win32': sys_encoding = 'cp1252'
else: sys_encoding = 'UTF-8'
# Works when given a file name...
test_eq(_f(fname), sys_encoding)
# ...and when given a handle we opened ourselves
with open(fname) as fh:
    test_eq(_f(fh), sys_encoding)
For example, we can use this to reimplement imghdr.what
from the Python standard library, which is written in Python 3.9 as:
def what(file, h=None):
    # Return the first truthy result from the `imghdr.tests` functions applied to the header bytes, or None.
    # `file` is a path (str / os.PathLike) or an object supporting tell/read/seek; `h` is an optional pre-read header.
    f = None
    try:
        if h is None:
            if isinstance(file, (str,os.PathLike)):
                # a path: we open the file ourselves, so `finally` below must close it
                f = open(file, 'rb')
                h = f.read(32)
            else:
                # a caller-owned file object: read the header, then restore the caller's position
                location = file.tell()
                h = file.read(32)
                file.seek(location)
        for tf in imghdr.tests:
            res = tf(h, f)
            if res: return res
    finally:
        # only close a handle that was opened here (`f` stays None otherwise)
        if f: f.close()
    return None
Here's an example of the use of this function:
# Call `what` with the path of a sample image
fname = 'images/puppy.jpg'
what(fname)
With maybe_open
, Self
, and L.map_first
, we can rewrite this in a much more concise and (in our opinion) clear way:
def what(file, h=None):
    "Image type of `file` (a path or an open binary file), or of the header bytes `h` when they are passed"
    if h is None:
        with maybe_open(file, 'rb') as fh:
            h = fh.peek(32)
    # apply each `imghdr` test to the header and return what `map_first` picks
    probe = Self(h,file)
    return L(imghdr.tests).map_first(probe)
...and we can check that it still works:
# path input
test_eq(what(fname), 'jpeg')
...along with the version passing a file handle:
# open binary file handle as input
with open(fname,'rb') as f: test_eq(what(f), 'jpeg')
...along with the h
parameter version:
# no file at all: only the first 32 bytes are supplied through `h`
with open(fname,'rb') as f: test_eq(what(None, h=f.read(32)), 'jpeg')
# `image_size` is checked against the same sample image
test_eq(image_size(fname), (1200,803))
# Check `bunzip`: decompress the fixture archive and inspect the file it produces
f = Path('files/test.txt')
if f.exists(): f.unlink()  # remove leftovers so the file can only come from `bunzip`
bunzip('files/test.txt.bz2')
try:
    # read through a context manager so the handle is closed before the file is deleted
    with f.open() as fh: t = fh.readlines()
    test_eq(len(t),1)
    test_eq(t[0], 'test\n')
finally:
    # clean up even when an assertion above fails
    if f.exists(): f.unlink()
# Scratch directory tree under the current working directory
tmp_root = Path.cwd()/'_tmp'
path = tmp_root/'tst'
f = join_path_file('tst.txt', path)
# the call above is expected to have created the directory
assert path.exists()
test_eq(f, path/'tst.txt')
# an open file handle is handed back unchanged
with open(f, 'w') as f_:
    assert join_path_file(f_, path) == f_
shutil.rmtree(tmp_root)
# Text holding two JSON objects (one of them spread over several lines) mixed with non-JSON lines
tst = """
# ignored
{ "a":1 }
hello
{
"b":2
}
"""
parsed = list(loads_multi(tst))
test_eq(parsed, [dict(a=1), dict(b=2)])
def test_untar(foldername, rename=False, **kwargs):
    "Build a gzipped tar with `shutil.make_archive(**kwargs)`, extract it with `untar_dir`, and check `foldername` is the only entry extracted"
    with tempfile.TemporaryDirectory() as src_dir:
        base = os.path.join(src_dir, 'a')
        shutil.make_archive(base, 'gztar', **kwargs)
        archive = base+'.tar.gz'
        with tempfile.TemporaryDirectory() as dest_dir:
            dest = Path(dest_dir)
            untar_dir(archive, dest, rename=rename)
            expected = [dest/foldername]
            test_eq(dest.ls(), expected)
If the contents of fname
contain just one file or directory, it is placed directly in dest
:
# archive built with base_dir='images'; the extracted entry is expected to be called 'images'
test_untar('images', base_dir='images')
If rename
is true, then the directory created is named after the archive, without its extension:
# same archive, but with rename=True the extracted entry is expected to be called 'a', like the archive
test_untar('a', base_dir='images', rename=True)
If the contents of fname
contain multiple files and directories, a new folder in dest
is created with the same name as fname
(but without extension):
# archive built with root_dir='images'; the extracted entry is expected to be a folder called 'a', like the archive
test_untar('a', root_dir='images')
# HTTPS remote URL
test_eq(repo_details('https://github.com/fastai/fastai.git'), ['fastai', 'fastai'])
# SSH (scp-style) remote URL, with the trailing newline that command output would carry
test_eq(repo_details('git@github.com:fastai/nbdev.git\n'), ['fastai', 'nbdev'])
You can pass a string (which will be split based on standard shell rules), a list, or pass args directly:
# Arguments may be passed separately or as one list
run('echo', same_in_win=True)
run('pip', '--version', same_in_win=True)
run(['pip', '--version'], same_in_win=True)
# List the current directory with each calling convention: one string, a list, separate arguments
if sys.platform != 'win32':
    assert 'ipynb' in run('ls -ls')
    assert 'ipynb' in run(['ls', '-l'])
    assert 'ipynb' in run('ls', '-l')
else:
    assert 'ipynb' in run('cmd /c dir /p')
    assert 'ipynb' in run(['cmd', '/c', 'dir', '/p'])
    assert 'ipynb' in run('cmd', '/c', 'dir', '/p')
Some commands fail in non-error situations, like grep
. Use ignore_ex
in those cases, which will return a tuple of (returncode, stdout):
# Search the notebook for a string it does not contain, using the platform's search tool
_search_cmd = 'cmd /c findstr asdfds 00_test.ipynb' if sys.platform == 'win32' else 'grep asdfds 00_test.ipynb'
# the first element of the result is expected to be 1 rather than an exception being raised
test_eq(run(_search_cmd, ignore_ex=True)[0], 1)
run
automatically decodes returned bytes to a str
. Use as_bytes
to skip that:
if sys.platform == 'win32':
    # `as_bytes` is not exercised on Windows, because nbdev_clean_nbs would rewrite \n to \r\n on every run
    test_eq(run('cmd /c echo hi'), 'hi')
else:
    test_eq(run('echo hi', as_bytes=True), b'hi\n')
# Round-trip a value through `save_pickle`/`load_pickle` for each supported suffix
for suf in '.pkl','.bz2','.gz':
    # delete=False is added for Windows
    # https://stackoverflow.com/questions/23212435/permission-denied-to-write-to-my-temporary-file
    # Only the unique file name is needed, so the handle is closed straight away
    with tempfile.NamedTemporaryFile(suffix=suf, delete=False) as f: fn = Path(f.name)
    try:
        save_pickle(fn, 't')
        t = load_pickle(fn)
    finally:
        # delete=False means nothing else removes the file, so do it here
        fn.unlink()
    test_eq(t,'t')
The following methods are added to the standard Python library's pathlib.Path.
# Resolve the sibling `fastcore` directory to an absolute path and display it
p = Path('../fastcore/').resolve()
p
# Express that path relative to the current working directory
p.relpath(Path.cwd())
We add an ls()
method to pathlib.Path
which is simply defined as list(Path.iterdir())
, mainly for convenience in REPL environments such as notebooks.
path = Path()
# list the current directory; it must not be empty
t = path.ls()
assert len(t)>0
# a positional number caps how many entries come back
t1 = path.ls(10)
test_eq(len(t1), 10)
# filtering by extension must return fewer entries than the unfiltered listing
t2 = path.ls(file_exts='.ipynb')
assert len(t)>len(t2)
t[0]
You can also pass an optional file_type
MIME prefix and/or a list of file extensions.
lib_path = (path/'../fastcore')
# filter by MIME prefix: the first 'text' entry in the library folder is expected to be a .py file
txt_files=lib_path.ls(file_type='text')
assert len(txt_files) > 0 and txt_files[0].suffix=='.py'
# filter by a list of extensions
ipy_files=path.ls(file_exts=['.ipynb'])
assert len(ipy_files) > 0 and ipy_files[0].suffix=='.ipynb'
txt_files[0],ipy_files[0]
fastai also updates the repr
of Path
such that, if Path.BASE_PATH
is defined, all paths are printed relative to that path (as long as they are contained in Path.BASE_PATH
):
t = ipy_files[0].absolute()
try:
    # use the grandparent directory as the base, so the repr keeps only the last folder and the file name
    Path.BASE_PATH = t.parent.parent
    # the expected repr hard-codes 'nbs', i.e. this assumes the notebook lives in a folder with that name
    test_eq(repr(t), f"Path('nbs/{t.name}')")
# always reset the class attribute, even if the check above fails
finally: Path.BASE_PATH = None
get_source_link
allows you to get a link to the source code related to an object. For nbdev related projects such as fastcore, we can get the full link to a GitHub repo. For nbdev
projects, be sure to properly set the git_url
in settings.ini
(derived from lib_name
and branch
on top of the prefix you will need to adapt) so that those links are correct.
For example, below we get the link to fastcore.test.test_eq
:
# the link for `test_eq` must point into fastcore/test.py
assert 'fastcore/test.py' in get_source_link(test_eq)
get_source_link(test_eq)
For non-nbdev related projects, we can still retrieve part of the link:
from nbformat.converter import convert
# for this function only the module-relative part of the link is expected
assert get_source_link(convert).startswith('nbformat/converter.py')
get_source_link(convert)
w = 'abacadabra'
# Each row: maximum length, extra keyword arguments, expected result
for size,opts,expected in (
    (10, {},            w),
    (5,  {},            'abac…'),
    (5,  {'suf':''},    'abaca'),
    (11, {'space':'_'}, w+"_"),
    (10, {'space':'_'}, w[:-1]+'…'),
    (5,  {'suf':'!!'},  'aba!!'),
):
    test_eq(truncstr(w, size, **opts), expected)
# sample series containing both a missing value (None) and a zero
data = [9,6,None,1,4,0,8,15,10]
# render the same data with `empty_zero` off and on, for comparison
print(f'without "empty_zero": {sparkline(data, empty_zero=False)}')
print(f' with "empty_zero": {sparkline(data, empty_zero=True )}')
You can set a maximum and minimum for the y-axis of the sparkline with the arguments mn
and mx
respectively:
# the last value (400) lies far above the chosen maximum `mx=3`
sparkline([1,2,3,400], mn=0, mx=3)
Add events with add
, and get number of events
and their frequency (freq
).
def _randwait():
    "Yield 100 times, sleeping for a random interval of under 5ms before each yield"
    for _ in range(100):
        pause = random.random()/200
        yield sleep(pause)
c = EventTimer(store=5, span=0.03)
# record one event after each random pause
for o in _randwait(): c.add(1)
print(f'Num Events: {c.events}, Freq/sec: {c.freq:.01f}')
# show the stored history both as a sparkline and as numbers with one decimal place
print('Most recent: ', sparkline(c.hist), *L(c.hist).map('{:.01f}'))
# template with two named replacement fields
s = '/pulls/{pull_number}/reviews/{review_id}'
# the field names come back in order of appearance
test_eq(stringfmt_names(s), ['pull_number','review_id'])
The result is a tuple of (formatted_string,missing_fields,extra_fields)
, e.g:
# `pull_number` is a field of the template, `foo` is not, and `review_id` is left unfilled
res,missing,xtra = partial_format(s, pull_number=1, foo=2)
for got,expected in (
    (res,     '/pulls/1/reviews/{review_id}'),
    (missing, ['review_id']),
    (xtra,    {'foo':2}),
):
    test_eq(got, expected)
# a naive datetime (no tzinfo), converted in both directions
dt = datetime(2000,1,1,12)
print(f'{dt} UTC is {utc2local(dt)} local time')
print(f'{dt} local is {local2utc(dt)} UTC time')
You can add a breakpoint to an existing function, e.g:
# NOTE: this rebinds `Path.cwd` on the class itself, so later calls to `Path.cwd()` go through the traced version too
Path.cwd = trace(Path.cwd)
Path.cwd()
Now, when the function is called it will drop you into the debugger. Note, you must issue the s
command when you begin to step into the function that is being traced.
# Each row: positional arguments, keyword arguments, expected result
for args,kwargs,expected in (
    ((63,32),      {},                  64),
    ((50,32),      {},                  64),
    ((40,32),      {},                  32),
    (( 0,32),      {},                  0),
    ((63,32),      {'round_down':True}, 32),
    (((63,40),32), {},                  (64,32)),
):
    test_eq(round_multiple(*args, **kwargs), expected)
# Pick an environment variable that is normally set on the current platform
if sys.platform == "win32": env_test = 'USERNAME'
else: env_test = 'SHELL'
oldusr = os.environ[env_test]
replace_param = {}
replace_param[env_test] = 'a'
# Inside the block 'PATH' (passed positionally) is gone and the keyword variable is overridden
with modified_env('PATH', **replace_param):
    assert 'PATH' not in os.environ
    test_eq(os.environ[env_test], 'a')
# Afterwards both are back to their previous state
test_eq(os.environ[env_test], oldusr)
assert 'PATH' in os.environ
# Values that must be read as false: strings in mixed case, then non-string falsy values
for o in ('n', 'no', 'FALSE', 'off', '0', 0, None, '', False):
    assert not str2bool(o)
# Values that must be read as true: strings in mixed case, then non-string truthy values
for o in ('y', 'YES', 't', 'True', 'on', '1', 1, True):
    assert str2bool(o)