博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ZeroMQ,史上最快的消息队列
阅读量:4602 次
发布时间:2019-06-09

本文共 11637 字,大约阅读时间需要 38 分钟。

一、ZMQ 是什么

  阅读了 ZMQ 的 后,我的理解是,这是个类似于 Socket 的一系列接口,他跟 Socket 的区别是:普通的 socket 是端到端的(1:1的关系),而 ZMQ 却是可以N:M 的关系,人们对 BSD 套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而 ZMQ 屏蔽了这些细节,让你的网络编程更为简单。ZMQ 用于 node 与 node 间的通信,node 可以是主机或者是进程。

二、本文的目的

  在集群对外提供服务的过程中,我们有很多的配置,需要根据需要随时更新,那么这个信息如何推动到各个节点?并且保证信息的一致性和可靠性?本文在介绍 ZMQ 基本理论的基础上,试图使用 ZMQ 实现一个配置分发中心。从一个节点,将信息无误的分发到各个服务器节点上,并保证信息正确性和一致性。

三、ZMQ 的三个基本模型

1.请求回应模型。由请求端发起请求,并等待回应端回应请求。从请求端来看,一定是一对对收发配对的;

反之,在回应端一定是发收对。请求端和回应端都可以是1:N的模型。通常把1认为是server,N认为是Client。
0MQ可以很好的支持路由功能(实现路由功能的组件叫做Device),把1:N扩展为N:M(只需要加入若干路由节点)。
从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含回应地址,而应用则不关心它。
2.发布订阅模型。这个模型里,发布端是单向只发送数据的,且不关心是否把全部的信息都发送给订阅者。
如果发布端开始发布信息的时候,订阅端尚未连接上,这些信息直接丢弃。
不过一旦订阅端连接上来,中间会保证没有信息丢失。
同样,订阅端则只负责接收,而不能反馈。
如果发布端和订阅端需要交互(比如要确认订阅者是否已经连接上),则使用额外的socket采用请求回应模型满足这个需求。
3.管道模型。这个模型里,管道是单向的,从PUSH端单向的向PULL端单向的推送数据流。

ZMQ的请求答复模型

ZMQ 的 hello world!

由 Client 发起请求,并等待 Server 回应请求。请求端发送一个简单的 hello,服务端则回应一个 world

request.py

1 # !/usr/bin/python 2 # coding=utf-8 3 #  4 # Hello World client in Python  5 # Connects REQ socket to tcp://localhost:5555  6 # Sends "Hello" to server, expects "World" back  7  8 import zmq 9 10 context = zmq.Context()11 12 # Socket to talk to server 13 print("Connecting to hello world server..." )14 socket = context.socket(zmq.REQ) 15 socket.connect("tcp://localhost:5555") 16 17 # Do 10 requests, waiting each time for a response 18 for request in range (1,10): 19     print("Sending request %d ..." % request)20     socket.send(b"Hello")21     # Get the reply. 22     message = socket.recv() 23     print("Received reply [%s]" % message)
lxy@lenovo-pc:~/code/python/zmq$ python3 request.py Connecting to hello world server...Sending request 1 ...Received reply [b'World']Sending request 2 ...Received reply [b'World']Sending request 3 ...Received reply [b'World']Sending request 4 ...Received reply [b'World']Sending request 5 ...Received reply [b'World']Sending request 6 ...Received reply [b'World']Sending request 7 ...Received reply [b'World']Sending request 8 ...Received reply [b'World']Sending request 9 ...Received reply [b'World']

reply.py

1 # !/usr/bin/python 2 # coding=utf-8 3 #  4 # Hello World server in Python  5 # Binds REP socket to tcp://*:5555  6 # Expects "Hello" from client, replies with "World"  7  8 import zmq 9 import time10 11 context = zmq.Context()12 socket = context.socket(zmq.REP)13 socket.bind("tcp://*:5555")14 15 while True:16     message = socket.recv()17     print("Received request: %s" % message)18     #time.sleep(1)19     socket.send_string("World")
lxy@lenovo-pc:~/code/python/zmq$ python3 reply.py Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'Received request: b'Hello'

ZMQ的发布订阅模型

一个广播server为现场足球赛

publish.py

1 # !/usr/bin/python 2 # coding=utf-8 3  4 import zmq 5 from random import choice 6  7 context = zmq.Context() 8 socket = context.socket(zmq.PUB) 9 socket.bind("tcp://127.0.0.1:5000")10 11 countries = ["netherlands","brazil","germany","portugal"]12 events = ['yellow card', 'red card', 'goal', 'corner', 'foul'] 13 14 while True:15     msg = choice(countries)+" "+choice(events)16     print("-> %s" % msg)17     socket.send(msg.encode(encoding='utf-8'))
-> portugal corner -> portugal yellow card -> portugal goal -> netherlands yellow card -> germany yellow card -> brazil yellow card -> portugal goal -> germany corner

subscribe.py

