Spring Boot / Kafka

単体テスト (Junit5)

build.gradleに追記。

testImplementation "org.springframework.kafka:spring-kafka-test"

テストコード。

// 9092ポートだとローカルで起動してるものと被っていやなので9992にしている。
// ブローカのポートは指定しないとランダムになるっぽい?
@EmbeddedKafka(topics = ["testTopic"], brokerProperties = ["listeners=PLAINTEXT://localhost:9992"])
@ActiveProfiles("unit")
class KafkaTest {
    @Autowired
    private lateinit var template: KafkaTemplate<String, String>
}

// KafkaTemplateの設定
@Configuration
@EnableKafka
class Config {
    @Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
    private lateinit var brokerAddresses: String

    @Bean
    fun kpf(): ProducerFactory<String, String> {
        val configs = HashMap<String, Any>()
        configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = this.brokerAddresses
        configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configs)
    }

    @Bean
    fun kt(): KafkaTemplate<String, String> {
        return KafkaTemplate(kpf())
    }
}

Ref: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt https://github.com/spring-projects/spring-kafka/blob/ee240006b3ddc2117ac5205a52f80000e094875d/src/reference/asciidoc/testing.adoc

application-unit.yml

spring:
  kafka:
    # ここが大事。組み込みKafkaを指定している。
    bootstrap-servers: ${spring.embedded.kafka.brokers}

    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false

    listener:
      ack-mode: COUNT_TIME
      ack-count: 100
      ack-time: 10000

Ref: @EmbeddedKafkaについて

@SpringBootTest
@SpringJUnitConfig
@EmbeddedKafka(topics = ["testTopic"])
@ActiveProfiles("unit")
class MyTest {
    // 内側に@Configurationがあると@Autowiredがうまく動かない
    @Configration
    class Config{
        @Bean
        fun a() {}
    }
}

Spring+WebFluxでKafkaのメッセージをMySQLに保存する

VSCode拡張を入れる

  • Java Extension Pack
  • Gradle Language Support
  • kotlin
  • Spring Boot Extension Pack

Spring Initializr

  • Project - Gradle Project
  • Language - Kotlin
  • Spring Boot - 2.2.0(SNAPSHOT)
  • Dependencies - MySQL driver, Spring for Apache Kafka

javacが入っていない場合はJDKをインストールする。

apt install -y default-jdk
gradle init