mqttasgi

MQTT ASGI protocol server for Django — bridge MQTT messages to Django Channels consumers with full ORM, Channel Layers, and testing support. The perfect backbone for your home automation projects, IoT pipelines, and real-time device integrations.

Safety Notice

This listing is from the official public ClawHub registry. Review SKILL.md and referenced scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "mqttasgi" with this command: npx skills add sivulich/mqttasgi

mqttasgi

mqttasgi is an ASGI protocol server that bridges MQTT (via paho-mqtt) and Django Channels, inspired by Daphne. It lets Django consumers subscribe/publish to MQTT topics with full ORM and Channel Layers support.

Supports: Django 3.2–5.x · Channels 3.x–4.x · paho-mqtt 1.x and 2.x · Python 3.9–3.13

Installation

pip install mqttasgi

Running the server

mqttasgi -H localhost -p 1883 my_application.asgi:application
ParameterEnv variableDefaultPurpose
-H / --hostMQTT_HOSTNAMElocalhostMQTT broker host
-p / --portMQTT_PORT1883MQTT broker port
-U / --usernameMQTT_USERNAMEBroker username
-P / --passwordMQTT_PASSWORDBroker password
-c / --cleansessionMQTT_CLEANTrueMQTT clean session
-v / --verbosityVERBOSITY0Logging level (0–2)
-i / --idMQTT_CLIENT_IDMQTT client ID
-C / --certTLS_CERTTLS certificate
-K / --keyTLS_KEYTLS key
-S / --cacertTLS_CATLS CA certificate
-SSL / --use-sslMQTT_USE_SSLFalseSSL without cert auth
-T / --transportMQTT_TRANSPORTtcpTransport: tcp or websockets
-r / --retriesMQTT_RETRIES3Reconnect retries (0 = unlimited)

All parameters can also be set via a .env file at the project root. CLI args take precedence over env vars.

asgi.py setup

import os
import django
from channels.routing import ProtocolTypeRouter
from my_application.consumers import MyMqttConsumer
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_application.settings')
django.setup()

application = ProtocolTypeRouter({
    'http': get_asgi_application(),
    'mqtt': MyMqttConsumer.as_asgi(),
})

Writing a consumer

from mqttasgi.consumers import MqttConsumer

class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        """Called when connected to the broker. Subscribe here."""
        await self.subscribe('my/topic', qos=2)

    async def receive(self, mqtt_message):
        """Called for each incoming MQTT message."""
        topic   = mqtt_message['topic']
        payload = mqtt_message['payload']   # bytes
        qos     = mqtt_message['qos']
        await self.publish('response/topic', payload, qos=1, retain=False)

    async def disconnect(self):
        """Called on broker disconnect. Clean up here."""
        await self.unsubscribe('my/topic')

Consumer API

MethodDescription
await self.subscribe(topic, qos)Subscribe to an MQTT topic
await self.unsubscribe(topic)Unsubscribe from an MQTT topic
await self.publish(topic, payload, qos=1, retain=False)Publish an MQTT message
self.scopeASGI scope dict (includes app_id, instance_type, and any consumer_parameters)

Channel Layers

# Outside the consumer (e.g. Django view or management command)
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    "my.group",
    {"type": "my.custom.message", "text": "Hello from outside"}
)

# Inside the consumer
class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('my/topic', qos=2)
        await self.channel_layer.group_add("my.group", self.channel_name)

    async def my_custom_message(self, event):
        # Handler name must match the `type` field (dots become underscores)
        print('Channel layer message:', event)

    async def receive(self, mqtt_message): ...
    async def disconnect(self): ...

Multiple workers (experimental)

Only the master consumer (instance_type='master', app_id=0) may spawn or kill workers.

class MasterConsumer(MqttConsumer):

    async def connect(self):
        # Spawn a worker with a unique app_id
        await self.spawn_worker(
            app_id=1,
            consumer_path='my_application.consumers.WorkerConsumer',
            consumer_params={'device_id': 'sensor-01'},
        )

    async def receive(self, mqtt_message):
        if condition:
            await self.kill_worker(app_id=1)

    async def disconnect(self): ...

