Creating a Simple CoAP Server with Python


What is CoAP?

CoAP stands for Constrained Application Protocol, is a communication protocol intended for constrained devices or constrained communication channels. It has a client/server based model much like HTTP but with the bonus of the client able to register as a resource observer, unlike HTTP where the client needs to poll the server for updates. While CoAP is built on top of UDP, it does provide mechanism for reliable communications.

More information about CoAP can be found in the following resources:

What is a CoAP server?

A CoAP server is generally a service running on a constrained device hosting/managing one or more resources (e.g. light switches, alarms, sensors). Each resource will be provided with a unique CoAP address endpoint and a defined API so that CoAP clients could interact with that resource.

Creating a simple CoAP server

We will create simple CoAP server with a single resource. The resource will hold the state of an alarm. There will be two access methods:

  1. PUT
    • This method allows a CoAP client connected to the alarm to update it’s state on the server.
  2. OBSERVABLE GET
    • This method allows other CoAP clients to register with the resource so they can be notified when the state of the alarm changes.

To create our simple CoAP server we will be using Python 3 and the Aiocoap library. However, in this tutorial we will just focus on the PUT method and leave OBSERVABLE GET for the next tutorial.

Create a Python3 environment for our simple CoAP server
Install Aiocoap Python library

In our code editor, create a file named server.py and add a class named AlarmResource. This class will manage CoAP requests for the alarm resource.

# server.py

import aiocoap.resource as resource
import aiocoap

class AlarmResource(resource.Resource):
    """This resource supports the PUT method.
    PUT: Update state of alarm."""

    def __init__(self):
        super().__init__()
        self.state = "OFF"

    async def render_put(self, request):
        self.state = request.payload
        print('Update alarm state: %s' % self.state)

        return aiocoap.Message(code=aiocoap.CHANGED, payload=self.state)

Now add a main() method to initialise the server and add the alarm resources to it.

import asyncio

def main():
    # Resource tree creation
    root = resource.Site()
    root.add_resource(['alarm'], AlarmResource())

    asyncio.Task(aiocoap.Context.create_server_context(root, bind=('localhost', 5683)))

    asyncio.get_event_loop().run_forever()

if __name__ == "__main__":
    main()

To test the server we will create a simple client that randomly update the state of the alarm every time it is run (by sending a PUT request with either an “ON” or “OFF” payload).

# client_put.py
import asyncio
import random

from aiocoap import *

async def main():
    context = await Context.create_client_context()
    alarm_state = random.choice([True, False])
    payload = b"OFF"

    if alarm_state:
        payload = b"ON"

    request = Message(code=PUT, payload=payload, uri="coap://localhost/alarm")

    response = await context.request(request).response
    print('Result: %s\n%r'%(response.code, response.payload))

if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())

Below is a demo showing the client (left) updating the server (right).

Categories: Internet of things, PythonTags: , , ,

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: