1.构建配置文件 首先第一步是进行用户鉴权的操作(如果Kafka集群需要的话) 然后需要配置的三个主要参数 2.构建生产者发送消息 首先我们应该杜绝 fire and forget的操作这样可能会导致消息丢失。 具体的操作见代码注释,解释的很详细。 其实多线程Producer的代码与普通的构造方式一样,只需要弄明白两点 关于这两种方案各有利弊,对比如下: **一条消息是如何通过Producer发送到Kafka的Broker上的呢?发送过程又有哪些历程呢?**接下来会一一讲解。 传统部署方案浪费资源。 k8s更加充分利用服务器资源。 项目完全部署在云端,才能更加充分的利用服务器资源。
1.Producer简介
Kafka Producer 就是负责向Kafka 服务端,写入数据的程序。 Kafka 支持多种Producer库,主流的编程语言都覆盖到了。但是除了Java其他的语言都是由非Apach Kafka社区的人进行维护的。有其他语言的客户端需求,可以去这个网址去下载相应的信息。
2.构造Producer
2.1简单构造
Properties properties = new Properties(); if(enableAuthentication) { //Authentication 使用 SASL/SCRAM 认证方式 开启认证配置 InputStream in = ProducerConfig.class.getClassLoader().getResourceAsStream("producer.properties"); try { properties.load(in); } catch (IOException e) { //todo handler produce.properties not exist e.printStackTrace(); } } //一下三个参数都是没有默认值的,必须指定 properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); properties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
/** * 常见消息发送异常 * 1.可重试异常(重试的最大次数 见配置文件的配置 如果在规定的重试次数之内 发送成功 异常就不会出现在 回调的异常中) * a.LeaderNotAvailableException Leader 分区不可用,出现在Leader 换届选举的过程中 * b.NotControllerException controller 不可用,controller 在经历新的一轮选举 * c.NetworkException 网络瞬间故障异常 * 2.不可重试异常 * a.RecordTooLargeException 发送消息的大小超过了我们配置的消息大小的上限 * b.SerializationException 序列化失败异常 * c.KafkaException 其他类型的异常 */ /** * 同步发送消息 * * @param producer */ @Override public void sendMessage(Producer producer) { for (int i = 0; i < 500; i++) { try { //加get就是消息同步发送 但是效率偏低 RecordMetadata recordMetadata = (RecordMetadata) producer.send(new ProducerRecord(topic, "KEY","this is a message!!!!!!" + i)).get(); System.out.println(recordMetadata); } catch (ExecutionException e) { //todo 处理操作消息异常 System.out.println("Error in sending record"); System.out.println(e); } catch (InterruptedException e) { //处理 超时异常 System.out.println("Error in sending record"); System.out.println(e); } } producer.close(); }
/** * 异步发送消息 有回调函数,如果发送失败可以进行补偿操作 * * @param producer */ @Override public void sendMessageCallBack(Producer producer, ProducerMessage producerMessage) { if (producerMessage == null) { return; } else if (StringUtils.isEmpty(producerMessage.getKey())){ producer.send(new ProducerRecord(topic, producerMessage.getValue()), new Callback() { /** * 这两个参数必然有一个不为空 * @param recordMetadata 消息发送后返回的元数据信息 包括消息存入的分区 位移等等 * @param e */ @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { //todo 消息发送成功逻辑 } else { if (e instanceof RetriableException) { //todo 处理可重试异常 } else { //todo 处理不可重试异常 } } } }); }else { producer.send(new ProducerRecord(topic, producerMessage.getKey(), producerMessage.getValue()), new Callback() { /** * 这两个参数必然有一个不为空 * @param recordMetadata 消息发送后返回的元数据信息 包括消息存入的分区 位移等等 * @param e */ @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { //todo 消息发送成功逻辑 } else { if (e instanceof RetriableException) { //todo 处理可重试异常 } else { //todo 处理不可重试异常 } } } }); } producer.close(); }
2.多线程Producer
模式
说明
优势
劣势
使用场景
多线程单KafkaProducer
多个线程共享一个实例
简单,性能好
1.所有的线程共享一个缓冲区需要设置较大值。
2.一旦一个线程崩溃,kennel会导致其他的线程也不可用
分区不多的kafka集群
多线程多KafkaProducer
线程独享自己的实例
1.单个KafkaProducer崩溃,不会影响其他的线程。
2.每个线程拥有自己的Producer、缓冲区,以及一组响应的配置参数,可以进行更加细粒度的调优。
需要分配较大的内存
分区较多的kafka集群
3.producer 发送消息过程
4.partinoner
5.无消息丢失配置
6.Kafka如何实现Exactly once
7.Producer其他
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算