Environment
otavia
runs mainly on the JVM platform and currently only supports Scala 3
for reliable compile-time type safety. If you are not familiar with Scala 3
, you can refer to the following information to learn it.
- Basic(enough for
otavia
): Scala 3 Book - Advance: Scala 3 Language Reference
The source code for some of the following examples can be found at GitHub - otavia-projects/otavia-examples.
Add dependencies
If you use sbt, add the dependency with version
libraryDependencies += "cc.otavia" %% "otavia-all" % "{version}"
If you use mill:
ivy"cc.otavia::otavia-all:{version}"
if maven:
<dependency>
<groupId>cc.otavia</groupId>
<artifactId>otavia-all_3</artifactId>
<version>{version}</version>
</dependency>
Simple Ping-Pong Actors
This simple example defines two Actors
: PingActor
and PongActor
, where PingActor
receives a Start
message and sends a Ping
message to PongActor
, and each Ping
message sent must receive a reply message of type Pong
.
Defining Messages
According to the above description, we need 3 types of messages, and these are the 3 basic types of messages in otavia
. A Start
message is a Notice
message, a Notice
message is a type of message in otavia
that does not need to get a reply, as long as there is an address for the Actor
you can send a Notice
message to the Actor
. Ping
is an Ask
message that must have a reply message associated with it, so if an Actor
sends it to another Actor
, it means it must receive a reply message (kind of like a method parameter in a method definition). A Pong
is a reply message, which is a bit like a return value in a method definition.
The Start
message is of type Notice
, so it must inherit the Notice
trait.
case class Start(sid: Int) extends Notice
Pong
must inherit the Reply
trait, Ping
is a message of type Ask
and must inherit the Ask
trait, the Ask
trait carries a type constraint that describes the type of message for which a reply is expected for this Ask
message.
case class Pong(pingId: Int) extends Reply
case class Ping(id: Int) extends Ask[Pong]
Implementing the actor
Once we have our messages, let's define our Actor.
First let's determine the types of messages our Actor
can receive, since otavia
is a message-type-safe Actor
programming framework, so let's determine the types of messages that each Actor
can receive: PingActor
receives Start
and Pong
messages, and PongActor
receives Ping
messages and replies to Pong
messages. Since reply messages are constrained by Ask
messages in otavia
, there is no need to constrain such messages in the definition of Actor
, and since PingActor
needs to send a message to PongActor
, PingActor
needs to know the address of PongActor
. Roughly, we can define the class name and generic parameters of our Actor as follows:
final class PongActor() extends StateActor[Ping] {
// ...
}
final class PingActor(pongActorAddress: Address[Ping]) extends StateActor[Start] {
// ...
}
Here comes StateActor
which we can ignore for now, the final Actor
in otavia
must inherit either StateActor
or ChannelsActor
. The ChannelsActor
is the Actor
that handles IO, and all the rest of the Actors
are StateActor
s.
Next, let's implement the methods for processing messages!
First there is the PingActor
, which has to process the Start
message and send the Ping
message during processing, then wait for the Pong
message as a reply message, and finally end the processing of the Start
message.
final class PingActor(pongActorAddress: Address[Ping]) extends StateActor[Start] {
override def resumeNotice(stack: NoticeStack[Start]): StackYield = stack.state match {
case _: StartState =>
println("PingActor handle Start message")
println("PingActor send Ping Message")
val state = FutureState[Pong]()
pongActorAddress.ask(Ping(stack.notice.sid), state.future)
stack.suspend(state)
case state: FutureState[Pong] =>
val future = state.future
if (future.isSuccess) {
println(s"PingActor received ${future.getNow} message success!")
assert(future.getNow.pingId == stack.ask.sid)
}
stack.`return`()
}
}
resumeNotice
is the entry point for Actor
to process Notice
messages. Any Notice
messages sent from elsewhere are passed into the Actor
through this method. Next we'll implement the PongActor
. The PongActor
receives a Ping
message and replies with a Pong
message:
final class PongActor() extends StateActor[Ping] {
override def resumeAsk(stack: AskStack[Ping]): StackYield = {
println(s"PongActor received ${stack.ask} message")
println(s"PongActor reply ${stack.ask} with Pong message")
stack.`return`(Pong(stack.ask.id))
}
}
resumeAsk
is the entry point for Actor
to process Ask
messages, and Ask
messages sent from elsewhere are passed into Actor
from this method.
We can see that the resumeXXX
method of Actor
does not take messages as arguments directly, but rather loads them into Stack
s: Notice
messages are loaded into NoticeStack
and Ask
messages are loaded into AskStack
. Unlike most other Actor programming frameworks, sending and receiving messages in otavia
is tightly managed, which takes away some of the flexibility but ensures that sending and receiving messages are compile-time type-safe, and makes the process of sending and receiving messages more akin to method calls. Let's take a look at how we send and receive messages:
- Sends a
Notice
message using thenotice
method ofAddress
. Calling thenotice
method returns immediately and has no return value. - Use the
Address
'sask
method to send anAsk
message, and theask
method also takes aFuture
as an argument. When theActor
receives aReply
message associated with thisAsk
message, theReply
message is placed into theFuture
and the state of theFuture
is set to complete. - A
Future
can only be associated with oneStackState
, but aStackState
can be associated with multipleFuture
s. - A
Stack
can only have oneStackState
. At the end of theresumeXXX
method, if the return value isSome(StackState)
, set thisStackState
to the latest state of theStack
and release the oldStackState
and its associatedFuture
. - When a
Future
completes, theStackState
checks to see if the associatedStack
can be executed again by theresumeXXX
method. When theresumable
method ofStackState
returnsture
or all associatedFuture
s reach completion, the associatedStack
can be executed again. - The
StackState
is customizable by the developer and theresumable
method can be overridden. Of course,otavia
also predefines some commonStackState
s, such asFutureState
used in the example.StackState
providessuspend
method to returnStackYield
. - When the
Actor
processes aNotice
message or anAsk
message, it loads the message into a newly createdStack
, sets theStackState
of theStack
toStackState.start
, and passes theStack
to theresumeXXX
method to begin execution. .StackState.start
is a specialStackState
of typeStartState
that is not associated with anyFuture
and whoseresumable
method returnsture
. - The
Stack
uses thereturn
method to end processing of the message it is bound to, so the return value of thereturn
method isNone
. ForAskStack
, thereturn
method takes aReply
message as an argument to send to the sender of theAsk
message as a reply message to theAsk
message.
All of the above is compile-time type-safe for sending and receiving messages! A Stack
can only have one StackState
, its initial state is StartStack
, every time resumeXXX
completes, it switches to a new StackState
, and the last return
method will return None
, which means the Stack
is complete!
Although the message processing may seem complex, most of these steps do not need to be done directly by the developer. The developer implements the resumeXXX
method by simply pattern matching the Stack
's StackState
and then executing the corresponding business logic. If there is a need to wait for some asynchronous messages, a new StackState
is returned, otherwise the return
method is used to end the Stack
.
At this point, all the Actors
and messages we need are fully implemented. Next, start an ActorSystem
to run these Actors
.
Running the actor
@main def run(): Unit = {
val system = ActorSystem()
val pongActor = system.buildActor(() => new PongActor())
val pingActor = system.buildActor(() => new PingActor(pongActor))
pingActor.notice(Start(88))
}
With ActorSystem()
we can easily create an ActorSystem
, which is a runtime container for the actor in otavia
. A JVM instance is only allowed to start one ActorSystem
instance. The buildActor
method of ActorSystem
allows us to instantiate our Actor
. The buildActor
method does not return the Actor
object itself, instead it returns an Address
to which we can send messages that the Actor
can handle.
Everything above is compile-time type-safe; if you send a message to the address returned by buildActor
that the corresponding actor can't handle, it won't compile. Same, if you use the AskStack
's return
method to send a Reply
message that does not match the corresponding Ask
message, this will also fail to compile.
Receive multiple message types
The above example demonstrates an actor that handles one type of message, but in real-world scenarios we often need to handle multiple types of messages in a single actor. This is very easy in Scala 3
, and thanks to Scala 3
's powerful Union Types
and Intersection Types
, we can also make it compile-time type-safe to handle multiple messages.
Suppose we need to implement an actor that receives a Hello
message and returns a World
message, receives a Ping
message and returns a Pong
message, and receives an Echo
message and returns no message.
The above requirement requires us to define the following kinds of messages:
case class Echo() extends Notice
case class World() extends Reply
case class Hello() extends Ask[World]
case class Pong() extends Reply
case class Ping() extends Ask[Pong]
Then we implement our actor:
final class MultiMsgActor() extends StateActor[Echo | Hello | Ping] {
override def resumeNotice(stack: NoticeStack[Echo]): StackYield = {
println("MultiMsgActor received Echo message")
stack.`return`()
}
override def resumeAsk(stack: AskStack[Hello | Ping]): StackYield = {
stack match {
case stack: AskStack[Hello] if stack.ask.isInstanceOf[Hello] => handleHello(stack)
case stack: AskStack[Ping] if stack.ask.isInstanceOf[Ping] => handlePing(stack)
}
}
private def handleHello(stack: AskStack[Hello]): StackYield = {
println("MultiMsgActor received Hello message")
stack.`return`(World())
}
private def handlePing(stack: AskStack[Ping]): StackYield = {
println("MultiMsgActor received Ping message")
stack.`return`(Pong())
}
}
You might wonder, if a message inherits both Notice
and Ask
, will the message end up being processed by resumeNotice
or resumeAsk
? The answer is both. Otavia
does not determine how a message should be handled based on the type of message, but by how it is sent. There are notice
and ask
methods in Address
, and messages sent by the notice
method are eventually processed by resumeNotice
, while messages sent by the ask
method are processed by resumeAsk
!
Timer
The otavia
runtime includes a powerful timer component, Timer
, which you can interact with in a number of ways, the main usage scenarios are described below:
Handle TimeoutEvent
An Actor
can register for a timeout trigger task via the registerActorTimeout
method of Timer
. When the timeout condition is reached, the Timer
generates a TimeoutEvent
and sends it to the registered Actor
. The timeout event is then handled by the handleActorTimeout
method of the Actor
.
final class TickActor() extends StateActor[Nothing] { // [Nothing] if no message need process!
private var onceTickId: Long = Timer.INVALID_TIMEOUT_REGISTER_ID
private var periodTickId: Long = Timer.INVALID_TIMEOUT_REGISTER_ID
override protected def afterMount(): Unit = {
onceTickId = timer.registerActorTimeout(TimeoutTrigger.DelayTime(1, TimeUnit.SECONDS), self)
periodTickId = timer.registerActorTimeout(TimeoutTrigger.DelayPeriod(2, 2, TimeUnit.SECONDS, TimeUnit.SECONDS), self)
}
override protected def handleActorTimeout(timeoutEvent: TimeoutEvent): Unit = {
if (timeoutEvent.registerId == periodTickId) {
println(s"period timeout event triggered at ${LocalDateTime.now()}")
} else if (timeoutEvent.registerId == onceTickId) {
println(s"once timeout event triggered at ${LocalDateTime.now()}")
} else {
println("Never run this")
}
}
}
It is important to note that the timer
method within the Actor
object must be used after the Actor
has been mounted on the ActorSystem
. Only after the Actor
has been mounted will runtime-related information be injected into the Actor
object. Therefore, you cannot use the timer
method directly in the constructor of the Actor
object because the Actor
instance has not been mounted to the ActorSystem
yet, and using the runtime-related methods will result in a null pointer exception. This is described later in the Actor
lifecycle.
Stack Sleep
If we want a Stack
to wait a certain amount of time before it starts re-executing, we can have StackState
associated with a MessageFuture[TimeoutReply]
, and this Future
will take the timeout event as a result. The MessageFuture[TimeoutReply]
completes only when the timeout event is received.
In the previous example, PingActor
handles the Start
message with FutureState
, which is one of the more common state classes defined by otavia
, but you can also customize StackState
if these don't meet your needs.
Now FutureState
is not enough for us because it is bound to only one MessageFuture[Pong]
, now we need to bind not only MessageFuture[Pong]
but also MessageFuture[TimeoutReply]
. Stack
can be re-executed only when both Future
s have completed. Let's define our new StackState
.
class PongTimeoutState extends StackState {
val pongFuture = MessageFuture[Pong]()
val timeoutFuture = MessageFuture[TimeoutReply]()
}
Next, re-implement our PingActor
final class PingActor(pongActorAddress: Address[Ping]) extends StateActor[Start] {
override def resumeNotice(stack: NoticeStack[Start]): StackYield = stack.state match {
case _: StartState =>
println("PingActor handle Start message")
println("PingActor send Ping Message")
val state = PongTimeoutState()
pongActorAddress.ask(Ping(stack.notice.sid), state.pongFuture)
timer.sleepStack(state.timeoutFuture, 2 * 1000)
stack.suspend(state)
case state: PongTimeoutState =>
val future = state.pongFuture
if (future.isSuccess) {
println(s"PingActor received ${future.getNow} message success!")
assert(future.getNow.pingId == stack.ask.sid)
}
stack.`return`()
}
}
Okay, that's done! Now our PingActor
needs to receive the Pong
reply message and also wait 2 seconds for this Stack
to be resumed.
Setting a Timeout for Reply Messages
Sometimes, when we send an Ask
message, the Actor
on the other side may take a long time, but we don't want our requesting Actor
to wait too long, what do we do in this case? Maybe you've already figured out the answer! We've talked about this before:
A
StackState
can be associated with one or moreFuture
s , and aStack
can only resume execution if theresumable
method of theStackState
isture
, or all associatedFuture
s have reached completion.
So we can override the resumable
method of StackState
! Now let's redefine our PongTimeoutState
method.
class PongTimeoutState extends StackState {
val pongFuture = MessageFuture[Pong]()
val timeoutFuture = MessageFuture[TimeoutReply]()
override def resumable(): Boolean = timeoutFuture.isDone
}
Now as soon as timeoutFuture
completes, then Stack
can be re-executed. However, when we use pongFuture
we need to check for completion with pongFuture.isDone
. That's one way to do it, but given that this is a relatively common scenario, otavia
provides a simpler way. All we need to do is make a small change to our previous implementation of PingActor
:
final class PingActor(pongActorAddress: Address[Ping]) extends StateActor[Start] {
override def resumeNotice(stack: NoticeStack[Start]): StackYield = stack.state match {
case _: StartState =>
println("PingActor handle Start message")
println("PingActor send Ping Message")
val state = FutureState()
pongActorAddress.ask(Ping(stack.notice.sid), state.future, 2 * 1000)
stack.suspend(state)
case state: FutureState[Pong] =>
val future = state.future
if (future.isSuccess) {
println(s"PingActor received ${future.getNow} message success!")
assert(future.getNow.pingId == stack.ask.sid)
}
stack.`return`()
}
}
Notice the difference! The ask
method comes with a timeout parameter!
pongActorAddress.ask(Ping(stack.notice.sid), state.future, 2 * 1000) // new
pongActorAddress.ask(Ping(stack.notice.sid), state.future) // old
If the Pong
message is not received after 2 seconds, then the Future
will be set to complete, but unlike before, we can't get the Pong
message from inside the Future
, it's already in a failed state, i.e. isDone=ture isSuccess=false isFailed= ture
. Since this StackState
is associated with only one Future
, and this Future
is already in a completed state, this Stack
can continue to be scheduled for execution.
LifeCycle of Actor
In otavia
, the user doesn't have to worry much about managing the life cycle of Actor
, the Actor
instance is still managed by the JVM garbage collection. As long as the Actor
is no longer referenced by the GC root object, the Actor
instance will be automatically reclaimed by the JVM's garbage collection system. There are several hook methods in Actor
that can be called during different life cycles, such as
afterMount
: Called after theActor
instance is mounted toActorSystem
For Actor instancescontext
related methods are only available after mounting.beforeRestart
: Called before reboot.restart
: Methods for restartingActor
instances.afterRestart
: Called after a reboot.AutoCleanable.cleaner
: If your implementation ofActor
needs to clean up some unsafe resources, inherit theAutoCleanable
trait and implement thecleaner
method.
The following figure shows the complete lifecycle of an Actor
:
Let's define an Actor
and look at the various hook methods in the lifecycle:
case class Start() extends Notice
final class LifeActor extends StateActor[Start] with AutoCleanable {
override protected def afterMount(): Unit = println("LifeActor: afterMount")
override protected def beforeRestart(): Unit = println("LifeActor: beforeRestart")
override protected def restart(): Unit = println("LifeActor: restart")
override protected def afterRestart(): Unit = println("LifeActor: afterRestart")
override def cleaner(): ActorCleaner = new ActorCleaner {
println("creating actor cleaner")
override protected def clean(): Unit = println("clean actor resource before actor stop")
}
// if occurs some error which developer is not catch, this will trigger the actor restart
// you can also override the noticeExceptionStrategy method to change the strategy
override def resumeNotice(stack: NoticeStack[Start]): StackYield = {
println("process message")
throw new Error("")
}
}
The afterMount
method is mainly used for dependency injection, which will be explained in more detail later.
Handling IO
Now we have learned to 1) Define Actor
2) Define messages 3) Send a message 4) Receive a message and process a message 5) Trigger a timeout using Timer
.
But in real business, we often need to deal with a lot of IO tasks, and IO tasks will block our threads, which is one of the reasons why our programs are underperforming. At the same time, some new technologies such as epoll, io_uring, etc. are booming, which allow us to handle IO tasks without blocking. The JVM also provides NIO to support non-blocking IO, but it's not easy to use NIO because the APIs provided by the JVM, such as java.nio.ByteBuffer
and java.nio.channels.Channel
, are too low-level. It's more common to use Netty to handle IO tasks in the JVM.
Thanks to Netty for providing a powerful IO programming tool for the JVM world! Inspired by Netty, the IO module in otavia
is basically ported from Netty
! Handling IO tasks in otavia
is very much the same as in Netty, and the API is basically the same. This also makes it very easy for otavia
to take advantage of the extensive Netty ecosystem and port codecs for various network protocols. View otavia ecosystem.
otavia
also uses Channel
to represent an IO object, such as an open file, a network connection. In otavia
, however, the Channel
must run in a ChannelsActor
. Also Channel
contains a ChannelPipeline
component with a ChannelHandler
queue.
In order to better manage different Channel
s, otavia
implements several different kinds of ChannelsActor
, they are:
AcceptorActor
: manages theChannel
that listens for TCP connections, which needs to instantiate a set ofAcceptedWorkerActor
s as workingActor
s. NormalChannel
s that are accepted by the listeningChannel
are wrapped in a message and sent to one of theAcceptedWorkerActor
s, and managed by the selectedAcceptedWorkerActor
.AcceptedWorkerActor
: The workingActor
for theAcceptorActor
.SocketChannelsActor
: manages the TCP clientChannel
.DatagramChannelsActor
: manages the UDPChannel
.
You will need to choose a type of ChannelsActor
to implement depending on your needs. Now, let's start our journey with a simple file reading example!
Files IO
Netty only supports network IO, otavia
supports not only network but also files. Now we're going to implement an Actor
that receives a request to read a file and returns the file as a Seq[String]
.
First, let's define the messages that this Actor
needs to process and return.
case class LinesReply(lines: Seq[String]) extends Reply
case class ReadLines() extends Ask[LinesReply]
Next we will implement our Actor
, what is the behavior of this Actor
? First we need to open the file! So we have a Channel
that represents the file, and we send the command to read the file to this Channel
.
final class ReadLinesActor(file: File, charset: Charset = StandardCharsets.UTF_8)
extends ChannelsActor[ReadLines] {
override protected def initFileChannel(channel: Channel): Unit = ???
override def resumeAsk(stack: AskStack[ReadLines]): StackYield = {
stack.state match {
case _: StartState =>
stack.suspend(openFile(file, Seq(StandardOpenOption.READ), attrs = Seq.empty))
case openState: ChannelFutureState if openState.id == 0 =>
val linesState = ChannelFutureState(1)
openState.future.channel.ask(FileReadPlan(-1, -1), linesState.future)
stack.suspend(linesState)
case linesState: ChannelFutureState if linesState.id == 1 =>
stack.`return`(LinesReply(linesState.future.getNow.asInstanceOf[Seq[String]]))
}
}
}
First we use the openFile
method to open the file and return a StackState
, which will be ready to run when the file is opened. This method is a shortcut provided by the ChannelsActor
, and we can see from the source code that it is implemented as:
// source code from ChannelsActor
final protected def openFile(path: Path, opts: Seq[OpenOption], attrs: Seq[FileAttribute[?]]): StackState = {
val channel = createFileChannelAndInit()
val state = ChannelFutureState()
val future: ChannelFuture = state.future
channel.open(path, opts, attrs, future)
state
}
Nothing special, it's pretty much the same as the Stack
we talked about before, except that here the StackState
is a ChannelFutureState
and the associated Future
is a ChannelFuture
. This ChannelFuture
is then passed to Channel
using the channel.open
method. The open
method is not blocking and returns immediately after it is called, and if this file is opened, then Reactor
sends ChannelsActor
a ReactorEvent
, ChannelsActor
receives the event and sets the ChannelFuture
to completion. The ChannelActor
then checks to see if the Stack
associated with it can be re-executed.
Here we also see a createFileChannelAndInit
method, which we can guess from the name creates a file Channel
and also initializes the Channel
. This method calls the ChannelsActor.initFileChannel
method to initialize the Channel
.
Now we can implement our ReadLinesActor.initFileChannel
. We can see that we passed in a FileReadPlan
instance after the file was opened:
val linesState = ChannelFutureState(1)
openState.future.channel.ask(FileReadPlan(-1, -1), linesState.future)
The ask
method will eventually pass FileReadPlan
into ChannelPipeline
via the write
method of Channel
. The ChannelHandler
inside the ChannelPipeline
needs to be implemented by us, which is basically the same as in Netty.
Now let's implement our ChannelHandler
. What does this ChannelHandler
need to do? First, we need to be able to handle the write
inbound event, and second, we need to handle the channelRead
outbound event. Then we need to handle the channelReadComplete
outbound event when the read is complete, and in the channelReadComplete
method, we need to generate the final result message back to our ReadLinesActor
.
class ReadLinesHandler(charset: Charset) extends ByteToMessageDecoder {
private val lines = ArrayBuffer.empty[String]
private var currentMsgId: Long = -1
override def write(ctx: ChannelHandlerContext, msg: AnyRef, msgId: Long): Unit = msg match {
case readPlan: FileReadPlan =>
ctx.read(readPlan)
currentMsgId = msgId
case _ =>
ctx.write(msg, msgId)
}
override protected def decode(ctx: ChannelHandlerContext, input: AdaptiveBuffer): Unit = {
var continue = true
while (continue) {
val length = input.bytesBefore('\n'.toByte) + 1
if (length > 0) {
lines.addOne(input.readCharSequence(length, charset).toString)
} else continue = false
}
}
override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
val seq = lines.toSeq
lines.clear()
val msgId = currentMsgId
currentMsgId = -1
ctx.fireChannelRead(seq, msgId)
}
}
Then we also need to add this ReadLinesHandler
to the ChannelPipeline
of our Channel
, remember the initFileChannel
method that we didn't implement in the ReadLinesActor
before. Now we can finalize this method!
override protected def initFileChannel(channel: Channel): Unit =
channel.pipeline.addFirst(new ReadLinesHandler(charset))
We can notice that our write
method has a msgId
parameter, which is a unique message number within the Channel
generated by the ask
method. Because the write
method is non-blocking, it does not wait for the underlying IO call to complete, but instead submits a command to read the data to the Reactor
via ChannelHandlerContext.read
. The actual IO work of reading the data is done by the Reactor
, and the result is sent to the ChannelsActor
as an Event
. The ChannelsActor
then dispatches the Event
to the Channel
. The Channel
then propagates a corresponding outbound event in the ChannelPipeline
by calling the channelRead
method. Here we inherit from ByteToMessageDecoder
, whose channelRead
method calls the decode
method, which is mainly used to convert a sequence of bytes into the object we need. When the Reactor
finishes reading the data, it also sends an Event
to the ChannelsActor
, which in turn calls the channelReadComplete
method to propagate an outbound event in the ChannelPipeline
. We need to override the channelReadComplete
method in our ReadLinesHandler
to generate the final result and continue to call the channelRead
method to propagate the final result in the outbound direction. Notice that our fireChannelRead
method also has a msgId
parameter that will allow the message to eventually find its way to the Future
inside our previous ask
method. Next the story goes back to our Actor
's Stack
dispatch.
Now let's start our program to read a text file!
@main def start(): Unit = {
val system = ActorSystem()
val readLinesActor = system.buildActor(() => new ReadLinesActor("build.sc"))
system.buildActor(() => new MainActor(Array.empty) {
override def main0(stack: NoticeStack[MainActor.Args]): StackYield = stack.state match {
case _: StartState =>
val state = FutureState[LinesReply]()
readLinesActor.ask(ReadLines(), state.future)
stack.suspend(state)
case state: FutureState[LinesReply] =>
for (line <- state.future.getNow.lines) print(line)
stack.`return`()
}
})
}
You may be thinking that this is too much trouble, I can just use the java file API to do it in a few lines of code, but you have to write so much! Well, you have to pay for what you get, and the payoff is that this file IO doesn't block the current Actor
's execution thread! The actual IO reading and writing in otavia
is done by the Reactor
component, whose default transport layer implementation is based on NIO. Since the transport layer is implemented using the SPI mechanism, we can replace the default NIO transport layer with a higher-performance one based on techniques such as epoll, kqueue, IOCP, or even io_uring! This doesn't require a single change to the upper level code, as long as you implement the transport layer module according to the SPI specification, add the JARs to CLASSPATH, and it will take effect immediately! Implementing this efficient transport layer module is the goal of the native-transport project in otavia ecosystem! If you are interested, your contributions are warmly welcomed!
Network IO
Now we are going to implement an echo server on a TCP network that can use telnet to connect, send data inside the telnet, and return the data to the telnet as is. Since this is a TCP service, we use AcceptedWorkerActor
and AcceptorActor
to implement it. The classes we implement named EchoAcceptor
and EchoWorker
.
First we implement EchoAcceptor
, this Actor
manages a Channel
that listens for connections, accepts the connection and generates a new Channel
, and then sends this new Channel
to the EchoWorker
. The EchoWorker
is responsible for specific tasks.
final class EchoAcceptor extends AcceptorActor[EchoWorker] {
override protected def workerNumber: Int = 4
override protected def workerFactory: AcceptorActor.WorkerFactory[EchoServerWorker] =
() => new EchoWorker()
}
In EchoAcceptor
we define the workerFactory
method of how to create an EchoWorker
and the workerNumber
method of how many EchoWorker
instances to create. The EchoAcceptor
receives the ChannelsActor.Bind
message and then creates a Channel
and binds it to a port to listen for network connections. A WorkerNumber
instance of EchoWorker
is created to distribute the incoming network connections. Next we define our EchoWorker
final class EchoWorker extends AcceptedWorkerActor[Nothing] {
override protected def initChannel(channel: Channel): Unit = ???
override def resumeAsk(stack: AskStack[AcceptedChannel]): StackYield =
handleAccepted(stack)
override protected def afterAccepted(channel: ChannelAddress): Unit =
println(s"EchoWorker accepted ${channel}")
}
Notice the initChannel
method? It's similar to initFileChannel
in the previous example, but this method is used to initialize the network Channel
. We need to define our ChannelHandler
again and add it to the ChannelPipeline
via this method. Now let's implement our ChannelHandler
.
final class EchoWorkerHandler extends ByteToMessageDecoder {
override protected def decode(ctx: ChannelHandlerContext, input: AdaptiveBuffer): Unit =
if (input.readableBytes > 0) ctx.writeAndFlush(input)
}
Now that we have our EchoWorkerHandler
, let's add it to ChannelPipeline
:
override protected def initChannel(channel: Channel): Unit =
channel.pipeline.addLast(new EchoWorkerHandler())
Now that everything is ready, let's start our echo server!
@main def start(): Unit = {
val actorSystem = ActorSystem()
actorSystem.buildActor(() => new MainActor() {
override def main0(stack: NoticeStack[MainActor.Args]): StackYield = stack.state match {
case _: StartState =>
val acceptor = system.buildActor(() => new EchoAcceptor())
val state = FutureState[BindReply]()
acceptor.ask(Bind(8080), state.future)
stack.suspend(state)
case state: FutureState[BindReply] =>
println("echo server bind port 8080 success")
stack.`return`()
}
})
}
Just start telnet and connect to the echo server and try it out!
This example is just a brief introduction to the use of Channel, and does not show the more powerful ways in which Channel can interact with the ChannelsActor. You can learn more at Core Concepts and Design.
Global Actor and Dependency Injection
Dependency injection is an effective way to decouple code, and it's also useful for the Actor model. Imagine that sometimes our Actor
can receive a fixed type of message, but we have different implementations for different scenarios. If we use buildActor
to construct these Actors
, there is significant code coupling. Whenever the scenario changes we need to modify this code, which is very inflexible. To solve this problem, otavia
introduces the idea of dependency injection. We will use a simple example to demonstrate how dependency injection works in otavia
.
Let's assume that we need to rely on a service Actor
in our system, and we know the type of messages it receives and returns, but this service requires different implementations in different scenarios. The following is the message definition for our service.
case class Result1() extends Reply
case class Query1() extends Ask[Result1]
case class Result2() extends Reply
case class Query2() extends Ask[Result2]
We can then define a trait
to constrain the messages that our service Actor
can receive. This is similar to defining a service using interfaces in object-oriented, except that in object-oriented you use interfaces to constrain the methods that can be called and in otavia
you use interfaces to constrain the messages that can be processed!
trait QueryService extends Actor[Query1 | Query2]
With this QueryService
interface, the specific service Actor
we implement needs to inherit from this interface.
final class QueryServiceImpl() extends StateActor with QueryService {
override def resumeAsk(stack: AskStack[Query1 | Query2]): StackYield = ??? // impl logic
}
// QueryService in another scenario
final class QueryServiceCase2() extends SocketChannelsActor with QueryService {
override def resumeAsk(stack: AskStack[Query1 | Query2]): StackYield = ??? // impl logic
}
For Actor
s that rely on QueryService
, you can use the autowire
method to look up the Address
of an available QueryService
in the ActorSystem
.
case class Start() extends Notice
final class TestActor extends StateActor[Start] {
private var queryService: Address[MessageOf[QueryService]] = _
override protected def afterMount(): Unit = queryService = autowire[QueryService]() // compile-time type-safe
override def resumeNotice(stack: NoticeStack[Start]): StackYield = stack.state match {
case _: StartState =>
val state = FutureState[Result1]()
queryService.ask(Query1(), state.future)
stack.suspend(state)
case state: FutureState[?] =>
val pong = state.future.asInstanceOf[MessageFuture[Result1]].getNow
stack.`return`()
}
}
Now even if we replace the implementation of QueryService
, our TestActor
doesn't have to change at all! How do we get autowire[QueryService]()
to find the specific implementation Actor
, just set it to the global Actor
when instantiating the implementation Actor
.
system.buildActor(() => new QueryServiceImpl(), global = true)
// or if use the other one
system.buildActor(() => new QueryServiceCase2(), global = true)
Batch Processing Messages
Being able to batch messages is a useful technique for some scenarios. Allowing batching is simple, you just need to override the batchable
method of your Actor
to change its return value to true
, and then override the batchNoticeFilter
or batchAskFilter
methods to select the messages that will go into the batch schedule.
For the detailed code you can refer ConsoleAppender