Working with stream data

This is the simplest rule to create. We use the Function sensor with the binding feature to set the runtime parameter we are interested in:

${streamdata.temperature}

There are two ways to set the runtime data for the Function sensor. The alternative, without the binding feature, is <GLOBAL.temperature>.


A resource is the unique identifier of a ‘thing’. When a ‘thing’ pushes streaming data to the Waylay platform, it provides this identifier, i.e. its resource name. Each resource can push multiple parameters to the Waylay broker, and the Waylay framework automatically distributes these parameters to the tasks and nodes with the corresponding resource name. For example, with the execute on data option, sensors are invoked automatically whenever new streamed data with the same resource name becomes available.

The resource name can be specified at the task level and at the node level. If many sensors in a task share the same resource name, or if we want to invoke tasks that share the same template for different sensors (e.g. water meters), we can specify the resource at the task level and inherit it at the node level via the $ symbol. You can find more about this here.
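The resource matching described above can be pictured with a small Python sketch. It is only an illustrative model under stated assumptions, not Waylay's implementation: the `Node` class and the `resolve_resource` and `nodes_to_invoke` helpers are hypothetical names introduced here. It shows a node whose resource is `$` inheriting the task-level resource, and only nodes with a matching resource and the execute on data option being selected when a message arrives.

```python
from dataclasses import dataclass


@dataclass
class Node:
    label: str
    resource: str        # "$" means: inherit the task-level resource
    data_trigger: bool   # models the "execute on data" option


def resolve_resource(node: Node, task_resource: str) -> str:
    """Return the effective resource name of a node (hypothetical helper)."""
    return task_resource if node.resource == "$" else node.resource


def nodes_to_invoke(nodes, task_resource, message):
    """Labels of the nodes that a streamed message would invoke in this model."""
    return [
        n.label
        for n in nodes
        if n.data_trigger
        and resolve_resource(n, task_resource) == message["resource"]
    ]


nodes = [
    Node("Function_1", "$", True),              # inherits "testresource"
    Node("Function_2", "otherresource", True),  # different resource, not invoked
    Node("Function_3", "$", False),             # not executed on data
]
message = {"resource": "testresource", "temperature": 23}
invoked = nodes_to_invoke(nodes, "testresource", message)
print(invoked)  # ['Function_1']
```

Starting the same template with a different task-level resource changes which messages reach the `$` nodes without touching the node definitions, which is the point of inheriting the resource.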

We can start a task using this template (e.g. saved as “template1”) in reactive mode like this:

 curl --user apiKey:apiSecret -H "Content-Type:application/json" -X POST -d '{
    "name": "Stream processing",
    "template": "template1",
    "resource": "testresource",
    "type": "reactive"
  }' https://sandbox.waylay.io/api/tasks

To start a task without a template, we can simply [declare the rule in the request](/api/rest/#create-a-task-with-rule-defined-in-the-request/):

curl --user apiKey:apiSecret -H "Content-Type:application/json" -X POST -d '{
"sensors": [
    {
      "label": "Function_1",
      "name": "Function",
      "version": "1.0.1",
      "position": [
        244,
        248
      ],
      "properties": {
        "formula": "${streamdata.temperature}",
        "threshold": "21"
      },
      "sequence": 0,
      "tickTrigger": false,
      "dataTrigger": true,
      "resource": "$"
    }
  ],
  "actuators": [
    {
      "label": "debugDialog_1",
      "name": "debugDialog",
      "version": "1.0.5",
      "position": [
        536,
        202
      ],
      "properties": {
        "message": "Value is ${nodes.Function_1.rawData.formulaValue}"
      }
    }
  ],
  "relations": [],
  "notes": [],
  "triggers": [
    {
      "sourceLabel": "Function_1",
      "destinationLabel": "debugDialog_1",
      "invocationPolicy": 0,
      "statesTrigger": [
        "Above"
      ]
    }
  ],
  "task": {
    "type": "reactive",
    "start": true,
    "name": "Rule 1",
    "resource": "testresource"
  }
}' "https://sandbox.waylay.io/api/tasks"

Once the task is running, we can push data via the broker:

 curl --user apiKey:apiSecret \
    -H "Content-Type: application/json" \
    -X POST \
    -d '{
         "temperature": 23,
         "humidity": 73,
         "resource": "testresource",
         "domain": "sandbox.waylay.io"
      }' \
    "https://data.waylay.io/messages?store=false"

The debug message then appears every time new data arrives with a temperature above 21.
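To make the behaviour of this rule concrete, here is a minimal Python sketch of what it evaluates for each incoming payload. It is a simplified model, not the platform's code: `debug_message` is a hypothetical helper, the threshold and message template are copied from the rule above, and the exact formatting of the value in the real debug dialog is an assumption.

```python
THRESHOLD = 21  # same value as the "threshold" property in the rule


def debug_message(payload: dict):
    """Return the debug text if the rule would fire for this payload, else None."""
    value = payload["temperature"]   # stands in for ${streamdata.temperature}
    if value > THRESHOLD:            # Function node ends up in the "Above" state
        return f"Value is {value}"   # mirrors the debugDialog message template
    return None                      # trigger only listens for "Above"


payloads = [
    {"temperature": 23, "humidity": 73, "resource": "testresource"},
    {"temperature": 20, "humidity": 70, "resource": "testresource"},
]
messages = [debug_message(p) for p in payloads]
print(messages)  # ['Value is 23', None]
```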


How to use a CEP unit in the Function node

If we want formula processing that goes beyond simple threshold checks, we can use the CEP notation as described here. When working with stream data in a formula, we use the notation <GLOBAL.$value> instead of declarative binding, where $value is replaced with the exact metric we want to process.

For instance, to check whether the temperature changes between two consecutive stream payloads, we can use the formula abs(<GLOBAL.temperature> - <GLOBAL.temperature>[-1]) and set the threshold to 0. Any change in temperature then puts the Function node in the Above state.
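The arithmetic behind this formula can be checked with a short Python sketch. It only models the calculation and is not how the CEP engine is implemented: `change_states` is a hypothetical helper, "Not above" is a placeholder label (the text names only the Above state), and skipping the first reading, which has no predecessor, is an assumption.

```python
def change_states(temperatures, threshold=0):
    """Evaluate abs(current - previous) against the threshold for each
    reading after the first one."""
    states = []
    for previous, current in zip(temperatures, temperatures[1:]):
        formula_value = abs(current - previous)  # abs(temp - temp[-1])
        # "Not above" is a placeholder; only "Above" is named in the text.
        states.append("Above" if formula_value > threshold else "Not above")
    return states


readings = [21, 21, 23, 22, 22]
states = change_states(readings)
print(states)  # ['Not above', 'Above', 'Above', 'Not above']
```

Because the absolute value is used, both a rise (21 to 23) and a drop (23 to 22) exceed the threshold of 0, while repeated identical readings do not.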
