首页
归档
友链
关于
Search
1
网易云音乐黑胶会员月月免费赠送
2,970 阅读
2
十年之约RSS聚合订阅服务上线
2,532 阅读
3
工资发放日的区别
2,215 阅读
4
rsyslogd内存占用过高解决方案
2,168 阅读
5
Nginx反代MinIO后,上传文件签名异常
2,071 阅读
零碎
标本
码海
工具
其他
登录
Search
标签搜索
北京
摄影
Java
旅行
生活
学习笔记
教程
Linux
服务器
软件
SpringBoot
日记
Windows
服务
数据库
福利
Spring
系统
SQL
前端
萧瑟
累计撰写
190
篇文章
累计收到
1,268
条评论
首页
栏目
零碎
标本
码海
工具
其他
页面
归档
友链
关于
搜索到
1
篇与
Kafka
的结果
2023-08-29
SpringBoot 对接华为大数据平台kerberos认证的Kafka数据
最近项目中有用到kafka来同步某业务核心数据,通过kafka来进行数据的实时更新。原本使用的是Apache kafka 来进行订阅消费,后因业务方信创要求,更换了华为大数据平台的Kafka数据源,采用kerberos认证。对接了一天多,数据才接入成功,网上相关文档比较少,这里我总结一下,为后人少踩坑。本次使用的是华为MRS3.1.2版本,其他版本应该都类似。修改项目中的pom.xml依赖,将默认的apache的kafka-client包替换为华为自带的。如果拉去不到相关依赖包,请更换为华为Maven镜像,配置信息如下。<repositories> <repository> <id>huawei-cloud-sdk</id> <name>HuaWei Cloud Mirrors</name> <url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>huawei-mirrors</id> <name>HuaWei Mirrors</name> <url>https://repo.huaweicloud.com/repository/maven/</url> </repository> </repositories>我们需要在配置文件中增加新的配置文件,不要使用spring.kafka相关配置,建议自己新增一个配置信息,本文以huawei.mrs.kafka为例。huawei: mrs: kafka: enable: false bootstrap-servers: 10.244.231.2:21007,10.244.230.202:21007,10.244.230.125:21007 security: protocol: SASL_PLAINTEXT kerberos: domain: name: hadoop.hadoop_651_arm.com sasl: kerberos: service: name: kafka新建一个配置文件HuaWeiMrsKafkaConfiguration,配置相关信息@Value("${huawei.mrs.kafka.enable}") public Boolean enable; @Value("${huawei.mrs.kafka.bootstrap-servers}") public String boostrapServers; @Value("${huawei.mrs.kafka.security.protocol}") public String securityProtocol; @Value("${huawei.mrs.kafka.kerberos.domain.name}") public String kerberosDomainName; @Value("${huawei.mrs.kafka.sasl.kerberos.service.name}") public String kerberosServiceName; @Bean public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<String, String> template) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); //禁止消费者自启动,达到动态启动消费者的目的 factory.setAutoStartup(enable); return factory; } @Bean public ConsumerFactory<Object, Object> consumerFactory() { Map<String, Object> configs = new HashMap<>(); configs.put("security.protocol", securityProtocol); configs.put("kerberos.domain.name", kerberosDomainName); configs.put("bootstrap.servers", boostrapServers); configs.put("sasl.kerberos.service.name", kerberosServiceName); configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return new DefaultKafkaConsumerFactory<>(configs); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { Map<String, Object> configs = new HashMap<>(); configs.put("security.protocol", securityProtocol); configs.put("kerberos.domain.name", kerberosDomainName); configs.put("bootstrap.servers", boostrapServers); configs.put("sasl.kerberos.service.name", kerberosServiceName); configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs); return new KafkaTemplate<>(producerFactory); } @Bean public RecordMessageConverter converter() { return new StringJsonMessageConverter(); }再新建一个全局消费kafka数据的KafkaConsumer类@Slf4j @Component public class KafkaConsumer { @KafkaListener(topics = "topic1") public void topicMessage(ConsumerRecord<?, ?> record) { log.info("Kafka->topic:‘topic1’-->{}", record.value()); //TODO:业务逻辑 } }配置kerberos信息,也是非常重要的一步,配置kerberos认证文件,本文将以最简单的教程为例,在启动脚本中配置。为什么不在项目中配置呢?因为各位的项目都各自不同,放在resources目录中,有的时候会读取不到,所以我们以最简单的例子来作为讲解。打包项目,将打包好的jar放置在服务器中。并将下载安全集群认证用户的krb5.conf和user.keytab文件放置跟jar包相同目录或者自定义一个目录(本文以/data/java/huawei-mrs-kafka为例)。在这个文件夹或者部署目录通缉新建一个jaas.conf文件,将keyTab项修改为绝对路径。principal则是华为大数据平台提供的账号。Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/data/java/huawei-mrs-kafka/user.keytab" principal="developuser@HADOOP_651_ARM.COM" useTicketCache=false storeKey=true debug=true; }; KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/data/java/huawei-mrs-kafka/user.keytab" principal="developuser@HADOOP_651_ARM.COM" useTicketCache=false storeKey=true debug=true; }; 最后一步,运行jar包,指定krb5.conf和jaas.conf环境变量。java -jar -Djava.security.krb5.conf=/data/java/huawei-mrs-kafka/krb5.conf -Djava.security.auth.login.config=/data/java/huawei-mrs-kafka/jaas.conf huawei-mrs-kafka-1.0-SNAPSHOT.jar因为本地没有相关环境,生产环境又处于内网,这里就不提供相关运行成功的代码,对接华为大数据平台的Kafka其实很简单,只是网上的教程很少而已。如有这方面的问题,可以在本文留言,看到了就会回复。{cloud title="示例下载" type="default" url="https://www.gitlink.org.cn/yanqs/huawei-mrs-kafka-demo" password=""/}
2023年08月29日
439 阅读
0 评论