Generators
Async Streaming
The generator syntax is a way to deliver values into some form of stream asynchronously.
Example:
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

import cps.*                  // asyncStream, await
import cps.monads.{*, given}  // support for built-in monads (i.e. Future)
import cps.stream.*           // AsyncList

object Example:

  def main(args: Array[String]): Unit =
    val stream = asyncStream[AsyncList[Future, Int]] { out =>
      out.emit(0)
      for i <- 1 to 10 do out.emit(i)
    }
    val f = stream.takeListAll()
    val res = Await.result(f, Duration(1, "seconds"))
    println(s"res=$res")
Here, cps.stream.AsyncList is a minimal implementation of an async stream supplied with dotty-cps-async.
There exist integration modules for well-known async streaming libraries (see section Integrations).
The input of asyncStream is a code block that should be a lambda expression accepting an emitter argument; i.e., the simplified definition looks as follows:
inline def asyncStream[R](using a: CpsAsyncEmitAbsorber[R])(f: CpsAsyncEmitter[a.Stream, a.Monad, a.Element] => Unit): R
For a full definition, look at the source:
- asyncStream is the entry point for the macro.
- CpsAsyncEmitAbsorber[R] is an adapter from a generator to a stream of the given type.
- CpsAsyncEmitter is a trait with the emit operation, which should be called inside an asyncStream or async block.
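As an illustration of the emitter being used inside the block, the following minimal sketch interleaves await with emit. It reuses only the calls shown in the example above (asyncStream, emit, takeListAll) plus await from the cps package; fetchSquare is a hypothetical helper introduced here for illustration, and the sketch assumes that await on a Future is available inside the asyncStream block in the same way as inside async.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

import cps.*                  // asyncStream, await
import cps.monads.{*, given}  // support for Future
import cps.stream.*           // AsyncList

object EmitWithAwait:

  // Hypothetical asynchronous source of values, used only for illustration.
  def fetchSquare(i: Int): Future[Int] = Future(i * i)

  // Builds a stream by awaiting each value and then emitting it.
  def squares(): AsyncList[Future, Int] =
    asyncStream[AsyncList[Future, Int]] { out =>
      var i = 1
      while i <= 3 do
        val v = await(fetchSquare(i)) // assumption: await is usable here
        out.emit(v)                   // deliver the awaited value to the stream
        i += 1
    }

  def main(args: Array[String]): Unit =
    val res = Await.result(squares().takeListAll(), Duration(1, "seconds"))
    println(s"res=$res")
```

A plain while loop is used here only to keep the sketch small; the for-loop form from the first example should work just as well.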
Writing generator adapters for custom streams
To allow generator syntax for your stream, you need to implement the trait CpsAsyncEmitAbsorber[R], where eval accepts a cps-transformed function and outputs the result stream.
dotty-cps-async provides a platform-specific trait, BaseUnfoldCpsAsyncEmitAbsorber, which can simplify generator implementations for streams that offer something like unfoldAsync[S, E](s: S)(f: S => F[Option[(S, E)]]): R.
For example, look at the implementation of CpsAsyncEmitAbsorber[R] for an Akka Streams source:
given AkkaStreamEmitAbsorber[T](using ExecutionContext):
    BaseUnfoldCpsAsyncEmitAbsorber[Source[T, NotUsed], Future, T] with

  override type Element = T

  def unfold[S](s0: S)(f: S => Future[Option[(T, S)]]): Source[T, NotUsed] =
    Source.unfoldAsync[S, T](s0)((s) => f(s).map(_.map{ case (x, y) => (y, x) }) )
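To make the unfold contract and the tuple swap above more concrete, here is a minimal sketch that uses only the Scala standard library. Both functions are hypothetical stand-ins written for illustration and are not part of dotty-cps-async or Akka: unfoldAsync imitates a stream library whose step function returns (state, element) and collects the elements into a Future[List[E]] instead of a real stream, while unfold adapts a step function that returns (element, state), as in the Akka example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object UnfoldSketch:

  // Hypothetical stand-in for a stream library's unfoldAsync.
  // The step function returns (nextState, element); None ends the stream.
  def unfoldAsync[S, E](s: S)(f: S => Future[Option[(S, E)]])(
      using ExecutionContext): Future[List[E]] =
    f(s).flatMap {
      case Some((next, elem)) => unfoldAsync(next)(f).map(elem :: _)
      case None               => Future.successful(Nil)
    }

  // Mirrors the shape of `unfold` in the Akka example: here the step
  // function returns (element, nextState), so the pair is swapped
  // before it is handed to unfoldAsync.
  def unfold[S, T](s0: S)(f: S => Future[Option[(T, S)]])(
      using ExecutionContext): Future[List[T]] =
    unfoldAsync[S, T](s0)(s => f(s).map(_.map { case (x, y) => (y, x) }))

  def main(args: Array[String]): Unit =
    import ExecutionContext.Implicits.global
    // The state is a counter; emit its square and stop after 3.
    val squares = unfold[Int, Int](1) { n =>
      Future(if n <= 3 then Some((n * n, n + 1)) else None)
    }
    val res = Await.result(squares, Duration(1, "seconds"))
    println(s"res=$res") // res=List(1, 4, 9)
```

Collecting into a list keeps the sketch self-contained; a real adapter would return the target stream type, as the Akka implementation does with Source[T, NotUsed].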