Testing (no broker required)

MqttComunicator drives consumers directly through the ASGI interface — no running broker needed.

pytest.ini

[pytest]
asyncio_mode = auto

tests/conftest.py

import django
from django.conf import settings

def pytest_configure(config):
    if not settings.configured:
        settings.configure(
            SECRET_KEY='test-secret-key',
            INSTALLED_APPS=['channels'],
            DATABASES={},
            CHANNEL_LAYERS={
                'default': {
                    'BACKEND': 'channels.layers.InMemoryChannelLayer',
                }
            },
        )
        django.setup()

Writing tests

import pytest
from mqttasgi.testing import MqttComunicator  # note: one 'm' in Comunicator
from my_application.consumers import MyMqttConsumer

async def test_subscribe_on_connect():
    comm = MqttComunicator(MyMqttConsumer.as_asgi(), app_id=1)
    response = await comm.connect()          # returns first message from consumer
    assert response['type'] == 'mqtt.sub'
    assert response['mqtt']['topic'] == 'my/topic'
    await comm.disconnect()

async def test_publish_on_message():
    comm = MqttComunicator(MyMqttConsumer.as_asgi(), app_id=1)
    await comm.connect()
    await comm.publish('my/topic', b'hello', qos=1)
    response = await comm.receive_from()    # next message from consumer
    assert response['type'] == 'mqtt.pub'
    assert response['mqtt']['payload'] == b'hello'
    await comm.disconnect()

MqttComunicator API

MethodDescription
MqttComunicator(app, app_id, instance_type='worker', consumer_parameters=None)Create communicator
await comm.connect(timeout=1)Send mqtt.connect; returns first consumer response
await comm.publish(topic, payload, qos)Send mqtt.msg event to the consumer
await comm.receive_from(timeout=1)Receive next message the consumer sent
await comm.disconnect(timeout=1)Send mqtt.disconnect and wait for shutdown

Consumer responses have this shape:

{
    'type': 'mqtt.sub',   # or mqtt.pub / mqtt.usub
    'mqtt': {
        'topic': 'my/topic',
        'payload': b'...',   # only for mqtt.pub
        'qos': 1,
    }
}

Internal message types (for advanced use)

Server → Consumer: mqtt.connect, mqtt.msg, mqtt.disconnect

Consumer → Server: mqtt.pub, mqtt.sub, mqtt.usub, mqttasgi.worker.spawn, mqttasgi.worker.kill

Project ideas and examples

Home automation — motion-triggered lights

A motion sensor publishes to home/sensor/motion. A consumer listens and publishes a command to the light controller, logging every event to the Django ORM.

from mqttasgi.consumers import MqttConsumer
from myapp.models import MotionEvent

class LightAutomationConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('home/sensor/motion', qos=1)

    async def receive(self, mqtt_message):
        room = mqtt_message['payload'].decode()
        await MotionEvent.objects.acreate(room=room)
        await self.publish(f'home/lights/{room}/set', b'on', qos=1)

    async def disconnect(self):
        await self.unsubscribe('home/sensor/motion')

AI-powered automation — ask Claude before acting

Route sensor data through Claude to decide what action to take. The consumer calls the Anthropic API and publishes the result back onto the MQTT bus.

import anthropic
from mqttasgi.consumers import MqttConsumer

client = anthropic.Anthropic()

class AIAutomationConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('home/sensor/#', qos=1)

    async def receive(self, mqtt_message):
        topic   = mqtt_message['topic']
        payload = mqtt_message['payload'].decode()

        message = client.messages.create(
            model='claude-opus-4-6',
            max_tokens=64,
            messages=[{
                'role': 'user',
                'content': (
                    f'Sensor reading — topic: {topic}, value: {payload}. '
                    'Reply with only the MQTT topic and payload to publish, '
                    'separated by a space. Example: home/lights/living on'
                ),
            }],
        )
        response = message.content[0].text.strip().split(' ', 1)
        if len(response) == 2:
            out_topic, out_payload = response
            await self.publish(out_topic, out_payload.encode(), qos=1)

    async def disconnect(self):
        await self.unsubscribe('home/sensor/#')

