mirror of
https://github.com/esphome/esphome.git
synced 2024-11-09 09:51:20 +01:00
853 lines
29 KiB
Python
853 lines
29 KiB
Python
import collections
|
|
import importlib
|
|
import logging
|
|
import re
|
|
import os.path
|
|
|
|
# pylint: disable=unused-import, wrong-import-order
|
|
import sys
|
|
from contextlib import contextmanager
|
|
|
|
import voluptuous as vol
|
|
|
|
from esphome import core, core_config, yaml_util
|
|
from esphome.const import CONF_ESPHOME, CONF_PLATFORM, ESP_PLATFORMS, CONF_PACKAGES, \
|
|
CONF_SUBSTITUTIONS
|
|
from esphome.core import CORE, EsphomeError # noqa
|
|
from esphome.helpers import color, indent
|
|
from esphome.util import safe_print, OrderedDict
|
|
|
|
from typing import List, Optional, Tuple, Union # noqa
|
|
from esphome.core import ConfigType # noqa
|
|
from esphome.yaml_util import is_secret, ESPHomeDataBase, ESPForceValue
|
|
from esphome.voluptuous_schema import ExtraKeysInvalid
|
|
|
|
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Cache of loaded manifests, keyed by the domain string handed to
# _lookup_module ('<name>' for components, '<platform>.<domain>' for platforms).
_COMPONENT_CACHE = {}
|
|
|
|
|
|
class ComponentManifest:
    """Read-only view over the Python module that implements a component.

    All metadata is looked up on the wrapped module each time it is requested,
    using well-known module-level names (``CONFIG_SCHEMA``, ``DEPENDENCIES``,
    ...) and falling back to a default when the module does not define them.
    """

    def __init__(self, module, base_components_path, is_core=False, is_platform=False):
        # Directory that source file paths are made relative to
        self.base_components_path = base_components_path
        self.is_platform = is_platform
        self._is_core = is_core
        self.module = module

    def _module_attr(self, name, default):
        """Return module attribute *name*, or *default* if the module lacks it."""
        return getattr(self.module, name, default)

    @property
    def is_platform_component(self):
        return self._module_attr('IS_PLATFORM_COMPONENT', False)

    @property
    def config_schema(self):
        return self._module_attr('CONFIG_SCHEMA', None)

    @property
    def is_multi_conf(self):
        return self._module_attr('MULTI_CONF', False)

    @property
    def to_code(self):
        return self._module_attr('to_code', None)

    @property
    def esp_platforms(self):
        return self._module_attr('ESP_PLATFORMS', ESP_PLATFORMS)

    @property
    def dependencies(self):
        return self._module_attr('DEPENDENCIES', [])

    @property
    def conflicts_with(self):
        return self._module_attr('CONFLICTS_WITH', [])

    @property
    def auto_load(self):
        return self._module_attr('AUTO_LOAD', [])

    @property
    def codeowners(self) -> List[str]:
        return self._module_attr('CODEOWNERS', [])

    def _get_flags_set(self, name, config):
        """Return module attribute *name* as a set.

        A callable attribute is invoked with *config* first. A missing
        attribute or a ``None`` value gives an empty set; a single non-container
        value is wrapped into a one-element set.
        """
        value = getattr(self.module, name, None)
        if callable(value):
            value = value(config)
        if value is None:
            return set()
        if isinstance(value, (list, tuple, set)):
            return set(value)
        return {value}

    @property
    def source_files(self):
        """Map target include paths (always ``/``-separated) to absolute source paths."""
        if self._is_core:
            core_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'core'))
            # 'dummy' stands in for a file name inside the core directory
            found = core.find_source_files(os.path.join(core_dir, 'dummy'))
            return {f'esphome/core/{name}': os.path.join(core_dir, name) for name in found}

        module_file = self.module.__file__
        found = core.find_source_files(module_file)
        # Make paths absolute
        module_dir = os.path.abspath(os.path.dirname(module_file))
        mapping = {}
        for name in found:
            absolute = os.path.join(module_dir, name)
            relative = os.path.relpath(absolute, self.base_components_path)
            # Always use / for C++ include names
            relative = relative.replace(os.sep, '/')
            mapping[f'esphome/components/{relative}'] = absolute
        return mapping
