esphome/esphomeyaml/helpers.py
2018-05-06 15:56:12 +02:00

450 lines
12 KiB
Python

from __future__ import print_function
import logging
import re
from collections import OrderedDict, deque
from esphomeyaml import core
from esphomeyaml.const import CONF_AVAILABILITY, CONF_COMMAND_TOPIC, CONF_DISCOVERY, \
CONF_INVERTED, \
CONF_MODE, CONF_NUMBER, CONF_PAYLOAD_AVAILABLE, CONF_PAYLOAD_NOT_AVAILABLE, CONF_RETAIN, \
CONF_STATE_TOPIC, CONF_TOPIC
from esphomeyaml.core import ESPHomeYAMLError, HexInt
_LOGGER = logging.getLogger(__name__)
def ensure_unique_string(preferred_string, current_strings):
    """Return a string based on ``preferred_string`` that is not yet taken.

    ``preferred_string`` is returned unchanged when it does not occur in
    ``current_strings``. Otherwise ``_2``, ``_3``, ... is appended until the
    candidate is not contained in ``current_strings``.
    """
    taken = set(current_strings)
    candidate = preferred_string
    suffix = 2
    while candidate in taken:
        candidate = u"{}_{}".format(preferred_string, suffix)
        suffix += 1
    return candidate
def indent_all_but_first_and_last(text, padding=u' '):
    """Prefix every line of ``text`` except the first and last with ``padding``.

    Line endings are preserved. Texts with two lines or fewer are returned
    untouched because they have no inner lines.
    """
    pieces = text.splitlines(True)
    if len(pieces) > 2:
        head, inner, tail = pieces[0], pieces[1:-1], pieces[-1]
        return head + u''.join(padding + piece for piece in inner) + tail
    return text
def indent_list(text, padding=u' '):
    """Split ``text`` into lines and return them as a list, each prefixed with ``padding``.

    Line terminators are dropped by the split.
    """
    indented = []
    for line in text.splitlines():
        indented.append(padding + line)
    return indented
def indent(text, padding=u' '):
    """Return ``text`` with ``padding`` prepended to every line.

    The lines are re-joined with a single newline and no trailing newline
    is emitted.
    """
    indented_lines = indent_list(text, padding)
    return u'\n'.join(indented_lines)
class Expression(object):
    """Base class of all expression nodes.

    ``requires`` lists the other expressions this one depends on and
    ``required`` records whether :meth:`require` has been called on it.
    Subclasses must implement ``__str__``.
    """

    def __init__(self):
        self.requires = []
        self.required = False

    def __str__(self):
        raise NotImplementedError()

    def require(self):
        """Mark this expression and, transitively, its dependencies as required."""
        self.required = True
        for dependency in self.requires:
            # Already-marked dependencies are skipped, which also stops
            # the recursion on dependency cycles.
            if not dependency.required:
                dependency.require()
class RawExpression(Expression):
    """Expression that renders as the exact text it was constructed with."""

    def __init__(self, text):
        super(RawExpression, self).__init__()
        # Emitted verbatim by __str__; no quoting or escaping is applied.
        self.text = text

    def __str__(self):
        return self.text
# pylint: disable=redefined-builtin
class AssignmentExpression(Expression):
    """Expression rendering as ``<type> <modifier><name> = <rhs>``.

    ``rhs`` is converted with ``safe_exp`` and registered as a dependency.
    ``obj`` is only stored on the instance.
    """

    def __init__(self, type, modifier, name, rhs, obj):
        super(AssignmentExpression, self).__init__()
        self.type = type
        self.modifier = modifier
        self.name = name
        self.obj = obj
        self.rhs = safe_exp(rhs)
        self.requires.append(self.rhs)

    def __str__(self):
        # With core.SIMPLIFY set, the declared type is replaced by ``auto``.
        declared_type = u'auto' if core.SIMPLIFY else self.type
        return u"{} {}{} = {}".format(declared_type, self.modifier, self.name, self.rhs)
class ExpressionList(Expression):
    """Comma-separated list of expressions.

    Trailing ``None`` arguments are dropped. Every remaining argument is
    converted with ``safe_exp`` and registered as a dependency.
    """

    def __init__(self, *args):
        super(ExpressionList, self).__init__()
        # Strip every None from the end of the argument list.
        trimmed = list(args)
        while trimmed and trimmed[-1] is None:
            del trimmed[-1]
        self.args = []
        for raw in trimmed:
            converted = safe_exp(raw)
            self.args.append(converted)
            self.requires.append(converted)

    def __str__(self):
        joined = u", ".join(unicode(item) for item in self.args)
        return indent_all_but_first_and_last(joined)
