Initial Query

Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.

LLM Generation

Query

Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.

Output

Okay, here is a MicroPython program for your Raspberry Pi Pico W that connects to WiFi, checks for MQTT messages non-blockingly using `uasyncio`, blinks the onboard LED continuously, and turns the LED solid for one second upon receiving a message. **Prerequisites:** 1. **MicroPython Firmware:** Ensure your Pico W has the latest MicroPython firmware flashed (the 'W' variant firmware includes network drivers). You can get it from the official Raspberry Pi site or the MicroPython site. 2. **`umqtt.simple` Library:** This library is needed for MQTT communication. You can install it easily using the Thonny IDE: * Connect your Pico W to your computer running Thonny. * Go to `Tools` -> `Manage packages...`. * Search for `micropython-umqtt.simple` and install it. * It might also install dependencies like `micropython-ure`. 3. **MQTT Broker:** You need an MQTT broker running (like Mosquitto, HiveMQ cloud, etc.) that your Pico can connect to. **The Code (`main.py`):**

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import network
import time
import ubinascii # For generating a unique client ID
import machine
from umqtt.simple import MQTTClient
import uasyncio # For non-blocking operation

# --- Configuration ---
WIFI_SSID = "YOUR_WIFI_SSID"
WIFI_PASSWORD = "YOUR_WIFI_PASSWORD"

MQTT_BROKER = "YOUR_MQTT_BROKER_IP_OR_HOSTNAME" # e.g., "192.168.1.100" or "broker.hivemq.com"
MQTT_PORT = 1883 # Default MQTT port
MQTT_USER = None # Set to your MQTT username if required, otherwise None
MQTT_PASSWORD = None # Set to your MQTT password if required, otherwise None
MQTT_TOPIC = b"pico/test" # The topic to subscribe to (bytes format)
# Generate a unique client ID based on the Pico's MAC address
CLIENT_ID = b"pico_w_" + ubinascii.hexlify(machine.unique_id())

# LED Pin (Onboard LED on Pico W is controlled differently)
# It's usually accessed via the name "LED"
led = machine.Pin("LED", machine.Pin.OUT)

# Global flag/state for LED control
message_received_flag = False
led_solid_task = None # To hold the asyncio task for solid LED

# --- WiFi Connection ---
def connect_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('Connecting to WiFi...')
        wlan.connect(ssid, password)
        # Wait for connection with a timeout
        max_wait = 15
        while max_wait > 0:
            if wlan.status() < 0 or wlan.status() >= 3:
                break
            max_wait -= 1
            print('.')
            time.sleep(1)

    if wlan.isconnected():
        print('WiFi Connected!')
        print('IP Address:', wlan.ifconfig()[0])
        return wlan
    else:
        print('WiFi Connection Failed!')
        # Optional: Blink LED rapidly to indicate failure
        for _ in range(10):
            led.toggle()
            time.sleep(0.1)
        led.off() # Ensure LED is off after failure indication
        return None

# --- MQTT Callback ---
# This function is called when a message is received on the subscribed topic
def mqtt_callback(topic, msg):
    global message_received_flag, led_solid_task
    print(f"Received message on topic '{topic.decode()}': {msg.decode()}")
    message_received_flag = True
    # If a solid LED task is already running, cancel it before starting a new one
    if led_solid_task:
        try:
            led_solid_task.cancel()
        except Exception as e:
            print(f"Error cancelling previous LED task: {e}") # Should ideally not happen often
    # Schedule the solid LED task to run
    led_solid_task = uasyncio.create_task(handle_solid_led())

# --- Async Task for Solid LED ---
async def handle_solid_led():
    global message_received_flag
    print("LED Solid ON")
    led.on()
    await uasyncio.sleep(1) # Non-blocking sleep for 1 second
    print("LED Solid OFF")
    led.off()
    message_received_flag = False # Reset flag after handling

