SpringBoot 集成 AKKA 可以参考此文:SpringBoot 集成 AKKA
场景1:bossActor 收到信息,然后发给 worker1Actor 和 worker2Actor
controller 入口,初次调用 ActorRef.noSender()
@Tag(name = "test")
@RestController
@RequestMapping("/test")
@Validated
@Inner(value = false)
public class TestController {@Resourceprivate ActorSystem actorSystem;@PostMapping("/test")@Operation(summary = "test")public R search( ) {ActorRef pcm = actorSystem.actorOf(Props.create(BossActor.class));pcm.tell("I AM MASTER.TELLING BOSS", ActorRef.noSender());return R.ok();}
}
BossActor
接受 TestController 发送的信息,同时发送消息给 worker1Actor 和 worker2Actor
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedAbstractActor;public class BossActor extends UntypedAbstractActor {private ActorRef worker1Actor;private ActorRef worker2Actor;public BossActor() {this.worker1Actor = this.context().actorOf(Props.create(Worker1Actor.class));this.worker2Actor = this.context().actorOf(Props.create(Worker2Actor.class));}@Overridepublic void onReceive(Object message) {System.out.println("Boss 收到 master 的消息:" + message);worker1Actor.tell("I AM BOSS, TELLING WORKER1", self());worker2Actor.tell("I AM BOSS, TELLING WORKER2", self());}
}
Worker1Actor 和 Worker2Actor 获取 BossActor 的消息
import akka.actor.UntypedAbstractActor;public class Worker1Actor extends UntypedAbstractActor {@Overridepublic void onReceive(Object message) {System.out.println("worker1 收到 boss 消息:" + message);}
}
import akka.actor.UntypedAbstractActor;public class Worker2Actor extends UntypedAbstractActor {@Overridepublic void onReceive(Object message) {System.out.println("worker2 收到 boss 消息:" + message);}
}
打印输出
场景2:boss 发消息给 worker1,worker1 收到后发给 worker2,worker2 完成后返回 worker1,worker1再返回给 boss
分析难点:
1、由于 Actor 只能记住最后发消息给自己的人,所以 worker1 接到 worker2 消息后,getSender 只记住 worker2 ,找不到 boss 的地址
2、Actor 的 onReceive 方法接受的是消息,没有针对发送人,worker1 需要针对不同发送人 boss 和 worker2 做不同处理
针对问题2,我们可以设计一个实体 ReceiveDTO,里面包含 sender 发送人和 message 具体传参,所有消息严格实现这个类型
@Data
public class ReceiveDTO {String sender;Object message;
}
controller 入口,初次调用 ActorRef.noSender()
@Resourceprivate ActorSystem actorSystem;@PostMapping("/test")@Operation(summary = "test")public R search() {ActorRef pcm = actorSystem.actorOf(Props.create(BossActor.class));ReceiveDTO receiveDTO = new ReceiveDTO();receiveDTO.setSender("start");receiveDTO.setMessage("start");pcm.tell(receiveDTO, ActorRef.noSender());return R.ok();}
BossActor,根据 receiveDTO.getSender() 判断发送人是谁
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedAbstractActor;public class BossActor extends UntypedAbstractActor {private ActorRef worker1Actor;public BossActor() {this.worker1Actor = this.context().actorOf(Props.create(Worker1Actor.class));}@Overridepublic void onReceive(Object message) {ReceiveDTO receiveDTO = (ReceiveDTO) message;if (receiveDTO.getSender().equals("start")) {receiveDTO.setMessage("I AM BOSS, TELLING WORKER1");receiveDTO.setSender("boss");System.out.println("boss start");worker1Actor.tell(receiveDTO, self());}if (receiveDTO.getSender().equals("worker1")) {System.out.println("boss end 收到 worker1 消息: " + receiveDTO.getMessage());}}
}
Worker1Actor 接收 boss 信息时,同时把 boss 的 ActorRef 存到 bossActor,解决难题1
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedAbstractActor;public class Worker1Actor extends UntypedAbstractActor {private ActorRef bossActor;private ActorRef worker2Actor;public Worker1Actor() {this.worker2Actor = this.context().actorOf(Props.create(Worker2Actor.class));}@Overridepublic void onReceive(Object message) {ReceiveDTO receiveDTO = (ReceiveDTO) message;if (receiveDTO.getSender().equals("boss")) {System.out.println("worker1 收到 boss 消息: " + receiveDTO.getMessage());this.bossActor = getSender();ReceiveDTO receive1 = new ReceiveDTO();receive1.setMessage("I AM WORKER1, TELLING WORKER2");worker2Actor.tell(receive1, self());}if (receiveDTO.getSender().equals("worker2")) {System.out.println("worker1 收到 worker2 消息: " + receiveDTO.getMessage());ReceiveDTO receive1 = new ReceiveDTO();receive1.setMessage("I AM WORKER1, TELLING boss, job is over");receive1.setSender("worker1");bossActor.tell(receive1, self());}}
}
Worker2Actor
import akka.actor.UntypedAbstractActor;public class Worker2Actor extends UntypedAbstractActor {@Overridepublic void onReceive(Object message) {ReceiveDTO receiveDTO = (ReceiveDTO) message;System.out.println("worker2 收到 worker1 消息:" + receiveDTO.getMessage());ReceiveDTO receive1 = (ReceiveDTO) message;receive1.setSender("worker2");receive1.setMessage("I AM WORKER2, TELLING WORKER1");getSender().tell(receive1, self());}
}
打印输出