class CallExpression(Expression):
    """Expression rendering as ``base(arg, arg, ...)``.

    The positional arguments are wrapped in an ``ExpressionList``, which is
    registered as this expression's dependency.
    """

    def __init__(self, base, *args):
        super(CallExpression, self).__init__()
        self.base = base
        argument_list = ExpressionList(*args)
        self.args = argument_list
        self.requires.append(argument_list)

    def __str__(self):
        return u'{}({})'.format(self.base, self.args)
class StructInitializer(Expression):
    """Expression rendering as a ``base{ .key = value, ... }`` initializer.

    Fields are given either as ``(key, value)`` pairs passed positionally or
    as one ``OrderedDict`` passed as the only positional argument. Fields
    whose value is ``None`` are skipped. All other values are converted with
    ``safe_exp`` and registered as dependencies. Field order is preserved.
    """

    def __init__(self, base, *args):
        super(StructInitializer, self).__init__()
        self.base = base
        # ``args`` is always a tuple here, so the OrderedDict check has to
        # look at its single element rather than at the tuple itself.
        if len(args) == 1 and isinstance(args[0], OrderedDict):
            fields = args[0]
        else:
            fields = OrderedDict(args)
        self.args = OrderedDict()
        for key, value in fields.iteritems():
            if value is None:
                continue
            exp = safe_exp(value)
            self.args[key] = exp
            self.requires.append(exp)

    def __str__(self):
        cpp = u'{}{{\n'.format(self.base)
        for key, value in self.args.iteritems():
            cpp += u' .{} = {},\n'.format(key, value)
        cpp += u'}'
        return cpp
class ArrayInitializer(Expression):
    """Expression rendering as a brace-enclosed list of elements.

    ``None`` elements are skipped. All other elements are converted with
    ``safe_exp`` and registered as dependencies. The keyword argument
    ``multiline`` (default ``True``) selects one element per line instead of
    a single-line ``{a, b, c}`` rendering. An empty array always renders as
    ``{}``.
    """

    def __init__(self, *args, **kwargs):
        super(ArrayInitializer, self).__init__()
        self.multiline = kwargs.get('multiline', True)
        self.args = []
        for arg in args:
            if arg is None:
                continue
            exp = safe_exp(arg)
            self.args.append(exp)
            self.requires.append(exp)

    def __str__(self):
        if not self.args:
            return u'{}'
        if self.multiline:
            cpp = u'{\n'
            for arg in self.args:
                cpp += u' {},\n'.format(arg)
            cpp += u'}'
        else:
            # unicode() rather than str(): on Python 2, str() would try to
            # ASCII-encode an element whose __str__ returns non-ASCII text.
            cpp = u'{' + u', '.join(unicode(arg) for arg in self.args) + u'}'
        return cpp
class Literal(Expression):
    """Common base class of the literal expressions; subclasses implement ``__str__``."""

    def __str__(self):
        raise NotImplementedError()
class StringLiteral(Literal):
    """Literal rendering its value wrapped in double quotes."""

    def __init__(self, string):
        super(StringLiteral, self).__init__()
        self.string = string

    def __str__(self):
        # The value is inserted as-is; no escaping is performed here.
        template = u'"{}"'
        return template.format(self.string)
class IntLiteral(Literal):
    """Literal rendering as the ``unicode()`` form of the wrapped value."""

    def __init__(self, i):
        super(IntLiteral, self).__init__()
        self.i = i

    def __str__(self):
        value = self.i
        return unicode(value)
class BoolLiteral(Literal):
    """Literal rendering as ``true`` or ``false`` depending on the value's truthiness."""

    def __init__(self, binary):
        super(BoolLiteral, self).__init__()
        self.binary = binary

    def __str__(self):
        if self.binary:
            return u"true"
        return u"false"
class HexIntLiteral(Literal):
    """Literal that wraps its value in ``HexInt`` and renders via ``str()`` of it."""

    def __init__(self, i):
        super(HexIntLiteral, self).__init__()
        # The textual form is delegated entirely to HexInt.
        self.i = HexInt(i)

    def __str__(self):
        wrapped = self.i
        return str(wrapped)
