MQTT Examples

This document provides real-world examples of using MockForge MQTT for testing IoT applications, microservices communication, and pub/sub systems.

IoT Device Simulation

Smart Home System

Scenario: Test a smart home application that controls lights, thermostats, and security sensors.

MockForge Configuration:

# MockForge MQTT configuration for the smart-home scenario: two light command
# topics, one thermostat topic, and a wildcard motion-sensor topic.
mqtt:
  enabled: true
  port: 1883

  fixtures:
    # Smart Lights
    # Each light fixture is anchored (^...$) to exactly one command topic.
    # NOTE(review): templated values such as "{{faker.int 0 100}}" are quoted
    # strings here — confirm whether they are rendered as JSON numbers or strings.
    - identifier: "living-room-light"
      name: "Living Room Light"
      topic_pattern: "^home/lights/living_room/command$"
      qos: 1
      response:
        payload:
          device_id: "living_room_light"
          status: "success"
          brightness: "{{faker.int 0 100}}"
          timestamp: "{{now}}"

    - identifier: "kitchen-light"
      name: "Kitchen Light"
      topic_pattern: "^home/lights/kitchen/command$"
      qos: 1
      response:
        payload:
          device_id: "kitchen_light"
          status: "success"
          color_temp: "{{faker.int 2700 6500}}"
          timestamp: "{{now}}"

    # Thermostat
    # Retained, and auto-published every 30000 ms (30 s).
    - identifier: "thermostat"
      name: "Smart Thermostat"
      topic_pattern: "^home/climate/thermostat$"
      qos: 2
      retained: true
      response:
        payload:
          temperature: "{{faker.float 18.0 25.0}}"
          humidity: "{{faker.float 35.0 65.0}}"
          mode: "{{faker.random_element heating cooling auto}}"
          setpoint: "{{faker.float 19.0 23.0}}"
          timestamp: "{{now}}"
      auto_publish:
        enabled: true
        interval_ms: 30000

    # Motion Sensors
    # The pattern has one capture group; the payload refers to it as
    # {{topic_param 1}} (presumably the captured sensor name — confirm).
    # Auto-published every 15000 ms (15 s).
    - identifier: "motion-sensor"
      name: "Motion Sensor"
      topic_pattern: "^home/security/motion/(.+)$"
      qos: 0
      response:
        payload:
          sensor_id: "{{topic_param 1}}"
          motion_detected: "{{faker.boolean}}"
          battery_level: "{{faker.float 70.0 100.0}}"
          timestamp: "{{now}}"
      auto_publish:
        enabled: true
        interval_ms: 15000

Test Code (Python):

import paho.mqtt.client as mqtt
import json
import time

def test_smart_home_integration():
    """Exercise the mocked smart-home topics end to end.

    Sends a command to the living-room light and expects a status message
    for that device, then expects a thermostat reading whose temperature and
    mode fall inside the ranges configured in the fixture.

    Requires an MQTT broker listening on localhost:1883.
    """
    responses = []

    def on_message(client, userdata, msg):
        responses.append(json.loads(msg.payload.decode()))

    def wait_until(predicate, timeout):
        """Poll *predicate* until it is truthy or *timeout* seconds pass."""
        deadline = time.monotonic() + timeout
        while not predicate() and time.monotonic() < deadline:
            time.sleep(0.05)
        return predicate()

    def thermostat_reading():
        """Return the first collected message carrying a temperature, if any."""
        return next((r for r in responses if "temperature" in r), None)

    client = mqtt.Client("test-client")
    client.on_message = on_message
    client.connect("localhost", 1883, 60)
    # The network loop must be running before anything is sent, otherwise
    # incoming packets (including the reply we are waiting for) are never read.
    client.loop_start()
    try:
        # Subscribe BEFORE publishing the command: a reply that is sent
        # before the subscription exists would be lost.
        client.subscribe("home/lights/living_room/status")

        # Test light control
        client.publish("home/lights/living_room/command", json.dumps({
            "action": "turn_on",
            "brightness": 80,
        }), qos=1)

        # Wait (up to 1 s) for the light's response
        wait_until(lambda: responses, timeout=1)

        assert len(responses) > 0
        assert responses[0]["device_id"] == "living_room_light"
        assert responses[0]["status"] == "success"

        # Test thermostat reading; wait up to 2 s for it to arrive
        client.subscribe("home/climate/thermostat")
        wait_until(lambda: thermostat_reading() is not None, timeout=2)

        # Verify thermostat data
        thermostat_data = thermostat_reading()
        assert thermostat_data is not None
        # float() accepts both a JSON number and a numeric string, so the
        # range check does not raise TypeError if the value arrives quoted.
        assert 18.0 <= float(thermostat_data["temperature"]) <= 25.0
        assert thermostat_data["mode"] in ["heating", "cooling", "auto"]
    finally:
        # Always release the connection and the loop thread, even when an
        # assertion above fails.
        client.disconnect()
        client.loop_stop()