|
|
|
|
|
|
# Absolute path of the 'components' directory located next to this file.
CORE_COMPONENTS_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), 'components'))
# Sentinel: the custom components path has not been determined yet.
_UNDEF = object()
# Replaced by _mount_config_dir() with the custom_components directory path,
# or with None when that directory does not exist.
CUSTOM_COMPONENTS_PATH = _UNDEF
|
|
|
|
|
|
def _mount_config_dir():
    """Locate the ``custom_components`` directory and make it importable.

    Only the first call does any work. Afterwards ``CUSTOM_COMPONENTS_PATH``
    holds the absolute directory path, or ``None`` if the directory is absent.
    """
    global CUSTOM_COMPONENTS_PATH
    if CUSTOM_COMPONENTS_PATH is not _UNDEF:
        # Already determined by an earlier call
        return

    candidate = os.path.abspath(os.path.join(CORE.config_dir, 'custom_components'))
    if os.path.isdir(candidate):
        # The config dir goes on sys.path so 'custom_components.<name>' can be imported
        if CORE.config_dir not in sys.path:
            sys.path.insert(0, CORE.config_dir)
        CUSTOM_COMPONENTS_PATH = candidate
    else:
        CUSTOM_COMPONENTS_PATH = None
|
|
|
|
|
|
def _is_module_missing(err, module_name):
    """Return True if *err* only reports that *module_name* or one of its parent packages is absent."""
    if not isinstance(err, ModuleNotFoundError) or err.name is None:
        return False
    return module_name == err.name or module_name.startswith(err.name + '.')


def _lookup_module(domain, is_platform):
    """Import the module for *domain* and wrap it in a ComponentManifest.

    ``custom_components.<domain>`` is tried first, then
    ``esphome.components.<domain>``. Successful lookups are cached in
    ``_COMPONENT_CACHE``.

    :param domain: Module name relative to the components package, e.g.
        ``'logger'`` or ``'<platform>.<domain>'``.
    :param is_platform: Stored on the manifest as ``is_platform``.
    :return: The manifest, or ``None`` if the module does not exist or fails to load.
    """
    if domain in _COMPONENT_CACHE:
        return _COMPONENT_CACHE[domain]

    _mount_config_dir()
    # First look for custom_components
    custom_name = f'custom_components.{domain}'
    try:
        module = importlib.import_module(custom_name)
    except ImportError as e:
        # Only stay silent when the custom component itself is absent. A missing
        # dependency *inside* an existing component is reported, since it would
        # otherwise surface as a confusing "component not found".
        if not _is_module_missing(e, custom_name):
            _LOGGER.warning("Unable to import custom component %s:", domain, exc_info=True)
    except Exception:  # pylint: disable=broad-except
        # Other error means component has an issue
        _LOGGER.error("Unable to load custom component %s:", domain, exc_info=True)
        return None
    else:
        # Found in custom components
        manif = ComponentManifest(module, CUSTOM_COMPONENTS_PATH, is_platform=is_platform)
        _COMPONENT_CACHE[domain] = manif
        return manif

    core_name = f'esphome.components.{domain}'
    try:
        module = importlib.import_module(core_name)
    except ImportError as e:
        # Same distinction as above: absent component vs. broken import inside it
        if not _is_module_missing(e, core_name):
            _LOGGER.error("Unable to import component %s:", domain, exc_info=True)
        return None
    except Exception:  # pylint: disable=broad-except
        _LOGGER.error("Unable to load component %s:", domain, exc_info=True)
        return None
    else:
        manif = ComponentManifest(module, CORE_COMPONENTS_PATH, is_platform=is_platform)
        _COMPONENT_CACHE[domain] = manif
        return manif
|
|
|
|
|
|
def get_component(domain):
    """Return the manifest for top-level component *domain* (None if it cannot be loaded)."""
    # Dotted names are what get_platform() builds; they are not accepted here
    assert '.' not in domain
    return _lookup_module(domain, False)
|
|
|
|
|
|
def get_platform(domain, platform):
    """Return the manifest for *platform* of platform component *domain*.

    The module looked up is ``<platform>.<domain>``; the result is ``None`` if
    it cannot be loaded.
    """
    return _lookup_module(f'{platform}.{domain}', True)
|
|
|
|
|
|
# Pre-register the 'esphome' domain: it is backed by the core_config module
# and flagged as core, so no import lookup is ever attempted for it.
_COMPONENT_CACHE['esphome'] = ComponentManifest(
    core_config, CORE_COMPONENTS_PATH, is_core=True, is_platform=False,
)
|
|
|
|
|
|
def iter_components(config):
    """Yield ``(name, manifest, conf)`` for each component and platform entry in *config*.

    Multi-conf components produce one tuple per list entry. Platform
    components additionally produce one ``('<domain>.<platform>', ...)`` tuple
    per platform entry.
    """
    for domain, conf in config.items():
        component = get_component(domain)
        entries = conf if component.is_multi_conf else [conf]
        for entry in entries:
            yield domain, component, entry

        if not component.is_platform_component:
            continue
        for p_config in conf:
            platform_key = p_config[CONF_PLATFORM]
            p_name = "{}.{}".format(domain, platform_key)
            yield p_name, get_platform(domain, platform_key), p_config
