wamp协议实现rpc调用

组成:

  • 路由: 分发rpc的服务器 可以现在go语言版的,https://github.com/jcelliott/turnpike
  • server: 函数提供者
  • client:函数调用方
    server和client 可以选择 https://wamp-proto.org/implementations/index.html
    python 例子 http://autobahn.readthedocs.io/en/latest/wamp/examples.html

python为例 server端


import datetime from twisted.internet.defer import inlineCallbacks from autobahn.twisted.wamp import ApplicationSession,ApplicationRunner # from autobahn_autoreconnect import ApplicationRunner class Component(ApplicationSession): """ A simple time service application component. """ @inlineCallbacks def onJoin(self, details): print("session attached") def utcnow(): now = datetime.datetime.utcnow() return now.strftime("%Y-%m-%dT%H:%M:%SZ") try: yield self.register(utcnow, u'com.timeservice.now') except Exception as e: print("failed to register procedure: {}".format(e)) else: print("procedure registered") if __name__ == '__main__': runner = ApplicationRunner(u"ws://xxx.xxx.com:31000/ws", u"pwd") runner.run(Component, auto_reconnect=True)

客户端

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks

from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner


class Component(ApplicationSession):
    """
    An application component using the time service.
    """

    @inlineCallbacks
    def onJoin(self, details):
        print("session attached")
        try:
            now = yield self.call(u'com.timeservice.now')
        except Exception as e:
            print("Error: {}".format(e))
        else:
            print("Current time from time service: {}".format(now))

        self.leave()

    def onDisconnect(self):
        print("disconnected")
        reactor.stop()


if __name__ == '__main__':
    runner = ApplicationRunner(u"ws://xxx.xxx.com:31000/ws", u"pwd")
    runner.run(Component, auto_reconnect=True)

经过一段时间的测试发现turnpike路由存在一个问题:
当server意外断网后(拔网线这种), turnpike不会发现这个server已掉线, 重启server注册时提示函数已经存在, 必须重启turnpike

现改用https://github.com/gammazero/nexus

后测试发现nexus也有这个问题,测试方法,但拔掉客户端的网线再插上,报提示"REGISTER for already registered procedure"
目前通过修改nexus的代码解决,使得收到注册消息时永远是替换方式
dealer.go:register函数

var created string
    var regID wamp.ID
    // If no existing registration found for the procedure, then create a new
    // registration.
    if reg == nil {  //改成 if true {
        regID = d.idGen.Next()
        created = wamp.NowISO8601()
        reg = &registration{

dealer.go delCalleeReg函数

    if len(reg.callees) == 0 {
        delete(d.registrations, regID)
        switch reg.match {
        default:
            if d.procRegMap[reg.procedure] != nil && d.procRegMap[reg.procedure].id ==  regID {
                delete(d.procRegMap, reg.procedure)
            }

2018.12.10
使用未按上面修改过的最新的nexus,加上autobahn js或python版可以实现rpc callee的负载均衡

    session.register('com.myapp.add2', add2, {
        'invoke': "random"  //负载方式 "single", "roundrobin", "random", "first", "last"
    });
python
yield self.register(utcnow, u'com.timeservice.now1', RegisterOptions(invoke=u'random'))

当时使用最新的autobahn python版客户端会提示注册失败。

性能测试

单核本机caller +本机callee 每秒大约可以到2500个rpc请求 cpu跑满

异常问题

启动两个callee,然后caller循环调用rpc,这时kill掉其中一个callee,有几率caller的call函数不再返回卡死,估计是nexus的bug
用crossbar.io做router就不存在这个问题,而且crossbar.io貌似支持集群,可以规避router的单点故障,但crossbar.io的单核性能不如nexus

此条目发表在python分类目录。将固定链接加入收藏夹。

发表评论