在 RocketMQ 4.9.6 版本中,filtersrv
被移除了,这一变化是出于以下几个目的和考虑:
我们提供的服务有:做网站、成都网站建设、微信公众号开发、网站优化、网站认证、康县ssl等。为1000多家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的康县网站制作公司
1、架构优化:RocketMQ 作为一个高性能、分布式的消息中间件,其架构设计和实现一直在不断演进和优化。filtersrv
的移除是其中一部分,旨在简化架构,提高系统的可维护性和扩展性。
2、功能整合:filtersrv
主要用于消息过滤,但在新版本中,这部分功能被整合到了其他组件中,Broker 或 NameServer,这样的整合有助于减少组件数量,降低系统复杂性。
3、性能提升:通过移除 filtersrv
,可以减少消息传递的环节,从而降低延迟,提高整体性能。
4、资源节省:filtersrv
作为额外的服务进程,会占用一定的系统资源,移除后,可以节省这部分资源,降低部署和维护成本。
5、易用性增强:对于用户来说,filtersrv
的存在可能增加了部署和维护的复杂性,移除后,用户可以更加便捷地进行部署和使用。
接下来,我们将详细探讨如何在移除 filtersrv
后,使用 RocketMQ 进行消息过滤。
消息过滤机制
在 RocketMQ 中,消息过滤主要通过两种方式实现:
1、消费端过滤:消费者在订阅主题时,可以通过设置过滤条件(如 SQL 表达式),来选择性地接收消息,这种方式适用于消费者对特定类型或属性的消息感兴趣,而不想处理全部消息的场景。
2、生产者端过滤:生产者在发送消息时,可以设置消息的属性(如键值对),这些属性可以被用于后续的消息筛选和处理。
示例代码
以下是一个简单的示例,展示了如何在移除 filtersrv
后,使用 RocketMQ 进行消息过滤。
生产者端
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws Exception { // 创建生产者实例 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 设置 NameServer 地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动生产者 producer.start(); // 创建消息实例 Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes()); // 设置消息属性 msg.setKeys("KeyA"); // 发送消息 producer.send(msg); // 关闭生产者 producer.shutdown(); } }
消费者端
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws Exception { // 创建消费者实例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); // 设置 NameServer 地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 订阅主题 consumer.subscribe("TopicTest", "*"); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); System.out.printf("Consumer Started.%n"); } }
在上述示例中,生产者发送了一条带有属性 KeyA
的消息,消费者可以基于这些属性进行过滤,只消费满足特定条件的消息。
网站栏目:rocketmq4.9.6版本把filtersrv删除是出于什么目的呢?
当前URL:http://www.shufengxianlan.com/qtweb/news34/403184.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联