class FloatLiteral(Literal):
    """Literal rendering in fixed-point notation followed by an ``f`` suffix."""

    def __init__(self, float_):
        super(FloatLiteral, self).__init__()
        self.float_ = float_

    def __str__(self):
        template = u"{:f}f"
        return template.format(self.float_)
def safe_exp(obj):
    """Convert ``obj`` to an ``Expression``.

    Expressions are returned unchanged. Booleans, strings, integers and
    floats are wrapped in the matching literal class.

    Raises:
        ValueError: if ``obj`` is none of the supported types.
    """
    if isinstance(obj, Expression):
        return obj
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(obj, bool):
        return BoolLiteral(obj)
    if isinstance(obj, (str, unicode)):
        return StringLiteral(obj)
    if isinstance(obj, (int, long)):
        return IntLiteral(obj)
    if isinstance(obj, float):
        return FloatLiteral(obj)
    raise ValueError(u"Object is not an expression", obj)
class Statement(object):
    """Base class of all statements; subclasses must implement ``__str__``."""

    def __init__(self):
        # No state of its own; present so subclasses can call super().__init__().
        pass

    def __str__(self):
        raise NotImplementedError()
class RawStatement(Statement):
    """Statement that renders as the exact text it was constructed with."""

    def __init__(self, text):
        super(RawStatement, self).__init__()
        # Emitted verbatim by __str__; no terminator is appended.
        self.text = text

    def __str__(self):
        return self.text
class ExpressionStatement(Statement):
    """Statement consisting of a single expression followed by ``;``.

    The constructor argument is converted with ``safe_exp``.
    """

    def __init__(self, expression):
        super(ExpressionStatement, self).__init__()
        self.expression = safe_exp(expression)

    def __str__(self):
        template = u"{};"
        return template.format(self.expression)
def statement(expression):
    """Return ``expression`` as a ``Statement``.

    Statements are passed through unchanged. Anything else is wrapped in an
    ``ExpressionStatement``.
    """
    if not isinstance(expression, Statement):
        return ExpressionStatement(expression)
    return expression
# pylint: disable=redefined-builtin, invalid-name
def variable(type, id, rhs):
    """Declare a variable ``id`` of ``type`` initialised with ``rhs``.

    The assignment is recorded through ``add`` and the variable is
    registered in ``_VARIABLES`` under ``id``.

    Returns:
        A ``MockObj`` for ``id`` whose members are accessed with ``.``.
    """
    value = safe_exp(rhs)
    handle = MockObj(id, u'.')
    declaration = AssignmentExpression(type, '', id, value, handle)
    add(declaration)
    _VARIABLES[id] = (handle, type)
    # Requiring the handle later also requires its declaration.
    handle.requires.append(declaration)
    return handle
def Pvariable(type, id, rhs):
    """Declare a pointer variable ``id`` of ``type`` initialised with ``rhs``.

    The assignment, using the ``*`` modifier, is recorded through ``add``
    and the variable is registered in ``_VARIABLES`` under ``id``.

    Returns:
        A ``MockObj`` for ``id`` whose members are accessed with ``->``.
    """
    value = safe_exp(rhs)
    handle = MockObj(id, u'->')
    declaration = AssignmentExpression(type, '*', id, value, handle)
    add(declaration)
    _VARIABLES[id] = (handle, type)
    # Requiring the handle later also requires its declaration.
    handle.requires.append(declaration)
    return handle
# Pending (func, config) tasks; appended by add_task() and consumed from the
# left by get_variable().
_QUEUE = deque()
# Maps a variable id to an (object, type) tuple; written by variable() and
# Pvariable(), read by get_variable().
_VARIABLES = {}
# Expressions recorded by add(), in call order.
_EXPRESSIONS = []
def get_variable(id, type=None):
    """Look up a registered variable by ``id`` or, if ``id`` is None, by ``type``.

    While the variable is not registered yet, queued tasks from ``_QUEUE``
    are executed one at a time (each may register further variables) until
    it appears or the queue is empty.

    Args:
        id: Key in ``_VARIABLES``. Takes precedence over ``type``.
        type: Only used when ``id`` is None; the first registered variable
            whose stored type equals it is returned.

    Returns:
        The registered object, or None when both ``id`` and ``type`` are
        None (in which case all queued tasks are still run).

    Raises:
        ESPHomeYAMLError: if no matching variable exists after all queued
            tasks have been run.
    """
    def lookup():
        # Current best match in _VARIABLES, or None if there is none (yet).
        if id is not None:
            entry = _VARIABLES.get(id)
            return None if entry is None else entry[0]
        if type is not None:
            return next((x[0] for x in _VARIABLES.itervalues() if x[1] == type), None)
        return None

    result = lookup()
    while result is None and _QUEUE:
        func, config = _QUEUE.popleft()
        func(config)
        result = lookup()

    if id is None and type is None:
        return None
    if result is None:
        # An unknown id is reported through the same error as an unknown
        # type instead of leaking a bare KeyError from the dict lookup.
        raise ESPHomeYAMLError(u"Couldn't find ID '{}' with type {}".format(id, type))
    return result
