Initial Query

Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.

LLM Generation

Query

Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.

Output

# MQTT Message Monitor for Raspberry Pi Pico W Here's a Python program for your Raspberry Pi Pico W that will check for MQTT messages while blinking the onboard LED to indicate it's polling. When a message is received, it will set the LED solid for one second.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import network
import time
from machine import Pin
from umqtt.simple import MQTTClient
import random

# Configure the onboard LED
led = Pin("LED", Pin.OUT)

# WiFi settings - replace with your network details
WIFI_SSID = "YOUR_WIFI_SSID"
WIFI_PASSWORD = "YOUR_WIFI_PASSWORD"

# MQTT settings - replace with your broker details
MQTT_BROKER = "YOUR_MQTT_BROKER_ADDRESS"  # e.g., "192.168.1.100" or "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_CLIENT_ID = f"pico_w_{random.randint(0, 1000000)}"
MQTT_TOPIC = b"YOUR_TOPIC"  # e.g., b"home/sensors/temperature"

# Global variable to track message reception
message_received = False
message_time = 0

def connect_wifi():
    """Connect to WiFi network"""
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    
    if not wlan.isconnected():
        print(f"Connecting to {WIFI_SSID}...")
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        
        # Wait for connection with timeout
        max_wait = 10
        while max_wait > 0:
            if wlan.isconnected():
                break
            max_wait -= 1
            print("Waiting for connection...")
            time.sleep(1)
        
        if wlan.isconnected():
            print("Connected to WiFi")
            print("IP:", wlan.ifconfig()[0])
        else:
            print("Failed to connect to WiFi")
            raise RuntimeError("Network connection failed")

def mqtt_callback(topic, msg):
    """Callback function for when a message is received"""
    global message_received, message_time
    print(f"Message received on {topic}: {msg}")
    message_received = True
    message_time = time.time()

def setup_mqtt():
    """Set up and connect MQTT client"""
    client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT)
    client.set_callback(mqtt_callback)
    try:
        client.connect()
        print(f"Connected to MQTT broker at {MQTT_BROKER}")
        client.subscribe(MQTT_TOPIC)
        print(f"Subscribed to topic: {MQTT_TOPIC}")
        return client
    except Exception as e:
        print(f"Failed to connect to MQTT broker: {e}")
        raise

def main():
    """Main program loop"""
    global message_received, message_time
    
    # Setup initial connections
    connect_wifi()
    mqtt_client = setup_mqtt()
    
    # Variables for LED blinking
    last_blink = 0
    blink_interval = 500  # milliseconds
    led_state = False
    
    print("Monitoring for MQTT messages...")
    
    while True:
        # Non-blocking check for MQTT messages
        try:
            mqtt_client.check_msg()
        except Exception as e:
            print(f"MQTT error: {e}")
            # Try to reconnect
            time.sleep(5)
            try:
                mqtt_client = setup_mqtt()
            except:
                pass
            continue
        
        current_time = time.ticks_ms()
        
        # If message received within the last second, keep LED solid
        if message_received and (time.time() - message_time) < 1:
            led.value(1)  # Solid LED
        else:
            message_received = False
            
            # Blink LED to show we're polling
            if time.ticks_diff(current_time, last_blink) >= blink_interval:
                led_state = not led_state
                led.value(led_state)
                last_blink = current_time
        
        # Small delay to prevent CPU overuse
        time.sleep_ms(10)

if __name__ == "__main__":
    main()
## Installation Instructions 1. Make sure you have MicroPython installed on your Pico W 2. Install the `umqtt.simple` library if not already installed:
1
2
3
   import upip
   upip.install("micropython-umqtt.simple")
   
3. Modify the WiFi and MQTT settings in the code to match your environment 4. Save this file as `main.py` on your Pico W 5. Reset your Pico W to run the program ## How It Works - The LED will blink approximately once per second during normal operation - When an MQTT message is received, the LED will stay solid for 1 second - The program runs in a non-blocking loop, checking for messages continuously - If the connection to the broker is lost, it will attempt to reconnect This program provides visual feedback through the LED to confirm the device is running properly and can detect when messages arrive.

Check if all of the following conditions are true:

Substring Evaluation

Testing if the previous output contains the string STA_IF: True

Substring Evaluation

Testing if the previous output contains the string WLAN: True

Substring Evaluation

Testing if the previous output contains the string set_callback: True

Substring Evaluation

Testing if the previous output contains the string MQTTClient: True

Substring Evaluation

Testing if the previous output contains the string check_msg: True

Final Answer: True

Initial Query

Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.

LLM Generation

Query

Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.

Output

# MQTT Client with LED Indicator for Raspberry Pi Pico W Here's a Python program for your Raspberry Pi Pico W that checks for MQTT messages while blinking the LED to indicate it's running:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import network
import time
from machine import Pin
from umqtt.simple import MQTTClient
import random

# WiFi configuration (update these with your credentials)
WIFI_SSID = "YOUR_WIFI_SSID"
WIFI_PASSWORD = "YOUR_WIFI_PASSWORD"