Industrial IoT Monitoring

Scenario: Test an industrial monitoring system with sensors, actuators, and PLCs.

MockForge Configuration:

# MockForge MQTT configuration for the industrial scenario: one fixed
# temperature sensor, wildcard pressure sensors, conveyor actuators and
# quality-control stations.
mqtt:
  enabled: true
  port: 1883
  max_connections: 100

  fixtures:
    # Temperature Sensors
    # Retained, and auto-published every 5000 ms (5 s).
    - identifier: "temp-sensor-1"
      name: "Temperature Sensor 1"
      topic_pattern: "^factory/sensors/temp/1$"
      qos: 1
      retained: true
      response:
        payload:
          sensor_id: "temp_1"
          temperature: "{{faker.float 20.0 80.0}}"
          unit: "celsius"
          status: "operational"
          timestamp: "{{now}}"
      auto_publish:
        enabled: true
        interval_ms: 5000

    # Pressure Sensors
    # No auto_publish is configured for this fixture.
    # NOTE(review): (.+) also matches nested segments such as "1/trigger";
    # use ([^/]+) if only a single topic level should match — confirm intent.
    # NOTE(review): `alert` compares `pressure` with 3.5 (same value as
    # `threshold`); confirm the template can see the generated `pressure` field.
    - identifier: "pressure-sensor"
      name: "Pressure Sensor"
      topic_pattern: "^factory/sensors/pressure/(.+)$"
      qos: 1
      response:
        payload:
          sensor_id: "{{topic_param 1}}"
          pressure: "{{faker.float 0.5 5.0}}"
          unit: "bar"
          threshold: 3.5
          alert: "{{#if (> pressure 3.5)}}true{{else}}false{{/if}}"
          timestamp: "{{now}}"

    # Conveyor Belt Controller
    # The capture group sits between "conveyor/" and "/command".
    # NOTE(review): "{{now + 5s}}" contains spaces around "+"; confirm the
    # template engine accepts this form of time offset.
    - identifier: "conveyor-controller"
      name: "Conveyor Belt Controller"
      topic_pattern: "^factory/actuators/conveyor/(.+)/command$"
      qos: 2
      response:
        payload:
          actuator_id: "{{topic_param 1}}"
          command_ack: true
          status: "executing"
          estimated_completion: "{{now + 5s}}"
          timestamp: "{{now}}"

    # Quality Control Station
    # The capture group is the suffix after "station_" (e.g. "1" for station_1).
    # NOTE(review): `passed` compares `quality_score` with 95.0; confirm the
    # template can see the generated `quality_score` field.
    - identifier: "qc-station"
      name: "Quality Control Station"
      topic_pattern: "^factory/qc/station_(.+)/result$"
      qos: 2
      response:
        payload:
          station_id: "{{topic_param 1}}"
          product_id: "{{uuid}}"
          quality_score: "{{faker.float 85.0 100.0}}"
          defects: "{{faker.int 0 2}}"
          passed: "{{#if (> quality_score 95.0)}}true{{else}}false{{/if}}"
          timestamp: "{{now}}"