|
|
|
|
|
|
# A path into the nested configuration: a list of string and/or integer steps.
ConfigPath = List[Union[str, int]]
|
|
|
|
|
|
def _path_begins_with(path, other): # type: (ConfigPath, ConfigPath) -> bool
|
|
if len(path) < len(other):
|
|
return False
|
|
return path[:len(other)] == other
|
|
|
|
|
|
class Config(OrderedDict):
    """Configuration mapping that also records validation errors and output paths."""

    def __init__(self):
        super().__init__()
        # A list of voluptuous errors
        self.errors = []  # type: List[vol.Invalid]
        # A list of paths that should be fully outputted
        # The values will be the paths to all "domain", for example (['logger'], 'logger')
        # or (['sensor', 'ultrasonic'], 'sensor.ultrasonic')
        self.output_paths = []  # type: List[Tuple[ConfigPath, str]]

    def add_error(self, error):
        # type: (vol.Invalid) -> None
        """Record *error*; a MultipleInvalid is flattened into its individual errors."""
        if not isinstance(error, vol.MultipleInvalid):
            self.errors.append(error)
            return
        for nested in error.errors:
            self.add_error(nested)

    @contextmanager
    def catch_error(self, path=None):
        """Context manager that records a raised vol.Invalid, with *path* prepended, instead of propagating it."""
        prefix = path or []
        try:
            yield
        except vol.Invalid as exc:
            exc.prepend(prefix)
            self.add_error(exc)

    def add_str_error(self, message, path):
        # type: (str, ConfigPath) -> None
        """Record an error built from a plain message and a path."""
        self.add_error(vol.Invalid(message, path))

    def add_output_path(self, path, domain):
        # type: (ConfigPath, str) -> None
        self.output_paths.append((path, domain))

    def remove_output_path(self, path, domain):
        # type: (ConfigPath, str) -> None
        self.output_paths.remove((path, domain))

    def is_in_error_path(self, path):
        # type: (ConfigPath) -> bool
        """Return True if any recorded error lies at or below *path*."""
        return any(_path_begins_with(err.path, path) for err in self.errors)

    def set_by_path(self, path, value):
        """Store *value* at the location described by *path*."""
        container = self
        for step in path[:-1]:
            container = container[step]
        container[path[-1]] = value

    def get_error_for_path(self, path):
        # type: (ConfigPath) -> Optional[vol.Invalid]
        """Return the first error whose deepest reachable path equals *path*, else None."""
        return next(
            (err for err in self.errors if self.get_deepest_path(err.path) == path),
            None,
        )

    def get_deepest_document_range_for_path(self, path):
        # type: (ConfigPath) -> Optional[ESPHomeDataBase]
        """Return the ``esp_range`` of the deepest item along *path* that has one."""
        node = self
        found = None
        for step in path:
            try:
                node = node[step]
            except (KeyError, IndexError, TypeError):
                break
            if isinstance(node, ESPHomeDataBase) and node.esp_range is not None:
                found = node.esp_range

        return found

    def get_nested_item(self, path):
        # type: (ConfigPath) -> ConfigType
        """Return the item at *path*, or an empty dict if the path cannot be followed."""
        node = self
        for step in path:
            try:
                node = node[step]
            except (KeyError, IndexError, TypeError):
                return {}
        return node

    def get_deepest_path(self, path):
        # type: (ConfigPath) -> ConfigPath
        """Return the path that is the deepest reachable by following path."""
        node = self
        reached = []
        for step in path:
            try:
                node = node[step]
            except (KeyError, IndexError, TypeError):
                break
            reached.append(step)
        return reached
|
|
|
|
|
|
def iter_ids(config, path=None):
    """Recursively yield ``(id, path)`` pairs found in *config*.

    A ``core.ID`` yields itself, a ``core.Lambda`` yields each of its
    ``requires_ids``; lists and dicts are descended into, extending *path*
    with the index or key.
    """
    path = path or []
    if isinstance(config, core.ID):
        yield config, path
        return
    if isinstance(config, core.Lambda):
        for required in config.requires_ids:
            yield required, path
        return

    if isinstance(config, list):
        children = enumerate(config)
    elif isinstance(config, dict):
        children = config.items()
    else:
        # Scalars carry no IDs
        return
    for step, child in children:
        yield from iter_ids(child, path + [step])