# MQTT configuration (update these with your broker details)
MQTT_BROKER = "YOUR_MQTT_BROKER_ADDRESS"
MQTT_PORT = 1883
MQTT_CLIENT_ID = f"pico_client_{random.randint(0, 1000)}"
MQTT_TOPIC = b"pico/messages"

# Initialize the onboard LED
led = Pin("LED", Pin.OUT)

# Global flag for message reception
message_received = False
message_time = 0

# Connect to WiFi
def connect_wifi():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    
    if not wlan.isconnected():
        print("Connecting to WiFi...")
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        
        # Wait for connection with timeout
        max_wait = 10
        while max_wait > 0:
            if wlan.isconnected():
                break
            max_wait -= 1
            print(".", end="")
            time.sleep(1)
        print()
        
        if wlan.isconnected():
            print("Connected to WiFi")
            print("IP:", wlan.ifconfig()[0])
            return True
        else:
            print("Failed to connect to WiFi")
            return False
    return True

# MQTT message callback
def mqtt_callback(topic, msg):
    global message_received, message_time
    print(f"Message received on {topic.decode()}: {msg.decode()}")
    
    # Set the message received flag
    message_received = True
    message_time = time.time()

# Connect to MQTT broker
def connect_mqtt():
    try:
        client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT)
        client.set_callback(mqtt_callback)
        client.connect()
        client.subscribe(MQTT_TOPIC)
        print(f"Connected to MQTT broker and subscribed to {MQTT_TOPIC.decode()}")
        return client
    except Exception as e:
        print("Failed to connect to MQTT broker:", e)
        return None

def main():
    global message_received, message_time
    
    # Connect to WiFi
    if not connect_wifi():
        while True:
            # Failed to connect to WiFi, blink rapidly to indicate error
            led.toggle()
            time.sleep(0.1)
    
    # Connect to MQTT broker
    mqtt_client = connect_mqtt()
    if not mqtt_client:
        while True:
            # Failed to connect to MQTT, blink in a pattern to indicate error
            led.on()
            time.sleep(0.1)
            led.off()
            time.sleep(0.1)
            led.on()
            time.sleep(0.1)
            led.off()
            time.sleep(0.5)
    
    # Main loop variables
    led_state = 0
    last_blink_time = time.time()
    last_wifi_check_time = time.time()
    blink_interval = 0.2  # Blink every 0.2 seconds
    wifi_check_interval = 30  # Check WiFi connection every 30 seconds
    
    print("Starting MQTT polling loop...")
    try:
        while True:
            current_time = time.time()
            
            # Check WiFi connection periodically
            if current_time - last_wifi_check_time >= wifi_check_interval:
                wlan = network.WLAN(network.STA_IF)
                if not wlan.isconnected():
                    print("WiFi disconnected. Reconnecting...")
                    if connect_wifi():
                        mqtt_client = connect_mqtt()
                last_wifi_check_time = current_time
            
            # Check for MQTT messages (non-blocking)
            if mqtt_client:
                try:
                    mqtt_client.check_msg()
                except Exception as e:
                    print("MQTT Error:", e)
                    # Try to reconnect
                    mqtt_client = connect_mqtt()
            
            # Handle LED state
            if message_received:
                # Solid LED for 1 second after message received
                led.on()
                
                # Check if 1 second has passed since message received
                if current_time - message_time >= 1.0:
                    message_received = False
            else:
                # Normal blinking mode to indicate polling
                if current_time - last_blink_time >= blink_interval:
                    led.toggle()  # Toggle LED state
                    last_blink_time = current_time
            
            # Small delay to prevent CPU overuse
            time.sleep(0.01)
    
    except KeyboardInterrupt:
        print("Program stopped by user")
    except Exception as e:
        print("Unexpected error:", e)
    finally:
        # Clean up
        if mqtt_client:
            mqtt_client.disconnect()
        led.off()

if __name__ == "__main__":
    main()
## How to Use This Program: 1. **Update Configuration**: Replace the placeholders for WiFi and MQTT settings with your own: - `YOUR_WIFI_SSID` and `YOUR_WIFI_PASSWORD` with your WiFi credentials - `YOUR_MQTT_BROKER_ADDRESS` with your MQTT broker's IP or hostname 2. **Dependencies**: Ensure you have the `umqtt.simple` library installed: - In Thonny IDE, go to Tools > Manage packages - Search for and install `micropython-umqtt.simple` 3. **Behavior**: - The LED will blink rapidly while the program is running and checking for MQTT messages - When a message is received, the LED will stay on solid for 1 second - The program automatically handles WiFi disconnections and reconnections 4. **Testing**: - Send a test message to the "pico/messages" topic (or change the topic in the code) - Observe the LED turning solid for 1 second when a message is received This program is designed to be non-blocking, so it will continue checking for new messages even while controlling the LED.

Check if all of the following conditions are true:

Substring Evaluation

Testing if the previous output contains the string STA_IF: True

Substring Evaluation

Testing if the previous output contains the string WLAN: True

Substring Evaluation

Testing if the previous output contains the string set_callback: True

Substring Evaluation

Testing if the previous output contains the string MQTTClient: True

Substring Evaluation

Testing if the previous output contains the string check_msg: True

Final Answer: True