diff --git a/esphome/automation.py b/esphome/automation.py index fab998527f..4007dc4c51 100644 --- a/esphome/automation.py +++ b/esphome/automation.py @@ -262,21 +262,16 @@ async def repeat_action_to_code(config, action_id, template_arg, args): return var -def validate_wait_until(value): - schema = cv.Schema( - { - cv.Required(CONF_CONDITION): validate_potentially_and_condition, - cv.Optional(CONF_TIMEOUT): cv.templatable( - cv.positive_time_period_milliseconds - ), - } - ) - if isinstance(value, dict) and CONF_CONDITION in value: - return schema(value) - return validate_wait_until({CONF_CONDITION: value}) +_validate_wait_until = cv.maybe_simple_value( + { + cv.Required(CONF_CONDITION): validate_potentially_and_condition, + cv.Optional(CONF_TIMEOUT): cv.templatable(cv.positive_time_period_milliseconds), + }, + key=CONF_CONDITION, +) -@register_action("wait_until", WaitUntilAction, validate_wait_until) +@register_action("wait_until", WaitUntilAction, _validate_wait_until) async def wait_until_action_to_code(config, action_id, template_arg, args): conditions = await build_condition(config[CONF_CONDITION], template_arg, args) var = cg.new_Pvariable(action_id, template_arg, conditions) diff --git a/esphome/components/logger/__init__.py b/esphome/components/logger/__init__.py index 20a0b0f792..d11b00405d 100644 --- a/esphome/components/logger/__init__.py +++ b/esphome/components/logger/__init__.py @@ -203,15 +203,6 @@ async def to_code(config): ) -def maybe_simple_message(schema): - def validator(value): - if isinstance(value, dict): - return cv.Schema(schema)(value) - return cv.Schema(schema)({CONF_FORMAT: value}) - - return validator - - def validate_printf(value): # https://stackoverflow.com/questions/30011379/how-can-i-parse-a-c-format-string-in-python cfmt = r""" @@ -234,7 +225,7 @@ def validate_printf(value): CONF_LOGGER_LOG = "logger.log" LOGGER_LOG_ACTION_SCHEMA = cv.All( - maybe_simple_message( + cv.maybe_simple_value( { cv.Required(CONF_FORMAT): cv.string, cv.Optional(CONF_ARGS, default=list): cv.ensure_list(cv.lambda_), @@ -242,9 +233,10 @@ LOGGER_LOG_ACTION_SCHEMA = cv.All( *LOG_LEVEL_TO_ESP_LOG, upper=True ), cv.Optional(CONF_TAG, default="main"): cv.string, - } - ), - validate_printf, + }, + validate_printf, + key=CONF_FORMAT, + ) ) diff --git a/esphome/components/modbus_controller/__init__.py b/esphome/components/modbus_controller/__init__.py index f919cb0678..41beb67e1c 100644 --- a/esphome/components/modbus_controller/__init__.py +++ b/esphome/components/modbus_controller/__init__.py @@ -71,9 +71,9 @@ SENSOR_VALUE_TYPE = { "S_DWORD": SensorValueType.S_DWORD, "S_DWORD_R": SensorValueType.S_DWORD_R, "U_QWORD": SensorValueType.U_QWORD, - "U_QWORDU_R": SensorValueType.U_QWORD_R, + "U_QWORD_R": SensorValueType.U_QWORD_R, "S_QWORD": SensorValueType.S_QWORD, - "U_QWORD_R": SensorValueType.S_QWORD_R, + "S_QWORD_R": SensorValueType.S_QWORD_R, "FP32": SensorValueType.FP32, "FP32_R": SensorValueType.FP32_R, } @@ -87,9 +87,9 @@ TYPE_REGISTER_MAP = { "S_DWORD": 2, "S_DWORD_R": 2, "U_QWORD": 4, - "U_QWORDU_R": 4, - "S_QWORD": 4, "U_QWORD_R": 4, + "S_QWORD": 4, + "S_QWORD_R": 4, "FP32": 2, "FP32_R": 2, } diff --git a/esphome/components/modbus_controller/select/__init__.py b/esphome/components/modbus_controller/select/__init__.py index 6f194ef2a3..f8ef61ddc4 100644 --- a/esphome/components/modbus_controller/select/__init__.py +++ b/esphome/components/modbus_controller/select/__init__.py @@ -2,7 +2,6 @@ import esphome.codegen as cg import esphome.config_validation as cv from esphome.components import select from esphome.const import CONF_ADDRESS, CONF_ID, CONF_LAMBDA, CONF_OPTIMISTIC -from esphome.jsonschema import jschema_composite from .. import ( SENSOR_VALUE_TYPE, @@ -30,7 +29,6 @@ ModbusSelect = modbus_controller_ns.class_( ) -@jschema_composite def ensure_option_map(): def validator(value): cv.check_not_templatable(value) diff --git a/esphome/config_validation.py b/esphome/config_validation.py index 8e1c63a54e..3dc8011a87 100644 --- a/esphome/config_validation.py +++ b/esphome/config_validation.py @@ -58,7 +58,7 @@ from esphome.core import ( ) from esphome.helpers import list_starts_with, add_class_to_obj from esphome.jsonschema import ( - jschema_composite, + jschema_list, jschema_extractor, jschema_registry, jschema_typed, @@ -327,7 +327,7 @@ def boolean(value): ) -@jschema_composite +@jschema_list def ensure_list(*validators): """Validate this configuration option to be a list. @@ -494,7 +494,11 @@ def templatable(other_validators): """ schema = Schema(other_validators) + @jschema_extractor("templatable") def validator(value): + # pylint: disable=comparison-with-callable + if value == jschema_extractor: + return other_validators if isinstance(value, Lambda): return returning_lambda(value) if isinstance(other_validators, dict): @@ -1546,7 +1550,7 @@ def validate_registry(name, registry): return ensure_list(validate_registry_entry(name, registry)) -@jschema_composite +@jschema_list def maybe_simple_value(*validators, **kwargs): key = kwargs.pop("key", CONF_VALUE) validator = All(*validators) diff --git a/esphome/jsonschema.py b/esphome/jsonschema.py index 12929dc602..94325f4abc 100644 --- a/esphome/jsonschema.py +++ b/esphome/jsonschema.py @@ -1,7 +1,7 @@ """Helpers to retrieve schema from voluptuous validators. These are a helper decorators to help get schema from some -components which uses volutuous in a way where validation +components which uses voluptuous in a way where validation is hidden in local functions These decorators should not modify at all what the functions originally do. @@ -24,7 +24,7 @@ def jschema_extractor(validator_name): if EnableJsonSchemaCollect: def decorator(func): - hidden_schemas[str(func)] = validator_name + hidden_schemas[repr(func)] = validator_name return func return decorator @@ -41,7 +41,7 @@ def jschema_extended(func): def decorate(*args, **kwargs): ret = func(*args, **kwargs) assert len(args) == 2 - extended_schemas[str(ret)] = args + extended_schemas[repr(ret)] = args return ret return decorate @@ -49,13 +49,13 @@ def jschema_extended(func): return func -def jschema_composite(func): +def jschema_list(func): if EnableJsonSchemaCollect: def decorate(*args, **kwargs): ret = func(*args, **kwargs) # args length might be 2, but 2nd is always validator - list_schemas[str(ret)] = args + list_schemas[repr(ret)] = args return ret return decorate @@ -67,7 +67,7 @@ def jschema_registry(registry): if EnableJsonSchemaCollect: def decorator(func): - registry_schemas[str(func)] = registry + registry_schemas[repr(func)] = registry return func return decorator @@ -83,7 +83,7 @@ def jschema_typed(func): def decorate(*args, **kwargs): ret = func(*args, **kwargs) - typed_schemas[str(ret)] = (args, kwargs) + typed_schemas[repr(ret)] = (args, kwargs) return ret return decorate diff --git a/script/build_jsonschema.py b/script/build_jsonschema.py index 7673519916..5373d404a7 100644 --- a/script/build_jsonschema.py +++ b/script/build_jsonschema.py @@ -70,7 +70,7 @@ def add_definition_array_or_single_object(ref): def add_core(): from esphome.core.config import CONFIG_SCHEMA - base_props["esphome"] = get_jschema("esphome", CONFIG_SCHEMA.schema) + base_props["esphome"] = get_jschema("esphome", CONFIG_SCHEMA) def add_buses(): @@ -216,7 +216,7 @@ def add_components(): add_module_registries(domain, c.module) add_module_schemas(domain, c.module) - # need first to iterate all platforms then iteate components + # need first to iterate all platforms then iterate components # a platform component can have other components as properties, # e.g. climate components usually have a temperature sensor @@ -325,7 +325,9 @@ def get_entry(parent_key, vschema): if DUMP_COMMENTS: entry[JSC_COMMENT] = "entry: " + parent_key + "/" + str(vschema) - if isinstance(vschema, list): + if isinstance(vschema, dict): + entry = {"what": "is_this"} + elif isinstance(vschema, list): ref = get_jschema(parent_key + "[]", vschema[0]) entry = {"type": "array", "items": ref} elif isinstance(vschema, schema_type) and hasattr(vschema, "schema"): @@ -387,8 +389,10 @@ def get_entry(parent_key, vschema): v = vschema(None) if isinstance(v, ID): - if v.type.base != "script::Script" and ( - v.type.inherits_from(Trigger) or v.type == Automation + if ( + v.type.base != "script::Script" + and v.type.base != "switch_::Switch" + and (v.type.inherits_from(Trigger) or v.type == Automation) ): return None entry = {"type": "string", "id_type": v.type.base} @@ -410,6 +414,8 @@ def default_schema(): def is_default_schema(jschema): + if jschema is None: + return False if is_ref(jschema): jschema = unref(jschema) if not jschema: @@ -425,6 +431,9 @@ def get_jschema(path, vschema, create_return_ref=True): jschema = convert_schema(path, vschema) + if jschema is None: + return None + if is_ref(jschema): # this can happen when returned extended # schemas where all properties found in previous extended schema @@ -450,6 +459,9 @@ def get_schema_str(vschema): def create_ref(name, vschema, jschema): + if jschema is None: + raise ValueError("Cannot create a ref with null jschema for " + name) + if name in schema_names: raise ValueError("Not supported") @@ -523,6 +535,15 @@ def convert_schema(path, vschema, un_extend=True): extended = ejs.extended_schemas.get(str(vschema)) if extended: lhs = get_jschema(path, extended[0], False) + + # The midea actions are extending an empty schema (resulted in the templatize not templatizing anything) + # this causes a recursion in that this extended looks the same in extended schema as the extended[1] + if ejs.extended_schemas.get(str(vschema)) == ejs.extended_schemas.get( + str(extended[1]) + ): + assert path.startswith("midea_ac") + return convert_schema(path, extended[1], False) + rhs = get_jschema(path, extended[1], False) # check if we are not merging properties which are already in base component @@ -567,6 +588,8 @@ def convert_schema(path, vschema, un_extend=True): # we should take the valid schema, # commonly all is used to validate a schema, and then a function which # is not a schema es also given, get_schema will then return a default_schema() + if v == dict: + continue # this is a dict in the SCHEMA of packages val_schema = get_jschema(path, v, False) if is_default_schema(val_schema): if not output: @@ -673,6 +696,11 @@ def add_pin_registry(): for mode in ("INPUT", "OUTPUT"): schema_name = f"PIN.GPIO_FULL_{mode}_PIN_SCHEMA" + + # TODO: get pin definitions properly + if schema_name not in definitions: + definitions[schema_name] = {"type": ["object", "null"], JSC_PROPERTIES: {}} + internal = definitions[schema_name] definitions[schema_name]["additionalItems"] = False definitions[f"PIN.{mode}_INTERNAL"] = internal @@ -683,12 +711,11 @@ def add_pin_registry(): definitions[schema_name] = {"oneOf": schemas, "type": ["string", "object"]} for k, v in pin_registry.items(): - pin_jschema = get_jschema( - f"PIN.{mode}_" + k, v[1][0 if mode == "OUTPUT" else 1] - ) - if unref(pin_jschema): - pin_jschema["required"] = [k] - schemas.append(pin_jschema) + if isinstance(v[1], vol.validators.All): + pin_jschema = get_jschema(f"PIN.{mode}_" + k, v[1]) + if unref(pin_jschema): + pin_jschema["required"] = [k] + schemas.append(pin_jschema) def dump_schema(): @@ -730,9 +757,9 @@ def dump_schema(): cv.valid_name, cv.hex_int, cv.hex_int_range, - pins.output_pin, - pins.input_pin, - pins.input_pullup_pin, + pins.gpio_output_pin_schema, + pins.gpio_input_pin_schema, + pins.gpio_input_pullup_pin_schema, cv.float_with_unit, cv.subscribe_topic, cv.publish_topic, @@ -753,12 +780,12 @@ def dump_schema(): for v in [pins.gpio_input_pin_schema, pins.gpio_input_pullup_pin_schema]: schema_registry[v] = get_ref("PIN.GPIO_FULL_INPUT_PIN_SCHEMA") - for v in [pins.internal_gpio_input_pin_schema, pins.input_pin]: + for v in [pins.internal_gpio_input_pin_schema, pins.gpio_input_pin_schema]: schema_registry[v] = get_ref("PIN.INPUT_INTERNAL") for v in [pins.gpio_output_pin_schema, pins.internal_gpio_output_pin_schema]: schema_registry[v] = get_ref("PIN.GPIO_FULL_OUTPUT_PIN_SCHEMA") - for v in [pins.internal_gpio_output_pin_schema, pins.output_pin]: + for v in [pins.internal_gpio_output_pin_schema, pins.gpio_output_pin_schema]: schema_registry[v] = get_ref("PIN.OUTPUT_INTERNAL") add_module_schemas("CONFIG", cv) diff --git a/script/build_language_schema.py b/script/build_language_schema.py new file mode 100644 index 0000000000..ec0912e9e6 --- /dev/null +++ b/script/build_language_schema.py @@ -0,0 +1,813 @@ +import inspect +import json +import argparse +from operator import truediv +import os +import voluptuous as vol + +# NOTE: Cannot import other esphome components globally as a modification in jsonschema +# is needed before modules are loaded +import esphome.jsonschema as ejs + +ejs.EnableJsonSchemaCollect = True + +# schema format: +# Schemas are splitted in several files in json format, one for core stuff, one for each platform (sensor, binary_sensor, etc) and +# one for each component (dallas, sim800l, etc.) component can have schema for root component/hub and also for platform component, +# e.g. dallas has hub component which has pin and then has the sensor platform which has sensor name, index, etc. +# When files are loaded they are merged in a single object. +# The root format is + +S_CONFIG_VAR = "config_var" +S_CONFIG_VARS = "config_vars" +S_CONFIG_SCHEMA = "CONFIG_SCHEMA" +S_COMPONENT = "component" +S_COMPONENTS = "components" +S_PLATFORMS = "platforms" +S_SCHEMA = "schema" +S_SCHEMAS = "schemas" +S_EXTENDS = "extends" +S_TYPE = "type" +S_NAME = "name" + +parser = argparse.ArgumentParser() +parser.add_argument( + "--output-path", default=".", help="Output path", type=os.path.abspath +) + +args = parser.parse_args() + +DUMP_RAW = False +DUMP_UNKNOWN = False +DUMP_PATH = False +JSON_DUMP_PRETTY = True + +# store here dynamic load of esphome components +components = {} + +schema_core = {} + +# output is where all is built +output = {"core": schema_core} +# The full generated output is here here +schema_full = {"components": output} + +# A string, string map, key is the str(schema) and value is +# a tuple, first element is the schema reference and second is the schema path given, the schema reference is needed to test as different schemas have same key +known_schemas = {} + +solve_registry = [] + + +def get_component_names(): + # return [ + # "esphome", + # "esp32", + # "esp8266", + # "logger", + # "sensor", + # "remote_receiver", + # "binary_sensor", + # ] + from esphome.loader import CORE_COMPONENTS_PATH + + component_names = ["esphome", "sensor"] + + for d in os.listdir(CORE_COMPONENTS_PATH): + if not d.startswith("__") and os.path.isdir( + os.path.join(CORE_COMPONENTS_PATH, d) + ): + if d not in component_names: + component_names.append(d) + + return component_names + + +def load_components(): + from esphome.config import get_component + + for domain in get_component_names(): + components[domain] = get_component(domain) + + +load_components() + +# Import esphome after loading components (so schema is tracked) +# pylint: disable=wrong-import-position +import esphome.core as esphome_core +import esphome.config_validation as cv +from esphome import automation +from esphome import pins +from esphome.components import remote_base +from esphome.const import CONF_TYPE +from esphome.loader import get_platform +from esphome.helpers import write_file_if_changed +from esphome.util import Registry + +# pylint: enable=wrong-import-position + + +def write_file(name, obj): + full_path = os.path.join(args.output_path, name + ".json") + if JSON_DUMP_PRETTY: + json_str = json.dumps(obj, indent=2) + else: + json_str = json.dumps(obj, separators=(",", ":")) + write_file_if_changed(full_path, json_str) + print(f"Wrote {full_path}") + + +def register_module_schemas(key, module, manifest=None): + for name, schema in module_schemas(module): + register_known_schema(key, name, schema) + if ( + manifest and manifest.multi_conf and S_CONFIG_SCHEMA in output[key][S_SCHEMAS] + ): # not sure about 2nd part of the if, might be useless config (e.g. as3935) + output[key][S_SCHEMAS][S_CONFIG_SCHEMA]["is_list"] = True + + +def register_known_schema(module, name, schema): + if module not in output: + output[module] = {S_SCHEMAS: {}} + config = convert_config(schema, f"{module}/{name}") + if S_TYPE not in config: + print(f"Config var without type: {module}.{name}") + + output[module][S_SCHEMAS][name] = config + repr_schema = repr(schema) + if repr_schema in known_schemas: + schema_info = known_schemas[repr_schema] + schema_info.append((schema, f"{module}.{name}")) + else: + known_schemas[repr_schema] = [(schema, f"{module}.{name}")] + + +def module_schemas(module): + # This should yield elements in order so extended schemas are resolved properly + # To do this we check on the source code where the symbol is seen first. Seems to work. + try: + module_str = inspect.getsource(module) + except TypeError: + # improv + module_str = "" + except OSError: + # some empty __init__ files + module_str = "" + schemas = {} + for m_attr_name in dir(module): + m_attr_obj = getattr(module, m_attr_name) + if isConvertibleSchema(m_attr_obj): + schemas[module_str.find(m_attr_name)] = [m_attr_name, m_attr_obj] + + for pos in sorted(schemas.keys()): + yield schemas[pos] + + +found_registries = {} + +# Pin validators keys are the functions in pin which validate the pins +pin_validators = {} + + +def add_pin_validators(): + for m_attr_name in dir(pins): + if "gpio" in m_attr_name: + s = pin_validators[repr(getattr(pins, m_attr_name))] = {} + if "schema" in m_attr_name: + s["schema"] = True # else is just number + if "internal" in m_attr_name: + s["internal"] = True + if "input" in m_attr_name: + s["modes"] = ["input"] + elif "output" in m_attr_name: + s["modes"] = ["output"] + else: + s["modes"] = [] + if "pullup" in m_attr_name: + s["modes"].append("pullup") + from esphome.components.adc import sensor as adc_sensor + + pin_validators[repr(adc_sensor.validate_adc_pin)] = { + "internal": True, + "modes": ["input"], + } + + +def add_module_registries(domain, module): + for attr_name in dir(module): + attr_obj = getattr(module, attr_name) + if isinstance(attr_obj, Registry): + if attr_obj == automation.ACTION_REGISTRY: + reg_type = "action" + reg_domain = "core" + found_registries[repr(attr_obj)] = reg_type + elif attr_obj == automation.CONDITION_REGISTRY: + reg_type = "condition" + reg_domain = "core" + found_registries[repr(attr_obj)] = reg_type + else: # attr_name == "FILTER_REGISTRY": + reg_domain = domain + reg_type = attr_name.partition("_")[0].lower() + found_registries[repr(attr_obj)] = f"{domain}.{reg_type}" + + for name in attr_obj.keys(): + if "." not in name: + reg_entry_name = name + else: + parts = name.split(".") + if len(parts) == 2: + reg_domain = parts[0] + reg_entry_name = parts[1] + else: + reg_domain = ".".join([parts[1], parts[0]]) + reg_entry_name = parts[2] + + if reg_domain not in output: + output[reg_domain] = {} + if reg_type not in output[reg_domain]: + output[reg_domain][reg_type] = {} + output[reg_domain][reg_type][reg_entry_name] = convert_config( + attr_obj[name].schema, f"{reg_domain}/{reg_type}/{reg_entry_name}" + ) + + # print(f"{domain} - {attr_name} - {name}") + + +def do_pins(): + # do pin registries + pins_providers = schema_core["pins"] = [] + for pin_registry in pins.PIN_SCHEMA_REGISTRY: + s = convert_config( + pins.PIN_SCHEMA_REGISTRY[pin_registry][1], f"pins/{pin_registry}" + ) + if pin_registry not in output: + output[pin_registry] = {} # mcp23xxx does not create a component yet + output[pin_registry]["pin"] = s + pins_providers.append(pin_registry) + + +def do_esp32(): + import esphome.components.esp32.boards as esp32_boards + + setEnum( + output["esp32"]["schemas"]["CONFIG_SCHEMA"]["schema"]["config_vars"]["board"], + list(esp32_boards.BOARD_TO_VARIANT.keys()), + ) + + +def do_esp8266(): + import esphome.components.esp8266.boards as esp8266_boards + + setEnum( + output["esp8266"]["schemas"]["CONFIG_SCHEMA"]["schema"]["config_vars"]["board"], + list(esp8266_boards.ESP8266_BOARD_PINS.keys()), + ) + + +def fix_remote_receiver(): + output["remote_receiver.binary_sensor"]["schemas"]["CONFIG_SCHEMA"] = { + "type": "schema", + "schema": { + "extends": ["binary_sensor.BINARY_SENSOR_SCHEMA", "core.COMPONENT_SCHEMA"], + "config_vars": output["remote_base"]["binary"], + }, + } + + +def add_referenced_recursive(referenced_schemas, config_var, path, eat_schema=False): + assert ( + S_CONFIG_VARS not in config_var and S_EXTENDS not in config_var + ) # S_TYPE in cv or "key" in cv or len(cv) == 0 + if ( + config_var.get(S_TYPE) in ["schema", "trigger", "maybe"] + and S_SCHEMA in config_var + ): + schema = config_var[S_SCHEMA] + for k, v in schema.get(S_CONFIG_VARS, {}).items(): + if eat_schema: + new_path = path + [S_CONFIG_VARS, k] + else: + new_path = path + ["schema", S_CONFIG_VARS, k] + add_referenced_recursive(referenced_schemas, v, new_path) + for k in schema.get(S_EXTENDS, []): + if k not in referenced_schemas: + referenced_schemas[k] = [path] + else: + if path not in referenced_schemas[k]: + referenced_schemas[k].append(path) + + s1 = get_str_path_schema(k) + p = k.split(".") + if len(p) == 3 and path[0] == f"{p[0]}.{p[1]}": + # special case for schema inside platforms + add_referenced_recursive( + referenced_schemas, s1, [path[0], "schemas", p[2]] + ) + else: + add_referenced_recursive( + referenced_schemas, s1, [p[0], "schemas", p[1]] + ) + elif config_var.get(S_TYPE) == "typed": + for tk, tv in config_var.get("types").items(): + add_referenced_recursive( + referenced_schemas, + { + S_TYPE: S_SCHEMA, + S_SCHEMA: tv, + }, + path + ["types", tk], + eat_schema=True, + ) + + +def get_str_path_schema(strPath): + parts = strPath.split(".") + if len(parts) > 2: + parts[0] += "." + parts[1] + parts[1] = parts[2] + s1 = output.get(parts[0], {}).get(S_SCHEMAS, {}).get(parts[1], {}) + return s1 + + +def pop_str_path_schema(strPath): + parts = strPath.split(".") + if len(parts) > 2: + parts[0] += "." + parts[1] + parts[1] = parts[2] + output.get(parts[0], {}).get(S_SCHEMAS, {}).pop(parts[1]) + + +def get_arr_path_schema(path): + s = output + for x in path: + s = s[x] + return s + + +def merge(source, destination): + """ + run me with nosetests --with-doctest file.py + + >>> a = { 'first' : { 'all_rows' : { 'pass' : 'dog', 'number' : '1' } } } + >>> b = { 'first' : { 'all_rows' : { 'fail' : 'cat', 'number' : '5' } } } + >>> merge(b, a) == { 'first' : { 'all_rows' : { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } } + True + """ + for key, value in source.items(): + if isinstance(value, dict): + # get node or create one + node = destination.setdefault(key, {}) + merge(value, node) + else: + destination[key] = value + + return destination + + +def shrink(): + """Shrink the extending schemas which has just an end type, e.g. at this point + ota / port is type schema with extended pointing to core.port, this should instead be + type number. core.port is number + + This also fixes enums, as they are another schema and they are instead put in the same cv + """ + + # referenced_schemas contains a dict, keys are all that are shown in extends: [] arrays, values are lists of paths that are pointing to that extend + # e.g. key: core.COMPONENT_SCHEMA has a lot of paths of config vars which extends this schema + + pass_again = True + + while pass_again: + pass_again = False + + referenced_schemas = {} + + for k, v in output.items(): + for kv, vv in v.items(): + if kv != "pin" and isinstance(vv, dict): + for kvv, vvv in vv.items(): + add_referenced_recursive(referenced_schemas, vvv, [k, kv, kvv]) + + for x, paths in referenced_schemas.items(): + if len(paths) == 1: + key_s = get_str_path_schema(x) + arr_s = get_arr_path_schema(paths[0]) + # key_s |= arr_s + # key_s.pop(S_EXTENDS) + pass_again = True + if S_SCHEMA in arr_s: + if S_EXTENDS in arr_s[S_SCHEMA]: + arr_s[S_SCHEMA].pop(S_EXTENDS) + else: + print("expected extends here!" + x) + arr_s = merge(key_s, arr_s) + if arr_s[S_TYPE] == "enum": + arr_s.pop(S_SCHEMA) + else: + arr_s.pop(S_EXTENDS) + arr_s |= key_s[S_SCHEMA] + print(x) + + # simple types should be spread on each component, + # for enums so far these are logger.is_log_level, cover.validate_cover_state and pulse_counter.sensor.COUNT_MODE_SCHEMA + # then for some reasons sensor filter registry falls here + # then are all simple types, integer and strings + for x, paths in referenced_schemas.items(): + key_s = get_str_path_schema(x) + if key_s and key_s[S_TYPE] in ["enum", "registry", "integer", "string"]: + if key_s[S_TYPE] == "registry": + print("Spreading registry: " + x) + for target in paths: + target_s = get_arr_path_schema(target) + assert target_s[S_SCHEMA][S_EXTENDS] == [x] + target_s.pop(S_SCHEMA) + target_s |= key_s + if key_s[S_TYPE] in ["integer", "string"]: + target_s["data_type"] = x.split(".")[1] + # remove this dangling again + pop_str_path_schema(x) + elif not key_s: + for target in paths: + target_s = get_arr_path_schema(target) + assert target_s[S_SCHEMA][S_EXTENDS] == [x] + target_s.pop(S_SCHEMA) + target_s.pop(S_TYPE) # undefined + target_s["data_type"] = x.split(".")[1] + # remove this dangling again + pop_str_path_schema(x) + + # remove dangling items (unreachable schemas) + for domain, domain_schemas in output.items(): + for schema_name in list(domain_schemas.get(S_SCHEMAS, {}).keys()): + s = f"{domain}.{schema_name}" + if ( + not s.endswith("." + S_CONFIG_SCHEMA) + and s not in referenced_schemas.keys() + ): + print(f"Removing {s}") + output[domain][S_SCHEMAS].pop(schema_name) + + +def build_schema(): + print("Building schema") + + # check esphome was not loaded globally (IDE auto imports) + if len(ejs.extended_schemas) == 0: + raise Exception( + "no data collected. Did you globally import an ESPHome component?" + ) + + # Core schema + schema_core[S_SCHEMAS] = {} + register_module_schemas("core", cv) + + platforms = {} + schema_core[S_PLATFORMS] = platforms + core_components = {} + schema_core[S_COMPONENTS] = core_components + + add_pin_validators() + + # Load a preview of each component + for domain, manifest in components.items(): + if manifest.is_platform_component: + # e.g. sensor, binary sensor, add S_COMPONENTS + # note: S_COMPONENTS is not filled until loaded, e.g. + # if lock: is not used, then we don't need to know about their + # platforms yet. + output[domain] = {S_COMPONENTS: {}, S_SCHEMAS: {}} + platforms[domain] = {} + elif manifest.config_schema is not None: + # e.g. dallas + output[domain] = {S_SCHEMAS: {S_CONFIG_SCHEMA: {}}} + + # Generate platforms (e.g. sensor, binary_sensor, climate ) + for domain in platforms: + c = components[domain] + register_module_schemas(domain, c.module) + + # Generate components + for domain, manifest in components.items(): + if domain not in platforms: + if manifest.config_schema is not None: + core_components[domain] = {} + register_module_schemas(domain, manifest.module, manifest) + + for platform in platforms: + platform_manifest = get_platform(domain=platform, platform=domain) + if platform_manifest is not None: + output[platform][S_COMPONENTS][domain] = {} + register_module_schemas( + f"{domain}.{platform}", platform_manifest.module + ) + + # Do registries + add_module_registries("core", automation) + for domain, manifest in components.items(): + add_module_registries(domain, manifest.module) + add_module_registries("remote_base", remote_base) + + # update props pointing to registries + for reg_config_var in solve_registry: + (registry, config_var) = reg_config_var + config_var[S_TYPE] = "registry" + config_var["registry"] = found_registries[repr(registry)] + + do_pins() + do_esp8266() + do_esp32() + fix_remote_receiver() + shrink() + + # aggregate components, so all component info is in same file, otherwise we have dallas.json, dallas.sensor.json, etc. + data = {} + for component, component_schemas in output.items(): + if "." in component: + key = component.partition(".")[0] + if key not in data: + data[key] = {} + data[key][component] = component_schemas + else: + if component not in data: + data[component] = {} + data[component] |= {component: component_schemas} + + # bundle core inside esphome + data["esphome"]["core"] = data.pop("core")["core"] + + for c, s in data.items(): + write_file(c, s) + + +def setEnum(obj, items): + obj[S_TYPE] = "enum" + obj["values"] = items + + +def isConvertibleSchema(schema): + if schema is None: + return False + if isinstance(schema, (cv.Schema, cv.All)): + return True + if repr(schema) in ejs.hidden_schemas: + return True + if repr(schema) in ejs.typed_schemas: + return True + if repr(schema) in ejs.list_schemas: + return True + if repr(schema) in ejs.registry_schemas: + return True + if isinstance(schema, dict): + for k in schema.keys(): + if isinstance(k, (cv.Required, cv.Optional)): + return True + return False + + +def convert_config(schema, path): + converted = {} + convert_1(schema, converted, path) + return converted + + +def convert_1(schema, config_var, path): + """config_var can be a config_var or a schema: both are dicts + config_var has a S_TYPE property, if this is S_SCHEMA, then it has a S_SCHEMA property + schema does not have a type property, schema can have optionally both S_CONFIG_VARS and S_EXTENDS + """ + repr_schema = repr(schema) + + if repr_schema in known_schemas: + schema_info = known_schemas[(repr_schema)] + for (schema_instance, name) in schema_info: + if schema_instance is schema: + assert S_CONFIG_VARS not in config_var + assert S_EXTENDS not in config_var + if not S_TYPE in config_var: + config_var[S_TYPE] = S_SCHEMA + assert config_var[S_TYPE] == S_SCHEMA + + if S_SCHEMA not in config_var: + config_var[S_SCHEMA] = {} + if S_EXTENDS not in config_var[S_SCHEMA]: + config_var[S_SCHEMA][S_EXTENDS] = [name] + else: + config_var[S_SCHEMA][S_EXTENDS].append(name) + return + + # Extended schemas are tracked when the .extend() is used in a schema + if repr_schema in ejs.extended_schemas: + extended = ejs.extended_schemas.get(repr_schema) + # The midea actions are extending an empty schema (resulted in the templatize not templatizing anything) + # this causes a recursion in that this extended looks the same in extended schema as the extended[1] + if repr_schema == repr(extended[1]): + assert path.startswith("midea_ac/") + return + + assert len(extended) == 2 + convert_1(extended[0], config_var, path + "/extL") + convert_1(extended[1], config_var, path + "/extR") + return + + if isinstance(schema, cv.All): + i = 0 + for inner in schema.validators: + i = i + 1 + convert_1(inner, config_var, path + f"/val {i}") + return + + if hasattr(schema, "validators"): + i = 0 + for inner in schema.validators: + i = i + 1 + convert_1(inner, config_var, path + f"/val {i}") + + if isinstance(schema, cv.Schema): + convert_1(schema.schema, config_var, path + "/all") + return + + if isinstance(schema, dict): + convert_keys(config_var, schema, path) + return + + if repr_schema in ejs.list_schemas: + config_var["is_list"] = True + items_schema = ejs.list_schemas[repr_schema][0] + convert_1(items_schema, config_var, path + "/list") + return + + if DUMP_RAW: + config_var["raw"] = repr_schema + + # pylint: disable=comparison-with-callable + if schema == cv.boolean: + config_var[S_TYPE] = "boolean" + elif schema == automation.validate_potentially_and_condition: + config_var[S_TYPE] = "registry" + config_var["registry"] = "condition" + elif schema == cv.int_ or schema == cv.int_range: + config_var[S_TYPE] = "integer" + elif schema == cv.string or schema == cv.string_strict or schema == cv.valid_name: + config_var[S_TYPE] = "string" + + elif isinstance(schema, vol.Schema): + # test: esphome/project + config_var[S_TYPE] = "schema" + config_var["schema"] = convert_config(schema.schema, path + "/s")["schema"] + + elif repr_schema in pin_validators: + config_var |= pin_validators[repr_schema] + config_var[S_TYPE] = "pin" + + elif repr_schema in ejs.hidden_schemas: + schema_type = ejs.hidden_schemas[repr_schema] + + data = schema(ejs.jschema_extractor) + + # enums, e.g. esp32/variant + if schema_type == "one_of": + config_var[S_TYPE] = "enum" + config_var["values"] = list(data) + elif schema_type == "enum": + config_var[S_TYPE] = "enum" + config_var["values"] = list(data.keys()) + elif schema_type == "maybe": + config_var[S_TYPE] = "maybe" + config_var["schema"] = convert_config(data, path + "/maybe")["schema"] + # esphome/on_boot + elif schema_type == "automation": + extra_schema = None + config_var[S_TYPE] = "trigger" + if automation.AUTOMATION_SCHEMA == ejs.extended_schemas[repr(data)][0]: + extra_schema = ejs.extended_schemas[repr(data)][1] + if ( + extra_schema is not None and len(extra_schema) > 1 + ): # usually only trigger_id here + config = convert_config(extra_schema, path + "/extra") + if "schema" in config: + automation_schema = config["schema"] + if not ( + len(automation_schema["config_vars"]) == 1 + and "trigger_id" in automation_schema["config_vars"] + ): + automation_schema["config_vars"]["then"] = {S_TYPE: "trigger"} + if "trigger_id" in automation_schema["config_vars"]: + automation_schema["config_vars"].pop("trigger_id") + + config_var[S_TYPE] = "trigger" + config_var["schema"] = automation_schema + # some triggers can have a list of actions directly, while others needs to have some other configuration, + # e.g. sensor.on_value_rang, and the list of actions is only accepted under "then" property. + try: + schema({"delay": "1s"}) + except cv.Invalid: + config_var["has_required_var"] = True + else: + print("figure out " + path) + elif schema_type == "effects": + config_var[S_TYPE] = "registry" + config_var["registry"] = "light.effects" + config_var["filter"] = data[0] + elif schema_type == "templatable": + config_var["templatable"] = True + convert_1(data, config_var, path + "/templat") + elif schema_type == "triggers": + # remote base + convert_1(data, config_var, path + "/trigger") + elif schema_type == "sensor": + schema = data + convert_1(data, config_var, path + "/trigger") + else: + raise Exception("Unknown extracted schema type") + + elif repr_schema in ejs.registry_schemas: + solve_registry.append((ejs.registry_schemas[repr_schema], config_var)) + + elif repr_schema in ejs.typed_schemas: + config_var[S_TYPE] = "typed" + types = config_var["types"] = {} + typed_schema = ejs.typed_schemas[repr_schema] + if len(typed_schema) > 1: + config_var["typed_key"] = typed_schema[1].get("key", CONF_TYPE) + for schema_key, schema_type in typed_schema[0][0].items(): + config = convert_config(schema_type, path + "/type_" + schema_key) + types[schema_key] = config["schema"] + + elif DUMP_UNKNOWN: + if S_TYPE not in config_var: + config_var["unknown"] = repr_schema + + if DUMP_PATH: + config_var["path"] = path + + +def get_overridden_config(key, converted): + # check if the key is in any extended schema in this converted schema, i.e. + # if we see a on_value_range in a dallas sensor, then this is overridden because + # it is already defined in sensor + assert S_CONFIG_VARS not in converted and S_EXTENDS not in converted + config = converted.get(S_SCHEMA, {}) + + return get_overridden_key_inner(key, config, {}) + + +def get_overridden_key_inner(key, config, ret): + if S_EXTENDS not in config: + return ret + for s in config[S_EXTENDS]: + p = s.partition(".") + s1 = output.get(p[0], {}).get(S_SCHEMAS, {}).get(p[2], {}).get(S_SCHEMA) + if s1: + if key in s1.get(S_CONFIG_VARS, {}): + for k, v in s1.get(S_CONFIG_VARS)[key].items(): + if k not in ret: # keep most overridden + ret[k] = v + get_overridden_key_inner(key, s1, ret) + + return ret + + +def convert_keys(converted, schema, path): + for k, v in schema.items(): + # deprecated stuff + if repr(v).startswith("