def add_task(func, config):
    """Append ``func`` and its ``config`` to the end of ``_QUEUE``.

    ``get_variable`` later pops entries and calls ``func(config)``.
    """
    task = (func, config)
    _QUEUE.append(task)
def add(expression, require=True):
    """Record ``expression`` in ``_EXPRESSIONS`` and return it.

    When ``require`` is true and ``expression`` is an ``Expression``, its
    ``require()`` method is called before it is recorded.
    """
    should_mark = require and isinstance(expression, Expression)
    if should_mark:
        expression.require()
    _EXPRESSIONS.append(expression)
    return expression
class MockObj(Expression):
    """Expression that builds member-access and call expressions from Python syntax.

    ``base`` is what the object renders as. ``op`` is the operator inserted
    between ``base`` and a member name when an attribute is accessed.
    """

    def __init__(self, base, op=u'.'):
        # Assigned before the base initializer, exactly as in the original order.
        self.base = base
        self.op = op
        super(MockObj, self).__init__()

    def __getattr__(self, attr):
        """Return a MockObj for member ``attr`` of this object.

        Only invoked for attributes not found through normal lookup. A
        leading ``P`` is stripped from the name and makes the returned
        object use ``->`` for its own members; otherwise it uses ``.``.
        """
        if attr.startswith(u'P'):
            member, member_op = attr[1:], u'->'
        else:
            member, member_op = attr, u'.'
        child = MockObj(u'{}{}{}'.format(self.base, self.op, member), member_op)
        child.requires.append(self)
        return child

    def __call__(self, *args, **kwargs):
        """Return a MockObj wrapping a call of ``base`` with ``args``.

        Keyword arguments are accepted but not used.
        """
        invocation = CallExpression(self.base, *args)
        result = MockObj(invocation, self.op)
        result.requires.extend([self, invocation])
        return result

    def __str__(self):
        return unicode(self.base)

    def require(self):
        """Mark this object and, transitively, its dependencies as required."""
        self.required = True
        for dependency in self.requires:
            if not dependency.required:
                dependency.require()
# Module-level MockObj handles that render as the bare names given here.
# NOTE(review): these presumably mirror identifiers of the generated C++
# code - confirm against the consumers of this module.
App = MockObj(u'App')
GPIOPin = MockObj(u'GPIOPin')
GPIOOutputPin = MockObj(u'GPIOOutputPin')
GPIOInputPin = MockObj(u'GPIOInputPin')
def get_gpio_pin_number(conf):
    """Return the pin number described by ``conf``.

    ``conf`` is either the number itself (an int) or a mapping holding the
    number under ``CONF_NUMBER``.
    """
    return conf if isinstance(conf, int) else conf[CONF_NUMBER]
def exp_gpio_pin_(obj, conf, default_mode):
    """Build the expression for a GPIO pin from its configuration.

    Args:
        obj: Callable (such as ``GPIOOutputPin``) invoked to construct a
            regular pin.
        conf: Either a plain int, returned unchanged, or a mapping
            describing the pin.
        default_mode: ``u'INPUT'`` or ``u'OUTPUT'``. Selects the hub method
            for ``pcf8574`` pins and is the fallback mode for inverted
            regular pins.

    Raises:
        ESPHomeYAMLError: for a ``pcf8574`` pin with any other
            ``default_mode``.
    """
    if isinstance(conf, int):
        return conf

    if 'pcf8574' in conf:
        # The pin belongs to the hub variable registered under this id.
        expander = get_variable(conf['pcf8574'])
        if default_mode == u'INPUT':
            number = conf[CONF_NUMBER]
            mode = RawExpression('PCF8574_' + conf[CONF_MODE])
            return expander.make_input_pin(number, mode, conf[CONF_INVERTED])
        if default_mode == u'OUTPUT':
            return expander.make_output_pin(conf[CONF_NUMBER], conf[CONF_INVERTED])
        raise ESPHomeYAMLError(u"Unknown default mode {}".format(default_mode))

    if conf.get(CONF_INVERTED) is not None:
        number = conf[CONF_NUMBER]
        mode = RawExpression(conf.get(CONF_MODE, default_mode))
        return obj(number, mode, conf[CONF_INVERTED])
    # NOTE(review): here the mode is passed through unwrapped rather than as
    # a RawExpression like in the inverted case - confirm this is intended.
    return obj(conf[CONF_NUMBER], conf.get(CONF_MODE))