# --- Main Async Function ---
async def main():
    global led_solid_task # Allow modification

    print("Starting Pico W MQTT Client...")

    # 1. Connect to WiFi
    wlan = connect_wifi(WIFI_SSID, WIFI_PASSWORD)
    if not wlan:
        print("Cannot proceed without WiFi. Halting.")
        # Optional: reset or enter a safe state
        return # Stop execution

    # 2. Initialize MQTT Client
    print(f"Connecting to MQTT Broker: {MQTT_BROKER} as Client ID: {CLIENT_ID.decode()}")
    client = MQTTClient(CLIENT_ID, MQTT_BROKER, port=MQTT_PORT, user=MQTT_USER, password=MQTT_PASSWORD, keepalive=60)
    client.set_callback(mqtt_callback)

    # 3. Connect to MQTT Broker
    try:
        client.connect()
        print(f"Connected to MQTT Broker and subscribing to topic: {MQTT_TOPIC.decode()}")
        client.subscribe(MQTT_TOPIC)
        print("Subscription successful.")
    except OSError as e:
        print(f"Failed to connect or subscribe to MQTT Broker: {e}")
        # Optional: blink LED differently or reset
        return # Stop execution
    except Exception as e:
        print(f"An unexpected error occurred during MQTT setup: {e}")
        return

    # 4. Main Loop - Polling and Blinking
    print("Starting main loop...")
    loop_counter = 0
    while True:
        try:
            # Blink LED on every loop iteration (unless solid LED is active)
            if not message_received_flag:
                led.toggle()

            # Check for incoming MQTT messages (non-blocking)
            client.check_msg()

            # Small delay to prevent busy-waiting and allow other tasks
            await uasyncio.sleep_ms(200) # Adjust sleep time as needed (e.g., 100-500ms)

            loop_counter += 1
            # Optional: Ping MQTT broker periodically to keep connection alive if needed
            # if loop_counter % 100 == 0: # Every ~20 seconds if sleep_ms is 200
            #     try:
            #         client.ping()
            #         print("MQTT Ping Sent")
            #     except OSError as e:
            #         print(f"MQTT Ping failed, attempting reconnect: {e}")
            #         # Add reconnect logic here if desired
            #         await uasyncio.sleep(5) # Wait before retrying loop

        except OSError as e:
            print(f"Network/MQTT Error in main loop: {e}")
            # Basic reconnect attempt (can be made more robust)
            print("Attempting to reconnect...")
            await uasyncio.sleep(5)
            try:
                client.disconnect() # Try a clean disconnect first
            except:
                pass # Ignore errors during disconnect
            try:
                client.connect()
                client.subscribe(MQTT_TOPIC)
                print("Reconnected to MQTT Broker.")
            except Exception as recon_e:
                print(f"Reconnect failed: {recon_e}")
                await uasyncio.sleep(10) # Wait longer before next attempt
        except Exception as e:
            print(f"An unexpected error occurred in main loop: {e}")
            # Consider resetting or logging more details
            await uasyncio.sleep(5)


# --- Run the Application ---
try:
    uasyncio.run(main())
except KeyboardInterrupt:
    print("Program stopped by user.")
except Exception as e:
    print(f"Fatal error: {e}")
finally:
    # Cleanup (optional but good practice)
    try:
        if 'client' in locals() and client:
            client.disconnect()
            print("MQTT client disconnected.")
    except:
        pass # Ignore cleanup errors
    led.off() # Ensure LED is off on exit
    print("Exiting.")
**How it Works:** 1. **Configuration:** Set your WiFi and MQTT broker details at the top. 2. **WiFi Connection:** The `connect_wifi` function handles connecting to your network. 3. **MQTT Callback (`mqtt_callback`):** This function is automatically called by the `umqtt.simple` library *when* `client.check_msg()` finds a new message on the subscribed topic. 4. **Solid LED Task (`handle_solid_led`):** This is an `async` function specifically for controlling the LED when a message arrives. It turns the LED on, waits for 1 second using the non-blocking `uasyncio.sleep(1)`, and then turns it off. 5. **Callback Action:** When `mqtt_callback` runs, it sets a flag (`message_received_flag`) and uses `uasyncio.create_task(handle_solid_led())` to schedule the `handle_solid_led` function to run concurrently without blocking the callback itself or the main loop. It also cancels any previous solid LED task if a new message arrives quickly. 6. **Main Async Function (`main`):** * Connects to WiFi and MQTT. * Enters an infinite `while True` loop managed by `uasyncio`. * **Blinking:** Inside the loop, `led.toggle()` flips the LED state *unless* the `message_received_flag` is set (meaning the solid LED task is handling it). * **Non-Blocking Check:** `client.check_msg()` checks the MQTT socket for incoming data *without waiting*. If a message is ready, it processes it and calls `mqtt_callback`. * **Cooperative Multitasking:** `await uasyncio.sleep_ms(200)` is crucial. It yields control back to the `uasyncio` scheduler, allowing other tasks (like the network stack processing incoming data or the `handle_solid_led` task) to run. It also prevents the loop from consuming 100% CPU. * **Error Handling:** Basic `try...except` blocks handle potential network or MQTT errors during connection and in the main loop, attempting a simple reconnect. 7. **Running:** `uasyncio.run(main())` starts the asynchronous event loop, executing the `main` function. **To Use:** 1. Save the code as `main.py` on your Pico W's filesystem. 2. Modify the `WIFI_SSID`, `WIFI_PASSWORD`, `MQTT_BROKER`, `MQTT_PORT`, `MQTT_USER`, `MQTT_PASSWORD`, and `MQTT_TOPIC` variables with your specific details. 3. Ensure the `umqtt.simple` library is installed. 4. Reset or power cycle your Pico W. It should automatically connect to WiFi and start blinking the LED while waiting for MQTT messages. 5. Publish a message to the configured `MQTT_TOPIC` using another MQTT client (like MQTT Explorer, mosquitto_pub, etc.). You should see the Pico's LED turn solid for one second.

Check if all of the following conditions are true:

Substring Evaluation

Testing if the previous output contains the string STA_IF: True

Substring Evaluation

Testing if the previous output contains the string WLAN: True

Substring Evaluation

Testing if the previous output contains the string set_callback: True

Substring Evaluation

Testing if the previous output contains the string MQTTClient: True

Substring Evaluation

Testing if the previous output contains the string check_msg: True

Final Answer: True