博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
分布式EventBus的Socket实现 - 发布订阅
阅读量:6154 次
发布时间:2019-06-21

本文共 4190 字,大约阅读时间需要 13 分钟。

在这篇文章中, 所适用的范围只是本机的事件传播,要是牵涉到多台服务器之间的事件传播就不行了,解决办法有用msmq解决的,Node.js解决的,也有用redis的发布订阅解决的,这次用C# socket来实现,能实现立刻推送事件到所有订阅了相关event的server上。

这次的子系统适用的场景如下:

 

主要分2个部分:各个server使用的Event Bus Broker以及Event Bus Server。

Broker与Server之间的通信协议就3个:ME、Subscribe、Publish。分别代表:我的名字是、我要订阅的事件是、我触发事件。

Event Bus Server是基于SuperSocket开源组件写的,非常方便。实现了3个对应的命令类(建议大家先看看的文档),如下: 

public class ME : CommandBase
{ public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo) { session.AssociatedServerIdentity = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]); Console.WriteLine(string.Format("[{0}]: connected.", session.AssociatedServerIdentity)); } }public class Publish : CommandBase
{ public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo) { string evtClassPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]); string evtXml = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[1]); foreach(EventDispatcherBusSession s in session.AppServer.GetAllSessions()) { if (s.AssociatedServerIdentity == session.AssociatedServerIdentity) continue; if (s.SubscribedEventClasses.Contains(evtClassPath)) { s.Send("Publish " + requestInfo.Body);//forward only Console.WriteLine(string.Format("Forwarding publish command from {1} to server: [{0}]", s.AssociatedServerIdentity, session.AssociatedServerIdentity)); } } } }public class Subscribe : CommandBase
{ public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo) { string classPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]); if (session.SubscribedEventClasses.Contains(classPath)) return; session.SubscribedEventClasses.Add(classPath); string msg = ""; msg+=string.Format("[{0}], now subscribed these events:\r\n", session.AssociatedServerIdentity); foreach(string evtClass in session.SubscribedEventClasses) msg += string.Format(" {0}\r\n", evtClass); Console.WriteLine(msg); } }

 

 启动server代码:

static void Main(string[] args)        {            var appServer = new EventDispatcherBusServer();            if (!appServer.Setup(2012))            {                Console.WriteLine("Failed to setup!");                Console.ReadKey();                return;            }            Console.WriteLine();            if (!appServer.Start())            {                Console.WriteLine("Failed to start!");                Console.ReadKey();                return;            }            Console.WriteLine("The server started successfully, press key 'q' to stop it!");            Console.ReadKey();            appServer.Stop();        }

 

 server端没多少好说的,都是很简单的,下面来看看客户端:

static void Main(string[] args)        {            EventBusClientBroker busBroker = new EventBusClientBroker("127.0.0.1", 2012, "App server 2", "AppEvents.dll");            busBroker.NewEventReceived += new EventReceived(busBroker_NewEventReceived);//新event被推送过来的事件            busBroker.Connect();            busBroker.Subscribe
();//订阅Event //busBroker.Close(); } static void busBroker_NewEventReceived(string evtClass, object evt) { //这里会将event对象发过来 //evtClass是evt的class全路径 if (evt is NewUserRegisteredEvent) Console.WriteLine(((NewUserRegisteredEvent)evt).UserName); else if (evt is UserProfileUpdatedEvent) Console.WriteLine(((UserProfileUpdatedEvent)evt).UserID); }

 

如果要触发一个事件,则:

NewUserRegisteredEvent evt = new NewUserRegisteredEvent();                    evt.RegisterDate = DateTime.Now;                    evt.UserName = "aaron";                    busBroker.Publish
(evt);

 

我们来看看效果图:

 

搞定,。 

 

转载地址:http://kpffa.baihongyu.com/

你可能感兴趣的文章
P1666 前缀单词
查看>>
HTML.2文本
查看>>
Ubuntu unity安装Indicator-Multiload
查看>>
解决Eclipse中新建jsp文件ISO8859-1 编码问题
查看>>
7.对象创建型模式-总结
查看>>
【论文阅读】Classification of breast cancer histology images using transfer learning
查看>>
移动端处理图片懒加载
查看>>
jQuery.on() 函数详解
查看>>
谈缓存和Redis
查看>>
【转】百度地图api,根据多点注标坐标范围计算地图缩放级别zoom自适应地图
查看>>
用户调研(补)
查看>>
ExtJS之开篇:我来了
查看>>
☆1018
查看>>
oracle 去掉空格
查看>>
6.13心得
查看>>
Runtime类
查看>>
eclipse decompiler
查看>>
记一个搜索网盘资源的网站
查看>>
jdk1.7和jdk1.8的String的getByte方法的差异
查看>>
java父子进程通信
查看>>