From 7c17e72db42ae6016bf315a7b09368395b38736d Mon Sep 17 00:00:00 2001 From: Otto Winter Date: Mon, 20 Sep 2021 00:33:10 +0200 Subject: [PATCH] Add readv and writev for more efficient API packets (#2342) --- esphome/components/api/api_connection.cpp | 29 ++-- esphome/components/api/api_frame_helper.cpp | 127 ++++++++++-------- esphome/components/api/api_frame_helper.h | 5 +- .../components/socket/bsd_sockets_impl.cpp | 48 +++++++ esphome/components/socket/headers.h | 6 + .../components/socket/lwip_raw_tcp_impl.cpp | 87 +++++++++--- esphome/components/socket/socket.h | 2 + 7 files changed, 220 insertions(+), 84 deletions(-) diff --git a/esphome/components/api/api_connection.cpp b/esphome/components/api/api_connection.cpp index 786fc28d68..8e6d1bc1c2 100644 --- a/esphome/components/api/api_connection.cpp +++ b/esphome/components/api/api_connection.cpp @@ -702,15 +702,7 @@ bool APIConnection::send_log_message(int level, const char *tag, const char *lin // string message = 3; buffer.encode_string(3, line, strlen(line)); // SubscribeLogsResponse - 29 - bool success = this->send_buffer(buffer, 29); - if (!success) { - buffer = this->create_buffer(); - // bool send_failed = 4; - buffer.encode_bool(4, true); - return this->send_buffer(buffer, 29); - } else { - return true; - } + return this->send_buffer(buffer, 29); } HelloResponse APIConnection::hello(const HelloRequest &msg) { @@ -783,8 +775,23 @@ void APIConnection::subscribe_home_assistant_states(const SubscribeHomeAssistant bool APIConnection::send_buffer(ProtoWriteBuffer buffer, uint32_t message_type) { if (this->remove_) return false; - if (!this->helper_->can_write_without_blocking()) - return false; + if (!this->helper_->can_write_without_blocking()) { + delay(0); + APIError err = helper_->loop(); + if (err != APIError::OK) { + on_fatal_error(); + ESP_LOGW(TAG, "%s: Socket operation failed: %s errno=%d", client_info_.c_str(), api_error_to_str(err), errno); + return false; + } + if (!this->helper_->can_write_without_blocking()) { + // SubscribeLogsResponse + if (message_type != 29) { + ESP_LOGV(TAG, "Cannot send message because of TCP buffer space"); + } + delay(0); + return false; + } + } APIError err = this->helper_->write_packet(message_type, buffer.get_buffer()->data(), buffer.get_buffer()->size()); if (err == APIError::WOULD_BLOCK) diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp index e68831e594..15014b7937 100644 --- a/esphome/components/api/api_frame_helper.cpp +++ b/esphome/components/api/api_frame_helper.cpp @@ -125,13 +125,6 @@ APIError APINoiseFrameHelper::init() { HELPER_LOG("Setting nonblocking failed with errno %d", errno); return APIError::TCP_NONBLOCKING_FAILED; } - int enable = 1; - err = socket_->setsockopt(IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int)); - if (err != 0) { - state_ = State::FAILED; - HELPER_LOG("Setting nodelay failed with errno %d", errno); - return APIError::TCP_NODELAY_FAILED; - } // init prologue prologue_.insert(prologue_.end(), PROLOGUE_INIT, PROLOGUE_INIT + strlen(PROLOGUE_INIT)); @@ -494,12 +487,13 @@ APIError APINoiseFrameHelper::write_packet(uint16_t type, const uint8_t *payload size_t total_len = 3 + mbuf.size; tmpbuf[1] = (uint8_t)(mbuf.size >> 8); tmpbuf[2] = (uint8_t) mbuf.size; + + struct iovec iov; + iov.iov_base = &tmpbuf[0]; + iov.iov_len = total_len; + // write raw to not have two packets sent if NAGLE disabled - aerr = write_raw_(&tmpbuf[0], total_len); - if (aerr != APIError::OK) { - return aerr; - } - return APIError::OK; + return write_raw_(&iov, 1); } APIError APINoiseFrameHelper::try_send_tx_buf_() { // try send from tx_buf @@ -526,16 +520,19 @@ APIError APINoiseFrameHelper::try_send_tx_buf_() { * @param data The data to write * @param len The length of data */ -APIError APINoiseFrameHelper::write_raw_(const uint8_t *data, size_t len) { - if (len == 0) +APIError APINoiseFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) { + if (iovcnt == 0) return APIError::OK; int err; APIError aerr; - // uncomment for even more debugging + size_t total_write_len = 0; + for (int i = 0; i < iovcnt; i++) { #ifdef HELPER_LOG_PACKETS - ESP_LOGVV(TAG, "Sending raw: %s", hexencode(data, len).c_str()); + ESP_LOGVV(TAG, "Sending raw: %s", hexencode(reinterpret_cast(iov[i].iov_base), iov[i].iov_len).c_str()); #endif + total_write_len += iov[i].iov_len; + } if (!tx_buf_.empty()) { // try to empty tx_buf_ first @@ -546,41 +543,56 @@ APIError APINoiseFrameHelper::write_raw_(const uint8_t *data, size_t len) { if (!tx_buf_.empty()) { // tx buf not empty, can't write now because then stream would be inconsistent - tx_buf_.insert(tx_buf_.end(), data, data + len); + for (int i = 0; i < iovcnt; i++) { + tx_buf_.insert(tx_buf_.end(), reinterpret_cast(iov[i].iov_base), + reinterpret_cast(iov[i].iov_base) + iov[i].iov_len); + } return APIError::OK; } - ssize_t sent = socket_->write(data, len); + ssize_t sent = socket_->writev(iov, iovcnt); if (is_would_block(sent)) { // operation would block, add buffer to tx_buf - tx_buf_.insert(tx_buf_.end(), data, data + len); + for (int i = 0; i < iovcnt; i++) { + tx_buf_.insert(tx_buf_.end(), reinterpret_cast(iov[i].iov_base), + reinterpret_cast(iov[i].iov_base) + iov[i].iov_len); + } return APIError::OK; } else if (sent == -1) { // an error occured state_ = State::FAILED; HELPER_LOG("Socket write failed with errno %d", errno); return APIError::SOCKET_WRITE_FAILED; - } else if (sent != len) { + } else if (sent != total_write_len) { // partially sent, add end to tx_buf - tx_buf_.insert(tx_buf_.end(), data + sent, data + len); + size_t to_consume = sent; + for (int i = 0; i < iovcnt; i++) { + if (to_consume >= iov[i].iov_len) { + to_consume -= iov[i].iov_len; + } else { + tx_buf_.insert(tx_buf_.end(), reinterpret_cast(iov[i].iov_base) + to_consume, + reinterpret_cast(iov[i].iov_base) + iov[i].iov_len); + to_consume = 0; + } + } return APIError::OK; } // fully sent return APIError::OK; } APIError APINoiseFrameHelper::write_frame_(const uint8_t *data, size_t len) { - APIError aerr; - uint8_t header[3]; header[0] = 0x01; // indicator header[1] = (uint8_t)(len >> 8); header[2] = (uint8_t) len; - aerr = write_raw_(header, 3); - if (aerr != APIError::OK) - return aerr; - aerr = write_raw_(data, len); - return aerr; + struct iovec iov[2]; + iov[0].iov_base = header; + iov[0].iov_len = 3; + iov[1].iov_base = const_cast(data); + iov[1].iov_len = len; + + return write_raw_(iov, 2); } /** Initiate the data structures for the handshake. @@ -709,13 +721,6 @@ APIError APIPlaintextFrameHelper::init() { HELPER_LOG("Setting nonblocking failed with errno %d", errno); return APIError::TCP_NONBLOCKING_FAILED; } - int enable = 1; - err = socket_->setsockopt(IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int)); - if (err != 0) { - state_ = State::FAILED; - HELPER_LOG("Setting nodelay failed with errno %d", errno); - return APIError::TCP_NODELAY_FAILED; - } state_ = State::DATA; return APIError::OK; @@ -863,15 +868,13 @@ APIError APIPlaintextFrameHelper::write_packet(uint16_t type, const uint8_t *pay ProtoVarInt(payload_len).encode(header); ProtoVarInt(type).encode(header); - aerr = write_raw_(&header[0], header.size()); - if (aerr != APIError::OK) { - return aerr; - } - aerr = write_raw_(payload, payload_len); - if (aerr != APIError::OK) { - return aerr; - } - return APIError::OK; + struct iovec iov[2]; + iov[0].iov_base = &header[0]; + iov[0].iov_len = header.size(); + iov[1].iov_base = const_cast(payload); + iov[1].iov_len = payload_len; + + return write_raw_(iov, 2); } APIError APIPlaintextFrameHelper::try_send_tx_buf_() { // try send from tx_buf @@ -896,16 +899,19 @@ APIError APIPlaintextFrameHelper::try_send_tx_buf_() { * @param data The data to write * @param len The length of data */ -APIError APIPlaintextFrameHelper::write_raw_(const uint8_t *data, size_t len) { - if (len == 0) +APIError APIPlaintextFrameHelper::write_raw_(const struct iovec *iov, int iovcnt) { + if (iovcnt == 0) return APIError::OK; int err; APIError aerr; - // uncomment for even more debugging + size_t total_write_len = 0; + for (int i = 0; i < iovcnt; i++) { #ifdef HELPER_LOG_PACKETS - ESP_LOGVV(TAG, "Sending raw: %s", hexencode(data, len).c_str()); + ESP_LOGVV(TAG, "Sending raw: %s", hexencode(reinterpret_cast(iov[i].iov_base), iov[i].iov_len).c_str()); #endif + total_write_len += iov[i].iov_len; + } if (!tx_buf_.empty()) { // try to empty tx_buf_ first @@ -916,23 +922,38 @@ APIError APIPlaintextFrameHelper::write_raw_(const uint8_t *data, size_t len) { if (!tx_buf_.empty()) { // tx buf not empty, can't write now because then stream would be inconsistent - tx_buf_.insert(tx_buf_.end(), data, data + len); + for (int i = 0; i < iovcnt; i++) { + tx_buf_.insert(tx_buf_.end(), reinterpret_cast(iov[i].iov_base), + reinterpret_cast(iov[i].iov_base) + iov[i].iov_len); + } return APIError::OK; } - ssize_t sent = socket_->write(data, len); + ssize_t sent = socket_->writev(iov, iovcnt); if (is_would_block(sent)) { // operation would block, add buffer to tx_buf - tx_buf_.insert(tx_buf_.end(), data, data + len); + for (int i = 0; i < iovcnt; i++) { + tx_buf_.insert(tx_buf_.end(), reinterpret_cast(iov[i].iov_base), + reinterpret_cast(iov[i].iov_base) + iov[i].iov_len); + } return APIError::OK; } else if (sent == -1) { // an error occured state_ = State::FAILED; HELPER_LOG("Socket write failed with errno %d", errno); return APIError::SOCKET_WRITE_FAILED; - } else if (sent != len) { + } else if (sent != total_write_len) { // partially sent, add end to tx_buf - tx_buf_.insert(tx_buf_.end(), data + sent, data + len); + size_t to_consume = sent; + for (int i = 0; i < iovcnt; i++) { + if (to_consume >= iov[i].iov_len) { + to_consume -= iov[i].iov_len; + } else { + tx_buf_.insert(tx_buf_.end(), reinterpret_cast(iov[i].iov_base) + to_consume, + reinterpret_cast(iov[i].iov_base) + iov[i].iov_len); + to_consume = 0; + } + } return APIError::OK; } // fully sent diff --git a/esphome/components/api/api_frame_helper.h b/esphome/components/api/api_frame_helper.h index a9a653cf4f..44df629b2f 100644 --- a/esphome/components/api/api_frame_helper.h +++ b/esphome/components/api/api_frame_helper.h @@ -58,6 +58,7 @@ const char *api_error_to_str(APIError err); class APIFrameHelper { public: + virtual ~APIFrameHelper() = default; virtual APIError init() = 0; virtual APIError loop() = 0; virtual APIError read_packet(ReadPacketBuffer *buffer) = 0; @@ -96,7 +97,7 @@ class APINoiseFrameHelper : public APIFrameHelper { APIError try_read_frame_(ParsedFrame *frame); APIError try_send_tx_buf_(); APIError write_frame_(const uint8_t *data, size_t len); - APIError write_raw_(const uint8_t *data, size_t len); + APIError write_raw_(const struct iovec *iov, int iovcnt); APIError init_handshake_(); APIError check_handshake_finished_(); void send_explicit_handshake_reject_(const std::string &reason); @@ -154,7 +155,7 @@ class APIPlaintextFrameHelper : public APIFrameHelper { APIError try_read_frame_(ParsedFrame *frame); APIError try_send_tx_buf_(); - APIError write_raw_(const uint8_t *data, size_t len); + APIError write_raw_(const struct iovec *iov, int iovcnt); std::unique_ptr socket_; diff --git a/esphome/components/socket/bsd_sockets_impl.cpp b/esphome/components/socket/bsd_sockets_impl.cpp index a0cdb6ec42..aca15f118e 100644 --- a/esphome/components/socket/bsd_sockets_impl.cpp +++ b/esphome/components/socket/bsd_sockets_impl.cpp @@ -5,6 +5,10 @@ #include +#ifdef ARDUINO_ARCH_ESP32 +#include +#endif + namespace esphome { namespace socket { @@ -75,7 +79,51 @@ class BSDSocketImpl : public Socket { } int listen(int backlog) override { return ::listen(fd_, backlog); } ssize_t read(void *buf, size_t len) override { return ::read(fd_, buf, len); } + ssize_t readv(const struct iovec *iov, int iovcnt) override { +#if defined(ARDUINO_ARCH_ESP32) && ESP_IDF_VERSION_MAJOR < 4 + // esp-idf v3 doesn't have readv, emulate it + ssize_t ret = 0; + for (int i = 0; i < iovcnt; i++) { + ssize_t err = this->read(reinterpret_cast(iov[i].iov_base), iov[i].iov_len); + if (err == -1) { + if (ret != 0) + // if we already read some don't return an error + break; + return err; + } + ret += err; + if (err != iov[i].iov_len) + break; + } + return ret; +#else + return ::readv(fd_, iov, iovcnt); +#endif + } ssize_t write(const void *buf, size_t len) override { return ::write(fd_, buf, len); } + ssize_t send(void *buf, size_t len, int flags) { return ::send(fd_, buf, len, flags); } + ssize_t writev(const struct iovec *iov, int iovcnt) override { +#if defined(ARDUINO_ARCH_ESP32) && ESP_IDF_VERSION_MAJOR < 4 + // esp-idf v3 doesn't have writev, emulate it + ssize_t ret = 0; + for (int i = 0; i < iovcnt; i++) { + ssize_t err = + this->send(reinterpret_cast(iov[i].iov_base), iov[i].iov_len, i == iovcnt - 1 ? 0 : MSG_MORE); + if (err == -1) { + if (ret != 0) + // if we already wrote some don't return an error + break; + return err; + } + ret += err; + if (err != iov[i].iov_len) + break; + } + return ret; +#else + return ::writev(fd_, iov, iovcnt); +#endif + } int setblocking(bool blocking) override { int fl = ::fcntl(fd_, F_GETFL, 0); if (blocking) { diff --git a/esphome/components/socket/headers.h b/esphome/components/socket/headers.h index da710b760e..fbe8f929a0 100644 --- a/esphome/components/socket/headers.h +++ b/esphome/components/socket/headers.h @@ -81,6 +81,11 @@ struct sockaddr_storage { }; typedef uint32_t socklen_t; +struct iovec { + void *iov_base; + size_t iov_len; +}; + #ifdef ARDUINO_ARCH_ESP8266 // arduino-esp8266 declares a global vars called INADDR_NONE/ANY which are invalid with the define #ifdef INADDR_ANY @@ -104,6 +109,7 @@ typedef uint32_t socklen_t; #include #include #include +#include #include #include #include diff --git a/esphome/components/socket/lwip_raw_tcp_impl.cpp b/esphome/components/socket/lwip_raw_tcp_impl.cpp index 39741ea7ec..3c225aeb82 100644 --- a/esphome/components/socket/lwip_raw_tcp_impl.cpp +++ b/esphome/components/socket/lwip_raw_tcp_impl.cpp @@ -371,7 +371,23 @@ class LWIPRawImpl : public Socket { return read; } - ssize_t write(const void *buf, size_t len) override { + ssize_t readv(const struct iovec *iov, int iovcnt) override { + ssize_t ret = 0; + for (int i = 0; i < iovcnt; i++) { + ssize_t err = read(reinterpret_cast(iov[i].iov_base), iov[i].iov_len); + if (err == -1) { + if (ret != 0) + // if we already read some don't return an error + break; + return err; + } + ret += err; + if (err != iov[i].iov_len) + break; + } + return ret; + } + ssize_t internal_write(const void *buf, size_t len) { if (pcb_ == nullptr) { errno = ECONNRESET; return -1; @@ -400,25 +416,60 @@ class LWIPRawImpl : public Socket { errno = ECONNRESET; return -1; } - if (tcp_nagle_disabled(pcb_)) { - LWIP_LOG("tcp_output(%p)", pcb_); - err = tcp_output(pcb_); - if (err == ERR_ABRT) { - LWIP_LOG(" -> err ERR_ABRT"); - // sometimes lwip returns ERR_ABRT for no apparent reason - // the connection works fine afterwards, and back with ESPAsyncTCP we - // indirectly also ignored this error - // FIXME: figure out where this is returned and what it means in this context - return to_send; - } - if (err != ERR_OK) { - LWIP_LOG(" -> err %d", err); - errno = ECONNRESET; - return -1; - } - } return to_send; } + int internal_output() { + LWIP_LOG("tcp_output(%p)", pcb_); + err_t err = tcp_output(pcb_); + if (err == ERR_ABRT) { + LWIP_LOG(" -> err ERR_ABRT"); + // sometimes lwip returns ERR_ABRT for no apparent reason + // the connection works fine afterwards, and back with ESPAsyncTCP we + // indirectly also ignored this error + // FIXME: figure out where this is returned and what it means in this context + return 0; + } + if (err != ERR_OK) { + LWIP_LOG(" -> err %d", err); + errno = ECONNRESET; + return -1; + } + return 0; + } + ssize_t write(const void *buf, size_t len) override { + ssize_t written = internal_write(buf, len); + if (written == -1) + return -1; + if (written == 0) + // no need to output if nothing written + return 0; + int err = internal_output(); + if (err == -1) + return -1; + return written; + } + ssize_t writev(const struct iovec *iov, int iovcnt) override { + ssize_t written = 0; + for (int i = 0; i < iovcnt; i++) { + ssize_t err = internal_write(reinterpret_cast(iov[i].iov_base), iov[i].iov_len); + if (err == -1) { + if (written != 0) + // if we already read some don't return an error + break; + return err; + } + written += err; + if (err != iov[i].iov_len) + break; + } + if (written == 0) + // no need to output if nothing written + return 0; + int err = internal_output(); + if (err == -1) + return -1; + return written; + } int setblocking(bool blocking) override { if (pcb_ == nullptr) { errno = ECONNRESET; diff --git a/esphome/components/socket/socket.h b/esphome/components/socket/socket.h index 7a5ce79161..9920610bf5 100644 --- a/esphome/components/socket/socket.h +++ b/esphome/components/socket/socket.h @@ -31,7 +31,9 @@ class Socket { virtual int setsockopt(int level, int optname, const void *optval, socklen_t optlen) = 0; virtual int listen(int backlog) = 0; virtual ssize_t read(void *buf, size_t len) = 0; + virtual ssize_t readv(const struct iovec *iov, int iovcnt) = 0; virtual ssize_t write(const void *buf, size_t len) = 0; + virtual ssize_t writev(const struct iovec *iov, int iovcnt) = 0; virtual int setblocking(bool blocking) = 0; virtual int loop() { return 0; }; };