Use case: an actor that must do something if it has not received any messages for more than a given amount of time.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.actor | |
import scala.concurrent.duration._ | |
/**
 * <p>Thaasophobia is the fear of being idle or sitting still.
 *
 * <p>An actor mixing in this trait arms a receive timeout of [[idleTimeout]]
 * when it starts. When the resulting `ReceiveTimeout` message arrives it is
 * intercepted and [[handleIdleness]] is invoked instead of the actor's own
 * `receive`, so the wrapped behaviour never sees `ReceiveTimeout`.
 *
 * <p>WARNING: A thaasophobic actor with default behaviour could stop before
 * concurrent operations complete:
 * <ul>
 * <li>Don't use future callbacks inside the actor
 * <li>This actor should not expect to receive replies/ack messages
 * </ul>
 */
trait Thaasophobia extends Actor {

  /** Period without messages after which [[handleIdleness]] is called. */
  def idleTimeout: Duration

  /**
   * Routes `ReceiveTimeout` to [[handleIdleness]]; every other message is
   * passed on unchanged to the regular receive pipeline.
   */
  override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit =
    msg match {
      case ReceiveTimeout => handleIdleness()
      case _              => super.aroundReceive(receive, msg)
    }

  /** Reaction to idleness; stops this actor unless overridden. */
  def handleIdleness(): Unit = context.stop(self)

  /**
   * Arms the receive timeout. Implementations that override `preStart`
   * must call `super.preStart()`, otherwise the timeout is never set.
   */
  override def preStart(): Unit = {
    // Chain to super so that other stackable traits (or a base class) that
    // also hook into preStart still get their start-up logic executed.
    super.preStart()
    context.setReceiveTimeout(idleTimeout)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.actor | |
import akka.testkit.TestActors.EchoActor | |
import akka.testkit.{ImplicitSender, TestActorRef} | |
import me.crowdmix.traffic.actors.ActorSpec | |
import scala.concurrent.duration._ | |
import scala.concurrent.{ExecutionContext, Future} | |
/**
 * Behavioural tests for [[Thaasophobia]]: the actor handles ordinary messages,
 * stays alive while it keeps receiving messages, and stops once it has been
 * idle for longer than its timeout, even if a future it started is still
 * running.
 */
class ThaasophobiaSpec extends ActorSpec("thaasophobia") with ImplicitSender {

  "Thaasophobic actor" should {

    "behave normally" in new Context {
      thaasophobicActor ! Msg
      expectMsg(Msg)
    }

    "stay alive" in new Context {
      watch(thaasophobicActor)
      import ExecutionContext.Implicits.global
      // Keep the actor busy by messaging it twice per idle period.
      val keepAlive =
        system.scheduler.schedule(0.millis, _idleTimeout / 2, thaasophobicActor, Ignore)
      // No Terminated (or any other) message may arrive while the actor is busy.
      // Always cancel the timer so it does not keep firing after this test.
      try expectNoMsg(1.second)
      finally keepAlive.cancel()
    }

    "stop" in new Context {
      watch(thaasophobicActor)
      expectTerminated(thaasophobicActor)
    }

    "stop before future completes" in new Context {
      watch(thaasophobicActor)
      // The future outlives the idle timeout, so the actor must stop first.
      thaasophobicActor ! Sleep((_idleTimeout * 1.2).toMillis)
      expectTerminated(thaasophobicActor)
    }
  }

  /** Per-test fixture: message protocol plus a freshly created thaasophobic echo actor. */
  trait Context {
    /** Ordinary message; falls through to the echo behaviour. */
    case object Msg
    /** Message that is swallowed without a reply; used to keep the actor busy. */
    case object Ignore
    /** Asks the actor to start a future that sleeps for `millis` milliseconds. */
    case class Sleep(millis: Long)
    /** Sent to the requester of a [[Sleep]] once the sleeping future completes. */
    case object Awake

    val _idleTimeout = 100.millis

    val thaasophobicActor =
      TestActorRef(new EchoActor with Thaasophobia {
        import ExecutionContext.Implicits.global

        override val idleTimeout = _idleTimeout

        override val receive: Receive = {
          case Ignore =>
          case Sleep(millis) =>
            // Capture the sender before leaving the actor's thread.
            val replyTo = sender()
            Future {
              Thread.sleep(millis)
              replyTo
            }.foreach(_ ! Awake) // foreach replaces the deprecated onSuccess
          case msg =>
            super.receive(msg)
        }
      })
  }
}
Comments
Post a Comment