|
|
|
|
|
|
def do_id_pass(result):  # type: (Config) -> None
    """Resolve and cross-check every ID found in *result*.

    Declarations are collected first (a redefined manual ID is reported), then
    resolved. Afterwards each ID reference is matched against the declarations,
    by name when it has one, otherwise by type. Problems are recorded on
    *result* through ``add_str_error``; nothing is returned.
    """
    # NOTE(review): function-level imports, presumably to avoid an import cycle - confirm
    from esphome.cpp_generator import MockObjClass
    from esphome.cpp_types import Component

    declare_ids = []  # type: List[Tuple[core.ID, ConfigPath]]
    searching_ids = []  # type: List[Tuple[core.ID, ConfigPath]]
    for id, path in iter_ids(result):
        if id.is_declaration:
            if id.id is not None:
                # Look for duplicate definitions
                match = next((v for v in declare_ids if v[0].id == id.id), None)
                if match is not None:
                    opath = '->'.join(str(v) for v in match[1])
                    result.add_str_error(f"ID {id.id} redefined! Check {opath}", path)
                    # The duplicate is not added to declare_ids
                    continue
            declare_ids.append((id, path))
        else:
            searching_ids.append((id, path))
    # Resolve default ids after manual IDs
    for id, _ in declare_ids:
        # The list is rebuilt on every iteration, so it reflects the ids as they stand now
        id.resolve([v[0].id for v in declare_ids])
        if isinstance(id.type, MockObjClass) and id.type.inherits_from(Component):
            CORE.component_ids.add(id.id)

    # Check searched IDs
    for id, path in searching_ids:
        if id.id is not None:
            # manually declared
            match = next((v[0] for v in declare_ids if v[0].id == id.id), None)
            if match is None:
                # No declared ID with this name
                import difflib
                error = ("Couldn't find ID '{}'. Please check you have defined "
                         "an ID with that name in your configuration.".format(id.id))
                # Find candidates
                matches = difflib.get_close_matches(id.id, [v[0].id for v in declare_ids])
                if matches:
                    matches_s = ', '.join(f'"{x}"' for x in matches)
                    error += f" These IDs look similar: {matches_s}."
                result.add_str_error(error, path)
                continue
            # The type check is skipped unless both sides are MockObjClass instances
            if not isinstance(match.type, MockObjClass) or not isinstance(id.type, MockObjClass):
                continue
            if not match.type.inherits_from(id.type):
                result.add_str_error("ID '{}' of type {} doesn't inherit from {}. Please "
                                     "double check your ID is pointing to the correct value"
                                     "".format(id.id, match.type, id.type), path)

        if id.id is None and id.type is not None:
            # Unnamed reference: take the first declaration whose type inherits from the wanted type
            for v in declare_ids:
                if v[0] is None or not isinstance(v[0].type, MockObjClass):
                    continue
                inherits = v[0].type.inherits_from(id.type)
                if inherits:
                    id.id = v[0].id
                    break
            else:
                # for/else: reached only when the loop finished without a match
                result.add_str_error(f"Couldn't resolve ID for type '{id.type}'", path)
|
|
|
|
|
|
def recursive_check_replaceme(value):
    """Reject configurations that still contain the placeholder string 'REPLACEME'.

    Lists and dicts are checked recursively through a schema built from this
    function. A value marked with ``!force`` (an ``ESPForceValue``) is accepted
    as-is, so the literal string can still be used on purpose.

    :return: The checked value.
    :raises cv.Invalid: If an unforced 'REPLACEME' string is found.
    """
    import esphome.config_validation as cv

    if isinstance(value, list):
        return cv.Schema([recursive_check_replaceme])(value)
    if isinstance(value, dict):
        return cv.Schema({cv.valid: recursive_check_replaceme})(value)
    if isinstance(value, ESPForceValue):
        # Explicitly forced by the user: skip the placeholder check. Previously
        # this branch was a no-op, so the check below still ran for forced values.
        return value
    if isinstance(value, str) and value == 'REPLACEME':
        raise cv.Invalid("Found 'REPLACEME' in configuration, this is most likely an error. "
                         "Please make sure you have replaced all fields from the sample "
                         "configuration.\n"
                         "If you want to use the literal REPLACEME string, "
                         "please use \"!force REPLACEME\"")
    return value
