Developers who have experience with Scala know how to work with side effects: find them, wrap each of them in IO.delay{..} to get an effect, chain the effects together using Apply[F] or Monad[F], and run the resulting program as an IOApp.
In this case all side effects are run in the order in which they are chained in the resulting IO[..]. But there is another way of working with a certain type of effectful computation. It can improve testability and give more control over how these computations are executed. So in this article I’d like to show how to gather these effects and process them later, after the logic itself has been executed in full.
Idea & Motivation
Let’s start with a more precise definition of an effectful computation. In this article I am going to use this term as a synonym for a computation with a side effect that is wrapped in IO.delay(..).
Example of a computation with a side effect:
def unsafeSumAndPrint(in: List[Int]): Unit = {
  val sum = in.sum
  println(s"Sum: $sum")
}
Example of an effectful computation:
def safeSumAndPrint(in: List[Int]): IO[Unit] =
  IO.delay(unsafeSumAndPrint(in))
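The difference between the two definitions can be observed directly. The sketch below assumes cats-effect 3 is on the classpath; countRuns is a hypothetical helper name used only for this illustration. It shows that building an IO.delay(..) value does nothing by itself, and that the side effect happens once per run of the resulting IO.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global

// Hypothetical helper: returns the counter value right after the effect
// was constructed and the counter value after the effect was run twice.
def countRuns(): (Int, Int) =
  var counter = 0
  // Only a description of the side effect, nothing is executed here.
  val effect: IO[Unit] = IO.delay { counter += 1 }
  val afterConstruction = counter
  // Each run executes the wrapped side effect again.
  effect.unsafeRunSync()
  effect.unsafeRunSync()
  (afterConstruction, counter)
```

Calling unsafeRunSync() here is only for demonstration; in an application the effect would be returned from IOApp instead.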
The next step is to split effectful computations into several categories:
- The first one is a generic computation of IO[A]. This computation returns a value of A as a result, where the type A has 2 or more possible values, like IO[Boolean], IO[Int], IO[Either[Err, String]] and so on. Also, this returned value of A is required by the code that follows the computation [1].
- The second type is IO[Nothing] - a computation that runs forever.
- The third type is IO[Unit] - a finite [2] effectful computation that does not return anything important for the logic. This is the one we are interested in in this article.
So why are we interested in IO[Unit]? It represents an effect without a meaningful result apart from a possible error and the fact of its execution.
There are several examples of such logic in practice:
- logging
- tracing
- metrics
- launch of a background computation
- updating values:
  - setting a value of Ref[IO, ..]
  - updating info in a cache
- publishing an event
and so on.
The next question is why we need to gather these effectful computations. Why not run them right away?
Consider the following code example:
final class Service[F[_]: Monad](
    db: DB[F],
    metrics: ServiceMetrics[F],
    log: Logger[F],
    cache: Cache[F]
) {
  private def process(user: User, input: Input, data: UserData): F[Result] = ???

  def businessLogic(user: User, input: Input): F[Result] = for
    userData <- db.load(user.id)
    result   <- process(user, input, userData)
    _        <- metrics.incrementForUser(user.id)
    _        <- log.info(s"User data for '${user.userName}' has been processed: $userData")
    _        <- cache.update(user.id, userData)
  yield result
}
Above you can see a piece of business logic implemented in the classic way. All related effectful computations (metrics, logging, caching) are included directly in it. But it can be implemented in a better way.
final case class Processed(userId: UserId, userName: Username, data: UserData)

// More info about Tell[..] will be provided later in the article
type TellProcessed[F[_]] = Tell[F, Processed]

final class Service[F[_]: Monad: TellProcessed](db: DB[F]) {
  private def process(user: User, input: Input, data: UserData): F[Result] = ???

  def businessLogic(user: User, input: Input): F[Result] = for
    userData <- db.load(user.id)
    result   <- process(user, input, userData)
    _        <- Processed(user.id, user.userName, userData).tell[F]
  yield result
}
So here, instead of launching all the effectful logic right away, we generate the Processed(..) event and put it into the F[_] effect using .tell[F]. Of course, we still have to put the logic that calls metrics.incrementForUser(..), log.info(..) and cache.update(..) somewhere, using the info from that Processed(..) event. But we can put it aside and do it later, outside of the business logic. The business logic itself is now easier to understand and test, as it has fewer dependencies and better decoupling.
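To make the "do it later" part more concrete, here is a minimal sketch. It is a simplification and not the article's actual code: it uses cats' Writer directly instead of Tell[..], replaces the domain types with plain Long and String, and the names Collected, actionsFor and runAndInterpret are hypothetical. The interpreter only describes the actions as strings; in a real system it would call the metrics, logger and cache services.

```scala
import cats.data.{Chain, Writer}

// Simplified stand-in for the article's event type (assumption for this sketch).
final case class Processed(userId: Long, userName: String, data: String)

type Collected[A] = Writer[Chain[Processed], A]

// The business logic only records what happened. It has no dependency
// on metrics, logging or caching.
def businessLogic(userId: Long, userName: String): Collected[Int] =
  for
    data <- Writer.value[Chain[Processed], String](s"data-of-$userName")
    _    <- Writer.tell(Chain.one(Processed(userId, userName, data)))
  yield data.length

// Hypothetical interpreter that runs after the logic has finished.
// It turns every gathered event into the actions to perform.
def actionsFor(event: Processed): List[String] =
  List(
    s"metrics.incrementForUser(${event.userId})",
    s"log.info(processed ${event.userName})",
    s"cache.update(${event.userId})"
  )

def runAndInterpret(): (Int, List[String]) =
  val (events, result) = businessLogic(1L, "alice").run
  (result, events.toList.flatMap(actionsFor))
```

A test of businessLogic in this shape only needs to compare the returned events and the result with expected values, without mocks for the three services.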
Using Tell[..]
As you can see in the example above, we need Tell[F[_], L] to make this approach work. Tell[F[_], L] is a type class from the cats-mtl library that can be used to put a value of L into the container F[_].
The function .tell[F] from the example above is provided by this type class and its syntax. The next thing we need is WriterT[F[_], L, V]. This data structure is a monad transformer that has an instance of Tell[..]. If we check the definition of WriterT we will see this:
WriterT[F[_], L, V](run: F[(L, V)])
It means that only one value of L can be stored in the effect. In order to keep several values there, we have to explicitly use some kind of collection as L. After putting everything together we get the following.
BasicTypes.scala:
object BasicTypes {
  /** Adding this additional type alias to make our types a bit more readable. */
  final type TellLogs[F[_]] = Tell[F, Log]

  /** Our end-of-the-world effect */
  type Eff[T] = WriterT[IO, Chain[Log], T]

  /** Reliable version of the effect */
  type REff[T] = ReliableWriterT[IO, Throwable, Chain[Log], T]
}
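The role of the collection in the L position can be illustrated with a small sketch. It assumes cats-core on the classpath and uses plain String records instead of Log; asSingleString and asChain are hypothetical names. Every tell is merged into the single stored L value with the Semigroup of L, so with String the records are concatenated into one value, while with Chain they stay separate elements.

```scala
import cats.data.{Chain, Writer}

// With L = String the single stored value is the concatenation of all tells.
def asSingleString: (String, Int) =
  val program =
    for
      _ <- Writer.tell("first;")
      _ <- Writer.tell("second;")
    yield 42
  program.run

// With L = Chain[String] every tell appends one more element.
def asChain: (Chain[String], Int) =
  val program =
    for
      _ <- Writer.tell(Chain.one("first"))
      _ <- Writer.tell(Chain.one("second"))
    yield 42
  program.run
```

This is why the effect aliases above use Chain[Log] rather than Log as the log type of the writer.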
Tell[F, Log] vs Tell[F, Chain[Log]]
We have everything but one thing. It is necessary to marry the types Eff[T] = WriterT[IO, Chain[Log], T] and TellLogs[F[_]] = Tell[F, Log]. As you can see, we use Log, not Chain[Log], in the TellLogs definition. This is needed to stay compatible with both cats-effect and ZIO. In the cats stack we have to use WriterT[.., Chain[Log], ..] to get a structure that contains our log records, because cats’ Writer[..] does not have an internal collection for these records. In ZIO, though, we are going to use ZPure[..], which already has an internal Chunk[A] that stores the Log values.
If we define TellLogs as
final type TellLogs[F[_]] = Tell[F, Chain[Log]]
it works out of the box with cats’ WriterT, but it requires some additional logic for ZPure.
If we define TellLogs as
final type TellLogs[F[_]] = Tell[F, Log]
then it is the other way around: the implementation of Tell for ZPure is very simple, but cats’ WriterT requires a bit of a hack. The difference between these two approaches is not that big, but it is there.
So we have to choose. In the scope of this article I am going to rely on the following considerations about Tell[F, L]:
- Tell defines the ability to put multiple values of L into the F[_] container.
- Adding a particular collection to the definition of Tell[..] exposes implementation details, so it is better to avoid it.
Following these considerations means that we need to stick with the second option: define TellLogs[F[_]] = Tell[F, Log] and make it work for cats’ WriterT.
In order to do that we need to add a custom instance of Tell like this.
TellExtension.scala:
object TellExtension {
  given TellForChain[F[_], L](using T: Tell[F, Chain[L]]): Tell[F, L] =
    new Tell[F, L]:
      def functor: Functor[F] = T.functor
      def tell(l: L): F[Unit] = T.tell(Chain.one(l))
}
Basically, what we do here is the following. If we have T: Tell[F, Chain[L]] then we can use it to put l: L into F[_] by wrapping it with Chain.one(..). Then we need to import this TellForChain wherever we use our WriterT-based effect. See the example for Cats Effect below.
Now we are ready to move on. Let’s implement a service with logging using all the tools from above.
Instead of defining our logging as a trait, we will use an enum from Scala 3.
Log.scala:
/** Example of a half structured log - its cases have additional info.
  * We are going to use it as a data structure for gathering info needed to be logged.
  */
enum Log {
  case Info(tag: String, input: String, output: String)
  case Error(tag: String, input: String, error: Throwable)
}
And at last - the service itself:
Example1.scala:
trait Example1 {
  /** This is a basic service with two operations */
  trait Service[F[_]]:
    def operation1(in: Input1): F[Output1]
    def operation2(in: Input2): F[Output2]

  /** This is its implementation. As constructor params it requires three low level operations. In a
    * real system these might be DB operations or http calls to an external system. It also requires
    * `T: TellLogs[F]`, which is used to put instances of `Log` into `F[_]`
    */
  protected final class ServiceImpl[F[_]: Monad](
      lowLvlOp1: Input1 => F[Output1],
      lowLvlOp2: Output1 => F[Output2],
      lowLvlOp3: Input2 => F[Output1]
  )(using
      T: TellLogs[F]
  ) extends Service[F]:
    override def operation1(in: Input1): F[Output1] = for
      out1 <- lowLvlOp1(in)
      // Here we see how log elements can be added to F[_] by calling the type class directly
      _    <- T.tell(Log.Info("operation1", in.show, out1.show))
      out2 <- lowLvlOp2(out1)
      _    <- T.tell(Log.Info("operation1", in.show, out2.show))
    yield out1

    override def operation2(in: Input2): F[Output2] = for
      out1 <- lowLvlOp3(in)
      // and here as well, this time using the `.tell[F]` syntax
      _    <- Log.Info("operation2", in.show, out1.show).tell[F]
      out2 <- lowLvlOp2(out1)
      _    <- Log.Info("operation2", in.show, out2.show).tell[F]
    yield out2
}
Logging as a capability
It is possible to define a logger as a capability in order to abstract over the way of collecting logs.
LogCapability.scala:
trait LogCapability[F[_]] {
  def info(tag: String, input: String, output: String): F[Unit]
  def error(tag: String, input: String, error: Throwable): F[Unit]
}
This interface provides an additional layer of abstraction. It can be implemented in two ways:
- straightforward logging using a Console-based implementation
/** Writes log messages straight to the console */
final class ConsoleLogCapability[F[_]: Console] extends LogCapability[F]:
  def info(tag: String, input: String, output: String): F[Unit] =
    Console[F].println(s"Info[tag:$tag; input:$input; output:$output]")
  def error(tag: String, input: String, error: Throwable): F[Unit] =
    Console[F].println(s"Error[tag:$tag; input:$input; error:$error]")
- an implementation based on Tell (backed by WriterT)
/** Collects logs into the F[_] */
final class TellBasedLogCapability[F[_]](using T: TellLogs[F]) extends LogCapability[F]:
  def info(tag: String, input: String, output: String): F[Unit] =
    T.tell(Log.Info(tag, input, output))
  def error(tag: String, input: String, error: Throwable): F[Unit] =
    T.tell(Log.Error(tag, input, error))
Then it is possible to use it this way:
private val Log: LogCapability[F] = LogCapability[F]

override def operation1(in: Input1): F[Output1] = for
  out1 <- lowLvlOp1(in)
  _    <- Log.info("operation1: out1", in.show, out1.show)
  out2 <- lowLvlOp2(out1)
  _    <- Log.info("operation1: out2", in.show, out2.show)
yield out1
This approach can even be used with libraries such as log4cats, by writing a custom implementation of its Logger[F], MessageLogger[F] and/or ErrorLogger[F] similar to TellBasedLogCapability[F].
What about Errors?
So far so good. But there is another question - how does WriterT work with errors?
Check this code example:
type Logs = Chain[Log]
final case class Error(msg: String) extends AnyVal
final case class Log(msg: String) extends AnyVal

def main(args: Array[String]): Unit = {
  type Eff1[A] = WriterT[SyncIO, Logs, A]

  def eff1: Eff1[Int] = for
    // this log record is lost
    _ <- Tell[Eff1, Log].tell(Log("It is going to fail"))
    // we can see only this error
    _ <- MonadThrow[Eff1].raiseError(new RuntimeException("failure"))
  yield 1

  val result1: Either[Throwable, (Logs, Int)] = eff1.run.attempt.unsafeRunSync()
  println(s"result1: $result1") // result1: Left(java.lang.RuntimeException: failure)
  // omitted ...
}
If we run a program of type WriterT[SyncIO, Logs, A] and call .attempt on the result, we get result1: Either[Throwable, (Logs, Int)]. From this type we see that the Logs value is returned only if there are no errors during execution of the program - it lives in the right part of the Either. In case of any error we get only the Throwable, without the Logs.
ReliableWriterT [3] can help us solve this problem. It is a special version of WriterT that is aware of errors that can happen in F[_].
def main(args: Array[String]): Unit = {
  // omitted ...
  type Eff2[A] = ReliableWriterT[SyncIO, Throwable, Logs, A]

  def eff2Success: Eff2[Int] =
    for _ <- Tell[Eff2, Logs].tell(Chain.one(Log("Successful program")))
    yield 10

  def eff2RaiseError: Eff2[Int] = for
    _ <- Tell[Eff2, Logs].tell(Chain.one(Log("Faulty program")))
    _ <- MonadThrow[Eff2].raiseError(new RuntimeException("test error"))
  yield 10

  val (logs1, success) = eff2Success.run.unsafeRunSync()
  println(s"logs: $logs1; success: $success")
  // prints "logs: Chain(Log(Successful program)); success: Right(10)"

  val (logs2, err) = eff2RaiseError.run.unsafeRunSync()
  println(s"logs: $logs2; err: $err")
  // prints "logs: Chain(Log(Faulty program)); err: Left(java.lang.RuntimeException: test error)"
}
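The idea of returning the logs next to an Either can be illustrated without any library. The sketch below is a simplified, synchronous stand-in written with the Scala standard library only. MiniReliableWriter and its methods are hypothetical names for this illustration and are not the API of ReliableWriterT. The key point is in flatMap: when a step fails, the logs gathered so far are kept and returned together with the error.

```scala
import scala.util.Try

// Hypothetical simplified model: logs are always returned,
// the result is either an error or a value.
final case class MiniReliableWriter[W, A](run: () => (Vector[W], Either[Throwable, A])):
  def flatMap[B](f: A => MiniReliableWriter[W, B]): MiniReliableWriter[W, B] =
    MiniReliableWriter[W, B] { () =>
      val (logs1, res1) = run()
      res1 match
        // On failure the already gathered logs are preserved.
        case Left(err) => (logs1, Left(err))
        case Right(a) =>
          val (logs2, res2) = f(a).run()
          (logs1 ++ logs2, res2)
    }

  def map[B](f: A => B): MiniReliableWriter[W, B] =
    flatMap(a => MiniReliableWriter.pure[W, B](f(a)))

object MiniReliableWriter:
  def pure[W, A](a: A): MiniReliableWriter[W, A] =
    MiniReliableWriter[W, A](() => (Vector.empty, Right(a)))

  def tell[W](w: W): MiniReliableWriter[W, Unit] =
    MiniReliableWriter[W, Unit](() => (Vector(w), Right(())))

  // Captures exceptions thrown by the thunk as a Left value.
  def delay[W, A](thunk: => A): MiniReliableWriter[W, A] =
    MiniReliableWriter[W, A](() => (Vector.empty, Try(thunk).toEither))

def faulty: MiniReliableWriter[String, Int] =
  for
    _ <- MiniReliableWriter.tell("Faulty program")
    _ <- MiniReliableWriter.delay[String, Int](throw new RuntimeException("test error"))
  yield 10
```

Running faulty.run() returns the log record "Faulty program" together with a Left that holds the exception, which mirrors the behaviour of eff2RaiseError above.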
Cats Example
Now we have everything to make an example for cats-effect.
CatsExample.scala:
// TellForChain from TellExtension must be in scope here (for example via `import TellExtension.given`),
// otherwise there is no Tell[REff, Log] instance for ServiceImpl
object CatsExample extends Example1 with IOApp.Simple {
  override def run: IO[Unit] =
    val lowLvlOp1: Input1 => REff[Output1]  = s => Applicative[REff].pure(s.length)
    val lowLvlOp2: Output1 => REff[Output2] = i => Applicative[REff].pure(i.toDouble)
    val lowLvlOp3: Input2 => REff[Output1]  = l => Applicative[REff].pure(l.size)
    val service: Service[REff] = ServiceImpl[REff](lowLvlOp1, lowLvlOp2, lowLvlOp3)

    val program = for
      result1 <- service.operation1("hello")
      result2 <- service.operation2("hello".toList)
    yield (result1, result2)

    for
      _                   <- Console[IO].println("Hello Cats")
      (logs, resultOrErr) <- program.run
      _                   <- LogProcessor.processLogs[IO, Chain](logs)
      (r1, r2)            <- IO.fromEither(resultOrErr)
      _                   <- Console[IO].println(s"result1: $r1")
      _                   <- Console[IO].println(s"result2: $r2")
    yield ()
}
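The body of LogProcessor.processLogs is not shown in the article. The sketch below is only one possible shape of such a processor, under the assumption that it needs nothing more than Foldable for the collection and Applicative for the effect. The extra sink parameter and the render function are hypothetical additions for this illustration, so the signature differs from the call above. The Log enum is repeated to keep the sketch self-contained.

```scala
import cats.{Applicative, Foldable}
import cats.data.Chain
import cats.syntax.all.*

enum Log:
  case Info(tag: String, input: String, output: String)
  case Error(tag: String, input: String, error: Throwable)

// Hypothetical rendering of a gathered record into a log line.
def render(log: Log): String = log match
  case Log.Info(tag, in, out)  => s"Info[tag:$tag; input:$in; output:$out]"
  case Log.Error(tag, in, err) => s"Error[tag:$tag; input:$in; error:$err]"

// Hypothetical processor: walks through the gathered records in order
// and hands every rendered line to the provided sink.
def processLogs[F[_]: Applicative, C[_]: Foldable](logs: C[Log])(sink: String => F[Unit]): F[Unit] =
  logs.traverse_(log => sink(render(log)))
```

With cats-effect the sink could be line => Console[IO].println(line). Because the collection is abstracted as C[_]: Foldable, the same function can accept a Chain on the cats side and any other collection that has a Foldable instance.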
ZIO Example
For the ZIO example I am going to use ZPure as an effect. To make it work it is also necessary to add instances of Console, Monad and Tell for that effect (see ZPureInstances.scala).
ZIOExample.scala:
import io.koff.tf.effect_gathering.ZPureInstances.given

object ZIOExample extends ZIOAppDefault with Example1 {
  type REff[T] = Writer[Log, T]

  override def run: ZIO[Any & ZIOAppArgs & Scope, Any, Any] =
    val lowLvlOp1: Input1 => REff[Output1]  = s => Writer.succeed(s.length)
    val lowLvlOp2: Output1 => REff[Output2] = i => Writer.succeed(i.toDouble)
    val lowLvlOp3: Input2 => REff[Output1]  = l => Writer.succeed(l.size)
    val service: Service[REff] = ServiceImpl(lowLvlOp1, lowLvlOp2, lowLvlOp3)

    val program = for
      result1 <- service.operation1("hello")
      result2 <- service.operation2("hello".toList)
    yield (result1, result2)

    for
      _ <- Console.printLine("Let's go!")
      (logs, resultOrErr) = program.runAll(())
      _ <- LogProcessor.processLogs[Task, Chunk](logs)
      _ <- Console.printLine(s"result: $resultOrErr")
    yield ()
}
Links
- Source code
- Tell[..] from cats-mtl
- WriterT from cats
- ReliableWriterT
- Docs for ZIO ZPure
- Source code for ZPure
Remarks
1. Consider these examples:
def func1(): IO[A] = ???
def func2(in: A): IO[B] = ???
for
  a <- func1()
  b <- func2(a)
yield b
In the example above the A is necessary to call func2(). Thus it belongs to the 1st class.
def func1(): IO[A] = ???
def func2(): IO[B] = ???
for
  a <- func1() // `a` can be replaced with `_`
  b <- func2()
yield b
But in this example the following code has no need for A, so it can be dropped by converting A to Unit (like func1().void). Thus it belongs to the 3rd class.
2. There are no tools in vanilla Scala 2/3 to actually prove that a certain computation is finite. In this case it is rather a strong assumption which, if important, is better checked with tests.
3. Check the difference between the usual WriterT and ReliableWriterT side by side:
final case class WriterT        [F[_], L, V]   (run: F[(L, V)])
final case class ReliableWriterT[F[_], E, W, A](run: F[(W, Either[E, A])])
ReliableWriterT catches the E error and returns it together with the collected logs W.