mirror of
https://github.com/esphome/esphome.git
synced 2024-11-29 12:55:46 +01:00
281 lines
10 KiB
Python
281 lines
10 KiB
Python
import copy
|
|
|
|
import voluptuous as vol
|
|
|
|
from esphomeyaml import core
|
|
import esphomeyaml.config_validation as cv
|
|
from esphomeyaml.const import CONF_ABOVE, CONF_ACTION_ID, CONF_AND, CONF_AUTOMATION_ID, \
|
|
CONF_BELOW, CONF_CONDITION, CONF_CONDITION_ID, CONF_DELAY, CONF_ELSE, CONF_ID, CONF_IF, \
|
|
CONF_LAMBDA, CONF_OR, CONF_RANGE, CONF_THEN, CONF_TRIGGER_ID
|
|
from esphomeyaml.core import CORE, EsphomeyamlError
|
|
from esphomeyaml.cpp_generator import ArrayInitializer, Pvariable, TemplateArguments, add, \
|
|
get_variable, process_lambda, templatable
|
|
from esphomeyaml.cpp_types import Action, App, Component, PollingComponent, Trigger, \
|
|
esphomelib_ns, float_, uint32
|
|
from esphomeyaml.util import ServiceRegistry
|
|
|
|
|
|
def maybe_simple_id(*validators):
|
|
validator = vol.All(*validators)
|
|
|
|
def validate(value):
|
|
if isinstance(value, dict):
|
|
return validator(value)
|
|
return validator({CONF_ID: value})
|
|
|
|
return validate
|
|
|
|
|
|
def validate_recursive_condition(value):
|
|
return CONDITIONS_SCHEMA(value)
|
|
|
|
|
|
def validate_recursive_action(value):
|
|
value = cv.ensure_list(value)[:]
|
|
for i, item in enumerate(value):
|
|
item = copy.deepcopy(item)
|
|
if not isinstance(item, dict):
|
|
raise vol.Invalid(u"Action must consist of key-value mapping! Got {}".format(item))
|
|
key = next((x for x in item if x != CONF_ACTION_ID), None)
|
|
if key is None:
|
|
raise vol.Invalid(u"Key missing from action! Got {}".format(item))
|
|
if key not in ACTION_REGISTRY:
|
|
raise vol.Invalid(u"Unable to find action with the name '{}', is the component loaded?"
|
|
u"".format(key))
|
|
item.setdefault(CONF_ACTION_ID, None)
|
|
key2 = next((x for x in item if x != CONF_ACTION_ID and x != key), None)
|
|
if key2 is not None:
|
|
raise vol.Invalid(u"Cannot have two actions in one item. Key '{}' overrides '{}'! "
|
|
u"Did you forget to indent the action?"
|
|
u"".format(key, key2))
|
|
validator = ACTION_REGISTRY[key][0]
|
|
value[i] = {
|
|
CONF_ACTION_ID: cv.declare_variable_id(Action)(item[CONF_ACTION_ID]),
|
|
key: validator(item[key])
|
|
}
|
|
return value
|
|
|
|
|
|
ACTION_REGISTRY = ServiceRegistry()
|
|
|
|
# pylint: disable=invalid-name
|
|
DelayAction = esphomelib_ns.class_('DelayAction', Action, Component)
|
|
LambdaAction = esphomelib_ns.class_('LambdaAction', Action)
|
|
IfAction = esphomelib_ns.class_('IfAction', Action)
|
|
UpdateComponentAction = esphomelib_ns.class_('UpdateComponentAction', Action)
|
|
Automation = esphomelib_ns.class_('Automation')
|
|
|
|
Condition = esphomelib_ns.class_('Condition')
|
|
AndCondition = esphomelib_ns.class_('AndCondition', Condition)
|
|
OrCondition = esphomelib_ns.class_('OrCondition', Condition)
|
|
RangeCondition = esphomelib_ns.class_('RangeCondition', Condition)
|
|
LambdaCondition = esphomelib_ns.class_('LambdaCondition', Condition)
|
|
|
|
CONDITIONS_SCHEMA = vol.All(cv.ensure_list, [cv.templatable({
|
|
cv.GenerateID(CONF_CONDITION_ID): cv.declare_variable_id(Condition),
|
|
vol.Optional(CONF_AND): validate_recursive_condition,
|
|
vol.Optional(CONF_OR): validate_recursive_condition,
|
|
vol.Optional(CONF_RANGE): vol.All(vol.Schema({
|
|
vol.Optional(CONF_ABOVE): vol.Coerce(float),
|
|
vol.Optional(CONF_BELOW): vol.Coerce(float),
|
|
}), cv.has_at_least_one_key(CONF_ABOVE, CONF_BELOW)),
|
|
vol.Optional(CONF_LAMBDA): cv.lambda_,
|
|
})])
|
|
|
|
|
|
def validate_automation(extra_schema=None, extra_validators=None, single=False):
|
|
schema = AUTOMATION_SCHEMA.extend(extra_schema or {})
|
|
|
|
def validator_(value):
|
|
if isinstance(value, list):
|
|
try:
|
|
# First try as a sequence of actions
|
|
return [schema({CONF_THEN: value})]
|
|
except vol.Invalid as err:
|
|
# Next try as a sequence of automations
|
|
try:
|
|
return vol.Schema([schema])(value)
|
|
except vol.Invalid as err2:
|
|
if 'Unable to find action' in str(err):
|
|
raise err2
|
|
raise vol.MultipleInvalid([err, err2])
|
|
elif isinstance(value, dict):
|
|
if CONF_THEN in value:
|
|
return [schema(value)]
|
|
return [schema({CONF_THEN: value})]
|
|
# This should only happen with invalid configs, but let's have a nice error message.
|
|
return [schema(value)]
|
|
|
|
def validator(value):
|
|
value = validator_(value)
|
|
if extra_validators is not None:
|
|
value = vol.Schema([extra_validators])(value)
|
|
if single:
|
|
if len(value) != 1:
|
|
raise vol.Invalid("Cannot have more than 1 automation for templates")
|
|
return value[0]
|
|
return value
|
|
|
|
return validator
|
|
|
|
|
|
AUTOMATION_SCHEMA = vol.Schema({
|
|
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_variable_id(Trigger),
|
|
cv.GenerateID(CONF_AUTOMATION_ID): cv.declare_variable_id(Automation),
|
|
vol.Optional(CONF_IF): CONDITIONS_SCHEMA,
|
|
vol.Required(CONF_THEN): validate_recursive_action,
|
|
})
|
|
|
|
|
|
def build_condition(config, arg_type):
|
|
template_arg = TemplateArguments(arg_type)
|
|
if isinstance(config, core.Lambda):
|
|
for lambda_ in process_lambda(config, [(arg_type, 'x')]):
|
|
yield
|
|
yield LambdaCondition.new(template_arg, lambda_)
|
|
elif CONF_AND in config:
|
|
yield AndCondition.new(template_arg, build_conditions(config[CONF_AND], template_arg))
|
|
elif CONF_OR in config:
|
|
yield OrCondition.new(template_arg, build_conditions(config[CONF_OR], template_arg))
|
|
elif CONF_LAMBDA in config:
|
|
lambda_ = None
|
|
for lambda_ in process_lambda(config[CONF_LAMBDA], [(arg_type, 'x')]):
|
|
yield
|
|
yield LambdaCondition.new(template_arg, lambda_)
|
|
elif CONF_RANGE in config:
|
|
conf = config[CONF_RANGE]
|
|
rhs = RangeCondition.new(template_arg)
|
|
type = RangeCondition.template(template_arg)
|
|
condition = Pvariable(config[CONF_CONDITION_ID], rhs, type=type)
|
|
if CONF_ABOVE in conf:
|
|
for template_ in templatable(conf[CONF_ABOVE], arg_type, float_):
|
|
yield
|
|
condition.set_min(template_)
|
|
if CONF_BELOW in conf:
|
|
for template_ in templatable(conf[CONF_BELOW], arg_type, float_):
|
|
yield
|
|
condition.set_max(template_)
|
|
yield condition
|
|
else:
|
|
raise EsphomeyamlError(u"Unsupported condition {}".format(config))
|
|
|
|
|
|
def build_conditions(config, arg_type):
|
|
conditions = []
|
|
for conf in config:
|
|
condition = None
|
|
for condition in build_condition(conf, arg_type):
|
|
yield None
|
|
conditions.append(condition)
|
|
yield ArrayInitializer(*conditions)
|
|
|
|
|
|
DELAY_ACTION_SCHEMA = cv.templatable(cv.positive_time_period_milliseconds)
|
|
|
|
|
|
@ACTION_REGISTRY.register(CONF_DELAY, DELAY_ACTION_SCHEMA)
|
|
def delay_action_to_code(config, action_id, arg_type):
|
|
template_arg = TemplateArguments(arg_type)
|
|
rhs = App.register_component(DelayAction.new(template_arg))
|
|
type = DelayAction.template(template_arg)
|
|
action = Pvariable(action_id, rhs, type=type)
|
|
for template_ in templatable(config, arg_type, uint32):
|
|
yield
|
|
add(action.set_delay(template_))
|
|
yield action
|
|
|
|
|
|
IF_ACTION_SCHEMA = vol.All({
|
|
vol.Required(CONF_CONDITION): validate_recursive_condition,
|
|
vol.Optional(CONF_THEN): validate_recursive_action,
|
|
vol.Optional(CONF_ELSE): validate_recursive_action,
|
|
}, cv.has_at_least_one_key(CONF_THEN, CONF_ELSE))
|
|
|
|
|
|
@ACTION_REGISTRY.register(CONF_IF, IF_ACTION_SCHEMA)
|
|
def if_action_to_code(config, action_id, arg_type):
|
|
template_arg = TemplateArguments(arg_type)
|
|
for conditions in build_conditions(config[CONF_CONDITION], arg_type):
|
|
yield None
|
|
rhs = IfAction.new(template_arg, conditions)
|
|
type = IfAction.template(template_arg)
|
|
action = Pvariable(action_id, rhs, type=type)
|
|
if CONF_THEN in config:
|
|
for actions in build_actions(config[CONF_THEN], arg_type):
|
|
yield None
|
|
add(action.add_then(actions))
|
|
if CONF_ELSE in config:
|
|
for actions in build_actions(config[CONF_ELSE], arg_type):
|
|
yield None
|
|
add(action.add_else(actions))
|
|
yield action
|
|
|
|
|
|
LAMBDA_ACTION_SCHEMA = cv.lambda_
|
|
|
|
|
|
@ACTION_REGISTRY.register(CONF_LAMBDA, LAMBDA_ACTION_SCHEMA)
|
|
def lambda_action_to_code(config, action_id, arg_type):
|
|
template_arg = TemplateArguments(arg_type)
|
|
for lambda_ in process_lambda(config, [(arg_type, 'x')]):
|
|
yield None
|
|
rhs = LambdaAction.new(template_arg, lambda_)
|
|
type = LambdaAction.template(template_arg)
|
|
yield Pvariable(action_id, rhs, type=type)
|
|
|
|
|
|
CONF_COMPONENT_UPDATE = 'component.update'
|
|
COMPONENT_UPDATE_ACTION_SCHEMA = maybe_simple_id({
|
|
vol.Required(CONF_ID): cv.use_variable_id(PollingComponent),
|
|
})
|
|
|
|
|
|
@ACTION_REGISTRY.register(CONF_COMPONENT_UPDATE, COMPONENT_UPDATE_ACTION_SCHEMA)
|
|
def component_update_action_to_code(config, action_id, arg_type):
|
|
template_arg = TemplateArguments(arg_type)
|
|
for var in get_variable(config[CONF_ID]):
|
|
yield None
|
|
rhs = UpdateComponentAction.new(var)
|
|
type = UpdateComponentAction.template(template_arg)
|
|
yield Pvariable(action_id, rhs, type=type)
|
|
|
|
|
|
def build_action(full_config, arg_type):
|
|
action_id = full_config[CONF_ACTION_ID]
|
|
key, config = next((k, v) for k, v in full_config.items() if k in ACTION_REGISTRY)
|
|
|
|
builder = ACTION_REGISTRY[key][1]
|
|
for result in builder(config, action_id, arg_type):
|
|
yield None
|
|
yield result
|
|
|
|
|
|
def build_actions(config, arg_type):
|
|
actions = []
|
|
for conf in config:
|
|
action = None
|
|
for action in build_action(conf, arg_type):
|
|
yield None
|
|
actions.append(action)
|
|
yield ArrayInitializer(*actions, multiline=False)
|
|
|
|
|
|
def build_automation_(trigger, arg_type, config):
|
|
rhs = App.make_automation(TemplateArguments(arg_type), trigger)
|
|
type = Automation.template(arg_type)
|
|
obj = Pvariable(config[CONF_AUTOMATION_ID], rhs, type=type)
|
|
if CONF_IF in config:
|
|
conditions = None
|
|
for conditions in build_conditions(config[CONF_IF], arg_type):
|
|
yield None
|
|
add(obj.add_conditions(conditions))
|
|
actions = None
|
|
for actions in build_actions(config[CONF_THEN], arg_type):
|
|
yield None
|
|
add(obj.add_actions(actions))
|
|
yield obj
|
|
|
|
|
|
def build_automation(trigger, arg_type, config):
|
|
CORE.add_job(build_automation_, trigger, arg_type, config)
|