|
|
|
|
|
|
def validate_config(config, command_line_substitutions):
    """Validate the raw *config* and return a Config with the outcome.

    The returned Config holds the (partially) validated values, the output
    paths of every loaded domain and any validation errors; several stages
    return early on their first error. *config* itself is modified along the
    way (legacy key rename, auto-loaded components are inserted).

    :param config: Raw configuration mapping.
    :param command_line_substitutions: Mapping merged over the config's
        substitutions; also passed to the substitution pass.
    """
    result = Config()

    # 0. Load packages
    if CONF_PACKAGES in config:
        from esphome.components.packages import do_packages_pass
        result.add_output_path([CONF_PACKAGES], CONF_PACKAGES)
        try:
            config = do_packages_pass(config)
        except vol.Invalid as err:
            # Copy the raw config into the result before reporting the error
            result.update(config)
            result.add_error(err)
            return result

    # 1. Load substitutions
    if CONF_SUBSTITUTIONS in config:
        from esphome.components import substitutions
        # On key collisions the command line values take precedence
        result[CONF_SUBSTITUTIONS] = {**config[CONF_SUBSTITUTIONS], **command_line_substitutions}
        result.add_output_path([CONF_SUBSTITUTIONS], CONF_SUBSTITUTIONS)
        try:
            substitutions.do_substitution_pass(config, command_line_substitutions)
        except vol.Invalid as err:
            result.add_error(err)
            return result

    # 1.1. Check for REPLACEME special value
    try:
        recursive_check_replaceme(config)
    except vol.Invalid as err:
        # Recorded, but validation continues
        result.add_error(err)

    if 'esphomeyaml' in config:
        _LOGGER.warning("The esphomeyaml section has been renamed to esphome in 1.11.0. "
                        "Please replace 'esphomeyaml:' in your configuration with 'esphome:'.")
        config[CONF_ESPHOME] = config.pop('esphomeyaml')

    if CONF_ESPHOME not in config:
        result.add_str_error("'esphome' section missing from configuration. Please make sure "
                             "your configuration has an 'esphome:' line in it.", [])
        return result

    # 2. Load partial core config
    result[CONF_ESPHOME] = config[CONF_ESPHOME]
    result.add_output_path([CONF_ESPHOME], CONF_ESPHOME)
    try:
        core_config.preload_core_config(config)
    except vol.Invalid as err:
        result.add_error(err)
        return result
    # Remove temporary esphome config path again, it will be reloaded later
    result.remove_output_path([CONF_ESPHOME], CONF_ESPHOME)

    # 3. Load components.
    # Load components (also AUTO_LOAD) and set output paths of result
    # Queue of items to load, FIFO
    load_queue = collections.deque()
    for domain, conf in config.items():
        load_queue.append((domain, conf))

    # List of items to enter next stage
    check_queue = []  # type: List[Tuple[ConfigPath, str, ConfigType, ComponentManifest]]

    # This step handles:
    # - Adding output path
    # - Auto Load
    # - Loading configs into result

    while load_queue:
        domain, conf = load_queue.popleft()
        domain = str(domain)
        if domain.startswith('.'):
            # Ignore top-level keys starting with a dot
            continue
        result.add_output_path([domain], domain)
        result[domain] = conf
        component = get_component(domain)
        path = [domain]
        if component is None:
            result.add_str_error(f"Component not found: {domain}", path)
            continue
        CORE.loaded_integrations.add(domain)

        # Process AUTO_LOAD
        for load in component.auto_load:
            if load not in config:
                # Insert a placeholder config and queue the component like a user-provided one
                load_conf = core.AutoLoad()
                config[load] = load_conf
                load_queue.append((load, load_conf))

        if not component.is_platform_component:
            check_queue.append(([domain], domain, conf, component))
            continue

        # This is a platform component, proceed to reading platform entries
        # Remove this is as an output path
        result.remove_output_path([domain], domain)

        # Ensure conf is a list
        if not conf:
            result[domain] = conf = []
        elif not isinstance(conf, list):
            result[domain] = conf = [conf]

        for i, p_config in enumerate(conf):
            path = [domain, i]
            # Construct temporary unknown output path
            p_domain = f'{domain}.unknown'
            result.add_output_path(path, p_domain)
            result[domain][i] = p_config
            if not isinstance(p_config, dict):
                result.add_str_error("Platform schemas must be key-value pairs.", path)
                continue
            p_name = p_config.get('platform')
            if p_name is None:
                result.add_str_error("No platform specified! See 'platform' key.", path)
                continue
            # Remove temp output path and construct new one
            result.remove_output_path(path, p_domain)
            p_domain = f'{domain}.{p_name}'
            result.add_output_path(path, p_domain)
            # Try Load platform
            platform = get_platform(domain, p_name)
            if platform is None:
                result.add_str_error(f"Platform not found: '{p_domain}'", path)
                continue
            CORE.loaded_integrations.add(p_name)

            # Process AUTO_LOAD
            for load in platform.auto_load:
                if load not in config:
                    load_conf = core.AutoLoad()
                    config[load] = load_conf
                    load_queue.append((load, load_conf))

            check_queue.append((path, p_domain, p_config, platform))

    # 4. Validate component metadata, including
    # - Transformation (nullable, multi conf)
    # - Dependencies
    # - Conflicts
    # - Supported ESP Platform

    # List of items to proceed to next stage
    validate_queue = []  # type: List[Tuple[ConfigPath, ConfigType, ComponentManifest]]
    for path, domain, conf, comp in check_queue:
        if conf is None:
            # A key given without a value is treated as an empty mapping
            result[domain] = conf = {}

        # All missing dependencies are reported before the entry is skipped
        success = True
        for dependency in comp.dependencies:
            if dependency not in config:
                result.add_str_error("Component {} requires component {}"
                                     "".format(domain, dependency), path)
                success = False
        if not success:
            continue

        success = True
        for conflict in comp.conflicts_with:
            if conflict in config:
                result.add_str_error("Component {} cannot be used together with component {}"
                                     "".format(domain, conflict), path)
                success = False
        if not success:
            continue

        if CORE.esp_platform not in comp.esp_platforms:
            result.add_str_error("Component {} doesn't support {}.".format(domain,
                                                                           CORE.esp_platform),
                                 path)
            continue

        # Components without a schema are accepted only when they were auto-loaded
        if not comp.is_platform_component and comp.config_schema is None and \
                not isinstance(conf, core.AutoLoad):
            result.add_str_error("Component {} cannot be loaded via YAML "
                                 "(no CONFIG_SCHEMA).".format(domain), path)
            continue

        if comp.is_multi_conf:
            # Multi-conf components are validated entry by entry
            if not isinstance(conf, list):
                result[domain] = conf = [conf]
            for i, part_conf in enumerate(conf):
                validate_queue.append((path + [i], part_conf, comp))
            continue

        validate_queue.append((path, conf, comp))

    # 5. Validate configuration schema
    for path, conf, comp in validate_queue:
        if comp.config_schema is None:
            continue
        # A vol.Invalid raised inside is recorded on result with this path prepended
        with result.catch_error(path):
            if comp.is_platform:
                # Remove 'platform' key for validation
                input_conf = OrderedDict(conf)
                platform_val = input_conf.pop('platform')
                validated = comp.config_schema(input_conf)
                # Ensure result is OrderedDict so we can call move_to_end
                if not isinstance(validated, OrderedDict):
                    validated = OrderedDict(validated)
                # Put the 'platform' key back as the first entry
                validated['platform'] = platform_val
                validated.move_to_end('platform', last=False)
                result.set_by_path(path, validated)
            else:
                validated = comp.config_schema(conf)
                result.set_by_path(path, validated)

    # 6. If no validation errors, check IDs
    if not result.errors:
        # Only parse IDs if no validation error. Otherwise
        # user gets confusing messages
        do_id_pass(result)
    return result
