mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-28 10:31:36 +01:00
166 lines
4.5 KiB
Python
166 lines
4.5 KiB
Python
|
import contextlib
|
||
|
import os
|
||
|
import textwrap
|
||
|
import time
|
||
|
|
||
|
from oslo_serialization import jsonutils
|
||
|
from oslo_utils import encodeutils
|
||
|
import prettytable
|
||
|
import six
|
||
|
|
||
|
|
||
|
def env(*args, **kwargs):
    """Return the value of the first environment variable that is set.

    Each positional argument is looked up in ``os.environ`` in order and the
    first non-empty value is returned. If none of them yields a non-empty
    value, the ``default`` keyword argument is returned instead, or ``''``
    when no ``default`` was supplied.
    """
    # Lazy lookup: stop touching the environment as soon as one name hits.
    candidates = (os.environ.get(name) for name in args)
    return next((value for value in candidates if value),
                kwargs.get('default', ''))
|
||
|
|
||
|
|
||
|
def arg(*args, **kwargs):
    """Build a decorator that attaches a CLI argument spec to a function.

    The positional and keyword arguments given here are forwarded unchanged
    to ``add_arg`` together with the decorated function, and the function
    itself is returned unmodified.

    Example:

    >>> @arg("name", help="Name of the new entity")
    ... def entity_create(args):
    ...     pass
    """
    def _register(func):
        add_arg(func, *args, **kwargs)
        return func

    return _register
|
||
|
|
||
|
|
||
|
def add_arg(func, *args, **kwargs):
    """Record a CLI argument spec on a shell.py `do_foo` function.

    Specs are kept as ``(args, kwargs)`` tuples in the ``func.arguments``
    list, which is created on first use.
    """
    try:
        registered = func.arguments
    except AttributeError:
        registered = func.arguments = []

    spec = (args, kwargs)

    # NOTE(sirp): avoid dups that can occur when the module is shared across
    # tests.
    if spec in registered:
        return

    # Decorators are applied bottom-up, so prepending (rather than appending)
    # keeps positional options in the order they are written in the source.
    registered.insert(0, spec)
|
||
|
|
||
|
|
||
|
def pretty_choice_list(l):
    """Return the items of `l` single-quoted and joined with ', '."""
    quoted = ["'%s'" % choice for choice in l]
    return ', '.join(quoted)
|
||
|
|
||
|
|
||
|
def pretty_choice_dict(d):
    """Render a dict as quoted 'key=value' entries, ordered by key."""
    pairs = []
    for key in sorted(d):
        pairs.append('%s=%s' % (key, d[key]))
    return pretty_choice_list(pairs)
|
||
|
|
||
|
|
||
|
def print_list(objs, fields, formatters=None, sortby=None, align='c'):
    """Print a sequence of dict-like records as a table on stdout.

    :param objs: Iterable of records; each must provide ``get(key, default)``.
    :param fields: Column headers. For columns without a formatter the header
                   is also the key looked up in each record.
    :param formatters: Optional mapping of column header to either a callable
                       (invoked with the record; its result becomes the cell)
                       or an alternative key to look up in the record.
    :param sortby: Column to sort the output by; ignored unless it is one of
                   ``fields``.
    :param align: Value assigned to the table's ``align`` attribute.
    """
    # Use None as the default instead of a shared mutable ``{}`` literal.
    if formatters is None:
        formatters = {}

    pt = prettytable.PrettyTable(list(fields), caching=False)
    pt.align = align
    for o in objs:
        row = []
        for field in fields:
            if field in formatters:
                formatter = formatters[field]
                if callable(formatter):
                    row.append(formatter(o))
                else:
                    # A non-callable formatter is an alternative lookup key.
                    row.append(o.get(formatter, None))
            else:
                data = o.get(field, None)
                # Missing or empty values are shown as a dash.
                if data is None or data == "":
                    data = '-'
                # Drop carriage returns from the rendered cell text.
                data = six.text_type(data).replace("\r", "")
                row.append(data)
        pt.add_row(row)

    if sortby is not None and sortby in fields:
        text = pt.get_string(sortby=sortby)
    else:
        text = pt.get_string()
    result = encodeutils.safe_encode(text)

    if six.PY3:
        result = result.decode()

    print(result)
|
||
|
|
||
|
|
||
|
def print_dict(d, dict_property="Property", dict_value="Value", wrap=0):
    """Print a dict as a two-column, left-aligned table on stdout.

    :param d: Mapping to display; rows are emitted in sorted item order.
    :param dict_property: Header of the key column.
    :param dict_value: Header of the value column.
    :param wrap: When greater than zero, every value is converted to text and
                 wrapped to this width with ``textwrap.fill``.
    """
    table = prettytable.PrettyTable([dict_property, dict_value],
                                    caching=False)
    table.align = 'l'

    for key, value in sorted(d.items()):
        # Serialize containers so they can be wrapped/split like any string.
        if isinstance(value, (dict, list)):
            value = jsonutils.dumps(value)
        if wrap > 0:
            value = textwrap.fill(six.text_type(value), wrap)

        # NOTE: r'\n' is the two-character sequence backslash + 'n', not a
        # real newline -- presumably the escaped newlines left by the JSON
        # dump above (e.g. a fault with stacktrace); confirm before changing.
        needs_split = (value and isinstance(value, six.string_types)
                       and (r'\n' in value or '\r' in value))
        if not needs_split:
            table.add_row([key, '-' if value is None else value])
            continue

        # '\r' would break the table, so remove it.
        value = value.replace('\r', '')
        # One row per segment; only the first row carries the key.
        for index, line in enumerate(value.strip().split(r'\n')):
            table.add_row([key if index == 0 else '', line])

    rendered = encodeutils.safe_encode(table.get_string())

    if six.PY3:
        rendered = rendered.decode()

    print(rendered)
|
||
|
|
||
|
|
||
|
def safe_issubclass(*args):
    """Variant of issubclass that yields False instead of raising TypeError.

    The arguments are passed straight through to ``issubclass``; a
    ``TypeError`` (for example because the first argument is not a class)
    results in ``False``.
    """
    try:
        is_sub = issubclass(*args)
    except TypeError:
        return False
    return True if is_sub else False
|
||
|
|
||
|
|
||
|
@contextlib.contextmanager
def record_time(times, enabled, *args):
    """Context manager that times the wrapped block.

    When timing is enabled and the block finishes without raising, a
    ``(label, start, end)`` tuple is appended to ``times``.

    :param times: A list of tuples that holds the time data.
    :type times: list
    :param enabled: Whether timing is enabled.
    :type enabled: bool
    :param *args: Other data to be stored besides time data; these args
                  are joined with spaces into a single string.
    """
    if not enabled:
        # Timing disabled: just run the block and record nothing.
        yield
        return

    started = time.time()
    yield
    finished = time.time()
    label = ' '.join(args)
    times.append((label, started, finished))
|
||
|
|
||
|
|
||
|
def get_function_name(func):
    """Return a dotted, human-readable name for `func`.

    On Python 3 this is ``<module>.<qualname>``. On Python 2 it is
    ``<im_class>.<name>`` when the callable has an ``im_class`` attribute,
    otherwise ``<module>.<name>``.
    """
    if not six.PY2:
        return "%s.%s" % (func.__module__, func.__qualname__)

    # Python 2 has no __qualname__; use im_class as the prefix when present.
    if hasattr(func, "im_class"):
        prefix = func.im_class
    else:
        prefix = func.__module__
    return "%s.%s" % (prefix, func.__name__)
|