Test Code (JavaScript/Node.js):

const mqtt = require('mqtt');

describe('Industrial IoT System', () => {
  let client;

  beforeAll(() => {
    client = mqtt.connect('mqtt://localhost:1883');
  });

  afterAll(() => {
    client.end();
  });

  /**
   * Subscribe to `topics` and feed every matching, JSON-decoded message to
   * `onMessage(topic, data)` until it returns true or throws.
   *
   * All tests share one client, so the listener is removed (and the topics
   * unsubscribed) as soon as the test finishes; otherwise a handler from an
   * earlier test would keep firing and call its `done` again. A failed
   * `expect` is passed to `done(err)` so the test fails with the assertion
   * message instead of timing out.
   */
  function watch(topics, done, onMessage) {
    const finish = (err) => {
      client.removeListener('message', listener);
      client.unsubscribe(topics);
      if (err) {
        done(err);
      } else {
        done();
      }
    };

    const listener = (topic, message) => {
      // Ignore traffic that belongs to other subscriptions on this client.
      if (!topics.includes(topic)) return;
      try {
        if (onMessage(topic, JSON.parse(message.toString()))) finish();
      } catch (err) {
        finish(err);
      }
    };

    client.on('message', listener);
    client.subscribe(topics);
  }

  test('sensor data collection', (done) => {
    const TEMP_TOPIC = 'factory/sensors/temp/1';
    const PRESSURE_TOPIC = 'factory/sensors/pressure/1';
    const latest = {};

    watch([TEMP_TOPIC, PRESSURE_TOPIC], done, (topic, data) => {
      latest[topic] = data;

      // Wait until BOTH sensors have reported; two messages from the same
      // topic must not be mistaken for one from each.
      if (!(TEMP_TOPIC in latest) || !(PRESSURE_TOPIC in latest)) return false;

      // Verify temperature sensor
      const temp = latest[TEMP_TOPIC];
      expect(temp.temperature).toBeGreaterThanOrEqual(20);
      expect(temp.temperature).toBeLessThanOrEqual(80);
      expect(temp.unit).toBe('celsius');

      // Verify pressure sensor
      const pressure = latest[PRESSURE_TOPIC];
      expect(pressure.pressure).toBeGreaterThanOrEqual(0.5);
      expect(pressure.pressure).toBeLessThanOrEqual(5.0);
      expect(pressure.unit).toBe('bar');

      return true;
    });

    // Trigger sensor readings
    client.publish('factory/sensors/temp/1/trigger', 'read');
    client.publish('factory/sensors/pressure/1/trigger', 'read');
  });

  test('actuator control', (done) => {
    watch(['factory/actuators/conveyor/1/status'], done, (topic, status) => {
      expect(status.actuator_id).toBe('1');
      expect(status.command_ack).toBe(true);
      expect(status.status).toBe('executing');
      return true;
    });

    // Send control command
    client.publish('factory/actuators/conveyor/1/command', JSON.stringify({
      action: 'start',
      speed: 50
    }), { qos: 2 });
  });

  test('quality control workflow', (done) => {
    watch(['factory/qc/station_1/result'], done, (topic, result) => {
      expect(result.station_id).toBe('1');
      expect(result.quality_score).toBeGreaterThanOrEqual(85);
      expect(result.quality_score).toBeLessThanOrEqual(100);
      expect(typeof result.defects).toBe('number');
      expect(typeof result.passed).toBe('boolean');
      return true;
    });

    // Trigger quality check
    client.publish('factory/qc/station_1/check', JSON.stringify({
      product_id: 'PROD-001',
      batch_id: 'BATCH-2024'
    }));
  });
});

Microservices Communication

Event-Driven Architecture

Scenario: Test microservices communicating via MQTT events.

MockForge Configuration:

