import contextlib import os import textwrap import time from oslo_serialization import jsonutils from oslo_utils import encodeutils import prettytable import six def env(*args, **kwargs): """Returns the first environment variable set. If all are empty, defaults to '' or keyword arg `default`. """ for arg in args: value = os.environ.get(arg) if value: return value return kwargs.get('default', '') def arg(*args, **kwargs): """Decorator for CLI args. Example: >>> @arg("name", help="Name of the new entity") ... def entity_create(args): ... pass """ def _decorator(func): add_arg(func, *args, **kwargs) return func return _decorator def add_arg(func, *args, **kwargs): """Bind CLI arguments to a shell.py `do_foo` function.""" if not hasattr(func, 'arguments'): func.arguments = [] # NOTE(sirp): avoid dups that can occur when the module is shared across # tests. if (args, kwargs) not in func.arguments: # Because of the semantics of decorator composition if we just append # to the options list positional options will appear to be backwards. func.arguments.insert(0, (args, kwargs)) def pretty_choice_list(l): return ', '.join("'%s'" % i for i in l) def pretty_choice_dict(d): """Returns a formatted dict as 'key=value'.""" return pretty_choice_list(['%s=%s' % (k, d[k]) for k in sorted(d.keys())]) def print_list(objs, fields, formatters={}, sortby=None, align='c'): pt = prettytable.PrettyTable([f for f in fields], caching=False) pt.align = align for o in objs: row = [] for field in fields: if field in formatters: if callable(formatters[field]): row.append(formatters[field](o)) else: row.append(o.get(formatters[field], None)) else: data = o.get(field, None) if data is None or data == "": data = '-' data = six.text_type(data).replace("\r", "") row.append(data) pt.add_row(row) if sortby is not None and sortby in fields: result = encodeutils.safe_encode(pt.get_string(sortby=sortby)) else: result = encodeutils.safe_encode(pt.get_string()) if six.PY3: result = result.decode() print(result) def print_dict(d, dict_property="Property", dict_value="Value", wrap=0): pt = prettytable.PrettyTable([dict_property, dict_value], caching=False) pt.align = 'l' for k, v in sorted(d.items()): # convert dict to str to check length if isinstance(v, (dict, list)): v = jsonutils.dumps(v) if wrap > 0: v = textwrap.fill(six.text_type(v), wrap) # if value has a newline, add in multiple rows # e.g. fault with stacktrace if v and isinstance(v, six.string_types) and (r'\n' in v or '\r' in v): # '\r' would break the table, so remove it. if '\r' in v: v = v.replace('\r', '') lines = v.strip().split(r'\n') col1 = k for line in lines: pt.add_row([col1, line]) col1 = '' else: if v is None: v = '-' pt.add_row([k, v]) result = encodeutils.safe_encode(pt.get_string()) if six.PY3: result = result.decode() print(result) def safe_issubclass(*args): """Like issubclass, but will just return False if not a class.""" try: if issubclass(*args): return True except TypeError: pass return False @contextlib.contextmanager def record_time(times, enabled, *args): """Record the time of a specific action. :param times: A list of tuples holds time data. :type times: list :param enabled: Whether timing is enabled. :type enabled: bool :param *args: Other data to be stored besides time data, these args will be joined to a string. """ if not enabled: yield else: start = time.time() yield end = time.time() times.append((' '.join(args), start, end)) def get_function_name(func): if six.PY2: if hasattr(func, "im_class"): return "%s.%s" % (func.im_class, func.__name__) else: return "%s.%s" % (func.__module__, func.__name__) else: return "%s.%s" % (func.__module__, func.__qualname__)