A functional approach to protocol decoding

Intro

A few months ago I was confronted with the task to decode the communication of several devices on a CANopen bus. The dump was produced by the candump program, which is part of the GNU can-utils library, whose output looks similar to this:

can0  70F   [1]  7F
can0  721   [1]  7F
can0  0A1   [8]  02 FF 01 00 03 00 00 00
can0  000   [2]  82 00

My wish was to make this human readable, i.e. plain text. Without any deeper thought or reason I chose a (in my opinion) very functional approach for this task which is of so general nature that I wanted to devote it a blog post. Since CANopen is a rather complicated protocol and its details have nothing to do with the presented solution I will use a much simpler dummy protocol to prove my point. This does not affect generality however.

My original code was written in Python so I will start presenting my original solution in Python, followed by an implementation in Haskell with stronger type safety.

Protocol

Every protocol message consists of exactly 2 bytes. The first byte identifies the sender and the second byte the status of the sender, thus this is a read-only protocol where participants just broadcast their status.

The question of message send arbitration, etc. are not of interest here. We just assume we have some utility program which in the end will give us a file with the following contents

(1,2)
(3,0)
(5,1)
...

That means: One tuple per line with sender ID first followed by the status.

The network has the following known participants:

Name ID Status Codes
Motor_A 21 0: Not moving, 1: Moving, 2: Hardware defect
Motor_B 22 0: Not moving, 1: Moving, 2: Hardware defect
Motor_C 23 0: Not moving, 1: Moving, 2: Hardware defect
Sensor_A 41 0: Ok, 1: Above threshold, 2: Below threshold
Sensor_B 42 0: Ok, 1: Above threshold, 2: Below threshold
GPIO_A 51 0: Off, 1: On
GPIO_B 52 0: Off, 1: On
Device_A 2 0: Idle, 1: Processing, 2: Sending, 3: Error
Device_B 4 0: Idle, 1: Calculating

The details are really not important, the key point is: for a different device ID the same status code means a different thing, so we have to distinguish the devices in order to understand the meaning of the message. Note however that we have some very crude grouping in the sense that all motors start at ID 20, all sensors at ID 40, and so on.

Implementation

The concept is very simple: We are using a list of functions, so called interpreters, to handle the input. We only provide an interpreter for the things we are interested in. An interpreter is nothing more than a function which either returns None or a string. An interpreter receives a protocol message as its input and decides if it knows how to interpret (handle) this message. If it doesn’t it returns None. If it does it extracts the information and returns a nicely formatted string of the message’s content.

Now comes the core idea of the implementation. The main decoding function does nothing more than looping through all available interpreters. The return value of the first interpreter not returning None will be used as the decoding result. If no interpreter matches some default behavior is invoked.

Here is a possible implementation:

def create_group_interpreter(range_start, range_end, status_map, group_name):
    def interpreter(ID, status):
        if range_start <= ID < range_end:
            device_nr = ID - range_start
            status_string = status_map.get(status, None)
            if status_string is None:
                return None
            else:
                return (f"{group_name} #{device_nr}: {status_string}")
        return None
    return interpreter


motor_status_map = {
    0: "Not moving",
    1: "Moving",
    2: "Hardware defect",
}

sensor_status_map = {
    0: "Ok",
    1: "Above threshold",
    2: "Below threshold",
}

gpio_status_map = {
    0: "Off",
    1: "On",
}


motor_interpreter = create_group_interpreter(20, 30, motor_status_map, "Motor")
sensor_interpreter = create_group_interpreter(40, 50, sensor_status_map, "Sensor")
gpio_interpreter = create_group_interpreter(50, 58, gpio_status_map, "GPIO")


def device_a_interpreter(ID, status):
    if ID == 2:
        status_map = {
            0: "Idle",
            1: "Processing",
            2: "Sending",
        }
        status_string = status_map.get(status, None)
        if status_string is None:
            return None
        return (f"Device_A: {status_string}")
    return None


def device_b_interpreter(ID, status):
    if ID == 4:
        status_map = {
            0: "Idle",
            1: "Calculating",
        }
        status_string = status_map.get(status, None)
        if status_string is None:
            return None
        return (f"Device_B: {status_string}")
    return None

# Main application code starts here

interpreters = [motor_interpreter,
                sensor_interpreter,
                gpio_interpreter,
                device_a_interpreter,
                device_b_interpreter]


def decode(ID, status):
    for interpreter in interpreters:
        interpretation = interpreter(ID, status)
        if interpretation is not None:
            return interpretation
    return None


