How to Implement Firmware Updates (FUOTA) over MQTT

FUOTA over MQTT

If your device supports HTTPS, the update process is straightforward: the device simply “pulls” the file from a URL. (See our guide on How to implement Firmware Updates (FUOTA) over HTTPS if that fits your use case).

But what if your device is constrained and only has an MQTT connection?

You face two major hurdles:

  1. Packet Size Limits: MQTT brokers often limit messages to 4KB or less. You cannot send a 500KB binary file in a single message.

  2. Timeouts: Cloud functions (like TagoIO Analysis) have execution time limits. If you try to loop through a large file byte-by-byte in one script execution, it will time out before the update finishes.

In this guide, we will build a Stateless Chunking System. The device will request the file piece by piece, and TagoIO will serve it on demand.

The Architecture: Request & Response

We will treat the TagoIO Analysis as a “Stateless Server.” It does not stay running. Instead, it wakes up, delivers exactly one chunk, and goes back to sleep.


Step 1: The Dashboard (Template)

We need a way to upload the firmware file to TagoIO. To save time, we will use the same dashboard template used in the HTTPS tutorial.

  1. Click here to install the FUOTA Dashboard Template.

    • If you already installed this for the HTTPS tutorial, you can skip this step and reuse the existing dashboard.
  2. Select your account and click Confirm.

  3. This installs a dashboard named FUOTA Manager with:

    • An Input Form to upload the .bin file.

    • A Device Selector to choose which device gets the update.

How it works: When you upload a file via this form, TagoIO saves it to the “Files” section and generates a private, secure URL. Our script will use this URL to read the file.


Step 2: Configure Permissions (Access Management)

The Analysis script needs permission to read the file URL from the device parameters and send the data chunks back.

  1. Go to Access Management in the sidebar.

  2. Click Create Policy.

    • Name: “Analysis FUOTA Access”.

    • Target: Analysis.

    • Field: ID (You can select “Any” or target your specific analysis after Step 3).

  3. Permissions:

    • Permission 1:

      • Effect: Allow.
      • Resource: Device.
      • Rules: Check Access, Edit, Token Access, Send Data, and Delete Data.
      • Target: Tag (You can select “Any” or target your specific device by adding a shared tag)
    • Permission 2:

      • Effect: Allow.
      • Resource: File.
      • Rules: Check Access.
      • Target: Any
    • Permission 3:

      • Effect: Allow.
      • Resource: Services.
      • Rules: Check Publish Message.
  4. Click Create Policy.

Note: Without this policy, your script will fail with “Permission Denied” errors when trying to send the chunks.


Step 3: The Analysis Script

This script acts as a “Stateless Server.” It handles two specific scenarios:

  1. Manifest: When you upload a file, it calculates the total size and notifies the device.

  2. Chunking: When the device requests a specific index (e.g., “Chunk 5”), it fetches only those specific bytes and sends them.

  3. Go to Analysis and create a new script.

  4. Runtime: Node.js (or Deno).

  5. Code: Copy and paste the following:

/*
 * Analysis: Universal FUOTA Handler (SDK Services + Validation)
 * Runtime: Deno
 * Description: Handles File Upload (Manifest) and Chunk Requests via SDK MQTT.
 */
import { Analysis, Resources, Utils, Services } from "npm:@tago-io/sdk";
import { DateTime } from "npm:luxon";

// CONFIGURATION
const CHUNK_SIZE = 4096; // 4KB
const TAGO_BROKER_HOST = "mqtt.tago.io";
const TAGO_BROKER_PORT = 1883;

type validation_type = "success" | "danger" | "warning" | string;

interface IValidateOptions {
  show_markdown?: boolean;
  user_id?: string;
  session_id?: string;
}

/**
 * Helper: Sends validation feedback to the Input Form widget.
 * (Kept exactly as in your original script)
 */
