博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka 练习
阅读量:7257 次
发布时间:2019-06-29

本文共 8501 字,大约阅读时间需要 28 分钟。

package com.ocean.kafka;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import javax.swing.plaf.multi.MultiButtonUI;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;public class MennuCommitConsumer {    private Properties properties = new Properties();    private KafkaConsumer
consumer; public MennuCommitConsumer() { properties.setProperty("bootstrap.servers", "master:9092"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("group.id", "java_group"); // properties.setProperty("auto.offset.reset", "null"); properties.setProperty("enable.auto.commit", "false"); consumer = new KafkaConsumer
(properties); } public void subscirbleTopc() { List
topics = new ArrayList
(); topics.add("b"); topics.add("from-java"); consumer.subscribe(topics); while (true) { ConsumerRecords
records = consumer.poll(1000); for (ConsumerRecord
record : records) { System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "key:" + record.key() + "value:" + record.value()); } // consumer.commitSync(); // 这句话是为了提交数据 如果不写 则会在下次启动时 还会出现 } } public void getOffset() { OffsetAndMetadata offsets = consumer.committed(new TopicPartition("b", 0)); System.out.println("offsets:" + offsets.offset()); } // 制定分区消费 指定从offset的值出开始消费 // 对消费着topic的消费指定有两种方式 // 1.consumer.subscribe(topics); // 2.consumer.assign(topicPartitions); public void sonsumerAssigned() { // List
topics= new ArrayList
(); // topics.add("b"); // consumer.subscribe(topics); // 指定分区 List
topicPartitions = new ArrayList
(); topicPartitions.add(new TopicPartition("from-java", 0)); consumer.assign(topicPartitions); // 指定分区的offset分区的位置 consumer.seek(new TopicPartition("from-java", 0), 21); while (true) { ConsumerRecords
records = consumer.poll(1000); for (ConsumerRecord
record : records) { System.out.println( "partition:" + record.partition() + "offset:" + record.offset() + "value:" + record.value()); } } } public void setCommentOffset() { Map
offsets = new HashMap
(); offsets.put(new TopicPartition("from_java", 0), new OffsetAndMetadata(0)); List
topics = new ArrayList
(); topics.add("from_java"); consumer.subscribe(topics); // 指定位置提交某个分区的offsets的值 这会在下一次拉取数据前生效 consumer.commitSync(offsets); while (true) { ConsumerRecords
records = consumer.poll(1000); for (ConsumerRecord
record : records) { if (record.partition() == 0) { System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "value:" + record.value()); } } } } public void exactlyOnceConSumer(){ //1.配置上参数 properties.setProperty("enable.auto.commmit", "false"); //2.订阅主题或者分区 //consumer.subscribe(topics); //重设offset (offset)的值需要从mysql中获取 //3.从mysql中获取 //4.1 consumer.commitSync(offsets); //提交到kafka服务器中 //或者使用 //4.2 consumer.seek(new TopicPartition("from-java",0),0); //来指定要从kafka中高消费数据的初始值位置 //订阅主题或分区 //consumer.subscribe(topics); //5. poll数据// recordes =consumer.pool(1000) //6. 遍历参数值分析计算 //7.计算结束之后使用consumer.committed(new TopicPartition("from-java",1)) //获取当前消费的offset值 //8.把计算结果和offset值 以原子操作(事物)的形式保存到mysql数据库 //9.重新调到第五步循环执行 进行下一次pool和下一次计算 } public static void main(String[] args) { MennuCommitConsumer mennuCommitConsumer = new MennuCommitConsumer(); // mennuCommitConsumer.subscirbleTopc(); // mennuCommitConsumer.getOffset(); mennuCommitConsumer.sonsumerAssigned(); mennuCommitConsumer.setCommentOffset(); }}
package com.ocean.kafka;import java.util.ArrayList;import java.util.List;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.log4j.pattern.PropertiesPatternConverter;public class ProducerConsumer {    private Properties properties = new Properties();    private KafkaConsumer
consumer; public ProducerConsumer() { properties = new Properties(); properties.put("bootstrap.servers", "master:9092"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("group.id", "java-group"); consumer = new KafkaConsumer
(properties); } public void subscribeTopic() { List
topics = new ArrayList
(); topics.add("home-work_pic"); consumer.subscribe(topics); // 循环从kafka中拉取数据 while (true) { // 从kafka中拉取数据 ConsumerRecords
records = consumer.poll(1000); for (ConsumerRecord
record : records) { System.out.println("接收信息:partition" + record.partition() + "offset:" + record.offset() + "key:" + record.key() + "value:" + record.value()); } } } public static void main(String[] args) { ProducerConsumer producerConsumer = new ProducerConsumer(); producerConsumer.subscribeTopic(); }}
package com.ocean.kafka;import java.util.List;import java.util.Map;import java.util.Properties;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.apache.kafka.common.Metric;import org.apache.kafka.common.MetricName;import org.apache.kafka.common.PartitionInfo;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ProducerKafka {    private KafkaProducer
producer; private Properties properties; public ProducerKafka() { properties=new Properties(); properties.put("bootstrap.servers", "master:9092"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // properties.put("acks", "all");// properties.put("retries", 0);// producer=new KafkaProducer
(properties); } public void assignPartitionSend(String key,String value){ ProducerRecord
record =new ProducerRecord
("from-java", 0,key,value); producer.send(record); } public void sendRecorder(String key,String value){ Logger logger=LoggerFactory.getLogger(ProducerKafka.class); ProducerRecord
record =new ProducerRecord
("from-java", key,value); producer.send(record); } public void getTopicPartitions(String topic){ Logger logger=LoggerFactory.getLogger(ProducerKafka.class);// ProducerRecord
record =new ProducerRecord
("from-java", key,value); List
partitionInfos =producer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfos) { System.out.println(partitionInfo); } } public void getMetrics(){ @SuppressWarnings("unchecked") Map
metrics =(Map
) producer.metrics(); for (MetricName name : metrics.keySet()) { System.out.println(name.name()+":"+metrics.get(name).value()); } } public void sendRecorderWithCallback(String key,String value){ final Logger logger=LoggerFactory.getLogger(ProducerKafka.class); ProducerRecord
record =new ProducerRecord
("from-java",key,value); Callback callback=new Callback() { //回掉方法 public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception==null){ logger.info("存储位置:partition:"+metadata.partition()+",offset:"+metadata.offset()+",ts:"+metadata.timestamp()); }else{ logger.warn("服务端出现异常"); exception.printStackTrace(); } } }; producer.send(record,callback); } public void close(){ producer.flush(); producer.close(); } public static void main(String[] args) { ProducerKafka client =new ProducerKafka(); for(int i=0;i<100;i++){ client.sendRecorderWithCallback("Ckey"+i, "Cvalue"+i); }// client.getMetrics(); client.close(); }}

转载地址:http://qevdm.baihongyu.com/

你可能感兴趣的文章