#include "mqtt.h" namespace config { // Values should be defined in config.h uint16_t sending_interval = MQTT_SENDING_INTERVAL; // [s] //INFO: Listen to every CO2 sensor which is connected to the server: // mosquitto_sub -h MQTT_SERVER -t 'CO2sensors/#' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -v const char *mqtt_server = MQTT_SERVER; const uint16_t mqtt_port = MQTT_PORT; const char *mqtt_user = MQTT_USER; const char *mqtt_password = MQTT_PASSWORD; const bool allow_mqtt_commands = ALLOW_MQTT_COMMANDS; const unsigned long wait_after_fail = 900; // [s] Wait 15 minutes after an MQTT connection fail, before trying again. } #if defined(ESP32) # include #endif WiFiClientSecure espClient; PubSubClient mqttClient(espClient); namespace mqtt { unsigned long last_sent_at = 0; unsigned long last_failed_at = 0; bool connected = false; String publish_topic; const char *json_sensor_format; String last_successful_publish = ""; void initialize(String &topic) { json_sensor_format = PSTR("{\"time\":\"%s\", \"co2\":%d, \"temp\":%.1f, \"rh\":%.1f}"); publish_topic = topic; #if defined(ESP8266) espClient.setInsecure(); // Sorry, we don't want to flash the sensors every 3 months. #endif // mqttClient.setSocketTimeout(config::mqtt_timeout); //NOTE: somehow doesn't seem to have any effect on connect() mqttClient.setServer(config::mqtt_server, config::mqtt_port); } void publish(const String ×tamp, int16_t co2, float temperature, float humidity) { if (WiFi.status() == WL_CONNECTED && mqttClient.connected()) { led_effects::onBoardLEDOn(); Serial.print(F("MQTT - Publishing message ... ")); char payload[75]; // Should be enough for json... snprintf(payload, sizeof(payload), json_sensor_format, timestamp.c_str(), co2, temperature, humidity); // Topic is the same as clientID. e.g. 'CO2sensors/ESP3d03da' if (mqttClient.publish(publish_topic.c_str(), payload)) { Serial.println(F("OK")); last_successful_publish = ntp::getLocalTime(); } else { Serial.println(F("Failed.")); } led_effects::onBoardLEDOff(); } } void setTimer(String messageString) { messageString.replace("timer ", ""); int timestep = messageString.toInt(); if (timestep >= 2 && timestep <= 1800) { Serial.print(F("Setting Measurement Interval to : ")); Serial.print(timestep); Serial.println("s."); sensor::scd30.setMeasurementInterval(messageString.toInt()); config::measurement_timestep = messageString.toInt(); led_effects::showKITTWheel(color::green, 1); } } void setMQTTinterval(String messageString) { messageString.replace("mqtt ", ""); config::sending_interval = messageString.toInt(); Serial.print(F("Setting Sending Interval to : ")); Serial.print(config::sending_interval); Serial.println("s."); led_effects::showKITTWheel(color::green, 1); } #ifdef AMPEL_CSV void setCSVinterval(String messageString) { messageString.replace("csv ", ""); config::csv_interval = messageString.toInt(); Serial.print(F("Setting CSV Interval to : ")); Serial.print(config::csv_interval); Serial.println("s."); led_effects::showKITTWheel(color::green, 1); } #endif void calibrateSensorToSpecificPPM(String messageString) { messageString.replace("calibrate ", ""); long int calibrationLevel = messageString.toInt(); if (calibrationLevel >= 400 && calibrationLevel <= 2000) { Serial.print(F("Force calibration, at ")); config::co2_calibration_level = messageString.toInt(); Serial.print(config::co2_calibration_level); Serial.println(" ppm."); sensor::startCalibrationProcess(); } } void setCO2forDebugging(String messageString) { Serial.print(F("DEBUG. Setting CO2 to ")); messageString.replace("co2 ", ""); sensor::co2 = messageString.toInt(); Serial.println(sensor::co2); } void sendInfoAboutLocalNetwork() { char info_topic[60]; // Should be enough for "CO2sensors/ESPd03cc5/info" snprintf(info_topic, sizeof(info_topic), "%s/info", publish_topic.c_str()); char payload[75]; // Should be enough for info json... const char *json_info_format = PSTR("{\"local_ip\":\"%s\", \"ssid\":\"%s\"}"); snprintf(payload, sizeof(payload), json_info_format, WiFi.localIP().toString().c_str(), WiFi.SSID().c_str()); mqttClient.publish(info_topic, payload); } /** * Allows sensor to be controlled by commands over MQTT * * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "reset" * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "timer 30" * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "mqtt 900" * mosquitto_pub -h MQTT_SERVER -t 'CO2sensors/SENSOR_ID/control' -p 443 --capath /etc/ssl/certs/ -u "MQTT_USER" -P "MQTT_PASSWORD" -m "calibrate 700" */ void controlSensorCallback(char *sub_topic, byte *message, unsigned int length) { if (length == 0) { return; } led_effects::onBoardLEDOn(); Serial.print(F("Message arrived on topic: ")); Serial.print(sub_topic); Serial.print(F(". Message: '")); String messageString; for (unsigned int i = 0; i < length; i++) { Serial.print((char) message[i]); messageString += (char) message[i]; } Serial.println("'."); if (messageString.startsWith("co2 ")) { setCO2forDebugging(messageString); } else if (messageString.startsWith("timer ")) { setTimer(messageString); } else if (messageString == "calibrate") { sensor::startCalibrationProcess(); } else if (messageString.startsWith("calibrate ")) { calibrateSensorToSpecificPPM(messageString); } else if (messageString.startsWith("mqtt ")) { setMQTTinterval(messageString); } else if (messageString == "publish") { Serial.println(F("Forcing MQTT publish now.")); publish(sensor::timestamp, sensor::co2, sensor::temperature, sensor::humidity); #ifdef AMPEL_CSV } else if (messageString.startsWith("csv ")) { setCSVinterval(messageString); } else if (messageString == "format_filesystem") { FS_LIB.format(); led_effects::showKITTWheel(color::blue, 2); #endif } else if (messageString == "night_mode") { led_effects::toggleNightMode(); } else if (messageString == "local_ip") { sendInfoAboutLocalNetwork(); } else if (messageString == "reset") { ESP.restart(); // softer than ESP.reset() } else { led_effects::showKITTWheel(color::red, 1); Serial.println(F("Message not supported. Doing nothing.")); } delay(50); led_effects::onBoardLEDOff(); } void reconnect() { if (last_failed_at > 0 && (seconds() - last_failed_at < config::wait_after_fail)) { // It failed less than wait_after_fail ago. Not even trying. return; } if (WiFi.status() != WL_CONNECTED) { //NOTE: Sadly, WiFi.status is sometimes WL_CONNECTED even though it's really not // No WIFI return; } Serial.print(F("MQTT - Attempting connection...")); led_effects::onBoardLEDOn(); // Wait for connection, at most 15s (default) mqttClient.connect(publish_topic.c_str(), config::mqtt_user, config::mqtt_password); led_effects::onBoardLEDOff(); connected = mqttClient.connected(); if (connected) { if (config::allow_mqtt_commands) { char control_topic[60]; // Should be enough for "CO2sensors/ESPd03cc5/control" snprintf(control_topic, sizeof(control_topic), "%s/control", publish_topic.c_str()); mqttClient.subscribe(control_topic); mqttClient.setCallback(controlSensorCallback); } Serial.println(F(" Connected.")); last_failed_at = 0; } else { last_failed_at = seconds(); Serial.print(F(" Failed! Error code=")); Serial.print(mqttClient.state()); Serial.print(F(". Will try again in ")); Serial.print(config::wait_after_fail); Serial.println("s."); } } void publishIfTimeHasCome(const String &timeStamp, const int16_t &co2, const float &temp, const float &hum) { // Send message via MQTT according to sending interval unsigned long now = seconds(); if (now - last_sent_at > config::sending_interval) { last_sent_at = now; publish(timeStamp, co2, temp, hum); } } void keepConnection() { // Keep MQTT connection if (!mqttClient.connected()) { reconnect(); } mqttClient.loop(); } }