MQTT client can't subscribe to all topics

Hello,
I have an external script which is intended to subscribe to whatever I am sending via MQTT from a Tago analysis script. My two scenarios are:

  1. When a new user signs up → An analysis script is triggered, publishing some information to an MQTT topic
  2. When the user submits some data via an input form in their dashboard → An analysis script is triggered, publishing some information to an MQTT topic

What I am doing right now is creating one subscriber for each device, which covers my second scenario. However, I don’t think this is ideal, since I need the device token for each device I want to listen to. For me the ideal approach would be to use a master token that lets me have just one subscriber listening to every device, and also listening to the topic published when a new user signs up.
I have tried implementing this by creating a device and giving it permissions to everything via tags so that I can use just its token but couldn’t make it work.
Note that I would like the data sent from the dashboard to be stored in the bucket of the corresponding device also.

Thanks for your time!

This is how I am implementing it so far. Although it works, there are some issues I don’t like, which would be solved by using only one subscriber.

On the script outside Tago: I await this function every time a Tago device is created (cp is an object which stores all the information relating to the device)

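import json

# Assumption: Client comes from the asyncio-mqtt package (older API with filtered_messages)
from asyncio_mqtt import Client


async def mqtt_subscriber(cp):
    # Every topic for this device lives under tago/<device id>/
    topic_prefix = f"tago/{cp.id}"
    print(f"[{cp.id}] listening to Tago goals...")
    async with Client("mqtt.tago.io", username="velo-energy", password=cp.tago_device_token) as client:
        # Register the message filter before subscribing so no message is missed
        async with client.filtered_messages(f"{topic_prefix}/#") as messages:
            await client.subscribe(f"{topic_prefix}/#")
            async for message in messages:
                print(f"[{cp.id}] TOPIC: {message.topic}")
                msg = json.loads(message.payload)
                print(msg)
                # A new user signed up for this device: copy the details onto cp
                if message.topic == f"{topic_prefix}/new_user":
                    cp.user_name = msg["user_name"]
                    cp.user_email = msg["user_email"]
                    cp.tz = msg["user_timezone"]
                    cp.cp_type = msg["user_tags"][0]["value"]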

On an analysis script in Tago for sending data from input forms in dashboards:

tago_account = tago.Account(token)

def myAnalysis(context,scope):
  my_data_bucket = scope[0]['bucket']
  my_data_value = scope[0]['value']
  my_data_device_id = scope[0]['origin']

  device_info = tago_account.devices.info(my_data_device_id)
  device_name = device_info["result"]["name"]


  my_data_json = {'variable': 'power_goal', 'value': my_data_value, 'unit': 'W', 'device': device_name}
  json_msg = json.dumps(my_data_json)
  topic = 'tago/' + str(device_name) + "/goals"
  print(f"TOPIC: {topic}")
  retain = False
  qos = 0
  # Publishing to MQTT
  MQTT = Services(context.token).MQTT
  result = MQTT.publish(json_msg,my_data_bucket,topic,retain,qos)
  #context.log(result)
Analysis().init(myAnalysis)

On an analysis script in Tago for sending data related to a new user who just signed up:

def myAnalysis(context, scope):

  tago_account = tago.Account(ACCOUNT_TOKEN)

  user_name = scope[0]["name"]
  user_email = scope[0]["email"]
  user_timezone = scope[0]["timezone"]
  user_tags = scope[0]["tags"]
  device_name = user_tags[1]["value"]
  tago_devices = tago_account.devices.list()
  for dev in tago_devices["result"]:
    if dev["name"] == device_name:
      tago_device_id = dev["id"]
      break

  tago_device_info = tago_account.devices.info(tago_device_id)
  tago_device_token_list = tago_account.devices.tokenList(tago_device_id)
  tago_device_token = tago_device_token_list["result"][-1]["token"]
  tago_bucket_id = tago_device_info["result"]["bucket"]["id"]

  my_data_json = {"user_name": user_name, "user_email": user_email, "user_timezone": user_timezone, "user_tags": user_tags}
  json_msg = json.dumps(my_data_json)
  topic = "tago/" + str(user_tags[1]["value"]) + "/new_user"
  print(f"TOPIC: {topic}")
  retain = False
  qos = 0
  # Publishing to MQTT
  MQTT = Services(context.token).MQTT
  result = MQTT.publish(json_msg,tago_bucket_id,topic,retain,qos)
  #context.log(result)

# The analysis token is only necessary to run the analysis outside TagoIO
Analysis().init(myAnalysis)

If there is any other way around this please let me know.

The TagoIO MQTT broker actually connects to the bucket of the device, rather than to the device ID or token.

The only thing you need to change in your analysis is to publish to the bucket of your master device. Right now you get the bucket ID of the device that triggered the analysis and publish to that bucket instead. So this line:

  result = MQTT.publish(json_msg,my_data_bucket,topic,retain,qos)

should become:

  result = MQTT.publish(json_msg,'Bucket ID of your master device',topic,retain,qos)
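With everything published to the master device's bucket, the external script can presumably keep a single subscriber (for example on tago/# using the master device's token) and tell devices apart by the topic instead of by connection. Below is a minimal sketch of that routing step, assuming the topics keep the tago/<device>/<event> shape used in the question. The names parse_topic, dispatch and the handlers dictionary are hypothetical helpers for illustration, not part of any TagoIO SDK.

```python
import json
from typing import Any, Callable, Dict, Optional, Tuple

# A handler receives the device name taken from the topic and the decoded JSON payload.
Handler = Callable[[str, Dict[str, Any]], None]


def parse_topic(topic: str) -> Optional[Tuple[str, str]]:
    """Split 'tago/<device>/<event>' into (device, event); return None otherwise."""
    parts = topic.split("/")
    if len(parts) != 3 or parts[0] != "tago" or not parts[1] or not parts[2]:
        return None
    return parts[1], parts[2]


def dispatch(topic: str, payload: bytes, handlers: Dict[str, Handler]) -> bool:
    """Route one MQTT message to the handler registered for its event.

    Returns True if a handler ran, False if the topic was not recognised.
    """
    parsed = parse_topic(topic)
    if parsed is None:
        return False
    device, event = parsed
    handler = handlers.get(event)
    if handler is None:
        return False
    handler(device, json.loads(payload))
    return True


if __name__ == "__main__":
    # Hypothetical handlers for the two events published in the question.
    handlers: Dict[str, Handler] = {
        "goals": lambda device, msg: print(f"[{device}] power goal: {msg['value']}"),
        "new_user": lambda device, msg: print(f"[{device}] new user: {msg['user_name']}"),
    }
    dispatch("tago/cp-01/goals", b'{"variable": "power_goal", "value": 1500}', handlers)
```

Inside the subscriber loop, the single call dispatch(str(message.topic), message.payload, handlers) would replace the per-device topic comparison, so adding a device no longer means starting another subscriber.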


Hello Vitor,
I know you explained this to me in the meeting, but I had already forgotten how to do it. It is so simple, and it works perfectly. Thank you so much!

Andrés