Energy monitoring — store readings in Django, alert on threshold

Electricity sensors publish consumption data every 30 seconds. The consumer persists each reading and fires an alert if usage spikes.

from mqttasgi.consumers import MqttConsumer
from myapp.models import EnergyReading

ALERT_THRESHOLD_WATTS = 3000

class EnergyMonitorConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('home/energy/consumption', qos=1)

    async def receive(self, mqtt_message):
        watts = float(mqtt_message['payload'])
        await EnergyReading.objects.acreate(watts=watts)
        if watts > ALERT_THRESHOLD_WATTS:
            await self.publish('home/alerts/energy', b'high_consumption', qos=2)

    async def disconnect(self):
        await self.unsubscribe('home/energy/consumption')

Multi-device coordination — workers per room

Spawn a dedicated worker for each room so subscriptions and logic stay isolated. The master consumer manages the worker lifecycle.

class MasterConsumer(MqttConsumer):

    ROOMS = ['living', 'bedroom', 'kitchen']

    async def connect(self):
        for i, room in enumerate(self.ROOMS, start=1):
            await self.spawn_worker(
                app_id=i,
                consumer_path='myapp.consumers.RoomConsumer',
                consumer_params={'room': room},
            )

    async def receive(self, mqtt_message): pass
    async def disconnect(self): pass


class RoomConsumer(MqttConsumer):

    async def connect(self):
        room = self.scope['room']
        await self.subscribe(f'home/{room}/#', qos=1)

    async def receive(self, mqtt_message):
        # Handle all topics for this room
        ...

    async def disconnect(self):
        room = self.scope['room']
        await self.unsubscribe(f'home/{room}/#')

Garden irrigation — schedule-aware automation

Combine Django's ORM with MQTT to only water the garden when the schedule says so and soil moisture is below a threshold.

from django.utils import timezone
from mqttasgi.consumers import MqttConsumer
from myapp.models import IrrigationSchedule

class IrrigationConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('garden/sensor/moisture', qos=1)

    async def receive(self, mqtt_message):
        moisture = float(mqtt_message['payload'])
        now = timezone.now()
        scheduled = await IrrigationSchedule.objects.filter(
            active=True,
            start_hour=now.hour,
        ).aexists()

        if scheduled and moisture < 30.0:
            await self.publish('garden/valve/main', b'open', qos=2)

    async def disconnect(self):
        await self.unsubscribe('garden/sensor/moisture')

Common pitfalls

  • MqttComunicator.connect() returns the first message the consumer sends. If connect() does nothing (no subscribe, no publish), the call will time out — always subscribe or send something in connect().
  • The class is spelled MqttComunicator (one m) — this is an intentional (legacy) typo in the library.
  • Worker spawn/kill is only allowed from the master consumer (app_id=0). Calling it from a worker raises an error.
  • With mosquitto 2.x you need allow_anonymous true and an explicit listener line in mosquitto.conf for integration tests.
  • connect_max_retries=0 means retry forever with exponential back-off (capped at 30 s).

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

交互式教学网页生成器

根据教案、教材或课件生成交互式教学HTML网页

Registry SourceRecently Updated
General

大学生学习规划助手

大学生学习规划与作业助手。根据课程表、作业截止日期和复习计划,生成每日任务清单、优先级排序,整理学习资料和复习要点,提供番茄钟学习提醒。触发词:学习计划、作业规划、复习计划、任务清单、DDL管理、课程安排、番茄钟、学习提醒。

Registry SourceRecently Updated
General

The English Tutor

英语口语陪练助手。触发词:「练习英语」「我要说英语」「英语陪练」。功能:上传生词表 → 每日定时情景对话 → 语音输入即时纠错 → 飞书语音推送 → 自动艾宾浩斯复习。

Registry SourceRecently Updated
General

Writing Triadic Repo

Triadic collaborative writing framework with self-evolving memory engine. Creator mines intent via progressive questioning, Executor produces ≥2 distinct dra...

Registry SourceRecently Updated