High-order functions.

dotty-cps-async supports the automatic transformation of high-order functions, where the lambda expression argument contains await.

For example, assume an HTTP client providing the following interface to fetch some data from a list of remote servers.

trait HttpClient:
  def fetchData(url: String): Future[String]

Then, we can fetch data from all servers just by using await in the map argument:

urls.map( await(httpClient.fetchData(_)) )

Note that the default map method will run all operations sequentially. Sequential evaluation is needed to allow the code to work correctly for a case of updating the multidimensional array in a for loop with asynchronous operations.

If we want all requests to run in parallel, we can start them in one map and, when all started - wait for the end of requests:

urls.map( httpClient.fetchData(_) ).map(await(_))

For handling awaits inside high-order functions, dotty-cps-async uses different strategies in dependency of execution runtime capabilities.

Async shift substitution.

The most general way is compile-time substitution: during async transform, dotty-cps-async substitutes method map with signature List[A].map[B](f: A => B) to

summon[AsyncShift[List[A]]].map[F[_], B](c, summon[CpsMonad[F]])

which is implemented in the cps runtime with the signature

AsyncShift[List[A]].map[F[_], B](obj: List[A], cpsMonad: CpsMonad[F])(f: A => F[B])

dotty-cps-async includes implementations of shifted methods for most objects of the Scala standard library.

So, we can write something like

