Akka使用系列之三:层次结构

Akka 是用层次结构组织 Actors 的。

1. Akka 的层次结构

我们需要实现一个翻译模块,其功能是输入中文输出多国语言。我们可以让一个 Master Actor 负责接收外界输入,多个 Worker Actor 负责将输入翻译成特定语言,Master Actor 和 Worker Actor 之间是上下级层次关系。下图展示了这种层级结构。

具体代码实现如下所示。

 
 
 
 
  1. class Master extends Actor with ActorLogging{
  2.     val english2chinese 
  3.     = context.actorOf(Props[English2Chinese],"English2Chinese")
  4.     val english2cat     
  5.     = context.actorOf(Props[English2Cat],"English2Cat")
  6.     def receive = {
  7.         case eng1:String =>{
  8.             english2chinese ! eng1
  9.             english2cat     ! eng1
  10.         }
  11.     }
  12. }
  13. class English2Chinese extends Actor with ActorLogging{
  14.     def receive = {
  15.         case eng:String => {
  16.             println("我翻译不出来!")
  17.         }
  18.     }
  19. }
  20. class English2Cat extends Actor with ActorLogging{
  21.     def receive = {
  22.         case eng:String =>{
  23.              println( "喵喵喵!")
  24.         }
  25.     }
  26. }
  27. object Main{
  28.     def main(args:Array[String])={
  29.         val sys = ActorSystem("system")
  30.         val master = sys.actorOf(Props[Master],"Master")
  31.         master ! "Hello,world!"
  32.     }
  33. }

我们在 Master Actor 中使用 context.actorOf 实例化 English2Chinese 和 English2Cat,便可以在它们之间形成层次关系。这点通过它们的 actor 地址得到证实。

上面的 Actors 层次结构是我们程序里 Actor 的层次结构。这个层次结构是 Actor System 层次结构的一部分。Actor System 层次结构从根节点出来有两个子节点:UserGuardian 和 SystemGuardian。用户程序产生的所有 Actor 都在 UserGuardian 节点下,SystemGuardian 节点则包含系统中的一些 Actor,比如 deadLetterListener。如果一个 Actor 已经 stop 了,发送给这个 Actor 的消息就会被转送到 deadLetterListener。因此完整的 Actor 层次结构如下所示。

2. Akka 的容错机制

对于分布式系统来说,容错机制是很重要的指标。那么 Akka 是怎么实现容错的呢?Akka 的容错机制是基于层次结构: Akka 在 Actor 加一个监控策略,对其子 Actor 进行监控。下面的代码是给 Actor 加了一个监控策略,其监控策略内容:如果子 Actor 在运行过程中抛出 Exception,对该子 Actor 执行停止动作 (即停止该子 Actor)。

 
 
 
 
  1. override val supervisorStrategy 
  2. = OneForOneStrategy(){
  3.     case _:Exception => Stop
  4. }

Akka 的监控策略一共支持四种动作:Stop, Resume, Restart 和 Escalate。

  1. Stop:子 Actor 停止。
  2. Resume:子 Actor 忽略引发异常的消息,继续处理后续消息。
  3. Restart:子 Actor 停止,重新初始化一个子 Actor 处理后续消息
  4. Escalate:错误太严重,自己已经无法处理,将错误信息上报给父 Actor。

Akka 的监控策略分为两种。一种是 OneForOne。这种策略只对抛出 Exception 的子 Actor 执行相应动作。还是拿上面的翻译模块做例子,我们加入一个 OneForOne 的 Stop 的监控策略。

 
 
 
 
  1. class Master1 extends Actor with ActorLogging{
  2.   val english2Chinese=  
  3.   context.actorOf(Props[English2Chinese1],"English2Chinese")
  4.   val english2Cat = 
  5.   context.actorOf(Props[English2Cat1], "English2Cat")
  6.   override val supervisorStrategy 
  7.   = OneForOneStrategy(){
  8.     case _:Exception => Stop
  9.   }
  10.   override def receive = {
  11.     case eng:String => {
  12.       english2Cat ! eng;
  13.       english2Chinese ! eng;
  14.      }
  15.   }
  16. }
  17. class English2Chinese1 extends Actor with ActorLogging{
  18.   override def receive = {
  19.     case eng:String => {
  20.       println("翻译不出来")
  21.     }
  22.   }
  23. }
  24. class English2Cat1 extends Actor with ActorLogging{
  25.   override def receive = {
  26.     case eng:String => {
  27.       throw new Exception("Exception in English2Cat1")
  28.     }
  29.   }
  30. }
  31. object hierarchy1 {
  32.   def main(args:Array[String])={
  33.     val system 
  34.     = ActorSystem("system")
  35.     val master
  36.     = system.actorOf(Props[Master1],"Master")
  37.     master ! "Hello, world"
  38.     Thread.sleep(1000)
  39.     master ! "Hello, world"
  40.   }
  41. }

运行这段代码,我们得到下面结果。从下面的结果,我们可以看出:***轮 English2Cat1 抛出了 Exception, English2Chinese1 正常工作;第二轮,English2Cat1 已经死了,English2Chinese1 也已经死亡了。这个结果说明监控策略已经将 MasterActor 的所有子 Actor 停止了。

另一种是 AllForOne。如果有子 Actor 抛出 Exception,这种监控策略对所有子 Actor 执行动作。

 
 
 
 
  1. class Master2 extends Actor with ActorLogging{
  2.   val english2Chinese 
  3.   = context.actorOf(Props[English2Chinese2],"English2Chinese")
  4.   val english2Cat     
  5.   = context.actorOf(Props[English2Cat2], "English2Cat")
  6.   override val supervisorStrategy= AllForOneStrategy() {
  7.     case _: Exception => Stop
  8.   }
  9.   override def receive = {
  10.     case eng:String => {
  11.       english2Cat ! eng;
  12.       english2Chinese ! eng;
  13.      }
  14.   }
  15. }

运行这段代码,我们得到下面结果。从下面的结果,我们可以看出:***轮 English2Cat1 抛出了 Exception, English2Chinese1 正常工作;第二轮,English2Cat1 已经死了,English2Chinese1 也已经死亡了。这个结果说明监控策略已经将 MasterActor 的所有子 Actor 停止了。

3. 总结

我们使用 Akka 开发并行程序时,可以使用层级结构组织 Actors。层次结构不仅比较符合人类直觉,还为容错提供了机制保障。

【本文为专栏作者“李立”的原创稿件,转载请通过获取联系和授权】

文章标题:Akka使用系列之三:层次结构
本文来源:http://www.shufengxianlan.com/qtweb/news42/442942.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联