Generators

Async Streaming

The generator syntax is a way to asynchronously emit values into some form of stream from an ordinary block of code.

Example:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import cps.*                  // asyncStream, await
import cps.monads.{*, given}  // support for built-in monads (i.e. Future)
import cps.stream.*           // AsyncList

object Example:

  def main(args: Array[String]): Unit =
    val stream = asyncStream[AsyncList[Future, Int]] { out =>
      out.emit(0)
      for i <- 1 to 10 do out.emit(i)
    }
    val f = stream.takeListAll()
    val res = Await.result(f, Duration(1, "seconds"))
    println(s"res=$res")

Here, cps.stream.AsyncList is a minimal implementation of an async stream supplied with dotty-cps-async, and it is a reasonable place to start. Integration modules exist for well-known async streaming libraries (see section Integrations).
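Because the block passed to asyncStream is cps-transformed, it should also be possible to await other asynchronous values between emissions. The following is a minimal sketch, assuming that await on a Future is permitted inside the asyncStream block just as it is inside async; fetchSquare is a hypothetical helper introduced only for illustration and is not part of the library:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import cps.*                  // asyncStream, await
import cps.monads.{*, given}  // support for built-in monads (i.e. Future)
import cps.stream.*           // AsyncList

object AwaitExample:

  // Hypothetical stand-in for some asynchronous computation.
  def fetchSquare(i: Int): Future[Int] = Future(i * i)

  def squares(): Future[List[Int]] =
    val stream = asyncStream[AsyncList[Future, Int]] { out =>
      for i <- 1 to 3 do
        // Assumption: await is usable here, as in an ordinary async block.
        val sq = await(fetchSquare(i))
        out.emit(sq)
    }
    stream.takeListAll()

  def main(args: Array[String]): Unit =
    val res = Await.result(squares(), Duration(1, "seconds"))
    println(s"res=$res")
```

The emitter is used exactly as in the example above; the only difference is that each emitted value is first obtained from a Future.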

The argument of asyncStream is a lambda expression that accepts an emitter; a simplified definition looks as follows:

inline def asyncStream[R](using a: CpsAsyncEmitAbsorber[R])(f: CpsAsyncEmitter[a.Stream, a.Monad, a.Element] => Unit): R

For the full definition, see the library source.

Writing generator adapters for custom streams

To allow generator syntax for your stream, you need to implement the trait CpsAsyncEmitAbsorber[R], whose eval method accepts a cps-transformed function and outputs the result stream.

dotty-cps-async provides a platform-specific trait BaseUnfoldCpsAsyncEmitAbsorber which can simplify generator implementations for streams having something like unfoldAsync[S, E](s: S)(f: S => F[Option[(S, E)]]): R.
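To make the unfoldAsync shape concrete, here is a self-contained sketch that uses only the Scala standard library. It is not the dotty-cps-async or Akka implementation: the unfoldAsync helper, the UnfoldSketch object and the countdown function are hypothetical names, and the "stream" is simply a Future[List[E]]. The tuple order (state, element) follows the signature quoted above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*

object UnfoldSketch:

  // Hypothetical helper: applies f to the current state until it yields None,
  // collecting every emitted element in order.
  def unfoldAsync[S, E](s0: S)(f: S => Future[Option[(S, E)]])(using ExecutionContext): Future[List[E]] =
    def loop(s: S, acc: List[E]): Future[List[E]] =
      f(s).flatMap {
        case Some((next, elem)) => loop(next, elem :: acc) // emit elem, continue with next state
        case None               => Future.successful(acc.reverse) // end of stream
      }
    loop(s0, Nil)

  // The state is the remaining counter; each step emits it and decrements.
  def countdown(from: Int)(using ExecutionContext): Future[List[Int]] =
    unfoldAsync[Int, Int](from) { n =>
      Future.successful(if n > 0 then Some((n - 1, n)) else None)
    }

@main def runUnfoldSketch(): Unit =
  given ExecutionContext = ExecutionContext.global
  println(Await.result(UnfoldSketch.countdown(3), 1.second)) // prints List(3, 2, 1)
```

A stream type that offers a constructor of this shape only needs to map it onto the unfold method of the base trait; everything else about the generator is handled by the trait.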

For example, here is an implementation of CpsAsyncEmitAbsorber[R] for an Akka Streams Source:

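import scala.concurrent.{ExecutionContext, Future}
import akka.NotUsed
import akka.stream.scaladsl.Source
import cps.stream.*  // assumed to provide BaseUnfoldCpsAsyncEmitAbsorber

given AkkaStreamEmitAbsorber[T](using ExecutionContext):
                             BaseUnfoldCpsAsyncEmitAbsorber[Source[T, NotUsed], Future, T] with

  override type Element = T

  // f yields (element, state), while Source.unfoldAsync expects (state, element), so swap the pair.
  def unfold[S](s0: S)(f: S => Future[Option[(T, S)]]): Source[T, NotUsed] =
    Source.unfoldAsync[S, T](s0)(s => f(s).map(_.map { case (elem, state) => (state, elem) }))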