一、ZMQ 是什么
阅读了 ZMQ 的 后,我的理解是,这是个类似于 Socket 的一系列接口,他跟 Socket 的区别是:普通的 socket 是端到端的(1:1的关系),而 ZMQ 却是可以N:M 的关系,人们对 BSD 套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而 ZMQ 屏蔽了这些细节,让你的网络编程更为简单。ZMQ 用于 node 与 node 间的通信,node 可以是主机或者是进程。
二、本文的目的
在集群对外提供服务的过程中,我们有很多的配置,需要根据需要随时更新,那么这个信息如何推动到各个节点?并且保证信息的一致性和可靠性?本文在介绍 ZMQ 基本理论的基础上,试图使用 ZMQ 实现一个配置分发中心。从一个节点,将信息无误的分发到各个服务器节点上,并保证信息正确性和一致性。
三、ZMQ 的三个基本模型
1.请求回应模型。由请求端发起请求,并等待回应端回应请求。从请求端来看,一定是一对对收发配对的;
反之,在回应端一定是发收对。请求端和回应端都可以是1:N的模型。通常把1认为是server,N认为是Client。0MQ可以很好的支持路由功能(实现路由功能的组件叫做Device),把1:N扩展为N:M(只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含回应地址,而应用则不关心它。2.发布订阅模型。这个模型里,发布端是单向只发送数据的,且不关心是否把全部的信息都发送给订阅者。如果发布端开始发布信息的时候,订阅端尚未连接上,这些信息直接丢弃。不过一旦订阅端连接上来,中间会保证没有信息丢失。同样,订阅端则只负责接收,而不能反馈。如果发布端和订阅端需要交互(比如要确认订阅者是否已经连接上),则使用额外的socket采用请求回应模型满足这个需求。3.管道模型。这个模型里,管道是单向的,从PUSH端单向的向PULL端单向的推送数据流。ZMQ的请求答复模型
ZMQ 的 hello world!
由 Client 发起请求,并等待 Server 回应请求。请求端发送一个简单的 hello,服务端则回应一个 world
request.py
1 # !/usr/bin/python 2 # coding=utf-8 3 # 4 # Hello World client in Python 5 # Connects REQ socket to tcp://localhost:5555 6 # Sends "Hello" to server, expects "World" back 7 8 import zmq 9 10 context = zmq.Context()11 12 # Socket to talk to server 13 print("Connecting to hello world server..." )14 socket = context.socket(zmq.REQ) 15 socket.connect("tcp://localhost:5555") 16 17 # Do 10 requests, waiting each time for a response 18 for request in range (1,10): 19 print("Sending request %d ..." % request)20 socket.send(b"Hello")21 # Get the reply. 22 message = socket.recv() 23 print("Received reply [%s]" % message)
lxy@lenovo-pc:~/code/python/zmq$ python3 request.py Connecting to hello world server...Sending request 1 ...Received reply [b'World']Sending request 2 ...Received reply [b'World']Sending request 3 ...Received reply [b'World']Sending request 4 ...Received reply [b'World']Sending request 5 ...Received reply [b'World']Sending request 6 ...Received reply [b'World']Sending request 7 ...Received reply [b'World']Sending request 8 ...Received reply [b'World']Sending request 9 ...Received reply [b'World']
reply.py
1 # !/usr/bin/python 2 # coding=utf-8 3 # 4 # Hello World server in Python 5 # Binds REP socket to tcp://*:5555 6 # Expects "Hello" from client, replies with "World" 7 8 import zmq 9 import time10 11 context = zmq.Context()12 socket = context.socket(zmq.REP)13 socket.bind("tcp://*:5555")14 15 while True:16 message = socket.recv()17 print("Received request: %s" % message)18 #time.sleep(1)19 socket.send_string("World")
lxy@lenovo-pc:~/code/python/zmq$ python3 reply.py Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'
ZMQ的发布订阅模型
一个广播server为现场足球赛
publish.py
1 # !/usr/bin/python 2 # coding=utf-8 3 4 import zmq 5 from random import choice 6 7 context = zmq.Context() 8 socket = context.socket(zmq.PUB) 9 socket.bind("tcp://127.0.0.1:5000")10 11 countries = ["netherlands","brazil","germany","portugal"]12 events = ['yellow card', 'red card', 'goal', 'corner', 'foul'] 13 14 while True:15 msg = choice(countries)+" "+choice(events)16 print("-> %s" % msg)17 socket.send(msg.encode(encoding='utf-8'))
-> portugal corner -> portugal yellow card -> portugal goal -> netherlands yellow card -> germany yellow card -> brazil yellow card -> portugal goal -> germany corner
subscribe.py
1 # !/usr/bin/python 2 # coding=utf-8 3 import zmq 4 5 context = zmq.Context() 6 socket = context.socket(zmq.SUB) 7 socket.connect("tcp://127.0.0.1:5000") 8 socket.setsockopt(zmq.SUBSCRIBE, b"netherlands") 9 socket.setsockopt(zmq.SUBSCRIBE, b"germany") 10 11 while True: 12 print(socket.recv())
netherlands red card netherlands goal netherlands red card germany foul netherlands yellow card germany foul netherlands goal netherlands corner germany foul netherlands corner
ZMQ的管道模型
ventilator.py
1 # !/usr/bin/python 2 # coding=utf-8 3 # Task ventilator 4 # Binds PUSH socket to tcp://localhost:5557 5 # Sends batch of tasks to workers via that socket 6 7 import time 8 import zmq 9 10 context = zmq.Context() 11 socket = context.socket(zmq.PUSH) 12 #print(dir(socket))13 #exit(0)14 socket.bind("tcp://127.0.0.1:5557") 15 16 input("Press Enter when the workers are ready: ")17 print("Sending tasks to workers ... \n")18 19 # send 100 tasks20 for task_nbr in range(1, 101):21 socket.send_string("task %d" % task_nbr)22 23 time.sleep(1)
worker.py
1 # !/usr/bin/python 2 # coding=utf-8 3 #Task worker 4 #Connects PULL socket to tcp://localhost:5557 5 #Collects workloads from ventilator via that socket 6 #Connects PUSH socket to tcp://localhost:5558 7 #Sends results to sink via that socket 8 9 import time10 import zmq11 12 context = zmq.Context() 13 14 receiver = context.socket(zmq.PULL) 15 receiver.connect("tcp://127.0.0.1:5557") 16 17 sender = context.socket(zmq.PUSH)18 sender.connect("tcp://127.0.0.1:5558")19 20 while True:21 str = receiver.recv()22 print("Received reply : %s" % str)23 sender.send(str)
启动3个worker,输出结果如下
worker1
Received reply : b'task 1'Received reply : b'task 4'Received reply : b'task 7'Received reply : b'task 10'Received reply : b'task 13'Received reply : b'task 16'Received reply : b'task 19'Received reply : b'task 22'Received reply : b'task 25'Received reply : b'task 28'Received reply : b'task 31'Received reply : b'task 34'Received reply : b'task 37'Received reply : b'task 40'Received reply : b'task 43'Received reply : b'task 46'Received reply : b'task 49'Received reply : b'task 52'Received reply : b'task 55'Received reply : b'task 58'Received reply : b'task 61'Received reply : b'task 64'Received reply : b'task 67'Received reply : b'task 70'Received reply : b'task 73'Received reply : b'task 76'Received reply : b'task 79'Received reply : b'task 82'Received reply : b'task 85'Received reply : b'task 88'Received reply : b'task 91'Received reply : b'task 94'Received reply : b'task 97'Received reply : b'task 100'
worker2
Received reply : b'task 2'Received reply : b'task 5'Received reply : b'task 8'Received reply : b'task 11'Received reply : b'task 14'Received reply : b'task 17'Received reply : b'task 20'Received reply : b'task 23'Received reply : b'task 26'Received reply : b'task 29'Received reply : b'task 32'Received reply : b'task 35'Received reply : b'task 38'Received reply : b'task 41'Received reply : b'task 44'Received reply : b'task 47'Received reply : b'task 50'Received reply : b'task 53'Received reply : b'task 56'Received reply : b'task 59'Received reply : b'task 62'Received reply : b'task 65'Received reply : b'task 68'Received reply : b'task 71'Received reply : b'task 74'Received reply : b'task 77'Received reply : b'task 80'Received reply : b'task 83'Received reply : b'task 86'Received reply : b'task 89'Received reply : b'task 92'Received reply : b'task 95'Received reply : b'task 98'
worker3
Received reply : b'task 3'Received reply : b'task 6'Received reply : b'task 9'Received reply : b'task 12'Received reply : b'task 15'Received reply : b'task 18'Received reply : b'task 21'Received reply : b'task 24'Received reply : b'task 27'Received reply : b'task 30'Received reply : b'task 33'Received reply : b'task 36'Received reply : b'task 39'Received reply : b'task 42'Received reply : b'task 45'Received reply : b'task 48'Received reply : b'task 51'Received reply : b'task 54'Received reply : b'task 57'Received reply : b'task 60'Received reply : b'task 63'Received reply : b'task 66'Received reply : b'task 69'Received reply : b'task 72'Received reply : b'task 75'Received reply : b'task 78'Received reply : b'task 81'Received reply : b'task 84'Received reply : b'task 87'Received reply : b'task 90'Received reply : b'task 93'Received reply : b'task 96'Received reply : b'task 99'
sink.py
1 # !/usr/bin/python 2 # coding=utf-8 3 4 import time 5 import zmq 6 7 context = zmq.Context() 8 receiver = context.socket(zmq.PULL) 9 receiver.bind("tcp://127.0.0.1:5558")10 11 while True:12 str = receiver.recv()13 print("received: %s" % str)
received: b'task 1'received: b'task 4'received: b'task 7'received: b'task 10'received: b'task 13'received: b'task 16'received: b'task 19'received: b'task 22'received: b'task 25'received: b'task 28'received: b'task 31'received: b'task 34'received: b'task 37'received: b'task 40'received: b'task 43'received: b'task 46'received: b'task 49'received: b'task 52'received: b'task 55'received: b'task 58'received: b'task 61'received: b'task 64'received: b'task 67'received: b'task 70'received: b'task 73'received: b'task 76'received: b'task 79'received: b'task 82'received: b'task 85'received: b'task 88'received: b'task 91'received: b'task 94'received: b'task 97'received: b'task 100'received: b'task 2'received: b'task 5'received: b'task 8'received: b'task 11'received: b'task 14'received: b'task 17'received: b'task 20'received: b'task 23'received: b'task 26'received: b'task 29'received: b'task 32'received: b'task 35'received: b'task 38'received: b'task 41'received: b'task 44'received: b'task 47'received: b'task 50'received: b'task 53'received: b'task 56'received: b'task 59'received: b'task 62'received: b'task 65'received: b'task 68'received: b'task 71'received: b'task 74'received: b'task 77'received: b'task 80'received: b'task 83'received: b'task 86'received: b'task 89'received: b'task 92'received: b'task 95'received: b'task 98'received: b'task 3'received: b'task 6'received: b'task 9'received: b'task 12'received: b'task 15'received: b'task 18'received: b'task 21'received: b'task 24'received: b'task 27'received: b'task 30'received: b'task 33'received: b'task 36'received: b'task 39'received: b'task 42'received: b'task 45'received: b'task 48'received: b'task 51'received: b'task 54'received: b'task 57'received: b'task 60'received: b'task 63'received: b'task 66'received: b'task 69'received: b'task 72'received: b'task 75'received: b'task 78'received: b'task 81'received: b'task 84'received: b'task 87'received: b'task 90'received: b'task 93'received: b'task 96'received: b'task 99'
从上面的输出可以看出,ventilator分配的100个任务被平均分配到了3个worker,最后由sink汇总
四、其他扩展模式
通常,一个节点,即可以作为 Server,同时也能作为 Client,通过 PipeLine 模型中的 Worker,他向上连接着任务分发,向下连接着结果搜集的 Sink 机器。因此,我们可以借助这种特性,丰富的扩展原有的三种模式。例如,一个代理 Publisher,作为一个内网的 Subscriber 接受信息,同时将信息,转发到外网,其结构图如图 4 所示。
五、多个服务器
ZMQ 和 Socket 的区别在于,前者支持N:M的连接,而后者则只是1:1的连接,那么一个 Client 连接多个 Server 的情况是怎样的呢,我们通过图 5 来说明。
server1.py
1 import zmq 2 context = zmq.Context() 3 socket = context.socket(zmq.REP) 4 socket.bind("tcp://127.0.0.1:5000") 5 6 while True: 7 msg = socket.recv() 8 print "Got", msg 9 socket.send(msg)
server2.py
1 import zmq 2 context = zmq.Context() 3 socket = context.socket(zmq.REP) 4 socket.bind("tcp://127.0.0.1:6000") 5 6 while True: 7 msg = socket.recv() 8 print "Got", msg 9 socket.send(msg)
client.py
1 import zmq 2 context = zmq.Context() 3 socket = context.socket(zmq.REQ) 4 socket.connect("tcp://127.0.0.1:5000") 5 socket.connect("tcp://127.0.0.1:6000") 6 7 for i in range(10): 8 msg = "msg %s" % i 9 socket.send(msg) 10 print "Sending", msg 11 msg_in = socket.recv()
会发现client的请求会被均衡的分配给两个server
Example client output:
Sending msg 0 Sending msg 1 Sending msg 2 Sending msg 3 Sending msg 4 Sending msg 5 Sending msg 6 Sending msg 7 Sending msg 8 Sending msg 9
Example output server 1 at port 5000:
Got msg 0 Got msg 2 Got msg 4 Got msg 6 Got msg 8
Example output server 2 at port 6000:
Got msg 1 Got msg 3 Got msg 5 Got msg 7 Got msg 9