写在前面:我是「且听风吟」,目前是某上市游戏公司的大数据开发工程师,热爱大数据开源技术,喜欢自己的所学所悟,现阶段正在从头梳理大数据体系的知识,以后将会把时间重点放在Spark和Flink上面。 前面两篇文章,我们对kafka的生产者和消费者进行了java实战。通过逻辑分析辅以代码实战,更加深刻的掌握了kafka同步发送消息和异步发送消息以及手动提交offset和自动提交offset的实现。 其实在实际的生产过程中,光有消息的发送和消费是不够的,我们还需要掌握生产者在发送消息时候的拦截器操作。那么本文咱们就详解kafka的拦截器,并根据案例实现两个常见的自定义的拦截器,并将它们组成拦截链。 注意:我所使用的kafka版本为2.4.1,java版本为1.8,本文会对一些新老版本的改动地方加以说明,同时本文的拦截器demo也会开源到csdn和github。 kafka的拦截器指的是Producer拦截器(interceptor),主要是为了实现clients端的自定义化逻辑控制,是在Kafka 0.10版本被引入的。 想要实现拦截器,我们需要先实现ProducerInterceptor接口 有几点是使用拦截器的时候需要特别注意的: 见到onsend方法就要注意topic和分区,一般不进行修改; 见到onAcknowledgement方法就要注意里面的逻辑不要太复杂,以免影响消息发送效率。 由于拦截器可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外如果指定了多个拦截器,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而不是向上传递,大家在使用过程中要特别留意。 现在有一个需求:想要用拦截器实现producer发送数据后,这部分数据都带上时间戳,并且在producer发送完成后,统计到有多少数据发送成功了,有多少数据发送失败了。 需求解析: 这个需求主要考察对拦截器的基础掌握程度,比如给数据加上时间戳,这个功能可以在拦截器里的 需求实现: 通过上面的需求解析,我们知道这个需求可以用一个拦截器来实现,但是为了展示拦截器链的关联使用,这里我们给它分成两个拦截器来实现。拦截器1实现增加时间戳的功能,拦截器2实现消息发送情况统计次数的功能。 大致如下图所示: 想要把发送的数据都带上时间戳,其实很简单,我们只需要实现 ProducerInterceptor接口,然后按照我们的需求重写其中的onSend方法就可以了,别的方法不用动。 完整代码如下: 想要实现统计发送消息的成功次数和失败次数,我们就需要在 然后我们只需要定义两个变量,并根据Exception的值分别累加就可以统计到了。注意,我们还需要在close方法里输出两个变量的值,这样当producer发送数据结束并close后,会自动调用拦截器的close方法来输出咱们想要统计的成功和失败次数。 注意:拦截器链里的拦截器是按照顺序组成的,因此我们要注意前后拦截器对彼此的影响,比如这里拦截器2的onsend方法不能返回null,要把拦截器1返回来的record原封不动的返回才可以。 完整代码如下: 两个拦截器已经写好了,剩下的就是需要在producer代码里组成拦截器链并使用了。使用java自定义producer的方法我们已经介绍的很详细了,感兴趣的可以看本系列第一篇文章:kafka实战篇(一):Producer消息发送实战 我们可以在原有producer代码的基础上,把连接器加到配置参数里来实现这个功能。新增配置参数代码如下: 更改后的完整producer代码如下: 首先在kafka启动一个命令行消费者: 然后启动producer代码,观察数据: 我们对kafka的拦截器进行了原理分析和代码实战,通过一个常见的案例解释了拦截器是怎么工作的,并使用两个拦截器所组成的一个完整的拦截器链来实现了这个案例。不知道对大家在kafka拦截器方面有没有帮助呢?有什么问题欢迎讨论。 完整的代码已上传,感兴趣的可以下载查看。 如果您对我的文章感兴趣,欢迎关注,如果您有疑惑或发现文中有不对的地方,还请不吝赐教,非常感谢!!
如果你也对大数据感兴趣,希望在这个行业一展拳脚。欢迎关注我,我们一起学习。博客地址:https://ropledata.blog.csdn.net
博客的名字来源于:且听风吟,静待花开。也符合我对技术的看法,想要真正掌握一门技术就需要厚积薄发的毅力,同时保持乐观的心态。
你只管努力,剩下的交给时间!文章目录
一、前言
二、拦截器原理
2.1、拦截器是什么?
2.2、怎么实现拦截器?
org.apache.kafka.clients.producer.ProducerInterceptor
,关于ProducerInterceptor接口,我们需要掌握如下这些方法:
configure(configs
)方法:用于获取配置信息和初始化数据。onSend(ProducerRecord)
方法:该方法封装进KafkaProducer.send方法中,即它运行在用户的主线程(main线程)中。Producer确保在消息被序列化以及计算分区前调用该方法。注意:用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。onAcknowledgement(RecordMetadata, Exception)
方法:该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。注意:onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率。close
方法:可以关闭拦截器,主要用于执行一些资源清理工作。2.3、特别注意
三、拦截器案例实战
3.1、案例分析
onSend(ProducerRecord)
方法中实现;统计消息发送的成功和失败次数,这个可以在拦截器里的onAcknowledgement(RecordMetadata, Exception)
方法里来实现。
3.2、实现拦截器1(增加时间戳)
import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /** * @author ropleData * 博客地址:https://ropledata.blog.csdn.net * 专注大数据领域,欢迎访问 */ public class TimeInterceptor implements ProducerInterceptor<String, String> { @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + record.value(), record.headers()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
3.3、实现拦截器2(发送消息情况统计)
onAcknowledgement(RecordMetadata, Exception)
里面,根据消息发送后返回的异常信息来判断是否发送成功。一般异常如果为空就说明发送成功了,反之就说明发送失败了。import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; /** * @author ropleData * 博客地址:https://ropledata.blog.csdn.net * 专注大数据领域,欢迎访问 */ public class CounterInterceptor implements ProducerInterceptor<String, String> { private long successCount = 0L; private long errorCount = 0L; @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (exception == null) { successCount++; } else { errorCount++; } } @Override public void close() { System.out.println("成功次数=" + successCount); System.out.println("失败次数=" + errorCount); } @Override public void configure(Map<String, ?> configs) { } }
3.4、在producer程序里组成拦截链
ArrayList<String> interceptors = new ArrayList<>(); interceptors.add("TimeInterceptor");//注意:这里是拦截器的全类名,如果有包要加上报名 interceptors.add("TimeInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.ArrayList; import java.util.List; import java.util.Properties; /** * @author ropleData * 博客地址:https://ropledata.blog.csdn.net * 专注大数据领域,欢迎访问 */ public class KafkaProducerDemo { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); ArrayList<String> interceptors = new ArrayList<>(); interceptors.add("TimeInterceptor"); //注意:这里是拦截器的全类名,如果有包要加上报名 interceptors.add("CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); KafkaProducer<String, String> producer = new KafkaProducer<>(props); //发送消息 for (int i = 0; i < 100; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("testInterceptor", "message" + i); producer.send(record); } //一定要关闭producer,这样才会调用interceptor的close方法 producer.close(); } }
3.5、测试
bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic testInterceptor --from-beginning
四、总结
csdn:https://download.csdn.net/download/qq_26803795/12363976
github:https://github.com/ropleData/KafkaInterceptorDemo
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算