发布与订阅模型在许多编程语言中都有实现,也就是我们经常说的设计模式中的一种–观察者模式。在一些应用场合,例如发送方并不是以固定频率发送消息,如果接收方频繁去咨询发送方,这种操作无疑是很麻烦并且不友好的。
随着业务复杂, 业务的项目依赖关系增强, 使用消息队列帮助系统降低耦合度
.
“
Redis中的订阅发布模式, 当没有订阅者时, 消息会被直接丢弃(Redis不会持久化保存消息)
”
生产者使用Redis中的list数据结构进行实现, 将待处理的消息塞入到消息队列中.
class Producer(object):
def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"
self.value = "test_value_{id}"
def produce(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.lpush(self.key, msg)
消费者使用redis中brpop
进行实现, brpop会从list头部消息, 并能够设置超时等待时间.
class Consumer(object):
def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"
def consume(self, timeout=0):
# timeout=0 表示会无线阻塞, 直到获得消息
while True:
msg = self._conn.brpop(self.key, timeout=timeout)
process(msg)
def process(msg):
print msg
if __name__ == '__main__':
consumer = Consumer()
consumer.consume()
# 输出结果
('test_key', 'test_value_1')
('test_key', 'test_value_2')
('test_key', 'test_value_3')
('test_key', 'test_value_4')
('test_key', 'test_value_5')
在Redis Pubsub中, 一个频道(channel)相当于一个消息队列
class Publisher(object):
def __init__(self, host, port):
self._conn = redis.StrictRedis(host=host, port=port)
self.channel = "test_channel"
self.value = "test_value_{id}"
def pub(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.publish(self.channel, msg)
其中get_message
使用了select
IO多路复用来检查socket连接是否是否可读.
class Subscriber(object):
def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self._pubsub = self._conn.pubsub() # 生成pubsub对象
self.channel = "test_channel"
self._pubsub.subscribe(self.channel)
def sub(self):
while True:
msg = self._pubsub.get_message()
if msg and isinstance(msg.get("data"), basestring):
process(msg.get("data"))
def close(self):
self._pubsub.close()
# 输出结果
test_value_1
test_value_2
test_value_3
test_value_4
test_value_
在Jedis中订阅方处理是采用同步的方式, 看源码中PubSub模块的process函数
在do-while
循环中, 会等到当前消息处理完毕才能够处理下一条消息, 这样会导致当入队列消息量过大的时候, redis链接被强制关闭.
解决方案: 将整个处理函数改为异步的方式.
文章题目:讲解一下Redis中的订阅发布模式
转载源于:http://www.shufengxianlan.com/qtweb/news26/420826.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联