|
|
|
|
|
|
def _nested_getitem(data, path):
|
|
for item_index in path:
|
|
try:
|
|
data = data[item_index]
|
|
except (KeyError, IndexError, TypeError):
|
|
return None
|
|
return data
|
|
|
|
|
|
def humanize_error(config, validation_error):
    """Turn a validation error into a short sentence.

    Everything from the ``@ data[...`` location suffix onwards (including a
    preceding "for dictionary value") is cut off, surrounding whitespace is
    stripped and a final period is added if missing. *config* is not used.
    """
    text = str(validation_error)
    found = re.match(r'^(.*?)\s*(?:for dictionary value )?@ data\[.*$', text, re.DOTALL)
    if found is not None:
        text = found.group(1)
    text = text.strip()
    return text if text.endswith('.') else text + '.'
|
|
|
|
|
|
def _get_parent_name(path, config):
|
|
if not path:
|
|
return '<root>'
|
|
for domain_path, domain in config.output_paths:
|
|
if _path_begins_with(path, domain_path):
|
|
if len(path) > len(domain_path):
|
|
# Sub-item
|
|
break
|
|
return domain
|
|
return path[-1]
|
|
|
|
|
|
def _format_vol_invalid(ex, config):
    # type: (vol.Invalid, Config) -> str
    """Build the user-facing message for validation error *ex*."""
    paren = _get_parent_name(ex.path[:-1], config)

    if isinstance(ex, ExtraKeysInvalid):
        if ex.candidates:
            return '[{}] is an invalid option for [{}]. Did you mean {}?'.format(
                ex.path[-1], paren, ', '.join(f'[{x}]' for x in ex.candidates))
        return '[{}] is an invalid option for [{}]. Please check the indentation.'.format(
            ex.path[-1], paren)

    if 'extra keys not allowed' in str(ex):
        return '[{}] is an invalid option for [{}].'.format(ex.path[-1], paren)
    if 'required key not provided' in str(ex):
        return "'{}' is a required option for [{}].".format(ex.path[-1], paren)

    # Anything else: fall back to the cleaned-up error text
    return humanize_error(config, ex)
