Published: April 10, 2018
by Tobias Pleyer
Tags: haskell, async

Haskell’s async.mapConcurrently

Recently I parallelized chefkoch, so multiple URL fetches would happen simultaneously. I used Haskell’s async package to achieve this. I only used one function, mapConcurrently, which allows you to run an IO function in parallel over a list of values and then collect the results back in a list. The signature looks like this

mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)

Awesomely enough its implementation is also an one-liner, and quite readable actually

mapConcurrently f = runConcurrently . traverse (Concurrently . f)

I wanted to use this implementation as the motive for this post and another code run. I think it shows the beauty and elegance that functional programming with Haskell can offer. Before we dive into the implementation and walk through it I want to give a few remarks

  1. The implementation makes use of the Traversable type class. I will specialize this for lists, which are an instance of Traversable
  2. The monad we are dealing with is the IO monad. Typically the IO monad is opaque, hiding its internals. As a consequence in normal Haskell code you can’t deconstruct IO values. For a better visualization I assume the following: Applying the function myAction to the value dataX will result in the IO action IO resX, i.e. will have the value resX when extracted from the IO action.

With these assumptions in place let’s “run” some code:

mapConcurrently myAction [data1, data2, data3]
-- definition of mapConcurrently
  = runConcurrently . traverse (Concurrently . myAction) [data1, data2, data3]
-- definition of traverse for lists
  = runConcurrently (
      List.foldr (\x ys -> liftA2 (:) ((Concurrently . myAction) x) ys)
                 (pure []) [data1, data2, data3]
    )
-- definition of liftA2 for lists
  = runConcurrently (
      List.foldr (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys)
                 (pure []) [data1, data2, data3]
    )
-- definition of foldr evaluated strictly
  = runConcurrently (
      (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data1 (
        (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data2 (
          (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys)
            data3 (pure [])
        )
      )
    )
-- definition of `pure` for Concurrently
  = runConcurrently (
      (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data1 (
        (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data2 (
          (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys)
            data3 (Concurrently (return []))
        )
      )
    )
-- lambda application
  = runConcurrently (
      (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data1 (
        (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data2 (
          (<*>) (Concurrently ((:) <$> myAction data3)) (Concurrently (return []))
        )
      )
    )
-- definition of <*> for Concurrently
  = runConcurrently (
      (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data1 (
        (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data2 (
          Concurrently ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data3) (return []))
        )
      )
    )
-- lambda application and definition of <*> for Concurrently
  = runConcurrently (
      (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data1 (
        (<*>) (fmap (:) ((Concurrently . myAction) data2)) (Concurrently ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data3) (return [])))
      )
    )
-- definition of fmap for Concurrently
  = runConcurrently (
      (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data1 (
        (<*>) (Concurrently ((:) <$> myAction data2)) (Concurrently ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data3) (return [])))
      )
    )
-- definition of <*> for Concurrently
  = runConcurrently (
      (\x ys -> (<*>) (fmap (:) ((Concurrently . myAction) x)) ys) data1 (
        Concurrently ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data2) ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data3) (return [])))
      )
    )
-- lambda application and definition of <*> for Concurrently
  = runConcurrently (Concurrently ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data1) ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data2) ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data3) (return [])))))
-- apply runConcurrently (unpack)
  = (\(f,a) -> f a) <$> concurrently ((:) <$> myAction data1) ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data2) ((\(f,a) -> f a) <$> concurrently ((:) <$> myAction data3) (return [])))
-- myAction dataX = IO resX
  = (\(f,a) -> f a) <$> concurrently ((:) <$> (IO res1)) ((\(f,a) -> f a) <$> concurrently ((:) <$> (IO res2)) ((\(f,a) -> f a) <$> concurrently ((:) <$> (IO res3)) (return [])))
-- 'execute' IO actions and apply definition of `concurrently`
  = (\(f,a) -> f a) <$> concurrently ((:) <$> (IO res1)) ((\(f,a) -> f a) <$> concurrently ((:) <$> (IO res2)) ((\(f,a) -> f a) <$> (IO ((:) res3, []))))
-- definition of `concurrently`
  = (\(f,a) -> f a) <$> concurrently ((:) <$> (IO res1)) ((\(f,a) -> f a) <$> concurrently ((:) <$> (IO res2)) (IO ((:) res3 [])))
-- definition of `concurrently`
  = (\(f,a) -> f a) <$> concurrently ((:) <$> (IO res1)) ((\(f,a) -> f a) <$> concurrently ((:) <$> (IO res2)) (IO [res3]))
-- ...
  = [res1, res2, res3]

Comments

Thanks to traverse the implementation of mapConcurrently almost comes naturally. Here is the quote from the documentation of traverse:

Map each element of a structure to an action, evaluate these actions from left to right, and collect the results.

But this gives us only the collection of the results, not automatically concurrency, or in this case parallelism. This comes from the Concurrently type class, more precisely the Applicative instance of Concurrently. But let’s first look at the definition of Concurrently and its Functor implementation:

newtype Concurrently a = Concurrently { runConcurrently :: IO a }

instance Functor Concurrently where
  fmap f (Concurrently a) = Concurrently $ f <$> a

So a value of type Concurrently is just a thin wrapper around an IO action. The Functor implementation says that applying a function to a value of a concurrent action just applies the function to the IO action and then runs it concurrently.

Now let’s move to the Applicative implementation:

instance Applicative Concurrently where
  pure = Concurrently . return
  Concurrently fs <*> Concurrently as =
    Concurrently $ (\(f, a) -> f a) <$> concurrently fs as

So a pure concurrent action is an action which returns immediately, no work done. Isn’t it wonderful how close this comes to what we would think of a “pure” action? But now to the central part: the implementation of (<*>). The implementation makes use of the concurrently primitive. It is not really a primitive, it is a normal function which runs two concurrent actions and returns both their results as a tuple. But I call it primitive because it is the smallest interaction unit: it runs exactly two IO actions concurrently, which is smallest in the sense that the next smallest thing, running one action, is already not concurrency anymore.

In this sense we can formulate the implementation of (<*>) as follows:

If we have a cocncurrent IO action which produces a function, and a concurrent IO action which produces a value, then the effect of applying this function to the value concurrently is to run both actions concurrently and then apply the resulting function to the value.

When you write it out like that it really sounds like the most obvious thing to do…

But the nice part is that this gives us enough to run an arbitrary amount of concurrent computations, because the Concurrent class composes and one or both of the arguemnts to (<*>) can consists of more concurrent computations. This is exactly what we can observe in the bottom part of the above code transformations: When we have decomposed everything into one big Concurrently value and then unpacked it, all that was left was a nested call chain of concurrently function calls. Quite beautiful indeed!