2022-04-03 09:30:22 +02:00
import inspect
import json
import argparse
import os
2022-06-17 03:46:20 +02:00
import glob
import re
2022-04-03 09:30:22 +02:00
import voluptuous as vol
2022-06-17 03:46:20 +02:00
# NOTE: Cannot import other esphome components globally as a modification in vol_schema
2022-04-03 09:30:22 +02:00
# is needed before modules are loaded
2022-06-17 03:46:20 +02:00
import esphome . schema_extractors as ejs
2022-04-03 09:30:22 +02:00
2022-06-17 03:46:20 +02:00
ejs . EnableSchemaExtraction = True
2022-04-03 09:30:22 +02:00
# schema format:
2022-06-27 23:02:46 +02:00
# Schemas are split in several files in json format, one for core stuff, one for each platform (sensor, binary_sensor, etc) and
2022-04-03 09:30:22 +02:00
# one for each component (dallas, sim800l, etc.) component can have schema for root component/hub and also for platform component,
# e.g. dallas has hub component which has pin and then has the sensor platform which has sensor name, index, etc.
# When files are loaded they are merged in a single object.
# The root format is
S_CONFIG_VAR = " config_var "
S_CONFIG_VARS = " config_vars "
S_CONFIG_SCHEMA = " CONFIG_SCHEMA "
S_COMPONENT = " component "
S_COMPONENTS = " components "
S_PLATFORMS = " platforms "
S_SCHEMA = " schema "
S_SCHEMAS = " schemas "
S_EXTENDS = " extends "
S_TYPE = " type "
S_NAME = " name "
parser = argparse . ArgumentParser ( )
parser . add_argument (
" --output-path " , default = " . " , help = " Output path " , type = os . path . abspath
)
args = parser . parse_args ( )
DUMP_RAW = False
DUMP_UNKNOWN = False
DUMP_PATH = False
JSON_DUMP_PRETTY = True
# store here dynamic load of esphome components
components = { }
schema_core = { }
# output is where all is built
output = { " core " : schema_core }
# The full generated output is here here
schema_full = { " components " : output }
# A string, string map, key is the str(schema) and value is
# a tuple, first element is the schema reference and second is the schema path given, the schema reference is needed to test as different schemas have same key
known_schemas = { }
solve_registry = [ ]
def get_component_names ( ) :
from esphome . loader import CORE_COMPONENTS_PATH
component_names = [ " esphome " , " sensor " ]
for d in os . listdir ( CORE_COMPONENTS_PATH ) :
if not d . startswith ( " __ " ) and os . path . isdir (
os . path . join ( CORE_COMPONENTS_PATH , d )
) :
if d not in component_names :
component_names . append ( d )
return component_names
def load_components ( ) :
from esphome . config import get_component
for domain in get_component_names ( ) :
components [ domain ] = get_component ( domain )
load_components ( )
# Import esphome after loading components (so schema is tracked)
# pylint: disable=wrong-import-position
import esphome . core as esphome_core
import esphome . config_validation as cv
from esphome import automation
from esphome import pins
from esphome . components import remote_base
from esphome . const import CONF_TYPE
2022-06-17 03:46:20 +02:00
from esphome . loader import get_platform , CORE_COMPONENTS_PATH
2022-04-03 09:30:22 +02:00
from esphome . helpers import write_file_if_changed
from esphome . util import Registry
# pylint: enable=wrong-import-position
def write_file ( name , obj ) :
full_path = os . path . join ( args . output_path , name + " .json " )
if JSON_DUMP_PRETTY :
json_str = json . dumps ( obj , indent = 2 )
else :
json_str = json . dumps ( obj , separators = ( " , " , " : " ) )
write_file_if_changed ( full_path , json_str )
print ( f " Wrote { full_path } " )
def register_module_schemas ( key , module , manifest = None ) :
for name , schema in module_schemas ( module ) :
register_known_schema ( key , name , schema )
2022-06-17 03:46:20 +02:00
if manifest :
# Multi conf should allow list of components
# not sure about 2nd part of the if, might be useless config (e.g. as3935)
if manifest . multi_conf and S_CONFIG_SCHEMA in output [ key ] [ S_SCHEMAS ] :
output [ key ] [ S_SCHEMAS ] [ S_CONFIG_SCHEMA ] [ " is_list " ] = True
2022-04-03 09:30:22 +02:00
def register_known_schema ( module , name , schema ) :
if module not in output :
output [ module ] = { S_SCHEMAS : { } }
config = convert_config ( schema , f " { module } / { name } " )
if S_TYPE not in config :
print ( f " Config var without type: { module } . { name } " )
output [ module ] [ S_SCHEMAS ] [ name ] = config
repr_schema = repr ( schema )
if repr_schema in known_schemas :
schema_info = known_schemas [ repr_schema ]
schema_info . append ( ( schema , f " { module } . { name } " ) )
else :
known_schemas [ repr_schema ] = [ ( schema , f " { module } . { name } " ) ]
def module_schemas ( module ) :
# This should yield elements in order so extended schemas are resolved properly
# To do this we check on the source code where the symbol is seen first. Seems to work.
try :
module_str = inspect . getsource ( module )
except TypeError :
# improv
module_str = " "
except OSError :
# some empty __init__ files
module_str = " "
schemas = { }
for m_attr_name in dir ( module ) :
m_attr_obj = getattr ( module , m_attr_name )
if isConvertibleSchema ( m_attr_obj ) :
schemas [ module_str . find ( m_attr_name ) ] = [ m_attr_name , m_attr_obj ]
for pos in sorted ( schemas . keys ( ) ) :
yield schemas [ pos ]
found_registries = { }
# Pin validators keys are the functions in pin which validate the pins
pin_validators = { }
def add_pin_validators ( ) :
for m_attr_name in dir ( pins ) :
if " gpio " in m_attr_name :
s = pin_validators [ repr ( getattr ( pins , m_attr_name ) ) ] = { }
if " schema " in m_attr_name :
s [ " schema " ] = True # else is just number
if " internal " in m_attr_name :
s [ " internal " ] = True
if " input " in m_attr_name :
s [ " modes " ] = [ " input " ]
elif " output " in m_attr_name :
s [ " modes " ] = [ " output " ]
else :
s [ " modes " ] = [ ]
if " pullup " in m_attr_name :
s [ " modes " ] . append ( " pullup " )
from esphome . components . adc import sensor as adc_sensor
pin_validators [ repr ( adc_sensor . validate_adc_pin ) ] = {
" internal " : True ,
" modes " : [ " input " ] ,
}
def add_module_registries ( domain , module ) :
for attr_name in dir ( module ) :
attr_obj = getattr ( module , attr_name )
if isinstance ( attr_obj , Registry ) :
if attr_obj == automation . ACTION_REGISTRY :
reg_type = " action "
reg_domain = " core "
found_registries [ repr ( attr_obj ) ] = reg_type
elif attr_obj == automation . CONDITION_REGISTRY :
reg_type = " condition "
reg_domain = " core "
found_registries [ repr ( attr_obj ) ] = reg_type
else : # attr_name == "FILTER_REGISTRY":
reg_domain = domain
reg_type = attr_name . partition ( " _ " ) [ 0 ] . lower ( )
found_registries [ repr ( attr_obj ) ] = f " { domain } . { reg_type } "
for name in attr_obj . keys ( ) :
if " . " not in name :
reg_entry_name = name
else :
parts = name . split ( " . " )
if len ( parts ) == 2 :
reg_domain = parts [ 0 ]
reg_entry_name = parts [ 1 ]
else :
reg_domain = " . " . join ( [ parts [ 1 ] , parts [ 0 ] ] )
reg_entry_name = parts [ 2 ]
if reg_domain not in output :
output [ reg_domain ] = { }
if reg_type not in output [ reg_domain ] :
output [ reg_domain ] [ reg_type ] = { }
output [ reg_domain ] [ reg_type ] [ reg_entry_name ] = convert_config (
attr_obj [ name ] . schema , f " { reg_domain } / { reg_type } / { reg_entry_name } "
)
# print(f"{domain} - {attr_name} - {name}")
def do_pins ( ) :
# do pin registries
pins_providers = schema_core [ " pins " ] = [ ]
for pin_registry in pins . PIN_SCHEMA_REGISTRY :
s = convert_config (
pins . PIN_SCHEMA_REGISTRY [ pin_registry ] [ 1 ] , f " pins/ { pin_registry } "
)
if pin_registry not in output :
output [ pin_registry ] = { } # mcp23xxx does not create a component yet
output [ pin_registry ] [ " pin " ] = s
pins_providers . append ( pin_registry )
def do_esp32 ( ) :
import esphome . components . esp32 . boards as esp32_boards
setEnum (
output [ " esp32 " ] [ " schemas " ] [ " CONFIG_SCHEMA " ] [ " schema " ] [ " config_vars " ] [ " board " ] ,
2022-12-07 04:07:51 +01:00
list ( esp32_boards . BOARDS . keys ( ) ) ,
2022-04-03 09:30:22 +02:00
)
def do_esp8266 ( ) :
import esphome . components . esp8266 . boards as esp8266_boards
setEnum (
output [ " esp8266 " ] [ " schemas " ] [ " CONFIG_SCHEMA " ] [ " schema " ] [ " config_vars " ] [ " board " ] ,
list ( esp8266_boards . ESP8266_BOARD_PINS . keys ( ) ) ,
)
def fix_remote_receiver ( ) :
2022-06-17 03:46:20 +02:00
remote_receiver_schema = output [ " remote_receiver.binary_sensor " ] [ " schemas " ]
remote_receiver_schema [ " CONFIG_SCHEMA " ] = {
2022-04-03 09:30:22 +02:00
" type " : " schema " ,
" schema " : {
" extends " : [ " binary_sensor.BINARY_SENSOR_SCHEMA " , " core.COMPONENT_SCHEMA " ] ,
2022-06-17 03:46:20 +02:00
" config_vars " : output [ " remote_base " ] . pop ( " binary " ) ,
2022-04-03 09:30:22 +02:00
} ,
}
2022-06-17 03:46:20 +02:00
remote_receiver_schema [ " CONFIG_SCHEMA " ] [ " schema " ] [ " config_vars " ] [ " receiver_id " ] = {
" key " : " GeneratedID " ,
" use_id_type " : " remote_base::RemoteReceiverBase " ,
" type " : " use_id " ,
}
def fix_script ( ) :
output [ " script " ] [ S_SCHEMAS ] [ S_CONFIG_SCHEMA ] [ S_TYPE ] = S_SCHEMA
config_schema = output [ " script " ] [ S_SCHEMAS ] [ S_CONFIG_SCHEMA ]
config_schema [ S_SCHEMA ] [ S_CONFIG_VARS ] [ " id " ] [ " id_type " ] = {
" class " : " script::Script "
}
config_schema [ " is_list " ] = True
2022-11-22 01:43:01 +01:00
def fix_menu ( ) :
# # Menu has a recursive schema which is not kept properly
schemas = output [ " display_menu_base " ] [ S_SCHEMAS ]
# 1. Move items to a new schema
schemas [ " MENU_TYPES " ] = {
S_TYPE : S_SCHEMA ,
S_SCHEMA : {
S_CONFIG_VARS : {
" items " : schemas [ " DISPLAY_MENU_BASE_SCHEMA " ] [ S_SCHEMA ] [ S_CONFIG_VARS ] [
" items "
]
}
} ,
}
# 2. Remove items from the base schema
schemas [ " DISPLAY_MENU_BASE_SCHEMA " ] [ S_SCHEMA ] [ S_CONFIG_VARS ] . pop ( " items " )
# 3. Add extends to this
schemas [ " DISPLAY_MENU_BASE_SCHEMA " ] [ S_SCHEMA ] [ S_EXTENDS ] . append (
" display_menu_base.MENU_TYPES "
)
# 4. Configure menu items inside as recursive
menu = schemas [ " MENU_TYPES " ] [ S_SCHEMA ] [ S_CONFIG_VARS ] [ " items " ] [ " types " ] [ " menu " ]
menu [ S_CONFIG_VARS ] . pop ( " items " )
menu [ S_EXTENDS ] = [ " display_menu_base.MENU_TYPES " ]
2022-06-17 03:46:20 +02:00
def get_logger_tags ( ) :
pattern = re . compile ( r ' ^static const char \ *const TAG = " ( \ w.*) " ; ' , re . MULTILINE )
# tags not in components dir
tags = [
" app " ,
" component " ,
" entity_base " ,
" scheduler " ,
" api.service " ,
]
for x in os . walk ( CORE_COMPONENTS_PATH ) :
for y in glob . glob ( os . path . join ( x [ 0 ] , " *.cpp " ) ) :
with open ( y , encoding = " utf-8 " ) as file :
data = file . read ( )
match = pattern . search ( data )
if match :
tags . append ( match . group ( 1 ) )
return tags
def add_logger_tags ( ) :
tags = get_logger_tags ( )
logs = output [ " logger " ] [ " schemas " ] [ " CONFIG_SCHEMA " ] [ " schema " ] [ " config_vars " ] [
" logs "
] [ " schema " ] [ " config_vars " ]
for t in tags :
logs [ t ] = logs [ " string " ] . copy ( )
logs . pop ( " string " )
2022-04-03 09:30:22 +02:00
def add_referenced_recursive ( referenced_schemas , config_var , path , eat_schema = False ) :
assert (
S_CONFIG_VARS not in config_var and S_EXTENDS not in config_var
) # S_TYPE in cv or "key" in cv or len(cv) == 0
if (
config_var . get ( S_TYPE ) in [ " schema " , " trigger " , " maybe " ]
and S_SCHEMA in config_var
) :
schema = config_var [ S_SCHEMA ]
for k , v in schema . get ( S_CONFIG_VARS , { } ) . items ( ) :
if eat_schema :
new_path = path + [ S_CONFIG_VARS , k ]
else :
new_path = path + [ " schema " , S_CONFIG_VARS , k ]
add_referenced_recursive ( referenced_schemas , v , new_path )
for k in schema . get ( S_EXTENDS , [ ] ) :
if k not in referenced_schemas :
referenced_schemas [ k ] = [ path ]
else :
if path not in referenced_schemas [ k ] :
referenced_schemas [ k ] . append ( path )
s1 = get_str_path_schema ( k )
p = k . split ( " . " )
if len ( p ) == 3 and path [ 0 ] == f " { p [ 0 ] } . { p [ 1 ] } " :
# special case for schema inside platforms
add_referenced_recursive (
referenced_schemas , s1 , [ path [ 0 ] , " schemas " , p [ 2 ] ]
)
else :
add_referenced_recursive (
referenced_schemas , s1 , [ p [ 0 ] , " schemas " , p [ 1 ] ]
)
elif config_var . get ( S_TYPE ) == " typed " :
for tk , tv in config_var . get ( " types " ) . items ( ) :
add_referenced_recursive (
referenced_schemas ,
{
S_TYPE : S_SCHEMA ,
S_SCHEMA : tv ,
} ,
path + [ " types " , tk ] ,
eat_schema = True ,
)
def get_str_path_schema ( strPath ) :
parts = strPath . split ( " . " )
if len ( parts ) > 2 :
parts [ 0 ] + = " . " + parts [ 1 ]
parts [ 1 ] = parts [ 2 ]
s1 = output . get ( parts [ 0 ] , { } ) . get ( S_SCHEMAS , { } ) . get ( parts [ 1 ] , { } )
return s1
def pop_str_path_schema ( strPath ) :
parts = strPath . split ( " . " )
if len ( parts ) > 2 :
parts [ 0 ] + = " . " + parts [ 1 ]
parts [ 1 ] = parts [ 2 ]
output . get ( parts [ 0 ] , { } ) . get ( S_SCHEMAS , { } ) . pop ( parts [ 1 ] )
def get_arr_path_schema ( path ) :
s = output
for x in path :
s = s [ x ]
return s
def merge ( source , destination ) :
"""
run me with nosetests - - with - doctest file . py
>> > a = { ' first ' : { ' all_rows ' : { ' pass ' : ' dog ' , ' number ' : ' 1 ' } } }
>> > b = { ' first ' : { ' all_rows ' : { ' fail ' : ' cat ' , ' number ' : ' 5 ' } } }
>> > merge ( b , a ) == { ' first ' : { ' all_rows ' : { ' pass ' : ' dog ' , ' fail ' : ' cat ' , ' number ' : ' 5 ' } } }
True
"""
for key , value in source . items ( ) :
if isinstance ( value , dict ) :
# get node or create one
node = destination . setdefault ( key , { } )
merge ( value , node )
else :
destination [ key ] = value
return destination
def shrink ( ) :
""" Shrink the extending schemas which has just an end type, e.g. at this point
ota / port is type schema with extended pointing to core . port , this should instead be
type number . core . port is number
This also fixes enums , as they are another schema and they are instead put in the same cv
"""
# referenced_schemas contains a dict, keys are all that are shown in extends: [] arrays, values are lists of paths that are pointing to that extend
# e.g. key: core.COMPONENT_SCHEMA has a lot of paths of config vars which extends this schema
pass_again = True
while pass_again :
pass_again = False
referenced_schemas = { }
for k , v in output . items ( ) :
for kv , vv in v . items ( ) :
if kv != " pin " and isinstance ( vv , dict ) :
for kvv , vvv in vv . items ( ) :
add_referenced_recursive ( referenced_schemas , vvv , [ k , kv , kvv ] )
for x , paths in referenced_schemas . items ( ) :
if len ( paths ) == 1 :
key_s = get_str_path_schema ( x )
arr_s = get_arr_path_schema ( paths [ 0 ] )
# key_s |= arr_s
# key_s.pop(S_EXTENDS)
pass_again = True
if S_SCHEMA in arr_s :
if S_EXTENDS in arr_s [ S_SCHEMA ] :
arr_s [ S_SCHEMA ] . pop ( S_EXTENDS )
else :
print ( " expected extends here! " + x )
arr_s = merge ( key_s , arr_s )
2022-06-17 03:46:20 +02:00
if arr_s [ S_TYPE ] in [ " enum " , " typed " ] :
2022-04-03 09:30:22 +02:00
arr_s . pop ( S_SCHEMA )
else :
arr_s . pop ( S_EXTENDS )
arr_s | = key_s [ S_SCHEMA ]
print ( x )
# simple types should be spread on each component,
# for enums so far these are logger.is_log_level, cover.validate_cover_state and pulse_counter.sensor.COUNT_MODE_SCHEMA
# then for some reasons sensor filter registry falls here
# then are all simple types, integer and strings
for x , paths in referenced_schemas . items ( ) :
key_s = get_str_path_schema ( x )
if key_s and key_s [ S_TYPE ] in [ " enum " , " registry " , " integer " , " string " ] :
if key_s [ S_TYPE ] == " registry " :
print ( " Spreading registry: " + x )
for target in paths :
target_s = get_arr_path_schema ( target )
assert target_s [ S_SCHEMA ] [ S_EXTENDS ] == [ x ]
target_s . pop ( S_SCHEMA )
target_s | = key_s
if key_s [ S_TYPE ] in [ " integer " , " string " ] :
target_s [ " data_type " ] = x . split ( " . " ) [ 1 ]
# remove this dangling again
pop_str_path_schema ( x )
elif not key_s :
for target in paths :
target_s = get_arr_path_schema ( target )
assert target_s [ S_SCHEMA ] [ S_EXTENDS ] == [ x ]
target_s . pop ( S_SCHEMA )
target_s . pop ( S_TYPE ) # undefined
target_s [ " data_type " ] = x . split ( " . " ) [ 1 ]
# remove this dangling again
pop_str_path_schema ( x )
# remove dangling items (unreachable schemas)
for domain , domain_schemas in output . items ( ) :
for schema_name in list ( domain_schemas . get ( S_SCHEMAS , { } ) . keys ( ) ) :
s = f " { domain } . { schema_name } "
if (
not s . endswith ( " . " + S_CONFIG_SCHEMA )
and s not in referenced_schemas . keys ( )
) :
print ( f " Removing { s } " )
output [ domain ] [ S_SCHEMAS ] . pop ( schema_name )
def build_schema ( ) :
print ( " Building schema " )
# check esphome was not loaded globally (IDE auto imports)
if len ( ejs . extended_schemas ) == 0 :
raise Exception (
" no data collected. Did you globally import an ESPHome component? "
)
# Core schema
schema_core [ S_SCHEMAS ] = { }
register_module_schemas ( " core " , cv )
platforms = { }
schema_core [ S_PLATFORMS ] = platforms
core_components = { }
schema_core [ S_COMPONENTS ] = core_components
add_pin_validators ( )
# Load a preview of each component
for domain , manifest in components . items ( ) :
if manifest . is_platform_component :
# e.g. sensor, binary sensor, add S_COMPONENTS
# note: S_COMPONENTS is not filled until loaded, e.g.
# if lock: is not used, then we don't need to know about their
# platforms yet.
output [ domain ] = { S_COMPONENTS : { } , S_SCHEMAS : { } }
platforms [ domain ] = { }
elif manifest . config_schema is not None :
# e.g. dallas
output [ domain ] = { S_SCHEMAS : { S_CONFIG_SCHEMA : { } } }
# Generate platforms (e.g. sensor, binary_sensor, climate )
for domain in platforms :
c = components [ domain ]
register_module_schemas ( domain , c . module )
# Generate components
for domain , manifest in components . items ( ) :
if domain not in platforms :
if manifest . config_schema is not None :
core_components [ domain ] = { }
2022-06-17 03:46:20 +02:00
if len ( manifest . dependencies ) > 0 :
core_components [ domain ] [ " dependencies " ] = manifest . dependencies
2022-04-03 09:30:22 +02:00
register_module_schemas ( domain , manifest . module , manifest )
for platform in platforms :
platform_manifest = get_platform ( domain = platform , platform = domain )
if platform_manifest is not None :
output [ platform ] [ S_COMPONENTS ] [ domain ] = { }
2022-06-17 03:46:20 +02:00
if len ( platform_manifest . dependencies ) > 0 :
output [ platform ] [ S_COMPONENTS ] [ domain ] [
" dependencies "
] = platform_manifest . dependencies
2022-04-03 09:30:22 +02:00
register_module_schemas (
2022-06-17 03:46:20 +02:00
f " { domain } . { platform } " , platform_manifest . module , platform_manifest
2022-04-03 09:30:22 +02:00
)
# Do registries
add_module_registries ( " core " , automation )
for domain , manifest in components . items ( ) :
add_module_registries ( domain , manifest . module )
add_module_registries ( " remote_base " , remote_base )
# update props pointing to registries
for reg_config_var in solve_registry :
( registry , config_var ) = reg_config_var
config_var [ S_TYPE ] = " registry "
config_var [ " registry " ] = found_registries [ repr ( registry ) ]
do_pins ( )
do_esp8266 ( )
do_esp32 ( )
fix_remote_receiver ( )
2022-06-17 03:46:20 +02:00
fix_script ( )
add_logger_tags ( )
2022-04-03 09:30:22 +02:00
shrink ( )
2022-11-22 01:43:01 +01:00
fix_menu ( )
2022-04-03 09:30:22 +02:00
# aggregate components, so all component info is in same file, otherwise we have dallas.json, dallas.sensor.json, etc.
data = { }
for component , component_schemas in output . items ( ) :
if " . " in component :
key = component . partition ( " . " ) [ 0 ]
if key not in data :
data [ key ] = { }
data [ key ] [ component ] = component_schemas
else :
if component not in data :
data [ component ] = { }
data [ component ] | = { component : component_schemas }
# bundle core inside esphome
data [ " esphome " ] [ " core " ] = data . pop ( " core " ) [ " core " ]
for c , s in data . items ( ) :
write_file ( c , s )
def setEnum ( obj , items ) :
obj [ S_TYPE ] = " enum "
obj [ " values " ] = items
def isConvertibleSchema ( schema ) :
if schema is None :
return False
if isinstance ( schema , ( cv . Schema , cv . All ) ) :
return True
if repr ( schema ) in ejs . hidden_schemas :
return True
if repr ( schema ) in ejs . typed_schemas :
return True
if repr ( schema ) in ejs . list_schemas :
return True
if repr ( schema ) in ejs . registry_schemas :
return True
if isinstance ( schema , dict ) :
for k in schema . keys ( ) :
if isinstance ( k , ( cv . Required , cv . Optional ) ) :
return True
return False
def convert_config ( schema , path ) :
converted = { }
convert_1 ( schema , converted , path )
return converted
def convert_1 ( schema , config_var , path ) :
""" config_var can be a config_var or a schema: both are dicts
config_var has a S_TYPE property , if this is S_SCHEMA , then it has a S_SCHEMA property
schema does not have a type property , schema can have optionally both S_CONFIG_VARS and S_EXTENDS
"""
repr_schema = repr ( schema )
if repr_schema in known_schemas :
schema_info = known_schemas [ ( repr_schema ) ]
for ( schema_instance , name ) in schema_info :
if schema_instance is schema :
assert S_CONFIG_VARS not in config_var
assert S_EXTENDS not in config_var
if not S_TYPE in config_var :
config_var [ S_TYPE ] = S_SCHEMA
2022-06-17 03:46:20 +02:00
# assert config_var[S_TYPE] == S_SCHEMA
2022-04-03 09:30:22 +02:00
if S_SCHEMA not in config_var :
config_var [ S_SCHEMA ] = { }
if S_EXTENDS not in config_var [ S_SCHEMA ] :
config_var [ S_SCHEMA ] [ S_EXTENDS ] = [ name ]
else :
config_var [ S_SCHEMA ] [ S_EXTENDS ] . append ( name )
return
# Extended schemas are tracked when the .extend() is used in a schema
if repr_schema in ejs . extended_schemas :
extended = ejs . extended_schemas . get ( repr_schema )
# The midea actions are extending an empty schema (resulted in the templatize not templatizing anything)
# this causes a recursion in that this extended looks the same in extended schema as the extended[1]
if repr_schema == repr ( extended [ 1 ] ) :
assert path . startswith ( " midea_ac/ " )
return
assert len ( extended ) == 2
convert_1 ( extended [ 0 ] , config_var , path + " /extL " )
convert_1 ( extended [ 1 ] , config_var , path + " /extR " )
return
if isinstance ( schema , cv . All ) :
i = 0
for inner in schema . validators :
i = i + 1
convert_1 ( inner , config_var , path + f " /val { i } " )
return
if hasattr ( schema , " validators " ) :
i = 0
for inner in schema . validators :
i = i + 1
convert_1 ( inner , config_var , path + f " /val { i } " )
if isinstance ( schema , cv . Schema ) :
convert_1 ( schema . schema , config_var , path + " /all " )
return
if isinstance ( schema , dict ) :
convert_keys ( config_var , schema , path )
return
if repr_schema in ejs . list_schemas :
config_var [ " is_list " ] = True
items_schema = ejs . list_schemas [ repr_schema ] [ 0 ]
convert_1 ( items_schema , config_var , path + " /list " )
return
if DUMP_RAW :
config_var [ " raw " ] = repr_schema
# pylint: disable=comparison-with-callable
if schema == cv . boolean :
config_var [ S_TYPE ] = " boolean "
elif schema == automation . validate_potentially_and_condition :
config_var [ S_TYPE ] = " registry "
config_var [ " registry " ] = " condition "
elif schema == cv . int_ or schema == cv . int_range :
config_var [ S_TYPE ] = " integer "
elif schema == cv . string or schema == cv . string_strict or schema == cv . valid_name :
config_var [ S_TYPE ] = " string "
elif isinstance ( schema , vol . Schema ) :
# test: esphome/project
config_var [ S_TYPE ] = " schema "
config_var [ " schema " ] = convert_config ( schema . schema , path + " /s " ) [ " schema " ]
elif repr_schema in pin_validators :
config_var | = pin_validators [ repr_schema ]
config_var [ S_TYPE ] = " pin "
elif repr_schema in ejs . hidden_schemas :
schema_type = ejs . hidden_schemas [ repr_schema ]
2022-06-17 03:46:20 +02:00
data = schema ( ejs . SCHEMA_EXTRACT )
2022-04-03 09:30:22 +02:00
# enums, e.g. esp32/variant
if schema_type == " one_of " :
config_var [ S_TYPE ] = " enum "
config_var [ " values " ] = list ( data )
elif schema_type == " enum " :
config_var [ S_TYPE ] = " enum "
config_var [ " values " ] = list ( data . keys ( ) )
elif schema_type == " maybe " :
2022-06-17 03:46:20 +02:00
config_var [ S_TYPE ] = S_SCHEMA
config_var [ " maybe " ] = data [ 1 ]
config_var [ " schema " ] = convert_config ( data [ 0 ] , path + " /maybe " ) [ " schema " ]
2022-04-03 09:30:22 +02:00
# esphome/on_boot
elif schema_type == " automation " :
extra_schema = None
config_var [ S_TYPE ] = " trigger "
if automation . AUTOMATION_SCHEMA == ejs . extended_schemas [ repr ( data ) ] [ 0 ] :
extra_schema = ejs . extended_schemas [ repr ( data ) ] [ 1 ]
if (
extra_schema is not None and len ( extra_schema ) > 1
) : # usually only trigger_id here
config = convert_config ( extra_schema , path + " /extra " )
if " schema " in config :
automation_schema = config [ " schema " ]
if not (
len ( automation_schema [ " config_vars " ] ) == 1
and " trigger_id " in automation_schema [ " config_vars " ]
) :
automation_schema [ " config_vars " ] [ " then " ] = { S_TYPE : " trigger " }
if " trigger_id " in automation_schema [ " config_vars " ] :
automation_schema [ " config_vars " ] . pop ( " trigger_id " )
config_var [ S_TYPE ] = " trigger "
config_var [ " schema " ] = automation_schema
# some triggers can have a list of actions directly, while others needs to have some other configuration,
# e.g. sensor.on_value_rang, and the list of actions is only accepted under "then" property.
try :
schema ( { " delay " : " 1s " } )
except cv . Invalid :
config_var [ " has_required_var " ] = True
else :
print ( " figure out " + path )
elif schema_type == " effects " :
config_var [ S_TYPE ] = " registry "
config_var [ " registry " ] = " light.effects "
config_var [ " filter " ] = data [ 0 ]
elif schema_type == " templatable " :
config_var [ " templatable " ] = True
convert_1 ( data , config_var , path + " /templat " )
elif schema_type == " triggers " :
# remote base
convert_1 ( data , config_var , path + " /trigger " )
elif schema_type == " sensor " :
schema = data
convert_1 ( data , config_var , path + " /trigger " )
2022-06-17 03:46:20 +02:00
elif schema_type == " declare_id " :
# pylint: disable=protected-access
parents = data . _parents
config_var [ " id_type " ] = {
" class " : str ( data . base ) ,
" parents " : [ str ( x . base ) for x in parents ]
if isinstance ( parents , list )
else None ,
}
elif schema_type == " use_id " :
if inspect . ismodule ( data ) :
m_attr_obj = getattr ( data , " CONFIG_SCHEMA " )
use_schema = known_schemas . get ( repr ( m_attr_obj ) )
if use_schema :
[ output_module , output_name ] = use_schema [ 0 ] [ 1 ] . split ( " . " )
use_id_config = output [ output_module ] [ S_SCHEMAS ] [ output_name ]
config_var [ " use_id_type " ] = use_id_config [ " schema " ] [ " config_vars " ] [
" id "
] [ " id_type " ] [ " class " ]
config_var [ S_TYPE ] = " use_id "
else :
print ( " TODO deferred? " )
else :
if isinstance ( data , str ) :
# TODO: Figure out why pipsolar does this
config_var [ " use_id_type " ] = data
else :
config_var [ " use_id_type " ] = str ( data . base )
config_var [ S_TYPE ] = " use_id "
2022-04-03 09:30:22 +02:00
else :
raise Exception ( " Unknown extracted schema type " )
2022-06-17 03:46:20 +02:00
elif config_var . get ( " key " ) == " GeneratedID " :
if path == " i2c/CONFIG_SCHEMA/extL/all/id " :
config_var [ " id_type " ] = { " class " : " i2c::I2CBus " , " parents " : [ " Component " ] }
elif path == " uart/CONFIG_SCHEMA/val 1/extL/all/id " :
config_var [ " id_type " ] = {
" class " : " uart::UARTComponent " ,
" parents " : [ " Component " ] ,
}
elif path == " pins/esp32/val 1/id " :
config_var [ " id_type " ] = " pin "
else :
raise Exception ( " Cannot determine id_type for " + path )
2022-04-03 09:30:22 +02:00
elif repr_schema in ejs . registry_schemas :
solve_registry . append ( ( ejs . registry_schemas [ repr_schema ] , config_var ) )
elif repr_schema in ejs . typed_schemas :
config_var [ S_TYPE ] = " typed "
types = config_var [ " types " ] = { }
typed_schema = ejs . typed_schemas [ repr_schema ]
if len ( typed_schema ) > 1 :
config_var [ " typed_key " ] = typed_schema [ 1 ] . get ( " key " , CONF_TYPE )
for schema_key , schema_type in typed_schema [ 0 ] [ 0 ] . items ( ) :
config = convert_config ( schema_type , path + " /type_ " + schema_key )
types [ schema_key ] = config [ " schema " ]
elif DUMP_UNKNOWN :
if S_TYPE not in config_var :
config_var [ " unknown " ] = repr_schema
if DUMP_PATH :
config_var [ " path " ] = path
def get_overridden_config ( key , converted ) :
# check if the key is in any extended schema in this converted schema, i.e.
# if we see a on_value_range in a dallas sensor, then this is overridden because
# it is already defined in sensor
assert S_CONFIG_VARS not in converted and S_EXTENDS not in converted
config = converted . get ( S_SCHEMA , { } )
return get_overridden_key_inner ( key , config , { } )
def get_overridden_key_inner ( key , config , ret ) :
if S_EXTENDS not in config :
return ret
for s in config [ S_EXTENDS ] :
p = s . partition ( " . " )
s1 = output . get ( p [ 0 ] , { } ) . get ( S_SCHEMAS , { } ) . get ( p [ 2 ] , { } ) . get ( S_SCHEMA )
if s1 :
if key in s1 . get ( S_CONFIG_VARS , { } ) :
for k , v in s1 . get ( S_CONFIG_VARS ) [ key ] . items ( ) :
if k not in ret : # keep most overridden
ret [ k ] = v
get_overridden_key_inner ( key , s1 , ret )
return ret
def convert_keys ( converted , schema , path ) :
for k , v in schema . items ( ) :
# deprecated stuff
if repr ( v ) . startswith ( " <function invalid " ) :
continue
result = { }
if isinstance ( k , cv . GenerateID ) :
result [ " key " ] = " GeneratedID "
elif isinstance ( k , cv . Required ) :
result [ " key " ] = " Required "
elif (
isinstance ( k , cv . Optional )
or isinstance ( k , cv . Inclusive )
or isinstance ( k , cv . Exclusive )
) :
result [ " key " ] = " Optional "
else :
converted [ " key " ] = " String "
2022-06-17 03:46:20 +02:00
key_string_match = re . search (
r " <function ( \ w*) at \ w*> " , str ( k ) , re . IGNORECASE
)
if key_string_match :
converted [ " key_type " ] = key_string_match . group ( 1 )
else :
converted [ " key_type " ] = str ( k )
2022-04-03 09:30:22 +02:00
esphome_core . CORE . data = {
esphome_core . KEY_CORE : { esphome_core . KEY_TARGET_PLATFORM : " esp8266 " }
}
if hasattr ( k , " default " ) and str ( k . default ) != " ... " :
default_value = k . default ( )
if default_value is not None :
result [ " default " ] = str ( default_value )
# Do value
convert_1 ( v , result , path + f " / { str ( k ) } " )
if " schema " not in converted :
converted [ S_TYPE ] = " schema "
converted [ " schema " ] = { S_CONFIG_VARS : { } }
if S_CONFIG_VARS not in converted [ " schema " ] :
converted [ " schema " ] [ S_CONFIG_VARS ] = { }
for base_k , base_v in get_overridden_config ( k , converted ) . items ( ) :
if base_k in result and base_v == result [ base_k ] :
result . pop ( base_k )
converted [ " schema " ] [ S_CONFIG_VARS ] [ str ( k ) ] = result
2022-06-17 03:46:20 +02:00
if " key " in converted and converted [ " key " ] == " String " :
config_vars = converted [ " schema " ] [ " config_vars " ]
assert len ( config_vars ) == 1
key = list ( config_vars . keys ( ) ) [ 0 ]
assert key . startswith ( " < " )
config_vars [ " string " ] = config_vars . pop ( key )
2022-04-03 09:30:22 +02:00
build_schema ( )