mqtt:
  enabled: true
  port: 1883

  fixtures:
    # User Service Events
    - identifier: "user-registered"
      name: "User Registration Event"
      topic_pattern: "^events/user/registered$"
      qos: 1
      response:
        payload:
          event_type: "user_registered"
          user_id: "{{uuid}}"
          email: "{{faker.email}}"
          timestamp: "{{now}}"
          source: "user-service"

    # Order Service Events
    - identifier: "order-created"
      name: "Order Created Event"
      topic_pattern: "^events/order/created$"
      qos: 1
      response:
        payload:
          event_type: "order_created"
          order_id: "{{uuid}}"
          user_id: "{{uuid}}"
          amount: "{{faker.float 10.0 500.0}}"
          currency: "USD"
          items: "{{faker.int 1 10}}"
          timestamp: "{{now}}"
          source: "order-service"

    # Payment Service Events
    - identifier: "payment-processed"
      name: "Payment Processed Event"
      topic_pattern: "^events/payment/processed$"
      qos: 2
      response:
        payload:
          event_type: "payment_processed"
          payment_id: "{{uuid}}"
          order_id: "{{uuid}}"
          amount: "{{faker.float 10.0 500.0}}"
          currency: "USD"
          status: "{{faker.random_element completed failed pending}}"
          method: "{{faker.random_element credit_card paypal bank_transfer}}"
          timestamp: "{{now}}"
          source: "payment-service"

    # Notification Service
    - identifier: "email-notification"
      name: "Email Notification"
      topic_pattern: "^commands/notification/email$"
      qos: 1
      response:
        payload:
          command_type: "send_email"
          notification_id: "{{uuid}}"
          recipient: "{{faker.email}}"
          subject: "Order Confirmation"
          template: "order_confirmation"
          status: "queued"
          timestamp: "{{now}}"

Test Code (Go):

package main