val x = cache.getOrElse( await(fetchData() )

Loom-based runtime await.

JDK-21 includes a set of interfaces (project Loom) that allows execution of code in virtual threads, where runtime blocking wait is not blocking from OS view: real thread can execute tasks from other virtual threads during the wait. In this case, we don’t need to substitute a high-order function but change instead the function argument to the original form. Target monad should implement one of the CpsRuntimeAwait or CpsRuntimeAwaitProvider typeclasses (see sources )

For enabling this future, add dotty-cps-async-loom artifact to the dependencies of your project.

How to provide shifted functions.

Functional interface.

Suppose you want to make high-order methods of your class C be able to accept lambda functions with await. For that purpose, you have to implement the given AsyncShift[C] type class with a shifted version of your high-order methods. Such a ‘shifted’ version has an additional type parameter F[_] and an additional list of arguments, inserted first, containing the original object instance and an appropriate CpsMonad[F] instance.

Parameters should be changed in the following way:

  • If the original parameter has type A => B, then changed: A => F[B]

  • If the original parameter is called by name with type => A, then changed: () => F[A]

  • Otherwise, the changed parameter has the same type as the original.

Example:

case class TaggedValue[T](tag: String, value: T)
  def update[S](f: T => S): TaggedValue[S] =
    TaggedValue(tag, f(x))

// Below the changed code:
// - type `T => S` of argument `f` becomes `T => F[S]`
// - `(o, m)` is prepended as the first argument list

class TaggedValueAsyncShift[T] extends AsyncShift[TaggedValue[T]]:
  def update[F[_], S](o: TaggedValue[T], m: CpsMonad[F])(f: T => F[S]): F[TaggedValue[S]] =
    f(value).map(TaggedValue(tag,_))

object TaggedValue:
  transparent inline given shiftedTaggedValue[T] as AsyncShift[TaggedValue[T] =
    TaggedValueAsyncShift[T]()

Object-oriented interface.

In some cases, we use classes – defined in an object-oriented manner – with private data. If we want a class to provide an API for dotty-cps-async, then we can do this without breaking encapsulation. What is needed - to implement an async-shifted version of the function inside our class:

Example:

class MyIntController:
  private var x: Int = 0

  def modify(f: Int => Int): Int =
    val old = x
    x = f(x)
    sendSignal(x)
    old

  def modify_async[F[_]](m: CpsMonad[M])(f: Int => F[Int]): F[Int] =
    val old = x
    m.map(f(x))(_ => { sendSignal(x); old })
As we have seen, shifted functions have an additional type parameter: F[_] and a parameter CpsMonad[F] (or a more specific type, if needed). Async transformer will substitute the call of modify into modify_async during compilation.

Sometimes, we already have F[_] as the type parameter of the enclosing class. In such a case, we can omit those additional parameters in the async variant.

Note that you should carefully decide whether you need async function support and how to deal with concurrent modifications. For example, in the code snippet below, different changes will interleave with each other.

Usually, low-level constructs do not need async counterparts.

Special semantics for substitutions in call chains

Consider a chain of calls, which accepts async-shifted functions. One example is withFilter from the Scala collections library. Let’s look at the following code:

for {
  url <- urls if await(status(url)) == Active
  items <- await(api.retrieveItems(url))
  item <- items
} yield item

Here, the usual semantics of withFilter assume that we iterate over urls only once. But if we translate this expression according to the standard rules, we will receive two passes: one pass in async withFilter and the second pass in flatMap.

To perform the iteration once, we translate withFilter not to F[WithFilter] but to a substituted type DelayedWithFilter, which holds the received predicate and delays actual evaluation upon the call of the next operation in the chain.

The implementation of class DelayedWithFilter looks like:

class DelayedWithFilter[F[_], A, C[X] <: Iterable[X], CA <: C[A]](
    c: CA,
    m: CpsMonad[F],
    p: A => F[Boolean],
) extends CallChainAsyncShiftSubst[F, WithFilter[A, C], F[WithFilter[A, C]] ]
{
  // return eager copy
  def _finishChain: F[WithFilter[A, C]] = //...

  def withFilter(q: A => Boolean): DelayedWithFilter[F, A, CX, CA] = //...
  def withFilter_async(q: A=> F[Boolean]) = //...

  def map[B](f: A => B): F[C[B]] = //...
  def map_async[B](f: A => F[B]): F[C[B]] = //...

  def flatMap[B](f: A => IterableOnce[B]): F[C[B]] = //...
  def flatMap_async[B](f: A => F[IterableOnce[B]]): F[C[B]] = //...

  def foreach[U](f: A => U): F[Unit] = //...
  def foreach_async[U](f: A => F[U]): F[Unit] = //...
}

I.e., in the delayed variant, all original methods should collect operations into the next delayed object or perform an actual batched call. We also have the method _finishChain, which is called when we have no next call in the chain; an example of such a case is val x = c.withFilter(p).

By convention, the substituted type should be derived from trait CallChainAsyncShiftSubst[F, T, FT].

This structure has a nice categorical interpretation. If you are curious about that, read details in Categorical interpretation for substitutions in async call chains:.

Builder methods.

Yet one common usage pattern of high-order functions is builder methods, where we use high-order functions to build some processing algorithm.

trait ReadChannel[F, A]:

  def map(f: A => B): ReadChannel[F, B]

Here, method map is used for building the streaming interface. We can provide an async variant of map which will return the same type as the original function:

trait ReadChannel[F, A]:

  def map(f: A => B): ReadChannel[F, B]

  def mapAsync(f: A => F[B]): ReadChannel[F, B]

Also, we can see that our channel structure is already build on top of F[_], so it is not necessary to pass F to method parameter.

For convenience, dotty-cps-async supports both naming variants of mapAsync: camelCase mapAsync and snake_case map_async.

We propose to use the following convention when naming such methods:

  • use method_async when the async method will unlikely be called directly by the programmer and will be used only for substitution in high-order function;

  • use methodAsync when we expect that the developer can use this method directly along with cps substitution.

Async high-order functional interfaces

For a case with an asynchronous high-order function interface (i.e. methods which accept functions like f:(A => F[B])), the async macro can automatically transform the asynchronous result to have the same signature,

so you can use await calls inside async lambdas without implementing additional methods or type classes.

Automatic generation of shifted functions

When dotty-cps-async-compiler-plugin is enabled, it is possible to generate shifted functions automatically in simple cases: where the higt-order function

calls argument (but not store it in a variable or in an object field). In such cases, you can annotate your high-order method with cps.plugin.makeCps annotation and the plugin will generate a shifted function with the same name as the original function but with the

shifted arguments and suffix _async.

Example: .. code-block:: scala

import cps.plugin.annotation.makeCps

case class Summary(min:Double, max:Double, sum:Double, n:Int)

object MeasurementsOps {

@makeCps def gatherStatistics[A](data: Seq[A])(f: A=>Double): Summary = {

data.foldLeft(Summary(Double.MAX,Doube.MIN,0,0)) { (s,e) =>

val c = f(e) s.copy(min=Math.min(s.min,c),max=Math.max(s.max,c), sum=c.sum+c, n=s.n+1)

}

}

}

With this definition we can use gatherStatistics with async context even on platforms, which do not support continuations. This stared as a work of Olena Kravchenko in Google Summer of Code 2023 project.