mirror of
https://github.com/esphome/esphome.git
synced 2025-01-04 18:47:43 +01:00
64bd33a94e
Having different output each time due to random key order makes some tasks harder (such as CI scripts).
423 lines
16 KiB
Python
423 lines
16 KiB
Python
import fnmatch
|
|
import functools
|
|
import inspect
|
|
import logging
|
|
import math
|
|
import os
|
|
|
|
import uuid
|
|
import yaml
|
|
import yaml.constructor
|
|
|
|
from esphome import core
|
|
from esphome.config_helpers import read_config_file
|
|
from esphome.core import EsphomeError, IPAddress, Lambda, MACAddress, TimePeriod, DocumentRange
|
|
from esphome.helpers import add_class_to_obj
|
|
from esphome.util import OrderedDict, filter_yaml_files
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
# Mostly copied from Home Assistant because that code works fine and
|
|
# let's not reinvent the wheel here
|
|
|
|
SECRET_YAML = 'secrets.yaml'
|
|
_SECRET_CACHE = {}
|
|
_SECRET_VALUES = {}
|
|
|
|
|
|
class ESPHomeDataBase:
|
|
@property
|
|
def esp_range(self):
|
|
return getattr(self, '_esp_range', None)
|
|
|
|
def from_node(self, node):
|
|
# pylint: disable=attribute-defined-outside-init
|
|
self._esp_range = DocumentRange.from_marks(node.start_mark, node.end_mark)
|
|
|
|
|
|
class ESPForceValue:
|
|
pass
|
|
|
|
|
|
def make_data_base(value):
|
|
try:
|
|
return add_class_to_obj(value, ESPHomeDataBase)
|
|
except TypeError:
|
|
# Adding class failed, ignore error
|
|
return value
|
|
|
|
|
|
def _add_data_ref(fn):
|
|
@functools.wraps(fn)
|
|
def wrapped(loader, node):
|
|
res = fn(loader, node)
|
|
# newer PyYAML versions use generators, resolve them
|
|
if inspect.isgenerator(res):
|
|
generator = res
|
|
res = next(generator)
|
|
# Let generator finish
|
|
for _ in generator:
|
|
pass
|
|
res = make_data_base(res)
|
|
if isinstance(res, ESPHomeDataBase):
|
|
res.from_node(node)
|
|
return res
|
|
|
|
return wrapped
|
|
|
|
|
|
class ESPHomeLoader(yaml.SafeLoader): # pylint: disable=too-many-ancestors
|
|
"""Loader class that keeps track of line numbers."""
|
|
|
|
@_add_data_ref
|
|
def construct_yaml_int(self, node):
|
|
return super().construct_yaml_int(node)
|
|
|
|
@_add_data_ref
|
|
def construct_yaml_float(self, node):
|
|
return super().construct_yaml_float(node)
|
|
|
|
@_add_data_ref
|
|
def construct_yaml_binary(self, node):
|
|
return super().construct_yaml_binary(node)
|
|
|
|
@_add_data_ref
|
|
def construct_yaml_omap(self, node):
|
|
return super().construct_yaml_omap(node)
|
|
|
|
@_add_data_ref
|
|
def construct_yaml_str(self, node):
|
|
return super().construct_yaml_str(node)
|
|
|
|
@_add_data_ref
|
|
def construct_yaml_seq(self, node):
|
|
return super().construct_yaml_seq(node)
|
|
|
|
@_add_data_ref
|
|
def construct_yaml_map(self, node):
|
|
"""Traverses the given mapping node and returns a list of constructed key-value pairs."""
|
|
assert isinstance(node, yaml.MappingNode)
|
|
# A list of key-value pairs we find in the current mapping
|
|
pairs = []
|
|
# A list of key-value pairs we find while resolving merges ('<<' key), will be
|
|
# added to pairs in a second pass
|
|
merge_pairs = []
|
|
# A dict of seen keys so far, used to alert the user of duplicate keys and checking
|
|
# which keys to merge.
|
|
# Value of dict items is the start mark of the previous declaration.
|
|
seen_keys = {}
|
|
|
|
for key_node, value_node in node.value:
|
|
# merge key is '<<'
|
|
is_merge_key = key_node.tag == 'tag:yaml.org,2002:merge'
|
|
# key has no explicit tag set
|
|
is_default_tag = key_node.tag == 'tag:yaml.org,2002:value'
|
|
|
|
if is_default_tag:
|
|
# Default tag for mapping keys is string
|
|
key_node.tag = 'tag:yaml.org,2002:str'
|
|
|
|
if not is_merge_key:
|
|
# base case, this is a simple key-value pair
|
|
key = self.construct_object(key_node)
|
|
value = self.construct_object(value_node)
|
|
|
|
# Check if key is hashable
|
|
try:
|
|
hash(key)
|
|
except TypeError:
|
|
raise yaml.constructor.ConstructorError(
|
|
f'Invalid key "{key}" (not hashable)', key_node.start_mark)
|
|
|
|
# Check if it is a duplicate key
|
|
if key in seen_keys:
|
|
raise yaml.constructor.ConstructorError(
|
|
f'Duplicate key "{key}"', key_node.start_mark,
|
|
'NOTE: Previous declaration here:', seen_keys[key],
|
|
)
|
|
seen_keys[key] = key_node.start_mark
|
|
|
|
# Add to pairs
|
|
pairs.append((key, value))
|
|
continue
|
|
|
|
# This is a merge key, resolve value and add to merge_pairs
|
|
value = self.construct_object(value_node)
|
|
if isinstance(value, dict):
|
|
# base case, copy directly to merge_pairs
|
|
# direct merge, like "<<: {some_key: some_value}"
|
|
merge_pairs.extend(value.items())
|
|
elif isinstance(value, list):
|
|
# sequence merge, like "<<: [{some_key: some_value}, {other_key: some_value}]"
|
|
for item in value:
|
|
if not isinstance(item, dict):
|
|
raise yaml.constructor.ConstructorError(
|
|
"While constructing a mapping", node.start_mark,
|
|
"Expected a mapping for merging, but found {}".format(type(item)),
|
|
value_node.start_mark)
|
|
merge_pairs.extend(item.items())
|
|
else:
|
|
raise yaml.constructor.ConstructorError(
|
|
"While constructing a mapping", node.start_mark,
|
|
"Expected a mapping or list of mappings for merging, "
|
|
"but found {}".format(type(value)), value_node.start_mark)
|
|
|
|
if merge_pairs:
|
|
# We found some merge keys along the way, merge them into base pairs
|
|
# https://yaml.org/type/merge.html
|
|
# Construct a new merge set with values overridden by current mapping or earlier
|
|
# sequence entries removed
|
|
for key, value in merge_pairs:
|
|
if key in seen_keys:
|
|
# key already in the current map or from an earlier merge sequence entry,
|
|
# do not override
|
|
#
|
|
# "... each of its key/value pairs is inserted into the current mapping,
|
|
# unless the key already exists in it."
|
|
#
|
|
# "If the value associated with the merge key is a sequence, then this sequence
|
|
# is expected to contain mapping nodes and each of these nodes is merged in
|
|
# turn according to its order in the sequence. Keys in mapping nodes earlier
|
|
# in the sequence override keys specified in later mapping nodes."
|
|
continue
|
|
pairs.append((key, value))
|
|
# Add key node to seen keys, for sequence merge values.
|
|
seen_keys[key] = None
|
|
|
|
return OrderedDict(pairs)
|
|
|
|
@_add_data_ref
|
|
def construct_env_var(self, node):
|
|
args = node.value.split()
|
|
# Check for a default value
|
|
if len(args) > 1:
|
|
return os.getenv(args[0], ' '.join(args[1:]))
|
|
if args[0] in os.environ:
|
|
return os.environ[args[0]]
|
|
raise yaml.MarkedYAMLError(
|
|
f"Environment variable '{node.value}' not defined", node.start_mark
|
|
)
|
|
|
|
@property
|
|
def _directory(self):
|
|
return os.path.dirname(self.name)
|
|
|
|
def _rel_path(self, *args):
|
|
return os.path.join(self._directory, *args)
|
|
|
|
@_add_data_ref
|
|
def construct_secret(self, node):
|
|
secrets = _load_yaml_internal(self._rel_path(SECRET_YAML))
|
|
if node.value not in secrets:
|
|
raise yaml.MarkedYAMLError(
|
|
f"Secret '{node.value}' not defined", node.start_mark
|
|
)
|
|
val = secrets[node.value]
|
|
_SECRET_VALUES[str(val)] = node.value
|
|
return val
|
|
|
|
@_add_data_ref
|
|
def construct_include(self, node):
|
|
return _load_yaml_internal(self._rel_path(node.value))
|
|
|
|
@_add_data_ref
|
|
def construct_include_dir_list(self, node):
|
|
files = filter_yaml_files(_find_files(self._rel_path(node.value), '*.yaml'))
|
|
return [_load_yaml_internal(f) for f in files]
|
|
|
|
@_add_data_ref
|
|
def construct_include_dir_merge_list(self, node):
|
|
files = filter_yaml_files(_find_files(self._rel_path(node.value), '*.yaml'))
|
|
merged_list = []
|
|
for fname in files:
|
|
loaded_yaml = _load_yaml_internal(fname)
|
|
if isinstance(loaded_yaml, list):
|
|
merged_list.extend(loaded_yaml)
|
|
return merged_list
|
|
|
|
@_add_data_ref
|
|
def construct_include_dir_named(self, node):
|
|
files = filter_yaml_files(_find_files(self._rel_path(node.value), '*.yaml'))
|
|
mapping = OrderedDict()
|
|
for fname in files:
|
|
filename = os.path.splitext(os.path.basename(fname))[0]
|
|
mapping[filename] = _load_yaml_internal(fname)
|
|
return mapping
|
|
|
|
@_add_data_ref
|
|
def construct_include_dir_merge_named(self, node):
|
|
files = filter_yaml_files(_find_files(self._rel_path(node.value), '*.yaml'))
|
|
mapping = OrderedDict()
|
|
for fname in files:
|
|
loaded_yaml = _load_yaml_internal(fname)
|
|
if isinstance(loaded_yaml, dict):
|
|
mapping.update(loaded_yaml)
|
|
return mapping
|
|
|
|
@_add_data_ref
|
|
def construct_lambda(self, node):
|
|
return Lambda(str(node.value))
|
|
|
|
@_add_data_ref
|
|
def construct_force(self, node):
|
|
obj = self.construct_scalar(node)
|
|
return add_class_to_obj(obj, ESPForceValue)
|
|
|
|
|
|
ESPHomeLoader.add_constructor('tag:yaml.org,2002:int', ESPHomeLoader.construct_yaml_int)
|
|
ESPHomeLoader.add_constructor('tag:yaml.org,2002:float', ESPHomeLoader.construct_yaml_float)
|
|
ESPHomeLoader.add_constructor('tag:yaml.org,2002:binary', ESPHomeLoader.construct_yaml_binary)
|
|
ESPHomeLoader.add_constructor('tag:yaml.org,2002:omap', ESPHomeLoader.construct_yaml_omap)
|
|
ESPHomeLoader.add_constructor('tag:yaml.org,2002:str', ESPHomeLoader.construct_yaml_str)
|
|
ESPHomeLoader.add_constructor('tag:yaml.org,2002:seq', ESPHomeLoader.construct_yaml_seq)
|
|
ESPHomeLoader.add_constructor('tag:yaml.org,2002:map', ESPHomeLoader.construct_yaml_map)
|
|
ESPHomeLoader.add_constructor('!env_var', ESPHomeLoader.construct_env_var)
|
|
ESPHomeLoader.add_constructor('!secret', ESPHomeLoader.construct_secret)
|
|
ESPHomeLoader.add_constructor('!include', ESPHomeLoader.construct_include)
|
|
ESPHomeLoader.add_constructor('!include_dir_list', ESPHomeLoader.construct_include_dir_list)
|
|
ESPHomeLoader.add_constructor('!include_dir_merge_list',
|
|
ESPHomeLoader.construct_include_dir_merge_list)
|
|
ESPHomeLoader.add_constructor('!include_dir_named', ESPHomeLoader.construct_include_dir_named)
|
|
ESPHomeLoader.add_constructor('!include_dir_merge_named',
|
|
ESPHomeLoader.construct_include_dir_merge_named)
|
|
ESPHomeLoader.add_constructor('!lambda', ESPHomeLoader.construct_lambda)
|
|
ESPHomeLoader.add_constructor('!force', ESPHomeLoader.construct_force)
|
|
|
|
|
|
def load_yaml(fname):
|
|
_SECRET_VALUES.clear()
|
|
_SECRET_CACHE.clear()
|
|
return _load_yaml_internal(fname)
|
|
|
|
|
|
def _load_yaml_internal(fname):
|
|
content = read_config_file(fname)
|
|
loader = ESPHomeLoader(content)
|
|
loader.name = fname
|
|
try:
|
|
return loader.get_single_data() or OrderedDict()
|
|
except yaml.YAMLError as exc:
|
|
raise EsphomeError(exc)
|
|
finally:
|
|
loader.dispose()
|
|
|
|
|
|
def dump(dict_):
|
|
"""Dump YAML to a string and remove null."""
|
|
return yaml.dump(dict_, default_flow_style=False, allow_unicode=True,
|
|
Dumper=ESPHomeDumper)
|
|
|
|
|
|
def _is_file_valid(name):
|
|
"""Decide if a file is valid."""
|
|
return not name.startswith('.')
|
|
|
|
|
|
def _find_files(directory, pattern):
|
|
"""Recursively load files in a directory."""
|
|
for root, dirs, files in os.walk(directory, topdown=True):
|
|
dirs[:] = [d for d in dirs if _is_file_valid(d)]
|
|
for basename in files:
|
|
if _is_file_valid(basename) and fnmatch.fnmatch(basename, pattern):
|
|
filename = os.path.join(root, basename)
|
|
yield filename
|
|
|
|
|
|
def is_secret(value):
|
|
try:
|
|
return _SECRET_VALUES[str(value)]
|
|
except (KeyError, ValueError):
|
|
return None
|
|
|
|
|
|
class ESPHomeDumper(yaml.SafeDumper): # pylint: disable=too-many-ancestors
|
|
def represent_mapping(self, tag, mapping, flow_style=None):
|
|
value = []
|
|
node = yaml.MappingNode(tag, value, flow_style=flow_style)
|
|
if self.alias_key is not None:
|
|
self.represented_objects[self.alias_key] = node
|
|
best_style = True
|
|
if hasattr(mapping, 'items'):
|
|
mapping = sorted(mapping.items(), key=lambda item: item[0])
|
|
for item_key, item_value in mapping:
|
|
node_key = self.represent_data(item_key)
|
|
node_value = self.represent_data(item_value)
|
|
if not (isinstance(node_key, yaml.ScalarNode) and not node_key.style):
|
|
best_style = False
|
|
if not (isinstance(node_value, yaml.ScalarNode) and not node_value.style):
|
|
best_style = False
|
|
value.append((node_key, node_value))
|
|
if flow_style is None:
|
|
if self.default_flow_style is not None:
|
|
node.flow_style = self.default_flow_style
|
|
else:
|
|
node.flow_style = best_style
|
|
return node
|
|
|
|
def represent_secret(self, value):
|
|
return self.represent_scalar(tag='!secret', value=_SECRET_VALUES[str(value)])
|
|
|
|
def represent_stringify(self, value):
|
|
if is_secret(value):
|
|
return self.represent_secret(value)
|
|
return self.represent_scalar(tag='tag:yaml.org,2002:str', value=str(value))
|
|
|
|
# pylint: disable=arguments-differ
|
|
def represent_bool(self, value):
|
|
return self.represent_scalar('tag:yaml.org,2002:bool', 'true' if value else 'false')
|
|
|
|
def represent_int(self, value):
|
|
if is_secret(value):
|
|
return self.represent_secret(value)
|
|
return self.represent_scalar(tag='tag:yaml.org,2002:int', value=str(value))
|
|
|
|
def represent_float(self, value):
|
|
if is_secret(value):
|
|
return self.represent_secret(value)
|
|
if math.isnan(value):
|
|
value = '.nan'
|
|
elif math.isinf(value):
|
|
value = '.inf' if value > 0 else '-.inf'
|
|
else:
|
|
value = str(repr(value)).lower()
|
|
# Note that in some cases `repr(data)` represents a float number
|
|
# without the decimal parts. For instance:
|
|
# >>> repr(1e17)
|
|
# '1e17'
|
|
# Unfortunately, this is not a valid float representation according
|
|
# to the definition of the `!!float` tag. We fix this by adding
|
|
# '.0' before the 'e' symbol.
|
|
if '.' not in value and 'e' in value:
|
|
value = value.replace('e', '.0e', 1)
|
|
return self.represent_scalar(tag='tag:yaml.org,2002:float', value=value)
|
|
|
|
def represent_lambda(self, value):
|
|
if is_secret(value.value):
|
|
return self.represent_secret(value.value)
|
|
return self.represent_scalar(tag='!lambda', value=value.value, style='|')
|
|
|
|
def represent_id(self, value):
|
|
if is_secret(value.id):
|
|
return self.represent_secret(value.id)
|
|
return self.represent_stringify(value.id)
|
|
|
|
|
|
ESPHomeDumper.add_multi_representer(
|
|
dict,
|
|
lambda dumper, value: dumper.represent_mapping('tag:yaml.org,2002:map', value)
|
|
)
|
|
ESPHomeDumper.add_multi_representer(
|
|
list,
|
|
lambda dumper, value: dumper.represent_sequence('tag:yaml.org,2002:seq', value)
|
|
)
|
|
ESPHomeDumper.add_multi_representer(bool, ESPHomeDumper.represent_bool)
|
|
ESPHomeDumper.add_multi_representer(str, ESPHomeDumper.represent_stringify)
|
|
ESPHomeDumper.add_multi_representer(int, ESPHomeDumper.represent_int)
|
|
ESPHomeDumper.add_multi_representer(float, ESPHomeDumper.represent_float)
|
|
ESPHomeDumper.add_multi_representer(IPAddress, ESPHomeDumper.represent_stringify)
|
|
ESPHomeDumper.add_multi_representer(MACAddress, ESPHomeDumper.represent_stringify)
|
|
ESPHomeDumper.add_multi_representer(TimePeriod, ESPHomeDumper.represent_stringify)
|
|
ESPHomeDumper.add_multi_representer(Lambda, ESPHomeDumper.represent_lambda)
|
|
ESPHomeDumper.add_multi_representer(core.ID, ESPHomeDumper.represent_id)
|
|
ESPHomeDumper.add_multi_representer(uuid.UUID, ESPHomeDumper.represent_stringify)
|