function initializeValidation(resources: Resources, validationVariable: string, device_id: string, opts?: IValidateOptions) {
  let i = 0;
  return async function _(message: string, type: validation_type = "success") {
    if (!message || !type) throw "Missing message or type";

    i += 1;
    await resources.devices
      .deleteDeviceData(device_id, { variables: validationVariable, qty: 999 })
      .catch(console.log);

    await resources.devices
      .sendDeviceData(device_id, {
        variable: validationVariable,
        value: message,
        time: DateTime.now().plus({ milliseconds: i * 200 }).toJSDate(), 
        metadata: {
          type: ["success", "danger", "warning"].includes(type) ? type : null,
          color: !["success", "danger", "warning"].includes(type) ? type : undefined,
          show_markdown: !!opts?.show_markdown,
          user_id: opts?.user_id,
          session_id: opts?.session_id,
        },
      })
      .catch(console.error);

    return message;
  };
}

/**
 * Helper: Publish to Device via TagoIO MQTT Service
 * This replaces the need for a separate Action.
 */
async function publishViaMQTT(services: Services, resources: Resources, deviceId: string, payload: any) {
  // 1. Fetch the Device Token (Requires IAM Permission: Token -> Read)
  const tokens = await resources.devices.tokenList(deviceId, { 
    page: 1, 
    amount: 1, 
    filter: { permission: "full" } 
  });

  if (!tokens || !tokens.length) {
    throw "Error: Could not fetch Device Token. Check IAM Policies.";
  }

  const deviceToken = tokens[0].token;

  // 2. Publish using SDK Services
  await services.MQTT.publish({
    bucket: deviceId, // Logs the outgoing message to the bucket for debugging
    message: JSON.stringify(payload),
    topic: "tago/data/post", // Standard topic devices listen to
    options: {
      server: TAGO_BROKER_HOST,
      port: TAGO_BROKER_PORT,
      username: "Token",
      password: deviceToken, // Authenticates as the device
    },
  });
}

