Implementing an Azure IoT Python Device Agent


The nice thing about Azure IoT is that SDKs are already available in popular programming languages including C, C#, Java, Node.js, and Python, so they should cater for our devices and preferred languages. More details on the specific SDKs can be found in the Azure IoT SDK documentation.

In this blog post we are going to focus on creating a couple of device agents using the Python Device SDK.

The scenario is that we have an alarm which is managed by an alarm agent. When the status of the alarm changes, the alarm agent will send an alarm status to the IoT Hub. The IoT Hub then passes the event to the monitor, which is managed by an alarm monitor agent.

What we will need:

  • An Azure subscription
  • Azure PowerShell
  • Azure IoT Hub
  • A Python 3 environment

The alarm agent:

Let’s provision an Azure IoT device so that the alarm agent can communicate with the IoT Hub. Open a PowerShell console and make sure we are signed in to our subscription using Azure PowerShell. Once that is done, provision the device as shown below.

PS D:\Workspace\IoTHub> $DoorAlarm= Add-AzIotHubDevice -ResourceGroupName "rg-sea-aniotodyssey" -IotHubName "ih-sea-aniotodyssey" -DeviceId "DoorAlarm" -AuthMethod "shared_private_key"
PS D:\Workspace\IoTHub>

Next we will retrieve the secondary connection string and assign it to an environment variable which we can use later in our Python application.

PS D:\Workspace\IoTHub> $TempObject = Get-AzIotHubDeviceConnectionString -ResourceGroupName "rg-sea-aniotodyssey" -IotHubName "ih-sea-aniotodyssey" -DeviceId "DoorAlarm" -KeyType secondary
PS D:\Workspace\IoTHub> $Env:DoorAlarmConnectionString = $TempObject.ConnectionString
PS D:\Workspace\IoTHub> $Env:SourceId = "DoorAlarm"
PS D:\Workspace\IoTHub> $Env:DestinationId = "AlarmMonitor"
PS D:\Workspace\IoTHub>
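
The Python agents read these values with os.getenv(), which quietly returns None when a variable is missing. The following is a minimal sketch of a helper that fails early with a clear message instead. Note that require_env is a hypothetical name introduced here for illustration; it is not part of the Azure SDK.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError("Environment variable %s is not set" % name)
    return value


if __name__ == "__main__":
    # Variable names match the PowerShell session above
    for name in ("DoorAlarmConnectionString", "SourceId", "DestinationId"):
        try:
            require_env(name)
            # Deliberately avoid printing the value: it contains a device key
            print("%s is set" % name)
        except RuntimeError as error:
            print(error)
```

Run from the same PowerShell session, this reports which of the three variables are visible to Python before any connection is attempted.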

Now let’s begin implementing the Alarm Agent. The basic idea is that the agent will be monitoring the alarm indefinitely unless aborted by the user. So let’s create a Python application to do just that.

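# AlarmAgent.py
import asyncio
import sys
import signal
async def main():
    slept_counter = 0
    # Report how many iterations ran, then exit, when the user presses Ctrl+C
    def abort_handler(*args):
        print("Alarm agent aborted with counter = %d" % slept_counter)
        sys.exit()
    signal.signal(signal.SIGINT, abort_handler)
    # Monitor indefinitely until aborted by the user
    while True:
        slept_counter += 1
        # Use asyncio.sleep so the event loop is not blocked while waiting
        await asyncio.sleep(0.1)
if __name__ == "__main__":
    asyncio.run(main())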

Handle user abort demo

Instead of incrementing the slept_counter, we will update AlarmAgent.py to randomly change the alarm status and print out the new status if it has changed (note that, like previous blog posts, we are simulating the physical devices at this stage).

# AlarmAgent.py
import asyncio
import sys
import signal
import random
async def main():
    current_status = False
    # Exit cleanly when the user presses Ctrl+C
    def abort_handler(*args):
        print("Alarm agent aborted")
        sys.exit()
    signal.signal(signal.SIGINT, abort_handler)
    print("Initial alarm status: %s" % current_status)
    while True:
        # Use asyncio.sleep so the event loop is not blocked while waiting
        await asyncio.sleep(3)
        # Draw a random status and report it only if it differs from the current one
        if bool(random.getrandbits(1)) != current_status:
            current_status = not current_status
            print("Current alarm status: %s" % current_status)
if __name__ == "__main__":
    asyncio.run(main())

Simulate alarm status demo
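
The change detection in the loop above can also be separated from the timing, which makes it easy to check without waiting three seconds per sample. This is only a sketch; status_changes is a hypothetical helper name, not something from the original agent.

```python
import random


def status_changes(initial: bool, draws):
    """Yield a status only when a drawn value differs from the current status."""
    current = initial
    for drawn in draws:
        if drawn != current:
            current = drawn
            yield current


if __name__ == "__main__":
    # Simulate ten random samples, as the agent does with random.getrandbits(1)
    samples = [bool(random.getrandbits(1)) for _ in range(10)]
    for status in status_changes(False, samples):
        print("Current alarm status: %s" % status)
```

Repeated samples of the same value produce no output, which matches the behaviour of the agent: only genuine status changes are reported.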

When the alarm agent sends an alarm event, it will be serialised using protobuf with the following data:

  • The status
  • Date
  • Time

If you are not familiar with protobuf, check out the blog post Introduction to Protocol Buffers.

Carrying on, remember that the alarm agent is an Azure IoT Hub device and communicates only with the IoT Hub, not with the alarm monitor agent. So to notify the alarm monitor agent of new events, we use a serverless function to act as a message delivery service. In the sequence diagram above, we call this the Messenger. Its sole purpose is to process messages coming into the IoT Hub and relay them to the intended recipient, in our case the alarm monitor agent. In order to address the alarm event to the right recipient, we will also wrap it in another protobuf message which we will call a parcel. All parcels will have the following structure:

// Parcel.proto
syntax = "proto3";
message Parcel {
   string source = 1;
   string destination = 2;
   string type = 3;
   string content = 4;
}

Let’s update AlarmAgent.py to encapsulate the status changed event inside a Parcel object.

# AlarmAgent.py
...
import os
from datetime import datetime, timezone
from AlarmStatus_pb2 import AlarmStatus
from Parcel_pb2 import Parcel
def pack_alarm_event(status=False):
    # Build the alarm status with the current UTC date and time
    date_time_utc = datetime.now(timezone.utc)
    alarm_status = AlarmStatus()
    alarm_status.alarm_active = status
    alarm_status.time_utc = "{}".format(date_time_utc.time())
    alarm_status.date_utc = "{}".format(date_time_utc.date())
    # Wrap the serialised alarm status in a parcel addressed to the monitor
    parcel = Parcel()
    parcel.source = os.getenv("SourceId")
    parcel.destination = os.getenv("DestinationId")
    parcel.type = "AlarmStatus"
    # content is a string field, so the serialised bytes are decoded as UTF-8;
    # this only works while every serialised byte happens to be valid UTF-8
    parcel.content = str(alarm_status.SerializeToString(), 'utf-8')
    serialised_parcel = parcel.SerializeToString()
    print("Serialised data: {}".format(serialised_parcel))
    return serialised_parcel
async def main():
    ...
    while True:
        await asyncio.sleep(3)
        if bool(random.getrandbits(1)) != current_status:
            current_status = not current_status
            packed_parcel = pack_alarm_event(current_status)
if __name__ == "__main__":
    asyncio.run(main())
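
One thing to be aware of: pack_alarm_event() stores serialised protobuf bytes in a string field by decoding them as UTF-8. Serialised protobuf is arbitrary binary data, so this can fail for other message types. As a sketch of one possible alternative (not what this post uses), the bytes could be base64-encoded instead. encode_content and decode_content are hypothetical helper names, and the receiving side would need to decode in the same way.

```python
import base64


def encode_content(serialised: bytes) -> str:
    """Encode arbitrary bytes as ASCII text that is safe in a string field."""
    return base64.b64encode(serialised).decode("ascii")


def decode_content(content: str) -> bytes:
    """Reverse encode_content and recover the original bytes."""
    return base64.b64decode(content)


if __name__ == "__main__":
    # 0x08 0x96 0x01 is the protobuf encoding of field 1 = 150 (a varint)
    raw = b"\x08\x96\x01"
    try:
        str(raw, "utf-8")
    except UnicodeDecodeError as error:
        print("UTF-8 decoding failed:", error)
    content = encode_content(raw)
    print("Base64 content:", content)
    print("Round trip OK:", decode_content(content) == raw)
```

The alarm status in this post contains only a boolean and two short ASCII strings, which is why the UTF-8 approach works here.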

Now that we have a packed parcel ready to send, we can set up the Alarm Agent to pass the parcel onto the IoT Hub.

# AlarmAgent.py
...
from azure.iot.device.aio import IoTHubDeviceClient
...
async def main():
    ...
    conn_str = os.getenv("DoorAlarmConnectionString")
    device_client = IoTHubDeviceClient.create_from_connection_string(conn_str)
    await device_client.connect()
    ...
    while True:
        await asyncio.sleep(3)
        if bool(random.getrandbits(1)) != current_status:
            current_status = not current_status
            await device_client.send_message(pack_alarm_event(current_status))
...
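
The loop above runs until the process is aborted, so the connection is never closed explicitly. Below is a minimal sketch of wrapping the send calls in try/finally so the client is always shut down. FakeDeviceClient is a hypothetical stand-in that only mimics the three coroutine names used in this post (connect, send_message, shutdown), so the sketch runs without an IoT Hub; with the real SDK you would pass in the IoTHubDeviceClient instance instead.

```python
import asyncio


class FakeDeviceClient:
    """Hypothetical stand-in exposing the coroutine names used in this post."""

    def __init__(self):
        self.sent = []
        self.closed = False

    async def connect(self):
        pass

    async def send_message(self, message):
        self.sent.append(message)

    async def shutdown(self):
        self.closed = True


async def send_all(device_client, payloads):
    """Connect, send each payload, and always shut the client down."""
    await device_client.connect()
    try:
        for payload in payloads:
            await device_client.send_message(payload)
    finally:
        # Runs on success, on errors, and when the task is cancelled
        await device_client.shutdown()


if __name__ == "__main__":
    client = FakeDeviceClient()
    asyncio.run(send_all(client, [b"parcel-1", b"parcel-2"]))
    print("Sent:", client.sent, "Closed:", client.closed)
```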

The alarm monitor agent:

Similarly, we will provision another Azure IoT device for the alarm monitor agent and retrieve its connection string.

PS D:\Workspace\IoTHub> $AlarmMonitor = Add-AzIotHubDevice -ResourceGroupName "rg-sea-aniotodyssey" -IotHubName "ih-sea-aniotodyssey" -DeviceId "AlarmMonitor" -AuthMethod "shared_private_key"
PS D:\Workspace\IoTHub> $TempObject = Get-AzIotHubDeviceConnectionString -ResourceGroupName "rg-sea-aniotodyssey" -IotHubName "ih-sea-aniotodyssey" -DeviceId "AlarmMonitor" -KeyType secondary
PS D:\Workspace\IoTHub> $Env:AlarmMonitorConnectionString = $TempObject.ConnectionString
PS D:\Workspace\IoTHub>

Now we can focus on the alarm monitor agent, starting with handling the data coming from the IoT Hub.

# AlarmMonitorAgent.py
import os
import asyncio
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import MethodResponse
async def main():
    conn_str = os.getenv("AlarmMonitorConnectionString")
    device_client = IoTHubDeviceClient.create_from_connection_string(conn_str)
    await device_client.connect()
    # Define behavior for handling methods
    async def method_request_handler(method_request):
        if method_request.name == "ProcessMessage":
            payload = {"result": True, "data": "some data"}
            status = 200
            print("Process direct method call")
        else:
            payload = {"result": False, "data": "unknown method"}
            status = 400
        method_response = MethodResponse.create_from_method_request(method_request, status, payload)
        await device_client.send_method_response(method_response)
    device_client.on_method_request_received = method_request_handler
    # Define behavior for halting the application
    def stdin_listener():
        while True:
            selection = input("Press Q to quit\n")
            if selection == "Q" or selection == "q":
                print("Quitting...")
                break
    # Run the stdin listener in the event loop
    loop = asyncio.get_running_loop()
    user_finished = loop.run_in_executor(None, stdin_listener)
    # Wait for user to indicate they are done listening for method calls
    await user_finished
    # Finally, shut down the client
    await device_client.shutdown()
if __name__ == "__main__":
    asyncio.run(main())

The code above is the implementation of the AlarmMonitorAgent. The main thing to note is that when a direct method call is received, method_request_handler() is called. Here we can unpack the message (aka the parcel) and decide how to handle it. It is also worth pointing out that while all our parcels have been serialised as protobuf objects, a current limitation of direct method invocation is that only JSON payloads are supported (issue #610), so a protobuf to JSON translation is done before the payload is passed to the addressed device.
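
To illustrate the translation, here is a rough sketch that uses a plain dataclass as a stand-in for the generated Parcel class (ParcelStandIn and both function names are hypothetical). It shows a parcel being turned into a JSON payload and rebuilt on the device from the resulting dictionary by keyword arguments. With real protobuf messages, the json_format module in the protobuf package is intended for this kind of conversion.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class ParcelStandIn:
    """Plain stand-in with the same four fields as Parcel.proto."""
    source: str
    destination: str
    type: str
    content: str


def parcel_to_json(parcel: ParcelStandIn) -> str:
    """Translate a parcel into the JSON text sent with the direct method."""
    return json.dumps(asdict(parcel))


def parcel_from_payload(payload: dict) -> ParcelStandIn:
    """Rebuild a parcel from the JSON payload, keyed by field name."""
    return ParcelStandIn(**payload)


if __name__ == "__main__":
    outgoing = ParcelStandIn("DoorAlarm", "AlarmMonitor", "AlarmStatus", "...")
    payload = json.loads(parcel_to_json(outgoing))
    incoming = parcel_from_payload(payload)
    print("Round trip OK:", incoming == outgoing)
```

Because the JSON keys match the field names, the device can reconstruct the parcel directly from the payload dictionary.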

Moving forward, let’s look at unpacking the parcel and deciding whether to process it or throw it out. We will process it if the parcel is addressed to the correct device and the content type can be handled by the device; otherwise we reject it.

PS D:\Workspace\IoTHub> $Env:AlarmMonitorAgentId = "AlarmMonitor"
PS D:\Workspace\IoTHub>

# AlarmMonitorAgent.py
...
from AlarmStatus_pb2 import AlarmStatus
from Parcel_pb2 import Parcel
async def main():
    ...
    async def method_request_handler(method_request):
        if method_request.name == "ProcessMessage":
            # Rebuild the parcel from the JSON payload of the direct method call
            parcel = Parcel(**method_request.payload)
            # Accept only parcels addressed to this agent with a known content type
            if (parcel.destination == os.getenv("AlarmMonitorAgentId") and
                    parcel.type == "AlarmStatus"):
                payload = {"result": True, "data": "some data"}
                status = 200
                alarm_status = AlarmStatus()
                alarm_status.ParseFromString(bytes(parcel.content, 'utf-8'))
                print("Deserialised alarm_active: {}, time_utc: {}, date_utc: {}".format(
                    alarm_status.alarm_active, alarm_status.time_utc, alarm_status.date_utc))
            else:
                # Wrong recipient or unsupported content type
                payload = {"result": False, "data": "parcel rejected"}
                status = 400
        else:
            payload = {"result": False, "data": "unknown method"}
            status = 400
...

Connecting all the pieces:

Now that we have implemented the monitor agent to process the alarm status events, let’s have a look at them both in action. The following demo shows the alarm agent on the top printing out alarm status events as serialised strings, while the bottom shows the alarm monitor agent printing out the deserialised alarm status events as they are received. Just for context, the IoT Hub is hosted some 10 km away.