|
|
|
|
|
|
class InvalidYAMLError(EsphomeError):
    """Raised when the configuration file cannot be loaded as YAML.

    The wrapped exception stays available as ``base_exc``.
    """

    def __init__(self, base_exc):
        try:
            details = str(base_exc)
        except UnicodeDecodeError:
            # Fall back to repr() when the message cannot be converted to text
            details = repr(base_exc)
        super().__init__(f"Invalid YAML syntax:\n\n{details}")
        self.base_exc = base_exc
|
|
|
|
|
|
def _load_config(command_line_substitutions):
    """Load the YAML file at ``CORE.config_path`` and validate it.

    The raw parsed YAML is stored in ``CORE.raw_config`` before validation.

    :param command_line_substitutions: Passed through to ``validate_config``.
    :return: The ``Config`` produced by ``validate_config``.
    :raises InvalidYAMLError: If loading the YAML raises an ``EsphomeError``;
        the original error is attached as the explicit cause.
    """
    try:
        config = yaml_util.load_yaml(CORE.config_path)
    except EsphomeError as e:
        # Chain explicitly so the traceback shows the YAML error as the direct cause
        raise InvalidYAMLError(e) from e
    CORE.raw_config = config

    try:
        result = validate_config(config, command_line_substitutions)
    except EsphomeError:
        # Expected error type: propagate without the "unexpected" log line
        raise
    except Exception:
        _LOGGER.error("Unexpected exception while reading configuration:")
        raise

    return result
|
|
|
|
|
|
def load_config(command_line_substitutions):
    """Load and validate the configuration.

    :param command_line_substitutions: Passed through to ``_load_config``.
    :return: The validated ``Config``.
    :raises EsphomeError: If a ``vol.Invalid`` escapes validation; the original
        error is attached as the explicit cause.
    """
    try:
        return _load_config(command_line_substitutions)
    except vol.Invalid as err:
        # Chain explicitly so the underlying validation error stays visible
        raise EsphomeError(f"Error while parsing config: {err}") from err
|
|
|
|
|
|
def line_info(obj, highlight=True):
    """Display line config source.

    Returns a cyan ``[source <document>:<line>]`` marker for objects carrying
    a source range, and ``None`` when *highlight* is false or no range is known.
    """
    if not highlight or not isinstance(obj, ESPHomeDataBase) or obj.esp_range is None:
        return None
    start = obj.esp_range.start_mark
    # One is added to the stored line number for display
    return color('cyan', "[source {}:{}]".format(start.document, start.line + 1))
