SpringBoot集成Kafka

  1. 简单案例
    1. 引用包
    2. 配置
    3. 发送消息
    4. 监听消息
  2. 双kafka案例
    1. 配置文件
    2. 代码配置
    3. 使用

简单案例

引用包

<!-- 设置了版本号,有可能会报错ClassNotFound -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置

spring:
  kafka:
  topic: testTopic
  bootstrap-servers: 127.0.0.1:9092
  producer:
    retries: 0
    batch-size: 50
    buffer-memory: 6554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      max:
        request:
          size: 5242880
      linger.ms: 1

发送消息

@Slf4j
@Component
public class MessageService{
    
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Value("${spring.kafka.topic}")
    private String topics;
    
    public void sendSyncMessage(String resourceName){
        try{
            kafkaTemplate.send(topics,resourceName);
        } catch (Exception e) {
            log.error("message send failed, error={}", e);
        }
    }
}

监听消息

@Slf4j
@Component
public class MessageListener {

    @KafkaListener(topics = {"testTopic"}, groupId = "testGroupId")
    public void annul1(ConsumerRecord<String, String> record) {
        log.info("groupId = myContainer2, message = " + record.toString());
    }
}

双kafka案例

  • SpringBoot多kafka配置

    配置文件

    spring:
    kafka:
      kafka1:
        bootstrap-servers: 127.0.0.1:9092
        producer:
          retries: 0
          batch-size: 50
          buffer-memory: 6554432
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          properties:
            max:
              request:
                size: 5242880
            linger.ms: 1
      kafka2:
        bootstrap-servers: 127.0.0.1:9092
        producer:
          retries: 0
          batch-size: 50
          buffer-memory: 6554432
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          properties:
            max:
              request:
                size: 5242880
            linger.ms: 1
    

代码配置

  • 实例1配置

    @Configuration
    @EnableKafka
    public class Kafka1Config {
    
      @Bean("kafka1ExtListenerKafkaProperties")
      @Primary
      @ConfigurationProperties(prefix = "spring.kafka.kafka1")
      public KafkaProperties kafka1ExtListenerKafkaProperties() {
          return new KafkaProperties();
      }
    
      @Bean("kafka1ListenerContainerFactory")
      @Primary
      KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
          ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(consumerFactory());
          factory.setConcurrency(3);
          factory.getContainerProperties().setPollTimeout(3000);
          return factory;
      }
    
      private ConsumerFactory<Integer, String> consumerFactory() {
          return new DefaultKafkaConsumerFactory<>(consumerConfigs());
      }
    
      private Map<String, Object> consumerConfigs() {
          return kafka1ExtListenerKafkaProperties().buildConsumerProperties();
      }
    
      @Bean("kafkaTemplate")
      @Primary
      public KafkaTemplate<String, String> kafkaTemplate() {
          return new KafkaTemplate<>(producerFactory());
      }
    
      private ProducerFactory<String, String> producerFactory() {
          DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
          return producerFactory;
      }
    
      private Map<String, Object> producerConfigs() {
          return kafka1ExtListenerKafkaProperties().buildProducerProperties();
      }
    }
    
  • 实例2配置

    @Configuration
    @EnableKafka
    public class Kafka2Config {
    
      @Bean("kafka2ListenerKafkaProperties")
      @ConfigurationProperties(prefix = "spring.kafka.kafka2")
      public KafkaProperties kafka2ListenerKafkaProperties() {
          return new KafkaProperties();
      }
    
      @Bean("kafka2ListenerContainerFactory")
      public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafka2ListenerContainerFactory() {
          ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(consumerFactory());
          factory.setConcurrency(3);
          factory.getContainerProperties().setPollTimeout(3000);
          return factory;
      }
    
      /**
       * 消费者工厂的bean
       *
       * @return
       */
      private ConsumerFactory<Integer, String> consumerFactory() {
          return new DefaultKafkaConsumerFactory<>(consumerConfigs());
      }
    
      private Map<String, Object> consumerConfigs() {
          return kafka2ListenerKafkaProperties().buildConsumerProperties();
      }
    
      @Bean("kafka2Template")
      public KafkaTemplate<String, String> kafka2Template() {
          return new KafkaTemplate<>(producerFactory());
      }
    
      private ProducerFactory<String, String> producerFactory() {
          DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
          return producerFactory;
      }
    
      private Map<String, Object> producerConfigs() {
          return kafka2ListenerKafkaProperties().buildProducerProperties();
      }
    }
    

使用

@Slf4j
@Service
public class MessageSendService{
    @Resource(name = "kafkaTemplate")
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Resource(name = "kafkaTemplateForMonitor")
    private KafkaTemplate<String, Object> kafkaTemplateForMonitor;

    public boolean kafka1Send(String topic, String message) {
        try {
            if (kafkaMq) {
                kafkaTemplate.send(topic, message);
            }
        } catch (Exception e) {
            log.warn("kafka1 发送kafka消息失败 topic={},error={}",topic,e);
        }
        return true;
    }
    
    public boolean kafka2Send(String topic, String message){
        try {
            kafkaTemplateForMonitor.send(topic, message);
        } catch (Exception e) {
            log.warn("kafka2 发送kafka消息失败 topic={},error={}",topic,e);
        }
        return true;
    }
}

文章标题:SpringBoot集成Kafka

字数:719

本文作者:imfan

发布时间:2021-02-03, 16:06:00

最后更新:2021-08-19, 09:37:57

原始链接:https://wiki.aistart.cc/2021/02/03/backend/mq/kafka/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

×

喜欢就点赞,疼爱就打赏