SpringBoot 对接华为大数据平台kerberos认证的Kafka数据
侧边栏壁纸
  • 累计撰写 190 篇文章
  • 累计收到 1,269 条评论

SpringBoot 对接华为大数据平台kerberos认证的Kafka数据

萧瑟
2023-08-29 / 0 评论 / 447 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2023年08月29日,已超过463天没有更新,若内容或图片失效,请留言反馈。

最近项目中有用到kafka来同步某业务核心数据,通过kafka来进行数据的实时更新。

原本使用的是Apache kafka 来进行订阅消费,后因业务方信创要求,更换了华为大数据平台的Kafka数据源,采用kerberos认证。对接了一天多,数据才接入成功,网上相关文档比较少,这里我总结一下,为后人少踩坑。

本次使用的是华为MRS3.1.2版本,其他版本应该都类似。

修改项目中的pom.xml依赖,将默认的apache的kafka-client包替换为华为自带的。

pom修改

如果拉去不到相关依赖包,请更换为华为Maven镜像,配置信息如下。

<repositories>
    <repository>
        <id>huawei-cloud-sdk</id>
        <name>HuaWei Cloud Mirrors</name>
        <url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>huawei-mirrors</id>
        <name>HuaWei Mirrors</name>
        <url>https://repo.huaweicloud.com/repository/maven/</url>
    </repository>
</repositories>

我们需要在配置文件中增加新的配置文件,不要使用spring.kafka相关配置,建议自己新增一个配置信息,本文以huawei.mrs.kafka为例。

huawei:
  mrs:
    kafka:
      enable: false
      bootstrap-servers: 10.244.231.2:21007,10.244.230.202:21007,10.244.230.125:21007
      security:
        protocol: SASL_PLAINTEXT
      kerberos:
        domain:
          name: hadoop.hadoop_651_arm.com
      sasl:
        kerberos:
          service:
            name: kafka

新建一个配置文件HuaWeiMrsKafkaConfiguration,配置相关信息

@Value("${huawei.mrs.kafka.enable}")
public Boolean enable;
@Value("${huawei.mrs.kafka.bootstrap-servers}")
public String boostrapServers;

@Value("${huawei.mrs.kafka.security.protocol}")
public String securityProtocol;

@Value("${huawei.mrs.kafka.kerberos.domain.name}")
public String kerberosDomainName;

@Value("${huawei.mrs.kafka.sasl.kerberos.service.name}")
public String kerberosServiceName;

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<String, String> template) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    //禁止消费者自启动,达到动态启动消费者的目的
    factory.setAutoStartup(enable);
    return factory;
}


@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("security.protocol", securityProtocol);
    configs.put("kerberos.domain.name", kerberosDomainName);
    configs.put("bootstrap.servers", boostrapServers);
    configs.put("sasl.kerberos.service.name", kerberosServiceName);
    configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return new DefaultKafkaConsumerFactory<>(configs);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("security.protocol", securityProtocol);
    configs.put("kerberos.domain.name", kerberosDomainName);
    configs.put("bootstrap.servers", boostrapServers);
    configs.put("sasl.kerberos.service.name", kerberosServiceName);
    configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);
    return new KafkaTemplate<>(producerFactory);
}

@Bean
public RecordMessageConverter converter() {
    return new StringJsonMessageConverter();
}

再新建一个全局消费kafka数据的KafkaConsumer

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic1")
    public void topicMessage(ConsumerRecord<?, ?> record) {
        log.info("Kafka->topic:‘topic1’-->{}", record.value());
        //TODO:业务逻辑
    }
}

配置kerberos信息,也是非常重要的一步,配置kerberos认证文件,本文将以最简单的教程为例,在启动脚本中配置。

为什么不在项目中配置呢?

因为各位的项目都各自不同,放在resources目录中,有的时候会读取不到,所以我们以最简单的例子来作为讲解。

打包项目,将打包好的jar放置在服务器中。并将下载安全集群认证用户的krb5.conf和user.keytab文件放置跟jar包相同目录或者自定义一个目录(本文以/data/java/huawei-mrs-kafka为例)。

在这个文件夹或者部署目录通缉新建一个jaas.conf文件,将keyTab项修改为绝对路径。principal则是华为大数据平台提供的账号。

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/data/java/huawei-mrs-kafka/user.keytab"
principal="developuser@HADOOP_651_ARM.COM"
useTicketCache=false
storeKey=true
debug=true;
};

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/data/java/huawei-mrs-kafka/user.keytab"
principal="developuser@HADOOP_651_ARM.COM"
useTicketCache=false
storeKey=true
debug=true;
};

最后一步,运行jar包,指定krb5.confjaas.conf环境变量。

java -jar -Djava.security.krb5.conf=/data/java/huawei-mrs-kafka/krb5.conf -Djava.security.auth.login.config=/data/java/huawei-mrs-kafka/jaas.conf huawei-mrs-kafka-1.0-SNAPSHOT.jar

因为本地没有相关环境,生产环境又处于内网,这里就不提供相关运行成功的代码,对接华为大数据平台的Kafka其实很简单,只是网上的教程很少而已。如有这方面的问题,可以在本文留言,看到了就会回复。

评论 (0)

取消