Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.
Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.
# MQTT Message Monitor for Raspberry Pi Pico W Here's a Python program for your Raspberry Pi Pico W that will check for MQTT messages while blinking the onboard LED to indicate it's polling. When a message is received, it will set the LED solid for one second.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | import network import time from machine import Pin from umqtt.simple import MQTTClient import random # Configure the onboard LED led = Pin("LED", Pin.OUT) # WiFi settings - replace with your network details WIFI_SSID = "YOUR_WIFI_SSID" WIFI_PASSWORD = "YOUR_WIFI_PASSWORD" # MQTT settings - replace with your broker details MQTT_BROKER = "YOUR_MQTT_BROKER_ADDRESS" # e.g., "192.168.1.100" or "broker.hivemq.com" MQTT_PORT = 1883 MQTT_CLIENT_ID = f"pico_w_{random.randint(0, 1000000)}" MQTT_TOPIC = b"YOUR_TOPIC" # e.g., b"home/sensors/temperature" # Global variable to track message reception message_received = False message_time = 0 def connect_wifi(): """Connect to WiFi network""" wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print(f"Connecting to {WIFI_SSID}...") wlan.connect(WIFI_SSID, WIFI_PASSWORD) # Wait for connection with timeout max_wait = 10 while max_wait > 0: if wlan.isconnected(): break max_wait -= 1 print("Waiting for connection...") time.sleep(1) if wlan.isconnected(): print("Connected to WiFi") print("IP:", wlan.ifconfig()[0]) else: print("Failed to connect to WiFi") raise RuntimeError("Network connection failed") def mqtt_callback(topic, msg): """Callback function for when a message is received""" global message_received, message_time print(f"Message received on {topic}: {msg}") message_received = True message_time = time.time() def setup_mqtt(): """Set up and connect MQTT client""" client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT) client.set_callback(mqtt_callback) try: client.connect() print(f"Connected to MQTT broker at {MQTT_BROKER}") client.subscribe(MQTT_TOPIC) print(f"Subscribed to topic: {MQTT_TOPIC}") return client except Exception as e: print(f"Failed to connect to MQTT broker: {e}") raise def main(): """Main program loop""" global message_received, message_time # Setup initial connections connect_wifi() mqtt_client = setup_mqtt() # Variables for LED blinking last_blink = 0 blink_interval = 500 # milliseconds led_state = False print("Monitoring for MQTT messages...") while True: # Non-blocking check for MQTT messages try: mqtt_client.check_msg() except Exception as e: print(f"MQTT error: {e}") # Try to reconnect time.sleep(5) try: mqtt_client = setup_mqtt() except: pass continue current_time = time.ticks_ms() # If message received within the last second, keep LED solid if message_received and (time.time() - message_time) < 1: led.value(1) # Solid LED else: message_received = False # Blink LED to show we're polling if time.ticks_diff(current_time, last_blink) >= blink_interval: led_state = not led_state led.value(led_state) last_blink = current_time # Small delay to prevent CPU overuse time.sleep_ms(10) if __name__ == "__main__": main() |
1 2 3 | import upip upip.install("micropython-umqtt.simple") |
Testing if the previous output contains the string STA_IF
: True
Testing if the previous output contains the string WLAN
: True
Testing if the previous output contains the string set_callback
: True
Testing if the previous output contains the string MQTTClient
: True
Testing if the previous output contains the string check_msg
: True
Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.
Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.
# MQTT Client with LED Indicator for Raspberry Pi Pico W Here's a Python program for your Raspberry Pi Pico W that checks for MQTT messages while blinking the LED to indicate it's running:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 | import network import time from machine import Pin from umqtt.simple import MQTTClient import random # WiFi configuration (update these with your credentials) WIFI_SSID = "YOUR_WIFI_SSID" WIFI_PASSWORD = "YOUR_WIFI_PASSWORD" # MQTT configuration (update these with your broker details) MQTT_BROKER = "YOUR_MQTT_BROKER_ADDRESS" MQTT_PORT = 1883 MQTT_CLIENT_ID = f"pico_client_{random.randint(0, 1000)}" MQTT_TOPIC = b"pico/messages" # Initialize the onboard LED led = Pin("LED", Pin.OUT) # Global flag for message reception message_received = False message_time = 0 # Connect to WiFi def connect_wifi(): wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print("Connecting to WiFi...") wlan.connect(WIFI_SSID, WIFI_PASSWORD) # Wait for connection with timeout max_wait = 10 while max_wait > 0: if wlan.isconnected(): break max_wait -= 1 print(".", end="") time.sleep(1) print() if wlan.isconnected(): print("Connected to WiFi") print("IP:", wlan.ifconfig()[0]) return True else: print("Failed to connect to WiFi") return False return True # MQTT message callback def mqtt_callback(topic, msg): global message_received, message_time print(f"Message received on {topic.decode()}: {msg.decode()}") # Set the message received flag message_received = True message_time = time.time() # Connect to MQTT broker def connect_mqtt(): try: client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT) client.set_callback(mqtt_callback) client.connect() client.subscribe(MQTT_TOPIC) print(f"Connected to MQTT broker and subscribed to {MQTT_TOPIC.decode()}") return client except Exception as e: print("Failed to connect to MQTT broker:", e) return None def main(): global message_received, message_time # Connect to WiFi if not connect_wifi(): while True: # Failed to connect to WiFi, blink rapidly to indicate error led.toggle() time.sleep(0.1) # Connect to MQTT broker mqtt_client = connect_mqtt() if not mqtt_client: while True: # Failed to connect to MQTT, blink in a pattern to indicate error led.on() time.sleep(0.1) led.off() time.sleep(0.1) led.on() time.sleep(0.1) led.off() time.sleep(0.5) # Main loop variables led_state = 0 last_blink_time = time.time() last_wifi_check_time = time.time() blink_interval = 0.2 # Blink every 0.2 seconds wifi_check_interval = 30 # Check WiFi connection every 30 seconds print("Starting MQTT polling loop...") try: while True: current_time = time.time() # Check WiFi connection periodically if current_time - last_wifi_check_time >= wifi_check_interval: wlan = network.WLAN(network.STA_IF) if not wlan.isconnected(): print("WiFi disconnected. Reconnecting...") if connect_wifi(): mqtt_client = connect_mqtt() last_wifi_check_time = current_time # Check for MQTT messages (non-blocking) if mqtt_client: try: mqtt_client.check_msg() except Exception as e: print("MQTT Error:", e) # Try to reconnect mqtt_client = connect_mqtt() # Handle LED state if message_received: # Solid LED for 1 second after message received led.on() # Check if 1 second has passed since message received if current_time - message_time >= 1.0: message_received = False else: # Normal blinking mode to indicate polling if current_time - last_blink_time >= blink_interval: led.toggle() # Toggle LED state last_blink_time = current_time # Small delay to prevent CPU overuse time.sleep(0.01) except KeyboardInterrupt: print("Program stopped by user") except Exception as e: print("Unexpected error:", e) finally: # Clean up if mqtt_client: mqtt_client.disconnect() led.off() if __name__ == "__main__": main() |
Testing if the previous output contains the string STA_IF
: True
Testing if the previous output contains the string WLAN
: True
Testing if the previous output contains the string set_callback
: True
Testing if the previous output contains the string MQTTClient
: True
Testing if the previous output contains the string check_msg
: True