网站运营与推广方案,四川门户网站建设,网站介绍ppt怎么做,西安网站定制开发使用Akka的Actor模拟Spark的Master和Worker工作机制
Spark的Master和Worker协调工作原理
在 Apache Spark 中#xff0c;Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程#xff1a;
Worker 启动后#xff0c…使用Akka的Actor模拟Spark的Master和Worker工作机制
Spark的Master和Worker协调工作原理
在 Apache Spark 中Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程
Worker 启动后会向预先配置的 Master 节点发送注册请求。Master 接收到注册请求后会为该 Worker 创建一个唯一的标识符Worker ID并将其信息保存在内存中。Master 向 Worker 发送包含 Master URL、Worker ID 等信息的注册响应。Worker 收到注册响应后会启动一个定时器并开始周期性地向 Master 发送心跳消息。Worker 的心跳消息中包含当前的负载状况、可用资源等信息。Master 接收到心跳消息后更新该 Worker 的最近心跳时间并根据需要对集群进行动态调整例如添加新的任务或删除故障的 Worker。如果 Master 在一段时间内没有收到某个 Worker 的心跳消息它将把该 Worker 标记为失效并将其相应的资源标记为可用以供后续使用。
具体原理如下
Worker 通过网络向 Master 发送心跳消息通常使用基于 TCP 的长连接。这些心跳消息可以包含有关 Worker 健康状况、资源利用情况等的信息。Master 使用一个内部的心跳管理组件来处理接收到的心跳消息并维护每个 Worker 的状态。它根据心跳消息的频率和时间戳来判断 Worker 是否正常运行。如果 Master 在预定的时间内没有收到 Worker 的心跳消息它会将该 Worker 标记为失效并触发一系列的故障处理机制例如重新分配任务给其他可用的 Worker。Worker 定期发送心跳消息以确保在网络故障、Worker 故障或其他问题发生时能够及时通知 Master。
通过心跳机制Master 能够实时监控 Worker 的状态并根据需要进行集群的动态管理和资源调度从而实现高可用性和容错性。
使用Akka的Actor模拟Spark的Master和Worker工作机制
worker注册到Master, Master完成注册并回复worker注册成功。worker定时发送心跳并在Master接收到。Master接收到worker心跳后要更新该worker的最近一次发送心跳的时间。给Master启动定时任务定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除。master worker 进行分布式部署(Linux系统)-》如何给maven项目打包-上传linux。
创建SparkMaster类继承Actor特质实现Receive方法并定义对应的伴生对象在伴生对象中创建SparkMaster-actor引用并启动Actor发送消息。服务端Master对worker进行心跳监测发现6秒内无法获取worker心跳将异常的Worker的实例从HashMap中移除。若能正常获取到心跳则获取心跳信息后更新心跳时间。定时保持心跳机制。
代码实现
class SparkMaster extends Actor {//定义个hashMap,管理workers(所有worker的实例)val workers mutable.Map[String, WorkerInfo]()override def receive: Receive {case start {println(master服务器启动了...)//这里开始。。self ! StartTimeOutWorker}case RegisterWorkerInfo(id, cpu, ram) {//接收到worker注册信息if (!workers.contains(id)) {//创建WorkerInfo 对象val workerInfo new WorkerInfo(id, cpu, ram)//加入到workersworkers ((id, workerInfo))println(服务器的workers workers)//回复一个消息说注册成功sender() ! RegisteredWorkerInfo}}case HeartBeat(id) {//更新对应的worker的心跳时间//1.从workers对应的HashMap中取出WorkerInfo,然后更新worker心跳时间val workerInfo workers(id)workerInfo.lastHeartBeat System.currentTimeMillis()println(master更新了 id 心跳时间...)}case StartTimeOutWorker {println(开始了定时检测worker心跳的任务)import context.dispatcher//说明//1. 0 millis 不延时立即执行定时器//2. 9000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. RemoveTimeOutWorker 发送的内容context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)}//对RemoveTimeOutWorker消息处理//这里需求检测哪些worker心跳超时now - lastHeartBeat 6000并从map中删除case RemoveTimeOutWorker {//首先将所有的 workers 的 所有WorkerInfoval workerInfos workers.valuesval nowTime System.currentTimeMillis()//先把超时的所有workerInfo,删除即可workerInfos.filter(workerInfo (nowTime - workerInfo.lastHeartBeat) 6000).foreach(workerInfo workers.remove(workerInfo.id))println(当前有 workers.size 个worker存活的)}}
}object SparkMaster {def main(args: Array[String]): Unit {//这里我们分析出有3个host,port,sparkMasterActorif (args.length ! 3) {println(请输入参数 host port sparkMasterActor名字)sys.exit()}val host args(0)val port args(1)val name args(2)//先创建ActorSystemval config ConfigFactory.parseString(s|akka.actor.providerakka.remote.RemoteActorRefProvider|akka.remote.netty.tcp.hostname${host}|akka.remote.netty.tcp.port${port}.stripMargin)val sparkMasterSystem ActorSystem(SparkMaster, config)//创建SparkMaster -actorval sparkMasterRef sparkMasterSystem.actorOf(Props[SparkMaster], s${name})//启动SparkMastersparkMasterRef ! start}
}
定义SparkWorker类继承Actor特质实现Receive方法在方法中实现向master发送注册信息的请求获取到服务端Master注册成功的消息后定义定时任务发送心跳包给Master。
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{//masterProxy是Master的代理/引用refvar masterPorxy :ActorSelection _val id java.util.UUID.randomUUID().toStringoverride def preStart(): Unit {println(preStart()调用)//初始化masterPorxymasterPorxy context.actorSelection(sakka.tcp://SparkMaster${masterHost}:${masterPort}/user/${masterName})println(masterProxy masterPorxy)}override def receive:Receive {case start {println(worker启动了)//发出一个注册消息masterPorxy ! RegisterWorkerInfo(id, 16, 16 * 1024)}case RegisteredWorkerInfo {println(workerid id 注册成功~)//当注册成功后就定义一个定时器,每隔一定时间发送SendHeartBeat给自己import context.dispatcher//说明//1. 0 millis 不延时立即执行定时器//2. 3000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. SendHeartBeat 发送的内容context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)}case SendHeartBeat {println(worker id 给master发送心跳)masterPorxy ! HeartBeat(id)}}
}object SparkWorker {def main(args: Array[String]): Unit {if (args.length ! 6) {println(请输入参数 workerHost workerPort workerName masterHost masterPort masterName)sys.exit()}val workerHost args(0)val workerPort args(1)val workerName args(2)val masterHost args(3)val masterPort args(4)val masterName args(5)val config ConfigFactory.parseString(s|akka.actor.providerakka.remote.RemoteActorRefProvider|akka.remote.netty.tcp.hostname${workerHost}|akka.remote.netty.tcp.port${workerPort}.stripMargin)//创建ActorSystemval sparkWorkerSystem ActorSystem(SparkWorker,config)//创建SparkWorker 的引用/代理val sparkWorkerRef sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt,masterName)), s${workerName})//启动actorsparkWorkerRef ! start}
}
分别定义发送注册信息的RegisterWorkerInfo的样例类WorkerInfo消息类定义注册成功的消息样例对象RegisteredWorkerInfo心跳信息样例类HeartBeat以及确认发送心跳信息样例对象SendHeartBeat触发超时work的样例对象StartTimeOutWorker移除超时worker的样例对象RemoveTimeOutWorker。
代码如下
// worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)// 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
// 将来这个WorkerInfo会扩展比如增加worker上一次的心跳时间
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {var lastHeartBeat : Long System.currentTimeMillis()
}// 当worker注册成功服务器返回一个RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo//worker每隔一定时间由定时器发给自己的一个消息
case object SendHeartBeat
//worker每隔一定时间由定时器触发而向master发现的协议消息
case class HeartBeat(id: String)//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
// master给自己发消息检测worker,对于心跳超时的.
case object RemoveTimeOutWorker 运行效果 通过这个案例我们可以深入理解Spark的Master和Worker的通讯机制为了方便以后对Spark的底层源码的学习命名的方式和源码保持一致.(如 通讯消息类命名就是一样的)同时也加深了我们对主从服务心跳检测机制(HeartBeat)的理解方便以后spark源码二次开发。