md5 files and esphome publish

This commit is contained in:
oarcher 2024-04-04 13:06:46 +00:00
parent af3fb615ea
commit 77d3fb6803
4 changed files with 113 additions and 26 deletions

View File

@ -6,6 +6,8 @@ import os
import re
import sys
import time
import hashlib
import gzip
from datetime import datetime
import argcomplete
@ -37,7 +39,7 @@ from esphome.const import (
SECRETS_FILES,
)
from esphome.core import CORE, EsphomeError, coroutine
from esphome.helpers import indent, is_ip_address
from esphome.helpers import indent, is_ip_address, copy_file_if_changed
from esphome.util import (
run_external_command,
run_external_process,
@ -220,7 +222,25 @@ def compile_program(args, config):
if rc != 0:
return rc
idedata = platformio_api.get_idedata(config)
return 0 if idedata is not None else 1
if idedata is None:
return 1
with open(CORE.firmware_bin, "rb") as firmware_bin:
bin_content = firmware_bin.read()
md5 = hashlib.md5(bin_content).hexdigest()
with open(CORE.firmware_md5, "w", encoding="ascii") as firmware_md5:
firmware_md5.write(md5)
gz_content = gzip.compress(bin_content, compresslevel=9)
with open(CORE.firmware_gz, "wb") as firmware_gz:
firmware_gz.write(gz_content)
md5_gz = hashlib.md5(gz_content).hexdigest()
with open(CORE.firmware_gz_md5, "w", encoding="ascii") as firmware_gz_md5:
firmware_gz_md5.write(md5_gz)
return 0
def upload_using_esptool(config, port, file):
@ -686,6 +706,22 @@ def command_rename(args, config):
print()
return 0
def command_publish(args, config):
files = [ CORE.firmware_bin, CORE.firmware_md5, CORE.firmware_gz, CORE.firmware_gz_md5]
if not all(map(lambda f: os.path.isfile(f), files)):
exit_code = compile_program(args, config)
if exit_code != 0:
return exit_code
topdir = os.path.join(args.directory, CORE.name)
_LOGGER.info(f"Copying firmware files to {topdir}...")
for src_file in files:
dst_file = os.path.join(args.directory, CORE.name, os.path.basename(src_file))
copy_file_if_changed(src_file, dst_file)
return 0
PRE_CONFIG_ACTIONS = {
"wizard": command_wizard,
@ -707,6 +743,7 @@ POST_CONFIG_ACTIONS = {
"idedata": command_idedata,
"rename": command_rename,
"discover": command_discover,
"publish": command_publish,
}
@ -804,6 +841,19 @@ def parse_args(argv):
"configuration", help="Your YAML configuration file.", nargs=1
)
parser_publish = subparsers.add_parser(
"publish",
help="Publish compiled binary. (currently copy binary to directory)",
)
parser_publish.add_argument(
"configuration", help="Your YAML configuration file(s).", nargs="+"
)
parser_publish.add_argument(
"--directory",
default=os.path.join(os.path.sep, "config", "www"),
help="Destination directory",
)
parser_run = subparsers.add_parser(
"run",
help="Validate the configuration, create a binary, upload it, and start logs.",

View File

@ -627,6 +627,18 @@ class EsphomeCore:
return self.relative_pioenvs_path(self.name, "firmware.uf2")
return self.relative_pioenvs_path(self.name, "firmware.bin")
@property
def firmware_md5(self):
return self.firmware_bin + '.md5'
@property
def firmware_gz(self):
return self.firmware_bin + '.gz'
@property
def firmware_gz_md5(self):
return self.firmware_gz + '.md5'
@property
def target_platform(self):
return self.data[KEY_CORE][KEY_TARGET_PLATFORM]

View File

@ -375,6 +375,11 @@ class EsphomeRunHandler(EsphomePortCommandWebSocket):
"""Build the command to run."""
return await self.build_device_command(["run"], json_message)
class EsphomePublishHandler(EsphomePortCommandWebSocket):
async def build_command(self, json_message: dict[str, Any]) -> list[str]:
config_file = settings.rel_path(json_message["configuration"])
return [*DASHBOARD_COMMAND, "publish", config_file]
class EsphomeCompileHandler(EsphomeCommandWebSocket):
async def build_command(self, json_message: dict[str, Any]) -> list[str]:
@ -1038,6 +1043,8 @@ def get_base_frontend_path() -> str:
if not static_path.endswith("/"):
static_path += "/"
_LOGGER.warning("esphome_dashboard %s" % os.path.abspath(os.path.join(os.getcwd(), static_path, "esphome_dashboard")))
# This path can be relative, so resolve against the root or else templates don't work
return os.path.abspath(os.path.join(os.getcwd(), static_path, "esphome_dashboard"))
@ -1118,6 +1125,7 @@ def make_app(debug=get_bool_env(ENV_DEV)) -> tornado.web.Application:
(f"{rel}logs", EsphomeLogsHandler),
(f"{rel}upload", EsphomeUploadHandler),
(f"{rel}run", EsphomeRunHandler),
(f"{rel}publish", EsphomePublishHandler),
(f"{rel}compile", EsphomeCompileHandler),
(f"{rel}validate", EsphomeValidateHandler),
(f"{rel}clean-mqtt", EsphomeCleanMqttHandler),

View File

@ -8,8 +8,9 @@ import random
import socket
import sys
import time
from typing import Optional
from esphome.core import EsphomeError
from esphome.core import EsphomeError, CORE
from esphome.helpers import is_ip_address, resolve_ip_address
RESPONSE_OK = 0x00
@ -194,11 +195,11 @@ def send_check(sock, data, msg):
def perform_ota(
sock: socket.socket, password: str, file_handle: io.IOBase, filename: str
sock: socket.socket, password: str, filename: Optional[str] = None
) -> None:
file_contents = file_handle.read()
file_size = len(file_contents)
_LOGGER.info("Uploading %s (%s bytes)", filename, file_size)
#file_contents = file_handle.read()
#file_size = len(file_contents)
#_LOGGER.info("Uploading %s (%s bytes)", filename, file_size)
# Enable nodelay, we need it for phase 1
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
@ -219,10 +220,28 @@ def perform_ota(
)[0]
if features == RESPONSE_SUPPORTS_COMPRESSION:
upload_contents = gzip.compress(file_contents, compresslevel=9)
_LOGGER.info("Compressed to %s bytes", len(upload_contents))
filename = CORE.firmware_gz
filename_md5 = CORE.firmware_gz_md5
#upload_contents = gzip.compress(file_contents, compresslevel=9)
#_LOGGER.info("Compressed to %s bytes", len(upload_contents))
_LOGGER.info("Using compressed image")
else:
upload_contents = file_contents
_LOGGER.info("Not using compressed image")
filename = CORE.firmware_bin
filename_md5 = CORE.firmware_md5
#upload_contents = file_contents
try:
with open(filename, "rb") as file_handle:
file_contents = file_handle.read()
except IOError as e:
raise OTAError(f"Error on {filename}: {str(e)}")
try:
with open(filename_md5, "r", encoding='ascii') as file_md5_handle:
md5 = file_md5_handle.read()
except IOError as e:
raise OTAError(f"Error on {filename_md5}: {str(e)}")
(auth,) = receive_exactly(
sock, 1, "auth", [RESPONSE_REQUEST_AUTH, RESPONSE_AUTH_OK]
@ -249,7 +268,7 @@ def perform_ota(
send_check(sock, result, "auth result")
receive_exactly(sock, 1, "auth result", RESPONSE_AUTH_OK)
upload_size = len(upload_contents)
upload_size = len(file_contents)
upload_size_encoded = [
(upload_size >> 24) & 0xFF,
(upload_size >> 16) & 0xFF,
@ -259,10 +278,9 @@ def perform_ota(
send_check(sock, upload_size_encoded, "binary size")
receive_exactly(sock, 1, "binary size", RESPONSE_UPDATE_PREPARE_OK)
upload_md5 = hashlib.md5(upload_contents).hexdigest()
_LOGGER.debug("MD5 of upload is %s", upload_md5)
_LOGGER.debug("MD5 of upload is %s", md5)
send_check(sock, upload_md5, "file checksum")
send_check(sock, md5, "file checksum")
receive_exactly(sock, 1, "file checksum", RESPONSE_BIN_MD5_OK)
# Disable nodelay for transfer
@ -278,7 +296,7 @@ def perform_ota(
offset = 0
progress = ProgressBar()
while True:
chunk = upload_contents[offset : offset + UPLOAD_BLOCK_SIZE]
chunk = file_contents[offset : offset + UPLOAD_BLOCK_SIZE]
if not chunk:
break
offset += len(chunk)
@ -310,7 +328,7 @@ def perform_ota(
time.sleep(1)
def run_ota_impl_(remote_host, remote_port, password, filename):
def run_ota_impl_(remote_host, remote_port, password, filename=None):
if is_ip_address(remote_host):
_LOGGER.info("Connecting to %s", remote_host)
ip = remote_host
@ -339,21 +357,20 @@ def run_ota_impl_(remote_host, remote_port, password, filename):
_LOGGER.error("Connecting to %s:%s failed: %s", remote_host, remote_port, err)
return 1
with open(filename, "rb") as file_handle:
try:
perform_ota(sock, password, file_handle, filename)
except OTAError as err:
_LOGGER.error(str(err))
return 1
finally:
sock.close()
try:
perform_ota(sock, password, filename=filename)
except OTAError as err:
_LOGGER.error(str(err))
return 1
finally:
sock.close()
return 0
def run_ota(remote_host, remote_port, password, filename):
def run_ota(remote_host, remote_port, password, filename=None):
try:
return run_ota_impl_(remote_host, remote_port, password, filename)
return run_ota_impl_(remote_host, remote_port, password, filename=filename)
except OTAError as err:
_LOGGER.error(err)
return 1