if __name__ == '__main__':
    import sys
    import re

    message_re = re.compile(r'\((?P<ID>\d+),(?P<status>\d+)\)')
    protocol_dump_file = sys.argv[1]

    def unknown(ID, status):
        print(f"Unknown ID {ID} with status {status}")

    with open(protocol_dump_file, 'r') as protocol_dump:
        for line in protocol_dump.readlines():
            m = message_re.match(line)
            if m:
                ID = int(m.group('ID'))
                status = int(m.group('status'))
                decode_result = decode(ID, status)
                if decode_result is not None:
                    print(decode_result)
                else:
                    unknown(ID, status)

Note: In order to execute this code Python3.6 or higher is required because I use literal string interpolation.

Given the following input

(21,0)
(22,1)
(2,0)
(37,5)
(23,0)
(2,1)
(4,0)
(41,0)
(6,2)
(21,2)
(42,1)
(51,0)
(52,1)

this yields the following output

$ python3.6 fp_decode.py input.txt
Motor #1: Not moving
Motor #2: Moving
Device_A: Idle
Unknown ID 37 with status 5
Motor #3: Not moving
Device_A: Processing
Device_B: Idle
Sensor #1: Ok
Unknown ID 6 with status 2
Motor #1: Hardware defect
Sensor #2: Above threshold
GPIO #1: Off
GPIO #2: On

As the above code shows we are using interpreter creator functions to create multiple interpreters for every group (motors, sensors and gpios). This allows for maximum code reuse. The interpreter creators make use of closures to provide the status map and device name to the generated interpreter function.

Discussion

With regards to the primitive dummy protocol the above solution may appear over engineered, but its real power starts to shine in more complicated scenarios. It is worth noting that the implementation is extremely flexible. Everything can be customized down to the case of unhandled messages.

Let’s assume that one day the status specification and ID of Device_A change. Instead of digging deep down some big switch case or if/else tree we know that the device_a_interpreter function is the only place we have to look at.

In our example the ID field is just a simple digit, but in a realistic scenario it will be way more complex, e.g. an HTTP header. If this is the case every interpreter can have custom code to decide if it is able to handle the attached payload. Same applies for the status field.

Likewise if one day we decide we are no longer interested in sensor data, then we just remove the sensor interpreter from the interpreter list. That is a one line code change. Simple as that. The main application logic remains untouched.

What are the disadvantages of this implementation? First of all it is not type safe, as usual for Python. We are only using a convention, nothing enforced by a type system. If an interpreter breaks this convention and for example returns an integer instead of None or string this would break our code. In the above implementation we are also not guarding against unknown status codes. If a status code is unknown this is treated as “not interpreted”.

$ python3.6 fp_decode.py bad_input.txt
Motor #1: Not moving
Motor #2: Moving
Device_A: Idle
Unknown ID 37 with status 5
Motor #3: Not moving
Device_A: Processing
Device_B: Idle
Unknown ID 2 with status 4
Sensor #1: Ok
Unknown ID 6 with status 2
Motor #1: Hardware defect
Sensor #2: Above threshold
GPIO #1: Off
GPIO #2: On
Unknown ID 41 with status 6

With the following “bad input”:

(21,0)
(22,1)
(2,0)
(37,5)
(23,0)
(2,1)
(4,0)
(2,4)
(41,0)
(6,2)
(21,2)
(42,1)
(51,0)
(52,1)
(41,6)

Of course this can be fixed, but is not of importance for this post.

We also have to see that the generic loop over all available interpreters will very likely not be able to meet the performance of a hand written solution, but that shouldn’t be too surprising. We are trading performance for generality here and in a typical use case this shouldn’t hurt too much.

Haskell

In the intro I promised to provide a Haskell solution as well. We can more or less copy/paste the Python version, all we need is a bunch of type definitions.

#!/usr/bin/env stack
{- stack
  script
  --resolver lts-12.7
  --package containers
-}


import Control.Monad (forM_)
import Data.IntMap.Strict (IntMap, fromList, (!?))
import System.Environment (getArgs)


data InterpreterResult = NotInterpreted
                       | InterpreterOk String
                       | InterpreterError String

type MessageID = Int
type MessageStatus = Int

newtype Interpreter = Interpreter { getInterpreter :: MessageID -> MessageStatus -> InterpreterResult }


create_group_interpreter :: Int            -- range start
                         -> Int            -- range end (exclusive)
                         -> IntMap String  -- the status map for the group
                         -> String         -- the name of the group
                         -> Interpreter