1 # !/usr/bin/python 2 # coding=utf-8 3 import zmq  4  5 context = zmq.Context()  6 socket = context.socket(zmq.SUB)  7 socket.connect("tcp://127.0.0.1:5000")  8 socket.setsockopt(zmq.SUBSCRIBE, b"netherlands")  9 socket.setsockopt(zmq.SUBSCRIBE, b"germany") 10 11 while True: 12     print(socket.recv())
netherlands red card netherlands goal netherlands red card germany foul netherlands yellow card germany foul netherlands goal netherlands corner germany foul netherlands corner

ZMQ的管道模型

ventilator.py

1 # !/usr/bin/python 2 # coding=utf-8 3 # Task ventilator 4 # Binds PUSH socket to tcp://localhost:5557 5 # Sends batch of tasks to workers via that socket 6  7 import time 8 import zmq  9 10 context = zmq.Context() 11 socket = context.socket(zmq.PUSH) 12 #print(dir(socket))13 #exit(0)14 socket.bind("tcp://127.0.0.1:5557") 15 16 input("Press Enter when the workers are ready: ")17 print("Sending tasks to workers ... \n")18 19 # send 100 tasks20 for task_nbr in range(1, 101):21     socket.send_string("task %d" % task_nbr)22 23 time.sleep(1)

worker.py

1 # !/usr/bin/python 2 # coding=utf-8 3 #Task worker 4 #Connects PULL socket to tcp://localhost:5557 5 #Collects workloads from ventilator via that socket 6 #Connects PUSH socket to tcp://localhost:5558 7 #Sends results to sink via that socket 8  9 import time10 import zmq11 12 context = zmq.Context() 13 14 receiver = context.socket(zmq.PULL) 15 receiver.connect("tcp://127.0.0.1:5557") 16 17 sender = context.socket(zmq.PUSH)18 sender.connect("tcp://127.0.0.1:5558")19 20 while True:21     str = receiver.recv()22     print("Received reply : %s" % str)23     sender.send(str)

启动3个worker,输出结果如下

worker1

Received reply : b'task 1'Received reply : b'task 4'Received reply : b'task 7'Received reply : b'task 10'Received reply : b'task 13'Received reply : b'task 16'Received reply : b'task 19'Received reply : b'task 22'Received reply : b'task 25'Received reply : b'task 28'Received reply : b'task 31'Received reply : b'task 34'Received reply : b'task 37'Received reply : b'task 40'Received reply : b'task 43'Received reply : b'task 46'Received reply : b'task 49'Received reply : b'task 52'Received reply : b'task 55'Received reply : b'task 58'Received reply : b'task 61'Received reply : b'task 64'Received reply : b'task 67'Received reply : b'task 70'Received reply : b'task 73'Received reply : b'task 76'Received reply : b'task 79'Received reply : b'task 82'Received reply : b'task 85'Received reply : b'task 88'Received reply : b'task 91'Received reply : b'task 94'Received reply : b'task 97'Received reply : b'task 100'

worker2

Received reply : b'task 2'Received reply : b'task 5'Received reply : b'task 8'Received reply : b'task 11'Received reply : b'task 14'Received reply : b'task 17'Received reply : b'task 20'Received reply : b'task 23'Received reply : b'task 26'Received reply : b'task 29'Received reply : b'task 32'Received reply : b'task 35'Received reply : b'task 38'Received reply : b'task 41'Received reply : b'task 44'Received reply : b'task 47'Received reply : b'task 50'Received reply : b'task 53'Received reply : b'task 56'Received reply : b'task 59'Received reply : b'task 62'Received reply : b'task 65'Received reply : b'task 68'Received reply : b'task 71'Received reply : b'task 74'Received reply : b'task 77'Received reply : b'task 80'Received reply : b'task 83'Received reply : b'task 86'Received reply : b'task 89'Received reply : b'task 92'Received reply : b'task 95'Received reply : b'task 98'

worker3

Received reply : b'task 3'Received reply : b'task 6'Received reply : b'task 9'Received reply : b'task 12'Received reply : b'task 15'Received reply : b'task 18'Received reply : b'task 21'Received reply : b'task 24'Received reply : b'task 27'Received reply : b'task 30'Received reply : b'task 33'Received reply : b'task 36'Received reply : b'task 39'Received reply : b'task 42'Received reply : b'task 45'Received reply : b'task 48'Received reply : b'task 51'Received reply : b'task 54'Received reply : b'task 57'Received reply : b'task 60'Received reply : b'task 63'Received reply : b'task 66'Received reply : b'task 69'Received reply : b'task 72'Received reply : b'task 75'Received reply : b'task 78'Received reply : b'task 81'Received reply : b'task 84'Received reply : b'task 87'Received reply : b'task 90'Received reply : b'task 93'Received reply : b'task 96'Received reply : b'task 99'

sink.py