async function startAnalysis(context: any, scope: any) {
  const resources = new Resources({ token: context.token });
  const services = new Services({ token: context.token });

  // =========================================================
  // SCENARIO A: Start Update (Triggered by File Upload)
  // =========================================================
  const fileVar = scope.find((x: any) => x.variable === "firmware_file");
  
  if (fileVar) {
    const deviceId = fileVar.device;
    if (!deviceId) return context.log("Error: No device selected in Input Form.");

    const environment = Utils.envToJson(context.environment);
    const userId = environment._user_id;
    const validate = initializeValidation(resources, "firmware_validation", deviceId, { user_id: userId });

    try {
        const fileUrl = fileVar.metadata?.file?.url;

        if (!fileUrl) throw await validate("File URL not found.", "danger");

        await validate("Calculating file size...", "warning");
        context.log(`[Manifest] Initializing for Device: ${deviceId}`);

        // 1. HEAD request: Get file size
        const headResp = await fetch(fileUrl, { method: "HEAD" });
        const totalSize = Number(headResp.headers.get("content-length"));
        
        if (!totalSize) throw await validate("Error: Could not determine file size.", "danger");
        
        const totalChunks = Math.ceil(totalSize / CHUNK_SIZE);

        // 2. Clean Up Old Params
        const currentParams = await resources.devices.paramList(deviceId);
        const keysToRemove = ["ota_url", "ota_size", "ota_chunks"];
        const pToDelete = currentParams.filter((p) => keysToRemove.includes(p.key)).map((p) => p.id);
        if (pToDelete.length) await Promise.all(pToDelete.map((id) => resources.devices.paramRemove(deviceId, id)));

        // 3. Save New Params
        await resources.devices.paramSet(deviceId, [
          { key: "ota_url", value: fileUrl },
          { key: "ota_size", value: totalSize.toString() },
          { key: "ota_chunks", value: totalChunks.toString() }
        ]);

        // 4. Construct Manifest Payload
        const manifestPayload = {
            variable: "firmware_manifest",
            value: "pending",
            metadata: {
                total_size: totalSize,
                total_chunks: totalChunks,
                chunk_size: CHUNK_SIZE,
                version: scope.find((x:any) => x.variable === "firmware_version")?.value || "1.0"
            }
        };

        // 5. Send Manifest via MQTT (Using Services)
        // This ensures the device gets the 'pending' signal immediately via MQTT
        await publishViaMQTT(services, resources, deviceId, manifestPayload);
        
        await validate(`Ready: ${totalSize} bytes (${totalChunks} chunks). Manifest Sent.`, "success");
        return context.log(`[Manifest] Sent via MQTT. Size: ${totalSize}`);

    } catch (error) {
        if (typeof error !== "string") {
             context.log(error);
             await validate("Internal Error. Check Analysis Logs.", "danger");
        }
    }
  }

  // =========================================================
  // SCENARIO B: Get Chunk (Triggered by Device Request)
  // =========================================================
  const requestVar = scope.find((x: any) => x.variable === "firmware_request");

  if (requestVar) {
    const deviceId = requestVar.device;
    const chunkIndex = Number(requestVar.value);

    // 1. Retrieve URL from Parameters
    const params = await resources.devices.paramList(deviceId);
    const urlParam = params.find((p: any) => p.key === "ota_url");
    
    if (!urlParam) return context.log("Error: No OTA URL configured.");

    // 2. Calculate Range
    const startByte = chunkIndex * CHUNK_SIZE;
    const endByte = startByte + CHUNK_SIZE - 1; 

    // 3. Fetch ONLY the needed bytes (Range Header)
    const response = await fetch(urlParam.value, {
        headers: { "Range": `bytes=${startByte}-${endByte}` }
    });
    const arrayBuffer = await response.arrayBuffer();
    
    // 4. Encode to Base64
    const chunkBase64 = btoa(String.fromCharCode(...new Uint8Array(arrayBuffer)));

    // 5. Construct Chunk Payload
    const chunkPayload = {
        variable: "firmware_block",
        value: chunkIndex,
        group: requestVar.group, // Maintain group context if exists
        metadata: {
            data: chunkBase64,
            size: arrayBuffer.byteLength
        }
    };

    // 6. Publish via MQTT (Using Services)
    // This replaces the "Downlink Action"
    try {
        await publishViaMQTT(services, resources, deviceId, chunkPayload);
        context.log(`[Chunk ${chunkIndex}] Sent ${arrayBuffer.byteLength} bytes via Services.MQTT`);
    } catch (err) {
        context.log(err);
        await resources.devices.sendDeviceData(deviceId, {
            variable: "fuota_error", 
            value: "Failed to publish chunk via MQTT"
        });
    }
  }
}

Analysis.use(startAnalysis);


Step 4: Configure the Actions (The Trigger)

We will use a single Action to handle the entire logic. TagoIO Actions allow multiple triggers using “OR” logic, meaning the script will run if either event happens.

  1. Go to Actions and click Add Action.

  2. General:

    • Name: “FUOTA Handler”

    • Type: Variable Trigger

    • Action: Run Analysis (Select the script from Step 3).

  3. Triggers: (Add two triggers here)

    • Trigger 1: Variable firmware_file (Starts the update), with Any value.

    • Trigger 2: Variable firmware_request (Handles the chunk loop), with Any value.

  4. Save.

Now, whether you upload a file or the device requests a chunk, the same script will wake up and handle the job.


Step 5: The Device Simulator

To test this immediately without flashing a real microcontroller, use this Python script. It simulates an IoT device that connects, receives the manifest, and requests the file chunk by chunk.

Prerequisites:

pip install paho-mqtt

Simulator Code:
Replace YOUR_DEVICE_TOKEN_HERE with a token from your Device’s “Tokens” tab.

import paho.mqtt.client as mqtt
import json
import base64
import time

# --- CONFIGURATION ---
DEVICE_TOKEN = "YOUR_DEVICE_TOKEN_HERE" 
BROKER = "mqtt.tago.io"
PORT = 1883

