mirror of
https://github.com/esphome/esphome.git
synced 2024-12-22 16:37:52 +01:00
New vscode schema gen (#3336)
This commit is contained in:
parent
9de61fcf58
commit
05dc97099a
@ -262,21 +262,16 @@ async def repeat_action_to_code(config, action_id, template_arg, args):
|
|||||||
return var
|
return var
|
||||||
|
|
||||||
|
|
||||||
def validate_wait_until(value):
|
_validate_wait_until = cv.maybe_simple_value(
|
||||||
schema = cv.Schema(
|
|
||||||
{
|
{
|
||||||
cv.Required(CONF_CONDITION): validate_potentially_and_condition,
|
cv.Required(CONF_CONDITION): validate_potentially_and_condition,
|
||||||
cv.Optional(CONF_TIMEOUT): cv.templatable(
|
cv.Optional(CONF_TIMEOUT): cv.templatable(cv.positive_time_period_milliseconds),
|
||||||
cv.positive_time_period_milliseconds
|
},
|
||||||
),
|
key=CONF_CONDITION,
|
||||||
}
|
)
|
||||||
)
|
|
||||||
if isinstance(value, dict) and CONF_CONDITION in value:
|
|
||||||
return schema(value)
|
|
||||||
return validate_wait_until({CONF_CONDITION: value})
|
|
||||||
|
|
||||||
|
|
||||||
@register_action("wait_until", WaitUntilAction, validate_wait_until)
|
@register_action("wait_until", WaitUntilAction, _validate_wait_until)
|
||||||
async def wait_until_action_to_code(config, action_id, template_arg, args):
|
async def wait_until_action_to_code(config, action_id, template_arg, args):
|
||||||
conditions = await build_condition(config[CONF_CONDITION], template_arg, args)
|
conditions = await build_condition(config[CONF_CONDITION], template_arg, args)
|
||||||
var = cg.new_Pvariable(action_id, template_arg, conditions)
|
var = cg.new_Pvariable(action_id, template_arg, conditions)
|
||||||
|
@ -203,15 +203,6 @@ async def to_code(config):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def maybe_simple_message(schema):
|
|
||||||
def validator(value):
|
|
||||||
if isinstance(value, dict):
|
|
||||||
return cv.Schema(schema)(value)
|
|
||||||
return cv.Schema(schema)({CONF_FORMAT: value})
|
|
||||||
|
|
||||||
return validator
|
|
||||||
|
|
||||||
|
|
||||||
def validate_printf(value):
|
def validate_printf(value):
|
||||||
# https://stackoverflow.com/questions/30011379/how-can-i-parse-a-c-format-string-in-python
|
# https://stackoverflow.com/questions/30011379/how-can-i-parse-a-c-format-string-in-python
|
||||||
cfmt = r"""
|
cfmt = r"""
|
||||||
@ -234,7 +225,7 @@ def validate_printf(value):
|
|||||||
|
|
||||||
CONF_LOGGER_LOG = "logger.log"
|
CONF_LOGGER_LOG = "logger.log"
|
||||||
LOGGER_LOG_ACTION_SCHEMA = cv.All(
|
LOGGER_LOG_ACTION_SCHEMA = cv.All(
|
||||||
maybe_simple_message(
|
cv.maybe_simple_value(
|
||||||
{
|
{
|
||||||
cv.Required(CONF_FORMAT): cv.string,
|
cv.Required(CONF_FORMAT): cv.string,
|
||||||
cv.Optional(CONF_ARGS, default=list): cv.ensure_list(cv.lambda_),
|
cv.Optional(CONF_ARGS, default=list): cv.ensure_list(cv.lambda_),
|
||||||
@ -242,9 +233,10 @@ LOGGER_LOG_ACTION_SCHEMA = cv.All(
|
|||||||
*LOG_LEVEL_TO_ESP_LOG, upper=True
|
*LOG_LEVEL_TO_ESP_LOG, upper=True
|
||||||
),
|
),
|
||||||
cv.Optional(CONF_TAG, default="main"): cv.string,
|
cv.Optional(CONF_TAG, default="main"): cv.string,
|
||||||
}
|
},
|
||||||
),
|
|
||||||
validate_printf,
|
validate_printf,
|
||||||
|
key=CONF_FORMAT,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -71,9 +71,9 @@ SENSOR_VALUE_TYPE = {
|
|||||||
"S_DWORD": SensorValueType.S_DWORD,
|
"S_DWORD": SensorValueType.S_DWORD,
|
||||||
"S_DWORD_R": SensorValueType.S_DWORD_R,
|
"S_DWORD_R": SensorValueType.S_DWORD_R,
|
||||||
"U_QWORD": SensorValueType.U_QWORD,
|
"U_QWORD": SensorValueType.U_QWORD,
|
||||||
"U_QWORDU_R": SensorValueType.U_QWORD_R,
|
"U_QWORD_R": SensorValueType.U_QWORD_R,
|
||||||
"S_QWORD": SensorValueType.S_QWORD,
|
"S_QWORD": SensorValueType.S_QWORD,
|
||||||
"U_QWORD_R": SensorValueType.S_QWORD_R,
|
"S_QWORD_R": SensorValueType.S_QWORD_R,
|
||||||
"FP32": SensorValueType.FP32,
|
"FP32": SensorValueType.FP32,
|
||||||
"FP32_R": SensorValueType.FP32_R,
|
"FP32_R": SensorValueType.FP32_R,
|
||||||
}
|
}
|
||||||
@ -87,9 +87,9 @@ TYPE_REGISTER_MAP = {
|
|||||||
"S_DWORD": 2,
|
"S_DWORD": 2,
|
||||||
"S_DWORD_R": 2,
|
"S_DWORD_R": 2,
|
||||||
"U_QWORD": 4,
|
"U_QWORD": 4,
|
||||||
"U_QWORDU_R": 4,
|
|
||||||
"S_QWORD": 4,
|
|
||||||
"U_QWORD_R": 4,
|
"U_QWORD_R": 4,
|
||||||
|
"S_QWORD": 4,
|
||||||
|
"S_QWORD_R": 4,
|
||||||
"FP32": 2,
|
"FP32": 2,
|
||||||
"FP32_R": 2,
|
"FP32_R": 2,
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,6 @@ import esphome.codegen as cg
|
|||||||
import esphome.config_validation as cv
|
import esphome.config_validation as cv
|
||||||
from esphome.components import select
|
from esphome.components import select
|
||||||
from esphome.const import CONF_ADDRESS, CONF_ID, CONF_LAMBDA, CONF_OPTIMISTIC
|
from esphome.const import CONF_ADDRESS, CONF_ID, CONF_LAMBDA, CONF_OPTIMISTIC
|
||||||
from esphome.jsonschema import jschema_composite
|
|
||||||
|
|
||||||
from .. import (
|
from .. import (
|
||||||
SENSOR_VALUE_TYPE,
|
SENSOR_VALUE_TYPE,
|
||||||
@ -30,7 +29,6 @@ ModbusSelect = modbus_controller_ns.class_(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@jschema_composite
|
|
||||||
def ensure_option_map():
|
def ensure_option_map():
|
||||||
def validator(value):
|
def validator(value):
|
||||||
cv.check_not_templatable(value)
|
cv.check_not_templatable(value)
|
||||||
|
@ -58,7 +58,7 @@ from esphome.core import (
|
|||||||
)
|
)
|
||||||
from esphome.helpers import list_starts_with, add_class_to_obj
|
from esphome.helpers import list_starts_with, add_class_to_obj
|
||||||
from esphome.jsonschema import (
|
from esphome.jsonschema import (
|
||||||
jschema_composite,
|
jschema_list,
|
||||||
jschema_extractor,
|
jschema_extractor,
|
||||||
jschema_registry,
|
jschema_registry,
|
||||||
jschema_typed,
|
jschema_typed,
|
||||||
@ -327,7 +327,7 @@ def boolean(value):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@jschema_composite
|
@jschema_list
|
||||||
def ensure_list(*validators):
|
def ensure_list(*validators):
|
||||||
"""Validate this configuration option to be a list.
|
"""Validate this configuration option to be a list.
|
||||||
|
|
||||||
@ -494,7 +494,11 @@ def templatable(other_validators):
|
|||||||
"""
|
"""
|
||||||
schema = Schema(other_validators)
|
schema = Schema(other_validators)
|
||||||
|
|
||||||
|
@jschema_extractor("templatable")
|
||||||
def validator(value):
|
def validator(value):
|
||||||
|
# pylint: disable=comparison-with-callable
|
||||||
|
if value == jschema_extractor:
|
||||||
|
return other_validators
|
||||||
if isinstance(value, Lambda):
|
if isinstance(value, Lambda):
|
||||||
return returning_lambda(value)
|
return returning_lambda(value)
|
||||||
if isinstance(other_validators, dict):
|
if isinstance(other_validators, dict):
|
||||||
@ -1546,7 +1550,7 @@ def validate_registry(name, registry):
|
|||||||
return ensure_list(validate_registry_entry(name, registry))
|
return ensure_list(validate_registry_entry(name, registry))
|
||||||
|
|
||||||
|
|
||||||
@jschema_composite
|
@jschema_list
|
||||||
def maybe_simple_value(*validators, **kwargs):
|
def maybe_simple_value(*validators, **kwargs):
|
||||||
key = kwargs.pop("key", CONF_VALUE)
|
key = kwargs.pop("key", CONF_VALUE)
|
||||||
validator = All(*validators)
|
validator = All(*validators)
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
"""Helpers to retrieve schema from voluptuous validators.
|
"""Helpers to retrieve schema from voluptuous validators.
|
||||||
|
|
||||||
These are a helper decorators to help get schema from some
|
These are a helper decorators to help get schema from some
|
||||||
components which uses volutuous in a way where validation
|
components which uses voluptuous in a way where validation
|
||||||
is hidden in local functions
|
is hidden in local functions
|
||||||
These decorators should not modify at all what the functions
|
These decorators should not modify at all what the functions
|
||||||
originally do.
|
originally do.
|
||||||
@ -24,7 +24,7 @@ def jschema_extractor(validator_name):
|
|||||||
if EnableJsonSchemaCollect:
|
if EnableJsonSchemaCollect:
|
||||||
|
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
hidden_schemas[str(func)] = validator_name
|
hidden_schemas[repr(func)] = validator_name
|
||||||
return func
|
return func
|
||||||
|
|
||||||
return decorator
|
return decorator
|
||||||
@ -41,7 +41,7 @@ def jschema_extended(func):
|
|||||||
def decorate(*args, **kwargs):
|
def decorate(*args, **kwargs):
|
||||||
ret = func(*args, **kwargs)
|
ret = func(*args, **kwargs)
|
||||||
assert len(args) == 2
|
assert len(args) == 2
|
||||||
extended_schemas[str(ret)] = args
|
extended_schemas[repr(ret)] = args
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
return decorate
|
return decorate
|
||||||
@ -49,13 +49,13 @@ def jschema_extended(func):
|
|||||||
return func
|
return func
|
||||||
|
|
||||||
|
|
||||||
def jschema_composite(func):
|
def jschema_list(func):
|
||||||
if EnableJsonSchemaCollect:
|
if EnableJsonSchemaCollect:
|
||||||
|
|
||||||
def decorate(*args, **kwargs):
|
def decorate(*args, **kwargs):
|
||||||
ret = func(*args, **kwargs)
|
ret = func(*args, **kwargs)
|
||||||
# args length might be 2, but 2nd is always validator
|
# args length might be 2, but 2nd is always validator
|
||||||
list_schemas[str(ret)] = args
|
list_schemas[repr(ret)] = args
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
return decorate
|
return decorate
|
||||||
@ -67,7 +67,7 @@ def jschema_registry(registry):
|
|||||||
if EnableJsonSchemaCollect:
|
if EnableJsonSchemaCollect:
|
||||||
|
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
registry_schemas[str(func)] = registry
|
registry_schemas[repr(func)] = registry
|
||||||
return func
|
return func
|
||||||
|
|
||||||
return decorator
|
return decorator
|
||||||
@ -83,7 +83,7 @@ def jschema_typed(func):
|
|||||||
|
|
||||||
def decorate(*args, **kwargs):
|
def decorate(*args, **kwargs):
|
||||||
ret = func(*args, **kwargs)
|
ret = func(*args, **kwargs)
|
||||||
typed_schemas[str(ret)] = (args, kwargs)
|
typed_schemas[repr(ret)] = (args, kwargs)
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
return decorate
|
return decorate
|
||||||
|
@ -70,7 +70,7 @@ def add_definition_array_or_single_object(ref):
|
|||||||
def add_core():
|
def add_core():
|
||||||
from esphome.core.config import CONFIG_SCHEMA
|
from esphome.core.config import CONFIG_SCHEMA
|
||||||
|
|
||||||
base_props["esphome"] = get_jschema("esphome", CONFIG_SCHEMA.schema)
|
base_props["esphome"] = get_jschema("esphome", CONFIG_SCHEMA)
|
||||||
|
|
||||||
|
|
||||||
def add_buses():
|
def add_buses():
|
||||||
@ -216,7 +216,7 @@ def add_components():
|
|||||||
add_module_registries(domain, c.module)
|
add_module_registries(domain, c.module)
|
||||||
add_module_schemas(domain, c.module)
|
add_module_schemas(domain, c.module)
|
||||||
|
|
||||||
# need first to iterate all platforms then iteate components
|
# need first to iterate all platforms then iterate components
|
||||||
# a platform component can have other components as properties,
|
# a platform component can have other components as properties,
|
||||||
# e.g. climate components usually have a temperature sensor
|
# e.g. climate components usually have a temperature sensor
|
||||||
|
|
||||||
@ -325,7 +325,9 @@ def get_entry(parent_key, vschema):
|
|||||||
if DUMP_COMMENTS:
|
if DUMP_COMMENTS:
|
||||||
entry[JSC_COMMENT] = "entry: " + parent_key + "/" + str(vschema)
|
entry[JSC_COMMENT] = "entry: " + parent_key + "/" + str(vschema)
|
||||||
|
|
||||||
if isinstance(vschema, list):
|
if isinstance(vschema, dict):
|
||||||
|
entry = {"what": "is_this"}
|
||||||
|
elif isinstance(vschema, list):
|
||||||
ref = get_jschema(parent_key + "[]", vschema[0])
|
ref = get_jschema(parent_key + "[]", vschema[0])
|
||||||
entry = {"type": "array", "items": ref}
|
entry = {"type": "array", "items": ref}
|
||||||
elif isinstance(vschema, schema_type) and hasattr(vschema, "schema"):
|
elif isinstance(vschema, schema_type) and hasattr(vschema, "schema"):
|
||||||
@ -387,8 +389,10 @@ def get_entry(parent_key, vschema):
|
|||||||
|
|
||||||
v = vschema(None)
|
v = vschema(None)
|
||||||
if isinstance(v, ID):
|
if isinstance(v, ID):
|
||||||
if v.type.base != "script::Script" and (
|
if (
|
||||||
v.type.inherits_from(Trigger) or v.type == Automation
|
v.type.base != "script::Script"
|
||||||
|
and v.type.base != "switch_::Switch"
|
||||||
|
and (v.type.inherits_from(Trigger) or v.type == Automation)
|
||||||
):
|
):
|
||||||
return None
|
return None
|
||||||
entry = {"type": "string", "id_type": v.type.base}
|
entry = {"type": "string", "id_type": v.type.base}
|
||||||
@ -410,6 +414,8 @@ def default_schema():
|
|||||||
|
|
||||||
|
|
||||||
def is_default_schema(jschema):
|
def is_default_schema(jschema):
|
||||||
|
if jschema is None:
|
||||||
|
return False
|
||||||
if is_ref(jschema):
|
if is_ref(jschema):
|
||||||
jschema = unref(jschema)
|
jschema = unref(jschema)
|
||||||
if not jschema:
|
if not jschema:
|
||||||
@ -425,6 +431,9 @@ def get_jschema(path, vschema, create_return_ref=True):
|
|||||||
|
|
||||||
jschema = convert_schema(path, vschema)
|
jschema = convert_schema(path, vschema)
|
||||||
|
|
||||||
|
if jschema is None:
|
||||||
|
return None
|
||||||
|
|
||||||
if is_ref(jschema):
|
if is_ref(jschema):
|
||||||
# this can happen when returned extended
|
# this can happen when returned extended
|
||||||
# schemas where all properties found in previous extended schema
|
# schemas where all properties found in previous extended schema
|
||||||
@ -450,6 +459,9 @@ def get_schema_str(vschema):
|
|||||||
|
|
||||||
|
|
||||||
def create_ref(name, vschema, jschema):
|
def create_ref(name, vschema, jschema):
|
||||||
|
if jschema is None:
|
||||||
|
raise ValueError("Cannot create a ref with null jschema for " + name)
|
||||||
|
|
||||||
if name in schema_names:
|
if name in schema_names:
|
||||||
raise ValueError("Not supported")
|
raise ValueError("Not supported")
|
||||||
|
|
||||||
@ -523,6 +535,15 @@ def convert_schema(path, vschema, un_extend=True):
|
|||||||
extended = ejs.extended_schemas.get(str(vschema))
|
extended = ejs.extended_schemas.get(str(vschema))
|
||||||
if extended:
|
if extended:
|
||||||
lhs = get_jschema(path, extended[0], False)
|
lhs = get_jschema(path, extended[0], False)
|
||||||
|
|
||||||
|
# The midea actions are extending an empty schema (resulted in the templatize not templatizing anything)
|
||||||
|
# this causes a recursion in that this extended looks the same in extended schema as the extended[1]
|
||||||
|
if ejs.extended_schemas.get(str(vschema)) == ejs.extended_schemas.get(
|
||||||
|
str(extended[1])
|
||||||
|
):
|
||||||
|
assert path.startswith("midea_ac")
|
||||||
|
return convert_schema(path, extended[1], False)
|
||||||
|
|
||||||
rhs = get_jschema(path, extended[1], False)
|
rhs = get_jschema(path, extended[1], False)
|
||||||
|
|
||||||
# check if we are not merging properties which are already in base component
|
# check if we are not merging properties which are already in base component
|
||||||
@ -567,6 +588,8 @@ def convert_schema(path, vschema, un_extend=True):
|
|||||||
# we should take the valid schema,
|
# we should take the valid schema,
|
||||||
# commonly all is used to validate a schema, and then a function which
|
# commonly all is used to validate a schema, and then a function which
|
||||||
# is not a schema es also given, get_schema will then return a default_schema()
|
# is not a schema es also given, get_schema will then return a default_schema()
|
||||||
|
if v == dict:
|
||||||
|
continue # this is a dict in the SCHEMA of packages
|
||||||
val_schema = get_jschema(path, v, False)
|
val_schema = get_jschema(path, v, False)
|
||||||
if is_default_schema(val_schema):
|
if is_default_schema(val_schema):
|
||||||
if not output:
|
if not output:
|
||||||
@ -673,6 +696,11 @@ def add_pin_registry():
|
|||||||
|
|
||||||
for mode in ("INPUT", "OUTPUT"):
|
for mode in ("INPUT", "OUTPUT"):
|
||||||
schema_name = f"PIN.GPIO_FULL_{mode}_PIN_SCHEMA"
|
schema_name = f"PIN.GPIO_FULL_{mode}_PIN_SCHEMA"
|
||||||
|
|
||||||
|
# TODO: get pin definitions properly
|
||||||
|
if schema_name not in definitions:
|
||||||
|
definitions[schema_name] = {"type": ["object", "null"], JSC_PROPERTIES: {}}
|
||||||
|
|
||||||
internal = definitions[schema_name]
|
internal = definitions[schema_name]
|
||||||
definitions[schema_name]["additionalItems"] = False
|
definitions[schema_name]["additionalItems"] = False
|
||||||
definitions[f"PIN.{mode}_INTERNAL"] = internal
|
definitions[f"PIN.{mode}_INTERNAL"] = internal
|
||||||
@ -683,9 +711,8 @@ def add_pin_registry():
|
|||||||
definitions[schema_name] = {"oneOf": schemas, "type": ["string", "object"]}
|
definitions[schema_name] = {"oneOf": schemas, "type": ["string", "object"]}
|
||||||
|
|
||||||
for k, v in pin_registry.items():
|
for k, v in pin_registry.items():
|
||||||
pin_jschema = get_jschema(
|
if isinstance(v[1], vol.validators.All):
|
||||||
f"PIN.{mode}_" + k, v[1][0 if mode == "OUTPUT" else 1]
|
pin_jschema = get_jschema(f"PIN.{mode}_" + k, v[1])
|
||||||
)
|
|
||||||
if unref(pin_jschema):
|
if unref(pin_jschema):
|
||||||
pin_jschema["required"] = [k]
|
pin_jschema["required"] = [k]
|
||||||
schemas.append(pin_jschema)
|
schemas.append(pin_jschema)
|
||||||
@ -730,9 +757,9 @@ def dump_schema():
|
|||||||
cv.valid_name,
|
cv.valid_name,
|
||||||
cv.hex_int,
|
cv.hex_int,
|
||||||
cv.hex_int_range,
|
cv.hex_int_range,
|
||||||
pins.output_pin,
|
pins.gpio_output_pin_schema,
|
||||||
pins.input_pin,
|
pins.gpio_input_pin_schema,
|
||||||
pins.input_pullup_pin,
|
pins.gpio_input_pullup_pin_schema,
|
||||||
cv.float_with_unit,
|
cv.float_with_unit,
|
||||||
cv.subscribe_topic,
|
cv.subscribe_topic,
|
||||||
cv.publish_topic,
|
cv.publish_topic,
|
||||||
@ -753,12 +780,12 @@ def dump_schema():
|
|||||||
|
|
||||||
for v in [pins.gpio_input_pin_schema, pins.gpio_input_pullup_pin_schema]:
|
for v in [pins.gpio_input_pin_schema, pins.gpio_input_pullup_pin_schema]:
|
||||||
schema_registry[v] = get_ref("PIN.GPIO_FULL_INPUT_PIN_SCHEMA")
|
schema_registry[v] = get_ref("PIN.GPIO_FULL_INPUT_PIN_SCHEMA")
|
||||||
for v in [pins.internal_gpio_input_pin_schema, pins.input_pin]:
|
for v in [pins.internal_gpio_input_pin_schema, pins.gpio_input_pin_schema]:
|
||||||
schema_registry[v] = get_ref("PIN.INPUT_INTERNAL")
|
schema_registry[v] = get_ref("PIN.INPUT_INTERNAL")
|
||||||
|
|
||||||
for v in [pins.gpio_output_pin_schema, pins.internal_gpio_output_pin_schema]:
|
for v in [pins.gpio_output_pin_schema, pins.internal_gpio_output_pin_schema]:
|
||||||
schema_registry[v] = get_ref("PIN.GPIO_FULL_OUTPUT_PIN_SCHEMA")
|
schema_registry[v] = get_ref("PIN.GPIO_FULL_OUTPUT_PIN_SCHEMA")
|
||||||
for v in [pins.internal_gpio_output_pin_schema, pins.output_pin]:
|
for v in [pins.internal_gpio_output_pin_schema, pins.gpio_output_pin_schema]:
|
||||||
schema_registry[v] = get_ref("PIN.OUTPUT_INTERNAL")
|
schema_registry[v] = get_ref("PIN.OUTPUT_INTERNAL")
|
||||||
|
|
||||||
add_module_schemas("CONFIG", cv)
|
add_module_schemas("CONFIG", cv)
|
||||||
|
813
script/build_language_schema.py
Normal file
813
script/build_language_schema.py
Normal file
@ -0,0 +1,813 @@
|
|||||||
|
import inspect
|
||||||
|
import json
|
||||||
|
import argparse
|
||||||
|
from operator import truediv
|
||||||
|
import os
|
||||||
|
import voluptuous as vol
|
||||||
|
|
||||||
|
# NOTE: Cannot import other esphome components globally as a modification in jsonschema
|
||||||
|
# is needed before modules are loaded
|
||||||
|
import esphome.jsonschema as ejs
|
||||||
|
|
||||||
|
ejs.EnableJsonSchemaCollect = True
|
||||||
|
|
||||||
|
# schema format:
|
||||||
|
# Schemas are splitted in several files in json format, one for core stuff, one for each platform (sensor, binary_sensor, etc) and
|
||||||
|
# one for each component (dallas, sim800l, etc.) component can have schema for root component/hub and also for platform component,
|
||||||
|
# e.g. dallas has hub component which has pin and then has the sensor platform which has sensor name, index, etc.
|
||||||
|
# When files are loaded they are merged in a single object.
|
||||||
|
# The root format is
|
||||||
|
|
||||||
|
S_CONFIG_VAR = "config_var"
|
||||||
|
S_CONFIG_VARS = "config_vars"
|
||||||
|
S_CONFIG_SCHEMA = "CONFIG_SCHEMA"
|
||||||
|
S_COMPONENT = "component"
|
||||||
|
S_COMPONENTS = "components"
|
||||||
|
S_PLATFORMS = "platforms"
|
||||||
|
S_SCHEMA = "schema"
|
||||||
|
S_SCHEMAS = "schemas"
|
||||||
|
S_EXTENDS = "extends"
|
||||||
|
S_TYPE = "type"
|
||||||
|
S_NAME = "name"
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument(
|
||||||
|
"--output-path", default=".", help="Output path", type=os.path.abspath
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
DUMP_RAW = False
|
||||||
|
DUMP_UNKNOWN = False
|
||||||
|
DUMP_PATH = False
|
||||||
|
JSON_DUMP_PRETTY = True
|
||||||
|
|
||||||
|
# store here dynamic load of esphome components
|
||||||
|
components = {}
|
||||||
|
|
||||||
|
schema_core = {}
|
||||||
|
|
||||||
|
# output is where all is built
|
||||||
|
output = {"core": schema_core}
|
||||||
|
# The full generated output is here here
|
||||||
|
schema_full = {"components": output}
|
||||||
|
|
||||||
|
# A string, string map, key is the str(schema) and value is
|
||||||
|
# a tuple, first element is the schema reference and second is the schema path given, the schema reference is needed to test as different schemas have same key
|
||||||
|
known_schemas = {}
|
||||||
|
|
||||||
|
solve_registry = []
|
||||||
|
|
||||||
|
|
||||||
|
def get_component_names():
|
||||||
|
# return [
|
||||||
|
# "esphome",
|
||||||
|
# "esp32",
|
||||||
|
# "esp8266",
|
||||||
|
# "logger",
|
||||||
|
# "sensor",
|
||||||
|
# "remote_receiver",
|
||||||
|
# "binary_sensor",
|
||||||
|
# ]
|
||||||
|
from esphome.loader import CORE_COMPONENTS_PATH
|
||||||
|
|
||||||
|
component_names = ["esphome", "sensor"]
|
||||||
|
|
||||||
|
for d in os.listdir(CORE_COMPONENTS_PATH):
|
||||||
|
if not d.startswith("__") and os.path.isdir(
|
||||||
|
os.path.join(CORE_COMPONENTS_PATH, d)
|
||||||
|
):
|
||||||
|
if d not in component_names:
|
||||||
|
component_names.append(d)
|
||||||
|
|
||||||
|
return component_names
|
||||||
|
|
||||||
|
|
||||||
|
def load_components():
|
||||||
|
from esphome.config import get_component
|
||||||
|
|
||||||
|
for domain in get_component_names():
|
||||||
|
components[domain] = get_component(domain)
|
||||||
|
|
||||||
|
|
||||||
|
load_components()
|
||||||
|
|
||||||
|
# Import esphome after loading components (so schema is tracked)
|
||||||
|
# pylint: disable=wrong-import-position
|
||||||
|
import esphome.core as esphome_core
|
||||||
|
import esphome.config_validation as cv
|
||||||
|
from esphome import automation
|
||||||
|
from esphome import pins
|
||||||
|
from esphome.components import remote_base
|
||||||
|
from esphome.const import CONF_TYPE
|
||||||
|
from esphome.loader import get_platform
|
||||||
|
from esphome.helpers import write_file_if_changed
|
||||||
|
from esphome.util import Registry
|
||||||
|
|
||||||
|
# pylint: enable=wrong-import-position
|
||||||
|
|
||||||
|
|
||||||
|
def write_file(name, obj):
|
||||||
|
full_path = os.path.join(args.output_path, name + ".json")
|
||||||
|
if JSON_DUMP_PRETTY:
|
||||||
|
json_str = json.dumps(obj, indent=2)
|
||||||
|
else:
|
||||||
|
json_str = json.dumps(obj, separators=(",", ":"))
|
||||||
|
write_file_if_changed(full_path, json_str)
|
||||||
|
print(f"Wrote {full_path}")
|
||||||
|
|
||||||
|
|
||||||
|
def register_module_schemas(key, module, manifest=None):
|
||||||
|
for name, schema in module_schemas(module):
|
||||||
|
register_known_schema(key, name, schema)
|
||||||
|
if (
|
||||||
|
manifest and manifest.multi_conf and S_CONFIG_SCHEMA in output[key][S_SCHEMAS]
|
||||||
|
): # not sure about 2nd part of the if, might be useless config (e.g. as3935)
|
||||||
|
output[key][S_SCHEMAS][S_CONFIG_SCHEMA]["is_list"] = True
|
||||||
|
|
||||||
|
|
||||||
|
def register_known_schema(module, name, schema):
|
||||||
|
if module not in output:
|
||||||
|
output[module] = {S_SCHEMAS: {}}
|
||||||
|
config = convert_config(schema, f"{module}/{name}")
|
||||||
|
if S_TYPE not in config:
|
||||||
|
print(f"Config var without type: {module}.{name}")
|
||||||
|
|
||||||
|
output[module][S_SCHEMAS][name] = config
|
||||||
|
repr_schema = repr(schema)
|
||||||
|
if repr_schema in known_schemas:
|
||||||
|
schema_info = known_schemas[repr_schema]
|
||||||
|
schema_info.append((schema, f"{module}.{name}"))
|
||||||
|
else:
|
||||||
|
known_schemas[repr_schema] = [(schema, f"{module}.{name}")]
|
||||||
|
|
||||||
|
|
||||||
|
def module_schemas(module):
|
||||||
|
# This should yield elements in order so extended schemas are resolved properly
|
||||||
|
# To do this we check on the source code where the symbol is seen first. Seems to work.
|
||||||
|
try:
|
||||||
|
module_str = inspect.getsource(module)
|
||||||
|
except TypeError:
|
||||||
|
# improv
|
||||||
|
module_str = ""
|
||||||
|
except OSError:
|
||||||
|
# some empty __init__ files
|
||||||
|
module_str = ""
|
||||||
|
schemas = {}
|
||||||
|
for m_attr_name in dir(module):
|
||||||
|
m_attr_obj = getattr(module, m_attr_name)
|
||||||
|
if isConvertibleSchema(m_attr_obj):
|
||||||
|
schemas[module_str.find(m_attr_name)] = [m_attr_name, m_attr_obj]
|
||||||
|
|
||||||
|
for pos in sorted(schemas.keys()):
|
||||||
|
yield schemas[pos]
|
||||||
|
|
||||||
|
|
||||||
|
found_registries = {}
|
||||||
|
|
||||||
|
# Pin validators keys are the functions in pin which validate the pins
|
||||||
|
pin_validators = {}
|
||||||
|
|
||||||
|
|
||||||
|
def add_pin_validators():
|
||||||
|
for m_attr_name in dir(pins):
|
||||||
|
if "gpio" in m_attr_name:
|
||||||
|
s = pin_validators[repr(getattr(pins, m_attr_name))] = {}
|
||||||
|
if "schema" in m_attr_name:
|
||||||
|
s["schema"] = True # else is just number
|
||||||
|
if "internal" in m_attr_name:
|
||||||
|
s["internal"] = True
|
||||||
|
if "input" in m_attr_name:
|
||||||
|
s["modes"] = ["input"]
|
||||||
|
elif "output" in m_attr_name:
|
||||||
|
s["modes"] = ["output"]
|
||||||
|
else:
|
||||||
|
s["modes"] = []
|
||||||
|
if "pullup" in m_attr_name:
|
||||||
|
s["modes"].append("pullup")
|
||||||
|
from esphome.components.adc import sensor as adc_sensor
|
||||||
|
|
||||||
|
pin_validators[repr(adc_sensor.validate_adc_pin)] = {
|
||||||
|
"internal": True,
|
||||||
|
"modes": ["input"],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def add_module_registries(domain, module):
|
||||||
|
for attr_name in dir(module):
|
||||||
|
attr_obj = getattr(module, attr_name)
|
||||||
|
if isinstance(attr_obj, Registry):
|
||||||
|
if attr_obj == automation.ACTION_REGISTRY:
|
||||||
|
reg_type = "action"
|
||||||
|
reg_domain = "core"
|
||||||
|
found_registries[repr(attr_obj)] = reg_type
|
||||||
|
elif attr_obj == automation.CONDITION_REGISTRY:
|
||||||
|
reg_type = "condition"
|
||||||
|
reg_domain = "core"
|
||||||
|
found_registries[repr(attr_obj)] = reg_type
|
||||||
|
else: # attr_name == "FILTER_REGISTRY":
|
||||||
|
reg_domain = domain
|
||||||
|
reg_type = attr_name.partition("_")[0].lower()
|
||||||
|
found_registries[repr(attr_obj)] = f"{domain}.{reg_type}"
|
||||||
|
|
||||||
|
for name in attr_obj.keys():
|
||||||
|
if "." not in name:
|
||||||
|
reg_entry_name = name
|
||||||
|
else:
|
||||||
|
parts = name.split(".")
|
||||||
|
if len(parts) == 2:
|
||||||
|
reg_domain = parts[0]
|
||||||
|
reg_entry_name = parts[1]
|
||||||
|
else:
|
||||||
|
reg_domain = ".".join([parts[1], parts[0]])
|
||||||
|
reg_entry_name = parts[2]
|
||||||
|
|
||||||
|
if reg_domain not in output:
|
||||||
|
output[reg_domain] = {}
|
||||||
|
if reg_type not in output[reg_domain]:
|
||||||
|
output[reg_domain][reg_type] = {}
|
||||||
|
output[reg_domain][reg_type][reg_entry_name] = convert_config(
|
||||||
|
attr_obj[name].schema, f"{reg_domain}/{reg_type}/{reg_entry_name}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# print(f"{domain} - {attr_name} - {name}")
|
||||||
|
|
||||||
|
|
||||||
|
def do_pins():
|
||||||
|
# do pin registries
|
||||||
|
pins_providers = schema_core["pins"] = []
|
||||||
|
for pin_registry in pins.PIN_SCHEMA_REGISTRY:
|
||||||
|
s = convert_config(
|
||||||
|
pins.PIN_SCHEMA_REGISTRY[pin_registry][1], f"pins/{pin_registry}"
|
||||||
|
)
|
||||||
|
if pin_registry not in output:
|
||||||
|
output[pin_registry] = {} # mcp23xxx does not create a component yet
|
||||||
|
output[pin_registry]["pin"] = s
|
||||||
|
pins_providers.append(pin_registry)
|
||||||
|
|
||||||
|
|
||||||
|
def do_esp32():
|
||||||
|
import esphome.components.esp32.boards as esp32_boards
|
||||||
|
|
||||||
|
setEnum(
|
||||||
|
output["esp32"]["schemas"]["CONFIG_SCHEMA"]["schema"]["config_vars"]["board"],
|
||||||
|
list(esp32_boards.BOARD_TO_VARIANT.keys()),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def do_esp8266():
|
||||||
|
import esphome.components.esp8266.boards as esp8266_boards
|
||||||
|
|
||||||
|
setEnum(
|
||||||
|
output["esp8266"]["schemas"]["CONFIG_SCHEMA"]["schema"]["config_vars"]["board"],
|
||||||
|
list(esp8266_boards.ESP8266_BOARD_PINS.keys()),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def fix_remote_receiver():
|
||||||
|
output["remote_receiver.binary_sensor"]["schemas"]["CONFIG_SCHEMA"] = {
|
||||||
|
"type": "schema",
|
||||||
|
"schema": {
|
||||||
|
"extends": ["binary_sensor.BINARY_SENSOR_SCHEMA", "core.COMPONENT_SCHEMA"],
|
||||||
|
"config_vars": output["remote_base"]["binary"],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def add_referenced_recursive(referenced_schemas, config_var, path, eat_schema=False):
|
||||||
|
assert (
|
||||||
|
S_CONFIG_VARS not in config_var and S_EXTENDS not in config_var
|
||||||
|
) # S_TYPE in cv or "key" in cv or len(cv) == 0
|
||||||
|
if (
|
||||||
|
config_var.get(S_TYPE) in ["schema", "trigger", "maybe"]
|
||||||
|
and S_SCHEMA in config_var
|
||||||
|
):
|
||||||
|
schema = config_var[S_SCHEMA]
|
||||||
|
for k, v in schema.get(S_CONFIG_VARS, {}).items():
|
||||||
|
if eat_schema:
|
||||||
|
new_path = path + [S_CONFIG_VARS, k]
|
||||||
|
else:
|
||||||
|
new_path = path + ["schema", S_CONFIG_VARS, k]
|
||||||
|
add_referenced_recursive(referenced_schemas, v, new_path)
|
||||||
|
for k in schema.get(S_EXTENDS, []):
|
||||||
|
if k not in referenced_schemas:
|
||||||
|
referenced_schemas[k] = [path]
|
||||||
|
else:
|
||||||
|
if path not in referenced_schemas[k]:
|
||||||
|
referenced_schemas[k].append(path)
|
||||||
|
|
||||||
|
s1 = get_str_path_schema(k)
|
||||||
|
p = k.split(".")
|
||||||
|
if len(p) == 3 and path[0] == f"{p[0]}.{p[1]}":
|
||||||
|
# special case for schema inside platforms
|
||||||
|
add_referenced_recursive(
|
||||||
|
referenced_schemas, s1, [path[0], "schemas", p[2]]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
add_referenced_recursive(
|
||||||
|
referenced_schemas, s1, [p[0], "schemas", p[1]]
|
||||||
|
)
|
||||||
|
elif config_var.get(S_TYPE) == "typed":
|
||||||
|
for tk, tv in config_var.get("types").items():
|
||||||
|
add_referenced_recursive(
|
||||||
|
referenced_schemas,
|
||||||
|
{
|
||||||
|
S_TYPE: S_SCHEMA,
|
||||||
|
S_SCHEMA: tv,
|
||||||
|
},
|
||||||
|
path + ["types", tk],
|
||||||
|
eat_schema=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_str_path_schema(strPath):
|
||||||
|
parts = strPath.split(".")
|
||||||
|
if len(parts) > 2:
|
||||||
|
parts[0] += "." + parts[1]
|
||||||
|
parts[1] = parts[2]
|
||||||
|
s1 = output.get(parts[0], {}).get(S_SCHEMAS, {}).get(parts[1], {})
|
||||||
|
return s1
|
||||||
|
|
||||||
|
|
||||||
|
def pop_str_path_schema(strPath):
|
||||||
|
parts = strPath.split(".")
|
||||||
|
if len(parts) > 2:
|
||||||
|
parts[0] += "." + parts[1]
|
||||||
|
parts[1] = parts[2]
|
||||||
|
output.get(parts[0], {}).get(S_SCHEMAS, {}).pop(parts[1])
|
||||||
|
|
||||||
|
|
||||||
|
def get_arr_path_schema(path):
|
||||||
|
s = output
|
||||||
|
for x in path:
|
||||||
|
s = s[x]
|
||||||
|
return s
|
||||||
|
|
||||||
|
|
||||||
|
def merge(source, destination):
|
||||||
|
"""
|
||||||
|
run me with nosetests --with-doctest file.py
|
||||||
|
|
||||||
|
>>> a = { 'first' : { 'all_rows' : { 'pass' : 'dog', 'number' : '1' } } }
|
||||||
|
>>> b = { 'first' : { 'all_rows' : { 'fail' : 'cat', 'number' : '5' } } }
|
||||||
|
>>> merge(b, a) == { 'first' : { 'all_rows' : { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } }
|
||||||
|
True
|
||||||
|
"""
|
||||||
|
for key, value in source.items():
|
||||||
|
if isinstance(value, dict):
|
||||||
|
# get node or create one
|
||||||
|
node = destination.setdefault(key, {})
|
||||||
|
merge(value, node)
|
||||||
|
else:
|
||||||
|
destination[key] = value
|
||||||
|
|
||||||
|
return destination
|
||||||
|
|
||||||
|
|
||||||
|
def shrink():
|
||||||
|
"""Shrink the extending schemas which has just an end type, e.g. at this point
|
||||||
|
ota / port is type schema with extended pointing to core.port, this should instead be
|
||||||
|
type number. core.port is number
|
||||||
|
|
||||||
|
This also fixes enums, as they are another schema and they are instead put in the same cv
|
||||||
|
"""
|
||||||
|
|
||||||
|
# referenced_schemas contains a dict, keys are all that are shown in extends: [] arrays, values are lists of paths that are pointing to that extend
|
||||||
|
# e.g. key: core.COMPONENT_SCHEMA has a lot of paths of config vars which extends this schema
|
||||||
|
|
||||||
|
pass_again = True
|
||||||
|
|
||||||
|
while pass_again:
|
||||||
|
pass_again = False
|
||||||
|
|
||||||
|
referenced_schemas = {}
|
||||||
|
|
||||||
|
for k, v in output.items():
|
||||||
|
for kv, vv in v.items():
|
||||||
|
if kv != "pin" and isinstance(vv, dict):
|
||||||
|
for kvv, vvv in vv.items():
|
||||||
|
add_referenced_recursive(referenced_schemas, vvv, [k, kv, kvv])
|
||||||
|
|
||||||
|
for x, paths in referenced_schemas.items():
|
||||||
|
if len(paths) == 1:
|
||||||
|
key_s = get_str_path_schema(x)
|
||||||
|
arr_s = get_arr_path_schema(paths[0])
|
||||||
|
# key_s |= arr_s
|
||||||
|
# key_s.pop(S_EXTENDS)
|
||||||
|
pass_again = True
|
||||||
|
if S_SCHEMA in arr_s:
|
||||||
|
if S_EXTENDS in arr_s[S_SCHEMA]:
|
||||||
|
arr_s[S_SCHEMA].pop(S_EXTENDS)
|
||||||
|
else:
|
||||||
|
print("expected extends here!" + x)
|
||||||
|
arr_s = merge(key_s, arr_s)
|
||||||
|
if arr_s[S_TYPE] == "enum":
|
||||||
|
arr_s.pop(S_SCHEMA)
|
||||||
|
else:
|
||||||
|
arr_s.pop(S_EXTENDS)
|
||||||
|
arr_s |= key_s[S_SCHEMA]
|
||||||
|
print(x)
|
||||||
|
|
||||||
|
# simple types should be spread on each component,
|
||||||
|
# for enums so far these are logger.is_log_level, cover.validate_cover_state and pulse_counter.sensor.COUNT_MODE_SCHEMA
|
||||||
|
# then for some reasons sensor filter registry falls here
|
||||||
|
# then are all simple types, integer and strings
|
||||||
|
for x, paths in referenced_schemas.items():
|
||||||
|
key_s = get_str_path_schema(x)
|
||||||
|
if key_s and key_s[S_TYPE] in ["enum", "registry", "integer", "string"]:
|
||||||
|
if key_s[S_TYPE] == "registry":
|
||||||
|
print("Spreading registry: " + x)
|
||||||
|
for target in paths:
|
||||||
|
target_s = get_arr_path_schema(target)
|
||||||
|
assert target_s[S_SCHEMA][S_EXTENDS] == [x]
|
||||||
|
target_s.pop(S_SCHEMA)
|
||||||
|
target_s |= key_s
|
||||||
|
if key_s[S_TYPE] in ["integer", "string"]:
|
||||||
|
target_s["data_type"] = x.split(".")[1]
|
||||||
|
# remove this dangling again
|
||||||
|
pop_str_path_schema(x)
|
||||||
|
elif not key_s:
|
||||||
|
for target in paths:
|
||||||
|
target_s = get_arr_path_schema(target)
|
||||||
|
assert target_s[S_SCHEMA][S_EXTENDS] == [x]
|
||||||
|
target_s.pop(S_SCHEMA)
|
||||||
|
target_s.pop(S_TYPE) # undefined
|
||||||
|
target_s["data_type"] = x.split(".")[1]
|
||||||
|
# remove this dangling again
|
||||||
|
pop_str_path_schema(x)
|
||||||
|
|
||||||
|
# remove dangling items (unreachable schemas)
|
||||||
|
for domain, domain_schemas in output.items():
|
||||||
|
for schema_name in list(domain_schemas.get(S_SCHEMAS, {}).keys()):
|
||||||
|
s = f"{domain}.{schema_name}"
|
||||||
|
if (
|
||||||
|
not s.endswith("." + S_CONFIG_SCHEMA)
|
||||||
|
and s not in referenced_schemas.keys()
|
||||||
|
):
|
||||||
|
print(f"Removing {s}")
|
||||||
|
output[domain][S_SCHEMAS].pop(schema_name)
|
||||||
|
|
||||||
|
|
||||||
|
def build_schema():
|
||||||
|
print("Building schema")
|
||||||
|
|
||||||
|
# check esphome was not loaded globally (IDE auto imports)
|
||||||
|
if len(ejs.extended_schemas) == 0:
|
||||||
|
raise Exception(
|
||||||
|
"no data collected. Did you globally import an ESPHome component?"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Core schema
|
||||||
|
schema_core[S_SCHEMAS] = {}
|
||||||
|
register_module_schemas("core", cv)
|
||||||
|
|
||||||
|
platforms = {}
|
||||||
|
schema_core[S_PLATFORMS] = platforms
|
||||||
|
core_components = {}
|
||||||
|
schema_core[S_COMPONENTS] = core_components
|
||||||
|
|
||||||
|
add_pin_validators()
|
||||||
|
|
||||||
|
# Load a preview of each component
|
||||||
|
for domain, manifest in components.items():
|
||||||
|
if manifest.is_platform_component:
|
||||||
|
# e.g. sensor, binary sensor, add S_COMPONENTS
|
||||||
|
# note: S_COMPONENTS is not filled until loaded, e.g.
|
||||||
|
# if lock: is not used, then we don't need to know about their
|
||||||
|
# platforms yet.
|
||||||
|
output[domain] = {S_COMPONENTS: {}, S_SCHEMAS: {}}
|
||||||
|
platforms[domain] = {}
|
||||||
|
elif manifest.config_schema is not None:
|
||||||
|
# e.g. dallas
|
||||||
|
output[domain] = {S_SCHEMAS: {S_CONFIG_SCHEMA: {}}}
|
||||||
|
|
||||||
|
# Generate platforms (e.g. sensor, binary_sensor, climate )
|
||||||
|
for domain in platforms:
|
||||||
|
c = components[domain]
|
||||||
|
register_module_schemas(domain, c.module)
|
||||||
|
|
||||||
|
# Generate components
|
||||||
|
for domain, manifest in components.items():
|
||||||
|
if domain not in platforms:
|
||||||
|
if manifest.config_schema is not None:
|
||||||
|
core_components[domain] = {}
|
||||||
|
register_module_schemas(domain, manifest.module, manifest)
|
||||||
|
|
||||||
|
for platform in platforms:
|
||||||
|
platform_manifest = get_platform(domain=platform, platform=domain)
|
||||||
|
if platform_manifest is not None:
|
||||||
|
output[platform][S_COMPONENTS][domain] = {}
|
||||||
|
register_module_schemas(
|
||||||
|
f"{domain}.{platform}", platform_manifest.module
|
||||||
|
)
|
||||||
|
|
||||||
|
# Do registries
|
||||||
|
add_module_registries("core", automation)
|
||||||
|
for domain, manifest in components.items():
|
||||||
|
add_module_registries(domain, manifest.module)
|
||||||
|
add_module_registries("remote_base", remote_base)
|
||||||
|
|
||||||
|
# update props pointing to registries
|
||||||
|
for reg_config_var in solve_registry:
|
||||||
|
(registry, config_var) = reg_config_var
|
||||||
|
config_var[S_TYPE] = "registry"
|
||||||
|
config_var["registry"] = found_registries[repr(registry)]
|
||||||
|
|
||||||
|
do_pins()
|
||||||
|
do_esp8266()
|
||||||
|
do_esp32()
|
||||||
|
fix_remote_receiver()
|
||||||
|
shrink()
|
||||||
|
|
||||||
|
# aggregate components, so all component info is in same file, otherwise we have dallas.json, dallas.sensor.json, etc.
|
||||||
|
data = {}
|
||||||
|
for component, component_schemas in output.items():
|
||||||
|
if "." in component:
|
||||||
|
key = component.partition(".")[0]
|
||||||
|
if key not in data:
|
||||||
|
data[key] = {}
|
||||||
|
data[key][component] = component_schemas
|
||||||
|
else:
|
||||||
|
if component not in data:
|
||||||
|
data[component] = {}
|
||||||
|
data[component] |= {component: component_schemas}
|
||||||
|
|
||||||
|
# bundle core inside esphome
|
||||||
|
data["esphome"]["core"] = data.pop("core")["core"]
|
||||||
|
|
||||||
|
for c, s in data.items():
|
||||||
|
write_file(c, s)
|
||||||
|
|
||||||
|
|
||||||
|
def setEnum(obj, items):
|
||||||
|
obj[S_TYPE] = "enum"
|
||||||
|
obj["values"] = items
|
||||||
|
|
||||||
|
|
||||||
|
def isConvertibleSchema(schema):
|
||||||
|
if schema is None:
|
||||||
|
return False
|
||||||
|
if isinstance(schema, (cv.Schema, cv.All)):
|
||||||
|
return True
|
||||||
|
if repr(schema) in ejs.hidden_schemas:
|
||||||
|
return True
|
||||||
|
if repr(schema) in ejs.typed_schemas:
|
||||||
|
return True
|
||||||
|
if repr(schema) in ejs.list_schemas:
|
||||||
|
return True
|
||||||
|
if repr(schema) in ejs.registry_schemas:
|
||||||
|
return True
|
||||||
|
if isinstance(schema, dict):
|
||||||
|
for k in schema.keys():
|
||||||
|
if isinstance(k, (cv.Required, cv.Optional)):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def convert_config(schema, path):
|
||||||
|
converted = {}
|
||||||
|
convert_1(schema, converted, path)
|
||||||
|
return converted
|
||||||
|
|
||||||
|
|
||||||
|
def convert_1(schema, config_var, path):
|
||||||
|
"""config_var can be a config_var or a schema: both are dicts
|
||||||
|
config_var has a S_TYPE property, if this is S_SCHEMA, then it has a S_SCHEMA property
|
||||||
|
schema does not have a type property, schema can have optionally both S_CONFIG_VARS and S_EXTENDS
|
||||||
|
"""
|
||||||
|
repr_schema = repr(schema)
|
||||||
|
|
||||||
|
if repr_schema in known_schemas:
|
||||||
|
schema_info = known_schemas[(repr_schema)]
|
||||||
|
for (schema_instance, name) in schema_info:
|
||||||
|
if schema_instance is schema:
|
||||||
|
assert S_CONFIG_VARS not in config_var
|
||||||
|
assert S_EXTENDS not in config_var
|
||||||
|
if not S_TYPE in config_var:
|
||||||
|
config_var[S_TYPE] = S_SCHEMA
|
||||||
|
assert config_var[S_TYPE] == S_SCHEMA
|
||||||
|
|
||||||
|
if S_SCHEMA not in config_var:
|
||||||
|
config_var[S_SCHEMA] = {}
|
||||||
|
if S_EXTENDS not in config_var[S_SCHEMA]:
|
||||||
|
config_var[S_SCHEMA][S_EXTENDS] = [name]
|
||||||
|
else:
|
||||||
|
config_var[S_SCHEMA][S_EXTENDS].append(name)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Extended schemas are tracked when the .extend() is used in a schema
|
||||||
|
if repr_schema in ejs.extended_schemas:
|
||||||
|
extended = ejs.extended_schemas.get(repr_schema)
|
||||||
|
# The midea actions are extending an empty schema (resulted in the templatize not templatizing anything)
|
||||||
|
# this causes a recursion in that this extended looks the same in extended schema as the extended[1]
|
||||||
|
if repr_schema == repr(extended[1]):
|
||||||
|
assert path.startswith("midea_ac/")
|
||||||
|
return
|
||||||
|
|
||||||
|
assert len(extended) == 2
|
||||||
|
convert_1(extended[0], config_var, path + "/extL")
|
||||||
|
convert_1(extended[1], config_var, path + "/extR")
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(schema, cv.All):
|
||||||
|
i = 0
|
||||||
|
for inner in schema.validators:
|
||||||
|
i = i + 1
|
||||||
|
convert_1(inner, config_var, path + f"/val {i}")
|
||||||
|
return
|
||||||
|
|
||||||
|
if hasattr(schema, "validators"):
|
||||||
|
i = 0
|
||||||
|
for inner in schema.validators:
|
||||||
|
i = i + 1
|
||||||
|
convert_1(inner, config_var, path + f"/val {i}")
|
||||||
|
|
||||||
|
if isinstance(schema, cv.Schema):
|
||||||
|
convert_1(schema.schema, config_var, path + "/all")
|
||||||
|
return
|
||||||
|
|
||||||
|
if isinstance(schema, dict):
|
||||||
|
convert_keys(config_var, schema, path)
|
||||||
|
return
|
||||||
|
|
||||||
|
if repr_schema in ejs.list_schemas:
|
||||||
|
config_var["is_list"] = True
|
||||||
|
items_schema = ejs.list_schemas[repr_schema][0]
|
||||||
|
convert_1(items_schema, config_var, path + "/list")
|
||||||
|
return
|
||||||
|
|
||||||
|
if DUMP_RAW:
|
||||||
|
config_var["raw"] = repr_schema
|
||||||
|
|
||||||
|
# pylint: disable=comparison-with-callable
|
||||||
|
if schema == cv.boolean:
|
||||||
|
config_var[S_TYPE] = "boolean"
|
||||||
|
elif schema == automation.validate_potentially_and_condition:
|
||||||
|
config_var[S_TYPE] = "registry"
|
||||||
|
config_var["registry"] = "condition"
|
||||||
|
elif schema == cv.int_ or schema == cv.int_range:
|
||||||
|
config_var[S_TYPE] = "integer"
|
||||||
|
elif schema == cv.string or schema == cv.string_strict or schema == cv.valid_name:
|
||||||
|
config_var[S_TYPE] = "string"
|
||||||
|
|
||||||
|
elif isinstance(schema, vol.Schema):
|
||||||
|
# test: esphome/project
|
||||||
|
config_var[S_TYPE] = "schema"
|
||||||
|
config_var["schema"] = convert_config(schema.schema, path + "/s")["schema"]
|
||||||
|
|
||||||
|
elif repr_schema in pin_validators:
|
||||||
|
config_var |= pin_validators[repr_schema]
|
||||||
|
config_var[S_TYPE] = "pin"
|
||||||
|
|
||||||
|
elif repr_schema in ejs.hidden_schemas:
|
||||||
|
schema_type = ejs.hidden_schemas[repr_schema]
|
||||||
|
|
||||||
|
data = schema(ejs.jschema_extractor)
|
||||||
|
|
||||||
|
# enums, e.g. esp32/variant
|
||||||
|
if schema_type == "one_of":
|
||||||
|
config_var[S_TYPE] = "enum"
|
||||||
|
config_var["values"] = list(data)
|
||||||
|
elif schema_type == "enum":
|
||||||
|
config_var[S_TYPE] = "enum"
|
||||||
|
config_var["values"] = list(data.keys())
|
||||||
|
elif schema_type == "maybe":
|
||||||
|
config_var[S_TYPE] = "maybe"
|
||||||
|
config_var["schema"] = convert_config(data, path + "/maybe")["schema"]
|
||||||
|
# esphome/on_boot
|
||||||
|
elif schema_type == "automation":
|
||||||
|
extra_schema = None
|
||||||
|
config_var[S_TYPE] = "trigger"
|
||||||
|
if automation.AUTOMATION_SCHEMA == ejs.extended_schemas[repr(data)][0]:
|
||||||
|
extra_schema = ejs.extended_schemas[repr(data)][1]
|
||||||
|
if (
|
||||||
|
extra_schema is not None and len(extra_schema) > 1
|
||||||
|
): # usually only trigger_id here
|
||||||
|
config = convert_config(extra_schema, path + "/extra")
|
||||||
|
if "schema" in config:
|
||||||
|
automation_schema = config["schema"]
|
||||||
|
if not (
|
||||||
|
len(automation_schema["config_vars"]) == 1
|
||||||
|
and "trigger_id" in automation_schema["config_vars"]
|
||||||
|
):
|
||||||
|
automation_schema["config_vars"]["then"] = {S_TYPE: "trigger"}
|
||||||
|
if "trigger_id" in automation_schema["config_vars"]:
|
||||||
|
automation_schema["config_vars"].pop("trigger_id")
|
||||||
|
|
||||||
|
config_var[S_TYPE] = "trigger"
|
||||||
|
config_var["schema"] = automation_schema
|
||||||
|
# some triggers can have a list of actions directly, while others needs to have some other configuration,
|
||||||
|
# e.g. sensor.on_value_rang, and the list of actions is only accepted under "then" property.
|
||||||
|
try:
|
||||||
|
schema({"delay": "1s"})
|
||||||
|
except cv.Invalid:
|
||||||
|
config_var["has_required_var"] = True
|
||||||
|
else:
|
||||||
|
print("figure out " + path)
|
||||||
|
elif schema_type == "effects":
|
||||||
|
config_var[S_TYPE] = "registry"
|
||||||
|
config_var["registry"] = "light.effects"
|
||||||
|
config_var["filter"] = data[0]
|
||||||
|
elif schema_type == "templatable":
|
||||||
|
config_var["templatable"] = True
|
||||||
|
convert_1(data, config_var, path + "/templat")
|
||||||
|
elif schema_type == "triggers":
|
||||||
|
# remote base
|
||||||
|
convert_1(data, config_var, path + "/trigger")
|
||||||
|
elif schema_type == "sensor":
|
||||||
|
schema = data
|
||||||
|
convert_1(data, config_var, path + "/trigger")
|
||||||
|
else:
|
||||||
|
raise Exception("Unknown extracted schema type")
|
||||||
|
|
||||||
|
elif repr_schema in ejs.registry_schemas:
|
||||||
|
solve_registry.append((ejs.registry_schemas[repr_schema], config_var))
|
||||||
|
|
||||||
|
elif repr_schema in ejs.typed_schemas:
|
||||||
|
config_var[S_TYPE] = "typed"
|
||||||
|
types = config_var["types"] = {}
|
||||||
|
typed_schema = ejs.typed_schemas[repr_schema]
|
||||||
|
if len(typed_schema) > 1:
|
||||||
|
config_var["typed_key"] = typed_schema[1].get("key", CONF_TYPE)
|
||||||
|
for schema_key, schema_type in typed_schema[0][0].items():
|
||||||
|
config = convert_config(schema_type, path + "/type_" + schema_key)
|
||||||
|
types[schema_key] = config["schema"]
|
||||||
|
|
||||||
|
elif DUMP_UNKNOWN:
|
||||||
|
if S_TYPE not in config_var:
|
||||||
|
config_var["unknown"] = repr_schema
|
||||||
|
|
||||||
|
if DUMP_PATH:
|
||||||
|
config_var["path"] = path
|
||||||
|
|
||||||
|
|
||||||
|
def get_overridden_config(key, converted):
|
||||||
|
# check if the key is in any extended schema in this converted schema, i.e.
|
||||||
|
# if we see a on_value_range in a dallas sensor, then this is overridden because
|
||||||
|
# it is already defined in sensor
|
||||||
|
assert S_CONFIG_VARS not in converted and S_EXTENDS not in converted
|
||||||
|
config = converted.get(S_SCHEMA, {})
|
||||||
|
|
||||||
|
return get_overridden_key_inner(key, config, {})
|
||||||
|
|
||||||
|
|
||||||
|
def get_overridden_key_inner(key, config, ret):
|
||||||
|
if S_EXTENDS not in config:
|
||||||
|
return ret
|
||||||
|
for s in config[S_EXTENDS]:
|
||||||
|
p = s.partition(".")
|
||||||
|
s1 = output.get(p[0], {}).get(S_SCHEMAS, {}).get(p[2], {}).get(S_SCHEMA)
|
||||||
|
if s1:
|
||||||
|
if key in s1.get(S_CONFIG_VARS, {}):
|
||||||
|
for k, v in s1.get(S_CONFIG_VARS)[key].items():
|
||||||
|
if k not in ret: # keep most overridden
|
||||||
|
ret[k] = v
|
||||||
|
get_overridden_key_inner(key, s1, ret)
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
|
|
||||||
|
def convert_keys(converted, schema, path):
|
||||||
|
for k, v in schema.items():
|
||||||
|
# deprecated stuff
|
||||||
|
if repr(v).startswith("<function invalid"):
|
||||||
|
continue
|
||||||
|
|
||||||
|
result = {}
|
||||||
|
|
||||||
|
if isinstance(k, cv.GenerateID):
|
||||||
|
result["key"] = "GeneratedID"
|
||||||
|
elif isinstance(k, cv.Required):
|
||||||
|
result["key"] = "Required"
|
||||||
|
elif (
|
||||||
|
isinstance(k, cv.Optional)
|
||||||
|
or isinstance(k, cv.Inclusive)
|
||||||
|
or isinstance(k, cv.Exclusive)
|
||||||
|
):
|
||||||
|
result["key"] = "Optional"
|
||||||
|
else:
|
||||||
|
converted["key"] = "String"
|
||||||
|
converted["key_dump"] = str(k)
|
||||||
|
|
||||||
|
esphome_core.CORE.data = {
|
||||||
|
esphome_core.KEY_CORE: {esphome_core.KEY_TARGET_PLATFORM: "esp8266"}
|
||||||
|
}
|
||||||
|
if hasattr(k, "default") and str(k.default) != "...":
|
||||||
|
default_value = k.default()
|
||||||
|
if default_value is not None:
|
||||||
|
result["default"] = str(default_value)
|
||||||
|
|
||||||
|
# Do value
|
||||||
|
convert_1(v, result, path + f"/{str(k)}")
|
||||||
|
if "schema" not in converted:
|
||||||
|
converted[S_TYPE] = "schema"
|
||||||
|
converted["schema"] = {S_CONFIG_VARS: {}}
|
||||||
|
if S_CONFIG_VARS not in converted["schema"]:
|
||||||
|
converted["schema"][S_CONFIG_VARS] = {}
|
||||||
|
for base_k, base_v in get_overridden_config(k, converted).items():
|
||||||
|
if base_k in result and base_v == result[base_k]:
|
||||||
|
result.pop(base_k)
|
||||||
|
converted["schema"][S_CONFIG_VARS][str(k)] = result
|
||||||
|
|
||||||
|
|
||||||
|
build_schema()
|
Loading…
Reference in New Issue
Block a user