介绍 http://kafka.apache.org
创新互联-专业网站定制、快速模板网站建设、高性价比峰峰矿网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式峰峰矿网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖峰峰矿地区。费用合理售后完善,十年实体公司更值得信赖。
kafka是一种高吞吐量的分布式发布订阅消息系统
kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)
当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。
高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理
测试环境
kafka_2.10-0.8.1.1 3个节点做的集群
zookeeper-3.4.5 一个实例节点
代码示例
消息生产者代码示例
- import java.util.Collections;
- import java.util.Date;
- import java.util.Properties;
- import java.util.Random;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- /**
- * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
- * @author Fung
- *
- */
- public class ProducerDemo {
- public static void main(String[] args) {
- Random rnd = new Random();
- int events=100;
- // 设置配置属性
- Properties props = new Properties();
- props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- // key.serializer.class默认为serializer.class
- props.put("key.serializer.class", "kafka.serializer.StringEncoder");
- // 可选配置,如果不配置,则使用默认的partitioner
- props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
- // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失
- // 值为0,1,-1,可以参考
- // http://kafka.apache.org/08/configuration.html
- props.put("request.required.acks", "1");
- ProducerConfig config = new ProducerConfig(props);
- // 创建producer
- Producer
producer = new Producer (config); - // 产生并发送消息
- long start=System.currentTimeMillis();
- for (long i = 0; i < events; i++) {
- long runtime = new Date().getTime();
- String ip = "192.168.2." + i;//rnd.nextInt(255);
- String msg = runtime + ",www.example.com," + ip;
- //如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0
- KeyedMessage
data = new KeyedMessage ( - "page_visits", ip, msg);
- producer.send(data);
- }
- System.out.println("耗时:" + (System.currentTimeMillis() - start));
- // 关闭producer
- producer.close();
- }
- }
消息消费者代码示例
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import kafka.consumer.Consumer;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- /**
- * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
- *
- * @author Fung
- *
- */
- public class ConsumerDemo {
- private final ConsumerConnector consumer;
- private final String topic;
- private ExecutorService executor;
- public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
- consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
- this.topic = a_topic;
- }
- public void shutdown() {
- if (consumer != null)
- consumer.shutdown();
- if (executor != null)
- executor.shutdown();
- }
- public void run(int numThreads) {
- Map
topicCountMap = new HashMap (); - topicCountMap.put(topic, new Integer(numThreads));
- Map
>> consumerMap = consumer - .createMessageStreams(topicCountMap);
- List
> streams = consumerMap.get(topic); - // now launch all the threads
- executor = Executors.newFixedThreadPool(numThreads);
- // now create an object to consume the messages
- //
- int threadNumber = 0;
- for (final KafkaStream stream : streams) {
- executor.submit(new ConsumerMsgTask(stream, threadNumber));
- threadNumber++;
- }
- }
- private static ConsumerConfig createConsumerConfig(String a_zookeeper,
- String a_groupId) {
- Properties props = new Properties();
- props.put("zookeeper.connect", a_zookeeper);
- props.put("group.id", a_groupId);
- props.put("zookeeper.session.timeout.ms", "400");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
- return new ConsumerConfig(props);
- }
- public static void main(String[] arg) {
- String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };
- String zooKeeper = args[0];
- String groupId = args[1];
- String topic = args[2];
- int threads = Integer.parseInt(args[3]);
- ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
- demo.run(threads);
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ie) {
- }
- demo.shutdown();
- }
- }
消息处理类
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- public class ConsumerMsgTask implements Runnable {
- private KafkaStream m_stream;
- private int m_threadNumber;
- public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
- m_threadNumber = threadNumber;
- m_stream = stream;
- }
- public void run() {
- ConsumerIterator
it = m_stream.iterator(); - while (it.hasNext())
- System.out.println("Thread " + m_threadNumber + ": "
- + new String(it.next().message()));
- System.out.println("Shutting down Thread: " + m_threadNumber);
- }
- }
Partitioner类示例
- import kafka.producer.Partitioner;
- import kafka.utils.VerifiableProperties;
- public class PartitionerDemo implements Partitioner {
- public PartitionerDemo(VerifiableProperties props) {
- }
- @Override
- public int partition(Object obj, int numPartitions) {
- int partition = 0;
- if (obj instanceof String) {
- String key=(String)obj;
- int offset = key.lastIndexOf('.');
- if (offset > 0) {
- partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
- }
- }else{
- partition = obj.toString().length() % numPartitions;
- }
- return partition;
- }
- }
参考
https://cwiki.apache.org/confluence/display/KAFKA/Index
https://kafka.apache.org/
原文链接:http://my.oschina.net/cloudcoder/blog/299215
新闻名称:KafkaJava客户端代码示例
文章分享:http://www.shufengxianlan.com/qtweb/news18/512868.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联