# TOPICS
# Uplink: Where we send requests
TOPIC_PUB = "tago/data/post"

# Downlink: Where we receive data. 
# Since the Analysis script publishes to 'tago/data/post', we must listen there.
TOPIC_SUB = "tago/data/post" 

# Global State
firmware_buffer = {} 
expected_chunks = 0
is_downloading = False

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print(f"Connected to TagoIO MQTT!")
        # Subscribe to the standard topic to receive messages from the Analysis
        client.subscribe(TOPIC_SUB)
        print(f"Listening on: {TOPIC_SUB}")
    else:
        print(f"Connection Failed. Code: {rc}")

def on_message(client, userdata, msg):
    global is_downloading, expected_chunks, firmware_buffer

    try:
        payload = json.loads(msg.payload.decode())
    except:
        return 

    # SAFETY CHECK: Ignore our own uplink messages
    # Since we subscribe and publish to the same topic, we might see our own 'firmware_request'
    if payload.get("variable") == "firmware_request":
        return

    # 1. HANDLE MANIFEST (Start Signal)
    if payload.get("variable") == "firmware_manifest":
        meta = payload.get("metadata", {})
        expected_chunks = int(meta.get("total_chunks", 0))
        file_size = meta.get("total_size", 0)
        version = meta.get("version", "unknown")
        
        print(f"\n[OTA] New Firmware v{version} Available!")
        print(f"      Size: {file_size} bytes | Chunks: {expected_chunks}")
        
        firmware_buffer = {}
        is_downloading = True
        
        # Start by requesting Chunk 0
        request_chunk(client, 0)

    # 2. HANDLE CHUNK (Data Block)
    elif payload.get("variable") == "firmware_block":
        chunk_index = int(payload.get("value"))
        
        meta = payload.get("metadata", {})
        b64_data = meta.get("data", "")
        
        print(f"[OTA] Received Chunk {chunk_index + 1}/{expected_chunks}")
        
        if b64_data:
            chunk_bytes = base64.b64decode(b64_data)
            firmware_buffer[chunk_index] = chunk_bytes
        
        # Request next chunk
        next_chunk = chunk_index + 1
        if next_chunk < expected_chunks:
            time.sleep(0.1) 
            request_chunk(client, next_chunk)
        else:
            finish_update()

def request_chunk(client, index):
    payload = {
        "variable": "firmware_request",
        "value": index,
        "unit": "index"
    }
    client.publish(TOPIC_PUB, json.dumps(payload))

def finish_update():
    global is_downloading
    print("\n[OTA] Download Complete. Reassembling...")
    
    final_data = bytearray()
    for i in range(expected_chunks):
        if i in firmware_buffer:
            final_data.extend(firmware_buffer[i])
        else:
            print(f"Error: Missing chunk {i}")
            return

    with open("firmware_mqtt_download.bin", "wb") as f:
        f.write(final_data)
    
    print(f"[OTA] Success! File saved as 'firmware_mqtt_download.bin'")
    is_downloading = False

# --- MAIN ---
client = mqtt.Client()
client.username_pw_set("Token", DEVICE_TOKEN)
client.on_connect = on_connect
client.on_message = on_message

print("Connecting to Broker...")
client.connect(BROKER, PORT, 60)
client.loop_forever()

Step 6: Testing the System

  1. Run the Python Simulator: It will connect and wait.

  2. Upload Firmware:

    • Go to your FUOTA Manager Dashboard.

    • Select your device in the selector.

    • Upload a dummy .bin file (or any small file) using the form.

  3. Watch the Process:

    • Python Console: You will see New Firmware Available! followed by a rapid stream of Received Chunk X.

    • TagoIO Analysis Console: You can view the logs to see the script waking up, fetching 4KB, and sleeping for each request.

    • Result: A file named firmware_mqtt_download.bin will appear in your Python script’s folder.

You now have a robust, scalable firmware update system that works entirely over MQTT, respecting both device constraints and serverless timeouts.