博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[雪峰磁针石博客]python工具库介绍-dubbo:通过telnet接口访问dubbo服务
阅读量:7094 次
发布时间:2019-06-28

本文共 5415 字,大约阅读时间需要 18 分钟。

## 简介
dubbo服务发布之后,我们可以利用telnet命令进行调试、管理。更多资料参见 [Telnet命令参考手册](http://alibaba.github.io/dubbo-doc-static/Telnet+Command+Reference-zh-showComments=true&showCommentArea=true.htm)
telnet 调用示例:
```python
$ telnet 172.17.103.110 9097
Trying 172.17.103.110...
Connected to 172.17.103.110.
Escape character is '^]'.
dubbo>ls com.oppo.sso.service.onekey.IOnekeyRegister
register
dubbo>invoke com.oppo.sso.service.onekey.IOnekeyRegister.register({"applicationKey":"mac","imei":"","mobile":"13244448888","createIP":"127.0.0.1","createBy":"172.17.0.1"})
{"configCountry":null,"userIdLong":0,"appPackage":null,"appVersion":null,"accountName":null,"romVersion":null,"resultCode":3001,"thirdStatus":null,"registerType":0,"sendChannel":null,"operator":null,"manufacturer":null,"password":null,"osVersion":null,"lock":false,"model":null,"visitorLocked":false,"OK":false,"brand":null,"email":null,"createIP":null,"deny":false,"encryptEmail":null,"sessionKey":null,"thirdId":null,"passwordOriginal":null,"mobile":null,"applicationKey":null,"thirdpartyOk":false,"userAgent":null,"userName":null,"resultDesc":"应用不存在","userId":0,"encryptMobile":null,"emailStatus":null,"createBy":null,"freePwd":false,"changeTimes":0,"createTime":null,"mobileStatus":null,"oldLock":false,"codeTimeout":null,"lastUpdate":null,"imei":null,"sessionTimeout":0,"sdkVersion":null,"networkID":0,"status":null}
elapsed: 98 ms.
dubbo>
```
## Python实现
源码:
```python
"""
Name: dubbo.py
Tesed in python3.5
"""
import json
import telnetlib
import socket
class Dubbo(telnetlib.Telnet):
    prompt = 'dubbo>'
    coding = 'utf-8'
    def __init__(self, host=None, port=0,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        super().__init__(host, port)
        self.write(b'\n')
    def command(self, flag, str_=""):
        data = self.read_until(flag.encode())
        self.write(str_.encode() + b"\n")
        return data
    def invoke(self, service_name, method_name, arg):
        command_str = "invoke {0}.{1}({2})".format(
            service_name, method_name, json.dumps(arg))
        self.command(Dubbo.prompt, command_str)
        data = self.command(Dubbo.prompt, "")
        data = json.loads(data.decode(Dubbo.coding,
                                      errors='ignore').split('\n')[0].strip())
        return data
if __name__ == '__main__':
    conn = Dubbo('172.17.103.110', 9097)
    json_data = {
        "applicationKey":"mac",
        "imei":"",
        "mobile":"13244448888",
        "createIP":"127.0.0.1",
        "createBy":"172.17.0.1"
    }
    result = conn.invoke(
        "com.oppo.sso.service.onekey.IOnekeyRegister",
        "register",
        json_data
    )
    print(result)
```
执行结果:
```python
# python3 dubbo.py
{'manufacturer': None, 'applicationKey': None, 'OK': False, 'codeTimeout': None, 'password': None, 'encryptEmail': None, 'passwordOriginal': None, 'thirdId': None, 'emailStatus': None, 'freePwd': False, 'sessionTimeout': 0, 'createTime': None, 'osVersion': None, 'lastUpdate': None, 'email': None, 'sdkVersion': None, 'registerType': 0, 'sendChannel': None, 'visitorLocked': False, 'createIP': None, 'thirdStatus': None, 'encryptMobile': None, 'networkID': 0, 'resultCode': 3001, 'brand': None, 'changeTimes': 0, 'userAgent': None, 'imei': None, 'operator': None, 'romVersion': None, 'model': None, 'lock': False, 'sessionKey': None, 'configCountry': None, 'deny': False, 'userIdLong': 0, 'resultDesc': '应用不存在', 'status': None, 'createBy': None, 'thirdpartyOk': False, 'appPackage': None, 'appVersion': None, 'accountName': None, 'userId': 0, 'oldLock': False, 'userName': None, 'mobile': None, 'mobileStatus': None}
```
更复杂的例子。
源码:
```python
import json
import telnetlib
import socket
class Dubbo(telnetlib.Telnet):
    prompt = 'dubbo>'
    coding = 'gbk'
    def __init__(self, host=None, port=0,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        super().__init__(host, port)
        self.write(b'\n')
    def command(self, flag, str_=""):
        data = self.read_until(flag.encode())
        self.write(str_.encode() + b"\n")
        return data
    def invoke(self, service_name, method_name, arg):
        command_str = "invoke {0}.{1}({2})".format(
            service_name, method_name, json.dumps(arg))
        self.command(Dubbo.prompt, command_str)
        data = self.command(Dubbo.prompt, "")
        data = json.loads(data.decode(Dubbo.coding,
                                      errors='ignore').split('\n')[0].strip())
        return data
if __name__ == '__main__':
    conn = Dubbo('183.131.22.99', 21881)
    content = {
        "sign": "FKeKnMEPybHujjBTzz11BrulB5av7pLhJpk=",
        "partnerOrder": "0511e0d38f334319a96920fa02be02a7",
        "productDesc": "hello",
        "paySuccessTime": "2016-08-25 18:33:04",
        "price": "1",
        "count": "1",
        "attach": "from_pay_demo",
        "date": "20160825",
    }
    content_json = json.dumps(content).replace('"', '\\"')
    json_data = {
        "requestId": "0511e0d38f334319a96920fa02be02a7",
        "reqUrl": 'http://www.oppo.com',
        "httpReqType": "POST",
        "headerMap": None,
        "reqContent": content_json,
        "appId": "10001",
        "productName": "test",
        "timeStamp": "1472121184957",
        "partnerId": "2031",
        "signType": "MD5",
        "sign": "23B621FBBF887373C65E553C2222258F",
        "successResult": "OK",
    }
    result = conn.invoke(
        "com.oppo.pay.notify.api.facade.NotifyApplyService",
        "httpNotify",
        json_data
    )
    print(result)
```
执行结果:
```python
$ python3 dubbo.py
{'resMsg': 'ǩ', 'data': None, 'errorDetail': 'sign check fail!', 'resCode': '100003', 'success': False}
```
## 待改进
* 进行多个项目的实验,增加异常处理
## 参考资料
-   [本文最新版本地址](https://china-testing.github.io/python3_lib_dubbo.html)
-   [本文涉及的python测试开发库](https://github.com/china-testing/python-api-tesing) 谢谢点赞!
-   [本文相关海量书籍下载](https://github.com/china-testing/python-api-tesing/blob/master/books.md)
-   [本文源码地址](https://github.com/china-testing/python-api-tesing/tree/master/python3_libraries/_dubbo)

转载地址:http://ooaql.baihongyu.com/

你可能感兴趣的文章
修正zen cart商品评论显示太短的问题
查看>>
linux-awk
查看>>
SpringMVC框架的多表查询和增删查改
查看>>
记一次TFS 的 垃圾提示(无法下载 未获取项目 的 代码)
查看>>
NGINX基本配置与参数说明
查看>>
关于TP出现_STORAGE_WRITE_ERROR_的解决方案
查看>>
1833. [ZJOI2010]数字计数【数位DP】
查看>>
leaflet入门(四)API翻译(上)
查看>>
B.小A与任务
查看>>
勤快的love枫[ZJOI2007]
查看>>
Linux查看系统信息的一些命令及查看已安装软件包的命令
查看>>
poj1417 true liars(并查集 + DP)详解
查看>>
离散数学--二元关系总结
查看>>
HTML5 本地存储 localStorage、sessionStorage 的遍历、存储大小限制处理
查看>>
【leetcode】688. Knight Probability in Chessboard
查看>>
【leetcode】Maximum Product of Word Lengths
查看>>
C 工具库 GLib --- 提供多种高级的数据结构,如内存块、双向和单向链表、哈希表、动态字符串等...
查看>>
SQL中format()函数对应的格式
查看>>
svn command
查看>>
职业插画之路
查看>>