High-order functions.
dotty-cps-async supports the automatic transformation of high-order functions, where the lambda expression argument contains await.
For example, assume an HTTP client providing the following interface to fetch some data from a list of remote servers.
trait HttpClient:
  def fetchData(url: String): Future[String]
Then, we can fetch data from all servers just by using await in the map argument:
urls.map(url => await(httpClient.fetchData(url)))
Note that the default map method will run all operations sequentially. Sequential evaluation is needed so that code such as a for loop that updates a multidimensional array with asynchronous operations works correctly.
If we want all requests to run in parallel, we can start them all in one map and then, once all of them are started, wait for the requests to complete in a second map:
urls.map( httpClient.fetchData(_) ).map(await(_))
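The difference between the two patterns can be illustrated without the library, using plain scala.concurrent.Future. The sketch below is only an analogy: fetchData is a hypothetical stand-in for httpClient.fetchData, and the counters inFlight and maxInFlight exist purely to make the behaviour observable.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Hypothetical stand-in for httpClient.fetchData; it tracks how many
// requests are running at the same time.
val inFlight = new AtomicInteger(0)
val maxInFlight = new AtomicInteger(0)

def fetchData(url: String): Future[String] = Future {
  val running = inFlight.incrementAndGet()
  maxInFlight.accumulateAndGet(running, (a, b) => math.max(a, b))
  Thread.sleep(20) // simulated network latency
  inFlight.decrementAndGet()
  s"data from $url"
}

// Sequential: a request starts only after the previous one has completed.
def fetchSequentially(urls: List[String]): Future[List[String]] =
  urls.foldLeft(Future.successful(List.empty[String])) { (acc, url) =>
    acc.flatMap(done => fetchData(url).map(data => done :+ data))
  }

// Parallel: all requests are started first, then the results are collected.
def fetchInParallel(urls: List[String]): Future[List[String]] =
  Future.sequence(urls.map(fetchData))
```

Both functions return the results in the order of urls; only the moment at which each request starts differs. With fetchSequentially, at most one request is ever in flight.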
To handle awaits inside high-order functions, dotty-cps-async uses different strategies depending on the capabilities of the execution runtime.
Async shift substitution.
The most general way is compile-time substitution: during the async transform, dotty-cps-async replaces the method map with signature List[A].map[B](f: A => B) by
summon[AsyncShift[List[A]]].map[F[_], B](c, summon[CpsMonad[F]])
which is implemented in the cps runtime with the signature
AsyncShift[List[A]].map[F[_], B](obj: List[A], cpsMonad: CpsMonad[F])(f: A => F[B])
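To make the shape of such a shifted method concrete, here is a minimal hand-written sketch. It does not use the library: MiniMonad is a hypothetical, simplified stand-in for CpsMonad, FutureMiniMonad is a hypothetical instance for Future, and shiftedMap is an illustrative name. The library's actual implementation may differ.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Hypothetical, simplified stand-in for the library's CpsMonad type class.
trait MiniMonad[F[_]]:
  def pure[A](a: A): F[A]
  def map[A, B](fa: F[A])(f: A => B): F[B]
  def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]

// Hypothetical instance for Future.
object FutureMiniMonad extends MiniMonad[Future]:
  def pure[A](a: A): Future[A] = Future.successful(a)
  def map[A, B](fa: Future[A])(f: A => B): Future[B] = fa.map(f)
  def flatMap[A, B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f)

// Shifted counterpart of List[A].map: the object and the monad come first,
// and the function argument returns F[B] instead of B.
// Elements are processed one after another, from left to right.
def shiftedMap[F[_], A, B](obj: List[A], m: MiniMonad[F])(f: A => F[B]): F[List[B]] =
  obj.foldLeft(m.pure(List.empty[B])) { (acc, a) =>
    m.flatMap(acc)(done => m.map(f(a))(b => done :+ b))
  }
```

For example, shiftedMap(List(1, 2, 3), FutureMiniMonad)(x => Future(x * 2)) yields a Future that completes with List(2, 4, 6).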
dotty-cps-async includes implementations of shifted methods for most objects of the Scala standard library.
So, we can write something like
val x = cache.getOrElse(await(fetchData()))
Loom-based runtime await.
JDK 21 includes a set of interfaces (Project Loom) that allow code to run in virtual threads, where a blocking wait at runtime does not block from the OS point of view: the real thread can execute tasks from other virtual threads during the wait. In this case, we don’t need to substitute the high-order function; instead, we change the function argument so that it keeps its original form. The target monad should implement one of the CpsRuntimeAwait or CpsRuntimeAwaitProvider type classes (see the sources).
To enable this feature, add the dotty-cps-async-loom artifact to the dependencies of your project.
How to provide shifted functions.
Functional interface.
Suppose you want the high-order methods of your class C to accept lambda functions containing await.
For that purpose, you have to implement a given AsyncShift[C] type class instance with shifted versions of your high-order methods.
Such a ‘shifted’ version has an additional type parameter F[_] and an additional list of arguments, inserted first, containing the original object instance and an appropriate CpsMonad[F] instance.
Parameters should be changed in the following way:
- If the original parameter has type A => B, then it is changed to A => F[B].
- If the original parameter is called by name with type => A, then it is changed to () => F[A].
- Otherwise, the changed parameter has the same type as the original.
Example:
import cps.*

case class TaggedValue[T](tag: String, value: T):
  def update[S](f: T => S): TaggedValue[S] =
    TaggedValue(tag, f(value))

// Below the changed code:
// - type `T => S` of argument `f` becomes `T => F[S]`
// - `(o, m)` is prepended as the first argument list
class TaggedValueAsyncShift[T] extends AsyncShift[TaggedValue[T]]:
  def update[F[_], S](o: TaggedValue[T], m: CpsMonad[F])(f: T => F[S]): F[TaggedValue[S]] =
    // apply f to the value of the original object and re-wrap the result inside F
    m.map(f(o.value))(TaggedValue(o.tag, _))

object TaggedValue:
  transparent inline given shiftedTaggedValue[T]: AsyncShift[TaggedValue[T]] =
    TaggedValueAsyncShift[T]()
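The TaggedValue example covers the rule for function-typed parameters. The rule for by-name parameters can be sketched in the same spirit. Everything in the block below is hypothetical and independent of the library: Slot is an invented class with a by-name parameter, PureMonad is a minimal stand-in for CpsMonad (only pure is needed here), and getOrElseShifted is written as a free function for brevity rather than as a method of an AsyncShift instance.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.*

// Hypothetical minimal stand-in for CpsMonad: only `pure` is needed here.
trait PureMonad[F[_]]:
  def pure[A](a: A): F[A]

// Hypothetical instance for Future.
object FuturePure extends PureMonad[Future]:
  def pure[A](a: A): Future[A] = Future.successful(a)

// Hypothetical class whose method takes a by-name parameter.
case class Slot[V](content: Option[V]):
  def getOrElse(default: => V): V = content.getOrElse(default)

// Shifted variant: `default: => V` becomes `default: () => F[V]`,
// and `(o, m)` is prepended as the first argument list.
def getOrElseShifted[F[_], V](o: Slot[V], m: PureMonad[F])(default: () => F[V]): F[V] =
  o.content match
    case Some(v) => m.pure(v) // the thunk is never invoked, as with by-name
    case None    => default() // the thunk is invoked only when it is needed
```

Turning the by-name parameter into a function of no arguments keeps its laziness: the asynchronous default is started only when the slot is empty.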
Object-oriented interface.
In some cases, we use classes, defined in an object-oriented manner, with private data. If we want such a class to provide an API for dotty-cps-async, we can do this without breaking encapsulation. All that is needed is to implement an async-shifted version of the method inside the class:
Example:
class MyIntController:
  private var x: Int = 0

  def modify(f: Int => Int): Int =
    val old = x
    x = f(x)
    sendSignal(x)
    old

  def modify_async[F[_]](m: CpsMonad[F])(f: Int => F[Int]): F[Int] =
    val old = x
    // store the asynchronously computed value, then signal, as in modify
    m.map(f(x)) { v => x = v; sendSignal(x); old }
- As we have seen, shifted functions have an additional type parameter F[_] and a parameter of type CpsMonad[F] (or a more specific type, if needed). The async transformer will replace the call of modify with modify_async during compilation. Sometimes, we already have F[_] as the type parameter of the enclosing class. In such a case, we can omit those additional parameters in the async variant.
- Note that you should carefully decide whether you need async function support and how to deal with concurrent modifications. For example, in the code snippet above, different changes will interleave with each other. Usually, low-level constructs do not need async counterparts.
Special semantics for substitutions in call chains
Consider a chain of calls, which accepts async-shifted functions. One example is withFilter from the Scala collections library. Let’s look at the following code:
for {
  url <- urls if await(status(url)) == Active
  items <- await(api.retrieveItems(url))
  item <- items
} yield item
Here, the usual semantics of withFilter assume that we iterate over urls only once. But if we translate this expression according to the standard rules, we will get two passes: one pass in the async withFilter and a second pass in flatMap.
To perform the iteration once, we translate withFilter not to F[WithFilter] but to a substituted type DelayedWithFilter, which holds the received predicate and delays the actual evaluation until the next operation in the chain is called.
The implementation of class DelayedWithFilter looks like:
class DelayedWithFilter[F[_], A, C[X] <: Iterable[X], CA <: C[A]](
  c: CA,
  m: CpsMonad[F],
  p: A => F[Boolean],
) extends CallChainAsyncShiftSubst[F, WithFilter[A, C], F[WithFilter[A, C]] ]
{
  // return eager copy
  def _finishChain: F[WithFilter[A, C]] = //...
  def withFilter(q: A => Boolean): DelayedWithFilter[F, A, C, CA] = //...
  def withFilter_async(q: A => F[Boolean]) = //...
  def map[B](f: A => B): F[C[B]] = //...
  def map_async[B](f: A => F[B]): F[C[B]] = //...
  def flatMap[B](f: A => IterableOnce[B]): F[C[B]] = //...
  def flatMap_async[B](f: A => F[IterableOnce[B]]): F[C[B]] = //...
  def foreach[U](f: A => U): F[Unit] = //...
  def foreach_async[U](f: A => F[U]): F[Unit] = //...
}
That is, in the delayed variant, all original methods should either collect operations into the next delayed object or perform an actual batched call.
We also have the method _finishChain, which is called when there is no next call in the chain; an example of such a case is val x = c.withFilter(p).
By convention, the substituted type should be derived from the trait CallChainAsyncShiftSubst[F, T, FT].
This structure has a nice categorical interpretation. If you are curious about that, read the details in Categorical interpretation for substitutions in async call chains.
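The idea of delaying the filter can be sketched with a much smaller class. The block below is a hypothetical, heavily simplified analogue specialised to List and Future. DelayedFilter is an invented name, it does not extend CallChainAsyncShiftSubst, and its _finishChain returns the filtered list rather than a WithFilter, so it shows the technique and not the library's implementation.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

// Hypothetical simplified analogue of DelayedWithFilter for List and Future.
// It only stores the collection and the asynchronous predicate.
class DelayedFilter[A](c: List[A], p: A => Future[Boolean]):

  // The chain continues: combine the predicates, still without traversing c.
  def withFilter_async(q: A => Future[Boolean]): DelayedFilter[A] =
    DelayedFilter(c, a => p(a).flatMap(ok => if ok then q(a) else Future.successful(false)))

  // The chain ends with map: a single pass applies predicate and function together.
  def map_async[B](f: A => Future[B]): Future[List[B]] =
    c.foldLeft(Future.successful(List.empty[B])) { (acc, a) =>
      acc.flatMap { done =>
        p(a).flatMap(ok => if ok then f(a).map(b => done :+ b) else Future.successful(done))
      }
    }

  // No next call in the chain: evaluate eagerly (simplified to a filtered list).
  def _finishChain: Future[List[A]] =
    map_async(a => Future.successful(a))
```

Creating a DelayedFilter evaluates nothing. The predicate runs once per element, and only when map_async or _finishChain is called.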
Builder methods.
Another common usage pattern of high-order functions is builder methods, where we use high-order functions to build a processing algorithm.
trait ReadChannel[F[_], A]:
  def map[B](f: A => B): ReadChannel[F, B]
Here, the method map is used for building the streaming interface. We can provide an async variant of map which will return the same type as the original function:
trait ReadChannel[F[_], A]:
  def map[B](f: A => B): ReadChannel[F, B]
  def mapAsync[B](f: A => F[B]): ReadChannel[F, B]
Also, we can see that our channel structure is already built on top of F[_], so it is not necessary to pass F as an additional method parameter.
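A minimal sketch of such a builder, assuming Future as F, might look as follows. The read method and the FutureChannel class are hypothetical additions made only so that the example can be run; they are not part of the interface described above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

given ExecutionContext = ExecutionContext.global

trait ReadChannel[F[_], A]:
  def read(): F[A] // hypothetical method, added to observe the result
  def map[B](f: A => B): ReadChannel[F, B]
  def mapAsync[B](f: A => F[B]): ReadChannel[F, B]

// Hypothetical implementation backed by a function producing the next value.
class FutureChannel[A](next: () => Future[A]) extends ReadChannel[Future, A]:
  def read(): Future[A] = next()

  // Both variants only build a new channel; nothing runs until read() is called.
  def map[B](f: A => B): ReadChannel[Future, B] =
    FutureChannel(() => next().map(f))

  def mapAsync[B](f: A => Future[B]): ReadChannel[Future, B] =
    FutureChannel(() => next().flatMap(f))
```

Here mapAsync returns ReadChannel[Future, B], the same type as map, and takes no monad parameter because the channel is already tied to Future.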
For convenience, dotty-cps-async supports both naming variants of mapAsync: camelCase mapAsync and snake_case map_async.
We propose to use the following convention when naming such methods:
- use method_async when the async method is unlikely to be called directly by the programmer and will be used only for substitution in a high-order function;
- use methodAsync when we expect that the developer can use this method directly along with cps substitution.
Async high-order functional interfaces
For a case with an asynchronous high-order function interface (i.e. methods which accept functions like f: (A => F[B])), the async macro can automatically transform the asynchronous result to have the same signature, so you can use await calls inside async lambdas without implementing additional methods or type classes.
Automatic generation of shifted functions
When dotty-cps-async-compiler-plugin is enabled, it is possible to generate shifted functions automatically in simple cases: where the high-order function only calls its argument (and does not store it in a variable or in an object field). In such cases, you can annotate your high-order method with the cps.plugin.makeCps annotation, and the plugin will generate a shifted function with the same name as the original function but with shifted arguments and the suffix _async.
Example:
import cps.plugin.annotation.makeCps

case class Summary(min: Double, max: Double, sum: Double, n: Int)

object MeasurementsOps {

  @makeCps def gatherStatistics[A](data: Seq[A])(f: A => Double): Summary = {
    // start from an empty summary and fold every measurement into it
    data.foldLeft(Summary(Double.MaxValue, Double.MinValue, 0, 0)) { (s, e) =>
      val c = f(e)
      s.copy(min = Math.min(s.min, c), max = Math.max(s.max, c), sum = s.sum + c, n = s.n + 1)
    }
  }

}
With this definition, we can use gatherStatistics in an async context even on platforms which do not support continuations. This started as the work of Olena Kravchenko in a Google Summer of Code 2023 project.
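To give an intuition for what a shifted counterpart of gatherStatistics computes, the sketch below writes one by hand, with F fixed to Future for brevity. This is an assumption-laden illustration: the exact signature and body that the plugin generates are not shown here and may differ. Only the _async suffix and the shifted function argument follow the description above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

case class Summary(min: Double, max: Double, sum: Double, n: Int)

// The original synchronous function.
def gatherStatistics[A](data: Seq[A])(f: A => Double): Summary =
  data.foldLeft(Summary(Double.MaxValue, Double.MinValue, 0, 0)) { (s, e) =>
    val c = f(e)
    s.copy(min = math.min(s.min, c), max = math.max(s.max, c), sum = s.sum + c, n = s.n + 1)
  }

// Hand-written analogue of a shifted variant, specialised to Future:
// `f: A => Double` becomes `f: A => Future[Double]` and the result is a Future.
def gatherStatistics_async[A](data: Seq[A])(f: A => Future[Double])(
    using ExecutionContext): Future[Summary] =
  data.foldLeft(Future.successful(Summary(Double.MaxValue, Double.MinValue, 0, 0))) { (acc, e) =>
    for
      s <- acc  // wait for the summary accumulated so far
      c <- f(e) // then evaluate the asynchronous measurement
    yield s.copy(min = math.min(s.min, c), max = math.max(s.max, c), sum = s.sum + c, n = s.n + 1)
  }
```

For the same input, both functions produce the same Summary. The asynchronous one evaluates the measurements one after another inside the Future.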