import (
    "encoding/json"
    "testing"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

// TestEventDrivenWorkflow publishes a user-registration trigger and then
// collects everything under events/# for five seconds, checking that
// user_registered, order_created and payment_processed events each arrive
// at least once and carry their required fields.
//
// Requires an MQTT broker listening on tcp://localhost:1883.
func TestEventDrivenWorkflow(t *testing.T) {
    opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
    client := mqtt.NewClient(opts)

    if token := client.Connect(); token.Wait() && token.Error() != nil {
        t.Fatalf("Failed to connect: %v", token.Error())
    }
    defer client.Disconnect(250)

    // Test user registration -> order creation -> payment -> notification flow.
    // The callback only forwards raw payloads; decoding and all reporting
    // happen on the test goroutine.
    payloads := make(chan []byte, 10)

    // Subscribe to all events and wait for the broker's acknowledgement, so
    // the trigger below cannot be published before the subscription exists.
    subToken := client.Subscribe("events/#", 1, func(_ mqtt.Client, msg mqtt.Message) {
        payloads <- msg.Payload()
    })
    if subToken.Wait() && subToken.Error() != nil {
        t.Fatalf("Failed to subscribe: %v", subToken.Error())
    }

    // Trigger user registration
    userEvent := map[string]interface{}{
        "user_id": "user-123",
        "email":   "user@example.com",
    }
    payload, err := json.Marshal(userEvent)
    if err != nil {
        t.Fatalf("Failed to encode trigger payload: %v", err)
    }
    if token := client.Publish("events/user/registered", 1, false, payload); token.Wait() && token.Error() != nil {
        t.Fatalf("Failed to publish trigger: %v", token.Error())
    }

    // Wait for events
    timeout := time.After(5 * time.Second)
    receivedEvents := make(map[string]int)

    for {
        select {
        case raw := <-payloads:
            var event map[string]interface{}
            if err := json.Unmarshal(raw, &event); err != nil {
                t.Errorf("Malformed event payload %q: %v", raw, err)
                continue
            }

            // The trigger published above matches events/# too and has no
            // event_type, so a missing or non-string value is skipped
            // rather than allowed to panic the type assertion.
            eventType, ok := event["event_type"].(string)
            if !ok {
                t.Logf("Skipping message without string event_type: %v", event)
                continue
            }
            receivedEvents[eventType]++

            // Verify event structure
            switch eventType {
            case "user_registered":
                if event["user_id"] == nil || event["email"] == nil {
                    t.Errorf("Invalid user_registered event: %v", event)
                }
            case "order_created":
                if event["order_id"] == nil || event["amount"] == nil {
                    t.Errorf("Invalid order_created event: %v", event)
                }
            case "payment_processed":
                if event["payment_id"] == nil || event["status"] == nil {
                    t.Errorf("Invalid payment_processed event: %v", event)
                }
            }
        case <-timeout:
            // Check that we received expected events
            if receivedEvents["user_registered"] == 0 {
                t.Error("Expected user_registered event")
            }
            if receivedEvents["order_created"] == 0 {
                t.Error("Expected order_created event")
            }
            if receivedEvents["payment_processed"] == 0 {
                t.Error("Expected payment_processed event")
            }
            return
        }
    }
}

Real-Time Data Streaming

Live Dashboard Testing

Scenario: Test a real-time dashboard that displays sensor data and alerts.

MockForge Configuration:

# MockForge MQTT configuration for the live-dashboard scenario: environmental
# sensor readings every 2 s and system alerts every 30 s.
mqtt:
  enabled: true
  port: 1883

  fixtures:
    # Environmental Sensors
    # Two capture groups: the payload uses group 1 as `location` and group 2
    # as `sensor_type`. `value` and `unit` switch on group 2
    # (temperature / humidity / co2, with a default branch).
    # NOTE(review): the switch blocks write `topic_param.2` while the plain
    # fields write `topic_param 2` — confirm both spellings are supported.
    # NOTE(review): `value` and `unit` are multi-line double-quoted scalars;
    # confirm the rendered result does not carry unwanted whitespace.
    - identifier: "env-sensor-cluster"
      name: "Environmental Sensor Cluster"
      topic_pattern: "^sensors/env/(.+)/(.+)$"
      qos: 0
      response:
        payload:
          sensor_type: "{{topic_param 2}}"
          location: "{{topic_param 1}}"
          value: "{{#switch topic_param.2}}
                     {{#case 'temperature'}}{{faker.float 15.0 35.0}}{{/case}}
                     {{#case 'humidity'}}{{faker.float 30.0 90.0}}{{/case}}
                     {{#case 'co2'}}{{faker.float 400.0 2000.0}}{{/case}}
                     {{#default}}0{{/default}}
                   {{/switch}}"
          unit: "{{#switch topic_param.2}}
                   {{#case 'temperature'}}celsius{{/case}}
                   {{#case 'humidity'}}percent{{/case}}
                   {{#case 'co2'}}ppm{{/case}}
                   {{#default}}unit{{/default}}
                 {{/switch}}"
          timestamp: "{{now}}"
      auto_publish:
        enabled: true
        interval_ms: 2000

    # System Alerts
    # `message` switches on the captured alert type (temperature / power /
    # network, with a default branch). Auto-published every 30000 ms (30 s),
    # so a consumer must listen for at least that long to be sure of seeing one.
    - identifier: "system-alerts"
      name: "System Alerts"
      topic_pattern: "^alerts/system/(.+)$"
      qos: 1
      response:
        payload:
          alert_type: "{{topic_param 1}}"
          severity: "{{faker.random_element info warning error critical}}"
          message: "{{#switch topic_param.1}}
                      {{#case 'temperature'}}High temperature detected{{/case}}
                      {{#case 'power'}}Power supply issue{{/case}}
                      {{#case 'network'}}Network connectivity lost{{/case}}
                      {{#default}}System alert{{/default}}
                    {{/switch}}"
          sensor_id: "{{uuid}}"
          timestamp: "{{now}}"
      auto_publish:
        enabled: true
        interval_ms: 30000

Test Code (Rust):

use paho_mqtt as mqtt;
use std::time::{Duration, Instant};

/// Connects to the mock broker, listens to environmental sensor and system
/// alert topics, and checks that every received payload has the expected
/// fields and that at least one alert arrives.
///
/// The test is a top-level item: a `#[tokio::test]` nested inside
/// `fn main() { .. }` is never picked up by the test harness.
///
/// Requires an MQTT broker listening on tcp://localhost:1883.
#[tokio::test]
async fn test_realtime_dashboard() {
    // The alert fixture auto-publishes every 30 s, so the collection window
    // has to be longer than that for the alert assertion to be meaningful.
    const COLLECT_WINDOW: Duration = Duration::from_secs(35);
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri("tcp://localhost:1883")
        .client_id("dashboard-test")
        .finalize();

    let mut client = mqtt::AsyncClient::new(create_opts).unwrap();

    // Request the message stream before connecting/subscribing so that no
    // message delivered right after the subscription is missed.
    let receiver = client.get_stream(100);

    let conn_opts = mqtt::ConnectOptions::new();
    client.connect(conn_opts).await.unwrap();

    // Subscribe to sensor data
    client.subscribe("sensors/env/+/temperature", mqtt::QOS_0).await.unwrap();
    client.subscribe("sensors/env/+/humidity", mqtt::QOS_0).await.unwrap();
    client.subscribe("alerts/system/+", mqtt::QOS_1).await.unwrap();

    let mut message_count = 0;
    let mut alerts_received = 0;
    let mut sensor_seen = false;

    // Collect until both kinds of message were seen or the window elapses.
    let start_time = Instant::now();
    while start_time.elapsed() < COLLECT_WINDOW && !(sensor_seen && alerts_received > 0) {
        // NOTE(review): assumes a paho-mqtt release whose stream is an
        // async-channel receiver, so `recv()` yields Result<Option<Message>, _>
        // and `timeout` adds an outer Result — confirm against the crate
        // version in use.
        let msg = match tokio::time::timeout(POLL_INTERVAL, receiver.recv()).await {
            Ok(Ok(Some(msg))) => msg,
            Ok(Ok(None)) => continue, // no message in this item (presumably a disconnect marker)
            Ok(Err(_)) => break,      // stream closed; nothing more will arrive
            Err(_) => continue,       // poll interval elapsed without a message
        };
        message_count += 1;

        let payload: serde_json::Value = serde_json::from_str(&msg.payload_str()).unwrap();

        // Verify sensor data structure
        if msg.topic().contains("sensors/env") {
            sensor_seen = true;
            assert!(payload.get("sensor_type").is_some());
            assert!(payload.get("location").is_some());
            assert!(payload.get("value").is_some());
            assert!(payload.get("unit").is_some());
            assert!(payload.get("timestamp").is_some());
        }

        // Count alerts
        if msg.topic().contains("alerts/system") {
            alerts_received += 1;
            assert!(payload.get("alert_type").is_some());
            assert!(payload.get("severity").is_some());
            assert!(payload.get("message").is_some());
        }
    }

    // Verify we received data
    assert!(message_count > 0, "No messages received");
    assert!(alerts_received > 0, "No alerts received");

    client.disconnect(None).await.unwrap();
}

CI/CD Integration

Automated Testing Pipeline

# .github/workflows/mqtt-tests.yml
# Runs the MQTT test suite against a MockForge service container on every
# push and pull request.
name: MQTT Integration Tests

on: [push, pull_request]

jobs:
  mqtt-tests:
    runs-on: ubuntu-latest

    services:
      # NOTE(review): `mockforge:latest` has no registry or namespace; confirm
      # the fully qualified name of the published image.
      mockforge:
        image: mockforge:latest
        ports:
          - 1883:1883
        env:
          MOCKFORGE_MQTT_ENABLED: true
          # NOTE(review): service containers are started before the checkout
          # step runs, so this relative path presumably resolves inside the
          # container — confirm the fixtures are baked into the image or mounted.
          MOCKFORGE_MQTT_FIXTURES: ./test-fixtures/mqtt/

    steps:
      # NOTE(review): newer major versions of actions/checkout and
      # actions/setup-node exist; consider upgrading from v3.
      - uses: actions/checkout@v3

      - name: Setup Node.js
        uses: actions/setup-node@v3
        with:
          node-version: '18'

      - name: Install dependencies
        run: npm ci

      # Poll the published port once per second, for at most 30 seconds,
      # until the broker accepts TCP connections.
      - name: Wait for MockForge
        run: |
          timeout 30 bash -c 'until nc -z localhost 1883; do sleep 1; done'

      # Only test files whose path matches "mqtt" are run.
      - name: Run MQTT tests
        run: npm test -- --testPathPattern=mqtt
        env:
          MQTT_BROKER: localhost:1883

Performance Testing

Load Testing MQTT Broker

# MockForge MQTT configuration for load testing: one wildcard fixture and a
# raised connection limit.
mqtt:
  enabled: true
  port: 1883
  max_connections: 1000

  fixtures:
    # Matches loadtest/sensor/<anything>; the captured part is used as
    # sensor_id in the payload. QoS 0 is used for this fixture.
    - identifier: "load-test-sensor"
      name: "Load Test Sensor"
      topic_pattern: "^loadtest/sensor/(.+)$"
      qos: 0
      response:
        payload:
          sensor_id: "{{topic_param 1}}"
          value: "{{faker.float 0.0 100.0}}"
          timestamp: "{{now}}"

Load Test Script (Python):

import paho.mqtt.client as mqtt
import threading
import time
import json

def create_publisher(client_id, num_messages):
    """Connect one MQTT client and publish *num_messages* sensor readings.

    Args:
        client_id: Value used to build the client name ("publisher-<id>"),
            the topic ("loadtest/sensor/<id>") and each payload's sensor_id.
        num_messages: Number of messages to publish (QoS 0).

    Requires an MQTT broker listening on localhost:1883.
    """
    client = mqtt.Client(f"publisher-{client_id}")
    client.connect("localhost", 1883, 60)
    # Run the network loop in the background: without it, queued packets are
    # not reliably written to the socket before disconnect() is called.
    client.loop_start()
    try:
        last_publish = None
        for i in range(num_messages):
            payload = {
                "sensor_id": f"sensor_{client_id}_{i}",
                "value": i * 1.5,
                "timestamp": time.time(),
            }
            last_publish = client.publish(
                f"loadtest/sensor/{client_id}", json.dumps(payload), qos=0
            )

        # Packets leave in the order they were queued, so once the last one
        # has been sent all earlier ones have been sent as well.
        if last_publish is not None:
            last_publish.wait_for_publish()
    finally:
        # Release the socket and the loop thread even if publishing failed.
        client.disconnect()
        client.loop_stop()

def load_test():
    """Run 50 concurrent publishers of 100 messages each and print throughput.

    Every publisher runs ``create_publisher`` in its own thread. After all
    threads have finished, the total message count, elapsed time and
    messages-per-second rate are printed.
    """
    num_publishers = 50
    messages_per_publisher = 100

    # perf_counter() is monotonic and high-resolution; wall-clock time.time()
    # can jump (e.g. on clock adjustments) and distort the measured duration.
    start_time = time.perf_counter()

    threads = []
    for i in range(num_publishers):
        thread = threading.Thread(target=create_publisher, args=(i, messages_per_publisher))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    duration = time.perf_counter() - start_time
    total_messages = num_publishers * messages_per_publisher

    print(f"Published {total_messages} messages in {duration:.2f} seconds")
    print(f"Throughput: {total_messages / duration:.0f} messages/second")

# Run the load test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    load_test()

Next Steps