网站忧化 优帮云,专业网站建设基本流程,海南开发公司,商标注册查询官网入口官方 EDA（Event-Driven Architecture）是一种实现组件之间松耦合、易扩展的架构方式。一个最简单的EDA设计需要包含如下几个组件： Events：需要被处理的数据。一个Event至少包含两个属性，类型和数据，类型决定了Eve… EDA（Event-Driven Architecture）是一种实现组件之间松耦合、易扩展的架构方式。一个最简单的EDA设计需要包含如下几个组件： Events：需要被处理的数据。一个Event至少包含两个属性：类型和数据。类型决定了Events被哪个Handler处理，数据是Handler中待加工的材料。 Event Handlers：处理Events的方式方法，一般是一些方法操作。 Event Loop：维护Events和Event Handlers之间的交互流程，接收所有的Event，然后将其分配给合适的Handler处理。 Message（Event）无论是在同步还是异步的EDA中，都没有使用任何同步方式进行控制，根本原因是Event被设计成了不可改变对象：因为Event在经过每一个Channel（Handler）的时候都会创建一个全新的Event，多个线程之间不会出现资源竞争，所以不需要同步的保护。
同步方案代码
/**
 * A unit of data that can be routed to a handler.
 *
 * <p>The only thing a message must expose is its type, which the routers in
 * this file use as the lookup key for selecting a channel.
 */
public interface Message {

    /**
     * Returns the type used to route this message.
     *
     * @return the class identifying this message's kind
     */
    Class<? extends Message> getType();
}
public interface ChannelE extends Message{
void dispatch(E message);
}
public interface DynamicRouterE extends Message {
void registerChannel(Class? extends E messageType,Channel? extends E channel);
void dispatch(E message) throws MessageMatcherException;
}
public class Event implements Message{
Override
public Class? extends Message getType() {
return getClass();
}
}
import java.util.HashMap;
import java.util.Map;public class EventDispatcher implements DynamicRouterMessage{
private final MapClass? extends Message,Channel routerTable;public EventDispatcher() {
this.routerTablenew HashMap();
}Override
public void registerChannel(Class? extends Message messageType, Channel? extends Message channel) {
this.routerTable.put(messageType, channel);
}Override
public void dispatch(Message message) throws MessageMatcherException {
if(this.routerTable.containsKey(message.getType())){
this.routerTable.get(message.getType()).dispatch(message);
}else {
throw new MessageMatcherException(Cant match the channel for [message.getType()] type);
}
}}
/**
 * Checked exception raised when a router has no channel registered for the
 * type of the message it was asked to dispatch.
 */
public class MessageMatcherException extends Exception {

    /**
     * Creates the exception with a description of the unmatched message.
     *
     * @param message the detail text, later available through {@link #getMessage()}
     */
    public MessageMatcherException(final String message) {
        super(message);
    }
}
public class EventDispatcherExample {static class InputEvent extends Event{
private final int x;
private final int y;
public InputEvent(int x,int y) {
this.xx;
this.yy;
}
public int getX() {
return x;
}
public int getY() {
return y;
}
}
static class ResultEvent extends Event{
private final int result;
public ResultEvent(int result) {
this.resultresult;
}
public int getResult() {
return result;
}
}static class ResultEventHandler implements ChannelResultEvent{
Override
public void dispatch(ResultEvent message) {
System.out.println(The result is:message.getResult());
}
}static class InputEventHandler implements ChannelInputEvent{
private final EventDispatcher dispacher;
public InputEventHandler(EventDispatcher dispatcher) {
this.dispacherdispatcher;
}Override
public void dispatch(InputEvent message){
System.out.printf(X:%d,Y:%d\n,message.getX(),message.getY());
int resultmessage.getX()message.getY();
try {
this.dispacher.dispatch(new ResultEvent(result));
} catch (MessageMatcherException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}public static void main(String[] args) {
EventDispatcher dispatchernew EventDispatcher();
dispatcher.registerChannel(InputEvent.class, new InputEventHandler(dispatcher));
dispatcher.registerChannel(ResultEvent.class, new ResultEventHandler());
try {
dispatcher.dispatch(new InputEvent(1,2));
} catch (MessageMatcherException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}}
异步方案代码是在共用了部分同步代码之后形成的
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public abstract class AsyncChannel implements ChannelEvent{
private final ExecutorService executorService;public AsyncChannel(ExecutorService executorService) {
this.executorServiceexecutorService;
}public AsyncChannel() {
this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2));
}public final void dispatch(Event message) {
this.executorService.submit(()-this.handle(message));
}protected abstract void handle(Event message);void stop() {
if(null!this.executorService!this.executorService.isShutdown()) {
this.executorService.shutdown();
}
}}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class AsyncEventDispatcher implements DynamicRouterEvent{
private final MapClass? extends Event,AsyncChannel routerTable;public AsyncEventDispatcher() {
this.routerTablenew ConcurrentHashMap();
}Override
public void registerChannel(Class? extends Event messageType, Channel? extends Event channel) {
if(!(channel instanceof AsyncChannel)) {
throw new IllegalArgumentException(The channel must be AsyncChannel type.);
}
this.routerTable.put(messageType, (AsyncChannel)channel);
}Override
public void dispatch(Event message) throws MessageMatcherException {
if(this.routerTable.containsKey(message.getType())) {
this.routerTable.get(message.getType()).dispatch(message);
}else {
throw new MessageMatcherException(Cant match the channel for [message.getType()] type);
}
}public void shutdown() {
this.routerTable.values().forEach(AsyncChannel::stop);
}
}
import java.util.concurrent.TimeUnit;public class AsyncEventDispatcherExample {static class AsyncInputEventHandler extends AsyncChannel{
private final AsyncEventDispatcher dispatcher;
AsyncInputEventHandler(AsyncEventDispatcher dispatcher){
this.dispatcherdispatcher;
}
Override
protected void handle(Event message) {
EventDispatcherExample.InputEvent inputEvent(EventDispatcherExample.InputEvent) message;
System.out.printf(X:%d,Y:%d\n,inputEvent.getX(),inputEvent.getY());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int result inputEvent.getX()inputEvent.getY();
try {
this.dispatcher.dispatch(new EventDispatcherExample.ResultEvent(result));
} catch (MessageMatcherException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}static class AsyncResultEventHandler extends AsyncChannel{
Override
protected void handle(Event message) {
EventDispatcherExample.ResultEvent resultEvent(EventDispatcherExample.ResultEvent) message;
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(the result is:resultEvent.getResult());
}
}public static void main(String[] args) {
AsyncEventDispatcher dispatchernew AsyncEventDispatcher();
dispatcher.registerChannel(EventDispatcherExample.InputEvent.class, new AsyncInputEventHandler(dispatcher));
dispatcher.registerChannel(EventDispatcherExample.ResultEvent.class, new AsyncResultEventHandler());
try {
dispatcher.dispatch(new EventDispatcherExample.InputEvent(1, 2));
} catch (MessageMatcherException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}}