rocketmq4.9.6版本把filtersrv删除是出于什么目的呢?

在 RocketMQ 4.9.6 版本中,filtersrv 被移除了,这一变化是出于以下几个目的和考虑:

我们提供的服务有:做网站、成都网站建设、微信公众号开发、网站优化、网站认证、康县ssl等。为1000多家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的康县网站制作公司

1、架构优化:RocketMQ 作为一个高性能、分布式的消息中间件,其架构设计和实现一直在不断演进和优化。filtersrv 的移除是其中一部分,旨在简化架构,提高系统的可维护性和扩展性。

2、功能整合filtersrv 主要用于消息过滤,但在新版本中,这部分功能被整合到了其他组件中,Broker 或 NameServer,这样的整合有助于减少组件数量,降低系统复杂性。

3、性能提升:通过移除 filtersrv,可以减少消息传递的环节,从而降低延迟,提高整体性能。

4、资源节省filtersrv 作为额外的服务进程,会占用一定的系统资源,移除后,可以节省这部分资源,降低部署和维护成本。

5、易用性增强:对于用户来说,filtersrv 的存在可能增加了部署和维护的复杂性,移除后,用户可以更加便捷地进行部署和使用。

接下来,我们将详细探讨如何在移除 filtersrv 后,使用 RocketMQ 进行消息过滤。

消息过滤机制

在 RocketMQ 中,消息过滤主要通过两种方式实现:

1、消费端过滤:消费者在订阅主题时,可以通过设置过滤条件(如 SQL 表达式),来选择性地接收消息,这种方式适用于消费者对特定类型或属性的消息感兴趣,而不想处理全部消息的场景。

2、生产者端过滤:生产者在发送消息时,可以设置消息的属性(如键值对),这些属性可以被用于后续的消息筛选和处理。

示例代码

以下是一个简单的示例,展示了如何在移除 filtersrv 后,使用 RocketMQ 进行消息过滤。

生产者端

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        producer.start();
        // 创建消息实例
        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
        // 设置消息属性
        msg.setKeys("KeyA");
        // 发送消息
        producer.send(msg);
        // 关闭生产者
        producer.shutdown();
    }
}

消费者端

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅主题
        consumer.subscribe("TopicTest", "*");
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

在上述示例中,生产者发送了一条带有属性 KeyA 的消息,消费者可以基于这些属性进行过滤,只消费满足特定条件的消息。

网站栏目:rocketmq4.9.6版本把filtersrv删除是出于什么目的呢?
当前URL:http://www.shufengxianlan.com/qtweb/news34/403184.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联