python使用redis实现发布订阅模式

利用redis 的发布订阅模式,可以实现功能更强大的消息队列,使用list数据结构实现的消息队列,消息只能被消费一次,但实践中,多个消费者希望都能够消费队列里的所有数据,这种情形下,就可以使用发布定于模式。

发布订阅模式由发布者,Channel,订阅者构成, 发布者负责发布消息到队列中,所有订阅了这个Channel的订阅者都能够收到发布者所发布的消息。

1. 发布者代码

先来看发布者代码

import time
from redis.client import Redis

r = Redis(host='127.0.0.1', port=6379, db=0, password='198671724zds')

def publish():
    for i in range(10):
        r.publish('int_channel', i)
        time.sleep(1)

if __name__ == '__main__':
    publish()

发布者每隔1秒钟发布一条消息

2. 订阅者代码

import time
from redis.client import Redis

r = Redis(host='127.0.0.1', port=6379, db=0, password='198671724zds')

def sub():
    pub = r.pubsub()        # 返回发布订阅对象,通过这个对象你能1)订阅频道 2)监听频道中的消息
    pub.subscribe('int_channel')        # 订阅一个channel
    msg_stream = pub.listen()      # 监听消息
    for msg in msg_stream:
        print(msg)
        if msg["type"] == "message":
            print(str(msg["channel"], encoding="utf-8") + ":" + str(msg["data"], encoding="utf-8"))
        elif msg["type"] == "subscribe":
            print(str(msg["channel"], encoding="utf-8"), '订阅成功')


if __name__ == '__main__':
    sub()

你可以启动多个发布者和订阅者, 订阅者启动后,会立即收到一条消息

{'type': 'subscribe', 'pattern': None, 'channel': b'int_channel', 'data': 1}

这条消息表示订阅成功。

  • 通过subscribe 订阅一个频道
  • 使用listen方法监听频道,一旦有消息被发布,就能立即获取
  • redis不会保存已经被消费的数据,因此,新键入的订阅者不能收到在它订阅前已经发布的消息

除了使用listen方法进行监听消息外,还可以使用parse_response 方法

import time
from redis.client import Redis

r = Redis(host='127.0.0.1', port=6379, db=0, password='198671724zds')

def sub():
    pub = r.pubsub()        # 返回发布订阅对象,通过这个对象你能1)订阅频道 2)监听频道中的消息
    pub.subscribe('int_channel')        # 订阅一个channel
    while True:
        msg = pub.parse_response()
        print(msg)


if __name__ == '__main__':
    sub()

启动订阅者之后,会立即收到一条消息

[b'subscribe', b'int_channel', 1]

这个列表里,第一个元素表示消息的类型,第二个元素表示订阅的channel,当发布者开始发布消息时,得到的消息是下面的样子

[b'message', b'int_channel', b'0']
[b'message', b'int_channel', b'1']

消息的类型是message, 第三个元素是具体的消息

扫描关注, 与我技术互动

QQ交流群: 211426309

加入知识星球, 每天收获更多精彩内容

分享日常研究的python技术和遇到的问题及解决方案