利用redis 的发布订阅模式,可以实现功能更强大的消息队列,使用list数据结构实现的消息队列,消息只能被消费一次,但实践中,多个消费者希望都能够消费队列里的所有数据,这种情形下,就可以使用发布定于模式。
发布订阅模式由发布者,Channel,订阅者构成, 发布者负责发布消息到队列中,所有订阅了这个Channel的订阅者都能够收到发布者所发布的消息。
先来看发布者代码
import time
from redis.client import Redis
r = Redis(host='127.0.0.1', port=6379, db=0, password='198671724zds')
def publish():
for i in range(10):
r.publish('int_channel', i)
time.sleep(1)
if __name__ == '__main__':
publish()
发布者每隔1秒钟发布一条消息
import time
from redis.client import Redis
r = Redis(host='127.0.0.1', port=6379, db=0, password='198671724zds')
def sub():
pub = r.pubsub() # 返回发布订阅对象,通过这个对象你能1)订阅频道 2)监听频道中的消息
pub.subscribe('int_channel') # 订阅一个channel
msg_stream = pub.listen() # 监听消息
for msg in msg_stream:
print(msg)
if msg["type"] == "message":
print(str(msg["channel"], encoding="utf-8") + ":" + str(msg["data"], encoding="utf-8"))
elif msg["type"] == "subscribe":
print(str(msg["channel"], encoding="utf-8"), '订阅成功')
if __name__ == '__main__':
sub()
你可以启动多个发布者和订阅者, 订阅者启动后,会立即收到一条消息
{'type': 'subscribe', 'pattern': None, 'channel': b'int_channel', 'data': 1}
这条消息表示订阅成功。
除了使用listen方法进行监听消息外,还可以使用parse_response 方法
import time
from redis.client import Redis
r = Redis(host='127.0.0.1', port=6379, db=0, password='198671724zds')
def sub():
pub = r.pubsub() # 返回发布订阅对象,通过这个对象你能1)订阅频道 2)监听频道中的消息
pub.subscribe('int_channel') # 订阅一个channel
while True:
msg = pub.parse_response()
print(msg)
if __name__ == '__main__':
sub()
启动订阅者之后,会立即收到一条消息
[b'subscribe', b'int_channel', 1]
这个列表里,第一个元素表示消息的类型,第二个元素表示订阅的channel,当发布者开始发布消息时,得到的消息是下面的样子
[b'message', b'int_channel', b'0']
[b'message', b'int_channel', b'1']
消息的类型是message, 第三个元素是具体的消息
QQ交流群: 211426309