1 # !/usr/bin/python 2 # coding=utf-8 3  4 import time 5 import zmq 6  7 context = zmq.Context() 8 receiver = context.socket(zmq.PULL) 9 receiver.bind("tcp://127.0.0.1:5558")10 11 while True:12     str = receiver.recv()13     print("received: %s" % str)
received: b'task 1'received: b'task 4'received: b'task 7'received: b'task 10'received: b'task 13'received: b'task 16'received: b'task 19'received: b'task 22'received: b'task 25'received: b'task 28'received: b'task 31'received: b'task 34'received: b'task 37'received: b'task 40'received: b'task 43'received: b'task 46'received: b'task 49'received: b'task 52'received: b'task 55'received: b'task 58'received: b'task 61'received: b'task 64'received: b'task 67'received: b'task 70'received: b'task 73'received: b'task 76'received: b'task 79'received: b'task 82'received: b'task 85'received: b'task 88'received: b'task 91'received: b'task 94'received: b'task 97'received: b'task 100'received: b'task 2'received: b'task 5'received: b'task 8'received: b'task 11'received: b'task 14'received: b'task 17'received: b'task 20'received: b'task 23'received: b'task 26'received: b'task 29'received: b'task 32'received: b'task 35'received: b'task 38'received: b'task 41'received: b'task 44'received: b'task 47'received: b'task 50'received: b'task 53'received: b'task 56'received: b'task 59'received: b'task 62'received: b'task 65'received: b'task 68'received: b'task 71'received: b'task 74'received: b'task 77'received: b'task 80'received: b'task 83'received: b'task 86'received: b'task 89'received: b'task 92'received: b'task 95'received: b'task 98'received: b'task 3'received: b'task 6'received: b'task 9'received: b'task 12'received: b'task 15'received: b'task 18'received: b'task 21'received: b'task 24'received: b'task 27'received: b'task 30'received: b'task 33'received: b'task 36'received: b'task 39'received: b'task 42'received: b'task 45'received: b'task 48'received: b'task 51'received: b'task 54'received: b'task 57'received: b'task 60'received: b'task 63'received: b'task 66'received: b'task 69'received: b'task 72'received: b'task 75'received: b'task 78'received: b'task 81'received: b'task 84'received: b'task 87'received: b'task 90'received: b'task 93'received: b'task 96'received: b'task 99'

从上面的输出可以看出,ventilator分配的100个任务被平均分配到了3个worker,最后由sink汇总

四、其他扩展模式

  通常,一个节点,即可以作为 Server,同时也能作为 Client,通过 PipeLine 模型中的 Worker,他向上连接着任务分发,向下连接着结果搜集的 Sink 机器。因此,我们可以借助这种特性,丰富的扩展原有的三种模式。例如,一个代理 Publisher,作为一个内网的 Subscriber 接受信息,同时将信息,转发到外网,其结构图如图 4 所示。

五、多个服务器

ZMQ 和 Socket 的区别在于,前者支持N:M的连接,而后者则只是1:1的连接,那么一个 Client 连接多个 Server 的情况是怎样的呢,我们通过图 5 来说明。

server1.py

1 import zmq 2 context = zmq.Context() 3 socket = context.socket(zmq.REP) 4 socket.bind("tcp://127.0.0.1:5000") 5 6 while True: 7     msg = socket.recv() 8     print "Got", msg 9     socket.send(msg)

server2.py

1 import zmq 2 context = zmq.Context() 3 socket = context.socket(zmq.REP) 4 socket.bind("tcp://127.0.0.1:6000") 5 6 while True: 7     msg = socket.recv() 8     print "Got", msg 9     socket.send(msg)

client.py

1 import zmq  2 context = zmq.Context()  3 socket = context.socket(zmq.REQ)  4 socket.connect("tcp://127.0.0.1:5000")  5 socket.connect("tcp://127.0.0.1:6000")  6   7 for i in range(10):  8      msg = "msg %s" % i  9      socket.send(msg) 10      print "Sending", msg 11      msg_in = socket.recv()

会发现client的请求会被均衡的分配给两个server

Example client output:

 

Sending msg 0 Sending msg 1 Sending msg 2 Sending msg 3 Sending msg 4 Sending msg 5 Sending msg 6 Sending msg 7 Sending msg 8 Sending msg 9

 Example output server 1 at port 5000:

Got msg 0 Got msg 2 Got msg 4 Got msg 6 Got msg 8

Example output server 2 at port 6000:

Got msg 1 Got msg 3 Got msg 5 Got msg 7 Got msg 9

 

转载于:https://www.cnblogs.com/phpfans/p/4056522.html

你可能感兴趣的文章
转Python学习(三)
查看>>
微信支付遇到的坑们
查看>>
wpf *和auto的区别
查看>>
[转]如何成为优秀的程序员
查看>>
unity3d 幻灯片效果实现
查看>>
AFNetworking 进行网络监测
查看>>
iOS获取状态栏和导航栏尺寸(宽度和高度)
查看>>
极光推送
查看>>
openTK学习
查看>>
根据角色获取用户组
查看>>
HTML5之pushstate、popstate操作history,无刷新改变当前url
查看>>
2048游戏:(一)运行效果
查看>>
[转载] 数据库的垂直切分和水平切分
查看>>
ReentrantLock可重入锁的使用场景
查看>>
LOJ#6277. 数列分块入门 1
查看>>
frame外弹出,刷新父页面
查看>>
爬虫一
查看>>
Linux 网络工具详解之 ip tuntap 和 tunctl 创建 tap/tun 设备
查看>>
JavaScript之Array/数组小结
查看>>
证券概念
查看>>