|
|
|
|
|
|
def _print_on_next_line(obj):
|
|
if isinstance(obj, (list, tuple, dict)):
|
|
return True
|
|
if isinstance(obj, str):
|
|
return len(obj) > 80
|
|
if isinstance(obj, core.Lambda):
|
|
return len(obj.value) > 80
|
|
return False
|
|
|
|
|
|
def dump_dict(config, path, at_root=True):
    # type: (Config, ConfigPath, bool) -> Tuple[str, bool]
    """Render the configuration item at *path* as coloured, YAML-like text.

    Errors recorded for an item are printed in bold red above it, and keys or
    list markers on an error path are coloured red.

    :param config: The Config to read values and errors from.
    :param path: Path of the item to render.
    :param at_root: When true, an error recorded for *path* itself is printed
        first; recursive calls pass False because the caller already did so.
    :return: ``(text, multiline)``, where *multiline* tells the caller whether
        the text has to start on a new line.
    """
    conf = config.get_nested_item(path)
    ret = ''
    multiline = False

    if at_root:
        error = config.get_error_for_path(path)
        if error is not None:
            ret += '\n' + color('bold_red', _format_vol_invalid(error, config)) + '\n'

    if isinstance(conf, (list, tuple)):
        multiline = True
        if not conf:
            ret += '[]'
            multiline = False

        for i, item in enumerate(conf):
            path_ = path + [i]
            error = config.get_error_for_path(path_)
            if error is not None:
                ret += '\n' + color('bold_red', _format_vol_invalid(error, config)) + '\n'

            in_error = config.is_in_error_path(path_)
            sep = color('red', '- ') if in_error else '- '
            msg, _ = dump_dict(config, path_, at_root=False)
            msg = indent(msg)
            inf = line_info(item, highlight=in_error)
            if inf is not None:
                msg = inf + '\n' + msg
            elif msg:
                # Drop the first two characters so the text lines up after '- '
                msg = msg[2:]
            ret += sep + msg + '\n'
    elif isinstance(conf, dict):
        multiline = True
        if not conf:
            ret += '{}'
            multiline = False

        for k, item in conf.items():
            path_ = path + [k]
            error = config.get_error_for_path(path_)
            if error is not None:
                ret += '\n' + color('bold_red', _format_vol_invalid(error, config)) + '\n'

            in_error = config.is_in_error_path(path_)
            st = f'{k}: '
            if in_error:
                st = color('red', st)
            msg, m = dump_dict(config, path_, at_root=False)

            inf = line_info(item, highlight=in_error)
            if m:
                msg = '\n' + indent(msg)

            if inf is not None:
                # Source info goes right after the key for multi-line values,
                # otherwise after the value
                if m:
                    msg = ' ' + inf + msg
                else:
                    msg = msg + ' ' + inf
            ret += st + msg + '\n'
    elif isinstance(conf, str):
        if is_secret(conf):
            conf = '!secret {}'.format(is_secret(conf))
        if not conf:
            conf += "''"

        if len(conf) > 80:
            conf = '|-\n' + indent(conf)
        error = config.get_error_for_path(path)
        col = 'bold_red' if error else 'white'
        ret += color(col, str(conf))
    elif isinstance(conf, core.Lambda):
        if is_secret(conf):
            # Show the secret reference instead of the lambda body. Previously
            # conf was rebound to this string and then conf.value was read,
            # which raises AttributeError.
            conf = '!secret {}'.format(is_secret(conf))
        else:
            conf = '!lambda |-\n' + indent(str(conf.value))
        error = config.get_error_for_path(path)
        col = 'bold_red' if error else 'white'
        ret += color(col, conf)
    elif conf is None:
        pass
    else:
        error = config.get_error_for_path(path)
        col = 'bold_red' if error else 'white'
        ret += color(col, str(conf))
        multiline = '\n' in ret

    return ret, multiline
|
|
|
|
|
|
def strip_default_ids(config):
    """Remove non-manual IDs and AutoLoad placeholders from *config*, recursively.

    Lists and dicts are modified in place; the same object is returned.
    """
    def _should_drop(item):
        return (isinstance(item, core.ID) and not item.is_manual) or isinstance(item, core.AutoLoad)

    if isinstance(config, list):
        # Clean the children first, then drop the unwanted entries
        for index, child in enumerate(config):
            config[index] = strip_default_ids(child)
        for unwanted in [child for child in config if _should_drop(child)]:
            config.remove(unwanted)
    elif isinstance(config, dict):
        for key, child in config.items():
            config[key] = strip_default_ids(child)
        for key in [key for key, child in config.items() if _should_drop(child)]:
            config.pop(key)
    return config
|
|
|
|
|
|
def read_config(command_line_substitutions):
    """Load the configuration and report any problems to the user.

    :return: The validated configuration as an ``OrderedDict``, or ``None``
        when loading failed or validation errors were found (the failing
        sections are printed in that case).
    """
    _LOGGER.info("Reading configuration %s...", CORE.config_path)
    try:
        res = load_config(command_line_substitutions)
    except EsphomeError as err:
        _LOGGER.error("Error while reading config: %s", err)
        return None

    if not res.errors:
        return OrderedDict(res)

    if not CORE.verbose:
        res = strip_default_ids(res)

    safe_print(color('bold_red', "Failed config"))
    safe_print('')
    for path, domain in res.output_paths:
        # Only sections containing at least one error are printed
        if res.is_in_error_path(path):
            source = line_info(res.get_nested_item(path)) or ''
            safe_print(color('bold_red', f'{domain}:') + ' ' + source)
            safe_print(indent(dump_dict(res, path)[0]))
    return None
|