南通免费网站建设,网站前台如何做访问量显示,农产品跨境电商平台有哪些,简述网站建设的基本流程图文章目录 原理创建分布式事件总线实现自动订阅和事件转发 使用启动Redis服务配置传递Abp默认事件传递自定义事件 项目地址 原理
本地事件总线是通过Ioc容器来实现的。
IEventBus接口定义了事件总线的基本功能#xff0c;如注册事件、取消注册事件、触发事件等。
Abp.Events… 文章目录 原理创建分布式事件总线实现自动订阅和事件转发 使用启动Redis服务配置传递Abp默认事件传递自定义事件 项目地址 原理
本地事件总线是通过Ioc容器来实现的。
IEventBus接口定义了事件总线的基本功能如注册事件、取消注册事件、触发事件等。
Abp.Events.Bus.EventBus是本地事件总线的实现类其中私有成员ConcurrentDictionaryType, ListIEventHandlerFactory _handlerFactories是事件订阅表。通过维护事件订阅表来实现事件处理器的注册和取消注册。当对应类型的事件触发时通过订阅表查找所有事件处理器通过Ioc容器来获取处理器实例然后通过反射来调用事件处理器的HandleEvent方法。
创建分布式事件总线
首先我们需要一个分布式事件总线中间件用来将事件从本地事件总线转发到分布式事件总线。常用的中间件有RabbitMQ、Kafka、Redis等。
开源社区已经有实现好的库本项目参考了 wuyi6216/Abp.RemoteEventBus
这里已经定义好了一个分布式事件总线接口 public interface IDistributedEventBus : IDisposable
{void MessageHandle(string topic, string message);void Publish(IDistributedEventData eventData);void Subscribe(string topic);void Unsubscribe(string topic);void UnsubscribeAll();
}
为了兼容本地事件总线我们需要定义一个分布式事件总线接口继承自IEventBus接口。 public interface IMultipleEventBus : IDistributedEventBus, IEventBus
{}
实现自动订阅和事件转发
当注册本地事件时将订阅分布式事件事件Topic为类型的字符串表现形式
public IDisposable Register(Type eventType, IEventHandlerFactory factory)
{GetOrCreateHandlerFactories(eventType);ListIEventHandlerFactory currentLists;if (_handlerFactories.TryGetValue(eventType, out currentLists)){lock (currentLists){if (currentLists.Count 0){//Register to distributed eventthis.Subscribe(eventType.ToString());}currentLists.Add(factory);}}return new FactoryUnregistrar(this, eventType, factory);
}
创建TriggerRemote此方法用于将本地事件参数打包成为分布式事件消息payload并发布该消息
public void TriggerRemote(Type eventType, object eventSource, IEventData eventData)
{var exceptions new ListException();eventData.EventSource eventSource;try{var payloadDictionary new Dictionarystring, object{{ PayloadKey, eventData }};var distributedeventData new DistributedEventData(eventType.ToString(), payloadDictionary);Publish(distributedeventData);}catch (Exception ex){exceptions.Add(ex);}if (exceptions.Any()){if (exceptions.Count 1){exceptions[0].ReThrow();}throw new AggregateException(More than one error has occurred while triggering the event: eventType, exceptions);}
}
当触发本地事件时将消息转发至分布式事件总线。 在Trigger方法中调用TriggerRemote事件状态回调和事件异常回调将不会被转发。
if (!(typeof(DistributedEventBusEvent) eventType|| typeof(DistributedEventBusEvent).IsAssignableFrom(eventType)|| typeof(DistributedEventMessageHandleExceptionData) eventType|| typeof(DistributedEventHandleExceptionData) eventType))
{if (typeof(DistributedEventArgs) ! eventType){TriggerRemote(eventType, eventSource, eventData);}
}在消费端接收到分布式事件消息时从Topic中解析类型转发给本地事件。若此类型在本地事件注册过则将消息反序列化为本地事件参数然后触发本地事件。 本地事件处理器将触发最终的处理方法。 public virtual void MessageHandle(string topic, string message)
{Logger.Debug($Receive message on topic {topic});try{var eventData _remoteEventSerializer.DeserializeDistributedEventData(message);var eventArgs new DistributedEventArgs(eventData, topic, message);Trigger(this, new DistributedEventBusHandlingEvent(eventArgs));if (!string.IsNullOrEmpty(eventData.Type)){string pattern (.*?)\[(.*?)\];Match match Regex.Match(eventData.Type, pattern);if (match.Success){var type match.Groups[1].Value;var type2 match.Groups[2].Value;var localTriggerType typeFinder.Find(c c.FullName type).FirstOrDefault();var genericType typeFinder.Find(c c.FullName type2).FirstOrDefault();if (localTriggerType ! null genericType ! null){if (localTriggerType.GetTypeInfo().IsGenericType localTriggerType.GetGenericArguments().Length 1 !genericType.IsAbstract !genericType.IsInterface){var localTriggerGenericType localTriggerType.GetGenericTypeDefinition().MakeGenericType(genericType);if (eventData.Data.TryGetValue(PayloadKey, out var payload)){var payloadObject (payload as JObject).ToObject(localTriggerGenericType);Trigger(localTriggerGenericType, this, (IEventData)payloadObject);}}}}else{var localTriggerType typeFinder.Find(c c.FullName eventData.Type).FirstOrDefault();if (localTriggerType ! null !localTriggerType.IsAbstract !localTriggerType.IsInterface){if (eventData.Data.TryGetValue(PayloadKey, out var payload)){var payloadObject (payload as JObject).ToObject(localTriggerType);Trigger(localTriggerType, this, (IEventData)payloadObject);}}}Trigger(this, new DistributedEventBusHandledEvent(eventArgs));}}catch (Exception ex){Logger.Error(Consume remote message exception, ex);Trigger(this, new DistributedEventMessageHandleExceptionData(ex, topic, topic));}
}
使用
DistributedEventBus有不同的实现方式这里以Redis为例
启动Redis服务
下载Redis并启动服务使用默认端口6379
配置
生产者和消费者端都需要配置分布式事件总线
首先引用Abp.DistributedEventBus.Redis并配置Abp模块依赖
[DependsOn(typeof(AbpDistributedEventBusRedisModule))]
在PreInitialize方法中配置Redis连接信息 Configuration.Modules.DistributedEventBus().UseRedis().Configure(setting {setting.Server 127.0.0.1:6379;});
用MultipleEventBus替换Abp默认事件总线 //todo: 事件总线Configuration.ReplaceService(typeof(IEventBus),() IocManager.IocContainer.Register(Component.ForIEventBus().ImplementedByMultipleEventBus()));传递Abp默认事件
我们知道在使用仓储时Abp会自动触发一些事件如创建、更新、删除等。我们来测试这些事件是否能通过分布式事件总线来传递。
定义一个实体类用于传递实体的增删改事件。 public class Person : FullAuditedEntityint
{public string Name { get; set; }public int Age { get; set; }public string PhoneNumber { get; set; }}
在消费者端定义一个事件处理器用于处理实体的增删改事件。 public class RemoteEntityChangedEventHandler :IEventHandlerEntityUpdatedEventDataPerson,IEventHandlerEntityCreatedEventDataPerson,IEventHandlerEntityDeletedEventDataPerson,ITransientDependency
{void IEventHandlerEntityUpdatedEventDataPerson.HandleEvent(EntityUpdatedEventDataPerson eventData){var person eventData.Entity;Console.WriteLine($Remote Entity Updated - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber});}void IEventHandlerEntityCreatedEventDataPerson.HandleEvent(EntityCreatedEventDataPerson eventData){var person eventData.Entity;Console.WriteLine($Remote Entity Created - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber});}void IEventHandlerEntityDeletedEventDataPerson.HandleEvent(EntityDeletedEventDataPerson eventData){var person eventData.Entity;Console.WriteLine($Remote Entity Deleted - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber});}
}
在生产者端用IRepository对实体进行增删改操作。 var person new Person()
{Name John,Age 36,PhoneNumber 18588888888};personRepository.Insert(person);var person2 new Person()
{Name John2,Age 36,PhoneNumber 18588888889};
personRepository.Insert(person2);var persons personRepository.GetAllList();
foreach (var p in persons)
{p.Age 1;personRepository.Update(p);Console.WriteLine($Entity Updated - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber});}
foreach (var p in persons)
{personRepository.Delete(p);Console.WriteLine($Entity Deleted - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber});}
运行程序同时运行消费者端和生产者端可以看到消费者端打印出了实体的增删改事件。 注意
分布式事件总线在两个独立系统间传递事件所以需要定义一个共同的类型对象用于事件参数的传递。 因此消费者端需要引用生产者端的模块以便获取共同的类型对象。
public override Assembly[] GetAdditionalAssemblies()
{var clientModuleAssembly typeof(Person).GetAssembly();return [clientModuleAssembly];
}传递自定义事件
定义NotificationEventData用于传递自定义事件。 public class NotificationEventData : EventData
{public int Id { get; set; }public string Title { get; set; }public string Message { get; set; }public bool IsRead { get; set; }
}在消费者端定义一个事件处理器用于处理自定义事件。
public class NotificationEventHandler :IEventHandlerNotificationEventData, ITransientDependency
{void IEventHandlerNotificationEventData.HandleEvent(NotificationEventData eventData){Console.WriteLine($Id: {eventData.Id});Console.WriteLine($Title: {eventData.Title});Console.WriteLine($Message: {eventData.Message});Console.WriteLine($IsRead: {eventData.IsRead});}
}在生产者端触发自定义事件。
var eventBus IocManager.Instance.ResolveIEventBus();eventBus.TriggerNotificationEventData(new NotificationEventData()
{Title Hi,Message Customized definition event test!,Id 100,IsRead true,
});运行程序同时运行消费者端和生产者端可以看到消费者端打印出了自定义事件。 项目地址
Github:DistributedEventBus