总结以下:ØMQ (ZeroMQ) 是一个基于消息队列的多线程网络库,它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。
看起来有些抽象,下面我们结合ZeroMQ 的 Python 封装―――― pyzmp,用实例看一下ZeroMQ的三种最基本的工作模式。
安装方法
pip install pyzmq
查看是否安装成功
>>> import zmq >>> print(zmq.__version__) 22.0.3
#client.py import zmq context = zmq.Context() # Socket to talk to server print("Connecting to hello world server…") socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") socket.send(b"Hello") # Get the reply. message = socket.recv() print(f"Received reply [ {message} ]")
#server.py import time import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: %s" % message) # Do some 'work' time.sleep(1) # Send reply back to client socket.send(b"World")
python client.py Connecting to hello world server… Received reply [ b'World' ]
python server.py Received request: b'Hello'
可以试一下,多运行几个client.py,看看情况是什么样的。
这里直接引用官方文档的例子:
发布者:类似于一个天气更新服务器,向订阅者发送天气更新,内容包括邮政编码、温度、湿度等信息
#Publisher.py import zmq from random import randrange context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5556") while True: zipcode = randrange(1, 100000) temperature = randrange(-80, 135) relhumidity = randrange(10, 60) socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))
订阅者:它监听发布者更新的数据流,过滤只接收与特定邮政编码相关的天气信息,默认接收接收10条数据
#Subscribe.py import sys import zmq # Socket to talk to server context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from weather server...") socket.connect("tcp://localhost:5556") # Subscribe to zipcode, default is NYC, 10001 zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001" # Python 2 - ascii bytes to unicode str if isinstance(zip_filter, bytes): zip_filter = zip_filter.decode('ascii') socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter) # Process 5 updates total_temp = 0 for update_nbr in range(5): string = socket.recv_string() zipcode, temperature, relhumidity = string.split() total_temp += int(temperature) print( "Average temperature for zipcode '%s' was %dF" % (zip_filter, total_temp / (update_nbr + 1)) )
ventilator 使用的是 SOCKET_PUSH,将任务分发到 Worker 节点上。Worker 节点上,使用 SOCKET_PULL 从上游接受任务,并使用 SOCKET_PUSH 将结果汇集到 Sink。值得注意的是,任务的分发的时候也同样有一个负载均衡的路由功能,worker 可以随时自由加入,ventilator 可以均衡将任务分发出去。
Push/Pull模式还是蛮常用的,这里我们主要测试一下它的负载均衡。
# ventilator.py import zmq import time context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5557") while True: socket.send(b"test") print("已发送") time.sleep(1)
# worker.py import zmq context = zmq.Context() recive = context.socket(zmq.PULL) recive.connect('tcp://127.0.0.1:5557') sender = context.socket(zmq.PUSH) sender.connect('tcp://127.0.0.1:5558') while True: data = recive.recv() print("work1 正在转发...") sender.send(data)
# sink.py import zmq import sys context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://*:5558") while True: response = socket.recv() print("response: %s" % response)
打开4个Terminal,分别运行
python sink.py python worker.py python worker.py python ventilator.py
消息模型可以根据需要组合使用,后续的代理模式和路由模式等都是在三种基本模式上面的扩展或变异。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
长按识别二维码并关注微信
更方便到期提醒、手机管理