Source code for tagit.basics

"""Basic helper functions for the tagit module.

Part of the tagit module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2016

"""

import sys
from os.path import dirname, basename
import os.path
import operator

__all__ = ('head', 'fst', 'tail', 'snd', 'last', 'unique', 'subset', 'union', 'Settings', 'Colors', 'Console', 'string_types', 'intersection', 'difference', 'exclusive', 'get_config', 'split', 'truncate_dir', 'dict_update', 'duplicates', 'item_count', 'check_version')

#: String types that can be used for checking if a object is a string
PY2 = sys.version_info[0] == 2
string_types = None
if PY2:
    string_types = basestring
else:
    string_types = str

# Some basic list operations
head = lambda lst: lst[0]
tail = lambda lst: lst[1:]
fst  = lambda lst: lst[0]
snd  = lambda lst: lst[1]
last = lambda lst: lst[-1]

# Some basic set operations
unique          = lambda lst: list(set(lst))
union           = lambda *args: unique(reduce(operator.add, args, []))
difference      = lambda lstA, lstB: filter(lambda item: item not in lstB, lstA)
exclusive       = lambda lstA, lstB: difference(lstA, lstB) + difference(lstB, lstA)
subset          = lambda sub, par: all(im in par for im in sub)
duplicates      = lambda lst: map(snd, filter(lambda (idx, item): item in lst[idx+1:], enumerate(lst)))
item_count      = lambda needle, haystack: sum([1.0 for item in haystack if item == needle])

[docs]def intersection(*args): if len(args) == 0: return [] biarg = lambda lstA, lstB: filter(lambda item: item in lstB, lstA) return reduce(biarg, args[1:], args[0])
[docs]def split(callback, lst): """Split the list *lst* via boolean return value of *callback*. True is returned first. """ #cut = map(callback, lst) #first = filter(lambda (c, i): c, zip(cut, lst)) #second = difference(first, cut) first, second = [], [] for item in lst: if callback(item): first.append(item) else: second.append(item) return first, second
[docs]def truncate_dir(path, cutoff=3): """Remove path up to last *cutoff* directories""" last_dirs = dirname(path).split(os.path.sep)[-cutoff:] return os.path.join(*(last_dirs + [basename(path)]))
[docs]def dict_update(aggregate, candidate): """Update a *dict* recursively.""" for key in candidate: if key in aggregate and isinstance(aggregate[key], dict): dict_update(aggregate[key], candidate[key]) else: aggregate[key] = candidate[key] return aggregate
def check_version((is_maj, is_min, is_rc), (shall_maj, shall_min, shall_rc), callee=None): if is_maj > shall_maj: return True if is_maj == shall_maj and is_min > shall_min: return True if is_maj == shall_maj and is_min == shall_min and is_rc >= shall_rc: return True raise Exception('Minimum version requirement not met!' + (callee is not None and ('(' + callee + ')') or '')) # Console colors
[docs]class Colors(object): """Console colors. """ STATUS = "\033[91m" ERROR = "\033[91m" ENDC = "\033[0m" PREFIX = "\033[36m" CODE = "\033[1m" HISTORY = "\033[94m" MODE = "\033[93m" TEXT = "" # normal console color (white)
[docs]class Console(object): """Status Line. Create lines like so:: fixed_size_part variable_size_part [STAT] The variable_size_part is truncated such that the line fits CONSOLE_WIDTH characters. If not *verbose*, only failure states are printed """ CONSOLE_WIDTH=80 PREFIX_LEN=CONSOLE_WIDTH - 7 def __init__(self, verbose): self.verbose = verbose self._prefix = ''
[docs] def title(self, variable, fixed=''): """Write the text part of the status line. """ self._title = (variable, fixed) self._prefix = fixed + self.__truncate(variable, self.PREFIX_LEN - len(fixed)) if self.verbose: sys.stdout.write(self._prefix)
def __truncate(self, prefix, length): """Truncate a string. """ if len(prefix) > length: return '...' + prefix[-(length-3):] return prefix
[docs] def fail(self): """Write FAIL result. """ if self.verbose: sys.stdout.write(Colors.ERROR + str.rjust("[SKIP]", self.CONSOLE_WIDTH - len(self._prefix)) + Colors.ENDC + "\n") else: print "Skipping", self._title[0]
[docs] def ok(self): """Write OK result. """ if self.verbose: sys.stdout.write(Colors.STATUS + str.rjust("[ OK ]", self.CONSOLE_WIDTH - len(self._prefix)) + Colors.ENDC + "\n")
[docs] def ignored(self): """Write IGNORED result. """ if self.verbose: sys.stdout.write(Colors.STATUS + str.rjust("[ NO ]", self.CONSOLE_WIDTH - len(self._prefix)) + Colors.ENDC + "\n")
# Settings class
[docs]class Settings(dict): """Less strict dictionary for missing settings. Return None instead of raising KeyError if a key is not in the settings dictionary. The settings dictionary looks like this:: { "section" : { "config" : value, ... }, ... } This class makes the dictionary keys "section" and "config" optional. .. TODO:: SubSettings provides the same tolerant behaviour for the "config" keys. However, then no more keys can be assigned. See the example below: >>> s = Settings({'section' : {'config' : 'value'}}) >>> s['section']['config'] = 'changed' >>> s['section']['config'] == 'changed' False That's because s['section'] returns a copy of the original dictionary (by means of SubSettings), not the dictionary itself. Currently, this is not viewed as an issue, as the recommended call for settings goes like this: >>> s = Settings(...) >>> s['view'].get('config', 'default') This method has the benefit that (i) we can change stuff within the Settings and (ii) the default value can be chosen when called (with SubSettings it's always None). """ def __getitem__(self, key): try: return SubSettings(super(Settings, self).__getitem__(key)) #return super(Settings, self).__getitem__(key) except KeyError: return SubSettings({}) #return {}
[docs] def trace(self, *args): """Traverse the *settings* dictionary and subdict's args are (section, [<more dicts>,] key, default value) """ assert len(args) > 2 key, default = args[-2:] sub = self for subkey in args[:-2]: sub = sub.get(subkey, {}) return sub.get(key, default)
class SubSettings(dict): """Helper for the *Settings* class. Handles keys of the second order (i.e. "config" keys in the *Settings* description). """ def __missing__(self, key): return None
[docs]def get_config(settings, *args): """Traverse the *settings* dictionary and subdict's args are (section, [<more dicts>,] key, default value) """ assert len(args) > 2 key, default = args[-2:] sub = settings for subkey in args[:-2]: sub = sub.get(subkey, {}) return sub.get(key, default)
if __name__ == '__main__': import json data = json.load(open('settings.json')) s = Settings(data) print "Via function" print get_config(s, 'model', 'thumbnail', 'on_load', False) print get_config(s, 'model', 'thumbnail', 'abc') print get_config(s, 'model', 'extension', 'fictional', 'fantasy') print get_config(s, 'session', 'debug', True) print get_config(s, 'session', 'debug', False) print "Via Settings" print s.trace('model', 'thumbnail', 'on_load', False) print s.trace('model', 'thumbnail', 'abc') print s.trace('model', 'extension', 'fictional', 'fantasy') print s.trace('session', 'debug', True) print s.trace('session', 'debug', False) #print s.trace('session', 'cba') #print s.trace('session') #print get_config(s) ## EOF ##