def exp_gpio_pin(conf):
    """Return a ``GPIOPin(number, mode, inverted)`` call built from ``conf``.

    ``CONF_NUMBER`` and ``CONF_MODE`` must be present in ``conf``;
    ``CONF_INVERTED`` is optional and passed as None when absent.
    """
    number = conf[CONF_NUMBER]
    mode = conf[CONF_MODE]
    inverted = conf.get(CONF_INVERTED)
    return GPIOPin(number, mode, inverted)
def exp_gpio_output_pin(conf):
    """Build the pin expression for ``conf`` using ``GPIOOutputPin`` and ``OUTPUT`` mode."""
    constructor = GPIOOutputPin
    return exp_gpio_pin_(constructor, conf, u'OUTPUT')
def exp_gpio_input_pin(conf):
    """Build the pin expression for ``conf`` using ``GPIOInputPin`` and ``INPUT`` mode."""
    constructor = GPIOInputPin
    return exp_gpio_pin_(constructor, conf, u'INPUT')
def setup_mqtt_component(obj, config):
    """Record the MQTT setup calls on ``obj`` for the options found in ``config``.

    Each option is only applied when its key is present, except discovery:
    ``disable_discovery()`` is recorded when ``CONF_DISCOVERY`` is present
    and falsy.
    """
    if CONF_RETAIN in config:
        retain = config[CONF_RETAIN]
        add(obj.set_retain(retain))

    discovery_enabled = config.get(CONF_DISCOVERY, True)
    if not discovery_enabled:
        add(obj.disable_discovery())

    if CONF_STATE_TOPIC in config:
        state_topic = config[CONF_STATE_TOPIC]
        add(obj.set_custom_state_topic(state_topic))

    if CONF_COMMAND_TOPIC in config:
        command_topic = config[CONF_COMMAND_TOPIC]
        add(obj.set_custom_command_topic(command_topic))

    if CONF_AVAILABILITY in config:
        settings = config[CONF_AVAILABILITY]
        add(obj.set_availability(settings[CONF_TOPIC],
                                 settings[CONF_PAYLOAD_AVAILABLE],
                                 settings[CONF_PAYLOAD_NOT_AVAILABLE]))
def exp_empty_optional(type):
    """Return a raw expression reading ``Optional<type>()``."""
    text = u'Optional<{}>()'.format(type)
    return RawExpression(text)
def exp_optional(type, value):
    """Return ``value``, or an empty ``Optional<type>()`` expression when it is None."""
    if value is not None:
        return value
    return exp_empty_optional(type)
# shlex's quote for Python 2.7
# Matches the first character that is not safe to leave unquoted.
_find_unsafe = re.compile(r'[^\w@%+=:,./-]').search


def quote(s):
    """Return a shell-escaped version of the string *s*."""
    if not s:
        return u"''"
    if _find_unsafe(s) is not None:
        # Wrap in single quotes; an embedded single quote is written by
        # closing the quoting, emitting it inside double quotes and
        # reopening, so $'b becomes '$'"'"'b'.
        escaped = s.replace(u"'", u"'\"'\"'")
        return u"'" + escaped + u"'"
    return s
def color(the_color, message='', reset=None):
    """Color helper.

    Returns the escape sequence for ``the_color`` alone when ``message`` is
    empty. Otherwise returns the sequence, then ``message``, then the escape
    code named by ``reset`` (``'reset'`` when not given).
    """
    from colorlog.escape_codes import escape_codes, parse_colors

    prefix = parse_colors(the_color)
    if message:
        suffix = escape_codes[reset or 'reset']
        return prefix + message + suffix
    return prefix