Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.
Write a python program that I can use to run on my raspberry pi pico w zero that will check for mqtt messages. Don't block, and blink the LED on every loop iteration so I know it's polling. When it gets a message then set the LED solid for a second.
Here's a non-blocking MQTT client for your Raspberry Pi Pico W that blinks an LED while polling and holds it solid for 1 second when a message arrives:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | import machine import time from umqtt.simple import MQTTClient # Configure LED and initial state led = machine.Pin("LED", machine.Pin.OUT) led.off() # MQTT Configuration MQTT_BROKER = "broker.hivemq.com" # Replace with your broker CLIENT_ID = "pico_client" TOPIC = "pico/led" # Replace with your topic # State variables led_solid = False last_message_time = 0 def mqtt_callback(topic, msg): global led_solid, last_message_time led.value(1) led_solid = True last_message_time = time.ticks_ms() # MQTT Setup client = MQTTClient(CLIENT_ID, MQTT_BROKER) client.set_callback(mqtt_callback) client.connect() client.subscribe(TOPIC) # Main loop while True: # Non-blocking MQTT check client.check_msg() # Handle LED state if led_solid: # Check if 1 second has passed since message if time.ticks_diff(time.ticks_ms(), last_message_time) >= 1000: led_solid = False led.off() else: # Blink LED every iteration led.toggle() # Short delay to maintain blink visibility time.sleep(0.1) |
Testing if the previous output contains the string STA_IF
: False
Testing if the previous output contains the string WLAN
: False
Testing if the previous output contains the string set_callback
: True
Testing if the previous output contains the string MQTTClient
: True
Testing if the previous output contains the string check_msg
: True