您好,登录后才能下订单哦!
密码登录
            
            
            
            
        登录注册
            
            
            
        点击 登录注册 即表示同意《亿速云用户服务条款》
        通信模型架构图

master 端代码
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
// 需要导入这2个包 封装一些属性。
class MasterActor extends Actor {
  //在开始之前调用一次
  override def preStart(): Unit = {
  }
  //用于接收消息
  override def receive: Receive = {
    case "started" => {
      println("Master has been started!")
      //进入这个分支,说明这个Master线程已经启动完成
    }
    case "connecting" => {
      println("Master has been get connect from Worker!")
      println("a Worker Node has been register!")
      //返回消息给Worker
      sender() ! "connected"
      Thread.sleep(1000)
    }
    case "stoped" => {
    }
  }
}
object Demo01MasterActor {
  def main(args: Array[String]) {
    //设置MasterIP和端口
    val masterHost = "localhost"
    val masterPort = "1234"
    //端口和IP封装到akka架构,获取一个属性配置文件
    val conStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$masterHost"
         |akka.remote.netty.tcp.port = "$masterPort"
  """.stripMargin
    val config = ConfigFactory.parseString(conStr)
    val masterActorSystem = ActorSystem("MasterActorSystem", config)
    val masterActor = masterActorSystem.actorOf(Props[MasterActor], "MasterActor")
    masterActor ! "started"
    masterActorSystem.awaitTermination();
  }
}
worker端代码
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class WorkerActor extends Actor {
  var masterURL: ActorSelection = null
  //启动Actor之前执行,做初始化工作
  override def preStart(): Unit = {
    //配置访问Master的URL
    //MasterIP:localhost
    //MasterPort:8888(根据Master配置)
    //Master的 ActorSystem对象:MasterActorSystem、MasterActor
    masterURL = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor")
  }
  override def receive: Receive = {
    case "started" => {
      println("Worker has been started!")
      //进入这个分支,说明这个Worker线程已经启动完成
      //可以去向Master注册
      //请求和Master建立连接
      masterURL ! "connecting"
    }
    case "connected" => {
      println("Worker 收到来自Master确认信息!")
    }
    case "stoped" => {
    }
  }
}
object Demo01WorkerActor {
  def main(args: Array[String]) {
    //初始化MastereIP和端口、WorkerIP和端口
    //    val masterHost = args(0)
    //    val masterPort = args(1)
    //    val workerHost = args(2)
    //    val workePort = args(3)
    val masterHost = "localhost"
    val masterPort = "8888"
    val workerHost = "localhost"
    val workePort = "8889"
    //端口和IP封装到akka架构,获取一个属性配置文件
    val conStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$workerHost"
         |akka.remote.netty.tcp.port = "$workePort"
  """.stripMargin
    val config = ConfigFactory.parseString(conStr)
    val workerActorSystem = ActorSystem("WorkerActorSystem", config)
    val workerActor = workerActorSystem.actorOf(Props[WorkerActor], "WorkerActor")
    workerActor ! "started"
    workerActorSystem.awaitTermination();
  }
}免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。