create_group_interpreter range_start range_end status_map group_name =
  let
    interpreter id status =
      if (range_start <= id) && (id < range_end)
      then
        let device_nr = id - range_start in
        case (status_map !? status) of
          Nothing -> InterpreterError ("Unknown status " ++ (show status) ++
                                       " for " ++ group_name ++ " #" ++ (show device_nr))
          Just s -> InterpreterOk (group_name ++ " #" ++ (show device_nr) ++ ": " ++ s)
      else
        NotInterpreted
  in
    Interpreter interpreter


motor_status_map = fromList [ (0, "Not moving")
                            , (1, "Moving")
                            , (2, "Hardware defect") ]
motor_interpreter = create_group_interpreter 20 30 motor_status_map "Motor"

sensor_status_map = fromList [ (0, "Ok")
                             , (1, "Above threshold")
                             , (2, "Below threshold") ]
sensor_interpreter = create_group_interpreter 40 50 sensor_status_map "Sensor"

gpio_status_map = fromList [ (0, "Off")
                           , (1, "On") ]
gpio_interpreter = create_group_interpreter 50 58 gpio_status_map "GPIO"


device_a_interpreter_func id status =
  let
    statusMap = fromList [ (0, "Idle")
                         , (1, "Processing")
                         , (2, "Sending") ]
  in
    if (id == 2)
    then
      case (statusMap !? status) of
        Nothing -> InterpreterError ("Unknown status " ++ (show status) ++ " for Device_A")
        Just s -> InterpreterOk ("Device_A: " ++ s)
    else
      NotInterpreted

device_a_interpreter = Interpreter device_a_interpreter_func


device_b_interpreter_func id status =
  let
    statusMap = fromList [ (0, "Idle")
                         , (1, "Calculating") ]
  in
    if (id == 4)
    then
      case (statusMap !? status) of
        Nothing -> InterpreterError ("Unknown status " ++ (show status) ++ " for Device_B")
        Just s -> InterpreterOk ("Device_B: " ++ s)
    else
      NotInterpreted

device_b_interpreter = Interpreter device_b_interpreter_func

-- main application logic starts here

interpreters = [ motor_interpreter
               , sensor_interpreter
               , gpio_interpreter
               , device_a_interpreter
               , device_b_interpreter
               ]


decodeWithInterpreters :: [Interpreter] -> MessageID -> MessageStatus -> InterpreterResult
decodeWithInterpreters interpreters id status =
  let tryInterpreter (Interpreter curr) next =
        case (curr id status) of
          NotInterpreted -> next
          result -> result
  in foldr tryInterpreter NotInterpreted interpreters


decode = decodeWithInterpreters interpreters


onOk :: String -> IO ()
onOk s = putStrLn s

onErr :: String -> IO ()
onErr s = putStrLn ("Error! " ++ s)


handleResult err ok res =
  case res of
    NotInterpreted -> return ()
    InterpreterOk s -> ok s
    InterpreterError s -> err s


main = do
  fileName <- head <$> getArgs
  fileContent <- readFile fileName
  let messages = map read (lines fileContent) :: [(Int,Int)]
      printResult = handleResult onErr onOk
  forM_ messages (printResult . (uncurry decode))

The Haskell version provides the same output as the Python version. Additionally it also handles the case of status lookup failure:

$ stack fp_decode.hs input.txt 
Motor #1: Not moving
Motor #2: Moving
Device_A: Idle
Motor #3: Not moving
Device_A: Processing
Device_B: Idle
Sensor #1: Ok
Motor #1: Hardware defect
Sensor #2: Above threshold
GPIO #1: Off
GPIO #2: On
$
$ stack fp_decode.hs bad_input.txt
Motor #1: Not moving
Motor #2: Moving
Device_A: Idle
Motor #3: Not moving
Device_A: Processing
Device_B: Idle
Error! Unknown status 4 for Device_A
Sensor #1: Ok
Motor #1: Hardware defect
Sensor #2: Above threshold
GPIO #1: Off
GPIO #2: On
Error! Unknown status 6 for Sensor #1

Note: The code in the decodeWithInterpreters function uses foldr to achieve the same “loop until the first hit” as in the Python version. If all interpreters have been tried without success then the “default value” NotInterpreted is returned.

Using folds to look through a choice of available handlers is a very common pattern in Haskell. As a real world example the Scotty Webframework uses such a pattern for routing. But this deserves a post on its own.