Akka如果抛出exception,Actor不会终止
我目前正试图开始与阿卡,我面临一个奇怪的问题。 我的演员有以下代码:
class AkkaWorkerFT extends Actor { def receive = { case Work(n, c) if n < 0 => throw new Exception("Negative number") case Work(n, c) => self reply n.isProbablePrime(c); } }
这就是我开始工作的方式:
val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start()); val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()
这就是我如何closures一切:
futures.foreach( _.await ) router ! Broadcast(PoisonPill) router ! PoisonPill
现在会发生什么,如果我发送工作人员消息与n> 0(没有例外抛出),一切工作正常,应用程序正常closures。 但是,只要我发送一条消息导致exception,应用程序就不会终止,因为仍然有一个actor正在运行,但我无法弄清楚它来自哪里。
如果有帮助,这是所讨论的线程的堆栈:
Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) Unsafe.park(boolean, long) line: not available [native method] LockSupport.park(Object) line: 158 AbstractQueuedSynchronizer$ConditionObject.await() line: 1987 LinkedBlockingQueue<E>.take() line: 399 ThreadPoolExecutor.getTask() line: 947 ThreadPoolExecutor$Worker.run() line: 907 MonitorableThread(Thread).run() line: 680 MonitorableThread.run() line: 182
PS:没有终止的线程不是任何工作线程,因为我已经添加了一个postStopcallback,他们每一个停止正常。
PPS: Actors.registry.shutdownAll
解决方法的问题,但我认为shutdownAll应该只能用作最后的手段,不应该吗?
处理阿卡活动中的问题的正确方法不是抛出exception,而是设置主pipe层次
“在并发代码中抛出一个exception(让我们假设我们正在使用非链接的参与者),只是简单地炸掉当前执行参与者的线程。
没有办法发现事情出错了(除了检查堆栈跟踪)。 你无能为力。“
请参阅Supervisor层次结构的容错(1.2)
*注意*以上是对旧版本的Akka(1.2)真的在较新的版本(如2.2)你仍然设置一个监督层次结构,但它会陷阱subprocess抛出的exception。 例如
class Child extends Actor { var state = 0 def receive = { case ex: Exception ⇒ throw ex case x: Int ⇒ state = x case "get" ⇒ sender ! state } }
并在主pipe:
class Supervisor extends Actor { import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate } def receive = { case p: Props ⇒ sender ! context.actorOf(p) } }
请参阅Supervisor层次结构的容错(2.2)
closureslogging以确保事情终止,如Viktor所提出的,有点奇怪。 你可以做的是:
EventHandler.shutdown()
干净地closures所有(logging器)侦听器,使exception之后的世界继续运行:
def shutdown() { foreachListener(_.stop()) EventHandlerDispatcher.shutdown() }
打开akka.conf
中的logging器